From 58576d106179177f808018514a1606941bf24b30 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Fri, 8 Mar 2024 14:53:40 +0200 Subject: [PATCH] feat(en): Enable Merkle tree client on EN (#1386) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Enables Merkle tree client on EN so that `zks_getProof` can work properly. ## Why ❔ This endpoint won't produce overhead and could be useful for some EN operators. Also, it allows checking Merkle tree consistency after snapshot recovery more easily. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. - [x] Spellcheck has been run via `zk spellcheck`. - [x] Linkcheck has been run via `zk linkcheck`. --- core/bin/external_node/src/main.rs | 3 + core/lib/config/src/configs/api.rs | 4 +- core/lib/health_check/src/lib.rs | 11 ++ core/lib/merkle_tree/src/errors.rs | 10 +- core/lib/web3_decl/src/namespaces/zks.rs | 2 +- .../zksync_core/src/api_server/tree/mod.rs | 160 ++++++++++++++---- .../zksync_core/src/api_server/tree/tests.rs | 50 ++++-- .../web3/backend_jsonrpsee/namespaces/zks.rs | 2 +- .../zksync_core/src/api_server/web3/mod.rs | 14 +- .../src/api_server/web3/namespaces/zks.rs | 32 ++-- .../zksync_core/src/api_server/web3/state.rs | 4 +- core/lib/zksync_core/src/lib.rs | 56 ++++-- .../src/metadata_calculator/helpers.rs | 28 ++- .../src/metadata_calculator/mod.rs | 17 +- .../src/metadata_calculator/recovery/tests.rs | 4 +- .../node/node_framework/examples/main_node.rs | 11 +- .../implementations/layers/web3_api/mod.rs | 1 + .../implementations/layers/web3_api/server.rs | 20 ++- .../layers/web3_api/tree_api_client.rs | 39 +++++ .../src/implementations/resources/web3_api.rs | 14 +- 20 files changed, 361 insertions(+), 121 deletions(-) create mode 100644 core/node/node_framework/src/implementations/layers/web3_api/tree_api_client.rs diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 09637549f2e..492e6fb7783 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -266,6 +266,7 @@ async fn init_tasks( .build() .await .context("failed to build a tree_pool")?; + let tree_reader = Arc::new(metadata_calculator.tree_reader()); let tree_handle = task::spawn(metadata_calculator.run(tree_pool, tree_stop_receiver)); let commitment_generator_pool = singleton_pool_builder @@ -344,6 +345,7 @@ async fn init_tasks( .with_tx_sender(tx_sender.clone()) .with_vm_barrier(vm_barrier.clone()) .with_sync_state(sync_state.clone()) + .with_tree_api(tree_reader.clone()) .enable_api_namespaces(config.optional.api_namespaces()) .build() .context("failed to build HTTP JSON-RPC server")? @@ -362,6 +364,7 @@ async fn init_tasks( .with_tx_sender(tx_sender) .with_vm_barrier(vm_barrier) .with_sync_state(sync_state) + .with_tree_api(tree_reader) .enable_api_namespaces(config.optional.api_namespaces()) .build() .context("failed to build WS JSON-RPC server")? diff --git a/core/lib/config/src/configs/api.rs b/core/lib/config/src/configs/api.rs index e5129cb2d5e..29fc01a2a9e 100644 --- a/core/lib/config/src/configs/api.rs +++ b/core/lib/config/src/configs/api.rs @@ -207,8 +207,8 @@ impl Web3JsonRpcConfig { .unwrap_or(NonZeroU32::new(6000).unwrap()) } - pub fn tree_api_url(&self) -> Option { - self.tree_api_url.clone() + pub fn tree_api_url(&self) -> Option<&str> { + self.tree_api_url.as_deref() } } diff --git a/core/lib/health_check/src/lib.rs b/core/lib/health_check/src/lib.rs index d839deeda44..228d40ab995 100644 --- a/core/lib/health_check/src/lib.rs +++ b/core/lib/health_check/src/lib.rs @@ -271,6 +271,17 @@ impl fmt::Debug for dyn CheckHealth { } } +#[async_trait] +impl CheckHealth for Arc { + fn name(&self) -> &'static str { + (**self).name() + } + + async fn check_health(&self) -> Health { + (**self).check_health().await + } +} + /// Basic implementation of [`CheckHealth`] trait that can be updated using a matching [`HealthUpdater`]. #[derive(Debug, Clone)] pub struct ReactiveHealthCheck { diff --git a/core/lib/merkle_tree/src/errors.rs b/core/lib/merkle_tree/src/errors.rs index 4afe8a2367c..b8130717f93 100644 --- a/core/lib/merkle_tree/src/errors.rs +++ b/core/lib/merkle_tree/src/errors.rs @@ -138,8 +138,10 @@ impl error::Error for DeserializeError {} /// Error accessing a specific tree version. #[derive(Debug)] pub struct NoVersionError { - pub(crate) missing_version: u64, - pub(crate) version_count: u64, + /// Missing requested version of the tree. + pub missing_version: u64, + /// Current number of versions in the tree. + pub version_count: u64, } impl fmt::Display for NoVersionError { @@ -151,12 +153,12 @@ impl fmt::Display for NoVersionError { if missing_version >= version_count { write!( formatter, - "Version {missing_version} does not exist in Merkle tree; it has {version_count} versions" + "version {missing_version} does not exist in Merkle tree; it has {version_count} versions" ) } else { write!( formatter, - "Version {missing_version} was pruned from Merkle tree" + "version {missing_version} was pruned from Merkle tree" ) } } diff --git a/core/lib/web3_decl/src/namespaces/zks.rs b/core/lib/web3_decl/src/namespaces/zks.rs index d6282705623..3086d2ba142 100644 --- a/core/lib/web3_decl/src/namespaces/zks.rs +++ b/core/lib/web3_decl/src/namespaces/zks.rs @@ -117,5 +117,5 @@ pub trait ZksNamespace { address: Address, keys: Vec, l1_batch_number: L1BatchNumber, - ) -> RpcResult; + ) -> RpcResult>; } diff --git a/core/lib/zksync_core/src/api_server/tree/mod.rs b/core/lib/zksync_core/src/api_server/tree/mod.rs index a6b6d51fbaa..bb6f2528672 100644 --- a/core/lib/zksync_core/src/api_server/tree/mod.rs +++ b/core/lib/zksync_core/src/api_server/tree/mod.rs @@ -12,11 +12,12 @@ use axum::{ }; use serde::{Deserialize, Serialize}; use tokio::sync::watch; +use zksync_health_check::{CheckHealth, Health, HealthStatus}; use zksync_merkle_tree::NoVersionError; use zksync_types::{L1BatchNumber, H256, U256}; use self::metrics::{MerkleTreeApiMethod, API_METRICS}; -use crate::metadata_calculator::{AsyncTreeReader, MerkleTreeInfo}; +use crate::metadata_calculator::{AsyncTreeReader, LazyAsyncTreeReader, MerkleTreeInfo}; mod metrics; #[cfg(test)] @@ -34,7 +35,7 @@ struct TreeProofsResponse { } #[derive(Debug, Serialize, Deserialize)] -pub(crate) struct TreeEntryWithProof { +pub struct TreeEntryWithProof { #[serde(default, skip_serializing_if = "H256::is_zero")] pub value: H256, #[serde(default, skip_serializing_if = "TreeEntryWithProof::is_zero")] @@ -60,59 +61,117 @@ impl TreeEntryWithProof { } } +/// Server-side tree API error. #[derive(Debug)] -enum TreeApiError { +enum TreeApiServerError { NoTreeVersion(NoVersionError), } -impl IntoResponse for TreeApiError { +// Contains the same fields as `NoVersionError` and is serializable. +#[derive(Debug, Serialize, Deserialize)] +struct NoVersionErrorData { + missing_version: u64, + version_count: u64, +} + +impl From for NoVersionErrorData { + fn from(err: NoVersionError) -> Self { + Self { + missing_version: err.missing_version, + version_count: err.version_count, + } + } +} + +impl From for NoVersionError { + fn from(data: NoVersionErrorData) -> Self { + Self { + missing_version: data.missing_version, + version_count: data.version_count, + } + } +} + +// Loosely conforms to HTTP Problem Details RFC: +#[derive(Debug, Serialize)] +struct Problem { + r#type: &'static str, + title: &'static str, + detail: String, + #[serde(flatten)] + data: T, +} + +const PROBLEM_CONTENT_TYPE: &str = "application/problem+json"; + +impl IntoResponse for TreeApiServerError { fn into_response(self) -> Response { - let (status, title, detail) = match self { + let headers = [(header::CONTENT_TYPE, PROBLEM_CONTENT_TYPE)]; + match self { Self::NoTreeVersion(err) => { - (StatusCode::NOT_FOUND, "L1 batch not found", err.to_string()) + let body = Problem { + r#type: "/errors#l1-batch-not-found", + title: "L1 batch not found", + detail: err.to_string(), + data: NoVersionErrorData::from(err), + }; + (StatusCode::NOT_FOUND, headers, Json(body)).into_response() } - }; - - // Loosely conforms to HTTP Problem Details RFC: - let body = serde_json::json!({ - "type": "/errors#l1-batch-not-found", - "title": title, - "detail": detail, - }); - let headers = [(header::CONTENT_TYPE, "application/problem+json")]; - (status, headers, Json(body)).into_response() + } } } +/// Client-side tree API error used by [`TreeApiClient`]. +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum TreeApiError { + #[error(transparent)] + NoVersion(NoVersionError), + #[error("tree API is temporarily not available because the Merkle tree isn't initialized; repeat request later")] + NotReady, + /// Catch-all variant for internal errors. + #[error("internal error")] + Internal(#[from] anyhow::Error), +} + /// Client accessing Merkle tree API. #[async_trait] -pub(crate) trait TreeApiClient { +pub trait TreeApiClient: 'static + Send + Sync + fmt::Debug { /// Obtains general information about the tree. - async fn get_info(&self) -> anyhow::Result; + async fn get_info(&self) -> Result; /// Obtains proofs for the specified `hashed_keys` at the specified tree version (= L1 batch number). async fn get_proofs( &self, l1_batch_number: L1BatchNumber, hashed_keys: Vec, - ) -> anyhow::Result>; + ) -> Result, TreeApiError>; } /// In-memory client implementation. #[async_trait] -impl TreeApiClient for AsyncTreeReader { - async fn get_info(&self) -> anyhow::Result { - Ok(self.clone().info().await) +impl TreeApiClient for LazyAsyncTreeReader { + async fn get_info(&self) -> Result { + if let Some(reader) = self.read() { + Ok(reader.info().await) + } else { + Err(TreeApiError::NotReady) + } } async fn get_proofs( &self, l1_batch_number: L1BatchNumber, hashed_keys: Vec, - ) -> anyhow::Result> { - self.get_proofs_inner(l1_batch_number, hashed_keys) - .await - .map_err(Into::into) + ) -> Result, TreeApiError> { + if let Some(reader) = self.read() { + reader + .get_proofs_inner(l1_batch_number, hashed_keys) + .await + .map_err(TreeApiError::NoVersion) + } else { + Err(TreeApiError::NotReady) + } } } @@ -134,9 +193,26 @@ impl TreeApiHttpClient { } } +#[async_trait] +impl CheckHealth for TreeApiHttpClient { + fn name(&self) -> &'static str { + "tree_api_http_client" + } + + async fn check_health(&self) -> Health { + match self.get_info().await { + Ok(info) => Health::from(HealthStatus::Ready).with_details(info), + Err(TreeApiError::NotReady) => HealthStatus::Affected.into(), + Err(err) => Health::from(HealthStatus::NotReady).with_details(serde_json::json!({ + "error": err.to_string(), + })), + } + } +} + #[async_trait] impl TreeApiClient for TreeApiHttpClient { - async fn get_info(&self) -> anyhow::Result { + async fn get_info(&self) -> Result { let response = self .inner .get(&self.info_url) @@ -146,17 +222,17 @@ impl TreeApiClient for TreeApiHttpClient { let response = response .error_for_status() .context("Requesting tree info returned non-OK response")?; - response + Ok(response .json() .await - .context("Failed deserializing tree info") + .context("Failed deserializing tree info")?) } async fn get_proofs( &self, l1_batch_number: L1BatchNumber, hashed_keys: Vec, - ) -> anyhow::Result> { + ) -> Result, TreeApiError> { let response = self .inner .post(&self.proofs_url) @@ -166,12 +242,26 @@ impl TreeApiClient for TreeApiHttpClient { }) .send() .await - .with_context(|| format!("Failed requesting proofs for L1 batch #{l1_batch_number}"))?; + .with_context(|| format!("failed requesting proofs for L1 batch #{l1_batch_number}"))?; + + let is_problem = response + .headers() + .get(header::CONTENT_TYPE) + .map_or(false, |header| *header == PROBLEM_CONTENT_TYPE); + if response.status() == StatusCode::NOT_FOUND && is_problem { + // Try to parse `NoVersionError` from the response body. + let problem_data: NoVersionErrorData = response + .json() + .await + .context("failed parsing error response")?; + return Err(TreeApiError::NoVersion(problem_data.into())); + } + let response = response.error_for_status().with_context(|| { - format!("Requesting proofs for L1 batch #{l1_batch_number} returned non-OK response") + format!("requesting proofs for L1 batch #{l1_batch_number} returned non-OK response") })?; let response: TreeProofsResponse = response.json().await.with_context(|| { - format!("Failed deserializing proofs for L1 batch #{l1_batch_number}") + format!("failed deserializing proofs for L1 batch #{l1_batch_number}") })?; Ok(response.entries) } @@ -200,12 +290,12 @@ impl AsyncTreeReader { async fn get_proofs_handler( State(this): State, Json(request): Json, - ) -> Result, TreeApiError> { + ) -> Result, TreeApiServerError> { let latency = API_METRICS.latency[&MerkleTreeApiMethod::GetProofs].start(); let entries = this .get_proofs_inner(request.l1_batch_number, request.hashed_keys) .await - .map_err(TreeApiError::NoTreeVersion)?; + .map_err(TreeApiServerError::NoTreeVersion)?; let response = TreeProofsResponse { entries }; latency.observe(); Ok(Json(response)) diff --git a/core/lib/zksync_core/src/api_server/tree/tests.rs b/core/lib/zksync_core/src/api_server/tree/tests.rs index d934aaab476..bf1a69bb33a 100644 --- a/core/lib/zksync_core/src/api_server/tree/tests.rs +++ b/core/lib/zksync_core/src/api_server/tree/tests.rs @@ -2,6 +2,7 @@ use std::net::Ipv4Addr; +use assert_matches::assert_matches; use tempfile::TempDir; use zksync_dal::ConnectionPool; @@ -23,6 +24,7 @@ async fn merkle_tree_api() { let (stop_sender, stop_receiver) = watch::channel(false); let api_server = tree_reader + .wait() .await .create_api_server(&api_addr, stop_receiver.clone()) .unwrap(); @@ -60,21 +62,43 @@ async fn merkle_tree_api() { .get_proofs(L1BatchNumber(10), vec![]) .await .unwrap_err(); - let err = format!("{err:?}"); - // Check that the error message contains all necessary info to troubleshoot it. - assert!( - err.contains("Requesting proofs for L1 batch #10 returned non-OK response"), - "{}", - err - ); - assert!(err.contains("404 Not Found"), "{}", err); - assert!( - err.contains(&format!("http://{local_addr}/proofs")), - "{}", - err - ); + let TreeApiError::NoVersion(err) = err else { + panic!("Unexpected error: {err:?}"); + }; + assert_eq!(err.version_count, 6); + assert_eq!(err.missing_version, 10); // Stop the calculator and the tree API server. stop_sender.send_replace(true); api_server_task.await.unwrap().unwrap(); } + +#[tokio::test] +async fn local_merkle_tree_client() { + let pool = ConnectionPool::test_pool().await; + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + let (calculator, _) = setup_calculator(temp_dir.path(), &pool).await; + + reset_db_state(&pool, 5).await; + let tree_reader = calculator.tree_reader(); + + let err = tree_reader.get_info().await.unwrap_err(); + assert_matches!(err, TreeApiError::NotReady); + + // Wait until the calculator processes initial L1 batches. + run_calculator(calculator, pool).await; + + let tree_info = tree_reader.get_info().await.unwrap(); + assert!(tree_info.leaf_count > 20); + assert_eq!(tree_info.next_l1_batch_number, L1BatchNumber(6)); + + let err = tree_reader + .get_proofs(L1BatchNumber(10), vec![]) + .await + .unwrap_err(); + let TreeApiError::NoVersion(err) = err else { + panic!("Unexpected error: {err:?}"); + }; + assert_eq!(err.version_count, 6); + assert_eq!(err.missing_version, 10); +} diff --git a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/namespaces/zks.rs b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/namespaces/zks.rs index 90c95e5c5b0..580450288bd 100644 --- a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/namespaces/zks.rs +++ b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/namespaces/zks.rs @@ -162,7 +162,7 @@ impl ZksNamespaceServer for ZksNamespace { address: Address, keys: Vec, l1_batch_number: L1BatchNumber, - ) -> RpcResult { + ) -> RpcResult> { self.get_proofs_impl(address, keys, l1_batch_number) .await .map_err(|err| self.current_method().map_err(err)) diff --git a/core/lib/zksync_core/src/api_server/web3/mod.rs b/core/lib/zksync_core/src/api_server/web3/mod.rs index c248a8101ae..fb0fc05fb49 100644 --- a/core/lib/zksync_core/src/api_server/web3/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/mod.rs @@ -37,7 +37,7 @@ use self::{ use crate::{ api_server::{ execution_sandbox::{BlockStartInfo, VmConcurrencyBarrier}, - tree::TreeApiHttpClient, + tree::TreeApiClient, tx_sender::TxSender, }, sync_layer::SyncState, @@ -115,7 +115,7 @@ struct OptionalApiParams { batch_request_size_limit: Option, response_body_size_limit: Option, websocket_requests_per_minute_limit: Option, - tree_api_url: Option, + tree_api: Option>, pub_sub_events_sender: Option>, } @@ -241,8 +241,9 @@ impl ApiBuilder { self } - pub fn with_tree_api(mut self, tree_api_url: Option) -> Self { - self.optional.tree_api_url = tree_api_url; + pub fn with_tree_api(mut self, tree_api: Arc) -> Self { + tracing::info!("Using tree API client: {tree_api:?}"); + self.optional.tree_api = Some(tree_api); self } @@ -318,10 +319,7 @@ impl ApiServer { api_config: self.config, start_info, last_sealed_miniblock, - tree_api: self - .optional - .tree_api_url - .map(|url| TreeApiHttpClient::new(url.as_str())), + tree_api: self.optional.tree_api, }) } diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs index 2dd6f9e0f92..c662b78e240 100644 --- a/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs +++ b/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs @@ -27,7 +27,7 @@ use zksync_web3_decl::{ }; use crate::api_server::{ - tree::TreeApiClient, + tree::TreeApiError, web3::{backend_jsonrpsee::MethodTracer, RpcState}, }; @@ -499,20 +499,32 @@ impl ZksNamespace { address: Address, keys: Vec, l1_batch_number: L1BatchNumber, - ) -> Result { + ) -> Result, Web3Error> { self.state.start_info.ensure_not_pruned(l1_batch_number)?; let hashed_keys = keys .iter() .map(|key| StorageKey::new(AccountTreeId::new(address), *key).hashed_key_u256()) .collect(); - let proofs = self + let tree_api = self .state .tree_api - .as_ref() - .ok_or(Web3Error::TreeApiUnavailable)? - .get_proofs(l1_batch_number, hashed_keys) - .await - .context("get_proofs")?; + .as_deref() + .ok_or(Web3Error::TreeApiUnavailable)?; + let proofs_result = tree_api.get_proofs(l1_batch_number, hashed_keys).await; + let proofs = match proofs_result { + Ok(proofs) => proofs, + Err(TreeApiError::NotReady) => return Err(Web3Error::TreeApiUnavailable), + Err(TreeApiError::NoVersion(err)) => { + return if err.missing_version > err.version_count { + Ok(None) + } else { + Err(Web3Error::InternalError(anyhow::anyhow!( + "L1 batch #{l1_batch_number} is pruned in Merkle tree, but not in Postgres" + ))) + }; + } + Err(TreeApiError::Internal(err)) => return Err(Web3Error::InternalError(err)), + }; let storage_proof = proofs .into_iter() @@ -525,9 +537,9 @@ impl ZksNamespace { }) .collect(); - Ok(Proof { + Ok(Some(Proof { address, storage_proof, - }) + })) } } diff --git a/core/lib/zksync_core/src/api_server/web3/state.rs b/core/lib/zksync_core/src/api_server/web3/state.rs index 270f1cbf5ed..808f4049f5f 100644 --- a/core/lib/zksync_core/src/api_server/web3/state.rs +++ b/core/lib/zksync_core/src/api_server/web3/state.rs @@ -27,7 +27,7 @@ use super::{ use crate::{ api_server::{ execution_sandbox::{BlockArgs, BlockArgsError, BlockStartInfo}, - tree::TreeApiHttpClient, + tree::TreeApiClient, tx_sender::{tx_sink::TxSink, TxSender}, }, sync_layer::SyncState, @@ -203,7 +203,7 @@ pub(crate) struct RpcState { pub(super) current_method: Arc, pub(super) installed_filters: Option>>, pub(super) connection_pool: ConnectionPool, - pub(super) tree_api: Option, + pub(super) tree_api: Option>, pub(super) tx_sender: TxSender, pub(super) sync_state: Option, pub(super) api_config: InternalApiConfig, diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs index da97c06a1c1..86d9df66010 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -54,9 +54,9 @@ use crate::{ contract_verification, execution_sandbox::{VmConcurrencyBarrier, VmConcurrencyLimiter}, healthcheck::HealthCheckHandle, + tree::TreeApiHttpClient, tx_sender::{ApiContracts, TxSender, TxSenderBuilder, TxSenderConfig}, - web3, - web3::{state::InternalApiConfig, ApiServerHandles, Namespace}, + web3::{self, state::InternalApiConfig, Namespace}, }, basic_witness_input_producer::BasicWitnessInputProducer, commitment_generator::CommitmentGenerator, @@ -450,7 +450,9 @@ pub async fn initialize_components( bounded_gas_adjuster, FeeModelConfig::from_state_keeper_config(&state_keeper_config), )); - let server_handles = run_http_api( + run_http_api( + &mut task_futures, + &app_health, &postgres_config, &tx_sender_config, &state_keeper_config, @@ -466,8 +468,6 @@ pub async fn initialize_components( .await .context("run_http_api")?; - task_futures.extend(server_handles.tasks); - app_health.insert_component(server_handles.health_check); let elapsed = started_at.elapsed(); APP_METRICS.init_latency[&InitStage::HttpApi].set(elapsed); tracing::info!( @@ -493,7 +493,9 @@ pub async fn initialize_components( bounded_gas_adjuster, FeeModelConfig::from_state_keeper_config(&state_keeper_config), )); - let server_handles = run_ws_api( + run_ws_api( + &mut task_futures, + &app_health, &postgres_config, &tx_sender_config, &state_keeper_config, @@ -508,8 +510,6 @@ pub async fn initialize_components( .await .context("run_ws_api")?; - task_futures.extend(server_handles.tasks); - app_health.insert_component(server_handles.health_check); let elapsed = started_at.elapsed(); APP_METRICS.init_latency[&InitStage::WsApi].set(elapsed); tracing::info!( @@ -948,6 +948,7 @@ async fn run_tree( let stop_receiver = stop_receiver.clone(); task_futures.push(tokio::spawn(async move { tree_reader + .wait() .await .run_api_server(address, stop_receiver) .await @@ -1172,6 +1173,8 @@ async fn build_tx_sender( #[allow(clippy::too_many_arguments)] async fn run_http_api( + task_futures: &mut Vec>>, + app_health: &AppHealthCheck, postgres_config: &PostgresConfig, tx_sender_config: &TxSenderConfig, state_keeper_config: &StateKeeperConfig, @@ -1183,7 +1186,7 @@ async fn run_http_api( batch_fee_model_input_provider: Arc, with_debug_namespace: bool, storage_caches: PostgresStorageCaches, -) -> anyhow::Result { +) -> anyhow::Result<()> { let (tx_sender, vm_barrier) = build_tx_sender( tx_sender_config, &api_config.web3_json_rpc, @@ -1206,26 +1209,36 @@ async fn run_http_api( .await .context("failed to build last_miniblock_pool")?; - let api_builder = + let mut api_builder = web3::ApiBuilder::jsonrpsee_backend(internal_api.clone(), replica_connection_pool) .http(api_config.web3_json_rpc.http_port) .with_updaters_pool(updaters_pool) .with_filter_limit(api_config.web3_json_rpc.filters_limit()) - .with_tree_api(api_config.web3_json_rpc.tree_api_url()) .with_batch_request_size_limit(api_config.web3_json_rpc.max_batch_request_size()) .with_response_body_size_limit(api_config.web3_json_rpc.max_response_body_size()) .with_tx_sender(tx_sender) .with_vm_barrier(vm_barrier) .enable_api_namespaces(namespaces); - api_builder + if let Some(tree_api_url) = api_config.web3_json_rpc.tree_api_url() { + let tree_api = Arc::new(TreeApiHttpClient::new(tree_api_url)); + api_builder = api_builder.with_tree_api(tree_api.clone()); + app_health.insert_custom_component(tree_api); + } + + let server_handles = api_builder .build() .context("failed to build HTTP API server")? .run(stop_receiver) - .await + .await?; + task_futures.extend(server_handles.tasks); + app_health.insert_component(server_handles.health_check); + Ok(()) } #[allow(clippy::too_many_arguments)] async fn run_ws_api( + task_futures: &mut Vec>>, + app_health: &AppHealthCheck, postgres_config: &PostgresConfig, tx_sender_config: &TxSenderConfig, state_keeper_config: &StateKeeperConfig, @@ -1236,7 +1249,7 @@ async fn run_ws_api( replica_connection_pool: ConnectionPool, stop_receiver: watch::Receiver, storage_caches: PostgresStorageCaches, -) -> anyhow::Result { +) -> anyhow::Result<()> { let (tx_sender, vm_barrier) = build_tx_sender( tx_sender_config, &api_config.web3_json_rpc, @@ -1255,7 +1268,7 @@ async fn run_ws_api( let mut namespaces = Namespace::DEFAULT.to_vec(); namespaces.push(Namespace::Snapshots); - let api_builder = + let mut api_builder = web3::ApiBuilder::jsonrpsee_backend(internal_api.clone(), replica_connection_pool) .ws(api_config.web3_json_rpc.ws_port) .with_updaters_pool(last_miniblock_pool) @@ -1269,16 +1282,23 @@ async fn run_ws_api( .websocket_requests_per_minute_limit(), ) .with_polling_interval(api_config.web3_json_rpc.pubsub_interval()) - .with_tree_api(api_config.web3_json_rpc.tree_api_url()) .with_tx_sender(tx_sender) .with_vm_barrier(vm_barrier) .enable_api_namespaces(namespaces); + if let Some(tree_api_url) = api_config.web3_json_rpc.tree_api_url() { + let tree_api = Arc::new(TreeApiHttpClient::new(tree_api_url)); + api_builder = api_builder.with_tree_api(tree_api.clone()); + app_health.insert_custom_component(tree_api); + } - api_builder + let server_handles = api_builder .build() .context("failed to build WS API server")? .run(stop_receiver) - .await + .await?; + task_futures.extend(server_handles.tasks); + app_health.insert_component(server_handles.health_check); + Ok(()) } async fn circuit_breakers_for_components( diff --git a/core/lib/zksync_core/src/metadata_calculator/helpers.rs b/core/lib/zksync_core/src/metadata_calculator/helpers.rs index 1f51746bd99..c3c6a8e324d 100644 --- a/core/lib/zksync_core/src/metadata_calculator/helpers.rs +++ b/core/lib/zksync_core/src/metadata_calculator/helpers.rs @@ -2,6 +2,7 @@ use std::{ collections::BTreeMap, + future, future::Future, path::{Path, PathBuf}, time::Duration, @@ -11,6 +12,7 @@ use anyhow::Context as _; use serde::{Deserialize, Serialize}; #[cfg(test)] use tokio::sync::mpsc; +use tokio::sync::watch; use zksync_config::configs::database::MerkleTreeMode; use zksync_dal::StorageProcessor; use zksync_health_check::{Health, HealthStatus}; @@ -26,7 +28,7 @@ use super::metrics::{LoadChangesStage, TreeUpdateStage, METRICS}; /// General information about the Merkle tree. #[derive(Debug, Serialize, Deserialize)] -pub(crate) struct MerkleTreeInfo { +pub struct MerkleTreeInfo { pub mode: MerkleTreeMode, pub root_hash: H256, pub next_l1_batch_number: L1BatchNumber, @@ -230,6 +232,30 @@ impl AsyncTreeReader { } } +/// Lazily initialized [`AsyncTreeReader`]. +#[derive(Debug)] +pub struct LazyAsyncTreeReader(pub(super) watch::Receiver>); + +impl LazyAsyncTreeReader { + /// Returns a reader if it is initialized. + pub(crate) fn read(&self) -> Option { + self.0.borrow().clone() + } + + /// Waits until the tree is initialized and returns a reader for it. + pub(crate) async fn wait(mut self) -> AsyncTreeReader { + loop { + if let Some(reader) = self.0.borrow().clone() { + break reader; + } + if self.0.changed().await.is_err() { + tracing::info!("Tree dropped without getting ready; not resolving tree reader"); + future::pending::<()>().await; + } + } + } +} + /// Async wrapper for [`MerkleTreeRecovery`]. #[derive(Debug, Default)] pub(super) struct AsyncTreeRecovery { diff --git a/core/lib/zksync_core/src/metadata_calculator/mod.rs b/core/lib/zksync_core/src/metadata_calculator/mod.rs index 3226b20d872..7ea8e2078c2 100644 --- a/core/lib/zksync_core/src/metadata_calculator/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/mod.rs @@ -2,7 +2,6 @@ //! stores them in the DB. use std::{ - future::{self, Future}, sync::Arc, time::{Duration, Instant}, }; @@ -17,6 +16,7 @@ use zksync_dal::ConnectionPool; use zksync_health_check::{HealthUpdater, ReactiveHealthCheck}; use zksync_object_store::ObjectStore; +pub use self::helpers::LazyAsyncTreeReader; pub(crate) use self::helpers::{AsyncTreeReader, L1BatchWithLogs, MerkleTreeInfo}; use self::{ helpers::{create_db, Delayer, GenericAsyncTree, MerkleTreeHealth}, @@ -109,19 +109,8 @@ impl MetadataCalculator { } /// Returns a reference to the tree reader. - pub(crate) fn tree_reader(&self) -> impl Future { - let mut receiver = self.tree_reader.subscribe(); - async move { - loop { - if let Some(reader) = receiver.borrow().clone() { - break reader; - } - if receiver.changed().await.is_err() { - tracing::info!("Tree dropped without getting ready; not resolving tree reader"); - future::pending::<()>().await; - } - } - } + pub fn tree_reader(&self) -> LazyAsyncTreeReader { + LazyAsyncTreeReader(self.tree_reader.subscribe()) } async fn create_tree(&self) -> anyhow::Result { diff --git a/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs b/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs index 78d8bb65f4e..af6acfad4c9 100644 --- a/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs @@ -272,7 +272,7 @@ async fn entire_recovery_workflow(case: RecoveryWorkflowCase) { match case { // Wait until the tree is fully initialized and stop the calculator. RecoveryWorkflowCase::Stop => { - let tree_info = tree_reader.await.info().await; + let tree_info = tree_reader.wait().await.info().await; assert_eq!(tree_info.root_hash, snapshot_recovery.l1_batch_root_hash); assert_eq!(tree_info.leaf_count, 200); assert_eq!( @@ -283,7 +283,7 @@ async fn entire_recovery_workflow(case: RecoveryWorkflowCase) { // Emulate state keeper adding a new L1 batch to Postgres. RecoveryWorkflowCase::CreateBatch => { - tree_reader.await; + tree_reader.wait().await; let mut storage = storage.start_transaction().await.unwrap(); let mut new_logs = gen_storage_logs(500..600, 1).pop().unwrap(); diff --git a/core/node/node_framework/examples/main_node.rs b/core/node/node_framework/examples/main_node.rs index c2b34a2d98f..1ba1b79c01c 100644 --- a/core/node/node_framework/examples/main_node.rs +++ b/core/node/node_framework/examples/main_node.rs @@ -35,6 +35,7 @@ use zksync_node_framework::{ }, web3_api::{ server::{Web3ServerLayer, Web3ServerOptionalConfig}, + tree_api_client::TreeApiClientLayer, tx_sender::{PostgresStorageCachesConfig, TxSenderLayer}, tx_sink::TxSinkLayer, }, @@ -167,6 +168,13 @@ impl MainNodeBuilder { Ok(self) } + fn add_tree_api_client_layer(mut self) -> anyhow::Result { + let rpc_config = ApiConfig::from_env()?.web3_json_rpc; + self.node + .add_layer(TreeApiClientLayer::http(rpc_config.tree_api_url)); + Ok(self) + } + fn add_http_web3_api_layer(mut self) -> anyhow::Result { let rpc_config = ApiConfig::from_env()?.web3_json_rpc; let contracts_config = ContractsConfig::from_env()?; @@ -186,7 +194,6 @@ impl MainNodeBuilder { subscriptions_limit: Some(rpc_config.subscriptions_limit()), batch_request_size_limit: Some(rpc_config.max_batch_request_size()), response_body_size_limit: Some(rpc_config.max_response_body_size()), - tree_api_url: rpc_config.tree_api_url(), ..Default::default() }; self.node.add_layer(Web3ServerLayer::http( @@ -220,7 +227,6 @@ impl MainNodeBuilder { websocket_requests_per_minute_limit: Some( rpc_config.websocket_requests_per_minute_limit(), ), - tree_api_url: rpc_config.tree_api_url(), }; self.node.add_layer(Web3ServerLayer::ws( rpc_config.ws_port, @@ -258,6 +264,7 @@ fn main() -> anyhow::Result<()> { .add_proof_data_handler_layer()? .add_healthcheck_layer()? .add_tx_sender_layer()? + .add_tree_api_client_layer()? .add_http_web3_api_layer()? .add_ws_web3_api_layer()? .build() diff --git a/core/node/node_framework/src/implementations/layers/web3_api/mod.rs b/core/node/node_framework/src/implementations/layers/web3_api/mod.rs index 6af795c72ae..2f872d8e298 100644 --- a/core/node/node_framework/src/implementations/layers/web3_api/mod.rs +++ b/core/node/node_framework/src/implementations/layers/web3_api/mod.rs @@ -1,3 +1,4 @@ pub mod server; +pub mod tree_api_client; pub mod tx_sender; pub mod tx_sink; diff --git a/core/node/node_framework/src/implementations/layers/web3_api/server.rs b/core/node/node_framework/src/implementations/layers/web3_api/server.rs index 03d6647dc7e..93cc9ddc11f 100644 --- a/core/node/node_framework/src/implementations/layers/web3_api/server.rs +++ b/core/node/node_framework/src/implementations/layers/web3_api/server.rs @@ -5,8 +5,10 @@ use zksync_core::api_server::web3::{state::InternalApiConfig, ApiBuilder, ApiSer use crate::{ implementations::resources::{ - healthcheck::AppHealthCheckResource, pools::ReplicaPoolResource, - sync_state::SyncStateResource, web3_api::TxSenderResource, + healthcheck::AppHealthCheckResource, + pools::ReplicaPoolResource, + sync_state::SyncStateResource, + web3_api::{TreeApiClientResource, TxSenderResource}, }, service::{ServiceContext, StopReceiver}, task::Task, @@ -22,7 +24,6 @@ pub struct Web3ServerOptionalConfig { pub batch_request_size_limit: Option, pub response_body_size_limit: Option, pub websocket_requests_per_minute_limit: Option, - pub tree_api_url: Option, } impl Web3ServerOptionalConfig { @@ -47,7 +48,6 @@ impl Web3ServerOptionalConfig { api_builder = api_builder .with_websocket_requests_per_minute_limit(websocket_requests_per_minute_limit); } - api_builder = api_builder.with_tree_api(self.tree_api_url); api_builder } } @@ -113,15 +113,21 @@ impl WiringLayer for Web3ServerLayer { let sync_state = match context.get_resource::().await { Ok(sync_state) => Some(sync_state.0), Err(WiringError::ResourceLacking(_)) => None, - Err(err) => { - return Err(err); - } + Err(err) => return Err(err), + }; + let tree_api_client = match context.get_resource::().await { + Ok(client) => Some(client.0), + Err(WiringError::ResourceLacking(_)) => None, + Err(err) => return Err(err), }; // Build server. let mut api_builder = ApiBuilder::jsonrpsee_backend(self.internal_api_config, replica_pool) .with_updaters_pool(updaters_pool) .with_tx_sender(tx_sender); + if let Some(client) = tree_api_client { + api_builder = api_builder.with_tree_api(client); + } match self.transport { Transport::Http => { api_builder = api_builder.http(self.port); diff --git a/core/node/node_framework/src/implementations/layers/web3_api/tree_api_client.rs b/core/node/node_framework/src/implementations/layers/web3_api/tree_api_client.rs new file mode 100644 index 00000000000..ac28c13049b --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/web3_api/tree_api_client.rs @@ -0,0 +1,39 @@ +use std::sync::Arc; + +use zksync_core::api_server::tree::TreeApiHttpClient; + +use crate::{ + implementations::resources::{ + healthcheck::AppHealthCheckResource, web3_api::TreeApiClientResource, + }, + service::ServiceContext, + wiring_layer::{WiringError, WiringLayer}, +}; + +#[derive(Debug)] +pub struct TreeApiClientLayer { + url: Option, +} + +impl TreeApiClientLayer { + pub fn http(url: Option) -> Self { + Self { url } + } +} + +#[async_trait::async_trait] +impl WiringLayer for TreeApiClientLayer { + fn layer_name(&self) -> &'static str { + "tree_api_client_layer" + } + + async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { + if let Some(url) = &self.url { + let client = Arc::new(TreeApiHttpClient::new(url)); + let AppHealthCheckResource(app_health) = context.get_resource_or_default().await; + app_health.insert_custom_component(client.clone()); + context.insert_resource(TreeApiClientResource(client))?; + } + Ok(()) + } +} diff --git a/core/node/node_framework/src/implementations/resources/web3_api.rs b/core/node/node_framework/src/implementations/resources/web3_api.rs index 955e5c265d3..4b77e5c95d4 100644 --- a/core/node/node_framework/src/implementations/resources/web3_api.rs +++ b/core/node/node_framework/src/implementations/resources/web3_api.rs @@ -1,6 +1,9 @@ use std::sync::Arc; -use zksync_core::api_server::tx_sender::{tx_sink::TxSink, TxSender}; +use zksync_core::api_server::{ + tree::TreeApiClient, + tx_sender::{tx_sink::TxSink, TxSender}, +}; use crate::resource::{Resource, ResourceId}; @@ -21,3 +24,12 @@ impl Resource for TxSinkResource { "api/tx_sink".into() } } + +#[derive(Debug, Clone)] +pub struct TreeApiClientResource(pub Arc); + +impl Resource for TreeApiClientResource { + fn resource_id() -> ResourceId { + "api/tree_api_client".into() + } +}