From fe9033b56704602cf22f6d28c03e73de97482f8e Mon Sep 17 00:00:00 2001
From: Philip Robinson
Date: Tue, 21 Dec 2021 14:41:45 +0200
Subject: [PATCH] feat: improve wallet recovery and scanning handling of reorgs (#3655)

Description
---
@mikethetike Merging this into the `weatherwax` branch because I would like to see it tested on the Mobile apps sooner rather than later, and I am not sure what the plan is for migrating them to the new `Development` branch.

This PR updates the wallet UtxoScanner to better detect and handle reorgs. Previously, only the last scanned block was checked to see if it had been re-orged out and, if it had, the whole chain was rescanned from scratch. Reorgs of a few blocks are common, so to reduce the amount of work done after a reorg this PR maintains a history of previously scanned block headers so that the depth of the reorg can be determined quickly and only the affected blocks need to be rescanned.

This PR includes the following updates:
- Add the `sync_utxos_by_block` method to the `BaseNodeWalletService` RPC service to facilitate a more appropriate UTXO sync strategy for the wallet.
- Add the `ScannedBlocks` table to the wallet DB to keep a history of scanned block headers and the number and amount of outputs recovered from each block.
- Update the UTXO scanner to use this stored history to detect reorgs and only sync the required UTXOs.
- Build out a test harness to test the scanner thoroughly in Rust integration tests.

How Has This Been Tested?
---
Added new Rust integration tests.
Added back previously excluded Cucumber tests.
---
.../tari_console_wallet/src/recovery.rs | 34 +-
base_layer/core/src/base_node/proto/rpc.proto | 11 +
base_layer/core/src/base_node/rpc/mod.rs | 16 +-
base_layer/core/src/base_node/rpc/service.rs | 93 ++-
.../base_node/rpc/sync_utxos_by_block_task.rs | 196 +++++
base_layer/core/src/base_node/sync/rpc/mod.rs | 3 -
.../core/src/base_node/sync/rpc/service.rs | 55 --
base_layer/core/tests/base_node_rpc.rs | 114 ++-
.../down.sql | 1 +
.../up.sql | 7 +
base_layer/wallet/src/lib.rs | 2 +-
.../storage/sqlite_db/mod.rs | 2 +-
.../storage/sqlite_db/output_sql.rs | 2 +-
base_layer/wallet/src/schema.rs | 11 +
base_layer/wallet/src/storage/database.rs | 71 +-
.../storage/sqlite_db}/mod.rs | 7 +-
.../src/storage/sqlite_db/scanned_blocks.rs | 132 ++++
.../{sqlite_db.rs => sqlite_db/wallet.rs} | 35 +-
.../src/storage/sqlite_utilities/mod.rs | 4 +-
.../transaction_service/storage/sqlite_db.rs | 2 +-
.../wallet/src/utxo_scanner_service/error.rs | 2 +
.../wallet/src/utxo_scanner_service/handle.rs | 10 +-
.../wallet/src/utxo_scanner_service/mod.rs | 2 +-
.../src/utxo_scanner_service/service.rs | 25 +-
.../utxo_scanner_service/utxo_scanner_task.rs | 559 +++++++-------
.../uxto_scanner_service_builder.rs | 2 +-
.../mod.rs => contacts_service.rs} | 25 +-
.../{mod.rs => output_manager_service.rs} | 7 +-
.../mod.rs | 7 +-
.../service.rs | 1 -
.../storage.rs | 0
base_layer/wallet/tests/support/comms_rpc.rs | 185 ++++-
base_layer/wallet/tests/support/mod.rs | 2 +
.../support/output_manager_service_mock.rs | 166 ++++
.../tests/support/transaction_service_mock.rs | 108 +++
base_layer/wallet/tests/support/utils.rs | 28 -
.../wallet/tests/transaction_service.rs | 24 +
.../tests/transaction_service_tests/mod.rs | 24 +
.../service.rs | 2 +-
.../storage.rs | 0
.../transaction_protocols.rs | 39 +-
base_layer/wallet/tests/utxo_scanner.rs | 731 ++++++++++++++++++
.../wallet/tests/{wallet/mod.rs => wallet.rs} | 27 +-
base_layer/wallet_ffi/src/lib.rs | 4 +-
base_layer/wallet_ffi/src/tasks.rs | 27 +-
base_layer/wallet_ffi/wallet.h | 2 +- .../features/WalletRecovery.feature | 4 +- .../features/WalletTransactions.feature | 2 +- integration_tests/package-lock.json | 74 +- 49 files changed, 2407 insertions(+), 480 deletions(-) create mode 100644 base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs create mode 100644 base_layer/wallet/migrations/2021-12-06-104545_add_block_scanned_cache/down.sql create mode 100644 base_layer/wallet/migrations/2021-12-06-104545_add_block_scanned_cache/up.sql rename base_layer/wallet/{tests/transaction_service => src/storage/sqlite_db}/mod.rs (93%) create mode 100644 base_layer/wallet/src/storage/sqlite_db/scanned_blocks.rs rename base_layer/wallet/src/storage/{sqlite_db.rs => sqlite_db/wallet.rs} (96%) rename base_layer/wallet/tests/{contacts_service/mod.rs => contacts_service.rs} (76%) rename base_layer/wallet/tests/{mod.rs => output_manager_service.rs} (91%) rename base_layer/wallet/tests/{output_manager_service => output_manager_service_tests}/mod.rs (95%) rename base_layer/wallet/tests/{output_manager_service => output_manager_service_tests}/service.rs (99%) rename base_layer/wallet/tests/{output_manager_service => output_manager_service_tests}/storage.rs (100%) create mode 100644 base_layer/wallet/tests/support/output_manager_service_mock.rs create mode 100644 base_layer/wallet/tests/support/transaction_service_mock.rs create mode 100644 base_layer/wallet/tests/transaction_service.rs create mode 100644 base_layer/wallet/tests/transaction_service_tests/mod.rs rename base_layer/wallet/tests/{transaction_service => transaction_service_tests}/service.rs (99%) rename base_layer/wallet/tests/{transaction_service => transaction_service_tests}/storage.rs (100%) rename base_layer/wallet/tests/{transaction_service => transaction_service_tests}/transaction_protocols.rs (97%) create mode 100644 base_layer/wallet/tests/utxo_scanner.rs rename base_layer/wallet/tests/{wallet/mod.rs => wallet.rs} (93%) diff --git a/applications/tari_console_wallet/src/recovery.rs b/applications/tari_console_wallet/src/recovery.rs index d94de9419b..54b01a8571 100644 --- a/applications/tari_console_wallet/src/recovery.rs +++ b/applications/tari_console_wallet/src/recovery.rs @@ -29,7 +29,7 @@ use tari_crypto::tari_utilities::hex::Hex; use tari_key_manager::{cipher_seed::CipherSeed, mnemonic::Mnemonic}; use tari_shutdown::Shutdown; use tari_wallet::{ - storage::sqlite_db::WalletSqliteDatabase, + storage::sqlite_db::wallet::WalletSqliteDatabase, utxo_scanner_service::{handle::UtxoScannerEvent, service::UtxoScannerService}, WalletSqlite, }; @@ -126,24 +126,24 @@ pub async fn wallet_recovery( println!("OK (latency = {:.2?})", latency); }, Ok(UtxoScannerEvent::Progress { - current_index: current, - total_index: total, + current_height, + tip_height, }) => { - let percentage_progress = ((current as f32) * 100f32 / (total as f32)).round() as u32; + let percentage_progress = ((current_height as f32) * 100f32 / (tip_height as f32)).round() as u32; debug!( target: LOG_TARGET, - "{}: Recovery process {}% complete ({} of {} utxos).", + "{}: Recovery process {}% complete (Block {} of {}).", Local::now(), percentage_progress, - current, - total + current_height, + tip_height ); println!( - "{}: Recovery process {}% complete ({} of {} utxos).", + "{}: Recovery process {}% complete (Block {} of {}).", Local::now(), percentage_progress, - current, - total + current_height, + tip_height ); }, Ok(UtxoScannerEvent::ScanningRoundFailed { @@ -172,15 +172,15 @@ pub async fn wallet_recovery( 
warn!(target: LOG_TARGET, "{}", s); }, Ok(UtxoScannerEvent::Completed { - number_scanned: num_scanned, - number_received: num_utxos, - value_received: total_amount, - time_taken: elapsed, + final_height, + num_recovered, + value_recovered, + time_taken, }) => { - let rate = (num_scanned as f32) * 1000f32 / (elapsed.as_millis() as f32); + let rate = (final_height as f32) * 1000f32 / (time_taken.as_millis() as f32); let stats = format!( - "Recovery complete! Scanned = {} in {:.2?} ({:.2?} utxos/s), Recovered {} worth {}", - num_scanned, elapsed, rate, num_utxos, total_amount + "Recovery complete! Scanned {} blocks in {:.2?} ({:.2?} blocks/s), Recovered {} outputs worth {}", + final_height, time_taken, rate, num_recovered, value_recovered ); info!(target: LOG_TARGET, "{}", stats); println!("{}", stats); diff --git a/base_layer/core/src/base_node/proto/rpc.proto b/base_layer/core/src/base_node/proto/rpc.proto index f1c4ee9d6f..17f04e5fa3 100644 --- a/base_layer/core/src/base_node/proto/rpc.proto +++ b/base_layer/core/src/base_node/proto/rpc.proto @@ -76,3 +76,14 @@ message PrunedOutput { bytes hash = 1; bytes witness_hash = 2; } + +message SyncUtxosByBlockRequest { + bytes start_header_hash = 1; + bytes end_header_hash = 2; +} + +message SyncUtxosByBlockResponse { + repeated tari.types.TransactionOutput outputs = 1; + uint64 height = 2; + bytes header_hash = 3; +} \ No newline at end of file diff --git a/base_layer/core/src/base_node/rpc/mod.rs b/base_layer/core/src/base_node/rpc/mod.rs index 0916f13808..eb97721008 100644 --- a/base_layer/core/src/base_node/rpc/mod.rs +++ b/base_layer/core/src/base_node/rpc/mod.rs @@ -22,9 +22,12 @@ #[cfg(feature = "base_node")] mod service; +#[cfg(feature = "base_node")] +pub mod sync_utxos_by_block_task; + #[cfg(feature = "base_node")] pub use service::BaseNodeWalletRpcService; -use tari_comms::protocol::rpc::{Request, Response, RpcStatus}; +use tari_comms::protocol::rpc::{Request, Response, RpcStatus, Streaming}; use tari_comms_rpc_macros::tari_rpc; #[cfg(feature = "base_node")] @@ -43,6 +46,8 @@ use crate::{ QueryDeletedRequest, QueryDeletedResponse, Signatures, + SyncUtxosByBlockRequest, + SyncUtxosByBlockResponse, TipInfoResponse, TxQueryBatchResponses, TxQueryResponse, @@ -97,6 +102,15 @@ pub trait BaseNodeWalletService: Send + Sync + 'static { &self, request: Request, ) -> Result, RpcStatus>; + + #[rpc(method = 10)] + async fn get_height_at_time(&self, request: Request) -> Result, RpcStatus>; + + #[rpc(method = 11)] + async fn sync_utxos_by_block( + &self, + request: Request, + ) -> Result, RpcStatus>; } #[cfg(feature = "base_node")] diff --git a/base_layer/core/src/base_node/rpc/service.rs b/base_layer/core/src/base_node/rpc/service.rs index ec9d7bb052..0d2887538f 100644 --- a/base_layer/core/src/base_node/rpc/service.rs +++ b/base_layer/core/src/base_node/rpc/service.rs @@ -1,7 +1,10 @@ use std::convert::TryFrom; +use log::*; use tari_common_types::types::Signature; -use tari_comms::protocol::rpc::{Request, Response, RpcStatus}; +use tari_comms::protocol::rpc::{Request, Response, RpcStatus, Streaming}; +use tari_crypto::tari_utilities::hex::Hex; +use tokio::sync::mpsc; // Copyright 2020, The Tari Project // @@ -26,7 +29,11 @@ use tari_comms::protocol::rpc::{Request, Response, RpcStatus}; // OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH // DAMAGE. 
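// A minimal sketch of how a wallet-side caller could drive the new `sync_utxos_by_block`
// stream declared above and record a `ScannedBlock` entry per streamed block. It assumes a
// connected `BaseNodeWalletRpcClient` named `client`, a `WalletDatabase` handle named `db`,
// start and end header hashes already chosen, and a placeholder `process_outputs` standing in
// for the caller's own output-recovery logic:
//
//     use futures::StreamExt;
//
//     let request = SyncUtxosByBlockRequest {
//         start_header_hash: start_hash.clone(),
//         end_header_hash: end_hash.clone(),
//     };
//     let mut stream = client.sync_utxos_by_block(request).await?;
//     while let Some(response) = stream.next().await {
//         // Each item carries one block's outputs plus that block's height and header hash.
//         let response = response.map_err(|e| UtxoScannerError::RpcStatus(e.to_string()))?;
//         let (num_recovered, amount) = process_outputs(response.outputs).await?;
//         db.save_scanned_block(ScannedBlock {
//             header_hash: response.header_hash,
//             height: response.height,
//             num_outputs: Some(num_recovered),
//             amount: Some(amount),
//             timestamp: Utc::now().naive_utc(),
//         })
//         .await?;
//     }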
use crate::{ - base_node::{rpc::BaseNodeWalletService, state_machine_service::states::StateInfo, StateMachineHandle}, + base_node::{ + rpc::{sync_utxos_by_block_task::SyncUtxosByBlockTask, BaseNodeWalletService}, + state_machine_service::states::StateInfo, + StateMachineHandle, + }, chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, PrunedOutput, UtxoMinedInfo}, mempool::{service::MempoolHandle, TxStorageResponse}, proto, @@ -37,6 +44,8 @@ use crate::{ QueryDeletedRequest, QueryDeletedResponse, Signatures as SignaturesProto, + SyncUtxosByBlockRequest, + SyncUtxosByBlockResponse, TipInfoResponse, TxLocation, TxQueryBatchResponse, @@ -52,7 +61,6 @@ use crate::{ }, transactions::transaction::Transaction, }; - const LOG_TARGET: &str = "c::base_node::rpc"; pub struct BaseNodeWalletRpcService { @@ -499,4 +507,83 @@ impl BaseNodeWalletService for BaseNodeWalletRpc Ok(Response::new(header.into())) } + + async fn get_height_at_time(&self, request: Request) -> Result, RpcStatus> { + let requested_epoch_time: u64 = request.into_message(); + + let tip_header = self + .db() + .fetch_tip_header() + .await + .map_err(RpcStatus::log_internal_error(LOG_TARGET))?; + let mut left_height = 0u64; + let mut right_height = tip_header.height(); + + while left_height <= right_height { + let mut mid_height = (left_height + right_height) / 2; + + if mid_height == 0 { + return Ok(Response::new(0u64)); + } + // If the two bounds are adjacent then perform the test between the right and left sides + if left_height == mid_height { + mid_height = right_height; + } + + let mid_header = self + .db() + .fetch_header(mid_height) + .await + .map_err(RpcStatus::log_internal_error(LOG_TARGET))? + .ok_or_else(|| { + RpcStatus::not_found(format!("Header not found during search at height {}", mid_height)) + })?; + let before_mid_header = self + .db() + .fetch_header(mid_height - 1) + .await + .map_err(RpcStatus::log_internal_error(LOG_TARGET))? + .ok_or_else(|| { + RpcStatus::not_found(format!("Header not found during search at height {}", mid_height - 1)) + })?; + + if requested_epoch_time < mid_header.timestamp.as_u64() && + requested_epoch_time >= before_mid_header.timestamp.as_u64() + { + return Ok(Response::new(before_mid_header.height)); + } else if mid_height == right_height { + return Ok(Response::new(right_height)); + } else if requested_epoch_time <= mid_header.timestamp.as_u64() { + right_height = mid_height; + } else { + left_height = mid_height; + } + } + + Ok(Response::new(0u64)) + } + + async fn sync_utxos_by_block( + &self, + request: Request, + ) -> Result, RpcStatus> { + let req = request.message(); + let peer = request.context().peer_node_id(); + debug!( + target: LOG_TARGET, + "Received sync_utxos_by_block request from {} from header {} to {} ", + peer, + req.start_header_hash.to_hex(), + req.end_header_hash.to_hex(), + ); + + // Number of blocks to load and push to the stream before loading the next batch. Most blocks have 1 output but + // full blocks will have 500 + const BATCH_SIZE: usize = 5; + let (tx, rx) = mpsc::channel(BATCH_SIZE); + let task = SyncUtxosByBlockTask::new(self.db()); + task.run(request.into_message(), tx).await?; + + Ok(Streaming::new(rx)) + } } diff --git a/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs b/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs new file mode 100644 index 0000000000..b0d24c5398 --- /dev/null +++ b/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs @@ -0,0 +1,196 @@ +// Copyright 2021. 
The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::{sync::Arc, time::Instant}; + +use log::*; +use tari_comms::protocol::rpc::RpcStatus; +use tari_crypto::tari_utilities::{hex::Hex, Hashable}; +use tokio::{sync::mpsc, task}; + +use crate::{ + blocks::BlockHeader, + chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, PrunedOutput}, + proto, + proto::base_node::{SyncUtxosByBlockRequest, SyncUtxosByBlockResponse}, +}; + +const LOG_TARGET: &str = "c::base_node::sync_rpc::sync_utxo_by_block_task"; + +pub(crate) struct SyncUtxosByBlockTask { + db: AsyncBlockchainDb, +} + +impl SyncUtxosByBlockTask +where B: BlockchainBackend + 'static +{ + pub(crate) fn new(db: AsyncBlockchainDb) -> Self { + Self { db } + } + + pub(crate) async fn run( + self, + request: SyncUtxosByBlockRequest, + mut tx: mpsc::Sender>, + ) -> Result<(), RpcStatus> { + let start_header = self + .db + .fetch_header_by_block_hash(request.start_header_hash.clone()) + .await + .map_err(RpcStatus::log_internal_error(LOG_TARGET))? + .ok_or_else(|| RpcStatus::not_found("Start header hash is was not found"))?; + + let end_header = self + .db + .fetch_header_by_block_hash(request.end_header_hash.clone()) + .await + .map_err(RpcStatus::log_internal_error(LOG_TARGET))? 
+ .ok_or_else(|| RpcStatus::not_found("End header hash is was not found"))?; + + if start_header.height > end_header.height { + return Err(RpcStatus::bad_request(format!( + "start header height {} cannot be greater than the end header height ({})", + start_header.height, end_header.height + ))); + } + + task::spawn(async move { + if let Err(err) = self.start_streaming(&mut tx, start_header, end_header).await { + let _ = tx.send(Err(err)).await; + } + }); + + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + async fn start_streaming( + &self, + tx: &mut mpsc::Sender>, + start_header: BlockHeader, + end_header: BlockHeader, + ) -> Result<(), RpcStatus> { + let bitmap = self + .db + .fetch_complete_deleted_bitmap_at(end_header.hash()) + .await + .map_err(|err| { + error!(target: LOG_TARGET, "Failed to get deleted bitmap: {}", err); + RpcStatus::general(format!( + "Could not get deleted bitmap at hash {}", + end_header.hash().to_hex() + )) + })? + .into_bitmap(); + let bitmap = Arc::new(bitmap); + + debug!( + target: LOG_TARGET, + "Starting stream task with start_header: {} and end_header: {}", + start_header.hash().to_hex(), + end_header.hash().to_hex(), + ); + + let mut current_header = start_header; + + loop { + let timer = Instant::now(); + let current_header_hash = current_header.hash(); + + debug!( + target: LOG_TARGET, + "current header = {} ({})", + current_header.height, + current_header_hash.to_hex() + ); + + if tx.is_closed() { + debug!(target: LOG_TARGET, "Exiting sync_utxos early because client has gone",); + break; + } + + let (utxos, _deleted_diff) = self + .db + .fetch_utxos_in_block(current_header.hash(), Some(bitmap.clone())) + .await + .map_err(RpcStatus::log_internal_error(LOG_TARGET))?; + + let utxos: Vec = utxos + .into_iter() + .enumerate() + // Don't include pruned UTXOs + .filter_map(|(_, utxo)| match utxo { + PrunedOutput::Pruned{output_hash: _,witness_hash:_} => None, + PrunedOutput::NotPruned{output} => Some(output.into()), + }).collect(); + + debug!( + target: LOG_TARGET, + "Streaming {} UTXO(s) for block #{} (Hash: {})", + utxos.len(), + current_header.height, + current_header_hash.to_hex(), + ); + + let utxo_block_response = SyncUtxosByBlockResponse { + outputs: utxos, + height: current_header.height, + header_hash: current_header_hash, + }; + // Ensure task stops if the peer prematurely stops their RPC session + if tx.send(Ok(utxo_block_response)).await.is_err() { + break; + } + + debug!( + target: LOG_TARGET, + "Streamed utxos in {:.2?} (including stream backpressure)", + timer.elapsed() + ); + + if current_header.height >= end_header.height { + break; + } + + current_header = self + .db + .fetch_header(current_header.height + 1) + .await + .map_err(RpcStatus::log_internal_error(LOG_TARGET))? 
+ .ok_or_else(|| { + RpcStatus::general(format!( + "Potential data consistency issue: header {} not found", + current_header.height + 1 + )) + })?; + } + + debug!( + target: LOG_TARGET, + "UTXO sync by block completed to UTXO {} (Header hash = {})", + current_header.output_mmr_size, + current_header.hash().to_hex() + ); + + Ok(()) + } +} diff --git a/base_layer/core/src/base_node/sync/rpc/mod.rs b/base_layer/core/src/base_node/sync/rpc/mod.rs index 3a54356c11..2f540636be 100644 --- a/base_layer/core/src/base_node/sync/rpc/mod.rs +++ b/base_layer/core/src/base_node/sync/rpc/mod.rs @@ -90,9 +90,6 @@ pub trait BaseNodeSyncService: Send + Sync + 'static { #[rpc(method = 8)] async fn sync_utxos(&self, request: Request) -> Result, RpcStatus>; - - #[rpc(method = 9)] - async fn get_height_at_time(&self, request: Request) -> Result, RpcStatus>; } #[cfg(feature = "base_node")] diff --git a/base_layer/core/src/base_node/sync/rpc/service.rs b/base_layer/core/src/base_node/sync/rpc/service.rs index c86026de5b..2c9f87a1e1 100644 --- a/base_layer/core/src/base_node/sync/rpc/service.rs +++ b/base_layer/core/src/base_node/sync/rpc/service.rs @@ -508,59 +508,4 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ Ok(Streaming::new(rx)) } - - async fn get_height_at_time(&self, request: Request) -> Result, RpcStatus> { - let requested_epoch_time: u64 = request.into_message(); - - let tip_header = self - .db() - .fetch_tip_header() - .await - .map_err(RpcStatus::log_internal_error(LOG_TARGET))?; - let mut left_height = 0u64; - let mut right_height = tip_header.height(); - - while left_height <= right_height { - let mut mid_height = (left_height + right_height) / 2; - - if mid_height == 0 { - return Ok(Response::new(0u64)); - } - // If the two bounds are adjacent then perform the test between the right and left sides - if left_height == mid_height { - mid_height = right_height; - } - - let mid_header = self - .db() - .fetch_header(mid_height) - .await - .map_err(RpcStatus::log_internal_error(LOG_TARGET))? - .ok_or_else(|| { - RpcStatus::not_found(format!("Header not found during search at height {}", mid_height)) - })?; - let before_mid_header = self - .db() - .fetch_header(mid_height - 1) - .await - .map_err(RpcStatus::log_internal_error(LOG_TARGET))? 
- .ok_or_else(|| { - RpcStatus::not_found(format!("Header not found during search at height {}", mid_height - 1)) - })?; - - if requested_epoch_time < mid_header.timestamp.as_u64() && - requested_epoch_time >= before_mid_header.timestamp.as_u64() - { - return Ok(Response::new(before_mid_header.height)); - } else if mid_height == right_height { - return Ok(Response::new(right_height)); - } else if requested_epoch_time <= mid_header.timestamp.as_u64() { - right_height = mid_height; - } else { - left_height = mid_height; - } - } - - Ok(Response::new(0u64)) - } } diff --git a/base_layer/core/tests/base_node_rpc.rs b/base_layer/core/tests/base_node_rpc.rs index 48007690c6..0a3ee467aa 100644 --- a/base_layer/core/tests/base_node_rpc.rs +++ b/base_layer/core/tests/base_node_rpc.rs @@ -22,6 +22,7 @@ use std::{convert::TryFrom, sync::Arc, time::Duration}; +use futures::StreamExt; use randomx_rs::RandomXFlag; use tari_common::configuration::Network; use tari_comms::protocol::rpc::mock::RpcRequestMock; @@ -38,13 +39,12 @@ use tari_core::{ rpc::{BaseNodeWalletRpcService, BaseNodeWalletService}, state_machine_service::states::{ListeningInfo, StateInfo, StatusInfo}, sync::rpc::BaseNodeSyncRpcService, - BaseNodeSyncService, }, blocks::ChainBlock, consensus::{ConsensusManager, ConsensusManagerBuilder, NetworkConsensus}, crypto::tari_utilities::Hashable, proto::{ - base_node::{FetchMatchingUtxos, Signatures as SignaturesProto}, + base_node::{FetchMatchingUtxos, Signatures as SignaturesProto, SyncUtxosByBlockRequest}, types::{Signature as SignatureProto, Transaction as TransactionProto}, }, test_helpers::blockchain::TempDatabase, @@ -57,6 +57,7 @@ use tari_core::{ txn_schema, }; use tari_crypto::tari_utilities::epoch_time::EpochTime; +use tari_test_utils::streams::convert_mpsc_to_stream; use tempfile::{tempdir, TempDir}; use crate::helpers::{ @@ -174,7 +175,7 @@ async fn test_base_node_wallet_rpc() { .await .unwrap(); - // Check that subitting Tx2 will now be accepted + // Check that submiting Tx2 will now be accepted let msg = TransactionProto::from(tx2); let req = request_mock.request_with_context(Default::default(), msg); let resp = service.submit_transaction(req).await.unwrap().into_message(); @@ -278,7 +279,7 @@ async fn test_base_node_wallet_rpc() { async fn test_get_height_at_time() { let factories = CryptoFactories::default(); - let (_, service, base_node, request_mock, consensus_manager, block0, _utxo0, _temp_dir) = setup().await; + let (service, _, base_node, request_mock, consensus_manager, block0, _utxo0, _temp_dir) = setup().await; let mut prev_block = block0.clone(); let mut times: Vec = vec![prev_block.header().timestamp]; @@ -333,3 +334,108 @@ async fn test_get_height_at_time() { let resp = service.get_height_at_time(req).await.unwrap().into_message(); assert_eq!(resp, 10); } + +#[tokio::test] +async fn test_sync_utxos_by_block() { + let (service, _, mut base_node, request_mock, consensus_manager, block0, utxo0, _temp_dir) = setup().await; + + let (txs1, utxos1) = schema_to_transaction(&[txn_schema!( + from: vec![utxo0.clone()], + to: vec![10 * T, 10 * T, 10 * T, 10 * T, 10 * T, 10 * T, 10 * T, 10 * T, 10 * T] + )]); + let tx1 = (*txs1[0]).clone(); + + let (txs2, utxos2) = schema_to_transaction(&[txn_schema!( + from: vec![utxos1[0].clone()], + to: vec![2 * T, 2 * T, 2 * T, 2 * T] + )]); + let tx2 = (*txs2[0]).clone(); + + let (txs3, _utxos3) = schema_to_transaction(&[txn_schema!( + from: vec![utxos2[0].clone(), utxos2[1].clone()], + to: vec![100_000 * uT, 100_000 * uT, 100_000 * uT, 100_000 
* uT, 100_000 * uT] + )]); + let tx3 = (*txs3[0]).clone(); + + let block1 = base_node + .blockchain_db + .prepare_new_block(chain_block(block0.block(), vec![tx1.clone()], &consensus_manager)) + .unwrap(); + + base_node + .local_nci + .submit_block(block1.clone(), Broadcast::from(true)) + .await + .unwrap(); + + let block2 = base_node + .blockchain_db + .prepare_new_block(chain_block(&block1, vec![tx2], &consensus_manager)) + .unwrap(); + + base_node + .local_nci + .submit_block(block2.clone(), Broadcast::from(true)) + .await + .unwrap(); + + let block3 = base_node + .blockchain_db + .prepare_new_block(chain_block(&block2, vec![tx3], &consensus_manager)) + .unwrap(); + + base_node + .local_nci + .submit_block(block3.clone(), Broadcast::from(true)) + .await + .unwrap(); + + // All blocks + let msg = SyncUtxosByBlockRequest { + start_header_hash: block0.header().hash(), + end_header_hash: block3.header.hash(), + }; + + let req = request_mock.request_with_context(Default::default(), msg); + let mut streaming = service.sync_utxos_by_block(req).await.unwrap().into_inner(); + + let responses = convert_mpsc_to_stream(&mut streaming).collect::>().await; + + assert_eq!( + vec![ + (0, block0.header().hash(), 0), + (1, block1.header.hash(), 9), + (2, block2.header.hash(), 3), + (3, block3.header.hash(), 6) + ], + responses + .iter() + .map(|r| { + let resp = r.clone().unwrap(); + (resp.height, resp.header_hash, resp.outputs.len()) + }) + .collect::, usize)>>() + ); + + // Block 1 to 2 + let msg = SyncUtxosByBlockRequest { + start_header_hash: block1.header.hash(), + end_header_hash: block2.header.hash(), + }; + + let req = request_mock.request_with_context(Default::default(), msg); + let mut streaming = service.sync_utxos_by_block(req).await.unwrap().into_inner(); + + let responses = convert_mpsc_to_stream(&mut streaming).collect::>().await; + + assert_eq!( + vec![(1, block1.header.hash(), 9), (2, block2.header.hash(), 5),], + responses + .iter() + .map(|r| { + let resp = r.clone().unwrap(); + (resp.height, resp.header_hash, resp.outputs.len()) + }) + .collect::, usize)>>() + ); +} diff --git a/base_layer/wallet/migrations/2021-12-06-104545_add_block_scanned_cache/down.sql b/base_layer/wallet/migrations/2021-12-06-104545_add_block_scanned_cache/down.sql new file mode 100644 index 0000000000..3ecdf26d20 --- /dev/null +++ b/base_layer/wallet/migrations/2021-12-06-104545_add_block_scanned_cache/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS scanned_blocks; \ No newline at end of file diff --git a/base_layer/wallet/migrations/2021-12-06-104545_add_block_scanned_cache/up.sql b/base_layer/wallet/migrations/2021-12-06-104545_add_block_scanned_cache/up.sql new file mode 100644 index 0000000000..60967bf7fb --- /dev/null +++ b/base_layer/wallet/migrations/2021-12-06-104545_add_block_scanned_cache/up.sql @@ -0,0 +1,7 @@ +CREATE TABLE scanned_blocks ( + header_hash BLOB PRIMARY KEY NOT NULL, + height BIGINT NOT NULL, + num_outputs BIGINT NULL, + amount BIGINT NULL, + timestamp DATETIME NOT NULL +); diff --git a/base_layer/wallet/src/lib.rs b/base_layer/wallet/src/lib.rs index 463a83154b..461e02dffd 100644 --- a/base_layer/wallet/src/lib.rs +++ b/base_layer/wallet/src/lib.rs @@ -40,7 +40,7 @@ pub use wallet::Wallet; use crate::{ contacts_service::storage::sqlite_db::ContactsServiceSqliteDatabase, output_manager_service::storage::sqlite_db::OutputManagerSqliteDatabase, - storage::sqlite_db::WalletSqliteDatabase, + storage::sqlite_db::wallet::WalletSqliteDatabase, 
transaction_service::storage::sqlite_db::TransactionServiceSqliteDatabase, }; diff --git a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs index 6325766041..5a0824dd93 100644 --- a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs +++ b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs @@ -67,7 +67,7 @@ use crate::{ mod new_output_sql; mod output_sql; -const LOG_TARGET: &str = "wallet::output_manager_service::database::sqlite_db"; +const LOG_TARGET: &str = "wallet::output_manager_service::database::wallet"; /// A Sqlite backend for the Output Manager Service. The Backend is accessed via a connection pool to the Sqlite file. #[derive(Clone)] diff --git a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/output_sql.rs b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/output_sql.rs index ff65873b65..b1fabeb84f 100644 --- a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/output_sql.rs +++ b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/output_sql.rs @@ -59,7 +59,7 @@ use crate::{ }, }; -const LOG_TARGET: &str = "wallet::output_manager_service::database::sqlite_db"; +const LOG_TARGET: &str = "wallet::output_manager_service::database::wallet"; #[derive(Clone, Debug, Queryable, Identifiable, PartialEq)] #[table_name = "outputs"] diff --git a/base_layer/wallet/src/schema.rs b/base_layer/wallet/src/schema.rs index 383dc93f7b..3bcefe0cf8 100644 --- a/base_layer/wallet/src/schema.rs +++ b/base_layer/wallet/src/schema.rs @@ -118,6 +118,16 @@ table! { } } +table! { + scanned_blocks (header_hash) { + header_hash -> Binary, + height -> BigInt, + num_outputs -> Nullable, + amount -> Nullable, + timestamp -> Timestamp, + } +} + table! { wallet_settings (key) { key -> Text, @@ -134,5 +144,6 @@ allow_tables_to_appear_in_same_query!( known_one_sided_payment_scripts, outbound_transactions, outputs, + scanned_blocks, wallet_settings, ); diff --git a/base_layer/wallet/src/storage/database.rs b/base_layer/wallet/src/storage/database.rs index ea41b84c76..7e9b926aeb 100644 --- a/base_layer/wallet/src/storage/database.rs +++ b/base_layer/wallet/src/storage/database.rs @@ -31,7 +31,7 @@ use tari_common_types::chain_metadata::ChainMetadata; use tari_comms::{multiaddr::Multiaddr, peer_manager::PeerFeatures, tor::TorIdentity}; use tari_key_manager::cipher_seed::CipherSeed; -use crate::error::WalletStorageError; +use crate::{error::WalletStorageError, utxo_scanner_service::service::ScannedBlock}; const LOG_TARGET: &str = "wallet::database"; @@ -45,6 +45,19 @@ pub trait WalletBackend: Send + Sync + Clone { fn apply_encryption(&self, passphrase: String) -> Result; /// Remove encryption from the backend. fn remove_encryption(&self) -> Result<(), WalletStorageError>; + + fn get_scanned_blocks(&self) -> Result, WalletStorageError>; + fn save_scanned_block(&self, scanned_block: ScannedBlock) -> Result<(), WalletStorageError>; + fn clear_scanned_blocks(&self) -> Result<(), WalletStorageError>; + /// Clear scanned blocks from the givne height and higher + fn clear_scanned_blocks_from_and_higher(&self, height: u64) -> Result<(), WalletStorageError>; + /// Clear scanned block history from before the specified height. 
Choice to exclude blocks that contained recovered + /// outputs + fn clear_scanned_blocks_before_height( + &self, + height: u64, + exclude_recovered: bool, + ) -> Result<(), WalletStorageError>; } #[derive(Debug, Clone, PartialEq)] @@ -326,6 +339,60 @@ where T: WalletBackend + 'static .map_err(|err| WalletStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(result) } + + pub async fn get_scanned_blocks(&self) -> Result, WalletStorageError> { + let db_clone = self.db.clone(); + + let result = tokio::task::spawn_blocking(move || db_clone.get_scanned_blocks()) + .await + .map_err(|err| WalletStorageError::BlockingTaskSpawnError(err.to_string()))??; + + Ok(result) + } + + pub async fn save_scanned_block(&self, scanned_block: ScannedBlock) -> Result<(), WalletStorageError> { + let db_clone = self.db.clone(); + + tokio::task::spawn_blocking(move || db_clone.save_scanned_block(scanned_block)) + .await + .map_err(|err| WalletStorageError::BlockingTaskSpawnError(err.to_string()))??; + + Ok(()) + } + + pub async fn clear_scanned_blocks(&self) -> Result<(), WalletStorageError> { + let db_clone = self.db.clone(); + + tokio::task::spawn_blocking(move || db_clone.clear_scanned_blocks()) + .await + .map_err(|err| WalletStorageError::BlockingTaskSpawnError(err.to_string()))??; + + Ok(()) + } + + pub async fn clear_scanned_blocks_from_and_higher(&self, height: u64) -> Result<(), WalletStorageError> { + let db_clone = self.db.clone(); + + tokio::task::spawn_blocking(move || db_clone.clear_scanned_blocks_from_and_higher(height)) + .await + .map_err(|err| WalletStorageError::BlockingTaskSpawnError(err.to_string()))??; + + Ok(()) + } + + pub async fn clear_scanned_blocks_before_height( + &self, + height: u64, + exclude_recovered: bool, + ) -> Result<(), WalletStorageError> { + let db_clone = self.db.clone(); + + tokio::task::spawn_blocking(move || db_clone.clear_scanned_blocks_before_height(height, exclude_recovered)) + .await + .map_err(|err| WalletStorageError::BlockingTaskSpawnError(err.to_string()))??; + + Ok(()) + } } impl Display for DbKey { @@ -386,7 +453,7 @@ mod test { use crate::storage::{ database::WalletDatabase, - sqlite_db::WalletSqliteDatabase, + sqlite_db::wallet::WalletSqliteDatabase, sqlite_utilities::run_migration_and_create_sqlite_connection, }; diff --git a/base_layer/wallet/tests/transaction_service/mod.rs b/base_layer/wallet/src/storage/sqlite_db/mod.rs similarity index 93% rename from base_layer/wallet/tests/transaction_service/mod.rs rename to base_layer/wallet/src/storage/sqlite_db/mod.rs index cf5a90b4c3..71fb427105 100644 --- a/base_layer/wallet/tests/transaction_service/mod.rs +++ b/base_layer/wallet/src/storage/sqlite_db/mod.rs @@ -1,4 +1,4 @@ -// Copyright 2019. The Tari Project +// Copyright 2021. The Tari Project // // Redistribution and use in source and binary forms, with or without modification, are permitted provided that the // following conditions are met: @@ -20,6 +20,5 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -pub mod service; -pub mod storage; -pub mod transaction_protocols; +pub mod scanned_blocks; +pub mod wallet; diff --git a/base_layer/wallet/src/storage/sqlite_db/scanned_blocks.rs b/base_layer/wallet/src/storage/sqlite_db/scanned_blocks.rs new file mode 100644 index 0000000000..b3e7be2ee8 --- /dev/null +++ b/base_layer/wallet/src/storage/sqlite_db/scanned_blocks.rs @@ -0,0 +1,132 @@ +// Copyright 2021. 
The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use chrono::{NaiveDateTime, Utc}; +use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl, SqliteConnection}; +use tari_core::transactions::tari_amount::MicroTari; + +use crate::{ + diesel::BoolExpressionMethods, + error::WalletStorageError, + schema::scanned_blocks, + utxo_scanner_service::service::ScannedBlock, +}; + +#[derive(Clone, Debug, Queryable, Insertable, PartialEq)] +#[table_name = "scanned_blocks"] +pub struct ScannedBlockSql { + header_hash: Vec, + height: i64, + num_outputs: Option, + amount: Option, + timestamp: NaiveDateTime, +} + +impl ScannedBlockSql { + pub fn index(conn: &SqliteConnection) -> Result, WalletStorageError> { + Ok(scanned_blocks::table + .order(scanned_blocks::height.desc()) + .load::(conn)?) 
+ } + + pub fn new(header_hash: Vec, height: i64) -> Self { + Self { + header_hash, + height, + num_outputs: None, + amount: None, + timestamp: Utc::now().naive_utc(), + } + } + + pub fn new_with_amount(header_hash: Vec, height: i64, num_outputs: i64, amount: i64) -> Self { + Self { + header_hash, + height, + num_outputs: Some(num_outputs), + amount: Some(amount), + timestamp: Utc::now().naive_utc(), + } + } + + pub fn commit(&self, conn: &SqliteConnection) -> Result<(), WalletStorageError> { + diesel::insert_into(scanned_blocks::table) + .values(self.clone()) + .execute(conn)?; + Ok(()) + } + + pub fn clear_all(conn: &SqliteConnection) -> Result<(), WalletStorageError> { + diesel::delete(scanned_blocks::table).execute(conn)?; + Ok(()) + } + + /// Clear Scanned Blocks from the given height and higher + pub fn clear_from_and_higher(height: u64, conn: &SqliteConnection) -> Result<(), WalletStorageError> { + diesel::delete(scanned_blocks::table.filter(scanned_blocks::height.ge(height as i64))).execute(conn)?; + Ok(()) + } + + pub fn clear_before_height( + height: u64, + exclude_recovered: bool, + conn: &SqliteConnection, + ) -> Result<(), WalletStorageError> { + let mut query = diesel::delete(scanned_blocks::table) + .into_boxed() + .filter(scanned_blocks::height.lt(height as i64)); + if exclude_recovered { + query = query.filter( + scanned_blocks::num_outputs + .is_null() + .or(scanned_blocks::num_outputs.eq(0)), + ); + } + + query.execute(conn)?; + Ok(()) + } +} + +impl From for ScannedBlockSql { + fn from(sb: ScannedBlock) -> Self { + Self { + header_hash: sb.header_hash, + height: sb.height as i64, + num_outputs: sb.num_outputs.map(|n| n as i64), + amount: sb.amount.map(|a| a.as_u64() as i64), + timestamp: sb.timestamp, + } + } +} + +impl From for ScannedBlock { + fn from(sb: ScannedBlockSql) -> Self { + Self { + header_hash: sb.header_hash, + height: sb.height as u64, + num_outputs: sb.num_outputs.map(|n| n as u64), + amount: sb.amount.map(|a| MicroTari::from(a as u64)), + timestamp: sb.timestamp, + } + } +} diff --git a/base_layer/wallet/src/storage/sqlite_db.rs b/base_layer/wallet/src/storage/sqlite_db/wallet.rs similarity index 96% rename from base_layer/wallet/src/storage/sqlite_db.rs rename to base_layer/wallet/src/storage/sqlite_db/wallet.rs index f1201a5a52..50a974ad25 100644 --- a/base_layer/wallet/src/storage/sqlite_db.rs +++ b/base_layer/wallet/src/storage/sqlite_db/wallet.rs @@ -50,12 +50,14 @@ use crate::{ schema::{client_key_values, wallet_settings}, storage::{ database::{DbKey, DbKeyValuePair, DbValue, WalletBackend, WriteOperation}, + sqlite_db::scanned_blocks::ScannedBlockSql, sqlite_utilities::wallet_db_connection::WalletDbConnection, }, util::encryption::{decrypt_bytes_integral_nonce, encrypt_bytes_integral_nonce, Encryptable, AES_NONCE_BYTES}, + utxo_scanner_service::service::ScannedBlock, }; -const LOG_TARGET: &str = "wallet::storage::sqlite_db"; +const LOG_TARGET: &str = "wallet::storage::wallet"; /// A Sqlite backend for the Output Manager Service. The Backend is accessed via a connection pool to the Sqlite file. 
#[derive(Clone)] @@ -512,6 +514,35 @@ impl WalletBackend for WalletSqliteDatabase { Ok(()) } + + fn get_scanned_blocks(&self) -> Result, WalletStorageError> { + let conn = self.database_connection.get_pooled_connection()?; + ScannedBlockSql::index(&conn).map(|sb| sb.into_iter().map(ScannedBlock::from).collect()) + } + + fn save_scanned_block(&self, scanned_block: ScannedBlock) -> Result<(), WalletStorageError> { + let conn = self.database_connection.get_pooled_connection()?; + ScannedBlockSql::from(scanned_block).commit(&conn) + } + + fn clear_scanned_blocks(&self) -> Result<(), WalletStorageError> { + let conn = self.database_connection.get_pooled_connection()?; + ScannedBlockSql::clear_all(&conn) + } + + fn clear_scanned_blocks_from_and_higher(&self, height: u64) -> Result<(), WalletStorageError> { + let conn = self.database_connection.get_pooled_connection()?; + ScannedBlockSql::clear_from_and_higher(height, &conn) + } + + fn clear_scanned_blocks_before_height( + &self, + height: u64, + exclude_recovered: bool, + ) -> Result<(), WalletStorageError> { + let conn = self.database_connection.get_pooled_connection()?; + ScannedBlockSql::clear_before_height(height, exclude_recovered, &conn) + } } /// Confirm if database is encrypted or not and if a cipher is provided confirm the cipher is correct. @@ -736,7 +767,7 @@ mod test { use crate::storage::{ database::{DbKey, DbValue, WalletBackend}, - sqlite_db::{ClientKeyValueSql, WalletSettingSql, WalletSqliteDatabase}, + sqlite_db::wallet::{ClientKeyValueSql, WalletSettingSql, WalletSqliteDatabase}, sqlite_utilities::run_migration_and_create_sqlite_connection, }; diff --git a/base_layer/wallet/src/storage/sqlite_utilities/mod.rs b/base_layer/wallet/src/storage/sqlite_utilities/mod.rs index e119bf1f40..7d766f9097 100644 --- a/base_layer/wallet/src/storage/sqlite_utilities/mod.rs +++ b/base_layer/wallet/src/storage/sqlite_utilities/mod.rs @@ -36,7 +36,7 @@ use crate::{ contacts_service::storage::sqlite_db::ContactsServiceSqliteDatabase, error::WalletStorageError, output_manager_service::storage::sqlite_db::OutputManagerSqliteDatabase, - storage::{database::WalletDatabase, sqlite_db::WalletSqliteDatabase}, + storage::{database::WalletDatabase, sqlite_db::wallet::WalletSqliteDatabase}, transaction_service::storage::sqlite_db::TransactionServiceSqliteDatabase, }; @@ -167,7 +167,7 @@ pub fn initialize_sqlite_database_backends( /// the DB /// TODO remove at next testnet reset fn check_for_incompatible_db_encryption(connection: &SqliteConnection) -> Result<(), WalletStorageError> { - use crate::{diesel::RunQueryDsl, schema::wallet_settings, storage::sqlite_db::WalletSettingSql}; + use crate::{diesel::RunQueryDsl, schema::wallet_settings, storage::sqlite_db::wallet::WalletSettingSql}; if wallet_settings::table .filter(wallet_settings::key.eq("MasterSecretKey".to_string())) diff --git a/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs b/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs index 92b8fbdaef..1b19bd7cd3 100644 --- a/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs +++ b/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs @@ -66,7 +66,7 @@ use crate::{ }, }; -const LOG_TARGET: &str = "wallet::transaction_service::database::sqlite_db"; +const LOG_TARGET: &str = "wallet::transaction_service::database::wallet"; /// A Sqlite backend for the Transaction Service. The Backend is accessed via a connection pool to the Sqlite file. 
#[derive(Clone)] diff --git a/base_layer/wallet/src/utxo_scanner_service/error.rs b/base_layer/wallet/src/utxo_scanner_service/error.rs index 53c79f4f71..d479e82fae 100644 --- a/base_layer/wallet/src/utxo_scanner_service/error.rs +++ b/base_layer/wallet/src/utxo_scanner_service/error.rs @@ -56,4 +56,6 @@ pub enum UtxoScannerError { TransportChannelError(#[from] TransportChannelError), #[error("Serde json error: `{0}`")] SerdeJsonError(#[from] SerdeJsonError), + #[error("Overflow Error")] + OverflowError, } diff --git a/base_layer/wallet/src/utxo_scanner_service/handle.rs b/base_layer/wallet/src/utxo_scanner_service/handle.rs index 09240bd998..dea0c1d9b9 100644 --- a/base_layer/wallet/src/utxo_scanner_service/handle.rs +++ b/base_layer/wallet/src/utxo_scanner_service/handle.rs @@ -43,14 +43,14 @@ pub enum UtxoScannerEvent { }, /// Progress of the recovery process (current_block, current_chain_height) Progress { - current_index: u64, - total_index: u64, + current_height: u64, + tip_height: u64, }, /// Completed Recovery (Number scanned, Num of Recovered outputs, Value of recovered outputs, Time taken) Completed { - number_scanned: u64, - number_received: u64, - value_received: MicroTari, + final_height: u64, + num_recovered: u64, + value_recovered: MicroTari, time_taken: Duration, }, /// Scanning process has failed and scanning process has exited diff --git a/base_layer/wallet/src/utxo_scanner_service/mod.rs b/base_layer/wallet/src/utxo_scanner_service/mod.rs index b17c5c775e..75957248f8 100644 --- a/base_layer/wallet/src/utxo_scanner_service/mod.rs +++ b/base_layer/wallet/src/utxo_scanner_service/mod.rs @@ -24,7 +24,7 @@ pub mod error; pub mod handle; pub mod service; mod utxo_scanner_task; -mod uxto_scanner_service_builder; +pub mod uxto_scanner_service_builder; pub use utxo_scanner_task::RECOVERY_KEY; diff --git a/base_layer/wallet/src/utxo_scanner_service/service.rs b/base_layer/wallet/src/utxo_scanner_service/service.rs index 6f21bced64..31af81c0c3 100644 --- a/base_layer/wallet/src/utxo_scanner_service/service.rs +++ b/base_layer/wallet/src/utxo_scanner_service/service.rs @@ -22,9 +22,9 @@ use std::sync::Arc; +use chrono::NaiveDateTime; use futures::FutureExt; use log::*; -use serde::{Deserialize, Serialize}; use tari_common_types::types::HashOutput; use tari_comms::{connectivity::ConnectivityRequester, peer_manager::Peer, types::CommsPublicKey, NodeIdentity}; use tari_core::transactions::{tari_amount::MicroTari, CryptoFactories}; @@ -49,6 +49,12 @@ use crate::{ pub const LOG_TARGET: &str = "wallet::utxo_scanning"; +// Cache 1 days worth of headers. +// TODO Determine a better strategy for maintaining a cache. Logarithmic sampling has been suggested but the problem +// with it is that as you move on to the next block you need to resample say a 100 headers where a simple window like +// this only samples 1 header per new block. 
A ticket has been added to the backlog to think about this +pub const SCANNED_BLOCK_CACHE_SIZE: u64 = 720; + pub struct UtxoScannerService where TBackend: WalletBackend + 'static { @@ -172,14 +178,6 @@ where TBackend: WalletBackend + 'static } } -#[derive(Clone, Default, Serialize, Deserialize)] -pub struct ScanningMetadata { - pub total_amount: MicroTari, - pub number_of_utxos: u64, - pub utxo_index: u64, - pub height_hash: HashOutput, -} - #[derive(Clone)] pub struct UtxoScannerResources { pub db: WalletDatabase, @@ -190,3 +188,12 @@ pub struct UtxoScannerResources { pub node_identity: Arc, pub factories: CryptoFactories, } + +#[derive(Debug, Clone)] +pub struct ScannedBlock { + pub header_hash: HashOutput, + pub height: u64, + pub num_outputs: Option, + pub amount: Option, + pub timestamp: NaiveDateTime, +} diff --git a/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs index 0180d9705f..783d8093b5 100644 --- a/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs +++ b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs @@ -28,19 +28,13 @@ use std::{ use chrono::Utc; use futures::StreamExt; use log::*; -use tari_common_types::transaction::TxId; -use tari_comms::{ - peer_manager::NodeId, - protocol::rpc::{RpcError, RpcStatus}, - types::CommsPublicKey, - PeerConnection, -}; +use tari_common_types::{transaction::TxId, types::HashOutput}; +use tari_comms::{peer_manager::NodeId, types::CommsPublicKey, PeerConnection}; use tari_core::{ - base_node::sync::rpc::BaseNodeSyncRpcClient, + base_node::rpc::BaseNodeWalletRpcClient, blocks::BlockHeader, crypto::tari_utilities::hex::Hex, - proto, - proto::base_node::{FindChainSplitRequest, SyncUtxosRequest}, + proto::base_node::SyncUtxosByBlockRequest, tari_utilities::Hashable, transactions::{ tari_amount::MicroTari, @@ -48,7 +42,7 @@ use tari_core::{ }, }; use tari_shutdown::ShutdownSignal; -use tokio::{sync::broadcast, time}; +use tokio::sync::broadcast; use crate::{ error::WalletError, @@ -56,7 +50,7 @@ use crate::{ utxo_scanner_service::{ error::UtxoScannerError, handle::UtxoScannerEvent, - service::{ScanningMetadata, UtxoScannerResources}, + service::{ScannedBlock, UtxoScannerResources, SCANNED_BLOCK_CACHE_SIZE}, uxto_scanner_service_builder::UtxoScannerMode, }, }; @@ -64,7 +58,6 @@ use crate::{ pub const LOG_TARGET: &str = "wallet::utxo_scanning"; pub const RECOVERY_KEY: &str = "recovery_data"; -const SCANNING_KEY: &str = "scanning_data"; pub struct UtxoScannerTask where TBackend: WalletBackend + 'static @@ -81,27 +74,87 @@ where TBackend: WalletBackend + 'static impl UtxoScannerTask where TBackend: WalletBackend + 'static { + pub async fn run(mut self) -> Result<(), UtxoScannerError> { + if self.mode == UtxoScannerMode::Recovery { + self.set_recovery_mode().await?; + } else if self.check_recovery_mode().await? 
{ + warn!( + target: LOG_TARGET, + "Scanning round aborted as a Recovery is in progress" + ); + return Ok(()); + } + + loop { + if self.shutdown_signal.is_triggered() { + return Ok(()); + } + match self.get_next_peer() { + Some(peer) => match self.attempt_sync(peer.clone()).await { + Ok((num_outputs_recovered, final_height, final_amount, elapsed)) => { + debug!(target: LOG_TARGET, "Scanned to height #{}", final_height); + self.finalize(num_outputs_recovered, final_height, final_amount, elapsed) + .await?; + return Ok(()); + }, + Err(e) => { + warn!( + target: LOG_TARGET, + "Failed to scan UTXO's from base node {}: {}", peer, e + ); + self.publish_event(UtxoScannerEvent::ScanningRoundFailed { + num_retries: self.num_retries, + retry_limit: self.retry_limit, + error: e.to_string(), + }); + continue; + }, + }, + None => { + self.publish_event(UtxoScannerEvent::ScanningRoundFailed { + num_retries: self.num_retries, + retry_limit: self.retry_limit, + error: "No new peers to try after this round".to_string(), + }); + + if self.num_retries >= self.retry_limit { + self.publish_event(UtxoScannerEvent::ScanningFailed); + return Err(UtxoScannerError::UtxoScanningError(format!( + "Failed to scan UTXO's after {} attempt(s) using all {} sync peer(s). Aborting...", + self.num_retries, + self.peer_seeds.len() + ))); + } + + self.num_retries += 1; + // Reset peer index to try connect to the first peer again + self.peer_index = 0; + }, + } + } + } + async fn finalize( &self, - total_scanned: u64, - final_utxo_pos: u64, + num_outputs_recovered: u64, + final_height: u64, + total_value: MicroTari, elapsed: Duration, ) -> Result<(), UtxoScannerError> { - let metadata = self.get_metadata().await?.unwrap_or_default(); self.publish_event(UtxoScannerEvent::Progress { - current_index: final_utxo_pos, - total_index: final_utxo_pos, + current_height: final_height, + tip_height: final_height, }); self.publish_event(UtxoScannerEvent::Completed { - number_scanned: total_scanned, - number_received: metadata.number_of_utxos, - value_received: metadata.total_amount, + final_height, + num_recovered: num_outputs_recovered, + value_recovered: total_value, time_taken: elapsed, }); // Presence of scanning keys are used to determine if a wallet is busy with recovery or not. 
if self.mode == UtxoScannerMode::Recovery { - self.clear_db().await?; + self.clear_recovery_mode().await?; } Ok(()) } @@ -121,14 +174,11 @@ where TBackend: WalletBackend + 'static retry_limit: self.retry_limit, error: e.to_string(), }); - // No use re-dialing a peer that is not responsive for recovery mode - if self.mode == UtxoScannerMode::Recovery { - if let Ok(Some(connection)) = self.resources.comms_connectivity.get_connection(peer.clone()).await { - if connection.clone().disconnect().await.is_ok() { - debug!(target: LOG_TARGET, "Disconnected base node peer {}", peer); - } - }; - let _ = time::sleep(Duration::from_secs(30)); + + if let Ok(Some(connection)) = self.resources.comms_connectivity.get_connection(peer.clone()).await { + if connection.clone().disconnect().await.is_ok() { + debug!(target: LOG_TARGET, "Disconnected base node peer {}", peer); + } } Err(e.into()) @@ -136,11 +186,11 @@ where TBackend: WalletBackend + 'static } } - async fn attempt_sync(&mut self, peer: NodeId) -> Result<(u64, u64, Duration), UtxoScannerError> { + async fn attempt_sync(&mut self, peer: NodeId) -> Result<(u64, u64, MicroTari, Duration), UtxoScannerError> { let mut connection = self.connect_to_peer(peer.clone()).await?; let mut client = connection - .connect_rpc_using_builder(BaseNodeSyncRpcClient::builder().with_deadline(Duration::from_secs(60))) + .connect_rpc_using_builder(BaseNodeWalletRpcClient::builder().with_deadline(Duration::from_secs(60))) .await?; let latency = client.get_last_request_latency(); @@ -150,36 +200,47 @@ where TBackend: WalletBackend + 'static )); let timer = Instant::now(); - let mut total_scanned = 0u64; + loop { - let start_index = self.get_start_utxo_mmr_pos(&mut client).await?; let tip_header = self.get_chain_tip_header(&mut client).await?; - let output_mmr_size = tip_header.output_mmr_size; + let tip_header_hash = tip_header.hash(); + let start_block = self.get_start_scanned_block(tip_header.height, &mut client).await?; + if self.shutdown_signal.is_triggered() { - // if running is set to false, we know its been canceled upstream so lets exit the loop - return Ok((total_scanned, start_index, timer.elapsed())); + return Ok(( + start_block.num_outputs.unwrap_or(0), + start_block.height, + start_block.amount.unwrap_or_else(|| MicroTari::from(0)), + timer.elapsed(), + )); } debug!( target: LOG_TARGET, - "Scanning UTXO's (start_index = {}, output_mmr_size = {}, height = {}, tip_hash = {})", - start_index, - output_mmr_size, + "Scanning UTXO's from height = {} to current tip_height = {} (starting header_hash: {})", + start_block.height, tip_header.height, - tip_header.hash().to_hex() + start_block.header_hash.to_hex(), ); - // start_index could be greater than output_mmr_size if we switch to a new peer that is behind the original - // peer. In the common case, we wait for start index. 
- if start_index >= output_mmr_size - 1 { + + // If we have scanned to the tip we are done + if start_block.height >= tip_header.height || start_block.header_hash == tip_header_hash { debug!( target: LOG_TARGET, - "Scanning complete UTXO #{} in {:.2?}", - start_index, + "Scanning complete to current tip (height: {}) in {:.2?}", + start_block.height, timer.elapsed() ); - return Ok((total_scanned, start_index, timer.elapsed())); + return Ok(( + start_block.num_outputs.unwrap_or(0), + start_block.height, + start_block.amount.unwrap_or_else(|| MicroTari::from(0)), + timer.elapsed(), + )); } - let num_scanned = self.scan_utxos(&mut client, start_index, tip_header).await?; + let (num_recovered, num_scanned, amount) = self + .scan_utxos(&mut client, start_block.header_hash, tip_header_hash, tip_header.height) + .await?; if num_scanned == 0 { return Err(UtxoScannerError::UtxoScanningError( "Peer returned 0 UTXOs to scan".to_string(), @@ -187,101 +248,161 @@ where TBackend: WalletBackend + 'static } debug!( target: LOG_TARGET, - "Scanning round completed UTXO #{} in {:.2?} ({} scanned)", - output_mmr_size, + "Scanning round completed up to height {} in {:.2?} ({} outputs scanned, {} recovered with value {})", + tip_header.height, timer.elapsed(), - num_scanned + num_scanned, + num_recovered, + amount ); - - // let num_scanned = 0; - total_scanned += num_scanned; - // return Ok((total_scanned, start_index, timer.elapsed())); } } - async fn get_chain_tip_header(&self, client: &mut BaseNodeSyncRpcClient) -> Result { - let chain_metadata = client.get_chain_metadata().await?; - let chain_height = chain_metadata.height_of_longest_chain(); + async fn get_chain_tip_header( + &self, + client: &mut BaseNodeWalletRpcClient, + ) -> Result { + let tip_info = client.get_tip_info().await?; + let chain_height = tip_info.metadata.map(|m| m.height_of_longest_chain()).unwrap_or(0); let end_header = client.get_header_by_height(chain_height).await?; let end_header = BlockHeader::try_from(end_header).map_err(|_| UtxoScannerError::ConversionError)?; Ok(end_header) } - async fn get_start_utxo_mmr_pos(&self, client: &mut BaseNodeSyncRpcClient) -> Result { - let metadata = match self.get_metadata().await? { - None => { - let birthday_metadata = self.get_birthday_metadata(client).await?; - self.set_metadata(birthday_metadata.clone()).await?; - return Ok(birthday_metadata.utxo_index); - }, - Some(m) => m, - }; + async fn get_start_scanned_block( + &self, + current_tip_height: u64, + client: &mut BaseNodeWalletRpcClient, + ) -> Result { + // Check for reogs + let scanned_blocks = self.resources.db.get_scanned_blocks().await?; + + if scanned_blocks.is_empty() { + let birthday_height_hash = self.get_birthday_header_height_hash(client).await?; + return Ok(ScannedBlock { + header_hash: birthday_height_hash.header_hash, + height: birthday_height_hash.height, + num_outputs: None, + amount: None, + timestamp: Utc::now().naive_utc(), + }); + } - // if it's none, we return 0 above. - let request = FindChainSplitRequest { - block_hashes: vec![metadata.height_hash], - header_count: 1, - }; - // this returns the index of the vec of hashes we sent it, that is the last hash it knows of. - match client.find_chain_split(request).await { - Ok(_) => Ok(metadata.utxo_index + 1), - Err(RpcError::RequestFailed(err)) if err.as_status_code().is_not_found() => { - warn!(target: LOG_TARGET, "Reorg detected: {}", err); - // The node does not know of the last hash we scanned, thus we had a chain split. 
- // We now start at the wallet birthday again - let birthday_metdadata = self.get_birthday_metadata(client).await?; - Ok(birthday_metdadata.utxo_index) - }, - Err(err) => Err(err.into()), + // Run through the cached blocks and check which are not found in the current chain anymore + // Accumulate number of outputs and recovered Tari in the valid blocks + // Assumption: The blocks are ordered and a reorg will occur to the most recent blocks. Once you have found a + // valid block the blocks before it are also valid and don't need to be checked + let mut missing_scanned_blocks = Vec::new(); + let mut found_scanned_block = None; + let mut num_outputs = 0u64; + let mut amount = MicroTari::from(0); + for sb in scanned_blocks.into_iter() { + if sb.height <= current_tip_height { + if found_scanned_block.is_none() { + let header = BlockHeader::try_from(client.get_header_by_height(sb.height).await?) + .map_err(|_| UtxoScannerError::ConversionError)?; + let header_hash = header.hash(); + if header_hash != sb.header_hash { + missing_scanned_blocks.push(sb.clone()); + } else { + found_scanned_block = Some(sb.clone()); + } + } + if found_scanned_block.is_some() { + num_outputs = num_outputs.saturating_add(sb.num_outputs.unwrap_or(0)); + amount = amount + .checked_add(sb.amount.unwrap_or_else(|| MicroTari::from(0))) + .ok_or(UtxoScannerError::OverflowError)?; + } + } else { + missing_scanned_blocks.push(sb.clone()); + } + } + + if let Some(sb) = found_scanned_block { + let (height, next_header_hash) = if sb.height == current_tip_height { + // If we are at the tip just return the tip height and hash + (current_tip_height, sb.header_hash) + } else { + // If we are not at the tip scanning should resume from the next header in the chain + let next_header = BlockHeader::try_from(client.get_header_by_height(sb.height + 1).await?) + .map_err(|_| UtxoScannerError::ConversionError)?; + let next_header_hash = next_header.hash(); + (sb.height + 1, next_header_hash) + }; + + if !missing_scanned_blocks.is_empty() { + warn!( + target: LOG_TARGET, + "Reorg detected on base node. Restarting scanning from height {} (Header Hash: {})", + height, + next_header_hash.to_hex() + ); + self.resources + .db + .clear_scanned_blocks_from_and_higher( + missing_scanned_blocks + .last() + .expect("cannot fail, the vector is not empty") + .height, + ) + .await?; + } + Ok(ScannedBlock { + height, + num_outputs: Some(num_outputs), + amount: Some(amount), + header_hash: next_header_hash, + timestamp: Utc::now().naive_utc(), + }) + } else { + warn!( + target: LOG_TARGET, + "Reorg detected on base node. 
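// A stand-alone sketch, with hypothetical types, of the cached-block walk described in the
// comments above: the ScannedBlocks history is assumed to be ordered newest first, and once
// one cached header hash still matches what the base node reports, every older cached block
// is taken to be valid as well. (The real task additionally discards cached blocks that sit
// above the reported tip height before doing this comparison.)
#[derive(Clone)]
struct CachedBlock { height: u64, header_hash: Vec<u8> }

fn split_at_reorg<F>(cached: &[CachedBlock], chain_hash_at: F) -> (Vec<CachedBlock>, Option<CachedBlock>)
where F: Fn(u64) -> Vec<u8> {
    let mut reorged_out = Vec::new();
    for block in cached {
        if chain_hash_at(block.height) == block.header_hash {
            // First still-valid header found; scanning can resume from the block after it.
            return (reorged_out, Some(block.clone()));
        }
        // Header no longer on the chain, so this block was reorged out.
        reorged_out.push(block.clone());
    }
    // Nothing matched: fall back to a fresh scan from the wallet birthday.
    (reorged_out, None)
}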
No previously scanned block headers found, resuming scan from wallet \ + birthday" + ); + // The node does not know of any of our cached headers so we will start the scan anew from the wallet + // birthday + self.resources.db.clear_scanned_blocks().await?; + let birthday_height_hash = self.get_birthday_header_height_hash(client).await?; + Ok(ScannedBlock { + header_hash: birthday_height_hash.header_hash, + height: birthday_height_hash.height, + num_outputs: None, + amount: None, + timestamp: Utc::now().naive_utc(), + }) } } async fn scan_utxos( &mut self, - client: &mut BaseNodeSyncRpcClient, - start_mmr_leaf_index: u64, - end_header: BlockHeader, - ) -> Result { - debug!( - target: LOG_TARGET, - "Scanning UTXO's from #{} to #{} (height {})", - start_mmr_leaf_index, - end_header.output_mmr_size, - end_header.height - ); + client: &mut BaseNodeWalletRpcClient, + start_header_hash: HashOutput, + end_header_hash: HashOutput, + tip_height: u64, + ) -> Result<(u64, u64, MicroTari), UtxoScannerError> { + // Setting how often the progress event and log should occur during scanning. Defined in blocks + const PROGRESS_REPORT_INTERVAL: u64 = 100; - let end_header_hash = end_header.hash(); - let output_mmr_size = end_header.output_mmr_size; let mut num_recovered = 0u64; let mut total_amount = MicroTari::from(0); let mut total_scanned = 0; - self.publish_event(UtxoScannerEvent::Progress { - current_index: start_mmr_leaf_index, - total_index: (output_mmr_size - 1), - }); - let request = SyncUtxosRequest { - start: start_mmr_leaf_index, + let request = SyncUtxosByBlockRequest { + start_header_hash: start_header_hash.clone(), end_header_hash: end_header_hash.clone(), - include_pruned_utxos: false, - include_deleted_bitmaps: false, }; let start = Instant::now(); - let utxo_stream = client.sync_utxos(request).await?; + let mut utxo_stream = client.sync_utxos_by_block(request).await?; trace!( target: LOG_TARGET, "bulletproof rewind profile - UTXO stream request time {} ms", start.elapsed().as_millis(), ); - // We download in chunks for improved streaming efficiency - const CHUNK_SIZE: usize = 125; - let mut utxo_stream = utxo_stream.chunks(CHUNK_SIZE); - const COMMIT_EVERY_N: u64 = (1000_i64 / CHUNK_SIZE as i64) as u64; - let mut last_utxo_index = 0u64; - let mut iteration_count = 0u64; let mut utxo_next_await_profiling = Vec::new(); let mut scan_for_outputs_profiling = Vec::new(); while let Some(response) = { @@ -292,32 +413,53 @@ where TBackend: WalletBackend + 'static } { if self.shutdown_signal.is_triggered() { // if running is set to false, we know its been canceled upstream so lets exit the loop - return Ok(total_scanned as u64); + return Ok((num_recovered, total_scanned as u64, total_amount)); } - let (outputs, utxo_index) = convert_response_to_transaction_outputs(response, last_utxo_index)?; - last_utxo_index = utxo_index; + + let response = response.map_err(|e| UtxoScannerError::RpcStatus(e.to_string()))?; + let current_height = response.height; + let current_header_hash = response.header_hash; + let outputs = response + .outputs + .into_iter() + .map(|utxo| TransactionOutput::try_from(utxo).map_err(|_| UtxoScannerError::ConversionError)) + .collect::, _>>()?; + total_scanned += outputs.len(); - iteration_count += 1; let start = Instant::now(); let found_outputs = self.scan_for_outputs(outputs).await?; scan_for_outputs_profiling.push(start.elapsed()); - // Reduce the number of db hits by only persisting progress every N iterations - if iteration_count % COMMIT_EVERY_N == 0 || last_utxo_index >= 
output_mmr_size - 1 { + let (count, amount) = self.import_utxos_to_transaction_service(found_outputs).await?; + + self.resources + .db + .save_scanned_block(ScannedBlock { + header_hash: current_header_hash, + height: current_height, + num_outputs: Some(count), + amount: Some(amount), + timestamp: Utc::now().naive_utc(), + }) + .await?; + + self.resources + .db + .clear_scanned_blocks_before_height(current_height.saturating_sub(SCANNED_BLOCK_CACHE_SIZE), true) + .await?; + + if current_height % PROGRESS_REPORT_INTERVAL == 0 { + debug!( + target: LOG_TARGET, + "Scanned up to block {} with a current tip_height of {}", current_height, tip_height + ); self.publish_event(UtxoScannerEvent::Progress { - current_index: last_utxo_index, - total_index: (output_mmr_size - 1), + current_height, + tip_height, }); - self.update_scanning_progress_in_db( - last_utxo_index, - total_amount, - num_recovered, - end_header_hash.clone(), - ) - .await?; } - let (count, amount) = self.import_utxos_to_transaction_service(found_outputs).await?; + num_recovered = num_recovered.saturating_add(count); total_amount += amount; } @@ -333,30 +475,8 @@ where TBackend: WalletBackend + 'static total_scanned, scan_for_outputs_profiling.iter().fold(0, |acc, &x| acc + x.as_millis()), ); - self.update_scanning_progress_in_db(last_utxo_index, total_amount, num_recovered, end_header_hash) - .await?; - self.publish_event(UtxoScannerEvent::Progress { - current_index: (output_mmr_size - 1), - total_index: (output_mmr_size - 1), - }); - Ok(total_scanned as u64) - } - - async fn update_scanning_progress_in_db( - &self, - last_utxo_index: u64, - total_amount: MicroTari, - num_recovered: u64, - end_header_hash: Vec, - ) -> Result<(), UtxoScannerError> { - let mut meta_data = self.get_metadata().await?.unwrap_or_default(); - meta_data.height_hash = end_header_hash; - meta_data.number_of_utxos += num_recovered; - meta_data.utxo_index = last_utxo_index; - meta_data.total_amount += total_amount; - self.set_metadata(meta_data).await?; - Ok(()) + Ok((num_recovered, total_scanned as u64, total_amount)) } async fn scan_for_outputs( @@ -417,32 +537,28 @@ where TBackend: WalletBackend + 'static Ok((num_recovered, total_amount)) } - fn get_db_mode_key(&self) -> String { - match self.mode { - UtxoScannerMode::Recovery => RECOVERY_KEY.to_owned(), - UtxoScannerMode::Scanning => SCANNING_KEY.to_owned(), - } - } - - async fn set_metadata(&self, data: ScanningMetadata) -> Result<(), UtxoScannerError> { - let total_key = self.get_db_mode_key(); - let db_value = serde_json::to_string(&data)?; - self.resources.db.set_client_key_value(total_key, db_value).await?; + async fn set_recovery_mode(&self) -> Result<(), UtxoScannerError> { + self.resources + .db + .set_client_key_value(RECOVERY_KEY.to_owned(), Utc::now().to_string()) + .await?; Ok(()) } - async fn get_metadata(&self) -> Result, UtxoScannerError> { - let total_key = self.get_db_mode_key(); - let value: Option = self.resources.db.get_client_key_from_str(total_key).await?; + async fn check_recovery_mode(&self) -> Result { + let value: Option = self + .resources + .db + .get_client_key_from_str(RECOVERY_KEY.to_owned()) + .await?; match value { - None => Ok(None), - Some(v) => Ok(serde_json::from_str(&v)?), + None => Ok(false), + Some(_v) => Ok(true), } } - async fn clear_db(&self) -> Result<(), UtxoScannerError> { - let total_key = self.get_db_mode_key(); - let _ = self.resources.db.clear_client_value(total_key).await?; + async fn clear_recovery_mode(&self) -> Result<(), UtxoScannerError> { + let _ = 
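// A rough sketch, with assumed constants, of the per-block bookkeeping cadence introduced
// above: every streamed block is persisted to the ScannedBlocks cache, older cache entries
// are pruned back to a fixed depth, and a progress event is only published every
// PROGRESS_REPORT_INTERVAL blocks. CACHE_DEPTH is a placeholder; the real depth comes from
// SCANNED_BLOCK_CACHE_SIZE elsewhere in the wallet crate.
const CACHE_DEPTH: u64 = 720;
const PROGRESS_EVERY: u64 = 100;

fn after_scanned_block(height: u64, tip_height: u64, cache: &mut Vec<u64>) {
    cache.push(height); // save_scanned_block
    let cutoff = height.saturating_sub(CACHE_DEPTH);
    cache.retain(|h| *h >= cutoff); // clear_scanned_blocks_before_height
    if height % PROGRESS_EVERY == 0 {
        println!("Scanned up to block {} of tip {}", height, tip_height); // Progress event
    }
}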
self.resources.db.clear_client_value(RECOVERY_KEY.to_owned()).await?; Ok(()) } @@ -481,66 +597,16 @@ where TBackend: WalletBackend + 'static Ok(tx_id) } - pub async fn run(mut self) -> Result<(), UtxoScannerError> { - loop { - if self.shutdown_signal.is_triggered() { - // if running is set to false, we know its been canceled upstream so lets exit the loop - return Ok(()); - } - match self.get_next_peer() { - Some(peer) => match self.attempt_sync(peer.clone()).await { - Ok((total_scanned, final_utxo_pos, elapsed)) => { - debug!(target: LOG_TARGET, "Scanned to UTXO #{}", final_utxo_pos); - self.finalize(total_scanned, final_utxo_pos, elapsed).await?; - return Ok(()); - }, - Err(e) => { - warn!( - target: LOG_TARGET, - "Failed to scan UTXO's from base node {}: {}", peer, e - ); - self.publish_event(UtxoScannerEvent::ScanningRoundFailed { - num_retries: self.num_retries, - retry_limit: self.retry_limit, - error: e.to_string(), - }); - continue; - }, - }, - None => { - self.publish_event(UtxoScannerEvent::ScanningRoundFailed { - num_retries: self.num_retries, - retry_limit: self.retry_limit, - error: "No new peers to try after this round".to_string(), - }); - - if self.num_retries >= self.retry_limit { - self.publish_event(UtxoScannerEvent::ScanningFailed); - return Err(UtxoScannerError::UtxoScanningError(format!( - "Failed to scan UTXO's after {} attempt(s) using all {} sync peer(s). Aborting...", - self.num_retries, - self.peer_seeds.len() - ))); - } - - self.num_retries += 1; - // Reset peer index to try connect to the first peer again - self.peer_index = 0; - }, - } - } - } - fn get_next_peer(&mut self) -> Option { let peer = self.peer_seeds.get(self.peer_index).map(NodeId::from_public_key); self.peer_index += 1; peer } - async fn get_birthday_metadata( + async fn get_birthday_header_height_hash( &self, - client: &mut BaseNodeSyncRpcClient, - ) -> Result { + client: &mut BaseNodeWalletRpcClient, + ) -> Result { let birthday = self.resources.db.get_wallet_birthday().await?; // Calculate the unix epoch time of two days before the wallet birthday. This is to avoid any weird time zone // issues @@ -557,50 +623,21 @@ where TBackend: WalletBackend + 'static }; let header = client.get_header_by_height(block_height).await?; let header = BlockHeader::try_from(header).map_err(|_| UtxoScannerError::ConversionError)?; - + let header_hash = header.hash(); info!( target: LOG_TARGET, - "Fresh wallet recovery starting at Block {}", block_height + "Fresh wallet recovery starting at Block {} (Header Hash: {})", + block_height, + header_hash.to_hex(), ); - Ok(ScanningMetadata { - total_amount: Default::default(), - number_of_utxos: 0, - utxo_index: header.output_mmr_size, - height_hash: header.hash(), + Ok(HeightHash { + height: block_height, + header_hash, }) } } -fn convert_response_to_transaction_outputs( - response: Vec>, - last_utxo_index: u64, -) -> Result<(Vec, u64), UtxoScannerError> { - let response: Vec = response - .into_iter() - .map(|v| v.map_err(|e| UtxoScannerError::RpcStatus(e.to_string()))) - .collect::, _>>()?; - - let current_utxo_index = response - // Assumes correct ordering which is otherwise not required for this protocol - .last() - .ok_or_else(|| { - UtxoScannerError::BaseNodeResponseError("Invalid response from base node: response was empty".to_string()) - })? 
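// An illustration only of the "two days before the wallet birthday" timestamp referenced in
// the comment above, assuming the birthday is stored as whole days since the unix epoch (the
// new integration tests later in this patch make the same assumption when they compute
// birthday_epoch_time).
fn birthday_search_timestamp(birthday_days: u16) -> u64 {
    u64::from(birthday_days).saturating_sub(2) * 24 * 60 * 60
}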
- .mmr_index; - if current_utxo_index < last_utxo_index { - return Err(UtxoScannerError::BaseNodeResponseError( - "Invalid response from base node: mmr index must be non-decreasing".to_string(), - )); - } - - let outputs = response - .into_iter() - .filter_map(|utxo| { - utxo.into_utxo() - .and_then(|o| o.utxo) - .and_then(|utxo| utxo.into_transaction_output()) - .map(|output| TransactionOutput::try_from(output).map_err(|_| UtxoScannerError::ConversionError)) - }) - .collect::, _>>()?; - Ok((outputs, current_utxo_index)) +struct HeightHash { + height: u64, + header_hash: HashOutput, } diff --git a/base_layer/wallet/src/utxo_scanner_service/uxto_scanner_service_builder.rs b/base_layer/wallet/src/utxo_scanner_service/uxto_scanner_service_builder.rs index fadec3c353..271ab0a3bc 100644 --- a/base_layer/wallet/src/utxo_scanner_service/uxto_scanner_service_builder.rs +++ b/base_layer/wallet/src/utxo_scanner_service/uxto_scanner_service_builder.rs @@ -33,7 +33,7 @@ use crate::{ output_manager_service::handle::OutputManagerHandle, storage::{ database::{WalletBackend, WalletDatabase}, - sqlite_db::WalletSqliteDatabase, + sqlite_db::wallet::WalletSqliteDatabase, }, transaction_service::handle::TransactionServiceHandle, utxo_scanner_service::{ diff --git a/base_layer/wallet/tests/contacts_service/mod.rs b/base_layer/wallet/tests/contacts_service.rs similarity index 76% rename from base_layer/wallet/tests/contacts_service/mod.rs rename to base_layer/wallet/tests/contacts_service.rs index ba4f37722d..b325c9221a 100644 --- a/base_layer/wallet/tests/contacts_service/mod.rs +++ b/base_layer/wallet/tests/contacts_service.rs @@ -1,3 +1,25 @@ +// Copyright 2021. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + // Copyright 2019. 
The Tari Project // // Redistribution and use in source and binary forms, with or without modification, are permitted provided that the @@ -37,7 +59,8 @@ use tari_wallet::contacts_service::{ }; use tokio::runtime::Runtime; -use crate::support::data::get_temp_sqlite_database_connection; +pub mod support; +use support::data::get_temp_sqlite_database_connection; pub fn setup_contacts_service( runtime: &mut Runtime, diff --git a/base_layer/wallet/tests/mod.rs b/base_layer/wallet/tests/output_manager_service.rs similarity index 91% rename from base_layer/wallet/tests/mod.rs rename to base_layer/wallet/tests/output_manager_service.rs index a7a8824b0b..74748b3a6a 100644 --- a/base_layer/wallet/tests/mod.rs +++ b/base_layer/wallet/tests/output_manager_service.rs @@ -1,4 +1,4 @@ -// Copyright 2019. The Tari Project +// Copyright 2021. The Tari Project // // Redistribution and use in source and binary forms, with or without modification, are permitted provided that the // following conditions are met: @@ -20,8 +20,5 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -pub mod contacts_service; -pub mod output_manager_service; +mod output_manager_service_tests; pub mod support; -pub mod transaction_service; -pub mod wallet; diff --git a/base_layer/wallet/tests/output_manager_service/mod.rs b/base_layer/wallet/tests/output_manager_service_tests/mod.rs similarity index 95% rename from base_layer/wallet/tests/output_manager_service/mod.rs rename to base_layer/wallet/tests/output_manager_service_tests/mod.rs index 4e344eb9f5..fbc3a8f203 100644 --- a/base_layer/wallet/tests/output_manager_service/mod.rs +++ b/base_layer/wallet/tests/output_manager_service_tests/mod.rs @@ -1,4 +1,4 @@ -// Copyright 2019. The Tari Project +// Copyright 2021. The Tari Project // // Redistribution and use in source and binary forms, with or without modification, are permitted provided that the // following conditions are met: @@ -19,6 +19,5 @@ // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
- -pub mod service; -pub mod storage; +mod service; +mod storage; diff --git a/base_layer/wallet/tests/output_manager_service/service.rs b/base_layer/wallet/tests/output_manager_service_tests/service.rs similarity index 99% rename from base_layer/wallet/tests/output_manager_service/service.rs rename to base_layer/wallet/tests/output_manager_service_tests/service.rs index dc3058d6dd..6f7a94452f 100644 --- a/base_layer/wallet/tests/output_manager_service/service.rs +++ b/base_layer/wallet/tests/output_manager_service_tests/service.rs @@ -123,7 +123,6 @@ async fn setup_output_manager_service( let (sender, receiver_bns) = reply_channel::unbounded(); let (event_publisher_bns, _) = broadcast::channel(100); - let basenode_service_handle = BaseNodeServiceHandle::new(sender, event_publisher_bns.clone()); let mut mock_base_node_service = MockBaseNodeService::new(receiver_bns, shutdown.to_signal()); mock_base_node_service.set_default_base_node_state(); diff --git a/base_layer/wallet/tests/output_manager_service/storage.rs b/base_layer/wallet/tests/output_manager_service_tests/storage.rs similarity index 100% rename from base_layer/wallet/tests/output_manager_service/storage.rs rename to base_layer/wallet/tests/output_manager_service_tests/storage.rs diff --git a/base_layer/wallet/tests/support/comms_rpc.rs b/base_layer/wallet/tests/support/comms_rpc.rs index 62edc0fd98..4b69752325 100644 --- a/base_layer/wallet/tests/support/comms_rpc.rs +++ b/base_layer/wallet/tests/support/comms_rpc.rs @@ -21,15 +21,16 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use std::{ + cmp::min, collections::HashMap, convert::TryFrom, sync::{Arc, Mutex}, time::{Duration, Instant}, }; -use tari_common_types::types::Signature; +use tari_common_types::types::{HashOutput, Signature}; use tari_comms::{ - protocol::rpc::{NamedProtocolService, Request, Response, RpcClient, RpcStatus}, + protocol::rpc::{NamedProtocolService, Request, Response, RpcClient, RpcStatus, Streaming}, PeerConnection, }; use tari_core::{ @@ -47,6 +48,8 @@ use tari_core::{ QueryDeletedRequest, QueryDeletedResponse, Signatures as SignaturesProto, + SyncUtxosByBlockRequest, + SyncUtxosByBlockResponse, TipInfoResponse, TxQueryBatchResponses as TxQueryBatchResponsesProto, TxQueryResponse as TxQueryResponseProto, @@ -63,7 +66,7 @@ use tari_core::{ tari_utilities::Hashable, transactions::transaction::{Transaction, TransactionOutput}, }; -use tokio::time::sleep; +use tokio::{sync::mpsc, time::sleep}; pub async fn connect_rpc_client(connection: &mut PeerConnection) -> T where T: From + NamedProtocolService { @@ -87,6 +90,8 @@ pub struct BaseNodeWalletRpcMockState { utxo_query_calls: Arc>>>>, query_deleted_calls: Arc>>, get_header_by_height_calls: Arc>>, + get_height_at_time_calls: Arc>>, + sync_utxo_by_block_calls: Arc>>, submit_transaction_response: Arc>, transaction_query_response: Arc>, transaction_query_batch_response: Arc>, @@ -100,6 +105,8 @@ pub struct BaseNodeWalletRpcMockState { synced: Arc>, utxos: Arc>>, blocks: Arc>>, + utxos_by_block: Arc>>, + sync_utxos_by_block_trigger_channel: Arc>>>, } #[allow(clippy::mutex_atomic)] @@ -112,6 +119,8 @@ impl BaseNodeWalletRpcMockState { utxo_query_calls: Arc::new(Mutex::new(vec![])), query_deleted_calls: Arc::new(Mutex::new(vec![])), get_header_by_height_calls: Arc::new(Mutex::new(vec![])), + get_height_at_time_calls: Arc::new(Mutex::new(vec![])), + sync_utxo_by_block_calls: Arc::new(Mutex::new(vec![])), submit_transaction_response: Arc::new(Mutex::new(TxSubmissionResponse { accepted: 
true, rejection_reason: TxSubmissionRejectionReason::None, @@ -159,6 +168,8 @@ impl BaseNodeWalletRpcMockState { synced: Arc::new(Mutex::new(true)), utxos: Arc::new(Mutex::new(Vec::new())), blocks: Arc::new(Mutex::new(Default::default())), + utxos_by_block: Arc::new(Mutex::new(vec![])), + sync_utxos_by_block_trigger_channel: Arc::new(Mutex::new(None)), } } @@ -219,6 +230,17 @@ impl BaseNodeWalletRpcMockState { *lock = blocks; } + pub fn set_utxos_by_block(&self, utxos_by_block: Vec) { + let mut lock = acquire_lock!(self.utxos_by_block); + *lock = utxos_by_block; + } + + /// This channel will used to control which height a sync stream will return to from the testing client + pub fn set_utxos_by_block_trigger_channel(&self, channel: mpsc::Receiver) { + let mut lock = acquire_lock!(self.sync_utxos_by_block_trigger_channel); + *lock = Some(channel); + } + pub fn take_utxo_query_calls(&self) -> Vec>> { acquire_lock!(self.utxo_query_calls).drain(..).collect() } @@ -275,6 +297,44 @@ impl BaseNodeWalletRpcMockState { acquire_lock!(self.get_header_by_height_calls).pop() } + pub fn pop_get_height_at_time_calls(&self) -> Option { + acquire_lock!(self.get_height_at_time_calls).pop() + } + + pub fn take_get_height_at_time_calls(&self) -> Vec { + acquire_lock!(self.get_height_at_time_calls).drain(..).collect() + } + + pub fn take_sync_utxos_by_block_calls(&self) -> Vec<(HashOutput, HashOutput)> { + acquire_lock!(self.sync_utxo_by_block_calls).drain(..).collect() + } + + pub fn pop_sync_utxos_by_block_calls(&self) -> Option<(HashOutput, HashOutput)> { + acquire_lock!(self.sync_utxo_by_block_calls).pop() + } + + pub async fn wait_pop_sync_utxos_by_block_calls( + &self, + num_calls: usize, + timeout: Duration, + ) -> Result, String> { + let now = Instant::now(); + let mut count = 0usize; + while now.elapsed() < timeout { + let mut lock = acquire_lock!(self.sync_utxo_by_block_calls); + count = (*lock).len(); + if (*lock).len() >= num_calls { + return Ok((*lock).drain(..num_calls).collect()); + } + drop(lock); + sleep(Duration::from_millis(100)).await; + } + Err(format!( + "Did not receive enough calls within the timeout period, received {}, expected {}.", + count, num_calls + )) + } + pub async fn wait_pop_get_header_by_height_calls( &self, num_calls: usize, @@ -297,6 +357,24 @@ impl BaseNodeWalletRpcMockState { )) } + pub async fn wait_pop_get_height_at_time(&self, num_calls: usize, timeout: Duration) -> Result, String> { + let now = Instant::now(); + let mut count = 0usize; + while now.elapsed() < timeout { + let mut lock = acquire_lock!(self.get_height_at_time_calls); + count = (*lock).len(); + if (*lock).len() >= num_calls { + return Ok((*lock).drain(..num_calls).collect()); + } + drop(lock); + sleep(Duration::from_millis(100)).await; + } + Err(format!( + "Did not receive enough calls within the timeout period, received {}, expected {}.", + count, num_calls + )) + } + pub async fn wait_pop_utxo_query_calls( &self, num_calls: usize, @@ -644,6 +722,107 @@ impl BaseNodeWalletService for BaseNodeWalletRpcMockService { Err(RpcStatus::not_found("Header not found")) } } + + async fn get_height_at_time(&self, request: Request) -> Result, RpcStatus> { + let time = request.into_message(); + + let mut height_at_time_lock = acquire_lock!(self.state.get_height_at_time_calls); + (*height_at_time_lock).push(time); + + let block_lock = acquire_lock!(self.state.blocks); + + let mut headers = (*block_lock).values().cloned().collect::>(); + headers.sort_by(|a, b| b.height.cmp(&a.height)); + + let mut found_height = 0; + 
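// A compact sketch of the poll-until-timeout pattern the new wait_pop_* helpers above use:
// keep checking the recorded calls, drain them once enough have arrived, and give up with an
// error after the timeout. The Mutex<Vec<u64>> stands in for the mock's call log.
use std::{
    sync::Mutex,
    time::{Duration, Instant},
};

async fn wait_for_calls(calls: &Mutex<Vec<u64>>, num_calls: usize, timeout: Duration) -> Result<Vec<u64>, String> {
    let started = Instant::now();
    while started.elapsed() < timeout {
        let mut lock = calls.lock().unwrap();
        if lock.len() >= num_calls {
            return Ok(lock.drain(..num_calls).collect());
        }
        drop(lock);
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    Err(format!("expected {} calls before the timeout", num_calls))
}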
for h in headers.iter() { + if h.timestamp.as_u64() < time { + found_height = h.height; + break; + } + } + if found_height == 0 { + found_height = headers[0].height; + } + Ok(Response::new(found_height)) + } + + async fn sync_utxos_by_block( + &self, + request: Request, + ) -> Result, RpcStatus> { + let SyncUtxosByBlockRequest { + start_header_hash, + end_header_hash, + } = request.into_message(); + + let mut sync_utxo_by_block_lock = acquire_lock!(self.state.sync_utxo_by_block_calls); + (*sync_utxo_by_block_lock).push((start_header_hash.clone(), end_header_hash.clone())); + + let block_lock = acquire_lock!(self.state.utxos_by_block); + let mut blocks = (*block_lock).clone(); + blocks.sort_by(|a, b| a.height.cmp(&b.height)); + + let start_index = blocks.iter().position(|b| b.header_hash == start_header_hash); + let end_index = blocks.iter().position(|b| b.header_hash == end_header_hash); + + let mut channel_lock = acquire_lock!(self.state.sync_utxos_by_block_trigger_channel); + let trigger_channel_option = (*channel_lock).take(); + + if let (Some(start), Some(end)) = (start_index, end_index) { + let (tx, rx) = mpsc::channel(200); + let task = async move { + if let Some(mut trigger_channel) = trigger_channel_option { + let mut current_block = start; + while let Some(trigger_block) = trigger_channel.recv().await { + if trigger_block < current_block { + // This is a testing harness so just panic if used incorrectly. + panic!("Trigger block cannot be before current starting block"); + } + for b in blocks + .clone() + .into_iter() + .skip(current_block) + .take(min(trigger_block, end) - current_block + 1) + { + let item = SyncUtxosByBlockResponse { + outputs: b.utxos.clone().into_iter().map(|o| o.into()).collect(), + height: b.height, + header_hash: b.header_hash.clone(), + }; + tx.send(Ok(item)).await.unwrap(); + } + if trigger_block >= end { + break; + } + current_block = trigger_block + 1; + } + } else { + for b in blocks.into_iter().skip(start).take(end - start + 1) { + let item = SyncUtxosByBlockResponse { + outputs: b.utxos.clone().into_iter().map(|o| o.into()).collect(), + height: b.height, + header_hash: b.header_hash.clone(), + }; + tx.send(Ok(item)).await.unwrap(); + } + } + }; + + tokio::spawn(task); + + Ok(Streaming::new(rx)) + } else { + Err(RpcStatus::not_found("Headers not found")) + } + } +} + +#[derive(Clone, Debug)] +pub struct UtxosByBlock { + pub height: u64, + pub header_hash: Vec, + pub utxos: Vec, } #[cfg(test)] diff --git a/base_layer/wallet/tests/support/mod.rs b/base_layer/wallet/tests/support/mod.rs index ea01f6c3d9..6915c1cc0d 100644 --- a/base_layer/wallet/tests/support/mod.rs +++ b/base_layer/wallet/tests/support/mod.rs @@ -24,3 +24,5 @@ pub mod utils; pub mod comms_and_services; pub mod comms_rpc; pub mod data; +pub mod output_manager_service_mock; +pub mod transaction_service_mock; diff --git a/base_layer/wallet/tests/support/output_manager_service_mock.rs b/base_layer/wallet/tests/support/output_manager_service_mock.rs new file mode 100644 index 0000000000..a409dd2c4c --- /dev/null +++ b/base_layer/wallet/tests/support/output_manager_service_mock.rs @@ -0,0 +1,166 @@ +// Copyright 2021. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. 
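// A self-contained toy version of the trigger-channel mechanism added to the mock above: the
// test sends a height over an mpsc channel and the mock only streams blocks up to that height
// before waiting for the next trigger. The names here are illustrative, not the mock's real API.
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (trigger_tx, mut trigger_rx) = mpsc::channel::<u64>(8);
    let (block_tx, mut block_rx) = mpsc::channel::<u64>(32);

    tokio::spawn(async move {
        let mut next_height = 0u64;
        while let Some(up_to) = trigger_rx.recv().await {
            while next_height <= up_to {
                if block_tx.send(next_height).await.is_err() {
                    return; // receiver dropped, stop streaming
                }
                next_height += 1;
            }
        }
    });

    // Allow the stream to progress to block 5 and then pause, mimicking an interrupted sync
    // as exercised by the restart test further down.
    trigger_tx.send(5).await.unwrap();
    while let Some(height) = block_rx.recv().await {
        println!("streamed block {}", height);
        if height == 5 {
            break;
        }
    }
}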
Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::sync::{Arc, Mutex}; + +use futures::StreamExt; +use log::*; +use tari_service_framework::{reply_channel, reply_channel::Receiver}; +use tari_shutdown::ShutdownSignal; +use tari_wallet::output_manager_service::{ + error::OutputManagerError, + handle::{OutputManagerEvent, OutputManagerHandle, OutputManagerRequest, OutputManagerResponse}, + storage::models::DbUnblindedOutput, +}; +use tokio::sync::{broadcast, broadcast::Sender, oneshot}; + +const LOG_TARGET: &str = "wallet::output_manager_service_mock"; + +pub fn make_output_manager_service_mock( + shutdown_signal: ShutdownSignal, +) -> (OutputManagerServiceMock, OutputManagerHandle) { + let (sender, receiver) = reply_channel::unbounded(); + let (publisher, _) = broadcast::channel(100); + let output_manager_handle = OutputManagerHandle::new(sender, publisher.clone()); + let mock = OutputManagerServiceMock::new(publisher, receiver, shutdown_signal); + (mock, output_manager_handle) +} + +pub struct OutputManagerServiceMock { + _event_publisher: Sender>, + request_stream: Option>>, + shutdown_signal: ShutdownSignal, + state: OutputManagerMockState, +} + +impl OutputManagerServiceMock { + pub fn new( + event_publisher: Sender>, + request_stream: Receiver>, + shutdown_signal: ShutdownSignal, + ) -> Self { + Self { + _event_publisher: event_publisher, + request_stream: Some(request_stream), + shutdown_signal, + state: OutputManagerMockState::new(), + } + } + + pub fn get_state(&self) -> OutputManagerMockState { + self.state.clone() + } + + pub async fn run(mut self) { + info!(target: LOG_TARGET, "Starting Mock OutputManager Service"); + let mut shutdown = self.shutdown_signal.clone(); + let mut request_stream = self.request_stream.take().unwrap(); + + loop { + tokio::select! 
{ + Some(request_context) = request_stream.next() => { + let (request, reply_tx) = request_context.split(); + self.handle_request(request, reply_tx); + }, + _ = shutdown.wait() => { + info!(target: LOG_TARGET, "OutputManager service mock shutting down because it received the shutdown signal"); + break; + } + } + } + } + + fn handle_request( + &self, + request: OutputManagerRequest, + reply_tx: oneshot::Sender>, + ) { + info!(target: LOG_TARGET, "Handling Request: {}", request); + match request { + OutputManagerRequest::ScanForRecoverableOutputs(requested_outputs) => { + let lock = acquire_lock!(self.state.recoverable_outputs); + let outputs = (*lock) + .clone() + .into_iter() + .filter_map(|dbuo| { + if requested_outputs.iter().any(|ro| dbuo.commitment == ro.commitment) { + Some(dbuo.unblinded_output) + } else { + None + } + }) + .collect(); + + let _ = reply_tx + .send(Ok(OutputManagerResponse::RewoundOutputs(outputs))) + .map_err(|e| { + warn!(target: LOG_TARGET, "Failed to send reply"); + e + }); + }, + OutputManagerRequest::ScanOutputs(_to) => { + let lock = acquire_lock!(self.state.one_sided_payments); + let outputs = (*lock).clone(); + let _ = reply_tx + .send(Ok(OutputManagerResponse::ScanOutputs( + outputs.into_iter().map(|dbuo| dbuo.unblinded_output).collect(), + ))) + .map_err(|e| { + warn!(target: LOG_TARGET, "Failed to send reply"); + e + }); + }, + _ => panic!("Output Manager Service Mock does not support this call"), + } + } +} + +#[derive(Clone, Debug)] +pub struct OutputManagerMockState { + pub recoverable_outputs: Arc>>, + pub one_sided_payments: Arc>>, +} + +impl OutputManagerMockState { + pub fn new() -> Self { + Self { + recoverable_outputs: Arc::new(Mutex::new(Vec::new())), + one_sided_payments: Arc::new(Mutex::new(Vec::new())), + } + } + + pub fn set_recoverable_outputs(&self, outputs: Vec) { + let mut lock = acquire_lock!(self.recoverable_outputs); + *lock = outputs; + } + + pub fn _set_one_sided_payments(&self, outputs: Vec) { + let mut lock = acquire_lock!(self.one_sided_payments); + *lock = outputs; + } +} + +impl Default for OutputManagerMockState { + fn default() -> Self { + Self::new() + } +} diff --git a/base_layer/wallet/tests/support/transaction_service_mock.rs b/base_layer/wallet/tests/support/transaction_service_mock.rs new file mode 100644 index 0000000000..5fa2446e6c --- /dev/null +++ b/base_layer/wallet/tests/support/transaction_service_mock.rs @@ -0,0 +1,108 @@ +// Copyright 2021. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. 
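// A minimal stand-in for the recoverable-output filter in the mock above: only outputs whose
// commitment appears in the caller's requested set are returned. A commitment is reduced to a
// plain byte vector here purely for illustration.
fn filter_recoverable(known: &[(Vec<u8>, u64)], requested: &[Vec<u8>]) -> Vec<u64> {
    known
        .iter()
        .filter(|(commitment, _value)| requested.iter().any(|r| r == commitment))
        .map(|(_commitment, value)| *value)
        .collect()
}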
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::sync::Arc; + +use futures::StreamExt; +use log::*; +use tari_service_framework::{reply_channel, reply_channel::Receiver}; +use tari_shutdown::ShutdownSignal; +use tari_wallet::transaction_service::{ + error::TransactionServiceError, + handle::{TransactionEvent, TransactionServiceHandle, TransactionServiceRequest, TransactionServiceResponse}, +}; +use tokio::sync::{broadcast, broadcast::Sender, oneshot}; + +const LOG_TARGET: &str = "wallet::transaction_service_mock"; + +pub fn make_transaction_service_mock( + shutdown_signal: ShutdownSignal, +) -> (TransactionServiceMock, TransactionServiceHandle) { + let (sender, receiver) = reply_channel::unbounded(); + let (publisher, _) = broadcast::channel(100); + let transaction_handle = TransactionServiceHandle::new(sender, publisher.clone()); + let mock = TransactionServiceMock::new(publisher, receiver, shutdown_signal); + (mock, transaction_handle) +} + +pub struct TransactionServiceMock { + _event_publisher: Sender>, + request_stream: + Option>>, + shutdown_signal: ShutdownSignal, +} + +impl TransactionServiceMock { + pub fn new( + event_publisher: Sender>, + request_stream: Receiver< + TransactionServiceRequest, + Result, + >, + shutdown_signal: ShutdownSignal, + ) -> Self { + Self { + _event_publisher: event_publisher, + request_stream: Some(request_stream), + shutdown_signal, + } + } + + pub async fn run(mut self) { + info!(target: LOG_TARGET, "Starting Mock Transaction Service"); + + let mut shutdown = self.shutdown_signal.clone(); + let mut request_stream = self.request_stream.take().unwrap(); + + loop { + tokio::select! { + Some(request_context) = request_stream.next() => { + let (request, reply_tx) = request_context.split(); + Self::handle_request(request, reply_tx); + }, + _ = shutdown.wait() => { + info!(target: LOG_TARGET, "Transaction service mock shutting down because it received the shutdown signal"); + break; + } + } + } + } + + fn handle_request( + request: TransactionServiceRequest, + reply_tx: oneshot::Sender>, + ) { + info!(target: LOG_TARGET, "Handling Request: {}", request); + + match request { + TransactionServiceRequest::ImportUtxo(_, _, _, _) => { + let _ = reply_tx + .send(Ok(TransactionServiceResponse::UtxoImported(42))) + .map_err(|e| { + warn!(target: LOG_TARGET, "Failed to send reply"); + e + }); + }, + _ => panic!("Transaction Service Mock does not support this call"), + } + } +} diff --git a/base_layer/wallet/tests/support/utils.rs b/base_layer/wallet/tests/support/utils.rs index 5c42c06452..30179dc8bf 100644 --- a/base_layer/wallet/tests/support/utils.rs +++ b/base_layer/wallet/tests/support/utils.rs @@ -20,8 +20,6 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
-use std::{fmt::Debug, thread, time::Duration}; - use rand::{CryptoRng, Rng}; use tari_common_types::types::{CommitmentFactory, PrivateKey, PublicKey}; use tari_core::transactions::{ @@ -34,32 +32,6 @@ use tari_crypto::{ script, }; -pub fn assert_change(mut func: F, to: T, poll_count: usize) -where - F: FnMut() -> T, - T: Eq + Debug, -{ - let mut i = 0; - loop { - let last_val = func(); - if last_val == to { - break; - } - - i += 1; - if i >= poll_count { - panic!( - "Value did not change to {:?} within {}ms (last value: {:?})", - to, - poll_count * 100, - last_val, - ); - } - - thread::sleep(Duration::from_millis(100)); - } -} - pub struct TestParams { pub spend_key: PrivateKey, pub change_spend_key: PrivateKey, diff --git a/base_layer/wallet/tests/transaction_service.rs b/base_layer/wallet/tests/transaction_service.rs new file mode 100644 index 0000000000..fee8cdd673 --- /dev/null +++ b/base_layer/wallet/tests/transaction_service.rs @@ -0,0 +1,24 @@ +// Copyright 2021. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +pub mod support; +mod transaction_service_tests; diff --git a/base_layer/wallet/tests/transaction_service_tests/mod.rs b/base_layer/wallet/tests/transaction_service_tests/mod.rs new file mode 100644 index 0000000000..4431baf1d3 --- /dev/null +++ b/base_layer/wallet/tests/transaction_service_tests/mod.rs @@ -0,0 +1,24 @@ +// Copyright 2021. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. 
Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +mod service; +mod storage; +mod transaction_protocols; diff --git a/base_layer/wallet/tests/transaction_service/service.rs b/base_layer/wallet/tests/transaction_service_tests/service.rs similarity index 99% rename from base_layer/wallet/tests/transaction_service/service.rs rename to base_layer/wallet/tests/transaction_service_tests/service.rs index 75d1468e60..e8cc71fbd5 100644 --- a/base_layer/wallet/tests/transaction_service/service.rs +++ b/base_layer/wallet/tests/transaction_service_tests/service.rs @@ -123,7 +123,7 @@ use tari_wallet::{ }, storage::{ database::WalletDatabase, - sqlite_db::WalletSqliteDatabase, + sqlite_db::wallet::WalletSqliteDatabase, sqlite_utilities::{run_migration_and_create_sqlite_connection, WalletDbConnection}, }, test_utils::{create_consensus_constants, make_wallet_database_connection}, diff --git a/base_layer/wallet/tests/transaction_service/storage.rs b/base_layer/wallet/tests/transaction_service_tests/storage.rs similarity index 100% rename from base_layer/wallet/tests/transaction_service/storage.rs rename to base_layer/wallet/tests/transaction_service_tests/storage.rs diff --git a/base_layer/wallet/tests/transaction_service/transaction_protocols.rs b/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs similarity index 97% rename from base_layer/wallet/tests/transaction_service/transaction_protocols.rs rename to base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs index 7364b171c3..67650fe53e 100644 --- a/base_layer/wallet/tests/transaction_service/transaction_protocols.rs +++ b/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs @@ -91,16 +91,7 @@ use crate::support::{ utils::make_input, }; -// Just in case other options become apparent in later testing -#[derive(PartialEq)] -pub enum TxProtocolTestConfig { - WithConnection, - WithoutConnection, -} - -pub async fn setup( - config: TxProtocolTestConfig, -) -> ( +pub async fn setup() -> ( TransactionServiceResources, OutboundServiceMockState, MockRpcServer>, @@ -125,13 +116,11 @@ pub async fn setup( let wallet_connectivity = create_wallet_connectivity_mock(); - if config == TxProtocolTestConfig::WithConnection { - let mut connection = mock_rpc_server - .create_connection(server_node_identity.to_peer(), protocol_name.into()) - .await; + let mut connection = mock_rpc_server + .create_connection(server_node_identity.to_peer(), protocol_name.into()) + .await; - wallet_connectivity.set_base_node_wallet_rpc_client(connect_rpc_client(&mut connection).await); - } + 
wallet_connectivity.set_base_node_wallet_rpc_client(connect_rpc_client(&mut connection).await); let db_name = format!("{}.sqlite3", random::string(8).as_str()); let temp_dir = tempdir().unwrap(); @@ -245,7 +234,7 @@ async fn tx_broadcast_protocol_submit_success() { _temp_dir, _transaction_event_receiver, wallet_connectivity, - ) = setup(TxProtocolTestConfig::WithConnection).await; + ) = setup().await; let mut event_stream = resources.event_publisher.subscribe(); wallet_connectivity.notify_base_node_set(server_node_identity.to_peer()); @@ -329,7 +318,7 @@ async fn tx_broadcast_protocol_submit_rejection() { _temp_dir, _transaction_event_receiver, wallet_connectivity, - ) = setup(TxProtocolTestConfig::WithConnection).await; + ) = setup().await; let mut event_stream = resources.event_publisher.subscribe(); add_transaction_to_database(1, 1 * T, true, None, None, resources.db.clone()).await; @@ -400,7 +389,7 @@ async fn tx_broadcast_protocol_restart_protocol_as_query() { _temp_dir, _transaction_event_receiver, wallet_connectivity, - ) = setup(TxProtocolTestConfig::WithConnection).await; + ) = setup().await; add_transaction_to_database(1, 1 * T, true, None, None, resources.db.clone()).await; @@ -487,7 +476,7 @@ async fn tx_broadcast_protocol_submit_success_followed_by_rejection() { _temp_dir, _transaction_event_receiver, wallet_connectivity, - ) = setup(TxProtocolTestConfig::WithConnection).await; + ) = setup().await; let mut event_stream = resources.event_publisher.subscribe(); add_transaction_to_database(1, 1 * T, true, None, None, resources.db.clone()).await; @@ -577,7 +566,7 @@ async fn tx_broadcast_protocol_submit_already_mined() { _temp_dir, _transaction_event_receiver, wallet_connectivity, - ) = setup(TxProtocolTestConfig::WithConnection).await; + ) = setup().await; add_transaction_to_database(1, 1 * T, true, None, None, resources.db.clone()).await; // Set Base Node to respond with AlreadyMined @@ -641,7 +630,7 @@ async fn tx_broadcast_protocol_submit_and_base_node_gets_changed() { _temp_dir, _transaction_event_receiver, wallet_connectivity, - ) = setup(TxProtocolTestConfig::WithConnection).await; + ) = setup().await; add_transaction_to_database(1, 1 * T, true, None, None, resources.db.clone()).await; @@ -737,7 +726,7 @@ async fn tx_validation_protocol_tx_becomes_mined_unconfirmed_then_confirmed() { _temp_dir, _transaction_event_receiver, wallet_connectivity, - ) = setup(TxProtocolTestConfig::WithConnection).await; + ) = setup().await; // Now we add the connection let mut connection = mock_rpc_server .create_connection(server_node_identity.to_peer(), "t/bnwallet/1".into()) @@ -878,7 +867,7 @@ async fn tx_revalidation() { _temp_dir, _transaction_event_receiver, wallet_connectivity, - ) = setup(TxProtocolTestConfig::WithConnection).await; + ) = setup().await; // Now we add the connection let mut connection = mock_rpc_server .create_connection(server_node_identity.to_peer(), "t/bnwallet/1".into()) @@ -1002,7 +991,7 @@ async fn tx_validation_protocol_reorg() { _temp_dir, _transaction_event_receiver, wallet_connectivity, - ) = setup(TxProtocolTestConfig::WithConnection).await; + ) = setup().await; // Now we add the connection let mut connection = mock_rpc_server .create_connection(server_node_identity.to_peer(), "t/bnwallet/1".into()) diff --git a/base_layer/wallet/tests/utxo_scanner.rs b/base_layer/wallet/tests/utxo_scanner.rs new file mode 100644 index 0000000000..59538a266c --- /dev/null +++ b/base_layer/wallet/tests/utxo_scanner.rs @@ -0,0 +1,731 @@ +// Copyright 2021. 
The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use chrono::{Duration as ChronoDuration, Utc}; +use rand::{rngs::OsRng, RngCore}; +use tari_comms::{ + peer_manager::PeerFeatures, + protocol::rpc::{mock::MockRpcServer, NamedProtocolService}, + test_utils::{ + mocks::{create_connectivity_mock, ConnectivityManagerMockState}, + node_identity::build_node_identity, + }, +}; +use tari_core::{ + base_node::rpc::BaseNodeWalletRpcServer, + blocks::BlockHeader, + proto::base_node::{ChainMetadata, TipInfoResponse}, + tari_utilities::{epoch_time::EpochTime, Hashable}, + transactions::{tari_amount::MicroTari, transaction::UnblindedOutput, CryptoFactories}, +}; +use tari_key_manager::cipher_seed::CipherSeed; +use tari_service_framework::reply_channel; +use tari_shutdown::Shutdown; +use tari_test_utils::random; +use tari_wallet::{ + base_node_service::{ + handle::{BaseNodeEvent, BaseNodeServiceHandle}, + mock_base_node_service::MockBaseNodeService, + }, + connectivity_service::{create_wallet_connectivity_mock, WalletConnectivityInterface, WalletConnectivityMock}, + output_manager_service::storage::models::DbUnblindedOutput, + storage::{ + database::WalletDatabase, + sqlite_db::wallet::WalletSqliteDatabase, + sqlite_utilities::run_migration_and_create_sqlite_connection, + }, + utxo_scanner_service::{ + handle::UtxoScannerEvent, + service::{ScannedBlock, UtxoScannerService}, + uxto_scanner_service_builder::UtxoScannerMode, + }, +}; +use tempfile::{tempdir, TempDir}; +use tokio::{ + sync::{broadcast, mpsc}, + task, + time, +}; + +pub mod support; + +use support::{ + comms_rpc::{BaseNodeWalletRpcMockService, BaseNodeWalletRpcMockState, UtxosByBlock}, + output_manager_service_mock::{make_output_manager_service_mock, OutputManagerMockState}, + transaction_service_mock::make_transaction_service_mock, + utils::make_input, +}; + +async fn setup( + mode: UtxoScannerMode, + previous_db: Option>, +) -> ( + UtxoScannerService, + WalletDatabase, + broadcast::Sender, + broadcast::Sender>, + 
BaseNodeWalletRpcMockState, + MockRpcServer>, + ConnectivityManagerMockState, + WalletConnectivityMock, + OutputManagerMockState, + Shutdown, + TempDir, +) { + let shutdown = Shutdown::new(); + let factories = CryptoFactories::default(); + + // Base Node Service Mock + let (sender, receiver_bns) = reply_channel::unbounded(); + let (event_publisher_bns, _) = broadcast::channel(100); + let base_node_service_handle = BaseNodeServiceHandle::new(sender, event_publisher_bns.clone()); + let mut mock_base_node_service = MockBaseNodeService::new(receiver_bns, shutdown.to_signal()); + mock_base_node_service.set_default_base_node_state(); + task::spawn(mock_base_node_service.run()); + + // BaseNodeRpcService Mock + let service = BaseNodeWalletRpcMockService::new(); + let rpc_service_state = service.get_state(); + let server = BaseNodeWalletRpcServer::new(service); + let protocol_name = server.as_protocol_name(); + let server_node_identity = build_node_identity(PeerFeatures::COMMUNICATION_NODE); + let mut mock_server = MockRpcServer::new(server, server_node_identity.clone()); + mock_server.serve(); + + let rpc_server_connection = mock_server + .create_connection(server_node_identity.to_peer(), protocol_name.into()) + .await; + + let (comms_connectivity, connectivity_mock) = create_connectivity_mock(); + let comms_connectivity_mock_state = connectivity_mock.get_shared_state(); + comms_connectivity_mock_state + .add_active_connection(rpc_server_connection) + .await; + task::spawn(connectivity_mock.run()); + + let wallet_connectivity_mock = create_wallet_connectivity_mock(); + + let (ts_mock, ts_handle) = make_transaction_service_mock(shutdown.to_signal()); + task::spawn(ts_mock.run()); + + let (oms_mock, oms_handle) = make_output_manager_service_mock(shutdown.to_signal()); + let oms_mock_state = oms_mock.get_state(); + task::spawn(oms_mock.run()); + + let node_identity = build_node_identity(PeerFeatures::COMMUNICATION_NODE); + let (event_sender, _) = broadcast::channel(200); + + let temp_dir = tempdir().unwrap(); + let wallet_db = match previous_db { + None => { + let path_string = temp_dir.path().to_str().unwrap().to_string(); + let db_name = format!("{}.sqlite3", random::string(8).as_str()); + let db_path = format!("{}/{}", path_string, db_name); + // let db_path = "/tmp/test.sqlite3"; + + let db_connection = run_migration_and_create_sqlite_connection(&db_path, 16).unwrap(); + + WalletDatabase::new( + WalletSqliteDatabase::new(db_connection, None).expect("Should be able to create wallet database"), + ) + }, + Some(db) => db, + }; + + let scanning_service = UtxoScannerService::::builder() + .with_peers(vec![server_node_identity.public_key().clone()]) + .with_retry_limit(1) + .with_mode(mode) + .build_with_resources( + wallet_db.clone(), + comms_connectivity, + wallet_connectivity_mock.get_current_base_node_watcher(), + oms_handle, + ts_handle, + node_identity, + factories, + shutdown.to_signal(), + event_sender.clone(), + base_node_service_handle, + ); + ( + scanning_service, + wallet_db, + event_sender, + event_publisher_bns, + rpc_service_state, + mock_server, + comms_connectivity_mock_state, + wallet_connectivity_mock, + oms_mock_state, + shutdown, + temp_dir, + ) +} + +pub struct TestBlockData { + block_headers: HashMap, + unblinded_outputs: HashMap>, + utxos_by_block: Vec, +} + +/// Generates a set of block headers and unblinded outputs for each header. The `birthday_offset` specifies at which +/// block in the `num_block` the birthday timestamp will have passed i.e. 
it occured during the previous block period. +/// e.g. with `num_blocks` = 10 and `birthday_offset` = 5 the birthday timestamp will occur between block 4 and 5 +fn generate_block_headers_and_utxos( + start_height: u64, + num_blocks: u64, + birthday_epoch_time: u64, + birthday_offset: u64, + only_coinbase: bool, +) -> TestBlockData { + let factories = CryptoFactories::default(); + let mut block_headers = HashMap::new(); + let mut utxos_by_block = Vec::new(); + let mut unblinded_outputs = HashMap::new(); + for i in start_height..num_blocks + start_height { + let mut block_header = BlockHeader::new(0); + block_header.height = i; + block_header.timestamp = + EpochTime::from((birthday_epoch_time as i64 + (i as i64 - birthday_offset as i64) * 100i64 + 5) as u64); + block_headers.insert(i, block_header.clone()); + // Generate utxos for this block + let mut block_outputs = Vec::new(); + + for _j in 0..=i + 1 { + let (_ti, uo) = make_input( + &mut OsRng, + MicroTari::from(100 + OsRng.next_u64() % 1000), + &factories.commitment, + ); + block_outputs.push(uo); + if only_coinbase { + break; + } + } + + let transaction_outputs = block_outputs + .clone() + .iter() + .map(|uo| uo.as_transaction_output(&factories).unwrap()) + .collect(); + let utxos = UtxosByBlock { + height: i, + header_hash: block_header.hash(), + utxos: transaction_outputs, + }; + utxos_by_block.push(utxos); + unblinded_outputs.insert(i, block_outputs); + } + TestBlockData { + block_headers, + unblinded_outputs, + utxos_by_block, + } +} + +#[tokio::test] +async fn test_utxo_scanner_recovery() { + let _ = env_logger::try_init(); + let factories = CryptoFactories::default(); + let ( + scanning_service, + wallet_db, + scanner_event_sender, + _base_node_service_event_publisher, + rpc_service_state, + _rpc_mock_server, + _comms_connectivity_mock_state, + _wallet_connectivity_mock, + oms_mock_state, + _shutdown, + _temp_dir, + ) = setup(UtxoScannerMode::Recovery, None).await; + + let cipher_seed = CipherSeed::new(); + let birthday_epoch_time = (cipher_seed.birthday() - 2) as u64 * 60 * 60 * 24; + wallet_db.set_master_seed(cipher_seed).await.unwrap(); + + const NUM_BLOCKS: u64 = 11; + const BIRTHDAY_OFFSET: u64 = 5; + + let TestBlockData { + block_headers, + unblinded_outputs, + utxos_by_block, + } = generate_block_headers_and_utxos(0, NUM_BLOCKS, birthday_epoch_time, BIRTHDAY_OFFSET, false); + + rpc_service_state.set_utxos_by_block(utxos_by_block.clone()); + rpc_service_state.set_blocks(block_headers.clone()); + + let chain_metadata = ChainMetadata { + height_of_longest_chain: Some(NUM_BLOCKS - 1), + best_block: Some(block_headers.get(&(NUM_BLOCKS - 1)).unwrap().clone().hash()), + accumulated_difficulty: Vec::new(), + pruned_height: 0, + }; + rpc_service_state.set_tip_info_response(TipInfoResponse { + metadata: Some(chain_metadata), + is_synced: true, + }); + + // Adding half the outputs of the blocks to the OMS mock + let mut db_unblinded_outputs = Vec::new(); + let mut total_outputs_to_recover = 0; + let mut total_amount_to_recover = MicroTari::from(0); + for (h, outputs) in unblinded_outputs.iter() { + for output in outputs.iter().skip(outputs.len() / 2) { + let dbo = DbUnblindedOutput::from_unblinded_output(output.clone(), &factories, None).unwrap(); + // Only the outputs in blocks after the birthday should be included in the recovered total + if *h >= NUM_BLOCKS.saturating_sub(BIRTHDAY_OFFSET).saturating_sub(2) { + total_outputs_to_recover += 1; + total_amount_to_recover += dbo.unblinded_output.value; + } + 
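// A tiny worked example of the timestamp formula the generator above uses, confirming its doc
// comment: with a birthday offset of 5, blocks below height 5 sit before the wallet birthday
// and block 5 is the first one after it. The birthday value is an arbitrary epoch time chosen
// only for this example.
fn block_timestamp(birthday_epoch_time: i64, height: i64, birthday_offset: i64) -> i64 {
    birthday_epoch_time + (height - birthday_offset) * 100 + 5
}

fn main() {
    let birthday = 1_000_000i64;
    for height in 3..=6 {
        println!("block {}: {:+}s relative to the birthday", height, block_timestamp(birthday, height, 5) - birthday);
    }
    // prints -195, -95, +5 and +105 seconds for blocks 3, 4, 5 and 6
}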
db_unblinded_outputs.push(dbo); + } + } + oms_mock_state.set_recoverable_outputs(db_unblinded_outputs); + + let mut scanner_event_stream = scanner_event_sender.subscribe(); + + tokio::spawn(scanning_service.run()); + + let delay = time::sleep(Duration::from_secs(60)); + tokio::pin!(delay); + loop { + tokio::select! { + _ = &mut delay => { + panic!("Completed event should have arrived by now."); + } + event = scanner_event_stream.recv() => { + if let UtxoScannerEvent::Completed { + final_height, + num_recovered, + value_recovered, + time_taken: _,} = event.unwrap() { + assert_eq!(final_height, NUM_BLOCKS-1); + assert_eq!(num_recovered, total_outputs_to_recover); + assert_eq!(value_recovered, total_amount_to_recover); + break; + } + } + } + } +} + +#[tokio::test] +async fn test_utxo_scanner_recovery_with_restart() { + let factories = CryptoFactories::default(); + let ( + scanning_service, + wallet_db, + _scanner_event_sender, + _base_node_service_event_publisher, + rpc_service_state, + _rpc_mock_server, + _comms_connectivity_mock_state, + _wallet_connectivity_mock, + oms_mock_state, + mut shutdown, + _temp_dir, + ) = setup(UtxoScannerMode::Recovery, None).await; + + let cipher_seed = CipherSeed::new(); + let birthday_epoch_time = (cipher_seed.birthday() - 2) as u64 * 60 * 60 * 24; + wallet_db.set_master_seed(cipher_seed).await.unwrap(); + + const NUM_BLOCKS: u64 = 11; + const BIRTHDAY_OFFSET: u64 = 5; + const SYNC_INTERRUPT: u64 = 6; + + let TestBlockData { + block_headers, + unblinded_outputs, + utxos_by_block, + } = generate_block_headers_and_utxos(0, NUM_BLOCKS, birthday_epoch_time, BIRTHDAY_OFFSET, false); + + rpc_service_state.set_utxos_by_block(utxos_by_block.clone()); + rpc_service_state.set_blocks(block_headers.clone()); + + let chain_metadata = ChainMetadata { + height_of_longest_chain: Some(NUM_BLOCKS - 1), + best_block: Some(block_headers.get(&(NUM_BLOCKS - 1)).unwrap().clone().hash()), + accumulated_difficulty: Vec::new(), + pruned_height: 0, + }; + rpc_service_state.set_tip_info_response(TipInfoResponse { + metadata: Some(chain_metadata.clone()), + is_synced: true, + }); + + // Adding half the outputs of the blocks to the OMS mock + let mut db_unblinded_outputs = Vec::new(); + let mut total_outputs_to_recover = 0; + let mut total_amount_to_recover = MicroTari::from(0); + for (h, outputs) in unblinded_outputs.iter() { + for output in outputs.iter().skip(outputs.len() / 2) { + let dbo = DbUnblindedOutput::from_unblinded_output(output.clone(), &factories, None).unwrap(); + // Only the outputs in blocks after the birthday should be included in the recovered total + if *h >= NUM_BLOCKS.saturating_sub(BIRTHDAY_OFFSET).saturating_sub(2) { + total_outputs_to_recover += 1; + total_amount_to_recover += dbo.unblinded_output.value; + } + db_unblinded_outputs.push(dbo); + } + } + oms_mock_state.set_recoverable_outputs(db_unblinded_outputs.clone()); + + let (tx, rx) = mpsc::channel(100); + rpc_service_state.set_utxos_by_block_trigger_channel(rx); + + tokio::spawn(scanning_service.run()); + + tx.send(SYNC_INTERRUPT as usize).await.unwrap(); + + let _ = rpc_service_state + .wait_pop_sync_utxos_by_block_calls(1, Duration::from_secs(30)) + .await + .unwrap(); + + shutdown.trigger(); + + let ( + scanning_service, + _wallet_db, + scanner_event_sender, + _base_node_service_event_publisher, + rpc_service_state, + _rpc_mock_server, + _comms_connectivity_mock_state, + _wallet_connectivity_mock, + oms_mock_state, + _shutdown, + _temp_dir2, + ) = setup(UtxoScannerMode::Recovery, Some(wallet_db)).await; + 
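+    // The second scanner instance is built over the same wallet_db, simulating an app restart part-way through
+    // recovery: the scanned-block history persisted by the first run should let this run resume from where it
+    // was interrupted instead of rescanning the whole chain from the birthday.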
rpc_service_state.set_utxos_by_block(utxos_by_block.clone()); + rpc_service_state.set_blocks(block_headers.clone()); + rpc_service_state.set_tip_info_response(TipInfoResponse { + metadata: Some(chain_metadata), + is_synced: true, + }); + oms_mock_state.set_recoverable_outputs(db_unblinded_outputs); + let mut scanner_event_stream = scanner_event_sender.subscribe(); + tokio::spawn(scanning_service.run()); + + let delay = time::sleep(Duration::from_secs(60)); + tokio::pin!(delay); + loop { + tokio::select! { + _ = &mut delay => { + panic!("Completed event should have arrived by now."); + } + event = scanner_event_stream.recv() => { + if let UtxoScannerEvent::Completed { + final_height, + num_recovered, + value_recovered, + time_taken: _,} = event.unwrap() { + assert_eq!(final_height, NUM_BLOCKS-1); + assert_eq!(num_recovered, total_outputs_to_recover); + assert_eq!(value_recovered, total_amount_to_recover); + break; + } + } + } + } +} + +#[tokio::test] +async fn test_utxo_scanner_recovery_with_restart_and_reorg() { + let factories = CryptoFactories::default(); + let ( + scanning_service, + wallet_db, + _scanner_event_sender, + _base_node_service_event_publisher, + rpc_service_state, + _rpc_mock_server, + _comms_connectivity_mock_state, + _wallet_connectivity_mock, + oms_mock_state, + mut shutdown, + _temp_dir, + ) = setup(UtxoScannerMode::Recovery, None).await; + + let cipher_seed = CipherSeed::new(); + let birthday_epoch_time = (cipher_seed.birthday() - 2) as u64 * 60 * 60 * 24; + wallet_db.set_master_seed(cipher_seed).await.unwrap(); + + const NUM_BLOCKS: u64 = 11; + const BIRTHDAY_OFFSET: u64 = 5; + const SYNC_INTERRUPT: u64 = 6; + + let TestBlockData { + mut block_headers, + mut unblinded_outputs, + utxos_by_block, + } = generate_block_headers_and_utxos(0, NUM_BLOCKS, birthday_epoch_time, BIRTHDAY_OFFSET, false); + + rpc_service_state.set_utxos_by_block(utxos_by_block.clone()); + rpc_service_state.set_blocks(block_headers.clone()); + + let chain_metadata = ChainMetadata { + height_of_longest_chain: Some(NUM_BLOCKS - 1), + best_block: Some(block_headers.get(&(NUM_BLOCKS - 1)).unwrap().clone().hash()), + accumulated_difficulty: Vec::new(), + pruned_height: 0, + }; + rpc_service_state.set_tip_info_response(TipInfoResponse { + metadata: Some(chain_metadata.clone()), + is_synced: true, + }); + + // Adding half the outputs of the blocks to the OMS mock + let mut db_unblinded_outputs = Vec::new(); + for (_h, outputs) in unblinded_outputs.iter() { + for output in outputs.iter().skip(outputs.len() / 2) { + let dbo = DbUnblindedOutput::from_unblinded_output(output.clone(), &factories, None).unwrap(); + db_unblinded_outputs.push(dbo); + } + } + oms_mock_state.set_recoverable_outputs(db_unblinded_outputs.clone()); + + let (tx, rx) = mpsc::channel(100); + rpc_service_state.set_utxos_by_block_trigger_channel(rx); + + tokio::spawn(scanning_service.run()); + + tx.send(SYNC_INTERRUPT as usize).await.unwrap(); + + let _ = rpc_service_state + .wait_pop_sync_utxos_by_block_calls(1, Duration::from_secs(30)) + .await + .unwrap(); + + shutdown.trigger(); + + // So at this point we have synced to block 6. We are going to create a reorg back to block 4 so that blocks 5-10 + // are new blocks. 
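+    // Keep only the original headers up to height 4 and graft on a replacement chain from height 5, so the
+    // header hashes recorded in the wallet's scanned-block history no longer match the chain from height 5
+    // upwards. On restart the scanner should detect the fork depth from that history and rescan only the
+    // affected range (the assertions below expect outputs from height 4 up to the new tip at height 9 to be
+    // recovered) rather than starting again from the wallet birthday.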
+ block_headers.retain(|h, _| h <= &4u64); + unblinded_outputs.retain(|h, _| h <= &4u64); + let mut utxos_by_block = utxos_by_block + .into_iter() + .filter(|u| u.height <= 4) + .collect::>(); + + let TestBlockData { + block_headers: new_block_headers, + unblinded_outputs: new_unblinded_outputs, + utxos_by_block: mut new_utxos_by_block, + } = generate_block_headers_and_utxos(5, 5, birthday_epoch_time + 500, 0, false); + + block_headers.extend(new_block_headers); + utxos_by_block.append(&mut new_utxos_by_block); + unblinded_outputs.extend(new_unblinded_outputs); + + let ( + scanning_service, + _wallet_db, + scanner_event_sender, + _base_node_service_event_publisher, + rpc_service_state, + _rpc_mock_server, + _comms_connectivity_mock_state, + _wallet_connectivity_mock, + oms_mock_state, + _shutdown, + _temp_dir2, + ) = setup(UtxoScannerMode::Recovery, Some(wallet_db)).await; + rpc_service_state.set_utxos_by_block(utxos_by_block.clone()); + rpc_service_state.set_blocks(block_headers.clone()); + let chain_metadata = ChainMetadata { + height_of_longest_chain: Some(9), + best_block: Some(block_headers.get(&9).unwrap().clone().hash()), + accumulated_difficulty: Vec::new(), + pruned_height: 0, + }; + rpc_service_state.set_tip_info_response(TipInfoResponse { + metadata: Some(chain_metadata), + is_synced: true, + }); + + // calculate new recoverable outputs for the reorg + // Adding half the outputs of the blocks to the OMS mock + let mut db_unblinded_outputs = Vec::new(); + let mut total_outputs_to_recover = 0; + let mut total_amount_to_recover = MicroTari::from(0); + for (h, outputs) in unblinded_outputs.iter() { + for output in outputs.iter().skip(outputs.len() / 2) { + let dbo = DbUnblindedOutput::from_unblinded_output(output.clone(), &factories, None).unwrap(); + // Only the outputs in blocks after the birthday should be included in the recovered total + if *h >= 4 { + total_outputs_to_recover += 1; + total_amount_to_recover += dbo.unblinded_output.value; + } + db_unblinded_outputs.push(dbo); + } + } + + oms_mock_state.set_recoverable_outputs(db_unblinded_outputs); + + let mut scanner_event_stream = scanner_event_sender.subscribe(); + tokio::spawn(scanning_service.run()); + + let delay = time::sleep(Duration::from_secs(60)); + tokio::pin!(delay); + loop { + tokio::select! 
{ + _ = &mut delay => { + panic!("Completed event should have arrived by now."); + } + event = scanner_event_stream.recv() => { + if let UtxoScannerEvent::Completed { + final_height, + num_recovered, + value_recovered, + time_taken: _,} = event.unwrap() { + assert_eq!(final_height, 9); + assert_eq!(num_recovered, total_outputs_to_recover); + assert_eq!(value_recovered, total_amount_to_recover); + break; + } + } + } + } +} + +#[tokio::test] +async fn test_utxo_scanner_scanned_block_cache_clearing() { + let ( + scanning_service, + wallet_db, + scanner_event_sender, + _base_node_service_event_publisher, + rpc_service_state, + _rpc_mock_server, + _comms_connectivity_mock_state, + _wallet_connectivity_mock, + _oms_mock_state, + _shutdown, + _temp_dir, + ) = setup(UtxoScannerMode::Recovery, None).await; + + for h in 0u64..800u64 { + let num_outputs = if h % 2 == 1 { Some(1) } else { None }; + let mut header_hash = h.to_le_bytes().to_vec(); + header_hash.extend([0u8; 24].to_vec()); + wallet_db + .save_scanned_block(ScannedBlock { + header_hash, + height: h, + num_outputs, + amount: None, + timestamp: Utc::now() + .naive_utc() + .checked_sub_signed(ChronoDuration::days(1000)) + .unwrap(), + }) + .await + .unwrap(); + } + + let cipher_seed = CipherSeed::new(); + let birthday_epoch_time = (cipher_seed.birthday() - 2) as u64 * 60 * 60 * 24; + wallet_db.set_master_seed(cipher_seed).await.unwrap(); + + const NUM_BLOCKS: u64 = 11; + const BIRTHDAY_OFFSET: u64 = 5; + + let TestBlockData { + block_headers, + unblinded_outputs: _unblinded_outputs, + utxos_by_block, + } = generate_block_headers_and_utxos(800, NUM_BLOCKS, birthday_epoch_time, BIRTHDAY_OFFSET, true); + + rpc_service_state.set_utxos_by_block(utxos_by_block.clone()); + rpc_service_state.set_blocks(block_headers.clone()); + + let chain_metadata = ChainMetadata { + height_of_longest_chain: Some(800 + NUM_BLOCKS - 1), + best_block: Some(block_headers.get(&(800 + NUM_BLOCKS - 1)).unwrap().clone().hash()), + accumulated_difficulty: Vec::new(), + pruned_height: 0, + }; + rpc_service_state.set_tip_info_response(TipInfoResponse { + metadata: Some(chain_metadata), + is_synced: true, + }); + + let first_block_header = block_headers.get(&(800)).unwrap().clone(); + wallet_db + .save_scanned_block(ScannedBlock { + header_hash: first_block_header.hash(), + height: first_block_header.height, + num_outputs: Some(0), + amount: None, + timestamp: Utc::now().naive_utc(), + }) + .await + .unwrap(); + + let mut scanner_event_stream = scanner_event_sender.subscribe(); + + tokio::spawn(scanning_service.run()); + + let delay = time::sleep(Duration::from_secs(60)); + tokio::pin!(delay); + loop { + tokio::select! 
{ + _ = &mut delay => { + panic!("Completed event should have arrived by now."); + } + event = scanner_event_stream.recv() => { + if let UtxoScannerEvent::Completed { + final_height:_, + num_recovered:_, + value_recovered:_, + time_taken: _,} = event.unwrap(){ + break; + } + } + } + } + let scanned_blocks = wallet_db.get_scanned_blocks().await.unwrap(); + + use tari_wallet::utxo_scanner_service::service::SCANNED_BLOCK_CACHE_SIZE; + let threshold = 800 + NUM_BLOCKS - 1 - SCANNED_BLOCK_CACHE_SIZE; + + // Below the threshold the even indices had no outputs and should be cleared + for i in 0..threshold { + if i % 2 == 0 { + assert!(!scanned_blocks.iter().any(|sb| sb.height == i)); + } + } + // Check that above the threshold the even indices are still there + for i in threshold..800 { + if i % 2 == 0 { + assert!(scanned_blocks.iter().any(|sb| sb.height == i)); + } + } +} diff --git a/base_layer/wallet/tests/wallet/mod.rs b/base_layer/wallet/tests/wallet.rs similarity index 93% rename from base_layer/wallet/tests/wallet/mod.rs rename to base_layer/wallet/tests/wallet.rs index 3670a9e976..ce7dc5dab2 100644 --- a/base_layer/wallet/tests/wallet/mod.rs +++ b/base_layer/wallet/tests/wallet.rs @@ -1,3 +1,25 @@ +// Copyright 2021. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ // Copyright 2019 The Tari Project // // Redistribution and use in source and binary forms, with or without modification, are permitted provided that the @@ -54,7 +76,7 @@ use tari_wallet::{ output_manager_service::storage::sqlite_db::OutputManagerSqliteDatabase, storage::{ database::{DbKeyValuePair, WalletBackend, WalletDatabase, WriteOperation}, - sqlite_db::WalletSqliteDatabase, + sqlite_db::wallet::WalletSqliteDatabase, sqlite_utilities::{ initialize_sqlite_database_backends, partial_wallet_backup, @@ -74,7 +96,8 @@ use tari_wallet::{ use tempfile::tempdir; use tokio::{runtime::Runtime, time::sleep}; -use crate::support::{comms_and_services::get_next_memory_address, utils::make_input}; +pub mod support; +use support::{comms_and_services::get_next_memory_address, utils::make_input}; fn create_peer(public_key: CommsPublicKey, net_address: Multiaddr) -> Peer { Peer::new( diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index 392525f88f..8569a9bd8e 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -140,7 +140,7 @@ use tari_wallet::{ error::{WalletError, WalletStorageError}, storage::{ database::WalletDatabase, - sqlite_db::WalletSqliteDatabase, + sqlite_db::wallet::WalletSqliteDatabase, sqlite_utilities::{initialize_sqlite_database_backends, partial_wallet_backup}, }, transaction_service::{ @@ -5442,7 +5442,7 @@ pub unsafe extern "C" fn wallet_is_recovery_in_progress(wallet: *mut TariWallet, /// - ConnectedToBaseNode, 0, 1 /// - ConnectionToBaseNodeFailed, number of retries, retry limit /// - Progress, current block, total number of blocks -/// - Completed, total number of UTXO's scanned, MicroTari recovered, +/// - Completed, total number of UTXO's recovered, MicroTari recovered, /// - ScanningRoundFailed, number of retries, retry limit /// - RecoveryFailed, 0, 0 /// diff --git a/base_layer/wallet_ffi/src/tasks.rs b/base_layer/wallet_ffi/src/tasks.rs index f538e92721..096e04b17b 100644 --- a/base_layer/wallet_ffi/src/tasks.rs +++ b/base_layer/wallet_ffi/src/tasks.rs @@ -87,8 +87,8 @@ pub async fn recovery_event_monitoring( ); }, Ok(UtxoScannerEvent::Progress { - current_index: current, - total_index: total, + current_height: current, + tip_height: total, }) => { unsafe { (recovery_progress_callback)(RecoveryEvent::Progress as u8, current, total); @@ -96,22 +96,27 @@ pub async fn recovery_event_monitoring( info!(target: LOG_TARGET, "Recovery progress: {}/{}", current, total); }, Ok(UtxoScannerEvent::Completed { - number_scanned: num_scanned, - number_received: num_utxos, - value_received: total_amount, + final_height, + num_recovered, + value_recovered, time_taken: elapsed, }) => { + let rate = (final_height as f32) * 1000f32 / (elapsed.as_millis() as f32); info!( target: LOG_TARGET, - "Recovery complete! Scanned = {} in {:.2?} ({} utxos/s), Recovered {} worth {}", - num_scanned, + "Recovery complete! 
Scanned {} blocks in {:.2?} ({:.2?} blocks/s), Recovered {} outputs worth {}", + final_height, elapsed, - num_scanned / (1 + elapsed.as_secs()), - num_utxos, - total_amount + rate, + num_recovered, + value_recovered ); unsafe { - (recovery_progress_callback)(RecoveryEvent::Completed as u8, num_scanned, u64::from(total_amount)); + (recovery_progress_callback)( + RecoveryEvent::Completed as u8, + num_recovered, + u64::from(value_recovered), + ); } break; }, diff --git a/base_layer/wallet_ffi/wallet.h b/base_layer/wallet_ffi/wallet.h index c6ed604541..b13281dc9b 100644 --- a/base_layer/wallet_ffi/wallet.h +++ b/base_layer/wallet_ffi/wallet.h @@ -688,7 +688,7 @@ bool wallet_is_recovery_in_progress(struct TariWallet *wallet, int *error_out); /// - ConnectedToBaseNode, 0, 1 /// - ConnectionToBaseNodeFailed, number of retries, retry limit /// - Progress, current block, total number of blocks -/// - Completed, total number of UTXO's scanned, MicroTari recovered, +/// - Completed, total number of UTXO's recovered, MicroTari recovered, /// - ScanningRoundFailed, number of retries, retry limit /// - RecoveryFailed, 0, 0 /// diff --git a/integration_tests/features/WalletRecovery.feature b/integration_tests/features/WalletRecovery.feature index 5b4484e708..14a7ac9e87 100644 --- a/integration_tests/features/WalletRecovery.feature +++ b/integration_tests/features/WalletRecovery.feature @@ -40,8 +40,8 @@ Feature: Wallet Recovery | 5 | | 10 | - # fails often on circle CI - @critical @flaky + + @critical Scenario: Recover one-sided payments Given I have a seed node NODE And I have 1 base nodes connected to all seed nodes diff --git a/integration_tests/features/WalletTransactions.feature b/integration_tests/features/WalletTransactions.feature index a279582e1b..96d7378903 100644 --- a/integration_tests/features/WalletTransactions.feature +++ b/integration_tests/features/WalletTransactions.feature @@ -1,7 +1,7 @@ @wallet-transact @wallet Feature: Wallet Transactions - @critical @flaky + @critical Scenario: Wallet sending and receiving one-sided transactions Given I have a seed node NODE And I have 1 base nodes connected to all seed nodes diff --git a/integration_tests/package-lock.json b/integration_tests/package-lock.json index f06b53441d..0ff556688c 100644 --- a/integration_tests/package-lock.json +++ b/integration_tests/package-lock.json @@ -195,7 +195,7 @@ }, "@babel/helper-validator-option": { "version": "7.14.5", - "resolved": false, + "resolved": "https://registry.npmjs.org/@babel/helper-validator-option/-/helper-validator-option-7.14.5.tgz", "integrity": "sha512-OX8D5eeX4XwcroVW45NMvoYaIuFI+GQpA2a8Gi+X/U/cDUIRsV37qQfF905F0htTRCREQIB4KqPeaveRJUl3Ow==", "dev": true }, @@ -1945,7 +1945,7 @@ }, "globals": { "version": "11.12.0", - "resolved": false, + "resolved": "https://registry.npmjs.org/globals/-/globals-11.12.0.tgz", "integrity": "sha512-WOBp/EEGUiIsJSp7wcv/y6MO+lV9UoncWqxuFfm8eBwzWNgyfBd6Gz+IeKQ9jCmyhoH99g15M3T+QaVHFjizVA==", "dev": true }, @@ -2251,7 +2251,7 @@ }, "jsesc": { "version": "2.5.2", - "resolved": false, + "resolved": "https://registry.npmjs.org/jsesc/-/jsesc-2.5.2.tgz", "integrity": "sha512-OYu7XEzjkCQ3C5Ps3QIZsQfNpqoJyZZA99wd9aWd05NCtC5pWOkShK2mkL6HXQR6/Cy2lbNdPlZBpuQHXE63gA==", "dev": true }, @@ -3028,7 +3028,7 @@ }, "source-map": { "version": "0.5.7", - "resolved": false, + "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.5.7.tgz", "integrity": "sha1-igOdLRAh0i0eoUyA2OpGi6LvP8w=", "dev": true }, @@ -3280,7 +3280,7 @@ }, "to-fast-properties": { "version": "2.0.0", - 
"resolved": false, + "resolved": "https://registry.npmjs.org/to-fast-properties/-/to-fast-properties-2.0.0.tgz", "integrity": "sha1-3F5pjL0HkmW8c+A3doGk5Og/YW4=", "dev": true }, @@ -3430,68 +3430,104 @@ "dependencies": { "@grpc/grpc-js": { "version": "1.3.6", + "resolved": false, + "integrity": "sha512-v7+LQFbqZKmd/Tvf5/j1Xlbq6jXL/4d+gUtm2TNX4QiEC3ELWADmGr2dGlUyLl6aKTuYfsN72vAsO5zmavYkEg==", "requires": { "@types/node": ">=12.12.47" } }, "@grpc/proto-loader": { "version": "0.5.6", + "resolved": false, + "integrity": "sha512-DT14xgw3PSzPxwS13auTEwxhMMOoz33DPUKNtmYK/QYbBSpLXJy78FGGs5yVoxVobEqPm4iW9MOIoz0A3bLTRQ==", "requires": { "lodash.camelcase": "^4.3.0", "protobufjs": "^6.8.6" } }, "@protobufjs/aspromise": { - "version": "1.1.2" + "version": "1.1.2", + "resolved": false, + "integrity": "sha1-m4sMxmPWaafY9vXQiToU00jzD78=" }, "@protobufjs/base64": { - "version": "1.1.2" + "version": "1.1.2", + "resolved": false, + "integrity": "sha512-AZkcAA5vnN/v4PDqKyMR5lx7hZttPDgClv83E//FMNhR2TMcLUhfRUBHCmSl0oi9zMgDDqRUJkSxO3wm85+XLg==" }, "@protobufjs/codegen": { - "version": "2.0.4" + "version": "2.0.4", + "resolved": false, + "integrity": "sha512-YyFaikqM5sH0ziFZCN3xDC7zeGaB/d0IUb9CATugHWbd1FRFwWwt4ld4OYMPWu5a3Xe01mGAULCdqhMlPl29Jg==" }, "@protobufjs/eventemitter": { - "version": "1.1.0" + "version": "1.1.0", + "resolved": false, + "integrity": "sha1-NVy8mLr61ZePntCV85diHx0Ga3A=" }, "@protobufjs/fetch": { "version": "1.1.0", + "resolved": false, + "integrity": "sha1-upn7WYYUr2VwDBYZ/wbUVLDYTEU=", "requires": { "@protobufjs/aspromise": "^1.1.1", "@protobufjs/inquire": "^1.1.0" } }, "@protobufjs/float": { - "version": "1.0.2" + "version": "1.0.2", + "resolved": false, + "integrity": "sha1-Xp4avctz/Ap8uLKR33jIy9l7h9E=" }, "@protobufjs/inquire": { - "version": "1.1.0" + "version": "1.1.0", + "resolved": false, + "integrity": "sha1-/yAOPnzyQp4tyvwRQIKOjMY48Ik=" }, "@protobufjs/path": { - "version": "1.1.2" + "version": "1.1.2", + "resolved": false, + "integrity": "sha1-bMKyDFya1q0NzP0hynZz2Nf79o0=" }, "@protobufjs/pool": { - "version": "1.1.0" + "version": "1.1.0", + "resolved": false, + "integrity": "sha1-Cf0V8tbTq/qbZbw2ZQbWrXhG/1Q=" }, "@protobufjs/utf8": { - "version": "1.1.0" + "version": "1.1.0", + "resolved": false, + "integrity": "sha1-p3c2C1s5oaLlEG+OhY8v0tBgxXA=" }, "@types/long": { - "version": "4.0.1" + "version": "4.0.1", + "resolved": false, + "integrity": "sha512-5tXH6Bx/kNGd3MgffdmP4dy2Z+G4eaXw0SE81Tq3BNadtnMR5/ySMzX4SLEzHJzSmPNn4HIdpQsBvXMUykr58w==" }, "@types/node": { - "version": "16.3.2" + "version": "16.3.2", + "resolved": false, + "integrity": "sha512-jJs9ErFLP403I+hMLGnqDRWT0RYKSvArxuBVh2veudHV7ifEC1WAmjJADacZ7mRbA2nWgHtn8xyECMAot0SkAw==" }, "grpc-promise": { - "version": "1.4.0" + "version": "1.4.0", + "resolved": false, + "integrity": "sha512-4BBXHXb5OjjBh7luylu8vFqL6H6aPn/LeqpQaSBeRzO/Xv95wHW/WkU9TJRqaCTMZ5wq9jTSvlJWp0vRJy1pVA==" }, "lodash.camelcase": { - "version": "4.3.0" + "version": "4.3.0", + "resolved": false, + "integrity": "sha1-soqmKIorn8ZRA1x3EfZathkDMaY=" }, "long": { - "version": "4.0.0" + "version": "4.0.0", + "resolved": false, + "integrity": "sha512-XsP+KhQif4bjX1kbuSiySJFNAehNxgLb6hPRGJ9QsUr8ajHkuXGdrHmFUTUUXhDwVX2R5bY4JNZEwbUiMhV+MA==" }, "protobufjs": { "version": "6.11.2", + "resolved": false, + "integrity": "sha512-4BQJoPooKJl2G9j3XftkIXjoC9C0Av2NOrWmbLWT1vH32GcSUHjM0Arra6UfTsVyfMAuFzaLucXn1sadxJydAw==", "requires": { "@protobufjs/aspromise": "^1.1.2", "@protobufjs/base64": "^1.1.2",