Skip to content

Commit

Permalink
feat(node_framework): Support Eth Watch in the framework (#1145)
Browse files Browse the repository at this point in the history
## What ❔
- Small change in the `EthWatch` component. Now `Eth Watch` 's `pool`
field is used instead of `ConnectionPool` passed in the Eth Watch's
`run()` method.
- Adds `Eth Watch` implementation, i. e. `EthWatchLayer` and
`EthWatchTask`, in the same way it was done for `StateKeeper `
[here](https://github.com/matter-labs/zksync-era/pull/1043/files#diff-8ff8babf7b83c79dbf96f4998cf71d888beaeb265c7ce33192b0bb0c808f662b).

Current "external" point of view for the framework(the only thing
changed is that we add ` .add_eth_watch_layer()?` after
`.add_state_keeper_layer()?`):

```rust
fn main() -> anyhow::Result<()> {
    #[allow(deprecated)] // TODO (QIT-21): Use centralized configuration approach.
    let log_format = vlog::log_format_from_env();
    let _guard = vlog::ObservabilityBuilder::new()
        .with_log_format(log_format)
        .build();

    MainNodeBuilder::new()
        .add_pools_layer()?
        .add_fee_input_layer()?
        .add_object_store_layer()?
        .add_metadata_calculator_layer()?
        .add_state_keeper_layer()?
        .add_eth_watch_layer()?
        .add_healthcheck_layer()?
        .build()
        .run()?;

    Ok(())
}
```
  • Loading branch information
AnastasiiaVashchuk committed Feb 21, 2024
1 parent e75aa11 commit 4f41b68
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 11 additions & 12 deletions core/lib/zksync_core/src/eth_watch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use self::{
metrics::{PollStage, METRICS},
};

mod client;
pub mod client;
mod event_processors;
mod metrics;
#[cfg(test)]
Expand All @@ -45,14 +45,15 @@ pub struct EthWatch {
event_processors: Vec<Box<dyn EventProcessor>>,

last_processed_ethereum_block: u64,
pool: ConnectionPool,
}

impl EthWatch {
pub async fn new(
diamond_proxy_address: Address,
governance_contract: Option<Contract>,
mut client: Box<dyn EthClient>,
pool: &ConnectionPool,
pool: ConnectionPool,
poll_interval: Duration,
) -> Self {
let mut storage = pool.access_storage_tagged("eth_watch").await.unwrap();
Expand All @@ -61,6 +62,8 @@ impl EthWatch {

tracing::info!("initialized state: {:?}", state);

drop(storage);

let priority_ops_processor =
PriorityOpsEventProcessor::new(state.next_expected_priority_id);
let upgrades_processor = UpgradesEventProcessor::new(state.last_seen_version_id);
Expand Down Expand Up @@ -89,6 +92,7 @@ impl EthWatch {
poll_interval,
event_processors,
last_processed_ethereum_block: state.last_processed_ethereum_block,
pool,
}
}

Expand Down Expand Up @@ -131,12 +135,9 @@ impl EthWatch {
}
}

pub async fn run(
&mut self,
pool: ConnectionPool,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
pub async fn run(mut self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
let mut timer = tokio::time::interval(self.poll_interval);
let pool = self.pool.clone();
loop {
if *stop_receiver.borrow() {
tracing::info!("Stop signal received, eth_watch is shutting down");
Expand Down Expand Up @@ -203,16 +204,14 @@ pub async fn start_eth_watch(
config.confirmations_for_eth_event,
);

let mut eth_watch = EthWatch::new(
let eth_watch = EthWatch::new(
diamond_proxy_addr,
Some(governance.0),
Box::new(eth_client),
&pool,
pool,
config.poll_interval(),
)
.await;

Ok(tokio::spawn(async move {
eth_watch.run(pool, stop_receiver).await
}))
Ok(tokio::spawn(eth_watch.run(stop_receiver)))
}
14 changes: 7 additions & 7 deletions core/lib/zksync_core/src/eth_watch/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ async fn test_normal_operation_l1_txs() {
Address::default(),
None,
Box::new(client.clone()),
&connection_pool,
connection_pool.clone(),
std::time::Duration::from_nanos(1),
)
.await;
Expand Down Expand Up @@ -258,7 +258,7 @@ async fn test_normal_operation_upgrades() {
Address::default(),
None,
Box::new(client.clone()),
&connection_pool,
connection_pool.clone(),
std::time::Duration::from_nanos(1),
)
.await;
Expand Down Expand Up @@ -319,7 +319,7 @@ async fn test_gap_in_upgrades() {
Address::default(),
None,
Box::new(client.clone()),
&connection_pool,
connection_pool.clone(),
std::time::Duration::from_nanos(1),
)
.await;
Expand Down Expand Up @@ -358,7 +358,7 @@ async fn test_normal_operation_governance_upgrades() {
Address::default(),
Some(governance_contract()),
Box::new(client.clone()),
&connection_pool,
connection_pool.clone(),
std::time::Duration::from_nanos(1),
)
.await;
Expand Down Expand Up @@ -420,7 +420,7 @@ async fn test_gap_in_single_batch() {
Address::default(),
None,
Box::new(client.clone()),
&connection_pool,
connection_pool.clone(),
std::time::Duration::from_nanos(1),
)
.await;
Expand Down Expand Up @@ -450,7 +450,7 @@ async fn test_gap_between_batches() {
Address::default(),
None,
Box::new(client.clone()),
&connection_pool,
connection_pool.clone(),
std::time::Duration::from_nanos(1),
)
.await;
Expand Down Expand Up @@ -485,7 +485,7 @@ async fn test_overlapping_batches() {
Address::default(),
None,
Box::new(client.clone()),
&connection_pool,
connection_pool.clone(),
std::time::Duration::from_nanos(1),
)
.await;
Expand Down
1 change: 1 addition & 0 deletions core/node/node_framework/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ zksync_object_store = { path = "../../lib/object_store" }
zksync_core = { path = "../../lib/zksync_core" }
zksync_storage = { path = "../../lib/storage" }
zksync_eth_client = { path = "../../lib/eth_client" }
zksync_contracts = { path = "../../lib/contracts" }

tracing = "0.1"
thiserror = "1"
Expand Down
14 changes: 12 additions & 2 deletions core/node/node_framework/examples/main_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ use zksync_config::{
chain::{MempoolConfig, NetworkConfig, OperationsManagerConfig, StateKeeperConfig},
ObservabilityConfig,
},
ApiConfig, ContractsConfig, DBConfig, ETHClientConfig, GasAdjusterConfig, ObjectStoreConfig,
PostgresConfig,
ApiConfig, ContractsConfig, DBConfig, ETHClientConfig, ETHWatchConfig, GasAdjusterConfig,
ObjectStoreConfig, PostgresConfig,
};
use zksync_core::metadata_calculator::MetadataCalculatorConfig;
use zksync_env_config::FromEnv;
use zksync_node_framework::{
implementations::layers::{
eth_watch::EthWatchLayer,
fee_input::SequencerFeeInputLayer,
healtcheck_server::HealthCheckLayer,
metadata_calculator::MetadataCalculatorLayer,
Expand Down Expand Up @@ -102,6 +103,14 @@ impl MainNodeBuilder {
Ok(self)
}

fn add_eth_watch_layer(mut self) -> anyhow::Result<Self> {
self.node.add_layer(EthWatchLayer::new(
ETHWatchConfig::from_env()?,
ContractsConfig::from_env()?,
));
Ok(self)
}

fn add_healthcheck_layer(mut self) -> anyhow::Result<Self> {
let healthcheck_config = ApiConfig::from_env()?.healthcheck;
self.node.add_layer(HealthCheckLayer(healthcheck_config));
Expand Down Expand Up @@ -131,6 +140,7 @@ fn main() -> anyhow::Result<()> {
.add_object_store_layer()?
.add_metadata_calculator_layer()?
.add_state_keeper_layer()?
.add_eth_watch_layer()?
.add_healthcheck_layer()?
.build()
.run()?;
Expand Down
88 changes: 88 additions & 0 deletions core/node/node_framework/src/implementations/layers/eth_watch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use std::time::Duration;

use zksync_config::{ContractsConfig, ETHWatchConfig};
use zksync_contracts::governance_contract;
use zksync_core::eth_watch::{client::EthHttpQueryClient, EthWatch};
use zksync_dal::ConnectionPool;
use zksync_types::{ethabi::Contract, Address};

use crate::{
implementations::resources::{eth_interface::EthInterfaceResource, pools::MasterPoolResource},
service::{ServiceContext, StopReceiver},
task::Task,
wiring_layer::{WiringError, WiringLayer},
};

#[derive(Debug)]
pub struct EthWatchLayer {
eth_watch_config: ETHWatchConfig,
contracts_config: ContractsConfig,
}

impl EthWatchLayer {
pub fn new(eth_watch_config: ETHWatchConfig, contracts_config: ContractsConfig) -> Self {
Self {
eth_watch_config,
contracts_config,
}
}
}

#[async_trait::async_trait]
impl WiringLayer for EthWatchLayer {
fn layer_name(&self) -> &'static str {
"eth_watch_layer"
}

async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
let pool_resource = context.get_resource::<MasterPoolResource>().await?;
let main_pool = pool_resource.get().await.unwrap();

let client = context.get_resource::<EthInterfaceResource>().await?.0;

let eth_client = EthHttpQueryClient::new(
client,
self.contracts_config.diamond_proxy_addr,
Some(self.contracts_config.governance_addr),
self.eth_watch_config.confirmations_for_eth_event,
);
context.add_task(Box::new(EthWatchTask {
main_pool,
client: eth_client,
governance_contract: Some(governance_contract()),
diamond_proxy_address: self.contracts_config.diamond_proxy_addr,
poll_interval: self.eth_watch_config.poll_interval(),
}));

Ok(())
}
}

#[derive(Debug)]
struct EthWatchTask {
main_pool: ConnectionPool,
client: EthHttpQueryClient,
governance_contract: Option<Contract>,
diamond_proxy_address: Address,
poll_interval: Duration,
}

#[async_trait::async_trait]
impl Task for EthWatchTask {
fn name(&self) -> &'static str {
"eth_watch"
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
let eth_watch = EthWatch::new(
self.diamond_proxy_address,
self.governance_contract,
Box::new(self.client),
self.main_pool,
self.poll_interval,
)
.await;

eth_watch.run(stop_receiver.0).await
}
}
1 change: 1 addition & 0 deletions core/node/node_framework/src/implementations/layers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod eth_watch;
pub mod fee_input;
pub mod healtcheck_server;
pub mod metadata_calculator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ impl WiringLayer for StateKeeperLayer {
}
}

#[derive(Debug)]
struct StateKeeperTask {
io: Box<dyn StateKeeperIO>,
batch_executor_base: Box<dyn BatchExecutor>,
Expand Down

0 comments on commit 4f41b68

Please sign in to comment.