From e1990ba271dc4edeff775878f19edf97e89b0a6f Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Sat, 16 Mar 2024 06:33:41 +0100 Subject: [PATCH] make catch-up a managed task --- core/bin/external_node/src/main.rs | 6 +++++- core/lib/zksync_core/src/lib.rs | 6 +++++- core/lib/zksync_core/src/state_keeper/mod.rs | 17 +++++++++-------- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 8ffa3dd741c..0c58e170dc0 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -65,6 +65,7 @@ async fn build_state_keeper( miniblock_sealer_handle: MiniblockSealerHandle, stop_receiver: watch::Receiver, chain_id: L2ChainId, + task_handles: &mut Vec>>, ) -> anyhow::Result { // These config values are used on the main node, and depending on these values certain transactions can // be *rejected* (that is, not included into the block). However, external node only mirrors what the main @@ -81,7 +82,9 @@ async fn build_state_keeper( config.optional.enum_index_migration_chunk_size, ); let stop_receiver_clone = stop_receiver.clone(); - tokio::task::spawn(async move { task.run(stop_receiver_clone).await.unwrap() }); + task_handles.push(tokio::task::spawn(async move { + task.run(stop_receiver_clone).await + })); let batch_executor_base: Box = Box::new(MainBatchExecutor::new( Arc::new(storage_factory), max_allowed_l2_tx_gas_limit, @@ -170,6 +173,7 @@ async fn init_tasks( miniblock_sealer_handle, stop_receiver.clone(), config.remote.l2_chain_id, + task_handles, ) .await?; diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs index 988631f5717..c6d5e9661e2 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -833,7 +833,7 @@ async fn add_state_keeper_to_task_futures( ); task_futures.push(tokio::spawn(miniblock_sealer.run())); - let state_keeper = create_state_keeper( + let (state_keeper, async_catchup_task) = create_state_keeper( contracts_config, state_keeper_config, db_config, @@ -848,6 +848,10 @@ async fn add_state_keeper_to_task_futures( ) .await; + let stop_receiver_clone = stop_receiver.clone(); + task_futures.push(tokio::task::spawn(async move { + async_catchup_task.run(stop_receiver_clone).await + })); task_futures.push(tokio::spawn( state_keeper.run_fee_address_migration(state_keeper_pool), )); diff --git a/core/lib/zksync_core/src/state_keeper/mod.rs b/core/lib/zksync_core/src/state_keeper/mod.rs index 6e947289b5a..1a423168d45 100644 --- a/core/lib/zksync_core/src/state_keeper/mod.rs +++ b/core/lib/zksync_core/src/state_keeper/mod.rs @@ -45,14 +45,12 @@ pub(crate) async fn create_state_keeper( miniblock_sealer_handle: MiniblockSealerHandle, object_store: Arc, stop_receiver: watch::Receiver, -) -> ZkSyncStateKeeper { +) -> (ZkSyncStateKeeper, AsyncCatchupTask) { let (storage_factory, task) = AsyncRocksdbCache::new( pool.clone(), db_config.state_keeper_db_path.clone(), state_keeper_config.enum_index_migration_chunk_size(), ); - let stop_receiver_clone = stop_receiver.clone(); - tokio::task::spawn(async move { task.run(stop_receiver_clone).await.unwrap() }); let batch_executor_base = MainBatchExecutor::new( Arc::new(storage_factory), state_keeper_config.max_allowed_l2_tx_gas_limit.into(), @@ -77,10 +75,13 @@ pub(crate) async fn create_state_keeper( .expect("Failed initializing main node I/O for state keeper"); let sealer = SequencerSealer::new(state_keeper_config); - ZkSyncStateKeeper::new( - stop_receiver, - Box::new(io), - Box::new(batch_executor_base), - Arc::new(sealer), + ( + ZkSyncStateKeeper::new( + stop_receiver, + Box::new(io), + Box::new(batch_executor_base), + Arc::new(sealer), + ), + task, ) }