Skip to content

Commit

Permalink
Fix/Improve syncing (3x): Adding ping mechanism to TaskManager for re…
Browse files Browse the repository at this point in the history
…placing StartHeight and PendingKnownHashes strategy (#899)
  • Loading branch information
yongjiema authored and erikzhang committed Dec 1, 2019
1 parent 32a6320 commit 5cb2620
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 4 deletions.
60 changes: 58 additions & 2 deletions src/neo/Network/P2P/ProtocolHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Net;

Expand All @@ -19,24 +20,52 @@ namespace Neo.Network.P2P
internal class ProtocolHandler : UntypedActor
{
public class SetFilter { public BloomFilter Filter; }
internal class Timer { }

private class PendingKnownHashesCollection : KeyedCollection<UInt256, (UInt256, DateTime)>
{
protected override UInt256 GetKeyForItem((UInt256, DateTime) item)
{
return item.Item1;
}
}

private readonly NeoSystem system;
private readonly PendingKnownHashesCollection pendingKnownHashes;
private readonly FIFOSet<UInt256> knownHashes;
private readonly FIFOSet<UInt256> sentHashes;
private VersionPayload version;
private bool verack = false;
private BloomFilter bloom_filter;

private static readonly TimeSpan TimerInterval = TimeSpan.FromSeconds(30);
private static readonly TimeSpan PendingTimeout = TimeSpan.FromMinutes(1);

private readonly ICancelable timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimerInterval, TimerInterval, Context.Self, new Timer(), ActorRefs.NoSender);

public ProtocolHandler(NeoSystem system)
{
this.system = system;
this.pendingKnownHashes = new PendingKnownHashesCollection();
this.knownHashes = new FIFOSet<UInt256>(Blockchain.Singleton.MemPool.Capacity * 2);
this.sentHashes = new FIFOSet<UInt256>(Blockchain.Singleton.MemPool.Capacity * 2);
}

protected override void OnReceive(object message)
{
if (!(message is Message msg)) return;
switch (message)
{
case Message msg:
OnMessage(msg);
break;
case Timer _:
OnTimer();
break;
}
}

private void OnMessage(Message msg)
{
foreach (IP2PPlugin plugin in Plugin.P2PPlugins)
if (!plugin.OnP2PMessage(msg))
return;
Expand Down Expand Up @@ -264,11 +293,13 @@ private void OnInventoryReceived(IInventory inventory)
{
system.TaskManager.Tell(new TaskManager.TaskCompleted { Hash = inventory.Hash }, Context.Parent);
system.LocalNode.Tell(new LocalNode.Relay { Inventory = inventory });
pendingKnownHashes.Remove(inventory.Hash);
knownHashes.Add(inventory.Hash);
}

private void OnInvMessageReceived(InvPayload payload)
{
UInt256[] hashes = payload.Hashes.Where(p => knownHashes.Add(p) && !sentHashes.Contains(p)).ToArray();
UInt256[] hashes = payload.Hashes.Where(p => !pendingKnownHashes.Contains(p) && !knownHashes.Contains(p) && !sentHashes.Contains(p)).ToArray();
if (hashes.Length == 0) return;
switch (payload.Type)
{
Expand All @@ -282,6 +313,8 @@ private void OnInvMessageReceived(InvPayload payload)
break;
}
if (hashes.Length == 0) return;
foreach (UInt256 hash in hashes)
pendingKnownHashes.Add((hash, DateTime.UtcNow));
system.TaskManager.Tell(new TaskManager.NewTasks { Payload = InvPayload.Create(payload.Type, hashes) }, Context.Parent);
}

Expand Down Expand Up @@ -314,6 +347,28 @@ private void OnVersionMessageReceived(VersionPayload payload)
Context.Parent.Tell(payload);
}

private void OnTimer()
{
RefreshPendingKnownHashes();
}

protected override void PostStop()
{
timer.CancelIfNotNull();
base.PostStop();
}

private void RefreshPendingKnownHashes()
{
while (pendingKnownHashes.Count > 0)
{
var (_, time) = pendingKnownHashes[0];
if (DateTime.UtcNow - time <= PendingTimeout)
break;
pendingKnownHashes.RemoveAt(0);
}
}

public static Props Props(NeoSystem system)
{
return Akka.Actor.Props.Create(() => new ProtocolHandler(system)).WithMailbox("protocol-handler-mailbox");
Expand Down Expand Up @@ -347,6 +402,7 @@ internal protected override bool IsHighPriority(object message)

internal protected override bool ShallDrop(object message, IEnumerable queue)
{
if (message is ProtocolHandler.Timer) return false;
if (!(message is Message msg)) return true;
switch (msg.Command)
{
Expand Down
3 changes: 3 additions & 0 deletions src/neo/Network/P2P/RemoteNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,10 @@ protected override void OnReceive(object message)
private void OnPingPayload(PingPayload payload)
{
if (payload.LastBlockIndex > LastBlockIndex)
{
LastBlockIndex = payload.LastBlockIndex;
system.TaskManager.Tell(new TaskManager.Update { LastBlockIndex = LastBlockIndex });
}
}

private void OnRelay(IInventory inventory)
Expand Down
31 changes: 29 additions & 2 deletions src/neo/Network/P2P/TaskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Neo.Ledger;
using Neo.Network.P2P.Payloads;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
Expand All @@ -14,6 +15,7 @@ namespace Neo.Network.P2P
internal class TaskManager : UntypedActor
{
public class Register { public VersionPayload Version; }
public class Update { public uint LastBlockIndex; }
public class NewTasks { public InvPayload Payload; }
public class TaskCompleted { public UInt256 Hash; }
public class HeaderTaskCompleted { }
Expand All @@ -25,6 +27,7 @@ private class Timer { }

private readonly NeoSystem system;
private const int MaxConncurrentTasks = 3;
private const int PingCoolingOffPeriod = 60; // in secconds.
private readonly FIFOSet<UInt256> knownHashes;
private readonly Dictionary<UInt256, int> globalTasks = new Dictionary<UInt256, int>();
private readonly Dictionary<IActorRef, TaskSession> sessions = new Dictionary<IActorRef, TaskSession>();
Expand Down Expand Up @@ -86,6 +89,9 @@ protected override void OnReceive(object message)
case Register register:
OnRegister(register.Version);
break;
case Update update:
OnUpdate(update.LastBlockIndex);
break;
case NewTasks tasks:
OnNewTasks(tasks.Payload);
break;
Expand Down Expand Up @@ -115,6 +121,13 @@ private void OnRegister(VersionPayload version)
RequestTasks(session);
}

private void OnUpdate(uint lastBlockIndex)
{
if (!sessions.TryGetValue(Sender, out TaskSession session))
return;
session.LastBlockIndex = lastBlockIndex;
}

private void OnRestartTasks(InvPayload payload)
{
knownHashes.ExceptWith(payload.Hashes);
Expand Down Expand Up @@ -221,13 +234,13 @@ private void RequestTasks(TaskSession session)
return;
}
}
if ((!HasHeaderTask || globalTasks[HeaderTaskHash] < MaxConncurrentTasks) && Blockchain.Singleton.HeaderHeight < session.StartHeight)
if ((!HasHeaderTask || globalTasks[HeaderTaskHash] < MaxConncurrentTasks) && Blockchain.Singleton.HeaderHeight < session.LastBlockIndex)
{
session.Tasks[HeaderTaskHash] = DateTime.UtcNow;
IncrementGlobalTask(HeaderTaskHash);
session.RemoteNode.Tell(Message.Create(MessageCommand.GetHeaders, GetBlocksPayload.Create(Blockchain.Singleton.CurrentHeaderHash)));
}
else if (Blockchain.Singleton.Height < session.StartHeight)
else if (Blockchain.Singleton.Height < session.LastBlockIndex)
{
UInt256 hash = Blockchain.Singleton.CurrentBlockHash;
for (uint i = Blockchain.Singleton.Height + 1; i <= Blockchain.Singleton.HeaderHeight; i++)
Expand All @@ -241,6 +254,11 @@ private void RequestTasks(TaskSession session)
}
session.RemoteNode.Tell(Message.Create(MessageCommand.GetBlocks, GetBlocksPayload.Create(hash)));
}
else if (Blockchain.Singleton.HeaderHeight >= session.LastBlockIndex
&& TimeProvider.Current.UtcNow.ToTimestamp() - PingCoolingOffPeriod >= Blockchain.Singleton.GetBlock(Blockchain.Singleton.CurrentHeaderHash)?.Timestamp)
{
session.RemoteNode.Tell(Message.Create(MessageCommand.Ping, PingPayload.Create(Blockchain.Singleton.Height)));
}
}
}

