Skip to content

Commit

Permalink
flat storage tool
Browse files Browse the repository at this point in the history
  • Loading branch information
Longarithm committed Apr 24, 2024
1 parent 02a315a commit 096c2d5
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 7 deletions.
2 changes: 1 addition & 1 deletion chain/chain/src/flat_storage_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl FlatStorageShardCreator {
&mut store_update,
shard_uid,
key,
Some(FlatStateValue::value_ref(&value)),
Some(FlatStateValue::inlined(&value)),
);
num_items += 1;
}
Expand Down
10 changes: 9 additions & 1 deletion core/store/src/db/mixeddb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,26 @@ impl MixedDB {
impl Database for MixedDB {
fn get_raw_bytes(&self, col: DBCol, key: &[u8]) -> io::Result<Option<DBSlice<'_>>> {
if let Some(first_result) = self.first_db().get_raw_bytes(col, key)? {
tracing::trace!(target: "mixeddb", ?col, "Returning from first DB");
return Ok(Some(first_result));
}
tracing::trace!(target: "mixeddb", ?col, "Returning from second DB");
self.second_db().get_raw_bytes(col, key)
}

fn get_with_rc_stripped(&self, col: DBCol, key: &[u8]) -> io::Result<Option<DBSlice<'_>>> {
assert!(col.is_rc());

if let Some(first_result) = self.first_db().get_with_rc_stripped(col, key)? {
tracing::trace!(target: "mixeddb", ?col, "Returning from first DB");
return Ok(Some(first_result));
}
tracing::trace!(target: "mixeddb", ?col, "Returning from second DB");
self.second_db().get_with_rc_stripped(col, key)
}

fn iter<'a>(&'a self, col: DBCol) -> DBIterator<'a> {
Self::merge_iter(self.read_db.iter(col), self.write_db.iter(col))
Self::merge_iter(self.first_db().iter(col), self.second_db().iter(col))
}

fn iter_prefix<'a>(&'a self, col: DBCol, key_prefix: &'a [u8]) -> DBIterator<'a> {
Expand Down Expand Up @@ -110,7 +114,11 @@ impl Database for MixedDB {
);
}

/// The split db, in principle, should be read only and only used in view client.
/// However the view client *does* write to the db in order to update cache.
/// Hence we need to allow writing to the split db but only write to the hot db.
fn write(&self, batch: DBTransaction) -> io::Result<()> {
tracing::trace!(target: "mixeddb", "Writing to writeDB");
self.write_db.write(batch)
}

