Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add header back #2259

Merged
merged 107 commits into from
Feb 3, 2021
Merged
Show file tree
Hide file tree
Changes from 103 commits
Commits
Show all changes
107 commits
Select commit Hold shift + click to select a range
6f5fe69
add header back
Jan 22, 2021
ce6e479
remove ping pong
Jan 22, 2021
af9d2a8
restore ping pong
Jan 22, 2021
77a0d7d
fix
Jan 22, 2021
c70a985
Merge branch 'master' into add_header
Tommo-L Jan 23, 2021
f190df4
logic fix
Jan 23, 2021
3df7314
Merge branch 'add_header' of https://github.com/Qiao-Jin/neo into add…
Jan 23, 2021
87070ba
Update TaskManager/TaskSession
Jan 25, 2021
02be1c7
fix ut
Jan 25, 2021
84b562c
Array.Empty
shargon Jan 25, 2021
a7320ac
kick line
Jan 25, 2021
53f2f05
Merge branch 'add_header' of https://github.com/Qiao-Jin/neo into add…
Jan 25, 2021
97479d7
Merge branch 'master' into add_header
Qiao-Jin Jan 26, 2021
ffc236f
add header cache
Jan 26, 2021
7c663d6
Merge branch 'add_header' of https://github.com/Qiao-Jin/neo into add…
Jan 26, 2021
90c8d95
format
Jan 26, 2021
a1f56fc
fix
Jan 26, 2021
4206e00
rename parameter
Jan 26, 2021
0a85bdc
fix
Jan 26, 2021
82516be
header only in cache
Jan 26, 2021
8ac3ea2
fix
Jan 26, 2021
538eeab
fix
Jan 26, 2021
5eacd2f
Update HeaderCache
erikzhang Jan 26, 2021
fa112e9
Update HeaderCache.cs
erikzhang Jan 26, 2021
23269f2
Merge branch 'master' into add_header
Qiao-Jin Jan 26, 2021
9808fc3
update headercache logic
Jan 26, 2021
fdfbb67
fix
Jan 27, 2021
fabf59c
Add IndexedQueue
erikzhang Jan 27, 2021
4143d82
Remove HeaderCache
erikzhang Jan 27, 2021
e7529cf
Rename
erikzhang Jan 27, 2021
b7075b5
Limit the count of cached headers
erikzhang Jan 27, 2021
2ca2e59
Clean using
shargon Jan 27, 2021
e543a42
Merge remote-tracking branch 'Qiao-Jin/add_header' into add_header
shargon Jan 27, 2021
b2574f6
Add IndexedQueue UT
shargon Jan 27, 2021
7683b3f
Merge branch 'master' into add_header
shargon Jan 27, 2021
fe29464
restore GetBlockByIndexPayload and fix
Jan 27, 2021
1cec5c6
Merge branch 'add_header' of https://github.com/Qiao-Jin/neo into add…
Jan 27, 2021
d2fa995
Merge branch 'master' into add_header
Qiao-Jin Jan 28, 2021
6d8dcae
restore getheaders
Jan 28, 2021
ccb0b45
Optimize IndexedQueue
erikzhang Jan 28, 2021
b563d1d
HeaderCacheFull
erikzhang Jan 28, 2021
2de91fe
Update src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs
erikzhang Jan 28, 2021
573f630
Merge branch 'master' into add_header
Qiao-Jin Jan 28, 2021
478c12c
fix
Jan 28, 2021
e1b82f0
Merge branch 'master' into add_header
Qiao-Jin Jan 29, 2021
7710ab9
Update OnGetHeadersMessageReceived
Jan 29, 2021
e77b299
Merge branch 'add_header' of https://github.com/Qiao-Jin/neo into add…
Jan 29, 2021
052559a
Add lock for IndexedQueue
Jan 29, 2021
33fceba
Revert "Add lock for IndexedQueue"
erikzhang Jan 29, 2021
026c7fe
Add HeaderCache and locks
erikzhang Jan 29, 2021
0e06127
Optimize
erikzhang Jan 29, 2021
a4240cf
Optimize
erikzhang Jan 29, 2021
234ffcb
assign GetBlockByIndexPayload to GetBlocks
Jan 29, 2021
35fd13c
Optimize
shargon Jan 29, 2021
f0ed92a
fix sync logic
Jan 29, 2021
82627a2
Merge branch 'add_header' of https://github.com/Qiao-Jin/neo into add…
Jan 29, 2021
3c3b43b
Revert "Optimize"
erikzhang Jan 29, 2021
6a17179
Merge branch 'master' into add_header
Qiao-Jin Jan 29, 2021
2131bef
Revert "assign GetBlockByIndexPayload to GetBlocks"
Jan 29, 2021
9e1ff37
fix
Jan 29, 2021
cea69bb
Add check null
shargon Jan 29, 2021
8aba54f
Add nullable
shargon Jan 29, 2021
421317e
Add HeaderCache.GetSnapshot()
erikzhang Jan 29, 2021
1bd4bbb
fix null pointer
Jan 30, 2021
51fc9ec
Optimize
erikzhang Jan 30, 2021
7f1a362
Do not send ping when persisted block
erikzhang Jan 30, 2021
6e1f597
Remove TaskCompleted
erikzhang Jan 30, 2021
0810e80
Remove HeaderTaskCompleted
erikzhang Jan 30, 2021
131f2df
restore OnBlock
Jan 30, 2021
44781e1
Optimize ping
erikzhang Jan 30, 2021
4847752
Remove TaskSession.StartHeight
erikzhang Jan 30, 2021
d237a9a
Remove TaskSession.RemoteNode
erikzhang Jan 30, 2021
8b85668
Remove TaskSession.Version
erikzhang Jan 30, 2021
4099cf4
Update TaskSession.cs
erikzhang Jan 30, 2021
a66d1d7
Remove MemPoolTaskHash
erikzhang Jan 30, 2021
f72105a
Optimize
erikzhang Jan 30, 2021
9c90f4b
restore indextasks
Jan 30, 2021
fa5e04c
Remove hardcoded time
shargon Jan 31, 2021
66bb0a2
Optimize
shargon Jan 31, 2021
e720a72
Revert change
shargon Jan 31, 2021
4cffe13
Rename
erikzhang Feb 1, 2021
1834852
Optimize
erikzhang Feb 1, 2021
73edcfd
reset TimeProvider.Current.UtcNow
Feb 1, 2021
a8713a3
use count
Feb 1, 2021
2b366e4
fix
Feb 1, 2021
439699f
fix (2)
Feb 1, 2021
eda2db7
Add globalIndexTasks
erikzhang Feb 1, 2021
620a95b
fix
erikzhang Feb 1, 2021
1669be6
fix
erikzhang Feb 1, 2021
804d30b
Fix
erikzhang Feb 1, 2021
12b2f6d
Assert
erikzhang Feb 1, 2021
d14eb19
restore logic
Feb 2, 2021
3540a54
Optimize
erikzhang Feb 2, 2021
d695e1b
Fix
erikzhang Feb 2, 2021
51a879e
Update TaskManager.cs
erikzhang Feb 2, 2021
9692809
add a comparison
Feb 2, 2021
2354dfd
optimize
Feb 2, 2021
a860c45
optimize globalindextaks
Feb 2, 2021
a972e45
Revert "optimize globalindextaks"
Feb 2, 2021
565fa05
logic fix
Feb 2, 2021
7057ec7
fix
Feb 2, 2021
9d8057b
fix
Feb 2, 2021
a84ce4b
Revert "fix"
Feb 3, 2021
954a04c
logic fix
Feb 3, 2021
5ede7b9
Merge branch 'master' into add_header
Qiao-Jin Feb 3, 2021
6d0814c
code clean
Feb 3, 2021
4a5ebbb
Update src/neo/Network/P2P/TaskSession.cs
Qiao-Jin Feb 3, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 233 additions & 0 deletions src/neo/IO/Caching/IndexedQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;

