Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding some comments to P2P classes #1212

Merged
merged 40 commits into from
Dec 10, 2019
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
a219962
Adding random hashes for OnGetDataMessageReceived
vncoelho Oct 18, 2019
9efa148
Adding static readonly Random
vncoelho Oct 18, 2019
79d9c3e
Merge branch 'master' of github.com:neo-project/neo
vncoelho Nov 8, 2019
f381677
Adding some comments to Peers class
vncoelho Nov 8, 2019
1390057
Reverting change on ProtocolHandler
vncoelho Nov 8, 2019
7172645
dotnet format
vncoelho Nov 8, 2019
8c79d93
Additional comments
vncoelho Nov 8, 2019
5d7f0fd
Adding extra comments
vncoelho Nov 10, 2019
841e199
Merge branch 'master' into adding-comments-peers
vncoelho Nov 10, 2019
871f14f
Fixing typo
lock9 Nov 14, 2019
bb89d9e
Fixing typo
lock9 Nov 14, 2019
4356483
Fixing typo
lock9 Nov 14, 2019
b968d83
Merge branch 'master' into adding-comments-peers
shargon Nov 14, 2019
6d9ff9c
Merge branch 'master' into adding-comments-peers
lock9 Nov 18, 2019
9ff443a
Merge branch 'master' into adding-comments-peers
lock9 Nov 25, 2019
1653ec0
Adding more comments
vncoelho Nov 25, 2019
d6f5a24
Merge branch 'master' into adding-comments-peers
vncoelho Nov 25, 2019
caf3e0a
adding more comments
vncoelho Nov 25, 2019
1850780
Merge branch 'adding-comments-peers' of github.com:neo-project/neo in…
vncoelho Nov 25, 2019
44d25a4
Merge branch 'master' into adding-comments-peers
vncoelho Nov 28, 2019
31976ce
Merge branch 'master' into adding-comments-peers
vncoelho Nov 28, 2019
a1c8f3d
Add some comments of P2P (#1303)
ShawnYun Nov 29, 2019
6d7b0ff
dotnet format
vncoelho Nov 29, 2019
ca3df0f
Merge branch 'master' into adding-comments-peers
vncoelho Nov 29, 2019
8dff674
Minor changes
vncoelho Nov 29, 2019
0b03c83
Minor changes
vncoelho Nov 29, 2019
cf5dacd
Additional comments
vncoelho Nov 29, 2019
cc4825b
Minor changes
vncoelho Nov 29, 2019
a68e828
Reverting variable change
vncoelho Nov 29, 2019
84061b9
Merge branch 'master' into adding-comments-peers
vncoelho Dec 3, 2019
38c63f5
Merge branch 'master' into adding-comments-peers
vncoelho Dec 4, 2019
c01c70d
Merge branch 'master' into adding-comments-peers
vncoelho Dec 5, 2019
bf28f7d
Dotnet format
vncoelho Dec 5, 2019
c9d3ad1
Minor changes
vncoelho Dec 9, 2019
8e238c5
Minor changes
vncoelho Dec 9, 2019
945d0b7
Minor changes
vncoelho Dec 9, 2019
86fd901
Minor changes
vncoelho Dec 9, 2019
055b6d4
Minor changes
vncoelho Dec 9, 2019
f17af50
Merge branch 'master' into adding-comments-peers
vncoelho Dec 9, 2019
f588b22
Merge branch 'master' into adding-comments-peers
shargon Dec 10, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/neo/Helper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,21 @@ public static ulong ToTimestampMS(this DateTime time)
return (ulong)(time.ToUniversalTime() - unixEpoch).TotalMilliseconds;
}

/// <summary>
/// Checks if address is IPv4 Maped to IPv6 format, if so, Map to IPv4.
/// Otherwise, return current address.
/// </summary>
internal static IPAddress Unmap(this IPAddress address)
{
if (address.IsIPv4MappedToIPv6)
address = address.MapToIPv4();
return address;
}

/// <summary>
/// Checks if IPEndPoint is IPv4 Maped to IPv6 format, if so, unmap to IPv4.
/// Otherwise, return current endpoint.
/// </summary>
internal static IPEndPoint Unmap(this IPEndPoint endPoint)
{
if (!endPoint.Address.IsIPv4MappedToIPv6)
Expand Down
28 changes: 28 additions & 0 deletions src/neo/Network/P2P/LocalNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,21 @@ public LocalNode(NeoSystem system)
}
}

