perf(db): Improve storage switching for state keeper cache (#2234)
## What ❔

Improves the switching logic between Postgres and RocksDB for the state keeper (SK) cache. With
these changes, RocksDB is guaranteed to be used if it is up to date at the time
it is opened.
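
A minimal sketch of how the reworked API might be used, based on the types and methods in this diff (the `zksync_state` crate name and the surrounding task-spawning code are assumptions; error handling is abbreviated):

```rust
use tokio::sync::watch;
use zksync_dal::{ConnectionPool, Core};
use zksync_state::{AsyncCatchupTask, RocksdbCell};

async fn run_catchup(pool: ConnectionPool<Core>, db_path: String) -> anyhow::Result<()> {
    // The task and its handle are created together; `RocksdbCell` is the only way
    // to access the RocksDB instance produced by the task.
    let (task, rocksdb_cell) = AsyncCatchupTask::new(pool, db_path);
    let (_stop_sender, stop_receiver) = watch::channel(false);
    let task_handle = tokio::spawn(task.run(stop_receiver));

    // Resolves as soon as catch-up has started and reports the initial cache state,
    // i.e. the L1 batch number RocksDB is ready to process (`None` if the cache is empty).
    // A caller can use this to decide whether a pending L1 batch can be served from
    // RocksDB instead of falling back to Postgres.
    let initial_state = rocksdb_cell.ensure_initialized().await?;
    tracing::info!("RocksDB cache starts catch-up from {:?}", initial_state.l1_batch_number);

    // Resolves once RocksDB has fully caught up with Postgres.
    let _db = rocksdb_cell.wait().await?;
    task_handle.await??;
    Ok(())
}
```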

## Why ❔

Previously, Postgres could be used after a node start (primarily if
there was a pending L1 batch) even if RocksDB was up to date. This was
caused by potential delays when opening RocksDB. The behavior was
observed in the wild, e.g. on a mainnet full node with pruning enabled.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
slowli committed Jun 14, 2024
1 parent 3889794 commit 7c8e24c
Showing 5 changed files with 255 additions and 76 deletions.
226 changes: 207 additions & 19 deletions core/lib/state/src/catchup.rs
@@ -1,7 +1,6 @@
use std::{sync::Arc, time::Instant};
use std::{error, fmt, time::Instant};

use anyhow::Context;
use once_cell::sync::OnceCell;
use tokio::sync::watch;
use zksync_dal::{ConnectionPool, Core};
use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS};
@@ -10,6 +9,85 @@ use zksync_types::L1BatchNumber;

use crate::{RocksdbStorage, RocksdbStorageOptions, StateKeeperColumnFamily};

/// Initial RocksDB cache state returned by [`RocksdbCell::ensure_initialized()`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct InitialRocksdbState {
/// Last processed L1 batch number in the RocksDB cache + 1 (i.e., the batch that the cache is ready to process).
/// `None` if the cache is empty (i.e., needs recovery).
pub l1_batch_number: Option<L1BatchNumber>,
}

/// Error returned from [`RocksdbCell`] methods if the corresponding [`AsyncCatchupTask`] has failed
/// or was canceled.
#[derive(Debug)]
pub struct AsyncCatchupFailed(());

impl fmt::Display for AsyncCatchupFailed {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.write_str("Async RocksDB cache catchup failed or was canceled")
}
}

impl error::Error for AsyncCatchupFailed {}

/// `OnceCell` equivalent that can be `.await`ed. Correspondingly, it has the following invariants:
///
/// - The cell is only set once
/// - The cell is always set to `Some(_)`.
///
/// `OnceCell` (either from `once_cell` or `tokio`) is not used because it lacks a way to wait for the cell
/// to be initialized. `once_cell::sync::OnceCell` has a blocking `wait()` method, but since it's blocking,
/// it risks spawning non-cancellable threads if misused.
type AsyncOnceCell<T> = watch::Receiver<Option<T>>;

/// A lazily initialized handle to RocksDB cache returned from [`AsyncCatchupTask::new()`].
#[derive(Debug)]
pub struct RocksdbCell {
initial_state: AsyncOnceCell<InitialRocksdbState>,
db: AsyncOnceCell<RocksDB<StateKeeperColumnFamily>>,
}

impl RocksdbCell {
/// Waits until RocksDB is initialized and returns it.
///
/// # Errors
///
/// Returns an error if the async catch-up task failed or was canceled before initialization.
#[allow(clippy::missing_panics_doc)] // false positive
pub async fn wait(&self) -> Result<RocksDB<StateKeeperColumnFamily>, AsyncCatchupFailed> {
self.db
.clone()
.wait_for(Option::is_some)
.await
// `unwrap` below is safe by construction
.map(|db| db.clone().unwrap())
.map_err(|_| AsyncCatchupFailed(()))
}

/// Gets a RocksDB instance if it has been initialized.
pub fn get(&self) -> Option<RocksDB<StateKeeperColumnFamily>> {
self.db.borrow().clone()
}

/// Ensures that the RocksDB has started catching up, and returns the **initial** RocksDB state
/// at the start of the catch-up.
///
/// # Errors
///
/// Returns an error if the async catch-up task failed or was canceled.
#[allow(clippy::missing_panics_doc)] // false positive
pub async fn ensure_initialized(&self) -> Result<InitialRocksdbState, AsyncCatchupFailed> {
self.initial_state
.clone()
.wait_for(Option::is_some)
.await
// `unwrap` below is safe by construction
.map(|state| state.clone().unwrap())
.map_err(|_| AsyncCatchupFailed(()))
}
}

/// A runnable task that blocks until the provided RocksDB cache instance is caught up with
/// Postgres.
///
@@ -19,37 +97,52 @@ pub struct AsyncCatchupTask {
pool: ConnectionPool<Core>,
state_keeper_db_path: String,
state_keeper_db_options: RocksdbStorageOptions,
rocksdb_cell: Arc<OnceCell<RocksDB<StateKeeperColumnFamily>>>,
initial_state_sender: watch::Sender<Option<InitialRocksdbState>>,
db_sender: watch::Sender<Option<RocksDB<StateKeeperColumnFamily>>>,
to_l1_batch_number: Option<L1BatchNumber>,
}

impl AsyncCatchupTask {
/// Create a new catch-up task with the provided Postgres and RocksDB instances. Optionally
/// accepts the last L1 batch number to catch up to (defaults to latest if not specified).
pub fn new(
pool: ConnectionPool<Core>,
state_keeper_db_path: String,
state_keeper_db_options: RocksdbStorageOptions,
rocksdb_cell: Arc<OnceCell<RocksDB<StateKeeperColumnFamily>>>,
to_l1_batch_number: Option<L1BatchNumber>,
) -> Self {
Self {
pub fn new(pool: ConnectionPool<Core>, state_keeper_db_path: String) -> (Self, RocksdbCell) {
let (initial_state_sender, initial_state) = watch::channel(None);
let (db_sender, db) = watch::channel(None);
let this = Self {
pool,
state_keeper_db_path,
state_keeper_db_options,
rocksdb_cell,
to_l1_batch_number,
}
state_keeper_db_options: RocksdbStorageOptions::default(),
initial_state_sender,
db_sender,
to_l1_batch_number: None,
};
(this, RocksdbCell { initial_state, db })
}

/// Sets RocksDB options.
#[must_use]
pub fn with_db_options(mut self, options: RocksdbStorageOptions) -> Self {
self.state_keeper_db_options = options;
self
}

/// Sets the L1 batch number to catch up. By default, the task will catch up to the latest L1 batch
/// (at the start of catch-up).
#[must_use]
pub fn with_target_l1_batch_number(mut self, number: L1BatchNumber) -> Self {
self.to_l1_batch_number = Some(number);
self
}

/// Block until RocksDB cache instance is caught up with Postgres.
///
/// # Errors
///
/// Propagates RocksDB and Postgres errors.
#[tracing::instrument(name = "catch_up", skip_all, fields(target_l1_batch = ?self.to_l1_batch_number))]
pub async fn run(self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
let started_at = Instant::now();
tracing::debug!("Catching up RocksDB asynchronously");
tracing::info!("Catching up RocksDB asynchronously");

let mut rocksdb_builder = RocksdbStorage::builder_with_options(
self.state_keeper_db_path.as_ref(),
@@ -58,6 +151,12 @@ impl AsyncCatchupTask {
.await
.context("Failed creating RocksDB storage builder")?;

let initial_state = InitialRocksdbState {
l1_batch_number: rocksdb_builder.l1_batch_number().await,
};
tracing::info!("Initialized RocksDB catchup from state: {initial_state:?}");
self.initial_state_sender.send_replace(Some(initial_state));

let mut connection = self.pool.connection_tagged("state_keeper").await?;
let was_recovered_from_snapshot = rocksdb_builder
.ensure_ready(&mut connection, &stop_receiver)
@@ -76,12 +175,101 @@
.context("Failed to catch up RocksDB to Postgres")?;
drop(connection);
if let Some(rocksdb) = rocksdb {
self.rocksdb_cell
.set(rocksdb.into_rocksdb())
.map_err(|_| anyhow::anyhow!("Async RocksDB cache was initialized twice"))?;
self.db_sender.send_replace(Some(rocksdb.into_rocksdb()));
} else {
tracing::info!("Synchronizing RocksDB interrupted");
}
Ok(())
}
}

#[cfg(test)]
mod tests {
use tempfile::TempDir;
use test_casing::test_casing;
use zksync_types::L2BlockNumber;

use super::*;
use crate::{
test_utils::{create_l1_batch, create_l2_block, gen_storage_logs, prepare_postgres},
RocksdbStorageBuilder,
};

#[tokio::test]
async fn catching_up_basics() {
let pool = ConnectionPool::<Core>::test_pool().await;
let mut conn = pool.connection().await.unwrap();
prepare_postgres(&mut conn).await;
let storage_logs = gen_storage_logs(20..40);
create_l2_block(&mut conn, L2BlockNumber(1), storage_logs.clone()).await;
create_l1_batch(&mut conn, L1BatchNumber(1), &storage_logs).await;
drop(conn);

let temp_dir = TempDir::new().unwrap();
let (task, rocksdb_cell) =
AsyncCatchupTask::new(pool.clone(), temp_dir.path().to_str().unwrap().to_owned());
let (_stop_sender, stop_receiver) = watch::channel(false);
let task_handle = tokio::spawn(task.run(stop_receiver));

let initial_state = rocksdb_cell.ensure_initialized().await.unwrap();
assert_eq!(initial_state.l1_batch_number, None);

let db = rocksdb_cell.wait().await.unwrap();
assert_eq!(
RocksdbStorageBuilder::from_rocksdb(db)
.l1_batch_number()
.await,
Some(L1BatchNumber(2))
);
task_handle.await.unwrap().unwrap();
drop(rocksdb_cell); // should be enough to release RocksDB lock

let (task, rocksdb_cell) =
AsyncCatchupTask::new(pool, temp_dir.path().to_str().unwrap().to_owned());
let (_stop_sender, stop_receiver) = watch::channel(false);
let task_handle = tokio::spawn(task.run(stop_receiver));

let initial_state = rocksdb_cell.ensure_initialized().await.unwrap();
assert_eq!(initial_state.l1_batch_number, Some(L1BatchNumber(2)));

task_handle.await.unwrap().unwrap();
rocksdb_cell.get().unwrap(); // RocksDB must be caught up at this point
}

#[derive(Debug)]
enum CancellationScenario {
DropTask,
CancelTask,
}

impl CancellationScenario {
const ALL: [Self; 2] = [Self::DropTask, Self::CancelTask];
}

#[test_casing(2, CancellationScenario::ALL)]
#[tokio::test]
async fn catching_up_cancellation(scenario: CancellationScenario) {
let pool = ConnectionPool::<Core>::test_pool().await;
let mut conn = pool.connection().await.unwrap();
prepare_postgres(&mut conn).await;
let storage_logs = gen_storage_logs(20..40);
create_l2_block(&mut conn, L2BlockNumber(1), storage_logs.clone()).await;
create_l1_batch(&mut conn, L1BatchNumber(1), &storage_logs).await;
drop(conn);

let temp_dir = TempDir::new().unwrap();
let (task, rocksdb_cell) =
AsyncCatchupTask::new(pool.clone(), temp_dir.path().to_str().unwrap().to_owned());
let (stop_sender, stop_receiver) = watch::channel(false);
match scenario {
CancellationScenario::DropTask => drop(task),
CancellationScenario::CancelTask => {
stop_sender.send_replace(true);
task.run(stop_receiver).await.unwrap();
}
}

assert!(rocksdb_cell.get().is_none());
rocksdb_cell.wait().await.unwrap_err();
}
}
2 changes: 1 addition & 1 deletion core/lib/state/src/lib.rs
@@ -30,7 +30,7 @@ mod test_utils;

pub use self::{
cache::sequential_cache::SequentialCache,
catchup::AsyncCatchupTask,
catchup::{AsyncCatchupTask, RocksdbCell},
in_memory::InMemoryStorage,
// Note, that `test_infra` of the bootloader tests relies on this value to be exposed
in_memory::IN_MEMORY_STORAGE_DEFAULT_NETWORK_ID,
2 changes: 1 addition & 1 deletion core/lib/state/src/storage_factory.rs
@@ -186,7 +186,7 @@ impl ReadStorage for RocksdbWithMemory {
}

impl ReadStorage for PgOrRocksdbStorage<'_> {
fn read_value(&mut self, key: &StorageKey) -> zksync_types::StorageValue {
fn read_value(&mut self, key: &StorageKey) -> StorageValue {
match self {
Self::Postgres(postgres) => postgres.read_value(key),
Self::Rocksdb(rocksdb) => rocksdb.read_value(key),
