Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add some comments of P2P #1303

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/neo/Network/P2P/LocalNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ private void BroadcastMessage(MessageCommand command, ISerializable payload = nu
BroadcastMessage(Message.Create(command, payload));
}

/// <summary>
/// Broadcast a message to all connected nodes.
/// </summary>
/// <param name="message">The message to be broadcasted.</param>
private void BroadcastMessage(Message message)
{
Connections.Tell(message);
Expand Down Expand Up @@ -127,6 +131,7 @@ public IEnumerable<IPEndPoint> GetUnconnectedPeers()
/// Performs a BroadcastMessage with the command `MessageCommand.GetAddr`, which, eventually, tells all known connections
/// If there are no connected peers it will try with the default, respecting MaxCountFromSeedList limit
/// </summary>
/// <param name="count">The count of peers required</param>
protected override void NeedMorePeers(int count)
{
count = Math.Max(count, MaxCountFromSeedList);
Expand Down Expand Up @@ -164,6 +169,12 @@ protected override void OnReceive(object message)
}
}

/// <summary>
/// There are three implementations of IInventory. Block, ConsensusPayload and Transaction.
/// For Transaction, it will tell Transaction to the actor of Consensus.
/// Otherwise, tell the inventory to the actor of Blockchain.
/// </summary>
/// <param name="inventory">The inventory to be relayed.</param>
private void OnRelay(IInventory inventory)
{
if (inventory is Transaction transaction)
Expand Down
4 changes: 4 additions & 0 deletions src/neo/Network/P2P/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ public class Message : ISerializable
private const int CompressionMinSize = 128;
private const int CompressionThreshold = 64;

/// <summary>
/// Flags that represents whether a message is compressed.
/// 0 for None, 1 for Compressed.
/// </summary>
public MessageFlags Flags;
public MessageCommand Command;
public ISerializable Payload;
Expand Down
29 changes: 27 additions & 2 deletions src/neo/Network/P2P/Peer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,19 @@ private class WsConnected { public WebSocket Socket; public IPEndPoint Remote; p

private static readonly HashSet<IPAddress> localAddresses = new HashSet<IPAddress>();
private readonly Dictionary<IPAddress, int> ConnectedAddresses = new Dictionary<IPAddress, int>();
/// <summary>
/// A dictionary that stores the connected nodes.
/// </summary>
protected readonly ConcurrentDictionary<IActorRef, IPEndPoint> ConnectedPeers = new ConcurrentDictionary<IActorRef, IPEndPoint>();
/// <summary>
/// A ImmutableHashSet that stores the Peers received from other nodes.
/// If the number of desired connections is not enough, first try to connect with the peers in UnconnectedPeers.
/// </summary>
protected ImmutableHashSet<IPEndPoint> UnconnectedPeers = ImmutableHashSet<IPEndPoint>.Empty;
/// <summary>
/// When a TCP connection request is sent to a peer, the peer will be added to the ImmutableHashSet.
/// If a Tcp.Connected or a Tcp.CommandFailed is received, the related peer will be removed.
/// </summary>
protected ImmutableHashSet<IPEndPoint> ConnectingPeers = ImmutableHashSet<IPEndPoint>.Empty;
protected HashSet<IPAddress> TrustedIpAddresses { get; } = new HashSet<IPAddress>();

Expand All @@ -65,7 +76,7 @@ static Peer()
}

/// <summary>
/// Tries to add a set ff peers to an immutable hashset of UnconnectedPeers
/// Tries to add a set of peers to an immutable hashset of UnconnectedPeers
/// </summary>
/// <param name="peers">Peers that the method will try to add (union) to (with) UnconnectedPeers</param>
protected void AddPeers(IEnumerable<IPEndPoint> peers)
Expand All @@ -86,6 +97,7 @@ protected void ConnectToPeer(IPEndPoint endPoint, bool isTrusted = false)
if (endPoint.Port == ListenerTcpPort && localAddresses.Contains(endPoint.Address)) return;

if (isTrusted) TrustedIpAddresses.Add(endPoint.Address);
// If connections with the peer greater than or equal to MaxConnectionsPerAddress, return.
if (ConnectedAddresses.TryGetValue(endPoint.Address, out int count) && count >= MaxConnectionsPerAddress)
return;
if (ConnectedPeers.Values.Contains(endPoint)) return;
Expand Down Expand Up @@ -189,6 +201,13 @@ private void OnStart(ChannelsConfig config)
}
}

/// <summary>
/// Will be triggered when a Tcp.Connected message is received.
/// If the conditions are met, the remote endpoint will be removed from ConnectingPeers and added to ConnectedPeers.
/// Increase the connection number with the remote endpoint by one.
/// </summary>
/// <param name="remote">The remote endpoint of TCP connection</param>
/// <param name="local">The local endpoint of TCP connection</param>
private void OnTcpConnected(IPEndPoint remote, IPEndPoint local)
{
ImmutableInterlocked.Update(ref ConnectingPeers, p => p.Remove(remote));
Expand All @@ -197,7 +216,7 @@ private void OnTcpConnected(IPEndPoint remote, IPEndPoint local)
Sender.Tell(Tcp.Abort.Instance);
return;
}

