Skip to content

Commit

Permalink
fix(congestion control) - fix integration with state sync, add tests (#11261)
Browse files Browse the repository at this point in the history

* Fixed a number of bugs in the congestion control and state sync
implementation.
* Each needed block needs a separate timer for requesting it, otherwise
some never get requested.
  * More blocks need to be allowed to be stored for state sync. 
  * Wait until all sync blocks are ready before starting state sync. 
  * Check if block exists depending on how we store it. 
* Added a nayduck test that tests state sync with missing chunks at the
end of the epoch.
Loading branch information
wacban committed May 9, 2024
1 parent 44359d5 commit 326c609
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 61 deletions.
10 changes: 5 additions & 5 deletions chain/chain/src/store_validator/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ pub(crate) fn trie_changes_chunk_extra_exists(
// 1. Block with `block_hash` should be available
let block = unwrap_or_err_db!(
sv.store.get_ser::<Block>(DBCol::Block, block_hash.as_ref()),
"Can't get Block from DB"
"Can't get Block from DB - trie_changes_chunk_extra_exists"
);
// 2) Chunk Extra with `block_hash` and `shard_uid` should be available and match with the new root
let chunk_extra = unwrap_or_err_db!(
Expand Down Expand Up @@ -671,7 +671,7 @@ pub(crate) fn outcome_id_block_exists(
) -> Result<(), StoreValidatorError> {
unwrap_or_err_db!(
sv.store.get_ser::<Block>(DBCol::Block, block_hash.as_ref()),
"Can't get Block from DB"
"Can't get Block from DB - outcome_id_block_exists"
);
Ok(())
}
Expand Down Expand Up @@ -735,7 +735,7 @@ pub(crate) fn state_sync_info_block_exists(
) -> Result<(), StoreValidatorError> {
unwrap_or_err_db!(
sv.store.get_ser::<Block>(DBCol::Block, block_hash.as_ref()),
"Can't get Block from DB"
"Can't get Block from DB - state_sync_info_block_exists"
);
Ok(())
}
Expand All @@ -747,7 +747,7 @@ pub(crate) fn chunk_extra_block_exists(
) -> Result<(), StoreValidatorError> {
unwrap_or_err_db!(
sv.store.get_ser::<Block>(DBCol::Block, block_hash.as_ref()),
"Can't get Block from DB"
"Can't get Block from DB - chunk_extra_block_exists"
);
Ok(())
}
Expand Down Expand Up @@ -840,7 +840,7 @@ pub(crate) fn state_header_block_exists(
) -> Result<(), StoreValidatorError> {
unwrap_or_err_db!(
sv.store.get_ser::<Block>(DBCol::Block, key.1.as_ref()),
"Can't get Block from DB"
"Can't get Block from DB - state_header_block_exists"
);
Ok(())
}
Expand Down
15 changes: 9 additions & 6 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,8 @@ pub struct Client {
tier1_accounts_cache: Option<(EpochId, Arc<AccountKeys>)>,
/// Used when it is needed to create flat storage in background for some shards.
flat_storage_creator: Option<FlatStorageCreator>,
/// When the "sync block" was requested.
/// The "sync block" is the last block of the previous epoch, i.e. `prev_hash` of the `sync_hash` block.
pub last_time_sync_block_requested: Option<near_async::time::Utc>,
/// A map storing the last time a block was requested for state sync.
pub last_time_sync_block_requested: HashMap<CryptoHash, near_async::time::Utc>,
/// Helper module for stateless validation functionality like chunk witness production, validation
/// chunk endorsements tracking etc.
pub chunk_validator: ChunkValidator,
Expand Down Expand Up @@ -407,7 +406,7 @@ impl Client {
chunk_production_info: lru::LruCache::new(PRODUCTION_TIMES_CACHE_SIZE),
tier1_accounts_cache: None,
flat_storage_creator,
last_time_sync_block_requested: None,
last_time_sync_block_requested: HashMap::new(),
chunk_validator,
chunk_inclusion_tracker: ChunkInclusionTracker::new(),
chunk_endorsement_tracker,
Expand Down Expand Up @@ -1722,7 +1721,11 @@ impl Client {
#[cfg(feature = "test_features")]
match self.adv_produce_chunks {
Some(AdvProduceChunksMode::StopProduce) => {
tracing::info!(target: "adversary", "skipping chunk production due to adversary configuration");
tracing::info!(
target: "adversary",
block_height = block.header().height(),
"skipping chunk production due to adversary configuration"
);
return;
}
_ => {}
Expand Down Expand Up @@ -2558,7 +2561,7 @@ impl Client {
debug!(target: "client", ?hash, "send_block_request_to_peer: block already known")
}
Err(err) => {
error!(target: "client", ?err, "send_block_request_to_peer: failed to check block exists")
error!(target: "client", ?hash, ?err, "send_block_request_to_peer: failed to check block exists")
}
}
}
Expand Down
139 changes: 90 additions & 49 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ impl Handler<NetworkAdversarialMessage> for ClientActorInner {
}
}
NetworkAdversarialMessage::AdvProduceChunks(adv_produce_chunks) => {
info!(target: "adversary", mode=?adv_produce_chunks, "setting adversary produce chunks");
self.client.adv_produce_chunks = Some(adv_produce_chunks);
None
}
Expand Down Expand Up @@ -1683,6 +1684,13 @@ impl ClientActorInner {
_ => unreachable!("Sync status should have been StateSync!"),
};

// Waiting for all the sync blocks to be available because they are
// needed to finalize state sync.
if !have_block {
tracing::debug!(target: "sync", "waiting for sync blocks");
return;
}

let state_sync_result = self.client.state_sync.run(
&me,
sync_hash,
Expand All @@ -1702,10 +1710,6 @@ impl ClientActorInner {
match state_sync_result {
StateSyncResult::InProgress => (),
StateSyncResult::Completed => {
if !have_block {
trace!(target: "sync", "Sync done. Waiting for sync block.");
return;
}
info!(target: "sync", "State sync: all shards are done");

let mut block_processing_artifacts = BlockProcessingArtifact::default();
Expand Down Expand Up @@ -1737,41 +1741,40 @@ impl ClientActorInner {
let now = self.clock.now_utc();

let mut have_all = true;
let mut req_any = false;

let block_hash = *block_header.hash();
let prev_block_hash = *block_header.prev_hash();
let sync_hash = *block_header.hash();
let prev_hash = *block_header.prev_hash();

let mut needed_block_hashes = vec![prev_block_hash, block_hash];
let mut extra_block_hashes = self.get_extra_sync_block_hashes(prev_block_hash);
let mut needed_block_hashes = vec![prev_hash, sync_hash];
let mut extra_block_hashes = self.get_extra_sync_block_hashes(prev_hash);
tracing::debug!(target: "sync", ?needed_block_hashes, ?extra_block_hashes, "request_sync_blocks: block hashes for state sync");
needed_block_hashes.append(&mut extra_block_hashes);

for hash in needed_block_hashes.into_iter() {
let (request_block, have_block) = self.sync_block_status(&prev_block_hash, now)?;
for hash in needed_block_hashes.clone().into_iter() {
let (request_block, have_block) = self.sync_block_status(&sync_hash, &hash, now)?;
tracing::trace!(target: "sync", ?hash, ?request_block, ?have_block, "request_sync_blocks");
have_all = have_all && have_block;
req_any = req_any || request_block;

if have_block {
self.client.last_time_sync_block_requested.remove(&hash);
}

if !request_block {
tracing::trace!(target: "sync", ?hash, ?have_block, "request_sync_blocks: skipping - no request");
continue;
}

let peer_info = self.network_info.highest_height_peers.choose(&mut thread_rng());
let Some(peer_info) = peer_info else {
tracing::trace!(target: "sync", ?hash, "request_sync_blocks: skipping - no peer");
continue;
};
let id = peer_info.peer_info.id.clone();
self.client.request_block(hash, id.clone());
}

// If any block was requested update the last time we requested a block.
// Do it outside of the loop to keep it consistent for all blocks.
if req_any {
self.client.last_time_sync_block_requested = Some(now);
let peer_id = peer_info.peer_info.id.clone();
self.client.last_time_sync_block_requested.insert(hash, now);
self.client.request_block(hash, peer_id);
}

if have_all {
self.client.last_time_sync_block_requested = None;
}
tracing::debug!(target: "sync", ?have_all, "request_sync_blocks: done");

Ok(have_all)
}
Expand Down Expand Up @@ -1814,8 +1817,6 @@ impl ClientActorInner {
next_hash = *next_header.prev_hash();
}

tracing::debug!(target: "sync", ?min_height_included, ?extra_block_hashes, "get_extra_sync_block_hashes: Extra block hashes for state sync");

extra_block_hashes
}

Expand Down Expand Up @@ -1860,6 +1861,7 @@ impl ClientActorInner {
let new_state_sync_status = StateSyncStatus { sync_hash, sync_status: HashMap::default() };
let new_sync_status = SyncStatus::StateSync(new_state_sync_status);
self.client.sync_status.update(new_sync_status);
self.client.last_time_sync_block_requested.clear();
// This is the first time we run state sync.
return Ok(true);
}
Expand Down Expand Up @@ -1892,30 +1894,39 @@ impl ClientActorInner {
Ok(block_sync_result)
}

/// Verifies if the node possesses sync block. It is the last block of the
/// previous epoch. If the block is absent, the node requests it from peers.
/// Verifies if the node possesses the given block. If the block is absent,
/// the node should request it from peers.
///
/// the return value is a tuple (request_block, have_block)
///
/// the return value (false, false) means that the node already requested
/// the block but hasn't received it yet
fn sync_block_status(
&self,
sync_hash: &CryptoHash,
block_hash: &CryptoHash,
now: Utc,
) -> Result<(bool, bool), near_chain::Error> {
if self.client.chain.block_exists(block_hash)? {
// The sync hash block is saved as an orphan. The other blocks are saved
// as regular blocks. Check if block exists depending on that.
let block_exists = if sync_hash == block_hash {
self.client.chain.is_orphan(block_hash)
} else {
self.client.chain.block_exists(block_hash)?
};

if block_exists {
return Ok((false, true));
}
let timeout = self.client.config.state_sync_timeout;
let timeout = near_async::time::Duration::try_from(timeout);
let timeout = timeout.unwrap();

let Some(last_time) = self.client.last_time_sync_block_requested else {
let Some(last_time) = self.client.last_time_sync_block_requested.get(block_hash) else {
return Ok((true, false));
};

if (now - last_time) >= timeout {
if (now - *last_time) >= timeout {
tracing::error!(
target: "sync",
%block_hash,
Expand All @@ -1939,35 +1950,65 @@ impl ClientActorInner {
)
}

/// Checks if the node is syncing its State and applies special logic in that case.
/// A node usually ignores blocks that are too far ahead, but in case of a node syncing its state it is looking for 2 specific blocks:
/// Checks if the node is syncing its State and applies special logic in
/// that case. A node usually ignores blocks that are too far ahead, but in
/// case of a node syncing its state it is looking for 2 specific blocks:
/// * The first block of the new epoch
/// * The last block of the prev epoch
///
/// Additionally if there were missing chunks in the blocks leading to the
/// sync hash block we need to store extra blocks.
///
/// Returns whether the node is syncing its state.
fn maybe_receive_state_sync_blocks(&mut self, block: &Block) -> bool {
let SyncStatus::StateSync(StateSyncStatus { sync_hash, .. }) = self.client.sync_status
else {
return false;
};
if let Ok(header) = self.client.chain.get_block_header(&sync_hash) {
let block: MaybeValidated<Block> = (*block).clone().into();
let block_hash = *block.hash();
if let Err(err) = self.client.chain.validate_block(&block) {
byzantine_assert!(false);
error!(target: "client", ?err, ?block_hash, "Received an invalid block during state sync");

let Ok(header) = self.client.chain.get_block_header(&sync_hash) else {
return true;
};

let block: MaybeValidated<Block> = (*block).clone().into();
let block_hash = *block.hash();
if let Err(err) = self.client.chain.validate_block(&block) {
byzantine_assert!(false);
error!(target: "client", ?err, ?block_hash, "Received an invalid block during state sync");
}

let extra_block_hashes = self.get_extra_sync_block_hashes(*header.prev_hash());
tracing::trace!(target: "sync", ?extra_block_hashes, "maybe_receive_state_sync_blocks: Extra block hashes for state sync");

// Notice that two blocks are saved differently:
// * save_orphan() for the sync hash block
// * save_block() for the prev block and all the extra blocks
// TODO why is that the case? Shouldn't the block without a parent block
// be the orphan?

if block_hash == sync_hash {
// The first block of the new epoch.
tracing::debug!(target: "sync", block_hash=?block.hash(), "maybe_receive_state_sync_blocks - save sync hash block");
self.client.chain.save_orphan(block, Provenance::NONE, false);
return true;
}

if &block_hash == header.prev_hash() {
// The last block of the previous epoch.
tracing::debug!(target: "sync", block_hash=?block.hash(), "maybe_receive_state_sync_blocks - save prev hash block");
if let Err(err) = self.client.chain.save_block(block) {
error!(target: "client", ?err, ?block_hash, "Failed to save a block during state sync");
}
// Notice that two blocks are saved differently:
// * save_block() for one block.
// * save_orphan() for another block.
if &block_hash == header.prev_hash() {
// The last block of the previous epoch.
if let Err(err) = self.client.chain.save_block(block) {
error!(target: "client", ?err, ?block_hash, "Failed to save a block during state sync");
}
} else if block_hash == sync_hash {
// The first block of the new epoch.
self.client.chain.save_orphan(block, Provenance::NONE, false);
return true;
}

if extra_block_hashes.contains(&block_hash) {
// Extra blocks needed when there are missing chunks.
tracing::debug!(target: "sync", block_hash=?block.hash(), "maybe_receive_state_sync_blocks - save extra block");
if let Err(err) = self.client.chain.save_block(block) {
error!(target: "client", ?err, ?block_hash, "Failed to save a block during state sync");
}
return true;
}
true
}
Expand Down
3 changes: 2 additions & 1 deletion chain/client/src/sync/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ impl BlockSync {
request_from_archival,
peer = ?peer.peer_info.id,
num_peers = highest_height_peers.len(),
"Block sync: requested block");
"Block sync: requested block"
);
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::BlockRequest { hash, peer_id: peer.peer_info.id.clone() },
));
Expand Down
3 changes: 3 additions & 0 deletions nightly/pytest-sanity.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ pytest --timeout=600 sanity/state_sync_routed.py manytx 115 --features nightly
# working on a fix / deciding whether to remove them.
#pytest --timeout=300 sanity/state_sync_late.py notx
#pytest --timeout=300 sanity/state_sync_late.py notx --features nightly
pytest sanity/state_sync_missing_chunks.py
pytest sanity/state_sync_missing_chunks.py --features nightly

pytest sanity/single_shard_tracking.py
pytest sanity/single_shard_tracking.py --features nightly

Expand Down
Loading

0 comments on commit 326c609

Please sign in to comment.