Skip to content

Commit

Permalink
perf(external-node): Use async miniblock sealing in external IO (#611)
Browse files Browse the repository at this point in the history
## What ❔

External IO uses async miniblock sealing. 

## Why ❔

Execution of transactions and miniblock sealing (writing data to
Postgres) happen in parallel, so performance is better.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `cargo spellcheck
--cfg=./spellcheck/era.cfg --code 1`.
  • Loading branch information
perekopskiy committed Dec 6, 2023
1 parent 2de4825 commit 5cf7210
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 83 deletions.
9 changes: 9 additions & 0 deletions core/bin/external_node/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ pub struct OptionalENConfig {
/// Number of keys that are processed by the enum_index migration in the State Keeper each L1 batch.
#[serde(default = "OptionalENConfig::default_enum_index_migration_chunk_size")]
pub enum_index_migration_chunk_size: usize,
/// Capacity of the queue for asynchronous miniblock sealing. Once this many miniblocks are queued,
/// sealing will block until some of the miniblocks from the queue are processed.
/// 0 means that sealing is synchronous; this is mostly useful for performance comparison, testing etc.
#[serde(default = "OptionalENConfig::default_miniblock_seal_queue_capacity")]
pub miniblock_seal_queue_capacity: usize,
}

impl OptionalENConfig {
Expand Down Expand Up @@ -288,6 +293,10 @@ impl OptionalENConfig {
5000
}

/// Default capacity of the asynchronous miniblock sealing queue
/// (`miniblock_seal_queue_capacity`): up to this many miniblocks may be
/// queued for sealing before `submit()` starts applying back pressure.
const fn default_miniblock_seal_queue_capacity() -> usize {
10
}

/// Returns the configured polling interval as a [`Duration`].
///
/// The underlying config value is stored in milliseconds; this is the
/// single conversion point so callers never deal with raw millis.
pub fn polling_interval(&self) -> Duration {
let millis = self.polling_interval;
Duration::from_millis(millis)
}
Expand Down
17 changes: 15 additions & 2 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use zksync_core::{
},
reorg_detector::ReorgDetector,
setup_sigint_handler,
state_keeper::{L1BatchExecutorBuilder, MainBatchExecutorBuilder, ZkSyncStateKeeper},
state_keeper::{
L1BatchExecutorBuilder, MainBatchExecutorBuilder, MiniblockSealer, MiniblockSealerHandle,
ZkSyncStateKeeper,
},
sync_layer::{
batch_status_updater::BatchStatusUpdater, external_io::ExternalIO, fetcher::FetcherCursor,
genesis::perform_genesis_if_needed, ActionQueue, MainNodeClient, SyncState,
Expand All @@ -47,6 +50,7 @@ async fn build_state_keeper(
connection_pool: ConnectionPool,
sync_state: SyncState,
l2_erc20_bridge_addr: Address,
miniblock_sealer_handle: MiniblockSealerHandle,
stop_receiver: watch::Receiver<bool>,
chain_id: L2ChainId,
) -> ZkSyncStateKeeper {
Expand All @@ -73,6 +77,7 @@ async fn build_state_keeper(
let main_node_client = <dyn MainNodeClient>::json_rpc(&main_node_url)
.expect("Failed creating JSON-RPC client for main node");
let io = ExternalIO::new(
miniblock_sealer_handle,
connection_pool,
action_queue,
sync_state,
Expand Down Expand Up @@ -106,13 +111,22 @@ async fn init_tasks(

let sync_state = SyncState::new();
let (action_queue_sender, action_queue) = ActionQueue::new();

let mut task_handles = vec![];
let (miniblock_sealer, miniblock_sealer_handle) = MiniblockSealer::new(
connection_pool.clone(),
config.optional.miniblock_seal_queue_capacity,
);
task_handles.push(tokio::spawn(miniblock_sealer.run()));

let state_keeper = build_state_keeper(
action_queue,
config.required.state_cache_path.clone(),
&config,
connection_pool.clone(),
sync_state.clone(),
config.remote.l2_erc20_bridge_addr,
miniblock_sealer_handle,
stop_receiver.clone(),
config.remote.l2_chain_id,
)
Expand Down Expand Up @@ -271,7 +285,6 @@ async fn init_tasks(
healthchecks,
);

let mut task_handles = vec![];
if let Some(port) = config.optional.prometheus_port {
let prometheus_task = PrometheusExporterConfig::pull(port).run(stop_receiver.clone());
task_handles.push(tokio::spawn(prometheus_task));
Expand Down
3 changes: 3 additions & 0 deletions core/lib/zksync_core/src/state_keeper/io/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ where
self.current_l1_batch_number,
self.current_miniblock_number,
self.l2_erc20_bridge_addr,
None,
false,
);
self.miniblock_sealer_handle.submit(command).await;
self.current_miniblock_number += 1;
Expand Down Expand Up @@ -323,6 +325,7 @@ where
l1_batch_env,
finished_batch,
self.l2_erc20_bridge_addr,
None,
)
.await;
self.current_miniblock_number += 1; // Due to fictive miniblock being sealed.
Expand Down
11 changes: 4 additions & 7 deletions core/lib/zksync_core/src/state_keeper/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ struct Completable<T> {

/// Handle for [`MiniblockSealer`] allowing to submit [`MiniblockSealCommand`]s.
#[derive(Debug)]
pub(crate) struct MiniblockSealerHandle {
pub struct MiniblockSealerHandle {
commands_sender: mpsc::Sender<Completable<MiniblockSealCommand>>,
latest_completion_receiver: Option<oneshot::Receiver<()>>,
// If true, `submit()` will wait for the operation to complete.
Expand All @@ -144,7 +144,7 @@ impl MiniblockSealerHandle {
///
/// If there are currently too many unprocessed commands, this method will wait until
/// enough of them are processed (i.e., there is back pressure).
pub async fn submit(&mut self, command: MiniblockSealCommand) {
pub(crate) async fn submit(&mut self, command: MiniblockSealCommand) {
let miniblock_number = command.miniblock_number;
tracing::debug!(
"Enqueuing sealing command for miniblock #{miniblock_number} with #{} txs (L1 batch #{})",
Expand Down Expand Up @@ -209,7 +209,7 @@ impl MiniblockSealerHandle {

/// Component responsible for sealing miniblocks (i.e., storing their data to Postgres).
#[derive(Debug)]
pub(crate) struct MiniblockSealer {
pub struct MiniblockSealer {
pool: ConnectionPool,
is_sync: bool,
// Weak sender handle to get queue capacity stats.
Expand All @@ -220,10 +220,7 @@ pub(crate) struct MiniblockSealer {
impl MiniblockSealer {
/// Creates a sealer that will use the provided Postgres connection and will have the specified
/// `command_capacity` for unprocessed sealing commands.
pub(crate) fn new(
pool: ConnectionPool,
mut command_capacity: usize,
) -> (Self, MiniblockSealerHandle) {
pub fn new(pool: ConnectionPool, mut command_capacity: usize) -> (Self, MiniblockSealerHandle) {
let is_sync = command_capacity == 0;
command_capacity = command_capacity.max(1);

Expand Down
55 changes: 52 additions & 3 deletions core/lib/zksync_core/src/state_keeper/io/seal_logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
};

use multivm::interface::{FinishedL1Batch, L1BatchEnv};
use zksync_dal::blocks_dal::ConsensusBlockFields;
use zksync_dal::StorageProcessor;
use zksync_system_constants::ACCOUNT_CODE_STORAGE_ADDRESS;
use zksync_types::{
Expand All @@ -18,14 +19,18 @@ use zksync_types::{
use zksync_types::{
block::{L1BatchHeader, MiniblockHeader},
event::{extract_added_tokens, extract_long_l2_to_l1_messages},
l1::L1Tx,
l2::L2Tx,
protocol_version::ProtocolUpgradeTx,
storage_writes_deduplicator::{ModifiedSlot, StorageWritesDeduplicator},
tx::{
tx_execution_info::DeduplicatedWritesMetrics, IncludedTxLocation,
TransactionExecutionResult,
},
zkevm_test_harness::witness::sort_storage_access::sort_storage_access_queries,
AccountTreeId, Address, ExecuteTransactionCommon, L1BatchNumber, LogQuery, MiniblockNumber,
StorageKey, StorageLog, StorageLogQuery, StorageValue, Transaction, VmEvent, H256,
AccountTreeId, Address, ExecuteTransactionCommon, L1BatchNumber, L1BlockNumber, LogQuery,
MiniblockNumber, StorageKey, StorageLog, StorageLogQuery, StorageValue, Transaction, VmEvent,
H256,
};
// TODO (SMA-1206): use seconds instead of milliseconds.
use zksync_utils::{h256_to_u256, time::millis_since_epoch, u256_to_h256};
Expand All @@ -50,6 +55,7 @@ impl UpdatesManager {
l1_batch_env: &L1BatchEnv,
finished_batch: FinishedL1Batch,
l2_erc20_bridge_addr: Address,
consensus: Option<ConsensusBlockFields>,
) {
let started_at = Instant::now();
let progress = L1_BATCH_METRICS.start(L1BatchSealStage::VmFinalization);
Expand All @@ -63,6 +69,8 @@ impl UpdatesManager {
l1_batch_env.number,
current_miniblock_number,
l2_erc20_bridge_addr,
consensus,
false, // fictive miniblocks don't have txs, so it's fine to pass `false` here.
);
miniblock_command.seal_inner(&mut transaction, true).await;
progress.observe(None);
Expand Down Expand Up @@ -274,6 +282,36 @@ impl MiniblockSealCommand {
async fn seal_inner(&self, storage: &mut StorageProcessor<'_>, is_fictive: bool) {
self.assert_valid_miniblock(is_fictive);

let mut transaction = storage.start_transaction().await.unwrap();
if self.pre_insert_txs {
let progress = MINIBLOCK_METRICS.start(MiniblockSealStage::PreInsertTxs, is_fictive);
for tx in &self.miniblock.executed_transactions {
if let Ok(l1_tx) = L1Tx::try_from(tx.transaction.clone()) {
let l1_block_number = L1BlockNumber(l1_tx.common_data.eth_block as u32);
transaction
.transactions_dal()
.insert_transaction_l1(l1_tx, l1_block_number)
.await;
} else if let Ok(l2_tx) = L2Tx::try_from(tx.transaction.clone()) {
// Using `Default` for execution metrics should be OK here, since this data is not used on the EN.
transaction
.transactions_dal()
.insert_transaction_l2(l2_tx, Default::default())
.await;
} else if let Ok(protocol_system_upgrade_tx) =
ProtocolUpgradeTx::try_from(tx.transaction.clone())
{
transaction
.transactions_dal()
.insert_system_transaction(protocol_system_upgrade_tx)
.await;
} else {
unreachable!("Transaction {:?} is neither L1 nor L2", tx.transaction);
}
}
progress.observe(Some(self.miniblock.executed_transactions.len()));
}

let l1_batch_number = self.l1_batch_number;
let miniblock_number = self.miniblock_number;
let started_at = Instant::now();
Expand All @@ -291,7 +329,6 @@ impl MiniblockSealCommand {
event_count = self.miniblock.events.len()
);

let mut transaction = storage.start_transaction().await.unwrap();
let miniblock_header = MiniblockHeader {
number: miniblock_number,
timestamp: self.miniblock.timestamp,
Expand Down Expand Up @@ -404,6 +441,18 @@ impl MiniblockSealCommand {
.await;
progress.observe(user_l2_to_l1_log_count);

let progress = MINIBLOCK_METRICS.start(MiniblockSealStage::InsertConsensus, is_fictive);
// We want to add miniblock consensus fields atomically with the miniblock data so that we
// don't need to deal with corner cases (e.g., a miniblock w/o consensus fields).
if let Some(consensus) = &self.consensus {
transaction
.blocks_dal()
.set_miniblock_consensus_fields(self.miniblock_number, consensus)
.await
.unwrap();
}
progress.observe(None);

let progress = MINIBLOCK_METRICS.start(MiniblockSealStage::CommitMiniblock, is_fictive);
let current_l2_virtual_block_info = transaction
.storage_dal()
Expand Down
12 changes: 12 additions & 0 deletions core/lib/zksync_core/src/state_keeper/io/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ async fn processing_storage_logs_when_sealing_miniblock() {
base_system_contracts_hashes: BaseSystemContractsHashes::default(),
protocol_version: Some(ProtocolVersionId::latest()),
l2_erc20_bridge_addr: Address::default(),
consensus: None,
pre_insert_txs: false,
};
let mut conn = connection_pool
.access_storage_tagged("state_keeper")
Expand Down Expand Up @@ -321,6 +323,8 @@ async fn processing_events_when_sealing_miniblock() {
base_system_contracts_hashes: BaseSystemContractsHashes::default(),
protocol_version: Some(ProtocolVersionId::latest()),
l2_erc20_bridge_addr: Address::default(),
consensus: None,
pre_insert_txs: false,
};
let mut conn = pool.access_storage_tagged("state_keeper").await.unwrap();
conn.protocol_versions_dal()
Expand Down Expand Up @@ -434,6 +438,8 @@ async fn miniblock_sealer_handle_blocking() {
L1BatchNumber(1),
MiniblockNumber(1),
Address::default(),
None,
false,
);
sealer_handle.submit(seal_command).await;

Expand All @@ -442,6 +448,8 @@ async fn miniblock_sealer_handle_blocking() {
L1BatchNumber(1),
MiniblockNumber(2),
Address::default(),
None,
false,
);
{
let submit_future = sealer_handle.submit(seal_command);
Expand Down Expand Up @@ -470,6 +478,8 @@ async fn miniblock_sealer_handle_blocking() {
L1BatchNumber(2),
MiniblockNumber(3),
Address::default(),
None,
false,
);
sealer_handle.submit(seal_command).await;
let command = sealer.commands_receiver.recv().await.unwrap();
Expand All @@ -489,6 +499,8 @@ async fn miniblock_sealer_handle_parallel_processing() {
L1BatchNumber(1),
MiniblockNumber(i),
Address::default(),
None,
false,
);
sealer_handle.submit(seal_command).await;
}
Expand Down
7 changes: 2 additions & 5 deletions core/lib/zksync_core/src/state_keeper/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ pub(super) enum L1BatchSealStage {
FilterWrittenSlots,
InsertInitialWrites,
CommitL1Batch,
ExternalNodeStoreTransactions,
}

/// Buckets for positive integer, not-so-large values (e.g., initial writes count).
Expand Down Expand Up @@ -221,10 +220,6 @@ impl L1BatchMetrics {
latency_per_unit: &self.sealed_entity_per_unit[&stage],
}
}

pub(crate) fn start_storing_on_en(&self) -> LatencyObserver<'_> {
self.sealed_time_stage[&L1BatchSealStage::ExternalNodeStoreTransactions].start()
}
}

#[vise::register]
Expand All @@ -241,6 +236,7 @@ pub(super) enum MiniblockQueueStage {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue)]
#[metrics(rename_all = "snake_case")]
pub(super) enum MiniblockSealStage {
PreInsertTxs,
InsertMiniblockHeader,
MarkTransactionsInMiniblock,
InsertStorageLogs,
Expand All @@ -253,6 +249,7 @@ pub(super) enum MiniblockSealStage {
InsertEvents,
ExtractL2ToL1Logs,
InsertL2ToL1Logs,
InsertConsensus,
CommitMiniblock,
}

Expand Down
6 changes: 3 additions & 3 deletions core/lib/zksync_core/src/state_keeper/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ pub(crate) mod updates;

pub use self::{
batch_executor::{L1BatchExecutorBuilder, MainBatchExecutorBuilder},
io::{MiniblockSealer, MiniblockSealerHandle},
keeper::ZkSyncStateKeeper,
};
pub(crate) use self::{
io::MiniblockSealer, mempool_actor::MempoolFetcher, seal_criteria::ConditionalSealer,
types::MempoolGuard,
mempool_actor::MempoolFetcher, seal_criteria::ConditionalSealer, types::MempoolGuard,
};

use self::io::{MempoolIO, MiniblockSealerHandle};
use self::io::MempoolIO;
use crate::l1_gas_price::L1GasPriceProvider;

#[allow(clippy::too_many_arguments)]
Expand Down
10 changes: 10 additions & 0 deletions core/lib/zksync_core/src/state_keeper/updates/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use multivm::interface::{L1BatchEnv, VmExecutionResultAndLogs};

use zksync_contracts::BaseSystemContractsHashes;
use zksync_dal::blocks_dal::ConsensusBlockFields;
use zksync_types::vm_trace::Call;
use zksync_types::{
block::BlockGasCount, storage_writes_deduplicator::StorageWritesDeduplicator,
Expand Down Expand Up @@ -81,6 +82,8 @@ impl UpdatesManager {
l1_batch_number: L1BatchNumber,
miniblock_number: MiniblockNumber,
l2_erc20_bridge_addr: Address,
consensus: Option<ConsensusBlockFields>,
pre_insert_txs: bool,
) -> MiniblockSealCommand {
MiniblockSealCommand {
l1_batch_number,
Expand All @@ -93,6 +96,8 @@ impl UpdatesManager {
base_system_contracts_hashes: self.base_system_contract_hashes,
protocol_version: Some(self.protocol_version),
l2_erc20_bridge_addr,
consensus,
pre_insert_txs,
}
}

Expand Down Expand Up @@ -172,6 +177,11 @@ pub(crate) struct MiniblockSealCommand {
pub base_system_contracts_hashes: BaseSystemContractsHashes,
pub protocol_version: Option<ProtocolVersionId>,
pub l2_erc20_bridge_addr: Address,
pub consensus: Option<ConsensusBlockFields>,
/// Whether transactions should be pre-inserted to DB.
/// Should be set to `true` for EN's IO as EN doesn't store transactions in DB
/// before they are included into miniblocks.
pub pre_insert_txs: bool,
}

#[cfg(test)]
Expand Down

0 comments on commit 5cf7210

Please sign in to comment.