Skip to content

Commit

Permalink
Add header back (#2259)
Browse files Browse the repository at this point in the history
  • Loading branch information
Qiao-Jin committed Feb 3, 2021
1 parent bbb458d commit 9340b30
Show file tree
Hide file tree
Showing 20 changed files with 718 additions and 171 deletions.
233 changes: 233 additions & 0 deletions src/neo/IO/Caching/IndexedQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;

namespace Neo.IO.Caching
{
/// <summary>
/// Represents a queue with indexed access to the items
/// </summary>
/// <typeparam name="T">The type of items in the queue</typeparam>
class IndexedQueue<T> : IReadOnlyCollection<T>
{
private const int DefaultCapacity = 16;
private const int GrowthFactor = 2;
private const float TrimThreshold = 0.9f;

private T[] _array;
private int _head;
private int _count;

/// <summary>
/// Indicates the count of items in the queue
/// </summary>
public int Count => _count;

/// <summary>
/// Creates a queue with the default capacity
/// </summary>
public IndexedQueue() : this(DefaultCapacity)
{
}

/// <summary>
/// Creates a queue with the specified capacity
/// </summary>
/// <param name="capacity">The initial capacity of the queue</param>
public IndexedQueue(int capacity)
{
if (capacity <= 0)
throw new ArgumentOutOfRangeException(nameof(capacity), "The capacity must be greater than zero.");
_array = new T[capacity];
_head = 0;
_count = 0;
}

/// <summary>
/// Creates a queue filled with the specified items
/// </summary>
/// <param name="collection">The collection of items to fill the queue with</param>
public IndexedQueue(IEnumerable<T> collection)
{
_array = collection.ToArray();
_head = 0;
_count = _array.Length;
}

/// <summary>
/// Gets the value at the index
/// </summary>
/// <param name="index">The index</param>
/// <returns>The value at the specified index</returns>
public ref T this[int index]
{
get
{
if (index < 0 || index >= _count)
throw new IndexOutOfRangeException();
return ref _array[(index + _head) % _array.Length];
}
}

/// <summary>
/// Inserts an item at the rear of the queue
/// </summary>
/// <param name="item">The item to insert</param>
public void Enqueue(T item)
{
if (_array.Length == _count)
{
int newSize = _array.Length * GrowthFactor;
if (_head == 0)
{
Array.Resize(ref _array, newSize);
}
else
{
T[] buffer = new T[newSize];
Array.Copy(_array, _head, buffer, 0, _array.Length - _head);
Array.Copy(_array, 0, buffer, _array.Length - _head, _head);
_array = buffer;
_head = 0;
}
}
_array[(_head + _count) % _array.Length] = item;
++_count;
}

/// <summary>
/// Provides access to the item at the front of the queue without dequeueing it
/// </summary>
/// <returns>The frontmost item</returns>
public T Peek()
{
if (_count == 0)
throw new InvalidOperationException("The queue is empty.");
return _array[_head];
}

/// <summary>
/// Attempts to return an item from the front of the queue without removing it
/// </summary>
/// <param name="item">The item</param>
/// <returns>True if the queue returned an item or false if the queue is empty</returns>
public bool TryPeek(out T item)
{
if (_count == 0)
{
item = default;
return false;
}
else
{
item = _array[_head];
return true;
}
}

/// <summary>
/// Removes an item from the front of the queue, returning it
/// </summary>
/// <returns>The item that was removed</returns>
public T Dequeue()
{
if (_count == 0)
throw new InvalidOperationException("The queue is empty");
T result = _array[_head];
++_head;
_head %= _array.Length;
--_count;
return result;
}

/// <summary>
/// Attempts to return an item from the front of the queue, removing it
/// </summary>
/// <param name="item">The item</param>
/// <returns>True if the queue returned an item or false if the queue is empty</returns>
public bool TryDequeue(out T item)
{
if (_count == 0)
{
item = default;
return false;
}
else
{
item = _array[_head];
++_head;
_head %= _array.Length;
--_count;
return true;
}
}

/// <summary>
/// Clears the items from the queue
/// </summary>
public void Clear()
{
_head = 0;
_count = 0;
}

/// <summary>
/// Trims the extra array space that isn't being used.
/// </summary>
public void TrimExcess()
{
if (_count == 0)
{
_array = new T[DefaultCapacity];
}
else if (_array.Length * TrimThreshold >= _count)
{
T[] arr = new T[_count];
CopyTo(arr, 0);
_array = arr;
_head = 0;
}
}

/// <summary>
/// Copys the queue's items to a destination array
/// </summary>
/// <param name="array">The destination array</param>
/// <param name="arrayIndex">The index in the destination to start copying at</param>
public void CopyTo(T[] array, int arrayIndex)
{
if (array is null) throw new ArgumentNullException(nameof(array));
if (arrayIndex < 0 || arrayIndex + _count > array.Length)
throw new ArgumentOutOfRangeException(nameof(arrayIndex));
if (_head + _count <= _array.Length)
{
Array.Copy(_array, _head, array, arrayIndex, _count);
}
else
{
Array.Copy(_array, _head, array, arrayIndex, _array.Length - _head);
Array.Copy(_array, 0, array, arrayIndex + _array.Length - _head, _count + _head - _array.Length);
}
}

/// <summary>
/// Returns an array of the items in the queue
/// </summary>
/// <returns>An array containing the queue's items</returns>
public T[] ToArray()
{
T[] result = new T[_count];
CopyTo(result, 0);
return result;
}

public IEnumerator<T> GetEnumerator()
{
for (int i = 0; i < _count; i++)
yield return _array[(_head + i) % _array.Length];
}

IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
}
82 changes: 74 additions & 8 deletions src/neo/Ledger/Blockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Linq;
using System.Threading;

Expand Down Expand Up @@ -74,6 +75,7 @@ private class UnverifiedBlocksList { public LinkedList<Block> Blocks = new Linke
/// </summary>
public DataCache View => new SnapshotCache(Store);
public MemoryPool MemPool { get; }
public HeaderCache HeaderCache { get; } = new HeaderCache();

private static Blockchain singleton;
public static Blockchain Singleton
Expand Down Expand Up @@ -124,6 +126,12 @@ public Blockchain(NeoSystem system, IStore store)
}
}

protected override void PostStop()
{
base.PostStop();
HeaderCache.Dispose();
}

private bool ContainsTransaction(UInt256 hash)
{
if (MemPool.ContainsKey(hash)) return true;
Expand Down Expand Up @@ -221,32 +229,84 @@ private VerifyResult OnNewBlock(Block block)
{
DataCache snapshot = View;
uint currentHeight = NativeContract.Ledger.CurrentIndex(snapshot);
uint headerHeight = HeaderCache.Last?.Index ?? NativeContract.Ledger.CurrentIndex(snapshot);
if (block.Index <= currentHeight)
return VerifyResult.AlreadyExists;
if (block.Index - 1 > currentHeight)
if (block.Index - 1 > headerHeight)
{
AddUnverifiedBlockToCache(block);
return VerifyResult.UnableToVerify;
}
if (block.Index == currentHeight + 1)
if (block.Index == headerHeight + 1)
{
if (!block.Verify(snapshot))
return VerifyResult.Invalid;
block_cache.TryAdd(block.Hash, block);
block_cache_unverified.Remove(block.Index);
Persist(block);
if (block_cache_unverified.TryGetValue(block.Index + 1, out var unverifiedBlocks))
}
else
{
if (!block.Hash.Equals(HeaderCache[block.Index].Hash))
return VerifyResult.Invalid;
}
block_cache.TryAdd(block.Hash, block);
if (block.Index == currentHeight + 1)
{
Block block_persist = block;
List<Block> blocksToPersistList = new List<Block>();
while (true)
{
blocksToPersistList.Add(block_persist);
if (block_persist.Index + 1 > headerHeight) break;
UInt256 hash = HeaderCache[block_persist.Index + 1].Hash;
if (!block_cache.TryGetValue(hash, out block_persist)) break;
}

int blocksPersisted = 0;
// 15000 is the default among of seconds per block, while MilliSecondsPerBlock is the current
uint extraBlocks = (15000 - MillisecondsPerBlock) / 1000;
foreach (Block blockToPersist in blocksToPersistList)
{
block_cache_unverified.Remove(blockToPersist.Index);
Persist(blockToPersist);

if (blocksPersisted++ < blocksToPersistList.Count - (2 + Math.Max(0, extraBlocks))) continue;
// Empirically calibrated for relaying the most recent 2 blocks persisted with 15s network
// Increase in the rate of 1 block per second in configurations with faster blocks

if (blockToPersist.Index + 99 >= headerHeight)
system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = blockToPersist });
}
if (block_cache_unverified.TryGetValue(currentHeight + 1, out var unverifiedBlocks))
{
foreach (var unverifiedBlock in unverifiedBlocks.Blocks)
Self.Tell(unverifiedBlock, ActorRefs.NoSender);
block_cache_unverified.Remove(block.Index + 1);
}
// We can store the new block in block_cache and tell the new height to other nodes after Persist().
system.LocalNode.Tell(Message.Create(MessageCommand.Ping, PingPayload.Create(block.Index)));
}
else
{
if (block.Index + 99 >= headerHeight)
system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = block });
if (block.Index == headerHeight + 1)
HeaderCache.Add(block.Header);
}
return VerifyResult.Succeed;
}

