Permalink
Browse files

Implement mempool cleanup.

Resolves #743
Resolves #416
  • Loading branch information...
nopara73 committed Oct 23, 2018
1 parent a909b97 commit bd28638044c15bc313c6fe1478698a38ebf71bf8
@@ -183,7 +183,7 @@ public static void InitializeNoWallet()
Nodes = new NodesGroup(Network,
requirements: new NodeRequirement
{
RequiredServices = NodeServices.Network,
RequiredServices = NodeServices.NODE_WITNESS,
MinVersion = Constants.ProtocolVersion_WITNESS_VERSION,
MinProtocolCapabilities = Constants.MinProtocolCapabilities
});
@@ -197,7 +197,7 @@ public static void InitializeNoWallet()
Nodes = new NodesGroup(Network, connectionParameters,
new NodeRequirement
{
RequiredServices = NodeServices.Network,
RequiredServices = NodeServices.NODE_WITNESS,
MinVersion = Constants.ProtocolVersion_WITNESS_VERSION,
MinProtocolCapabilities = Constants.MinProtocolCapabilities
});
@@ -84,7 +84,7 @@ public async Task TestServicesAsync(string networkString)
var nodes = new NodesGroup(network, connectionParameters,
new NodeRequirement
{
RequiredServices = NodeServices.Network,
RequiredServices = NodeServices.NODE_WITNESS,
MinVersion = Helpers.Constants.ProtocolVersion_WITNESS_VERSION,
MinProtocolCapabilities = Helpers.Constants.MinProtocolCapabilities
});
@@ -405,7 +405,7 @@ public async Task WalletTestsAsync()
var nodes = new NodesGroup(Global.Config.Network,
requirements: new NodeRequirement
{
RequiredServices = NodeServices.Network,
RequiredServices = NodeServices.NODE_WITNESS,
MinVersion = Helpers.Constants.ProtocolVersion_WITNESS_VERSION,
MinProtocolCapabilities = Helpers.Constants.MinProtocolCapabilities
});
@@ -644,7 +644,7 @@ private void WalletTestsAsync_MemPoolService_TransactionReceived(object sender,
var nodes = new NodesGroup(Global.Config.Network,
requirements: new NodeRequirement
{
RequiredServices = NodeServices.Network,
RequiredServices = NodeServices.NODE_WITNESS,
MinVersion = Helpers.Constants.ProtocolVersion_WITNESS_VERSION,
MinProtocolCapabilities = Helpers.Constants.MinProtocolCapabilities
});
@@ -1087,7 +1087,7 @@ public async Task BuildTransactionValidationsTestAsync()
var nodes = new NodesGroup(Global.Config.Network,
requirements: new NodeRequirement
{
RequiredServices = NodeServices.Network,
RequiredServices = NodeServices.NODE_WITNESS,
MinVersion = Helpers.Constants.ProtocolVersion_WITNESS_VERSION,
MinProtocolCapabilities = Helpers.Constants.MinProtocolCapabilities
});
@@ -1258,7 +1258,7 @@ public async Task BuildTransactionReorgsTestAsync()
var nodes = new NodesGroup(Global.Config.Network,
requirements: new NodeRequirement
{
RequiredServices = NodeServices.Network,
RequiredServices = NodeServices.NODE_WITNESS,
MinVersion = Helpers.Constants.ProtocolVersion_WITNESS_VERSION,
MinProtocolCapabilities = Helpers.Constants.MinProtocolCapabilities
});
@@ -1430,7 +1430,7 @@ public async Task SpendUnconfirmedTxTestAsync()
var nodes = new NodesGroup(Global.Config.Network,
requirements: new NodeRequirement
{
RequiredServices = NodeServices.Network,
RequiredServices = NodeServices.NODE_WITNESS,
MinVersion = Helpers.Constants.ProtocolVersion_WITNESS_VERSION,
MinProtocolCapabilities = Helpers.Constants.MinProtocolCapabilities
});
@@ -2699,7 +2699,7 @@ public async Task CoinJoinMultipleRoundTestsAsync()
var nodes = new NodesGroup(Global.Config.Network,
requirements: new NodeRequirement
{
RequiredServices = NodeServices.Network,
RequiredServices = NodeServices.NODE_WITNESS,
MinVersion = Helpers.Constants.ProtocolVersion_WITNESS_VERSION,
MinProtocolCapabilities = Helpers.Constants.MinProtocolCapabilities
});
@@ -2708,7 +2708,7 @@ public async Task CoinJoinMultipleRoundTestsAsync()
var nodes2 = new NodesGroup(Global.Config.Network,
requirements: new NodeRequirement
{
RequiredServices = NodeServices.Network,
RequiredServices = NodeServices.NODE_WITNESS,
MinVersion = Helpers.Constants.ProtocolVersion_WITNESS_VERSION,
MinProtocolCapabilities = Helpers.Constants.MinProtocolCapabilities
});
@@ -14,7 +14,7 @@ public static class Constants
public const uint ProtocolVersion_WITNESS_VERSION = 70012;
public static readonly ProtocolCapabilities MinProtocolCapabilities = new ProtocolCapabilities() { SupportGetBlock = true, SupportWitness = true };
public static readonly ProtocolCapabilities MinProtocolCapabilities = new ProtocolCapabilities() { SupportGetBlock = true, SupportWitness = true, SupportMempoolQuery = true };
public const int P2wpkhInputSizeInBytes = 41;
public const int P2pkhInputSizeInBytes = 145;
@@ -2,6 +2,12 @@
using WalletWasabi.Models;
using NBitcoin;
using System;
using System.Threading.Tasks;
using System.Threading;
using WalletWasabi.Logging;
using NBitcoin.Protocol;
using System.Collections.Generic;
using System.Linq;
namespace WalletWasabi.Services
{
@@ -16,6 +22,85 @@ public class MemPoolService
public MemPoolService()
{
TransactionHashes = new ConcurrentHashSet<uint256>();
_cleanupInProcess = 0;
}
private long _cleanupInProcess;
public void TryPerformMempoolCleanupAsync(NodesGroup nodes, CancellationToken cancel)
{
if (Interlocked.Read(ref _cleanupInProcess) == 1) return; // If already cleaning, then no need to run it that often.
Interlocked.Exchange(ref _cleanupInProcess, 1);
Task.Run(async () =>
{
// This function is designed to prevent forever growing mempool.
try
{
var delay = TimeSpan.FromMinutes(1);
if (nodes?.ConnectedNodes is null) return;
while (nodes.ConnectedNodes.Count != nodes.MaximumNodeConnection && nodes.ConnectedNodes.All(x => x.IsConnected))
{
if (cancel.IsCancellationRequested) return;
Logger.LogInfo<MemPoolService>($"Not all nodes were in a connected state. Delaying mempool cleanup for {delay.TotalSeconds} seconds...");
await Task.Delay(delay, cancel);
}
Logger.LogInfo<MemPoolService>("Start cleaning out mempool...");
var allTxs = new HashSet<uint256>();
foreach (Node node in nodes.ConnectedNodes)
{
try
{
if (!node.IsConnected) continue;
if (cancel.IsCancellationRequested) return;
uint256[] txs = node.GetMempool(cancel);
if (cancel.IsCancellationRequested) return;
allTxs.UnionWith(txs);
if (cancel.IsCancellationRequested) return;
}
catch (Exception ex)
{
if (ex is InvalidOperationException && ex.Message.StartsWith("The node is not in a connected state", StringComparison.InvariantCultureIgnoreCase)
|| ex is OperationCanceledException
|| ex is TaskCanceledException
|| ex is TimeoutException)
{
Logger.LogTrace<MemPoolService>(ex);
}
else
{
Logger.LogDebug<MemPoolService>(ex);
}
}
}
uint256[] toRemove = TransactionHashes.Except(allTxs).ToArray();
foreach (uint256 tx in toRemove)
{
TransactionHashes.TryRemove(tx);
}
Logger.LogInfo<MemPoolService>($"{toRemove.Count()} transactions were cleaned from mempool...");
}
catch (Exception ex)
{
if (ex is OperationCanceledException
|| ex is TaskCanceledException
|| ex is TimeoutException)
{
Logger.LogTrace<MemPoolService>(ex);
}
else
{
Logger.LogDebug<MemPoolService>(ex);
}
}
finally
{
Interlocked.Exchange(ref _cleanupInProcess, 0);
}
});
}
}
}
@@ -185,6 +185,12 @@ private async void IndexDownloader_NewFilterAsync(object sender, FilterModel fil
}
}
NewFilterProcessed?.Invoke(this, filterModel);
// Try perform mempool cleanup based on connected nodes' mempools.
if (!(IndexDownloader is null) && IndexDownloader.GetFiltersLeft() == 0)
{
MemPool?.TryPerformMempoolCleanupAsync(Nodes, CancellationToken.None);
}
}
public async Task InitializeAsync(CancellationToken cancel)

0 comments on commit bd28638

Please sign in to comment.