Skip to content

Commit

Permalink
feat(node_framework): Add Web3 API layers (#1258)
Browse files Browse the repository at this point in the history
With some hacks, adds support of our Web3 API to the framework.
For more context on hacks/tradeoffs, see comments in the PR.

With this PR, the node finally passes a significant number of tests.
Once the `eth_sender` is migrated, I expect all the tests to pass.

---------

Co-authored-by: Joonatan Saarhelo <joon.saar@gmail.com>
  • Loading branch information
popzxc and joonazan committed Mar 1, 2024
1 parent 8ff5bce commit 105f4cc
Show file tree
Hide file tree
Showing 18 changed files with 672 additions and 64 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/vm-perf-comparison.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
run: echo "sha=$(git merge-base HEAD FETCH_HEAD)" >> $GITHUB_OUTPUT

- name: checkout divergence point
run: git checkout ${{ steps.merge_base.outputs.sha }}
run: git checkout ${{ steps.merge_base.outputs.sha }} --recurse-submodules

- name: setup-env
run: |
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,13 @@ async fn init_tasks(
.with_filter_limit(config.optional.filters_limit)
.with_batch_request_size_limit(config.optional.max_batch_request_size)
.with_response_body_size_limit(config.optional.max_response_body_size())
.with_tx_sender(tx_sender.clone(), vm_barrier.clone())
.with_tx_sender(tx_sender.clone())
.with_vm_barrier(vm_barrier.clone())
.with_sync_state(sync_state.clone())
.enable_api_namespaces(config.optional.api_namespaces())
.build(stop_receiver.clone())
.build()
.context("failed to build HTTP JSON-RPC server")?
.run(stop_receiver.clone())
.await
.context("Failed initializing HTTP JSON-RPC server")?;

Expand All @@ -356,10 +359,13 @@ async fn init_tasks(
.with_batch_request_size_limit(config.optional.max_batch_request_size)
.with_response_body_size_limit(config.optional.max_response_body_size())
.with_polling_interval(config.optional.polling_interval())
.with_tx_sender(tx_sender, vm_barrier)
.with_tx_sender(tx_sender)
.with_vm_barrier(vm_barrier)
.with_sync_state(sync_state)
.enable_api_namespaces(config.optional.api_namespaces())
.build(stop_receiver.clone())
.build()
.context("failed to build WS JSON-RPC server")?
.run(stop_receiver.clone())
.await
.context("Failed initializing WS JSON-RPC server")?;

Expand Down
15 changes: 6 additions & 9 deletions core/lib/dal/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ impl fmt::Debug for ConnectionPoolBuilder {
}

impl ConnectionPoolBuilder {
/// Overrides the maximum number of connections that can be allocated by the pool.
pub fn set_max_size(&mut self, max_size: u32) -> &mut Self {
self.max_size = max_size;
self
}

/// Sets the acquire timeout for a single connection attempt. There are multiple attempts (currently 3)
/// before `access_storage*` methods return an error. If not specified, the acquire timeout will not be set.
pub fn set_acquire_timeout(&mut self, timeout: Option<Duration>) -> &mut Self {
Expand Down Expand Up @@ -93,15 +99,6 @@ impl ConnectionPoolBuilder {
traced_connections: None,
})
}

/// Builds a connection pool that has a single connection.
pub async fn build_singleton(&self) -> anyhow::Result<ConnectionPool> {
let singleton_builder = Self {
max_size: 1,
..self.clone()
};
singleton_builder.build().await
}
}

#[derive(Debug)]
Expand Down
73 changes: 41 additions & 32 deletions core/lib/zksync_core/src/api_server/web3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ pub struct ApiServerHandles {
/// Optional part of the API server parameters.
#[derive(Debug, Default)]
struct OptionalApiParams {
vm_barrier: Option<VmConcurrencyBarrier>,
sync_state: Option<SyncState>,
filters_limit: Option<usize>,
subscriptions_limit: Option<usize>,
Expand All @@ -118,15 +119,16 @@ struct OptionalApiParams {
pub_sub_events_sender: Option<mpsc::UnboundedSender<PubSubEvent>>,
}

/// Full API server parameters.
/// Structure capable of spawning a configured Web3 API server along with all the required
/// maintenance tasks.
#[derive(Debug)]
struct FullApiParams {
pub struct ApiServer {
pool: ConnectionPool,
updaters_pool: ConnectionPool,
health_updater: Arc<HealthUpdater>,
config: InternalApiConfig,
transport: ApiTransport,
tx_sender: TxSender,
vm_barrier: VmConcurrencyBarrier,
polling_interval: Duration,
namespaces: Vec<Namespace>,
optional: OptionalApiParams,
Expand All @@ -141,7 +143,6 @@ pub struct ApiBuilder {
// Mandatory params that must be set using builder methods.
transport: Option<ApiTransport>,
tx_sender: Option<TxSender>,
vm_barrier: Option<VmConcurrencyBarrier>,
// Optional params that may or may not be set using builder methods. We treat `namespaces`
// specially because we want to output a warning if they are not set.
namespaces: Option<Vec<Namespace>>,
Expand All @@ -159,7 +160,6 @@ impl ApiBuilder {
polling_interval: Self::DEFAULT_POLLING_INTERVAL,
transport: None,
tx_sender: None,
vm_barrier: None,
namespaces: None,
optional: OptionalApiParams::default(),
}
Expand All @@ -184,9 +184,13 @@ impl ApiBuilder {
self
}

pub fn with_tx_sender(mut self, tx_sender: TxSender, vm_barrier: VmConcurrencyBarrier) -> Self {
pub fn with_tx_sender(mut self, tx_sender: TxSender) -> Self {
self.tx_sender = Some(tx_sender);
self.vm_barrier = Some(vm_barrier);
self
}

pub fn with_vm_barrier(mut self, vm_barrier: VmConcurrencyBarrier) -> Self {
self.optional.vm_barrier = Some(vm_barrier);
self
}

Expand Down Expand Up @@ -244,15 +248,24 @@ impl ApiBuilder {
self.optional.pub_sub_events_sender = Some(sender);
self
}
}

impl ApiBuilder {
pub fn build(self) -> anyhow::Result<ApiServer> {
let transport = self.transport.context("API transport not set")?;
let health_check_name = match &transport {
ApiTransport::Http(_) => "http_api",
ApiTransport::WebSocket(_) => "ws_api",
};
let (_health_check, health_updater) = ReactiveHealthCheck::new(health_check_name);

fn into_full_params(self) -> anyhow::Result<FullApiParams> {
Ok(FullApiParams {
Ok(ApiServer {
pool: self.pool,
health_updater: Arc::new(health_updater),
updaters_pool: self.updaters_pool,
config: self.config,
transport: self.transport.context("API transport not set")?,
transport,
tx_sender: self.tx_sender.context("Transaction sender not set")?,
vm_barrier: self.vm_barrier.context("VM barrier not set")?,
polling_interval: self.polling_interval,
namespaces: self.namespaces.unwrap_or_else(|| {
tracing::warn!(
Expand All @@ -265,16 +278,11 @@ impl ApiBuilder {
}
}

impl ApiBuilder {
pub async fn build(
self,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<ApiServerHandles> {
self.into_full_params()?.spawn_server(stop_receiver).await
impl ApiServer {
pub fn health_check(&self) -> ReactiveHealthCheck {
self.health_updater.subscribe()
}
}

impl FullApiParams {
async fn build_rpc_state(
self,
last_sealed_miniblock: SealedMiniblockNumber,
Expand Down Expand Up @@ -353,7 +361,7 @@ impl FullApiParams {
Ok(rpc)
}

async fn spawn_server(
pub async fn run(
self,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<ApiServerHandles> {
Expand Down Expand Up @@ -414,11 +422,6 @@ impl FullApiParams {
const SEALED_MINIBLOCK_UPDATE_INTERVAL: Duration = Duration::from_millis(25);

let transport = self.transport;
let health_check_name = match transport {
ApiTransport::Http(_) => "http_api",
ApiTransport::WebSocket(_) => "ws_api",
};
let (health_check, health_updater) = ReactiveHealthCheck::new(health_check_name);

let (last_sealed_miniblock, update_task) = SealedMiniblockNumber::new(
self.updaters_pool.clone(),
Expand All @@ -445,14 +448,15 @@ impl FullApiParams {
None
};

// Start the server in a separate tokio runtime from a dedicated thread.
// TODO (QIT-26): We still expose `health_check` in `ApiServerHandles` for the old code. After we switch to the
// framework it'll no longer be needed.
let health_check = self.health_updater.subscribe();
let (local_addr_sender, local_addr) = oneshot::channel();
let server_task = tokio::spawn(self.run_jsonrpsee_server(
stop_receiver,
pub_sub,
last_sealed_miniblock,
local_addr_sender,
health_updater,
));

tasks.push(server_task);
Expand All @@ -469,7 +473,6 @@ impl FullApiParams {
pub_sub: Option<EthSubscribe>,
last_sealed_miniblock: SealedMiniblockNumber,
local_addr_sender: oneshot::Sender<SocketAddr>,
health_updater: HealthUpdater,
) -> anyhow::Result<()> {
let transport = self.transport;
let (transport_str, is_http, addr) = match transport {
Expand Down Expand Up @@ -507,7 +510,8 @@ impl FullApiParams {
.map_or(u32::MAX, |limit| limit as u32);
let websocket_requests_per_minute_limit = self.optional.websocket_requests_per_minute_limit;
let subscriptions_limit = self.optional.subscriptions_limit;
let vm_barrier = self.vm_barrier.clone();
let vm_barrier = self.optional.vm_barrier.clone();
let health_updater = self.health_updater.clone();

let rpc = self
.build_rpc_module(pub_sub, last_sealed_miniblock)
Expand Down Expand Up @@ -577,8 +581,9 @@ impl FullApiParams {
// Hence, we monitor `stop_receiver` on a separate Tokio task.
let close_handle = server_handle.clone();
let closing_vm_barrier = vm_barrier.clone();
let health_updater = Arc::new(health_updater);
// We use `Weak` reference to the health updater in order to not prevent its drop if the server stops on its own.
// TODO (QIT-26): While `Arc<HealthUpdater>` is stored in `self`, we rely on the fact that `self` is consumed and
// dropped by `self.build_rpc_module` above, so we should still have just one strong reference.
let closing_health_updater = Arc::downgrade(&health_updater);
tokio::spawn(async move {
if stop_receiver.changed().await.is_err() {
Expand All @@ -593,14 +598,18 @@ impl FullApiParams {
tracing::info!(
"Stop signal received, {transport_str} JSON-RPC server is shutting down"
);
closing_vm_barrier.close();
if let Some(closing_vm_barrier) = closing_vm_barrier {
closing_vm_barrier.close();
}
close_handle.stop().ok();
});

server_handle.stopped().await;
drop(health_updater);
tracing::info!("{transport_str} JSON-RPC server stopped");
Self::wait_for_vm(vm_barrier, transport_str).await;
if let Some(vm_barrier) = vm_barrier {
Self::wait_for_vm(vm_barrier, transport_str).await;
}
Ok(())
}
}
7 changes: 5 additions & 2 deletions core/lib/zksync_core/src/api_server/web3/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,13 @@ async fn spawn_server(
};
let server_handles = server_builder
.with_polling_interval(POLL_INTERVAL)
.with_tx_sender(tx_sender, vm_barrier)
.with_tx_sender(tx_sender)
.with_vm_barrier(vm_barrier)
.with_pub_sub_events(pub_sub_events_sender)
.enable_api_namespaces(namespaces)
.build(stop_receiver)
.build()
.expect("Unable to build API server")
.run(stop_receiver)
.await
.expect("Failed spawning JSON-RPC server");
(server_handles, pub_sub_events_receiver)
Expand Down
18 changes: 14 additions & 4 deletions core/lib/zksync_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1201,9 +1201,14 @@ async fn run_http_api(
.with_tree_api(api_config.web3_json_rpc.tree_api_url())
.with_batch_request_size_limit(api_config.web3_json_rpc.max_batch_request_size())
.with_response_body_size_limit(api_config.web3_json_rpc.max_response_body_size())
.with_tx_sender(tx_sender, vm_barrier)
.with_tx_sender(tx_sender)
.with_vm_barrier(vm_barrier)
.enable_api_namespaces(namespaces);
api_builder.build(stop_receiver).await
api_builder
.build()
.context("failed to build HTTP API server")?
.run(stop_receiver)
.await
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -1252,10 +1257,15 @@ async fn run_ws_api(
)
.with_polling_interval(api_config.web3_json_rpc.pubsub_interval())
.with_tree_api(api_config.web3_json_rpc.tree_api_url())
.with_tx_sender(tx_sender, vm_barrier)
.with_tx_sender(tx_sender)
.with_vm_barrier(vm_barrier)
.enable_api_namespaces(namespaces);

api_builder.build(stop_receiver.clone()).await
api_builder
.build()
.context("failed to build WS API server")?
.run(stop_receiver)
.await
}

async fn circuit_breakers_for_components(
Expand Down
2 changes: 2 additions & 0 deletions core/node/node_framework/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ zksync_types = { path = "../../lib/types" }
zksync_health_check = { path = "../../lib/health_check" }
zksync_dal = { path = "../../lib/dal" }
zksync_config = { path = "../../lib/config" }
zksync_state = { path = "../../lib/state" }
zksync_object_store = { path = "../../lib/object_store" }
zksync_core = { path = "../../lib/zksync_core" }
zksync_storage = { path = "../../lib/storage" }
zksync_eth_client = { path = "../../lib/eth_client" }
zksync_contracts = { path = "../../lib/contracts" }
zksync_web3_decl = { path = "../../lib/web3_decl" }

tracing = "0.1"
thiserror = "1"
Expand Down
Loading

0 comments on commit 105f4cc

Please sign in to comment.