diff --git a/core/lib/state/src/lib.rs b/core/lib/state/src/lib.rs index f76f38846566..046e458b7b1a 100644 --- a/core/lib/state/src/lib.rs +++ b/core/lib/state/src/lib.rs @@ -30,7 +30,7 @@ mod witness; pub use self::{ in_memory::{InMemoryStorage, IN_MEMORY_STORAGE_DEFAULT_NETWORK_ID}, postgres::{PostgresStorage, PostgresStorageCaches}, - rocksdb::{RocksbStorageBuilder, RocksdbStorage}, + rocksdb::{RocksbStorageBuilder, RocksdbStorage, StateKeeperColumnFamily}, shadow_storage::ShadowStorage, storage_view::{StorageView, StorageViewMetrics}, witness::WitnessStorage, diff --git a/core/lib/state/src/rocksdb/mod.rs b/core/lib/state/src/rocksdb/mod.rs index b9df5402ee69..5d22a4ed9bdc 100644 --- a/core/lib/state/src/rocksdb/mod.rs +++ b/core/lib/state/src/rocksdb/mod.rs @@ -54,10 +54,14 @@ fn deserialize_l1_batch_number(bytes: &[u8]) -> u32 { u32::from_le_bytes(bytes) } +/// RocksDB column families used by the state keeper. #[derive(Debug, Clone, Copy)] -enum StateKeeperColumnFamily { +pub enum StateKeeperColumnFamily { + /// Column family containing tree state information. State, + /// Column family containing contract contents. Contracts, + /// Column family containing bytecodes for new contracts that a certain contract may deploy. FactoryDeps, } @@ -125,7 +129,8 @@ impl From for RocksdbSyncError { /// [`ReadStorage`] implementation backed by RocksDB. #[derive(Debug)] pub struct RocksdbStorage { - db: RocksDB, + /// Underlying RocksDB instance + pub db: RocksDB, pending_patch: InMemoryStorage, enum_index_migration_chunk_size: usize, /// Test-only listeners to events produced by the storage. @@ -208,16 +213,32 @@ impl RocksdbStorage { /// # Errors /// /// Propagates RocksDB I/O errors. - pub async fn builder(path: &Path) -> anyhow::Result { + pub async fn open_builder(path: &Path) -> anyhow::Result { Self::new(path.to_path_buf()) .await .map(RocksbStorageBuilder) } + /// Creates a new storage builder with the provided RocksDB instance. + /// + /// # Errors + /// + /// Propagates RocksDB I/O errors. + pub async fn builder( + db: RocksDB, + ) -> anyhow::Result { + Self::new_rocksdb(db).await.map(RocksbStorageBuilder) + } + async fn new(path: PathBuf) -> anyhow::Result { + Self::new_rocksdb(RocksDB::new(&path).context("failed initializing state keeper RocksDB")?) + .await + } + + async fn new_rocksdb(db: RocksDB) -> anyhow::Result { tokio::task::spawn_blocking(move || { Ok(Self { - db: RocksDB::new(&path).context("failed initializing state keeper RocksDB")?, + db, pending_patch: InMemoryStorage::default(), enum_index_migration_chunk_size: 100, #[cfg(test)] diff --git a/core/lib/state/src/rocksdb/tests.rs b/core/lib/state/src/rocksdb/tests.rs index c95325cdea69..46d931a1b248 100644 --- a/core/lib/state/src/rocksdb/tests.rs +++ b/core/lib/state/src/rocksdb/tests.rs @@ -78,7 +78,7 @@ async fn rocksdb_storage_basics() { async fn sync_test_storage(dir: &TempDir, conn: &mut StorageProcessor<'_>) -> RocksdbStorage { let (_stop_sender, stop_receiver) = watch::channel(false); - RocksdbStorage::builder(dir.path()) + RocksdbStorage::open_builder(dir.path()) .await .expect("Failed initializing RocksDB") .synchronize(conn, &stop_receiver) @@ -119,7 +119,7 @@ async fn rocksdb_storage_syncing_fault_tolerance() { let dir = TempDir::new().expect("cannot create temporary dir for state keeper"); let (stop_sender, stop_receiver) = watch::channel(false); - let mut storage = RocksdbStorage::builder(dir.path()) + let mut storage = RocksdbStorage::open_builder(dir.path()) .await .expect("Failed initializing RocksDB"); let mut expected_l1_batch_number = L1BatchNumber(0); @@ -137,7 +137,7 @@ async fn rocksdb_storage_syncing_fault_tolerance() { assert!(storage.is_none()); // Resume storage syncing and check that it completes. - let storage = RocksdbStorage::builder(dir.path()) + let storage = RocksdbStorage::open_builder(dir.path()) .await .expect("Failed initializing RocksDB"); assert_eq!(storage.l1_batch_number().await, Some(L1BatchNumber(3))); diff --git a/core/lib/zksync_core/src/block_reverter/mod.rs b/core/lib/zksync_core/src/block_reverter/mod.rs index a4b860722f86..42f23e6fbfa8 100644 --- a/core/lib/zksync_core/src/block_reverter/mod.rs +++ b/core/lib/zksync_core/src/block_reverter/mod.rs @@ -207,7 +207,7 @@ impl BlockReverter { /// Reverts blocks in the state keeper cache. async fn rollback_state_keeper_cache(&self, last_l1_batch_to_keep: L1BatchNumber) { tracing::info!("opening DB with state keeper cache..."); - let sk_cache = RocksdbStorage::builder(self.state_keeper_cache_path.as_ref()) + let sk_cache = RocksdbStorage::open_builder(self.state_keeper_cache_path.as_ref()) .await .expect("Failed initializing state keeper cache"); diff --git a/core/lib/zksync_core/src/state_keeper/batch_executor/main_executor.rs b/core/lib/zksync_core/src/state_keeper/batch_executor/main_executor.rs index c61feebb17cc..528809bec4e2 100644 --- a/core/lib/zksync_core/src/state_keeper/batch_executor/main_executor.rs +++ b/core/lib/zksync_core/src/state_keeper/batch_executor/main_executor.rs @@ -13,7 +13,7 @@ use multivm::{ use once_cell::sync::OnceCell; use tokio::{ runtime::Handle, - sync::{mpsc, watch, RwLock}, + sync::{mpsc, watch}, }; use zksync_dal::ConnectionPool; use zksync_state::{ReadStorage, StorageView, WriteStorage}; @@ -38,7 +38,7 @@ pub struct MainBatchExecutor { max_allowed_tx_gas_limit: U256, upload_witness_inputs_to_gcs: bool, optional_bytecode_compression: bool, - cached_storage: Arc>, + cached_storage: CachedStorage, } impl MainBatchExecutor { @@ -56,11 +56,11 @@ impl MainBatchExecutor { max_allowed_tx_gas_limit, upload_witness_inputs_to_gcs, optional_bytecode_compression, - cached_storage: Arc::new(RwLock::new(CachedStorage::new( + cached_storage: CachedStorage::new( pool, state_keeper_db_path, enum_index_migration_chunk_size, - ))), + ), } } } @@ -88,10 +88,9 @@ impl BatchExecutor for MainBatchExecutor { let stop_receiver = stop_receiver.clone(); let handle = tokio::task::spawn_blocking(move || { let rt_handle = Handle::current(); - let mut cached_storage = rt_handle.block_on(cached_storage.write()); - if let Some(storage) = rt_handle - .block_on(cached_storage.access_storage(rt_handle.clone(), stop_receiver)) - .unwrap() + let factory = cached_storage.factory(); + if let Some(storage) = + rt_handle.block_on(factory.access_storage(rt_handle.clone(), &stop_receiver)) { executor.run( storage, diff --git a/core/lib/zksync_core/src/state_keeper/cached_storage.rs b/core/lib/zksync_core/src/state_keeper/cached_storage.rs index 37055ae5e62b..01eaa518091d 100644 --- a/core/lib/zksync_core/src/state_keeper/cached_storage.rs +++ b/core/lib/zksync_core/src/state_keeper/cached_storage.rs @@ -1,10 +1,13 @@ -use std::fmt::Debug; +use std::{ + fmt::Debug, + sync::{Arc, Mutex}, +}; use anyhow::Context; -use tokio::{runtime::Handle, sync::watch, task::JoinHandle}; +use tokio::{runtime::Handle, sync::watch}; use zksync_dal::{ConnectionPool, StorageProcessor}; -use zksync_state::{PostgresStorage, ReadStorage, RocksdbStorage}; -use zksync_types::L1BatchNumber; +use zksync_state::{PostgresStorage, ReadStorage, RocksdbStorage, StateKeeperColumnFamily}; +use zksync_storage::RocksDB; type BoxReadStorage<'a> = Box; @@ -14,83 +17,24 @@ type BoxReadStorage<'a> = Box; /// /// This struct's main design purpose is to be able to produce [`ReadStorage`] implementation with /// as little blocking operations as possible to ensure liveliness. -#[derive(Debug)] +#[derive(Clone)] pub struct CachedStorage { - pool: ConnectionPool, - state_keeper_db_path: String, - enum_index_migration_chunk_size: usize, - state: CachedStorageState, + inner: Arc>, } -#[derive(Debug)] -enum CachedStorageState { - /// Cached storage has not been initialized yet, the state of RocksDB to Postgres is undefined - NotInitialized, - /// RocksDB is trying to catch up to Postgres asynchronously (the task is represented by the - /// handle contained inside) - CatchingUp { - /// Handle owning permission to join on an asynchronous task to catch up RocksDB to - /// Postgres. - /// - /// # `await` value: - /// - /// - Returns `Ok(Some(rocksdb))` if the process succeeded and `rocksdb` is caught up - /// - Returns `Ok(None)` if the process is interrupted - /// - Returns `Err(err)` for any propagated Postgres/RocksDB error - rocksdb_sync_handle: JoinHandle>>, - }, - /// RocksDB has finished catching up in the near past but is not guaranteed to be in complete - /// sync with Postgres. That being said, except for extreme circumstances the gap should be - /// small (see [`CATCH_UP_L1_BATCHES_TOLERANCE`]). - CaughtUp { - /// Last L1 batch number that RocksDB was caught up to - rocksdb_l1_batch_number: L1BatchNumber, - }, -} - -impl CachedStorage { - /// Specifies the number of L1 batches we allow for RocksDB to fall behind - /// Postgres before we stop trying to catch-up synchronously and instead - /// use Postgres-backed [`ReadStorage`] connections. - const CATCH_UP_L1_BATCHES_TOLERANCE: u32 = 100; - - pub fn new( - pool: ConnectionPool, - state_keeper_db_path: String, - enum_index_migration_chunk_size: usize, - ) -> CachedStorage { - CachedStorage { - pool, - state_keeper_db_path, - enum_index_migration_chunk_size, - state: CachedStorageState::NotInitialized, - } +impl Debug for CachedStorage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CachedStorage").finish() } +} - /// Spawns a new asynchronous task that tries to make RocksDB caught up with Postgres. - /// The handle to the task is returned as a part of the resulting state. - async fn start_catch_up( - pool: ConnectionPool, - state_keeper_db_path: &str, - enum_index_migration_chunk_size: usize, - stop_receiver: watch::Receiver, - ) -> anyhow::Result { - tracing::debug!("Catching up RocksDB asynchronously"); - let mut rocksdb_builder = RocksdbStorage::builder(state_keeper_db_path.as_ref()) - .await - .context("Failed initializing RocksDB storage")?; - rocksdb_builder.enable_enum_index_migration(enum_index_migration_chunk_size); - let rocksdb_sync_handle = tokio::task::spawn(async move { - let mut storage = pool.access_storage().await?; - rocksdb_builder - .synchronize(&mut storage, &stop_receiver) - .await - }); - Ok(CachedStorageState::CatchingUp { - rocksdb_sync_handle, - }) - } +#[derive(Clone)] +pub enum CachedStorageFactory { + Postgres(ConnectionPool), + Rocksdb(RocksDB, ConnectionPool), +} +impl CachedStorageFactory { /// Returns a [`ReadStorage`] implementation backed by Postgres async fn access_storage_pg( rt_handle: Handle, @@ -135,15 +79,13 @@ impl CachedStorage { /// returns a [`ReadStorage`] implementation backed by caught-up RocksDB. async fn access_storage_rocksdb<'a>( conn: &mut StorageProcessor<'_>, - state_keeper_db_path: &str, - enum_index_migration_chunk_size: usize, + rocksdb: RocksDB, stop_receiver: &watch::Receiver, ) -> anyhow::Result>> { tracing::debug!("Catching up RocksDB synchronously"); - let mut rocksdb_builder = RocksdbStorage::builder(state_keeper_db_path.as_ref()) + let rocksdb_builder = RocksdbStorage::builder(rocksdb) .await .context("Failed initializing RocksDB storage")?; - rocksdb_builder.enable_enum_index_migration(enum_index_migration_chunk_size); let rocksdb = rocksdb_builder .synchronize(conn, stop_receiver) .await @@ -157,114 +99,62 @@ impl CachedStorage { Ok(rocksdb.map(|rocksdb| Box::new(rocksdb) as BoxReadStorage<'a>)) } - /// Produces a [`ReadStorage`] implementation backed by either Postgres or - /// RocksDB (if it's caught up). - /// - /// # Return value - /// - /// Returns `Ok(None)` if the process is interrupted using `stop_receiver`. - /// - /// # Errors - /// - /// Propagates RocksDB and Postgres errors. pub async fn access_storage( - &mut self, + &self, rt_handle: Handle, - stop_receiver: watch::Receiver, - ) -> anyhow::Result> { - // FIXME: This method can potentially be simplified by using recursion but that requires - // `BoxFuture` and `Pin` which IMO makes the types much more unreadable than as is - match self.state { - CachedStorageState::NotInitialized => { - // Presuming we are behind, (re-)starting catch up process - self.state = Self::start_catch_up( - self.pool.clone(), - &self.state_keeper_db_path, - self.enum_index_migration_chunk_size, - stop_receiver, - ) - .await?; - Ok(Some(Self::access_storage_pg(rt_handle, &self.pool).await?)) - } - CachedStorageState::CatchingUp { - ref mut rocksdb_sync_handle, - } => { - if !rocksdb_sync_handle.is_finished() { - // Has not finished catching up yet - return Ok(Some(Self::access_storage_pg(rt_handle, &self.pool).await?)); - } - // RocksDB has finished catching up. Regardless of the outcome, we won't need the - // handle anymore so it's safe to replace it - let rocksdb_sync_handle = std::mem::replace( - rocksdb_sync_handle, - tokio::task::spawn_blocking(|| Ok(None)), - ); - match rocksdb_sync_handle.await?? { - Some(rocksdb_storage) => { - // Caught-up successfully - self.state = CachedStorageState::CaughtUp { - rocksdb_l1_batch_number: rocksdb_storage - .l1_batch_number() - .await - .unwrap_or_default(), - }; - Ok(Some(Self::access_storage_pg(rt_handle, &self.pool).await?)) - } - None => { - // Interrupted - self.state = CachedStorageState::NotInitialized; - Ok(None) - } - } + stop_receiver: &watch::Receiver, + ) -> Option { + match self { + CachedStorageFactory::Postgres(pool) => { + Some(Self::access_storage_pg(rt_handle, pool).await.expect("")) } - CachedStorageState::CaughtUp { - rocksdb_l1_batch_number, - } => { - let mut conn = self - .pool + CachedStorageFactory::Rocksdb(rocksdb, pool) => { + let mut conn = pool .access_storage_tagged("state_keeper") .await - .context("Failed getting a Postgres connection")?; - let Some(postgres_l1_batch_number) = conn - .blocks_dal() - .get_sealed_l1_batch_number() - .await - .context("Failed fetching sealed L1 batch number")? - else { - return Self::access_storage_rocksdb( - &mut conn, - &self.state_keeper_db_path, - self.enum_index_migration_chunk_size, - &stop_receiver, - ) - .await; - }; - if rocksdb_l1_batch_number + Self::CATCH_UP_L1_BATCHES_TOLERANCE - < postgres_l1_batch_number - { - tracing::warn!( - %rocksdb_l1_batch_number, - %postgres_l1_batch_number, - "RocksDB fell behind Postgres, trying to catch up asynchronously" - ); - self.state = Self::start_catch_up( - self.pool.clone(), - &self.state_keeper_db_path, - self.enum_index_migration_chunk_size, - stop_receiver, - ) - .await?; - Ok(Some(Self::access_storage_pg(rt_handle, &self.pool).await?)) - } else { - Self::access_storage_rocksdb( - &mut conn, - &self.state_keeper_db_path, - self.enum_index_migration_chunk_size, - &stop_receiver, - ) + .expect("Failed getting a Postgres connection"); + Self::access_storage_rocksdb(&mut conn, rocksdb.clone(), stop_receiver) .await - } + .expect("") } } } } + +impl CachedStorage { + pub fn new( + pool: ConnectionPool, + state_keeper_db_path: String, + enum_index_migration_chunk_size: usize, + ) -> CachedStorage { + let inner = Arc::new(Mutex::new(CachedStorageFactory::Postgres(pool.clone()))); + let factory = inner.clone(); + tokio::task::spawn(async move { + tracing::debug!("Catching up RocksDB asynchronously"); + let mut rocksdb_builder = RocksdbStorage::open_builder(state_keeper_db_path.as_ref()) + .await + .expect("Failed initializing RocksDB storage"); + rocksdb_builder.enable_enum_index_migration(enum_index_migration_chunk_size); + let mut storage = pool.access_storage().await.expect(""); + let (_, stop_receiver) = watch::channel(false); + let rocksdb = rocksdb_builder + .synchronize(&mut storage, &stop_receiver) + .await + .expect(""); + drop(storage); + if let Some(rocksdb) = rocksdb { + let mut factory_guard = factory.lock().expect(""); + *factory_guard = CachedStorageFactory::Rocksdb(rocksdb.db, pool) + } else { + tracing::warn!("Interrupted"); + } + }); + CachedStorage { inner } + } + + pub fn factory(&self) -> CachedStorageFactory { + // it's important that we don't hold the lock for long; + // that's why `CachedStorageFactory` implements `Clone` + self.inner.lock().expect("poisoned").clone() + } +}