diff --git a/chain/chain/src/garbage_collection.rs b/chain/chain/src/garbage_collection.rs
index d719dc2a9c3..0bf08500947 100644
--- a/chain/chain/src/garbage_collection.rs
+++ b/chain/chain/src/garbage_collection.rs
@@ -1040,6 +1040,9 @@ impl<'a> ChainStoreUpdate<'a> {
             DBCol::LatestChunkStateWitnesses => {
                 store_update.delete(col, key);
             }
+            DBCol::LatestWitnessesByIndex => {
+                store_update.delete(col, key);
+            }
             DBCol::DbVersion
             | DBCol::BlockMisc
             | DBCol::_GCCount
diff --git a/chain/chain/src/metrics.rs b/chain/chain/src/metrics.rs
index f5922188fbe..5eeb9d7ae78 100644
--- a/chain/chain/src/metrics.rs
+++ b/chain/chain/src/metrics.rs
@@ -238,3 +238,36 @@ pub(crate) static RESHARDING_STATUS: Lazy<IntGaugeVec> = Lazy::new(|| {
     )
     .unwrap()
 });
+
+pub static SAVE_LATEST_WITNESS_GENERATE_UPDATE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
+    try_create_histogram_vec(
+        "near_save_latest_witness_generate_update_time",
+        "Time taken to generate an update of latest witnesses",
+        &["shard_id"],
+        Some(exponential_buckets(0.001, 1.6, 20).unwrap()),
+    )
+    .unwrap()
+});
+pub static SAVE_LATEST_WITNESS_COMMIT_UPDATE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
+    try_create_histogram_vec(
+        "near_save_latest_witness_commit_update_time",
+        "Time taken to commit the update of latest witnesses",
+        &["shard_id"],
+        Some(exponential_buckets(0.001, 1.6, 20).unwrap()),
+    )
+    .unwrap()
+});
+pub static SAVED_LATEST_WITNESSES_COUNT: Lazy<IntGauge> = Lazy::new(|| {
+    try_create_int_gauge(
+        "near_saved_latest_witnesses_count",
+        "Total number of saved latest witnesses",
+    )
+    .unwrap()
+});
+pub static SAVED_LATEST_WITNESSES_SIZE: Lazy<IntGauge> = Lazy::new(|| {
+    try_create_int_gauge(
+        "near_saved_latest_witnesses_size",
+        "Total size of saved latest witnesses (in bytes)",
+    )
+    .unwrap()
+});
diff --git a/chain/chain/src/store/latest_witnesses.rs b/chain/chain/src/store/latest_witnesses.rs
index 1c1c6497cae..4ecec8bdd69 100644
--- a/chain/chain/src/store/latest_witnesses.rs
+++ b/chain/chain/src/store/latest_witnesses.rs
@@ -2,7 +2,7 @@
 //! by the node. The latest witnesses are stored in the database and can be fetched
 //! for analysis and debugging.
 //! The number of stored witnesses is limited. When the limit is reached
-//! the witness with the oldest height is removed from the database.
+//! the oldest witness is removed from the database.
 //! At the moment this module is used only for debugging purposes.
 
 use std::io::ErrorKind;
@@ -12,6 +12,7 @@ use near_primitives::stateless_validation::ChunkStateWitness;
 use near_primitives::types::EpochId;
 use near_store::DBCol;
 
+use crate::metrics;
 use crate::ChainStoreAccess;
 
 use super::ChainStore;
@@ -36,6 +37,7 @@ pub struct LatestWitnessesKey {
     pub height: u64,
     pub shard_id: u64,
     pub epoch_id: EpochId,
+    pub witness_size: u64,
     /// Each witness has a random UUID to ensure that the key is unique.
     /// It allows to store multiple witnesses with the same height and shard_id.
     pub random_uuid: [u8; 16],
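
The hunk above widens the key from 64 to 72 bytes by inserting `witness_size` between the epoch id and the UUID, keeping the big-endian `height`/`shard_id` prefix intact so prefix queries keep working (the serialization change follows in the next hunk). For orientation, a self-contained sketch of the resulting layout — field names follow the diff, but the plain byte array standing in for `EpochId` is illustrative only:

```rust
// Illustrative sketch of the 72-byte LatestWitnessesKey layout after this change.
// Offsets: height [0..8), shard_id [8..16), epoch_id [16..48),
// witness_size [48..56), random_uuid [56..72).
fn example_key_layout() -> [u8; 72] {
    let (height, shard_id): (u64, u64) = (100, 3);
    let epoch_id = [0u8; 32]; // stand-in for EpochId's 32-byte hash
    let witness_size: u64 = 12_345;
    let random_uuid = [7u8; 16];

    let mut key = [0u8; 72];
    // Big-endian means lexicographic key order equals numeric order.
    key[..8].copy_from_slice(&height.to_be_bytes());
    key[8..16].copy_from_slice(&shard_id.to_be_bytes());
    key[16..48].copy_from_slice(&epoch_id);
    key[48..56].copy_from_slice(&witness_size.to_be_bytes());
    key[56..].copy_from_slice(&random_uuid);
    key
}
```
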
@@ -45,21 +47,22 @@ impl LatestWitnessesKey {
     /// `LatestWitnessesKey` has custom serialization to ensure that the binary representation
     /// starts with big-endian height and shard_id.
     /// This allows to query using a key prefix to find all witnesses for a given height (and shard_id).
-    pub fn serialized(&self) -> [u8; 64] {
-        let mut result = [0u8; 64];
+    pub fn serialized(&self) -> [u8; 72] {
+        let mut result = [0u8; 72];
         result[..8].copy_from_slice(&self.height.to_be_bytes());
         result[8..16].copy_from_slice(&self.shard_id.to_be_bytes());
         result[16..48].copy_from_slice(&self.epoch_id.0 .0);
-        result[48..].copy_from_slice(&self.random_uuid);
+        result[48..56].copy_from_slice(&self.witness_size.to_be_bytes());
+        result[56..].copy_from_slice(&self.random_uuid);
         result
     }
 
     pub fn deserialize(data: &[u8]) -> Result<Self, std::io::Error> {
-        if data.len() != 64 {
+        if data.len() != 72 {
             return Err(std::io::Error::new(
                 ErrorKind::InvalidInput,
                 format!(
-                    "Cannot deserialize LatestWitnessesKey, expected 64 bytes, got {}",
+                    "Cannot deserialize LatestWitnessesKey, expected 72 bytes, got {}",
                     data.len()
                 ),
             ));
@@ -70,7 +73,8 @@ impl LatestWitnessesKey {
             height: u64::from_be_bytes(data[0..8].try_into().unwrap()),
             shard_id: u64::from_be_bytes(data[8..16].try_into().unwrap()),
             epoch_id: EpochId(CryptoHash(data[16..48].try_into().unwrap())),
-            random_uuid: data[48..].try_into().unwrap(),
+            witness_size: u64::from_be_bytes(data[48..56].try_into().unwrap()),
+            random_uuid: data[56..].try_into().unwrap(),
         })
     }
 }
@@ -81,6 +85,8 @@
 struct LatestWitnessesInfo {
     pub count: u64,
     pub total_size: u64,
+    pub lowest_index: u64,
+    pub next_witness_index: u64,
 }
 
 impl LatestWitnessesInfo {
@@ -99,6 +105,15 @@ impl ChainStore {
         &mut self,
         witness: &ChunkStateWitness,
     ) -> Result<(), std::io::Error> {
+        let start_time = std::time::Instant::now();
+        let _span = tracing::info_span!(
+            target: "client",
+            "save_latest_chunk_state_witness",
+            witness_height = witness.chunk_header.height_created(),
+            witness_shard = witness.chunk_header.shard_id(),
+        )
+        .entered();
+
         let serialized_witness = borsh::to_vec(witness)?;
         let serialized_witness_size: u64 =
             serialized_witness.len().try_into().expect("Cannot convert usize to u64");
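
A round-trip check pins down the widened layout. The unit test below is a sketch, not part of the diff, assuming it lives in this module next to `LatestWitnessesKey` (so the type and its `EpochId`/`CryptoHash` imports are in scope):

```rust
// Round-trip sketch for the widened 72-byte key (hypothetical test).
#[test]
fn latest_witnesses_key_roundtrip_72_bytes() {
    let key = LatestWitnessesKey {
        height: 5,
        shard_id: 1,
        epoch_id: EpochId(CryptoHash([9u8; 32])),
        witness_size: 4096,
        random_uuid: [42u8; 16],
    };
    let bytes = key.serialized();
    assert_eq!(bytes.len(), 72);
    // Field-by-field comparison avoids assuming a PartialEq derive.
    let parsed = LatestWitnessesKey::deserialize(&bytes).unwrap();
    assert_eq!(parsed.height, key.height);
    assert_eq!(parsed.shard_id, key.shard_id);
    assert_eq!(parsed.witness_size, key.witness_size);
    assert_eq!(parsed.random_uuid, key.random_uuid);
}
```
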
@@ -118,23 +133,36 @@ impl ChainStore {
             .get_ser::<LatestWitnessesInfo>(DBCol::Misc, LATEST_WITNESSES_INFO)?
             .unwrap_or_default();
 
+        let new_witness_index = info.next_witness_index;
+
         // Adjust the info to include the new witness.
         info.count += 1;
         info.total_size += serialized_witness.len() as u64;
+        info.next_witness_index += 1;
 
-        // Go over witnesses with increasing (height, shard_id) and remove them until the limits are satisfied.
-        // Height and shard id are stored in big-endian representation, so sorting the binary representation is
-        // the same as sorting the integers.
         let mut store_update = self.store().store_update();
-        for item in self.store().iter(DBCol::LatestChunkStateWitnesses) {
-            if info.is_within_limits() {
-                break;
-            }
-            let (key_bytes, witness_bytes) = item?;
-            store_update.delete(DBCol::LatestChunkStateWitnesses, &key_bytes);
+
+        // Go over witnesses with increasing indexes and remove them until the limits are satisfied.
+        while !info.is_within_limits() && info.lowest_index < info.next_witness_index {
+            let key_to_delete = self
+                .store()
+                .get(DBCol::LatestWitnessesByIndex, &info.lowest_index.to_be_bytes())?
+                .ok_or_else(|| {
+                    std::io::Error::new(
+                        ErrorKind::NotFound,
+                        format!(
+                            "Cannot find witness key to delete with index {}",
+                            info.lowest_index
+                        ),
+                    )
+                })?;
+            let key_deser = LatestWitnessesKey::deserialize(&key_to_delete)?;
+
+            store_update.delete(DBCol::LatestChunkStateWitnesses, &key_to_delete);
+            store_update.delete(DBCol::LatestWitnessesByIndex, &info.lowest_index.to_be_bytes());
+            info.lowest_index += 1;
             info.count -= 1;
-            info.total_size -= witness_bytes.len() as u64;
+            info.total_size -= key_deser.witness_size;
         }
 
         // Limits are ok, insert the new witness.
@@ -144,16 +172,44 @@
             height: witness.chunk_header.height_created(),
             shard_id: witness.chunk_header.shard_id(),
             epoch_id: witness.epoch_id.clone(),
+            witness_size: serialized_witness_size,
             random_uuid,
         };
         store_update.set(DBCol::LatestChunkStateWitnesses, &key.serialized(), &serialized_witness);
+        store_update.set(
+            DBCol::LatestWitnessesByIndex,
+            &new_witness_index.to_be_bytes(),
+            &key.serialized(),
+        );
 
         // Update LatestWitnessesInfo
         store_update.set(DBCol::Misc, &LATEST_WITNESSES_INFO, &borsh::to_vec(&info)?);
 
+        let store_update_time = start_time.elapsed();
+
         // Commit the transaction
         store_update.commit()?;
 
+        let store_commit_time = start_time.elapsed().saturating_sub(store_update_time);
+
+        let shard_id_str = witness.chunk_header.shard_id().to_string();
+        metrics::SAVE_LATEST_WITNESS_GENERATE_UPDATE_TIME
+            .with_label_values(&[shard_id_str.as_str()])
+            .observe(store_update_time.as_secs_f64());
+        metrics::SAVE_LATEST_WITNESS_COMMIT_UPDATE_TIME
+            .with_label_values(&[shard_id_str.as_str()])
+            .observe(store_commit_time.as_secs_f64());
+        metrics::SAVED_LATEST_WITNESSES_COUNT.set(info.count as i64);
+        metrics::SAVED_LATEST_WITNESSES_SIZE.set(info.total_size as i64);
+
+        tracing::debug!(
+            ?store_update_time,
+            ?store_commit_time,
+            total_count = info.count,
+            total_size = info.total_size,
+            "Saved latest witness",
+        );
+
         Ok(())
     }
 
diff --git a/chain/client/src/stateless_validation/shadow_validate.rs b/chain/client/src/stateless_validation/shadow_validate.rs
index 3e8d421ebe1..34e1428227d 100644
--- a/chain/client/src/stateless_validation/shadow_validate.rs
+++ b/chain/client/src/stateless_validation/shadow_validate.rs
@@ -83,6 +83,9 @@ impl Client {
             chunk,
             validated_transactions.storage_proof,
         )?;
+        if self.config.save_latest_witnesses {
+            self.chain.chain_store.save_latest_chunk_state_witness(&witness)?;
+        }
         let (encoded_witness, raw_witness_size) = {
             let shard_id_label = shard_id.to_string();
             let encode_timer = metrics::CHUNK_STATE_WITNESS_ENCODE_TIME
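
Taken together, the two counters added to `LatestWitnessesInfo` turn eviction into FIFO bookkeeping: live entries occupy the index window `[lowest_index, next_witness_index)`. A toy model of that logic, using a hypothetical standalone `Info` struct that mirrors the new fields:

```rust
// Toy model of the index bookkeeping added to LatestWitnessesInfo:
// insertion pushes at next_witness_index, eviction pops at lowest_index.
struct Info {
    count: u64,
    total_size: u64,
    lowest_index: u64,
    next_witness_index: u64,
}

impl Info {
    /// Record a new witness of `size` bytes; returns its index.
    fn insert(&mut self, size: u64) -> u64 {
        let index = self.next_witness_index;
        self.next_witness_index += 1;
        self.count += 1;
        self.total_size += size;
        index
    }

    /// Drop the oldest witness. `front_size` comes from the stored key's
    /// witness_size field, so the witness body never has to be read.
    fn evict_front(&mut self, front_size: u64) {
        assert!(self.lowest_index < self.next_witness_index);
        self.lowest_index += 1;
        self.count -= 1;
        self.total_size -= front_size;
    }
}
```

Storing `witness_size` inside the key is what lets the eviction loop decrement `total_size` without fetching witness bodies, which can be megabytes each.
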
diff --git a/core/store/src/columns.rs b/core/store/src/columns.rs
index 673263a1032..6c497ac24a2 100644
--- a/core/store/src/columns.rs
+++ b/core/store/src/columns.rs
@@ -290,6 +290,10 @@ pub enum DBCol {
     /// - *Rows*: `LatestWitnessesKey`
     /// - *Column type*: `ChunkStateWitness`
     LatestChunkStateWitnesses,
+    /// Each observed ChunkStateWitness gets an index, in increasing order.
+    /// Witnesses with the lowest index are garbage collected first.
+    /// u64 -> LatestWitnessesKey
+    LatestWitnessesByIndex,
     /// Column to store data for Epoch Sync.
     /// Does not contain data for genesis epoch.
     /// - *Rows*: `epoch_id`
@@ -330,6 +334,7 @@ pub enum DBKeyType {
     PartId,
     ColumnId,
     LatestWitnessesKey,
+    LatestWitnessIndex,
 }
 
 impl DBCol {
@@ -467,6 +472,7 @@
             DBCol::StateTransitionData => false,
             // LatestChunkStateWitnesses stores the last N observed witnesses, used only for debugging.
             DBCol::LatestChunkStateWitnesses => false,
+            DBCol::LatestWitnessesByIndex => false,
 
             // Columns that are not GC-ed need not be copied to the cold storage.
             DBCol::BlockHeader
@@ -567,6 +573,7 @@
             DBCol::FlatStorageStatus => &[DBKeyType::ShardUId],
             DBCol::StateTransitionData => &[DBKeyType::BlockHash, DBKeyType::ShardId],
             DBCol::LatestChunkStateWitnesses => &[DBKeyType::LatestWitnessesKey],
+            DBCol::LatestWitnessesByIndex => &[DBKeyType::LatestWitnessIndex],
             #[cfg(feature = "new_epoch_sync")]
             DBCol::EpochSyncInfo => &[DBKeyType::EpochId],
         }
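
Because the index keys are big-endian `u64`s, a plain scan of the new column yields witnesses in insertion order. A hypothetical debugging helper (not part of this diff) built on the `Store::iter` API used elsewhere in this change:

```rust
// Hypothetical helper: list stored witness keys in insertion order by
// scanning LatestWitnessesByIndex. Big-endian u64 keys make the column's
// lexicographic iteration order match insertion order.
use near_store::{DBCol, Store};

fn list_latest_witness_keys(store: &Store) -> std::io::Result<Vec<LatestWitnessesKey>> {
    let mut keys = Vec::new();
    for item in store.iter(DBCol::LatestWitnessesByIndex) {
        let (_index_bytes, key_bytes) = item?;
        keys.push(LatestWitnessesKey::deserialize(&key_bytes)?);
    }
    Ok(keys)
}
```
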