Skip to content

Commit

Permalink
Restore transactions from saved consensus context (#598)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikzhang authored and jsolman committed Feb 22, 2019
1 parent 3b27f0a commit de64019
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 24 deletions.
16 changes: 16 additions & 0 deletions neo.UnitTests/UT_MemoryPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -322,5 +322,21 @@ public void CapacityTestWithUnverifiedHighProirtyTransactions()
AddHighPriorityTransactions(1);
_unit.CanTransactionFitInPool(CreateMockLowPriorityTransaction()).ShouldBeEquivalentTo(false);
}

[TestMethod]
public void TestInvalidateAll()
{
AddHighPriorityTransactions(30);
AddLowPriorityTransactions(60);
_unit.UnverifiedSortedHighPrioTxCount.ShouldBeEquivalentTo(0);
_unit.UnverifiedSortedLowPrioTxCount.ShouldBeEquivalentTo(0);
_unit.SortedHighPrioTxCount.ShouldBeEquivalentTo(30);
_unit.SortedLowPrioTxCount.ShouldBeEquivalentTo(60);
_unit.InvalidateAllTransactions();
_unit.UnverifiedSortedHighPrioTxCount.ShouldBeEquivalentTo(30);
_unit.UnverifiedSortedLowPrioTxCount.ShouldBeEquivalentTo(60);
_unit.SortedHighPrioTxCount.ShouldBeEquivalentTo(0);
_unit.SortedLowPrioTxCount.ShouldBeEquivalentTo(0);
}
}
}
18 changes: 13 additions & 5 deletions neo/Consensus/ConsensusService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -430,13 +430,21 @@ private void OnStart(Start options)
{
Log("OnStart");
started = true;
bool loadedState = !options.IgnoreRecoveryLogs && context.Load(store);
if (loadedState && context.State.HasFlag(ConsensusState.CommitSent))
if (!options.IgnoreRecoveryLogs && context.Load(store))
{
CheckPreparations();
return;
if (context.Transactions != null)
{
Sender.Ask<Blockchain.FillCompleted>(new Blockchain.FillMemoryPool
{
Transactions = context.Transactions.Values
}).Wait();
}
if (context.State.HasFlag(ConsensusState.CommitSent))
{
CheckPreparations();
return;
}
}

InitializeConsensus(0);
// Issue a ChangeView with NewViewNumber of 0 to request recovery messages on start-up.
if (context.BlockIndex == Blockchain.Singleton.HeaderHeight + 1)
Expand Down
32 changes: 32 additions & 0 deletions neo/Ledger/Blockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class ApplicationExecuted { public Transaction Transaction; public Applic
public class PersistCompleted { public Block Block; }
public class Import { public IEnumerable<Block> Blocks; }
public class ImportCompleted { }
public class FillMemoryPool { public IEnumerable<Transaction> Transactions; }
public class FillCompleted { }

public static readonly uint SecondsPerBlock = ProtocolSettings.Default.SecondsPerBlock;
public const uint DecrementInterval = 2000000;
Expand Down Expand Up @@ -253,6 +255,33 @@ private void AddUnverifiedBlockToCache(Block block)
blocks.AddLast(block);
}

private void OnFillMemoryPool(IEnumerable<Transaction> transactions)
{
// Invalidate all the transactions in the memory pool, to avoid any failures when adding new transactions.
MemPool.InvalidateAllTransactions();

// Add the transactions to the memory pool
foreach (var tx in transactions)
{
if (tx.Type == TransactionType.MinerTransaction)
continue;
if (Store.ContainsTransaction(tx.Hash))
continue;
if (!Plugin.CheckPolicy(tx))
continue;
// First remove the tx if it is unverified in the pool.
MemPool.TryRemoveUnVerified(tx.Hash, out _);
// Verify the the transaction
if (!tx.Verify(currentSnapshot, MemPool.GetVerifiedTransactions()))
continue;
// Add to the memory pool
MemPool.TryAdd(tx.Hash, tx);
}
// Transactions originally in the pool will automatically be reverified based on their priority.

Sender.Tell(new FillCompleted());
}

private RelayResultReason OnNewBlock(Block block)
{
if (block.Index <= Height)
Expand Down Expand Up @@ -406,6 +435,9 @@ protected override void OnReceive(object message)
case Import import:
OnImport(import.Blocks);
break;
case FillMemoryPool fill:
OnFillMemoryPool(fill.Transactions);
break;
case Header[] headers:
OnNewHeaders(headers);
break;
Expand Down
54 changes: 36 additions & 18 deletions neo/Ledger/MemoryPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ private bool TryRemoveVerified(UInt256 hash, out PoolItem item)
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool TryRemoveUnVerified(UInt256 hash, out PoolItem item)
internal bool TryRemoveUnVerified(UInt256 hash, out PoolItem item)
{
if (!_unverifiedTransactions.TryGetValue(hash, out item))
return false;
Expand All @@ -354,6 +354,27 @@ private bool TryRemoveUnVerified(UInt256 hash, out PoolItem item)
return true;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void InvalidateVerifiedTransactions()
{
foreach (PoolItem item in _sortedHighPrioTransactions)
{
if (_unverifiedTransactions.TryAdd(item.Tx.Hash, item))
_unverifiedSortedHighPriorityTransactions.Add(item);
}

foreach (PoolItem item in _sortedLowPrioTransactions)
{
if (_unverifiedTransactions.TryAdd(item.Tx.Hash, item))
_unverifiedSortedLowPriorityTransactions.Add(item);
}

// Clear the verified transactions now, since they all must be reverified.
_unsortedTransactions.Clear();
_sortedHighPrioTransactions.Clear();
_sortedLowPrioTransactions.Clear();
}

// Note: this must only be called from a single thread (the Blockchain actor)
internal void UpdatePoolForBlockPersisted(Block block, Snapshot snapshot)
{
Expand All @@ -368,22 +389,7 @@ internal void UpdatePoolForBlockPersisted(Block block, Snapshot snapshot)
}

// Add all the previously verified transactions back to the unverified transactions
foreach (PoolItem item in _sortedHighPrioTransactions)
{
if (_unverifiedTransactions.TryAdd(item.Tx.Hash, item))
_unverifiedSortedHighPriorityTransactions.Add(item);
}

foreach (PoolItem item in _sortedLowPrioTransactions)
{
if (_unverifiedTransactions.TryAdd(item.Tx.Hash, item))
_unverifiedSortedLowPriorityTransactions.Add(item);
}

// Clear the verified transactions now, since they all must be reverified.
_unsortedTransactions.Clear();
_sortedHighPrioTransactions.Clear();
_sortedLowPrioTransactions.Clear();
InvalidateVerifiedTransactions();
}
finally
{
Expand All @@ -406,7 +412,19 @@ internal void UpdatePoolForBlockPersisted(Block block, Snapshot snapshot)
_maxLowPriorityTxPerBlock, MaxSecondsToReverifyLowPrioTx, snapshot);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void InvalidateAllTransactions()
{
_txRwLock.EnterWriteLock();
try
{
InvalidateVerifiedTransactions();
}
finally
{
_txRwLock.ExitWriteLock();
}
}

private int ReverifyTransactions(SortedSet<PoolItem> verifiedSortedTxPool,
SortedSet<PoolItem> unverifiedSortedTxPool, int count, double secondsTimeout, Snapshot snapshot)
{
Expand Down
2 changes: 1 addition & 1 deletion neo/NeoSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ internal void ResumeNodeStartup()
public void StartConsensus(Wallet wallet, Store consensus_store = null, bool ignoreRecoveryLogs = false)
{
Consensus = ActorSystem.ActorOf(ConsensusService.Props(this.LocalNode, this.TaskManager, consensus_store ?? store, wallet));
Consensus.Tell(new ConsensusService.Start { IgnoreRecoveryLogs = ignoreRecoveryLogs });
Consensus.Tell(new ConsensusService.Start { IgnoreRecoveryLogs = ignoreRecoveryLogs }, Blockchain);
}

public void StartNode(int port = 0, int wsPort = 0, int minDesiredConnections = Peer.DefaultMinDesiredConnections,
Expand Down

0 comments on commit de64019

Please sign in to comment.