Expand Down
132 changes: 127 additions & 5 deletions tools/flat-storage/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,24 @@ use near_chain::types::RuntimeAdapter;
use near_chain::{ChainStore, ChainStoreAccess};
use near_chain_configs::GenesisValidationMode;
use near_epoch_manager::{EpochManager, EpochManagerAdapter, EpochManagerHandle};
use near_primitives::block::Tip;
use near_primitives::block_header::BlockHeader;
use near_primitives::shard_layout::{account_id_to_shard_id, ShardVersion};
use near_primitives::state::FlatStateValue;
use near_primitives::types::{BlockHeight, ShardId};
use near_store::db::SplitDB;
use near_store::db::{Database, MixedDB, ReadOrder, TestDB};
use near_store::flat::{
inline_flat_state_values, store_helper, FlatStateChanges, FlatStateDelta,
FlatStateDeltaMetadata, FlatStorageManager, FlatStorageStatus,
};
use near_store::{DBCol, Mode, NodeStorage, ShardUId, Store, StoreOpener};
use near_store::metadata::{DbKind, DB_VERSION};
use near_store::{
DBCol, Mode, NodeStorage, ShardUId, Store, StoreConfig, StoreOpener, Temperature,
};
use nearcore::{load_config, NearConfig, NightshadeRuntime, NightshadeRuntimeExt};
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::atomic::AtomicBool;
use std::{path::PathBuf, sync::Arc, time::Duration};
use tqdm::tqdm;
Expand Down Expand Up @@ -84,9 +92,13 @@ pub struct ResetCmd {

#[derive(Parser)]
pub struct InitCmd {
#[clap(long)]
shard_id: ShardId,

#[clap(default_value = "3")]
#[clap(long)]
height: BlockHeight,
#[clap(long)]
target_db_path: PathBuf,
#[clap(long, default_value = "3")]
num_threads: usize,
}

Expand Down Expand Up @@ -181,6 +193,109 @@ impl FlatStorageCommand {
(node_storage, epoch_manager, hot_runtime, chain_store, hot_store)
}

fn get_header(height: BlockHeight, store: &Store) -> BlockHeader {
let height_key = height.to_le_bytes();

let block_hash_vec =
store.get(DBCol::BlockHeight, &height_key).expect("Error reading from DB");
let block_hash_vec = block_hash_vec.expect("No such height");
let block_hash_key = block_hash_vec.as_slice();
store
.get_ser::<BlockHeader>(DBCol::BlockHeader, &block_hash_key)
.expect("Error reading from DB")
.expect(format!("Block header not found with {block_hash_vec:?}").as_str())
}

fn set_block_misc(height: BlockHeight, source_store: &Store, target_store: &Store) {
let mut db_update = target_store.store_update();

let header = Self::get_header(height, source_store);
let tip = Tip::from_header(&header);

let col = DBCol::BlockMisc;
db_update.set_ser(col, near_store::HEAD_KEY, &tip).expect("Unable to write HEAD_KEY");
db_update
.set_ser(col, near_store::HEADER_HEAD_KEY, &tip)
.expect("Unable to write HEADER_HEAD_KEY");
db_update
.set_ser(col, near_store::FINAL_HEAD_KEY, &tip)
.expect("Unable to write FINAL_HEAD_KEY");
db_update.commit().expect("Unable to commit to TestDB");
}

fn get_amend_db(height: BlockHeight, store: &Store) -> Arc<dyn Database> {
let db = TestDB::new();
let db_store = NodeStorage::new(db.clone()).get_hot_store();
Self::set_block_misc(height, store, &db_store);
db
}

fn get_init_db(
opener: &StoreOpener,
home_dir: &PathBuf,
near_config: &NearConfig,
height: BlockHeight,
target_db_path: &Path,
) -> (NodeStorage, Arc<EpochManagerHandle>, Arc<NightshadeRuntime>, ChainStore, Store) {
tracing::info!(target: "flat", ?height, ?target_db_path, "get_init_db");
let node_storage = {
let (base_db, amend_db) = {
let node_storage = opener.open().expect("Unable to create NodeStorage");
let amend_db = Self::get_amend_db(height, &node_storage.get_hot_store());
let base_db = match node_storage.cold_db() {
Some(cold_storage) => {
let cold_storage = cold_storage.clone();
SplitDB::new(node_storage.into_inner(Temperature::Hot), cold_storage)
}
None => node_storage.into_inner(Temperature::Hot),
};
(base_db, amend_db)
};
let write_db = {
let path = if target_db_path.is_absolute() {
PathBuf::from(target_db_path)
} else {
home_dir.join(&target_db_path)
};
Arc::new(
near_store::db::RocksDB::open(
&path,
&StoreConfig::default(),
Mode::ReadWrite,
near_store::Temperature::Hot,
)
.expect("Unable to open recovery db"),
)
};

// This DB can only write to write_db
// When reading, it first reads from wirte_db, then amended_db, then base_db
let mixed_db = MixedDB::new(
MixedDB::new(amend_db, base_db, ReadOrder::ReadDBFirst),
write_db,
ReadOrder::WriteDBFirst,
);
NodeStorage::new(mixed_db)
};

node_storage.get_hot_store().set_db_kind(DbKind::RPC).expect("Unable to set DbKind");
node_storage.get_hot_store().set_db_version(DB_VERSION).expect("Unable to set DbKind");
Self::set_block_misc(height, &node_storage.get_hot_store(), &node_storage.get_hot_store());

let epoch_manager =
EpochManager::new_arc_handle(node_storage.get_hot_store(), &near_config.genesis.config);
let hot_runtime = NightshadeRuntime::from_config(
home_dir,
node_storage.get_hot_store(),
&near_config,
epoch_manager.clone(),
)
.expect("could not create transaction runtime");
let chain_store = ChainStore::new(node_storage.get_hot_store(), 0, false);
let hot_store = node_storage.get_hot_store();
(node_storage, epoch_manager, hot_runtime, chain_store, hot_store)
}

fn view(
&self,
cmd: &ViewCmd,
Expand Down Expand Up @@ -258,10 +373,17 @@ impl FlatStorageCommand {
opener: StoreOpener,
) -> anyhow::Result<()> {
let (_, epoch_manager, rw_hot_runtime, rw_chain_store, rw_hot_store) =
Self::get_db(&opener, home_dir, &near_config, near_store::Mode::ReadWriteExisting);
Self::get_init_db(&opener, home_dir, &near_config, cmd.height, &cmd.target_db_path);

let tip = rw_chain_store.final_head()?;
let shard_uid = epoch_manager.shard_id_to_uid(cmd.shard_id, &tip.epoch_id)?;

let mut store_update = rw_hot_store.store_update();
store_update
.set_ser(DBCol::FlatStorageStatus, &shard_uid.to_bytes(), &FlatStorageStatus::Empty)
.expect("Unable to set FS status");
store_update.commit().expect("Unable to change FS status");

let mut creator =
FlatStorageShardCreator::new(shard_uid, tip.height - 1, epoch_manager, rw_hot_runtime);
let pool = rayon::ThreadPoolBuilder::new().num_threads(cmd.num_threads).build()?;
Expand Down Expand Up @@ -639,7 +761,7 @@ impl FlatStorageCommand {
home_dir,
near_config.config.archive,
&near_config.config.store,
None,
near_config.config.cold_store.as_ref(),
);

match &self.subcmd {
Expand Down

0 comments on commit 096c2d5

Please sign in to comment.