Skip to content

Commit

Permalink
Add some comments of P2P (#1303)
Browse files Browse the repository at this point in the history
* add some comments of P2P

* fix

* Minor changes

* Minor changes

* Minor Changes

* Minor changes

* Minor changes
  • Loading branch information
ShawnYun authored and vncoelho committed Nov 29, 2019
1 parent 31976ce commit a1c8f3d
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 2 deletions.
11 changes: 11 additions & 0 deletions src/neo/Network/P2P/LocalNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ private void BroadcastMessage(MessageCommand command, ISerializable payload = nu
BroadcastMessage(Message.Create(command, payload));
}

/// <summary>
/// Broadcast a message to all connected nodes.
/// </summary>
/// <param name="message">The message to be broadcasted.</param>
private void BroadcastMessage(Message message)
{
Connections.Tell(message);
Expand Down Expand Up @@ -127,6 +131,7 @@ public IEnumerable<IPEndPoint> GetUnconnectedPeers()
/// Performs a BroadcastMessage with the command `MessageCommand.GetAddr`, which, eventually, tells all known connections
/// If there are no connected peers it will try with the default, respecting MaxCountFromSeedList limit
/// </summary>
/// <param name="count">The count of peers required</param>
protected override void NeedMorePeers(int count)
{
count = Math.Max(count, MaxCountFromSeedList);
Expand Down Expand Up @@ -164,6 +169,12 @@ protected override void OnReceive(object message)
}
}

/// <summary>
/// For Transaction type of IInventory, it will tell Transaction to the actor of Consensus.
/// Otherwise, tell the inventory to the actor of Blockchain.
/// There are, currently, three implementations of IInventory: TX, Block and ConsensusPayload
/// </summary>
/// <param name="inventory">The inventory to be relayed.</param>
private void OnRelay(IInventory inventory)
{
if (inventory is Transaction transaction)
Expand Down
4 changes: 4 additions & 0 deletions src/neo/Network/P2P/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ public class Message : ISerializable
private const int CompressionMinSize = 128;
private const int CompressionThreshold = 64;

/// <summary>
/// Flags that represents whether a message is compressed.
/// 0 for None, 1 for Compressed.
/// </summary>
public MessageFlags Flags;
public MessageCommand Command;
public ISerializable Payload;
Expand Down
28 changes: 26 additions & 2 deletions src/neo/Network/P2P/Peer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,19 @@ private class WsConnected { public WebSocket Socket; public IPEndPoint Remote; p

private static readonly HashSet<IPAddress> localAddresses = new HashSet<IPAddress>();
private readonly Dictionary<IPAddress, int> ConnectedAddresses = new Dictionary<IPAddress, int>();
/// <summary>
/// A dictionary that stores the connected nodes.
/// </summary>
protected readonly ConcurrentDictionary<IActorRef, IPEndPoint> ConnectedPeers = new ConcurrentDictionary<IActorRef, IPEndPoint>();
/// <summary>
/// An ImmutableHashSet that stores the Peers received: 1) from other nodes or 2) from default file.
/// If the number of desired connections is not enough, first try to connect with the peers from this set.
/// </summary>
protected ImmutableHashSet<IPEndPoint> UnconnectedPeers = ImmutableHashSet<IPEndPoint>.Empty;
/// <summary>
/// When a TCP connection request is sent to a peer, the peer will be added to the ImmutableHashSet.
/// If a Tcp.Connected or a Tcp.CommandFailed (with TCP.Command of type Tcp.Connect) is received, the related peer will be removed.
/// </summary>
protected ImmutableHashSet<IPEndPoint> ConnectingPeers = ImmutableHashSet<IPEndPoint>.Empty;
protected HashSet<IPAddress> TrustedIpAddresses { get; } = new HashSet<IPAddress>();

Expand All @@ -65,7 +76,7 @@ static Peer()
}

/// <summary>
/// Tries to add a set ff peers to an immutable hashset of UnconnectedPeers
/// Tries to add a set of peers to the immutable ImmutableHashSet of UnconnectedPeers
/// </summary>
/// <param name="peers">Peers that the method will try to add (union) to (with) UnconnectedPeers</param>
protected void AddPeers(IEnumerable<IPEndPoint> peers)
Expand All @@ -86,6 +97,7 @@ protected void ConnectToPeer(IPEndPoint endPoint, bool isTrusted = false)
if (endPoint.Port == ListenerTcpPort && localAddresses.Contains(endPoint.Address)) return;

if (isTrusted) TrustedIpAddresses.Add(endPoint.Address);
// If connections with the peer greater than or equal to MaxConnectionsPerAddress, return.
if (ConnectedAddresses.TryGetValue(endPoint.Address, out int count) && count >= MaxConnectionsPerAddress)
return;
if (ConnectedPeers.Values.Contains(endPoint)) return;
Expand Down Expand Up @@ -189,6 +201,13 @@ private void OnStart(ChannelsConfig config)
}
}

/// <summary>
/// Will be triggered when a Tcp.Connected message is received.
/// If the conditions are met, the remote endpoint will be added to ConnectedPeers.
/// Increase the connection number with the remote endpoint by one.
/// </summary>
/// <param name="remote">The remote endpoint of TCP connection</param>
/// <param name="local">The local endpoint of TCP connection</param>
private void OnTcpConnected(IPEndPoint remote, IPEndPoint local)
{
ImmutableInterlocked.Update(ref ConnectingPeers, p => p.Remove(remote));
Expand All @@ -197,7 +216,7 @@ private void OnTcpConnected(IPEndPoint remote, IPEndPoint local)
Sender.Tell(Tcp.Abort.Instance);
return;
}

