diff --git a/applications/tari_base_node/src/bootstrap.rs b/applications/tari_base_node/src/bootstrap.rs index 516e93d2f1..e584827cd2 100644 --- a/applications/tari_base_node/src/bootstrap.rs +++ b/applications/tari_base_node/src/bootstrap.rs @@ -170,7 +170,7 @@ where B: BlockchainBackend + 'static orphan_db_clean_out_threshold: config.orphan_db_clean_out_threshold, max_randomx_vms: config.max_randomx_vms, blocks_behind_before_considered_lagging: self.config.blocks_behind_before_considered_lagging, - block_sync_validation_concurrency: num_cpus::get(), + sync_validation_concurrency: num_cpus::get(), ..Default::default() }, self.rules, diff --git a/base_layer/core/src/base_node/state_machine_service/initializer.rs b/base_layer/core/src/base_node/state_machine_service/initializer.rs index 04a66f073c..582be35742 100644 --- a/base_layer/core/src/base_node/state_machine_service/initializer.rs +++ b/base_layer/core/src/base_node/state_machine_service/initializer.rs @@ -107,7 +107,7 @@ where B: BlockchainBackend + 'static rules.clone(), factories, config.bypass_range_proof_verification, - config.block_sync_validation_concurrency, + config.sync_validation_concurrency, ); let max_randomx_vms = config.max_randomx_vms; diff --git a/base_layer/core/src/base_node/state_machine_service/state_machine.rs b/base_layer/core/src/base_node/state_machine_service/state_machine.rs index 4bb5aa7ccb..fb364fc797 100644 --- a/base_layer/core/src/base_node/state_machine_service/state_machine.rs +++ b/base_layer/core/src/base_node/state_machine_service/state_machine.rs @@ -54,7 +54,7 @@ pub struct BaseNodeStateMachineConfig { pub max_randomx_vms: usize, pub blocks_behind_before_considered_lagging: u64, pub bypass_range_proof_verification: bool, - pub block_sync_validation_concurrency: usize, + pub sync_validation_concurrency: usize, } impl Default for BaseNodeStateMachineConfig { @@ -68,7 +68,7 @@ impl Default for BaseNodeStateMachineConfig { max_randomx_vms: 0, blocks_behind_before_considered_lagging: 0, bypass_range_proof_verification: false, - block_sync_validation_concurrency: 8, + sync_validation_concurrency: 8, } } } @@ -259,9 +259,13 @@ impl BaseNodeStateMachine { /// Polls both the interrupt signal and the given future. If the given future `state_fut` is ready first it's value is /// returned, otherwise if the interrupt signal is triggered, `StateEvent::UserQuit` is returned. -async fn select_next_state_event(interrupt_signal: ShutdownSignal, state_fut: F) -> StateEvent -where F: Future { +async fn select_next_state_event(interrupt_signal: I, state_fut: F) -> StateEvent +where + F: Future, + I: Future, +{ futures::pin_mut!(state_fut); + futures::pin_mut!(interrupt_signal); // If future A and B are both ready `future::select` will prefer A match future::select(interrupt_signal, state_fut).await { Either::Left(_) => StateEvent::UserQuit, diff --git a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync.rs b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync.rs index ca2161e033..e4a163f003 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync.rs @@ -25,30 +25,26 @@ // TODO: Move the horizon synchronizer to the `sync` module -use log::*; +mod config; +pub use self::config::HorizonSyncConfig; +mod error; pub use error::HorizonSyncError; -use horizon_state_synchronization::HorizonStateSynchronization; -use crate::{ - base_node::{sync::SyncPeer, BaseNodeStateMachine}, - chain_storage::BlockchainBackend, - transactions::CryptoFactories, -}; +mod horizon_state_synchronization; +use horizon_state_synchronization::HorizonStateSynchronization; use super::{ events_and_states::{HorizonSyncInfo, HorizonSyncStatus}, StateEvent, StateInfo, }; - -pub use self::config::HorizonSyncConfig; - -mod config; - -mod error; - -mod horizon_state_synchronization; +use crate::{ + base_node::{sync::SyncPeer, BaseNodeStateMachine}, + chain_storage::BlockchainBackend, + transactions::CryptoFactories, +}; +use log::*; const LOG_TARGET: &str = "c::bn::state_machine_service::states::horizon_state_sync"; @@ -72,21 +68,26 @@ impl HorizonStateSync { ) -> StateEvent { let local_metadata = match shared.db.get_chain_metadata().await { Ok(metadata) => metadata, - Err(err) => return StateEvent::FatalError(err.to_string()), + Err(err) => return err.into(), }; - if local_metadata.height_of_longest_chain() > 0 && - local_metadata.height_of_longest_chain() >= local_metadata.pruned_height() - { + let last_header = match shared.db.fetch_last_header().await { + Ok(h) => h, + Err(err) => return err.into(), + }; + + let horizon_sync_height = local_metadata.horizon_block(last_header.height); + if local_metadata.pruned_height() >= horizon_sync_height { + info!(target: LOG_TARGET, "Horizon state was already synchronized."); return StateEvent::HorizonStateSynchronized; } - let horizon_sync_height = match shared.db.fetch_last_header().await { - Ok(header) => header.height.saturating_sub(local_metadata.pruning_horizon()), - Err(err) => return StateEvent::FatalError(err.to_string()), - }; - - if local_metadata.height_of_longest_chain() > horizon_sync_height { + // We're already synced because we have full blocks higher than our target pruned height + if local_metadata.height_of_longest_chain() >= horizon_sync_height { + info!( + target: LOG_TARGET, + "Tip height is higher than our pruned height. Horizon state is already synchronized." + ); return StateEvent::HorizonStateSynchronized; } @@ -94,7 +95,7 @@ impl HorizonStateSync { shared.set_state_info(StateInfo::HorizonSync(info)); let prover = CryptoFactories::default().range_proof; - let mut horizon_state = HorizonStateSynchronization::new(shared, &self.sync_peer, horizon_sync_height, &prover); + let mut horizon_state = HorizonStateSynchronization::new(shared, &self.sync_peer, horizon_sync_height, prover); match horizon_state.synchronize().await { Ok(()) => { diff --git a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/error.rs b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/error.rs index 8dd46d70f7..df62e5495a 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/error.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/error.rs @@ -20,23 +20,20 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::num::TryFromIntError; - -use thiserror::Error; -use tokio::task; - -use tari_comms::{ - connectivity::ConnectivityError, - protocol::rpc::{RpcError, RpcStatus}, -}; -use tari_mmr::error::MerkleMountainRangeError; - use crate::{ base_node::{comms_interface::CommsInterfaceError, state_machine_service::states::helpers::BaseNodeRequestError}, chain_storage::{ChainStorageError, MmrTree}, transactions::transaction_entities::error::TransactionError, validation::ValidationError, }; +use std::num::TryFromIntError; +use tari_comms::{ + connectivity::ConnectivityError, + protocol::rpc::{RpcError, RpcStatus}, +}; +use tari_mmr::error::MerkleMountainRangeError; +use thiserror::Error; +use tokio::task; #[derive(Debug, Error)] pub enum HorizonSyncError { @@ -71,7 +68,7 @@ pub enum HorizonSyncError { ConversionError(String), #[error("MerkleMountainRangeError: {0}")] MerkleMountainRangeError(#[from] MerkleMountainRangeError), - #[error("Connectivity Error: {0}")] + #[error("Connectivity error: {0}")] ConnectivityError(#[from] ConnectivityError), } diff --git a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs index 05385f876a..0a3e208e29 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs @@ -20,22 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{ - convert::{TryFrom, TryInto}, - sync::Arc, -}; - -use croaring::Bitmap; -use futures::StreamExt; -use log::*; -use tari_crypto::{ - commitment::HomomorphicCommitment, - tari_utilities::{hex::Hex, Hashable}, -}; - -use tari_common_types::types::{HashDigest, RangeProofService}; -use tari_mmr::{MerkleMountainRange, MutableMmr}; - +use super::error::HorizonSyncError; use crate::{ base_node::{ state_machine_service::{ @@ -44,7 +29,7 @@ use crate::{ }, sync::{rpc, SyncPeer}, }, - blocks::BlockHeader, + blocks::{BlockHeader, ChainHeader, UpdateBlockAccumulatedData}, chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError, MmrTree, PrunedOutput}, proto::base_node::{ sync_utxo as proto_sync_utxo, @@ -59,8 +44,23 @@ use crate::{ transaction_output::TransactionOutput, }, }; - -use super::error::HorizonSyncError; +use croaring::Bitmap; +use futures::{stream::FuturesUnordered, StreamExt}; +use log::*; +use std::{ + cmp, + convert::{TryFrom, TryInto}, + mem, + sync::Arc, + time::Instant, +}; +use tari_common_types::types::{Commitment, HashDigest, RangeProofService}; +use tari_crypto::{ + commitment::HomomorphicCommitment, + tari_utilities::{hex::Hex, Hashable}, +}; +use tari_mmr::{MerkleMountainRange, MutableMmr}; +use tokio::task; const LOG_TARGET: &str = "c::bn::state_machine_service::states::horizon_state_sync"; @@ -68,9 +68,10 @@ pub struct HorizonStateSynchronization<'a, B: BlockchainBackend> { shared: &'a mut BaseNodeStateMachine, sync_peer: &'a SyncPeer, horizon_sync_height: u64, - prover: &'a RangeProofService, + prover: Arc, num_kernels: u64, num_outputs: u64, + full_bitmap: Option, } impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { @@ -78,7 +79,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { shared: &'a mut BaseNodeStateMachine, sync_peer: &'a SyncPeer, horizon_sync_height: u64, - prover: &'a RangeProofService, + prover: Arc, ) -> Self { Self { shared, @@ -87,14 +88,16 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { prover, num_kernels: 0, num_outputs: 0, + full_bitmap: None, } } pub async fn synchronize(&mut self) -> Result<(), HorizonSyncError> { debug!( target: LOG_TARGET, - "Preparing database for horizon sync to height (#{})", self.horizon_sync_height + "Preparing database for horizon sync to height #{}", self.horizon_sync_height ); + let header = self.db().fetch_header(self.horizon_sync_height).await?.ok_or_else(|| { ChainStorageError::ValueNotFound { entity: "Header", @@ -130,6 +133,8 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { client: &mut rpc::BaseNodeSyncRpcClient, to_header: &BlockHeader, ) -> Result<(), HorizonSyncError> { + debug!(target: LOG_TARGET, "Initializing"); + self.initialize().await?; debug!(target: LOG_TARGET, "Synchronizing kernels"); self.synchronize_kernels(client, to_header).await?; debug!(target: LOG_TARGET, "Synchronizing outputs"); @@ -137,6 +142,21 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { Ok(()) } + async fn initialize(&mut self) -> Result<(), HorizonSyncError> { + let db = self.db(); + let local_metadata = db.get_chain_metadata().await?; + + let new_prune_height = cmp::min(local_metadata.height_of_longest_chain(), self.horizon_sync_height); + if local_metadata.pruned_height() < new_prune_height { + debug!(target: LOG_TARGET, "Pruning block chain to height {}", new_prune_height); + db.prune_to_height(new_prune_height).await?; + } + + self.full_bitmap = Some(db.fetch_deleted_bitmap_at_tip().await?.into_bitmap()); + + Ok(()) + } + async fn synchronize_kernels( &mut self, client: &mut rpc::BaseNodeSyncRpcClient, @@ -174,41 +194,43 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { latency.unwrap_or_default().as_millis() ); - let start = local_num_kernels; - let end = remote_num_kernels; - let end_hash = to_header.hash(); - + let mut current_header = self + .db() + .fetch_header_containing_kernel_mmr(local_num_kernels + 1) + .await?; let req = SyncKernelsRequest { - start, - end_header_hash: end_hash, + start: local_num_kernels, + end_header_hash: to_header.hash(), }; let mut kernel_stream = client.sync_kernels(req).await?; - let mut current_header = self.db().fetch_header_containing_kernel_mmr(start + 1).await?; debug!( target: LOG_TARGET, "Found header for kernels at mmr pos: {} height: {}", - start, + local_num_kernels, current_header.height() ); - let mut kernels = vec![]; + let mut kernel_hashes = vec![]; let db = self.db().clone(); let mut txn = db.write_transaction(); - let mut mmr_position = start; + let mut mmr_position = local_num_kernels; + let end = remote_num_kernels; while let Some(kernel) = kernel_stream.next().await { let kernel: TransactionKernel = kernel?.try_into().map_err(HorizonSyncError::ConversionError)?; kernel .verify_signature() .map_err(HorizonSyncError::InvalidKernelSignature)?; - kernels.push(kernel.clone()); + kernel_hashes.push(kernel.hash()); + txn.insert_kernel_via_horizon_sync(kernel, current_header.hash().clone(), mmr_position as u32); if mmr_position == current_header.header().kernel_mmr_size - 1 { + let num_kernels = kernel_hashes.len(); debug!( target: LOG_TARGET, "Header #{} ({} kernels)", current_header.height(), - kernels.len() + num_kernels, ); // Validate root let block_data = db @@ -217,8 +239,8 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { let kernel_pruned_set = block_data.dissolve().0; let mut kernel_mmr = MerkleMountainRange::::new(kernel_pruned_set); - for kernel in kernels.drain(..) { - kernel_mmr.push(kernel.hash())?; + for hash in kernel_hashes.drain(..) { + kernel_mmr.push(hash)?; } let mmr_root = kernel_mmr.get_merkle_root()?; @@ -231,13 +253,29 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { }); } - txn.update_pruned_hash_set( - MmrTree::Kernel, + let kernel_hash_set = kernel_mmr.get_pruned_hash_set()?; + debug!( + target: LOG_TARGET, + "Updating block data at height {}", + current_header.height() + ); + txn.update_block_accumulated_data_via_horizon_sync( current_header.hash().clone(), - kernel_mmr.get_pruned_hash_set()?, + UpdateBlockAccumulatedData { + kernel_hash_set: Some(kernel_hash_set), + ..Default::default() + }, ); txn.commit().await?; + debug!( + target: LOG_TARGET, + "Committed {} kernel(s), ({}/{}) {} remaining", + num_kernels, + mmr_position + 1, + end, + end - (mmr_position + 1) + ); if mmr_position < end - 1 { current_header = db.fetch_chain_header(current_header.height() + 1).await?; } @@ -308,39 +346,39 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { include_deleted_bitmaps: true, include_pruned_utxos: true, }; - let mut output_stream = client.sync_utxos(req).await?; let mut current_header = self.db().fetch_header_containing_utxo_mmr(start + 1).await?; + let mut output_stream = client.sync_utxos(req).await?; + debug!( target: LOG_TARGET, "Found header for utxos at mmr pos: {} - {} height: {}", - start + 1, + start, current_header.header().output_mmr_size, current_header.height() ); let db = self.db().clone(); - let mut output_hashes = vec![]; - let mut witness_hashes = vec![]; let mut txn = db.write_transaction(); let mut unpruned_outputs = vec![]; let mut mmr_position = start; let mut height_utxo_counter = 0u64; let mut height_txo_counter = 0u64; + let mut timer = Instant::now(); let block_data = db .fetch_block_accumulated_data(current_header.header().prev_hash.clone()) .await?; - let (_, output_pruned_set, rp_pruned_set, mut full_bitmap) = block_data.dissolve(); + let (_, output_pruned_set, witness_pruned_set, _) = block_data.dissolve(); let mut output_mmr = MerkleMountainRange::::new(output_pruned_set); - let mut witness_mmr = MerkleMountainRange::::new(rp_pruned_set); + let mut witness_mmr = MerkleMountainRange::::new(witness_pruned_set); while let Some(response) = output_stream.next().await { let res: SyncUtxosResponse = response?; - if res.mmr_index > 0 && res.mmr_index != mmr_position { + if res.mmr_index != 0 && res.mmr_index != mmr_position { return Err(HorizonSyncError::IncorrectResponse(format!( "Expected MMR position of {} but got {}", mmr_position, res.mmr_index, @@ -363,9 +401,11 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { ); height_utxo_counter += 1; let output = TransactionOutput::try_from(output).map_err(HorizonSyncError::ConversionError)?; - output_hashes.push(output.hash()); - witness_hashes.push(output.witness_hash()); unpruned_outputs.push(output.clone()); + + output_mmr.push(output.hash())?; + witness_mmr.push(output.witness_hash())?; + txn.insert_output_via_horizon_sync( output, current_header.hash().clone(), @@ -384,8 +424,9 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { current_header.height() ); height_txo_counter += 1; - output_hashes.push(utxo.hash.clone()); - witness_hashes.push(utxo.witness_hash.clone()); + output_mmr.push(utxo.hash.clone())?; + witness_mmr.push(utxo.witness_hash.clone())?; + txn.insert_pruned_output_via_horizon_sync( utxo.hash, utxo.witness_hash, @@ -404,29 +445,8 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { ))); } - debug!( - target: LOG_TARGET, - "UTXO: {} (Header #{}), added {} utxos, added {} txos", - mmr_position, - current_header.height(), - height_utxo_counter, - height_txo_counter - ); - - height_txo_counter = 0; - height_utxo_counter = 0; - - // Validate root - for hash in output_hashes.drain(..) { - output_mmr.push(hash)?; - } - - for hash in witness_hashes.drain(..) { - witness_mmr.push(hash)?; - } - - // Check that the difference bitmap is excessively large. Bitmap::deserialize panics if greater than - // isize::MAX, however isize::MAX is still an inordinate amount of data. An + // Check that the difference bitmap isn't excessively large. Bitmap::deserialize panics if greater + // than isize::MAX, however isize::MAX is still an inordinate amount of data. An // arbitrary 4 MiB limit is used. const MAX_DIFF_BITMAP_BYTE_LEN: usize = 4 * 1024 * 1024; if diff_bitmap.len() > MAX_DIFF_BITMAP_BYTE_LEN { @@ -448,11 +468,12 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { // Merge the differences into the final bitmap so that we can commit to the entire spend state // in the output MMR - full_bitmap.or_inplace(&diff_bitmap); - full_bitmap.run_optimize(); + let bitmap = self.full_bitmap_mut(); + bitmap.or_inplace(&diff_bitmap); + bitmap.run_optimize(); let pruned_output_set = output_mmr.get_pruned_hash_set()?; - let output_mmr = MutableMmr::::new(pruned_output_set.clone(), full_bitmap.clone())?; + let output_mmr = MutableMmr::::new(pruned_output_set.clone(), bitmap.clone())?; let mmr_root = output_mmr.get_merkle_root()?; if mmr_root != current_header.header().output_mr { @@ -474,29 +495,54 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { }); } - // Validate rangeproofs if the MMR matches - for o in unpruned_outputs.drain(..) { - o.verify_range_proof(self.prover) - .map_err(|err| HorizonSyncError::InvalidRangeProof(o.hash().to_hex(), err.to_string()))?; - } + self.validate_rangeproofs(mem::take(&mut unpruned_outputs)).await?; txn.update_deleted_bitmap(diff_bitmap.clone()); - txn.update_pruned_hash_set(MmrTree::Utxo, current_header.hash().clone(), pruned_output_set); - txn.update_pruned_hash_set( - MmrTree::Witness, + + let witness_hash_set = witness_mmr.get_pruned_hash_set()?; + txn.update_block_accumulated_data_via_horizon_sync( current_header.hash().clone(), - witness_mmr.get_pruned_hash_set()?, + UpdateBlockAccumulatedData { + utxo_hash_set: Some(pruned_output_set), + witness_hash_set: Some(witness_hash_set), + deleted_diff: Some(diff_bitmap.into()), + ..Default::default() + }, ); - txn.update_block_accumulated_data_with_deleted_diff(current_header.hash().clone(), diff_bitmap); - txn.commit().await?; - current_header = db.fetch_chain_header(current_header.height() + 1).await?; debug!( target: LOG_TARGET, - "Expecting to receive the next UTXO set for header #{}", - current_header.height() + "UTXO: {}/{}, Header #{}, added {} utxos, added {} txos in {:.2?}", + mmr_position, + end, + current_header.height(), + height_utxo_counter, + height_txo_counter, + timer.elapsed() ); + height_txo_counter = 0; + height_utxo_counter = 0; + timer = Instant::now(); + + if mmr_position == end { + debug!( + target: LOG_TARGET, + "Sync complete at mmr position {}, height #{}", + mmr_position, + current_header.height() + ); + break; + } else { + current_header = db.fetch_chain_header(current_header.height() + 1).await?; + debug!( + target: LOG_TARGET, + "Expecting to receive the next UTXO set {}-{} for header #{}", + mmr_position, + current_header.header().output_mmr_size, + current_header.height() + ); + } }, v => { error!(target: LOG_TARGET, "Remote node returned an invalid response {:?}", v); @@ -520,6 +566,37 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { "Sync node did not send all utxos requested".to_string(), )); } + + Ok(()) + } + + async fn validate_rangeproofs(&self, mut unpruned_outputs: Vec) -> Result<(), HorizonSyncError> { + let concurrency = self.shared.config.sync_validation_concurrency; + let mut chunk_size = unpruned_outputs.len() / concurrency; + if unpruned_outputs.len() % concurrency > 0 { + chunk_size += 1; + } + // Validate rangeproofs in parallel + let mut tasks = (0..concurrency) + .map(|_| { + let end = cmp::min(unpruned_outputs.len(), chunk_size); + unpruned_outputs.drain(..end).collect::>() + }) + .map(|chunk| { + let prover = self.prover.clone(); + task::spawn_blocking(move || -> Result<(), HorizonSyncError> { + for o in chunk { + o.verify_range_proof(&prover) + .map_err(|err| HorizonSyncError::InvalidRangeProof(o.hash().to_hex(), err.to_string()))?; + } + Ok(()) + }) + }) + .collect::>(); + + while let Some(result) = tasks.next().await { + result??; + } Ok(()) } @@ -528,22 +605,75 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { async fn finalize_horizon_sync(&mut self) -> Result<(), HorizonSyncError> { debug!(target: LOG_TARGET, "Validating horizon state"); - let info = HorizonSyncInfo::new(vec![self.sync_peer.node_id().clone()], HorizonSyncStatus::Finalizing); - self.shared.set_state_info(StateInfo::HorizonSync(info)); + self.shared.set_state_info(StateInfo::HorizonSync(HorizonSyncInfo::new( + vec![self.sync_peer.node_id().clone()], + HorizonSyncStatus::Finalizing, + ))); let header = self.db().fetch_chain_header(self.horizon_sync_height).await?; + let (calc_utxo_sum, calc_kernel_sum) = self.calculate_commitment_sums(&header).await?; + + self.shared + .sync_validators + .final_horizon_state + .validate( + &*self.db().inner().db_read_access()?, + header.height(), + &calc_utxo_sum, + &calc_kernel_sum, + ) + .map_err(HorizonSyncError::FinalStateValidationFailed)?; + + let metadata = self.db().get_chain_metadata().await?; + info!( + target: LOG_TARGET, + "Horizon state validation succeeded! Committing horizon state." + ); + self.db() + .write_transaction() + .set_best_block( + header.height(), + header.hash().clone(), + header.accumulated_data().total_accumulated_difficulty, + metadata.best_block().clone(), + ) + .set_pruned_height(header.height()) + .set_horizon_data(calc_kernel_sum, calc_utxo_sum) + .commit() + .await?; + + Ok(()) + } + + fn take_final_bitmap(&mut self) -> Arc { + self.full_bitmap + .take() + .map(Arc::new) + .expect("take_full_bitmap called before initialize") + } + + fn full_bitmap_mut(&mut self) -> &mut Bitmap { + self.full_bitmap + .as_mut() + .expect("full_bitmap_mut called before initialize") + } + + /// (UTXO sum, Kernel sum) + async fn calculate_commitment_sums( + &mut self, + header: &ChainHeader, + ) -> Result<(Commitment, Commitment), HorizonSyncError> { let mut pruned_utxo_sum = HomomorphicCommitment::default(); let mut pruned_kernel_sum = HomomorphicCommitment::default(); let mut prev_mmr = 0; let mut prev_kernel_mmr = 0; - let bitmap = Arc::new( - self.db() - .fetch_complete_deleted_bitmap_at(header.hash().clone()) - .await? - .into_bitmap(), - ); - let expected_prev_best_block = self.shared.db.get_chain_metadata().await?.best_block().clone(); + + let bitmap = self.take_final_bitmap(); + let mut txn = self.db().write_transaction(); + let mut utxo_mmr_position = 0; + let mut prune_positions = vec![]; + for h in 0..=header.height() { let curr_header = self.db().fetch_chain_header(h).await?; @@ -555,10 +685,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { prev_mmr, curr_header.header().output_mmr_size - 1 ); - let (utxos, _) = self - .db() - .fetch_utxos_by_mmr_position(prev_mmr, curr_header.header().output_mmr_size - 1, bitmap.clone()) - .await?; + let (utxos, _) = self.db().fetch_utxos_in_block(curr_header.hash().clone(), None).await?; trace!( target: LOG_TARGET, "Fetching kernels from db: height:{}, header.kernel_mmr:{}, prev_mmr:{}, end:{}", @@ -567,74 +694,59 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { prev_kernel_mmr, curr_header.header().kernel_mmr_size - 1 ); - let kernels = self - .db() - .fetch_kernels_by_mmr_position(prev_kernel_mmr, curr_header.header().kernel_mmr_size - 1) - .await?; - - let mut utxo_sum = HomomorphicCommitment::default(); - debug!(target: LOG_TARGET, "Number of kernels returned: {}", kernels.len()); - debug!(target: LOG_TARGET, "Number of utxos returned: {}", utxos.len()); + + trace!(target: LOG_TARGET, "Number of utxos returned: {}", utxos.len()); let mut prune_counter = 0; for u in utxos { match u { PrunedOutput::NotPruned { output } => { - utxo_sum = &output.commitment + &utxo_sum; + if bitmap.contains(utxo_mmr_position) { + debug!( + target: LOG_TARGET, + "Found output that needs pruning at height: {} position: {}", h, utxo_mmr_position + ); + prune_positions.push(utxo_mmr_position); + prune_counter += 1; + } else { + pruned_utxo_sum = &output.commitment + &pruned_utxo_sum; + } }, _ => { prune_counter += 1; }, } + utxo_mmr_position += 1; } if prune_counter > 0 { - debug!(target: LOG_TARGET, "Pruned {} outputs", prune_counter); + trace!(target: LOG_TARGET, "Pruned {} outputs", prune_counter); } prev_mmr = curr_header.header().output_mmr_size; - pruned_utxo_sum = &utxo_sum + &pruned_utxo_sum; - + let kernels = self.db().fetch_kernels_in_block(curr_header.hash().clone()).await?; + trace!(target: LOG_TARGET, "Number of kernels returned: {}", kernels.len()); for k in kernels { pruned_kernel_sum = &k.excess + &pruned_kernel_sum; } prev_kernel_mmr = curr_header.header().kernel_mmr_size; - trace!( - target: LOG_TARGET, - "Height: {} Kernel sum:{:?} Pruned UTXO sum: {:?}", - h, - pruned_kernel_sum, - pruned_utxo_sum - ); + if h % 1000 == 0 { + debug!( + target: LOG_TARGET, + "Final Validation: {:.2}% complete. Height: {}, mmr_position: {} ", + (h as f32 / header.height() as f32) * 100.0, + h, + utxo_mmr_position, + ); + } } - self.shared - .sync_validators - .final_horizon_state - .validate( - &*self.db().clone().into_inner().db_read_access()?, - header.height(), - &pruned_utxo_sum, - &pruned_kernel_sum, - ) - .map_err(HorizonSyncError::FinalStateValidationFailed)?; - - info!( - target: LOG_TARGET, - "Horizon state validation succeeded! Committing horizon state." - ); - self.db() - .write_transaction() - .set_best_block( - header.height(), - header.hash().clone(), - header.accumulated_data().total_accumulated_difficulty, - expected_prev_best_block, - ) - .set_pruned_height(header.height(), pruned_kernel_sum, pruned_utxo_sum) - .commit() - .await?; + if !prune_positions.is_empty() { + debug!(target: LOG_TARGET, "Pruning {} spent outputs", prune_positions.len()); + txn.prune_output_at_positions(prune_positions); + txn.commit().await?; + } - Ok(()) + Ok((pruned_utxo_sum, pruned_kernel_sum)) } #[inline] diff --git a/base_layer/core/src/base_node/state_machine_service/states/sync_decide.rs b/base_layer/core/src/base_node/state_machine_service/states/sync_decide.rs index fb7c85673b..6f50f748a9 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/sync_decide.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/sync_decide.rs @@ -56,17 +56,28 @@ impl DecideNextSync { ); if shared.config.pruning_horizon > 0 { - // Filter sync peers that claim to be able to provide full blocks up until our pruned height + let last_header = match shared.db.fetch_last_header().await { + Ok(h) => h, + Err(err) => return err.into(), + }; + + let horizon_sync_height = local_metadata.horizon_block(last_header.height); + // Filter sync peers that claim to be able to provide blocks up until our pruned height let sync_peers_iter = self.sync_peers.iter().filter(|sync_peer| { - let chain_metadata = sync_peer.claimed_chain_metadata(); - let our_pruned_height_from_peer = - local_metadata.horizon_block(chain_metadata.height_of_longest_chain()); - let their_pruned_height = chain_metadata.pruned_height(); - our_pruned_height_from_peer >= their_pruned_height + let remote_metadata = sync_peer.claimed_chain_metadata(); + remote_metadata.height_of_longest_chain() >= horizon_sync_height }); match find_best_latency(sync_peers_iter) { - Some(sync_peer) => ProceedToHorizonSync(sync_peer), + Some(sync_peer) => { + debug!( + target: LOG_TARGET, + "Proceeding to horizon sync with sync peer {} with a latency of {:.2?}", + sync_peer.node_id(), + sync_peer.latency() + ); + ProceedToHorizonSync(sync_peer) + }, None => Continue, } } else { @@ -76,7 +87,15 @@ impl DecideNextSync { }); match find_best_latency(sync_peers_iter) { - Some(sync_peer) => ProceedToBlockSync(sync_peer), + Some(sync_peer) => { + debug!( + target: LOG_TARGET, + "Proceeding to block sync with sync peer {} with a latency of {:.2?}", + sync_peer.node_id(), + sync_peer.latency() + ); + ProceedToBlockSync(sync_peer) + }, None => Continue, } } diff --git a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs index b17da13bc3..b476f4078f 100644 --- a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs @@ -31,7 +31,7 @@ use crate::{ proto::base_node::SyncBlocksRequest, tari_utilities::{hex::Hex, Hashable}, transactions::aggregated_body::AggregateBody, - validation::BlockSyncBodyValidation, + validation::{BlockSyncBodyValidation, ValidationError}, }; use futures::StreamExt; use log::*; @@ -96,6 +96,7 @@ impl BlockSynchronizer { self.db.cleanup_orphans().await?; Ok(()) }, + Err(err @ BlockSyncError::ValidationError(ValidationError::AsyncTaskFailed(_))) => Err(err), Err(err @ BlockSyncError::ValidationError(_)) | Err(err @ BlockSyncError::ReceivedInvalidBlockBody(_)) => { self.ban_peer(node_id, &err).await?; Err(err) diff --git a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs index 3748a557f5..f0fcd4a148 100644 --- a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs @@ -114,7 +114,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { target: LOG_TARGET, "Attempting to synchronize headers with `{}`", node_id ); - match self.attempt_sync(sync_peer, peer_conn).await { + match self.attempt_sync(sync_peer, peer_conn.clone()).await { Ok(()) => return Ok(sync_peer.clone()), // Try another peer Err(err @ BlockHeaderSyncError::NotInSync) => { diff --git a/base_layer/core/src/base_node/sync/rpc/service.rs b/base_layer/core/src/base_node/sync/rpc/service.rs index 8a48cb3ce1..cc62c819d0 100644 --- a/base_layer/core/src/base_node/sync/rpc/service.rs +++ b/base_layer/core/src/base_node/sync/rpc/service.rs @@ -22,7 +22,7 @@ use crate::{ base_node::sync::rpc::{sync_utxos_task::SyncUtxosTask, BaseNodeSyncService}, - chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, OrNotFound}, + chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend}, iterators::NonOverlappingIntegerPairIter, proto, proto::base_node::{ @@ -34,6 +34,7 @@ use crate::{ SyncUtxosRequest, SyncUtxosResponse, }, + tari_utilities::Hashable, }; use log::*; use std::{ @@ -387,47 +388,57 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ request: Request, ) -> Result, RpcStatus> { let req = request.into_message(); - const BATCH_SIZE: usize = 1000; - let (tx, rx) = mpsc::channel(BATCH_SIZE); + let (tx, rx) = mpsc::channel(100); let db = self.db(); - task::spawn(async move { - let end = match db - .fetch_chain_header_by_block_hash(req.end_header_hash.clone()) - .await - .or_not_found("BlockHeader", "hash", req.end_header_hash.to_hex()) - .map_err(RpcStatus::log_internal_error(LOG_TARGET)) - { - Ok(header) => { - if header.header().kernel_mmr_size < req.start { - let _ = tx - .send(Err(RpcStatus::bad_request("Start mmr position after requested header"))) - .await; - return; - } + let start_header = db + .fetch_header_containing_kernel_mmr(req.start + 1) + .await + .map_err(RpcStatus::log_internal_error(LOG_TARGET))? + .into_header(); + + let end_header = db + .fetch_header_by_block_hash(req.end_header_hash.clone()) + .await + .map_err(RpcStatus::log_internal_error(LOG_TARGET))? + .ok_or_else(|| RpcStatus::not_found("Unknown end header"))?; + + let mut current_height = start_header.height; + let end_height = end_header.height; + let mut current_mmr_position = start_header.kernel_mmr_size; + let mut current_header_hash = start_header.hash(); + + if current_height > end_height { + return Err(RpcStatus::bad_request("start header height is after end header")); + } - header.header().kernel_mmr_size - }, - Err(err) => { - let _ = tx.send(Err(err)).await; - return; - }, - }; - let iter = NonOverlappingIntegerPairIter::new(req.start, end, BATCH_SIZE); - for (start, end) in iter { + task::spawn(async move { + while current_height <= end_height { if tx.is_closed() { break; } - debug!(target: LOG_TARGET, "Streaming kernels {} to {}", start, end); let res = db - .fetch_kernels_by_mmr_position(start, end) + .fetch_kernels_in_block(current_header_hash.clone()) .await .map_err(RpcStatus::log_internal_error(LOG_TARGET)); match res { Ok(kernels) if kernels.is_empty() => { + let _ = tx + .send(Err(RpcStatus::general(format!( + "No kernels in block {}", + current_header_hash.to_hex() + )))) + .await; break; }, Ok(kernels) => { + debug!( + target: LOG_TARGET, + "Streaming kernels {} to {}", + current_mmr_position, + current_mmr_position + kernels.len() as u64 + ); + current_mmr_position += kernels.len() as u64; let kernels = kernels.into_iter().map(proto::types::TransactionKernel::from).map(Ok); // Ensure task stops if the peer prematurely stops their RPC session if utils::mpsc::send_all(&tx, kernels).await.is_err() { @@ -439,6 +450,36 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ break; }, } + + current_height += 1; + + if current_height <= end_height { + let res = db + .fetch_header(current_height) + .await + .map_err(RpcStatus::log_internal_error(LOG_TARGET)); + match res { + Ok(Some(header)) => { + current_header_hash = header.hash(); + }, + Ok(None) => { + let _ = tx + .send(Err(RpcStatus::not_found(format!( + "Could not find header #{} while streaming UTXOs after position {}", + current_height, current_mmr_position + )))) + .await; + break; + }, + Err(err) => { + error!(target: LOG_TARGET, "DB error while streaming kernels: {}", err); + let _ = tx + .send(Err(RpcStatus::general("DB error while streaming kernels"))) + .await; + break; + }, + } + } } }); Ok(Streaming::new(rx)) @@ -450,15 +491,18 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ let peer = request.context().peer_node_id(); debug!( target: LOG_TARGET, - "Received sync_utxos request from {} (start = {}, include_pruned_utxos = {}, include_deleted_bitmaps = {})", + "Received sync_utxos request from header {} to {} (start = {}, include_pruned_utxos = {}, \ + include_deleted_bitmaps = {})", peer, req.start, + req.end_header_hash.to_hex(), req.include_pruned_utxos, req.include_deleted_bitmaps ); let (tx, rx) = mpsc::channel(200); - task::spawn(SyncUtxosTask::new(self.db(), request.into_message()).run(tx)); + let task = SyncUtxosTask::new(self.db()); + task.run(request.into_message(), tx).await?; Ok(Streaming::new(rx)) } diff --git a/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs b/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs index 343be2425b..48da40edc3 100644 --- a/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs +++ b/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs @@ -21,137 +21,169 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use crate::{ + blocks::BlockHeader, chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend}, proto, proto::base_node::{SyncUtxo, SyncUtxosRequest, SyncUtxosResponse}, }; use log::*; -use std::{cmp, sync::Arc, time::Instant}; +use std::{sync::Arc, time::Instant}; use tari_comms::{protocol::rpc::RpcStatus, utils}; use tari_crypto::tari_utilities::{hex::Hex, Hashable}; -use tokio::sync::mpsc; +use tokio::{sync::mpsc, task}; const LOG_TARGET: &str = "c::base_node::sync_rpc::sync_utxo_task"; pub(crate) struct SyncUtxosTask { db: AsyncBlockchainDb, - request: SyncUtxosRequest, } impl SyncUtxosTask where B: BlockchainBackend + 'static { - pub(crate) fn new(db: AsyncBlockchainDb, request: SyncUtxosRequest) -> Self { - Self { db, request } + pub(crate) fn new(db: AsyncBlockchainDb) -> Self { + Self { db } } - pub(crate) async fn run(self, mut tx: mpsc::Sender>) { - if let Err(err) = self.start_streaming(&mut tx).await { - let _ = tx.send(Err(err)).await; - } - } - - async fn start_streaming( - &self, - tx: &mut mpsc::Sender>, + pub(crate) async fn run( + self, + request: SyncUtxosRequest, + mut tx: mpsc::Sender>, ) -> Result<(), RpcStatus> { + let start_header = self + .db + .fetch_header_containing_utxo_mmr(request.start + 1) + .await + .map_err(RpcStatus::log_internal_error(LOG_TARGET))?; + let end_header = self .db - .fetch_header_by_block_hash(self.request.end_header_hash.clone()) + .fetch_header_by_block_hash(request.end_header_hash.clone()) .await .map_err(RpcStatus::log_internal_error(LOG_TARGET))? - .ok_or_else(|| { - RpcStatus::not_found(format!( - "End header hash {} is was not found", - self.request.end_header_hash.to_hex() - )) - })?; + .ok_or_else(|| RpcStatus::not_found("End header hash is was not found"))?; - if self.request.start > end_header.output_mmr_size - 1 { + if start_header.height() > end_header.height { return Err(RpcStatus::bad_request(format!( - "start index {} cannot be greater than the end header's output MMR size ({})", - self.request.start, end_header.output_mmr_size + "start header height {} cannot be greater than the end header height ({})", + start_header.height(), + end_header.height ))); } - let prev_header = self - .db - .fetch_header_containing_utxo_mmr(self.request.start) - .await - .map_err(RpcStatus::log_internal_error(LOG_TARGET))?; - let (mut prev_header, _) = prev_header.into_parts(); + let (skip_outputs, prev_utxo_mmr_size) = if start_header.height() == 0 { + (request.start, 0) + } else { + let prev_header = self + .db + .fetch_header_by_block_hash(start_header.header().prev_hash.clone()) + .await + .map_err(RpcStatus::log_internal_error(LOG_TARGET))? + .ok_or_else(|| RpcStatus::not_found("Previous start header hash is was not found"))?; + + let skip = request.start.checked_sub(prev_header.output_mmr_size) + // This is a data inconsistency because fetch_header_containing_utxo_mmr returned the header we are basing this on + .ok_or_else(|| RpcStatus::general(format!("Data inconsistency: output mmr size of header at {} was more than the start index {}", prev_header.height, request.start)))?; + (skip, prev_header.output_mmr_size) + }; + + let include_pruned_utxos = request.include_pruned_utxos; + let include_deleted_bitmaps = request.include_deleted_bitmaps; + task::spawn(async move { + if let Err(err) = self + .start_streaming( + &mut tx, + start_header.into_header(), + skip_outputs, + prev_utxo_mmr_size, + end_header, + include_pruned_utxos, + include_deleted_bitmaps, + ) + .await + { + let _ = tx.send(Err(err)).await; + } + }); - if prev_header.height > end_header.height { - return Err(RpcStatus::bad_request("start index is greater than end index")); - } - // we need to construct a temp bitmap for the height the client requested + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + async fn start_streaming( + &self, + tx: &mut mpsc::Sender>, + mut current_header: BlockHeader, + mut skip_outputs: u64, + mut prev_utxo_mmr_size: u64, + end_header: BlockHeader, + include_pruned_utxos: bool, + include_deleted_bitmaps: bool, + ) -> Result<(), RpcStatus> { + // we need to fetch the spent bitmap for the height the client requested let bitmap = self .db .fetch_complete_deleted_bitmap_at(end_header.hash()) .await - .map_err(|_| RpcStatus::not_found("Could not get tip deleted bitmap"))? + .map_err(|err| { + error!(target: LOG_TARGET, "Failed to get deleted bitmap: {}", err); + RpcStatus::general(format!( + "Could not get deleted bitmap at hash {}", + end_header.hash().to_hex() + )) + })? .into_bitmap(); - let bitmap = Arc::new(bitmap); + debug!( + target: LOG_TARGET, + "Starting stream task with current_header: {}, skip_outputs: {}, prev_utxo_mmr_size: {}, end_header: {}, \ + include_pruned_utxos: {:?}, include_deleted_bitmaps: {:?}", + current_header.hash().to_hex(), + skip_outputs, + prev_utxo_mmr_size, + end_header.hash().to_hex(), + include_pruned_utxos, + include_deleted_bitmaps + ); loop { let timer = Instant::now(); - if prev_header.height == end_header.height { - break; - } - - let current_header = self - .db - .fetch_header(prev_header.height + 1) - .await - .map_err(RpcStatus::log_internal_error(LOG_TARGET))? - .ok_or_else(|| { - RpcStatus::general(format!( - "Potential data consistency issue: header {} not found", - prev_header.height + 1 - )) - })?; + let current_header_hash = current_header.hash(); debug!( target: LOG_TARGET, - "previous header = {} ({}) current header = {} ({})", - prev_header.height, - prev_header.hash().to_hex(), + "current header = {} ({})", current_header.height, - current_header.hash().to_hex() + current_header_hash.to_hex() ); - let start = cmp::max(self.request.start, prev_header.output_mmr_size); - let end = current_header.output_mmr_size - 1; + let start = prev_utxo_mmr_size + skip_outputs; + let end = current_header.output_mmr_size; if tx.is_closed() { debug!(target: LOG_TARGET, "Exiting sync_utxos early because client has gone",); break; } - debug!( - target: LOG_TARGET, - "Streaming UTXOs {}-{} ({}) for block #{}", - start, - end, - end.saturating_sub(start).saturating_add(1), - current_header.height - ); let (utxos, deleted_diff) = self .db - .fetch_utxos_by_mmr_position(start, end, bitmap.clone()) + .fetch_utxos_in_block(current_header.hash(), Some(bitmap.clone())) .await .map_err(RpcStatus::log_internal_error(LOG_TARGET))?; - trace!( + debug!( target: LOG_TARGET, - "Loaded {} UTXO(s) and |deleted_diff| = {}", + "Streaming UTXO(s) {}-{} ({}) for block #{}. Deleted diff len = {}", + start, + end, utxos.len(), + current_header.height, deleted_diff.cardinality(), ); let utxos = utxos .into_iter() .enumerate() + .skip(skip_outputs as usize) // Only include pruned UTXOs if include_pruned_utxos is true - .filter(|(_, utxo)| self.request.include_pruned_utxos || !utxo.is_pruned()) + .filter(|(_, utxo)| include_pruned_utxos || !utxo.is_pruned()) .map(|(i, utxo)| { SyncUtxosResponse { utxo_or_deleted: Some(proto::base_node::sync_utxos_response::UtxoOrDeleted::Utxo( @@ -167,7 +199,10 @@ where B: BlockchainBackend + 'static break; } - if self.request.include_deleted_bitmaps { + // We only want to skip the first block UTXOs + skip_outputs = 0; + + if include_deleted_bitmaps { let bitmaps = SyncUtxosResponse { utxo_or_deleted: Some(proto::base_node::sync_utxos_response::UtxoOrDeleted::DeletedDiff( deleted_diff.serialize(), @@ -187,14 +222,29 @@ where B: BlockchainBackend + 'static timer.elapsed() ); - prev_header = current_header; + prev_utxo_mmr_size = current_header.output_mmr_size; + if current_header.height + 1 > end_header.height { + break; + } + + current_header = self + .db + .fetch_header(current_header.height + 1) + .await + .map_err(RpcStatus::log_internal_error(LOG_TARGET))? + .ok_or_else(|| { + RpcStatus::general(format!( + "Potential data consistency issue: header {} not found", + current_header.height + 1 + )) + })?; } debug!( target: LOG_TARGET, "UTXO sync completed to UTXO {} (Header hash = {})", - prev_header.output_mmr_size, - prev_header.hash().to_hex() + current_header.output_mmr_size, + current_header.hash().to_hex() ); Ok(()) diff --git a/base_layer/core/src/blocks/accumulated_data.rs b/base_layer/core/src/blocks/accumulated_data.rs index c14f494c2e..88c45f9504 100644 --- a/base_layer/core/src/blocks/accumulated_data.rs +++ b/base_layer/core/src/blocks/accumulated_data.rs @@ -53,8 +53,8 @@ const LOG_TARGET: &str = "c::bn::acc_data"; pub struct BlockAccumulatedData { pub(crate) kernels: PrunedHashSet, pub(crate) outputs: PrunedHashSet, + pub(crate) witness: PrunedHashSet, pub(crate) deleted: DeletedBitmap, - pub(crate) range_proofs: PrunedHashSet, pub(crate) kernel_sum: Commitment, } @@ -62,14 +62,14 @@ impl BlockAccumulatedData { pub fn new( kernels: PrunedHashSet, outputs: PrunedHashSet, - range_proofs: PrunedHashSet, + witness: PrunedHashSet, deleted: Bitmap, total_kernel_sum: Commitment, ) -> Self { Self { kernels, outputs, - range_proofs, + witness, deleted: DeletedBitmap { deleted }, kernel_sum: total_kernel_sum, } @@ -79,8 +79,13 @@ impl BlockAccumulatedData { &self.deleted.deleted } + pub fn set_deleted(&mut self, deleted: DeletedBitmap) -> &mut Self { + self.deleted = deleted; + self + } + pub fn dissolve(self) -> (PrunedHashSet, PrunedHashSet, PrunedHashSet, Bitmap) { - (self.kernels, self.outputs, self.range_proofs, self.deleted.deleted) + (self.kernels, self.outputs, self.witness, self.deleted.deleted) } pub fn kernel_sum(&self) -> &Commitment { @@ -96,7 +101,7 @@ impl Default for BlockAccumulatedData { deleted: DeletedBitmap { deleted: Bitmap::create(), }, - range_proofs: Default::default(), + witness: Default::default(), kernel_sum: Default::default(), } } @@ -110,11 +115,20 @@ impl Display for BlockAccumulatedData { self.outputs.len().unwrap_or(0), self.deleted.deleted.cardinality(), self.kernels.len().unwrap_or(0), - self.range_proofs.len().unwrap_or(0) + self.witness.len().unwrap_or(0) ) } } +#[derive(Debug, Clone, Default)] +pub struct UpdateBlockAccumulatedData { + pub kernel_hash_set: Option, + pub utxo_hash_set: Option, + pub witness_hash_set: Option, + pub deleted_diff: Option, + pub kernel_sum: Option, +} + /// Wrapper struct to serialize and deserialize Bitmap #[derive(Debug, Clone)] pub struct DeletedBitmap { diff --git a/base_layer/core/src/blocks/mod.rs b/base_layer/core/src/blocks/mod.rs index 19a49c3071..3b7eb851f1 100644 --- a/base_layer/core/src/blocks/mod.rs +++ b/base_layer/core/src/blocks/mod.rs @@ -30,6 +30,7 @@ pub use accumulated_data::{ ChainHeader, CompleteDeletedBitmap, DeletedBitmap, + UpdateBlockAccumulatedData, }; mod error; diff --git a/base_layer/core/src/chain_storage/async_db.rs b/base_layer/core/src/chain_storage/async_db.rs index 99e2bcbd4d..116e87e8dc 100644 --- a/base_layer/core/src/chain_storage/async_db.rs +++ b/base_layer/core/src/chain_storage/async_db.rs @@ -30,7 +30,6 @@ use tari_common_types::{ chain_metadata::ChainMetadata, types::{BlockHash, Commitment, HashOutput, Signature}, }; -use tari_mmr::pruned_hashset::PrunedHashSet; use crate::{ blocks::{ @@ -44,6 +43,7 @@ use crate::{ DeletedBitmap, HistoricalBlock, NewBlockTemplate, + UpdateBlockAccumulatedData, }, chain_storage::{ blockchain_database::MmrRoots, @@ -157,7 +157,7 @@ impl AsyncBlockchainDb { //---------------------------------- Metadata --------------------------------------------// make_async_fn!(get_chain_metadata() -> ChainMetadata, "get_chain_metadata"); - make_async_fn!(fetch_horizon_data() -> Option, "fetch_horizon_data"); + make_async_fn!(fetch_horizon_data() -> HorizonData, "fetch_horizon_data"); //---------------------------------- TXO --------------------------------------------// make_async_fn!(fetch_utxo(hash: HashOutput) -> Option, "fetch_utxo"); @@ -166,12 +166,12 @@ impl AsyncBlockchainDb { make_async_fn!(fetch_utxos_and_mined_info(hashes: Vec) -> Vec>, "fetch_utxos_and_mined_info"); - make_async_fn!(fetch_utxos_by_mmr_position(start: u64, end: u64, deleted: Arc) -> (Vec, Bitmap), "fetch_utxos_by_mmr_position"); + make_async_fn!(fetch_utxos_in_block(hash: HashOutput, deleted: Option>) -> (Vec, Bitmap), "fetch_utxos_in_block"); //---------------------------------- Kernel --------------------------------------------// make_async_fn!(fetch_kernel_by_excess_sig(excess_sig: Signature) -> Option<(TransactionKernel, HashOutput)>, "fetch_kernel_by_excess_sig"); - make_async_fn!(fetch_kernels_by_mmr_position(start: u64, end: u64) -> Vec, "fetch_kernels_by_mmr_position"); + make_async_fn!(fetch_kernels_in_block(hash: HashOutput) -> Vec, "fetch_kernels_in_block"); //---------------------------------- MMR --------------------------------------------// make_async_fn!(prepare_new_block(template: NewBlockTemplate) -> Block, "prepare_new_block"); @@ -240,6 +240,8 @@ impl AsyncBlockchainDb { //---------------------------------- Misc. --------------------------------------------// + make_async_fn!(prune_to_height(height: u64) -> (), "prune_to_height"); + make_async_fn!(rewind_to_height(height: u64) -> Vec>, "rewind_to_height"); make_async_fn!(rewind_to_hash(hash: BlockHash) -> Vec>, "rewind_to_hash"); @@ -292,16 +294,21 @@ impl<'a, B: BlockchainBackend + 'static> AsyncDbTransaction<'a, B> { &mut self, height: u64, hash: HashOutput, - accumulated_data: u128, + accumulated_difficulty: u128, expected_prev_best_block: HashOutput, ) -> &mut Self { self.transaction - .set_best_block(height, hash, accumulated_data, expected_prev_best_block); + .set_best_block(height, hash, accumulated_difficulty, expected_prev_best_block); + self + } + + pub fn set_pruned_height(&mut self, height: u64) -> &mut Self { + self.transaction.set_pruned_height(height); self } - pub fn set_pruned_height(&mut self, height: u64, kernel_sum: Commitment, utxo_sum: Commitment) -> &mut Self { - self.transaction.set_pruned_height(height, kernel_sum, utxo_sum); + pub fn set_horizon_data(&mut self, kernel_sum: Commitment, utxo_sum: Commitment) -> &mut Self { + self.transaction.set_horizon_data(kernel_sum, utxo_sum); self } @@ -340,23 +347,12 @@ impl<'a, B: BlockchainBackend + 'static> AsyncDbTransaction<'a, B> { self } - pub fn update_pruned_hash_set( - &mut self, - mmr_tree: MmrTree, - header_hash: HashOutput, - pruned_hash_set: PrunedHashSet, - ) -> &mut Self { - self.transaction - .update_pruned_hash_set(mmr_tree, header_hash, pruned_hash_set); - self - } - - pub fn update_block_accumulated_data_with_deleted_diff( + pub fn update_block_accumulated_data_via_horizon_sync( &mut self, header_hash: HashOutput, - deleted: Bitmap, + values: UpdateBlockAccumulatedData, ) -> &mut Self { - self.transaction.update_deleted_with_diff(header_hash, deleted); + self.transaction.update_block_accumulated_data(header_hash, values); self } @@ -376,6 +372,11 @@ impl<'a, B: BlockchainBackend + 'static> AsyncDbTransaction<'a, B> { self } + pub fn prune_output_at_positions(&mut self, positions: Vec) -> &mut Self { + self.transaction.prune_outputs_at_positions(positions); + self + } + pub async fn commit(&mut self) -> Result<(), ChainStorageError> { let transaction = mem::take(&mut self.transaction); self.db.write(transaction).await diff --git a/base_layer/core/src/chain_storage/blockchain_backend.rs b/base_layer/core/src/chain_storage/blockchain_backend.rs index 17595aae6c..ede1ca87c2 100644 --- a/base_layer/core/src/chain_storage/blockchain_backend.rs +++ b/base_layer/core/src/chain_storage/blockchain_backend.rs @@ -100,14 +100,11 @@ pub trait BlockchainBackend: Send + Sync { excess_sig: &Signature, ) -> Result, ChainStorageError>; - /// Fetch kernels by MMR position - fn fetch_kernels_by_mmr_position(&self, start: u64, end: u64) -> Result, ChainStorageError>; - - fn fetch_utxos_by_mmr_position( + /// Fetch all UTXOs and spends in the block + fn fetch_utxos_in_block( &self, - start: u64, - end: u64, - deleted: &Bitmap, + header_hash: &HashOutput, + deleted: Option<&Bitmap>, ) -> Result<(Vec, Bitmap), ChainStorageError>; /// Fetch a specific output. Returns the output and the leaf index in the output MMR diff --git a/base_layer/core/src/chain_storage/blockchain_database.rs b/base_layer/core/src/chain_storage/blockchain_database.rs index e2a189b54a..07cd9c20b3 100644 --- a/base_layer/core/src/chain_storage/blockchain_database.rs +++ b/base_layer/core/src/chain_storage/blockchain_database.rs @@ -52,6 +52,7 @@ use crate::{ DeletedBitmap, HistoricalBlock, NewBlockTemplate, + UpdateBlockAccumulatedData, }, chain_storage::{ consts::{ @@ -211,8 +212,19 @@ where B: BlockchainBackend }; if is_empty { info!(target: LOG_TARGET, "Blockchain db is empty. Adding genesis block."); - let genesis_block = blockchain_db.consensus_manager.get_genesis_block(); - blockchain_db.insert_block(Arc::new(genesis_block))?; + let genesis_block = Arc::new(blockchain_db.consensus_manager.get_genesis_block()); + blockchain_db.insert_block(genesis_block.clone())?; + let mut txn = DbTransaction::new(); + let body = &genesis_block.block().body; + let utxo_sum = body.outputs().iter().map(|k| &k.commitment).sum::(); + let kernel_sum = body.kernels().iter().map(|k| &k.excess).sum::(); + txn.update_block_accumulated_data(genesis_block.hash().clone(), UpdateBlockAccumulatedData { + kernel_sum: Some(kernel_sum.clone()), + ..Default::default() + }); + txn.set_pruned_height(0); + txn.set_horizon_data(kernel_sum, utxo_sum); + blockchain_db.write(txn)?; blockchain_db.store_pruning_horizon(config.pruning_horizon)?; } if cleanup_orphans_at_startup { @@ -361,23 +373,18 @@ where B: BlockchainBackend db.fetch_kernel_by_excess_sig(&excess_sig) } - pub fn fetch_kernels_by_mmr_position( - &self, - start: u64, - end: u64, - ) -> Result, ChainStorageError> { + pub fn fetch_kernels_in_block(&self, hash: HashOutput) -> Result, ChainStorageError> { let db = self.db_read_access()?; - db.fetch_kernels_by_mmr_position(start, end) + db.fetch_kernels_in_block(&hash) } - pub fn fetch_utxos_by_mmr_position( + pub fn fetch_utxos_in_block( &self, - start: u64, - end: u64, - deleted: Arc, + hash: HashOutput, + deleted: Option>, ) -> Result<(Vec, Bitmap), ChainStorageError> { let db = self.db_read_access()?; - db.fetch_utxos_by_mmr_position(start, end, deleted.as_ref()) + db.fetch_utxos_in_block(&hash, deleted.as_deref()) } /// Returns the block header at the given block height. @@ -576,7 +583,7 @@ where B: BlockchainBackend /// Returns the sum of all kernels pub fn fetch_kernel_commitment_sum(&self, at_hash: &HashOutput) -> Result { - Ok(self.fetch_block_accumulated_data(at_hash.clone())?.kernel_sum) + Ok(self.fetch_block_accumulated_data(at_hash.clone())?.kernel_sum().clone()) } /// Returns `n` hashes from height _h - offset_ where _h_ is the tip header height back to `h - n - offset`. @@ -872,6 +879,12 @@ where B: BlockchainBackend store_pruning_horizon(&mut *db, pruning_horizon) } + /// Prunes the blockchain up to and including the given height + pub fn prune_to_height(&self, height: u64) -> Result<(), ChainStorageError> { + let mut db = self.db_write_access()?; + prune_to_height(&mut *db, height) + } + /// Fetch a block from the blockchain database. /// /// # Returns @@ -975,9 +988,9 @@ where B: BlockchainBackend rewind_to_hash(&mut *db, hash) } - pub fn fetch_horizon_data(&self) -> Result, ChainStorageError> { + pub fn fetch_horizon_data(&self) -> Result { let db = self.db_read_access()?; - db.fetch_horizon_data() + Ok(db.fetch_horizon_data()?.unwrap_or_default()) } pub fn fetch_complete_deleted_bitmap_at( @@ -1074,7 +1087,7 @@ pub fn calculate_mmr_roots(db: &T, block: &Block) -> Resul let BlockAccumulatedData { kernels, outputs, - range_proofs, + witness: range_proofs, .. } = db .fetch_block_accumulated_data(&header.prev_hash)? @@ -2037,6 +2050,7 @@ fn cleanup_orphans(db: &mut T, orphan_storage_capacity: us db.delete_oldest_orphans(horizon_height, orphan_storage_capacity) } + fn prune_database_if_needed( db: &mut T, pruning_horizon: u64, @@ -2058,34 +2072,75 @@ fn prune_database_if_needed( pruning_interval, ); if metadata.pruned_height() < abs_pruning_horizon.saturating_sub(pruning_interval) { - let last_pruned = metadata.pruned_height(); + prune_to_height(db, abs_pruning_horizon)?; + } + + Ok(()) +} + +fn prune_to_height(db: &mut T, target_horizon_height: u64) -> Result<(), ChainStorageError> { + let metadata = db.fetch_chain_metadata()?; + let last_pruned = metadata.pruned_height(); + if target_horizon_height < last_pruned { + return Err(ChainStorageError::InvalidArguments { + func: "prune_to_height", + arg: "target_horizon_height", + message: format!( + "Target pruning horizon {} is less than current pruning horizon {}", + target_horizon_height, last_pruned + ), + }); + } + + if target_horizon_height == last_pruned { info!( target: LOG_TARGET, - "Pruning blockchain database at height {} (was={})", abs_pruning_horizon, last_pruned, + "Blockchain already pruned to height {}", target_horizon_height ); - let mut last_block = db.fetch_block_accumulated_data_by_height(last_pruned).or_not_found( + return Ok(()); + } + + if target_horizon_height > metadata.height_of_longest_chain() { + return Err(ChainStorageError::InvalidArguments { + func: "prune_to_height", + arg: "target_horizon_height", + message: format!( + "Target pruning horizon {} is greater than current block height {}", + target_horizon_height, + metadata.height_of_longest_chain() + ), + }); + } + + info!( + target: LOG_TARGET, + "Pruning blockchain database at height {} (was={})", target_horizon_height, last_pruned, + ); + let mut last_block = db.fetch_block_accumulated_data_by_height(last_pruned).or_not_found( + "BlockAccumulatedData", + "height", + last_pruned.to_string(), + )?; + let mut txn = DbTransaction::new(); + for block_to_prune in (last_pruned + 1)..=target_horizon_height { + let header = db.fetch_chain_header_by_height(block_to_prune)?; + let curr_block = db.fetch_block_accumulated_data_by_height(block_to_prune).or_not_found( "BlockAccumulatedData", "height", - last_pruned.to_string(), + block_to_prune.to_string(), )?; - let mut txn = DbTransaction::new(); - for block_to_prune in (last_pruned + 1)..abs_pruning_horizon { - let curr_block = db.fetch_block_accumulated_data_by_height(block_to_prune).or_not_found( - "BlockAccumulatedData", - "height", - block_to_prune.to_string(), - )?; - // Note, this could actually be done in one step instead of each block, since deleted is - // accumulated - let inputs_to_prune = curr_block.deleted.bitmap().clone() - last_block.deleted.bitmap(); - last_block = curr_block; - - txn.prune_outputs_and_update_horizon(inputs_to_prune.to_vec(), block_to_prune); - } + // Note, this could actually be done in one step instead of each block, since deleted is + // accumulated + let output_mmr_positions = curr_block.deleted() - last_block.deleted(); + last_block = curr_block; - db.write(txn)?; + txn.prune_outputs_at_positions(output_mmr_positions.to_vec()); + txn.delete_all_inputs_in_block(header.hash().clone()); } + txn.set_pruned_height(target_horizon_height); + + db.write(txn)?; Ok(()) } diff --git a/base_layer/core/src/chain_storage/db_transaction.rs b/base_layer/core/src/chain_storage/db_transaction.rs index 066528f73b..bfdd480033 100644 --- a/base_layer/core/src/chain_storage/db_transaction.rs +++ b/base_layer/core/src/chain_storage/db_transaction.rs @@ -1,18 +1,3 @@ -use std::{ - fmt, - fmt::{Display, Error, Formatter}, - sync::Arc, -}; - -use croaring::Bitmap; -use tari_crypto::tari_utilities::{ - hex::{to_hex, Hex}, - Hashable, -}; - -use tari_common_types::types::{BlockHash, Commitment, HashOutput}; -use tari_mmr::pruned_hashset::PrunedHashSet; - // Copyright 2019. The Tari Project // // Redistribution and use in source and binary forms, with or without modification, are permitted provided that the @@ -34,14 +19,28 @@ use tari_mmr::pruned_hashset::PrunedHashSet; // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + use crate::{ - blocks::{Block, BlockHeader, BlockHeaderAccumulatedData, ChainBlock, ChainHeader}, - chain_storage::{error::ChainStorageError, MmrTree}, + blocks::{Block, BlockHeader, BlockHeaderAccumulatedData, ChainBlock, ChainHeader, UpdateBlockAccumulatedData}, + chain_storage::error::ChainStorageError, transactions::transaction_entities::{ transaction_kernel::TransactionKernel, transaction_output::TransactionOutput, }, }; +use std::{ + fmt, + fmt::{Display, Error, Formatter}, + sync::Arc, +}; + +use crate::chain_storage::HorizonData; +use croaring::Bitmap; +use tari_common_types::types::{BlockHash, Commitment, HashOutput}; +use tari_crypto::tari_utilities::{ + hex::{to_hex, Hex}, + Hashable, +}; #[derive(Debug)] pub struct DbTransaction { @@ -148,31 +147,26 @@ impl DbTransaction { self } - pub fn update_pruned_hash_set( - &mut self, - mmr_tree: MmrTree, - header_hash: HashOutput, - pruned_hash_set: PrunedHashSet, - ) -> &mut Self { - self.operations.push(WriteOperation::UpdatePrunedHashSet { - mmr_tree, - header_hash, - pruned_hash_set: Box::new(pruned_hash_set), + pub fn prune_outputs_at_positions(&mut self, output_mmr_positions: Vec) -> &mut Self { + self.operations.push(WriteOperation::PruneOutputsAtMmrPositions { + output_positions: output_mmr_positions, }); self } - pub fn prune_outputs_and_update_horizon(&mut self, output_mmr_positions: Vec, horizon: u64) -> &mut Self { - self.operations.push(WriteOperation::PruneOutputsAndUpdateHorizon { - output_positions: output_mmr_positions, - horizon, - }); + pub fn delete_all_inputs_in_block(&mut self, block_hash: BlockHash) -> &mut Self { + self.operations + .push(WriteOperation::DeleteAllInputsInBlock { block_hash }); self } - pub fn update_deleted_with_diff(&mut self, header_hash: HashOutput, deleted: Bitmap) -> &mut Self { + pub fn update_block_accumulated_data( + &mut self, + header_hash: HashOutput, + values: UpdateBlockAccumulatedData, + ) -> &mut Self { self.operations - .push(WriteOperation::UpdateDeletedBlockAccumulatedDataWithDiff { header_hash, deleted }); + .push(WriteOperation::UpdateBlockAccumulatedData { header_hash, values }); self } @@ -248,11 +242,14 @@ impl DbTransaction { self } - pub fn set_pruned_height(&mut self, height: u64, kernel_sum: Commitment, utxo_sum: Commitment) -> &mut Self { - self.operations.push(WriteOperation::SetPrunedHeight { - height, - kernel_sum, - utxo_sum, + pub fn set_pruned_height(&mut self, height: u64) -> &mut Self { + self.operations.push(WriteOperation::SetPrunedHeight { height }); + self + } + + pub fn set_horizon_data(&mut self, kernel_sum: Commitment, utxo_sum: Commitment) -> &mut Self { + self.operations.push(WriteOperation::SetHorizonData { + horizon_data: HorizonData::new(kernel_sum, utxo_sum), }); self } @@ -304,25 +301,18 @@ pub enum WriteOperation { DeleteOrphanChainTip(HashOutput), InsertOrphanChainTip(HashOutput), InsertMoneroSeedHeight(Vec, u64), - UpdatePrunedHashSet { - mmr_tree: MmrTree, + UpdateBlockAccumulatedData { header_hash: HashOutput, - pruned_hash_set: Box, - }, - UpdateDeletedBlockAccumulatedDataWithDiff { - header_hash: HashOutput, - deleted: Bitmap, + values: UpdateBlockAccumulatedData, }, UpdateDeletedBitmap { deleted: Bitmap, }, - PruneOutputsAndUpdateHorizon { + PruneOutputsAtMmrPositions { output_positions: Vec, - horizon: u64, }, - UpdateKernelSum { - header_hash: HashOutput, - kernel_sum: Commitment, + DeleteAllInputsInBlock { + block_hash: BlockHash, }, SetAccumulatedDataForOrphan(BlockHeaderAccumulatedData), SetBestBlock { @@ -334,8 +324,9 @@ pub enum WriteOperation { SetPruningHorizonConfig(u64), SetPrunedHeight { height: u64, - kernel_sum: Commitment, - utxo_sum: Commitment, + }, + SetHorizonData { + horizon_data: HorizonData, }, } @@ -389,14 +380,6 @@ impl fmt::Display for WriteOperation { write!(f, "Insert Monero seed string {} for height: {}", data.to_hex(), height) }, InsertChainOrphanBlock(block) => write!(f, "InsertChainOrphanBlock({})", block.hash().to_hex()), - UpdatePrunedHashSet { - mmr_tree, header_hash, .. - } => write!( - f, - "Update pruned hash set: {} header: {}", - mmr_tree, - header_hash.to_hex() - ), InsertPrunedOutput { header_hash: _, header_height: _, @@ -404,23 +387,14 @@ impl fmt::Display for WriteOperation { witness_hash: _, mmr_position: _, } => write!(f, "Insert pruned output"), - UpdateDeletedBlockAccumulatedDataWithDiff { - header_hash: _, - deleted: _, - } => write!(f, "Add deleted data for block"), + UpdateBlockAccumulatedData { header_hash, .. } => { + write!(f, "Update Block data for block {}", header_hash.to_hex()) + }, UpdateDeletedBitmap { deleted } => { write!(f, "Merge deleted bitmap at tip ({} new indexes)", deleted.cardinality()) }, - PruneOutputsAndUpdateHorizon { - output_positions, - horizon, - } => write!( - f, - "Prune {} outputs and set horizon to {}", - output_positions.len(), - horizon - ), - UpdateKernelSum { header_hash, .. } => write!(f, "Update kernel sum for block: {}", header_hash.to_hex()), + PruneOutputsAtMmrPositions { output_positions } => write!(f, "Prune {} output(s)", output_positions.len()), + DeleteAllInputsInBlock { block_hash } => write!(f, "Delete outputs in block {}", block_hash.to_hex()), SetAccumulatedDataForOrphan(accumulated_data) => { write!(f, "Set accumulated data for orphan {}", accumulated_data) }, @@ -440,6 +414,7 @@ impl fmt::Display for WriteOperation { SetPrunedHeight { height, .. } => write!(f, "Set pruned height to {}", height), DeleteHeader(height) => write!(f, "Delete header at height: {}", height), DeleteOrphan(hash) => write!(f, "Delete orphan with hash: {}", hash.to_hex()), + SetHorizonData { .. } => write!(f, "Set horizon data"), } } } diff --git a/base_layer/core/src/chain_storage/error.rs b/base_layer/core/src/chain_storage/error.rs index 14cbd7a53f..c776ec2222 100644 --- a/base_layer/core/src/chain_storage/error.rs +++ b/base_layer/core/src/chain_storage/error.rs @@ -169,13 +169,7 @@ pub trait OrNotFound { impl OrNotFound for Result, ChainStorageError> { fn or_not_found(self, entity: &'static str, field: &'static str, value: String) -> Result { - match self { - Ok(inner) => match inner { - None => Err(ChainStorageError::ValueNotFound { entity, field, value }), - Some(v) => Ok(v), - }, - Err(err) => Err(err), - } + self.and_then(|inner| inner.ok_or(ChainStorageError::ValueNotFound { entity, field, value })) } } diff --git a/base_layer/core/src/chain_storage/horizon_data.rs b/base_layer/core/src/chain_storage/horizon_data.rs index 6213d490f3..ae6a120bb4 100644 --- a/base_layer/core/src/chain_storage/horizon_data.rs +++ b/base_layer/core/src/chain_storage/horizon_data.rs @@ -21,9 +21,8 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use serde::{Deserialize, Serialize}; use tari_common_types::types::Commitment; -use tari_crypto::tari_utilities::ByteArray; -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct HorizonData { kernel_sum: Commitment, utxo_sum: Commitment, @@ -35,10 +34,7 @@ impl HorizonData { } pub fn zero() -> Self { - HorizonData { - kernel_sum: Commitment::from_bytes(&[0u8; 32]).expect("Could not create commitment"), - utxo_sum: Commitment::from_bytes(&[0u8; 32]).expect("Could not create commitment"), - } + Default::default() } pub fn kernel_sum(&self) -> &Commitment { diff --git a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs index 0d3e18a6a5..6a8908ad75 100644 --- a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs +++ b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs @@ -24,22 +24,14 @@ // let's ignore this clippy error in this module #![allow(clippy::ptr_arg)] -use std::{convert::TryFrom, fmt, fs, fs::File, ops::Deref, path::Path, sync::Arc, time::Instant}; - use croaring::Bitmap; use fs2::FileExt; use lmdb_zero::{ConstTransaction, Database, Environment, ReadTransaction, WriteTransaction}; use log::*; use serde::{Deserialize, Serialize}; +use std::{fmt, fs, fs::File, ops::Deref, path::Path, sync::Arc, time::Instant}; use tari_crypto::tari_utilities::{hash::Hashable, hex::Hex, ByteArray}; -use tari_common_types::{ - chain_metadata::ChainMetadata, - types::{BlockHash, Commitment, HashDigest, HashOutput, Signature, BLOCK_HASH_LENGTH}, -}; -use tari_mmr::{pruned_hashset::PrunedHashSet, Hash, MerkleMountainRange, MutableMmr}; -use tari_storage::lmdb_store::{db, LMDBBuilder, LMDBConfig, LMDBStore}; - use crate::{ blocks::{ Block, @@ -49,6 +41,7 @@ use crate::{ ChainBlock, ChainHeader, DeletedBitmap, + UpdateBlockAccumulatedData, }, chain_storage::{ db_transaction::{DbKey, DbTransaction, DbValue, WriteOperation}, @@ -94,6 +87,12 @@ use crate::{ }, }, }; +use tari_common_types::{ + chain_metadata::ChainMetadata, + types::{BlockHash, Commitment, HashDigest, HashOutput, Signature, BLOCK_HASH_LENGTH}, +}; +use tari_mmr::{Hash, MerkleMountainRange, MutableMmr}; +use tari_storage::lmdb_store::{db, LMDBBuilder, LMDBConfig, LMDBStore}; type DatabaseRef = Arc>; @@ -320,32 +319,19 @@ impl LMDBDatabase { self.insert_orphan_block(&write_txn, chain_block.block())?; self.set_accumulated_data_for_orphan(&write_txn, chain_block.accumulated_data())?; }, - UpdatePrunedHashSet { - mmr_tree, - header_hash, - pruned_hash_set, - } => { - self.update_pruned_hash_set(&write_txn, *mmr_tree, header_hash, (**pruned_hash_set).clone())?; - }, - UpdateDeletedBlockAccumulatedDataWithDiff { header_hash, deleted } => { - self.update_deleted_block_accumulated_data_with_diff(&write_txn, header_hash, deleted.clone())?; + UpdateBlockAccumulatedData { header_hash, values } => { + self.update_block_accumulated_data(&write_txn, header_hash, values.clone())?; }, UpdateDeletedBitmap { deleted } => { let mut bitmap = self.load_deleted_bitmap_model(&write_txn)?; bitmap.merge(deleted)?; bitmap.finish()?; }, - PruneOutputsAndUpdateHorizon { - output_positions, - horizon, - } => { - self.prune_outputs_and_update_horizon(&write_txn, output_positions, *horizon)?; + PruneOutputsAtMmrPositions { output_positions } => { + self.prune_outputs_at_positions(&write_txn, output_positions)?; }, - UpdateKernelSum { - header_hash, - kernel_sum, - } => { - self.update_block_accumulated_data_kernel_sum(&write_txn, header_hash, kernel_sum.clone())?; + DeleteAllInputsInBlock { block_hash } => { + self.delete_all_inputs_in_block(&write_txn, block_hash)?; }, SetBestBlock { height, @@ -397,20 +383,18 @@ impl LMDBDatabase { MetadataValue::PruningHorizon(*pruning_horizon), )?; }, - SetPrunedHeight { - height, - kernel_sum, - utxo_sum, - } => { + SetPrunedHeight { height } => { self.set_metadata( &write_txn, MetadataKey::PrunedHeight, MetadataValue::PrunedHeight(*height), )?; + }, + SetHorizonData { horizon_data } => { self.set_metadata( &write_txn, MetadataKey::HorizonData, - MetadataValue::HorizonData(HorizonData::new(kernel_sum.clone(), utxo_sum.clone())), + MetadataValue::HorizonData(horizon_data.clone()), )?; }, } @@ -1049,17 +1033,16 @@ impl LMDBDatabase { self.fetch_block_accumulated_data(&*txn, header.height - 1)? .ok_or_else(|| ChainStorageError::ValueNotFound { entity: "BlockAccumulatedData", - field: "prev_hash", - value: header.prev_hash.to_hex(), + field: "height", + value: (header.height - 1).to_string(), })? }; let mut total_kernel_sum = Commitment::default(); - let mut total_utxo_sum = Commitment::default(); let BlockAccumulatedData { kernels: pruned_kernel_set, outputs: pruned_output_set, - range_proofs: pruned_proof_set, + witness: pruned_proof_set, .. } = data; @@ -1079,7 +1062,6 @@ impl LMDBDatabase { let mut output_mmr = MutableMmr::::new(pruned_output_set, Bitmap::create())?; let mut witness_mmr = MerkleMountainRange::::new(pruned_proof_set); for output in outputs { - total_utxo_sum = &total_utxo_sum + &output.commitment; output_mmr.push(output.hash())?; witness_mmr.push(output.witness_hash())?; debug!(target: LOG_TARGET, "Inserting output `{}`", output.commitment.to_hex()); @@ -1093,7 +1075,6 @@ impl LMDBDatabase { } for input in inputs { - total_utxo_sum = &total_utxo_sum - &input.commitment; let index = self .fetch_mmr_leaf_index(&**txn, MmrTree::Utxo, &input.output_hash())? .ok_or(ChainStorageError::UnspendableInput)?; @@ -1151,31 +1132,11 @@ impl LMDBDatabase { ) } - fn update_block_accumulated_data_kernel_sum( + fn update_block_accumulated_data( &self, write_txn: &WriteTransaction<'_>, header_hash: &HashOutput, - kernel_sum: Commitment, - ) -> Result<(), ChainStorageError> { - let height = self.fetch_height_from_hash(write_txn, header_hash).or_not_found( - "BlockHash", - "hash", - header_hash.to_hex(), - )?; - let mut block_accum_data = self - .fetch_block_accumulated_data(write_txn, height)? - .unwrap_or_default(); - - block_accum_data.kernel_sum = kernel_sum; - lmdb_replace(write_txn, &self.block_accumulated_data_db, &height, &block_accum_data)?; - Ok(()) - } - - fn update_deleted_block_accumulated_data_with_diff( - &self, - write_txn: &WriteTransaction<'_>, - header_hash: &HashOutput, - deleted: Bitmap, + values: UpdateBlockAccumulatedData, ) -> Result<(), ChainStorageError> { let height = self.fetch_height_from_hash(write_txn, header_hash).or_not_found( "BlockHash", @@ -1184,10 +1145,25 @@ impl LMDBDatabase { )?; let mut block_accum_data = self - .fetch_block_accumulated_data(write_txn, height)? + .fetch_block_accumulated_data(&*write_txn, height)? .unwrap_or_default(); - block_accum_data.deleted = deleted.into(); + if let Some(deleted_diff) = values.deleted_diff { + block_accum_data.deleted = deleted_diff; + } + if let Some(kernel_sum) = values.kernel_sum { + block_accum_data.kernel_sum = kernel_sum; + } + if let Some(kernel_hash_set) = values.kernel_hash_set { + block_accum_data.kernels = kernel_hash_set; + } + if let Some(utxo_hash_set) = values.utxo_hash_set { + block_accum_data.outputs = utxo_hash_set; + } + if let Some(witness_hash_set) = values.witness_hash_set { + block_accum_data.witness = witness_hash_set; + } + lmdb_replace(write_txn, &self.block_accumulated_data_db, &height, &block_accum_data)?; Ok(()) } @@ -1215,36 +1191,21 @@ impl LMDBDatabase { Ok(()) } - fn update_pruned_hash_set( + fn delete_all_inputs_in_block( &self, - write_txn: &WriteTransaction<'_>, - mmr_tree: MmrTree, - header_hash: &HashOutput, - pruned_hash_set: PrunedHashSet, + txn: &WriteTransaction<'_>, + block_hash: &BlockHash, ) -> Result<(), ChainStorageError> { - let height = self.fetch_height_from_hash(write_txn, header_hash).or_not_found( - "BlockHash", - "hash", - header_hash.to_hex(), - )?; - let mut block_accum_data = self - .fetch_block_accumulated_data(write_txn, height)? - .unwrap_or_default(); - match mmr_tree { - MmrTree::Kernel => block_accum_data.kernels = pruned_hash_set, - MmrTree::Utxo => block_accum_data.outputs = pruned_hash_set, - MmrTree::Witness => block_accum_data.range_proofs = pruned_hash_set, - } - - lmdb_replace(write_txn, &self.block_accumulated_data_db, &height, &block_accum_data)?; + let inputs = + lmdb_delete_keys_starting_with::(txn, &self.inputs_db, block_hash.to_hex().as_str())?; + debug!(target: LOG_TARGET, "Deleted {} input(s)", inputs.len()); Ok(()) } - fn prune_outputs_and_update_horizon( + fn prune_outputs_at_positions( &self, write_txn: &WriteTransaction<'_>, output_positions: &[u32], - horizon: u64, ) -> Result<(), ChainStorageError> { for pos in output_positions { let (_height, hash) = lmdb_first_after::<_, (u64, Vec)>( @@ -1258,12 +1219,6 @@ impl LMDBDatabase { self.prune_output(write_txn, &key)?; } - self.set_metadata( - write_txn, - MetadataKey::PrunedHeight, - MetadataValue::PrunedHeight(horizon), - )?; - Ok(()) } @@ -1628,12 +1583,11 @@ impl BlockchainBackend for LMDBDatabase { fn fetch_kernels_in_block(&self, header_hash: &HashOutput) -> Result, ChainStorageError> { let txn = self.read_transaction()?; - Ok( - lmdb_fetch_keys_starting_with(header_hash.to_hex().as_str(), &txn, &self.kernels_db)? - .into_iter() - .map(|f: TransactionKernelRowData| f.kernel) - .collect(), - ) + let kernels = lmdb_fetch_keys_starting_with(header_hash.to_hex().as_str(), &txn, &self.kernels_db)? + .into_iter() + .map(|f: TransactionKernelRowData| f.kernel) + .collect(); + Ok(kernels) } fn fetch_kernel_by_excess( @@ -1671,163 +1625,58 @@ impl BlockchainBackend for LMDBDatabase { } } - fn fetch_kernels_by_mmr_position(&self, start: u64, end: u64) -> Result, ChainStorageError> { + fn fetch_utxos_in_block( + &self, + header_hash: &HashOutput, + deleted: Option<&Bitmap>, + ) -> Result<(Vec, Bitmap), ChainStorageError> { let txn = self.read_transaction()?; - if let Some(start_height) = lmdb_first_after(&txn, &self.kernel_mmr_size_index, &(start + 1).to_be_bytes())? { - let end_height: u64 = - lmdb_first_after(&txn, &self.kernel_mmr_size_index, &(end + 1).to_be_bytes())?.unwrap_or(start_height); - let previous_mmr_count = if start_height == 0 { - 0 + let utxos = lmdb_fetch_keys_starting_with::( + header_hash.to_hex().as_str(), + &txn, + &self.utxos_db, + )? + .into_iter() + .map(|row| { + if deleted.map(|b| b.contains(row.mmr_position)).unwrap_or(false) { + return PrunedOutput::Pruned { + output_hash: row.hash, + witness_hash: row.witness_hash, + }; + } + if let Some(output) = row.output { + PrunedOutput::NotPruned { output } } else { - let header: BlockHeader = - lmdb_get(&txn, &self.headers_db, &(start_height - 1))?.expect("Header should exist"); - debug!(target: LOG_TARGET, "Previous header:{}", header); - header.kernel_mmr_size - }; - - let total_size = (end - start) as usize + 1; - let mut result = Vec::with_capacity(total_size); - - let mut skip_amount = (start - previous_mmr_count) as usize; - debug!( - target: LOG_TARGET, - "Fetching kernels by MMR position. Start {}, end {}, in headers at height {}-{}, prev mmr count: {}, \ - skipping the first:{}", - start, - end, - start_height, - end_height, - previous_mmr_count, - skip_amount - ); - - for height in start_height..=end_height { - let hash = lmdb_get::<_, BlockHeaderAccumulatedData>(&txn, &self.header_accumulated_data_db, &height)? - .ok_or_else(|| ChainStorageError::ValueNotFound { - entity: "BlockHeader", - field: "height", - value: height.to_string(), - })? - .hash; - - result.extend( - lmdb_fetch_keys_starting_with::( - hash.to_hex().as_str(), - &txn, - &self.kernels_db, - )? - .into_iter() - .skip(skip_amount) - .take(total_size - result.len()) - .map(|f| f.kernel), - ); - - skip_amount = 0; + PrunedOutput::Pruned { + output_hash: row.hash, + witness_hash: row.witness_hash, + } } - Ok(result) - } else { - Ok(vec![]) - } - } - - fn fetch_utxos_by_mmr_position( - &self, - start: u64, - end: u64, - deleted: &Bitmap, - ) -> Result<(Vec, Bitmap), ChainStorageError> { - let txn = self.read_transaction()?; - let start_height = lmdb_first_after(&txn, &self.output_mmr_size_index, &(start + 1).to_be_bytes())? - .ok_or_else(|| { - ChainStorageError::InvalidQuery(format!( - "Unable to find block height from start output MMR index {}", - start - )) - })?; - let end_height: u64 = - lmdb_first_after(&txn, &self.output_mmr_size_index, &(end + 1).to_be_bytes())?.unwrap_or(start_height); + }) + .collect(); - let previous_mmr_count = if start_height == 0 { - 0 - } else { - let header: BlockHeader = - lmdb_get(&txn, &self.headers_db, &(start_height - 1))?.expect("Header should exist"); - debug!(target: LOG_TARGET, "Previous header:{}", header); - header.output_mmr_size - }; + let height = + self.fetch_height_from_hash(&txn, header_hash)? + .ok_or_else(|| ChainStorageError::ValueNotFound { + entity: "BlockHeader", + field: "hash", + value: header_hash.to_hex(), + })?; - let total_size = end - .checked_sub(start) - .and_then(|v| v.checked_add(1)) - .and_then(|v| usize::try_from(v).ok()) - .ok_or_else(|| { - ChainStorageError::InvalidQuery("fetch_utxos_by_mmr_position: end is less than start".to_string()) - })?; - let mut result = Vec::with_capacity(total_size); + // Builds a BitMap of the deleted UTXO MMR indexes that occurred at the current height + let acc_data = + self.fetch_block_accumulated_data(&txn, height)? + .ok_or_else(|| ChainStorageError::ValueNotFound { + entity: "BlockAccumulatedData", + field: "height", + value: height.to_string(), + })?; - let mut skip_amount = (start - previous_mmr_count) as usize; - debug!( - target: LOG_TARGET, - "Fetching outputs by MMR position. Start {}, end {}, starting in header at height {}, prev mmr count: \ - {}, skipping the first:{}", - start, - end, - start_height, - previous_mmr_count, - skip_amount - ); let mut difference_bitmap = Bitmap::create(); + difference_bitmap.or_inplace(acc_data.deleted()); - for height in start_height..=end_height { - let accum_data = - lmdb_get::<_, BlockHeaderAccumulatedData>(&txn, &self.header_accumulated_data_db, &height)? - .ok_or_else(|| ChainStorageError::ValueNotFound { - entity: "BlockHeader", - field: "height", - value: height.to_string(), - })?; - - result.extend( - lmdb_fetch_keys_starting_with::( - accum_data.hash.to_hex().as_str(), - &txn, - &self.utxos_db, - )? - .into_iter() - .skip(skip_amount) - .take(total_size - result.len()) - .map(|row| { - if deleted.contains(row.mmr_position) { - return PrunedOutput::Pruned { - output_hash: row.hash, - witness_hash: row.witness_hash, - }; - } - if let Some(output) = row.output { - PrunedOutput::NotPruned { output } - } else { - PrunedOutput::Pruned { - output_hash: row.hash, - witness_hash: row.witness_hash, - } - } - }), - ); - - // Builds a BitMap of the deleted UTXO MMR indexes that occurred at the current height - let diff_bitmap = self - .fetch_block_accumulated_data(&txn, height) - .or_not_found("BlockAccumulatedData", "height", height.to_string())? - .deleted() - .clone(); - difference_bitmap.or_inplace(&diff_bitmap); - - skip_amount = 0; - } - - difference_bitmap.run_optimize(); - Ok((result, difference_bitmap)) + Ok((utxos, difference_bitmap)) } fn fetch_output(&self, output_hash: &HashOutput) -> Result, ChainStorageError> { @@ -2171,7 +2020,7 @@ impl BlockchainBackend for LMDBDatabase { fn fetch_horizon_data(&self) -> Result, ChainStorageError> { let txn = self.read_transaction()?; - fetch_horizon_data(&txn, &self.metadata_db) + Ok(Some(fetch_horizon_data(&txn, &self.metadata_db)?)) } fn get_stats(&self) -> Result { @@ -2228,7 +2077,7 @@ fn fetch_chain_height(txn: &ConstTransaction<'_>, db: &Database) -> Result, db: &Database) -> Result { let k = MetadataKey::PrunedHeight; let val: Option = lmdb_get(txn, db, &k.as_u32())?; @@ -2237,18 +2086,22 @@ fn fetch_pruned_height(txn: &ConstTransaction<'_>, db: &Database) -> Result Ok(0), } } -// Fetches the best block hash from the provided metadata db. -fn fetch_horizon_data(txn: &ConstTransaction<'_>, db: &Database) -> Result, ChainStorageError> { + +/// Fetches the horizon data from the provided metadata db. +fn fetch_horizon_data(txn: &ConstTransaction<'_>, db: &Database) -> Result { let k = MetadataKey::HorizonData; let val: Option = lmdb_get(txn, db, &k.as_u32())?; match val { - Some(MetadataValue::HorizonData(data)) => Ok(Some(data)), - None => Ok(None), - _ => Err(ChainStorageError::ValueNotFound { - entity: "ChainMetadata", - field: "HorizonData", + Some(MetadataValue::HorizonData(data)) => Ok(data), + None => Err(ChainStorageError::ValueNotFound { + entity: "HorizonData", + field: "metadata", value: "".to_string(), }), + Some(k) => Err(ChainStorageError::DataInconsistencyDetected { + function: "fetch_horizon_data", + details: format!("Received incorrect value {:?} for key horizon data", k), + }), } } // Fetches the best block hash from the provided metadata db. diff --git a/base_layer/core/src/chain_storage/tests/blockchain_database.rs b/base_layer/core/src/chain_storage/tests/blockchain_database.rs index 4bcd6099ec..0d7365229d 100644 --- a/base_layer/core/src/chain_storage/tests/blockchain_database.rs +++ b/base_layer/core/src/chain_storage/tests/blockchain_database.rs @@ -20,15 +20,11 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::sync::Arc; - -use tari_common::configuration::Network; -use tari_test_utils::unpack_enum; - use crate::{ blocks::{Block, BlockHeader, NewBlockTemplate}, chain_storage::{BlockchainDatabase, ChainStorageError}, consensus::ConsensusManager, + crypto::tari_utilities::hex::Hex, proof_of_work::Difficulty, tari_utilities::Hashable, test_helpers::{ @@ -38,9 +34,18 @@ use crate::{ }, transactions::{ tari_amount::T, - transaction_entities::{transaction::Transaction, unblinded_output::UnblindedOutput}, + test_helpers::{schema_to_transaction, TransactionSchema}, + transaction_entities::{ + output_features::OutputFeatures, + transaction::Transaction, + unblinded_output::UnblindedOutput, + }, }, + txn_schema, }; +use std::sync::Arc; +use tari_common::configuration::Network; +use tari_test_utils::unpack_enum; fn setup() -> BlockchainDatabase { create_new_blockchain() @@ -62,7 +67,13 @@ fn add_many_chained_blocks( size: usize, db: &BlockchainDatabase, ) -> (Vec>, Vec) { - let mut prev_block = Arc::new(db.fetch_block(0).unwrap().try_into_block().unwrap()); + let last_header = db.fetch_last_header().unwrap(); + let mut prev_block = db + .fetch_block(last_header.height) + .unwrap() + .try_into_block() + .map(Arc::new) + .unwrap(); let mut blocks = Vec::with_capacity(size); let mut outputs = Vec::with_capacity(size); for _ in 1..=size as u64 { @@ -353,16 +364,6 @@ mod fetch_block_hashes_from_header_tip { } mod add_block { - use crate::{ - chain_storage::ChainStorageError, - crypto::tari_utilities::hex::Hex, - transactions::{ - tari_amount::T, - test_helpers::{schema_to_transaction, TransactionSchema}, - transaction_entities::output_features::OutputFeatures, - }, - txn_schema, - }; use super::*; @@ -484,3 +485,90 @@ mod prepare_new_block { assert_eq!(block.header.height, 1); } } + +mod fetch_header_containing_utxo_mmr { + use super::*; + + #[test] + fn it_returns_genesis() { + let db = setup(); + let genesis = db.fetch_block(0).unwrap(); + assert_eq!(genesis.block().body.outputs().len(), 4001); + let mut mmr_position = 0; + genesis.block().body.outputs().iter().for_each(|_| { + let header = db.fetch_header_containing_utxo_mmr(mmr_position).unwrap(); + assert_eq!(header.height(), 0); + mmr_position += 1; + }); + let err = db.fetch_header_containing_utxo_mmr(4002).unwrap_err(); + matches!(err, ChainStorageError::ValueNotFound { .. }); + } + + #[test] + fn it_returns_corresponding_header() { + let db = setup(); + let genesis = db.fetch_block(0).unwrap(); + let _ = add_many_chained_blocks(5, &db); + let num_genesis_outputs = genesis.block().body.outputs().len() as u64; + + for i in 1..=5 { + let header = db.fetch_header_containing_utxo_mmr(num_genesis_outputs + i).unwrap(); + assert_eq!(header.height(), i); + } + let err = db + .fetch_header_containing_utxo_mmr(num_genesis_outputs + 5 + 1) + .unwrap_err(); + matches!(err, ChainStorageError::ValueNotFound { .. }); + } +} + +mod fetch_header_containing_kernel_mmr { + use super::*; + + #[test] + fn it_returns_genesis() { + let db = setup(); + let genesis = db.fetch_block(0).unwrap(); + assert_eq!(genesis.block().body.kernels().len(), 2); + let mut mmr_position = 0; + genesis.block().body.kernels().iter().for_each(|_| { + let header = db.fetch_header_containing_kernel_mmr(mmr_position).unwrap(); + assert_eq!(header.height(), 0); + mmr_position += 1; + }); + let err = db.fetch_header_containing_kernel_mmr(3).unwrap_err(); + matches!(err, ChainStorageError::ValueNotFound { .. }); + } + + #[test] + fn it_returns_corresponding_header() { + let db = setup(); + let genesis = db.fetch_block(0).unwrap(); + let (blocks, outputs) = add_many_chained_blocks(1, &db); + let num_genesis_kernels = genesis.block().body.kernels().len() as u64; + let (txns, _) = schema_to_transaction(&[txn_schema!(from: vec![outputs[0].clone()], to: vec![50 * T])]); + + let (block, _) = create_next_block(&blocks[0], txns); + db.add_block(block).unwrap(); + let _ = add_many_chained_blocks(3, &db); + + let header = db.fetch_header_containing_kernel_mmr(num_genesis_kernels).unwrap(); + assert_eq!(header.height(), 0); + let header = db.fetch_header_containing_kernel_mmr(num_genesis_kernels + 1).unwrap(); + assert_eq!(header.height(), 1); + + for i in 2..=3 { + let header = db.fetch_header_containing_kernel_mmr(num_genesis_kernels + i).unwrap(); + assert_eq!(header.height(), 2); + } + for i in 4..=6 { + let header = db.fetch_header_containing_kernel_mmr(num_genesis_kernels + i).unwrap(); + assert_eq!(header.height(), i - 1); + } + + let err = db + .fetch_header_containing_kernel_mmr(num_genesis_kernels + 6 + 1) + .unwrap_err(); + matches!(err, ChainStorageError::ValueNotFound { .. }); + } +} diff --git a/base_layer/core/src/test_helpers/blockchain.rs b/base_layer/core/src/test_helpers/blockchain.rs index 462c972621..f18b55ee65 100644 --- a/base_layer/core/src/test_helpers/blockchain.rs +++ b/base_layer/core/src/test_helpers/blockchain.rs @@ -268,20 +268,12 @@ impl BlockchainBackend for TempDatabase { self.db.as_ref().unwrap().fetch_kernel_by_excess_sig(excess_sig) } - fn fetch_kernels_by_mmr_position(&self, start: u64, end: u64) -> Result, ChainStorageError> { - self.db.as_ref().unwrap().fetch_kernels_by_mmr_position(start, end) - } - - fn fetch_utxos_by_mmr_position( + fn fetch_utxos_in_block( &self, - start: u64, - end: u64, - deleted: &Bitmap, + header_hash: &HashOutput, + deleted: Option<&Bitmap>, ) -> Result<(Vec, Bitmap), ChainStorageError> { - self.db - .as_ref() - .unwrap() - .fetch_utxos_by_mmr_position(start, end, deleted) + self.db.as_ref().unwrap().fetch_utxos_in_block(header_hash, deleted) } fn fetch_output(&self, output_hash: &HashOutput) -> Result, ChainStorageError> { diff --git a/base_layer/core/src/transactions/aggregated_body.rs b/base_layer/core/src/transactions/aggregated_body.rs index 1592115d7b..0cda236641 100644 --- a/base_layer/core/src/transactions/aggregated_body.rs +++ b/base_layer/core/src/transactions/aggregated_body.rs @@ -462,11 +462,7 @@ impl AggregateBody { fn validate_range_proofs(&self, range_proof_service: &RangeProofService) -> Result<(), TransactionError> { trace!(target: LOG_TARGET, "Checking range proofs"); for o in &self.outputs { - if !o.verify_range_proof(range_proof_service)? { - return Err(TransactionError::ValidationError( - "Range proof could not be verified".into(), - )); - } + o.verify_range_proof(range_proof_service)?; } Ok(()) } diff --git a/base_layer/core/src/transactions/coinbase_builder.rs b/base_layer/core/src/transactions/coinbase_builder.rs index e6e3450465..b74751feb8 100644 --- a/base_layer/core/src/transactions/coinbase_builder.rs +++ b/base_layer/core/src/transactions/coinbase_builder.rs @@ -324,7 +324,7 @@ mod test { assert!(factories .commitment .open_value(&p.spend_key, block_reward.into(), utxo.commitment())); - assert!(utxo.verify_range_proof(&factories.range_proof).unwrap()); + utxo.verify_range_proof(&factories.range_proof).unwrap(); assert!(utxo.features.flags.contains(OutputFlags::COINBASE_OUTPUT)); assert_eq!( tx.body.check_coinbase_output( diff --git a/base_layer/core/src/transactions/transaction_entities/mod.rs b/base_layer/core/src/transactions/transaction_entities/mod.rs index 34a1d76771..396f9cb1c7 100644 --- a/base_layer/core/src/transactions/transaction_entities/mod.rs +++ b/base_layer/core/src/transactions/transaction_entities/mod.rs @@ -156,7 +156,7 @@ mod test { }); let script = unblinded_output1.script.clone(); let tx_output1 = unblinded_output1.as_transaction_output(&factories).unwrap(); - assert!(tx_output1.verify_range_proof(&factories.range_proof).unwrap()); + tx_output1.verify_range_proof(&factories.range_proof).unwrap(); let unblinded_output2 = test_params_2.create_unblinded_output(UtxoTestParams { value: (2u64.pow(32) + 1u64).into(), @@ -196,7 +196,7 @@ mod test { ) .unwrap(), ); - assert!(!tx_output3.verify_range_proof(&factories.range_proof).unwrap()); + tx_output3.verify_range_proof(&factories.range_proof).unwrap_err(); } #[test] diff --git a/base_layer/core/src/transactions/transaction_entities/transaction_output.rs b/base_layer/core/src/transactions/transaction_entities/transaction_output.rs index 204278b27a..a9f5a290eb 100644 --- a/base_layer/core/src/transactions/transaction_entities/transaction_output.rs +++ b/base_layer/core/src/transactions/transaction_entities/transaction_output.rs @@ -117,8 +117,14 @@ impl TransactionOutput { } /// Verify that range proof is valid - pub fn verify_range_proof(&self, prover: &RangeProofService) -> Result { - Ok(prover.verify(&self.proof.0, &self.commitment)) + pub fn verify_range_proof(&self, prover: &RangeProofService) -> Result<(), TransactionError> { + if prover.verify(&self.proof.0, &self.commitment) { + Ok(()) + } else { + Err(TransactionError::ValidationError( + "Recipient output range proof failed to verify".to_string(), + )) + } } /// Verify that the metadata signature is valid diff --git a/base_layer/core/src/transactions/transaction_protocol/recipient.rs b/base_layer/core/src/transactions/transaction_protocol/recipient.rs index 2ddf1c5e02..af92a1b2ee 100644 --- a/base_layer/core/src/transactions/transaction_protocol/recipient.rs +++ b/base_layer/core/src/transactions/transaction_protocol/recipient.rs @@ -266,7 +266,7 @@ mod test { assert!(factories .commitment .open_value(&p.spend_key, 500, &data.output.commitment)); - assert!(data.output.verify_range_proof(&factories.range_proof).unwrap()); + data.output.verify_range_proof(&factories.range_proof).unwrap(); let r_sum = &msg.public_nonce + &p.public_nonce; let e = build_challenge(&r_sum, &m); let s = Signature::sign(p.spend_key.clone(), p.nonce, &e).unwrap(); diff --git a/base_layer/core/src/transactions/transaction_protocol/sender.rs b/base_layer/core/src/transactions/transaction_protocol/sender.rs index dc90972098..3961ba05cc 100644 --- a/base_layer/core/src/transactions/transaction_protocol/sender.rs +++ b/base_layer/core/src/transactions/transaction_protocol/sender.rs @@ -395,11 +395,7 @@ impl SenderTransactionProtocol { ) -> Result<(), TPE> { match &mut self.state { SenderState::CollectingSingleSignature(info) => { - if !rec.output.verify_range_proof(prover)? { - return Err(TPE::ValidationError( - "Recipient output range proof failed to verify".into(), - )); - } + rec.output.verify_range_proof(prover)?; // Consolidate transaction info info.outputs.push(rec.output.clone()); @@ -756,7 +752,7 @@ mod test { crypto_factories::CryptoFactories, tari_amount::*, test_helpers::{create_test_input, create_unblinded_output, TestParams}, - transaction_entities::{KernelFeatures, OutputFeatures, TransactionOutput}, + transaction_entities::{KernelFeatures, OutputFeatures, TransactionError, TransactionOutput}, transaction_protocol::{ sender::SenderTransactionProtocol, single_receiver::SingleReceiverTransactionProtocol, @@ -1049,7 +1045,9 @@ mod test { Ok(_) => panic!("Range proof should have failed to verify"), Err(e) => assert_eq!( e, - TransactionProtocolError::ValidationError("Recipient output range proof failed to verify".into()) + TransactionProtocolError::TransactionBuildError(TransactionError::ValidationError( + "Recipient output range proof failed to verify".into() + )) ), } } diff --git a/base_layer/core/src/transactions/transaction_protocol/single_receiver.rs b/base_layer/core/src/transactions/transaction_protocol/single_receiver.rs index 93356a4a5f..9fd710d4ee 100644 --- a/base_layer/core/src/transactions/transaction_protocol/single_receiver.rs +++ b/base_layer/core/src/transactions/transaction_protocol/single_receiver.rs @@ -221,10 +221,7 @@ mod test { factories.commitment.open_value(&k, info.amount.into(), &out.commitment), "Output commitment is invalid" ); - assert!( - out.verify_range_proof(&factories.range_proof).unwrap(), - "Range proof is invalid" - ); + out.verify_range_proof(&factories.range_proof).unwrap(); assert!(out.features.flags.is_empty(), "Output features flags have changed"); } } diff --git a/base_layer/core/src/validation/error.rs b/base_layer/core/src/validation/error.rs index 369cf1831a..bc2f3bb0d5 100644 --- a/base_layer/core/src/validation/error.rs +++ b/base_layer/core/src/validation/error.rs @@ -57,7 +57,7 @@ pub enum ValidationError { InvalidAccountingBalance, #[error("Transaction contains already spent inputs")] ContainsSTxO, - #[error("Transaction contains already outputs that already exist")] + #[error("Transaction contains outputs that already exist")] ContainsTxO, #[error("Transaction contains an output commitment that already exists")] ContainsDuplicateUtxoCommitment, diff --git a/base_layer/core/tests/chain_storage_tests/chain_storage.rs b/base_layer/core/tests/chain_storage_tests/chain_storage.rs index ef6a257488..b58bba6766 100644 --- a/base_layer/core/tests/chain_storage_tests/chain_storage.rs +++ b/base_layer/core/tests/chain_storage_tests/chain_storage.rs @@ -1718,7 +1718,7 @@ fn pruned_mode_cleanup_and_fetch_block() { let _block5 = append_block(&store, &block4, vec![], &consensus_manager, 1.into()).unwrap(); let metadata = store.get_chain_metadata().unwrap(); - assert_eq!(metadata.pruned_height(), 1); + assert_eq!(metadata.pruned_height(), 2); assert_eq!(metadata.height_of_longest_chain(), 5); assert_eq!(metadata.pruning_horizon(), 3); } diff --git a/common/src/build/protobuf.rs b/common/src/build/protobuf.rs index 6898052e94..875320a468 100644 --- a/common/src/build/protobuf.rs +++ b/common/src/build/protobuf.rs @@ -24,9 +24,12 @@ where P: AsRef + Display { .output() .unwrap(); - if !out.status.success() { - panic!("status: {} - {}", out.status, String::from_utf8_lossy(&out.stderr)); - } + assert!( + out.status.success(), + "status: {} - {}", + out.status, + String::from_utf8_lossy(&out.stderr) + ); } } diff --git a/comms/Cargo.toml b/comms/Cargo.toml index 861ab55179..76b6ce3fa0 100644 --- a/comms/Cargo.toml +++ b/comms/Cargo.toml @@ -69,4 +69,4 @@ tari_common = { version = "^0.21", path = "../common", features = ["build"] } c_integration = [] avx2 = ["tari_crypto/avx2"] metrics = [] -rpc = ["tower-make"] +rpc = ["tower-make", "tower/util"] diff --git a/comms/dht/src/config.rs b/comms/dht/src/config.rs index ccfd260c9b..978f3581d9 100644 --- a/comms/dht/src/config.rs +++ b/comms/dht/src/config.rs @@ -82,6 +82,7 @@ pub struct DhtConfig { /// Default: 6 hrs pub ban_duration: Duration, /// This allows the use of test addresses in the network. + /// Default: false pub allow_test_addresses: bool, /// The maximum number of messages over `flood_ban_timespan` to allow before banning the peer (for `ban_duration`) /// Default: 1000 messages diff --git a/integration_tests/features/Propagation.feature b/integration_tests/features/Propagation.feature index 3c1f5ef8fb..54f7cfaab5 100644 --- a/integration_tests/features/Propagation.feature +++ b/integration_tests/features/Propagation.feature @@ -101,4 +101,4 @@ Feature: Block Propagation Then TX1 is in the MINED of all nodes When I mine 17 blocks on SENDER Then all nodes are on the same chain at height 21 - Then node PNODE1 has a pruned height of 15 + Then node PNODE1 has a pruned height of 16