// If connections with the peer greater than or equal to MaxConnectionsPerAddress, abort the TCP connection.
ConnectedAddresses.TryGetValue(remote.Address, out int count);
if (count >= MaxConnectionsPerAddress)
{
Expand All @@ -206,13 +225,19 @@ private void OnTcpConnected(IPEndPoint remote, IPEndPoint local)
else
{
ConnectedAddresses[remote.Address] = count + 1;
// An actor reference of the connection.
IActorRef connection = Context.ActorOf(ProtocolProps(Sender, remote, local), $"connection_{Guid.NewGuid()}");
Context.Watch(connection);
Sender.Tell(new Tcp.Register(connection));
ConnectedPeers.TryAdd(connection, remote);
}
}

/// <summary>
/// Will be triggered when a Tcp.CommandFailed message is received.
/// If it's a Tcp.Connect command, remove the related endpoint from ConnectingPeers.
/// </summary>
/// <param name="cmd"></param>
private void OnTcpCommandFailed(Tcp.Command cmd)
{
switch (cmd)
Expand Down
21 changes: 21 additions & 0 deletions src/neo/Network/P2P/ProtocolHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ private void OnFilterLoadMessageReceived(FilterLoadPayload payload)
Context.Parent.Tell(new SetFilter { Filter = bloom_filter });
}

/// <summary>
/// Will be triggered when a MessageCommand.GetAddr message is received.
/// Randomly select nodes from the local RemoteNodes, package and tell to RemoteNode actor as MessageCommand.Addr Message.
/// </summary>
private void OnGetAddrMessageReceived()
{
Random rand = new Random();
Expand All @@ -158,9 +162,15 @@ private void OnGetAddrMessageReceived()
Context.Parent.Tell(Message.Create(MessageCommand.Addr, AddrPayload.Create(networkAddresses)));
}

/// <summary>
/// Will be triggered when a MessageCommand.GetBlocks message is received.
/// Tell the specified number of blocks' hashes starting with the requested HashStart to RemoteNode actor as MessageCommand.Inv Message.
/// </summary>
/// <param name="payload">Getsblockpayload, including start block Hash and number of blocks requested</param>
private void OnGetBlocksMessageReceived(GetBlocksPayload payload)
{
UInt256 hash = payload.HashStart;
// The default value of payload.Count is -1
int count = payload.Count < 0 || payload.Count > InvPayload.MaxHashesCount ? InvPayload.MaxHashesCount : payload.Count;
TrimmedBlock state = Blockchain.Singleton.View.Blocks.TryGet(hash);
if (state == null) return;
Expand Down Expand Up @@ -198,6 +208,12 @@ private void OnGetBlockDataMessageReceived(GetBlockDataPayload payload)
}
}

/// <summary>
/// Will be triggered when a MessageCommand.GetData message is received.
/// The payload includes a array of hash values.
/// For different payload.Type (Tx, Block, Consensus), get the corresponding (Tx, Block, Consensus) and tell it to RemoteNode actor.
/// </summary>
/// <param name="payload">The payload containing the request information</param>
private void OnGetDataMessageReceived(InvPayload payload)
{
UInt256[] hashes = payload.Hashes.Where(p => sentHashes.Add(p)).ToArray();
Expand Down Expand Up @@ -233,6 +249,11 @@ private void OnGetDataMessageReceived(InvPayload payload)
}
}

/// <summary>
/// Will be triggered when a MessageCommand.GetHeaders message is received.
/// Tell the specified number of blocks' headers starting with the requested HashStart to RemoteNode actor.
/// </summary>
/// <param name="payload">Getsblockpayload, including start block Hash and number of blocks' headers requested</param>
private void OnGetHeadersMessageReceived(GetBlocksPayload payload)
{
UInt256 hash = payload.HashStart;
Expand Down
10 changes: 10 additions & 0 deletions src/neo/Network/P2P/RemoteNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ public RemoteNode(NeoSystem system, object connection, IPEndPoint remote, IPEndP
SendMessage(Message.Create(MessageCommand.Version, VersionPayload.Create(LocalNode.Nonce, LocalNode.UserAgent, capabilities.ToArray())));
}

/// <summary>
/// Check the message queue.
/// If the high-priority message queue is not empty, choose the high-priority message queue,.
/// Otherwise, choose the low-priority message queue.
/// Send the first message of the queue.
/// </summary>
private void CheckMessageQueue()
{
if (!verack || !ack) return;
Expand All @@ -67,6 +73,10 @@ private void EnqueueMessage(MessageCommand command, ISerializable payload = null
EnqueueMessage(Message.Create(command, payload));
}

/// <summary>
/// Add message to high priority queue or low priority queue depending on the message type.
/// </summary>
/// <param name="message">The message to be added</param>
private void EnqueueMessage(Message message)
{
bool is_single = false;
Expand Down
4 changes: 4 additions & 0 deletions src/neo/Network/P2P/TaskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ private class Timer { }

private readonly NeoSystem system;
private const int MaxConncurrentTasks = 3;
/// <summary>
/// The hashes corresponding to the completed tasks.
vncoelho marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
private readonly FIFOSet<UInt256> knownHashes;
private readonly Dictionary<UInt256, int> globalTasks = new Dictionary<UInt256, int>();
private readonly Dictionary<IActorRef, TaskSession> sessions = new Dictionary<IActorRef, TaskSession>();
Expand Down Expand Up @@ -228,6 +231,7 @@ private void RequestTasks(TaskSession session)
return;
}
}
// When the number of AvailableTasks is no more than 0, processing the task of getting the headers and blocks.
if ((!HasHeaderTask || globalTasks[HeaderTaskHash] < MaxConncurrentTasks) && Blockchain.Singleton.HeaderHeight < session.StartHeight)
{
session.Tasks[HeaderTaskHash] = DateTime.UtcNow;
Expand Down