/// <summary>
/// Packs a MessageCommand to a full Message with an optional ISerializable payload.
/// Forwards it to <see cref="BroadcastMessage(Message message)"/>.
/// </summary>
/// <param name="command">The message command to be packed.</param>
/// <param name="payload">Optional payload to be Serialized along the message.</param>
private void BroadcastMessage(MessageCommand command, ISerializable payload = null)
{
BroadcastMessage(Message.Create(command, payload));
}

/// <summary>
/// Broadcast a message to all connected nodes, namely <see cref="Connections"/>.
/// </summary>
/// <param name="message">The message to be broadcasted.</param>
private void BroadcastMessage(Message message)
{
Connections.Tell(message);
Expand All @@ -87,6 +97,10 @@ private static IPEndPoint GetIPEndpointFromHostPort(string hostNameOrAddress, in
return new IPEndPoint(ipAddress, port);
}

/// <summary>
/// Return an amount of random seeds nodes from the default SeedList file defined on <see cref="ProtocolSettings"/>.
/// </summary>
/// <param name="seedsToTake">Limit of random seed nodes to be obtained, also limited by the available seeds from file.</param>
private static IEnumerable<IPEndPoint> GetIPEndPointsFromSeedList(int seedsToTake)
{
if (seedsToTake > 0)
Expand Down Expand Up @@ -122,6 +136,12 @@ public IEnumerable<IPEndPoint> GetUnconnectedPeers()
return UnconnectedPeers;
}

/// <summary>
/// Override of abstract class that is triggered when <see cref="UnconnectedPeers"/> is empty.
/// Performs a BroadcastMessage with the command `MessageCommand.GetAddr`, which, eventually, tells all known connections.
/// If there are no connected peers it will try with the default, respecting MaxCountFromSeedList limit.
/// </summary>
/// <param name="count">The count of peers required</param>
protected override void NeedMorePeers(int count)
{
count = Math.Max(count, MaxCountFromSeedList);
Expand All @@ -131,6 +151,8 @@ protected override void NeedMorePeers(int count)
}
else
{
// Will call AddPeers with default SeedList set cached on <see cref="ProtocolSettings"/>.
// It will try to add those, sequentially, to the list of currently uncconected ones.
AddPeers(GetIPEndPointsFromSeedList(count));
}
}
Expand All @@ -157,6 +179,12 @@ protected override void OnReceive(object message)
}
}

