Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(node_framework): Support StateKeeper in the framework #1043

Merged
merged 18 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async fn build_state_keeper(
stop_receiver,
Box::new(io),
batch_executor_base,
Box::new(NoopSealer),
Arc::new(NoopSealer),
))
}

Expand Down
5 changes: 2 additions & 3 deletions core/lib/zksync_core/src/consensus/testonly.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Utilities for testing the consensus module.

use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};

use anyhow::Context as _;
use rand::{
Expand Down Expand Up @@ -445,7 +444,7 @@ impl StateKeeperRunner {
stop_receiver,
Box::new(io),
Box::new(MockBatchExecutorBuilder),
Box::new(NoopSealer),
Arc::new(NoopSealer),
)
.run(),
);
Expand Down
4 changes: 2 additions & 2 deletions core/lib/zksync_core/src/fee_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub trait BatchFeeModelInputProvider: fmt::Debug + 'static + Send + Sync {
/// it explicitly gets the L1 gas price from the provider and uses it to calculate the batch fee input instead of getting
/// it from other node.
#[derive(Debug)]
pub(crate) struct MainNodeFeeInputProvider {
pub struct MainNodeFeeInputProvider {
provider: Arc<GasAdjuster>,
config: FeeModelConfig,
}
Expand All @@ -74,7 +74,7 @@ impl BatchFeeModelInputProvider for MainNodeFeeInputProvider {
}

impl MainNodeFeeInputProvider {
pub(crate) fn new(provider: Arc<GasAdjuster>, config: FeeModelConfig) -> Self {
pub fn new(provider: Arc<GasAdjuster>, config: FeeModelConfig) -> Self {
Self { provider, config }
}
}
Expand Down
28 changes: 16 additions & 12 deletions core/lib/zksync_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -758,15 +758,15 @@ async fn add_state_keeper_to_task_futures(
.build()
.await
.context("failed to build state_keeper_pool")?;
let next_priority_id = state_keeper_pool
.access_storage()
.await
.unwrap()
.transactions_dal()
.next_priority_id()
.await;
let mempool = MempoolGuard::new(next_priority_id, mempool_config.capacity);
mempool.register_metrics();
let mempool = {
popzxc marked this conversation as resolved.
Show resolved Hide resolved
let mut storage = state_keeper_pool
.access_storage()
.await
.context("Access storage to build mempool")?;
let mempool = MempoolGuard::from_storage(&mut storage, mempool_config.capacity).await;
mempool.register_metrics();
mempool
};

let miniblock_sealer_pool = pool_builder
.build()
Expand Down Expand Up @@ -802,9 +802,13 @@ async fn add_state_keeper_to_task_futures(
.build()
.await
.context("failed to build mempool_fetcher_pool")?;
let mempool_fetcher = MempoolFetcher::new(mempool, batch_fee_input_provider, mempool_config);
let mempool_fetcher_handle =
tokio::spawn(mempool_fetcher.run(mempool_fetcher_pool, stop_receiver));
let mempool_fetcher = MempoolFetcher::new(
mempool,
batch_fee_input_provider,
mempool_config,
mempool_fetcher_pool,
);
let mempool_fetcher_handle = tokio::spawn(mempool_fetcher.run(stop_receiver));
task_futures.push(mempool_fetcher_handle);
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions core/lib/zksync_core/src/state_keeper/io/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use crate::{
/// Decides which batch parameters should be used for the new batch.
/// This is an IO for the main server application.
#[derive(Debug)]
pub(crate) struct MempoolIO {
pub struct MempoolIO {
mempool: MempoolGuard,
pool: ConnectionPool,
object_store: Arc<dyn ObjectStore>,
Expand Down Expand Up @@ -437,7 +437,7 @@ async fn sleep_past(timestamp: u64, miniblock: MiniblockNumber) -> u64 {

impl MempoolIO {
#[allow(clippy::too_many_arguments)]
pub(in crate::state_keeper) async fn new(
pub async fn new(
mempool: MempoolGuard,
object_store: Arc<dyn ObjectStore>,
miniblock_sealer_handle: MiniblockSealerHandle,
Expand Down
1 change: 0 additions & 1 deletion core/lib/zksync_core/src/state_keeper/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use zksync_types::{
Transaction,
};

pub(crate) use self::mempool::MempoolIO;
use super::{
metrics::{MiniblockQueueStage, MINIBLOCK_METRICS},
seal_criteria::IoSealCriteria,
Expand Down
5 changes: 3 additions & 2 deletions core/lib/zksync_core/src/state_keeper/keeper.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
convert::Infallible,
future::{self, Future},
sync::Arc,
time::{Duration, Instant},
};

Expand Down Expand Up @@ -60,15 +61,15 @@ pub struct ZkSyncStateKeeper {
stop_receiver: watch::Receiver<bool>,
io: Box<dyn StateKeeperIO>,
batch_executor_base: Box<dyn L1BatchExecutorBuilder>,
popzxc marked this conversation as resolved.
Show resolved Hide resolved
sealer: Box<dyn ConditionalSealer>,
sealer: Arc<dyn ConditionalSealer>,
}

impl ZkSyncStateKeeper {
pub fn new(
stop_receiver: watch::Receiver<bool>,
io: Box<dyn StateKeeperIO>,
batch_executor_base: Box<dyn L1BatchExecutorBuilder>,
sealer: Box<dyn ConditionalSealer>,
sealer: Arc<dyn ConditionalSealer>,
) -> Self {
Self {
stop_receiver,
Expand Down
43 changes: 27 additions & 16 deletions core/lib/zksync_core/src/state_keeper/mempool_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub async fn l2_tx_filter(
#[derive(Debug)]
pub struct MempoolFetcher {
mempool: MempoolGuard,
pool: ConnectionPool,
batch_fee_input_provider: Arc<dyn BatchFeeModelInputProvider>,
sync_interval: Duration,
sync_batch_size: usize,
Expand All @@ -48,9 +49,11 @@ impl MempoolFetcher {
mempool: MempoolGuard,
batch_fee_input_provider: Arc<dyn BatchFeeModelInputProvider>,
config: &MempoolConfig,
pool: ConnectionPool,
) -> Self {
Self {
mempool,
pool,
batch_fee_input_provider,
sync_interval: config.sync_interval(),
sync_batch_size: config.sync_batch_size,
Expand All @@ -60,12 +63,8 @@ impl MempoolFetcher {
}
}

pub async fn run(
mut self,
pool: ConnectionPool,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
let mut storage = pool.access_storage_tagged("state_keeper").await?;
pub async fn run(mut self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
let mut storage = self.pool.access_storage_tagged("state_keeper").await?;
if let Some(stuck_tx_timeout) = self.stuck_tx_timeout {
let removed_txs = storage
.transactions_dal()
Expand All @@ -87,7 +86,7 @@ impl MempoolFetcher {
break;
}
let latency = KEEPER_METRICS.mempool_sync.start();
let mut storage = pool.access_storage_tagged("state_keeper").await?;
let mut storage = self.pool.access_storage_tagged("state_keeper").await?;
let mempool_info = self.mempool.get_mempool_info();
let protocol_version = pending_protocol_version(&mut storage)
.await
Expand Down Expand Up @@ -230,12 +229,16 @@ mod tests {
let (base_fee, gas_per_pubdata) =
derive_base_fee_and_gas_per_pubdata(fee_input, ProtocolVersionId::latest().into());

let mut fetcher =
MempoolFetcher::new(mempool.clone(), fee_params_provider, &TEST_MEMPOOL_CONFIG);
let mut fetcher = MempoolFetcher::new(
mempool.clone(),
fee_params_provider,
&TEST_MEMPOOL_CONFIG,
pool.clone(),
);
let (tx_hashes_sender, mut tx_hashes_receiver) = mpsc::unbounded_channel();
fetcher.transaction_hashes_sender = tx_hashes_sender;
let (stop_sender, stop_receiver) = watch::channel(false);
let fetcher_task = tokio::spawn(fetcher.run(pool.clone(), stop_receiver));
let fetcher_task = tokio::spawn(fetcher.run(stop_receiver));

// Add a new transaction to the storage.
let transaction = create_l2_transaction(base_fee, gas_per_pubdata);
Expand Down Expand Up @@ -283,10 +286,14 @@ mod tests {
let (base_fee, gas_per_pubdata) =
derive_base_fee_and_gas_per_pubdata(fee_input, ProtocolVersionId::latest().into());

let fetcher =
MempoolFetcher::new(mempool.clone(), fee_params_provider, &TEST_MEMPOOL_CONFIG);
let fetcher = MempoolFetcher::new(
mempool.clone(),
fee_params_provider,
&TEST_MEMPOOL_CONFIG,
pool.clone(),
);
let (stop_sender, stop_receiver) = watch::channel(false);
let fetcher_task = tokio::spawn(fetcher.run(pool.clone(), stop_receiver));
let fetcher_task = tokio::spawn(fetcher.run(stop_receiver));

// Add a transaction with insufficient fee to the storage.
let transaction = create_l2_transaction(base_fee / 2, gas_per_pubdata / 2);
Expand Down Expand Up @@ -319,12 +326,16 @@ mod tests {
let (base_fee, gas_per_pubdata) =
derive_base_fee_and_gas_per_pubdata(fee_input, ProtocolVersionId::latest().into());

let mut fetcher =
MempoolFetcher::new(mempool.clone(), fee_params_provider, &TEST_MEMPOOL_CONFIG);
let mut fetcher = MempoolFetcher::new(
mempool.clone(),
fee_params_provider,
&TEST_MEMPOOL_CONFIG,
pool.clone(),
);
let (tx_hashes_sender, mut tx_hashes_receiver) = mpsc::unbounded_channel();
fetcher.transaction_hashes_sender = tx_hashes_sender;
let (stop_sender, stop_receiver) = watch::channel(false);
let fetcher_task = tokio::spawn(fetcher.run(pool.clone(), stop_receiver));
let fetcher_task = tokio::spawn(fetcher.run(stop_receiver));

// Add a new transaction to the storage.
let transaction = create_l2_transaction(base_fee * 2, gas_per_pubdata * 2);
Expand Down
11 changes: 5 additions & 6 deletions core/lib/zksync_core/src/state_keeper/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ use zksync_config::{
use zksync_dal::ConnectionPool;
use zksync_object_store::ObjectStore;

use self::io::MempoolIO;
pub use self::{
batch_executor::{L1BatchExecutorBuilder, MainBatchExecutorBuilder},
io::{MiniblockSealer, MiniblockSealerHandle},
io::{mempool::MempoolIO, MiniblockSealer, MiniblockSealerHandle, StateKeeperIO},
keeper::ZkSyncStateKeeper,
};
pub(crate) use self::{
mempool_actor::MempoolFetcher, seal_criteria::SequencerSealer, types::MempoolGuard,
mempool_actor::MempoolFetcher,
seal_criteria::SequencerSealer,
types::MempoolGuard,
};
use crate::fee_model::BatchFeeModelInputProvider;

Expand Down Expand Up @@ -75,6 +74,6 @@ pub(crate) async fn create_state_keeper(
stop_receiver,
Box::new(io),
Box::new(batch_executor_base),
Box::new(sealer),
Arc::new(sealer),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl ConditionalSealer for SequencerSealer {
}

impl SequencerSealer {
pub(crate) fn new(config: StateKeeperConfig) -> Self {
pub fn new(config: StateKeeperConfig) -> Self {
let sealers = Self::default_sealers();
Self { config, sealers }
}
Expand Down
3 changes: 2 additions & 1 deletion core/lib/zksync_core/src/state_keeper/tests/tester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{
collections::{HashMap, HashSet, VecDeque},
convert::TryInto,
fmt,
sync::Arc,
time::{Duration, Instant},
};

Expand Down Expand Up @@ -200,7 +201,7 @@ impl TestScenario {
stop_receiver,
Box::new(io),
Box::new(batch_executor_base),
Box::new(sealer),
Arc::new(sealer),
);
let sk_thread = tokio::spawn(sk.run());

Expand Down
11 changes: 10 additions & 1 deletion core/lib/zksync_core/src/state_keeper/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
};

use multivm::interface::VmExecutionResultAndLogs;
use zksync_dal::StorageProcessor;
use zksync_mempool::{L2TxFilter, MempoolInfo, MempoolStore};
use zksync_types::{
block::BlockGasCount, tx::ExecutionMetrics, Address, Nonce, PriorityOpId, Transaction,
Expand All @@ -16,7 +17,15 @@ use crate::gas_tracker::{gas_count_from_metrics, gas_count_from_tx_and_metrics};
pub struct MempoolGuard(Arc<Mutex<MempoolStore>>);

impl MempoolGuard {
pub fn new(next_priority_id: PriorityOpId, capacity: u64) -> Self {
pub async fn from_storage(storage_processor: &mut StorageProcessor<'_>, capacity: u64) -> Self {
let next_priority_id = storage_processor
.transactions_dal()
.next_priority_id()
.await;
Self::new(next_priority_id, capacity)
}

pub(super) fn new(next_priority_id: PriorityOpId, capacity: u64) -> Self {
let store = MempoolStore::new(next_priority_id, capacity);
Self(Arc::new(Mutex::new(store)))
}
Expand Down
3 changes: 2 additions & 1 deletion core/lib/zksync_core/src/sync_layer/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::{
collections::{HashMap, VecDeque},
iter,
sync::Arc,
time::{Duration, Instant},
};

Expand Down Expand Up @@ -94,7 +95,7 @@ impl StateKeeperHandles {
stop_receiver,
Box::new(io),
Box::new(batch_executor_base),
Box::new(NoopSealer),
Arc::new(NoopSealer),
);
Self {
stop_sender,
Expand Down
1 change: 1 addition & 0 deletions core/node/node_framework/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ zksync_config = { path = "../../lib/config" }
zksync_object_store = { path = "../../lib/object_store" }
zksync_core = { path = "../../lib/zksync_core" }
zksync_storage = { path = "../../lib/storage" }
zksync_eth_client = { path = "../../lib/eth_client" }

tracing = "0.1"
thiserror = "1"
Expand Down
Loading
Loading