Skip to content

Commit

Permalink
simplify db calls for sync_utxo and sync_kernel rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Nov 25, 2021
1 parent 2c947d6 commit 90308ec
Show file tree
Hide file tree
Showing 17 changed files with 152 additions and 430 deletions.
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ message FindChainSplitResponse {
}

message SyncKernelsRequest {
bytes start_header_hash = 1;
uint64 start = 1;
bytes end_header_hash = 2;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ impl HorizonStateSync {

// We're already synced because we have full blocks higher than our target pruned height
if local_metadata.height_of_longest_chain() >= horizon_sync_height {
info!(target: LOG_TARGET, "Horizon state was already synchronized.");
info!(
target: LOG_TARGET,
"Tip height is higher than our pruned height. Horizon state is already synchronized."
);
return StateEvent::HorizonStateSynchronized;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,20 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::num::TryFromIntError;

use thiserror::Error;
use tokio::task;

use tari_comms::{
connectivity::ConnectivityError,
peer_manager::NodeId,
protocol::rpc::{RpcError, RpcStatus},
};
use tari_mmr::error::MerkleMountainRangeError;

use crate::{
base_node::{comms_interface::CommsInterfaceError, state_machine_service::states::helpers::BaseNodeRequestError},
chain_storage::{ChainStorageError, MmrTree},
transactions::transaction_entities::error::TransactionError,
validation::ValidationError,
};
use std::num::TryFromIntError;
use tari_comms::{
connectivity::ConnectivityError,
protocol::rpc::{RpcError, RpcStatus},
};
use tari_mmr::error::MerkleMountainRangeError;
use thiserror::Error;
use tokio::task;

#[derive(Debug, Error)]
pub enum HorizonSyncError {
Expand Down Expand Up @@ -74,15 +70,6 @@ pub enum HorizonSyncError {
MerkleMountainRangeError(#[from] MerkleMountainRangeError),
#[error("Connectivity error: {0}")]
ConnectivityError(#[from] ConnectivityError),
#[error(
"Sync peer {peer} has a tip height of {remote_peer_height} which is less than the target height of \
{target_pruning_horizon}"
)]
InappropriateSyncPeer {
peer: NodeId,
target_pruning_horizon: u64,
remote_peer_height: u64,
},
}

impl From<TryFromIntError> for HorizonSyncError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,74 +135,39 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
client: &mut rpc::BaseNodeSyncRpcClient,
to_header: &BlockHeader,
) -> Result<(), HorizonSyncError> {
debug!(target: LOG_TARGET, "Initializing");
self.initialize().await?;
// debug!(target: LOG_TARGET, "Initializing");
// self.initialize().await?;
debug!(target: LOG_TARGET, "Synchronizing kernels");
self.synchronize_kernels(client, to_header).await?;
debug!(target: LOG_TARGET, "Synchronizing outputs");
self.synchronize_outputs(client, to_header).await?;
Ok(())
}

async fn initialize(&mut self) -> Result<(), HorizonSyncError> {
let db = self.db();
let local_metadata = db.get_chain_metadata().await?;

if local_metadata.height_of_longest_chain() == 0 {
let horizon_data = db.fetch_horizon_data().await?;
self.utxo_sum = horizon_data.utxo_sum().clone();
self.kernel_sum = horizon_data.kernel_sum().clone();

return Ok(());
}

// let header = self.db().fetch_chain_header(self.horizon_sync_height).await?;
// let acc = db.fetch_block_accumulated_data(header.hash().clone()).await?;
let new_prune_height = cmp::min(local_metadata.height_of_longest_chain(), self.horizon_sync_height);
if local_metadata.pruned_height() < new_prune_height {
debug!(target: LOG_TARGET, "Pruning block chain to height {}", new_prune_height);
db.prune_to_height(new_prune_height).await?;
}

// let (calc_utxo_sum, calc_kernel_sum) = self.calculate_commitment_sums(&header).await?;

// prune_to_height updates the horizon data
let horizon_data = db.fetch_horizon_data().await?;
// if *horizon_data.kernel_sum() != acc.cumulative_kernel_sum {
// error!(target: LOG_TARGET, "KERNEL SUM NOT EQUAL CALCULATED");
// }
// if *horizon_data.utxo_sum() != acc.cumulative_utxo_sum {
// error!(target: LOG_TARGET, "UTXO SUM NOT EQUAL CALCULATED");
// }

// let (calc_utxo_sum, calc_kernel_sum) = self.calculate_commitment_sums(&header).await?;
// if calc_kernel_sum != acc.cumulative_kernel_sum {
// error!(target: LOG_TARGET, "KERNEL SUM NOT EQUAL CALCULATED");
// }
// if calc_utxo_sum != acc.cumulative_utxo_sum {
// error!(target: LOG_TARGET, "UTXO SUM NOT EQUAL CALCULATED");
// }

// if calc_kernel_sum != *horizon_data.kernel_sum() {
// error!(target: LOG_TARGET, "HORIZON KERNEL SUM NOT EQUAL CALCULATED");
// }
// if calc_utxo_sum != *horizon_data.utxo_sum() {
// error!(target: LOG_TARGET, "HORIZON UTXO SUM NOT EQUAL CALCULATED");
// }

debug!(
target: LOG_TARGET,
"Loaded from horizon data utxo_sum = {}, kernel_sum = {}",
horizon_data.utxo_sum().to_hex(),
horizon_data.kernel_sum().to_hex(),
);
// self.utxo_sum = calc_utxo_sum;
// self.kernel_sum = calc_kernel_sum;
self.utxo_sum = horizon_data.utxo_sum().clone();
self.kernel_sum = horizon_data.kernel_sum().clone();

Ok(())
}
// async fn initialize(&mut self) -> Result<(), HorizonSyncError> {
// let db = self.db();
// let local_metadata = db.get_chain_metadata().await?;
//
// let new_prune_height = cmp::min(local_metadata.height_of_longest_chain(), self.horizon_sync_height);
// if local_metadata.pruned_height() < new_prune_height {
// debug!(target: LOG_TARGET, "Pruning block chain to height {}", new_prune_height);
// db.prune_to_height(new_prune_height).await?;
// }
//
// // prune_to_height updates horizon data
// let horizon_data = db.fetch_horizon_data().await?;
//
// debug!(
// target: LOG_TARGET,
// "Loaded from horizon data utxo_sum = {}, kernel_sum = {}",
// horizon_data.utxo_sum().to_hex(),
// horizon_data.kernel_sum().to_hex(),
// );
// self.utxo_sum = horizon_data.utxo_sum().clone();
// self.kernel_sum = horizon_data.kernel_sum().clone();
//
// Ok(())
// }

async fn synchronize_kernels(
&mut self,
Expand Down Expand Up @@ -247,7 +212,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
.fetch_header_containing_kernel_mmr(local_num_kernels + 1)
.await?;
let req = SyncKernelsRequest {
start_header_hash: current_header.hash().clone(),
start: local_num_kernels,
end_header_hash: to_header.hash(),
};
let mut kernel_stream = client.sync_kernels(req).await?;
Expand Down Expand Up @@ -407,15 +372,13 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
debug!(
target: LOG_TARGET,
"Found header for utxos at mmr pos: {} - {} height: {}",
start + 1,
start,
current_header.header().output_mmr_size,
current_header.height()
);

let db = self.db().clone();

let mut output_hashes = vec![];
let mut witness_hashes = vec![];
let mut txn = db.write_transaction();
let mut unpruned_outputs = vec![];
let mut mmr_position = start;
Expand All @@ -435,7 +398,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
while let Some(response) = output_stream.next().await {
let res: SyncUtxosResponse = response?;

if res.mmr_index > 0 && res.mmr_index != mmr_position {
if res.mmr_index != 0 && res.mmr_index != mmr_position {
return Err(HorizonSyncError::IncorrectResponse(format!(
"Expected MMR position of {} but got {}",
mmr_position, res.mmr_index,
Expand All @@ -458,9 +421,10 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
);
height_utxo_counter += 1;
let output = TransactionOutput::try_from(output).map_err(HorizonSyncError::ConversionError)?;
output_hashes.push(output.hash());
witness_hashes.push(output.witness_hash());
unpruned_outputs.push(output.clone());

output_mmr.push(output.hash())?;
witness_mmr.push(output.witness_hash())?;
self.utxo_sum = &self.utxo_sum + &output.commitment;

txn.insert_output_via_horizon_sync(
Expand All @@ -481,8 +445,9 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
current_header.height()
);
height_txo_counter += 1;
output_hashes.push(utxo.hash.clone());
witness_hashes.push(utxo.witness_hash.clone());
output_mmr.push(utxo.hash.clone())?;
witness_mmr.push(utxo.witness_hash.clone())?;

txn.insert_pruned_output_via_horizon_sync(
utxo.hash,
utxo.witness_hash,
Expand All @@ -501,15 +466,6 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
)));
}

// Validate root
for hash in output_hashes.drain(..) {
output_mmr.push(hash)?;
}

for hash in witness_hashes.drain(..) {
witness_mmr.push(hash)?;
}

// Check that the difference bitmap isn't excessively large. Bitmap::deserialize panics if greater
// than isize::MAX, however isize::MAX is still an inordinate amount of data. An
// arbitrary 4 MiB limit is used.
Expand Down Expand Up @@ -559,7 +515,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
});
}

// self.validate_rangeproofs(mem::take(&mut unpruned_outputs)).await?;
self.validate_rangeproofs(mem::take(&mut unpruned_outputs)).await?;

txn.update_deleted_bitmap(diff_bitmap.clone());

Expand All @@ -577,12 +533,6 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
txn.set_pruned_height(metadata.pruned_height(), self.kernel_sum.clone(), self.utxo_sum.clone());
txn.commit().await?;

let data = self.db().fetch_horizon_data().await?;
error!(
target: LOG_TARGET,
"***************** utxo = {} ********************* ",
data.utxo_sum().to_hex(),
);
debug!(
target: LOG_TARGET,
"UTXO: {}/{}, Header #{}, added {} utxos, added {} txos in {:.2?}",
Expand All @@ -593,17 +543,28 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
height_txo_counter,
timer.elapsed()
);

height_txo_counter = 0;
height_utxo_counter = 0;
timer = Instant::now();

current_header = db.fetch_chain_header(current_header.height() + 1).await?;
debug!(
target: LOG_TARGET,
"Expecting to receive the next UTXO set for header #{}",
current_header.height()
);
if mmr_position == end {
debug!(
target: LOG_TARGET,
"Sync complete at mmr position {}, height #{}",
mmr_position,
current_header.height()
);
break;
} else {
current_header = db.fetch_chain_header(current_header.height() + 1).await?;
debug!(
target: LOG_TARGET,
"Expecting to receive the next UTXO set {}-{} for header #{}",
mmr_position,
current_header.header().output_mmr_size,
current_header.height()
);
}
},
v => {
error!(target: LOG_TARGET, "Remote node returned an invalid response {:?}", v);
Expand Down Expand Up @@ -671,25 +632,17 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
)));