namespace Neo.IO.Caching
{
/// <summary>
/// Represents a queue with indexed access to the items
/// </summary>
/// <typeparam name="T">The type of items in the queue</typeparam>
class IndexedQueue<T> : IReadOnlyCollection<T>
{
private const int DefaultCapacity = 16;
private const int GrowthFactor = 2;
private const float TrimThreshold = 0.9f;

private T[] _array;
private int _head;
private int _count;

/// <summary>
/// Indicates the count of items in the queue
/// </summary>
public int Count => _count;

/// <summary>
/// Creates a queue with the default capacity
/// </summary>
public IndexedQueue() : this(DefaultCapacity)
{
}

/// <summary>
/// Creates a queue with the specified capacity
/// </summary>
/// <param name="capacity">The initial capacity of the queue</param>
public IndexedQueue(int capacity)
{
if (capacity <= 0)
throw new ArgumentOutOfRangeException(nameof(capacity), "The capacity must be greater than zero.");
_array = new T[capacity];
_head = 0;
_count = 0;
}

/// <summary>
/// Creates a queue filled with the specified items
/// </summary>
/// <param name="collection">The collection of items to fill the queue with</param>
public IndexedQueue(IEnumerable<T> collection)
{
_array = collection.ToArray();
_head = 0;
_count = _array.Length;
}

/// <summary>
/// Gets the value at the index
/// </summary>
/// <param name="index">The index</param>
/// <returns>The value at the specified index</returns>
public ref T this[int index]
{
get
{
if (index < 0 || index >= _count)
throw new IndexOutOfRangeException();
return ref _array[(index + _head) % _array.Length];
}
}

/// <summary>
/// Inserts an item at the rear of the queue
/// </summary>
/// <param name="item">The item to insert</param>
public void Enqueue(T item)
{
if (_array.Length == _count)
{
int newSize = _array.Length * GrowthFactor;
if (_head == 0)
{
Array.Resize(ref _array, newSize);
}
else
{
T[] buffer = new T[newSize];
Array.Copy(_array, _head, buffer, 0, _array.Length - _head);
Array.Copy(_array, 0, buffer, _array.Length - _head, _head);
_array = buffer;
_head = 0;
}
}
_array[(_head + _count) % _array.Length] = item;
++_count;
}

/// <summary>
/// Provides access to the item at the front of the queue without dequeueing it
/// </summary>
/// <returns>The frontmost item</returns>
public T Peek()
{
if (_count == 0)
throw new InvalidOperationException("The queue is empty.");
return _array[_head];
}

/// <summary>
/// Attempts to return an item from the front of the queue without removing it
/// </summary>
/// <param name="item">The item</param>
/// <returns>True if the queue returned an item or false if the queue is empty</returns>
public bool TryPeek(out T item)
{
if (_count == 0)
{
item = default;
return false;
}
else
{
item = _array[_head];
return true;
}
}

/// <summary>
/// Removes an item from the front of the queue, returning it
/// </summary>
/// <returns>The item that was removed</returns>
public T Dequeue()
{
if (_count == 0)
throw new InvalidOperationException("The queue is empty");
T result = _array[_head];
++_head;
_head %= _array.Length;
--_count;
return result;
}

/// <summary>
/// Attempts to return an item from the front of the queue, removing it
/// </summary>
/// <param name="item">The item</param>
/// <returns>True if the queue returned an item or false if the queue is empty</returns>
public bool TryDequeue(out T item)
{
if (_count == 0)
{
item = default;
return false;
}
else
{
item = _array[_head];
++_head;
_head %= _array.Length;
--_count;
return true;
}
}

/// <summary>
/// Clears the items from the queue
/// </summary>
public void Clear()
{
_head = 0;
_count = 0;
}

/// <summary>
/// Trims the extra array space that isn't being used.
/// </summary>
public void TrimExcess()
{
if (_count == 0)
{
_array = new T[DefaultCapacity];
}
else if (_array.Length * TrimThreshold >= _count)
{
T[] arr = new T[_count];
CopyTo(arr, 0);
_array = arr;
_head = 0;
}
}

/// <summary>
/// Copys the queue's items to a destination array
/// </summary>
/// <param name="array">The destination array</param>
/// <param name="arrayIndex">The index in the destination to start copying at</param>
public void CopyTo(T[] array, int arrayIndex)
{
if (array is null) throw new ArgumentNullException(nameof(array));
if (arrayIndex < 0 || arrayIndex + _count > array.Length)
throw new ArgumentOutOfRangeException(nameof(arrayIndex));
if (_head + _count <= _array.Length)
{
Array.Copy(_array, _head, array, arrayIndex, _count);
}
else
{
Array.Copy(_array, _head, array, arrayIndex, _array.Length - _head);
Array.Copy(_array, 0, array, arrayIndex + _array.Length - _head, _count + _head - _array.Length);
}
}

/// <summary>
/// Returns an array of the items in the queue
/// </summary>
/// <returns>An array containing the queue's items</returns>
public T[] ToArray()
{
T[] result = new T[_count];
CopyTo(result, 0);
return result;
}

public IEnumerator<T> GetEnumerator()
{
for (int i = 0; i < _count; i++)
yield return _array[(_head + i) % _array.Length];
}

IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
}
82 changes: 74 additions & 8 deletions src/neo/Ledger/Blockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Linq;
using System.Threading;

Expand Down Expand Up @@ -74,6 +75,7 @@ private class UnverifiedBlocksList { public LinkedList<Block> Blocks = new Linke
/// </summary>
public DataCache View => new SnapshotCache(Store);
public MemoryPool MemPool { get; }
public HeaderCache HeaderCache { get; } = new HeaderCache();

private static Blockchain singleton;
public static Blockchain Singleton
Expand Down Expand Up @@ -124,6 +126,12 @@ public Blockchain(NeoSystem system, IStore store)
}
}

