Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve wallet recovery and scanning handling of reorgs #3655

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions applications/tari_console_wallet/src/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tari_crypto::tari_utilities::hex::Hex;
use tari_key_manager::{cipher_seed::CipherSeed, mnemonic::Mnemonic};
use tari_shutdown::Shutdown;
use tari_wallet::{
storage::sqlite_db::WalletSqliteDatabase,
storage::sqlite_db::wallet::WalletSqliteDatabase,
utxo_scanner_service::{handle::UtxoScannerEvent, service::UtxoScannerService},
WalletSqlite,
};
Expand Down Expand Up @@ -126,24 +126,24 @@ pub async fn wallet_recovery(
println!("OK (latency = {:.2?})", latency);
},
Ok(UtxoScannerEvent::Progress {
current_index: current,
total_index: total,
current_height,
tip_height,
}) => {
let percentage_progress = ((current as f32) * 100f32 / (total as f32)).round() as u32;
let percentage_progress = ((current_height as f32) * 100f32 / (tip_height as f32)).round() as u32;
debug!(
target: LOG_TARGET,
"{}: Recovery process {}% complete ({} of {} utxos).",
"{}: Recovery process {}% complete (Block {} of {}).",
Local::now(),
percentage_progress,
current,
total
current_height,
tip_height
);
println!(
"{}: Recovery process {}% complete ({} of {} utxos).",
"{}: Recovery process {}% complete (Block {} of {}).",
Local::now(),
percentage_progress,
current,
total
current_height,
tip_height
);
},
Ok(UtxoScannerEvent::ScanningRoundFailed {
Expand Down Expand Up @@ -172,15 +172,15 @@ pub async fn wallet_recovery(
warn!(target: LOG_TARGET, "{}", s);
},
Ok(UtxoScannerEvent::Completed {
number_scanned: num_scanned,
number_received: num_utxos,
value_received: total_amount,
time_taken: elapsed,
final_height,
num_recovered,
value_recovered,
time_taken,
}) => {
let rate = (num_scanned as f32) * 1000f32 / (elapsed.as_millis() as f32);
let rate = (final_height as f32) * 1000f32 / (time_taken.as_millis() as f32);
let stats = format!(
"Recovery complete! Scanned = {} in {:.2?} ({:.2?} utxos/s), Recovered {} worth {}",
num_scanned, elapsed, rate, num_utxos, total_amount
"Recovery complete! Scanned {} blocks in {:.2?} ({:.2?} blocks/s), Recovered {} outputs worth {}",
final_height, time_taken, rate, num_recovered, value_recovered
);
info!(target: LOG_TARGET, "{}", stats);
println!("{}", stats);
Expand Down
11 changes: 11 additions & 0 deletions base_layer/core/src/base_node/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,14 @@ message PrunedOutput {
bytes hash = 1;
bytes witness_hash = 2;
}

message SyncUtxosByBlockRequest {
bytes start_header_hash = 1;
bytes end_header_hash = 2;
}

message SyncUtxosByBlockResponse {
repeated tari.types.TransactionOutput outputs = 1;
uint64 height = 2;
bytes header_hash = 3;
}
16 changes: 15 additions & 1 deletion base_layer/core/src/base_node/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@

#[cfg(feature = "base_node")]
mod service;
#[cfg(feature = "base_node")]
pub mod sync_utxos_by_block_task;

#[cfg(feature = "base_node")]
pub use service::BaseNodeWalletRpcService;
use tari_comms::protocol::rpc::{Request, Response, RpcStatus};
use tari_comms::protocol::rpc::{Request, Response, RpcStatus, Streaming};
use tari_comms_rpc_macros::tari_rpc;

#[cfg(feature = "base_node")]
Expand All @@ -43,6 +46,8 @@ use crate::{
QueryDeletedRequest,
QueryDeletedResponse,
Signatures,
SyncUtxosByBlockRequest,
SyncUtxosByBlockResponse,
TipInfoResponse,
TxQueryBatchResponses,
TxQueryResponse,
Expand Down Expand Up @@ -97,6 +102,15 @@ pub trait BaseNodeWalletService: Send + Sync + 'static {
&self,
request: Request<u64>,
) -> Result<Response<proto::core::BlockHeader>, RpcStatus>;

#[rpc(method = 10)]
async fn get_height_at_time(&self, request: Request<u64>) -> Result<Response<u64>, RpcStatus>;

#[rpc(method = 11)]
async fn sync_utxos_by_block(
&self,
request: Request<SyncUtxosByBlockRequest>,
) -> Result<Streaming<SyncUtxosByBlockResponse>, RpcStatus>;
}

#[cfg(feature = "base_node")]
Expand Down
93 changes: 90 additions & 3 deletions base_layer/core/src/base_node/rpc/service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::convert::TryFrom;

use log::*;
use tari_common_types::types::Signature;
use tari_comms::protocol::rpc::{Request, Response, RpcStatus};
use tari_comms::protocol::rpc::{Request, Response, RpcStatus, Streaming};
use tari_crypto::tari_utilities::hex::Hex;
use tokio::sync::mpsc;

// Copyright 2020, The Tari Project
//
Expand All @@ -26,7 +29,11 @@ use tari_comms::protocol::rpc::{Request, Response, RpcStatus};
// OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
// DAMAGE.
use crate::{
base_node::{rpc::BaseNodeWalletService, state_machine_service::states::StateInfo, StateMachineHandle},
base_node::{
rpc::{sync_utxos_by_block_task::SyncUtxosByBlockTask, BaseNodeWalletService},
state_machine_service::states::StateInfo,
StateMachineHandle,
},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, PrunedOutput, UtxoMinedInfo},
mempool::{service::MempoolHandle, TxStorageResponse},
proto,
Expand All @@ -37,6 +44,8 @@ use crate::{
QueryDeletedRequest,
QueryDeletedResponse,
Signatures as SignaturesProto,
SyncUtxosByBlockRequest,
SyncUtxosByBlockResponse,
TipInfoResponse,
TxLocation,
TxQueryBatchResponse,
Expand All @@ -52,7 +61,6 @@ use crate::{
},
transactions::transaction::Transaction,
};

const LOG_TARGET: &str = "c::base_node::rpc";

pub struct BaseNodeWalletRpcService<B> {
Expand Down Expand Up @@ -499,4 +507,83 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc

Ok(Response::new(header.into()))
}

async fn get_height_at_time(&self, request: Request<u64>) -> Result<Response<u64>, RpcStatus> {
let requested_epoch_time: u64 = request.into_message();

let tip_header = self
.db()
.fetch_tip_header()
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;
let mut left_height = 0u64;
let mut right_height = tip_header.height();

while left_height <= right_height {
let mut mid_height = (left_height + right_height) / 2;

if mid_height == 0 {
return Ok(Response::new(0u64));
}
// If the two bounds are adjacent then perform the test between the right and left sides
if left_height == mid_height {
mid_height = right_height;
}

let mid_header = self
.db()
.fetch_header(mid_height)
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| {
RpcStatus::not_found(format!("Header not found during search at height {}", mid_height))
})?;
let before_mid_header = self
.db()
.fetch_header(mid_height - 1)
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| {
RpcStatus::not_found(format!("Header not found during search at height {}", mid_height - 1))
})?;

if requested_epoch_time < mid_header.timestamp.as_u64() &&
requested_epoch_time >= before_mid_header.timestamp.as_u64()
{
return Ok(Response::new(before_mid_header.height));
} else if mid_height == right_height {
return Ok(Response::new(right_height));
} else if requested_epoch_time <= mid_header.timestamp.as_u64() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} else if requested_epoch_time <= mid_header.timestamp.as_u64() {
}
if requested_epoch_time <= mid_header.timestamp.as_u64() {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be made into a separate if. It does not have to be a single chained if-else statement.
We can even split it up into three if's but two is probably better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clippy begs to differ

right_height = mid_height;
} else {
left_height = mid_height;
}
}

Ok(Response::new(0u64))
}

async fn sync_utxos_by_block(
&self,
request: Request<SyncUtxosByBlockRequest>,
) -> Result<Streaming<SyncUtxosByBlockResponse>, RpcStatus> {
let req = request.message();
let peer = request.context().peer_node_id();
debug!(
target: LOG_TARGET,
"Received sync_utxos_by_block request from {} from header {} to {} ",
peer,
req.start_header_hash.to_hex(),
req.end_header_hash.to_hex(),
);

// Number of blocks to load and push to the stream before loading the next batch. Most blocks have 1 output but
// full blocks will have 500
const BATCH_SIZE: usize = 5;
let (tx, rx) = mpsc::channel(BATCH_SIZE);
let task = SyncUtxosByBlockTask::new(self.db());
task.run(request.into_message(), tx).await?;

Ok(Streaming::new(rx))
}
}
Loading