Skip to content

Commit

Permalink
overhaul CachedStorage to presume RocksDB never falls behind after it catches up
Browse files Browse the repository at this point in the history
  • Loading branch information
itegulov committed Mar 5, 2024
1 parent 8b91205 commit 7294f16
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 197 deletions.
2 changes: 1 addition & 1 deletion core/lib/state/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ mod witness;
pub use self::{
in_memory::{InMemoryStorage, IN_MEMORY_STORAGE_DEFAULT_NETWORK_ID},
postgres::{PostgresStorage, PostgresStorageCaches},
rocksdb::{RocksbStorageBuilder, RocksdbStorage},
rocksdb::{RocksbStorageBuilder, RocksdbStorage, StateKeeperColumnFamily},
shadow_storage::ShadowStorage,
storage_view::{StorageView, StorageViewMetrics},
witness::WitnessStorage,
Expand Down
29 changes: 25 additions & 4 deletions core/lib/state/src/rocksdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,14 @@ fn deserialize_l1_batch_number(bytes: &[u8]) -> u32 {
u32::from_le_bytes(bytes)
}

/// RocksDB column families used by the state keeper.
#[derive(Debug, Clone, Copy)]
enum StateKeeperColumnFamily {
pub enum StateKeeperColumnFamily {
/// Column family containing tree state information.
State,
/// Column family containing contract contents.
Contracts,
/// Column family containing bytecodes for new contracts that a certain contract may deploy.
FactoryDeps,
}

Expand Down Expand Up @@ -125,7 +129,8 @@ impl From<anyhow::Error> for RocksdbSyncError {
/// [`ReadStorage`] implementation backed by RocksDB.
#[derive(Debug)]
pub struct RocksdbStorage {
db: RocksDB<StateKeeperColumnFamily>,
/// Underlying RocksDB instance
pub db: RocksDB<StateKeeperColumnFamily>,
pending_patch: InMemoryStorage,
enum_index_migration_chunk_size: usize,
/// Test-only listeners to events produced by the storage.
Expand Down Expand Up @@ -208,16 +213,32 @@ impl RocksdbStorage {
/// # Errors
///
/// Propagates RocksDB I/O errors.
pub async fn builder(path: &Path) -> anyhow::Result<RocksbStorageBuilder> {
pub async fn open_builder(path: &Path) -> anyhow::Result<RocksbStorageBuilder> {
Self::new(path.to_path_buf())
.await
.map(RocksbStorageBuilder)
}

/// Creates a new storage builder with the provided RocksDB instance.
///
/// # Errors
///
/// Propagates RocksDB I/O errors.
pub async fn builder(
db: RocksDB<StateKeeperColumnFamily>,
) -> anyhow::Result<RocksbStorageBuilder> {
Self::new_rocksdb(db).await.map(RocksbStorageBuilder)
}

async fn new(path: PathBuf) -> anyhow::Result<Self> {
Self::new_rocksdb(RocksDB::new(&path).context("failed initializing state keeper RocksDB")?)
.await
}

async fn new_rocksdb(db: RocksDB<StateKeeperColumnFamily>) -> anyhow::Result<Self> {
tokio::task::spawn_blocking(move || {
Ok(Self {
db: RocksDB::new(&path).context("failed initializing state keeper RocksDB")?,
db,
pending_patch: InMemoryStorage::default(),
enum_index_migration_chunk_size: 100,
#[cfg(test)]
Expand Down
6 changes: 3 additions & 3 deletions core/lib/state/src/rocksdb/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async fn rocksdb_storage_basics() {

async fn sync_test_storage(dir: &TempDir, conn: &mut StorageProcessor<'_>) -> RocksdbStorage {
let (_stop_sender, stop_receiver) = watch::channel(false);
RocksdbStorage::builder(dir.path())
RocksdbStorage::open_builder(dir.path())
.await
.expect("Failed initializing RocksDB")
.synchronize(conn, &stop_receiver)
Expand Down Expand Up @@ -119,7 +119,7 @@ async fn rocksdb_storage_syncing_fault_tolerance() {

let dir = TempDir::new().expect("cannot create temporary dir for state keeper");
let (stop_sender, stop_receiver) = watch::channel(false);
let mut storage = RocksdbStorage::builder(dir.path())
let mut storage = RocksdbStorage::open_builder(dir.path())
.await
.expect("Failed initializing RocksDB");
let mut expected_l1_batch_number = L1BatchNumber(0);
Expand All @@ -137,7 +137,7 @@ async fn rocksdb_storage_syncing_fault_tolerance() {
assert!(storage.is_none());

// Resume storage syncing and check that it completes.
let storage = RocksdbStorage::builder(dir.path())
let storage = RocksdbStorage::open_builder(dir.path())
.await
.expect("Failed initializing RocksDB");
assert_eq!(storage.l1_batch_number().await, Some(L1BatchNumber(3)));
Expand Down
2 changes: 1 addition & 1 deletion core/lib/zksync_core/src/block_reverter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl BlockReverter {
/// Reverts blocks in the state keeper cache.
async fn rollback_state_keeper_cache(&self, last_l1_batch_to_keep: L1BatchNumber) {
tracing::info!("opening DB with state keeper cache...");
let sk_cache = RocksdbStorage::builder(self.state_keeper_cache_path.as_ref())
let sk_cache = RocksdbStorage::open_builder(self.state_keeper_cache_path.as_ref())
.await
.expect("Failed initializing state keeper cache");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use multivm::{
use once_cell::sync::OnceCell;
use tokio::{
runtime::Handle,
sync::{mpsc, watch, RwLock},
sync::{mpsc, watch},
};
use zksync_dal::ConnectionPool;
use zksync_state::{ReadStorage, StorageView, WriteStorage};
Expand All @@ -38,7 +38,7 @@ pub struct MainBatchExecutor {
max_allowed_tx_gas_limit: U256,
upload_witness_inputs_to_gcs: bool,
optional_bytecode_compression: bool,
cached_storage: Arc<RwLock<CachedStorage>>,
cached_storage: CachedStorage,
}

impl MainBatchExecutor {
Expand All @@ -56,11 +56,11 @@ impl MainBatchExecutor {
max_allowed_tx_gas_limit,
upload_witness_inputs_to_gcs,
optional_bytecode_compression,
cached_storage: Arc::new(RwLock::new(CachedStorage::new(
cached_storage: CachedStorage::new(
pool,
state_keeper_db_path,
enum_index_migration_chunk_size,
))),
),
}
}
}
Expand Down Expand Up @@ -88,10 +88,9 @@ impl BatchExecutor for MainBatchExecutor {
let stop_receiver = stop_receiver.clone();
let handle = tokio::task::spawn_blocking(move || {
let rt_handle = Handle::current();
let mut cached_storage = rt_handle.block_on(cached_storage.write());
if let Some(storage) = rt_handle
.block_on(cached_storage.access_storage(rt_handle.clone(), stop_receiver))
.unwrap()
let factory = cached_storage.factory();
if let Some(storage) =
rt_handle.block_on(factory.access_storage(rt_handle.clone(), &stop_receiver))
{
executor.run(
storage,
Expand Down

0 comments on commit 7294f16

Please sign in to comment.