feat(node-framework): Add consensus support (#1546)
## What ❔

- Moves consensus-related logic to the consensus module, so that all of it runs in its own dedicated task.
- Creates a layer for the consensus task (a condensed sketch of the wiring is shown below).
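
For reference, here is a condensed sketch of how the new layer is registered for the main node. It mirrors the `examples/main_node.rs` change further down in this diff; `MainNodeBuilder` and the `read_consensus_config`/`read_consensus_secrets` helpers are the ones defined in that example, so this is illustrative rather than additional code:

```rust
use anyhow::Context as _;
use zksync_core::consensus;
use zksync_node_framework::implementations::layers::consensus::{ConsensusLayer, Mode as ConsensusMode};

impl MainNodeBuilder {
    fn add_consensus_layer(mut self) -> anyhow::Result<Self> {
        // Config and secrets are optional; in the example they are read from the
        // files pointed to by `CONSENSUS_CONFIG_PATH` / `CONSENSUS_SECRETS_PATH`.
        let config: Option<consensus::Config> =
            read_consensus_config().context("read_consensus_config()")?;
        let secrets: Option<consensus::Secrets> =
            read_consensus_secrets().context("read_consensus_secrets()")?;

        self.node.add_layer(ConsensusLayer {
            mode: ConsensusMode::Main,
            config,
            secrets,
        });
        Ok(self)
    }
}
```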

## Why ❔

This is a step towards running the consensus component through the node framework: consolidating the logic behind dedicated entry points lets the framework wire it up as a single layer.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
- [ ] Spellcheck has been run via `zk spellcheck`.
- [ ] Linkcheck has been run via `zk linkcheck`.
popzxc committed Apr 3, 2024
1 parent 7953897 commit 27fe475
Showing 15 changed files with 357 additions and 46 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


64 changes: 34 additions & 30 deletions core/bin/external_node/src/main.rs
@@ -8,7 +8,7 @@ use tokio::{
sync::watch,
task::{self, JoinHandle},
};
use zksync_concurrency::{ctx, limiter, scope, time};
use zksync_concurrency::{ctx, scope};
use zksync_config::configs::{
api::MerkleTreeApiConfig, chain::L1BatchCommitDataGeneratorMode, database::MerkleTreeMode,
};
@@ -210,37 +210,41 @@ async fn run_core(
.await?;

task_handles.push(tokio::spawn({
let ctx = ctx::root();
let cfg = config.consensus.clone();
let mut stop_receiver = stop_receiver.clone();
let fetcher = consensus::Fetcher {
store: consensus::Store(connection_pool.clone()),
sync_state: sync_state.clone(),
client: Box::new(main_node_client.clone()),
limiter: limiter::Limiter::new(
&ctx,
limiter::Rate {
burst: 10,
refresh: time::Duration::milliseconds(30),
},
),
let config = config.consensus.clone();
let secrets =
config::read_consensus_secrets().context("config::read_consensus_secrets()")?;
let cfg = match (config, secrets) {
(Some(cfg), Some(secrets)) => Some((cfg, secrets)),
(Some(_), None) => {
anyhow::bail!("Consensus config is specified, but secrets are missing")
}
(None, _) => {
// Secrets may be unconditionally embedded in some environments, but they are unused
// unless a consensus config is provided.
None
}
};
let actions = action_queue_sender;

let pool = connection_pool.clone();
let sync_state = sync_state.clone();
let main_node_client = Arc::new(main_node_client.clone());
let mut stop_receiver = stop_receiver.clone();
async move {
scope::run!(&ctx, |ctx, s| async {
s.spawn_bg(async {
let res = match cfg {
Some(cfg) => {
let secrets = config::read_consensus_secrets()
.context("config::read_consensus_secrets()")?
.context("consensus secrets missing")?;
fetcher.run_p2p(ctx, actions, cfg.p2p(&secrets)?).await
}
None => fetcher.run_centralized(ctx, actions).await,
};
tracing::info!("Consensus actor stopped");
res
});
// We instantiate the root context here, since the consensus task is the only user of the
// structured concurrency framework.
// Note, however, that awaiting the `stop_receiver` is related to the root context behavior,
// not the consensus task itself. Any number of tasks may be running in the root context,
// but we only need to wait for the stop signal once; it will be propagated to all child contexts.
let ctx = ctx::root();
scope::run!(&ctx, |ctx, s| async move {
s.spawn_bg(consensus::era::run_fetcher(
ctx,
cfg,
pool,
sync_state,
main_node_client,
action_queue_sender,
));
ctx.wait(stop_receiver.wait_for(|stop| *stop)).await??;
Ok(())
})
64 changes: 64 additions & 0 deletions core/lib/zksync_core/src/consensus/era.rs
@@ -0,0 +1,64 @@
//! This module provides convenience functions to run consensus components in different modes
//! as expected by the zkSync Era.
//!
//! This module simply glues together APIs that are already publicly exposed by the `consensus` module,
//! so if any custom behavior is needed, those APIs should be used directly.

use std::sync::Arc;

use zksync_concurrency::{ctx, limiter, time};
use zksync_dal::{ConnectionPool, Core};

use super::{
config::{Config, Secrets},
fetcher::Fetcher,
storage::Store,
};
use crate::sync_layer::{sync_action::ActionQueueSender, MainNodeClient, SyncState};

/// Runs the consensus task in the main node mode.
pub async fn run_main_node(
ctx: &ctx::Ctx,
cfg: super::MainNodeConfig,
pool: ConnectionPool<Core>,
) -> anyhow::Result<()> {
// Consensus is a new component.
// For now, in case of an error, we just log it and allow the server
// to continue running.
if let Err(err) = cfg.run(ctx, Store(pool)).await {
tracing::error!(%err, "Consensus actor failed");
} else {
tracing::info!("Consensus actor stopped");
}
Ok(())
}

/// Runs the consensus component in fetcher mode (e.g., for the external node).
/// The fetcher implementation may be either p2p or centralized.
pub async fn run_fetcher(
ctx: &ctx::Ctx,
cfg: Option<(Config, Secrets)>,
pool: ConnectionPool<Core>,
sync_state: SyncState,
main_node_client: Arc<dyn MainNodeClient>,
actions: ActionQueueSender,
) -> anyhow::Result<()> {
let fetcher = Fetcher {
store: Store(pool),
sync_state: sync_state.clone(),
client: main_node_client,
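// Rate limiter for `client.fetch_l2_block` requests: bursts of up to 10,
// with permits refreshed every 30ms.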
limiter: limiter::Limiter::new(
ctx,
limiter::Rate {
burst: 10,
refresh: time::Duration::milliseconds(30),
},
),
};
let res = match cfg {
Some((cfg, secrets)) => fetcher.run_p2p(ctx, actions, cfg.p2p(&secrets)?).await,
None => fetcher.run_centralized(ctx, actions).await,
};
tracing::info!("Consensus actor stopped");
res
}
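
Both entry points are meant to be spawned from inside a structured-concurrency scope. Below is a minimal sketch of the main-node call site, condensed from the `lib.rs` hunk further down; the wrapper function name is illustrative, the parameter types follow `run_main_node`'s signature, the stop channel is the usual `watch::Receiver<bool>`, and error handling around the scope (via `anyhow`) is assumed:

```rust
use zksync_concurrency::{ctx, scope};
use zksync_core::consensus;
use zksync_dal::{ConnectionPool, Core};

async fn spawn_main_node_consensus(
    cfg: consensus::MainNodeConfig,
    pool: ConnectionPool<Core>,
    mut stop_receiver: tokio::sync::watch::Receiver<bool>,
) -> anyhow::Result<()> {
    // The consensus task is the only user of the structured concurrency framework,
    // so the root context is instantiated right here.
    let root_ctx = ctx::root();
    scope::run!(&root_ctx, |ctx, s| async move {
        // The consensus task runs in the background; the scope only waits for
        // the stop signal, which is propagated to all child contexts on exit.
        s.spawn_bg(consensus::era::run_main_node(ctx, cfg, pool));
        let _ = stop_receiver.wait_for(|stop| *stop).await?;
        Ok(())
    })
    .await
}
```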
4 changes: 3 additions & 1 deletion core/lib/zksync_core/src/consensus/fetcher.rs
@@ -1,3 +1,5 @@
use std::sync::Arc;

use anyhow::Context as _;
use zksync_concurrency::{ctx, error::Wrap as _, limiter, scope, time};
use zksync_consensus_executor as executor;
@@ -18,7 +20,7 @@ pub type P2PConfig = executor::Config;
pub struct Fetcher {
pub store: Store,
pub sync_state: SyncState,
pub client: Box<dyn MainNodeClient>,
pub client: Arc<dyn MainNodeClient>,
/// Rate limiter for `client.fetch_l2_block` requests.
pub limiter: limiter::Limiter,
}
2 changes: 2 additions & 0 deletions core/lib/zksync_core/src/consensus/mod.rs
@@ -2,6 +2,7 @@

#![allow(clippy::redundant_locals)]
#![allow(clippy::needless_pass_by_ref_mut)]

use zksync_concurrency::{ctx, error::Wrap as _, scope};
use zksync_consensus_executor as executor;
use zksync_consensus_roles::validator;
@@ -10,6 +11,7 @@ use zksync_consensus_storage::BlockStore;
pub use self::{fetcher::*, storage::Store};

mod config;
pub mod era;
mod fetcher;
mod storage;
#[cfg(test)]
4 changes: 2 additions & 2 deletions core/lib/zksync_core/src/consensus/testonly.rs
@@ -305,7 +305,7 @@ impl StateKeeper {
) -> anyhow::Result<()> {
Fetcher {
store: self.store,
client: Box::new(client),
client: Arc::new(client),
sync_state: SyncState::default(),
limiter: unbounded_limiter(ctx),
}
@@ -322,7 +322,7 @@
) -> anyhow::Result<()> {
Fetcher {
store: self.store,
client: Box::new(client),
client: Arc::new(client),
sync_state: SyncState::default(),
limiter: unbounded_limiter(ctx),
}
20 changes: 8 additions & 12 deletions core/lib/zksync_core/src/lib.rs
@@ -548,18 +548,14 @@ pub async fn initialize_components(
let pool = connection_pool.clone();
let mut stop_receiver = stop_receiver.clone();
task_futures.push(tokio::spawn(async move {
scope::run!(&ctx::root(), |ctx, s| async {
s.spawn_bg(async {
// Consensus is a new component.
// For now in case of error we just log it and allow the server
// to continue running.
if let Err(err) = cfg.run(ctx, consensus::Store(pool)).await {
tracing::error!(%err, "Consensus actor failed");
} else {
tracing::info!("Consensus actor stopped");
}
Ok(())
});
// We instantiate the root context here, since the consensus task is the only user of the
// structured concurrency framework.
// Note, however, that awaiting the `stop_receiver` is related to the root context behavior,
// not the consensus task itself. Any number of tasks may be running in the root context,
// but we only need to wait for the stop signal once; it will be propagated to all child contexts.
let root_ctx = ctx::root();
scope::run!(&root_ctx, |ctx, s| async move {
s.spawn_bg(consensus::era::run_main_node(ctx, cfg, pool));
let _ = stop_receiver.wait_for(|stop| *stop).await?;
Ok(())
})
4 changes: 3 additions & 1 deletion core/lib/zksync_core/src/sync_layer/mod.rs
@@ -10,7 +10,9 @@ mod sync_state;
mod tests;

pub use self::{
client::MainNodeClient, external_io::ExternalIO, sync_action::ActionQueue,
client::MainNodeClient,
external_io::ExternalIO,
sync_action::{ActionQueue, ActionQueueSender},
sync_state::SyncState,
};

1 change: 1 addition & 0 deletions core/node/node_framework/Cargo.toml
@@ -26,6 +26,7 @@ zksync_contracts.workspace = true
zksync_web3_decl.workspace = true
zksync_utils.workspace = true
zksync_circuit_breaker.workspace = true
zksync_concurrency.workspace = true

tracing.workspace = true
thiserror.workspace = true
37 changes: 37 additions & 0 deletions core/node/node_framework/examples/main_node.rs
@@ -23,13 +23,16 @@ use zksync_core::{
tx_sender::{ApiContracts, TxSenderConfig},
web3::{state::InternalApiConfig, Namespace},
},
consensus,
metadata_calculator::MetadataCalculatorConfig,
temp_config_store::decode_yaml,
};
use zksync_env_config::FromEnv;
use zksync_node_framework::{
implementations::layers::{
circuit_breaker_checker::CircuitBreakerCheckerLayer,
commitment_generator::CommitmentGeneratorLayer,
consensus::{ConsensusLayer, Mode as ConsensusMode},
contract_verification_api::ContractVerificationApiLayer,
eth_sender::EthSenderLayer,
eth_watch::EthWatchLayer,
@@ -339,6 +342,39 @@ Ok(self)
Ok(self)
}

fn add_consensus_layer(mut self) -> anyhow::Result<Self> {
// Copy-pasted from the zksync_server codebase.

fn read_consensus_secrets() -> anyhow::Result<Option<consensus::Secrets>> {
// Read the consensus secrets.
let Ok(path) = std::env::var("CONSENSUS_SECRETS_PATH") else {
return Ok(None);
};
let secrets = std::fs::read_to_string(&path).context(path)?;
Ok(Some(decode_yaml(&secrets).context("failed decoding YAML")?))
}

fn read_consensus_config() -> anyhow::Result<Option<consensus::Config>> {
// Read public config.
let Ok(path) = std::env::var("CONSENSUS_CONFIG_PATH") else {
return Ok(None);
};
let cfg = std::fs::read_to_string(&path).context(path)?;
Ok(Some(decode_yaml(&cfg).context("failed decoding YAML")?))
}

let config = read_consensus_config().context("read_consensus_config()")?;
let secrets = read_consensus_secrets().context("read_consensus_secrets()")?;

self.node.add_layer(ConsensusLayer {
mode: ConsensusMode::Main,
config,
secrets,
});

Ok(self)
}

fn build(mut self) -> Result<ZkStackService, ZkStackServiceError> {
self.node.build()
}
@@ -376,6 +412,7 @@ fn main() -> anyhow::Result<()> {
.add_house_keeper_layer()?
.add_commitment_generator_layer()?
.add_contract_verification_api_layer()?
.add_consensus_layer()?
.build()?
.run()?;