ConnectedAddresses.TryGetValue(remote.Address, out int count);
if (count >= MaxConnectionsPerAddress)
{
Expand All @@ -213,6 +232,11 @@ private void OnTcpConnected(IPEndPoint remote, IPEndPoint local)
}
}

/// <summary>
/// Will be triggered when a Tcp.CommandFailed message is received.
/// If it's a Tcp.Connect command, remove the related endpoint from ConnectingPeers.
/// </summary>
/// <param name="cmd"></param>
private void OnTcpCommandFailed(Tcp.Command cmd)
{
switch (cmd)
Expand Down
24 changes: 24 additions & 0 deletions src/neo/Network/P2P/ProtocolHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ private void OnFilterLoadMessageReceived(FilterLoadPayload payload)
Context.Parent.Tell(new SetFilter { Filter = bloom_filter });
}

/// <summary>
/// Will be triggered when a MessageCommand.GetAddr message is received.
/// Randomly select nodes from the local RemoteNodes and tells to RemoteNode actors a MessageCommand.Addr message.
/// The message contains a list of networkAddresses from those selected random peers.
/// </summary>
private void OnGetAddrMessageReceived()
{
Random rand = new Random();
Expand All @@ -158,9 +163,16 @@ private void OnGetAddrMessageReceived()
Context.Parent.Tell(Message.Create(MessageCommand.Addr, AddrPayload.Create(networkAddresses)));
}

/// <summary>
/// Will be triggered when a MessageCommand.GetBlocks message is received.
/// Tell the specified number of blocks' hashes starting with the requested HashStart until payload.Count or MaxHashesCount
/// Responses are sent to RemoteNode actor as MessageCommand.Inv Message.
/// </summary>
/// <param name="payload">A GetBlocksPayload including start block Hash and number of blocks requested</param>
private void OnGetBlocksMessageReceived(GetBlocksPayload payload)
{
UInt256 hash = payload.HashStart;
// The default value of payload.Count is -1
int count = payload.Count < 0 || payload.Count > InvPayload.MaxHashesCount ? InvPayload.MaxHashesCount : payload.Count;
TrimmedBlock state = Blockchain.Singleton.View.Blocks.TryGet(hash);
if (state == null) return;
Expand Down Expand Up @@ -198,6 +210,12 @@ private void OnGetBlockDataMessageReceived(GetBlockDataPayload payload)
}
}

/// <summary>
/// Will be triggered when a MessageCommand.GetData message is received.
/// The payload includes an array of hash values.
/// For different payload.Type (Tx, Block, Consensus), get the corresponding (Tx, Block, Consensus) and tell it to RemoteNode actor.
/// </summary>
/// <param name="payload">The payload containing the requested information</param>
private void OnGetDataMessageReceived(InvPayload payload)
{
UInt256[] hashes = payload.Hashes.Where(p => sentHashes.Add(p)).ToArray();
Expand Down Expand Up @@ -233,6 +251,12 @@ private void OnGetDataMessageReceived(InvPayload payload)
}
}

/// <summary>
/// Will be triggered when a MessageCommand.GetHeaders message is received.
/// Tell the specified number of blocks' headers starting with the requested HashStart to RemoteNode actor.
/// A limit set by HeadersPayload.MaxHeadersCount is also applied to the number of requested Headers, namely payload.Count.
/// </summary>
/// <param name="payload">A GetBlocksPayload including start block Hash and number of blocks' headers requested</param>
private void OnGetHeadersMessageReceived(GetBlocksPayload payload)
{
UInt256 hash = payload.HashStart;
Expand Down
10 changes: 10 additions & 0 deletions src/neo/Network/P2P/RemoteNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ public RemoteNode(NeoSystem system, object connection, IPEndPoint remote, IPEndP
SendMessage(Message.Create(MessageCommand.Version, VersionPayload.Create(LocalNode.Nonce, LocalNode.UserAgent, capabilities.ToArray())));
}

/// <summary>
/// It defines the message queue to be used for dequeuing.
/// If the high-priority message queue is not empty, choose the high-priority message queue.
/// Otherwise, choose the low-priority message queue.
/// Finally, it sends the first message of the queue.
/// </summary>
private void CheckMessageQueue()
{
if (!verack || !ack) return;
Expand All @@ -67,6 +73,10 @@ private void EnqueueMessage(MessageCommand command, ISerializable payload = null
EnqueueMessage(Message.Create(command, payload));
}

/// <summary>
/// Add message to high priority queue or low priority queue depending on the message type.
/// </summary>
/// <param name="message">The message to be added</param>
private void EnqueueMessage(Message message)
{
bool is_single = false;
Expand Down
5 changes: 5 additions & 0 deletions src/neo/Network/P2P/TaskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ private class Timer { }

private readonly NeoSystem system;
private const int MaxConncurrentTasks = 3;
/// <summary>
/// A set of known hashes for inventories or payloads already received
/// </summary>
private readonly FIFOSet<UInt256> knownHashes;
private readonly Dictionary<UInt256, int> globalTasks = new Dictionary<UInt256, int>();
private readonly Dictionary<IActorRef, TaskSession> sessions = new Dictionary<IActorRef, TaskSession>();
Expand Down Expand Up @@ -228,6 +231,8 @@ private void RequestTasks(TaskSession session)
return;
}
}
// When the number of AvailableTasks is no more than 0, no pending tasks of InventoryType.Block, it should process pending the tasks of headers
// If not HeaderTask pending to be processed it should ask for more Blocks
if ((!HasHeaderTask || globalTasks[HeaderTaskHash] < MaxConncurrentTasks) && Blockchain.Singleton.HeaderHeight < session.StartHeight)
{
session.Tasks[HeaderTaskHash] = DateTime.UtcNow;
Expand Down

0 comments on commit a1c8f3d

Please sign in to comment.