
feat(state-keeper): implement asynchronous RocksDB cache #1256

Merged 58 commits on Mar 27, 2024

Commits
ae58bf0
implement asynchronous cached storage
itegulov Feb 27, 2024
b5ebc49
add a remark on recursion
itegulov Feb 27, 2024
594b460
spellcheck + lint
itegulov Feb 27, 2024
6f18451
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Feb 27, 2024
25943b9
recover postgres miniblock number from snapshot
itegulov Mar 1, 2024
6031d91
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 1, 2024
3b027a4
wipe storage logs when deleting genesis
itegulov Mar 4, 2024
a11b669
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 4, 2024
8b91205
add extra logs
itegulov Mar 4, 2024
34d7f64
overhaul CachedStorage to presume RocksDB never falls behind after it…
itegulov Mar 5, 2024
74a2c1e
simplify delete_logs_inner
itegulov Mar 5, 2024
fa9e0bc
use latest miniblock from sealed l1 batch
itegulov Mar 5, 2024
a777fdc
derive debug
itegulov Mar 6, 2024
d4f23bd
refactor `ReadStorageFactory` into its own trait
itegulov Mar 6, 2024
fe52b84
replace `BoxReadStorage` with enum type
itegulov Mar 6, 2024
35714ae
rename `cached_storage.rs` to `state_keeper_storage.rs`
itegulov Mar 6, 2024
089310f
make `MainBatchExecutor` accept generic `ReadStorageFactory`
itegulov Mar 6, 2024
801d46b
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 6, 2024
8bd23c3
lint
itegulov Mar 6, 2024
30df5d1
fill in empty expects
itegulov Mar 6, 2024
5793359
adapt and add tests for multiple storage types
itegulov Mar 6, 2024
bd97ea8
Revert "add extra logs"
itegulov Mar 8, 2024
ac354f2
make `db` private again
itegulov Mar 8, 2024
d74f5e3
refactor RocksDB constructors and fix type in the builder's name
itegulov Mar 8, 2024
d2442c0
bake `'static` into `ReadStorageFactory`
itegulov Mar 8, 2024
12f9826
propagate state keeper storage errors further upstream
itegulov Mar 8, 2024
9b030dc
find latest sealed miniblock first, try snapshot later
itegulov Mar 8, 2024
fd54290
elide lifetime
itegulov Mar 8, 2024
e1da770
shorten `Self` references
itegulov Mar 8, 2024
587b434
change logging level for sync logs
itegulov Mar 8, 2024
4288c5c
simplify `ReadStorageFactory` by incorporating `OnceCell`
itegulov Mar 12, 2024
b96ab11
fix catchup test
itegulov Mar 12, 2024
c52a869
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 12, 2024
600c624
tmp: refactor background catchup into a task type
itegulov Mar 12, 2024
a6810b5
move `StorageType` methods to `tester.rs`
itegulov Mar 13, 2024
4f4ab95
use `test_casing::Product` instead of `itertools`
itegulov Mar 13, 2024
3aa268c
use `Handle::current` at PG constructor instead of passing it around
itegulov Mar 13, 2024
3e3c7dc
elide lifetimes where possible
itegulov Mar 13, 2024
c204aa3
refactor `load_latest_sealed_miniblock`
itegulov Mar 13, 2024
f71e233
replace `ok_or_else` with `context`
itegulov Mar 13, 2024
59c713a
`match` -> `if let Some(_) =`
itegulov Mar 13, 2024
d0e0607
expose `AsyncCatchupTask` and manage it through node framework
itegulov Mar 13, 2024
bfd7b5b
revert (most) `RocksdbStorage` changes
itegulov Mar 13, 2024
bafb28a
reuse async pg code for test pg storage
itegulov Mar 13, 2024
d52a7d0
lint
itegulov Mar 13, 2024
0d884be
get rid of `From` implementations
itegulov Mar 13, 2024
b7eb9db
make tests aware of background tasks
itegulov Mar 13, 2024
53c3de0
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 13, 2024
043d05c
move wait to the proper place
itegulov Mar 13, 2024
d5c6b2c
address minor comments
itegulov Mar 16, 2024
10da2dc
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 16, 2024
f6516bb
make catch-up a managed task
itegulov Mar 16, 2024
f38c546
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 19, 2024
5ab1cdf
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 21, 2024
68398d4
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 26, 2024
0b76b99
address comments
itegulov Mar 26, 2024
9edba2c
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 26, 2024
92cbc0c
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 26, 2024
1 change: 1 addition & 0 deletions checks-config/era.dic
@@ -119,6 +119,7 @@ stdout
GCS
websocket
struct
struct's
localhost
TOML
config
2 changes: 1 addition & 1 deletion core/lib/state/src/rocksdb/mod.rs
@@ -599,7 +599,7 @@ impl RocksdbStorage {
/// # Panics
///
/// Panics on RocksDB errors.
async fn l1_batch_number(&self) -> Option<L1BatchNumber> {
pub async fn l1_batch_number(&self) -> Option<L1BatchNumber> {
let cf = StateKeeperColumnFamily::State;
let db = self.db.clone();
let number_bytes =
@@ -11,16 +11,20 @@ use multivm::{
MultiVMTracer, VmInstance,
};
use once_cell::sync::OnceCell;
use tokio::sync::{mpsc, watch};
use tokio::{
runtime::Handle,
sync::{mpsc, watch, RwLock},
};
use zksync_dal::ConnectionPool;
use zksync_state::{RocksdbStorage, StorageView, WriteStorage};
use zksync_state::{ReadStorage, StorageView, WriteStorage};
use zksync_types::{vm_trace::Call, Transaction, U256};
use zksync_utils::bytecode::CompressedBytecodeInfo;

use super::{BatchExecutor, BatchExecutorHandle, Command, TxExecutionResult};
use crate::{
metrics::{InteractionType, TxStage, APP_METRICS},
state_keeper::{
cached_storage::CachedStorage,
metrics::{TxExecutionStage, EXECUTOR_METRICS, KEEPER_METRICS},
types::ExecutionMetricsForCriteria,
},
@@ -30,13 +34,11 @@ use crate::{
/// Creates a "real" batch executor which maintains the VM (as opposed to the test builder which doesn't use the VM).
#[derive(Debug, Clone)]
pub struct MainBatchExecutor {
state_keeper_db_path: String,
pool: ConnectionPool,
save_call_traces: bool,
max_allowed_tx_gas_limit: U256,
upload_witness_inputs_to_gcs: bool,
enum_index_migration_chunk_size: usize,
optional_bytecode_compression: bool,
cached_storage: Arc<RwLock<CachedStorage>>,
}

impl MainBatchExecutor {
@@ -50,13 +52,15 @@ impl MainBatchExecutor {
optional_bytecode_compression: bool,
) -> Self {
Self {
state_keeper_db_path,
pool,
save_call_traces,
max_allowed_tx_gas_limit,
upload_witness_inputs_to_gcs,
enum_index_migration_chunk_size,
optional_bytecode_compression,
cached_storage: Arc::new(RwLock::new(CachedStorage::new(
pool,
state_keeper_db_path,
enum_index_migration_chunk_size,
))),
}
}
}
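The constructor above now stashes a single `CachedStorage` behind `Arc<RwLock<_>>`, so every batch execution reuses one cache instead of rebuilding RocksDB per batch. Below is a minimal std-only sketch of that sharing pattern; all names (`SharedCache`, `serve_batches`) are illustrative, not the PR's API.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Illustrative stand-in for `CachedStorage`: one mutable state machine
/// shared by every batch execution.
#[derive(Debug, Default)]
struct SharedCache {
    batches_served: u32,
}

impl SharedCache {
    /// Stand-in for `access_storage`: needs exclusive access, so callers
    /// go through the `RwLock` write guard.
    fn access(&mut self) -> u32 {
        self.batches_served += 1;
        self.batches_served
    }
}

fn serve_batches(n: u32) -> u32 {
    let cache = Arc::new(RwLock::new(SharedCache::default()));
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let cache = Arc::clone(&cache);
            // Each "batch executor" takes the write lock for the duration of
            // storage acquisition, mirroring `cached_storage.write()` above.
            thread::spawn(move || {
                cache.write().unwrap().access();
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let served = cache.read().unwrap().batches_served;
    served
}
```

The `RwLock` here is `std::sync`'s; the PR uses tokio's async `RwLock` for the same purpose inside `spawn_blocking`.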
@@ -69,20 +73,6 @@ impl BatchExecutor for MainBatchExecutor {
system_env: SystemEnv,
stop_receiver: &watch::Receiver<bool>,
) -> Option<BatchExecutorHandle> {
let mut secondary_storage = RocksdbStorage::builder(self.state_keeper_db_path.as_ref())
.await
.expect("Failed initializing state keeper storage");
secondary_storage.enable_enum_index_migration(self.enum_index_migration_chunk_size);
let mut conn = self
.pool
.access_storage_tagged("state_keeper")
.await
.unwrap();
let secondary_storage = secondary_storage
.synchronize(&mut conn, stop_receiver)
.await
.expect("Failed synchronizing secondary state keeper storage")?;

// Since we process `BatchExecutor` commands one-by-one (the next command is never enqueued
// until a previous command is processed), capacity 1 is enough for the commands channel.
let (commands_sender, commands_receiver) = mpsc::channel(1);
@@ -94,13 +84,22 @@
};
let upload_witness_inputs_to_gcs = self.upload_witness_inputs_to_gcs;

let cached_storage = self.cached_storage.clone();
let stop_receiver = stop_receiver.clone();
let handle = tokio::task::spawn_blocking(move || {
executor.run(
secondary_storage,
l1_batch_params,
system_env,
upload_witness_inputs_to_gcs,
)
let rt_handle = Handle::current();
let mut cached_storage = rt_handle.block_on(cached_storage.write());
if let Some(storage) = rt_handle
.block_on(cached_storage.access_storage(rt_handle.clone(), stop_receiver))
.unwrap()
{
executor.run(
storage,
l1_batch_params,
system_env,
upload_witness_inputs_to_gcs,
);
};
});
Some(BatchExecutorHandle {
handle,
@@ -124,9 +123,9 @@ struct CommandReceiver {
}

impl CommandReceiver {
pub(super) fn run(
pub(super) fn run<S: ReadStorage>(
mut self,
secondary_storage: RocksdbStorage,
secondary_storage: S,
l1_batch_params: L1BatchEnv,
system_env: SystemEnv,
upload_witness_inputs_to_gcs: bool,
253 changes: 253 additions & 0 deletions core/lib/zksync_core/src/state_keeper/cached_storage.rs
@@ -0,0 +1,253 @@
use std::fmt::Debug;

use anyhow::Context;
use tokio::{runtime::Handle, sync::watch, task::JoinHandle};
use zksync_dal::{ConnectionPool, StorageProcessor};
use zksync_state::{PostgresStorage, ReadStorage, RocksdbStorage};
use zksync_types::L1BatchNumber;

type BoxReadStorage<'a> = Box<dyn ReadStorage + Send + 'a>;

/// Encapsulates a storage that can produce a short-lived [`ReadStorage`] implementation backed by
/// either Postgres or RocksDB (if the latter is caught up). Maintains internal state of how far
/// behind Postgres the RocksDB cache is and actively tries to catch it up in the background.
///
/// This struct's main design goal is to produce a [`ReadStorage`] implementation with as few
/// blocking operations as possible to ensure liveness.
#[derive(Debug)]
pub struct CachedStorage {
pool: ConnectionPool,
state_keeper_db_path: String,
enum_index_migration_chunk_size: usize,
state: CachedStorageState,
}

#[derive(Debug)]
enum CachedStorageState {
/// Cached storage has not been initialized yet; how far RocksDB is behind Postgres is unknown
NotInitialized,
/// RocksDB is trying to catch up to Postgres asynchronously (the task is represented by the
/// handle contained inside)
CatchingUp {
/// Handle owning permission to join on an asynchronous task to catch up RocksDB to
/// Postgres.
///
/// # `await` value:
///
/// - Returns `Ok(Some(rocksdb))` if the process succeeded and `rocksdb` is caught up
/// - Returns `Ok(None)` if the process is interrupted
/// - Returns `Err(err)` for any propagated Postgres/RocksDB error
rocksdb_sync_handle: JoinHandle<anyhow::Result<Option<RocksdbStorage>>>,
},
/// RocksDB recently finished catching up but is not guaranteed to be in complete sync with
/// Postgres. Except under extreme circumstances, the gap should be small (see
/// [`CATCH_UP_L1_BATCHES_TOLERANCE`]).
CaughtUp {
/// Last L1 batch number that RocksDB was caught up to
rocksdb_l1_batch_number: L1BatchNumber,
},
}
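This enum is the heart of the design. Here is a simplified, synchronous sketch of how `access_storage` drives the three states; the async `JoinHandle` is modeled as a plain `Option`, and all names are illustrative rather than the PR's API.

```rust
use std::mem;

/// Simplified stand-in for the three-state machine: `CatchingUp`'s
/// `JoinHandle` becomes an `Option<u32>` holding the caught-up batch
/// number once the background task "finishes".
#[derive(Debug)]
enum State {
    NotInitialized,
    CatchingUp { result: Option<u32> },
    CaughtUp { rocksdb_l1_batch_number: u32 },
}

/// Returns which backend a caller would get, advancing the state machine
/// the way `access_storage` does.
fn access(state: &mut State) -> &'static str {
    match state {
        State::NotInitialized => {
            // Start catching up in the background; serve Postgres meanwhile.
            *state = State::CatchingUp { result: None };
            "postgres"
        }
        State::CatchingUp { result } => match mem::take(result) {
            // Same take-ownership trick as the `mem::replace` on the real
            // `JoinHandle`: move the result out of the enum field.
            None => "postgres", // still catching up
            Some(n) => {
                // The real code also serves Postgres on the transition call.
                *state = State::CaughtUp { rocksdb_l1_batch_number: n };
                "postgres"
            }
        },
        State::CaughtUp { .. } => "rocksdb",
    }
}
```

The sketch omits the `CaughtUp` fallback back to `CatchingUp` when RocksDB lags too far behind; that check is covered separately below the full listing.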

impl CachedStorage {
/// Specifies the number of L1 batches we allow RocksDB to fall behind Postgres before we stop
/// trying to catch up synchronously and instead use Postgres-backed [`ReadStorage`]
/// connections.
const CATCH_UP_L1_BATCHES_TOLERANCE: u32 = 100;

pub fn new(
pool: ConnectionPool,
state_keeper_db_path: String,
enum_index_migration_chunk_size: usize,
) -> CachedStorage {
CachedStorage {
pool,
state_keeper_db_path,
enum_index_migration_chunk_size,
state: CachedStorageState::NotInitialized,
}
}

/// Spawns a new asynchronous task that tries to catch RocksDB up with Postgres.
/// The handle to the task is returned as part of the resulting state.
async fn start_catch_up(
pool: ConnectionPool,
state_keeper_db_path: &str,
enum_index_migration_chunk_size: usize,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<CachedStorageState> {
tracing::debug!("Catching up RocksDB asynchronously");
let mut rocksdb_builder = RocksdbStorage::builder(state_keeper_db_path.as_ref())
.await
.context("Failed initializing RocksDB storage")?;
rocksdb_builder.enable_enum_index_migration(enum_index_migration_chunk_size);
let rocksdb_sync_handle = tokio::task::spawn(async move {
let mut storage = pool.access_storage().await?;
rocksdb_builder
.synchronize(&mut storage, &stop_receiver)
.await
});
Ok(CachedStorageState::CatchingUp {
rocksdb_sync_handle,
})
}

/// Returns a [`ReadStorage`] implementation backed by Postgres
async fn access_storage_pg(
rt_handle: Handle,
pool: &ConnectionPool,
) -> anyhow::Result<BoxReadStorage> {
let mut connection = pool.access_storage().await?;
let postgres_l1_batch_number = connection
.blocks_dal()
.get_sealed_l1_batch_number()
.await?
.unwrap_or_default();
let block_number = connection
.blocks_dal()
.get_sealed_miniblock_number()
.await?
.unwrap_or_default();
tracing::debug!(%postgres_l1_batch_number, "Using Postgres-based storage");
Ok(
Box::new(PostgresStorage::new_async(rt_handle, connection, block_number, true).await?)
as BoxReadStorage,
)
}
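Both `access_storage_pg` above and `access_storage_rocksdb` below erase their concrete storage into `BoxReadStorage`, so callers never care which backend they got. A toy sketch of that type-erasure pattern, using a hypothetical trait rather than zksync's actual `ReadStorage`:

```rust
/// Minimal stand-in for `ReadStorage`.
trait ReadStorage {
    fn read_value(&mut self, key: u32) -> u32;
}

struct PostgresBacked;
struct RocksdbBacked;

impl ReadStorage for PostgresBacked {
    fn read_value(&mut self, key: u32) -> u32 {
        key + 1 // dummy lookup
    }
}

impl ReadStorage for RocksdbBacked {
    fn read_value(&mut self, key: u32) -> u32 {
        key + 2 // dummy lookup
    }
}

type BoxReadStorage = Box<dyn ReadStorage + Send>;

/// Both backends erase to the same boxed trait object, which is why a
/// caller like `CommandReceiver::run` can stay generic over `S: ReadStorage`.
fn storage_for(caught_up: bool) -> BoxReadStorage {
    if caught_up {
        Box::new(RocksdbBacked)
    } else {
        Box::new(PostgresBacked)
    }
}
```

The PR's alias additionally carries a `'a` lifetime because the RocksDB variant borrows a Postgres connection only transiently; the sketch drops that detail.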

/// Catches up RocksDB synchronously (i.e. assumes the gap is small) and
/// returns a [`ReadStorage`] implementation backed by caught-up RocksDB.
async fn access_storage_rocksdb<'a>(
conn: &mut StorageProcessor<'_>,
state_keeper_db_path: &str,
enum_index_migration_chunk_size: usize,
stop_receiver: &watch::Receiver<bool>,
) -> anyhow::Result<Option<BoxReadStorage<'a>>> {
tracing::debug!("Catching up RocksDB synchronously");
let mut rocksdb_builder = RocksdbStorage::builder(state_keeper_db_path.as_ref())
.await
.context("Failed initializing RocksDB storage")?;
rocksdb_builder.enable_enum_index_migration(enum_index_migration_chunk_size);
let rocksdb = rocksdb_builder
.synchronize(conn, stop_receiver)
.await
.context("Failed to catch up state keeper RocksDB storage to Postgres")?;
if let Some(rocksdb) = &rocksdb {
let rocksdb_l1_batch_number = rocksdb.l1_batch_number().await.unwrap_or_default();
tracing::debug!(%rocksdb_l1_batch_number, "Using RocksDB-based storage");
} else {
tracing::warn!("Interrupted")
}
Ok(rocksdb.map(|rocksdb| Box::new(rocksdb) as BoxReadStorage<'a>))
}

/// Produces a [`ReadStorage`] implementation backed by either Postgres or
/// RocksDB (if it's caught up).
///
/// # Return value
///
/// Returns `Ok(None)` if the process is interrupted using `stop_receiver`.
///
/// # Errors
///
/// Propagates RocksDB and Postgres errors.
pub async fn access_storage(
&mut self,
rt_handle: Handle,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<Option<BoxReadStorage>> {
// FIXME: This method can potentially be simplified by using recursion but that requires
// `BoxFuture` and `Pin` which IMO makes the types much more unreadable than as is
match self.state {
CachedStorageState::NotInitialized => {
// Presume we are behind and (re-)start the catch-up process
self.state = Self::start_catch_up(
self.pool.clone(),
&self.state_keeper_db_path,
self.enum_index_migration_chunk_size,
stop_receiver,
)
.await?;
Ok(Some(Self::access_storage_pg(rt_handle, &self.pool).await?))
}
CachedStorageState::CatchingUp {
ref mut rocksdb_sync_handle,
} => {
if !rocksdb_sync_handle.is_finished() {
// Has not finished catching up yet
return Ok(Some(Self::access_storage_pg(rt_handle, &self.pool).await?));
}
// RocksDB has finished catching up. Regardless of the outcome, we won't need the
// handle anymore so it's safe to replace it
let rocksdb_sync_handle = std::mem::replace(
rocksdb_sync_handle,
tokio::task::spawn_blocking(|| Ok(None)),
);
match rocksdb_sync_handle.await?? {
Some(rocksdb_storage) => {
// Caught-up successfully
self.state = CachedStorageState::CaughtUp {
rocksdb_l1_batch_number: rocksdb_storage
.l1_batch_number()
.await
.unwrap_or_default(),
};
Ok(Some(Self::access_storage_pg(rt_handle, &self.pool).await?))
}
None => {
// Interrupted
self.state = CachedStorageState::NotInitialized;
Ok(None)
}
}
}
CachedStorageState::CaughtUp {
rocksdb_l1_batch_number,
} => {
let mut conn = self
.pool
.access_storage_tagged("state_keeper")
.await
.context("Failed getting a Postgres connection")?;
let Some(postgres_l1_batch_number) = conn
.blocks_dal()
.get_sealed_l1_batch_number()
.await
.context("Failed fetching sealed L1 batch number")?
else {
return Self::access_storage_rocksdb(
&mut conn,
&self.state_keeper_db_path,
self.enum_index_migration_chunk_size,
&stop_receiver,
)
.await;
};
if rocksdb_l1_batch_number + Self::CATCH_UP_L1_BATCHES_TOLERANCE
< postgres_l1_batch_number
{
tracing::warn!(
%rocksdb_l1_batch_number,
%postgres_l1_batch_number,
"RocksDB fell behind Postgres, trying to catch up asynchronously"
);
self.state = Self::start_catch_up(
self.pool.clone(),
&self.state_keeper_db_path,
self.enum_index_migration_chunk_size,
stop_receiver,
)
.await?;
Ok(Some(Self::access_storage_pg(rt_handle, &self.pool).await?))
} else {
Self::access_storage_rocksdb(
&mut conn,
&self.state_keeper_db_path,
self.enum_index_migration_chunk_size,
&stop_receiver,
)
.await
}
}
}
}
}
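In the `CaughtUp` arm, the decision to abandon RocksDB and restart an asynchronous catch-up reduces to a single comparison against `CATCH_UP_L1_BATCHES_TOLERANCE`. That arithmetic, extracted into a hypothetical free function (not present in the PR):

```rust
/// Tolerance mirrored from `CachedStorage::CATCH_UP_L1_BATCHES_TOLERANCE`.
const CATCH_UP_L1_BATCHES_TOLERANCE: u32 = 100;

/// Returns true when RocksDB has fallen far enough behind Postgres that the
/// state keeper should serve Postgres-backed storage and restart the
/// asynchronous catch-up (the condition in the `CaughtUp` arm).
fn should_recatch_up_async(rocksdb_l1_batch: u32, postgres_l1_batch: u32) -> bool {
    rocksdb_l1_batch + CATCH_UP_L1_BATCHES_TOLERANCE < postgres_l1_batch
}
```

Note the strict inequality: a gap of exactly 100 batches is still handled by the synchronous `access_storage_rocksdb` path.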
1 change: 1 addition & 0 deletions core/lib/zksync_core/src/state_keeper/mod.rs
@@ -19,6 +19,7 @@ pub use self::{
use crate::fee_model::BatchFeeModelInputProvider;

mod batch_executor;
mod cached_storage;
pub(crate) mod extractors;
pub(crate) mod io;
mod keeper;