diff --git a/.github/release-please/config.json b/.github/release-please/config.json index 300ce9ec0a0..4f69aa4f937 100644 --- a/.github/release-please/config.json +++ b/.github/release-please/config.json @@ -7,7 +7,13 @@ "packages": { "core": { "release-type": "simple", - "component": "core" + "component": "core", + "extra-files": [ + { + "type": "generic", + "path": "bin/external_node/Cargo.toml" + } + ] }, "prover": { "release-type": "simple", diff --git a/Cargo.lock b/Cargo.lock index 12dca9f4bc4..f000dc83cc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8526,13 +8526,14 @@ dependencies = [ [[package]] name = "zksync_external_node" -version = "0.1.0" +version = "22.1.0" dependencies = [ "anyhow", "clap 4.4.6", "envy", "futures 0.3.28", "prometheus_exporter", + "rustc_version", "semver", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 5284954d01d..cefe6294361 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,6 +120,7 @@ regex = "1" reqwest = "0.11" rlp = "0.5" rocksdb = "0.21.0" +rustc_version = "0.4.0" secp256k1 = "0.27.0" semver = "1" sentry = "0.31" diff --git a/core/bin/external_node/Cargo.toml b/core/bin/external_node/Cargo.toml index fb3517dbd21..ebdc0ed13a3 100644 --- a/core/bin/external_node/Cargo.toml +++ b/core/bin/external_node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "zksync_external_node" -version = "0.1.0" +version = "22.1.0" # x-release-please-version edition.workspace = true authors.workspace = true homepage.workspace = true @@ -44,3 +44,6 @@ clap = { workspace = true, features = ["derive"] } serde_json.workspace = true semver.workspace = true tracing.workspace = true + +[build-dependencies] +rustc_version.workspace = true diff --git a/core/bin/external_node/build.rs b/core/bin/external_node/build.rs new file mode 100644 index 00000000000..d37fef0b1b0 --- /dev/null +++ b/core/bin/external_node/build.rs @@ -0,0 +1,46 @@ +//! Build script for the external node binary. 
+ +use std::{ + env, fs, + io::{self, Write}, + path::Path, +}; + +use rustc_version::{Channel, LlvmVersion}; + +fn print_rust_meta(out: &mut impl Write, meta: &rustc_version::VersionMeta) -> io::Result<()> { + writeln!( + out, + "pub(crate) const RUSTC_METADATA: RustcMetadata = RustcMetadata {{ \ + version: {semver:?}, \ + commit_hash: {commit_hash:?}, \ + commit_date: {commit_date:?}, \ + channel: {channel:?}, \ + host: {host:?}, \ + llvm: {llvm:?} \ + }};", + semver = meta.semver.to_string(), + commit_hash = meta.commit_hash, + commit_date = meta.commit_date, + channel = match meta.channel { + Channel::Dev => "dev", + Channel::Beta => "beta", + Channel::Nightly => "nightly", + Channel::Stable => "stable", + }, + host = meta.host, + llvm = meta.llvm_version.as_ref().map(LlvmVersion::to_string), + ) +} + +fn main() { + let out_dir = env::var("OUT_DIR").expect("`OUT_DIR` env var not set for build script"); + let rustc_meta = rustc_version::version_meta().expect("Failed obtaining rustc metadata"); + + let metadata_module_path = Path::new(&out_dir).join("metadata_values.rs"); + let metadata_module = + fs::File::create(metadata_module_path).expect("cannot create metadata module"); + let mut metadata_module = io::BufWriter::new(metadata_module); + + print_rust_meta(&mut metadata_module, &rustc_meta).expect("failed printing rustc metadata"); +} diff --git a/core/bin/external_node/src/config/mod.rs b/core/bin/external_node/src/config/mod.rs index 9cddad5e767..bfa59a000bd 100644 --- a/core/bin/external_node/src/config/mod.rs +++ b/core/bin/external_node/src/config/mod.rs @@ -721,8 +721,6 @@ impl From for InternalApiConfig { req_entities_limit: config.optional.req_entities_limit, fee_history_limit: config.optional.fee_history_limit, filters_disabled: config.optional.filters_disabled, - mempool_cache_update_interval: config.optional.mempool_cache_update_interval(), - mempool_cache_size: config.optional.mempool_cache_size, dummy_verifier: config.remote.dummy_verifier, l1_batch_commit_data_generator_mode: config.remote.l1_batch_commit_data_generator_mode, } diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 68f507429cc..dfac57f0168 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -18,7 +18,7 @@ use zksync_core::{ healthcheck::HealthCheckHandle, tree::{TreeApiClient, TreeApiHttpClient}, tx_sender::{proxy::TxProxy, ApiContracts, TxSenderBuilder}, - web3::{ApiBuilder, Namespace}, + web3::{mempool_cache::MempoolCache, ApiBuilder, Namespace}, }, block_reverter::{BlockReverter, BlockReverterFlags, L1ExecutedBatchesRevert, NodeRole}, commitment_generator::CommitmentGenerator, @@ -57,16 +57,16 @@ use crate::{ config::{observability::observability_config_from_env, ExternalNodeConfig}, helpers::MainNodeHealthCheck, init::ensure_storage_initialized, + metrics::RUST_METRICS, }; mod config; mod helpers; mod init; +mod metadata; mod metrics; mod version_sync_task; -const RELEASE_MANIFEST: &str = include_str!("../../../../.github/release-please/manifest.json"); - /// Creates the state keeper configured to work in the external node mode. 
#[allow(clippy::too_many_arguments)] async fn build_state_keeper( @@ -356,13 +356,13 @@ async fn run_core( #[allow(clippy::too_many_arguments)] async fn run_api( + task_handles: &mut Vec>>, config: &ExternalNodeConfig, app_health: &AppHealthCheck, connection_pool: ConnectionPool, stop_receiver: watch::Receiver, sync_state: SyncState, tree_reader: Option>, - task_futures: &mut Vec>>, main_node_client: L2Client, singleton_pool_builder: ConnectionPoolBuilder, fee_params_fetcher: Arc, @@ -388,96 +388,87 @@ async fn run_api( } }; - let ( - tx_sender, - vm_barrier, - cache_update_handle, - proxy_cache_updater_handle, - whitelisted_tokens_update_handle, - ) = { - let tx_proxy = TxProxy::new(main_node_client.clone()); - let proxy_cache_updater_pool = singleton_pool_builder - .build() - .await - .context("failed to build a tree_pool")?; - let proxy_cache_updater_handle = tokio::spawn( - tx_proxy - .run_account_nonce_sweeper(proxy_cache_updater_pool.clone(), stop_receiver.clone()), - ); + let tx_proxy = TxProxy::new(main_node_client.clone()); + let proxy_cache_updater_pool = singleton_pool_builder + .build() + .await + .context("failed to build a tree_pool")?; + task_handles.push(tokio::spawn(tx_proxy.run_account_nonce_sweeper( + proxy_cache_updater_pool.clone(), + stop_receiver.clone(), + ))); - let tx_sender_builder = TxSenderBuilder::new( - config.clone().into(), - connection_pool.clone(), - Arc::new(tx_proxy), - ); + let tx_sender_builder = TxSenderBuilder::new( + config.clone().into(), + connection_pool.clone(), + Arc::new(tx_proxy), + ); - if config.optional.transactions_per_sec_limit.is_some() { - tracing::warn!("`transactions_per_sec_limit` option is deprecated and ignored"); - }; + if config.optional.transactions_per_sec_limit.is_some() { + tracing::warn!("`transactions_per_sec_limit` option is deprecated and ignored"); + }; - let max_concurrency = config.optional.vm_concurrency_limit; - let (vm_concurrency_limiter, vm_barrier) = VmConcurrencyLimiter::new(max_concurrency); - let mut storage_caches = PostgresStorageCaches::new( - config.optional.factory_deps_cache_size() as u64, - config.optional.initial_writes_cache_size() as u64, - ); - let latest_values_cache_size = config.optional.latest_values_cache_size() as u64; - let cache_update_handle = (latest_values_cache_size > 0).then(|| { - task::spawn( - storage_caches - .configure_storage_values_cache( - latest_values_cache_size, - connection_pool.clone(), - ) - .run(stop_receiver.clone()), - ) - }); - - let whitelisted_tokens_for_aa_cache = Arc::new(RwLock::new(Vec::new())); - let whitelisted_tokens_for_aa_cache_clone = whitelisted_tokens_for_aa_cache.clone(); - let mut stop_receiver_for_task = stop_receiver.clone(); - let whitelisted_tokens_update_task = task::spawn(async move { - loop { - match main_node_client.whitelisted_tokens_for_aa().await { - Ok(tokens) => { - *whitelisted_tokens_for_aa_cache_clone.write().await = tokens; - } - Err(jsonrpsee::core::client::Error::Call(error)) - if error.code() == jsonrpsee::types::error::METHOD_NOT_FOUND_CODE => - { - // Method is not supported by the main node, do nothing. 
- } - Err(err) => { - tracing::error!( - "Failed to query `whitelisted_tokens_for_aa`, error: {err:?}" - ); - } - } + let max_concurrency = config.optional.vm_concurrency_limit; + let (vm_concurrency_limiter, vm_barrier) = VmConcurrencyLimiter::new(max_concurrency); + let mut storage_caches = PostgresStorageCaches::new( + config.optional.factory_deps_cache_size() as u64, + config.optional.initial_writes_cache_size() as u64, + ); + let latest_values_cache_size = config.optional.latest_values_cache_size() as u64; + let cache_update_handle = (latest_values_cache_size > 0).then(|| { + task::spawn( + storage_caches + .configure_storage_values_cache(latest_values_cache_size, connection_pool.clone()) + .run(stop_receiver.clone()), + ) + }); + task_handles.extend(cache_update_handle); - // Error here corresponds to a timeout w/o `stop_receiver` changed; we're OK with this. - tokio::time::timeout(Duration::from_secs(60), stop_receiver_for_task.changed()) - .await - .ok(); + let whitelisted_tokens_for_aa_cache = Arc::new(RwLock::new(Vec::new())); + let whitelisted_tokens_for_aa_cache_clone = whitelisted_tokens_for_aa_cache.clone(); + let mut stop_receiver_for_task = stop_receiver.clone(); + task_handles.push(task::spawn(async move { + while !*stop_receiver_for_task.borrow_and_update() { + match main_node_client.whitelisted_tokens_for_aa().await { + Ok(tokens) => { + *whitelisted_tokens_for_aa_cache_clone.write().await = tokens; + } + Err(jsonrpsee::core::client::Error::Call(error)) + if error.code() == jsonrpsee::types::error::METHOD_NOT_FOUND_CODE => + { + // Method is not supported by the main node, do nothing. + } + Err(err) => { + tracing::error!("Failed to query `whitelisted_tokens_for_aa`, error: {err:?}"); + } } - }); - - let tx_sender = tx_sender_builder - .with_whitelisted_tokens_for_aa(whitelisted_tokens_for_aa_cache) - .build( - fee_params_fetcher, - Arc::new(vm_concurrency_limiter), - ApiContracts::load_from_disk(), // TODO (BFT-138): Allow to dynamically reload API contracts - storage_caches, - ) - .await; - ( - tx_sender, - vm_barrier, - cache_update_handle, - proxy_cache_updater_handle, - whitelisted_tokens_update_task, + + // Error here corresponds to a timeout w/o `stop_receiver` changed; we're OK with this. 
+ tokio::time::timeout(Duration::from_secs(60), stop_receiver_for_task.changed()) + .await + .ok(); + } + Ok(()) + })); + + let tx_sender = tx_sender_builder + .with_whitelisted_tokens_for_aa(whitelisted_tokens_for_aa_cache) + .build( + fee_params_fetcher, + Arc::new(vm_concurrency_limiter), + ApiContracts::load_from_disk(), // TODO (BFT-138): Allow to dynamically reload API contracts + storage_caches, ) - }; + .await; + + let mempool_cache = MempoolCache::new(config.optional.mempool_cache_size); + let mempool_cache_update_task = mempool_cache.update_task( + connection_pool.clone(), + config.optional.mempool_cache_update_interval(), + ); + task_handles.push(tokio::spawn( + mempool_cache_update_task.run(stop_receiver.clone()), + )); if components.contains(&Component::HttpApi) { let builder = ApiBuilder::jsonrpsee_backend(config.clone().into(), connection_pool.clone()) @@ -489,6 +480,7 @@ async fn run_api( .with_vm_barrier(vm_barrier.clone()) .with_tree_api(tree_reader.clone()) .with_sync_state(sync_state.clone()) + .with_mempool_cache(mempool_cache.clone()) .enable_api_namespaces(config.optional.api_namespaces()); let http_server_handles = builder @@ -498,7 +490,7 @@ async fn run_api( .await .context("Failed initializing HTTP JSON-RPC server")?; app_health.insert_component(http_server_handles.health_check); - task_futures.extend(http_server_handles.tasks); + task_handles.extend(http_server_handles.tasks); } if components.contains(&Component::WsApi) { @@ -513,6 +505,7 @@ async fn run_api( .with_vm_barrier(vm_barrier) .with_tree_api(tree_reader) .with_sync_state(sync_state) + .with_mempool_cache(mempool_cache) .enable_api_namespaces(config.optional.api_namespaces()); let ws_server_handles = builder @@ -522,13 +515,9 @@ async fn run_api( .await .context("Failed initializing WS JSON-RPC server")?; app_health.insert_component(ws_server_handles.health_check); - task_futures.extend(ws_server_handles.tasks); + task_handles.extend(ws_server_handles.tasks); } - task_futures.extend(cache_update_handle); - task_futures.push(proxy_cache_updater_handle); - task_futures.push(whitelisted_tokens_update_handle); - Ok(()) } @@ -541,36 +530,9 @@ async fn init_tasks( stop_receiver: watch::Receiver, components: &HashSet, ) -> anyhow::Result<()> { - let release_manifest: serde_json::Value = serde_json::from_str(RELEASE_MANIFEST) - .context("releuse manifest is a valid json document")?; - let release_manifest_version = release_manifest["core"].as_str().context( - "a release-please manifest with \"core\" version field was specified at build time", - )?; - - let version = semver::Version::parse(release_manifest_version) - .context("version in manifest is a correct semver format")?; - let pool = connection_pool.clone(); - let mut stop_receiver_for_task = stop_receiver.clone(); - task_handles.push(tokio::spawn(async move { - while !*stop_receiver_for_task.borrow_and_update() { - let protocol_version = pool - .connection() - .await? - .protocol_versions_dal() - .last_used_version_id() - .await - .map(|version| version as u16); - - EN_METRICS.version[&(version.to_string(), protocol_version)].set(1); - - // Error here corresponds to a timeout w/o `stop_receiver` changed; we're OK with this. 
- tokio::time::timeout(Duration::from_secs(10), stop_receiver_for_task.changed()) - .await - .ok(); - } - Ok(()) - })); - + let protocol_version_update_task = + EN_METRICS.run_protocol_version_updates(connection_pool.clone(), stop_receiver.clone()); + task_handles.push(tokio::spawn(protocol_version_update_task)); let singleton_pool_builder = ConnectionPool::singleton(&config.postgres.database_url); // Run the components. @@ -643,13 +605,13 @@ async fn init_tasks( if components.contains(&Component::HttpApi) || components.contains(&Component::WsApi) { run_api( + task_handles, config, app_health, connection_pool, stop_receiver.clone(), sync_state, tree_reader, - task_handles, main_node_client, singleton_pool_builder, fee_params_fetcher.clone(), @@ -801,6 +763,9 @@ async fn main() -> anyhow::Result<()> { ConnectionPool::::global_config().set_long_connection_threshold(threshold)?; } + RUST_METRICS.initialize(); + EN_METRICS.observe_config(&config); + let connection_pool = ConnectionPool::::builder( &config.postgres.database_url, config.postgres.max_connections, diff --git a/core/bin/external_node/src/metadata.rs b/core/bin/external_node/src/metadata.rs new file mode 100644 index 00000000000..ce454711a97 --- /dev/null +++ b/core/bin/external_node/src/metadata.rs @@ -0,0 +1,22 @@ +//! Metadata information about the external node. + +use vise::EncodeLabelSet; + +pub(crate) use self::values::RUSTC_METADATA; + +mod values { + use super::RustcMetadata; + include!(concat!(env!("OUT_DIR"), "/metadata_values.rs")); +} + +#[derive(Debug, EncodeLabelSet)] +pub(crate) struct RustcMetadata { + pub version: &'static str, + pub commit_hash: Option<&'static str>, + pub commit_date: Option<&'static str>, + pub channel: &'static str, + pub host: &'static str, + pub llvm: Option<&'static str>, +} + +pub(crate) const SERVER_VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/core/bin/external_node/src/metrics.rs b/core/bin/external_node/src/metrics.rs index 1d493dd0087..a95b5af700c 100644 --- a/core/bin/external_node/src/metrics.rs +++ b/core/bin/external_node/src/metrics.rs @@ -1,11 +1,93 @@ -use vise::{Gauge, LabeledFamily, Metrics}; +use std::time::Duration; + +use tokio::sync::watch; +use vise::{EncodeLabelSet, Gauge, Info, Metrics}; +use zksync_dal::{ConnectionPool, Core, CoreDal}; + +use crate::{ + config::ExternalNodeConfig, + metadata::{RustcMetadata, RUSTC_METADATA, SERVER_VERSION}, +}; + +/// Immutable EN parameters that affect multiple components. +#[derive(Debug, Clone, Copy, EncodeLabelSet)] +struct ExternalNodeInfo { + server_version: &'static str, + l1_chain_id: u64, + l2_chain_id: u64, + /// Size of the main Postgres connection pool. + postgres_pool_size: u32, +} #[derive(Debug, Metrics)] #[metrics(prefix = "external_node")] -pub(crate) struct EnMetrics { - #[metrics(labels = ["server_version", "protocol_version"])] - pub version: LabeledFamily<(String, Option), Gauge, 2>, +pub(crate) struct ExternalNodeMetrics { + /// General information about the external node. + info: Info, + /// Current protocol version. 
+ protocol_version: Gauge, +} + +impl ExternalNodeMetrics { + pub(crate) fn observe_config(&self, config: &ExternalNodeConfig) { + let info = ExternalNodeInfo { + server_version: SERVER_VERSION, + l1_chain_id: config.remote.l1_chain_id.0, + l2_chain_id: config.remote.l2_chain_id.as_u64(), + postgres_pool_size: config.postgres.max_connections, + }; + tracing::info!("Setting general node information: {info:?}"); + + if self.info.set(info).is_err() { + tracing::warn!( + "General information is already set for the external node: {:?}, was attempting to set {info:?}", + self.info.get() + ); + } + } + + pub(crate) async fn run_protocol_version_updates( + &self, + pool: ConnectionPool, + mut stop_receiver: watch::Receiver, + ) -> anyhow::Result<()> { + const QUERY_INTERVAL: Duration = Duration::from_secs(10); + + while !*stop_receiver.borrow_and_update() { + let maybe_protocol_version = pool + .connection() + .await? + .protocol_versions_dal() + .last_used_version_id() + .await; + if let Some(version) = maybe_protocol_version { + self.protocol_version.set(version as u64); + } + + tokio::time::timeout(QUERY_INTERVAL, stop_receiver.changed()) + .await + .ok(); + } + Ok(()) + } +} + +#[vise::register] +pub(crate) static EN_METRICS: vise::Global = vise::Global::new(); + +#[derive(Debug, Metrics)] +#[metrics(prefix = "rust")] +pub(crate) struct RustMetrics { + /// General information about the Rust compiler. + info: Info, +} + +impl RustMetrics { + pub fn initialize(&self) { + tracing::info!("Metadata for rustc that this EN was compiled with: {RUSTC_METADATA:?}"); + self.info.set(RUSTC_METADATA).ok(); + } } #[vise::register] -pub(crate) static EN_METRICS: vise::Global = vise::Global::new(); +pub(crate) static RUST_METRICS: vise::Global = vise::Global::new(); diff --git a/core/lib/health_check/src/lib.rs b/core/lib/health_check/src/lib.rs index 228d40ab995..3794e741ebd 100644 --- a/core/lib/health_check/src/lib.rs +++ b/core/lib/health_check/src/lib.rs @@ -12,8 +12,8 @@ use futures::future; use serde::Serialize; use tokio::sync::watch; -use self::metrics::METRICS; -use crate::metrics::CheckResult; +use self::metrics::{CheckResult, METRICS}; +use crate::metrics::AppHealthCheckConfig; mod metrics; #[cfg(test)] @@ -112,6 +112,18 @@ impl AppHealthCheck { let slow_time_limit = slow_time_limit.unwrap_or(DEFAULT_SLOW_TIME_LIMIT); let hard_time_limit = hard_time_limit.unwrap_or(DEFAULT_HARD_TIME_LIMIT); tracing::debug!("Created app health with time limits: slow={slow_time_limit:?}, hard={hard_time_limit:?}"); + + let config = AppHealthCheckConfig { + slow_time_limit: slow_time_limit.into(), + hard_time_limit: hard_time_limit.into(), + }; + if METRICS.info.set(config).is_err() { + tracing::warn!( + "App health redefined; previous config: {:?}", + METRICS.info.get() + ); + } + Self { components: Mutex::default(), slow_time_limit, diff --git a/core/lib/health_check/src/metrics.rs b/core/lib/health_check/src/metrics.rs index bb90e6c499f..69f6265be51 100644 --- a/core/lib/health_check/src/metrics.rs +++ b/core/lib/health_check/src/metrics.rs @@ -2,7 +2,10 @@ use std::time::Duration; -use vise::{Buckets, EncodeLabelSet, EncodeLabelValue, Family, Histogram, Metrics, Unit}; +use vise::{ + Buckets, DurationAsSecs, EncodeLabelSet, EncodeLabelValue, Family, Histogram, Info, Metrics, + Unit, +}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue)] #[metrics(rename_all = "snake_case")] @@ -18,9 +21,19 @@ struct AbnormalCheckLabels { result: CheckResult, } +#[derive(Debug, EncodeLabelSet)] +pub(crate) struct 
AppHealthCheckConfig { + #[metrics(unit = Unit::Seconds)] + pub slow_time_limit: DurationAsSecs, + #[metrics(unit = Unit::Seconds)] + pub hard_time_limit: DurationAsSecs, +} + #[derive(Debug, Metrics)] #[metrics(prefix = "healthcheck")] pub(crate) struct HealthMetrics { + /// Immutable configuration for application health checks. + pub info: Info, /// Latency for abnormal checks. Includes slow, dropped and timed out checks (distinguished by the "result" label); /// skips normal checks. #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)] diff --git a/core/lib/state/src/cache/lru_cache.rs b/core/lib/state/src/cache/lru_cache.rs index 0e0f3541117..fa37bdb3e22 100644 --- a/core/lib/state/src/cache/lru_cache.rs +++ b/core/lib/state/src/cache/lru_cache.rs @@ -1,9 +1,10 @@ use std::hash::Hash; use crate::cache::{ - metrics::{Method, RequestOutcome, METRICS}, + metrics::{LruCacheConfig, Method, RequestOutcome, METRICS}, CacheValue, MokaBase, }; + /// Cache implementation that uses LRU eviction policy. #[derive(Debug, Clone)] pub struct LruCache { @@ -22,6 +23,15 @@ where /// /// Panics if an invalid cache capacity is provided. pub fn new(name: &'static str, capacity: u64) -> Self { + tracing::info!("Configured LRU cache `{name}` with capacity {capacity}B"); + if let Err(err) = METRICS.lru_info[&name].set(LruCacheConfig { capacity }) { + tracing::warn!( + "LRU cache `{name}` was already created with config {:?}; new config: {:?}", + METRICS.lru_info[&name].get(), + err.into_inner() + ); + } + let cache = if capacity == 0 { None } else { diff --git a/core/lib/state/src/cache/metrics.rs b/core/lib/state/src/cache/metrics.rs index c0a805a5b33..715d0d342f3 100644 --- a/core/lib/state/src/cache/metrics.rs +++ b/core/lib/state/src/cache/metrics.rs @@ -2,7 +2,10 @@ use std::time::Duration; -use vise::{Buckets, Counter, EncodeLabelValue, Gauge, Histogram, LabeledFamily, Metrics}; +use vise::{ + Buckets, Counter, EncodeLabelSet, EncodeLabelValue, Gauge, Histogram, Info, LabeledFamily, + Metrics, Unit, +}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue)] #[metrics(rename_all = "snake_case")] @@ -33,9 +36,29 @@ const SMALL_LATENCIES: Buckets = Buckets::values(&[ 1e-8, 2.5e-8, 5e-8, 1e-7, 2.5e-7, 5e-7, 1e-6, 2.5e-6, 5e-6, 1e-5, 2.5e-5, 5e-5, 1e-4, 1e-3, ]); +#[derive(Debug, EncodeLabelSet)] +pub(super) struct LruCacheConfig { + /// Cache capacity in bytes. + #[metrics(unit = Unit::Bytes)] + pub capacity: u64, +} + +#[derive(Debug, EncodeLabelSet)] +pub(super) struct SequentialCacheConfig { + /// Cache capacity in number of items. + pub capacity: u64, +} + #[derive(Debug, Metrics)] #[metrics(prefix = "server_state_cache")] pub(super) struct CacheMetrics { + /// Configuration of LRU caches. + #[metrics(labels = ["name"])] + pub lru_info: LabeledFamily<&'static str, Info>, + /// Configuration of sequential caches. + #[metrics(labels = ["name"])] + pub sequential_info: LabeledFamily<&'static str, Info>, + /// Latency of calling a cache method. 
#[metrics(buckets = SMALL_LATENCIES, labels = ["name", "method"])] pub latency: LabeledFamily<(&'static str, Method), Histogram, 2>, diff --git a/core/lib/state/src/cache/sequential_cache.rs b/core/lib/state/src/cache/sequential_cache.rs index 5ca86e35d73..0f36785f666 100644 --- a/core/lib/state/src/cache/sequential_cache.rs +++ b/core/lib/state/src/cache/sequential_cache.rs @@ -1,6 +1,6 @@ use std::collections::VecDeque; -use crate::cache::metrics::{Method, RequestOutcome, METRICS}; +use crate::cache::metrics::{Method, RequestOutcome, SequentialCacheConfig, METRICS}; /// A generic cache structure for storing key-value pairs in sequential order. /// It allows for non-unique keys and supports efficient retrieval of values based on a key @@ -27,6 +27,19 @@ impl SequentialCache { /// Panics if `capacity` is 0. pub fn new(name: &'static str, capacity: usize) -> Self { assert!(capacity > 0, "Cache capacity must be greater than 0"); + + let config = SequentialCacheConfig { + capacity: capacity as u64, + }; + tracing::info!("Configured sequential cache `{name}` with capacity {capacity} items"); + if let Err(err) = METRICS.sequential_info[&name].set(config) { + tracing::warn!( + "Sequential cache `{name}` was already created with config {:?}; new config: {:?}", + METRICS.sequential_info[&name].get(), + err.into_inner() + ); + } + SequentialCache { name, data: VecDeque::with_capacity(capacity), diff --git a/core/lib/state/src/postgres/mod.rs b/core/lib/state/src/postgres/mod.rs index de58a860630..1fc832b194a 100644 --- a/core/lib/state/src/postgres/mod.rs +++ b/core/lib/state/src/postgres/mod.rs @@ -266,8 +266,6 @@ pub struct PostgresStorageCaches { } impl PostgresStorageCaches { - const NEG_INITIAL_WRITES_NAME: &'static str = "negative_initial_writes_cache"; - /// Creates caches with the specified capacities measured in bytes. pub fn new(factory_deps_capacity: u64, initial_writes_capacity: u64) -> Self { tracing::debug!( @@ -282,7 +280,7 @@ impl PostgresStorageCaches { initial_writes_capacity / 2, ), negative_initial_writes: InitialWritesCache::new( - Self::NEG_INITIAL_WRITES_NAME, + "negative_initial_writes_cache", initial_writes_capacity / 2, ), values: None, diff --git a/core/lib/web3_decl/src/client/metrics.rs b/core/lib/web3_decl/src/client/metrics.rs index c7c254c9969..5bc2a2153b3 100644 --- a/core/lib/web3_decl/src/client/metrics.rs +++ b/core/lib/web3_decl/src/client/metrics.rs @@ -3,9 +3,12 @@ use std::time::Duration; use jsonrpsee::{core::client, http_client::transport}; -use vise::{Buckets, Counter, EncodeLabelSet, EncodeLabelValue, Family, Histogram, Metrics, Unit}; +use vise::{ + Buckets, Counter, DurationAsSecs, EncodeLabelSet, EncodeLabelValue, Family, Histogram, Info, + Metrics, Unit, +}; -use super::{AcquireStats, CallOrigin}; +use super::{AcquireStats, CallOrigin, SharedRateLimit}; #[derive(Debug, Clone, PartialEq, Eq, Hash, EncodeLabelSet)] pub(super) struct RequestLabels { @@ -42,9 +45,18 @@ pub(super) struct GenericErrorLabels { kind: CallErrorKind, } +#[derive(Debug, EncodeLabelSet)] +struct L2ClientConfigLabels { + rate_limit: usize, + #[metrics(unit = Unit::Seconds)] + rate_limit_window: DurationAsSecs, +} + #[derive(Debug, Metrics)] #[metrics(prefix = "l2_client")] pub(super) struct L2ClientMetrics { + /// Client configuration. + info: Info, /// Number of requests timed out in the rate-limiting logic. pub rate_limit_timeout: Family, /// Latency of rate-limiting logic for rate-limited requests. 
@@ -59,6 +71,20 @@ pub(super) struct L2ClientMetrics { } impl L2ClientMetrics { + pub fn observe_config(&self, rate_limit: &SharedRateLimit) { + let config_labels = L2ClientConfigLabels { + rate_limit: rate_limit.rate_limit, + rate_limit_window: rate_limit.rate_limit_window.into(), + }; + if let Err(err) = self.info.set(config_labels) { + tracing::warn!( + "Error setting configuration info {:?} for L2 client; already set to {:?}", + err.into_inner(), + self.info.get() + ); + } + } + pub fn observe_rate_limit_latency( &self, component: &'static str, diff --git a/core/lib/web3_decl/src/client/mod.rs b/core/lib/web3_decl/src/client/mod.rs index c6c19aa9960..80ddb337a27 100644 --- a/core/lib/web3_decl/src/client/mod.rs +++ b/core/lib/web3_decl/src/client/mod.rs @@ -266,9 +266,12 @@ impl L2ClientBuilder { self.client, self.rate_limit ); + let rate_limit = SharedRateLimit::new(self.rate_limit.0, self.rate_limit.1); + METRICS.observe_config(&rate_limit); + L2Client { inner: self.client, - rate_limit: SharedRateLimit::new(self.rate_limit.0, self.rate_limit.1), + rate_limit, component_name: "", metrics: &METRICS, } diff --git a/core/lib/zksync_core/src/api_server/web3/mempool_cache.rs b/core/lib/zksync_core/src/api_server/web3/mempool_cache.rs index 16eff2e5a66..2c6872a048e 100644 --- a/core/lib/zksync_core/src/api_server/web3/mempool_cache.rs +++ b/core/lib/zksync_core/src/api_server/web3/mempool_cache.rs @@ -1,4 +1,4 @@ -use std::{future::Future, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use chrono::NaiveDateTime; use tokio::sync::{watch, RwLock}; @@ -12,53 +12,30 @@ use super::metrics::MEMPOOL_CACHE_METRICS; /// Stores all transactions accepted by the mempool and provides a way to query all that are newer than a given timestamp. /// Updates the cache based on interval passed in the constructor #[derive(Debug, Clone)] -pub(crate) struct MempoolCache(Arc>>); +pub struct MempoolCache(Arc>>); /// `INITIAL_LOOKBEHIND` is the period of time for which the cache is initially populated. const INITIAL_LOOKBEHIND: Duration = Duration::from_secs(120); impl MempoolCache { /// Initializes the mempool cache with the parameters provided. 
- pub fn new( - connection_pool: ConnectionPool, - update_interval: Duration, - capacity: usize, - stop_receiver: watch::Receiver, - ) -> (Self, impl Future>) { + pub fn new(capacity: usize) -> Self { let cache = SequentialCache::new("mempool", capacity); let cache = Arc::new(RwLock::new(cache)); - let cache_for_task = cache.clone(); - let update_task = async move { - loop { - if *stop_receiver.borrow() { - tracing::debug!("Stopping mempool cache updates"); - return Ok(()); - } - - // Get the timestamp that will be used as the lower bound for the next update - // If cache is non-empty - this is the last tx time, otherwise it's `INITIAL_LOOKBEHIND` seconds ago - let last_timestamp = cache_for_task - .read() - .await - .get_last_key() - .unwrap_or_else(|| chrono::Utc::now().naive_utc() - INITIAL_LOOKBEHIND); - - let latency = MEMPOOL_CACHE_METRICS.db_poll_latency.start(); - let mut connection = connection_pool.connection_tagged("api").await?; - let txs = connection - .transactions_web3_dal() - .get_pending_txs_hashes_after(last_timestamp, None) - .await?; - drop(connection); - latency.observe(); - MEMPOOL_CACHE_METRICS.tx_batch_size.observe(txs.len()); - - cache_for_task.write().await.insert(txs)?; - tokio::time::sleep(update_interval).await; - } - }; + Self(cache) + } - (Self(cache), update_task) + /// Returns a task that will update this cache in background. + pub fn update_task( + &self, + connection_pool: ConnectionPool, + update_interval: Duration, + ) -> MempoolCacheUpdateTask { + MempoolCacheUpdateTask { + cache: self.0.clone(), + connection_pool, + update_interval, + } } /// Returns all transaction hashes that are newer than the given timestamp. @@ -70,3 +47,44 @@ impl MempoolCache { self.0.read().await.query(after) } } + +/// Task updating [`MempoolCache`]. Should be spawned as a Tokio task (exactly one task for the cache). 
+#[derive(Debug)] +pub struct MempoolCacheUpdateTask { + cache: Arc>>, + connection_pool: ConnectionPool, + update_interval: Duration, +} + +impl MempoolCacheUpdateTask { + pub async fn run(self, stop_receiver: watch::Receiver) -> anyhow::Result<()> { + loop { + if *stop_receiver.borrow() { + tracing::debug!("Stopping mempool cache updates"); + return Ok(()); + } + + // Get the timestamp that will be used as the lower bound for the next update + // If cache is non-empty - this is the last tx time, otherwise it's `INITIAL_LOOKBEHIND` seconds ago + let last_timestamp = self + .cache + .read() + .await + .get_last_key() + .unwrap_or_else(|| chrono::Utc::now().naive_utc() - INITIAL_LOOKBEHIND); + + let latency = MEMPOOL_CACHE_METRICS.db_poll_latency.start(); + let mut connection = self.connection_pool.connection_tagged("api").await?; + let txs = connection + .transactions_web3_dal() + .get_pending_txs_hashes_after(last_timestamp, None) + .await?; + drop(connection); + latency.observe(); + MEMPOOL_CACHE_METRICS.tx_batch_size.observe(txs.len()); + + self.cache.write().await.insert(txs)?; + tokio::time::sleep(self.update_interval).await; + } + } +} diff --git a/core/lib/zksync_core/src/api_server/web3/metrics.rs b/core/lib/zksync_core/src/api_server/web3/metrics.rs index c5f8cd9f1bc..02c42f4589d 100644 --- a/core/lib/zksync_core/src/api_server/web3/metrics.rs +++ b/core/lib/zksync_core/src/api_server/web3/metrics.rs @@ -3,13 +3,16 @@ use std::{fmt, time::Duration}; use vise::{ - Buckets, Counter, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, LabeledFamily, - Metrics, Unit, + Buckets, Counter, DurationAsSecs, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, + Info, LabeledFamily, Metrics, Unit, }; use zksync_types::api; use zksync_web3_decl::error::Web3Error; -use super::{backend_jsonrpsee::MethodMetadata, ApiTransport, TypedFilter}; +use super::{ + backend_jsonrpsee::MethodMetadata, ApiTransport, InternalApiConfig, OptionalApiParams, + TypedFilter, +}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] #[metrics(label = "scheme", rename_all = "UPPERCASE")] @@ -142,6 +145,21 @@ struct Web3ErrorLabels { kind: Web3ErrorKind, } +#[derive(Debug, EncodeLabelSet)] +struct Web3ConfigLabels { + #[metrics(unit = Unit::Seconds)] + polling_interval: DurationAsSecs, + req_entities_limit: usize, + fee_history_limit: u64, + filters_limit: Option, + subscriptions_limit: Option, + #[metrics(unit = Unit::Bytes)] + batch_request_size_limit: Option, + #[metrics(unit = Unit::Bytes)] + response_body_size_limit: Option, + websocket_requests_per_minute_limit: Option, +} + /// Roughly exponential buckets for the `web3_call_block_diff` metric. The distribution should be skewed towards lower values. const BLOCK_DIFF_BUCKETS: Buckets = Buckets::values(&[ 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1_000.0, @@ -153,6 +171,9 @@ const RESPONSE_SIZE_BUCKETS: Buckets = Buckets::exponential(1.0..=1_048_576.0, 4 #[derive(Debug, Metrics)] #[metrics(prefix = "api")] pub(in crate::api_server) struct ApiMetrics { + /// Web3 server configuration. + web3_info: Family>, + /// Latency of a Web3 call. Calls that take block ID as an input have block ID and block diff /// labels (the latter is the difference between the latest sealed miniblock and the resolved miniblock). 
#[metrics(buckets = Buckets::LATENCIES)] @@ -184,6 +205,31 @@ pub(in crate::api_server) struct ApiMetrics { } impl ApiMetrics { + pub(super) fn observe_config( + &self, + transport: ApiTransportLabel, + polling_interval: Duration, + config: &InternalApiConfig, + optional: &OptionalApiParams, + ) { + let config_labels = Web3ConfigLabels { + polling_interval: polling_interval.into(), + req_entities_limit: config.req_entities_limit, + fee_history_limit: config.fee_history_limit, + filters_limit: optional.filters_limit, + subscriptions_limit: optional.subscriptions_limit, + batch_request_size_limit: optional.batch_request_size_limit, + response_body_size_limit: optional.response_body_size_limit, + websocket_requests_per_minute_limit: optional + .websocket_requests_per_minute_limit + .map(Into::into), + }; + tracing::info!("{transport:?} Web3 server is configured with options: {config_labels:?}"); + if self.web3_info[&transport].set(config_labels).is_err() { + tracing::warn!("Cannot set config labels for {transport:?} Web3 server"); + } + } + /// Observes latency of a finished RPC call. pub fn observe_latency(&self, meta: &MethodMetadata) { let latency = meta.started_at.elapsed(); diff --git a/core/lib/zksync_core/src/api_server/web3/mod.rs b/core/lib/zksync_core/src/api_server/web3/mod.rs index 97cd5a5ffcc..92a00161586 100644 --- a/core/lib/zksync_core/src/api_server/web3/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/mod.rs @@ -48,7 +48,7 @@ use crate::{ }; pub mod backend_jsonrpsee; -mod mempool_cache; +pub mod mempool_cache; pub(super) mod metrics; pub mod namespaces; mod pubsub; @@ -128,6 +128,7 @@ struct OptionalApiParams { response_body_size_limit: Option, websocket_requests_per_minute_limit: Option, tree_api: Option>, + mempool_cache: Option, pub_sub_events_sender: Option>, } @@ -259,6 +260,11 @@ impl ApiBuilder { self } + pub fn with_mempool_cache(mut self, cache: MempoolCache) -> Self { + self.optional.mempool_cache = Some(cache); + self + } + #[cfg(test)] fn with_pub_sub_events(mut self, sender: mpsc::UnboundedSender) -> Self { self.optional.pub_sub_events_sender = Some(sender); @@ -309,7 +315,6 @@ impl ApiServer { async fn build_rpc_state( self, last_sealed_miniblock: SealedMiniblockNumber, - mempool_cache: MempoolCache, ) -> anyhow::Result { let mut storage = self.updaters_pool.connection_tagged("api").await?; let start_info = BlockStartInfo::new(&mut storage).await?; @@ -333,7 +338,7 @@ impl ApiServer { sync_state: self.optional.sync_state, api_config: self.config, start_info, - mempool_cache, + mempool_cache: self.optional.mempool_cache, last_sealed_miniblock, tree_api: self.optional.tree_api, }) @@ -343,13 +348,10 @@ impl ApiServer { self, pub_sub: Option, last_sealed_miniblock: SealedMiniblockNumber, - mempool_cache: MempoolCache, ) -> anyhow::Result> { let namespaces = self.namespaces.clone(); let zksync_network_id = self.config.l2_chain_id; - let rpc_state = self - .build_rpc_state(last_sealed_miniblock, mempool_cache) - .await?; + let rpc_state = self.build_rpc_state(last_sealed_miniblock).await?; // Collect all the methods into a single RPC module. 
let mut rpc = RpcModule::new(()); @@ -458,16 +460,6 @@ impl ApiServer { ); let mut tasks = vec![tokio::spawn(sealed_miniblock_update_task)]; - - let (mempool_cache, mempool_cache_update_task) = MempoolCache::new( - self.updaters_pool.clone(), - self.config.mempool_cache_update_interval, - self.config.mempool_cache_size, - stop_receiver.clone(), - ); - - tasks.push(tokio::spawn(mempool_cache_update_task)); - let pub_sub = if matches!(transport, ApiTransport::WebSocket(_)) && self.namespaces.contains(&Namespace::Pubsub) { @@ -493,7 +485,6 @@ impl ApiServer { let server_task = tokio::spawn(self.run_jsonrpsee_server( stop_receiver, pub_sub, - mempool_cache, last_sealed_miniblock, local_addr_sender, )); @@ -510,7 +501,6 @@ impl ApiServer { self, mut stop_receiver: watch::Receiver, pub_sub: Option, - mempool_cache: MempoolCache, last_sealed_miniblock: SealedMiniblockNumber, local_addr_sender: oneshot::Sender, ) -> anyhow::Result<()> { @@ -520,6 +510,12 @@ impl ApiServer { ApiTransport::WebSocket(addr) => ("WS", false, addr), }; let transport_label = (&transport).into(); + API_METRICS.observe_config( + transport_label, + self.polling_interval, + &self.config, + &self.optional, + ); tracing::info!( "Waiting for at least one L1 batch in Postgres to start {transport_str} API server" @@ -555,7 +551,7 @@ impl ApiServer { let method_tracer = self.method_tracer.clone(); let rpc = self - .build_rpc_module(pub_sub, last_sealed_miniblock, mempool_cache) + .build_rpc_module(pub_sub, last_sealed_miniblock) .await?; let registered_method_names = Arc::new(rpc.method_names().collect::>()); tracing::debug!( diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs index bc50abac48b..6d4d5c73e25 100644 --- a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs +++ b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs @@ -698,27 +698,24 @@ impl EthNamespace { TypedFilter::PendingTransactions(from_timestamp_excluded) => { // Attempt to get pending transactions from cache. - let tx_hashes_from_cache = self - .state - .mempool_cache - .get_tx_hashes_after(*from_timestamp_excluded) - .await; - let tx_hashes = match tx_hashes_from_cache { - Some(mut result) => { - result.truncate(self.state.api_config.req_entities_limit); - result - } - None => { - // On cache miss, query the database. - let mut conn = self.state.acquire_connection().await?; - conn.transactions_web3_dal() - .get_pending_txs_hashes_after( - *from_timestamp_excluded, - Some(self.state.api_config.req_entities_limit), - ) - .await - .map_err(DalError::generalize)? - } + let tx_hashes_from_cache = if let Some(cache) = &self.state.mempool_cache { + cache.get_tx_hashes_after(*from_timestamp_excluded).await + } else { + None + }; + let tx_hashes = if let Some(mut result) = tx_hashes_from_cache { + result.truncate(self.state.api_config.req_entities_limit); + result + } else { + // On cache miss, query the database. + let mut conn = self.state.acquire_connection().await?; + conn.transactions_web3_dal() + .get_pending_txs_hashes_after( + *from_timestamp_excluded, + Some(self.state.api_config.req_entities_limit), + ) + .await + .map_err(DalError::generalize)? 
}; // It's possible the `tx_hashes` vector is empty, diff --git a/core/lib/zksync_core/src/api_server/web3/state.rs b/core/lib/zksync_core/src/api_server/web3/state.rs index 45839ae2798..98a35d7fec5 100644 --- a/core/lib/zksync_core/src/api_server/web3/state.rs +++ b/core/lib/zksync_core/src/api_server/web3/state.rs @@ -98,8 +98,6 @@ pub struct InternalApiConfig { pub req_entities_limit: usize, pub fee_history_limit: u64, pub filters_disabled: bool, - pub mempool_cache_update_interval: Duration, - pub mempool_cache_size: usize, pub dummy_verifier: bool, pub l1_batch_commit_data_generator_mode: L1BatchCommitDataGeneratorMode, } @@ -140,8 +138,6 @@ impl InternalApiConfig { req_entities_limit: web3_config.req_entities_limit(), fee_history_limit: web3_config.fee_history_limit(), filters_disabled: web3_config.filters_disabled, - mempool_cache_update_interval: web3_config.mempool_cache_update_interval(), - mempool_cache_size: web3_config.mempool_cache_size(), dummy_verifier: genesis_config.dummy_verifier, l1_batch_commit_data_generator_mode: genesis_config.l1_batch_commit_data_generator_mode, } @@ -236,7 +232,7 @@ pub(crate) struct RpcState { /// Number of the first locally available miniblock / L1 batch. May differ from 0 if the node state was recovered /// from a snapshot. pub(super) start_info: BlockStartInfo, - pub(super) mempool_cache: MempoolCache, + pub(super) mempool_cache: Option, pub(super) last_sealed_miniblock: SealedMiniblockNumber, } diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs index 5a9956237f3..dd2cf89b0a4 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -58,7 +58,7 @@ use crate::{ healthcheck::HealthCheckHandle, tree::TreeApiHttpClient, tx_sender::{ApiContracts, TxSender, TxSenderBuilder, TxSenderConfig}, - web3::{self, state::InternalApiConfig, Namespace}, + web3::{self, mempool_cache::MempoolCache, state::InternalApiConfig, Namespace}, }, basic_witness_input_producer::BasicWitnessInputProducer, commitment_generator::CommitmentGenerator, @@ -377,10 +377,19 @@ pub async fn initialize_components( // program termination. 
let mut storage_caches = None; + let mempool_cache = MempoolCache::new(api_config.web3_json_rpc.mempool_cache_size()); + let mempool_cache_update_task = mempool_cache.update_task( + connection_pool.clone(), + api_config.web3_json_rpc.mempool_cache_update_interval(), + ); + task_futures.push(tokio::spawn( + mempool_cache_update_task.run(stop_receiver.clone()), + )); + if components.contains(&Component::HttpApi) { storage_caches = Some( build_storage_caches( - &configs.api_config.clone().context("api")?.web3_json_rpc, + &api_config.web3_json_rpc, &replica_connection_pool, &mut task_futures, stop_receiver.clone(), @@ -412,6 +421,7 @@ pub async fn initialize_components( batch_fee_input_provider, state_keeper_config.save_call_traces, storage_caches.clone().unwrap(), + mempool_cache.clone(), ) .await .context("run_http_api")?; @@ -459,6 +469,7 @@ pub async fn initialize_components( replica_connection_pool.clone(), stop_receiver.clone(), storage_caches, + mempool_cache, ) .await .context("run_ws_api")?; @@ -1229,6 +1240,7 @@ async fn run_http_api( batch_fee_model_input_provider: Arc, with_debug_namespace: bool, storage_caches: PostgresStorageCaches, + mempool_cache: MempoolCache, ) -> anyhow::Result<()> { let (tx_sender, vm_barrier) = build_tx_sender( tx_sender_config, @@ -1261,6 +1273,7 @@ async fn run_http_api( .with_response_body_size_limit(api_config.web3_json_rpc.max_response_body_size()) .with_tx_sender(tx_sender) .with_vm_barrier(vm_barrier) + .with_mempool_cache(mempool_cache) .enable_api_namespaces(namespaces); if let Some(tree_api_url) = api_config.web3_json_rpc.tree_api_url() { let tree_api = Arc::new(TreeApiHttpClient::new(tree_api_url)); @@ -1292,6 +1305,7 @@ async fn run_ws_api( replica_connection_pool: ConnectionPool, stop_receiver: watch::Receiver, storage_caches: PostgresStorageCaches, + mempool_cache: MempoolCache, ) -> anyhow::Result<()> { let (tx_sender, vm_barrier) = build_tx_sender( tx_sender_config, @@ -1327,6 +1341,7 @@ async fn run_ws_api( .with_polling_interval(api_config.web3_json_rpc.pubsub_interval()) .with_tx_sender(tx_sender) .with_vm_barrier(vm_barrier) + .with_mempool_cache(mempool_cache) .enable_api_namespaces(namespaces); if let Some(tree_api_url) = api_config.web3_json_rpc.tree_api_url() { let tree_api = Arc::new(TreeApiHttpClient::new(tree_api_url)); diff --git a/core/lib/zksync_core/src/metadata_calculator/metrics.rs b/core/lib/zksync_core/src/metadata_calculator/metrics.rs index 7b5189a6b38..074f444dea6 100644 --- a/core/lib/zksync_core/src/metadata_calculator/metrics.rs +++ b/core/lib/zksync_core/src/metadata_calculator/metrics.rs @@ -3,14 +3,60 @@ use std::time::{Duration, Instant}; use vise::{ - Buckets, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, LatencyObserver, Metrics, - Unit, + Buckets, DurationAsSecs, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, Info, + LatencyObserver, Metrics, Unit, }; +use zksync_config::configs::database::MerkleTreeMode; use zksync_shared_metrics::{BlockStage, APP_METRICS}; use zksync_types::block::L1BatchHeader; use zksync_utils::time::seconds_since_epoch; -use super::MetadataCalculator; +use super::{MetadataCalculator, MetadataCalculatorConfig}; + +#[derive(Debug, EncodeLabelValue)] +#[metrics(rename_all = "snake_case")] +enum ModeLabel { + Full, + Lightweight, +} + +impl From for ModeLabel { + fn from(mode: MerkleTreeMode) -> Self { + match mode { + MerkleTreeMode::Full => Self::Full, + MerkleTreeMode::Lightweight => Self::Lightweight, + } + } +} + +#[derive(Debug, EncodeLabelSet)] +pub(super) 
struct ConfigLabels { + mode: ModeLabel, + #[metrics(unit = Unit::Seconds)] + delay_interval: DurationAsSecs, + max_l1_batches_per_iter: usize, + multi_get_chunk_size: usize, + #[metrics(unit = Unit::Bytes)] + block_cache_capacity: usize, + #[metrics(unit = Unit::Bytes)] + memtable_capacity: usize, + #[metrics(unit = Unit::Seconds)] + stalled_writes_timeout: DurationAsSecs, +} + +impl ConfigLabels { + pub fn new(config: &MetadataCalculatorConfig) -> Self { + Self { + mode: config.mode.into(), + delay_interval: config.delay_interval.into(), + max_l1_batches_per_iter: config.max_l1_batches_per_iter, + multi_get_chunk_size: config.multi_get_chunk_size, + block_cache_capacity: config.block_cache_capacity, + memtable_capacity: config.memtable_capacity, + stalled_writes_timeout: config.stalled_writes_timeout.into(), + } + } +} #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] #[metrics(label = "stage", rename_all = "snake_case")] @@ -80,6 +126,8 @@ const LATENCIES_PER_LOG: Buckets = Buckets::values(&[ #[derive(Debug, Metrics)] #[metrics(prefix = "server_metadata_calculator")] pub(super) struct MetadataCalculatorMetrics { + /// Merkle tree configuration. + pub info: Info, /// Lag between the number of L1 batches processed in the Merkle tree and stored in Postgres. /// The lag can only be positive if Postgres was restored from a backup truncating some /// of the batches already processed by the tree. diff --git a/core/lib/zksync_core/src/metadata_calculator/mod.rs b/core/lib/zksync_core/src/metadata_calculator/mod.rs index 6abea0c6a88..7e541a94e85 100644 --- a/core/lib/zksync_core/src/metadata_calculator/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/mod.rs @@ -21,6 +21,7 @@ pub use self::helpers::LazyAsyncTreeReader; pub(crate) use self::helpers::{AsyncTreeReader, L1BatchWithLogs, MerkleTreeInfo}; use self::{ helpers::{create_db, Delayer, GenericAsyncTree, MerkleTreeHealth}, + metrics::{ConfigLabels, METRICS}, updater::TreeUpdater, }; @@ -97,6 +98,14 @@ impl MetadataCalculator { config: MetadataCalculatorConfig, object_store: Option>, ) -> anyhow::Result { + if let Err(err) = METRICS.info.set(ConfigLabels::new(&config)) { + tracing::warn!( + "Cannot set config {:?}; it's already set to {:?}", + err.into_inner(), + METRICS.info.get() + ); + } + anyhow::ensure!( config.max_l1_batches_per_iter > 0, "Maximum L1 batches per iteration is misconfigured to be 0; please update it to positive value" diff --git a/core/node/node_framework/examples/main_node.rs b/core/node/node_framework/examples/main_node.rs index 5d51c294aad..93a2856eb43 100644 --- a/core/node/node_framework/examples/main_node.rs +++ b/core/node/node_framework/examples/main_node.rs @@ -51,6 +51,7 @@ use zksync_node_framework::{ StateKeeperLayer, }, web3_api::{ + caches::MempoolCacheLayer, server::{Web3ServerLayer, Web3ServerOptionalConfig}, tree_api_client::TreeApiClientLayer, tx_sender::{PostgresStorageCachesConfig, TxSenderLayer}, @@ -215,6 +216,15 @@ impl MainNodeBuilder { Ok(self) } + fn add_api_caches_layer(mut self) -> anyhow::Result { + let rpc_config = ApiConfig::from_env()?.web3_json_rpc; + self.node.add_layer(MempoolCacheLayer::new( + rpc_config.mempool_cache_size(), + rpc_config.mempool_cache_update_interval(), + )); + Ok(self) + } + fn add_tree_api_client_layer(mut self) -> anyhow::Result { let rpc_config = ApiConfig::from_env()?.web3_json_rpc; self.node @@ -407,6 +417,7 @@ fn main() -> anyhow::Result<()> { .add_healthcheck_layer()? .add_tx_sender_layer()? 
.add_tree_api_client_layer()? + .add_api_caches_layer()? .add_http_web3_api_layer()? .add_ws_web3_api_layer()? .add_house_keeper_layer()? diff --git a/core/node/node_framework/src/implementations/layers/web3_api/caches.rs b/core/node/node_framework/src/implementations/layers/web3_api/caches.rs new file mode 100644 index 00000000000..5ca4d2b9c69 --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/web3_api/caches.rs @@ -0,0 +1,56 @@ +use std::time::Duration; + +use zksync_core::api_server::web3::mempool_cache::{self, MempoolCache}; + +use crate::{ + implementations::resources::{pools::ReplicaPoolResource, web3_api::MempoolCacheResource}, + service::{ServiceContext, StopReceiver}, + task::Task, + wiring_layer::{WiringError, WiringLayer}, +}; + +#[derive(Debug)] +pub struct MempoolCacheLayer { + capacity: usize, + update_interval: Duration, +} + +impl MempoolCacheLayer { + pub fn new(capacity: usize, update_interval: Duration) -> Self { + Self { + capacity, + update_interval, + } + } +} + +#[async_trait::async_trait] +impl WiringLayer for MempoolCacheLayer { + fn layer_name(&self) -> &'static str { + "mempool_cache_layer" + } + + async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { + let pool_resource = context.get_resource::().await?; + let replica_pool = pool_resource.get().await?; + let mempool_cache = MempoolCache::new(self.capacity); + let update_task = mempool_cache.update_task(replica_pool, self.update_interval); + context.add_task(Box::new(MempoolCacheUpdateTask(update_task))); + context.insert_resource(MempoolCacheResource(mempool_cache))?; + Ok(()) + } +} + +#[derive(Debug)] +pub struct MempoolCacheUpdateTask(mempool_cache::MempoolCacheUpdateTask); + +#[async_trait::async_trait] +impl Task for MempoolCacheUpdateTask { + fn name(&self) -> &'static str { + "mempool_cache_update_task" + } + + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + self.0.run(stop_receiver.0).await + } +} diff --git a/core/node/node_framework/src/implementations/layers/web3_api/mod.rs b/core/node/node_framework/src/implementations/layers/web3_api/mod.rs index 2f872d8e298..48c9eb744ab 100644 --- a/core/node/node_framework/src/implementations/layers/web3_api/mod.rs +++ b/core/node/node_framework/src/implementations/layers/web3_api/mod.rs @@ -1,3 +1,4 @@ +pub mod caches; pub mod server; pub mod tree_api_client; pub mod tx_sender; diff --git a/core/node/node_framework/src/implementations/layers/web3_api/server.rs b/core/node/node_framework/src/implementations/layers/web3_api/server.rs index 8a5739e2662..a68fa6bee60 100644 --- a/core/node/node_framework/src/implementations/layers/web3_api/server.rs +++ b/core/node/node_framework/src/implementations/layers/web3_api/server.rs @@ -10,7 +10,7 @@ use crate::{ healthcheck::AppHealthCheckResource, pools::ReplicaPoolResource, sync_state::SyncStateResource, - web3_api::{TreeApiClientResource, TxSenderResource}, + web3_api::{MempoolCacheResource, TreeApiClientResource, TxSenderResource}, }, service::{ServiceContext, StopReceiver}, task::Task, @@ -124,12 +124,14 @@ impl WiringLayer for Web3ServerLayer { Err(WiringError::ResourceLacking { .. }) => None, Err(err) => return Err(err), }; + let MempoolCacheResource(mempool_cache) = context.get_resource().await?; // Build server. 
let mut api_builder = ApiBuilder::jsonrpsee_backend(self.internal_api_config, replica_pool.clone()) .with_updaters_pool(updaters_pool) - .with_tx_sender(tx_sender); + .with_tx_sender(tx_sender) + .with_mempool_cache(mempool_cache); if let Some(client) = tree_api_client { api_builder = api_builder.with_tree_api(client); } diff --git a/core/node/node_framework/src/implementations/resources/web3_api.rs b/core/node/node_framework/src/implementations/resources/web3_api.rs index 7f7dedc2500..68d343b9b0c 100644 --- a/core/node/node_framework/src/implementations/resources/web3_api.rs +++ b/core/node/node_framework/src/implementations/resources/web3_api.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use zksync_core::api_server::{ tree::TreeApiClient, tx_sender::{tx_sink::TxSink, TxSender}, + web3::mempool_cache::MempoolCache, }; use crate::resource::Resource; @@ -33,3 +34,12 @@ impl Resource for TreeApiClientResource { "api/tree_api_client".into() } } + +#[derive(Debug, Clone)] +pub struct MempoolCacheResource(pub MempoolCache); + +impl Resource for MempoolCacheResource { + fn name() -> String { + "api/mempool_cache".into() + } +}