diff --git a/core/store/src/cold_storage.rs b/core/store/src/cold_storage.rs
index 57fa4b322f3..0e4cc2ebe4e 100644
--- a/core/store/src/cold_storage.rs
+++ b/core/store/src/cold_storage.rs
@@ -60,7 +60,7 @@ pub fn update_cold_db(
     shard_layout: &ShardLayout,
     height: &BlockHeight,
 ) -> io::Result<bool> {
-    let _span = tracing::debug_span!(target: "store", "update cold db", height = height);
+    let _span = tracing::debug_span!(target: "cold_store", "update cold db", height = height);
     let _timer = metrics::COLD_COPY_DURATION.start_timer();
 
     let mut store_with_cache = StoreWithCache { store: hot_store, cache: StoreCache::new() };
@@ -69,16 +69,24 @@ pub fn update_cold_db(
         return Ok(false);
     }
 
-    let key_type_to_keys = get_keys_from_store(&mut store_with_cache, shard_layout, height)?;
+    let height_key = height.to_le_bytes();
+    let block_hash_vec = store_with_cache.get_or_err(DBCol::BlockHeight, &height_key)?;
+    let block_hash_key = block_hash_vec.as_slice();
+
+    let key_type_to_keys =
+        get_keys_from_store(&mut store_with_cache, shard_layout, &height_key, block_hash_key)?;
     for col in DBCol::iter() {
-        if col.is_cold() {
-            copy_from_store(
-                cold_db,
-                &mut store_with_cache,
-                col,
-                combine_keys(&key_type_to_keys, &col.key_type()),
-            )?;
+        if !col.is_cold() {
+            continue;
         }
+
+        if col == DBCol::State {
+            copy_state_from_store(shard_layout, block_hash_key, cold_db, &mut store_with_cache)?;
+            continue;
+        }
+
+        let keys = combine_keys(&key_type_to_keys, &col.key_type());
+        copy_from_store(cold_db, &mut store_with_cache, col, keys)?;
     }
 
     Ok(true)
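Note: the generic path above builds each cold column's key set with combine_keys, which expands to the cartesian product of the candidate values for every key part; copy_state_from_store, added in the next hunk, exists precisely because that expansion is a poor fit for State, where it would range over every trie node hash for every shard. A minimal, self-contained sketch of the expansion (illustrative only, not the nearcore implementation; all names here are made up):

use std::collections::HashMap;

type StoreKey = Vec<u8>;

// Build every key as the concatenation of one value per key part, i.e. the
// cartesian product of the per-part value lists, in key-layout order.
fn combine_keys_sketch(
    parts: &HashMap<&str, Vec<StoreKey>>,
    key_layout: &[&str],
) -> Vec<StoreKey> {
    let empty: &[StoreKey] = &[];
    let mut keys: Vec<StoreKey> = vec![Vec::new()];
    for part in key_layout {
        let values = parts.get(part).map_or(empty, |v| v.as_slice());
        let mut next = Vec::with_capacity(keys.len() * values.len());
        for prefix in &keys {
            for suffix in values {
                let mut key = prefix.clone();
                key.extend_from_slice(suffix);
                next.push(key);
            }
        }
        keys = next;
    }
    keys
}

fn main() {
    let mut parts: HashMap<&str, Vec<StoreKey>> = HashMap::new();
    parts.insert("BlockHash", vec![b"hash".to_vec()]);
    parts.insert("ShardUId", vec![b"s0".to_vec(), b"s1".to_vec()]);

    // A column keyed by (BlockHash, ShardUId) expands to 1 * 2 = 2 keys.
    let keys = combine_keys_sketch(&parts, &["BlockHash", "ShardUId"]);
    assert_eq!(keys, vec![b"hashs0".to_vec(), b"hashs1".to_vec()]);
}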
@@ -109,6 +117,60 @@ fn rc_aware_set(
     };
 }
 
+// A specialized version of copy_from_store for the State column. Finds all
+// the State nodes that were inserted at the given height by reading the
+// TrieChanges and inserts them into the cold store.
+//
+// The generic implementation is not efficient for State because it would
+// attempt to read every node from every shard. Here we know exactly what
+// shard the node belongs to.
+fn copy_state_from_store(
+    shard_layout: &ShardLayout,
+    block_hash_key: &[u8],
+    cold_db: &ColdDB,
+    hot_store: &mut StoreWithCache,
+) -> io::Result<()> {
+    let col = DBCol::State;
+    let _span = tracing::debug_span!(target: "cold_store", "copy_state_from_store", %col);
+    let instant = std::time::Instant::now();
+
+    let mut transaction = DBTransaction::new();
+    for shard_uid in shard_layout.shard_uids() {
+        debug_assert_eq!(
+            DBCol::TrieChanges.key_type(),
+            &[DBKeyType::BlockHash, DBKeyType::ShardUId]
+        );
+
+        let shard_uid_key = shard_uid.to_bytes();
+        let key = join_two_keys(&block_hash_key, &shard_uid_key);
+        let trie_changes: Option<TrieChanges> =
+            hot_store.get_ser::<TrieChanges>(DBCol::TrieChanges, &key)?;
+
+        let Some(trie_changes) = trie_changes else { continue };
+        for op in trie_changes.insertions() {
+            hot_store.insert_state_to_cache_from_op(op, &shard_uid_key);
+
+            let key = join_two_keys(&shard_uid_key, op.hash().as_bytes());
+            let value = hot_store.get(DBCol::State, &key)?;
+            let value =
+                value.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, hex::encode(&key)))?;
+
+            tracing::trace!(target: "cold_store", pretty_key=?near_fmt::StorageKey(&key), "copying state node to colddb");
+            rc_aware_set(&mut transaction, DBCol::State, key, value);
+        }
+    }
+
+    let read_duration = instant.elapsed();
+
+    let instant = std::time::Instant::now();
+    cold_db.write(transaction)?;
+    let write_duration = instant.elapsed();
+
+    tracing::trace!(target: "cold_store", ?read_duration, ?write_duration, "finished");
+
+    Ok(())
+}
+
 /// Gets values for given keys in a column from provided hot_store.
 /// Creates a transaction based on those values with set DBOps.
 /// Writes that transaction to cold_db.
@@ -118,9 +180,17 @@ fn copy_from_store(
     col: DBCol,
     keys: Vec<StoreKey>,
 ) -> io::Result<()> {
-    let _span = tracing::debug_span!(target: "store", "create and write transaction to cold db", col = %col);
+    debug_assert!(col.is_cold());
+
+    // Note: for the State column this function should only be used in tests,
+    // where it's needed to copy the state records from genesis.
+
+    let _span = tracing::debug_span!(target: "cold_store", "copy_from_store", col = %col);
+    let instant = std::time::Instant::now();
 
     let mut transaction = DBTransaction::new();
+    let mut good_keys = 0;
+    let total_keys = keys.len();
     for key in keys {
         // TODO: Look into using RocksDB’s multi_key function. It
         // might speed things up. Currently our Database abstraction
@@ -135,10 +205,19 @@ fn copy_from_store(
             // write raw bytes. This would also allow us to bypass stripping and
             // re-adding the reference count.
+            good_keys += 1;
             rc_aware_set(&mut transaction, col, key, value);
         }
     }
+
+    let read_duration = instant.elapsed();
+
+    let instant = std::time::Instant::now();
     cold_db.write(transaction)?;
+    let write_duration = instant.elapsed();
+
+    tracing::trace!(target: "cold_store", ?col, ?good_keys, ?total_keys, ?read_duration, ?write_duration, "finished");
+
     return Ok(());
 }
 
@@ -155,7 +234,7 @@ pub fn update_cold_head(
     hot_store: &Store,
     height: &BlockHeight,
 ) -> io::Result<()> {
-    tracing::debug!(target: "store", "update HEAD of cold db to {}", height);
+    tracing::debug!(target: "cold_store", "update HEAD of cold db to {}", height);
 
     let mut store = StoreWithCache { store: hot_store, cache: StoreCache::new() };
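Note: for orientation, here is the key plumbing copy_state_from_store (above) relies on, condensed into a self-contained sketch: TrieChanges rows are keyed by block hash ++ ShardUId, and State rows by ShardUId ++ trie node hash. The helper mirrors nearcore's join_two_keys (plain concatenation); the concrete key widths below are illustrative assumptions, not taken from this diff.

// Stand-in for nearcore's join_two_keys: cold-store keys are plain
// concatenations of their fixed-width key parts.
fn join_two_keys(prefix: &[u8], suffix: &[u8]) -> Vec<u8> {
    [prefix, suffix].concat()
}

fn main() {
    let block_hash = [0xab_u8; 32]; // stand-in 32-byte block hash
    let shard_uid = [0_u8, 0, 0, 1, 0, 0, 0, 3]; // stand-in ShardUId encoding
    let node_hash = [0xcd_u8; 32]; // stand-in trie node hash

    // Key used to look up the TrieChanges of (block, shard).
    let trie_changes_key = join_two_keys(&block_hash, &shard_uid);
    assert_eq!(trie_changes_key.len(), block_hash.len() + shard_uid.len());

    // Key under which an inserted trie node lives in DBCol::State.
    let state_key = join_two_keys(&shard_uid, &node_hash);
    assert_eq!(state_key.len(), shard_uid.len() + node_hash.len());
}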
@@ -222,17 +301,26 @@ pub fn copy_all_data_to_cold(
     Ok(CopyAllDataToColdStatus::EverythingCopied)
 }
 
+// The copy_state_from_store function depends on the state nodes being present
+// in the trie changes. This isn't the case for genesis, so this method can be
+// used instead to copy the genesis records from hot to cold.
+// TODO - How did copying from genesis work in the prod migration to split storage?
 pub fn test_cold_genesis_update(cold_db: &ColdDB, hot_store: &Store) -> io::Result<()> {
     let mut store_with_cache = StoreWithCache { store: hot_store, cache: StoreCache::new() };
     for col in DBCol::iter() {
-        if col.is_cold() {
-            copy_from_store(
-                cold_db,
-                &mut store_with_cache,
-                col,
-                hot_store.iter(col).map(|x| x.unwrap().0.to_vec()).collect(),
-            )?;
+        if !col.is_cold() {
+            continue;
         }
+
+        // Note that we use the generic implementation of `copy_from_store` also
+        // for the State column, which otherwise should be copied using the
+        // specialized `copy_state_from_store`.
+        copy_from_store(
+            cold_db,
+            &mut store_with_cache,
+            col,
+            hot_store.iter(col).map(|x| x.unwrap().0.to_vec()).collect(),
+        )?;
     }
     Ok(())
 }
@@ -256,13 +344,11 @@ pub fn test_get_store_initial_writes(column: DBCol) -> u64 {
 fn get_keys_from_store(
     store: &mut StoreWithCache,
     shard_layout: &ShardLayout,
-    height: &BlockHeight,
+    height_key: &[u8],
+    block_hash_key: &[u8],
 ) -> io::Result<HashMap<DBKeyType, Vec<StoreKey>>> {
     let mut key_type_to_keys = HashMap::new();
 
-    let height_key = height.to_le_bytes();
-    let block_hash_key = store.get_or_err(DBCol::BlockHeight, &height_key)?.as_slice().to_vec();
-
     let block: Block = store.get_ser_or_err(DBCol::Block, &block_hash_key)?;
     let chunks = block
         .chunks()
         .iter()
         .map(
@@ -273,11 +359,19 @@ fn get_keys_from_store(
         .collect::<io::Result<Vec<ShardChunk>>>()?;
 
     for key_type in DBKeyType::iter() {
+        if key_type == DBKeyType::TrieNodeOrValueHash {
+            // The TrieNodeOrValueHash is only used in the State column, which is handled separately.
+            continue;
+        }
+
         key_type_to_keys.insert(
             key_type,
             match key_type {
+                DBKeyType::TrieNodeOrValueHash => {
+                    unreachable!();
+                }
                 DBKeyType::BlockHeight => vec![height_key.to_vec()],
-                DBKeyType::BlockHash => vec![block_hash_key.clone()],
+                DBKeyType::BlockHash => vec![block_hash_key.to_vec()],
                 DBKeyType::PreviousBlockHash => {
                     vec![block.header().prev_hash().as_bytes().to_vec()]
                 }
@@ -289,30 +383,6 @@ fn get_keys_from_store(
                     .shard_uids()
                     .map(|shard_uid| shard_uid.to_bytes().to_vec())
                     .collect(),
-                // TODO: don't write values of State column to cache. Write them directly to colddb.
-                DBKeyType::TrieNodeOrValueHash => {
-                    let mut keys = vec![];
-                    for shard_uid in shard_layout.shard_uids() {
-                        let shard_uid_key = shard_uid.to_bytes();
-
-                        debug_assert_eq!(
-                            DBCol::TrieChanges.key_type(),
-                            &[DBKeyType::BlockHash, DBKeyType::ShardUId]
-                        );
-                        let trie_changes_option: Option<TrieChanges> = store.get_ser(
-                            DBCol::TrieChanges,
-                            &join_two_keys(&block_hash_key, &shard_uid_key),
-                        )?;
-
-                        if let Some(trie_changes) = trie_changes_option {
-                            for op in trie_changes.insertions() {
-                                store.insert_state_to_cache_from_op(op, &shard_uid_key);
-                                keys.push(op.hash().as_bytes().to_vec());
-                            }
-                        }
-                    }
-                    keys
-                }
                 // TODO: write StateChanges values to colddb directly, not to cache.
                 DBKeyType::TrieKey => {
                     let mut keys = vec![];
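Note: rc_aware_set, used by both copy paths above, has to respect the reference-counting convention of rc columns such as State. To the best of my understanding the stored value is the payload followed by a little-endian i64 refcount, and writes merge by adding refcounts; a hedged sketch of just the encoding (stand-in helpers, not the nearcore API):

// Encode a payload together with its refcount as a trailing little-endian i64.
fn encode_rc_value(payload: &[u8], rc: i64) -> Vec<u8> {
    let mut value = payload.to_vec();
    value.extend_from_slice(&rc.to_le_bytes());
    value
}

// Split a stored value back into (payload, refcount).
fn decode_rc_value(value: &[u8]) -> (&[u8], i64) {
    let (payload, rc_bytes) = value.split_at(value.len() - 8);
    (payload, i64::from_le_bytes(rc_bytes.try_into().unwrap()))
}

fn main() {
    let stored = encode_rc_value(b"trie node bytes", 1);
    let (payload, rc) = decode_rc_value(&stored);
    assert_eq!(payload, b"trie node bytes");
    assert_eq!(rc, 1);
}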
diff --git a/core/store/src/columns.rs b/core/store/src/columns.rs
index 6a2ea17b3b6..4b4d0a628bd 100644
--- a/core/store/src/columns.rs
+++ b/core/store/src/columns.rs
@@ -561,12 +561,33 @@ impl fmt::Display for DBCol {
     }
 }
 
-#[test]
-fn column_props_sanity() {
+#[cfg(test)]
+mod tests {
+    use super::*;
     use strum::IntoEnumIterator;
 
-    for col in DBCol::iter() {
-        // Check that rc and write_once are mutually exclusive.
-        assert!((col.is_rc() as u32) + (col.is_insert_only() as u32) <= 1, "{col}")
+    #[test]
+    fn column_props_sanity() {
+        for col in DBCol::iter() {
+            // Check that rc and write_once are mutually exclusive.
+            assert!((col.is_rc() as u32) + (col.is_insert_only() as u32) <= 1, "{col}")
+        }
+    }
+
+    // In split storage archival nodes the State column and the
+    // TrieNodeOrValueHash db key type are handled separately.
+    // This test asserts that the TrieNodeOrValueHash key type is
+    // only used in the State column and in no other columns.
+    #[test]
+    fn key_type_split_storage_sanity() {
+        for col in DBCol::iter() {
+            if col == DBCol::State {
+                continue;
+            }
+            let key_types = col.key_type();
+            for key_type in key_types {
+                assert_ne!(key_type, &DBKeyType::TrieNodeOrValueHash);
+            }
+        }
     }
 }
diff --git a/integration-tests/src/tests/client/cold_storage.rs b/integration-tests/src/tests/client/cold_storage.rs
index e62538dd0b2..ac1a9716ddb 100644
--- a/integration-tests/src/tests/client/cold_storage.rs
+++ b/integration-tests/src/tests/client/cold_storage.rs
@@ -31,14 +31,14 @@ fn check_key(first_store: &Store, second_store: &Store, col: DBCol, key: &[u8])
     let pretty_key = near_fmt::StorageKey(key);
     tracing::debug!("Checking {:?} {:?}", col, pretty_key);
 
-    let first_res = first_store.get(col, key);
-    let second_res = second_store.get(col, key);
+    let first_res = first_store.get(col, key).unwrap();
+    let second_res = second_store.get(col, key).unwrap();
 
     if col == DBCol::PartialChunks {
         tracing::debug!("{:?}", first_store.get_ser::<PartialEncodedChunk>(col, key));
     }
 
-    assert_eq!(first_res.unwrap(), second_res.unwrap(), "col: {:?} key: {:?}", col, pretty_key);
+    assert_eq!(first_res, second_res, "col: {:?} key: {:?}", col, pretty_key);
 }
 
 fn check_iter(
@@ -86,11 +86,11 @@ fn test_storage_after_commit_of_cold_update() {
         .nightshade_runtimes(&genesis)
         .build();
 
-    let (store, ..) = create_test_node_storage_with_cold(DB_VERSION, DbKind::Hot);
+    let (storage, ..) = create_test_node_storage_with_cold(DB_VERSION, DbKind::Hot);
 
     let mut last_hash = *env.clients[0].chain.genesis().hash();
 
-    test_cold_genesis_update(&*store.cold_db().unwrap(), &env.clients[0].runtime_adapter.store())
+    test_cold_genesis_update(&*storage.cold_db().unwrap(), &env.clients[0].runtime_adapter.store())
         .unwrap();
 
     let state_reads = test_get_store_reads(DBCol::State);
@@ -148,7 +148,7 @@ fn test_storage_after_commit_of_cold_update() {
         env.process_block(0, block.clone(), Provenance::PRODUCED);
 
         update_cold_db(
-            &*store.cold_db().unwrap(),
+            &*storage.cold_db().unwrap(),
             &env.clients[0].runtime_adapter.store(),
             &env.clients[0]
                 .epoch_manager
@@ -202,7 +202,7 @@ fn test_storage_after_commit_of_cold_update() {
         if col.is_cold() {
             let num_checks = check_iter(
                 &env.clients[0].runtime_adapter.store(),
-                &store.get_cold_store().unwrap(),
+                &storage.get_cold_store().unwrap(),
                 col,
                 &no_check_rules,
             );
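Note: the #[derive(Debug)] added in the next file is what lets the new tracing::trace!(..., ?result, "ending") call compile: tracing's ?field sugar records a field via its Debug implementation. A minimal illustration with a stand-in enum (requires only the tracing crate):

// Stand-in for ColdStoreCopyResult; recording `?result` below needs this derive.
#[derive(Debug)]
#[allow(dead_code)]
enum CopyResult {
    NoBlockCopied,
    LatestBlockCopied,
    OtherBlockCopied,
}

fn main() {
    // Without a subscriber installed this logs nowhere, but it must compile,
    // and that requires Debug on the payload type (and on the error type,
    // which std::io::Error already implements).
    let result: Result<CopyResult, std::io::Error> = Ok(CopyResult::LatestBlockCopied);
    tracing::trace!(target: "cold_store", ?result, "ending");
}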
diff --git a/nearcore/src/cold_storage.rs b/nearcore/src/cold_storage.rs
index 249fbed5c58..7333b7d9d37 100644
--- a/nearcore/src/cold_storage.rs
+++ b/nearcore/src/cold_storage.rs
@@ -34,6 +34,7 @@ impl ColdStoreLoopHandle {
 }
 
 /// The ColdStoreCopyResult indicates if and what block was copied.
+#[derive(Debug)]
 enum ColdStoreCopyResult {
     // No block was copied. The cold head is up to date with the final head.
     NoBlockCopied,
@@ -69,7 +70,7 @@ fn cold_store_copy(
     let hot_tail = hot_store.get_ser::<BlockHeight>(DBCol::BlockMisc, TAIL_KEY)?;
     let hot_tail_height = hot_tail.unwrap_or(genesis_height);
 
-    tracing::debug!(target: "cold_store", "cold store loop, cold_head {}, hot_final_head {}, hot_tail {}", cold_head_height, hot_final_head_height, hot_tail_height);
+    let _span = tracing::debug_span!(target: "cold_store", "cold_store_copy", cold_head_height, hot_final_head_height, hot_tail_height).entered();
 
     if cold_head_height > hot_final_head_height {
         return Err(anyhow::anyhow!(
@@ -120,11 +121,14 @@ fn cold_store_copy(
 
     update_cold_head(cold_db, hot_store, &next_height)?;
 
-    if next_height >= hot_final_head_height {
+    let result = if next_height >= hot_final_head_height {
         Ok(ColdStoreCopyResult::LatestBlockCopied)
     } else {
         Ok(ColdStoreCopyResult::OtherBlockCopied)
-    }
+    };
+
+    tracing::trace!(target: "cold_store", ?result, "ending");
+    result
 }
 
 fn cold_store_copy_result_to_string(result: &anyhow::Result<ColdStoreCopyResult>) -> &str {
@@ -260,12 +264,17 @@ fn cold_store_loop(
             tracing::debug!(target : "cold_store", "Stopping the cold store loop");
             break;
         }
+
+        let instant = std::time::Instant::now();
        let result =
             cold_store_copy(&hot_store, &cold_store, &cold_db, genesis_height, epoch_manager);
+        let duration = instant.elapsed();
 
-        metrics::COLD_STORE_COPY_RESULT
-            .with_label_values(&[cold_store_copy_result_to_string(&result)])
-            .inc();
+        let result_string = cold_store_copy_result_to_string(&result);
+        metrics::COLD_STORE_COPY_RESULT.with_label_values(&[result_string]).inc();
+
+        if duration > std::time::Duration::from_secs(1) {
+            tracing::debug!(target : "cold_store", "cold_store_copy took {}s", duration.as_secs_f64());
+        }
 
         let sleep_duration = split_storage_config.cold_store_loop_sleep_duration;
         match result {
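Note: the loop instrumentation added above, condensed into a self-contained sketch: time the body with Instant, always record the per-result counter, and only log iterations slow enough to matter. The println! calls stand in for the real prometheus counter and tracing call:

use std::time::{Duration, Instant};

// Stand-in for the real cold_store_copy.
fn cold_store_copy_stub() -> Result<(), std::io::Error> {
    Ok(())
}

fn main() {
    let instant = Instant::now();
    let result = cold_store_copy_stub();
    let duration = instant.elapsed();

    // Always bump the result-labelled counter metric.
    let result_string = if result.is_ok() { "ok" } else { "error" };
    println!("COLD_STORE_COPY_RESULT{{result=\"{result_string}\"}} +1");

    // Only log when the copy took long enough to be interesting.
    if duration > Duration::from_secs(1) {
        println!("cold_store_copy took {}s", duration.as_secs_f64());
    }
}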