Skip to content

Commit

Permalink
feat: improve wallet recovery and scanning handling of reorgs (#3655)
Browse files Browse the repository at this point in the history
Description
---

@mikethetike Merging this into the `weatherwax` branch because I would like to see it tested on the Mobile apps sooner rather than later and not sure what the plan is for migrating them to the new `Development`

This PR updates the wallet UtxoScanner to better detect and handle reorgs. Previously only the last scanned block was check to see if it had been re-orged out and if it had then the whole chain was scanned from scratch. Reorgs of a few blocks are common so to reduce the amount of work done after a reorg this PR adds maintaining history of previously scanned block headers so that the depth of the reorg can be quickly determined and the rescan only be done for affected blocks. 

This PR adds a number of updates:
- Add the `sync_utxos_by_block` method to the `BaseNodeWalletService` RPC service to facilitate a more appropriate UTXO sync strategy for the wallet
- Add the `ScannedBlocks` table to the wallet DB to keep a history of scanned block headers and the number and amount of outputs recovered from each block.
- Update the UTXO scanner to use this stored history to detect reorgs and only sync the required UTXOs
- Built out a test harness to test the scanner thoroughly in Rust integration tests.


How Has This Been Tested?
---
Added new Rust integration tests
Added back excluded Cucumber tests
  • Loading branch information
philipr-za committed Dec 21, 2021
1 parent d006b67 commit fe9033b
Show file tree
Hide file tree
Showing 49 changed files with 2,407 additions and 480 deletions.
34 changes: 17 additions & 17 deletions applications/tari_console_wallet/src/recovery.rs
Expand Up @@ -29,7 +29,7 @@ use tari_crypto::tari_utilities::hex::Hex;
use tari_key_manager::{cipher_seed::CipherSeed, mnemonic::Mnemonic};
use tari_shutdown::Shutdown;
use tari_wallet::{
storage::sqlite_db::WalletSqliteDatabase,
storage::sqlite_db::wallet::WalletSqliteDatabase,
utxo_scanner_service::{handle::UtxoScannerEvent, service::UtxoScannerService},
WalletSqlite,
};
Expand Down Expand Up @@ -126,24 +126,24 @@ pub async fn wallet_recovery(
println!("OK (latency = {:.2?})", latency);
},
Ok(UtxoScannerEvent::Progress {
current_index: current,
total_index: total,
current_height,
tip_height,
}) => {
let percentage_progress = ((current as f32) * 100f32 / (total as f32)).round() as u32;
let percentage_progress = ((current_height as f32) * 100f32 / (tip_height as f32)).round() as u32;
debug!(
target: LOG_TARGET,
"{}: Recovery process {}% complete ({} of {} utxos).",
"{}: Recovery process {}% complete (Block {} of {}).",
Local::now(),
percentage_progress,
current,
total
current_height,
tip_height
);
println!(
"{}: Recovery process {}% complete ({} of {} utxos).",
"{}: Recovery process {}% complete (Block {} of {}).",
Local::now(),
percentage_progress,
current,
total
current_height,
tip_height
);
},
Ok(UtxoScannerEvent::ScanningRoundFailed {
Expand Down Expand Up @@ -172,15 +172,15 @@ pub async fn wallet_recovery(
warn!(target: LOG_TARGET, "{}", s);
},
Ok(UtxoScannerEvent::Completed {
number_scanned: num_scanned,
number_received: num_utxos,
value_received: total_amount,
time_taken: elapsed,
final_height,
num_recovered,
value_recovered,
time_taken,
}) => {
let rate = (num_scanned as f32) * 1000f32 / (elapsed.as_millis() as f32);
let rate = (final_height as f32) * 1000f32 / (time_taken.as_millis() as f32);
let stats = format!(
"Recovery complete! Scanned = {} in {:.2?} ({:.2?} utxos/s), Recovered {} worth {}",
num_scanned, elapsed, rate, num_utxos, total_amount
"Recovery complete! Scanned {} blocks in {:.2?} ({:.2?} blocks/s), Recovered {} outputs worth {}",
final_height, time_taken, rate, num_recovered, value_recovered
);
info!(target: LOG_TARGET, "{}", stats);
println!("{}", stats);
Expand Down
11 changes: 11 additions & 0 deletions base_layer/core/src/base_node/proto/rpc.proto
Expand Up @@ -76,3 +76,14 @@ message PrunedOutput {
bytes hash = 1;
bytes witness_hash = 2;
}

message SyncUtxosByBlockRequest {
bytes start_header_hash = 1;
bytes end_header_hash = 2;
}

message SyncUtxosByBlockResponse {
repeated tari.types.TransactionOutput outputs = 1;
uint64 height = 2;
bytes header_hash = 3;
}
16 changes: 15 additions & 1 deletion base_layer/core/src/base_node/rpc/mod.rs
Expand Up @@ -22,9 +22,12 @@

#[cfg(feature = "base_node")]
mod service;
#[cfg(feature = "base_node")]
pub mod sync_utxos_by_block_task;

#[cfg(feature = "base_node")]
pub use service::BaseNodeWalletRpcService;
use tari_comms::protocol::rpc::{Request, Response, RpcStatus};
use tari_comms::protocol::rpc::{Request, Response, RpcStatus, Streaming};
use tari_comms_rpc_macros::tari_rpc;

#[cfg(feature = "base_node")]
Expand All @@ -43,6 +46,8 @@ use crate::{
QueryDeletedRequest,
QueryDeletedResponse,
Signatures,
SyncUtxosByBlockRequest,
SyncUtxosByBlockResponse,
TipInfoResponse,
TxQueryBatchResponses,
TxQueryResponse,
Expand Down Expand Up @@ -97,6 +102,15 @@ pub trait BaseNodeWalletService: Send + Sync + 'static {
&self,
request: Request<u64>,
) -> Result<Response<proto::core::BlockHeader>, RpcStatus>;

#[rpc(method = 10)]
async fn get_height_at_time(&self, request: Request<u64>) -> Result<Response<u64>, RpcStatus>;

#[rpc(method = 11)]
async fn sync_utxos_by_block(
&self,
request: Request<SyncUtxosByBlockRequest>,
) -> Result<Streaming<SyncUtxosByBlockResponse>, RpcStatus>;
}

#[cfg(feature = "base_node")]
Expand Down
93 changes: 90 additions & 3 deletions base_layer/core/src/base_node/rpc/service.rs
@@ -1,7 +1,10 @@
use std::convert::TryFrom;

use log::*;
use tari_common_types::types::Signature;
use tari_comms::protocol::rpc::{Request, Response, RpcStatus};
use tari_comms::protocol::rpc::{Request, Response, RpcStatus, Streaming};
use tari_crypto::tari_utilities::hex::Hex;
use tokio::sync::mpsc;

// Copyright 2020, The Tari Project
//
Expand All @@ -26,7 +29,11 @@ use tari_comms::protocol::rpc::{Request, Response, RpcStatus};
// OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
// DAMAGE.
use crate::{
base_node::{rpc::BaseNodeWalletService, state_machine_service::states::StateInfo, StateMachineHandle},
base_node::{
rpc::{sync_utxos_by_block_task::SyncUtxosByBlockTask, BaseNodeWalletService},
state_machine_service::states::StateInfo,
StateMachineHandle,
},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, PrunedOutput, UtxoMinedInfo},
mempool::{service::MempoolHandle, TxStorageResponse},
proto,
Expand All @@ -37,6 +44,8 @@ use crate::{
QueryDeletedRequest,
QueryDeletedResponse,
Signatures as SignaturesProto,
SyncUtxosByBlockRequest,
SyncUtxosByBlockResponse,
TipInfoResponse,
TxLocation,
TxQueryBatchResponse,
Expand All @@ -52,7 +61,6 @@ use crate::{
},
transactions::transaction::Transaction,
};

const LOG_TARGET: &str = "c::base_node::rpc";

pub struct BaseNodeWalletRpcService<B> {
Expand Down Expand Up @@ -499,4 +507,83 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc

Ok(Response::new(header.into()))
}

async fn get_height_at_time(&self, request: Request<u64>) -> Result<Response<u64>, RpcStatus> {
let requested_epoch_time: u64 = request.into_message();

let tip_header = self
.db()
.fetch_tip_header()
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;
let mut left_height = 0u64;
let mut right_height = tip_header.height();

while left_height <= right_height {
let mut mid_height = (left_height + right_height) / 2;

if mid_height == 0 {
return Ok(Response::new(0u64));
}
// If the two bounds are adjacent then perform the test between the right and left sides
if left_height == mid_height {
mid_height = right_height;
}

let mid_header = self
.db()
.fetch_header(mid_height)
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| {
RpcStatus::not_found(format!("Header not found during search at height {}", mid_height))
})?;
let before_mid_header = self
.db()
.fetch_header(mid_height - 1)
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| {
RpcStatus::not_found(format!("Header not found during search at height {}", mid_height - 1))
})?;

if requested_epoch_time < mid_header.timestamp.as_u64() &&
requested_epoch_time >= before_mid_header.timestamp.as_u64()
{
return Ok(Response::new(before_mid_header.height));
} else if mid_height == right_height {
return Ok(Response::new(right_height));
} else if requested_epoch_time <= mid_header.timestamp.as_u64() {
right_height = mid_height;
} else {
left_height = mid_height;
}
}

Ok(Response::new(0u64))
}

async fn sync_utxos_by_block(
&self,
request: Request<SyncUtxosByBlockRequest>,
) -> Result<Streaming<SyncUtxosByBlockResponse>, RpcStatus> {
let req = request.message();
let peer = request.context().peer_node_id();
debug!(
target: LOG_TARGET,
"Received sync_utxos_by_block request from {} from header {} to {} ",
peer,
req.start_header_hash.to_hex(),
req.end_header_hash.to_hex(),
);

// Number of blocks to load and push to the stream before loading the next batch. Most blocks have 1 output but
// full blocks will have 500
const BATCH_SIZE: usize = 5;
let (tx, rx) = mpsc::channel(BATCH_SIZE);
let task = SyncUtxosByBlockTask::new(self.db());
task.run(request.into_message(), tx).await?;

Ok(Streaming::new(rx))
}
}

0 comments on commit fe9033b

Please sign in to comment.