From a1c8f3d2b8df1c1720d726584bc5de7814da463f Mon Sep 17 00:00:00 2001 From: ShawnYun <42930111+ShawnYun@users.noreply.github.com> Date: Fri, 29 Nov 2019 20:41:33 +0800 Subject: [PATCH] Add some comments of P2P (#1303) * add some comments of P2P * fix * Minor changes * Minor changes * Minor Changes * Minor changes * Minor changes --- src/neo/Network/P2P/LocalNode.cs | 11 ++++++++++ src/neo/Network/P2P/Message.cs | 4 ++++ src/neo/Network/P2P/Peer.cs | 28 ++++++++++++++++++++++++-- src/neo/Network/P2P/ProtocolHandler.cs | 24 ++++++++++++++++++++++ src/neo/Network/P2P/RemoteNode.cs | 10 +++++++++ src/neo/Network/P2P/TaskManager.cs | 5 +++++ 6 files changed, 80 insertions(+), 2 deletions(-) diff --git a/src/neo/Network/P2P/LocalNode.cs b/src/neo/Network/P2P/LocalNode.cs index aec90a9f30..d93563d3d5 100644 --- a/src/neo/Network/P2P/LocalNode.cs +++ b/src/neo/Network/P2P/LocalNode.cs @@ -64,6 +64,10 @@ private void BroadcastMessage(MessageCommand command, ISerializable payload = nu BroadcastMessage(Message.Create(command, payload)); } + /// + /// Broadcast a message to all connected nodes. + /// + /// The message to be broadcasted. private void BroadcastMessage(Message message) { Connections.Tell(message); @@ -127,6 +131,7 @@ public IEnumerable GetUnconnectedPeers() /// Performs a BroadcastMessage with the command `MessageCommand.GetAddr`, which, eventually, tells all known connections /// If there are no connected peers it will try with the default, respecting MaxCountFromSeedList limit /// + /// The count of peers required protected override void NeedMorePeers(int count) { count = Math.Max(count, MaxCountFromSeedList); @@ -164,6 +169,12 @@ protected override void OnReceive(object message) } } + /// + /// For Transaction type of IInventory, it will tell Transaction to the actor of Consensus. + /// Otherwise, tell the inventory to the actor of Blockchain. + /// There are, currently, three implementations of IInventory: TX, Block and ConsensusPayload + /// + /// The inventory to be relayed. private void OnRelay(IInventory inventory) { if (inventory is Transaction transaction) diff --git a/src/neo/Network/P2P/Message.cs b/src/neo/Network/P2P/Message.cs index 578259f789..2d45f254ad 100644 --- a/src/neo/Network/P2P/Message.cs +++ b/src/neo/Network/P2P/Message.cs @@ -13,6 +13,10 @@ public class Message : ISerializable private const int CompressionMinSize = 128; private const int CompressionThreshold = 64; + /// + /// Flags that represents whether a message is compressed. + /// 0 for None, 1 for Compressed. + /// public MessageFlags Flags; public MessageCommand Command; public ISerializable Payload; diff --git a/src/neo/Network/P2P/Peer.cs b/src/neo/Network/P2P/Peer.cs index d1afd6246a..131d9087a1 100644 --- a/src/neo/Network/P2P/Peer.cs +++ b/src/neo/Network/P2P/Peer.cs @@ -37,8 +37,19 @@ private class WsConnected { public WebSocket Socket; public IPEndPoint Remote; p private static readonly HashSet localAddresses = new HashSet(); private readonly Dictionary ConnectedAddresses = new Dictionary(); + /// + /// A dictionary that stores the connected nodes. + /// protected readonly ConcurrentDictionary ConnectedPeers = new ConcurrentDictionary(); + /// + /// An ImmutableHashSet that stores the Peers received: 1) from other nodes or 2) from default file. + /// If the number of desired connections is not enough, first try to connect with the peers from this set. + /// protected ImmutableHashSet UnconnectedPeers = ImmutableHashSet.Empty; + /// + /// When a TCP connection request is sent to a peer, the peer will be added to the ImmutableHashSet. + /// If a Tcp.Connected or a Tcp.CommandFailed (with TCP.Command of type Tcp.Connect) is received, the related peer will be removed. + /// protected ImmutableHashSet ConnectingPeers = ImmutableHashSet.Empty; protected HashSet TrustedIpAddresses { get; } = new HashSet(); @@ -65,7 +76,7 @@ static Peer() } /// - /// Tries to add a set ff peers to an immutable hashset of UnconnectedPeers + /// Tries to add a set of peers to the immutable ImmutableHashSet of UnconnectedPeers /// /// Peers that the method will try to add (union) to (with) UnconnectedPeers protected void AddPeers(IEnumerable peers) @@ -86,6 +97,7 @@ protected void ConnectToPeer(IPEndPoint endPoint, bool isTrusted = false) if (endPoint.Port == ListenerTcpPort && localAddresses.Contains(endPoint.Address)) return; if (isTrusted) TrustedIpAddresses.Add(endPoint.Address); + // If connections with the peer greater than or equal to MaxConnectionsPerAddress, return. if (ConnectedAddresses.TryGetValue(endPoint.Address, out int count) && count >= MaxConnectionsPerAddress) return; if (ConnectedPeers.Values.Contains(endPoint)) return; @@ -189,6 +201,13 @@ private void OnStart(ChannelsConfig config) } } + /// + /// Will be triggered when a Tcp.Connected message is received. + /// If the conditions are met, the remote endpoint will be added to ConnectedPeers. + /// Increase the connection number with the remote endpoint by one. + /// + /// The remote endpoint of TCP connection + /// The local endpoint of TCP connection private void OnTcpConnected(IPEndPoint remote, IPEndPoint local) { ImmutableInterlocked.Update(ref ConnectingPeers, p => p.Remove(remote)); @@ -197,7 +216,7 @@ private void OnTcpConnected(IPEndPoint remote, IPEndPoint local) Sender.Tell(Tcp.Abort.Instance); return; } - + ConnectedAddresses.TryGetValue(remote.Address, out int count); if (count >= MaxConnectionsPerAddress) { @@ -213,6 +232,11 @@ private void OnTcpConnected(IPEndPoint remote, IPEndPoint local) } } + /// + /// Will be triggered when a Tcp.CommandFailed message is received. + /// If it's a Tcp.Connect command, remove the related endpoint from ConnectingPeers. + /// + /// private void OnTcpCommandFailed(Tcp.Command cmd) { switch (cmd) diff --git a/src/neo/Network/P2P/ProtocolHandler.cs b/src/neo/Network/P2P/ProtocolHandler.cs index e16d7f238c..40939bd574 100644 --- a/src/neo/Network/P2P/ProtocolHandler.cs +++ b/src/neo/Network/P2P/ProtocolHandler.cs @@ -145,6 +145,11 @@ private void OnFilterLoadMessageReceived(FilterLoadPayload payload) Context.Parent.Tell(new SetFilter { Filter = bloom_filter }); } + /// + /// Will be triggered when a MessageCommand.GetAddr message is received. + /// Randomly select nodes from the local RemoteNodes and tells to RemoteNode actors a MessageCommand.Addr message. + /// The message contains a list of networkAddresses from those selected random peers. + /// private void OnGetAddrMessageReceived() { Random rand = new Random(); @@ -158,9 +163,16 @@ private void OnGetAddrMessageReceived() Context.Parent.Tell(Message.Create(MessageCommand.Addr, AddrPayload.Create(networkAddresses))); } + /// + /// Will be triggered when a MessageCommand.GetBlocks message is received. + /// Tell the specified number of blocks' hashes starting with the requested HashStart until payload.Count or MaxHashesCount + /// Responses are sent to RemoteNode actor as MessageCommand.Inv Message. + /// + /// A GetBlocksPayload including start block Hash and number of blocks requested private void OnGetBlocksMessageReceived(GetBlocksPayload payload) { UInt256 hash = payload.HashStart; + // The default value of payload.Count is -1 int count = payload.Count < 0 || payload.Count > InvPayload.MaxHashesCount ? InvPayload.MaxHashesCount : payload.Count; TrimmedBlock state = Blockchain.Singleton.View.Blocks.TryGet(hash); if (state == null) return; @@ -198,6 +210,12 @@ private void OnGetBlockDataMessageReceived(GetBlockDataPayload payload) } } + /// + /// Will be triggered when a MessageCommand.GetData message is received. + /// The payload includes an array of hash values. + /// For different payload.Type (Tx, Block, Consensus), get the corresponding (Tx, Block, Consensus) and tell it to RemoteNode actor. + /// + /// The payload containing the requested information private void OnGetDataMessageReceived(InvPayload payload) { UInt256[] hashes = payload.Hashes.Where(p => sentHashes.Add(p)).ToArray(); @@ -233,6 +251,12 @@ private void OnGetDataMessageReceived(InvPayload payload) } } + /// + /// Will be triggered when a MessageCommand.GetHeaders message is received. + /// Tell the specified number of blocks' headers starting with the requested HashStart to RemoteNode actor. + /// A limit set by HeadersPayload.MaxHeadersCount is also applied to the number of requested Headers, namely payload.Count. + /// + /// A GetBlocksPayload including start block Hash and number of blocks' headers requested private void OnGetHeadersMessageReceived(GetBlocksPayload payload) { UInt256 hash = payload.HashStart; diff --git a/src/neo/Network/P2P/RemoteNode.cs b/src/neo/Network/P2P/RemoteNode.cs index ec2a16e047..a7cab65902 100644 --- a/src/neo/Network/P2P/RemoteNode.cs +++ b/src/neo/Network/P2P/RemoteNode.cs @@ -50,6 +50,12 @@ public RemoteNode(NeoSystem system, object connection, IPEndPoint remote, IPEndP SendMessage(Message.Create(MessageCommand.Version, VersionPayload.Create(LocalNode.Nonce, LocalNode.UserAgent, capabilities.ToArray()))); } + /// + /// It defines the message queue to be used for dequeuing. + /// If the high-priority message queue is not empty, choose the high-priority message queue. + /// Otherwise, choose the low-priority message queue. + /// Finally, it sends the first message of the queue. + /// private void CheckMessageQueue() { if (!verack || !ack) return; @@ -67,6 +73,10 @@ private void EnqueueMessage(MessageCommand command, ISerializable payload = null EnqueueMessage(Message.Create(command, payload)); } + /// + /// Add message to high priority queue or low priority queue depending on the message type. + /// + /// The message to be added private void EnqueueMessage(Message message) { bool is_single = false; diff --git a/src/neo/Network/P2P/TaskManager.cs b/src/neo/Network/P2P/TaskManager.cs index 866528f618..423572c958 100644 --- a/src/neo/Network/P2P/TaskManager.cs +++ b/src/neo/Network/P2P/TaskManager.cs @@ -25,6 +25,9 @@ private class Timer { } private readonly NeoSystem system; private const int MaxConncurrentTasks = 3; + /// + /// A set of known hashes for inventories or payloads already received + /// private readonly FIFOSet knownHashes; private readonly Dictionary globalTasks = new Dictionary(); private readonly Dictionary sessions = new Dictionary(); @@ -228,6 +231,8 @@ private void RequestTasks(TaskSession session) return; } } + // When the number of AvailableTasks is no more than 0, no pending tasks of InventoryType.Block, it should process pending the tasks of headers + // If not HeaderTask pending to be processed it should ask for more Blocks if ((!HasHeaderTask || globalTasks[HeaderTaskHash] < MaxConncurrentTasks) && Blockchain.Singleton.HeaderHeight < session.StartHeight) { session.Tasks[HeaderTaskHash] = DateTime.UtcNow;