protected override void PostStop()
{
base.PostStop();
HeaderCache.Dispose();
}

private bool ContainsTransaction(UInt256 hash)
{
if (MemPool.ContainsKey(hash)) return true;
Expand Down Expand Up @@ -221,32 +229,84 @@ private VerifyResult OnNewBlock(Block block)
{
DataCache snapshot = View;
uint currentHeight = NativeContract.Ledger.CurrentIndex(snapshot);
uint headerHeight = HeaderCache.Last?.Index ?? NativeContract.Ledger.CurrentIndex(snapshot);
if (block.Index <= currentHeight)
return VerifyResult.AlreadyExists;
if (block.Index - 1 > currentHeight)
if (block.Index - 1 > headerHeight)
{
AddUnverifiedBlockToCache(block);
return VerifyResult.UnableToVerify;
}
if (block.Index == currentHeight + 1)
if (block.Index == headerHeight + 1)
{
if (!block.Verify(snapshot))
return VerifyResult.Invalid;
block_cache.TryAdd(block.Hash, block);
block_cache_unverified.Remove(block.Index);
Persist(block);
if (block_cache_unverified.TryGetValue(block.Index + 1, out var unverifiedBlocks))
}
else
{
if (!block.Hash.Equals(HeaderCache[block.Index].Hash))
erikzhang marked this conversation as resolved.
Show resolved Hide resolved
return VerifyResult.Invalid;
}
block_cache.TryAdd(block.Hash, block);
if (block.Index == currentHeight + 1)
{
Block block_persist = block;
List<Block> blocksToPersistList = new List<Block>();
while (true)
erikzhang marked this conversation as resolved.
Show resolved Hide resolved
{
blocksToPersistList.Add(block_persist);
if (block_persist.Index + 1 > headerHeight) break;
UInt256 hash = HeaderCache[block_persist.Index + 1].Hash;
if (!block_cache.TryGetValue(hash, out block_persist)) break;
}

int blocksPersisted = 0;
// 15000 is the default among of seconds per block, while MilliSecondsPerBlock is the current
uint extraBlocks = (15000 - MillisecondsPerBlock) / 1000;
foreach (Block blockToPersist in blocksToPersistList)
{
block_cache_unverified.Remove(blockToPersist.Index);
Persist(blockToPersist);

if (blocksPersisted++ < blocksToPersistList.Count - (2 + Math.Max(0, extraBlocks))) continue;
// Empirically calibrated for relaying the most recent 2 blocks persisted with 15s network
// Increase in the rate of 1 block per second in configurations with faster blocks

if (blockToPersist.Index + 99 >= headerHeight)
system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = blockToPersist });
}
if (block_cache_unverified.TryGetValue(currentHeight + 1, out var unverifiedBlocks))
{
foreach (var unverifiedBlock in unverifiedBlocks.Blocks)
Self.Tell(unverifiedBlock, ActorRefs.NoSender);
block_cache_unverified.Remove(block.Index + 1);
}
// We can store the new block in block_cache and tell the new height to other nodes after Persist().
system.LocalNode.Tell(Message.Create(MessageCommand.Ping, PingPayload.Create(block.Index)));
erikzhang marked this conversation as resolved.
Show resolved Hide resolved
}
else
{
if (block.Index + 99 >= headerHeight)
system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = block });
if (block.Index == headerHeight + 1)
HeaderCache.Add(block.Header);
}
return VerifyResult.Succeed;
}

