Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/Improve syncing (3x): Adding ping mechanism to TaskManager for replacing StartHeight and PendingKnownHashes strategy #899

Merged
merged 33 commits into from
Dec 1, 2019
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
99ca3f2
Drop the duplicate NewTasks for the TaskManagerMailbox
yongjiema Jul 9, 2019
2048e7b
Try to get headers from remote nodes if the current header is not upd…
yongjiema Jul 9, 2019
079a74b
Merge branch 'master' into fix-sync-stuck
shargon Jul 9, 2019
755ab15
Update TaskManager.cs
shargon Jul 9, 2019
4ed0e22
Update TaskManager.cs
shargon Jul 9, 2019
4b410ad
Merge branch 'master' into fix-sync-stuck
vncoelho Jul 11, 2019
64dd8bd
Merge branch 'master' into fix-sync-stuck
shargon Jul 30, 2019
58d2ff0
Merge branch 'master' into fix-sync-stuck
vncoelho Aug 7, 2019
9e271e1
Merge branch 'master' into fix-sync-stuck
vncoelho Aug 9, 2019
7b32b10
Merge branch 'master' into fix-sync-stuck
shargon Aug 10, 2019
ae46f80
Merge branch 'master' into fix-sync-stuck
shargon Aug 12, 2019
70b2072
Merge branch 'master' into fix-sync-stuck
vncoelho Aug 12, 2019
b44e1eb
Try to get headers from remote nodes even the current header height i…
yongjiema Aug 13, 2019
bf01bab
Correct the timestamp usage according to NEO3
yongjiema Aug 16, 2019
261e7ff
Merge branch 'master' into fix-sync-stuck
erikzhang Aug 18, 2019
152a7cb
Merge branch 'master' into fix-sync-stuck
vncoelho Aug 19, 2019
cd5cb5f
Merge branch 'master' into fix-sync-stuck
vncoelho Aug 28, 2019
692a9ad
Merge branch 'master' into fix-sync-stuck
vncoelho Sep 24, 2019
11e811c
Merge branch 'master' into fix-sync-stuck
lock9 Sep 26, 2019
8196dff
Improve syncing
yongjiema Sep 29, 2019
0e21d78
Merge branch 'master' into fix-sync-stuck
vncoelho Oct 23, 2019
bd3b3fc
Merge branch 'master' into fix-sync-stuck
vncoelho Nov 4, 2019
fe2d98a
Merge branch 'master' into fix-sync-stuck
vncoelho Nov 10, 2019
100b30d
Merge branch 'master' into fix-sync-stuck
vncoelho Nov 13, 2019
ebe25e2
Merge branch 'master' into fix-sync-stuck
vncoelho Nov 13, 2019
fd0e712
Merge branch 'master' into fix-sync-stuck
lock9 Nov 13, 2019
9a73bf8
Merge branch 'master' into fix-sync-stuck
erikzhang Nov 25, 2019
04fdfa8
Update ProtocolHandler.cs
erikzhang Nov 25, 2019
1d985e0
Update ProtocolHandler.cs
erikzhang Nov 25, 2019
daf7141
move projects
erikzhang Nov 26, 2019
75f4470
Merge branch 'master' into fix-sync-stuck
erikzhang Nov 26, 2019
e18184e
Merge branch 'master' into fix-sync-stuck
vncoelho Nov 29, 2019
98f11f8
Merge branch 'master' into fix-sync-stuck
erikzhang Dec 1, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 58 additions & 2 deletions src/neo/Network/P2P/ProtocolHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Net;

Expand All @@ -19,24 +20,52 @@ namespace Neo.Network.P2P
internal class ProtocolHandler : UntypedActor
{
public class SetFilter { public BloomFilter Filter; }
internal class Timer { }

private class PendingKnownHashesCollection : KeyedCollection<UInt256, (UInt256, DateTime)>
{
protected override UInt256 GetKeyForItem((UInt256, DateTime) item)
{
return item.Item1;
}
}

private readonly NeoSystem system;
private readonly PendingKnownHashesCollection pendingKnownHashes;
private readonly FIFOSet<UInt256> knownHashes;
private readonly FIFOSet<UInt256> sentHashes;
private VersionPayload version;
private bool verack = false;
private BloomFilter bloom_filter;

private static readonly TimeSpan TimerInterval = TimeSpan.FromSeconds(30);
private static readonly TimeSpan PendingTimeout = TimeSpan.FromMinutes(1);

private readonly ICancelable timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimerInterval, TimerInterval, Context.Self, new Timer(), ActorRefs.NoSender);

