Skip to content

Commit

Permalink
MemoryPool: state-dependent concurrent verification in TryAdd
Browse files Browse the repository at this point in the history
MemoryPool contents is always valid (verified) against some snapshot. This
snapshot is only changed when new block is added. Between blocks we only have
one valid chain state that can be read by multiple threads without any issues,
thus we can execute concurrently not only state-independent, but also
state-dependent parts of transaction verification.

To simplify execution flow (minimize single-threaded Blockchain message
handling and eliminate duplicate DB accesses for ContainsTransaction)
TransactionRouter is an independent entity now, though of course we can also
attach the same actor functionality to MemoryPool itself.

TransactionVerificationContext balance checking is moved to MemoryPool from
Transaction with this, Transaction shouldn't care (it can check overall GAS
balance though).

This is directly related to neo-project#2045 work (it solves state access problem there)
and in some sense is an alternative to neo-project#2054 (makes fee calculation easier,
though IsStandardContract() trick to optimize out these contracts during
reverification is still relevant and can be added here). At this stage it's
just a prototype, some additional optimizations and simplifications are
possible of course, but this prototype shows the direction and the main
question for now is whether this direction is interesting for us.
  • Loading branch information
roman-khimov committed Nov 26, 2020
1 parent 0c24322 commit c4abe4b
Show file tree
Hide file tree
Showing 10 changed files with 173 additions and 131 deletions.
39 changes: 2 additions & 37 deletions src/neo/Ledger/Blockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public class RelayResult { public IInventory Inventory; public VerifyResult Resu
private const int MaxTxToReverifyPerIdle = 10;
private static readonly object lockObj = new object();
private readonly NeoSystem system;
private readonly IActorRef txrouter;
private readonly List<UInt256> header_index = new List<UInt256>();
private uint stored_header_count = 0;
private readonly Dictionary<UInt256, Block> block_cache = new Dictionary<UInt256, Block>();
Expand Down Expand Up @@ -112,7 +111,6 @@ static Blockchain()
public Blockchain(NeoSystem system, IStore store)
{
this.system = system;
this.txrouter = Context.ActorOf(TransactionRouter.Props(system));
this.MemPool = new MemoryPool(system, ProtocolSettings.Default.MemoryPoolMaxTransactions);
this.Store = store;
this.View = new ReadOnlyView(store);
Expand Down Expand Up @@ -146,7 +144,7 @@ public Blockchain(NeoSystem system, IStore store)
else
{
UpdateCurrentSnapshot();
MemPool.LoadPolicy(currentSnapshot);
MemPool.InitSnapshot(currentSnapshot);
}
singleton = this;
}
Expand Down Expand Up @@ -314,7 +312,7 @@ private void OnFillMemoryPool(IEnumerable<Transaction> transactions)
// First remove the tx if it is unverified in the pool.
MemPool.TryRemoveUnVerified(tx.Hash, out _);
// Add to the memory pool
MemPool.TryAdd(tx, currentSnapshot);
MemPool.TryAdd(tx);
}
// Transactions originally in the pool will automatically be reverified based on their priority.

Expand All @@ -326,7 +324,6 @@ private void OnInventory(IInventory inventory, bool relay = true)
VerifyResult result = inventory switch
{
Block block => OnNewBlock(block),
Transaction transaction => OnNewTransaction(transaction),
_ => OnNewInventory(inventory)
};
if (relay && result == VerifyResult.Succeed)
Expand Down Expand Up @@ -370,20 +367,6 @@ private VerifyResult OnNewInventory(IInventory inventory)
return VerifyResult.Succeed;
}

private VerifyResult OnNewTransaction(Transaction transaction)
{
if (ContainsTransaction(transaction.Hash)) return VerifyResult.AlreadyExists;
return MemPool.TryAdd(transaction, currentSnapshot);
}

private void OnPreverifyCompleted(PreverifyCompleted task)
{
if (task.Result == VerifyResult.Succeed)
OnInventory(task.Transaction, task.Relay);
else
SendRelayResult(task.Transaction, task.Result);
}