Expand All @@ -256,6 +274,7 @@ internal protected override bool IsHighPriority(object message)
switch (message)
{
case TaskManager.Register _:
case TaskManager.Update _:
case TaskManager.RestartTasks _:
return true;
case TaskManager.NewTasks tasks:
Expand All @@ -266,5 +285,13 @@ internal protected override bool IsHighPriority(object message)
return false;
}
}

internal protected override bool ShallDrop(object message, IEnumerable queue)
{
if (!(message is TaskManager.NewTasks tasks)) return false;
// Remove duplicate tasks
if (queue.OfType<TaskManager.NewTasks>().Any(x => x.Payload.Type == tasks.Payload.Type && x.Payload.Hashes.SequenceEqual(tasks.Payload.Hashes))) return true;
return false;
}
}
}
2 changes: 2 additions & 0 deletions src/neo/Network/P2P/TaskSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ internal class TaskSession

public bool HasTask => Tasks.Count > 0;
public uint StartHeight { get; }
public uint LastBlockIndex { get; set; }

public TaskSession(IActorRef node, VersionPayload version)
{
Expand All @@ -24,6 +25,7 @@ public TaskSession(IActorRef node, VersionPayload version)
this.StartHeight = version.Capabilities
.OfType<FullNodeCapability>()
.FirstOrDefault()?.StartHeight ?? 0;
this.LastBlockIndex = this.StartHeight;
}
}
}

0 comments on commit 5cb2620

Please sign in to comment.