feat(state-keeper): implement asynchronous RocksDB cache #1256

Merged Mar 27, 2024 · 58 commits

Changes from 9 commits

Commits (58)
ae58bf0
implement asynchronous cached storage
itegulov Feb 27, 2024
b5ebc49
add a remark on recursion
itegulov Feb 27, 2024
594b460
spellcheck + lint
itegulov Feb 27, 2024
6f18451
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Feb 27, 2024
25943b9
recover postgres miniblock number from snapshot
itegulov Mar 1, 2024
6031d91
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 1, 2024
3b027a4
wipe storage logs when deleting genesis
itegulov Mar 4, 2024
a11b669
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 4, 2024
8b91205
add extra logs
itegulov Mar 4, 2024
34d7f64
overhaul CachedStorage to presume RocksDB never falls behind after it…
itegulov Mar 5, 2024
74a2c1e
simplify delete_logs_inner
itegulov Mar 5, 2024
fa9e0bc
use latest miniblock from sealed l1 batch
itegulov Mar 5, 2024
a777fdc
derive debug
itegulov Mar 6, 2024
d4f23bd
refactor `ReadStorageFactory` into its own trait
itegulov Mar 6, 2024
fe52b84
replace `BoxReadStorage` with enum type
itegulov Mar 6, 2024
35714ae
rename `cached_storage.rs` to `state_keeper_storage.rs`
itegulov Mar 6, 2024
089310f
make `MainBatchExecutor` accept generic `ReadStorageFactory`
itegulov Mar 6, 2024
801d46b
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 6, 2024
8bd23c3
lint
itegulov Mar 6, 2024
30df5d1
fill in empty expects
itegulov Mar 6, 2024
5793359
adapt and add tests for multiple storage types
itegulov Mar 6, 2024
bd97ea8
Revert "add extra logs"
itegulov Mar 8, 2024
ac354f2
make `db` private again
itegulov Mar 8, 2024
d74f5e3
refactor RocksDB constructors and fix type in the builder's name
itegulov Mar 8, 2024
d2442c0
bake `'static` into `ReadStorageFactory`
itegulov Mar 8, 2024
12f9826
propagate state keeper storage errors further upstream
itegulov Mar 8, 2024
9b030dc
find latest sealed miniblock first, try snapshot later
itegulov Mar 8, 2024
fd54290
elide lifetime
itegulov Mar 8, 2024
e1da770
shorten `Self` references
itegulov Mar 8, 2024
587b434
change logging level for sync logs
itegulov Mar 8, 2024
4288c5c
simplify `ReadStorageFactory` by incorporating `OnceCell`
itegulov Mar 12, 2024
b96ab11
fix catchup test
itegulov Mar 12, 2024
c52a869
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 12, 2024
600c624
tmp: refactor background catchup into a task type
itegulov Mar 12, 2024
a6810b5
move `StorageType` methods to `tester.rs`
itegulov Mar 13, 2024
4f4ab95
use `test_casing::Product` instead of `itertools`
itegulov Mar 13, 2024
3aa268c
use `Handle::current` at PG constructor instead of passing it around
itegulov Mar 13, 2024
3e3c7dc
elide lifetimes where possible
itegulov Mar 13, 2024
c204aa3
refactor `load_latest_sealed_miniblock`
itegulov Mar 13, 2024
f71e233
replace `ok_or_else` with `context`
itegulov Mar 13, 2024
59c713a
`match` -> `if let Some(_) =`
itegulov Mar 13, 2024
d0e0607
expose `AsyncCatchupTask` and manage it through node framework
itegulov Mar 13, 2024
bfd7b5b
revert (most) `RocksdbStorage` changes
itegulov Mar 13, 2024
bafb28a
reuse async pg code for test pg storage
itegulov Mar 13, 2024
d52a7d0
lint
itegulov Mar 13, 2024
0d884be
get rid of `From` implementations
itegulov Mar 13, 2024
b7eb9db
make tests aware of background tasks
itegulov Mar 13, 2024
53c3de0
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 13, 2024
043d05c
move wait to the proper place
itegulov Mar 13, 2024
d5c6b2c
address minor comments
itegulov Mar 16, 2024
10da2dc
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 16, 2024
f6516bb
make catch-up a managed task
itegulov Mar 16, 2024
f38c546
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 19, 2024
5ab1cdf
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 21, 2024
68398d4
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 26, 2024
0b76b99
address comments
itegulov Mar 26, 2024
9edba2c
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 26, 2024
92cbc0c
Merge remote-tracking branch 'origin/main' into daniyar-pla-747-state…
itegulov Mar 26, 2024
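This capture has no PR description, so here is a rough summary of what the commits above add up to: batch execution no longer blocks on synchronizing the RocksDB state-keeper cache. Instead, a shared factory (`CachedStorage`, later refactored into a `ReadStorageFactory`) hands out either a Postgres-backed storage view or the local RocksDB cache, while an `AsyncCatchupTask` brings RocksDB up to date in the background. The sketch below is a hypothetical simplification, not the PR's actual API: the type names `PgOrRocksdb` and `CachedStorageSketch`, the field `rocksdb_caught_up`, and the signatures are illustrative only.

```rust
use tokio::sync::watch;

/// Illustrative stand-ins; the PR's real types are `PostgresStorage` and
/// `RocksdbStorage`, both of which implement `ReadStorage`.
struct PostgresView;
struct RocksdbView;

/// Echoes the "replace `BoxReadStorage` with enum type" commit: the batch
/// executor runs against whichever source the factory hands out.
enum PgOrRocksdb {
    /// Used while the local RocksDB cache is still catching up to Postgres.
    Postgres(PostgresView),
    /// Used once RocksDB has caught up; no per-batch synchronization needed.
    Rocksdb(RocksdbView),
}

struct CachedStorageSketch {
    rocksdb_caught_up: bool,
}

impl CachedStorageSketch {
    /// Hands out storage for the next batch without blocking on RocksDB sync;
    /// `None` signals that the stop receiver fired and the caller should exit.
    async fn access_storage(
        &mut self,
        stop_receiver: &watch::Receiver<bool>,
    ) -> Option<PgOrRocksdb> {
        if *stop_receiver.borrow() {
            return None;
        }
        if self.rocksdb_caught_up {
            Some(PgOrRocksdb::Rocksdb(RocksdbView))
        } else {
            // In the PR a background `AsyncCatchupTask` keeps synchronizing
            // RocksDB; until it finishes, batches read straight from Postgres.
            Some(PgOrRocksdb::Postgres(PostgresView))
        }
    }
}
```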
1 change: 1 addition & 0 deletions checks-config/era.dic
@@ -122,6 +122,7 @@ stdout
GCS
websocket
struct
struct's
localhost
TOML
config
19 changes: 19 additions & 0 deletions core/lib/dal/src/blocks_dal.rs
@@ -1850,6 +1850,24 @@ impl BlocksDal<'_, '_> {
Ok(())
}

async fn delete_logs_inner(
&mut self,
last_miniblock_to_keep: Option<MiniblockNumber>,
) -> sqlx::Result<()> {
let block_number = last_miniblock_to_keep.map_or(-1, |number| number.0 as i64);
sqlx::query!(
r#"
DELETE FROM storage_logs
WHERE
miniblock_number > $1
"#,
block_number
)
.execute(self.storage.conn())
.await?;
Ok(())
}

/// Returns sum of predicted gas costs on the given L1 batch range.
/// Panics if the sum doesn't fit into `u32`.
pub async fn get_l1_batches_predicted_gas(
@@ -2321,6 +2339,7 @@ impl BlocksDal<'_, '_> {
self.delete_initial_writes_inner(None)
.await
.context("delete_initial_writes_inner()")?;
self.delete_logs_inner(None).await.context("delete_logs_inner()")?;
Ok(())
}
}
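The `map_or(-1, …)` in `delete_logs_inner` is the whole trick: passing `None` turns the bound into `-1`, so the `miniblock_number > $1` predicate matches every row and all storage logs are wiped, which is what the genesis-deletion path above relies on. A minimal, self-contained illustration of that sentinel (the `MiniblockNumber` stand-in and `delete_bound` helper are hypothetical, not the DAL code):

```rust
/// Stand-in for `MiniblockNumber(u32)`, just to keep the sketch self-contained.
#[derive(Clone, Copy)]
struct MiniblockNumber(u32);

/// Mirrors the bound computed in `delete_logs_inner`: `None` maps to -1, so
/// `miniblock_number > -1` matches every row and every log is deleted.
fn delete_bound(last_miniblock_to_keep: Option<MiniblockNumber>) -> i64 {
    last_miniblock_to_keep.map_or(-1, |number| i64::from(number.0))
}

fn main() {
    assert_eq!(delete_bound(None), -1); // wipe everything (genesis deletion)
    assert_eq!(delete_bound(Some(MiniblockNumber(42))), 42); // keep blocks 0..=42
}
```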
@@ -68,6 +68,7 @@ pub(crate) fn new_vm_state<S: WriteStorage, H: HistoryMode>(
} else {
// This is the scenario of either the first L2 block ever or
// the first block after the upgrade for support of L2 blocks.
tracing::info!("Could not locate the last L2 block, presuming this is the first one");
L2Block {
number: l1_batch_env.first_l2_block.number.saturating_sub(1),
timestamp: 0,
1 change: 1 addition & 0 deletions core/lib/multivm/src/versions/vm_latest/utils/l2_blocks.rs
@@ -62,6 +62,7 @@ pub fn load_last_l2_block<S: ReadStorage>(storage: StoragePtr<S>) -> Option<L2Bl
let mut storage_ptr = storage.borrow_mut();
let current_l2_block_info = storage_ptr.read_value(&current_l2_block_info_key);
let (block_number, block_timestamp) = unpack_block_info(h256_to_u256(current_l2_block_info));
tracing::info!(block_number, block_timestamp, "Loaded last L2 block");
let block_number = block_number as u32;
if block_number == 0 {
// The block does not exist yet
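For context on what the new log line reports: `current_l2_block_info` is a single 256-bit word from which `unpack_block_info` recovers the block number and timestamp. The sketch below assumes the usual zkSync packing of `block_number * 2^128 + block_timestamp`; the real helpers operate on `U256`, whereas this uses two `u128` halves purely for illustration.

```rust
// Illustrative only: assumes block info packs the number into the high 128 bits
// and the timestamp into the low 128 bits, which `unpack_block_info` undoes.
fn pack_block_info(block_number: u64, block_timestamp: u64) -> (u128, u128) {
    (u128::from(block_number), u128::from(block_timestamp))
}

fn unpack_block_info(packed: (u128, u128)) -> (u64, u64) {
    let (high, low) = packed;
    (high as u64, low as u64)
}

fn main() {
    let packed = pack_block_info(1_234, 1_700_000_000);
    assert_eq!(unpack_block_info(packed), (1_234, 1_700_000_000));
}
```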
1 change: 1 addition & 0 deletions core/lib/state/src/postgres/mod.rs
@@ -382,6 +382,7 @@ impl<'a> PostgresStorage<'a> {
.with_context(|| {
format!("failed resolving L1 batch number for miniblock #{block_number}")
})?;
tracing::info!(?resolved, "Resolved L1 batch for Postgres storage");
Ok(Self {
rt_handle,
connection,
2 changes: 1 addition & 1 deletion core/lib/state/src/rocksdb/mod.rs
@@ -599,7 +599,7 @@ impl RocksdbStorage {
/// # Panics
///
/// Panics on RocksDB errors.
async fn l1_batch_number(&self) -> Option<L1BatchNumber> {
pub async fn l1_batch_number(&self) -> Option<L1BatchNumber> {
let cf = StateKeeperColumnFamily::State;
let db = self.db.clone();
let number_bytes =
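Making `l1_batch_number` public lets code outside the `state` crate (the new cache in particular) ask how far the RocksDB cache has progressed, with `None` meaning the database has never been initialized. A hedged illustration of such a caller-side check, where everything except the `l1_batch_number` call itself is hypothetical:

```rust
use zksync_state::RocksdbStorage;

// Hypothetical helper; `RocksdbStorage::l1_batch_number` is the method made
// `pub` above, the surrounding logic is illustrative only.
async fn describe_cache_state(rocksdb: &RocksdbStorage) -> String {
    match rocksdb.l1_batch_number().await {
        Some(number) => format!("RocksDB cache reports L1 batch {number}"),
        None => "RocksDB cache has not been initialized yet".to_string(),
    }
}
```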
@@ -11,16 +11,20 @@ use multivm::{
MultiVMTracer, VmInstance,
};
use once_cell::sync::OnceCell;
use tokio::sync::{mpsc, watch};
use tokio::{
runtime::Handle,
sync::{mpsc, watch, RwLock},
};
use zksync_dal::ConnectionPool;
use zksync_state::{RocksdbStorage, StorageView, WriteStorage};
use zksync_state::{ReadStorage, StorageView, WriteStorage};
use zksync_types::{vm_trace::Call, Transaction, U256};
use zksync_utils::bytecode::CompressedBytecodeInfo;

use super::{BatchExecutor, BatchExecutorHandle, Command, TxExecutionResult};
use crate::{
metrics::{InteractionType, TxStage, APP_METRICS},
state_keeper::{
cached_storage::CachedStorage,
metrics::{TxExecutionStage, BATCH_TIP_METRICS, EXECUTOR_METRICS, KEEPER_METRICS},
types::ExecutionMetricsForCriteria,
},
@@ -30,13 +34,11 @@ use crate::{
/// Creates a "real" batch executor which maintains the VM (as opposed to the test builder which doesn't use the VM).
#[derive(Debug, Clone)]
pub struct MainBatchExecutor {
state_keeper_db_path: String,
pool: ConnectionPool,
save_call_traces: bool,
max_allowed_tx_gas_limit: U256,
upload_witness_inputs_to_gcs: bool,
enum_index_migration_chunk_size: usize,
optional_bytecode_compression: bool,
cached_storage: Arc<RwLock<CachedStorage>>,
}

impl MainBatchExecutor {
@@ -50,13 +52,15 @@ impl MainBatchExecutor {
optional_bytecode_compression: bool,
) -> Self {
Self {
state_keeper_db_path,
pool,
save_call_traces,
max_allowed_tx_gas_limit,
upload_witness_inputs_to_gcs,
enum_index_migration_chunk_size,
optional_bytecode_compression,
cached_storage: Arc::new(RwLock::new(CachedStorage::new(
pool,
state_keeper_db_path,
enum_index_migration_chunk_size,
))),
}
}
}
@@ -69,20 +73,6 @@ impl BatchExecutor for MainBatchExecutor {
system_env: SystemEnv,
stop_receiver: &watch::Receiver<bool>,
) -> Option<BatchExecutorHandle> {
let mut secondary_storage = RocksdbStorage::builder(self.state_keeper_db_path.as_ref())
.await
.expect("Failed initializing state keeper storage");
secondary_storage.enable_enum_index_migration(self.enum_index_migration_chunk_size);
let mut conn = self
.pool
.access_storage_tagged("state_keeper")
.await
.unwrap();
let secondary_storage = secondary_storage
.synchronize(&mut conn, stop_receiver)
.await
.expect("Failed synchronizing secondary state keeper storage")?;

// Since we process `BatchExecutor` commands one-by-one (the next command is never enqueued
// until a previous command is processed), capacity 1 is enough for the commands channel.
let (commands_sender, commands_receiver) = mpsc::channel(1);
@@ -94,13 +84,22 @@ impl BatchExecutor for MainBatchExecutor {
};
let upload_witness_inputs_to_gcs = self.upload_witness_inputs_to_gcs;

let cached_storage = self.cached_storage.clone();
let stop_receiver = stop_receiver.clone();
let handle = tokio::task::spawn_blocking(move || {
executor.run(
secondary_storage,
l1_batch_params,
system_env,
upload_witness_inputs_to_gcs,
)
let rt_handle = Handle::current();
let mut cached_storage = rt_handle.block_on(cached_storage.write());
if let Some(storage) = rt_handle
.block_on(cached_storage.access_storage(rt_handle.clone(), stop_receiver))
.unwrap()
{
executor.run(
storage,
l1_batch_params,
system_env,
upload_witness_inputs_to_gcs,
);
};
});
Some(BatchExecutorHandle {
handle,
@@ -124,14 +123,18 @@ struct CommandReceiver {
}

impl CommandReceiver {
pub(super) fn run(
pub(super) fn run<S: ReadStorage>(
mut self,
secondary_storage: RocksdbStorage,
secondary_storage: S,
l1_batch_params: L1BatchEnv,
system_env: SystemEnv,
upload_witness_inputs_to_gcs: bool,
) {
tracing::info!("Starting executing batch #{:?}", &l1_batch_params.number);
tracing::info!(
first_l2_block = ?l1_batch_params.first_l2_block,
l1_batch_number = ?l1_batch_params.number,
"Starting executing batch",
);

let storage_view = StorageView::new(secondary_storage).to_rc_ptr();

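The executor thread above is started with `tokio::task::spawn_blocking`, yet it needs the async `access_storage` call before it can enter the synchronous VM loop; the diff handles this by capturing `Handle::current()` inside the blocking thread and using `block_on` there. A minimal standalone sketch of that pattern on a multi-threaded Tokio runtime, with `acquire_storage` and `run_batch_sync` as hypothetical stand-ins for the PR's code:

```rust
use tokio::runtime::Handle;

// Stand-in for the async storage acquisition (`cached_storage.access_storage`).
async fn acquire_storage() -> Option<&'static str> {
    Some("storage handle")
}

// Stand-in for the synchronous batch loop driven by `CommandReceiver::run`.
fn run_batch_sync(storage: &str) {
    println!("executing batch against {storage}");
}

#[tokio::main]
async fn main() {
    let handle = tokio::task::spawn_blocking(|| {
        // Blocking-pool threads still carry the runtime context, so grabbing
        // the handle here and calling `block_on` is allowed, exactly as the
        // diff does with `Handle::current()`.
        let rt_handle = Handle::current();
        if let Some(storage) = rt_handle.block_on(acquire_storage()) {
            run_batch_sync(storage);
        }
    });
    handle.await.expect("blocking task panicked");
}
```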