Skip to content

Commit

Permalink
make catch-up a managed task
Browse files Browse the repository at this point in the history
  • Loading branch information
itegulov committed Mar 16, 2024
1 parent 10da2dc commit f6516bb
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 11 deletions.
10 changes: 8 additions & 2 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ async fn build_state_keeper(
miniblock_sealer_handle: MiniblockSealerHandle,
stop_receiver: watch::Receiver<bool>,
chain_id: L2ChainId,
task_handles: &mut Vec<task::JoinHandle<anyhow::Result<()>>>,
) -> anyhow::Result<ZkSyncStateKeeper> {
// These config values are used on the main node, and depending on these values certain transactions can
// be *rejected* (that is, not included into the block). However, external node only mirrors what the main
Expand All @@ -80,8 +81,12 @@ async fn build_state_keeper(
state_keeper_db_path,
config.optional.enum_index_migration_chunk_size,
);
let stop_receiver_clone = stop_receiver.clone();
tokio::task::spawn(async move { task.run(stop_receiver_clone).await.unwrap() });
let mut stop_receiver_clone = stop_receiver.clone();
task_handles.push(tokio::task::spawn(async move {
let result = task.run(stop_receiver_clone.clone()).await;
stop_receiver_clone.changed().await?;
result
}));
let batch_executor_base: Box<dyn BatchExecutor> = Box::new(MainBatchExecutor::new(
Arc::new(storage_factory),
max_allowed_l2_tx_gas_limit,
Expand Down Expand Up @@ -170,6 +175,7 @@ async fn init_tasks(
miniblock_sealer_handle,
stop_receiver.clone(),
config.remote.l2_chain_id,
task_handles,
)
.await?;

Expand Down
8 changes: 7 additions & 1 deletion core/lib/zksync_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ async fn add_state_keeper_to_task_futures(
);
task_futures.push(tokio::spawn(miniblock_sealer.run()));

let state_keeper = create_state_keeper(
let (state_keeper, async_catchup_task) = create_state_keeper(
contracts_config,
state_keeper_config,
db_config,
Expand All @@ -848,6 +848,12 @@ async fn add_state_keeper_to_task_futures(
)
.await;

let mut stop_receiver_clone = stop_receiver.clone();
task_futures.push(tokio::task::spawn(async move {
let result = async_catchup_task.run(stop_receiver_clone.clone()).await;
stop_receiver_clone.changed().await?;
result
}));
task_futures.push(tokio::spawn(
state_keeper.run_fee_address_migration(state_keeper_pool),
));
Expand Down
17 changes: 9 additions & 8 deletions core/lib/zksync_core/src/state_keeper/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,12 @@ pub(crate) async fn create_state_keeper(
miniblock_sealer_handle: MiniblockSealerHandle,
object_store: Arc<dyn ObjectStore>,
stop_receiver: watch::Receiver<bool>,
) -> ZkSyncStateKeeper {
) -> (ZkSyncStateKeeper, AsyncCatchupTask) {
let (storage_factory, task) = AsyncRocksdbCache::new(
pool.clone(),
db_config.state_keeper_db_path.clone(),
state_keeper_config.enum_index_migration_chunk_size(),
);
let stop_receiver_clone = stop_receiver.clone();
tokio::task::spawn(async move { task.run(stop_receiver_clone).await.unwrap() });
let batch_executor_base = MainBatchExecutor::new(
Arc::new(storage_factory),
state_keeper_config.max_allowed_l2_tx_gas_limit.into(),
Expand All @@ -77,10 +75,13 @@ pub(crate) async fn create_state_keeper(
.expect("Failed initializing main node I/O for state keeper");

let sealer = SequencerSealer::new(state_keeper_config);
ZkSyncStateKeeper::new(
stop_receiver,
Box::new(io),
Box::new(batch_executor_base),
Arc::new(sealer),
(
ZkSyncStateKeeper::new(
stop_receiver,
Box::new(io),
Box::new(batch_executor_base),
Arc::new(sealer),
),
task,
)
}

0 comments on commit f6516bb

Please sign in to comment.