private void OnNewHeaders(Header[] headers)
{
if (HeaderCache.Full) return;
erikzhang marked this conversation as resolved.
Show resolved Hide resolved
DataCache snapshot = View;
uint headerHeight = HeaderCache.Last?.Index ?? NativeContract.Ledger.CurrentIndex(snapshot);
foreach (Header header in headers)
{
if (header.Index > headerHeight + 1) break;
if (header.Index < headerHeight + 1) continue;
if (!header.Verify(snapshot)) break;
HeaderCache.Add(header);
++headerHeight;
}
}

private VerifyResult OnNewInventory(IInventory inventory)
{
if (!inventory.Verify(View)) return VerifyResult.Invalid;
Expand Down Expand Up @@ -278,6 +338,9 @@ protected override void OnReceive(object message)
case FillMemoryPool fill:
OnFillMemoryPool(fill.Transactions);
break;
case Header[] headers:
OnNewHeaders(headers);
break;
case Block block:
OnInventory(block, false);
break;
Expand Down Expand Up @@ -373,6 +436,8 @@ private void Persist(Block block)
}
block_cache.TryRemove(block.PrevHash, out _);
Context.System.EventStream.Publish(new PersistCompleted { Block = block });
if (HeaderCache.TryRemoveFirst(out Header header))
Debug.Assert(header.Index == block.Index);
}

public static Props Props(NeoSystem system, IStore store)
Expand Down Expand Up @@ -431,6 +496,7 @@ internal protected override bool IsHighPriority(object message)
{
switch (message)
{
case Header[] _:
case Block _:
case ExtensiblePayload _:
case Terminated _:
Expand Down
Loading