diff --git a/Cargo.lock b/Cargo.lock index db8d495a547..ac89f7a2963 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8541,6 +8541,7 @@ dependencies = [ "tracing", "vlog", "zksync_circuit_breaker", + "zksync_concurrency", "zksync_config", "zksync_contracts", "zksync_core", diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 84c7ddf17ba..21b45f9b34c 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -8,7 +8,7 @@ use tokio::{ sync::watch, task::{self, JoinHandle}, }; -use zksync_concurrency::{ctx, limiter, scope, time}; +use zksync_concurrency::{ctx, scope}; use zksync_config::configs::{ api::MerkleTreeApiConfig, chain::L1BatchCommitDataGeneratorMode, database::MerkleTreeMode, }; @@ -210,37 +210,41 @@ async fn run_core( .await?; task_handles.push(tokio::spawn({ - let ctx = ctx::root(); - let cfg = config.consensus.clone(); - let mut stop_receiver = stop_receiver.clone(); - let fetcher = consensus::Fetcher { - store: consensus::Store(connection_pool.clone()), - sync_state: sync_state.clone(), - client: Box::new(main_node_client.clone()), - limiter: limiter::Limiter::new( - &ctx, - limiter::Rate { - burst: 10, - refresh: time::Duration::milliseconds(30), - }, - ), + let config = config.consensus.clone(); + let secrets = + config::read_consensus_secrets().context("config::read_consensus_secrets()")?; + let cfg = match (config, secrets) { + (Some(cfg), Some(secrets)) => Some((cfg, secrets)), + (Some(_), None) => { + anyhow::bail!("Consensus config is specified, but secrets are missing") + } + (None, _) => { + // Secrets may be unconditionally embedded in some environments, but they are unused + // unless a consensus config is provided. + None + } }; - let actions = action_queue_sender; + + let pool = connection_pool.clone(); + let sync_state = sync_state.clone(); + let main_node_client = Arc::new(main_node_client.clone()); + let mut stop_receiver = stop_receiver.clone(); async move { - scope::run!(&ctx, |ctx, s| async { - s.spawn_bg(async { - let res = match cfg { - Some(cfg) => { - let secrets = config::read_consensus_secrets() - .context("config::read_consensus_secrets()")? - .context("consensus secrets missing")?; - fetcher.run_p2p(ctx, actions, cfg.p2p(&secrets)?).await - } - None => fetcher.run_centralized(ctx, actions).await, - }; - tracing::info!("Consensus actor stopped"); - res - }); + // We instantiate the root context here, since the consensus task is the only user of the + // structured concurrency framework. + // Note, however, that awaiting for the `stop_receiver` is related to the root context behavior, + // not the consensus task itself. There may have been any number of tasks running in the root context, + // but we only need to wait for stop signal once, and it will be propagated to all child contexts. + let ctx = ctx::root(); + scope::run!(&ctx, |ctx, s| async move { + s.spawn_bg(consensus::era::run_fetcher( + ctx, + cfg, + pool, + sync_state, + main_node_client, + action_queue_sender, + )); ctx.wait(stop_receiver.wait_for(|stop| *stop)).await??; Ok(()) }) diff --git a/core/lib/zksync_core/src/consensus/era.rs b/core/lib/zksync_core/src/consensus/era.rs new file mode 100644 index 00000000000..7c20f3b493b --- /dev/null +++ b/core/lib/zksync_core/src/consensus/era.rs @@ -0,0 +1,64 @@ +//! This module provides convenience functions to run consensus components in different modes +//! as expected by the zkSync Era. +//! +//! This module simply glues APIs that are already publicly exposed by the `consensus` module, +//! so in case any custom behavior is needed, these APIs should be used directly. + +use std::sync::Arc; + +use zksync_concurrency::{ctx, limiter, time}; +use zksync_dal::{ConnectionPool, Core}; + +use super::{ + config::{Config, Secrets}, + fetcher::Fetcher, + storage::Store, +}; +use crate::sync_layer::{sync_action::ActionQueueSender, MainNodeClient, SyncState}; + +/// Runs the consensus task in the main node mode. +pub async fn run_main_node( + ctx: &ctx::Ctx, + cfg: super::MainNodeConfig, + pool: ConnectionPool, +) -> anyhow::Result<()> { + // Consensus is a new component. + // For now in case of error we just log it and allow the server + // to continue running. + if let Err(err) = cfg.run(ctx, Store(pool)).await { + tracing::error!(%err, "Consensus actor failed"); + } else { + tracing::info!("Consensus actor stopped"); + } + Ok(()) +} + +/// Runs the consensus in the fetcher mode (e.g. for the external node needs). +/// The fetcher implementation may either be p2p or centralized. +pub async fn run_fetcher( + ctx: &ctx::Ctx, + cfg: Option<(Config, Secrets)>, + pool: ConnectionPool, + sync_state: SyncState, + main_node_client: Arc, + actions: ActionQueueSender, +) -> anyhow::Result<()> { + let fetcher = Fetcher { + store: Store(pool), + sync_state: sync_state.clone(), + client: main_node_client, + limiter: limiter::Limiter::new( + ctx, + limiter::Rate { + burst: 10, + refresh: time::Duration::milliseconds(30), + }, + ), + }; + let res = match cfg { + Some((cfg, secrets)) => fetcher.run_p2p(ctx, actions, cfg.p2p(&secrets)?).await, + None => fetcher.run_centralized(ctx, actions).await, + }; + tracing::info!("Consensus actor stopped"); + res +} diff --git a/core/lib/zksync_core/src/consensus/fetcher.rs b/core/lib/zksync_core/src/consensus/fetcher.rs index dee02faef65..3b267e38a13 100644 --- a/core/lib/zksync_core/src/consensus/fetcher.rs +++ b/core/lib/zksync_core/src/consensus/fetcher.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use anyhow::Context as _; use zksync_concurrency::{ctx, error::Wrap as _, limiter, scope, time}; use zksync_consensus_executor as executor; @@ -18,7 +20,7 @@ pub type P2PConfig = executor::Config; pub struct Fetcher { pub store: Store, pub sync_state: SyncState, - pub client: Box, + pub client: Arc, /// Rate limiter for `client.fetch_l2_block` requests. pub limiter: limiter::Limiter, } diff --git a/core/lib/zksync_core/src/consensus/mod.rs b/core/lib/zksync_core/src/consensus/mod.rs index c59f26a472b..282319240c2 100644 --- a/core/lib/zksync_core/src/consensus/mod.rs +++ b/core/lib/zksync_core/src/consensus/mod.rs @@ -2,6 +2,7 @@ #![allow(clippy::redundant_locals)] #![allow(clippy::needless_pass_by_ref_mut)] + use zksync_concurrency::{ctx, error::Wrap as _, scope}; use zksync_consensus_executor as executor; use zksync_consensus_roles::validator; @@ -10,6 +11,7 @@ use zksync_consensus_storage::BlockStore; pub use self::{fetcher::*, storage::Store}; mod config; +pub mod era; mod fetcher; mod storage; #[cfg(test)] diff --git a/core/lib/zksync_core/src/consensus/testonly.rs b/core/lib/zksync_core/src/consensus/testonly.rs index 15c85ad636f..433820b673e 100644 --- a/core/lib/zksync_core/src/consensus/testonly.rs +++ b/core/lib/zksync_core/src/consensus/testonly.rs @@ -305,7 +305,7 @@ impl StateKeeper { ) -> anyhow::Result<()> { Fetcher { store: self.store, - client: Box::new(client), + client: Arc::new(client), sync_state: SyncState::default(), limiter: unbounded_limiter(ctx), } @@ -322,7 +322,7 @@ impl StateKeeper { ) -> anyhow::Result<()> { Fetcher { store: self.store, - client: Box::new(client), + client: Arc::new(client), sync_state: SyncState::default(), limiter: unbounded_limiter(ctx), } diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs index 32947d62904..bc5f7d20503 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -548,18 +548,14 @@ pub async fn initialize_components( let pool = connection_pool.clone(); let mut stop_receiver = stop_receiver.clone(); task_futures.push(tokio::spawn(async move { - scope::run!(&ctx::root(), |ctx, s| async { - s.spawn_bg(async { - // Consensus is a new component. - // For now in case of error we just log it and allow the server - // to continue running. - if let Err(err) = cfg.run(ctx, consensus::Store(pool)).await { - tracing::error!(%err, "Consensus actor failed"); - } else { - tracing::info!("Consensus actor stopped"); - } - Ok(()) - }); + // We instantiate the root context here, since the consensus task is the only user of the + // structured concurrency framework. + // Note, however, that awaiting for the `stop_receiver` is related to the root context behavior, + // not the consensus task itself. There may have been any number of tasks running in the root context, + // but we only need to wait for stop signal once, and it will be propagated to all child contexts. + let root_ctx = ctx::root(); + scope::run!(&root_ctx, |ctx, s| async move { + s.spawn_bg(consensus::era::run_main_node(ctx, cfg, pool)); let _ = stop_receiver.wait_for(|stop| *stop).await?; Ok(()) }) diff --git a/core/lib/zksync_core/src/sync_layer/mod.rs b/core/lib/zksync_core/src/sync_layer/mod.rs index c5ebd52ea43..a8e840c9609 100644 --- a/core/lib/zksync_core/src/sync_layer/mod.rs +++ b/core/lib/zksync_core/src/sync_layer/mod.rs @@ -10,7 +10,9 @@ mod sync_state; mod tests; pub use self::{ - client::MainNodeClient, external_io::ExternalIO, sync_action::ActionQueue, + client::MainNodeClient, + external_io::ExternalIO, + sync_action::{ActionQueue, ActionQueueSender}, sync_state::SyncState, }; diff --git a/core/node/node_framework/Cargo.toml b/core/node/node_framework/Cargo.toml index b40ca50effe..89e2934ad3c 100644 --- a/core/node/node_framework/Cargo.toml +++ b/core/node/node_framework/Cargo.toml @@ -26,6 +26,7 @@ zksync_contracts.workspace = true zksync_web3_decl.workspace = true zksync_utils.workspace = true zksync_circuit_breaker.workspace = true +zksync_concurrency.workspace = true tracing.workspace = true thiserror.workspace = true diff --git a/core/node/node_framework/examples/main_node.rs b/core/node/node_framework/examples/main_node.rs index abfbcc58704..d84c623a9de 100644 --- a/core/node/node_framework/examples/main_node.rs +++ b/core/node/node_framework/examples/main_node.rs @@ -23,13 +23,16 @@ use zksync_core::{ tx_sender::{ApiContracts, TxSenderConfig}, web3::{state::InternalApiConfig, Namespace}, }, + consensus, metadata_calculator::MetadataCalculatorConfig, + temp_config_store::decode_yaml, }; use zksync_env_config::FromEnv; use zksync_node_framework::{ implementations::layers::{ circuit_breaker_checker::CircuitBreakerCheckerLayer, commitment_generator::CommitmentGeneratorLayer, + consensus::{ConsensusLayer, Mode as ConsensusMode}, contract_verification_api::ContractVerificationApiLayer, eth_sender::EthSenderLayer, eth_watch::EthWatchLayer, @@ -339,6 +342,39 @@ impl MainNodeBuilder { Ok(self) } + fn add_consensus_layer(mut self) -> anyhow::Result { + // Copy-pasted from the zksync_server codebase. + + fn read_consensus_secrets() -> anyhow::Result> { + // Read public config. + let Ok(path) = std::env::var("CONSENSUS_SECRETS_PATH") else { + return Ok(None); + }; + let secrets = std::fs::read_to_string(&path).context(path)?; + Ok(Some(decode_yaml(&secrets).context("failed decoding YAML")?)) + } + + fn read_consensus_config() -> anyhow::Result> { + // Read public config. + let Ok(path) = std::env::var("CONSENSUS_CONFIG_PATH") else { + return Ok(None); + }; + let cfg = std::fs::read_to_string(&path).context(path)?; + Ok(Some(decode_yaml(&cfg).context("failed decoding YAML")?)) + } + + let config = read_consensus_config().context("read_consensus_config()")?; + let secrets = read_consensus_secrets().context("read_consensus_secrets()")?; + + self.node.add_layer(ConsensusLayer { + mode: ConsensusMode::Main, + config, + secrets, + }); + + Ok(self) + } + fn build(mut self) -> Result { self.node.build() } @@ -376,6 +412,7 @@ fn main() -> anyhow::Result<()> { .add_house_keeper_layer()? .add_commitment_generator_layer()? .add_contract_verification_api_layer()? + .add_consensus_layer()? .build()? .run()?; diff --git a/core/node/node_framework/src/implementations/layers/consensus.rs b/core/node/node_framework/src/implementations/layers/consensus.rs new file mode 100644 index 00000000000..b5fd528416f --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/consensus.rs @@ -0,0 +1,173 @@ +use std::sync::Arc; + +use anyhow::Context as _; +use zksync_concurrency::{ctx, scope}; +use zksync_core::{ + consensus::{self, MainNodeConfig}, + sync_layer::{ActionQueueSender, MainNodeClient, SyncState}, +}; +use zksync_dal::{ConnectionPool, Core}; + +use crate::{ + implementations::resources::{ + action_queue::ActionQueueSenderResource, main_node_client::MainNodeClientResource, + pools::MasterPoolResource, sync_state::SyncStateResource, + }, + service::{ServiceContext, StopReceiver}, + task::Task, + wiring_layer::{WiringError, WiringLayer}, +}; + +#[derive(Debug, Copy, Clone)] +pub enum Mode { + Main, + External, +} + +#[derive(Debug)] +pub struct ConsensusLayer { + pub mode: Mode, + pub config: Option, + pub secrets: Option, +} + +#[async_trait::async_trait] +impl WiringLayer for ConsensusLayer { + fn layer_name(&self) -> &'static str { + "consensus_layer" + } + + async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { + let pool = context + .get_resource::() + .await? + .get() + .await?; + + match self.mode { + Mode::Main => { + let config = self.config.ok_or_else(|| { + WiringError::Configuration("Missing public consensus config".to_string()) + })?; + let secrets = self.secrets.ok_or_else(|| { + WiringError::Configuration("Missing private consensus config".to_string()) + })?; + + let main_node_config = config.main_node(&secrets)?; + + let task = MainNodeConsensusTask { + config: main_node_config, + pool, + }; + context.add_task(Box::new(task)); + } + Mode::External => { + let main_node_client = context.get_resource::().await?.0; + let sync_state = context.get_resource::().await?.0; + let action_queue_sender = context + .get_resource::() + .await? + .0 + .take() + .ok_or_else(|| { + WiringError::Configuration( + "Action queue sender is taken by another resource".to_string(), + ) + })?; + + let config = match (self.config, self.secrets) { + (Some(cfg), Some(secrets)) => Some((cfg, secrets)), + (Some(_), None) => { + return Err(WiringError::Configuration( + "Consensus config is specified, but secrets are missing".to_string(), + )); + } + (None, _) => { + // Secrets may be unconditionally embedded in some environments, but they are unused + // unless a consensus config is provided. + None + } + }; + + let task = FetcherTask { + config, + pool, + main_node_client, + sync_state, + action_queue_sender, + }; + context.add_task(Box::new(task)); + } + } + Ok(()) + } +} + +#[derive(Debug)] +pub struct MainNodeConsensusTask { + config: MainNodeConfig, + pool: ConnectionPool, +} + +#[async_trait::async_trait] +impl Task for MainNodeConsensusTask { + fn name(&self) -> &'static str { + "consensus" + } + + async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { + // We instantiate the root context here, since the consensus task is the only user of the + // structured concurrency framework (`MainNodeConsensusTask` and `FetcherTask` are considered mutually + // exclusive). + // Note, however, that awaiting for the `stop_receiver` is related to the root context behavior, + // not the consensus task itself. There may have been any number of tasks running in the root context, + // but we only need to wait for stop signal once, and it will be propagated to all child contexts. + let root_ctx = ctx::root(); + scope::run!(&root_ctx, |ctx, s| async move { + s.spawn_bg(consensus::era::run_main_node(ctx, self.config, self.pool)); + let _ = stop_receiver.0.wait_for(|stop| *stop).await?; + Ok(()) + }) + .await + } +} + +#[derive(Debug)] +pub struct FetcherTask { + config: Option<(consensus::Config, consensus::Secrets)>, + pool: ConnectionPool, + main_node_client: Arc, + sync_state: SyncState, + action_queue_sender: ActionQueueSender, +} + +#[async_trait::async_trait] +impl Task for FetcherTask { + fn name(&self) -> &'static str { + "consensus_fetcher" + } + + async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { + // We instantiate the root context here, since the consensus task is the only user of the + // structured concurrency framework (`MainNodeConsensusTask` and `FetcherTask` are considered mutually + // exclusive). + // Note, however, that awaiting for the `stop_receiver` is related to the root context behavior, + // not the consensus task itself. There may have been any number of tasks running in the root context, + // but we only need to wait for stop signal once, and it will be propagated to all child contexts. + let root_ctx = ctx::root(); + scope::run!(&root_ctx, |ctx, s| async { + s.spawn_bg(zksync_core::consensus::era::run_fetcher( + &root_ctx, + self.config, + self.pool, + self.sync_state, + self.main_node_client, + self.action_queue_sender, + )); + ctx.wait(stop_receiver.0.wait_for(|stop| *stop)).await??; + Ok(()) + }) + .await + .context("consensus actor") + } +} diff --git a/core/node/node_framework/src/implementations/layers/mod.rs b/core/node/node_framework/src/implementations/layers/mod.rs index 35a80ac6cd0..9a55cf18d24 100644 --- a/core/node/node_framework/src/implementations/layers/mod.rs +++ b/core/node/node_framework/src/implementations/layers/mod.rs @@ -1,5 +1,6 @@ pub mod circuit_breaker_checker; pub mod commitment_generator; +pub mod consensus; pub mod consistency_checker; pub mod contract_verification_api; pub mod eth_sender; diff --git a/core/node/node_framework/src/implementations/resources/action_queue.rs b/core/node/node_framework/src/implementations/resources/action_queue.rs new file mode 100644 index 00000000000..1950236cf84 --- /dev/null +++ b/core/node/node_framework/src/implementations/resources/action_queue.rs @@ -0,0 +1,12 @@ +use zksync_core::sync_layer::ActionQueueSender; + +use crate::resource::{Resource, ResourceId, Unique}; + +#[derive(Debug, Clone)] +pub struct ActionQueueSenderResource(pub Unique); + +impl Resource for ActionQueueSenderResource { + fn resource_id() -> ResourceId { + "external_node/action_queue_sender".into() + } +} diff --git a/core/node/node_framework/src/implementations/resources/main_node_client.rs b/core/node/node_framework/src/implementations/resources/main_node_client.rs new file mode 100644 index 00000000000..dd5dcaf7721 --- /dev/null +++ b/core/node/node_framework/src/implementations/resources/main_node_client.rs @@ -0,0 +1,14 @@ +use std::sync::Arc; + +use zksync_core::sync_layer::MainNodeClient; + +use crate::resource::{Resource, ResourceId}; + +#[derive(Debug, Clone)] +pub struct MainNodeClientResource(pub Arc); + +impl Resource for MainNodeClientResource { + fn resource_id() -> ResourceId { + "external_node/main_node_client".into() + } +} diff --git a/core/node/node_framework/src/implementations/resources/mod.rs b/core/node/node_framework/src/implementations/resources/mod.rs index 2fe2827729f..2225fcd2f4c 100644 --- a/core/node/node_framework/src/implementations/resources/mod.rs +++ b/core/node/node_framework/src/implementations/resources/mod.rs @@ -1,9 +1,11 @@ +pub mod action_queue; pub mod circuit_breakers; pub mod eth_interface; pub mod fee_input; pub mod healthcheck; pub mod l1_batch_commit_data_generator; pub mod l1_tx_params; +pub mod main_node_client; pub mod object_store; pub mod pools; pub mod state_keeper;