public ProtocolHandler(NeoSystem system)
{
this.system = system;
this.pendingKnownHashes = new PendingKnownHashesCollection();
this.knownHashes = new FIFOSet<UInt256>(Blockchain.Singleton.MemPool.Capacity * 2);
this.sentHashes = new FIFOSet<UInt256>(Blockchain.Singleton.MemPool.Capacity * 2);
}

protected override void OnReceive(object message)
{
if (!(message is Message msg)) return;
switch (message)
{
case Message msg:
OnMessage(msg);
break;
case Timer _:
OnTimer();
break;
}
}

private void OnMessage(Message msg)
{
foreach (IP2PPlugin plugin in Plugin.P2PPlugins)
if (!plugin.OnP2PMessage(msg))
return;
Expand Down Expand Up @@ -264,11 +293,13 @@ private void OnInventoryReceived(IInventory inventory)
{
system.TaskManager.Tell(new TaskManager.TaskCompleted { Hash = inventory.Hash }, Context.Parent);
system.LocalNode.Tell(new LocalNode.Relay { Inventory = inventory });
pendingKnownHashes.Remove(inventory.Hash);
knownHashes.Add(inventory.Hash);
}

private void OnInvMessageReceived(InvPayload payload)
{
UInt256[] hashes = payload.Hashes.Where(p => knownHashes.Add(p) && !sentHashes.Contains(p)).ToArray();
UInt256[] hashes = payload.Hashes.Where(p => !pendingKnownHashes.Contains(p) && !knownHashes.Contains(p) && !sentHashes.Contains(p)).ToArray();
if (hashes.Length == 0) return;
switch (payload.Type)
{
Expand All @@ -282,6 +313,8 @@ private void OnInvMessageReceived(InvPayload payload)
break;
}
if (hashes.Length == 0) return;
foreach (UInt256 hash in hashes)
pendingKnownHashes.Add((hash, DateTime.UtcNow));
system.TaskManager.Tell(new TaskManager.NewTasks { Payload = InvPayload.Create(payload.Type, hashes) }, Context.Parent);
}

Expand Down Expand Up @@ -314,6 +347,28 @@ private void OnVersionMessageReceived(VersionPayload payload)
Context.Parent.Tell(payload);
}

private void OnTimer()
{
RefreshPendingKnownHashes();
}

protected override void PostStop()
{
timer.CancelIfNotNull();
base.PostStop();
}

private void RefreshPendingKnownHashes()
{
while (pendingKnownHashes.Count > 0)
{
var (_, time) = pendingKnownHashes[0];
if (DateTime.UtcNow - time <= PendingTimeout)
break;
pendingKnownHashes.RemoveAt(0);
}
}