/// <summary>
/// For Transaction type of IInventory, it will tell Transaction to the actor of Consensus.
/// Otherwise, tell the inventory to the actor of Blockchain.
/// There are, currently, three implementations of IInventory: TX, Block and ConsensusPayload.
/// </summary>
/// <param name="inventory">The inventory to be relayed.</param>
private void OnRelay(IInventory inventory)
{
if (inventory is Transaction transaction)
Expand Down
4 changes: 4 additions & 0 deletions src/neo/Network/P2P/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ public class Message : ISerializable
private const int CompressionMinSize = 128;
private const int CompressionThreshold = 64;

/// <summary>
/// Flags that represents whether a message is compressed.
/// 0 for None, 1 for Compressed.
/// </summary>
public MessageFlags Flags;
public MessageCommand Command;
public ISerializable Payload;
Expand Down
39 changes: 39 additions & 0 deletions src/neo/Network/P2P/Peer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,19 @@ private class WsConnected { public WebSocket Socket; public IPEndPoint Remote; p

private static readonly HashSet<IPAddress> localAddresses = new HashSet<IPAddress>();
private readonly Dictionary<IPAddress, int> ConnectedAddresses = new Dictionary<IPAddress, int>();
/// <summary>
/// A dictionary that stores the connected nodes.
/// </summary>
protected readonly ConcurrentDictionary<IActorRef, IPEndPoint> ConnectedPeers = new ConcurrentDictionary<IActorRef, IPEndPoint>();
/// <summary>
/// An ImmutableHashSet that stores the Peers received: 1) from other nodes or 2) from default file.
/// If the number of desired connections is not enough, first try to connect with the peers from this set.
/// </summary>
protected ImmutableHashSet<IPEndPoint> UnconnectedPeers = ImmutableHashSet<IPEndPoint>.Empty;
/// <summary>
/// When a TCP connection request is sent to a peer, the peer will be added to the ImmutableHashSet.
/// If a Tcp.Connected or a Tcp.CommandFailed (with TCP.Command of type Tcp.Connect) is received, the related peer will be removed.
/// </summary>
protected ImmutableHashSet<IPEndPoint> ConnectingPeers = ImmutableHashSet<IPEndPoint>.Empty;
protected HashSet<IPAddress> TrustedIpAddresses { get; } = new HashSet<IPAddress>();

Expand All @@ -63,10 +74,16 @@ static Peer()
localAddresses.UnionWith(NetworkInterface.GetAllNetworkInterfaces().SelectMany(p => p.GetIPProperties().UnicastAddresses).Select(p => p.Address.Unmap()));
}

/// <summary>
/// Tries to add a set of peers to the immutable ImmutableHashSet of UnconnectedPeers.
/// </summary>
/// <param name="peers">Peers that the method will try to add (union) to (with) UnconnectedPeers.</param>
protected void AddPeers(IEnumerable<IPEndPoint> peers)
{
if (UnconnectedPeers.Count < UnconnectedMax)
{
// Do not select peers to be added that are already on the ConnectedPeers
// If the address is the same, the ListenerTcpPort should be different
peers = peers.Where(p => (p.Port != ListenerTcpPort || !localAddresses.Contains(p.Address)) && !ConnectedPeers.Values.Contains(p));
ImmutableInterlocked.Update(ref UnconnectedPeers, p => p.Union(peers));
}
Expand All @@ -75,9 +92,11 @@ protected void AddPeers(IEnumerable<IPEndPoint> peers)
protected void ConnectToPeer(IPEndPoint endPoint, bool isTrusted = false)
{
endPoint = endPoint.Unmap();
// If the address is the same, the ListenerTcpPort should be different, otherwise, return
if (endPoint.Port == ListenerTcpPort && localAddresses.Contains(endPoint.Address)) return;

if (isTrusted) TrustedIpAddresses.Add(endPoint.Address);
// If connections with the peer greater than or equal to MaxConnectionsPerAddress, return.
if (ConnectedAddresses.TryGetValue(endPoint.Address, out int count) && count >= MaxConnectionsPerAddress)
return;
if (ConnectedPeers.Values.Contains(endPoint)) return;
Expand All @@ -96,6 +115,10 @@ private static bool IsIntranetAddress(IPAddress address)
return (value & 0xff000000) == 0x0a000000 || (value & 0xff000000) == 0x7f000000 || (value & 0xfff00000) == 0xac100000 || (value & 0xffff0000) == 0xc0a80000 || (value & 0xffff0000) == 0xa9fe0000;
}

/// <summary>
/// Abstract method for asking for more peers. Currently triggered when UnconnectedPeers is empty.
/// </summary>
/// <param name="count">Number of peers that are being requested.</param>
protected abstract void NeedMorePeers(int count);

protected override void OnReceive(object message)
Expand Down Expand Up @@ -141,6 +164,7 @@ private void OnStart(ChannelsConfig config)
MaxConnections = config.MaxConnections;
MaxConnectionsPerAddress = config.MaxConnectionsPerAddress;

// schedule time to trigger `OnTimer` event every TimerMillisecondsInterval ms
timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(0, 5000, Context.Self, new Timer(), ActorRefs.NoSender);
if ((ListenerTcpPort > 0 || ListenerWsPort > 0)
&& localAddresses.All(p => !p.IsIPv4MappedToIPv6 || IsIntranetAddress(p))
Expand Down Expand Up @@ -174,6 +198,13 @@ private void OnStart(ChannelsConfig config)
}
}

/// <summary>
/// Will be triggered when a Tcp.Connected message is received.
/// If the conditions are met, the remote endpoint will be added to ConnectedPeers.
/// Increase the connection number with the remote endpoint by one.
/// </summary>
/// <param name="remote">The remote endpoint of TCP connection.</param>
/// <param name="local">The local endpoint of TCP connection.</param>
private void OnTcpConnected(IPEndPoint remote, IPEndPoint local)
{
ImmutableInterlocked.Update(ref ConnectingPeers, p => p.Remove(remote));
Expand All @@ -198,6 +229,11 @@ private void OnTcpConnected(IPEndPoint remote, IPEndPoint local)
}
}