let header = self.db().fetch_chain_header(self.horizon_sync_height).await?;
// TODO: Use accumulated sums
// let (calc_utxo_sum, calc_kernel_sum) = self.calculate_commitment_sums(&header).await?;
// let utxo_sum = &self.utxo_sum;
// let kernel_sum = &self.kernel_sum;
// if *utxo_sum != calc_utxo_sum {
// error!(target: LOG_TARGET, "UTXO sum isnt equal!");
// }
// if *kernel_sum != calc_kernel_sum {
// error!(target: LOG_TARGET, "KERNEL sum isnt equal!");
// }
// TODO: Use cumulative kernel and utxo sums
let (calc_utxo_sum, calc_kernel_sum) = self.calculate_commitment_sums(&header).await?;

self.shared
.sync_validators
.final_horizon_state
.validate(
&*self.db().inner().db_read_access()?,
header.height() - 1,
&self.utxo_sum,
&self.kernel_sum,
header.height(),
&calc_utxo_sum,
&calc_kernel_sum,
)
.map_err(HorizonSyncError::FinalStateValidationFailed)?;

Expand Down Expand Up @@ -742,7 +695,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
);
let (utxos, _) = self
.db()
.fetch_utxos_by_mmr_position(prev_mmr, curr_header.header().output_mmr_size - 1, bitmap.clone())
.fetch_utxos_in_block(curr_header.hash().clone(), bitmap.clone())
.await?;
trace!(
target: LOG_TARGET,
Expand All @@ -752,19 +705,13 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
prev_kernel_mmr,
curr_header.header().kernel_mmr_size - 1
);
let kernels = self
.db()
.fetch_kernels_by_mmr_position(prev_kernel_mmr, curr_header.header().kernel_mmr_size - 1)
.await?;

let mut utxo_sum = HomomorphicCommitment::default();
trace!(target: LOG_TARGET, "Number of kernels returned: {}", kernels.len());
trace!(target: LOG_TARGET, "Number of utxos returned: {}", utxos.len());
let mut prune_counter = 0;
for u in utxos {
match u {
PrunedOutput::NotPruned { output } => {
utxo_sum = &output.commitment + &utxo_sum;
pruned_utxo_sum = &output.commitment + &pruned_utxo_sum;
},
_ => {
prune_counter += 1;
Expand All @@ -776,8 +723,8 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
}
prev_mmr = curr_header.header().output_mmr_size;

pruned_utxo_sum = &utxo_sum + &pruned_utxo_sum;

let kernels = self.db().fetch_kernels_in_block(curr_header.hash().clone()).await?;
trace!(target: LOG_TARGET, "Number of kernels returned: {}", kernels.len());
for k in kernels {
pruned_kernel_sum = &k.excess + &pruned_kernel_sum;
}
Expand All @@ -791,7 +738,6 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
pruned_utxo_sum
);
}

Ok((pruned_utxo_sum, pruned_kernel_sum))
}

Expand Down
Loading

0 comments on commit 90308ec

Please sign in to comment.