Skip to content

Commit

Permalink
refactor(en): Refactor sync action queue in EN (#243)
Browse files Browse the repository at this point in the history
# What ❔

Refactors action queue used by EN to be more idiomatic. Covers sync
logic with tests.

## Why ❔

The idea is to reuse EN code other than the fetcher with the new sync
mechanism that will be brought by consensus.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
slowli committed Oct 19, 2023
1 parent f4ffcb9 commit a135127
Show file tree
Hide file tree
Showing 20 changed files with 854 additions and 624 deletions.
22 changes: 6 additions & 16 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,10 @@ use zksync_core::{
},
reorg_detector::ReorgDetector,
setup_sigint_handler,
state_keeper::{
L1BatchExecutorBuilder, MainBatchExecutorBuilder, SealManager, ZkSyncStateKeeper,
},
state_keeper::{L1BatchExecutorBuilder, MainBatchExecutorBuilder, ZkSyncStateKeeper},
sync_layer::{
batch_status_updater::BatchStatusUpdater, external_io::ExternalIO,
fetcher::MainNodeFetcher, genesis::perform_genesis_if_needed, ActionQueue,
ExternalNodeSealer, SyncState,
fetcher::MainNodeFetcher, genesis::perform_genesis_if_needed, ActionQueue, SyncState,
},
};
use zksync_dal::{connection::DbVariant, healthcheck::ConnectionPoolHealthCheck, ConnectionPool};
Expand All @@ -52,13 +49,7 @@ async fn build_state_keeper(
stop_receiver: watch::Receiver<bool>,
chain_id: L2ChainId,
) -> ZkSyncStateKeeper {
let en_sealer = ExternalNodeSealer::new(action_queue.clone());
let main_node_url = config.required.main_node_url().unwrap();
let sealer = SealManager::custom(
None,
vec![en_sealer.clone().into_unconditional_batch_seal_criterion()],
vec![en_sealer.into_miniblock_seal_criterion()],
);

// These config values are used on the main node, and depending on these values certain transactions can
// be *rejected* (that is, not included into the block). However, external node only mirrors what the main
Expand Down Expand Up @@ -91,10 +82,9 @@ async fn build_state_keeper(
)
.await,
);

io.recalculate_miniblock_hashes().await;

ZkSyncStateKeeper::new(stop_receiver, io, batch_executor_base, sealer)
ZkSyncStateKeeper::without_sealer(stop_receiver, io, batch_executor_base)
}

