From 1246e3a3766b498c8a25273766a0e5c4ccab7f1f Mon Sep 17 00:00:00 2001
From: Jan Ciolek <149345204+jancionear@users.noreply.github.com>
Date: Wed, 22 May 2024 11:17:23 +0200
Subject: [PATCH] Fix slow saving of latest witnesses (#11354)

Fixes: https://github.com/near/nearcore/issues/11258

Changes in this PR:
* Improved observability of saving latest witnesses
  * Added metrics
  * Added a tracing span, which will be visible in span analysis tools
  * Added a printout in the logs with details about saving the latest witness
* Fixed the extreme slowness of `save_latest_chunk_state_witness`; the new solution doesn't iterate over anything
* Started saving witnesses produced during shadow validation; I needed that to properly test the change

The previous solution used `store().iter()` to find the witness with the lowest height that needs to be removed to free up space, but it turned out that this takes a really long time: ~100ms! The new solution doesn't iterate over anything; instead it maintains a mapping from integer indexes to saved witnesses. The first observed witness gets index 0, the second gets index 1, the third gets index 2, and so on. When it's time to free up space, we delete the witness with the lowest index. We maintain two pointers to the ends of this "queue" and move them accordingly as witnesses are added and removed (see the sketch below).

This greatly improves the time needed to save the latest witness - with the new code, generating the database update usually takes under 1ms, and committing it takes under 6ms (on shadow validation):

![image](https://github.com/near/nearcore/assets/149345204/06f379d3-1a36-4aa0-8c5f-043bab7bc36c)

([view the metrics here](https://nearone.grafana.net/d/admakiv9pst8gd/save-latest-witnesses-stats?orgId=1&var-chain_id=mainnet&var-node_id=jan-mainnet-node&var-shard_id=All&from=1716234291000&to=1716241491000))

~7ms is still a non-negligible amount of time, but it's far better than the previous ~100ms. Since this is a debug-only feature, 7ms may be acceptable.
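To make the queue mechanics concrete, here is a minimal, self-contained sketch of the index bookkeeping. It is an illustration, not the code in this patch: `WitnessQueue`, its `String` payload, and the in-memory `HashMap` are invented stand-ins; in the actual change the mapping lives in the new `LatestWitnessesByIndex` column and the two pointers are the `lowest_index` and `next_witness_index` fields of `LatestWitnessesInfo`.

```rust
use std::collections::HashMap;

/// Stand-in for the on-disk state: two pointers plus an index -> witness map.
struct WitnessQueue {
    /// Index of the oldest stored witness, i.e. the next one to evict.
    lowest_index: u64,
    /// Index that the next observed witness will receive.
    next_witness_index: u64,
    /// Stand-in for the DBCol::LatestWitnessesByIndex column.
    by_index: HashMap<u64, String>,
}

impl WitnessQueue {
    fn new() -> Self {
        WitnessQueue { lowest_index: 0, next_witness_index: 0, by_index: HashMap::new() }
    }

    /// Save a new witness under the next free index. O(1), no iteration.
    fn push(&mut self, witness: String) {
        self.by_index.insert(self.next_witness_index, witness);
        self.next_witness_index += 1;
    }

    /// Evict the oldest witness (the one with the lowest index). O(1), no iteration.
    fn pop_oldest(&mut self) -> Option<String> {
        if self.lowest_index >= self.next_witness_index {
            return None; // the queue is empty
        }
        let evicted = self.by_index.remove(&self.lowest_index);
        self.lowest_index += 1;
        evicted
    }
}

fn main() {
    let mut queue = WitnessQueue::new();
    queue.push("witness A".to_string()); // gets index 0
    queue.push("witness B".to_string()); // gets index 1
    // Over the limit: evict the witness with the lowest index, i.e. index 0.
    assert_eq!(queue.pop_oldest(), Some("witness A".to_string()));
}
```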
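The storage key also changes: the serialized `LatestWitnessesKey` grows from 64 to 72 bytes because the witness size is now embedded in the key, so the eviction loop can decrement `total_size` without fetching the witness body. The resulting layout (per `LatestWitnessesKey::serialized` in the diff below) is:

```
bytes  0..8    height        (big-endian u64)
bytes  8..16   shard_id      (big-endian u64)
bytes 16..48   epoch_id      (CryptoHash, 32 bytes)
bytes 48..56   witness_size  (big-endian u64, new in this PR)
bytes 56..72   random_uuid   (16 bytes)
```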
---
 chain/chain/src/garbage_collection.rs              |  3 +
 chain/chain/src/metrics.rs                         | 33 +++++++
 chain/chain/src/store/latest_witnesses.rs          | 90 +++++++++++++++----
 .../stateless_validation/shadow_validate.rs        |  3 +
 core/store/src/columns.rs                          |  7 ++
 5 files changed, 119 insertions(+), 17 deletions(-)

diff --git a/chain/chain/src/garbage_collection.rs b/chain/chain/src/garbage_collection.rs
index d719dc2a9c3..0bf08500947 100644
--- a/chain/chain/src/garbage_collection.rs
+++ b/chain/chain/src/garbage_collection.rs
@@ -1040,6 +1040,9 @@ impl<'a> ChainStoreUpdate<'a> {
             DBCol::LatestChunkStateWitnesses => {
                 store_update.delete(col, key);
             }
+            DBCol::LatestWitnessesByIndex => {
+                store_update.delete(col, key);
+            }
             DBCol::DbVersion
             | DBCol::BlockMisc
             | DBCol::_GCCount
diff --git a/chain/chain/src/metrics.rs b/chain/chain/src/metrics.rs
index f5922188fbe..5eeb9d7ae78 100644
--- a/chain/chain/src/metrics.rs
+++ b/chain/chain/src/metrics.rs
@@ -238,3 +238,36 @@ pub(crate) static RESHARDING_STATUS: Lazy<IntGaugeVec> = Lazy::new(|| {
     )
     .unwrap()
 });
+
+pub static SAVE_LATEST_WITNESS_GENERATE_UPDATE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
+    try_create_histogram_vec(
+        "near_save_latest_witness_generate_update_time",
+        "Time taken to generate an update of latest witnesses",
+        &["shard_id"],
+        Some(exponential_buckets(0.001, 1.6, 20).unwrap()),
+    )
+    .unwrap()
+});
+pub static SAVE_LATEST_WITNESS_COMMIT_UPDATE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
+    try_create_histogram_vec(
+        "near_save_latest_witness_commit_update_time",
+        "Time taken to commit the update of latest witnesses",
+        &["shard_id"],
+        Some(exponential_buckets(0.001, 1.6, 20).unwrap()),
+    )
+    .unwrap()
+});
+pub static SAVED_LATEST_WITNESSES_COUNT: Lazy<IntGauge> = Lazy::new(|| {
+    try_create_int_gauge(
+        "near_saved_latest_witnesses_count",
+        "Total number of saved latest witnesses",
+    )
+    .unwrap()
+});
+pub static SAVED_LATEST_WITNESSES_SIZE: Lazy<IntGauge> = Lazy::new(|| {
+    try_create_int_gauge(
+        "near_saved_latest_witnesses_size",
+        "Total size of saved latest witnesses (in bytes)",
+    )
+    .unwrap()
+});
diff --git a/chain/chain/src/store/latest_witnesses.rs b/chain/chain/src/store/latest_witnesses.rs
index 1c1c6497cae..4ecec8bdd69 100644
--- a/chain/chain/src/store/latest_witnesses.rs
+++ b/chain/chain/src/store/latest_witnesses.rs
@@ -2,7 +2,7 @@
 //! by the node. The latest witnesses are stored in the database and can be fetched
 //! for analysis and debugging.
 //! The number of stored witnesses is limited. When the limit is reached
-//! the witness with the oldest height is removed from the database.
+//! the oldest witness is removed from the database.
 //! At the moment this module is used only for debugging purposes.
 
 use std::io::ErrorKind;
@@ -12,6 +12,7 @@ use near_primitives::stateless_validation::ChunkStateWitness;
 use near_primitives::types::EpochId;
 use near_store::DBCol;
 
+use crate::metrics;
 use crate::ChainStoreAccess;
 
 use super::ChainStore;
@@ -36,6 +37,7 @@ pub struct LatestWitnessesKey {
     pub height: u64,
     pub shard_id: u64,
     pub epoch_id: EpochId,
+    pub witness_size: u64,
     /// Each witness has a random UUID to ensure that the key is unique.
     /// It allows to store multiple witnesses with the same height and shard_id.
     pub random_uuid: [u8; 16],
@@ -45,21 +47,22 @@ impl LatestWitnessesKey {
     /// `LatestWitnessesKey` has custom serialization to ensure that the binary representation
     /// starts with big-endian height and shard_id.
     /// This allows to query using a key prefix to find all witnesses for a given height (and shard_id).
-    pub fn serialized(&self) -> [u8; 64] {
-        let mut result = [0u8; 64];
+    pub fn serialized(&self) -> [u8; 72] {
+        let mut result = [0u8; 72];
         result[..8].copy_from_slice(&self.height.to_be_bytes());
         result[8..16].copy_from_slice(&self.shard_id.to_be_bytes());
         result[16..48].copy_from_slice(&self.epoch_id.0 .0);
-        result[48..].copy_from_slice(&self.random_uuid);
+        result[48..56].copy_from_slice(&self.witness_size.to_be_bytes());
+        result[56..].copy_from_slice(&self.random_uuid);
         result
     }
 
     pub fn deserialize(data: &[u8]) -> Result<Self, std::io::Error> {
-        if data.len() != 64 {
+        if data.len() != 72 {
             return Err(std::io::Error::new(
                 ErrorKind::InvalidInput,
                 format!(
-                    "Cannot deserialize LatestWitnessesKey, expected 64 bytes, got {}",
+                    "Cannot deserialize LatestWitnessesKey, expected 72 bytes, got {}",
                     data.len()
                 ),
             ));
@@ -70,7 +73,8 @@ impl LatestWitnessesKey {
             height: u64::from_be_bytes(data[0..8].try_into().unwrap()),
             shard_id: u64::from_be_bytes(data[8..16].try_into().unwrap()),
             epoch_id: EpochId(CryptoHash(data[16..48].try_into().unwrap())),
-            random_uuid: data[48..].try_into().unwrap(),
+            witness_size: u64::from_be_bytes(data[48..56].try_into().unwrap()),
+            random_uuid: data[56..].try_into().unwrap(),
         })
     }
 }
@@ -81,6 +85,8 @@ impl LatestWitnessesKey {
 struct LatestWitnessesInfo {
     pub count: u64,
     pub total_size: u64,
+    pub lowest_index: u64,
+    pub next_witness_index: u64,
 }
 
 impl LatestWitnessesInfo {
@@ -99,6 +105,15 @@ impl ChainStore {
         &mut self,
         witness: &ChunkStateWitness,
     ) -> Result<(), std::io::Error> {
+        let start_time = std::time::Instant::now();
+        let _span = tracing::info_span!(
+            target: "client",
+            "save_latest_chunk_state_witness",
+            witness_height = witness.chunk_header.height_created(),
+            witness_shard = witness.chunk_header.shard_id(),
+        )
+        .entered();
+
         let serialized_witness = borsh::to_vec(witness)?;
         let serialized_witness_size: u64 =
             serialized_witness.len().try_into().expect("Cannot convert usize to u64");
@@ -118,23 +133,36 @@ impl ChainStore {
             .get_ser::<LatestWitnessesInfo>(DBCol::Misc, LATEST_WITNESSES_INFO)?
             .unwrap_or_default();
 
+        let new_witness_index = info.next_witness_index;
+
         // Adjust the info to include the new witness.
         info.count += 1;
         info.total_size += serialized_witness.len() as u64;
+        info.next_witness_index += 1;
 
-        // Go over witnesses with increasing (height, shard_id) and remove them until the limits are satisfied.
-        // Height and shard id are stored in big-endian representation, so sorting the binary representation is
-        // the same as sorting the integers.
         let mut store_update = self.store().store_update();
-        for item in self.store().iter(DBCol::LatestChunkStateWitnesses) {
-            if info.is_within_limits() {
-                break;
-            }
-            let (key_bytes, witness_bytes) = item?;
-            store_update.delete(DBCol::LatestChunkStateWitnesses, &key_bytes);
+
+        // Go over witnesses with increasing indexes and remove them until the limits are satisfied.
+        while !info.is_within_limits() && info.lowest_index < info.next_witness_index {
+            let key_to_delete = self
+                .store()
+                .get(DBCol::LatestWitnessesByIndex, &info.lowest_index.to_be_bytes())?
+                .ok_or_else(|| {
+                    std::io::Error::new(
+                        ErrorKind::NotFound,
+                        format!(
+                            "Cannot find witness key to delete with index {}",
+                            info.lowest_index
+                        ),
+                    )
+                })?;
+            let key_deser = LatestWitnessesKey::deserialize(&key_to_delete)?;
+
+            store_update.delete(DBCol::LatestChunkStateWitnesses, &key_to_delete);
+            store_update.delete(DBCol::LatestWitnessesByIndex, &info.lowest_index.to_be_bytes());
+            info.lowest_index += 1;
             info.count -= 1;
-            info.total_size -= witness_bytes.len() as u64;
+            info.total_size -= key_deser.witness_size;
         }
 
         // Limits are ok, insert the new witness.
@@ -144,16 +172,44 @@ impl ChainStore {
             height: witness.chunk_header.height_created(),
             shard_id: witness.chunk_header.shard_id(),
             epoch_id: witness.epoch_id.clone(),
+            witness_size: serialized_witness_size,
             random_uuid,
         };
         store_update.set(DBCol::LatestChunkStateWitnesses, &key.serialized(), &serialized_witness);
+        store_update.set(
+            DBCol::LatestWitnessesByIndex,
+            &new_witness_index.to_be_bytes(),
+            &key.serialized(),
+        );
 
         // Update LatestWitnessesInfo
         store_update.set(DBCol::Misc, &LATEST_WITNESSES_INFO, &borsh::to_vec(&info)?);
 
+        let store_update_time = start_time.elapsed();
+
         // Commit the transaction
         store_update.commit()?;
 
+        let store_commit_time = start_time.elapsed().saturating_sub(store_update_time);
+
+        let shard_id_str = witness.chunk_header.shard_id().to_string();
+        metrics::SAVE_LATEST_WITNESS_GENERATE_UPDATE_TIME
+            .with_label_values(&[shard_id_str.as_str()])
+            .observe(store_update_time.as_secs_f64());
+        metrics::SAVE_LATEST_WITNESS_COMMIT_UPDATE_TIME
+            .with_label_values(&[shard_id_str.as_str()])
+            .observe(store_commit_time.as_secs_f64());
+        metrics::SAVED_LATEST_WITNESSES_COUNT.set(info.count as i64);
+        metrics::SAVED_LATEST_WITNESSES_SIZE.set(info.total_size as i64);
+
+        tracing::debug!(
+            ?store_update_time,
+            ?store_commit_time,
+            total_count = info.count,
+            total_size = info.total_size,
+            "Saved latest witness",
+        );
+
         Ok(())
     }
 
diff --git a/chain/client/src/stateless_validation/shadow_validate.rs b/chain/client/src/stateless_validation/shadow_validate.rs
index 3e8d421ebe1..34e1428227d 100644
--- a/chain/client/src/stateless_validation/shadow_validate.rs
+++ b/chain/client/src/stateless_validation/shadow_validate.rs
@@ -83,6 +83,9 @@ impl Client {
             chunk,
             validated_transactions.storage_proof,
         )?;
+        if self.config.save_latest_witnesses {
+            self.chain.chain_store.save_latest_chunk_state_witness(&witness)?;
+        }
         let (encoded_witness, raw_witness_size) = {
             let shard_id_label = shard_id.to_string();
             let encode_timer = metrics::CHUNK_STATE_WITNESS_ENCODE_TIME
diff --git a/core/store/src/columns.rs b/core/store/src/columns.rs
index 673263a1032..6c497ac24a2 100644
--- a/core/store/src/columns.rs
+++ b/core/store/src/columns.rs
@@ -290,6 +290,10 @@ pub enum DBCol {
     /// - *Rows*: `LatestWitnessesKey`
     /// - *Column type*: `ChunkStateWitness`
     LatestChunkStateWitnesses,
+    /// Each observed LatestChunkStateWitness gets an index, in increasing order.
+    /// Witnesses with the lowest index are garbage collected first.
+    /// u64 -> LatestWitnessesKey
+    LatestWitnessesByIndex,
     /// Column to store data for Epoch Sync.
     /// Does not contain data for genesis epoch.
     /// - *Rows*: `epoch_id`
@@ -330,6 +334,7 @@ pub enum DBKeyType {
     PartId,
     ColumnId,
     LatestWitnessesKey,
+    LatestWitnessIndex,
 }
 
 impl DBCol {
@@ -467,6 +472,7 @@ impl DBCol {
             DBCol::StateTransitionData => false,
             // LatestChunkStateWitnesses stores the last N observed witnesses, used only for debugging.
             DBCol::LatestChunkStateWitnesses => false,
+            DBCol::LatestWitnessesByIndex => false,
 
             // Columns that are not GC-ed need not be copied to the cold storage.
             DBCol::BlockHeader
@@ -567,6 +573,7 @@ impl DBCol {
             DBCol::FlatStorageStatus => &[DBKeyType::ShardUId],
             DBCol::StateTransitionData => &[DBKeyType::BlockHash, DBKeyType::ShardId],
             DBCol::LatestChunkStateWitnesses => &[DBKeyType::LatestWitnessesKey],
+            DBCol::LatestWitnessesByIndex => &[DBKeyType::LatestWitnessIndex],
             #[cfg(feature = "new_epoch_sync")]
             DBCol::EpochSyncInfo => &[DBKeyType::EpochId],
         }