From 5cf7210dc77bb615944352f23ed39fad324b914f Mon Sep 17 00:00:00 2001 From: perekopskiy <53865202+perekopskiy@users.noreply.github.com> Date: Wed, 6 Dec 2023 11:51:40 +0200 Subject: [PATCH] perf(external-node): Use async miniblock sealing in external IO (#611) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ External IO uses async miniblock sealing. ## Why ❔ Execution of transactions and miniblock sealing (writing data to postgres) happen in parallel so the perfomance is better. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. - [x] Spellcheck has been run via `cargo spellcheck --cfg=./spellcheck/era.cfg --code 1`. --- core/bin/external_node/src/config/mod.rs | 9 +++ core/bin/external_node/src/main.rs | 17 +++- .../src/state_keeper/io/mempool.rs | 3 + .../zksync_core/src/state_keeper/io/mod.rs | 11 +-- .../src/state_keeper/io/seal_logic.rs | 55 ++++++++++++- .../src/state_keeper/io/tests/mod.rs | 12 +++ .../zksync_core/src/state_keeper/metrics.rs | 7 +- core/lib/zksync_core/src/state_keeper/mod.rs | 6 +- .../src/state_keeper/updates/mod.rs | 10 +++ .../zksync_core/src/sync_layer/external_io.rs | 78 ++++--------------- core/lib/zksync_core/src/sync_layer/tests.rs | 6 +- spellcheck/era.dic | 1 + 12 files changed, 132 insertions(+), 83 deletions(-) diff --git a/core/bin/external_node/src/config/mod.rs b/core/bin/external_node/src/config/mod.rs index 3f26a334ea3..c116201b91d 100644 --- a/core/bin/external_node/src/config/mod.rs +++ b/core/bin/external_node/src/config/mod.rs @@ -190,6 +190,11 @@ pub struct OptionalENConfig { /// Number of keys that is processed by enum_index migration in State Keeper each L1 batch. #[serde(default = "OptionalENConfig::default_enum_index_migration_chunk_size")] pub enum_index_migration_chunk_size: usize, + /// Capacity of the queue for asynchronous miniblock sealing. Once this many miniblocks are queued, + /// sealing will block until some of the miniblocks from the queue are processed. + /// 0 means that sealing is synchronous; this is mostly useful for performance comparison, testing etc. + #[serde(default = "OptionalENConfig::default_miniblock_seal_queue_capacity")] + pub miniblock_seal_queue_capacity: usize, } impl OptionalENConfig { @@ -288,6 +293,10 @@ impl OptionalENConfig { 5000 } + const fn default_miniblock_seal_queue_capacity() -> usize { + 10 + } + pub fn polling_interval(&self) -> Duration { Duration::from_millis(self.polling_interval) } diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 52f3353dc07..6324b0599a6 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -22,7 +22,10 @@ use zksync_core::{ }, reorg_detector::ReorgDetector, setup_sigint_handler, - state_keeper::{L1BatchExecutorBuilder, MainBatchExecutorBuilder, ZkSyncStateKeeper}, + state_keeper::{ + L1BatchExecutorBuilder, MainBatchExecutorBuilder, MiniblockSealer, MiniblockSealerHandle, + ZkSyncStateKeeper, + }, sync_layer::{ batch_status_updater::BatchStatusUpdater, external_io::ExternalIO, fetcher::FetcherCursor, genesis::perform_genesis_if_needed, ActionQueue, MainNodeClient, SyncState, @@ -47,6 +50,7 @@ async fn build_state_keeper( connection_pool: ConnectionPool, sync_state: SyncState, l2_erc20_bridge_addr: Address, + miniblock_sealer_handle: MiniblockSealerHandle, stop_receiver: watch::Receiver, chain_id: L2ChainId, ) -> ZkSyncStateKeeper { @@ -73,6 +77,7 @@ async fn build_state_keeper( let main_node_client = ::json_rpc(&main_node_url) .expect("Failed creating JSON-RPC client for main node"); let io = ExternalIO::new( + miniblock_sealer_handle, connection_pool, action_queue, sync_state, @@ -106,6 +111,14 @@ async fn init_tasks( let sync_state = SyncState::new(); let (action_queue_sender, action_queue) = ActionQueue::new(); + + let mut task_handles = vec![]; + let (miniblock_sealer, miniblock_sealer_handle) = MiniblockSealer::new( + connection_pool.clone(), + config.optional.miniblock_seal_queue_capacity, + ); + task_handles.push(tokio::spawn(miniblock_sealer.run())); + let state_keeper = build_state_keeper( action_queue, config.required.state_cache_path.clone(), @@ -113,6 +126,7 @@ async fn init_tasks( connection_pool.clone(), sync_state.clone(), config.remote.l2_erc20_bridge_addr, + miniblock_sealer_handle, stop_receiver.clone(), config.remote.l2_chain_id, ) @@ -271,7 +285,6 @@ async fn init_tasks( healthchecks, ); - let mut task_handles = vec![]; if let Some(port) = config.optional.prometheus_port { let prometheus_task = PrometheusExporterConfig::pull(port).run(stop_receiver.clone()); task_handles.push(tokio::spawn(prometheus_task)); diff --git a/core/lib/zksync_core/src/state_keeper/io/mempool.rs b/core/lib/zksync_core/src/state_keeper/io/mempool.rs index f10ad87580c..1d3ad506df6 100644 --- a/core/lib/zksync_core/src/state_keeper/io/mempool.rs +++ b/core/lib/zksync_core/src/state_keeper/io/mempool.rs @@ -274,6 +274,8 @@ where self.current_l1_batch_number, self.current_miniblock_number, self.l2_erc20_bridge_addr, + None, + false, ); self.miniblock_sealer_handle.submit(command).await; self.current_miniblock_number += 1; @@ -323,6 +325,7 @@ where l1_batch_env, finished_batch, self.l2_erc20_bridge_addr, + None, ) .await; self.current_miniblock_number += 1; // Due to fictive miniblock being sealed. diff --git a/core/lib/zksync_core/src/state_keeper/io/mod.rs b/core/lib/zksync_core/src/state_keeper/io/mod.rs index 858c46b2e70..313c363418d 100644 --- a/core/lib/zksync_core/src/state_keeper/io/mod.rs +++ b/core/lib/zksync_core/src/state_keeper/io/mod.rs @@ -130,7 +130,7 @@ struct Completable { /// Handle for [`MiniblockSealer`] allowing to submit [`MiniblockSealCommand`]s. #[derive(Debug)] -pub(crate) struct MiniblockSealerHandle { +pub struct MiniblockSealerHandle { commands_sender: mpsc::Sender>, latest_completion_receiver: Option>, // If true, `submit()` will wait for the operation to complete. @@ -144,7 +144,7 @@ impl MiniblockSealerHandle { /// /// If there are currently too many unprocessed commands, this method will wait until /// enough of them are processed (i.e., there is back pressure). - pub async fn submit(&mut self, command: MiniblockSealCommand) { + pub(crate) async fn submit(&mut self, command: MiniblockSealCommand) { let miniblock_number = command.miniblock_number; tracing::debug!( "Enqueuing sealing command for miniblock #{miniblock_number} with #{} txs (L1 batch #{})", @@ -209,7 +209,7 @@ impl MiniblockSealerHandle { /// Component responsible for sealing miniblocks (i.e., storing their data to Postgres). #[derive(Debug)] -pub(crate) struct MiniblockSealer { +pub struct MiniblockSealer { pool: ConnectionPool, is_sync: bool, // Weak sender handle to get queue capacity stats. @@ -220,10 +220,7 @@ pub(crate) struct MiniblockSealer { impl MiniblockSealer { /// Creates a sealer that will use the provided Postgres connection and will have the specified /// `command_capacity` for unprocessed sealing commands. - pub(crate) fn new( - pool: ConnectionPool, - mut command_capacity: usize, - ) -> (Self, MiniblockSealerHandle) { + pub fn new(pool: ConnectionPool, mut command_capacity: usize) -> (Self, MiniblockSealerHandle) { let is_sync = command_capacity == 0; command_capacity = command_capacity.max(1); diff --git a/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs b/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs index ca2dc641909..4501be62f78 100644 --- a/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs +++ b/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs @@ -8,6 +8,7 @@ use std::{ }; use multivm::interface::{FinishedL1Batch, L1BatchEnv}; +use zksync_dal::blocks_dal::ConsensusBlockFields; use zksync_dal::StorageProcessor; use zksync_system_constants::ACCOUNT_CODE_STORAGE_ADDRESS; use zksync_types::{ @@ -18,14 +19,18 @@ use zksync_types::{ use zksync_types::{ block::{L1BatchHeader, MiniblockHeader}, event::{extract_added_tokens, extract_long_l2_to_l1_messages}, + l1::L1Tx, + l2::L2Tx, + protocol_version::ProtocolUpgradeTx, storage_writes_deduplicator::{ModifiedSlot, StorageWritesDeduplicator}, tx::{ tx_execution_info::DeduplicatedWritesMetrics, IncludedTxLocation, TransactionExecutionResult, }, zkevm_test_harness::witness::sort_storage_access::sort_storage_access_queries, - AccountTreeId, Address, ExecuteTransactionCommon, L1BatchNumber, LogQuery, MiniblockNumber, - StorageKey, StorageLog, StorageLogQuery, StorageValue, Transaction, VmEvent, H256, + AccountTreeId, Address, ExecuteTransactionCommon, L1BatchNumber, L1BlockNumber, LogQuery, + MiniblockNumber, StorageKey, StorageLog, StorageLogQuery, StorageValue, Transaction, VmEvent, + H256, }; // TODO (SMA-1206): use seconds instead of milliseconds. use zksync_utils::{h256_to_u256, time::millis_since_epoch, u256_to_h256}; @@ -50,6 +55,7 @@ impl UpdatesManager { l1_batch_env: &L1BatchEnv, finished_batch: FinishedL1Batch, l2_erc20_bridge_addr: Address, + consensus: Option, ) { let started_at = Instant::now(); let progress = L1_BATCH_METRICS.start(L1BatchSealStage::VmFinalization); @@ -63,6 +69,8 @@ impl UpdatesManager { l1_batch_env.number, current_miniblock_number, l2_erc20_bridge_addr, + consensus, + false, // fictive miniblocks don't have txs, so it's fine to pass `false` here. ); miniblock_command.seal_inner(&mut transaction, true).await; progress.observe(None); @@ -274,6 +282,36 @@ impl MiniblockSealCommand { async fn seal_inner(&self, storage: &mut StorageProcessor<'_>, is_fictive: bool) { self.assert_valid_miniblock(is_fictive); + let mut transaction = storage.start_transaction().await.unwrap(); + if self.pre_insert_txs { + let progress = MINIBLOCK_METRICS.start(MiniblockSealStage::PreInsertTxs, is_fictive); + for tx in &self.miniblock.executed_transactions { + if let Ok(l1_tx) = L1Tx::try_from(tx.transaction.clone()) { + let l1_block_number = L1BlockNumber(l1_tx.common_data.eth_block as u32); + transaction + .transactions_dal() + .insert_transaction_l1(l1_tx, l1_block_number) + .await; + } else if let Ok(l2_tx) = L2Tx::try_from(tx.transaction.clone()) { + // Using `Default` for execution metrics should be OK here, since this data is not used on the EN. + transaction + .transactions_dal() + .insert_transaction_l2(l2_tx, Default::default()) + .await; + } else if let Ok(protocol_system_upgrade_tx) = + ProtocolUpgradeTx::try_from(tx.transaction.clone()) + { + transaction + .transactions_dal() + .insert_system_transaction(protocol_system_upgrade_tx) + .await; + } else { + unreachable!("Transaction {:?} is neither L1 nor L2", tx.transaction); + } + } + progress.observe(Some(self.miniblock.executed_transactions.len())); + } + let l1_batch_number = self.l1_batch_number; let miniblock_number = self.miniblock_number; let started_at = Instant::now(); @@ -291,7 +329,6 @@ impl MiniblockSealCommand { event_count = self.miniblock.events.len() ); - let mut transaction = storage.start_transaction().await.unwrap(); let miniblock_header = MiniblockHeader { number: miniblock_number, timestamp: self.miniblock.timestamp, @@ -404,6 +441,18 @@ impl MiniblockSealCommand { .await; progress.observe(user_l2_to_l1_log_count); + let progress = MINIBLOCK_METRICS.start(MiniblockSealStage::InsertConsensus, is_fictive); + // We want to add miniblock consensus fields atomically with the miniblock data so that we + // don't need to deal with corner cases (e.g., a miniblock w/o consensus fields). + if let Some(consensus) = &self.consensus { + transaction + .blocks_dal() + .set_miniblock_consensus_fields(self.miniblock_number, consensus) + .await + .unwrap(); + } + progress.observe(None); + let progress = MINIBLOCK_METRICS.start(MiniblockSealStage::CommitMiniblock, is_fictive); let current_l2_virtual_block_info = transaction .storage_dal() diff --git a/core/lib/zksync_core/src/state_keeper/io/tests/mod.rs b/core/lib/zksync_core/src/state_keeper/io/tests/mod.rs index 0c13a7a614b..2b924554f27 100644 --- a/core/lib/zksync_core/src/state_keeper/io/tests/mod.rs +++ b/core/lib/zksync_core/src/state_keeper/io/tests/mod.rs @@ -245,6 +245,8 @@ async fn processing_storage_logs_when_sealing_miniblock() { base_system_contracts_hashes: BaseSystemContractsHashes::default(), protocol_version: Some(ProtocolVersionId::latest()), l2_erc20_bridge_addr: Address::default(), + consensus: None, + pre_insert_txs: false, }; let mut conn = connection_pool .access_storage_tagged("state_keeper") @@ -321,6 +323,8 @@ async fn processing_events_when_sealing_miniblock() { base_system_contracts_hashes: BaseSystemContractsHashes::default(), protocol_version: Some(ProtocolVersionId::latest()), l2_erc20_bridge_addr: Address::default(), + consensus: None, + pre_insert_txs: false, }; let mut conn = pool.access_storage_tagged("state_keeper").await.unwrap(); conn.protocol_versions_dal() @@ -434,6 +438,8 @@ async fn miniblock_sealer_handle_blocking() { L1BatchNumber(1), MiniblockNumber(1), Address::default(), + None, + false, ); sealer_handle.submit(seal_command).await; @@ -442,6 +448,8 @@ async fn miniblock_sealer_handle_blocking() { L1BatchNumber(1), MiniblockNumber(2), Address::default(), + None, + false, ); { let submit_future = sealer_handle.submit(seal_command); @@ -470,6 +478,8 @@ async fn miniblock_sealer_handle_blocking() { L1BatchNumber(2), MiniblockNumber(3), Address::default(), + None, + false, ); sealer_handle.submit(seal_command).await; let command = sealer.commands_receiver.recv().await.unwrap(); @@ -489,6 +499,8 @@ async fn miniblock_sealer_handle_parallel_processing() { L1BatchNumber(1), MiniblockNumber(i), Address::default(), + None, + false, ); sealer_handle.submit(seal_command).await; } diff --git a/core/lib/zksync_core/src/state_keeper/metrics.rs b/core/lib/zksync_core/src/state_keeper/metrics.rs index f3f43324320..72b89c4a2b8 100644 --- a/core/lib/zksync_core/src/state_keeper/metrics.rs +++ b/core/lib/zksync_core/src/state_keeper/metrics.rs @@ -168,7 +168,6 @@ pub(super) enum L1BatchSealStage { FilterWrittenSlots, InsertInitialWrites, CommitL1Batch, - ExternalNodeStoreTransactions, } /// Buckets for positive integer, not-so-large values (e.g., initial writes count). @@ -221,10 +220,6 @@ impl L1BatchMetrics { latency_per_unit: &self.sealed_entity_per_unit[&stage], } } - - pub(crate) fn start_storing_on_en(&self) -> LatencyObserver<'_> { - self.sealed_time_stage[&L1BatchSealStage::ExternalNodeStoreTransactions].start() - } } #[vise::register] @@ -241,6 +236,7 @@ pub(super) enum MiniblockQueueStage { #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue)] #[metrics(rename_all = "snake_case")] pub(super) enum MiniblockSealStage { + PreInsertTxs, InsertMiniblockHeader, MarkTransactionsInMiniblock, InsertStorageLogs, @@ -253,6 +249,7 @@ pub(super) enum MiniblockSealStage { InsertEvents, ExtractL2ToL1Logs, InsertL2ToL1Logs, + InsertConsensus, CommitMiniblock, } diff --git a/core/lib/zksync_core/src/state_keeper/mod.rs b/core/lib/zksync_core/src/state_keeper/mod.rs index bdc1f90e206..5ec395267df 100644 --- a/core/lib/zksync_core/src/state_keeper/mod.rs +++ b/core/lib/zksync_core/src/state_keeper/mod.rs @@ -24,14 +24,14 @@ pub(crate) mod updates; pub use self::{ batch_executor::{L1BatchExecutorBuilder, MainBatchExecutorBuilder}, + io::{MiniblockSealer, MiniblockSealerHandle}, keeper::ZkSyncStateKeeper, }; pub(crate) use self::{ - io::MiniblockSealer, mempool_actor::MempoolFetcher, seal_criteria::ConditionalSealer, - types::MempoolGuard, + mempool_actor::MempoolFetcher, seal_criteria::ConditionalSealer, types::MempoolGuard, }; -use self::io::{MempoolIO, MiniblockSealerHandle}; +use self::io::MempoolIO; use crate::l1_gas_price::L1GasPriceProvider; #[allow(clippy::too_many_arguments)] diff --git a/core/lib/zksync_core/src/state_keeper/updates/mod.rs b/core/lib/zksync_core/src/state_keeper/updates/mod.rs index dc72893e703..3f09f7be30b 100644 --- a/core/lib/zksync_core/src/state_keeper/updates/mod.rs +++ b/core/lib/zksync_core/src/state_keeper/updates/mod.rs @@ -1,6 +1,7 @@ use multivm::interface::{L1BatchEnv, VmExecutionResultAndLogs}; use zksync_contracts::BaseSystemContractsHashes; +use zksync_dal::blocks_dal::ConsensusBlockFields; use zksync_types::vm_trace::Call; use zksync_types::{ block::BlockGasCount, storage_writes_deduplicator::StorageWritesDeduplicator, @@ -81,6 +82,8 @@ impl UpdatesManager { l1_batch_number: L1BatchNumber, miniblock_number: MiniblockNumber, l2_erc20_bridge_addr: Address, + consensus: Option, + pre_insert_txs: bool, ) -> MiniblockSealCommand { MiniblockSealCommand { l1_batch_number, @@ -93,6 +96,8 @@ impl UpdatesManager { base_system_contracts_hashes: self.base_system_contract_hashes, protocol_version: Some(self.protocol_version), l2_erc20_bridge_addr, + consensus, + pre_insert_txs, } } @@ -172,6 +177,11 @@ pub(crate) struct MiniblockSealCommand { pub base_system_contracts_hashes: BaseSystemContractsHashes, pub protocol_version: Option, pub l2_erc20_bridge_addr: Address, + pub consensus: Option, + /// Whether transactions should be pre-inserted to DB. + /// Should be set to `true` for EN's IO as EN doesn't store transactions in DB + /// before they are included into miniblocks. + pub pre_insert_txs: bool, } #[cfg(test)] diff --git a/core/lib/zksync_core/src/sync_layer/external_io.rs b/core/lib/zksync_core/src/sync_layer/external_io.rs index 8e3ca863072..4e870b95674 100644 --- a/core/lib/zksync_core/src/sync_layer/external_io.rs +++ b/core/lib/zksync_core/src/sync_layer/external_io.rs @@ -1,20 +1,14 @@ use async_trait::async_trait; use futures::future; -use std::{ - collections::HashMap, - convert::{TryFrom, TryInto}, - iter::FromIterator, - time::Duration, -}; +use std::{collections::HashMap, convert::TryInto, iter::FromIterator, time::Duration}; use multivm::interface::{FinishedL1Batch, L1BatchEnv, SystemEnv}; use zksync_contracts::{BaseSystemContracts, SystemContractCode}; use zksync_dal::ConnectionPool; use zksync_types::{ - ethabi::Address, l1::L1Tx, l2::L2Tx, protocol_version::ProtocolUpgradeTx, - witness_block_state::WitnessBlockState, L1BatchNumber, L1BlockNumber, L2ChainId, - MiniblockNumber, ProtocolVersionId, Transaction, H256, U256, + ethabi::Address, protocol_version::ProtocolUpgradeTx, witness_block_state::WitnessBlockState, + L1BatchNumber, L2ChainId, MiniblockNumber, ProtocolVersionId, Transaction, H256, U256, }; use zksync_utils::{be_words_to_bytes, bytes_to_be_words}; @@ -29,9 +23,9 @@ use crate::{ extractors, io::{ common::{l1_batch_params, load_pending_batch, poll_iters}, - MiniblockParams, PendingBatchData, StateKeeperIO, + MiniblockParams, MiniblockSealerHandle, PendingBatchData, StateKeeperIO, }, - metrics::{KEEPER_METRICS, L1_BATCH_METRICS}, + metrics::KEEPER_METRICS, seal_criteria::IoSealCriteria, updates::UpdatesManager, }, @@ -48,6 +42,7 @@ const POLL_INTERVAL: Duration = Duration::from_millis(100); /// to the one in the mempool IO (which is used in the main node). #[derive(Debug)] pub struct ExternalIO { + miniblock_sealer_handle: MiniblockSealerHandle, pool: ConnectionPool, current_l1_batch_number: L1BatchNumber, @@ -64,7 +59,9 @@ pub struct ExternalIO { } impl ExternalIO { + #[allow(clippy::too_many_arguments)] pub async fn new( + miniblock_sealer_handle: MiniblockSealerHandle, pool: ConnectionPool, actions: ActionQueue, sync_state: SyncState, @@ -95,6 +92,7 @@ impl ExternalIO { sync_state.set_local_block(last_miniblock_number); Self { + miniblock_sealer_handle, pool, current_l1_batch_number: last_sealed_l1_batch_header.number + 1, current_miniblock_number: last_miniblock_number + 1, @@ -459,56 +457,15 @@ impl StateKeeperIO for ExternalIO { panic!("State keeper requested to seal miniblock, but the next action is {action:?}"); }; - let mut storage = self.pool.access_storage_tagged("sync_layer").await.unwrap(); - let mut transaction = storage.start_transaction().await.unwrap(); - - let store_latency = L1_BATCH_METRICS.start_storing_on_en(); - // We don't store the transactions in the database until they're executed to not overcomplicate the state - // recovery on restart. So we have to store them here. - for tx in &updates_manager.miniblock.executed_transactions { - if let Ok(l1_tx) = L1Tx::try_from(tx.transaction.clone()) { - let l1_block_number = L1BlockNumber(l1_tx.common_data.eth_block as u32); - transaction - .transactions_dal() - .insert_transaction_l1(l1_tx, l1_block_number) - .await; - } else if let Ok(l2_tx) = L2Tx::try_from(tx.transaction.clone()) { - // Using `Default` for execution metrics should be OK here, since this data is not used on the EN. - transaction - .transactions_dal() - .insert_transaction_l2(l2_tx, Default::default()) - .await; - } else if let Ok(protocol_system_upgrade_tx) = - ProtocolUpgradeTx::try_from(tx.transaction.clone()) - { - transaction - .transactions_dal() - .insert_system_transaction(protocol_system_upgrade_tx) - .await; - } else { - unreachable!("Transaction {:?} is neither L1 nor L2", tx.transaction); - } - } - store_latency.observe(); - // Now transactions are stored, and we may mark them as executed. let command = updates_manager.seal_miniblock_command( self.current_l1_batch_number, self.current_miniblock_number, self.l2_erc20_bridge_addr, + consensus, + true, ); - command.seal(&mut transaction).await; - - // We want to add miniblock consensus fields atomically with the miniblock data so that we - // don't need to deal with corner cases (e.g., a miniblock w/o consensus fields). - if let Some(consensus) = &consensus { - transaction - .blocks_dal() - .set_miniblock_consensus_fields(self.current_miniblock_number, consensus) - .await - .unwrap(); - } - transaction.commit().await.unwrap(); + self.miniblock_sealer_handle.submit(command).await; self.sync_state .set_local_block(self.current_miniblock_number); @@ -531,6 +488,9 @@ impl StateKeeperIO for ExternalIO { ); }; + // We cannot start sealing an L1 batch until we've sealed all miniblocks included in it. + self.miniblock_sealer_handle.wait_for_all_commands().await; + let mut storage = self.pool.access_storage_tagged("sync_layer").await.unwrap(); let mut transaction = storage.start_transaction().await.unwrap(); updates_manager @@ -540,15 +500,9 @@ impl StateKeeperIO for ExternalIO { l1_batch_env, finished_batch, self.l2_erc20_bridge_addr, + consensus, ) .await; - if let Some(consensus) = &consensus { - transaction - .blocks_dal() - .set_miniblock_consensus_fields(self.current_miniblock_number, consensus) - .await - .unwrap(); - } transaction.commit().await.unwrap(); tracing::info!("Batch {} is sealed", self.current_l1_batch_number); diff --git a/core/lib/zksync_core/src/sync_layer/tests.rs b/core/lib/zksync_core/src/sync_layer/tests.rs index 20bafc51cf6..10582c7d9f9 100644 --- a/core/lib/zksync_core/src/sync_layer/tests.rs +++ b/core/lib/zksync_core/src/sync_layer/tests.rs @@ -22,7 +22,7 @@ use crate::{ genesis::{ensure_genesis_state, GenesisParams}, state_keeper::{ tests::{create_l1_batch_metadata, create_l2_transaction, TestBatchExecutorBuilder}, - ZkSyncStateKeeper, + MiniblockSealer, ZkSyncStateKeeper, }, }; @@ -156,7 +156,11 @@ impl StateKeeperHandles { ensure_genesis(&mut pool.access_storage().await.unwrap()).await; let sync_state = SyncState::new(); + let (miniblock_sealer, miniblock_sealer_handle) = MiniblockSealer::new(pool.clone(), 5); + tokio::spawn(miniblock_sealer.run()); + let io = ExternalIO::new( + miniblock_sealer_handle, pool, actions, sync_state.clone(), diff --git a/spellcheck/era.dic b/spellcheck/era.dic index 13dd303a3dc..e56162fcf02 100644 --- a/spellcheck/era.dic +++ b/spellcheck/era.dic @@ -276,6 +276,7 @@ versa blake2 AR16MT Preimages +EN's // Names Vyper