protected override void OnReceive(object message)
{
switch (message)
Expand All @@ -397,34 +380,16 @@ protected override void OnReceive(object message)
case Block block:
OnInventory(block, false);
break;
case Transaction tx:
OnTransaction(tx, true);
break;
case Transaction[] transactions:
// This message comes from a mempool's revalidation, already relayed
foreach (var tx in transactions) OnTransaction(tx, false);
break;
case IInventory inventory:
OnInventory(inventory);
break;
case PreverifyCompleted task:
OnPreverifyCompleted(task);
break;
case Idle _:
if (MemPool.ReVerifyTopUnverifiedTransactionsIfNeeded(MaxTxToReverifyPerIdle, currentSnapshot))
Self.Tell(Idle.Instance, ActorRefs.NoSender);
break;
}
}

private void OnTransaction(Transaction tx, bool relay)
{
if (ContainsTransaction(tx.Hash))
SendRelayResult(tx, VerifyResult.AlreadyExists);
else
txrouter.Tell(new TransactionRouter.Task { Transaction = tx, Relay = relay }, Sender);
}

private void Persist(Block block)
{
using (SnapshotView snapshot = GetSnapshot())
Expand Down
129 changes: 84 additions & 45 deletions src/neo/Ledger/MemoryPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class MemoryPool : IReadOnlyCollection<Transaction>
/// lock for write operations.
/// </summary>
private readonly ReaderWriterLockSlim _txRwLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
private readonly ReaderWriterLockSlim _snapshotLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

/// <summary>
/// Store all verified unsorted transactions currently in the pool.
Expand All @@ -56,6 +57,11 @@ public class MemoryPool : IReadOnlyCollection<Transaction>
private readonly Dictionary<UInt256, PoolItem> _unverifiedTransactions = new Dictionary<UInt256, PoolItem>();
private readonly SortedSet<PoolItem> _unverifiedSortedTransactions = new SortedSet<PoolItem>();

// <summary>
// Verified transactions are verified against this snapshot.
// </summary>
private SnapshotView currentSnapshot;

// Internal methods to aid in unit testing
internal int SortedTxCount => _sortedTransactions.Count;
internal int UnverifiedSortedTxCount => _unverifiedSortedTransactions.Count;
Expand Down Expand Up @@ -105,10 +111,16 @@ public MemoryPool(NeoSystem system, int capacity)
Capacity = capacity;
}

internal bool LoadPolicy(StoreView snapshot)
internal void InitSnapshot(SnapshotView snapshot)
{
currentSnapshot = snapshot;
LoadPolicy();
}

internal bool LoadPolicy()
{
_maxTxPerBlock = (int)NativeContract.Policy.GetMaxTransactionsPerBlock(snapshot);
long newFeePerByte = NativeContract.Policy.GetFeePerByte(snapshot);
_maxTxPerBlock = (int)NativeContract.Policy.GetMaxTransactionsPerBlock(currentSnapshot);
long newFeePerByte = NativeContract.Policy.GetFeePerByte(currentSnapshot);
bool policyChanged = newFeePerByte > _feePerByte;
_feePerByte = newFeePerByte;
return policyChanged;
Expand Down Expand Up @@ -253,48 +265,63 @@ internal bool CanTransactionFitInPool(Transaction tx)
}

