Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add some comments of P2P #1303

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/neo/Network/P2P/LocalNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ private void BroadcastMessage(MessageCommand command, ISerializable payload = nu
BroadcastMessage(Message.Create(command, payload));
}

/// <summary>
/// Broadcast a message to all connected nodes.
/// </summary>
/// <param name="message">The message to be broadcasted.</param>
private void BroadcastMessage(Message message)
{
Connections.Tell(message);
Expand Down Expand Up @@ -127,6 +131,7 @@ public IEnumerable<IPEndPoint> GetUnconnectedPeers()
/// Performs a BroadcastMessage with the command `MessageCommand.GetAddr`, which, eventually, tells all known connections
/// If there are no connected peers it will try with the default, respecting MaxCountFromSeedList limit
/// </summary>
/// <param name="count">The count of peers required</param>
protected override void NeedMorePeers(int count)
{
count = Math.Max(count, MaxCountFromSeedList);
Expand Down Expand Up @@ -164,6 +169,12 @@ protected override void OnReceive(object message)
}
}

/// <summary>
/// For Transaction type of IInventory, it will tell Transaction to the actor of Consensus.
/// Otherwise, tell the inventory to the actor of Blockchain.
/// There are, currently, three implementations of IInventory: TX, Block and ConsensusPayload
/// </summary>
/// <param name="inventory">The inventory to be relayed.</param>
private void OnRelay(IInventory inventory)
{
if (inventory is Transaction transaction)
Expand Down
4 changes: 4 additions & 0 deletions src/neo/Network/P2P/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ public class Message : ISerializable
private const int CompressionMinSize = 128;
private const int CompressionThreshold = 64;

/// <summary>
/// Flags that represents whether a message is compressed.
/// 0 for None, 1 for Compressed.
/// </summary>
public MessageFlags Flags;
public MessageCommand Command;
public ISerializable Payload;
Expand Down
28 changes: 26 additions & 2 deletions src/neo/Network/P2P/Peer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,19 @@ private class WsConnected { public WebSocket Socket; public IPEndPoint Remote; p

private static readonly HashSet<IPAddress> localAddresses = new HashSet<IPAddress>();
private readonly Dictionary<IPAddress, int> ConnectedAddresses = new Dictionary<IPAddress, int>();
/// <summary>
/// A dictionary that stores the connected nodes.
/// </summary>
protected readonly ConcurrentDictionary<IActorRef, IPEndPoint> ConnectedPeers = new ConcurrentDictionary<IActorRef, IPEndPoint>();
/// <summary>
/// An ImmutableHashSet that stores the Peers received: 1) from other nodes or 2) from default file.
/// If the number of desired connections is not enough, first try to connect with the peers from this set.
/// </summary>
protected ImmutableHashSet<IPEndPoint> UnconnectedPeers = ImmutableHashSet<IPEndPoint>.Empty;
/// <summary>
/// When a TCP connection request is sent to a peer, the peer will be added to the ImmutableHashSet.
/// If a Tcp.Connected or a Tcp.CommandFailed (with TCP.Command of type Tcp.Connect) is received, the related peer will be removed.
/// </summary>
protected ImmutableHashSet<IPEndPoint> ConnectingPeers = ImmutableHashSet<IPEndPoint>.Empty;
protected HashSet<IPAddress> TrustedIpAddresses { get; } = new HashSet<IPAddress>();

Expand All @@ -65,7 +76,7 @@ static Peer()
}

/// <summary>
/// Tries to add a set ff peers to an immutable hashset of UnconnectedPeers
/// Tries to add a set of peers to the immutable ImmutableHashSet of UnconnectedPeers
/// </summary>
/// <param name="peers">Peers that the method will try to add (union) to (with) UnconnectedPeers</param>
protected void AddPeers(IEnumerable<IPEndPoint> peers)
Expand All @@ -86,6 +97,7 @@ protected void ConnectToPeer(IPEndPoint endPoint, bool isTrusted = false)
if (endPoint.Port == ListenerTcpPort && localAddresses.Contains(endPoint.Address)) return;

if (isTrusted) TrustedIpAddresses.Add(endPoint.Address);
// If connections with the peer greater than or equal to MaxConnectionsPerAddress, return.
if (ConnectedAddresses.TryGetValue(endPoint.Address, out int count) && count >= MaxConnectionsPerAddress)
return;
if (ConnectedPeers.Values.Contains(endPoint)) return;
Expand Down Expand Up @@ -189,6 +201,13 @@ private void OnStart(ChannelsConfig config)
}
}

/// <summary>
/// Will be triggered when a Tcp.Connected message is received.
/// If the conditions are met, the remote endpoint will be added to ConnectedPeers.
/// Increase the connection number with the remote endpoint by one.
/// </summary>
/// <param name="remote">The remote endpoint of TCP connection</param>
/// <param name="local">The local endpoint of TCP connection</param>
private void OnTcpConnected(IPEndPoint remote, IPEndPoint local)
{
ImmutableInterlocked.Update(ref ConnectingPeers, p => p.Remove(remote));
Expand All @@ -197,7 +216,7 @@ private void OnTcpConnected(IPEndPoint remote, IPEndPoint local)
Sender.Tell(Tcp.Abort.Instance);
return;
}

ConnectedAddresses.TryGetValue(remote.Address, out int count);
if (count >= MaxConnectionsPerAddress)
{
Expand All @@ -213,6 +232,11 @@ private void OnTcpConnected(IPEndPoint remote, IPEndPoint local)
}
}