async fn init_tasks(
Expand All @@ -115,9 +105,9 @@ async fn init_tasks(
let gas_adjuster = Arc::new(MainNodeGasPriceFetcher::new(&main_node_url));

let sync_state = SyncState::new();
let action_queue = ActionQueue::new();
let (action_queue_sender, action_queue) = ActionQueue::new();
let state_keeper = build_state_keeper(
action_queue.clone(),
action_queue,
config.required.state_cache_path.clone(),
&config,
connection_pool.clone(),
Expand All @@ -135,7 +125,7 @@ async fn init_tasks(
.await
.context("failed to build a connection pool for MainNodeFetcher")?,
&main_node_url,
action_queue.clone(),
action_queue_sender,
sync_state.clone(),
stop_receiver.clone(),
)
Expand Down
16 changes: 16 additions & 0 deletions core/lib/zksync_core/src/genesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,22 @@ pub struct GenesisParams {
pub first_l1_verifier_config: L1VerifierConfig,
}

impl GenesisParams {
#[cfg(test)]
pub(crate) fn mock() -> Self {
use zksync_types::system_contracts::get_system_smart_contracts;

Self {
first_validator: Address::repeat_byte(0x01),
protocol_version: ProtocolVersionId::latest(),
base_system_contracts: BaseSystemContracts::load_from_disk(),
system_contracts: get_system_smart_contracts(),
first_l1_verifier_config: L1VerifierConfig::default(),
first_verifier_address: Address::zero(),
}
}
}

pub async fn ensure_genesis_state(
storage: &mut StorageProcessor<'_>,
zksync_chain_id: L2ChainId,
Expand Down
26 changes: 5 additions & 21 deletions core/lib/zksync_core/src/metadata_calculator/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,8 @@ mod tests {
use tempfile::TempDir;

use db_test_macro::db_test;
use zksync_contracts::BaseSystemContracts;
use zksync_dal::ConnectionPool;
use zksync_types::{
proofs::PrepareBasicCircuitsJob, protocol_version::L1VerifierConfig,
system_contracts::get_system_smart_contracts, Address, L2ChainId, ProtocolVersionId,
StorageKey, StorageLogKind,
};
use zksync_types::{proofs::PrepareBasicCircuitsJob, L2ChainId, StorageKey, StorageLogKind};

use super::*;
use crate::{
Expand Down Expand Up @@ -403,23 +398,12 @@ mod tests {
}
}

fn mock_genesis_params() -> GenesisParams {
GenesisParams {
first_validator: Address::repeat_byte(0x01),
protocol_version: ProtocolVersionId::latest(),
base_system_contracts: BaseSystemContracts::load_from_disk(),
system_contracts: get_system_smart_contracts(),
first_l1_verifier_config: L1VerifierConfig::default(),
first_verifier_address: Address::zero(),
}
}

#[db_test]
async fn loaded_logs_equivalence_basics(pool: ConnectionPool) {
ensure_genesis_state(
&mut pool.access_storage().await.unwrap(),
L2ChainId::from(270),
&mock_genesis_params(),
&GenesisParams::mock(),
)
.await
.unwrap();
Expand All @@ -441,7 +425,7 @@ mod tests {
#[db_test]
async fn loaded_logs_equivalence_with_zero_no_op_logs(pool: ConnectionPool) {
let mut storage = pool.access_storage().await.unwrap();
ensure_genesis_state(&mut storage, L2ChainId::from(270), &mock_genesis_params())
ensure_genesis_state(&mut storage, L2ChainId::from(270), &GenesisParams::mock())
.await
.unwrap();

Expand Down Expand Up @@ -519,7 +503,7 @@ mod tests {
#[db_test]
async fn loaded_logs_equivalence_with_non_zero_no_op_logs(pool: ConnectionPool) {
let mut storage = pool.access_storage().await.unwrap();
ensure_genesis_state(&mut storage, L2ChainId::from(270), &mock_genesis_params())
ensure_genesis_state(&mut storage, L2ChainId::from(270), &GenesisParams::mock())
.await
.unwrap();

Expand Down Expand Up @@ -566,7 +550,7 @@ mod tests {
#[db_test]
async fn loaded_logs_equivalence_with_protective_reads(pool: ConnectionPool) {
let mut storage = pool.access_storage().await.unwrap();
ensure_genesis_state(&mut storage, L2ChainId::from(270), &mock_genesis_params())
ensure_genesis_state(&mut storage, L2ChainId::from(270), &GenesisParams::mock())
.await
.unwrap();

Expand Down
54 changes: 8 additions & 46 deletions core/lib/zksync_core/src/metadata_calculator/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ use zksync_object_store::{ObjectStore, ObjectStoreFactory};
use zksync_types::{
block::{miniblock_hash, BlockGasCount, L1BatchHeader, MiniblockHeader},
proofs::PrepareBasicCircuitsJob,
protocol_version::L1VerifierConfig,
system_contracts::get_system_smart_contracts,
AccountTreeId, Address, L1BatchNumber, L2ChainId, MiniblockNumber, ProtocolVersionId,
StorageKey, StorageLog, H256,
AccountTreeId, Address, L1BatchNumber, L2ChainId, MiniblockNumber, StorageKey, StorageLog,
H256,
};
use zksync_utils::u32_to_h256;

Expand Down Expand Up @@ -397,27 +395,9 @@ async fn setup_calculator_with_options(

let mut storage = pool.access_storage().await.unwrap();
if storage.blocks_dal().is_genesis_needed().await.unwrap() {
let chain_id = L2ChainId::from(270);
let protocol_version = ProtocolVersionId::latest();
let base_system_contracts = BaseSystemContracts::load_from_disk();
let system_contracts = get_system_smart_contracts();
let first_validator = Address::repeat_byte(0x01);
let first_l1_verifier_config = L1VerifierConfig::default();
let first_verifier_address = Address::zero();
ensure_genesis_state(
&mut storage,
chain_id,
&GenesisParams {
first_validator,
protocol_version,
base_system_contracts,
system_contracts,
first_l1_verifier_config,
first_verifier_address,
},
)
.await
.unwrap();
ensure_genesis_state(&mut storage, L2ChainId::from(270), &GenesisParams::mock())
.await
.unwrap();
}
metadata_calculator
}
Expand Down Expand Up @@ -641,27 +621,9 @@ async fn remove_l1_batches(
#[db_test]
async fn deduplication_works_as_expected(pool: ConnectionPool) {
let mut storage = pool.access_storage().await.unwrap();

let first_validator = Address::repeat_byte(0x01);
let protocol_version = ProtocolVersionId::latest();
let base_system_contracts = BaseSystemContracts::load_from_disk();
let system_contracts = get_system_smart_contracts();
let first_l1_verifier_config = L1VerifierConfig::default();
let first_verifier_address = Address::zero();
ensure_genesis_state(
&mut storage,
L2ChainId::from(270),
&GenesisParams {
protocol_version,
first_validator,
base_system_contracts,
system_contracts,
first_l1_verifier_config,
first_verifier_address,
},
)
.await
.unwrap();
ensure_genesis_state(&mut storage, L2ChainId::from(270), &GenesisParams::mock())
.await
.unwrap();

let logs = gen_storage_logs(100..120, 1).pop().unwrap();
let hashed_keys: Vec<_> = logs.iter().map(|log| log.key.hashed_key()).collect();
Expand Down
4 changes: 2 additions & 2 deletions core/lib/zksync_core/src/state_keeper/batch_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl TxExecutionResult {
#[async_trait]
pub trait L1BatchExecutorBuilder: 'static + Send + Sync + fmt::Debug {
async fn init_batch(
&self,
&mut self,
l1_batch_params: L1BatchEnv,
system_env: SystemEnv,
) -> BatchExecutorHandle;
Expand Down Expand Up @@ -112,7 +112,7 @@ impl MainBatchExecutorBuilder {
#[async_trait]
impl L1BatchExecutorBuilder for MainBatchExecutorBuilder {
async fn init_batch(
&self,
&mut self,
l1_batch_params: L1BatchEnv,
system_env: SystemEnv,
) -> BatchExecutorHandle {
Expand Down
26 changes: 22 additions & 4 deletions core/lib/zksync_core/src/state_keeper/io/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,16 @@ use crate::{
extractors,
io::{
common::{l1_batch_params, load_pending_batch, poll_iters},
MiniblockSealerHandle, PendingBatchData, StateKeeperIO,
MiniblockParams, MiniblockSealerHandle, PendingBatchData, StateKeeperIO,
},
mempool_actor::l2_tx_filter,
metrics::KEEPER_METRICS,
seal_criteria::{IoSealCriteria, TimeoutSealer},
updates::UpdatesManager,
MempoolGuard,
},
};

use super::MiniblockParams;

/// Mempool-based IO for the state keeper.
/// Receives transactions from the database through the mempool filtering logic.
/// Decides which batch parameters should be used for the new batch.
Expand All @@ -48,6 +47,7 @@ use super::MiniblockParams;
pub(crate) struct MempoolIO<G> {
mempool: MempoolGuard,
pool: ConnectionPool,
timeout_sealer: TimeoutSealer,
filter: L2TxFilter,
current_miniblock_number: MiniblockNumber,
miniblock_sealer_handle: MiniblockSealerHandle,
Expand All @@ -65,8 +65,25 @@ pub(crate) struct MempoolIO<G> {
virtual_blocks_per_miniblock: u32,
}

impl<G> IoSealCriteria for MempoolIO<G>
where
G: L1GasPriceProvider + 'static + Send + Sync,
{
fn should_seal_l1_batch_unconditionally(&mut self, manager: &UpdatesManager) -> bool {
self.timeout_sealer
.should_seal_l1_batch_unconditionally(manager)
}

fn should_seal_miniblock(&mut self, manager: &UpdatesManager) -> bool {
self.timeout_sealer.should_seal_miniblock(manager)
}
}

#[async_trait]
impl<G: L1GasPriceProvider + 'static + Send + Sync> StateKeeperIO for MempoolIO<G> {
impl<G> StateKeeperIO for MempoolIO<G>
where
G: L1GasPriceProvider + 'static + Send + Sync,
{
fn current_l1_batch_number(&self) -> L1BatchNumber {
self.current_l1_batch_number
}
Expand Down Expand Up @@ -423,6 +440,7 @@ impl<G: L1GasPriceProvider> MempoolIO<G> {
Self {
mempool,
pool,
timeout_sealer: TimeoutSealer::new(config),
filter: L2TxFilter::default(),
// ^ Will be initialized properly on the first newly opened batch
current_l1_batch_number: last_sealed_l1_batch_header.number + 1,
Expand Down
3 changes: 2 additions & 1 deletion core/lib/zksync_core/src/state_keeper/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub(crate) mod seal_logic;
pub(crate) use self::mempool::MempoolIO;
use super::{
metrics::{MiniblockQueueStage, MINIBLOCK_METRICS},
seal_criteria::IoSealCriteria,
updates::{MiniblockSealCommand, UpdatesManager},
};

Expand Down Expand Up @@ -65,7 +66,7 @@ pub struct MiniblockParams {
/// it's used to receive volatile parameters (such as batch parameters), and also it's used to perform
/// mutable operations on the persistent state (e.g. persist executed batches).
#[async_trait]
pub trait StateKeeperIO: 'static + Send {
pub trait StateKeeperIO: 'static + Send + IoSealCriteria {
/// Returns the number of the currently processed L1 batch.
fn current_l1_batch_number(&self) -> L1BatchNumber;
/// Returns the number of the currently processed miniblock (aka L2 block).
Expand Down
Loading

0 comments on commit a135127

Please sign in to comment.