Skip to content

Commit

Permalink
CreateActor
Browse files Browse the repository at this point in the history
  • Loading branch information
igormcoelho committed Jul 1, 2019
1 parent ac4ba6b commit 3ad56f7
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 19 deletions.
6 changes: 4 additions & 2 deletions neo.UnitTests/UT_RemoteNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public void RemoteNode_Test_Abort_DifferentMagic()
var testProbe = CreateTestProbe();
var connectionTestProbe = CreateTestProbe();
var protocolActor = ActorOfAsTestActorRef<ProtocolHandler>(() => new ProtocolHandler(testBlockchain));
var remoteNodeActor = ActorOfAsTestActorRef<RemoteNode>(() => new RemoteNode(testBlockchain, connectionTestProbe, null, null, protocolActor));
// TODO: setup 'testBlockchain' mock CreateActor to return desired 'protocolActor' (using ActorOfAsTestActorRef)
var remoteNodeActor = ActorOfAsTestActorRef<RemoteNode>(() => new RemoteNode(testBlockchain, connectionTestProbe, null, null));

connectionTestProbe.ExpectMsg<Tcp.Write>();

Expand Down Expand Up @@ -60,7 +61,8 @@ public void RemoteNode_Test_Accept_IfSameMagic()
var testProbe = CreateTestProbe();
var connectionTestProbe = CreateTestProbe();
var protocolActor = ActorOfAsTestActorRef<ProtocolHandler>(() => new ProtocolHandler(testBlockchain));
var remoteNodeActor = ActorOfAsTestActorRef<RemoteNode>(() => new RemoteNode(testBlockchain, connectionTestProbe, null, null, protocolActor));
// TODO: setup 'testBlockchain' mock CreateActor to return desired 'protocolActor' (using ActorOfAsTestActorRef)
var remoteNodeActor = ActorOfAsTestActorRef<RemoteNode>(() => new RemoteNode(testBlockchain, connectionTestProbe, null, null));

connectionTestProbe.ExpectMsg<Tcp.Write>();

Expand Down
2 changes: 1 addition & 1 deletion neo/Consensus/ConsensusService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ protected override void PostStop()

public static Props Props(IActorRef localNode, IActorRef taskManager, Store store, Wallet wallet)
{
return Akka.Actor.Props.Create(() => new ConsensusService(localNode, taskManager, store, wallet)).WithMailbox("consensus-service-mailbox");
return Akka.Actor.Props.Create(() => new ConsensusService(localNode, taskManager, store, wallet));
}

private void RequestChangeView()
Expand Down
2 changes: 1 addition & 1 deletion neo/Ledger/Blockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ protected override void PostStop()

public static Props Props(NeoSystem system, Store store)
{
return Akka.Actor.Props.Create(() => new Blockchain(system, store)).WithMailbox("blockchain-mailbox");
return Akka.Actor.Props.Create(() => new Blockchain(system, store));
}

private void SaveHeaderHashList(Snapshot snapshot = null)
Expand Down
22 changes: 18 additions & 4 deletions neo/NeoSystem.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Akka.Actor;
using Akka.Configuration;
using Akka.IO;
using Neo.Consensus;
using Neo.Ledger;
using Neo.Network.P2P;
Expand All @@ -8,6 +10,8 @@
using Neo.Wallets;
using System;
using System.Net;
using System.Linq;
using System.Linq.Expressions;

namespace Neo
{
Expand All @@ -26,6 +30,16 @@ public class NeoSystem : IDisposable
public IActorRef Consensus { get; private set; }
public RpcServer RpcServer { get; private set; }

public IActorRef CreateActor(Props props, string mailbox = null, string actorName = null)
{
if(mailbox != null)
props = props.WithMailbox(mailbox);
if(actorName != null)
return this.ActorSystem.ActorOf(props, actorName);
else
return this.ActorSystem.ActorOf(props);
}

private readonly Store store;
private ChannelsConfig start_message = null;
private bool suspend = false;
Expand All @@ -34,9 +48,9 @@ public NeoSystem(Store store)
{
this.store = store;
Plugin.LoadPlugins(this);
this.Blockchain = ActorSystem.ActorOf(Ledger.Blockchain.Props(this, store));
this.LocalNode = ActorSystem.ActorOf(Network.P2P.LocalNode.Props(this));
this.TaskManager = ActorSystem.ActorOf(Network.P2P.TaskManager.Props(this));
this.Blockchain = CreateActor(Ledger.Blockchain.Props(this, store), "blockchain-mailbox");
this.LocalNode = CreateActor(Network.P2P.LocalNode.Props(this));
this.TaskManager = CreateActor(Network.P2P.TaskManager.Props(this), "task-manager-mailbox");
Plugin.NotifyPluginsLoadedAfterSystemConstructed();
}

Expand Down Expand Up @@ -69,7 +83,7 @@ internal void ResumeNodeStartup()

public void StartConsensus(Wallet wallet, Store consensus_store = null, bool ignoreRecoveryLogs = false)
{
Consensus = ActorSystem.ActorOf(ConsensusService.Props(this.LocalNode, this.TaskManager, consensus_store ?? store, wallet));
Consensus = CreateActor(ConsensusService.Props(this.LocalNode, this.TaskManager, consensus_store ?? store, wallet), "consensus-service-mailbox");
Consensus.Tell(new ConsensusService.Start { IgnoreRecoveryLogs = ignoreRecoveryLogs }, Blockchain);
}

Expand Down
4 changes: 2 additions & 2 deletions neo/Network/P2P/LocalNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ public static Props Props(NeoSystem system)
return Akka.Actor.Props.Create(() => new LocalNode(system));
}