/// <summary>
/// Will be triggered when a Tcp.CommandFailed message is received.
/// If it's a Tcp.Connect command, remove the related endpoint from ConnectingPeers.
/// </summary>
/// <param name="cmd"></param>
private void OnTcpCommandFailed(Tcp.Command cmd)
{
switch (cmd)
Expand Down
24 changes: 24 additions & 0 deletions src/neo/Network/P2P/ProtocolHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ private void OnFilterLoadMessageReceived(FilterLoadPayload payload)
Context.Parent.Tell(new SetFilter { Filter = bloom_filter });
}

/// <summary>
/// Will be triggered when a MessageCommand.GetAddr message is received.
/// Randomly select nodes from the local RemoteNodes and tells to RemoteNode actors a MessageCommand.Addr message.
/// The message contains a list of networkAddresses from those selected random peers.
/// </summary>
private void OnGetAddrMessageReceived()
{
Random rand = new Random();
Expand All @@ -158,9 +163,16 @@ private void OnGetAddrMessageReceived()
Context.Parent.Tell(Message.Create(MessageCommand.Addr, AddrPayload.Create(networkAddresses)));
}

/// <summary>
/// Will be triggered when a MessageCommand.GetBlocks message is received.
/// Tell the specified number of blocks' hashes starting with the requested HashStart until payload.Count or MaxHashesCount
/// Responses are sent to RemoteNode actor as MessageCommand.Inv Message.
/// </summary>
/// <param name="payload">A GetBlocksPayload including start block Hash and number of blocks requested</param>
private void OnGetBlocksMessageReceived(GetBlocksPayload payload)
{
UInt256 hash = payload.HashStart;
// The default value of payload.Count is -1
int count = payload.Count < 0 || payload.Count > InvPayload.MaxHashesCount ? InvPayload.MaxHashesCount : payload.Count;
TrimmedBlock state = Blockchain.Singleton.View.Blocks.TryGet(hash);
if (state == null) return;
Expand Down Expand Up @@ -198,6 +210,12 @@ private void OnGetBlockDataMessageReceived(GetBlockDataPayload payload)
}
}

/// <summary>
/// Will be triggered when a MessageCommand.GetData message is received.
/// The payload includes an array of hash values.
/// For different payload.Type (Tx, Block, Consensus), get the corresponding (Tx, Block, Consensus) and tell it to RemoteNode actor.
/// </summary>
/// <param name="payload">The payload containing the requested information</param>
private void OnGetDataMessageReceived(InvPayload payload)
{
UInt256[] hashes = payload.Hashes.Where(p => sentHashes.Add(p)).ToArray();
Expand Down Expand Up @@ -233,6 +251,12 @@ private void OnGetDataMessageReceived(InvPayload payload)
}
}

/// <summary>
/// Will be triggered when a MessageCommand.GetHeaders message is received.
/// Tell the specified number of blocks' headers starting with the requested HashStart to RemoteNode actor.
/// A limit set by HeadersPayload.MaxHeadersCount is also applied to the number of requested Headers, namely payload.Count.
/// </summary>
/// <param name="payload">A GetBlocksPayload including start block Hash and number of blocks' headers requested</param>
private void OnGetHeadersMessageReceived(GetBlocksPayload payload)
{
UInt256 hash = payload.HashStart;
Expand Down
10 changes: 10 additions & 0 deletions src/neo/Network/P2P/RemoteNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ public RemoteNode(NeoSystem system, object connection, IPEndPoint remote, IPEndP
SendMessage(Message.Create(MessageCommand.Version, VersionPayload.Create(LocalNode.Nonce, LocalNode.UserAgent, capabilities.ToArray())));
}

/// <summary>
/// It defines the message queue to be used for dequeuing.
/// If the high-priority message queue is not empty, choose the high-priority message queue.
/// Otherwise, choose the low-priority message queue.
/// Finally, it sends the first message of the queue.
/// </summary>
private void CheckMessageQueue()
{
if (!verack || !ack) return;
Expand All @@ -67,6 +73,10 @@ private void EnqueueMessage(MessageCommand command, ISerializable payload = null
EnqueueMessage(Message.Create(command, payload));
}

/// <summary>
/// Add message to high priority queue or low priority queue depending on the message type.
/// </summary>
/// <param name="message">The message to be added</param>
private void EnqueueMessage(Message message)
{
bool is_single = false;
Expand Down
5 changes: 5 additions & 0 deletions src/neo/Network/P2P/TaskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ private class Timer { }

private readonly NeoSystem system;
private const int MaxConncurrentTasks = 3;
/// <summary>
/// A set of known hashes for inventories or payloads already received
/// </summary>
private readonly FIFOSet<UInt256> knownHashes;
private readonly Dictionary<UInt256, int> globalTasks = new Dictionary<UInt256, int>();
private readonly Dictionary<IActorRef, TaskSession> sessions = new Dictionary<IActorRef, TaskSession>();
Expand Down Expand Up @@ -228,6 +231,8 @@ private void RequestTasks(TaskSession session)
return;
}
}
// When the number of AvailableTasks is no more than 0, no pending tasks of InventoryType.Block, it should process pending the tasks of headers
// If not HeaderTask pending to be processed it should ask for more Blocks
if ((!HasHeaderTask || globalTasks[HeaderTaskHash] < MaxConncurrentTasks) && Blockchain.Singleton.HeaderHeight < session.StartHeight)
{
session.Tasks[HeaderTaskHash] = DateTime.UtcNow;
Expand Down