Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore transactions from saved consensus context #598

Merged
merged 4 commits into from
Feb 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions neo.UnitTests/UT_MemoryPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -322,5 +322,21 @@ public void CapacityTestWithUnverifiedHighProirtyTransactions()
AddHighPriorityTransactions(1);
_unit.CanTransactionFitInPool(CreateMockLowPriorityTransaction()).ShouldBeEquivalentTo(false);
}

[TestMethod]
public void TestInvalidateAll()
{
AddHighPriorityTransactions(30);
AddLowPriorityTransactions(60);
_unit.UnverifiedSortedHighPrioTxCount.ShouldBeEquivalentTo(0);
_unit.UnverifiedSortedLowPrioTxCount.ShouldBeEquivalentTo(0);
_unit.SortedHighPrioTxCount.ShouldBeEquivalentTo(30);
_unit.SortedLowPrioTxCount.ShouldBeEquivalentTo(60);
_unit.InvalidateAllTransactions();
_unit.UnverifiedSortedHighPrioTxCount.ShouldBeEquivalentTo(30);
_unit.UnverifiedSortedLowPrioTxCount.ShouldBeEquivalentTo(60);
_unit.SortedHighPrioTxCount.ShouldBeEquivalentTo(0);
_unit.SortedLowPrioTxCount.ShouldBeEquivalentTo(0);
}
}
}
18 changes: 13 additions & 5 deletions neo/Consensus/ConsensusService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -430,13 +430,21 @@ private void OnStart(Start options)
{
Log("OnStart");
started = true;
bool loadedState = !options.IgnoreRecoveryLogs && context.Load(store);
if (loadedState && context.State.HasFlag(ConsensusState.CommitSent))
if (!options.IgnoreRecoveryLogs && context.Load(store))
{
CheckPreparations();
return;
if (context.Transactions != null)
{
Sender.Ask<Blockchain.FillCompleted>(new Blockchain.FillMemoryPool
{
Transactions = context.Transactions.Values
}).Wait();
}
if (context.State.HasFlag(ConsensusState.CommitSent))
{
CheckPreparations();
return;
}
}

InitializeConsensus(0);
// Issue a ChangeView with NewViewNumber of 0 to request recovery messages on start-up.
if (context.BlockIndex == Blockchain.Singleton.HeaderHeight + 1)
Expand Down
32 changes: 32 additions & 0 deletions neo/Ledger/Blockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class ApplicationExecuted { public Transaction Transaction; public Applic
public class PersistCompleted { public Block Block; }
public class Import { public IEnumerable<Block> Blocks; }
public class ImportCompleted { }
public class FillMemoryPool { public IEnumerable<Transaction> Transactions; }
public class FillCompleted { }

public static readonly uint SecondsPerBlock = ProtocolSettings.Default.SecondsPerBlock;
public const uint DecrementInterval = 2000000;
Expand Down Expand Up @@ -253,6 +255,33 @@ private void AddUnverifiedBlockToCache(Block block)
blocks.AddLast(block);
}

private void OnFillMemoryPool(IEnumerable<Transaction> transactions)
{
// Invalidate all the transactions in the memory pool, to avoid any failures when adding new transactions.
MemPool.InvalidateAllTransactions();

// Add the transactions to the memory pool
foreach (var tx in transactions)
{
if (tx.Type == TransactionType.MinerTransaction)
continue;
if (Store.ContainsTransaction(tx.Hash))
continue;
if (!Plugin.CheckPolicy(tx))
continue;
// First remove the tx if it is unverified in the pool.
MemPool.TryRemoveUnVerified(tx.Hash, out _);
// Verify the the transaction
if (!tx.Verify(currentSnapshot, MemPool.GetVerifiedTransactions()))
jsolman marked this conversation as resolved.
Show resolved Hide resolved
continue;
// Add to the memory pool
MemPool.TryAdd(tx.Hash, tx);
}
// Transactions originally in the pool will automatically be reverified based on their priority.

Sender.Tell(new FillCompleted());
}

private RelayResultReason OnNewBlock(Block block)
{
if (block.Index <= Height)
Expand Down Expand Up @@ -406,6 +435,9 @@ protected override void OnReceive(object message)
case Import import:
OnImport(import.Blocks);
break;
case FillMemoryPool fill:
OnFillMemoryPool(fill.Transactions);
break;
case Header[] headers:
OnNewHeaders(headers);
break;
Expand Down
54 changes: 36 additions & 18 deletions neo/Ledger/MemoryPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ private bool TryRemoveVerified(UInt256 hash, out PoolItem item)
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool TryRemoveUnVerified(UInt256 hash, out PoolItem item)
internal bool TryRemoveUnVerified(UInt256 hash, out PoolItem item)
{
if (!_unverifiedTransactions.TryGetValue(hash, out item))
return false;
Expand All @@ -354,6 +354,27 @@ private bool TryRemoveUnVerified(UInt256 hash, out PoolItem item)
return true;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void InvalidateVerifiedTransactions()
{
foreach (PoolItem item in _sortedHighPrioTransactions)
{
if (_unverifiedTransactions.TryAdd(item.Tx.Hash, item))
_unverifiedSortedHighPriorityTransactions.Add(item);
}

foreach (PoolItem item in _sortedLowPrioTransactions)
{
if (_unverifiedTransactions.TryAdd(item.Tx.Hash, item))
_unverifiedSortedLowPriorityTransactions.Add(item);
}

// Clear the verified transactions now, since they all must be reverified.
_unsortedTransactions.Clear();
_sortedHighPrioTransactions.Clear();
_sortedLowPrioTransactions.Clear();
}

// Note: this must only be called from a single thread (the Blockchain actor)
internal void UpdatePoolForBlockPersisted(Block block, Snapshot snapshot)
{
Expand All @@ -368,22 +389,7 @@ internal void UpdatePoolForBlockPersisted(Block block, Snapshot snapshot)
}

// Add all the previously verified transactions back to the unverified transactions
foreach (PoolItem item in _sortedHighPrioTransactions)
{
if (_unverifiedTransactions.TryAdd(item.Tx.Hash, item))
_unverifiedSortedHighPriorityTransactions.Add(item);
}

foreach (PoolItem item in _sortedLowPrioTransactions)
{
if (_unverifiedTransactions.TryAdd(item.Tx.Hash, item))
_unverifiedSortedLowPriorityTransactions.Add(item);
}

// Clear the verified transactions now, since they all must be reverified.
_unsortedTransactions.Clear();
_sortedHighPrioTransactions.Clear();
_sortedLowPrioTransactions.Clear();
InvalidateVerifiedTransactions();
}
finally
{
Expand All @@ -406,7 +412,19 @@ internal void UpdatePoolForBlockPersisted(Block block, Snapshot snapshot)
_maxLowPriorityTxPerBlock, MaxSecondsToReverifyLowPrioTx, snapshot);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void InvalidateAllTransactions()
{
_txRwLock.EnterWriteLock();
try
{
InvalidateVerifiedTransactions();
}
finally
{
_txRwLock.ExitWriteLock();
}
}

private int ReverifyTransactions(SortedSet<PoolItem> verifiedSortedTxPool,
SortedSet<PoolItem> unverifiedSortedTxPool, int count, double secondsTimeout, Snapshot snapshot)
{
Expand Down
2 changes: 1 addition & 1 deletion neo/NeoSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ internal void ResumeNodeStartup()
public void StartConsensus(Wallet wallet, Store consensus_store = null, bool ignoreRecoveryLogs = false)
{
Consensus = ActorSystem.ActorOf(ConsensusService.Props(this.LocalNode, this.TaskManager, consensus_store ?? store, wallet));
Consensus.Tell(new ConsensusService.Start { IgnoreRecoveryLogs = ignoreRecoveryLogs });
Consensus.Tell(new ConsensusService.Start { IgnoreRecoveryLogs = ignoreRecoveryLogs }, Blockchain);
}

public void StartNode(int port = 0, int wsPort = 0, int minDesiredConnections = Peer.DefaultMinDesiredConnections,
Expand Down