diff --git a/chain/chain/src/flat_storage_creator.rs b/chain/chain/src/flat_storage_creator.rs index e5d4e2fdd49..f4705fde1dd 100644 --- a/chain/chain/src/flat_storage_creator.rs +++ b/chain/chain/src/flat_storage_creator.rs @@ -115,7 +115,7 @@ impl FlatStorageShardCreator { &mut store_update, shard_uid, key, - Some(FlatStateValue::value_ref(&value)), + Some(FlatStateValue::inlined(&value)), ); num_items += 1; } diff --git a/core/store/src/db/mixeddb.rs b/core/store/src/db/mixeddb.rs index 244c2544a78..1966c3bd334 100644 --- a/core/store/src/db/mixeddb.rs +++ b/core/store/src/db/mixeddb.rs @@ -66,8 +66,10 @@ impl MixedDB { impl Database for MixedDB { fn get_raw_bytes(&self, col: DBCol, key: &[u8]) -> io::Result>> { if let Some(first_result) = self.first_db().get_raw_bytes(col, key)? { + tracing::trace!(target: "mixeddb", ?col, "Returning from first DB"); return Ok(Some(first_result)); } + tracing::trace!(target: "mixeddb", ?col, "Returning from second DB"); self.second_db().get_raw_bytes(col, key) } @@ -75,13 +77,15 @@ impl Database for MixedDB { assert!(col.is_rc()); if let Some(first_result) = self.first_db().get_with_rc_stripped(col, key)? { + tracing::trace!(target: "mixeddb", ?col, "Returning from first DB"); return Ok(Some(first_result)); } + tracing::trace!(target: "mixeddb", ?col, "Returning from second DB"); self.second_db().get_with_rc_stripped(col, key) } fn iter<'a>(&'a self, col: DBCol) -> DBIterator<'a> { - Self::merge_iter(self.read_db.iter(col), self.write_db.iter(col)) + Self::merge_iter(self.first_db().iter(col), self.second_db().iter(col)) } fn iter_prefix<'a>(&'a self, col: DBCol, key_prefix: &'a [u8]) -> DBIterator<'a> { @@ -110,7 +114,11 @@ impl Database for MixedDB { ); } + /// The split db, in principle, should be read only and only used in view client. + /// However the view client *does* write to the db in order to update cache. + /// Hence we need to allow writing to the split db but only write to the hot db. fn write(&self, batch: DBTransaction) -> io::Result<()> { + tracing::trace!(target: "mixeddb", "Writing to writeDB"); self.write_db.write(batch) } diff --git a/tools/flat-storage/src/commands.rs b/tools/flat-storage/src/commands.rs index d268907160c..b73f16dc0e9 100644 --- a/tools/flat-storage/src/commands.rs +++ b/tools/flat-storage/src/commands.rs @@ -6,16 +6,24 @@ use near_chain::types::RuntimeAdapter; use near_chain::{ChainStore, ChainStoreAccess}; use near_chain_configs::GenesisValidationMode; use near_epoch_manager::{EpochManager, EpochManagerAdapter, EpochManagerHandle}; +use near_primitives::block::Tip; +use near_primitives::block_header::BlockHeader; use near_primitives::shard_layout::{account_id_to_shard_id, ShardVersion}; use near_primitives::state::FlatStateValue; use near_primitives::types::{BlockHeight, ShardId}; +use near_store::db::SplitDB; +use near_store::db::{Database, MixedDB, ReadOrder, TestDB}; use near_store::flat::{ inline_flat_state_values, store_helper, FlatStateChanges, FlatStateDelta, FlatStateDeltaMetadata, FlatStorageManager, FlatStorageStatus, }; -use near_store::{DBCol, Mode, NodeStorage, ShardUId, Store, StoreOpener}; +use near_store::metadata::{DbKind, DB_VERSION}; +use near_store::{ + DBCol, Mode, NodeStorage, ShardUId, Store, StoreConfig, StoreOpener, Temperature, +}; use nearcore::{load_config, NearConfig, NightshadeRuntime, NightshadeRuntimeExt}; use std::collections::{HashMap, HashSet}; +use std::path::Path; use std::sync::atomic::AtomicBool; use std::{path::PathBuf, sync::Arc, time::Duration}; use tqdm::tqdm; @@ -84,9 +92,13 @@ pub struct ResetCmd { #[derive(Parser)] pub struct InitCmd { + #[clap(long)] shard_id: ShardId, - - #[clap(default_value = "3")] + #[clap(long)] + height: BlockHeight, + #[clap(long)] + target_db_path: PathBuf, + #[clap(long, default_value = "3")] num_threads: usize, } @@ -181,6 +193,109 @@ impl FlatStorageCommand { (node_storage, epoch_manager, hot_runtime, chain_store, hot_store) } + fn get_header(height: BlockHeight, store: &Store) -> BlockHeader { + let height_key = height.to_le_bytes(); + + let block_hash_vec = + store.get(DBCol::BlockHeight, &height_key).expect("Error reading from DB"); + let block_hash_vec = block_hash_vec.expect("No such height"); + let block_hash_key = block_hash_vec.as_slice(); + store + .get_ser::(DBCol::BlockHeader, &block_hash_key) + .expect("Error reading from DB") + .expect(format!("Block header not found with {block_hash_vec:?}").as_str()) + } + + fn set_block_misc(height: BlockHeight, source_store: &Store, target_store: &Store) { + let mut db_update = target_store.store_update(); + + let header = Self::get_header(height, source_store); + let tip = Tip::from_header(&header); + + let col = DBCol::BlockMisc; + db_update.set_ser(col, near_store::HEAD_KEY, &tip).expect("Unable to write HEAD_KEY"); + db_update + .set_ser(col, near_store::HEADER_HEAD_KEY, &tip) + .expect("Unable to write HEADER_HEAD_KEY"); + db_update + .set_ser(col, near_store::FINAL_HEAD_KEY, &tip) + .expect("Unable to write FINAL_HEAD_KEY"); + db_update.commit().expect("Unable to commit to TestDB"); + } + + fn get_amend_db(height: BlockHeight, store: &Store) -> Arc { + let db = TestDB::new(); + let db_store = NodeStorage::new(db.clone()).get_hot_store(); + Self::set_block_misc(height, store, &db_store); + db + } + + fn get_init_db( + opener: &StoreOpener, + home_dir: &PathBuf, + near_config: &NearConfig, + height: BlockHeight, + target_db_path: &Path, + ) -> (NodeStorage, Arc, Arc, ChainStore, Store) { + tracing::info!(target: "flat", ?height, ?target_db_path, "get_init_db"); + let node_storage = { + let (base_db, amend_db) = { + let node_storage = opener.open().expect("Unable to create NodeStorage"); + let amend_db = Self::get_amend_db(height, &node_storage.get_hot_store()); + let base_db = match node_storage.cold_db() { + Some(cold_storage) => { + let cold_storage = cold_storage.clone(); + SplitDB::new(node_storage.into_inner(Temperature::Hot), cold_storage) + } + None => node_storage.into_inner(Temperature::Hot), + }; + (base_db, amend_db) + }; + let write_db = { + let path = if target_db_path.is_absolute() { + PathBuf::from(target_db_path) + } else { + home_dir.join(&target_db_path) + }; + Arc::new( + near_store::db::RocksDB::open( + &path, + &StoreConfig::default(), + Mode::ReadWrite, + near_store::Temperature::Hot, + ) + .expect("Unable to open recovery db"), + ) + }; + + // This DB can only write to write_db + // When reading, it first reads from wirte_db, then amended_db, then base_db + let mixed_db = MixedDB::new( + MixedDB::new(amend_db, base_db, ReadOrder::ReadDBFirst), + write_db, + ReadOrder::WriteDBFirst, + ); + NodeStorage::new(mixed_db) + }; + + node_storage.get_hot_store().set_db_kind(DbKind::RPC).expect("Unable to set DbKind"); + node_storage.get_hot_store().set_db_version(DB_VERSION).expect("Unable to set DbKind"); + Self::set_block_misc(height, &node_storage.get_hot_store(), &node_storage.get_hot_store()); + + let epoch_manager = + EpochManager::new_arc_handle(node_storage.get_hot_store(), &near_config.genesis.config); + let hot_runtime = NightshadeRuntime::from_config( + home_dir, + node_storage.get_hot_store(), + &near_config, + epoch_manager.clone(), + ) + .expect("could not create transaction runtime"); + let chain_store = ChainStore::new(node_storage.get_hot_store(), 0, false); + let hot_store = node_storage.get_hot_store(); + (node_storage, epoch_manager, hot_runtime, chain_store, hot_store) + } + fn view( &self, cmd: &ViewCmd, @@ -258,10 +373,17 @@ impl FlatStorageCommand { opener: StoreOpener, ) -> anyhow::Result<()> { let (_, epoch_manager, rw_hot_runtime, rw_chain_store, rw_hot_store) = - Self::get_db(&opener, home_dir, &near_config, near_store::Mode::ReadWriteExisting); + Self::get_init_db(&opener, home_dir, &near_config, cmd.height, &cmd.target_db_path); let tip = rw_chain_store.final_head()?; let shard_uid = epoch_manager.shard_id_to_uid(cmd.shard_id, &tip.epoch_id)?; + + let mut store_update = rw_hot_store.store_update(); + store_update + .set_ser(DBCol::FlatStorageStatus, &shard_uid.to_bytes(), &FlatStorageStatus::Empty) + .expect("Unable to set FS status"); + store_update.commit().expect("Unable to change FS status"); + let mut creator = FlatStorageShardCreator::new(shard_uid, tip.height - 1, epoch_manager, rw_hot_runtime); let pool = rayon::ThreadPoolBuilder::new().num_threads(cmd.num_threads).build()?; @@ -639,7 +761,7 @@ impl FlatStorageCommand { home_dir, near_config.config.archive, &near_config.config.store, - None, + near_config.config.cold_store.as_ref(), ); match &self.subcmd {