/// <summary>
/// Will be triggered when a Tcp.CommandFailed message is received.
/// If it's a Tcp.Connect command, remove the related endpoint from ConnectingPeers.
/// </summary>
/// <param name="cmd">Tcp.Command message/event.</param>
private void OnTcpCommandFailed(Tcp.Command cmd)
{
switch (cmd)
Expand All @@ -223,7 +259,10 @@ private void OnTerminated(IActorRef actorRef)

private void OnTimer()
{
// Check if the number of desired connections is already enough
if (ConnectedPeers.Count >= MinDesiredConnections) return;

// If there aren't available UnconnectedPeers, it triggers an abstract implementation of NeedMorePeers
if (UnconnectedPeers.Count == 0)
NeedMorePeers(MinDesiredConnections - ConnectedPeers.Count);
IPEndPoint[] endpoints = UnconnectedPeers.Take(MinDesiredConnections - ConnectedPeers.Count).ToArray();
Expand Down
24 changes: 24 additions & 0 deletions src/neo/Network/P2P/ProtocolHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ private void OnFilterLoadMessageReceived(FilterLoadPayload payload)
Context.Parent.Tell(new SetFilter { Filter = bloom_filter });
}

/// <summary>
/// Will be triggered when a MessageCommand.GetAddr message is received.
/// Randomly select nodes from the local RemoteNodes and tells to RemoteNode actors a MessageCommand.Addr message.
/// The message contains a list of networkAddresses from those selected random peers.
/// </summary>
private void OnGetAddrMessageReceived()
{
Random rand = new Random();
Expand All @@ -187,9 +192,16 @@ private void OnGetAddrMessageReceived()
Context.Parent.Tell(Message.Create(MessageCommand.Addr, AddrPayload.Create(networkAddresses)));
}

/// <summary>
/// Will be triggered when a MessageCommand.GetBlocks message is received.
/// Tell the specified number of blocks' hashes starting with the requested HashStart until payload.Count or MaxHashesCount
/// Responses are sent to RemoteNode actor as MessageCommand.Inv Message.
/// </summary>
/// <param name="payload">A GetBlocksPayload including start block Hash and number of blocks requested.</param>
private void OnGetBlocksMessageReceived(GetBlocksPayload payload)
{
UInt256 hash = payload.HashStart;
// The default value of payload.Count is -1
int count = payload.Count < 0 || payload.Count > InvPayload.MaxHashesCount ? InvPayload.MaxHashesCount : payload.Count;
TrimmedBlock state = Blockchain.Singleton.View.Blocks.TryGet(hash);
if (state == null) return;
Expand Down Expand Up @@ -227,6 +239,12 @@ private void OnGetBlockDataMessageReceived(GetBlockDataPayload payload)
}
}

/// <summary>
/// Will be triggered when a MessageCommand.GetData message is received.
/// The payload includes an array of hash values.
/// For different payload.Type (Tx, Block, Consensus), get the corresponding (Txs, Blocks, Consensus) and tell them to RemoteNode actor.
/// </summary>
/// <param name="payload">The payload containing the requested information.</param>
private void OnGetDataMessageReceived(InvPayload payload)
{
UInt256[] hashes = payload.Hashes.Where(p => sentHashes.Add(p)).ToArray();
Expand Down Expand Up @@ -262,6 +280,12 @@ private void OnGetDataMessageReceived(InvPayload payload)
}
}