/// <summary>
/// Adds an already verified transaction to the memory pool.
///
/// Note: This must only be called from a single thread (the Blockchain actor). To add a transaction to the pool
/// tell the Blockchain actor about the transaction.
/// Verifies (against currentSnapshot) and adds transaction to the memory pool.
/// </summary>
/// <param name="hash"></param>
/// <param name="tx"></param>
/// <returns></returns>
internal VerifyResult TryAdd(Transaction tx, StoreView snapshot)
internal VerifyResult TryAdd(Transaction tx)
{
var poolItem = new PoolItem(tx);

if (_unsortedTransactions.ContainsKey(tx.Hash)) return VerifyResult.AlreadyExists;

List<Transaction> removedTransactions = null;
_txRwLock.EnterWriteLock();
VerifyResult result = VerifyResult.Succeed;

_snapshotLock.EnterReadLock();
try
{
VerifyResult result = tx.VerifyStateDependent(snapshot, VerificationContext);
if (result != VerifyResult.Succeed) return result;
if (currentSnapshot.ContainsTransaction(tx.Hash))
return VerifyResult.AlreadyExists;

_unsortedTransactions.Add(tx.Hash, poolItem);
VerificationContext.AddTransaction(tx);
_sortedTransactions.Add(poolItem);
result = tx.Verify(currentSnapshot, null);
if (result != VerifyResult.Succeed)
return result;

if (Count > Capacity)
removedTransactions = RemoveOverCapacity();
_txRwLock.EnterWriteLock();
try
{
if (_unsortedTransactions.ContainsKey(tx.Hash) || _unverifiedTransactions.ContainsKey(tx.Hash))
return VerifyResult.AlreadyExists;

if (!VerificationContext.CheckTransaction(tx, currentSnapshot))
return VerifyResult.InsufficientFunds;
_unsortedTransactions.Add(tx.Hash, poolItem);
VerificationContext.AddTransaction(tx);
_sortedTransactions.Add(poolItem);

if (Count > Capacity)
removedTransactions = RemoveOverCapacity();
if (!_unsortedTransactions.ContainsKey(tx.Hash))
result = VerifyResult.OutOfMemory;
}
finally
{
_txRwLock.ExitWriteLock();
}
}
finally
{
_txRwLock.ExitWriteLock();
_snapshotLock.ExitReadLock();
}

foreach (IMemoryPoolTxObserverPlugin plugin in Plugin.TxObserverPlugins)
{
plugin.TransactionAdded(poolItem.Tx);
if (result == VerifyResult.Succeed)
plugin.TransactionAdded(poolItem.Tx);
if (removedTransactions != null)
plugin.TransactionsRemoved(MemoryPoolTxRemovalReason.CapacityExceeded, removedTransactions);
}

if (!_unsortedTransactions.ContainsKey(tx.Hash)) return VerifyResult.OutOfMemory;
return VerifyResult.Succeed;
return result;
}

private List<Transaction> RemoveOverCapacity()
Expand Down Expand Up @@ -354,40 +381,51 @@ internal void InvalidateVerifiedTransactions()
}

