Fix slow saving of latest witnesses (near#11354)
Fixes: near#11258

Changes in this PR:
* Improved observability of saving latest witnesses
    * Added metrics
    * Added a tracing span, which will be visible in span analysis tools
* Added a printout in the logs with details about saving the latest
witness
* Fixed the extreme slowness of `save_latest_chunk_state_witness`; the
new solution doesn't iterate over the stored witnesses at all
* Started saving witnesses produced during shadow validation; I needed
that to properly test the change

The previous solution used `store().iter()` to find the witness with the
lowest height that needs to be removed to free up space, but it turned
out that this takes a really long time, ~100ms!

The new solution doesn't iterate anything; instead it maintains
a mapping from integer indexes to saved witnesses.
The first observed witness gets index 0, the second one gets 1, the third
gets 2, and so on.
When it's time to free up space we delete the witness with the lowest
index.

We maintain two pointers to the ends of this "queue" and move them
accordingly as witnesses are added and removed.
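
To make this concrete, here is a minimal, self-contained sketch of the index-queue idea, with in-memory maps standing in for the `LatestWitnessesByIndex` and `LatestChunkStateWitnesses` columns. The `Store` type, `MAX_COUNT` limit and `save_latest_witness` helper are illustrative assumptions, not nearcore APIs; the real implementation is in `chain/chain/src/store/latest_witnesses.rs` in the diff below:

```rust
// Illustrative sketch only: in-memory stand-ins for the database columns.
use std::collections::HashMap;

const MAX_COUNT: u64 = 4; // stand-in for the real count/size limits

#[derive(Default)]
struct LatestWitnessesInfo {
    count: u64,
    lowest_index: u64,       // index of the oldest stored witness
    next_witness_index: u64, // index assigned to the next saved witness
}

#[derive(Default)]
struct Store {
    info: LatestWitnessesInfo,
    by_index: HashMap<u64, String>,      // ~ DBCol::LatestWitnessesByIndex
    witnesses: HashMap<String, Vec<u8>>, // ~ DBCol::LatestChunkStateWitnesses
}

impl Store {
    fn save_latest_witness(&mut self, key: String, serialized: Vec<u8>) {
        let new_index = self.info.next_witness_index;
        self.info.next_witness_index += 1;
        self.info.count += 1;

        // Delete the oldest witnesses (lowest index) until the limit is
        // satisfied; no iteration over the witness column is needed.
        while self.info.count > MAX_COUNT
            && self.info.lowest_index < self.info.next_witness_index
        {
            let oldest = self.info.lowest_index;
            let old_key = self.by_index.remove(&oldest).expect("index entry must exist");
            self.witnesses.remove(&old_key);
            self.info.lowest_index += 1;
            self.info.count -= 1;
        }

        // Insert the new witness and record its index.
        self.by_index.insert(new_index, key.clone());
        self.witnesses.insert(key, serialized);
    }
}

fn main() {
    let mut store = Store::default();
    for height in 0..10u64 {
        store.save_latest_witness(format!("witness-{height}"), vec![0u8; 8]);
    }
    // Only the newest MAX_COUNT witnesses remain; the lowest indexes were dropped first.
    assert_eq!(store.info.count, MAX_COUNT);
    assert_eq!(store.info.lowest_index, 10 - MAX_COUNT);
}
```

Because the oldest entry is addressed directly by `lowest_index`, cleanup never scans the column, which is what removes the ~100ms iteration cost.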

This greatly improves the time needed to save the latest witness: with
the new code, generating the database update usually takes under 1ms, and
committing it takes under 6ms (on shadow validation):


![image](https://github.com/near/nearcore/assets/149345204/06f379d3-1a36-4aa0-8c5f-043bab7bc36c)

([view the metrics
here](https://nearone.grafana.net/d/admakiv9pst8gd/save-latest-witnesses-stats?orgId=1&var-chain_id=mainnet&var-node_id=jan-mainnet-node&var-shard_id=All&from=1716234291000&to=1716241491000))

~7ms is still a non-negligible amount of time, but it's way better than
the previous ~100ms. It's a debug-only feature, so 7ms might be
acceptable.
jancionear authored and marcelo-gonzalez committed May 23, 2024
1 parent 37525e6 commit 1246e3a
Showing 5 changed files with 119 additions and 17 deletions.
3 changes: 3 additions & 0 deletions chain/chain/src/garbage_collection.rs
@@ -1040,6 +1040,9 @@ impl<'a> ChainStoreUpdate<'a> {
DBCol::LatestChunkStateWitnesses => {
store_update.delete(col, key);
}
DBCol::LatestWitnessesByIndex => {
store_update.delete(col, key);
}
DBCol::DbVersion
| DBCol::BlockMisc
| DBCol::_GCCount
33 changes: 33 additions & 0 deletions chain/chain/src/metrics.rs
@@ -238,3 +238,36 @@ pub(crate) static RESHARDING_STATUS: Lazy<IntGaugeVec> = Lazy::new(|| {
)
.unwrap()
});

pub static SAVE_LATEST_WITNESS_GENERATE_UPDATE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_save_latest_witness_generate_update_time",
"Time taken to generate an update of latest witnesses",
&["shard_id"],
Some(exponential_buckets(0.001, 1.6, 20).unwrap()),
)
.unwrap()
});
pub static SAVE_LATEST_WITNESS_COMMIT_UPDATE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_save_latest_witness_commit_update_time",
"Time taken to commit the update of latest witnesses",
&["shard_id"],
Some(exponential_buckets(0.001, 1.6, 20).unwrap()),
)
.unwrap()
});
pub static SAVED_LATEST_WITNESSES_COUNT: Lazy<IntGauge> = Lazy::new(|| {
try_create_int_gauge(
"near_saved_latest_witnesses_count",
"Total number of saved latest witnesses",
)
.unwrap()
});
pub static SAVED_LATEST_WITNESSES_SIZE: Lazy<IntGauge> = Lazy::new(|| {
try_create_int_gauge(
"near_saved_latest_witnesses_size",
"Total size of saved latest witnesses (in bytes)",
)
.unwrap()
});
90 changes: 73 additions & 17 deletions chain/chain/src/store/latest_witnesses.rs
@@ -2,7 +2,7 @@
//! by the node. The latest witnesses are stored in the database and can be fetched
//! for analysis and debugging.
//! The number of stored witnesses is limited. When the limit is reached
//! the witness with the oldest height is removed from the database.
//! the oldest witness is removed from the database.
//! At the moment this module is used only for debugging purposes.

use std::io::ErrorKind;
@@ -12,6 +12,7 @@ use near_primitives::stateless_validation::ChunkStateWitness;
use near_primitives::types::EpochId;
use near_store::DBCol;

use crate::metrics;
use crate::ChainStoreAccess;

use super::ChainStore;
@@ -36,6 +37,7 @@ pub struct LatestWitnessesKey {
pub height: u64,
pub shard_id: u64,
pub epoch_id: EpochId,
pub witness_size: u64,
/// Each witness has a random UUID to ensure that the key is unique.
/// It allows to store multiple witnesses with the same height and shard_id.
pub random_uuid: [u8; 16],
@@ -45,21 +47,22 @@ impl LatestWitnessesKey {
/// `LatestWitnessesKey` has custom serialization to ensure that the binary representation
/// starts with big-endian height and shard_id.
/// This allows to query using a key prefix to find all witnesses for a given height (and shard_id).
pub fn serialized(&self) -> [u8; 64] {
let mut result = [0u8; 64];
pub fn serialized(&self) -> [u8; 72] {
let mut result = [0u8; 72];
result[..8].copy_from_slice(&self.height.to_be_bytes());
result[8..16].copy_from_slice(&self.shard_id.to_be_bytes());
result[16..48].copy_from_slice(&self.epoch_id.0 .0);
result[48..].copy_from_slice(&self.random_uuid);
result[48..56].copy_from_slice(&self.witness_size.to_be_bytes());
result[56..].copy_from_slice(&self.random_uuid);
result
}

pub fn deserialize(data: &[u8]) -> Result<LatestWitnessesKey, std::io::Error> {
if data.len() != 64 {
if data.len() != 72 {
return Err(std::io::Error::new(
ErrorKind::InvalidInput,
format!(
"Cannot deserialize LatestWitnessesKey, expected 64 bytes, got {}",
"Cannot deserialize LatestWitnessesKey, expected 72 bytes, got {}",
data.len()
),
));
@@ -70,7 +73,8 @@ impl LatestWitnessesKey {
height: u64::from_be_bytes(data[0..8].try_into().unwrap()),
shard_id: u64::from_be_bytes(data[8..16].try_into().unwrap()),
epoch_id: EpochId(CryptoHash(data[16..48].try_into().unwrap())),
random_uuid: data[48..].try_into().unwrap(),
witness_size: u64::from_be_bytes(data[48..56].try_into().unwrap()),
random_uuid: data[56..].try_into().unwrap(),
})
}
}
@@ -81,6 +85,8 @@ impl LatestWitnessesKey {
struct LatestWitnessesInfo {
pub count: u64,
pub total_size: u64,
pub lowest_index: u64,
pub next_witness_index: u64,
}

impl LatestWitnessesInfo {
Expand All @@ -99,6 +105,15 @@ impl ChainStore {
&mut self,
witness: &ChunkStateWitness,
) -> Result<(), std::io::Error> {
let start_time = std::time::Instant::now();
let _span = tracing::info_span!(
target: "client",
"save_latest_chunk_state_witness",
witness_height = witness.chunk_header.height_created(),
witness_shard = witness.chunk_header.shard_id(),
)
.entered();

let serialized_witness = borsh::to_vec(witness)?;
let serialized_witness_size: u64 =
serialized_witness.len().try_into().expect("Cannot convert usize to u64");
@@ -118,23 +133,36 @@ impl ChainStore {
.get_ser::<LatestWitnessesInfo>(DBCol::Misc, LATEST_WITNESSES_INFO)?
.unwrap_or_default();

let new_witness_index = info.next_witness_index;

// Adjust the info to include the new witness.
info.count += 1;
info.total_size += serialized_witness.len() as u64;
info.next_witness_index += 1;

// Go over witnesses with increasing (height, shard_id) and remove them until the limits are satisfied.
// Height and shard id are stored in big-endian representation, so sorting the binary representation is
// the same as sorting the integers.
let mut store_update = self.store().store_update();
for item in self.store().iter(DBCol::LatestChunkStateWitnesses) {
if info.is_within_limits() {
break;
}

let (key_bytes, witness_bytes) = item?;
store_update.delete(DBCol::LatestChunkStateWitnesses, &key_bytes);
// Go over witnesses with increasing indexes and remove them until the limits are satisfied.
while !info.is_within_limits() && info.lowest_index < info.next_witness_index {
let key_to_delete = self
.store()
.get(DBCol::LatestWitnessesByIndex, &info.lowest_index.to_be_bytes())?
.ok_or_else(|| {
std::io::Error::new(
ErrorKind::NotFound,
format!(
"Cannot find witness key to delete with index {}",
info.lowest_index
),
)
})?;
let key_deser = LatestWitnessesKey::deserialize(&key_to_delete)?;

store_update.delete(DBCol::LatestChunkStateWitnesses, &key_to_delete);
store_update.delete(DBCol::LatestWitnessesByIndex, &info.lowest_index.to_be_bytes());
info.lowest_index += 1;
info.count -= 1;
info.total_size -= witness_bytes.len() as u64;
info.total_size -= key_deser.witness_size;
}

// Limits are ok, insert the new witness.
@@ -144,16 +172,44 @@ impl ChainStore {
height: witness.chunk_header.height_created(),
shard_id: witness.chunk_header.shard_id(),
epoch_id: witness.epoch_id.clone(),
witness_size: serialized_witness_size,
random_uuid,
};
store_update.set(DBCol::LatestChunkStateWitnesses, &key.serialized(), &serialized_witness);
store_update.set(
DBCol::LatestWitnessesByIndex,
&new_witness_index.to_be_bytes(),
&key.serialized(),
);

// Update LatestWitnessesInfo
store_update.set(DBCol::Misc, &LATEST_WITNESSES_INFO, &borsh::to_vec(&info)?);

let store_update_time = start_time.elapsed();

// Commit the transaction
store_update.commit()?;

let store_commit_time = start_time.elapsed().saturating_sub(store_update_time);

let shard_id_str = witness.chunk_header.shard_id().to_string();
metrics::SAVE_LATEST_WITNESS_GENERATE_UPDATE_TIME
.with_label_values(&[shard_id_str.as_str()])
.observe(store_update_time.as_secs_f64());
metrics::SAVE_LATEST_WITNESS_COMMIT_UPDATE_TIME
.with_label_values(&[shard_id_str.as_str()])
.observe(store_commit_time.as_secs_f64());
metrics::SAVED_LATEST_WITNESSES_COUNT.set(info.count as i64);
metrics::SAVED_LATEST_WITNESSES_SIZE.set(info.total_size as i64);

tracing::debug!(
?store_update_time,
?store_commit_time,
total_count = info.count,
total_size = info.total_size,
"Saved latest witness",
);

Ok(())
}

3 changes: 3 additions & 0 deletions chain/client/src/stateless_validation/shadow_validate.rs
@@ -83,6 +83,9 @@ impl Client {
chunk,
validated_transactions.storage_proof,
)?;
if self.config.save_latest_witnesses {
self.chain.chain_store.save_latest_chunk_state_witness(&witness)?;
}
let (encoded_witness, raw_witness_size) = {
let shard_id_label = shard_id.to_string();
let encode_timer = metrics::CHUNK_STATE_WITNESS_ENCODE_TIME
7 changes: 7 additions & 0 deletions core/store/src/columns.rs
@@ -290,6 +290,10 @@ pub enum DBCol {
/// - *Rows*: `LatestWitnessesKey`
/// - *Column type*: `ChunkStateWitness`
LatestChunkStateWitnesses,
/// Each observed LatestChunkStateWitness gets an index, in increasing order.
/// Witnesses with the lowest index are garbage collected first.
/// u64 -> LatestWitnessesKey
LatestWitnessesByIndex,
/// Column to store data for Epoch Sync.
/// Does not contain data for genesis epoch.
/// - *Rows*: `epoch_id`
@@ -330,6 +334,7 @@ pub enum DBCol {
PartId,
ColumnId,
LatestWitnessesKey,
LatestWitnessIndex,
}

impl DBCol {
@@ -467,6 +472,7 @@ impl DBCol {
DBCol::StateTransitionData => false,
// LatestChunkStateWitnesses stores the last N observed witnesses, used only for debugging.
DBCol::LatestChunkStateWitnesses => false,
DBCol::LatestWitnessesByIndex => false,

// Columns that are not GC-ed need not be copied to the cold storage.
DBCol::BlockHeader
@@ -567,6 +573,7 @@ impl DBCol {
DBCol::FlatStorageStatus => &[DBKeyType::ShardUId],
DBCol::StateTransitionData => &[DBKeyType::BlockHash, DBKeyType::ShardId],
DBCol::LatestChunkStateWitnesses => &[DBKeyType::LatestWitnessesKey],
DBCol::LatestWitnessesByIndex => &[DBKeyType::LatestWitnessIndex],
#[cfg(feature = "new_epoch_sync")]
DBCol::EpochSyncInfo => &[DBKeyType::EpochId],
}
