Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(external-node): Use async miniblock sealing in external IO #611

Merged
merged 6 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions core/bin/external_node/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ pub struct OptionalENConfig {
/// Number of keys that is processed by enum_index migration in State Keeper each L1 batch.
#[serde(default = "OptionalENConfig::default_enum_index_migration_chunk_size")]
pub enum_index_migration_chunk_size: usize,
/// Capacity of the queue for asynchronous miniblock sealing. Once this many miniblocks are queued,
/// sealing will block until some of the miniblocks from the queue are processed.
/// 0 means that sealing is synchronous; this is mostly useful for performance comparison, testing etc.
#[serde(default = "OptionalENConfig::default_miniblock_seal_queue_capacity")]
pub miniblock_seal_queue_capacity: usize,
}

impl OptionalENConfig {
Expand Down Expand Up @@ -288,6 +293,10 @@ impl OptionalENConfig {
5000
}

const fn default_miniblock_seal_queue_capacity() -> usize {
10
}

pub fn polling_interval(&self) -> Duration {
Duration::from_millis(self.polling_interval)
}
Expand Down
17 changes: 15 additions & 2 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use zksync_core::{
},
reorg_detector::ReorgDetector,
setup_sigint_handler,
state_keeper::{L1BatchExecutorBuilder, MainBatchExecutorBuilder, ZkSyncStateKeeper},
state_keeper::{
L1BatchExecutorBuilder, MainBatchExecutorBuilder, MiniblockSealer, MiniblockSealerHandle,
ZkSyncStateKeeper,
},
sync_layer::{
batch_status_updater::BatchStatusUpdater, external_io::ExternalIO, fetcher::FetcherCursor,
genesis::perform_genesis_if_needed, ActionQueue, MainNodeClient, SyncState,
Expand All @@ -47,6 +50,7 @@ async fn build_state_keeper(
connection_pool: ConnectionPool,
sync_state: SyncState,
l2_erc20_bridge_addr: Address,
miniblock_sealer_handle: MiniblockSealerHandle,
stop_receiver: watch::Receiver<bool>,
chain_id: L2ChainId,
) -> ZkSyncStateKeeper {
Expand All @@ -73,6 +77,7 @@ async fn build_state_keeper(
let main_node_client = <dyn MainNodeClient>::json_rpc(&main_node_url)
.expect("Failed creating JSON-RPC client for main node");
let io = ExternalIO::new(
miniblock_sealer_handle,
connection_pool,
action_queue,
sync_state,
Expand Down Expand Up @@ -106,13 +111,22 @@ async fn init_tasks(

let sync_state = SyncState::new();
let (action_queue_sender, action_queue) = ActionQueue::new();

let mut task_handles = vec![];
let (miniblock_sealer, miniblock_sealer_handle) = MiniblockSealer::new(
connection_pool.clone(),
config.optional.miniblock_seal_queue_capacity,
);
task_handles.push(tokio::spawn(miniblock_sealer.run()));

let state_keeper = build_state_keeper(
action_queue,
config.required.state_cache_path.clone(),
&config,
connection_pool.clone(),
sync_state.clone(),
config.remote.l2_erc20_bridge_addr,
miniblock_sealer_handle,
stop_receiver.clone(),
config.remote.l2_chain_id,
)
Expand Down Expand Up @@ -271,7 +285,6 @@ async fn init_tasks(
healthchecks,
);

let mut task_handles = vec![];
if let Some(port) = config.optional.prometheus_port {
let prometheus_task = PrometheusExporterConfig::pull(port).run(stop_receiver.clone());
task_handles.push(tokio::spawn(prometheus_task));
Expand Down
3 changes: 3 additions & 0 deletions core/lib/zksync_core/src/state_keeper/io/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ where
self.current_l1_batch_number,
self.current_miniblock_number,
self.l2_erc20_bridge_addr,
None,
false,
);
self.miniblock_sealer_handle.submit(command).await;
self.current_miniblock_number += 1;
Expand Down Expand Up @@ -323,6 +325,7 @@ where
l1_batch_env,
finished_batch,
self.l2_erc20_bridge_addr,
None,
)
.await;
self.current_miniblock_number += 1; // Due to fictive miniblock being sealed.
Expand Down
11 changes: 4 additions & 7 deletions core/lib/zksync_core/src/state_keeper/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ struct Completable<T> {

/// Handle for [`MiniblockSealer`] allowing to submit [`MiniblockSealCommand`]s.
#[derive(Debug)]
pub(crate) struct MiniblockSealerHandle {
pub struct MiniblockSealerHandle {
commands_sender: mpsc::Sender<Completable<MiniblockSealCommand>>,
latest_completion_receiver: Option<oneshot::Receiver<()>>,
// If true, `submit()` will wait for the operation to complete.
Expand All @@ -144,7 +144,7 @@ impl MiniblockSealerHandle {
///
/// If there are currently too many unprocessed commands, this method will wait until
/// enough of them are processed (i.e., there is back pressure).
pub async fn submit(&mut self, command: MiniblockSealCommand) {
pub(crate) async fn submit(&mut self, command: MiniblockSealCommand) {
let miniblock_number = command.miniblock_number;
tracing::debug!(
"Enqueuing sealing command for miniblock #{miniblock_number} with #{} txs (L1 batch #{})",
Expand Down Expand Up @@ -209,7 +209,7 @@ impl MiniblockSealerHandle {

/// Component responsible for sealing miniblocks (i.e., storing their data to Postgres).
#[derive(Debug)]
pub(crate) struct MiniblockSealer {
pub struct MiniblockSealer {
pool: ConnectionPool,
is_sync: bool,
// Weak sender handle to get queue capacity stats.
Expand All @@ -220,10 +220,7 @@ pub(crate) struct MiniblockSealer {
impl MiniblockSealer {
/// Creates a sealer that will use the provided Postgres connection and will have the specified
/// `command_capacity` for unprocessed sealing commands.
pub(crate) fn new(
pool: ConnectionPool,
mut command_capacity: usize,
) -> (Self, MiniblockSealerHandle) {
pub fn new(pool: ConnectionPool, mut command_capacity: usize) -> (Self, MiniblockSealerHandle) {
let is_sync = command_capacity == 0;
command_capacity = command_capacity.max(1);

Expand Down
55 changes: 52 additions & 3 deletions core/lib/zksync_core/src/state_keeper/io/seal_logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
};

use multivm::interface::{FinishedL1Batch, L1BatchEnv};
use zksync_dal::blocks_dal::ConsensusBlockFields;
use zksync_dal::StorageProcessor;
use zksync_system_constants::ACCOUNT_CODE_STORAGE_ADDRESS;
use zksync_types::{
Expand All @@ -18,14 +19,18 @@ use zksync_types::{
use zksync_types::{
block::{L1BatchHeader, MiniblockHeader},
event::{extract_added_tokens, extract_long_l2_to_l1_messages},
l1::L1Tx,
l2::L2Tx,
protocol_version::ProtocolUpgradeTx,
storage_writes_deduplicator::{ModifiedSlot, StorageWritesDeduplicator},
tx::{
tx_execution_info::DeduplicatedWritesMetrics, IncludedTxLocation,
TransactionExecutionResult,
},
zkevm_test_harness::witness::sort_storage_access::sort_storage_access_queries,
AccountTreeId, Address, ExecuteTransactionCommon, L1BatchNumber, LogQuery, MiniblockNumber,
StorageKey, StorageLog, StorageLogQuery, StorageValue, Transaction, VmEvent, H256,
AccountTreeId, Address, ExecuteTransactionCommon, L1BatchNumber, L1BlockNumber, LogQuery,
MiniblockNumber, StorageKey, StorageLog, StorageLogQuery, StorageValue, Transaction, VmEvent,
H256,
};
// TODO (SMA-1206): use seconds instead of milliseconds.
use zksync_utils::{h256_to_u256, time::millis_since_epoch, u256_to_h256};
Expand All @@ -50,6 +55,7 @@ impl UpdatesManager {
l1_batch_env: &L1BatchEnv,
finished_batch: FinishedL1Batch,
l2_erc20_bridge_addr: Address,
consensus: Option<ConsensusBlockFields>,
) {
let started_at = Instant::now();
let progress = L1_BATCH_METRICS.start(L1BatchSealStage::VmFinalization);
Expand All @@ -63,6 +69,8 @@ impl UpdatesManager {
l1_batch_env.number,
current_miniblock_number,
l2_erc20_bridge_addr,
consensus,
false, // fictive miniblocks don't have txs, so it's fine to pass `false` here.
);
miniblock_command.seal_inner(&mut transaction, true).await;
progress.observe(None);
Expand Down Expand Up @@ -274,6 +282,36 @@ impl MiniblockSealCommand {
async fn seal_inner(&self, storage: &mut StorageProcessor<'_>, is_fictive: bool) {
self.assert_valid_miniblock(is_fictive);

let mut transaction = storage.start_transaction().await.unwrap();
if self.pre_insert_txs {
let progress = MINIBLOCK_METRICS.start(MiniblockSealStage::PreInsertTxs, is_fictive);
for tx in &self.miniblock.executed_transactions {
if let Ok(l1_tx) = L1Tx::try_from(tx.transaction.clone()) {
let l1_block_number = L1BlockNumber(l1_tx.common_data.eth_block as u32);
transaction
.transactions_dal()
.insert_transaction_l1(l1_tx, l1_block_number)
.await;
} else if let Ok(l2_tx) = L2Tx::try_from(tx.transaction.clone()) {
// Using `Default` for execution metrics should be OK here, since this data is not used on the EN.
transaction
.transactions_dal()
.insert_transaction_l2(l2_tx, Default::default())
.await;
} else if let Ok(protocol_system_upgrade_tx) =
ProtocolUpgradeTx::try_from(tx.transaction.clone())
{
transaction
.transactions_dal()
.insert_system_transaction(protocol_system_upgrade_tx)
.await;
} else {
unreachable!("Transaction {:?} is neither L1 nor L2", tx.transaction);
}
}
progress.observe(Some(self.miniblock.executed_transactions.len()));
}

let l1_batch_number = self.l1_batch_number;
let miniblock_number = self.miniblock_number;
let started_at = Instant::now();
Expand All @@ -291,7 +329,6 @@ impl MiniblockSealCommand {
event_count = self.miniblock.events.len()
);

let mut transaction = storage.start_transaction().await.unwrap();
let miniblock_header = MiniblockHeader {
number: miniblock_number,
timestamp: self.miniblock.timestamp,
Expand Down Expand Up @@ -404,6 +441,18 @@ impl MiniblockSealCommand {
.await;
progress.observe(user_l2_to_l1_log_count);

let progress = MINIBLOCK_METRICS.start(MiniblockSealStage::InsertConsensus, is_fictive);
// We want to add miniblock consensus fields atomically with the miniblock data so that we
// don't need to deal with corner cases (e.g., a miniblock w/o consensus fields).
if let Some(consensus) = &self.consensus {
transaction
.blocks_dal()
.set_miniblock_consensus_fields(self.miniblock_number, consensus)
.await
.unwrap();
}
progress.observe(None);

let progress = MINIBLOCK_METRICS.start(MiniblockSealStage::CommitMiniblock, is_fictive);
let current_l2_virtual_block_info = transaction
.storage_dal()
Expand Down
12 changes: 12 additions & 0 deletions core/lib/zksync_core/src/state_keeper/io/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ async fn processing_storage_logs_when_sealing_miniblock() {
base_system_contracts_hashes: BaseSystemContractsHashes::default(),
protocol_version: Some(ProtocolVersionId::latest()),
l2_erc20_bridge_addr: Address::default(),
consensus: None,
pre_insert_txs: false,
};
let mut conn = connection_pool
.access_storage_tagged("state_keeper")
Expand Down Expand Up @@ -321,6 +323,8 @@ async fn processing_events_when_sealing_miniblock() {
base_system_contracts_hashes: BaseSystemContractsHashes::default(),
protocol_version: Some(ProtocolVersionId::latest()),
l2_erc20_bridge_addr: Address::default(),
consensus: None,
pre_insert_txs: false,
};
let mut conn = pool.access_storage_tagged("state_keeper").await.unwrap();
conn.protocol_versions_dal()
Expand Down Expand Up @@ -434,6 +438,8 @@ async fn miniblock_sealer_handle_blocking() {
L1BatchNumber(1),
MiniblockNumber(1),
Address::default(),
None,
false,
);
sealer_handle.submit(seal_command).await;

Expand All @@ -442,6 +448,8 @@ async fn miniblock_sealer_handle_blocking() {
L1BatchNumber(1),
MiniblockNumber(2),
Address::default(),
None,
false,
);
{
let submit_future = sealer_handle.submit(seal_command);
Expand Down Expand Up @@ -470,6 +478,8 @@ async fn miniblock_sealer_handle_blocking() {
L1BatchNumber(2),
MiniblockNumber(3),
Address::default(),
None,
false,
);
sealer_handle.submit(seal_command).await;
let command = sealer.commands_receiver.recv().await.unwrap();
Expand All @@ -489,6 +499,8 @@ async fn miniblock_sealer_handle_parallel_processing() {
L1BatchNumber(1),
MiniblockNumber(i),
Address::default(),
None,
false,
);
sealer_handle.submit(seal_command).await;
}
Expand Down
7 changes: 2 additions & 5 deletions core/lib/zksync_core/src/state_keeper/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ pub(super) enum L1BatchSealStage {
FilterWrittenSlots,
InsertInitialWrites,
CommitL1Batch,
ExternalNodeStoreTransactions,
}

/// Buckets for positive integer, not-so-large values (e.g., initial writes count).
Expand Down Expand Up @@ -221,10 +220,6 @@ impl L1BatchMetrics {
latency_per_unit: &self.sealed_entity_per_unit[&stage],
}
}

pub(crate) fn start_storing_on_en(&self) -> LatencyObserver<'_> {
self.sealed_time_stage[&L1BatchSealStage::ExternalNodeStoreTransactions].start()
}
}

#[vise::register]
Expand All @@ -241,6 +236,7 @@ pub(super) enum MiniblockQueueStage {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue)]
#[metrics(rename_all = "snake_case")]
pub(super) enum MiniblockSealStage {
PreInsertTxs,
InsertMiniblockHeader,
MarkTransactionsInMiniblock,
InsertStorageLogs,
Expand All @@ -253,6 +249,7 @@ pub(super) enum MiniblockSealStage {
InsertEvents,
ExtractL2ToL1Logs,
InsertL2ToL1Logs,
InsertConsensus,
CommitMiniblock,
}

Expand Down
6 changes: 3 additions & 3 deletions core/lib/zksync_core/src/state_keeper/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ pub(crate) mod updates;

pub use self::{
batch_executor::{L1BatchExecutorBuilder, MainBatchExecutorBuilder},
io::{MiniblockSealer, MiniblockSealerHandle},
keeper::ZkSyncStateKeeper,
};
pub(crate) use self::{
io::MiniblockSealer, mempool_actor::MempoolFetcher, seal_criteria::ConditionalSealer,
types::MempoolGuard,
mempool_actor::MempoolFetcher, seal_criteria::ConditionalSealer, types::MempoolGuard,
};

use self::io::{MempoolIO, MiniblockSealerHandle};
use self::io::MempoolIO;
use crate::l1_gas_price::L1GasPriceProvider;

#[allow(clippy::too_many_arguments)]
Expand Down
10 changes: 10 additions & 0 deletions core/lib/zksync_core/src/state_keeper/updates/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use multivm::interface::{L1BatchEnv, VmExecutionResultAndLogs};

use zksync_contracts::BaseSystemContractsHashes;
use zksync_dal::blocks_dal::ConsensusBlockFields;
use zksync_types::vm_trace::Call;
use zksync_types::{
block::BlockGasCount, storage_writes_deduplicator::StorageWritesDeduplicator,
Expand Down Expand Up @@ -81,6 +82,8 @@ impl UpdatesManager {
l1_batch_number: L1BatchNumber,
miniblock_number: MiniblockNumber,
l2_erc20_bridge_addr: Address,
consensus: Option<ConsensusBlockFields>,
pre_insert_txs: bool,
) -> MiniblockSealCommand {
MiniblockSealCommand {
l1_batch_number,
Expand All @@ -93,6 +96,8 @@ impl UpdatesManager {
base_system_contracts_hashes: self.base_system_contract_hashes,
protocol_version: Some(self.protocol_version),
l2_erc20_bridge_addr,
consensus,
pre_insert_txs,
}
}

Expand Down Expand Up @@ -172,6 +177,11 @@ pub(crate) struct MiniblockSealCommand {
pub base_system_contracts_hashes: BaseSystemContractsHashes,
pub protocol_version: Option<ProtocolVersionId>,
pub l2_erc20_bridge_addr: Address,
pub consensus: Option<ConsensusBlockFields>,
/// Whether transactions should be pre-inserted to DB.
/// Should be set to `true` for EN's IO as EN doesn't store transactions in DB
/// before they are included into miniblocks.
pub pre_insert_txs: bool,
}

#[cfg(test)]
Expand Down
Loading