protected override Props ProtocolProps(object connection, IPEndPoint remote, IPEndPoint local)
protected override IActorRef ProtocolActor(NeoSystem system, string actorName, object connection, IPEndPoint remote, IPEndPoint local)
{
return RemoteNode.Props(system, connection, remote, local);
return system.CreateActor(RemoteNode.Props(system, connection, remote, local), "remote-node-mailbox", actorName);
}
}
}
8 changes: 5 additions & 3 deletions neo/Network/P2P/Peer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class Connect { public IPEndPoint EndPoint; public bool IsTrusted = false
private class Timer { }
private class WsConnected { public WebSocket Socket; public IPEndPoint Remote; public IPEndPoint Local; }

public NeoSystem neoSystem; // TODO: initialize

public const int DefaultMinDesiredConnections = 10;
public const int DefaultMaxConnections = DefaultMinDesiredConnections * 4;

Expand Down Expand Up @@ -191,7 +193,7 @@ private void OnTcpConnected(IPEndPoint remote, IPEndPoint local)
else
{
ConnectedAddresses[remote.Address] = count + 1;
IActorRef connection = Context.ActorOf(ProtocolProps(Sender, remote, local), $"connection_{Guid.NewGuid()}");
IActorRef connection = ProtocolActor(neoSystem, $"connection_{Guid.NewGuid()}", Sender, remote, local);
Context.Watch(connection);
Sender.Tell(new Tcp.Register(connection));
ConnectedPeers.TryAdd(connection, remote);
Expand Down Expand Up @@ -244,7 +246,7 @@ private void OnWsConnected(WebSocket ws, IPEndPoint remote, IPEndPoint local)
else
{
ConnectedAddresses[remote.Address] = count + 1;
Context.ActorOf(ProtocolProps(ws, remote, local), $"connection_{Guid.NewGuid()}");
ProtocolActor(neoSystem, $"connection_{Guid.NewGuid()}", ws, remote, local);
}
}

Expand All @@ -268,6 +270,6 @@ private async Task ProcessWebSocketAsync(HttpContext context)
});
}

protected abstract Props ProtocolProps(object connection, IPEndPoint remote, IPEndPoint local);
protected abstract IActorRef ProtocolActor(NeoSystem system, string actorName, object connection, IPEndPoint remote, IPEndPoint local);
}
}
2 changes: 1 addition & 1 deletion neo/Network/P2P/ProtocolHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ private void OnVersionMessageReceived(VersionPayload payload)

public static Props Props(NeoSystem system)
{
return Akka.Actor.Props.Create(() => new ProtocolHandler(system)).WithMailbox("protocol-handler-mailbox");
return Akka.Actor.Props.Create(() => new ProtocolHandler(system));
}
}

Expand Down
7 changes: 3 additions & 4 deletions neo/Network/P2P/RemoteNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ internal class Relay { public IInventory Inventory; }
public uint LastBlockIndex { get; private set; } = 0;
public bool IsFullNode { get; private set; } = false;

public RemoteNode(NeoSystem system, object connection, IPEndPoint remote, IPEndPoint local, IActorRef protocolHandler)
public RemoteNode(NeoSystem system, object connection, IPEndPoint remote, IPEndPoint local)
: base(connection, remote, local)
{
this.system = system;
this.protocol = protocolHandler;
this.protocol = system.CreateActor(ProtocolHandler.Props(system), "protocol-handler-mailbox");
LocalNode.Singleton.RemoteNodes.TryAdd(Self, this);

var capabilities = new List<NodeCapability>
Expand Down Expand Up @@ -224,8 +224,7 @@ protected override void PostStop()

internal static Props Props(NeoSystem system, object connection, IPEndPoint remote, IPEndPoint local)
{
var protocol = system.ActorSystem.ActorOf(ProtocolHandler.Props(system));
return Akka.Actor.Props.Create(() => new RemoteNode(system, connection, remote, local, protocol)).WithMailbox("remote-node-mailbox");
return Akka.Actor.Props.Create(() => new RemoteNode(system, connection, remote, local));
}

private void SendMessage(Message message)
Expand Down
2 changes: 1 addition & 1 deletion neo/Network/P2P/TaskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ protected override void PostStop()

public static Props Props(NeoSystem system)
{
return Akka.Actor.Props.Create(() => new TaskManager(system)).WithMailbox("task-manager-mailbox");
return Akka.Actor.Props.Create(() => new TaskManager(system));
}

private void RequestTasks(TaskSession session)
Expand Down

0 comments on commit 3ad56f7

Please sign in to comment.