diff --git a/WalletWasabi/Blockchain/Transactions/AllTransactionStore.cs b/WalletWasabi/Blockchain/Transactions/AllTransactionStore.cs index ea421943858..b65d3c8b2d7 100644 --- a/WalletWasabi/Blockchain/Transactions/AllTransactionStore.cs +++ b/WalletWasabi/Blockchain/Transactions/AllTransactionStore.cs @@ -3,6 +3,7 @@ using System.Diagnostics.CodeAnalysis; using System.IO; using System.Linq; +using System.Threading; using System.Threading.Tasks; using WalletWasabi.Blockchain.Analysis.Clustering; using WalletWasabi.Extensions; @@ -30,55 +31,56 @@ public AllTransactionStore(string workFolderPath, Network network, bool migrateD public TransactionStore ConfirmedStore { get; } private object Lock { get; } = new(); - public Task InitializeAsync() + public async Task InitializeAsync(CancellationToken cancellationToken = default) { using IDisposable _ = BenchmarkLogger.Measure(); - EnsureConsistency(); + var initTasks = new[] + { + MempoolStore.InitializeAsync($"{nameof(MempoolStore)}.{nameof(MempoolStore.InitializeAsync)}", cancellationToken), + ConfirmedStore.InitializeAsync($"{nameof(ConfirmedStore)}.{nameof(ConfirmedStore.InitializeAsync)}", cancellationToken) + }; - return Task.CompletedTask; + await Task.WhenAll(initTasks).ConfigureAwait(false); + EnsureConsistency(); } #endregion Initializers #region Modifiers - private void AddOrUpdateNoLock(SmartTransaction tx) + public void AddOrUpdate(SmartTransaction tx) { - var hash = tx.GetHash(); - - if (tx.Confirmed) + lock (Lock) { - if (MempoolStore.TryRemove(hash, out var found)) + var hash = tx.GetHash(); + + if (tx.Confirmed) { - found.TryUpdate(tx); - ConfirmedStore.TryAddOrUpdate(found); + if (MempoolStore.TryRemove(hash, out var found)) + { + found.TryUpdate(tx); + ConfirmedStore.TryAddOrUpdate(found); + } + else + { + ConfirmedStore.TryAddOrUpdate(tx); + } } else { - ConfirmedStore.TryAddOrUpdate(tx); - } - } - else - { - if (!ConfirmedStore.TryUpdate(tx)) - { - MempoolStore.TryAddOrUpdate(tx); + if (!ConfirmedStore.TryUpdate(tx)) + { + MempoolStore.TryAddOrUpdate(tx); + } } } } - public void AddOrUpdate(SmartTransaction tx) + internal bool TryUpdate(SmartTransaction tx) { - lock (Lock) - { - AddOrUpdateNoLock(tx); - } - } + uint256 hash = tx.GetHash(); - public bool TryUpdate(SmartTransaction tx) - { - var hash = tx.GetHash(); lock (Lock) { // Do Contains first, because it's fast. @@ -86,7 +88,7 @@ public bool TryUpdate(SmartTransaction tx) { return true; } - else if (tx.Confirmed && MempoolStore.TryRemove(hash, out var originalTx)) + else if (tx.Confirmed && MempoolStore.TryRemove(hash, out SmartTransaction? originalTx)) { originalTx.TryUpdate(tx); ConfirmedStore.TryAddOrUpdate(originalTx); @@ -105,12 +107,13 @@ private void EnsureConsistency() { lock (Lock) { - var mempoolTransactions = MempoolStore.GetTransactionHashes(); - foreach (var hash in mempoolTransactions) + List mempoolTransactions = MempoolStore.GetTransactionHashes(); + + foreach (uint256 txid in mempoolTransactions) { // Contains is fast, so do this first. - if (ConfirmedStore.Contains(hash) - && MempoolStore.TryRemove(hash, out var uTx)) + if (ConfirmedStore.Contains(txid) + && MempoolStore.TryRemove(txid, out var uTx)) { ConfirmedStore.TryAddOrUpdate(uTx); } @@ -144,7 +147,7 @@ public IEnumerable GetTransactions() } } - public IEnumerable GetTransactionHashes() + internal IEnumerable GetTransactionHashes() { lock (Lock) { @@ -152,7 +155,7 @@ public IEnumerable GetTransactionHashes() } } - public bool IsEmpty() + internal bool IsEmpty() { lock (Lock) { diff --git a/WalletWasabi/Blockchain/Transactions/TransactionStore.cs b/WalletWasabi/Blockchain/Transactions/TransactionStore.cs index 1d79cf9a7da..8c009d46b6a 100644 --- a/WalletWasabi/Blockchain/Transactions/TransactionStore.cs +++ b/WalletWasabi/Blockchain/Transactions/TransactionStore.cs @@ -1,11 +1,12 @@ using Microsoft.Data.Sqlite; using NBitcoin; using System.Collections.Generic; -using System.Data.Common; +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.IO; using System.Linq; using System.Text; +using System.Threading; using System.Threading.Tasks; using WalletWasabi.Extensions; using WalletWasabi.Helpers; @@ -22,16 +23,14 @@ public TransactionStore(string workFolderPath, Network network, bool migrateData workFolderPath = Guard.NotNullOrEmptyOrWhitespace(nameof(workFolderPath), workFolderPath, trim: true); IoHelpers.EnsureDirectoryExists(workFolderPath); - string dataSource; - if (workFolderPath == SqliteStorageHelper.InMemoryDatabase) { - dataSource = SqliteStorageHelper.InMemoryDatabase; + DataSource = SqliteStorageHelper.InMemoryDatabase; } else { IoHelpers.EnsureDirectoryExists(workFolderPath); - dataSource = Path.Combine(workFolderPath, "Transactions.sqlite"); + DataSource = Path.Combine(workFolderPath, "Transactions.sqlite"); // TODO: Remove. Useful for testing. // if (File.Exists(dataSource)) @@ -40,7 +39,7 @@ public TransactionStore(string workFolderPath, Network network, bool migrateData // } } - SqliteStorage = TransactionSqliteStorage.FromFile(dataSource: dataSource, network); + SqliteStorage = TransactionSqliteStorage.FromFile(dataSource: DataSource, network); if (migrateData) { @@ -49,9 +48,15 @@ public TransactionStore(string workFolderPath, Network network, bool migrateData } } - private TransactionSqliteStorage SqliteStorage { get; } + private string DataSource { get; } private object SqliteStorageLock { get; } = new(); + /// Guarded by . + private TransactionSqliteStorage SqliteStorage { get; } + + /// Guarded by . + private Dictionary Transactions { get; } = new(); + private void Import(string oldPath, Network network, bool deleteAfterImport = false) { if (File.Exists(oldPath)) @@ -73,12 +78,83 @@ private void Import(string oldPath, Network network, bool deleteAfterImport = fa } } + public Task InitializeAsync(string operationName, CancellationToken cancellationToken) + { + using (BenchmarkLogger.Measure(operationName: operationName)) + { + lock (SqliteStorageLock) + { + InitializeTransactionsNoLock(cancellationToken); + } + } + + return Task.CompletedTask; + } + + private void InitializeTransactionsNoLock(CancellationToken cancellationToken) + { + try + { + lock (SqliteStorageLock) + { + int i = 0; + foreach (SmartTransaction tx in SqliteStorage.GetAll(cancellationToken).OrderByBlockchain()) + { + i++; + + if (i % 100 == 0) + { + cancellationToken.ThrowIfCancellationRequested(); + } + + _ = TryAddOrUpdateNoLockNoSerialization(tx); + } + } + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + // We found a corrupted entry. Stop here. + // Do not try to automatically correct the data, because the internal data structures are throwing events that may confuse the consumers of those events. + Logger.LogError($"'{DataSource}' database got corrupted. Clearing it..."); + SqliteStorage.Clear(); + throw; + } + } + + private bool TryAddOrUpdateNoLockNoSerialization(SmartTransaction tx) + { + uint256 hash = tx.GetHash(); + + if (Transactions.TryAdd(hash, tx)) + { + return true; + } + else + { + if (Transactions[hash].TryUpdate(tx)) + { + return true; + } + } + + return false; + } + public bool TryAdd(SmartTransaction tx) { lock (SqliteStorageLock) { - int result = BulkInsert(tx); - return result > 0; + if (Transactions.TryAdd(tx.GetHash(), tx)) + { + if (SqliteStorage.BulkInsert(tx) == 0) + { + throw new UnreachableException($"Transaction '{tx.GetHash()}' was added in memory but not in database."); + } + + return true; + } + + return false; } } @@ -86,33 +162,68 @@ public bool TryAddOrUpdate(SmartTransaction tx) { lock (SqliteStorageLock) { - int result = BulkUpdate(tx); - return result > 0; + bool result = TryAddOrUpdateNoLockNoSerialization(tx); + + if (result) + { + if (SqliteStorage.BulkInsert(new SmartTransaction[] { tx }, upsert: true) == 0) + { + throw new UnreachableException($"Transaction '{tx.GetHash()}' was update in memory but not in database."); + } + } + + return result; } } public bool TryUpdate(SmartTransaction tx) { + bool updated = false; + lock (SqliteStorageLock) { - int result = BulkUpdate(tx); - return result > 0; + if (Transactions.TryGetValue(tx.GetHash(), out var foundTx)) + { + if (foundTx.TryUpdate(tx)) + { + updated = true; + + if (SqliteStorage.BulkUpdate(tx) == 0) + { + throw new UnreachableException($"Transaction '{tx.GetHash()}' was update in memory but not in database."); + } + } + } } + + return updated; } public bool TryRemove(uint256 hash, [NotNullWhen(true)] out SmartTransaction? tx) { + bool isRemoved = false; + lock (SqliteStorageLock) { - return SqliteStorage.TryRemove(hash, out tx); + if (Transactions.Remove(hash, out tx)) + { + isRemoved = true; + + if (!SqliteStorage.TryRemove(hash, out _)) + { + throw new UnreachableException($"Transaction '{tx.GetHash()}' was removed from memory but not from database."); + } + } } + + return isRemoved; } public bool TryGetTransaction(uint256 hash, [NotNullWhen(true)] out SmartTransaction? tx) { lock (SqliteStorageLock) { - return SqliteStorage.TryGet(hash, out tx); + return Transactions.TryGetValue(hash, out tx); } } @@ -120,7 +231,7 @@ public List GetTransactions() { lock (SqliteStorageLock) { - return SqliteStorage.GetAll().ToList(); + return Transactions.Values.OrderByBlockchain().ToList(); } } @@ -128,7 +239,7 @@ public List GetTransactionHashes() { lock (SqliteStorageLock) { - return SqliteStorage.GetAllTxids().ToList(); + return Transactions.Values.OrderByBlockchain().Select(x => x.GetHash()).ToList(); } } @@ -136,7 +247,7 @@ public bool IsEmpty() { lock (SqliteStorageLock) { - return SqliteStorage.IsEmpty(); + return Transactions.Count == 0; } } @@ -144,7 +255,7 @@ public bool Contains(uint256 hash) { lock (SqliteStorageLock) { - return SqliteStorage.Contains(txid: hash); + return Transactions.ContainsKey(hash); } } diff --git a/WalletWasabi/Stores/BitcoinStore.cs b/WalletWasabi/Stores/BitcoinStore.cs index bf33c6b2b07..59557b1aabb 100644 --- a/WalletWasabi/Stores/BitcoinStore.cs +++ b/WalletWasabi/Stores/BitcoinStore.cs @@ -51,7 +51,7 @@ public async Task InitializeAsync(CancellationToken cancel = default) var initTasks = new[] { IndexStore.InitializeAsync(cancel), - TransactionStore.InitializeAsync() + TransactionStore.InitializeAsync(cancel) }; await Task.WhenAll(initTasks).ConfigureAwait(false); diff --git a/WalletWasabi/Stores/TransactionSqliteStorage.cs b/WalletWasabi/Stores/TransactionSqliteStorage.cs index 7b894da8aa8..1005cfa6aad 100644 --- a/WalletWasabi/Stores/TransactionSqliteStorage.cs +++ b/WalletWasabi/Stores/TransactionSqliteStorage.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Linq; +using System.Threading; using WalletWasabi.Blockchain.Analysis.Clustering; using WalletWasabi.Blockchain.Transactions; using WalletWasabi.Logging; @@ -208,6 +209,10 @@ public int BulkInsert(IEnumerable transactions, bool upsert = return changedRows; } + /// + public int BulkInsert(params SmartTransaction[] transactions) + => BulkInsert(transactions as IEnumerable); + /// /// Update transactions in bulk. /// @@ -372,7 +377,7 @@ private bool TryReadSingleRecord(uint256 hash, string query, out SmartTransactio return true; } - public IEnumerable GetAll() + public IEnumerable GetAll(CancellationToken cancellationToken) { using SqliteTransaction transaction = Connection.BeginTransaction(); using SqliteCommand command = Connection.CreateCommand(); @@ -387,8 +392,16 @@ public IEnumerable GetAll() using SqliteDataReader reader = command.ExecuteReader(); + int i = 0; while (reader.Read()) { + i++; + + if (i % 100 == 0) + { + cancellationToken.ThrowIfCancellationRequested(); + } + SmartTransaction filter = ReadRow(reader); yield return filter; }