// Note: this must only be called from a single thread (the Blockchain actor)
internal void UpdatePoolForBlockPersisted(Block block, StoreView snapshot)
internal void UpdatePoolForBlockPersisted(Block block, SnapshotView snapshot)
{
bool policyChanged = LoadPolicy(snapshot);
bool policyChanged = false;

_txRwLock.EnterWriteLock();
_snapshotLock.EnterWriteLock();
try
{
// First remove the transactions verified in the block.
foreach (Transaction tx in block.Transactions)
currentSnapshot = snapshot;
policyChanged = LoadPolicy();

_txRwLock.EnterWriteLock();
try
{
if (TryRemoveVerified(tx.Hash, out _)) continue;
TryRemoveUnVerified(tx.Hash, out _);
}
// First remove the transactions verified in the block.
foreach (Transaction tx in block.Transactions)
{
if (TryRemoveVerified(tx.Hash, out _)) continue;
TryRemoveUnVerified(tx.Hash, out _);
}

// Add all the previously verified transactions back to the unverified transactions
InvalidateVerifiedTransactions();
// Add all the previously verified transactions back to the unverified transactions
InvalidateVerifiedTransactions();

if (policyChanged)
{
var tx = new List<Transaction>();
foreach (PoolItem item in _unverifiedSortedTransactions.Reverse())
if (item.Tx.FeePerByte >= _feePerByte)
tx.Add(item.Tx);
if (policyChanged)
{
var tx = new List<Transaction>();
foreach (PoolItem item in _unverifiedSortedTransactions.Reverse())
if (item.Tx.FeePerByte >= _feePerByte)
tx.Add(item.Tx);

_unverifiedTransactions.Clear();
_unverifiedSortedTransactions.Clear();
_unverifiedTransactions.Clear();
_unverifiedSortedTransactions.Clear();

if (tx.Count > 0)
_system.Blockchain.Tell(tx.ToArray(), ActorRefs.NoSender);
if (tx.Count > 0)
_system.TransactionRouter.Tell(tx.ToArray(), ActorRefs.NoSender);
}
}
finally
{
_txRwLock.ExitWriteLock();
}
}
finally
{
_txRwLock.ExitWriteLock();
_snapshotLock.ExitWriteLock();
}

// If we know about headers of future blocks, no point in verifying transactions from the unverified tx pool
Expand Down Expand Up @@ -425,7 +463,8 @@ private int ReverifyTransactions(SortedSet<PoolItem> verifiedSortedTxPool,
// Since unverifiedSortedTxPool is ordered in an ascending manner, we take from the end.
foreach (PoolItem item in unverifiedSortedTxPool.Reverse().Take(count))
{
if (item.Tx.VerifyStateDependent(snapshot, VerificationContext) == VerifyResult.Succeed)
if (VerificationContext.CheckTransaction(item.Tx, currentSnapshot) &&
item.Tx.VerifyStateDependent(snapshot) == VerifyResult.Succeed)
{
reverifiedItems.Add(item);
VerificationContext.AddTransaction(item.Tx);
Expand Down
51 changes: 37 additions & 14 deletions src/neo/Ledger/TransactionRouter.cs
Original file line number Diff line number Diff line change
@@ -1,35 +1,58 @@
using Akka.Actor;
using Akka.Routing;
using Neo.Network.P2P;
using Neo.Network.P2P.Payloads;
using System;

namespace Neo.Ledger
{
internal class TransactionRouter : UntypedActor
public sealed class TransactionRouter : UntypedActor
{
public class Task { public Transaction Transaction; public bool Relay; }
private readonly NeoSystem system;
private readonly Blockchain chain;

private readonly IActorRef blockchain;

public TransactionRouter(NeoSystem system)
public TransactionRouter(NeoSystem system, Blockchain bc)
{
this.blockchain = system.Blockchain;
this.system = system;
this.chain = bc;
}

protected override void OnReceive(object message)
{
if (!(message is Task task)) return;
blockchain.Tell(new Blockchain.PreverifyCompleted
switch (message)
{
case Transaction tx:
OnTransaction(tx, true);
break;
case Transaction[] transactions:
// This message comes from a mempool's revalidation, already relayed
foreach (var tx in transactions) OnTransaction(tx, false);
break;
}
}

private void OnTransaction(Transaction tx, bool relay)
{
VerifyResult res = chain.MemPool.TryAdd(tx);
if (relay && res == VerifyResult.Succeed)
system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = tx });
SendRelayResult(tx, res);
}

private void SendRelayResult(IInventory inventory, VerifyResult result)
{
Blockchain.RelayResult rr = new Blockchain.RelayResult
{
Transaction = task.Transaction,
Result = task.Transaction.VerifyStateIndependent(),
Relay = task.Relay
}, Sender);
Inventory = inventory,
Result = result
};
Sender.Tell(rr);
Context.System.EventStream.Publish(rr);
}

internal static Props Props(NeoSystem system)
public static Props Props(NeoSystem system, Blockchain bc)
{
return Akka.Actor.Props.Create(() => new TransactionRouter(system)).WithRouter(new SmallestMailboxPool(Environment.ProcessorCount));
return Akka.Actor.Props.Create(() => new TransactionRouter(system, bc)).WithRouter(new SmallestMailboxPool(Environment.ProcessorCount));
}
}
}
2 changes: 2 additions & 0 deletions src/neo/NeoSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class NeoSystem : IDisposable
$"remote-node-mailbox {{ mailbox-type: \"{typeof(RemoteNodeMailbox).AssemblyQualifiedName}\" }}" +
$"consensus-service-mailbox {{ mailbox-type: \"{typeof(ConsensusServiceMailbox).AssemblyQualifiedName}\" }}");
public IActorRef Blockchain { get; }
public IActorRef TransactionRouter { get; }
public IActorRef LocalNode { get; }
internal IActorRef TaskManager { get; }
public IActorRef Consensus { get; private set; }
Expand All @@ -39,6 +40,7 @@ public NeoSystem(string storageEngine = null)
? new MemoryStore()
: Plugin.Storages[storageEngine].GetStore();
this.Blockchain = ActorSystem.ActorOf(Ledger.Blockchain.Props(this, store));
this.TransactionRouter = ActorSystem.ActorOf(Ledger.TransactionRouter.Props(this, Ledger.Blockchain.Singleton));
this.LocalNode = ActorSystem.ActorOf(Network.P2P.LocalNode.Props(this));
this.TaskManager = ActorSystem.ActorOf(Network.P2P.TaskManager.Props(this));
foreach (var plugin in Plugin.Plugins)
Expand Down
Loading

0 comments on commit c4abe4b

Please sign in to comment.