fix(split-storage) - optimize copying of state (#10403)
The current approach means that when looking for a trie node in the db
we need to check all shards. In practice most nodes are only ever
present in one shard, so the current implementation is ~4x slower at
reading trie nodes than it could be. The state reads also happen to
account for the majority of the time spent copying heights from hot to
cold.

This fix should resolve the ever-growing cold head lag, a major issue
for our archival nodes, described in more detail on
[zulip](https://near.zulipchat.com/#narrow/stream/308695-pagoda.2Fprivate/topic/Split.20storage.20nodes.20increasing.20cold.20head.20lag).
wacban committed Jan 11, 2024
1 parent 84f3d28 commit 97e72b9
Showing 4 changed files with 165 additions and 65 deletions.
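The core idea of the fix, as a minimal sketch before the diff: State keys are assumed here to be the ShardUId bytes followed by the trie node hash, so knowing the shard up front turns an all-shard probe into a single read. The `Db` trait, the `state_key` helper, and the fixed key widths are this example's assumptions, not the nearcore API.

// A minimal sketch of the optimization; `Db`, `state_key`, and the 8-byte
// ShardUId encoding are assumptions of this example, not the real types.
trait Db {
    fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
}

// Assumed State key layout: shard uid bytes followed by the node hash.
fn state_key(shard_uid: &[u8; 8], node_hash: &[u8; 32]) -> Vec<u8> {
    let mut key = Vec::with_capacity(8 + 32);
    key.extend_from_slice(shard_uid);
    key.extend_from_slice(node_hash);
    key
}

// Before: the shard a node belongs to is unknown, so every shard is probed.
// With four shards that is up to ~4 reads per node.
fn find_node_any_shard(db: &dyn Db, shards: &[[u8; 8]], hash: &[u8; 32]) -> Option<Vec<u8>> {
    shards.iter().find_map(|shard| db.get(&state_key(shard, hash)))
}

// After: TrieChanges records which shard a node was inserted in,
// so the copy needs exactly one read per node.
fn find_node_in_shard(db: &dyn Db, shard: &[u8; 8], hash: &[u8; 32]) -> Option<Vec<u8>> {
    db.get(&state_key(shard, hash))
}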
164 changes: 117 additions & 47 deletions core/store/src/cold_storage.rs
@@ -60,7 +60,7 @@ pub fn update_cold_db(
shard_layout: &ShardLayout,
height: &BlockHeight,
) -> io::Result<bool> {
let _span = tracing::debug_span!(target: "store", "update cold db", height = height);
let _span = tracing::debug_span!(target: "cold_store", "update cold db", height = height);
let _timer = metrics::COLD_COPY_DURATION.start_timer();

let mut store_with_cache = StoreWithCache { store: hot_store, cache: StoreCache::new() };
@@ -69,16 +69,24 @@ pub fn update_cold_db(
return Ok(false);
}

let key_type_to_keys = get_keys_from_store(&mut store_with_cache, shard_layout, height)?;
let height_key = height.to_le_bytes();
let block_hash_vec = store_with_cache.get_or_err(DBCol::BlockHeight, &height_key)?;
let block_hash_key = block_hash_vec.as_slice();

let key_type_to_keys =
get_keys_from_store(&mut store_with_cache, shard_layout, &height_key, block_hash_key)?;
for col in DBCol::iter() {
if col.is_cold() {
copy_from_store(
cold_db,
&mut store_with_cache,
col,
combine_keys(&key_type_to_keys, &col.key_type()),
)?;
if !col.is_cold() {
continue;
}

if col == DBCol::State {
copy_state_from_store(shard_layout, block_hash_key, cold_db, &mut store_with_cache)?;
continue;
}

let keys = combine_keys(&key_type_to_keys, &col.key_type());
copy_from_store(cold_db, &mut store_with_cache, col, keys)?;
}

Ok(true)
@@ -109,6 +117,60 @@ fn rc_aware_set(
};
}

// A specialized version of copy_from_store for the State column. Finds all
// the State nodes that were inserted at the given height by reading the
// TrieChanges and inserts them into the cold store.
//
// The generic implementation is not efficient for State because it would
// attempt to read every node from every shard. Here we know exactly what shard
// the node belongs to.
fn copy_state_from_store(
shard_layout: &ShardLayout,
block_hash_key: &[u8],
cold_db: &ColdDB,
hot_store: &mut StoreWithCache,
) -> io::Result<()> {
let col = DBCol::State;
let _span = tracing::debug_span!(target: "cold_store", "copy_state_from_store", %col);
let instant = std::time::Instant::now();

let mut transaction = DBTransaction::new();
for shard_uid in shard_layout.shard_uids() {
debug_assert_eq!(
DBCol::TrieChanges.key_type(),
&[DBKeyType::BlockHash, DBKeyType::ShardUId]
);

let shard_uid_key = shard_uid.to_bytes();
let key = join_two_keys(&block_hash_key, &shard_uid_key);
let trie_changes: Option<TrieChanges> =
hot_store.get_ser::<TrieChanges>(DBCol::TrieChanges, &key)?;

let Some(trie_changes) = trie_changes else { continue };
for op in trie_changes.insertions() {
hot_store.insert_state_to_cache_from_op(op, &shard_uid_key);

let key = join_two_keys(&shard_uid_key, op.hash().as_bytes());
let value = hot_store.get(DBCol::State, &key)?;
let value =
value.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, hex::encode(&key)))?;

tracing::trace!(target: "cold_store", pretty_key=?near_fmt::StorageKey(&key), "copying state node to colddb");
rc_aware_set(&mut transaction, DBCol::State, key, value);
}
}

let read_duration = instant.elapsed();

let instant = std::time::Instant::now();
cold_db.write(transaction)?;
let write_duration = instant.elapsed();

tracing::trace!(target: "cold_store", ?read_duration, ?write_duration, "finished");

Ok(())
}

/// Gets values for the given keys in a column from the provided hot_store.
/// Creates a transaction with a set DBOp for each of those values.
/// Writes that transaction to cold_db.
@@ -118,9 +180,17 @@ fn copy_from_store(
col: DBCol,
keys: Vec<StoreKey>,
) -> io::Result<()> {
let _span = tracing::debug_span!(target: "store", "create and write transaction to cold db", col = %col);
debug_assert!(col.is_cold());

// Note: for the State column this function should only be used in tests,
// where it is needed to copy the state records from genesis.

let _span = tracing::debug_span!(target: "cold_store", "copy_from_store", col = %col);
let instant = std::time::Instant::now();

let mut transaction = DBTransaction::new();
let mut good_keys = 0;
let total_keys = keys.len();
for key in keys {
// TODO: Look into using RocksDB’s multi_key function. It
// might speed things up. Currently our Database abstraction
@@ -135,10 +205,19 @@
// write raw bytes. This would also allow us to bypass stripping and
// re-adding the reference count.

good_keys += 1;
rc_aware_set(&mut transaction, col, key, value);
}
}

let read_duration = instant.elapsed();

let instant = std::time::Instant::now();
cold_db.write(transaction)?;
let write_duration = instant.elapsed();

tracing::trace!(target: "cold_store", ?col, ?good_keys, ?total_keys, ?read_duration, ?write_duration, "finished");

return Ok(());
}

@@ -155,7 +234,7 @@ pub fn update_cold_head(
hot_store: &Store,
height: &BlockHeight,
) -> io::Result<()> {
tracing::debug!(target: "store", "update HEAD of cold db to {}", height);
tracing::debug!(target: "cold_store", "update HEAD of cold db to {}", height);

let mut store = StoreWithCache { store: hot_store, cache: StoreCache::new() };

@@ -222,17 +301,26 @@ pub fn copy_all_data_to_cold(
Ok(CopyAllDataToColdStatus::EverythingCopied)
}

// The copy_state_from_store function depends on the state nodes being present
// in the trie changes. This isn't the case for genesis, so this method can be
// used instead to copy the genesis records from hot to cold.
// TODO - How did copying from genesis work in the prod migration to split storage?
pub fn test_cold_genesis_update(cold_db: &ColdDB, hot_store: &Store) -> io::Result<()> {
let mut store_with_cache = StoreWithCache { store: hot_store, cache: StoreCache::new() };
for col in DBCol::iter() {
if col.is_cold() {
copy_from_store(
cold_db,
&mut store_with_cache,
col,
hot_store.iter(col).map(|x| x.unwrap().0.to_vec()).collect(),
)?;
if !col.is_cold() {
continue;
}

// Note that we use the generic implementation of `copy_from_store` also
// for the State column, which otherwise would be copied using the
// specialized `copy_state_from_store`.
copy_from_store(
cold_db,
&mut store_with_cache,
col,
hot_store.iter(col).map(|x| x.unwrap().0.to_vec()).collect(),
)?;
}
Ok(())
}
@@ -256,13 +344,11 @@ pub fn test_get_store_initial_writes(column: DBCol) -> u64 {
fn get_keys_from_store(
store: &mut StoreWithCache,
shard_layout: &ShardLayout,
height: &BlockHeight,
height_key: &[u8],
block_hash_key: &[u8],
) -> io::Result<HashMap<DBKeyType, Vec<StoreKey>>> {
let mut key_type_to_keys = HashMap::new();

let height_key = height.to_le_bytes();
let block_hash_key = store.get_or_err(DBCol::BlockHeight, &height_key)?.as_slice().to_vec();

let block: Block = store.get_ser_or_err(DBCol::Block, &block_hash_key)?;
let chunks = block
.chunks()
@@ -273,11 +359,19 @@ fn get_keys_from_store(
.collect::<io::Result<Vec<ShardChunk>>>()?;

for key_type in DBKeyType::iter() {
if key_type == DBKeyType::TrieNodeOrValueHash {
// The TrieNodeOrValueHash key type is only used in the State column, which is handled separately.
continue;
}

key_type_to_keys.insert(
key_type,
match key_type {
DBKeyType::TrieNodeOrValueHash => {
unreachable!();
}
DBKeyType::BlockHeight => vec![height_key.to_vec()],
DBKeyType::BlockHash => vec![block_hash_key.clone()],
DBKeyType::BlockHash => vec![block_hash_key.to_vec()],
DBKeyType::PreviousBlockHash => {
vec![block.header().prev_hash().as_bytes().to_vec()]
}
@@ -289,30 +383,6 @@ fn get_keys_from_store(
.shard_uids()
.map(|shard_uid| shard_uid.to_bytes().to_vec())
.collect(),
// TODO: don't write values of State column to cache. Write them directly to colddb.
DBKeyType::TrieNodeOrValueHash => {
let mut keys = vec![];
for shard_uid in shard_layout.shard_uids() {
let shard_uid_key = shard_uid.to_bytes();

debug_assert_eq!(
DBCol::TrieChanges.key_type(),
&[DBKeyType::BlockHash, DBKeyType::ShardUId]
);
let trie_changes_option: Option<TrieChanges> = store.get_ser(
DBCol::TrieChanges,
&join_two_keys(&block_hash_key, &shard_uid_key),
)?;

if let Some(trie_changes) = trie_changes_option {
for op in trie_changes.insertions() {
store.insert_state_to_cache_from_op(op, &shard_uid_key);
keys.push(op.hash().as_bytes().to_vec());
}
}
}
keys
}
// TODO: write StateChanges values to colddb directly, not to cache.
DBKeyType::TrieKey => {
let mut keys = vec![];
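Both copy helpers above funnel writes through rc_aware_set, whose body is collapsed in the hunk. A hedged sketch of what such a helper needs to do, assuming the usual refcount encoding for reference-counted columns (a little-endian i64 suffix on the value); this is an illustration, not the committed implementation:

fn rc_aware_set(transaction: &mut DBTransaction, col: DBCol, key: Vec<u8>, mut value: Vec<u8>) {
    // Sketch under assumptions: values in reference-counted columns carry an
    // i64 refcount suffix, so insertion appends a count of 1 and goes through
    // update_refcount; plain columns use an ordinary set.
    if col.is_rc() {
        value.extend_from_slice(&1i64.to_le_bytes());
        transaction.update_refcount(col, key, value);
    } else {
        transaction.set(col, key, value);
    }
}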
31 changes: 26 additions & 5 deletions core/store/src/columns.rs
@@ -561,12 +561,33 @@ impl fmt::Display for DBCol {
}
}

#[test]
fn column_props_sanity() {
#[cfg(test)]
mod tests {
use super::*;
use strum::IntoEnumIterator;

for col in DBCol::iter() {
// Check that rc and write_once are mutually exclusive.
assert!((col.is_rc() as u32) + (col.is_insert_only() as u32) <= 1, "{col}")
#[test]
fn column_props_sanity() {
for col in DBCol::iter() {
// Check that rc and write_once are mutually exclusive.
assert!((col.is_rc() as u32) + (col.is_insert_only() as u32) <= 1, "{col}")
}
}

// In split storage archival nodes, the State column and the
// TrieNodeOrValueHash db key type are handled separately.
// This test asserts that the TrieNodeOrValueHash key type is
// only used in the State column and in no other columns.
#[test]
fn key_type_split_storage_sanity() {
for col in DBCol::iter() {
if col == DBCol::State {
continue;
}
let key_types = col.key_type();
for key_type in key_types {
assert_ne!(key_type, &DBKeyType::TrieNodeOrValueHash);
}
}
}
}
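For context on the invariant the new test protects: State is expected to be the only column keyed by a trie node or value hash, which is exactly what lets update_cold_db special-case it. A sketch of the assumed schema (the authoritative definition is key_type in columns.rs; the exact key types here are this example's assumption):

#[test]
fn state_key_type_sketch() {
    // Assumed schema: the State column key is the ShardUId bytes followed
    // by the trie node (or value) hash. If TrieNodeOrValueHash appeared in
    // any other column, copy_state_from_store would silently miss keys.
    assert_eq!(
        DBCol::State.key_type(),
        &[DBKeyType::ShardUId, DBKeyType::TrieNodeOrValueHash]
    );
}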
14 changes: 7 additions & 7 deletions integration-tests/src/tests/client/cold_storage.rs
@@ -31,14 +31,14 @@ fn check_key(first_store: &Store, second_store: &Store, col: DBCol, key: &[u8])
let pretty_key = near_fmt::StorageKey(key);
tracing::debug!("Checking {:?} {:?}", col, pretty_key);

let first_res = first_store.get(col, key);
let second_res = second_store.get(col, key);
let first_res = first_store.get(col, key).unwrap();
let second_res = second_store.get(col, key).unwrap();

if col == DBCol::PartialChunks {
tracing::debug!("{:?}", first_store.get_ser::<PartialEncodedChunk>(col, key));
}

assert_eq!(first_res.unwrap(), second_res.unwrap(), "col: {:?} key: {:?}", col, pretty_key);
assert_eq!(first_res, second_res, "col: {:?} key: {:?}", col, pretty_key);
}

fn check_iter(
@@ -86,11 +86,11 @@ fn test_storage_after_commit_of_cold_update() {
.nightshade_runtimes(&genesis)
.build();

let (store, ..) = create_test_node_storage_with_cold(DB_VERSION, DbKind::Hot);
let (storage, ..) = create_test_node_storage_with_cold(DB_VERSION, DbKind::Hot);

let mut last_hash = *env.clients[0].chain.genesis().hash();

test_cold_genesis_update(&*store.cold_db().unwrap(), &env.clients[0].runtime_adapter.store())
test_cold_genesis_update(&*storage.cold_db().unwrap(), &env.clients[0].runtime_adapter.store())
.unwrap();

let state_reads = test_get_store_reads(DBCol::State);
@@ -148,7 +148,7 @@ fn test_storage_after_commit_of_cold_update() {
env.process_block(0, block.clone(), Provenance::PRODUCED);

update_cold_db(
&*store.cold_db().unwrap(),
&*storage.cold_db().unwrap(),
&env.clients[0].runtime_adapter.store(),
&env.clients[0]
.epoch_manager
@@ -202,7 +202,7 @@ fn test_storage_after_commit_of_cold_update() {
if col.is_cold() {
let num_checks = check_iter(
&env.clients[0].runtime_adapter.store(),
&store.get_cold_store().unwrap(),
&storage.get_cold_store().unwrap(),
col,
&no_check_rules,
);
21 changes: 15 additions & 6 deletions nearcore/src/cold_storage.rs
@@ -34,6 +34,7 @@ impl ColdStoreLoopHandle {
}

/// The ColdStoreCopyResult indicates whether a block was copied and, if so, which one.
#[derive(Debug)]
enum ColdStoreCopyResult {
// No block was copied. The cold head is up to date with the final head.
NoBlockCopied,
@@ -69,7 +70,7 @@ fn cold_store_copy(
let hot_tail = hot_store.get_ser::<u64>(DBCol::BlockMisc, TAIL_KEY)?;
let hot_tail_height = hot_tail.unwrap_or(genesis_height);

tracing::debug!(target: "cold_store", "cold store loop, cold_head {}, hot_final_head {}, hot_tail {}", cold_head_height, hot_final_head_height, hot_tail_height);
let _span = tracing::debug_span!(target: "cold_store", "cold_store_copy", cold_head_height, hot_final_head_height, hot_tail_height).entered();

if cold_head_height > hot_final_head_height {
return Err(anyhow::anyhow!(
@@ -120,11 +121,14 @@

update_cold_head(cold_db, hot_store, &next_height)?;

if next_height >= hot_final_head_height {
let result = if next_height >= hot_final_head_height {
Ok(ColdStoreCopyResult::LatestBlockCopied)
} else {
Ok(ColdStoreCopyResult::OtherBlockCopied)
}
};

tracing::trace!(target: "cold_store", ?result, "ending");
result
}

fn cold_store_copy_result_to_string(result: &anyhow::Result<ColdStoreCopyResult>) -> &str {
@@ -260,12 +264,17 @@ fn cold_store_loop(
tracing::debug!(target : "cold_store", "Stopping the cold store loop");
break;
}

let instant = std::time::Instant::now();
let result =
cold_store_copy(&hot_store, &cold_store, &cold_db, genesis_height, epoch_manager);
let duration = instant.elapsed();

metrics::COLD_STORE_COPY_RESULT
.with_label_values(&[cold_store_copy_result_to_string(&result)])
.inc();
let result_string = cold_store_copy_result_to_string(&result);
metrics::COLD_STORE_COPY_RESULT.with_label_values(&[result_string]).inc();
if duration > std::time::Duration::from_secs(1) {
tracing::debug!(target : "cold_store", "cold_store_copy took {}s", duration.as_secs_f64());
}

let sleep_duration = split_storage_config.cold_store_loop_sleep_duration;
match result {

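A note on the #[derive(Debug)] added to ColdStoreCopyResult above: the ?result field syntax in the tracing macros records the value via its Debug impl, so the derive is what makes the new trace line compile. A standalone illustration (the enum name here is this example's, mirroring the real one):

// The `?value` sigil in tracing macros captures the field with Debug
// formatting, hence the new #[derive(Debug)] on the result enum.
#[derive(Debug)]
enum CopyResult {
    NoBlockCopied,
    LatestBlockCopied,
    OtherBlockCopied,
}

fn log_copy_result(result: &CopyResult) {
    tracing::trace!(target: "cold_store", ?result, "ending");
}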