diff --git a/core/bin/external_node/src/config/mod.rs b/core/bin/external_node/src/config/mod.rs
index 616e513749b..b7e95bb28eb 100644
--- a/core/bin/external_node/src/config/mod.rs
+++ b/core/bin/external_node/src/config/mod.rs
@@ -261,6 +261,19 @@ pub struct OptionalENConfig {
     pub l1_batch_commit_data_generator_mode: L1BatchCommitDataGeneratorMode,
 }
 
+#[derive(Debug, Clone, PartialEq, Deserialize)]
+pub struct ApiComponentConfig {
+    /// Address of the tree API used by this EN in case it does not have a
+    /// local tree component running and in this case needs to send requests
+    /// to some external tree API.
+    pub tree_api_url: Option<String>,
+}
+
+#[derive(Debug, Clone, PartialEq, Deserialize)]
+pub struct TreeComponentConfig {
+    pub api_port: Option<u16>,
+}
+
 impl OptionalENConfig {
     const fn default_filters_limit() -> usize {
         10_000
@@ -554,6 +567,8 @@ pub struct ExternalNodeConfig {
     pub optional: OptionalENConfig,
     pub remote: RemoteENConfig,
     pub consensus: Option<ConsensusConfig>,
+    pub api_component: ApiComponentConfig,
+    pub tree_component: TreeComponentConfig,
 }
 
 impl ExternalNodeConfig {
@@ -568,6 +583,14 @@ impl ExternalNodeConfig {
             .from_env::<OptionalENConfig>()
             .context("could not load external node config")?;
 
+        let api_component_config = envy::prefixed("EN_API")
+            .from_env::<ApiComponentConfig>()
+            .context("could not load external node config")?;
+
+        let tree_component_config = envy::prefixed("EN_TREE")
+            .from_env::<TreeComponentConfig>()
+            .context("could not load external node config")?;
+
         let client = HttpClientBuilder::default()
             .build(required.main_node_url()?)
             .expect("Unable to build HTTP client for main node");
@@ -619,6 +642,8 @@ impl ExternalNodeConfig {
             required,
             optional,
             consensus: read_consensus_config().context("read_consensus_config()")?,
+            tree_component: tree_component_config,
+            api_component: api_component_config,
         })
     }
 }
diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs
index 8ba6826d35d..84c7ddf17ba 100644
--- a/core/bin/external_node/src/main.rs
+++ b/core/bin/external_node/src/main.rs
@@ -1,17 +1,22 @@
-use std::{future, sync::Arc, time::Duration};
+use std::{collections::HashSet, future, net::Ipv4Addr, str::FromStr, sync::Arc, time::Duration};
 
 use anyhow::Context as _;
 use clap::Parser;
 use metrics::EN_METRICS;
 use prometheus_exporter::PrometheusExporterConfig;
-use tokio::{sync::watch, task};
-use zksync_basic_types::L2ChainId;
+use tokio::{
+    sync::watch,
+    task::{self, JoinHandle},
+};
 use zksync_concurrency::{ctx, limiter, scope, time};
-use zksync_config::configs::{chain::L1BatchCommitDataGeneratorMode, database::MerkleTreeMode};
+use zksync_config::configs::{
+    api::MerkleTreeApiConfig, chain::L1BatchCommitDataGeneratorMode, database::MerkleTreeMode,
+};
 use zksync_core::{
     api_server::{
         execution_sandbox::VmConcurrencyLimiter,
         healthcheck::HealthCheckHandle,
+        tree::{TreeApiClient, TreeApiHttpClient},
         tx_sender::{proxy::TxProxy, ApiContracts, TxSenderBuilder},
         web3::{ApiBuilder, Namespace},
     },
@@ -25,8 +30,7 @@ use zksync_core::{
     },
     l1_gas_price::MainNodeFeeParamsFetcher,
     metadata_calculator::{MetadataCalculator, MetadataCalculatorConfig},
-    reorg_detector,
-    reorg_detector::ReorgDetector,
+    reorg_detector::{self, ReorgDetector},
     setup_sigint_handler,
     state_keeper::{
         seal_criteria::NoopSealer, AsyncRocksdbCache, BatchExecutor, MainBatchExecutor,
@@ -39,11 +43,14 @@ use zksync_core::{
     utils::ensure_l1_batch_commit_data_generation_mode,
 };
 use zksync_dal::{metrics::PostgresMetrics, ConnectionPool, Core, CoreDal};
-use zksync_db_connection::healthcheck::ConnectionPoolHealthCheck;
+use zksync_db_connection::{
+    connection_pool::ConnectionPoolBuilder, healthcheck::ConnectionPoolHealthCheck,
+};
 use zksync_eth_client::clients::QueryClient;
 use zksync_health_check::{AppHealthCheck, HealthStatus, ReactiveHealthCheck};
 use zksync_state::PostgresStorageCaches;
 use zksync_storage::RocksDB;
+use zksync_types::L2ChainId;
 use zksync_utils::wait_for_tasks::ManagedTasks;
 use zksync_web3_decl::jsonrpsee::http_client::HttpClient;
 
@@ -114,25 +121,61 @@ async fn build_state_keeper(
     ))
 }
 
-async fn init_tasks(
+async fn run_tree(
+    task_futures: &mut Vec<JoinHandle<anyhow::Result<()>>>,
+    config: &ExternalNodeConfig,
+    api_config: Option<&MerkleTreeApiConfig>,
+    app_health: &AppHealthCheck,
+    stop_receiver: watch::Receiver<bool>,
+    tree_pool: ConnectionPool<Core>,
+) -> anyhow::Result<Arc<dyn TreeApiClient>> {
+    let metadata_calculator_config = MetadataCalculatorConfig {
+        db_path: config.required.merkle_tree_path.clone(),
+        mode: MerkleTreeMode::Lightweight,
+        delay_interval: config.optional.metadata_calculator_delay(),
+        max_l1_batches_per_iter: config.optional.max_l1_batches_per_tree_iter,
+        multi_get_chunk_size: config.optional.merkle_tree_multi_get_chunk_size,
+        block_cache_capacity: config.optional.merkle_tree_block_cache_size(),
+        memtable_capacity: config.optional.merkle_tree_memtable_capacity(),
+        stalled_writes_timeout: config.optional.merkle_tree_stalled_writes_timeout(),
+    };
+    let metadata_calculator = MetadataCalculator::new(metadata_calculator_config, None)
+        .await
+        .context("failed initializing metadata calculator")?;
+    let tree_reader = Arc::new(metadata_calculator.tree_reader());
+    app_health.insert_component(metadata_calculator.tree_health_check());
+
+    if let Some(api_config) = api_config {
+        let address = (Ipv4Addr::UNSPECIFIED, api_config.port).into();
+        let tree_reader = metadata_calculator.tree_reader();
+        let stop_receiver = stop_receiver.clone();
+        task_futures.push(tokio::spawn(async move {
+            tree_reader
+                .wait()
+                .await
+                .run_api_server(address, stop_receiver)
+                .await
+        }));
+    }
+
+    let tree_handle = task::spawn(metadata_calculator.run(tree_pool, stop_receiver));
+
+    task_futures.push(tree_handle);
+    Ok(tree_reader)
+}
+
+#[allow(clippy::too_many_arguments)]
+async fn run_core(
     config: &ExternalNodeConfig,
     connection_pool: ConnectionPool<Core>,
     main_node_client: HttpClient,
     task_handles: &mut Vec<task::JoinHandle<anyhow::Result<()>>>,
     app_health: &AppHealthCheck,
     stop_receiver: watch::Receiver<bool>,
-) -> anyhow::Result<()> {
-    let release_manifest: serde_json::Value = serde_json::from_str(RELEASE_MANIFEST)
-        .expect("release manifest is a valid json document; qed");
-    let release_manifest_version = release_manifest["core"].as_str().expect(
-        "a release-please manifest with \"core\" version field was specified at build time; qed.",
-    );
-
-    let version = semver::Version::parse(release_manifest_version)
-        .expect("version in manifest is a correct semver format; qed");
+    fee_params_fetcher: Arc<MainNodeFeeParamsFetcher>,
+    singleton_pool_builder: &ConnectionPoolBuilder<Core>,
+) -> anyhow::Result<SyncState> {
     // Create components.
-    let fee_params_fetcher = Arc::new(MainNodeFeeParamsFetcher::new(main_node_client.clone()));
-
     let sync_state = SyncState::default();
     app_health.insert_custom_component(Arc::new(sync_state.clone()));
     let (action_queue_sender, action_queue) = ActionQueue::new();
@@ -143,23 +186,6 @@ async fn init_tasks(
         config.optional.miniblock_seal_queue_capacity,
     );
     task_handles.push(tokio::spawn(miniblock_sealer.run()));
-    let pool = connection_pool.clone();
-    task_handles.push(tokio::spawn(async move {
-        loop {
-            let protocol_version = pool
-                .connection()
-                .await
-                .unwrap()
-                .protocol_versions_dal()
-                .last_used_version_id()
-                .await
-                .map(|version| version as u16);
-
-            EN_METRICS.version[&(format!("{}", version), protocol_version)].set(1);
-
-            tokio::time::sleep(Duration::from_secs(10)).await;
-        }
-    }));
 
     let mut persistence = persistence.with_tx_insertion();
     if !config.optional.protective_reads_persistence_enabled {
@@ -235,34 +261,23 @@ async fn init_tasks(
         }
     }));
 
-    let singleton_pool_builder = ConnectionPool::<Core>::singleton(&config.postgres.database_url);
-
-    let metadata_calculator_config = MetadataCalculatorConfig {
-        db_path: config.required.merkle_tree_path.clone(),
-        mode: MerkleTreeMode::Lightweight,
-        delay_interval: config.optional.metadata_calculator_delay(),
-        max_l1_batches_per_iter: config.optional.max_l1_batches_per_tree_iter,
-        multi_get_chunk_size: config.optional.merkle_tree_multi_get_chunk_size,
-        block_cache_capacity: config.optional.merkle_tree_block_cache_size(),
-        memtable_capacity: config.optional.merkle_tree_memtable_capacity(),
-        stalled_writes_timeout: config.optional.merkle_tree_stalled_writes_timeout(),
-    };
-    let metadata_calculator = MetadataCalculator::new(metadata_calculator_config, None)
-        .await
-        .context("failed initializing metadata calculator")?;
-    app_health.insert_component(metadata_calculator.tree_health_check());
-
+    let fee_address_migration_handle =
+        task::spawn(state_keeper.run_fee_address_migration(connection_pool.clone()));
+    let sk_handle = task::spawn(state_keeper.run());
+    let fee_params_fetcher_handle =
+        tokio::spawn(fee_params_fetcher.clone().run(stop_receiver.clone()));
     let remote_diamond_proxy_addr = config.remote.diamond_proxy_addr;
     let diamond_proxy_addr = if let Some(addr) = config.optional.contracts_diamond_proxy_addr {
         anyhow::ensure!(
             addr == remote_diamond_proxy_addr,
-            "Diamond proxy address {addr:?} specified in config doesn't match one returned by main node \
-             ({remote_diamond_proxy_addr:?})"
+            "Diamond proxy address {addr:?} specified in config doesn't match one returned \
+             by main node ({remote_diamond_proxy_addr:?})"
         );
         addr
     } else {
         tracing::info!(
-            "Diamond proxy address is not specified in config; will use address returned by main node: {remote_diamond_proxy_addr:?}"
+            "Diamond proxy address is not specified in config; will use address \
+             returned by main node: {remote_diamond_proxy_addr:?}"
         );
         remote_diamond_proxy_addr
     };
@@ -314,15 +329,6 @@ async fn init_tasks(
     );
     app_health.insert_component(batch_status_updater.health_check());
 
-    // Run the components.
-    let tree_stop_receiver = stop_receiver.clone();
-    let tree_pool = singleton_pool_builder
-        .build()
-        .await
-        .context("failed to build a tree_pool")?;
-    let tree_reader = Arc::new(metadata_calculator.tree_reader());
-    let tree_handle = task::spawn(metadata_calculator.run(tree_pool, tree_stop_receiver));
-
     let commitment_generator_pool = singleton_pool_builder
         .build()
         .await
         .context("failed to build a commitment_generator_pool")?;
@@ -332,11 +338,52 @@
     let commitment_generator = CommitmentGenerator::new(commitment_generator_pool);
     app_health.insert_component(commitment_generator.health_check());
     let commitment_generator_handle = tokio::spawn(commitment_generator.run(stop_receiver.clone()));
 
     let updater_handle = task::spawn(batch_status_updater.run(stop_receiver.clone()));
-    let fee_address_migration_handle =
-        task::spawn(state_keeper.run_fee_address_migration(connection_pool.clone()));
-    let sk_handle = task::spawn(state_keeper.run());
-    let fee_params_fetcher_handle =
-        tokio::spawn(fee_params_fetcher.clone().run(stop_receiver.clone()));
+
+    task_handles.extend([
+        sk_handle,
+        fee_address_migration_handle,
+        fee_params_fetcher_handle,
+        consistency_checker_handle,
+        commitment_generator_handle,
+        updater_handle,
+    ]);
+
+    Ok(sync_state)
+}
+
+#[allow(clippy::too_many_arguments)]
+async fn run_api(
+    config: &ExternalNodeConfig,
+    app_health: &AppHealthCheck,
+    connection_pool: ConnectionPool<Core>,
+    stop_receiver: watch::Receiver<bool>,
+    sync_state: SyncState,
+    tree_reader: Option<Arc<dyn TreeApiClient>>,
+    task_futures: &mut Vec<JoinHandle<anyhow::Result<()>>>,
+    main_node_client: HttpClient,
+    singleton_pool_builder: ConnectionPoolBuilder<Core>,
+    fee_params_fetcher: Arc<MainNodeFeeParamsFetcher>,
+    components: &HashSet<Component>,
+) -> anyhow::Result<()> {
+    let tree_reader = match tree_reader {
+        Some(tree_reader) => {
+            if let Some(url) = &config.api_component.tree_api_url {
+                tracing::warn!(
+                    "Tree component is run locally; the specified tree API URL {url} is ignored"
+                );
+            }
+
+            tree_reader
+        }
+        None => {
+            let tree_api_url = config
+                .api_component
+                .tree_api_url
+                .as_ref()
+                .context("Need to have a configured tree api url")?;
+            Arc::new(TreeApiHttpClient::new(tree_api_url))
+        }
+    };
 
     let (tx_sender, vm_barrier, cache_update_handle, proxy_cache_updater_handle) = {
         let tx_proxy = TxProxy::new(main_node_client);
@@ -393,25 +440,30 @@
         )
     };
 
-    let http_server_handles =
-        ApiBuilder::jsonrpsee_backend(config.clone().into(), connection_pool.clone())
+    if components.contains(&Component::HttpApi) {
+        let builder = ApiBuilder::jsonrpsee_backend(config.clone().into(), connection_pool.clone())
             .http(config.required.http_port)
             .with_filter_limit(config.optional.filters_limit)
             .with_batch_request_size_limit(config.optional.max_batch_request_size)
            .with_response_body_size_limit(config.optional.max_response_body_size())
             .with_tx_sender(tx_sender.clone())
             .with_vm_barrier(vm_barrier.clone())
-            .with_sync_state(sync_state.clone())
             .with_tree_api(tree_reader.clone())
-            .enable_api_namespaces(config.optional.api_namespaces())
+            .with_sync_state(sync_state.clone())
+            .enable_api_namespaces(config.optional.api_namespaces());
+
+        let http_server_handles = builder
             .build()
             .context("failed to build HTTP JSON-RPC server")?
             .run(stop_receiver.clone())
             .await
             .context("Failed initializing HTTP JSON-RPC server")?;
+
+        app_health.insert_component(http_server_handles.health_check);
+        task_futures.extend(http_server_handles.tasks);
+    }
 
-    let ws_server_handles =
-        ApiBuilder::jsonrpsee_backend(config.clone().into(), connection_pool.clone())
+    if components.contains(&Component::WsApi) {
+        let builder = ApiBuilder::jsonrpsee_backend(config.clone().into(), connection_pool.clone())
             .ws(config.required.ws_port)
             .with_filter_limit(config.optional.filters_limit)
             .with_subscriptions_limit(config.optional.subscriptions_limit)
@@ -420,17 +472,147 @@
             .with_polling_interval(config.optional.polling_interval())
             .with_tx_sender(tx_sender)
             .with_vm_barrier(vm_barrier)
-            .with_sync_state(sync_state)
             .with_tree_api(tree_reader)
-            .enable_api_namespaces(config.optional.api_namespaces())
+            .with_sync_state(sync_state)
+            .enable_api_namespaces(config.optional.api_namespaces());
+
+        let ws_server_handles = builder
             .build()
             .context("failed to build WS JSON-RPC server")?
             .run(stop_receiver.clone())
             .await
             .context("Failed initializing WS JSON-RPC server")?;
+
+        app_health.insert_component(ws_server_handles.health_check);
+        task_futures.extend(ws_server_handles.tasks);
+    }
+
+    task_futures.extend(cache_update_handle);
+    task_futures.push(proxy_cache_updater_handle);
+
+    Ok(())
+}
+
+async fn init_tasks(
+    config: &ExternalNodeConfig,
+    connection_pool: ConnectionPool<Core>,
+    main_node_client: HttpClient,
+    task_handles: &mut Vec<task::JoinHandle<anyhow::Result<()>>>,
+    app_health: &AppHealthCheck,
+    stop_receiver: watch::Receiver<bool>,
+    components: &HashSet<Component>,
+) -> anyhow::Result<()> {
+    let release_manifest: serde_json::Value = serde_json::from_str(RELEASE_MANIFEST)
+        .context("release manifest is a valid json document")?;
+    let release_manifest_version = release_manifest["core"].as_str().context(
+        "a release-please manifest with \"core\" version field was specified at build time",
+    )?;
 
-    app_health.insert_component(ws_server_handles.health_check);
-    app_health.insert_component(http_server_handles.health_check);
+    let version = semver::Version::parse(release_manifest_version)
+        .context("version in manifest is a correct semver format")?;
+    let pool = connection_pool.clone();
+    task_handles.push(tokio::spawn(async move {
+        loop {
+            let protocol_version = pool
+                .connection()
+                .await
+                .unwrap()
+                .protocol_versions_dal()
+                .last_used_version_id()
+                .await
+                .map(|version| version as u16);
+
+            EN_METRICS.version[&(format!("{}", version), protocol_version)].set(1);
+
+            tokio::time::sleep(Duration::from_secs(10)).await;
+        }
+    }));
+
+    let singleton_pool_builder = ConnectionPool::<Core>::singleton(&config.postgres.database_url);
+
+    // Run the components.
+    let tree_pool = singleton_pool_builder
+        .build()
+        .await
+        .context("failed to build a tree_pool")?;
+
+    if !components.contains(&Component::Tree) {
+        anyhow::ensure!(
+            !components.contains(&Component::TreeApi),
+            "Merkle tree API cannot be started without a tree component"
+        );
+    }
+    // Create a tree reader. If the list of requested components has the tree itself, then
+    // we can get this tree's reader and use it right away. Otherwise, if configuration has
+    // specified address of another instance hosting tree API, create a tree reader to that
+    // remote API. A tree reader is necessary for `zks_getProof` method to work.
+    let tree_reader: Option<Arc<dyn TreeApiClient>> = if components.contains(&Component::Tree) {
+        let tree_api_config = if components.contains(&Component::TreeApi) {
+            Some(MerkleTreeApiConfig {
+                port: config
+                    .tree_component
+                    .api_port
+                    .context("should contain tree api port")?,
+            })
+        } else {
+            None
+        };
+        Some(
+            run_tree(
+                task_handles,
+                config,
+                tree_api_config.as_ref(),
+                app_health,
+                stop_receiver.clone(),
+                tree_pool,
+            )
+            .await?,
+        )
+    } else {
+        None
+    };
+
+    let fee_params_fetcher = Arc::new(MainNodeFeeParamsFetcher::new(main_node_client.clone()));
+
+    let sync_state = if components.contains(&Component::Core) {
+        run_core(
+            config,
+            connection_pool.clone(),
+            main_node_client.clone(),
+            task_handles,
+            app_health,
+            stop_receiver.clone(),
+            fee_params_fetcher.clone(),
+            &singleton_pool_builder,
+        )
+        .await?
+    } else {
+        let sync_state = SyncState::default();
+
+        task_handles.push(tokio::spawn(sync_state.clone().run_updater(
+            connection_pool.clone(),
+            main_node_client.clone(),
+            stop_receiver.clone(),
+        )));
+
+        sync_state
+    };
+
+    if components.contains(&Component::HttpApi) || components.contains(&Component::WsApi) {
+        run_api(
+            config,
+            app_health,
+            connection_pool,
+            stop_receiver.clone(),
+            sync_state,
+            tree_reader,
+            task_handles,
+            main_node_client,
+            singleton_pool_builder,
+            fee_params_fetcher.clone(),
+            components,
+        )
+        .await?;
+    }
 
     if let Some(port) = config.optional.prometheus_port {
         let (prometheus_health_check, prometheus_health_updater) =
@@ -446,20 +628,6 @@
         }));
     }
 
-    task_handles.extend(http_server_handles.tasks);
-    task_handles.extend(ws_server_handles.tasks);
-    task_handles.extend(cache_update_handle);
-    task_handles.push(proxy_cache_updater_handle);
-    task_handles.extend([
-        sk_handle,
-        fee_address_migration_handle,
-        updater_handle,
-        tree_handle,
-        consistency_checker_handle,
-        fee_params_fetcher_handle,
-        commitment_generator_handle,
-    ]);
-
     Ok(())
 }
 
@@ -496,6 +664,56 @@ struct Cli {
     /// This is an experimental and incomplete feature; do not use unless you know what you're doing.
     #[arg(long)]
     enable_snapshots_recovery: bool,
+    /// Comma-separated list of components to launch.
+    #[arg(long, default_value = "all")]
+    components: ComponentsToRun,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Hash, Eq)]
+pub enum Component {
+    HttpApi,
+    WsApi,
+    Tree,
+    TreeApi,
+    Core,
+}
+
+impl Component {
+    fn components_from_str(s: &str) -> anyhow::Result<&[Component]> {
+        match s {
+            "api" => Ok(&[Component::HttpApi, Component::WsApi]),
+            "http_api" => Ok(&[Component::HttpApi]),
+            "ws_api" => Ok(&[Component::WsApi]),
+            "tree" => Ok(&[Component::Tree]),
+            "tree_api" => Ok(&[Component::TreeApi]),
+            "core" => Ok(&[Component::Core]),
+            "all" => Ok(&[
+                Component::HttpApi,
+                Component::WsApi,
+                Component::Tree,
+                Component::Core,
+            ]),
+            other => Err(anyhow::anyhow!("{other} is not a valid component name")),
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+struct ComponentsToRun(HashSet<Component>);
+
+impl FromStr for ComponentsToRun {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let components = s
+            .split(',')
+            .try_fold(HashSet::new(), |mut acc, component_str| {
+                let components = Component::components_from_str(component_str.trim())?;
+                acc.extend(components);
+                Ok::<_, Self::Err>(acc)
+            })?;
+        Ok(Self(components))
+    }
 }
 
 #[tokio::main]
@@ -660,6 +878,7 @@ async fn main() -> anyhow::Result<()> {
         &mut task_handles,
         &app_health,
         stop_receiver.clone(),
+        &opt.components.0,
     )
     .await
     .context("init_tasks")?;
diff --git a/core/lib/zksync_core/src/metadata_calculator/helpers.rs b/core/lib/zksync_core/src/metadata_calculator/helpers.rs
index 78411ab9b02..7a9acb6aedf 100644
--- a/core/lib/zksync_core/src/metadata_calculator/helpers.rs
+++ b/core/lib/zksync_core/src/metadata_calculator/helpers.rs
@@ -222,7 +222,7 @@ impl AsyncTree {
 
 /// Async version of [`ZkSyncTreeReader`].
 #[derive(Debug, Clone)]
-pub(crate) struct AsyncTreeReader {
+pub struct AsyncTreeReader {
     inner: ZkSyncTreeReader,
     mode: MerkleTreeMode,
 }
@@ -261,7 +261,7 @@ impl LazyAsyncTreeReader {
     }
 
     /// Waits until the tree is initialized and returns a reader for it.
-    pub(crate) async fn wait(mut self) -> AsyncTreeReader {
+    pub async fn wait(mut self) -> AsyncTreeReader {
         loop {
             if let Some(reader) = self.0.borrow().clone() {
                 break reader;
diff --git a/core/lib/zksync_core/src/sync_layer/sync_state.rs b/core/lib/zksync_core/src/sync_layer/sync_state.rs
index 0bbedfd56aa..104e9d67a8d 100644
--- a/core/lib/zksync_core/src/sync_layer/sync_state.rs
+++ b/core/lib/zksync_core/src/sync_layer/sync_state.rs
@@ -1,10 +1,14 @@
-use std::sync::Arc;
+use std::{sync::Arc, time::Duration};
 
+use anyhow::Context;
 use async_trait::async_trait;
 use serde::Serialize;
+use tokio::sync::watch;
 use zksync_concurrency::{ctx, sync};
+use zksync_dal::{ConnectionPool, Core, CoreDal};
 use zksync_health_check::{CheckHealth, Health, HealthStatus};
 use zksync_types::MiniblockNumber;
+use zksync_web3_decl::{jsonrpsee::http_client::HttpClient, namespaces::EthNamespaceClient};
 
 use crate::{
     metrics::EN_METRICS,
@@ -73,6 +77,41 @@ impl SyncState {
     pub(crate) fn is_synced(&self) -> bool {
         self.0.borrow().is_synced().0
     }
+
+    pub async fn run_updater(
+        self,
+        connection_pool: ConnectionPool<Core>,
+        main_node_client: HttpClient,
+        mut stop_receiver: watch::Receiver<bool>,
+    ) -> anyhow::Result<()> {
+        const UPDATE_INTERVAL: Duration = Duration::from_secs(10);
+
+        while !*stop_receiver.borrow_and_update() {
+            let local_block = connection_pool
+                .connection()
+                .await
+                .context("Failed to get a connection from the pool in sync state updater")?
+ .blocks_dal() + .get_sealed_miniblock_number() + .await + .context("Failed to get the miniblock number from DB")?; + + let main_node_block = main_node_client + .get_block_number() + .await + .context("Failed to request last miniblock number from main node")?; + + if let Some(local_block) = local_block { + self.set_local_block(local_block); + self.set_main_node_block(main_node_block.as_u32().into()); + } + + tokio::time::timeout(UPDATE_INTERVAL, stop_receiver.changed()) + .await + .ok(); + } + Ok(()) + } } #[async_trait]
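For reference, here is a self-contained sketch of the `--components` parsing this patch introduces. It mirrors `Component::components_from_str` and `ComponentsToRun::from_str` from the diff above, but substitutes plain `String` errors for `anyhow::Error` so it compiles without dependencies; the `main` function is purely illustrative and is not part of the node.

```rust
use std::collections::HashSet;

// Dependency-free mirror of the parsing added in this PR:
// `components_from_str` maps one CLI token to a slice of components, and the
// CLI value is the comma-separated union of those slices.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Component {
    HttpApi,
    WsApi,
    Tree,
    TreeApi,
    Core,
}

fn components_from_str(s: &str) -> Result<&'static [Component], String> {
    match s {
        "api" => Ok(&[Component::HttpApi, Component::WsApi]),
        "http_api" => Ok(&[Component::HttpApi]),
        "ws_api" => Ok(&[Component::WsApi]),
        "tree" => Ok(&[Component::Tree]),
        "tree_api" => Ok(&[Component::TreeApi]),
        "core" => Ok(&[Component::Core]),
        // Note: "all" intentionally omits `TreeApi`.
        "all" => Ok(&[
            Component::HttpApi,
            Component::WsApi,
            Component::Tree,
            Component::Core,
        ]),
        other => Err(format!("{other} is not a valid component name")),
    }
}

// Comma-separated and whitespace-tolerant, like `ComponentsToRun::from_str`;
// duplicates collapse because the accumulator is a `HashSet`.
fn parse_components(arg: &str) -> Result<HashSet<Component>, String> {
    arg.split(',').try_fold(HashSet::new(), |mut acc, part| {
        acc.extend(components_from_str(part.trim())?);
        Ok(acc)
    })
}

fn main() {
    // `--components=tree,tree_api` serves the Merkle tree plus its API
    // (the API additionally needs `tree_component.api_port` configured).
    let set = parse_components("tree, tree_api").unwrap();
    assert!(set.contains(&Component::Tree) && set.contains(&Component::TreeApi));

    // The default "all" runs everything except the tree API.
    let all = parse_components("all").unwrap();
    assert!(!all.contains(&Component::TreeApi));

    // Unknown names fail, mirroring the `anyhow::anyhow!` arm in the diff.
    assert!(parse_components("tree,bogus").is_err());
}
```

One design choice worth noting: because `all` excludes `TreeApi`, exposing the tree API is always an explicit opt-in, and `init_tasks` enforces the corresponding invariant at startup ("Merkle tree API cannot be started without a tree component").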