feat(state-keeper): Remove WitnessBlockState generation from state keeper (#1507)

## What ❔

`WitnessBlockState` is generated in the state keeper persistence logic only as a
temporary measure until the BWIP (Basic Witness Input Producer) component is fully implemented.

## Why ❔

Removing this logic simplifies the state keeper's persistence interfaces; the sketch below condenses the signature changes.
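
The gist of the interface change, condensed from the diff below (signatures only, bodies elided):

```rust
// Before: the batch executor handle and output handlers had to thread
// an optional `WitnessBlockState` through batch finalization.
pub(super) async fn finish_batch(self) -> (FinishedL1Batch, Option<WitnessBlockState>);
async fn handle_l1_batch(
    &mut self,
    _witness_block_state: Option<&WitnessBlockState>,
    _updates_manager: &UpdatesManager,
) -> anyhow::Result<()>;

// After: plain batch results, with no witness state to thread through.
pub(super) async fn finish_batch(self) -> FinishedL1Batch;
async fn handle_l1_batch(&mut self, _updates_manager: &UpdatesManager) -> anyhow::Result<()>;
```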

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
slowli committed Mar 28, 2024
1 parent ccb1fe4 commit 8ae0355
Showing 20 changed files with 32 additions and 123 deletions.
1 change: 0 additions & 1 deletion core/bin/external_node/src/main.rs
@@ -84,7 +84,6 @@ async fn build_state_keeper(
     let batch_executor_base: Box<dyn BatchExecutor> = Box::new(MainBatchExecutor::new(
         Arc::new(storage_factory),
         save_call_traces,
-        false,
         true,
     ));

5 changes: 0 additions & 5 deletions core/lib/config/src/configs/chain.rs
@@ -110,10 +110,6 @@ pub struct StateKeeperConfig {
     pub virtual_blocks_interval: u32,
     pub virtual_blocks_per_miniblock: u32,
 
-    /// Flag which will enable storage to cache witness_inputs during State Keeper's run.
-    /// NOTE: This will slow down StateKeeper, to be used in non-production environments!
-    pub upload_witness_inputs_to_gcs: bool,
-
     /// Number of keys that is processed by enum_index migration in State Keeper each L1 batch.
     pub enum_index_migration_chunk_size: Option<usize>,
 
@@ -153,7 +149,6 @@ impl StateKeeperConfig {
             save_call_traces: true,
             virtual_blocks_interval: 1,
             virtual_blocks_per_miniblock: 1,
-            upload_witness_inputs_to_gcs: false,
             enum_index_migration_chunk_size: None,
             bootloader_hash: None,
             default_aa_hash: None,

1 change: 0 additions & 1 deletion core/lib/config/src/testonly.rs
@@ -288,7 +288,6 @@ impl RandomConfig for configs::chain::StateKeeperConfig {
             save_call_traces: g.gen(),
             virtual_blocks_interval: g.gen(),
             virtual_blocks_per_miniblock: g.gen(),
-            upload_witness_inputs_to_gcs: g.gen(),
             enum_index_migration_chunk_size: g.gen(),
             bootloader_hash: g.gen(),
             default_aa_hash: g.gen(),

2 changes: 0 additions & 2 deletions core/lib/env_config/src/chain.rs
@@ -92,7 +92,6 @@ mod tests {
             save_call_traces: false,
             virtual_blocks_interval: 1,
             virtual_blocks_per_miniblock: 1,
-            upload_witness_inputs_to_gcs: false,
             enum_index_migration_chunk_size: Some(2_000),
             bootloader_hash: Some(hash(
                 "0x010007ede999d096c84553fb514d3d6ca76fbf39789dda76bfeda9f3ae06236e",
@@ -129,7 +128,6 @@ mod tests {
             CHAIN_STATE_KEEPER_FEE_MODEL_VERSION="V2"
             CHAIN_STATE_KEEPER_VALIDATION_COMPUTATIONAL_GAS_LIMIT="10000000"
             CHAIN_STATE_KEEPER_SAVE_CALL_TRACES="false"
-            CHAIN_STATE_KEEPER_UPLOAD_WITNESS_INPUTS_TO_GCS="false"
             CHAIN_STATE_KEEPER_ENUM_INDEX_MIGRATION_CHUNK_SIZE="2000"
             CHAIN_STATE_KEEPER_VIRTUAL_BLOCKS_PER_MINIBLOCK="1"
             CHAIN_STATE_KEEPER_VIRTUAL_BLOCKS_INTERVAL="1"

3 changes: 0 additions & 3 deletions core/lib/protobuf_config/src/chain.rs
@@ -133,8 +133,6 @@ impl ProtoRepr for proto::StateKeeper {
                 .context("virtual_blocks_interval")?,
             virtual_blocks_per_miniblock: *required(&self.virtual_blocks_per_miniblock)
                 .context("virtual_blocks_per_miniblock")?,
-            upload_witness_inputs_to_gcs: *required(&self.upload_witness_inputs_to_gcs)
-                .context("upload_witness_inputs_to_gcs")?,
             enum_index_migration_chunk_size: self
                 .enum_index_migration_chunk_size
                 .map(|x| x.try_into())
@@ -183,7 +181,6 @@ impl ProtoRepr for proto::StateKeeper {
             save_call_traces: Some(this.save_call_traces),
             virtual_blocks_interval: Some(this.virtual_blocks_interval),
             virtual_blocks_per_miniblock: Some(this.virtual_blocks_per_miniblock),
-            upload_witness_inputs_to_gcs: Some(this.upload_witness_inputs_to_gcs),
             enum_index_migration_chunk_size: this
                 .enum_index_migration_chunk_size
                 .as_ref()

1 change: 0 additions & 1 deletion core/lib/protobuf_config/src/proto/chain.proto
@@ -49,7 +49,6 @@ message StateKeeper {
   optional bool save_call_traces = 22; // required
   optional uint32 virtual_blocks_interval = 23; // required
   optional uint32 virtual_blocks_per_miniblock = 24; // required
-  optional bool upload_witness_inputs_to_gcs = 25; // required
   optional uint64 enum_index_migration_chunk_size = 26; // optional
   optional bytes bootloader_hash = 27; // required; H256
   optional bytes default_aa_hash = 28; // required; H256

3 changes: 0 additions & 3 deletions core/lib/zksync_core/src/lib.rs
@@ -500,7 +500,6 @@ pub async fn initialize_components(
             &db_config,
             &configs.mempool_config.clone().context("mempool_config")?,
             batch_fee_input_provider,
-            store_factory.create_store().await,
             stop_receiver.clone(),
         )
         .await
@@ -744,7 +743,6 @@ async fn add_state_keeper_to_task_futures(
     db_config: &DBConfig,
     mempool_config: &MempoolConfig,
     batch_fee_input_provider: Arc<dyn BatchFeeModelInputProvider>,
-    object_store: Arc<dyn ObjectStore>,
     stop_receiver: watch::Receiver<bool>,
 ) -> anyhow::Result<()> {
     let pool_builder = ConnectionPool::<Core>::singleton(postgres_config.master_url()?);
@@ -771,7 +769,6 @@ async fn add_state_keeper_to_task_futures(
         contracts_config.l2_erc20_bridge_addr,
         state_keeper_config.miniblock_seal_queue_capacity,
     );
-    let persistence = persistence.with_object_store(object_store);
     task_futures.push(tokio::spawn(miniblock_sealer.run()));
 
     let (state_keeper, async_catchup_task) = create_state_keeper(

@@ -35,21 +35,18 @@ use crate::{
 pub struct MainBatchExecutor {
     storage_factory: Arc<dyn ReadStorageFactory>,
     save_call_traces: bool,
-    upload_witness_inputs_to_gcs: bool,
     optional_bytecode_compression: bool,
 }
 
 impl MainBatchExecutor {
     pub fn new(
         storage_factory: Arc<dyn ReadStorageFactory>,
         save_call_traces: bool,
-        upload_witness_inputs_to_gcs: bool,
         optional_bytecode_compression: bool,
     ) -> Self {
         Self {
             storage_factory,
             save_call_traces,
-            upload_witness_inputs_to_gcs,
             optional_bytecode_compression,
         }
     }
@@ -71,7 +68,6 @@ impl BatchExecutor for MainBatchExecutor {
             optional_bytecode_compression: self.optional_bytecode_compression,
             commands: commands_receiver,
         };
-        let upload_witness_inputs_to_gcs = self.upload_witness_inputs_to_gcs;
 
         let storage_factory = self.storage_factory.clone();
         let stop_receiver = stop_receiver.clone();
@@ -80,12 +76,7 @@ impl BatchExecutor for MainBatchExecutor {
                 .block_on(storage_factory.access_storage(&stop_receiver))
                 .expect("failed getting access to state keeper storage")
             {
-                executor.run(
-                    storage,
-                    l1_batch_params,
-                    system_env,
-                    upload_witness_inputs_to_gcs,
-                );
+                executor.run(storage, l1_batch_params, system_env);
             } else {
                 tracing::info!("Interrupted while trying to access state keeper storage");
             }
@@ -116,7 +107,6 @@ impl CommandReceiver {
         secondary_storage: S,
         l1_batch_params: L1BatchEnv,
         system_env: SystemEnv,
-        upload_witness_inputs_to_gcs: bool,
     ) {
         tracing::info!("Starting executing batch #{:?}", &l1_batch_params.number);
 
@@ -140,12 +130,7 @@ impl CommandReceiver {
                 }
                 Command::FinishBatch(resp) => {
                     let vm_block_result = self.finish_batch(&mut vm);
-                    let witness_block_state = if upload_witness_inputs_to_gcs {
-                        Some(storage_view.borrow_mut().witness_block_state())
-                    } else {
-                        None
-                    };
-                    resp.send((vm_block_result, witness_block_state)).unwrap();
+                    resp.send(vm_block_result).unwrap();
 
                     // `storage_view` cannot be accessed while borrowed by the VM,
                     // so this is the only point at which storage metrics can be obtained

10 changes: 5 additions & 5 deletions core/lib/zksync_core/src/state_keeper/batch_executor/mod.rs
@@ -8,7 +8,7 @@ use tokio::{
     sync::{mpsc, oneshot, watch},
     task::JoinHandle,
 };
-use zksync_types::{vm_trace::Call, witness_block_state::WitnessBlockState, Transaction};
+use zksync_types::{vm_trace::Call, Transaction};
 use zksync_utils::bytecode::CompressedBytecodeInfo;
 
 use crate::state_keeper::{
@@ -143,7 +143,7 @@ impl BatchExecutorHandle {
         latency.observe();
     }
 
-    pub(super) async fn finish_batch(self) -> (FinishedL1Batch, Option<WitnessBlockState>) {
+    pub(super) async fn finish_batch(self) -> FinishedL1Batch {
         let (response_sender, response_receiver) = oneshot::channel();
         self.commands
             .send(Command::FinishBatch(response_sender))
@@ -152,10 +152,10 @@ impl BatchExecutorHandle {
         let latency = EXECUTOR_METRICS.batch_executor_command_response_time
             [&ExecutorCommand::FinishBatch]
             .start();
-        let resp = response_receiver.await.unwrap();
+        let finished_batch = response_receiver.await.unwrap();
         self.handle.await.unwrap();
         latency.observe();
-        resp
+        finished_batch
     }
 }
 
@@ -164,5 +164,5 @@ pub(super) enum Command {
     ExecuteTx(Box<Transaction>, oneshot::Sender<TxExecutionResult>),
     StartNextMiniblock(L2BlockEnv, oneshot::Sender<()>),
     RollbackLastTx(oneshot::Sender<()>),
-    FinishBatch(oneshot::Sender<(FinishedL1Batch, Option<WitnessBlockState>)>),
+    FinishBatch(oneshot::Sender<FinishedL1Batch>),
 }

@@ -416,7 +416,6 @@ async fn bootloader_out_of_gas_for_any_tx() {
             save_call_traces: false,
             vm_gas_limit: Some(10),
             validation_computational_gas_limit: u32::MAX,
-            upload_witness_inputs_to_gcs: false,
         },
     );
 
@@ -448,15 +447,20 @@ async fn bootloader_tip_out_of_gas() {
     let res = executor.execute_tx(alice.execute()).await;
     assert_executed(&res);
 
-    let (vm_block_res, _witness_block_state) = executor.finish_batch().await;
+    let finished_batch = executor.finish_batch().await;
 
     // Just a bit below the gas used for the previous batch execution should be fine to execute the tx
     // but not enough to execute the block tip.
     tester.set_config(TestConfig {
         save_call_traces: false,
-        vm_gas_limit: Some(vm_block_res.block_tip_execution_result.statistics.gas_used - 10),
+        vm_gas_limit: Some(
+            finished_batch
+                .block_tip_execution_result
+                .statistics
+                .gas_used
+                - 10,
+        ),
         validation_computational_gas_limit: u32::MAX,
-        upload_witness_inputs_to_gcs: false,
     });
 
     let second_executor = tester

@@ -48,7 +48,6 @@ pub(super) struct TestConfig {
     pub(super) save_call_traces: bool,
     pub(super) vm_gas_limit: Option<u32>,
     pub(super) validation_computational_gas_limit: u32,
-    pub(super) upload_witness_inputs_to_gcs: bool,
 }
 
 impl TestConfig {
@@ -59,7 +58,6 @@ impl TestConfig {
             vm_gas_limit: None,
             save_call_traces: false,
             validation_computational_gas_limit: config.validation_computational_gas_limit,
-            upload_witness_inputs_to_gcs: false,
         }
     }
 }
@@ -150,12 +148,8 @@ impl Tester {
         l1_batch_env: L1BatchEnv,
         system_env: SystemEnv,
     ) -> BatchExecutorHandle {
-        let mut batch_executor = MainBatchExecutor::new(
-            storage_factory,
-            self.config.save_call_traces,
-            self.config.upload_witness_inputs_to_gcs,
-            false,
-        );
+        let mut batch_executor =
+            MainBatchExecutor::new(storage_factory, self.config.save_call_traces, false);
         let (_stop_sender, stop_receiver) = watch::channel(false);
         batch_executor
             .init_batch(l1_batch_env, system_env, &stop_receiver)
@@ -545,7 +539,7 @@ impl StorageSnapshot {
             executor.start_next_miniblock(l2_block_env).await;
         }
 
-        let (finished_batch, _) = executor.finish_batch().await;
+        let finished_batch = executor.finish_batch().await;
         let storage_logs = &finished_batch.block_tip_execution_result.logs.storage_logs;
         storage_writes_deduplicator.apply(storage_logs.iter().filter(|log| log.log_query.rw_flag));
         let modified_entries = storage_writes_deduplicator.into_modified_key_values();

10 changes: 2 additions & 8 deletions core/lib/zksync_core/src/state_keeper/io/output_handler.rs
@@ -4,7 +4,6 @@ use std::fmt;
 
 use anyhow::Context as _;
 use async_trait::async_trait;
-use zksync_types::witness_block_state::WitnessBlockState;
 
 use crate::state_keeper::{io::IoCursor, updates::UpdatesManager};
 
@@ -21,11 +20,7 @@ pub trait StateKeeperOutputHandler: 'static + Send + fmt::Debug {
     async fn handle_miniblock(&mut self, updates_manager: &UpdatesManager) -> anyhow::Result<()>;
 
     /// Handles an L1 batch produced by the state keeper.
-    async fn handle_l1_batch(
-        &mut self,
-        _witness_block_state: Option<&WitnessBlockState>,
-        _updates_manager: &UpdatesManager,
-    ) -> anyhow::Result<()> {
+    async fn handle_l1_batch(&mut self, _updates_manager: &UpdatesManager) -> anyhow::Result<()> {
         Ok(())
     }
 }
@@ -86,12 +81,11 @@ impl OutputHandler {
 
     pub(crate) async fn handle_l1_batch(
         &mut self,
-        witness_block_state: Option<&WitnessBlockState>,
         updates_manager: &UpdatesManager,
     ) -> anyhow::Result<()> {
         for handler in &mut self.inner {
             handler
-                .handle_l1_batch(witness_block_state, updates_manager)
+                .handle_l1_batch(updates_manager)
                 .await
                 .with_context(|| {
                     format!(
41 changes: 4 additions & 37 deletions core/lib/zksync_core/src/state_keeper/io/persistence.rs
@@ -1,13 +1,11 @@
 //! State keeper persistence logic.
 
-use std::{sync::Arc, time::Instant};
+use std::time::Instant;
 
 use anyhow::Context as _;
 use async_trait::async_trait;
 use tokio::sync::{mpsc, oneshot};
 use zksync_dal::{ConnectionPool, Core};
-use zksync_object_store::ObjectStore;
-use zksync_types::{witness_block_state::WitnessBlockState, Address};
+use zksync_types::Address;
 
 use crate::{
     metrics::{BlockStage, APP_METRICS},
@@ -29,7 +27,6 @@ struct Completable<T> {
 #[derive(Debug)]
 pub struct StateKeeperPersistence {
     pool: ConnectionPool<Core>,
-    object_store: Option<Arc<dyn ObjectStore>>, // FIXME (PLA-857): remove from the state keeper
     l2_erc20_bridge_addr: Address,
     pre_insert_txs: bool,
     commands_sender: mpsc::Sender<Completable<MiniblockSealCommand>>,
@@ -60,7 +57,6 @@ impl StateKeeperPersistence {
         };
         let this = Self {
             pool,
-            object_store: None,
             l2_erc20_bridge_addr,
             pre_insert_txs: false,
             commands_sender,
@@ -75,11 +71,6 @@ impl StateKeeperPersistence {
         self
     }
 
-    pub fn with_object_store(mut self, object_store: Arc<dyn ObjectStore>) -> Self {
-        self.object_store = Some(object_store);
-        self
-    }
-
     /// Submits a new sealing `command` to the sealer that this handle is attached to.
     ///
     /// If there are currently too many unprocessed commands, this method will wait until
@@ -156,34 +147,10 @@ impl StateKeeperOutputHandler for StateKeeperPersistence {
         Ok(())
     }
 
-    async fn handle_l1_batch(
-        &mut self,
-        witness_block_state: Option<&WitnessBlockState>,
-        updates_manager: &UpdatesManager,
-    ) -> anyhow::Result<()> {
+    async fn handle_l1_batch(&mut self, updates_manager: &UpdatesManager) -> anyhow::Result<()> {
         // We cannot start sealing an L1 batch until we've sealed all miniblocks included in it.
         self.wait_for_all_commands().await;
 
-        if let Some(witness_block_state) = witness_block_state {
-            let store = self
-                .object_store
-                .as_deref()
-                .context("object store not set when saving `WitnessBlockState`")?;
-            match store
-                .put(updates_manager.l1_batch.number, witness_block_state)
-                .await
-            {
-                Ok(path) => {
-                    tracing::debug!("Successfully uploaded witness block start state to Object Store to path = '{path}'");
-                }
-                Err(e) => {
-                    tracing::error!(
-                        "Failed to upload witness block start state to Object Store: {e:?}"
-                    );
-                }
-            }
-        }
-
         let pool = self.pool.clone();
         let mut storage = pool.connection_tagged("state_keeper").await?;
         updates_manager
@@ -323,7 +290,7 @@ mod tests {
         });
 
         updates.finish_batch(default_vm_block_result());
-        persistence.handle_l1_batch(None, &updates).await.unwrap();
+        persistence.handle_l1_batch(&updates).await.unwrap();
 
         // Check that miniblock #1 and L1 batch #1 are persisted.
         let mut storage = pool.connection().await.unwrap();
