Skip to content

Commit

Permalink
Partial revert to store transactions in memory as well
Browse files Browse the repository at this point in the history
  • Loading branch information
kiminuo committed Dec 24, 2023
1 parent 3dc1f59 commit bba556e
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 55 deletions.
71 changes: 37 additions & 34 deletions WalletWasabi/Blockchain/Transactions/AllTransactionStore.cs
Expand Up @@ -3,6 +3,7 @@
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using WalletWasabi.Blockchain.Analysis.Clustering;
using WalletWasabi.Extensions;
Expand Down Expand Up @@ -30,63 +31,64 @@ public AllTransactionStore(string workFolderPath, Network network, bool migrateD
public TransactionStore ConfirmedStore { get; }
private object Lock { get; } = new();

public Task InitializeAsync()
public async Task InitializeAsync(CancellationToken cancellationToken = default)
{
using IDisposable _ = BenchmarkLogger.Measure();

EnsureConsistency();
var initTasks = new[]
{
MempoolStore.InitializeAsync($"{nameof(MempoolStore)}.{nameof(MempoolStore.InitializeAsync)}", cancellationToken),
ConfirmedStore.InitializeAsync($"{nameof(ConfirmedStore)}.{nameof(ConfirmedStore.InitializeAsync)}", cancellationToken)
};

return Task.CompletedTask;
await Task.WhenAll(initTasks).ConfigureAwait(false);
EnsureConsistency();
}

#endregion Initializers

#region Modifiers

private void AddOrUpdateNoLock(SmartTransaction tx)
public void AddOrUpdate(SmartTransaction tx)
{
var hash = tx.GetHash();

if (tx.Confirmed)
lock (Lock)
{
if (MempoolStore.TryRemove(hash, out var found))
var hash = tx.GetHash();

if (tx.Confirmed)
{
found.TryUpdate(tx);
ConfirmedStore.TryAddOrUpdate(found);
if (MempoolStore.TryRemove(hash, out var found))
{
found.TryUpdate(tx);
ConfirmedStore.TryAddOrUpdate(found);
}
else
{
ConfirmedStore.TryAddOrUpdate(tx);
}
}
else
{
ConfirmedStore.TryAddOrUpdate(tx);
}
}
else
{
if (!ConfirmedStore.TryUpdate(tx))
{
MempoolStore.TryAddOrUpdate(tx);
if (!ConfirmedStore.TryUpdate(tx))
{
MempoolStore.TryAddOrUpdate(tx);
}
}
}
}

public void AddOrUpdate(SmartTransaction tx)
internal bool TryUpdate(SmartTransaction tx)
{
lock (Lock)
{
AddOrUpdateNoLock(tx);
}
}
uint256 hash = tx.GetHash();

public bool TryUpdate(SmartTransaction tx)
{
var hash = tx.GetHash();
lock (Lock)
{
// Do Contains first, because it's fast.
if (ConfirmedStore.TryUpdate(tx))
{
return true;
}
else if (tx.Confirmed && MempoolStore.TryRemove(hash, out var originalTx))
else if (tx.Confirmed && MempoolStore.TryRemove(hash, out SmartTransaction? originalTx))
{
originalTx.TryUpdate(tx);
ConfirmedStore.TryAddOrUpdate(originalTx);
Expand All @@ -105,12 +107,13 @@ private void EnsureConsistency()
{
lock (Lock)
{
var mempoolTransactions = MempoolStore.GetTransactionHashes();
foreach (var hash in mempoolTransactions)
List<uint256> mempoolTransactions = MempoolStore.GetTransactionHashes();

foreach (uint256 txid in mempoolTransactions)
{
// Contains is fast, so do this first.
if (ConfirmedStore.Contains(hash)
&& MempoolStore.TryRemove(hash, out var uTx))
if (ConfirmedStore.Contains(txid)
&& MempoolStore.TryRemove(txid, out var uTx))
{
ConfirmedStore.TryAddOrUpdate(uTx);
}
Expand Down Expand Up @@ -144,15 +147,15 @@ public IEnumerable<SmartTransaction> GetTransactions()
}
}

public IEnumerable<uint256> GetTransactionHashes()
internal IEnumerable<uint256> GetTransactionHashes()
{
lock (Lock)
{
return ConfirmedStore.GetTransactionHashes().Concat(MempoolStore.GetTransactionHashes()).ToList();
}
}

public bool IsEmpty()
internal bool IsEmpty()
{
lock (Lock)
{
Expand Down
149 changes: 130 additions & 19 deletions WalletWasabi/Blockchain/Transactions/TransactionStore.cs
@@ -1,11 +1,12 @@
using Microsoft.Data.Sqlite;
using NBitcoin;
using System.Collections.Generic;
using System.Data.Common;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using WalletWasabi.Extensions;
using WalletWasabi.Helpers;
Expand All @@ -22,16 +23,14 @@ public TransactionStore(string workFolderPath, Network network, bool migrateData
workFolderPath = Guard.NotNullOrEmptyOrWhitespace(nameof(workFolderPath), workFolderPath, trim: true);
IoHelpers.EnsureDirectoryExists(workFolderPath);

string dataSource;

if (workFolderPath == SqliteStorageHelper.InMemoryDatabase)
{
dataSource = SqliteStorageHelper.InMemoryDatabase;
DataSource = SqliteStorageHelper.InMemoryDatabase;
}
else
{
IoHelpers.EnsureDirectoryExists(workFolderPath);
dataSource = Path.Combine(workFolderPath, "Transactions.sqlite");
DataSource = Path.Combine(workFolderPath, "Transactions.sqlite");

// TODO: Remove. Useful for testing.
// if (File.Exists(dataSource))
Expand All @@ -40,7 +39,7 @@ public TransactionStore(string workFolderPath, Network network, bool migrateData
// }
}

SqliteStorage = TransactionSqliteStorage.FromFile(dataSource: dataSource, network);
SqliteStorage = TransactionSqliteStorage.FromFile(dataSource: DataSource, network);

if (migrateData)
{
Expand All @@ -49,9 +48,15 @@ public TransactionStore(string workFolderPath, Network network, bool migrateData
}
}

private TransactionSqliteStorage SqliteStorage { get; }
private string DataSource { get; }
private object SqliteStorageLock { get; } = new();

/// <remarks>Guarded by <see cref="SqliteStorageLock"/>.</remarks>
private TransactionSqliteStorage SqliteStorage { get; }

/// <remarks>Guarded by <see cref="SqliteStorageLock"/>.</remarks>
private Dictionary<uint256, SmartTransaction> Transactions { get; } = new();

private void Import(string oldPath, Network network, bool deleteAfterImport = false)
{
if (File.Exists(oldPath))
Expand All @@ -73,78 +78,184 @@ private void Import(string oldPath, Network network, bool deleteAfterImport = fa
}
}

public Task InitializeAsync(string operationName, CancellationToken cancellationToken)
{
using (BenchmarkLogger.Measure(operationName: operationName))
{
lock (SqliteStorageLock)
{
InitializeTransactionsNoLock(cancellationToken);
}
}

return Task.CompletedTask;
}

private void InitializeTransactionsNoLock(CancellationToken cancellationToken)
{
try
{
lock (SqliteStorageLock)
{
int i = 0;
foreach (SmartTransaction tx in SqliteStorage.GetAll(cancellationToken).OrderByBlockchain())
{
i++;

if (i % 100 == 0)
{
cancellationToken.ThrowIfCancellationRequested();
}

_ = TryAddOrUpdateNoLockNoSerialization(tx);
}
}
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
// We found a corrupted entry. Stop here.
// Do not try to automatically correct the data, because the internal data structures are throwing events that may confuse the consumers of those events.
Logger.LogError($"'{DataSource}' database got corrupted. Clearing it...");
SqliteStorage.Clear();
throw;
}
}

private bool TryAddOrUpdateNoLockNoSerialization(SmartTransaction tx)
{
uint256 hash = tx.GetHash();

if (Transactions.TryAdd(hash, tx))
{
return true;
}
else
{
if (Transactions[hash].TryUpdate(tx))
{
return true;
}
}

return false;
}

public bool TryAdd(SmartTransaction tx)
{
lock (SqliteStorageLock)
{
int result = BulkInsert(tx);
return result > 0;
if (Transactions.TryAdd(tx.GetHash(), tx))
{
if (SqliteStorage.BulkInsert(tx) == 0)
{
throw new UnreachableException($"Transaction '{tx.GetHash()}' was added in memory but not in database.");
}

return true;
}

return false;
}
}

public bool TryAddOrUpdate(SmartTransaction tx)
{
lock (SqliteStorageLock)
{
int result = BulkUpdate(tx);
return result > 0;
bool result = TryAddOrUpdateNoLockNoSerialization(tx);

if (result)
{
if (SqliteStorage.BulkInsert(new SmartTransaction[] { tx }, upsert: true) == 0)
{
throw new UnreachableException($"Transaction '{tx.GetHash()}' was update in memory but not in database.");
}
}

return result;
}
}

public bool TryUpdate(SmartTransaction tx)
{
bool updated = false;

lock (SqliteStorageLock)
{
int result = BulkUpdate(tx);
return result > 0;
if (Transactions.TryGetValue(tx.GetHash(), out var foundTx))
{
if (foundTx.TryUpdate(tx))
{
updated = true;

if (SqliteStorage.BulkUpdate(tx) == 0)
{
throw new UnreachableException($"Transaction '{tx.GetHash()}' was update in memory but not in database.");
}
}
}
}

return updated;
}

public bool TryRemove(uint256 hash, [NotNullWhen(true)] out SmartTransaction? tx)
{
bool isRemoved = false;

lock (SqliteStorageLock)
{
return SqliteStorage.TryRemove(hash, out tx);
if (Transactions.Remove(hash, out tx))
{
isRemoved = true;

if (!SqliteStorage.TryRemove(hash, out _))
{
throw new UnreachableException($"Transaction '{tx.GetHash()}' was removed from memory but not from database.");
}
}
}

return isRemoved;
}

public bool TryGetTransaction(uint256 hash, [NotNullWhen(true)] out SmartTransaction? tx)
{
lock (SqliteStorageLock)
{
return SqliteStorage.TryGet(hash, out tx);
return Transactions.TryGetValue(hash, out tx);
}
}

public List<SmartTransaction> GetTransactions()
{
lock (SqliteStorageLock)
{
return SqliteStorage.GetAll().ToList();
return Transactions.Values.OrderByBlockchain().ToList();
}
}

public List<uint256> GetTransactionHashes()
{
lock (SqliteStorageLock)
{
return SqliteStorage.GetAllTxids().ToList();
return Transactions.Values.OrderByBlockchain().Select(x => x.GetHash()).ToList();
}
}

public bool IsEmpty()
{
lock (SqliteStorageLock)
{
return SqliteStorage.IsEmpty();
return Transactions.Count == 0;
}
}

public bool Contains(uint256 hash)
{
lock (SqliteStorageLock)
{
return SqliteStorage.Contains(txid: hash);
return Transactions.ContainsKey(hash);
}
}

Expand Down

0 comments on commit bba556e

Please sign in to comment.