public static Props Props(NeoSystem system)
{
return Akka.Actor.Props.Create(() => new ProtocolHandler(system)).WithMailbox("protocol-handler-mailbox");
Expand Down Expand Up @@ -347,6 +402,7 @@ internal protected override bool IsHighPriority(object message)

internal protected override bool ShallDrop(object message, IEnumerable queue)
{
if (message is ProtocolHandler.Timer) return false;
if (!(message is Message msg)) return true;
switch (msg.Command)
{
Expand Down
3 changes: 3 additions & 0 deletions src/neo/Network/P2P/RemoteNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,10 @@ protected override void OnReceive(object message)
private void OnPingPayload(PingPayload payload)
{
if (payload.LastBlockIndex > LastBlockIndex)
{
LastBlockIndex = payload.LastBlockIndex;
system.TaskManager.Tell(new TaskManager.Update { LastBlockIndex = LastBlockIndex });
}
}

private void OnRelay(IInventory inventory)
Expand Down
31 changes: 29 additions & 2 deletions src/neo/Network/P2P/TaskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Neo.Ledger;
using Neo.Network.P2P.Payloads;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
Expand All @@ -14,6 +15,7 @@ namespace Neo.Network.P2P
internal class TaskManager : UntypedActor
{
public class Register { public VersionPayload Version; }
public class Update { public uint LastBlockIndex; }
public class NewTasks { public InvPayload Payload; }
public class TaskCompleted { public UInt256 Hash; }
public class HeaderTaskCompleted { }
Expand All @@ -25,6 +27,7 @@ private class Timer { }

private readonly NeoSystem system;
private const int MaxConncurrentTasks = 3;
private const int PingCoolingOffPeriod = 60; // in secconds.
private readonly FIFOSet<UInt256> knownHashes;
private readonly Dictionary<UInt256, int> globalTasks = new Dictionary<UInt256, int>();
private readonly Dictionary<IActorRef, TaskSession> sessions = new Dictionary<IActorRef, TaskSession>();
Expand Down Expand Up @@ -86,6 +89,9 @@ protected override void OnReceive(object message)
case Register register:
OnRegister(register.Version);
break;
case Update update:
OnUpdate(update.LastBlockIndex);
break;
case NewTasks tasks:
OnNewTasks(tasks.Payload);
break;
Expand Down Expand Up @@ -115,6 +121,13 @@ private void OnRegister(VersionPayload version)
RequestTasks(session);
}

private void OnUpdate(uint lastBlockIndex)
{
if (!sessions.TryGetValue(Sender, out TaskSession session))
return;
session.LastBlockIndex = lastBlockIndex;
}

private void OnRestartTasks(InvPayload payload)
{
knownHashes.ExceptWith(payload.Hashes);
Expand Down Expand Up @@ -221,13 +234,13 @@ private void RequestTasks(TaskSession session)
return;
}
}
if ((!HasHeaderTask || globalTasks[HeaderTaskHash] < MaxConncurrentTasks) && Blockchain.Singleton.HeaderHeight < session.StartHeight)
if ((!HasHeaderTask || globalTasks[HeaderTaskHash] < MaxConncurrentTasks) && Blockchain.Singleton.HeaderHeight < session.LastBlockIndex)
{
session.Tasks[HeaderTaskHash] = DateTime.UtcNow;
IncrementGlobalTask(HeaderTaskHash);
session.RemoteNode.Tell(Message.Create(MessageCommand.GetHeaders, GetBlocksPayload.Create(Blockchain.Singleton.CurrentHeaderHash)));
}
else if (Blockchain.Singleton.Height < session.StartHeight)
else if (Blockchain.Singleton.Height < session.LastBlockIndex)
{
UInt256 hash = Blockchain.Singleton.CurrentBlockHash;
for (uint i = Blockchain.Singleton.Height + 1; i <= Blockchain.Singleton.HeaderHeight; i++)
Expand All @@ -241,6 +254,11 @@ private void RequestTasks(TaskSession session)
}
session.RemoteNode.Tell(Message.Create(MessageCommand.GetBlocks, GetBlocksPayload.Create(hash)));
}
else if (Blockchain.Singleton.HeaderHeight >= session.LastBlockIndex
&& TimeProvider.Current.UtcNow.ToTimestamp() - PingCoolingOffPeriod >= Blockchain.Singleton.GetBlock(Blockchain.Singleton.CurrentHeaderHash)?.Timestamp)
{
session.RemoteNode.Tell(Message.Create(MessageCommand.Ping, PingPayload.Create(Blockchain.Singleton.Height)));
}
}
}

Expand All @@ -256,6 +274,7 @@ internal protected override bool IsHighPriority(object message)
switch (message)
{
case TaskManager.Register _:
case TaskManager.Update _:
case TaskManager.RestartTasks _:
return true;
case TaskManager.NewTasks tasks:
Expand All @@ -266,5 +285,13 @@ internal protected override bool IsHighPriority(object message)
return false;
}
}

internal protected override bool ShallDrop(object message, IEnumerable queue)
{
if (!(message is TaskManager.NewTasks tasks)) return false;
// Remove duplicate tasks
if (queue.OfType<TaskManager.NewTasks>().Any(x => x.Payload.Type == tasks.Payload.Type && x.Payload.Hashes.SequenceEqual(tasks.Payload.Hashes))) return true;
erikzhang marked this conversation as resolved.
Show resolved Hide resolved
return false;
}
}
}
2 changes: 2 additions & 0 deletions src/neo/Network/P2P/TaskSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ internal class TaskSession

public bool HasTask => Tasks.Count > 0;
public uint StartHeight { get; }
public uint LastBlockIndex { get; set; }

public TaskSession(IActorRef node, VersionPayload version)
{
Expand All @@ -24,6 +25,7 @@ public TaskSession(IActorRef node, VersionPayload version)
this.StartHeight = version.Capabilities
.OfType<FullNodeCapability>()
.FirstOrDefault()?.StartHeight ?? 0;
this.LastBlockIndex = this.StartHeight;
}
}
}