private void OnNewHeaders(Header[] headers)
{
if (HeaderCache.Full) return;
DataCache snapshot = View;
uint headerHeight = HeaderCache.Last?.Index ?? NativeContract.Ledger.CurrentIndex(snapshot);
foreach (Header header in headers)
{
if (header.Index > headerHeight + 1) break;
if (header.Index < headerHeight + 1) continue;
if (!header.Verify(snapshot)) break;
HeaderCache.Add(header);
++headerHeight;
}
}

private VerifyResult OnNewInventory(IInventory inventory)
{
if (!inventory.Verify(View)) return VerifyResult.Invalid;
Expand Down Expand Up @@ -278,6 +338,9 @@ protected override void OnReceive(object message)
case FillMemoryPool fill:
OnFillMemoryPool(fill.Transactions);
break;
case Header[] headers:
OnNewHeaders(headers);
break;
case Block block:
OnInventory(block, false);
break;
Expand Down Expand Up @@ -373,6 +436,8 @@ private void Persist(Block block)
}
block_cache.TryRemove(block.PrevHash, out _);
Context.System.EventStream.Publish(new PersistCompleted { Block = block });
if (HeaderCache.TryRemoveFirst(out Header header))
Debug.Assert(header.Index == block.Index);
}

public static Props Props(NeoSystem system, IStore store)
Expand Down Expand Up @@ -431,6 +496,7 @@ internal protected override bool IsHighPriority(object message)
{
switch (message)
{
case Header[] _:
case Block _:
case ExtensiblePayload _:
case Terminated _:
Expand Down
Loading

0 comments on commit 9340b30

Please sign in to comment.