/// <summary>
/// Will be triggered when a MessageCommand.GetHeaders message is received.
/// Tell the specified number of blocks' headers starting with the requested HashStart to RemoteNode actor.
/// A limit set by HeadersPayload.MaxHeadersCount is also applied to the number of requested Headers, namely payload.Count.
/// </summary>
/// <param name="payload">A GetBlocksPayload including start block Hash and number of blocks' headers requested.</param>
private void OnGetHeadersMessageReceived(GetBlocksPayload payload)
{
UInt256 hash = payload.HashStart;
Expand Down
10 changes: 10 additions & 0 deletions src/neo/Network/P2P/RemoteNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ public RemoteNode(NeoSystem system, object connection, IPEndPoint remote, IPEndP
SendMessage(Message.Create(MessageCommand.Version, VersionPayload.Create(LocalNode.Nonce, LocalNode.UserAgent, capabilities.ToArray())));
}

/// <summary>
/// It defines the message queue to be used for dequeuing.
/// If the high-priority message queue is not empty, choose the high-priority message queue.
/// Otherwise, choose the low-priority message queue.
/// Finally, it sends the first message of the queue.
/// </summary>
private void CheckMessageQueue()
{
if (!verack || !ack) return;
Expand All @@ -67,6 +73,10 @@ private void EnqueueMessage(MessageCommand command, ISerializable payload = null
EnqueueMessage(Message.Create(command, payload));
}

/// <summary>
/// Add message to high priority queue or low priority queue depending on the message type.
/// </summary>
/// <param name="message">The message to be added.</param>
private void EnqueueMessage(Message message)
{
bool is_single = false;
Expand Down
14 changes: 14 additions & 0 deletions src/neo/Network/P2P/TaskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ private class Timer { }

private readonly NeoSystem system;
private const int MaxConncurrentTasks = 3;

private const int PingCoolingOffPeriod = 60; // in secconds.
/// <summary>
/// A set of known hashes, of inventories or payloads, already received.
/// </summary>
private readonly FIFOSet<UInt256> knownHashes;
private readonly Dictionary<UInt256, int> globalTasks = new Dictionary<UInt256, int>();
private readonly Dictionary<IActorRef, TaskSession> sessions = new Dictionary<IActorRef, TaskSession>();
Expand Down Expand Up @@ -55,23 +59,28 @@ private void OnNewTasks(InvPayload payload)
{
if (!sessions.TryGetValue(Sender, out TaskSession session))
return;
// Do not accept payload of type InventoryType.TX if not synced on best known HeaderHeight
if (payload.Type == InventoryType.TX && Blockchain.Singleton.Height < Blockchain.Singleton.HeaderHeight)
{
RequestTasks(session);
return;
}
HashSet<UInt256> hashes = new HashSet<UInt256>(payload.Hashes);
// Remove all previously processed knownHashes from the list that is being requested
hashes.Remove(knownHashes);
// Add to AvailableTasks the ones, of type InventoryType.Block, that are global (already under process by other sessions)
if (payload.Type == InventoryType.Block)
session.AvailableTasks.UnionWith(hashes.Where(p => globalTasks.ContainsKey(p)));

// Remove those that are already in process by other sessions
hashes.Remove(globalTasks);
if (hashes.Count == 0)
{
RequestTasks(session);
return;
}

// Update globalTasks with the ones that will be requested within this current session
foreach (UInt256 hash in hashes)
{
IncrementGlobalTask(hash);
Expand Down Expand Up @@ -214,9 +223,11 @@ public static Props Props(NeoSystem system)
private void RequestTasks(TaskSession session)
{
if (session.HasTask) return;
// If there are pending tasks of InventoryType.Block we should process them
if (session.AvailableTasks.Count > 0)
{
session.AvailableTasks.Remove(knownHashes);
// Search any similar hash that is on Singleton's knowledge, which means, on the way or already processed
session.AvailableTasks.RemoveWhere(p => Blockchain.Singleton.ContainsBlock(p));
HashSet<UInt256> hashes = new HashSet<UInt256>(session.AvailableTasks);
if (hashes.Count > 0)
Expand All @@ -234,6 +245,9 @@ private void RequestTasks(TaskSession session)
return;
}
}

// When the number of AvailableTasks is no more than 0, no pending tasks of InventoryType.Block, it should process pending the tasks of headers
// If not HeaderTask pending to be processed it should ask for more Blocks
if ((!HasHeaderTask || globalTasks[HeaderTaskHash] < MaxConncurrentTasks) && Blockchain.Singleton.HeaderHeight < session.LastBlockIndex)
{
session.Tasks[HeaderTaskHash] = DateTime.UtcNow;
Expand Down