diff --git a/src/neo/IO/Caching/IndexedQueue.cs b/src/neo/IO/Caching/IndexedQueue.cs
new file mode 100644
index 0000000000..b01c6ffea9
--- /dev/null
+++ b/src/neo/IO/Caching/IndexedQueue.cs
@@ -0,0 +1,233 @@
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace Neo.IO.Caching
+{
+ ///
+ /// Represents a queue with indexed access to the items
+ ///
+ /// The type of items in the queue
+ class IndexedQueue : IReadOnlyCollection
+ {
+ private const int DefaultCapacity = 16;
+ private const int GrowthFactor = 2;
+ private const float TrimThreshold = 0.9f;
+
+ private T[] _array;
+ private int _head;
+ private int _count;
+
+ ///
+ /// Indicates the count of items in the queue
+ ///
+ public int Count => _count;
+
+ ///
+ /// Creates a queue with the default capacity
+ ///
+ public IndexedQueue() : this(DefaultCapacity)
+ {
+ }
+
+ ///
+ /// Creates a queue with the specified capacity
+ ///
+ /// The initial capacity of the queue
+ public IndexedQueue(int capacity)
+ {
+ if (capacity <= 0)
+ throw new ArgumentOutOfRangeException(nameof(capacity), "The capacity must be greater than zero.");
+ _array = new T[capacity];
+ _head = 0;
+ _count = 0;
+ }
+
+ ///
+ /// Creates a queue filled with the specified items
+ ///
+ /// The collection of items to fill the queue with
+ public IndexedQueue(IEnumerable collection)
+ {
+ _array = collection.ToArray();
+ _head = 0;
+ _count = _array.Length;
+ }
+
+ ///
+ /// Gets the value at the index
+ ///
+ /// The index
+ /// The value at the specified index
+ public ref T this[int index]
+ {
+ get
+ {
+ if (index < 0 || index >= _count)
+ throw new IndexOutOfRangeException();
+ return ref _array[(index + _head) % _array.Length];
+ }
+ }
+
+ ///
+ /// Inserts an item at the rear of the queue
+ ///
+ /// The item to insert
+ public void Enqueue(T item)
+ {
+ if (_array.Length == _count)
+ {
+ int newSize = _array.Length * GrowthFactor;
+ if (_head == 0)
+ {
+ Array.Resize(ref _array, newSize);
+ }
+ else
+ {
+ T[] buffer = new T[newSize];
+ Array.Copy(_array, _head, buffer, 0, _array.Length - _head);
+ Array.Copy(_array, 0, buffer, _array.Length - _head, _head);
+ _array = buffer;
+ _head = 0;
+ }
+ }
+ _array[(_head + _count) % _array.Length] = item;
+ ++_count;
+ }
+
+ ///
+ /// Provides access to the item at the front of the queue without dequeueing it
+ ///
+ /// The frontmost item
+ public T Peek()
+ {
+ if (_count == 0)
+ throw new InvalidOperationException("The queue is empty.");
+ return _array[_head];
+ }
+
+ ///
+ /// Attempts to return an item from the front of the queue without removing it
+ ///
+ /// The item
+ /// True if the queue returned an item or false if the queue is empty
+ public bool TryPeek(out T item)
+ {
+ if (_count == 0)
+ {
+ item = default;
+ return false;
+ }
+ else
+ {
+ item = _array[_head];
+ return true;
+ }
+ }
+
+ ///
+ /// Removes an item from the front of the queue, returning it
+ ///
+ /// The item that was removed
+ public T Dequeue()
+ {
+ if (_count == 0)
+ throw new InvalidOperationException("The queue is empty");
+ T result = _array[_head];
+ ++_head;
+ _head %= _array.Length;
+ --_count;
+ return result;
+ }
+
+ ///
+ /// Attempts to return an item from the front of the queue, removing it
+ ///
+ /// The item
+ /// True if the queue returned an item or false if the queue is empty
+ public bool TryDequeue(out T item)
+ {
+ if (_count == 0)
+ {
+ item = default;
+ return false;
+ }
+ else
+ {
+ item = _array[_head];
+ ++_head;
+ _head %= _array.Length;
+ --_count;
+ return true;
+ }
+ }
+
+ ///
+ /// Clears the items from the queue
+ ///
+ public void Clear()
+ {
+ _head = 0;
+ _count = 0;
+ }
+
+ ///
+ /// Trims the extra array space that isn't being used.
+ ///
+ public void TrimExcess()
+ {
+ if (_count == 0)
+ {
+ _array = new T[DefaultCapacity];
+ }
+ else if (_array.Length * TrimThreshold >= _count)
+ {
+ T[] arr = new T[_count];
+ CopyTo(arr, 0);
+ _array = arr;
+ _head = 0;
+ }
+ }
+
+ ///
+ /// Copys the queue's items to a destination array
+ ///
+ /// The destination array
+ /// The index in the destination to start copying at
+ public void CopyTo(T[] array, int arrayIndex)
+ {
+ if (array is null) throw new ArgumentNullException(nameof(array));
+ if (arrayIndex < 0 || arrayIndex + _count > array.Length)
+ throw new ArgumentOutOfRangeException(nameof(arrayIndex));
+ if (_head + _count <= _array.Length)
+ {
+ Array.Copy(_array, _head, array, arrayIndex, _count);
+ }
+ else
+ {
+ Array.Copy(_array, _head, array, arrayIndex, _array.Length - _head);
+ Array.Copy(_array, 0, array, arrayIndex + _array.Length - _head, _count + _head - _array.Length);
+ }
+ }
+
+ ///
+ /// Returns an array of the items in the queue
+ ///
+ /// An array containing the queue's items
+ public T[] ToArray()
+ {
+ T[] result = new T[_count];
+ CopyTo(result, 0);
+ return result;
+ }
+
+ public IEnumerator GetEnumerator()
+ {
+ for (int i = 0; i < _count; i++)
+ yield return _array[(_head + i) % _array.Length];
+ }
+
+ IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
+ }
+}
diff --git a/src/neo/Ledger/Blockchain.cs b/src/neo/Ledger/Blockchain.cs
index 8a3ac9c6d4..8a923e87d0 100644
--- a/src/neo/Ledger/Blockchain.cs
+++ b/src/neo/Ledger/Blockchain.cs
@@ -16,6 +16,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
+using System.Diagnostics;
using System.Linq;
using System.Threading;
@@ -74,6 +75,7 @@ private class UnverifiedBlocksList { public LinkedList Blocks = new Linke
///
public DataCache View => new SnapshotCache(Store);
public MemoryPool MemPool { get; }
+ public HeaderCache HeaderCache { get; } = new HeaderCache();
private static Blockchain singleton;
public static Blockchain Singleton
@@ -124,6 +126,12 @@ public Blockchain(NeoSystem system, IStore store)
}
}
+ protected override void PostStop()
+ {
+ base.PostStop();
+ HeaderCache.Dispose();
+ }
+
private bool ContainsTransaction(UInt256 hash)
{
if (MemPool.ContainsKey(hash)) return true;
@@ -221,32 +229,84 @@ private VerifyResult OnNewBlock(Block block)
{
DataCache snapshot = View;
uint currentHeight = NativeContract.Ledger.CurrentIndex(snapshot);
+ uint headerHeight = HeaderCache.Last?.Index ?? NativeContract.Ledger.CurrentIndex(snapshot);
if (block.Index <= currentHeight)
return VerifyResult.AlreadyExists;
- if (block.Index - 1 > currentHeight)
+ if (block.Index - 1 > headerHeight)
{
AddUnverifiedBlockToCache(block);
return VerifyResult.UnableToVerify;
}
- if (block.Index == currentHeight + 1)
+ if (block.Index == headerHeight + 1)
{
if (!block.Verify(snapshot))
return VerifyResult.Invalid;
- block_cache.TryAdd(block.Hash, block);
- block_cache_unverified.Remove(block.Index);
- Persist(block);
- if (block_cache_unverified.TryGetValue(block.Index + 1, out var unverifiedBlocks))
+ }
+ else
+ {
+ if (!block.Hash.Equals(HeaderCache[block.Index].Hash))
+ return VerifyResult.Invalid;
+ }
+ block_cache.TryAdd(block.Hash, block);
+ if (block.Index == currentHeight + 1)
+ {
+ Block block_persist = block;
+ List blocksToPersistList = new List();
+ while (true)
+ {
+ blocksToPersistList.Add(block_persist);
+ if (block_persist.Index + 1 > headerHeight) break;
+ UInt256 hash = HeaderCache[block_persist.Index + 1].Hash;
+ if (!block_cache.TryGetValue(hash, out block_persist)) break;
+ }
+
+ int blocksPersisted = 0;
+ // 15000 is the default among of seconds per block, while MilliSecondsPerBlock is the current
+ uint extraBlocks = (15000 - MillisecondsPerBlock) / 1000;
+ foreach (Block blockToPersist in blocksToPersistList)
+ {
+ block_cache_unverified.Remove(blockToPersist.Index);
+ Persist(blockToPersist);
+
+ if (blocksPersisted++ < blocksToPersistList.Count - (2 + Math.Max(0, extraBlocks))) continue;
+ // Empirically calibrated for relaying the most recent 2 blocks persisted with 15s network
+ // Increase in the rate of 1 block per second in configurations with faster blocks
+
+ if (blockToPersist.Index + 99 >= headerHeight)
+ system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = blockToPersist });
+ }
+ if (block_cache_unverified.TryGetValue(currentHeight + 1, out var unverifiedBlocks))
{
foreach (var unverifiedBlock in unverifiedBlocks.Blocks)
Self.Tell(unverifiedBlock, ActorRefs.NoSender);
block_cache_unverified.Remove(block.Index + 1);
}
- // We can store the new block in block_cache and tell the new height to other nodes after Persist().
- system.LocalNode.Tell(Message.Create(MessageCommand.Ping, PingPayload.Create(block.Index)));
+ }
+ else
+ {
+ if (block.Index + 99 >= headerHeight)
+ system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = block });
+ if (block.Index == headerHeight + 1)
+ HeaderCache.Add(block.Header);
}
return VerifyResult.Succeed;
}
+ private void OnNewHeaders(Header[] headers)
+ {
+ if (HeaderCache.Full) return;
+ DataCache snapshot = View;
+ uint headerHeight = HeaderCache.Last?.Index ?? NativeContract.Ledger.CurrentIndex(snapshot);
+ foreach (Header header in headers)
+ {
+ if (header.Index > headerHeight + 1) break;
+ if (header.Index < headerHeight + 1) continue;
+ if (!header.Verify(snapshot)) break;
+ HeaderCache.Add(header);
+ ++headerHeight;
+ }
+ }
+
private VerifyResult OnNewInventory(IInventory inventory)
{
if (!inventory.Verify(View)) return VerifyResult.Invalid;
@@ -278,6 +338,9 @@ protected override void OnReceive(object message)
case FillMemoryPool fill:
OnFillMemoryPool(fill.Transactions);
break;
+ case Header[] headers:
+ OnNewHeaders(headers);
+ break;
case Block block:
OnInventory(block, false);
break;
@@ -373,6 +436,8 @@ private void Persist(Block block)
}
block_cache.TryRemove(block.PrevHash, out _);
Context.System.EventStream.Publish(new PersistCompleted { Block = block });
+ if (HeaderCache.TryRemoveFirst(out Header header))
+ Debug.Assert(header.Index == block.Index);
}
public static Props Props(NeoSystem system, IStore store)
@@ -431,6 +496,7 @@ internal protected override bool IsHighPriority(object message)
{
switch (message)
{
+ case Header[] _:
case Block _:
case ExtensiblePayload _:
case Terminated _:
diff --git a/src/neo/Ledger/HeaderCache.cs b/src/neo/Ledger/HeaderCache.cs
new file mode 100644
index 0000000000..ea3380baa9
--- /dev/null
+++ b/src/neo/Ledger/HeaderCache.cs
@@ -0,0 +1,102 @@
+using Neo.IO.Caching;
+using Neo.Network.P2P.Payloads;
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace Neo.Ledger
+{
+ public sealed class HeaderCache : IDisposable, IEnumerable
+ {
+ private readonly IndexedQueue headers = new IndexedQueue();
+ private readonly ReaderWriterLockSlim readerWriterLock = new ReaderWriterLockSlim();
+
+ public Header this[uint index]
+ {
+ get
+ {
+ readerWriterLock.EnterReadLock();
+ try
+ {
+ if (headers.Count == 0) return null;
+ uint firstIndex = headers[0].Index;
+ if (index < firstIndex) return null;
+ index -= firstIndex;
+ if (index >= headers.Count) return null;
+ return headers[(int)index];
+ }
+ finally
+ {
+ readerWriterLock.ExitReadLock();
+ }
+ }
+ }
+
+ public bool Full => headers.Count >= 10000;
+
+ public Header Last
+ {
+ get
+ {
+ readerWriterLock.EnterReadLock();
+ try
+ {
+ if (headers.Count == 0) return null;
+ return headers[^1];
+ }
+ finally
+ {
+ readerWriterLock.ExitReadLock();
+ }
+ }
+ }
+
+ public void Dispose()
+ {
+ readerWriterLock.Dispose();
+ }
+
+ internal void Add(Header header)
+ {
+ readerWriterLock.EnterWriteLock();
+ try
+ {
+ headers.Enqueue(header);
+ }
+ finally
+ {
+ readerWriterLock.ExitWriteLock();
+ }
+ }
+
+ internal bool TryRemoveFirst(out Header header)
+ {
+ readerWriterLock.EnterWriteLock();
+ try
+ {
+ return headers.TryDequeue(out header);
+ }
+ finally
+ {
+ readerWriterLock.ExitWriteLock();
+ }
+ }
+
+ public IEnumerator GetEnumerator()
+ {
+ readerWriterLock.EnterReadLock();
+ try
+ {
+ foreach (Header header in headers)
+ yield return header;
+ }
+ finally
+ {
+ readerWriterLock.ExitReadLock();
+ }
+ }
+
+ IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
+ }
+}
diff --git a/src/neo/Network/P2P/MessageCommand.cs b/src/neo/Network/P2P/MessageCommand.cs
index 04856c4a48..c66c15799a 100644
--- a/src/neo/Network/P2P/MessageCommand.cs
+++ b/src/neo/Network/P2P/MessageCommand.cs
@@ -33,6 +33,7 @@ public enum MessageCommand : byte
GetData = 0x28,
[ReflectionCache(typeof(GetBlockByIndexPayload))]
GetBlockByIndex = 0x29,
+ [ReflectionCache(typeof(InvPayload))]
NotFound = 0x2a,
[ReflectionCache(typeof(Transaction))]
Transaction = 0x2b,
diff --git a/src/neo/Network/P2P/Payloads/BlockBase.cs b/src/neo/Network/P2P/Payloads/BlockBase.cs
index 08b2657295..a073858cf6 100644
--- a/src/neo/Network/P2P/Payloads/BlockBase.cs
+++ b/src/neo/Network/P2P/Payloads/BlockBase.cs
@@ -1,5 +1,6 @@
using Neo.IO;
using Neo.IO.Json;
+using Neo.Ledger;
using Neo.Persistence;
using Neo.SmartContract;
using Neo.SmartContract.Native;
@@ -76,7 +77,7 @@ void IVerifiable.DeserializeUnsigned(BinaryReader reader)
UInt160[] IVerifiable.GetScriptHashesForVerifying(DataCache snapshot)
{
if (PrevHash == UInt256.Zero) return new[] { Witness.ScriptHash };
- TrimmedBlock prev = NativeContract.Ledger.GetTrimmedBlock(snapshot, PrevHash);
+ BlockBase prev = Blockchain.Singleton.HeaderCache[Index - 1] ?? (BlockBase)NativeContract.Ledger.GetTrimmedBlock(snapshot, PrevHash);
if (prev is null) throw new InvalidOperationException();
return new[] { prev.NextConsensus };
}
@@ -114,9 +115,9 @@ public virtual JObject ToJson()
public virtual bool Verify(DataCache snapshot)
{
- TrimmedBlock prev = NativeContract.Ledger.GetTrimmedBlock(snapshot, PrevHash);
+ var prev = Blockchain.Singleton.HeaderCache[Index - 1] ?? NativeContract.Ledger.GetHeader(snapshot, Index - 1);
if (prev is null) return false;
- if (prev.Index + 1 != Index) return false;
+ if (prev.Hash != PrevHash) return false;
if (prev.Timestamp >= Timestamp) return false;
if (!this.VerifyWitnesses(snapshot, 1_00000000)) return false;
return true;
diff --git a/src/neo/Network/P2P/Payloads/HeadersPayload.cs b/src/neo/Network/P2P/Payloads/HeadersPayload.cs
index 2e95d4ebf4..9251a7f5a1 100644
--- a/src/neo/Network/P2P/Payloads/HeadersPayload.cs
+++ b/src/neo/Network/P2P/Payloads/HeadersPayload.cs
@@ -1,4 +1,5 @@
using Neo.IO;
+using System;
using System.IO;
namespace Neo.Network.P2P.Payloads
@@ -22,6 +23,7 @@ public static HeadersPayload Create(params Header[] headers)
void ISerializable.Deserialize(BinaryReader reader)
{
Headers = reader.ReadSerializableArray(MaxHeadersCount);
+ if (Headers.Length == 0) throw new FormatException();
}
void ISerializable.Serialize(BinaryWriter writer)
diff --git a/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs b/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs
index fb9fd5cc5b..9c3749d415 100644
--- a/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs
+++ b/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs
@@ -34,8 +34,6 @@ protected override UInt256 GetKeyForItem((UInt256, DateTime) item)
private BloomFilter bloom_filter;
private static readonly TimeSpan TimerInterval = TimeSpan.FromSeconds(30);
- private static readonly TimeSpan PendingTimeout = TimeSpan.FromMinutes(1);
-
private readonly ICancelable timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimerInterval, TimerInterval, Context.Self, new Timer(), ActorRefs.NoSender);
private void OnMessage(Message msg)
@@ -90,6 +88,9 @@ private void OnMessage(Message msg)
case MessageCommand.GetHeaders:
OnGetHeadersMessageReceived((GetBlockByIndexPayload)msg.Payload);
break;
+ case MessageCommand.Headers:
+ OnHeadersMessageReceived((HeadersPayload)msg.Payload);
+ break;
case MessageCommand.Inv:
OnInvMessageReceived((InvPayload)msg.Payload);
break;
@@ -110,7 +111,6 @@ private void OnMessage(Message msg)
case MessageCommand.Version:
throw new ProtocolViolationException();
case MessageCommand.Alert:
- case MessageCommand.Headers:
case MessageCommand.MerkleBlock:
case MessageCommand.NotFound:
case MessageCommand.Reject:
@@ -269,7 +269,7 @@ private void OnGetDataMessageReceived(InvPayload payload)
/// Tell the specified number of blocks' headers starting with the requested IndexStart to RemoteNode actor.
/// A limit set by HeadersPayload.MaxHeadersCount is also applied to the number of requested Headers, namely payload.Count.
///
- /// A GetBlocksPayload including start block index and number of blocks' headers requested.
+ /// A GetBlockByIndexPayload including start block index and number of blocks' headers requested.
private void OnGetHeadersMessageReceived(GetBlockByIndexPayload payload)
{
DataCache snapshot = Blockchain.Singleton.View;
@@ -286,13 +286,20 @@ private void OnGetHeadersMessageReceived(GetBlockByIndexPayload payload)
EnqueueMessage(Message.Create(MessageCommand.Headers, HeadersPayload.Create(headers.ToArray())));
}
+ private void OnHeadersMessageReceived(HeadersPayload payload)
+ {
+ UpdateLastBlockIndex(payload.Headers[^1].Index);
+ system.TaskManager.Tell(payload.Headers);
+ system.Blockchain.Tell(payload.Headers);
+ }
+
private void OnInventoryReceived(IInventory inventory)
{
pendingKnownHashes.Remove(inventory.Hash);
if (inventory is Block block)
{
+ UpdateLastBlockIndex(block.Index);
if (block.Index > NativeContract.Ledger.CurrentIndex(Blockchain.Singleton.View) + InvPayload.MaxHashesCount) return;
- UpdateLastBlockIndex(block.Index, false);
}
knownHashes.Add(inventory.Hash);
system.TaskManager.Tell(inventory);
@@ -332,13 +339,13 @@ private void OnMemPoolMessageReceived()
private void OnPingMessageReceived(PingPayload payload)
{
- UpdateLastBlockIndex(payload.LastBlockIndex, true);
+ UpdateLastBlockIndex(payload.LastBlockIndex);
EnqueueMessage(Message.Create(MessageCommand.Pong, PingPayload.Create(NativeContract.Ledger.CurrentIndex(Blockchain.Singleton.View), payload.Nonce)));
}
private void OnPongMessageReceived(PingPayload payload)
{
- UpdateLastBlockIndex(payload.LastBlockIndex, true);
+ UpdateLastBlockIndex(payload.LastBlockIndex);
}
private void OnVerackMessageReceived()
@@ -373,23 +380,25 @@ private void OnVersionMessageReceived(VersionPayload payload)
SendMessage(Message.Create(MessageCommand.Verack));
}
- private void RefreshPendingKnownHashes()
+ private void OnTimer()
{
+ DateTime oneMinuteAgo = TimeProvider.Current.UtcNow.AddMinutes(-1);
while (pendingKnownHashes.Count > 0)
{
var (_, time) = pendingKnownHashes[0];
- if (TimeProvider.Current.UtcNow - time <= PendingTimeout)
- break;
+ if (oneMinuteAgo <= time) break;
pendingKnownHashes.RemoveAt(0);
}
+ if (oneMinuteAgo > lastSent)
+ EnqueueMessage(Message.Create(MessageCommand.Ping, PingPayload.Create(NativeContract.Ledger.CurrentIndex(Blockchain.Singleton.View))));
}
- private void UpdateLastBlockIndex(uint lastBlockIndex, bool requestTasks)
+ private void UpdateLastBlockIndex(uint lastBlockIndex)
{
if (lastBlockIndex > LastBlockIndex)
{
LastBlockIndex = lastBlockIndex;
- system.TaskManager.Tell(new TaskManager.Update { LastBlockIndex = LastBlockIndex, RequestTasks = requestTasks });
+ system.TaskManager.Tell(new TaskManager.Update { LastBlockIndex = LastBlockIndex });
}
}
}
diff --git a/src/neo/Network/P2P/RemoteNode.cs b/src/neo/Network/P2P/RemoteNode.cs
index a417e4115e..dee2b893f3 100644
--- a/src/neo/Network/P2P/RemoteNode.cs
+++ b/src/neo/Network/P2P/RemoteNode.cs
@@ -8,6 +8,7 @@
using Neo.Network.P2P.Capabilities;
using Neo.Network.P2P.Payloads;
using Neo.SmartContract.Native;
+using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
@@ -23,6 +24,7 @@ internal class Relay { public IInventory Inventory; }
private readonly NeoSystem system;
private readonly Queue message_queue_high = new Queue();
private readonly Queue message_queue_low = new Queue();
+ private DateTime lastSent = TimeProvider.Current.UtcNow;
private readonly bool[] sentCommands = new bool[1 << (sizeof(MessageCommand) * 8)];
private ByteString msg_buffer = ByteString.Empty;
private bool ack = true;
@@ -100,7 +102,10 @@ private void EnqueueMessage(Message message)
break;
}
if (!is_single || message_queue.All(p => p.Command != message.Command))
+ {
message_queue.Enqueue(message);
+ lastSent = TimeProvider.Current.UtcNow;
+ }
CheckMessageQueue();
}
@@ -124,7 +129,7 @@ protected override void OnReceive(object message)
switch (message)
{
case Timer _:
- RefreshPendingKnownHashes();
+ OnTimer();
break;
case Message msg:
if (msg.Payload is PingPayload payload)
diff --git a/src/neo/Network/P2P/TaskManager.cs b/src/neo/Network/P2P/TaskManager.cs
index 52aa7013a5..8ea8457a72 100644
--- a/src/neo/Network/P2P/TaskManager.cs
+++ b/src/neo/Network/P2P/TaskManager.cs
@@ -1,5 +1,6 @@
using Akka.Actor;
using Akka.Configuration;
+using Akka.IO;
using Neo.IO.Actors;
using Neo.IO.Caching;
using Neo.Ledger;
@@ -17,95 +18,82 @@ namespace Neo.Network.P2P
public class TaskManager : UntypedActor
{
internal class Register { public VersionPayload Version; }
- internal class Update { public uint LastBlockIndex; public bool RequestTasks; }
+ internal class Update { public uint LastBlockIndex; }
internal class NewTasks { public InvPayload Payload; }
public class RestartTasks { public InvPayload Payload; }
private class Timer { }
private static readonly TimeSpan TimerInterval = TimeSpan.FromSeconds(30);
private static readonly TimeSpan TaskTimeout = TimeSpan.FromMinutes(1);
- private static readonly UInt256 MemPoolTaskHash = UInt256.Parse("0x0000000000000000000000000000000000000000000000000000000000000001");
+ private static readonly UInt256 HeaderTaskHash = UInt256.Zero;
private const int MaxConncurrentTasks = 3;
- private const int MaxSyncTasksCount = 50;
- private const int PingCoolingOffPeriod = 60_000; // in ms.
private readonly NeoSystem system;
///
/// A set of known hashes, of inventories or payloads, already received.
///
private readonly HashSetCache knownHashes;
- private readonly Dictionary globalTasks = new Dictionary();
- private readonly Dictionary receivedBlockIndex = new Dictionary();
- private readonly HashSet failedSyncTasks = new HashSet();
+ private readonly Dictionary globalInvTasks = new Dictionary();
+ private readonly Dictionary globalIndexTasks = new Dictionary();
private readonly Dictionary sessions = new Dictionary();
private readonly ICancelable timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimerInterval, TimerInterval, Context.Self, new Timer(), ActorRefs.NoSender);
- private uint lastTaskIndex = 0;
+
+ private bool HasHeaderTask => globalInvTasks.ContainsKey(HeaderTaskHash);
public TaskManager(NeoSystem system)
{
this.system = system;
this.knownHashes = new HashSetCache(Blockchain.Singleton.MemPool.Capacity * 2 / 5);
- this.lastTaskIndex = NativeContract.Ledger.CurrentIndex(Blockchain.Singleton.View);
Context.System.EventStream.Subscribe(Self, typeof(Blockchain.PersistCompleted));
Context.System.EventStream.Subscribe(Self, typeof(Blockchain.RelayResult));
}
- private bool AssignSyncTask(uint index, TaskSession filterSession = null)
- {
- if (index <= NativeContract.Ledger.CurrentIndex(Blockchain.Singleton.View) || sessions.Values.Any(p => p != filterSession && p.IndexTasks.ContainsKey(index)))
- return true;
- Random rand = new Random();
- KeyValuePair remoteNode = sessions.Where(p => p.Value != filterSession && p.Value.LastBlockIndex >= index)
- .OrderBy(p => p.Value.IndexTasks.Count)
- .ThenBy(s => rand.Next())
- .FirstOrDefault();
- if (remoteNode.Value == null)
- {
- failedSyncTasks.Add(index);
- return false;
- }
- TaskSession session = remoteNode.Value;
- session.IndexTasks.TryAdd(index, TimeProvider.Current.UtcNow);
- remoteNode.Key.Tell(Message.Create(MessageCommand.GetBlockByIndex, GetBlockByIndexPayload.Create(index, 1)));
- failedSyncTasks.Remove(index);
- return true;
- }
-
- private void OnBlock(Block block)
+ private void OnHeaders(Header[] _)
{
- var session = sessions.Values.FirstOrDefault(p => p.IndexTasks.ContainsKey(block.Index));
- if (session is null) return;
- session.IndexTasks.Remove(block.Index);
- receivedBlockIndex.TryAdd(block.Index, session);
- RequestTasks(false);
+ if (!sessions.TryGetValue(Sender, out TaskSession session))
+ return;
+ if (session.InvTasks.Remove(HeaderTaskHash))
+ DecrementGlobalTask(HeaderTaskHash);
+ RequestTasks(Sender, session);
}
private void OnInvalidBlock(Block invalidBlock)
{
- receivedBlockIndex.TryGetValue(invalidBlock.Index, out TaskSession session);
- if (session is null) return;
- session.InvalidBlockCount++;
- session.IndexTasks.Remove(invalidBlock.Index);
- receivedBlockIndex.Remove(invalidBlock.Index);
- AssignSyncTask(invalidBlock.Index, session);
+ foreach (var (actor, session) in sessions)
+ if (session.ReceivedBlock.TryGetValue(invalidBlock.Index, out Block block))
+ if (block.Hash == invalidBlock.Hash)
+ actor.Tell(Tcp.Abort.Instance);
}
private void OnNewTasks(InvPayload payload)
{
if (!sessions.TryGetValue(Sender, out TaskSession session))
return;
- // Do not accept payload of type InventoryType.TX if not synced on best known HeaderHeight
- if (payload.Type == InventoryType.TX && NativeContract.Ledger.CurrentIndex(Blockchain.Singleton.View) < sessions.Values.Max(p => p.LastBlockIndex))
+
+ // Do not accept payload of type InventoryType.TX if not synced on HeaderHeight
+ uint currentHeight = NativeContract.Ledger.CurrentIndex(Blockchain.Singleton.View);
+ uint headerHeight = Blockchain.Singleton.HeaderCache.Last?.Index ?? currentHeight;
+ if (currentHeight < headerHeight && (payload.Type == InventoryType.TX || (payload.Type == InventoryType.Block && currentHeight < session.LastBlockIndex - InvPayload.MaxHashesCount)))
+ {
+ RequestTasks(Sender, session);
return;
+ }
+
HashSet hashes = new HashSet(payload.Hashes);
// Remove all previously processed knownHashes from the list that is being requested
hashes.Remove(knownHashes);
+ // Add to AvailableTasks the ones, of type InventoryType.Block, that are global (already under process by other sessions)
+ if (payload.Type == InventoryType.Block)
+ session.AvailableTasks.UnionWith(hashes.Where(p => globalInvTasks.ContainsKey(p)));
// Remove those that are already in process by other sessions
- hashes.Remove(globalTasks);
+ hashes.Remove(globalInvTasks);
if (hashes.Count == 0)
+ {
+ RequestTasks(Sender, session);
return;
+ }
// Update globalTasks with the ones that will be requested within this current session
foreach (UInt256 hash in hashes)
@@ -120,8 +108,14 @@ private void OnNewTasks(InvPayload payload)
private void OnPersistCompleted(Block block)
{
- receivedBlockIndex.Remove(block.Index);
- RequestTasks(false);
+ foreach (var (actor, session) in sessions)
+ if (session.ReceivedBlock.Remove(block.Index, out Block receivedBlock))
+ {
+ if (block.Hash == receivedBlock.Hash)
+ RequestTasks(actor, session);
+ else
+ actor.Tell(Tcp.Abort.Instance);
+ }
}
protected override void OnReceive(object message)
@@ -140,11 +134,11 @@ protected override void OnReceive(object message)
case RestartTasks restart:
OnRestartTasks(restart.Payload);
break;
- case Block block:
- OnBlock(block);
+ case Header[] headers:
+ OnHeaders(headers);
break;
case IInventory inventory:
- OnTaskCompleted(inventory.Hash);
+ OnTaskCompleted(inventory);
break;
case Blockchain.PersistCompleted pc:
OnPersistCompleted(pc.Block);
@@ -166,10 +160,8 @@ private void OnRegister(VersionPayload version)
{
Context.Watch(Sender);
TaskSession session = new TaskSession(version);
- if (session.IsFullNode)
- session.InvTasks.TryAdd(MemPoolTaskHash, TimeProvider.Current.UtcNow);
- sessions.TryAdd(Sender, session);
- RequestTasks(true);
+ sessions.Add(Sender, session);
+ RequestTasks(Sender, session);
}
private void OnUpdate(Update update)
@@ -177,51 +169,100 @@ private void OnUpdate(Update update)
if (!sessions.TryGetValue(Sender, out TaskSession session))
return;
session.LastBlockIndex = update.LastBlockIndex;
- session.ExpireTime = TimeProvider.Current.UtcNow.AddMilliseconds(PingCoolingOffPeriod);
- if (update.RequestTasks) RequestTasks(true);
}
private void OnRestartTasks(InvPayload payload)
{
knownHashes.ExceptWith(payload.Hashes);
foreach (UInt256 hash in payload.Hashes)
- globalTasks.Remove(hash);
+ globalInvTasks.Remove(hash);
foreach (InvPayload group in InvPayload.CreateGroup(payload.Type, payload.Hashes))
system.LocalNode.Tell(Message.Create(MessageCommand.GetData, group));
}
- private void OnTaskCompleted(UInt256 hash)
+ private void OnTaskCompleted(IInventory inventory)
{
- knownHashes.Add(hash);
- globalTasks.Remove(hash);
+ Block block = inventory as Block;
+ knownHashes.Add(inventory.Hash);
+ globalInvTasks.Remove(inventory.Hash);
+ if (block is not null)
+ globalIndexTasks.Remove(block.Index);
+ foreach (TaskSession ms in sessions.Values)
+ ms.AvailableTasks.Remove(inventory.Hash);
if (sessions.TryGetValue(Sender, out TaskSession session))
- session.InvTasks.Remove(hash);
+ {
+ session.InvTasks.Remove(inventory.Hash);
+ if (block is not null)
+ {
+ session.IndexTasks.Remove(block.Index);
+ if (session.ReceivedBlock.TryGetValue(block.Index, out var block_old))
+ {
+ if (block.Hash != block_old.Hash)
+ {
+ Sender.Tell(Tcp.Abort.Instance);
+ return;
+ }
+ }
+ else
+ {
+ session.ReceivedBlock.Add(block.Index, block);
+ }
+ }
+ RequestTasks(Sender, session);
+ }
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void DecrementGlobalTask(UInt256 hash)
{
- if (globalTasks.TryGetValue(hash, out var value))
+ if (globalInvTasks.TryGetValue(hash, out var value))
+ {
+ if (value == 1)
+ globalInvTasks.Remove(hash);
+ else
+ globalInvTasks[hash] = value - 1;
+ }
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private void DecrementGlobalTask(uint index)
+ {
+ if (globalIndexTasks.TryGetValue(index, out var value))
{
if (value == 1)
- globalTasks.Remove(hash);
+ globalIndexTasks.Remove(index);
else
- globalTasks[hash] = value - 1;
+ globalIndexTasks[index] = value - 1;
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool IncrementGlobalTask(UInt256 hash)
{
- if (!globalTasks.TryGetValue(hash, out var value))
+ if (!globalInvTasks.TryGetValue(hash, out var value))
{
- globalTasks[hash] = 1;
+ globalInvTasks[hash] = 1;
return true;
}
if (value >= MaxConncurrentTasks)
return false;
- globalTasks[hash] = value + 1;
+ globalInvTasks[hash] = value + 1;
+ return true;
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private bool IncrementGlobalTask(uint index)
+ {
+ if (!globalIndexTasks.TryGetValue(index, out var value))
+ {
+ globalIndexTasks[index] = 1;
+ return true;
+ }
+ if (value >= MaxConncurrentTasks)
+ return false;
+
+ globalIndexTasks[index] = value + 1;
return true;
}
@@ -229,11 +270,10 @@ private void OnTerminated(IActorRef actor)
{
if (!sessions.TryGetValue(actor, out TaskSession session))
return;
- foreach (uint index in session.IndexTasks.Keys)
- AssignSyncTask(index, session);
-
foreach (UInt256 hash in session.InvTasks.Keys)
DecrementGlobalTask(hash);
+ foreach (uint index in session.IndexTasks.Keys)
+ DecrementGlobalTask(index);
sessions.Remove(actor);
}
@@ -241,26 +281,21 @@ private void OnTimer()
{
foreach (TaskSession session in sessions.Values)
{
- foreach (KeyValuePair kvp in session.IndexTasks)
- {
- if (TimeProvider.Current.UtcNow - kvp.Value > TaskTimeout)
+ foreach (var (hash, time) in session.InvTasks.ToArray())
+ if (TimeProvider.Current.UtcNow - time > TaskTimeout)
{
- session.IndexTasks.Remove(kvp.Key);
- session.TimeoutTimes++;
- AssignSyncTask(kvp.Key, session);
+ if (session.InvTasks.Remove(hash))
+ DecrementGlobalTask(hash);
}
- }
-
- foreach (var task in session.InvTasks.ToArray())
- {
- if (TimeProvider.Current.UtcNow - task.Value > TaskTimeout)
+ foreach (var (index, time) in session.IndexTasks.ToArray())
+ if (TimeProvider.Current.UtcNow - time > TaskTimeout)
{
- if (session.InvTasks.Remove(task.Key))
- DecrementGlobalTask(task.Key);
+ if (session.IndexTasks.Remove(index))
+ DecrementGlobalTask(index);
}
- }
}
- RequestTasks(true);
+ foreach (var (actor, session) in sessions)
+ RequestTasks(actor, session);
}
protected override void PostStop()
@@ -274,56 +309,64 @@ public static Props Props(NeoSystem system)
return Akka.Actor.Props.Create(() => new TaskManager(system)).WithMailbox("task-manager-mailbox");
}
- private void RequestTasks(bool sendPing)
+ private void RequestTasks(IActorRef remoteNode, TaskSession session)
{
- if (sessions.Count == 0) return;
-
- if (sendPing) SendPingMessage();
+ if (session.HasTooManyTasks) return;
- uint currentHeight = NativeContract.Ledger.CurrentIndex(Blockchain.Singleton.View);
+ DataCache snapshot = Blockchain.Singleton.View;
- while (failedSyncTasks.Count > 0)
+ // If there are pending tasks of InventoryType.Block we should process them
+ if (session.AvailableTasks.Count > 0)
{
- var failedTask = failedSyncTasks.First();
- if (failedTask <= currentHeight)
+ session.AvailableTasks.Remove(knownHashes);
+ // Search any similar hash that is on Singleton's knowledge, which means, on the way or already processed
+ session.AvailableTasks.RemoveWhere(p => NativeContract.Ledger.ContainsBlock(snapshot, p));
+ HashSet hashes = new HashSet(session.AvailableTasks);
+ if (hashes.Count > 0)
{
- failedSyncTasks.Remove(failedTask);
- continue;
+ foreach (UInt256 hash in hashes.ToArray())
+ {
+ if (!IncrementGlobalTask(hash))
+ hashes.Remove(hash);
+ }
+ session.AvailableTasks.Remove(hashes);
+ foreach (UInt256 hash in hashes)
+ session.InvTasks[hash] = DateTime.UtcNow;
+ foreach (InvPayload group in InvPayload.CreateGroup(InventoryType.Block, hashes.ToArray()))
+ remoteNode.Tell(Message.Create(MessageCommand.GetData, group));
+ return;
}
- if (!AssignSyncTask(failedTask)) return;
}
- int taskCounts = sessions.Values.Sum(p => p.IndexTasks.Count);
- var highestBlockIndex = sessions.Values.Max(p => p.LastBlockIndex);
- for (; taskCounts < MaxSyncTasksCount; taskCounts++)
+ uint currentHeight = NativeContract.Ledger.CurrentIndex(snapshot);
+ uint headerHeight = Blockchain.Singleton.HeaderCache.Last?.Index ?? currentHeight;
+ // When the number of AvailableTasks is no more than 0, no pending tasks of InventoryType.Block, it should process pending the tasks of headers
+ // If not HeaderTask pending to be processed it should ask for more Blocks
+ if ((!HasHeaderTask || globalInvTasks[HeaderTaskHash] < MaxConncurrentTasks) && headerHeight < session.LastBlockIndex && !Blockchain.Singleton.HeaderCache.Full)
{
- if (lastTaskIndex >= highestBlockIndex || lastTaskIndex >= currentHeight + InvPayload.MaxHashesCount) break;
- if (!AssignSyncTask(++lastTaskIndex)) break;
+ session.InvTasks[HeaderTaskHash] = DateTime.UtcNow;
+ IncrementGlobalTask(HeaderTaskHash);
+ remoteNode.Tell(Message.Create(MessageCommand.GetHeaders, GetBlockByIndexPayload.Create(headerHeight)));
}
- }
-
- private void SendPingMessage()
- {
- DataCache snapshot = Blockchain.Singleton.View;
- uint currentHeight = NativeContract.Ledger.CurrentIndex(snapshot);
- UInt256 currentHash = NativeContract.Ledger.CurrentHash(snapshot);
- TrimmedBlock block = NativeContract.Ledger.GetTrimmedBlock(snapshot, currentHash);
- foreach (KeyValuePair item in sessions)
+ else if (currentHeight < session.LastBlockIndex)
{
- var node = item.Key;
- var session = item.Value;
-
- if (session.ExpireTime < TimeProvider.Current.UtcNow ||
- (block.Index >= session.LastBlockIndex &&
- TimeProvider.Current.UtcNow.ToTimestampMS() - PingCoolingOffPeriod >= block.Timestamp))
+ uint startHeight = currentHeight;
+ while (globalIndexTasks.ContainsKey(++startHeight)) { }
+ if (startHeight > session.LastBlockIndex || startHeight >= currentHeight + InvPayload.MaxHashesCount) return;
+ uint endHeight = startHeight;
+ while (!globalIndexTasks.ContainsKey(++endHeight) && endHeight <= session.LastBlockIndex && endHeight <= currentHeight + InvPayload.MaxHashesCount) { }
+ uint count = Math.Min(endHeight - startHeight, InvPayload.MaxHashesCount);
+ for (uint i = 0; i < count; i++)
{
- if (session.InvTasks.Remove(MemPoolTaskHash))
- {
- node.Tell(Message.Create(MessageCommand.Mempool));
- }
- node.Tell(Message.Create(MessageCommand.Ping, PingPayload.Create(currentHeight)));
- session.ExpireTime = TimeProvider.Current.UtcNow.AddMilliseconds(PingCoolingOffPeriod);
+ session.IndexTasks[startHeight + i] = TimeProvider.Current.UtcNow;
+ IncrementGlobalTask(startHeight + i);
}
+ remoteNode.Tell(Message.Create(MessageCommand.GetBlockByIndex, GetBlockByIndexPayload.Create(startHeight, (short)count)));
+ }
+ else if (!session.MempoolSent)
+ {
+ session.MempoolSent = true;
+ remoteNode.Tell(Message.Create(MessageCommand.Mempool));
}
}
}
@@ -340,6 +383,7 @@ internal protected override bool IsHighPriority(object message)
switch (message)
{
case TaskManager.Register _:
+ case TaskManager.Update _:
case TaskManager.RestartTasks _:
return true;
case TaskManager.NewTasks tasks:
diff --git a/src/neo/Network/P2P/TaskSession.cs b/src/neo/Network/P2P/TaskSession.cs
index b8a78bd5f6..6764789b24 100644
--- a/src/neo/Network/P2P/TaskSession.cs
+++ b/src/neo/Network/P2P/TaskSession.cs
@@ -8,14 +8,14 @@ namespace Neo.Network.P2P
{
internal class TaskSession
{
- public readonly Dictionary InvTasks = new Dictionary();
- public readonly Dictionary IndexTasks = new Dictionary();
-
+ public Dictionary InvTasks { get; } = new Dictionary();
+ public Dictionary IndexTasks { get; } = new Dictionary();
+ public HashSet AvailableTasks { get; } = new HashSet();
+ public Dictionary ReceivedBlock { get; } = new Dictionary();
+ public bool HasTooManyTasks => InvTasks.Count + IndexTasks.Count >= 100;
public bool IsFullNode { get; }
public uint LastBlockIndex { get; set; }
- public uint TimeoutTimes = 0;
- public uint InvalidBlockCount = 0;
- public DateTime ExpireTime = DateTime.MinValue;
+ public bool MempoolSent { get; set; }
public TaskSession(VersionPayload version)
{
diff --git a/src/neo/Persistence/IReadOnlyStore.cs b/src/neo/Persistence/IReadOnlyStore.cs
index f13fd56d7f..7200691084 100644
--- a/src/neo/Persistence/IReadOnlyStore.cs
+++ b/src/neo/Persistence/IReadOnlyStore.cs
@@ -1,4 +1,3 @@
-using Neo.IO.Caching;
using System.Collections.Generic;
namespace Neo.Persistence
diff --git a/src/neo/Persistence/MemorySnapshot.cs b/src/neo/Persistence/MemorySnapshot.cs
index 7b3c77dbe8..9c2ee79998 100644
--- a/src/neo/Persistence/MemorySnapshot.cs
+++ b/src/neo/Persistence/MemorySnapshot.cs
@@ -1,5 +1,4 @@
using Neo.IO;
-using Neo.IO.Caching;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
diff --git a/src/neo/Persistence/MemoryStore.cs b/src/neo/Persistence/MemoryStore.cs
index 705f509b87..1cf51dde51 100644
--- a/src/neo/Persistence/MemoryStore.cs
+++ b/src/neo/Persistence/MemoryStore.cs
@@ -1,5 +1,4 @@
using Neo.IO;
-using Neo.IO.Caching;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
diff --git a/src/neo/SmartContract/Iterators/StorageIterator.cs b/src/neo/SmartContract/Iterators/StorageIterator.cs
index bcca51b471..74dc5015cc 100644
--- a/src/neo/SmartContract/Iterators/StorageIterator.cs
+++ b/src/neo/SmartContract/Iterators/StorageIterator.cs
@@ -1,4 +1,3 @@
-using Neo.Ledger;
using Neo.VM;
using Neo.VM.Types;
using System.Collections.Generic;
diff --git a/src/neo/SmartContract/KeyBuilder.cs b/src/neo/SmartContract/KeyBuilder.cs
index c725da370b..80bea0e783 100644
--- a/src/neo/SmartContract/KeyBuilder.cs
+++ b/src/neo/SmartContract/KeyBuilder.cs
@@ -1,5 +1,4 @@
using Neo.IO;
-using Neo.Ledger;
using System;
using System.IO;
diff --git a/tests/neo.UnitTests/IO/Caching/UT_IndexedQueue.cs b/tests/neo.UnitTests/IO/Caching/UT_IndexedQueue.cs
new file mode 100644
index 0000000000..e09726924c
--- /dev/null
+++ b/tests/neo.UnitTests/IO/Caching/UT_IndexedQueue.cs
@@ -0,0 +1,98 @@
+using FluentAssertions;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Neo.IO.Caching;
+using System;
+using System.Linq;
+
+namespace Neo.UnitTests.IO.Caching
+{
+ [TestClass]
+ public class UT_IndexedQueue
+ {
+ [TestMethod]
+ public void TestDefault()
+ {
+ var queue = new IndexedQueue(10);
+ queue.Count.Should().Be(0);
+
+ queue = new IndexedQueue();
+ queue.Count.Should().Be(0);
+ queue.TrimExcess();
+ queue.Count.Should().Be(0);
+
+ queue = new IndexedQueue(Array.Empty());
+ queue.Count.Should().Be(0);
+ queue.TryPeek(out var a).Should().BeFalse();
+ a.Should().Be(0);
+ queue.TryDequeue(out a).Should().BeFalse();
+ a.Should().Be(0);
+
+ Assert.ThrowsException(() => queue.Peek());
+ Assert.ThrowsException(() => queue.Dequeue());
+ Assert.ThrowsException(() => _ = queue[-1]);
+ Assert.ThrowsException(() => queue[-1] = 1);
+ Assert.ThrowsException(() => _ = queue[1]);
+ Assert.ThrowsException(() => queue[1] = 1);
+ Assert.ThrowsException(() => new IndexedQueue(-1));
+ }
+
+ [TestMethod]
+ public void TestQueue()
+ {
+ var queue = new IndexedQueue(new int[] { 1, 2, 3 });
+ queue.Count.Should().Be(3);
+
+ queue.Enqueue(4);
+ queue.Count.Should().Be(4);
+ queue.Peek().Should().Be(1);
+ queue.TryPeek(out var a).Should().BeTrue();
+ a.Should().Be(1);
+
+ queue[0].Should().Be(1);
+ queue[1].Should().Be(2);
+ queue[2].Should().Be(3);
+ queue.Dequeue().Should().Be(1);
+ queue.Dequeue().Should().Be(2);
+ queue.Dequeue().Should().Be(3);
+ queue[0] = 5;
+ queue.TryDequeue(out a).Should().BeTrue();
+ a.Should().Be(5);
+
+ queue.Enqueue(4);
+ queue.Clear();
+ queue.Count.Should().Be(0);
+ }
+
+ [TestMethod]
+ public void TestEnumerator()
+ {
+ int[] arr = new int[3] { 1, 2, 3 };
+ var queue = new IndexedQueue(arr);
+
+ arr.SequenceEqual(queue).Should().BeTrue();
+ }
+
+ [TestMethod]
+ public void TestCopyTo()
+ {
+ int[] arr = new int[3];
+ var queue = new IndexedQueue(new int[] { 1, 2, 3 });
+
+ Assert.ThrowsException(() => queue.CopyTo(null, 0));
+ Assert.ThrowsException(() => queue.CopyTo(arr, -1));
+ Assert.ThrowsException(() => queue.CopyTo(arr, 2));
+
+ queue.CopyTo(arr, 0);
+
+ arr[0].Should().Be(1);
+ arr[1].Should().Be(2);
+ arr[2].Should().Be(3);
+
+ arr = queue.ToArray();
+
+ arr[0].Should().Be(1);
+ arr[1].Should().Be(2);
+ arr[2].Should().Be(3);
+ }
+ }
+}
diff --git a/tests/neo.UnitTests/Network/P2P/UT_TaskSession.cs b/tests/neo.UnitTests/Network/P2P/UT_TaskSession.cs
index 6ae403a946..bee2bc8efe 100644
--- a/tests/neo.UnitTests/Network/P2P/UT_TaskSession.cs
+++ b/tests/neo.UnitTests/Network/P2P/UT_TaskSession.cs
@@ -13,25 +13,19 @@ public class UT_TaskSession
[TestMethod]
public void CreateTest()
{
- Assert.ThrowsException(() => new TaskSession(null));
-
var ses = new TaskSession(new VersionPayload() { Capabilities = new NodeCapability[] { new FullNodeCapability(123) } });
- Assert.IsTrue(ses.IsFullNode);
+ Assert.IsFalse(ses.HasTooManyTasks);
Assert.AreEqual((uint)123, ses.LastBlockIndex);
Assert.AreEqual(0, ses.IndexTasks.Count);
- Assert.AreEqual(0, ses.InvTasks.Count);
- Assert.AreEqual((uint)0, ses.TimeoutTimes);
- Assert.AreEqual((uint)0, ses.InvalidBlockCount);
+ Assert.IsTrue(ses.IsFullNode);
- ses = new TaskSession(new VersionPayload() { Capabilities = new NodeCapability[0] });
+ ses = new TaskSession(new VersionPayload() { Capabilities = Array.Empty() });
- Assert.IsFalse(ses.IsFullNode);
+ Assert.IsFalse(ses.HasTooManyTasks);
Assert.AreEqual((uint)0, ses.LastBlockIndex);
Assert.AreEqual(0, ses.IndexTasks.Count);
- Assert.AreEqual(0, ses.InvTasks.Count);
- Assert.AreEqual((uint)0, ses.TimeoutTimes);
- Assert.AreEqual((uint)0, ses.InvalidBlockCount);
+ Assert.IsFalse(ses.IsFullNode);
}
}
}
diff --git a/tests/neo.UnitTests/Persistence/UT_MemoryStore.cs b/tests/neo.UnitTests/Persistence/UT_MemoryStore.cs
index 5feff2b677..23ac57e261 100644
--- a/tests/neo.UnitTests/Persistence/UT_MemoryStore.cs
+++ b/tests/neo.UnitTests/Persistence/UT_MemoryStore.cs
@@ -1,5 +1,4 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Neo.IO.Caching;
using Neo.Persistence;
using System.Linq;
diff --git a/tests/neo.UnitTests/SmartContract/Iterators/UT_StorageIterator.cs b/tests/neo.UnitTests/SmartContract/Iterators/UT_StorageIterator.cs
index 806c4d4817..a9ccec7769 100644
--- a/tests/neo.UnitTests/SmartContract/Iterators/UT_StorageIterator.cs
+++ b/tests/neo.UnitTests/SmartContract/Iterators/UT_StorageIterator.cs
@@ -1,6 +1,5 @@
using FluentAssertions;
using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Neo.Ledger;
using Neo.SmartContract;
using Neo.SmartContract.Iterators;
using Neo.VM.Types;
diff --git a/tests/neo.UnitTests/UT_DataCache.cs b/tests/neo.UnitTests/UT_DataCache.cs
index b10747df8d..ca15234c9b 100644
--- a/tests/neo.UnitTests/UT_DataCache.cs
+++ b/tests/neo.UnitTests/UT_DataCache.cs
@@ -1,5 +1,4 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Neo.IO.Caching;
using Neo.Ledger;
using Neo.Persistence;
using Neo.SmartContract;