feat!: fix horizon sync after smt upgrade (#6006)
Description
---
- Fixed horizon sync: 
  - initial sync;
  - re-sync after initial sync;
  - re-sync after being offline;
  - sync from prune node after initial sync.
- Added logic to detect genesis block outputs being spent.
- Fixed an issue where a tip block body could not be inserted due to the
input being a compact input.
- Removed unused release mode code.
- ~~Renamed some struct members and functions related to these changes
to depict their use better.~~
- **Edit:** Fixed `fn fetch_outputs_in_block_with_spend_state(...)`,
which did not filter out outputs with a spent state at the target
header. Updated the integration-level unit test
`test_horizon_sync_from_archival_node_happy_path()` to verify this
behaviour (a simplified sketch of the filtering rule follows the note
below).
- **Edit:** Fixed `fn prune_outputs_spent_at_hash(..)`, which used the
wrong key(s) when trying to prune outputs.

_**Note**: Initial prune node sync can still be optimized if we can
allow it to happen from another prune node, as this PR restricts initial
prune node sync to syncing from an archival node. That is left for
another PR._
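
As referenced in the edit note above, here is a minimal sketch of the spend-state filtering rule that `fn fetch_outputs_in_block_with_spend_state(...)` is meant to enforce. All types and names below are illustrative stand-ins, not the actual Tari API:

``` rust
/// Illustrative stand-in for an output row plus its spend metadata.
struct OutputRecord {
    /// `None` if the output has never been spent.
    spent_at_height: Option<u64>,
}

/// Keep only outputs still unspent at the target header height: outputs that
/// were never spent, plus outputs spent strictly after the target header.
fn unspent_at_target(records: Vec<OutputRecord>, target_height: u64) -> Vec<OutputRecord> {
    records
        .into_iter()
        .filter(|r| r.spent_at_height.map_or(true, |h| h > target_height))
        .collect()
}
```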

Motivation and Context
---
- Horizon sync was not working.
- There were no integration-level horizon sync unit tests.

How Has This Been Tested?
---
- Added integration-level horizon sync unit tests.
- Added integration-level block sync unit test.
- System-level tests [**TBD**]

What process can a PR reviewer use to test or verify this change?
---
- Code walk-through.
- Run the integration-level horizon sync and block sync unit tests.
- Selected output of `test_horizon_sync_from_archival_node_happy_path()`
with trace logs is included below to assist reviewers
([pr_#6006.log](https://github.com/tari-project/tari/files/14039003/pr_.6006.log)):

- The horizon sync scenario runs from block 10 to block 20, where the
coinbases in blocks 10 to 15 are spent in block 16.
- Note in the log extract that the outputs from blocks 11 to 15, as well
as the corresponding inputs in block 16, are not streamed.

```text
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Starting UTXO stream for peer '4e0e87239540d0b25f401283c5'
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Starting stream task with current_header: 14d84dc13934e94a79618fa733de7e877dd5a6e80fd67fc0d26074b6a11510e3, end_header: 0b43c2da6011e57ab95451df2630234b5e3f3674f6584e48d0d357d13a6b5e6f
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Streaming TXO(s) for block #11 (14d84dc13934e94a79618fa733de7e877dd5a6e80fd67fc0d26074b6a11510e3)
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Adding 0 outputs in response for block #11 '14d84dc13934e94a79618fa733de7e877dd5a6e80fd67fc0d26074b6a11510e3'
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Adding 0 inputs in response for block #11 '14d84dc13934e94a79618fa733de7e877dd5a6e80fd67fc0d26074b6a11510e3'
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Streamed 0 TXOs in 231.60µs (including stream backpressure)
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Streaming TXO(s) for block #12 (d8e09ed6b43ad65d64950f804990dc181d3798d1c8d19c6924c0f7ffd0185aa1)
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Adding 0 outputs in response for block #12 'd8e09ed6b43ad65d64950f804990dc181d3798d1c8d19c6924c0f7ffd0185aa1'
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Adding 0 inputs in response for block #12 'd8e09ed6b43ad65d64950f804990dc181d3798d1c8d19c6924c0f7ffd0185aa1'
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Streamed 0 TXOs in 134.80µs (including stream backpressure)
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Streaming TXO(s) for block #13 (b31f587fbd545f6ce1be855b88388e9efbf4bf9f4205aa9088d2f1d372db2826)
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Adding 0 outputs in response for block #13 'b31f587fbd545f6ce1be855b88388e9efbf4bf9f4205aa9088d2f1d372db2826'
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Adding 0 inputs in response for block #13 'b31f587fbd545f6ce1be855b88388e9efbf4bf9f4205aa9088d2f1d372db2826'
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Streamed 0 TXOs in 118.10µs (including stream backpressure)
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Streaming TXO(s) for block #14 (48271b358f9d2f08b406ee6c482f98716b3b03eef44f28610e58cca805837142)
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Adding 0 outputs in response for block #14 '48271b358f9d2f08b406ee6c482f98716b3b03eef44f28610e58cca805837142'
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Adding 0 inputs in response for block #14 '48271b358f9d2f08b406ee6c482f98716b3b03eef44f28610e58cca805837142'
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Streamed 0 TXOs in 102.40µs (including stream backpressure)
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Streaming TXO(s) for block #15 (23e1772f4fd691d7e8ffed59ac8c5ac5d790352130e0c527e1ecd9e70bc9d4d1)
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Adding 0 outputs in response for block #15 '23e1772f4fd691d7e8ffed59ac8c5ac5d790352130e0c527e1ecd9e70bc9d4d1'
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Adding 0 inputs in response for block #15 '23e1772f4fd691d7e8ffed59ac8c5ac5d790352130e0c527e1ecd9e70bc9d4d1'
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Streamed 0 TXOs in 150.90µs (including stream backpressure)
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Streaming TXO(s) for block #16 (124ab4fd9e06e05a08656d071e75109f319ea856c288fbe0f79e008eb01ced89)
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment 'cab362382fa13da54ae5059eacbc0aedb91d10834569a27c3a0cc95008059f6b') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment '96d3323d84ab40ba6c84a3151337cedd682e4c0a7acea2efbafa47ca5edca462') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment '1c0498276efd43a04a9850313b93090c0e69fdbdd9e3b0eb1e270fdcac6dc729') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment '98ebfe46f098343e40baa03c6820c26a28c39d82347947fe893b48270231df28') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment '087ae3cd0c4afd40ed89e96ba4d90e117d7716622724491392eff8e5da2a1243') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment '1e6bdc6fd6a443afbcf45a3415589b5f975bb24f1e745a0f1e082558874b8c41') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment '381216fd50669b29027e2fd6943b8cf018e4edf4bee33a30fc41d8b62baf9212') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment 'b8887c04b12ff98ff75f9fda031d8dd7f5969f07afd4ce474a252958341ee515') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment '9c430777aa2a4a93805e26a20a5320c079f7f0a69458142b9c9cfb61b82c3a55') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment '1ca47c3bb33bdcd71be5f52eea9c7c35ab6c2e1a19b6b9d7a5bbbd4c9291506c') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment '426891a0a609381ef66da5bdbe8b066bfeaec210b665d141ffc2b5fafc936009') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment '44fda12192b185831a60f0850de493885105ee8fe1c68046ae23bb5a6cc1220d') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment '68aab326d389391768947ac15c8d41b20e4d75fd35f21be99e1f9814c28f4c51') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment 'ec51f02c1fa611f98562c1199e05fabb7e573dbbd80f051fe7aa911bc5618348') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment '8c136433dec73c1719bdb6367810833e17c22c29b752ef9ed1ecbf4af6efe017') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment '4a1cf051b5215abb9d1bee55d2a3149bf5988d070d490b306aaa48928c3c7800') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment '32867e8efa78f627e8b42585f13ebda587214a6c0a88cff1c322ec61b1009e34') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment '8ef63b672ebc557d6134a77b66eb0025d2dbc7e5a99f9c2271139e457266583f') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment 'f040c3b43ab7283b693a05ad3ed07267afe3cbce7ead7f87dc883e7256686d6c') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment '388d89c16078f0246016c6b4ef868043e0f35bc56284a17472d314a2b9a8735a') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment '1a917dac4d616005d44039ae4cb5de86a793ec89c6ad99beef999272f74c4e3d') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Unspent TXO (commitment '742000c5781ca57a5cf7d87dc1f5c3fdb4043027c04c66bc954d5418438c3411') to peer
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Adding 22 outputs in response for block #16 '124ab4fd9e06e05a08656d071e75109f319ea856c288fbe0f79e008eb01ced89'
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Spent TXO (hash '6c786a9017704c477fe326ea87a605c3c4c59779035677b9d478418008ca8338') not sent to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Spent TXO (commitment 'acbc67b4d304afea0d004058881c4b11571200b5571a2324ca26f3544475d168') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Spent TXO (commitment 'ec49e91354cbef0fdaa599d50ba7db91ff71a77c8292a4d0d6bee8ccba41c32e') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Spent TXO (hash 'c403de95b0caebe0ad0f173c36976201739fecaabdedeae29dce17e71c1b142d') not sent to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Spent TXO (commitment '18b6707d0e44537b8fddbc02306ef3f7e1629751bb30fb194b3f3933047dd53a') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Spent TXO (hash 'e16dc3d652d5aeb1f6996b9071f29425d744101f830a56385fff4263e274026b') not sent to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Spent TXO (commitment '9ee42164e479a3e2451dea627d4c97e9fc3f2283808a620eb7ef2f36acbb212a') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Spent TXO (commitment 'b26bcee38a6d3c01701b62b679ab31796771e2e03dda0804c04eb5191f98664e') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Spent TXO (commitment '1cedd6437174f06a31f610d45b6a199183d4f76af7ad53b902f40ad37c50d306') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Spent TXO (commitment '5024d962d4bd92897f9aaa6990f9a30814c896cb9375e11aa121360c6204324d') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Spent TXO (commitment '926066f00276815389d512d74316282a520331dfab890b1a49dfc5cc55ebb454') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Spent TXO (commitment 'e43c29015632d5bb50c51d15f5fd669e6b50e44506cd36913d15ae5f785f9e39') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Spent TXO (commitment '2a6ee8813d2203cd4ff4644ca270e4a9b143c973d634ab583fcfbcf46b486405') to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Spent TXO (hash '31ab562c38b188b264421da2d0555aa07227d0965796433a7bc90f3a11b56430') not sent to peer
[2024-01-24T13:14:34Z TRACE tari_core::base_node::sync::rpc::sync_utxos_task] Spent TXO (hash '7bc39d95f57bec3f5baaf58b12dc6ab79fd73ae52e355e88dc2c1b3e0fb787e8') not sent to peer
[2024-01-24T13:14:34Z DEBUG tari_core::base_node::sync::rpc::sync_utxos_task] Adding 10 inputs in response for block #16 '124ab4fd9e06e05a08656d071e75109f319ea856c288fbe0f79e008eb01ced89'
``` 




Breaking Changes
---

- [ ] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [x] Other - Please specify (see the BREAKING CHANGE footer below)

BREAKING CHANGE: Sync nodes can only sync from base nodes running the
same or later software version
hansieodendaal committed Jan 31, 2024
1 parent ef53715 commit b6b80f6
Showing 28 changed files with 2,194 additions and 557 deletions.

```diff
@@ -48,7 +48,7 @@ impl CommandContext {
         let mut missing_headers = Vec::new();
         print!("Searching for height: ");
         // We need to check every header, but not every block.
-        let horizon_height = meta.horizon_block_height(height);
+        let horizon_height = meta.pruned_height_at_given_chain_tip(height);
         while height > 0 {
             print!("{}", height);
             io::stdout().flush().await?;
```
92 changes: 44 additions & 48 deletions base_layer/common_types/src/chain_metadata.rs

```diff
@@ -25,11 +25,11 @@ use std::fmt::{Display, Error, Formatter};
 use primitive_types::U256;
 use serde::{Deserialize, Serialize};
 
-use crate::types::{BlockHash, FixedHash};
+use crate::types::BlockHash;
 
 #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)]
 pub struct ChainMetadata {
-    /// The current chain height, or the block number of the longest valid chain, or `None` if there is no chain
+    /// The current chain height, or the block number of the longest valid chain
     best_block_height: u64,
     /// The block hash of the current tip of the longest valid chain
    best_block_hash: BlockHash,
@@ -40,7 +40,7 @@ pub struct ChainMetadata {
     pruning_horizon: u64,
     /// The height of the pruning horizon. This indicates from what height a full block can be provided
     /// (exclusive). If `pruned_height` is equal to the `best_block_height` no blocks can be
-    /// provided. Archival nodes wil always have an `pruned_height` of zero.
+    /// provided. Archival nodes wil always have a `pruned_height` of zero.
     pruned_height: u64,
     /// The total accumulated proof of work of the longest chain
     accumulated_difficulty: U256,
@@ -67,37 +67,16 @@ impl ChainMetadata {
         }
     }
 
-    pub fn empty() -> ChainMetadata {
-        ChainMetadata {
-            best_block_height: 0,
-            best_block_hash: FixedHash::zero(),
-            pruning_horizon: 0,
-            pruned_height: 0,
-            accumulated_difficulty: 0.into(),
-            timestamp: 0,
-        }
-    }
-
     /// The block height at the pruning horizon, given the chain height of the network. Typically database backends
     /// cannot provide any block data earlier than this point.
     /// Zero is returned if the blockchain still hasn't reached the pruning horizon.
-    pub fn horizon_block_height(&self, chain_tip: u64) -> u64 {
+    pub fn pruned_height_at_given_chain_tip(&self, chain_tip: u64) -> u64 {
         match self.pruning_horizon {
             0 => 0,
-            horizon => chain_tip.saturating_sub(horizon),
+            pruning_horizon => chain_tip.saturating_sub(pruning_horizon),
         }
     }
 
-    /// Set the pruning horizon to indicate that the chain is in archival mode (i.e. a pruning horizon of zero)
-    pub fn archival_mode(&mut self) {
-        self.pruning_horizon = 0;
-    }
-
-    /// Set the pruning horizon
-    pub fn set_pruning_horizon(&mut self, pruning_horizon: u64) {
-        self.pruning_horizon = pruning_horizon;
-    }
-
     /// The configured number of blocks back from the tip that this database tracks. A value of 0 indicates that
     /// pruning mode is disabled and the node will keep full blocks from the time it was set. If pruning horizon
     /// was previously enabled, previously pruned blocks will remain pruned. If set from initial sync, full blocks
@@ -123,7 +102,7 @@ impl ChainMetadata {
 
     /// The height of the pruning horizon. This indicates from what height a full block can be provided
     /// (exclusive). If `pruned_height` is equal to the `best_block_height` no blocks can be
-    /// provided. Archival nodes wil always have an `pruned_height` of zero.
+    /// provided. Archival nodes wil always have a `pruned_height` of zero.
     pub fn pruned_height(&self) -> u64 {
         self.pruned_height
     }
@@ -143,14 +122,11 @@ impl ChainMetadata {
 
 impl Display for ChainMetadata {
     fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
-        let height = self.best_block_height;
-        let best_block = self.best_block_hash;
-        let accumulated_difficulty = self.accumulated_difficulty;
-        writeln!(f, "Height of longest chain: {}", height)?;
-        writeln!(f, "Total accumulated difficulty: {}", accumulated_difficulty)?;
-        writeln!(f, "Best block: {}", best_block)?;
+        writeln!(f, "Best block height: {}", self.best_block_height)?;
+        writeln!(f, "Total accumulated difficulty: {}", self.accumulated_difficulty)?;
+        writeln!(f, "Best block hash: {}", self.best_block_hash)?;
         writeln!(f, "Pruning horizon: {}", self.pruning_horizon)?;
-        writeln!(f, "Effective pruned height: {}", self.pruned_height)?;
+        writeln!(f, "Pruned height: {}", self.pruned_height)?;
         Ok(())
     }
 }
@@ -161,33 +137,53 @@ mod test {
 
     #[test]
     fn horizon_block_on_default() {
-        let metadata = ChainMetadata::empty();
-        assert_eq!(metadata.horizon_block_height(0), 0);
+        let metadata = ChainMetadata {
+            best_block_height: 0,
+            best_block_hash: Default::default(),
+            pruning_horizon: 0,
+            pruned_height: 0,
+            accumulated_difficulty: Default::default(),
+            timestamp: 0,
+        };
+        assert_eq!(metadata.pruned_height_at_given_chain_tip(0), 0);
     }
 
     #[test]
     fn pruned_mode() {
-        let mut metadata = ChainMetadata::empty();
+        let mut metadata = ChainMetadata {
+            best_block_height: 0,
+            best_block_hash: Default::default(),
+            pruning_horizon: 0,
+            pruned_height: 0,
+            accumulated_difficulty: Default::default(),
+            timestamp: 0,
+        };
         assert!(!metadata.is_pruned_node());
         assert!(metadata.is_archival_node());
-        metadata.set_pruning_horizon(2880);
+        metadata.pruning_horizon = 2880;
         assert!(metadata.is_pruned_node());
         assert!(!metadata.is_archival_node());
-        assert_eq!(metadata.horizon_block_height(0), 0);
-        assert_eq!(metadata.horizon_block_height(100), 0);
-        assert_eq!(metadata.horizon_block_height(2880), 0);
-        assert_eq!(metadata.horizon_block_height(2881), 1);
+        assert_eq!(metadata.pruned_height_at_given_chain_tip(0), 0);
+        assert_eq!(metadata.pruned_height_at_given_chain_tip(100), 0);
+        assert_eq!(metadata.pruned_height_at_given_chain_tip(2880), 0);
+        assert_eq!(metadata.pruned_height_at_given_chain_tip(2881), 1);
    }
 
     #[test]
     fn archival_node() {
-        let mut metadata = ChainMetadata::empty();
-        metadata.archival_mode();
+        let metadata = ChainMetadata {
+            best_block_height: 0,
+            best_block_hash: Default::default(),
+            pruning_horizon: 0,
+            pruned_height: 0,
+            accumulated_difficulty: Default::default(),
+            timestamp: 0,
+        };
         // Chain is still empty
-        assert_eq!(metadata.horizon_block_height(0), 0);
+        assert_eq!(metadata.pruned_height_at_given_chain_tip(0), 0);
         // When pruning horizon is zero, the horizon block is always 0, the genesis block
-        assert_eq!(metadata.horizon_block_height(0), 0);
-        assert_eq!(metadata.horizon_block_height(100), 0);
-        assert_eq!(metadata.horizon_block_height(2881), 0);
+        assert_eq!(metadata.pruned_height_at_given_chain_tip(0), 0);
+        assert_eq!(metadata.pruned_height_at_given_chain_tip(100), 0);
+        assert_eq!(metadata.pruned_height_at_given_chain_tip(2881), 0);
     }
 }
```
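
For intuition, here is a standalone restatement of the renamed helper with a worked example. This is a sketch mirroring the method above, not the `ChainMetadata` implementation itself:

``` rust
fn pruned_height_at_given_chain_tip(chain_tip: u64, pruning_horizon: u64) -> u64 {
    match pruning_horizon {
        // A pruning horizon of zero means archival mode: nothing is ever pruned.
        0 => 0,
        pruning_horizon => chain_tip.saturating_sub(pruning_horizon),
    }
}

fn main() {
    // With a 2880-block horizon and the tip at height 10_000,
    // all blocks below height 7_120 may be pruned.
    assert_eq!(pruned_height_at_given_chain_tip(10_000, 2_880), 7_120);
    // Until the chain outgrows the horizon, nothing is prunable.
    assert_eq!(pruned_height_at_given_chain_tip(2_000, 2_880), 0);
}
```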
16 changes: 10 additions & 6 deletions base_layer/core/src/base_node/proto/rpc.proto

```diff
@@ -57,16 +57,20 @@ message SyncKernelsRequest {
 }
 
 message SyncUtxosRequest {
+  // Start header hash to sync UTXOs from
   bytes start_header_hash = 1;
+  // End header hash to sync UTXOs to
   bytes end_header_hash = 2;
 }
-message SyncUtxosResponse {
-  tari.types.TransactionOutput output = 1;
-  bytes mined_header = 2;
-}
 
-message PrunedOutput {
-  bytes hash = 1;
+message SyncUtxosResponse {
+  oneof txo {
+    // The unspent transaction output
+    tari.types.TransactionOutput output = 1;
+    // If the TXO is spent, the commitment bytes are returned
+    bytes commitment = 2;
+  }
+  bytes mined_header = 3;
 }
 
 message SyncUtxosByBlockRequest {
```
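
A minimal sketch of how a sync client might consume the new `txo` oneof. The hand-rolled Rust types below stand in for the generated protobuf code and are not the actual Tari client API:

``` rust
/// Stand-in for the `txo` oneof: a full output when unspent,
/// or only the commitment bytes when spent.
enum Txo {
    Output(Vec<u8>),     // stand-in for tari.types.TransactionOutput
    Commitment(Vec<u8>), // commitment of a TXO spent at or before the end header
}

struct SyncUtxosResponse {
    txo: Option<Txo>,
    mined_header: Vec<u8>,
}

fn apply_response(resp: SyncUtxosResponse) -> Result<(), String> {
    match resp.txo {
        // Unspent output: add it to the local output set / SMT.
        Some(Txo::Output(_output)) => Ok(()),
        // Spent output: find it by commitment and mark or delete it locally.
        Some(Txo::Commitment(_commitment)) => Ok(()),
        None => Err("response carried no TXO variant".to_string()),
    }
}
```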

```diff
@@ -57,26 +57,33 @@ impl HorizonStateSync {
             Err(err) => return err.into(),
         };
 
+        let sync_peers = &mut self.sync_peers;
+        // Order sync peers according to accumulated difficulty
+        sync_peers.sort_by(|a, b| {
+            b.claimed_chain_metadata()
+                .accumulated_difficulty()
+                .cmp(&a.claimed_chain_metadata().accumulated_difficulty())
+        });
+
+        // Target horizon sync height based on the last header we have synced
         let last_header = match shared.db.fetch_last_header().await {
             Ok(h) => h,
             Err(err) => return err.into(),
         };
+        let target_horizon_sync_height = local_metadata.pruned_height_at_given_chain_tip(last_header.height);
 
-        let horizon_sync_height = local_metadata.horizon_block_height(last_header.height);
-        if local_metadata.pruned_height() >= horizon_sync_height {
-            info!(target: LOG_TARGET, "Horizon state was already synchronized.");
+        // Determine if we need to sync horizon state
+        if local_metadata.pruned_height() >= target_horizon_sync_height {
+            info!(target: LOG_TARGET, "Horizon state is already synchronized.");
             return StateEvent::HorizonStateSynchronized;
         }
 
         // We're already synced because we have full blocks higher than our target pruned height
-        if local_metadata.best_block_height() >= horizon_sync_height {
+        if local_metadata.best_block_height() >= target_horizon_sync_height {
             info!(
                 target: LOG_TARGET,
-                "Tip height is higher than our pruned height. Horizon state is already synchronized."
+                "Our tip height is higher than our target pruned height. Horizon state is already synchronized."
             );
             return StateEvent::HorizonStateSynchronized;
         }
-        let sync_peers = &mut self.sync_peers;
 
         let db = shared.db.clone();
         let config = shared.config.blockchain_sync_config.clone();
@@ -90,7 +97,7 @@
             connectivity,
             rules,
             sync_peers,
-            horizon_sync_height,
+            target_horizon_sync_height,
             prover,
             validator,
         );
```
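
In isolation, the peer ordering added above is a descending sort by claimed accumulated difficulty, so the peer claiming the strongest chain is tried first. A sketch with a simplified stand-in type, not the real `SyncPeer`:

``` rust
struct SyncPeer {
    claimed_accumulated_difficulty: u128,
}

fn order_sync_peers(peers: &mut [SyncPeer]) {
    // `b.cmp(&a)` (rather than `a.cmp(&b)`) yields descending order.
    peers.sort_by(|a, b| {
        b.claimed_accumulated_difficulty
            .cmp(&a.claimed_accumulated_difficulty)
    });
}
```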

```diff
@@ -62,63 +62,71 @@ impl DecideNextSync {
         );
 
         if local_metadata.pruning_horizon() > 0 {
-            let last_header = match shared.db.fetch_last_header().await {
-                Ok(h) => h,
-                Err(err) => return err.into(),
-            };
-
-            let horizon_sync_height = local_metadata.horizon_block_height(last_header.height);
             // Filter sync peers that claim to be able to provide blocks up until our pruned height
-            let sync_peers = self
-                .sync_peers
+            debug!(target: LOG_TARGET, "Local metadata: {}", local_metadata);
+            let mut sync_peers = self.sync_peers.clone();
+            let sync_peers = sync_peers
                 .drain(..)
                 .filter(|sync_peer| {
                     let remote_metadata = sync_peer.claimed_chain_metadata();
-                    remote_metadata.best_block_height() >= horizon_sync_height
+                    debug!(target: LOG_TARGET, "Peer metadata: {}", remote_metadata);
+                    let remote_is_archival_node = remote_metadata.pruned_height() == 0;
+                    let general_sync_conditions =
+                        // Must be able to provide the correct amount of full blocks past the pruned height (i.e. the
+                        // pruning horizon), otherwise our horizon spec will not be met
+                        remote_metadata.best_block_height().saturating_sub(remote_metadata.pruned_height()) >=
+                            local_metadata.pruning_horizon() &&
+                        // Must have a better blockchain tip than us
+                        remote_metadata.best_block_height() > local_metadata.best_block_height() &&
+                        // Must be able to provide full blocks from the height we need detailed information
+                        remote_metadata.pruned_height() <= local_metadata.best_block_height();
+                    let sync_from_prune_node = !remote_is_archival_node &&
+                        // Must have done initial sync (to detect genesis TXO spends)
+                        local_metadata.best_block_height() > 0;
+                    general_sync_conditions && (remote_is_archival_node || sync_from_prune_node)
                 })
                 .collect::<Vec<_>>();
 
             if sync_peers.is_empty() {
                 warn!(
                     target: LOG_TARGET,
-                    "Unable to find any appropriate sync peers for horizon sync"
+                    "Unable to find any appropriate sync peers for horizon sync, trying for block sync"
                 );
-                return Continue;
-            }
-
-            debug!(
-                target: LOG_TARGET,
-                "Proceeding to horizon sync with {} sync peer(s) with a best latency of {:.2?}",
-                sync_peers.len(),
-                sync_peers.first().map(|p| p.latency()).unwrap_or_default()
-            );
-            ProceedToHorizonSync(sync_peers)
-        } else {
-            // Filter sync peers that are able to provide full blocks from our current tip
-            let sync_peers = self
-                .sync_peers
-                .drain(..)
-                .filter(|sync_peer| {
-                    sync_peer.claimed_chain_metadata().pruned_height() <= local_metadata.best_block_height()
-                })
-                .collect::<Vec<_>>();
-
-            if sync_peers.is_empty() {
-                warn!(
+            } else {
+                debug!(
                     target: LOG_TARGET,
-                    "Unable to find any appropriate sync peers for block sync"
+                    "Proceeding to horizon sync with {} sync peer(s) with a best latency of {:.2?}",
+                    sync_peers.len(),
+                    sync_peers.first().map(|p| p.latency()).unwrap_or_default()
                 );
-                return Continue;
+                return ProceedToHorizonSync(sync_peers);
             }
+        }
 
-            debug!(
-                target: LOG_TARGET,
-                "Proceeding to block sync with {} sync peer(s) with a best latency of {:.2?}",
-                sync_peers.len(),
-                sync_peers.first().map(|p| p.latency()).unwrap_or_default()
-            );
-            ProceedToBlockSync(sync_peers)
+        // This is not a pruned node or horizon sync is not possible, try for block sync
+
+        // Filter sync peers that are able to provide full blocks from our current tip
+        let sync_peers = self
+            .sync_peers
+            .drain(..)
+            .filter(|sync_peer| {
+                let remote_metadata = sync_peer.claimed_chain_metadata();
+                remote_metadata.pruned_height() <= local_metadata.best_block_height()
+            })
+            .collect::<Vec<_>>();
+
+        if sync_peers.is_empty() {
+            warn!(target: LOG_TARGET, "Unable to find any appropriate sync peers for block sync");
+            return Continue;
+        }
+
+        debug!(
+            target: LOG_TARGET,
+            "Proceeding to block sync with {} sync peer(s) with a best latency of {:.2?}",
+            sync_peers.len(),
+            sync_peers.first().map(|p| p.latency()).unwrap_or_default()
+        );
+        ProceedToBlockSync(sync_peers)
     }
 }
```
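
Condensed into a standalone predicate, the new horizon-sync peer filter reads as follows. Plain struct fields replace the real `ChainMetadata` accessors; this is a sketch of the logic above, not the shipped code:

``` rust
struct Meta {
    best_block_height: u64,
    pruned_height: u64,
    pruning_horizon: u64,
}

fn acceptable_horizon_sync_peer(local: &Meta, remote: &Meta) -> bool {
    let remote_is_archival_node = remote.pruned_height == 0;
    let general_sync_conditions =
        // Peer must hold at least a full pruning horizon of blocks past its pruned height.
        remote.best_block_height.saturating_sub(remote.pruned_height) >= local.pruning_horizon
        // Peer must claim a better chain tip than ours.
        && remote.best_block_height > local.best_block_height
        // Peer must have full blocks from the height where we need detail.
        && remote.pruned_height <= local.best_block_height;
    // Syncing from another pruned node is only allowed after our initial sync,
    // so that genesis TXO spends can still be detected.
    let sync_from_prune_node = !remote_is_archival_node && local.best_block_height > 0;
    general_sync_conditions && (remote_is_archival_node || sync_from_prune_node)
}
```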
12 changes: 11 additions & 1 deletion base_layer/core/src/base_node/sync/horizon_state_sync/error.rs

```diff
@@ -30,6 +30,7 @@ use tari_comms::{
 };
 use tari_crypto::errors::RangeProofError;
 use tari_mmr::{error::MerkleMountainRangeError, sparse_merkle_tree::SMTError};
+use tari_utilities::ByteArrayError;
 use thiserror::Error;
 use tokio::task;
 
@@ -97,6 +98,14 @@ pub enum HorizonSyncError {
     PeerNotFound,
     #[error("Sparse Merkle Tree error: {0}")]
     SMTError(#[from] SMTError),
+    #[error("ByteArrayError error: {0}")]
+    ByteArrayError(String),
+}
+
+impl From<ByteArrayError> for HorizonSyncError {
+    fn from(e: ByteArrayError) -> Self {
+        HorizonSyncError::ByteArrayError(e.to_string())
+    }
 }
 
 impl From<TryFromIntError> for HorizonSyncError {
@@ -142,7 +151,8 @@ impl HorizonSyncError {
             err @ HorizonSyncError::ConversionError(_) |
             err @ HorizonSyncError::MerkleMountainRangeError(_) |
             err @ HorizonSyncError::FixedHashSizeError(_) |
-            err @ HorizonSyncError::TransactionError(_) => Some(BanReason {
+            err @ HorizonSyncError::TransactionError(_) |
+            err @ HorizonSyncError::ByteArrayError(_) => Some(BanReason {
                 reason: format!("{}", err),
                 ban_duration: BanPeriod::Long,
             }),
```