Skip to content

Commit

Permalink
feat(node-framework): Add eth sender layer (#1390)
Browse files Browse the repository at this point in the history
## What ❔

<!-- What are the changes this PR brings about? -->
<!-- Example: This PR adds a PR template to the repo. -->
<!-- (For bigger PRs adding more context is appreciated) -->

## Why ❔

<!-- Why are these changes done? What goal do they contribute to? What
are the principles behind them? -->
<!-- Example: PR templates ensure PR reviewers, observers, and future
iterators are in context about the evolution of repos. -->

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
- [ ] Spellcheck has been run via `zk spellcheck`.
- [ ] Linkcheck has been run via `zk linkcheck`.
  • Loading branch information
AnastasiiaVashchuk committed Mar 18, 2024
1 parent 90dee73 commit 0affdf8
Show file tree
Hide file tree
Showing 11 changed files with 204 additions and 23 deletions.
10 changes: 5 additions & 5 deletions core/lib/zksync_core/src/eth_sender/eth_tx_aggregator.rs
Expand Up @@ -62,6 +62,7 @@ pub struct EthTxAggregator {
/// transactions. The `Some` then contains the address of this custom operator
/// address.
custom_commit_sender_addr: Option<Address>,
pool: ConnectionPool,
}

struct TxData {
Expand All @@ -72,6 +73,7 @@ struct TxData {
impl EthTxAggregator {
#[allow(clippy::too_many_arguments)]
pub async fn new(
pool: ConnectionPool,
config: SenderConfig,
aggregator: Aggregator,
eth_client: Arc<dyn BoundEthInterface>,
Expand Down Expand Up @@ -110,14 +112,12 @@ impl EthTxAggregator {
base_nonce_custom_commit_sender,
rollup_chain_id,
custom_commit_sender_addr,
pool,
}
}

pub async fn run(
mut self,
pool: ConnectionPool,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
pub async fn run(mut self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
let pool = self.pool.clone();
loop {
let mut storage = pool.access_storage_tagged("eth_sender").await.unwrap();

Expand Down
10 changes: 5 additions & 5 deletions core/lib/zksync_core/src/eth_sender/eth_tx_manager.rs
Expand Up @@ -58,10 +58,12 @@ pub struct EthTxManager {
ethereum_gateway_blobs: Option<Arc<dyn BoundEthInterface>>,
config: SenderConfig,
gas_adjuster: Arc<dyn L1TxParamsProvider>,
pool: ConnectionPool,
}

impl EthTxManager {
pub fn new(
pool: ConnectionPool,
config: SenderConfig,
gas_adjuster: Arc<dyn L1TxParamsProvider>,
ethereum_gateway: Arc<dyn BoundEthInterface>,
Expand All @@ -72,6 +74,7 @@ impl EthTxManager {
ethereum_gateway_blobs,
config,
gas_adjuster,
pool,
}
}

Expand Down Expand Up @@ -720,11 +723,8 @@ impl EthTxManager {
METRICS.l1_blocks_waited_in_mempool[&tx_type_label].observe(waited_blocks.into());
}

pub async fn run(
mut self,
pool: ConnectionPool,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
pub async fn run(mut self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
let pool = self.pool.clone();
{
let l1_block_numbers = self
.get_l1_block_numbers()
Expand Down
2 changes: 2 additions & 0 deletions core/lib/zksync_core/src/eth_sender/tests.rs
Expand Up @@ -120,6 +120,7 @@ impl EthSenderTester {
let store_factory = ObjectStoreFactory::mock();

let aggregator = EthTxAggregator::new(
connection_pool.clone(),
SenderConfig {
proof_sending_mode: ProofSendingMode::SkipEveryProof,
..eth_sender_config.sender.clone()
Expand All @@ -142,6 +143,7 @@ impl EthSenderTester {
.await;

let manager = EthTxManager::new(
connection_pool.clone(),
eth_sender_config.sender,
gas_adjuster.clone(),
gateway.clone(),
Expand Down
6 changes: 4 additions & 2 deletions core/lib/zksync_core/src/lib.rs
Expand Up @@ -662,6 +662,7 @@ pub async fn initialize_components(
.map(|k| k.sender_account());

let eth_tx_aggregator_actor = EthTxAggregator::new(
eth_sender_pool,
eth_sender.sender.clone(),
Aggregator::new(
eth_sender.sender.clone(),
Expand All @@ -682,7 +683,7 @@ pub async fn initialize_components(
)
.await;
task_futures.push(tokio::spawn(
eth_tx_aggregator_actor.run(eth_sender_pool, stop_receiver.clone()),
eth_tx_aggregator_actor.run(stop_receiver.clone()),
));
let elapsed = started_at.elapsed();
APP_METRICS.init_latency[&InitStage::EthTxAggregator].set(elapsed);
Expand All @@ -705,6 +706,7 @@ pub async fn initialize_components(
let eth_client_blobs =
PKSigningClient::from_config_blobs(&eth_sender, &contracts_config, &eth_client_config);
let eth_tx_manager_actor = EthTxManager::new(
eth_manager_pool,
eth_sender.sender,
gas_adjuster
.get_or_init()
Expand All @@ -714,7 +716,7 @@ pub async fn initialize_components(
eth_client_blobs.map(|c| Arc::new(c) as Arc<dyn BoundEthInterface>),
);
task_futures.extend([tokio::spawn(
eth_tx_manager_actor.run(eth_manager_pool, stop_receiver.clone()),
eth_tx_manager_actor.run(stop_receiver.clone()),
)]);
let elapsed = started_at.elapsed();
APP_METRICS.init_latency[&InitStage::EthTxManager].set(elapsed);
Expand Down
27 changes: 22 additions & 5 deletions core/node/node_framework/examples/main_node.rs
Expand Up @@ -25,10 +25,11 @@ use zksync_env_config::FromEnv;
use zksync_node_framework::{
implementations::layers::{
commitment_generator::CommitmentGeneratorLayer,
eth_sender::EthSenderLayer,
eth_watch::EthWatchLayer,
fee_input::SequencerFeeInputLayer,
healtcheck_server::HealthCheckLayer,
house_keeper::HouseKeeperLayer,
l1_gas::SequencerL1GasLayer,
metadata_calculator::MetadataCalculatorLayer,
object_store::ObjectStoreLayer,
pools_layer::PoolsLayerBuilder,
Expand Down Expand Up @@ -77,16 +78,16 @@ impl MainNodeBuilder {
Ok(self)
}

fn add_fee_input_layer(mut self) -> anyhow::Result<Self> {
fn add_sequencer_l1_gas_layer(mut self) -> anyhow::Result<Self> {
let gas_adjuster_config = GasAdjusterConfig::from_env()?;
let state_keeper_config = StateKeeperConfig::from_env()?;
let eth_sender_config = ETHSenderConfig::from_env()?;
let fee_input_layer = SequencerFeeInputLayer::new(
let sequencer_l1_gas_layer = SequencerL1GasLayer::new(
gas_adjuster_config,
state_keeper_config,
eth_sender_config.sender.pubdata_sending_mode,
);
self.node.add_layer(fee_input_layer);
self.node.add_layer(sequencer_l1_gas_layer);
Ok(self)
}

Expand Down Expand Up @@ -241,6 +242,21 @@ impl MainNodeBuilder {

Ok(self)
}
fn add_eth_sender_layer(mut self) -> anyhow::Result<Self> {
let eth_sender_config = ETHSenderConfig::from_env()?;
let contracts_config = ContractsConfig::from_env()?;
let eth_client_config = ETHClientConfig::from_env()?;
let network_config = NetworkConfig::from_env()?;

self.node.add_layer(EthSenderLayer::new(
eth_sender_config,
contracts_config,
eth_client_config,
network_config,
));

Ok(self)
}

fn add_house_keeper_layer(mut self) -> anyhow::Result<Self> {
let house_keeper_config = HouseKeeperConfig::from_env()?;
Expand Down Expand Up @@ -285,11 +301,12 @@ fn main() -> anyhow::Result<()> {
MainNodeBuilder::new()
.add_pools_layer()?
.add_query_eth_client_layer()?
.add_fee_input_layer()?
.add_sequencer_l1_gas_layer()?
.add_object_store_layer()?
.add_metadata_calculator_layer()?
.add_state_keeper_layer()?
.add_eth_watch_layer()?
.add_eth_sender_layer()?
.add_proof_data_handler_layer()?
.add_healthcheck_layer()?
.add_tx_sender_layer()?
Expand Down
140 changes: 140 additions & 0 deletions core/node/node_framework/src/implementations/layers/eth_sender.rs
@@ -0,0 +1,140 @@
use std::sync::Arc;

use zksync_config::configs::{
chain::NetworkConfig, eth_sender::ETHSenderConfig, ContractsConfig, ETHClientConfig,
};
use zksync_core::eth_sender::{Aggregator, EthTxAggregator, EthTxManager};
use zksync_eth_client::{clients::PKSigningClient, BoundEthInterface};

use crate::{
implementations::resources::{
eth_interface::BoundEthInterfaceResource, l1_tx_params::L1TxParamsResource,
object_store::ObjectStoreResource, pools::MasterPoolResource,
},
service::{ServiceContext, StopReceiver},
task::Task,
wiring_layer::{WiringError, WiringLayer},
};

#[derive(Debug)]
pub struct EthSenderLayer {
eth_sender_config: ETHSenderConfig,
contracts_config: ContractsConfig,
eth_client_config: ETHClientConfig,
network_config: NetworkConfig,
}

impl EthSenderLayer {
pub fn new(
eth_sender_config: ETHSenderConfig,
contracts_config: ContractsConfig,
eth_client_config: ETHClientConfig,
network_config: NetworkConfig,
) -> Self {
Self {
eth_sender_config,
contracts_config,
eth_client_config,
network_config,
}
}
}

#[async_trait::async_trait]
impl WiringLayer for EthSenderLayer {
fn layer_name(&self) -> &'static str {
"eth_sender_layer"
}

async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
// Get resources
let pool_resource = context.get_resource::<MasterPoolResource>().await?;
let pool = pool_resource.get().await.unwrap();

let eth_client = context.get_resource::<BoundEthInterfaceResource>().await?.0;

let object_store = context.get_resource::<ObjectStoreResource>().await?.0;

// Create and add tasks
let eth_client_blobs = PKSigningClient::from_config_blobs(
&self.eth_sender_config,
&self.contracts_config,
&self.eth_client_config,
);
let eth_client_blobs_addr = eth_client_blobs.clone().map(|k| k.sender_account());

let aggregator = Aggregator::new(
self.eth_sender_config.sender.clone(),
object_store,
eth_client_blobs_addr.is_some(),
self.eth_sender_config.sender.pubdata_sending_mode.into(),
);

let config = self.eth_sender_config.sender;

let eth_tx_aggregator_actor = EthTxAggregator::new(
pool.clone(),
config.clone(),
aggregator,
eth_client.clone(),
self.contracts_config.validator_timelock_addr,
self.contracts_config.l1_multicall3_addr,
self.contracts_config.diamond_proxy_addr,
self.network_config.zksync_network_id,
eth_client_blobs_addr,
)
.await;

context.add_task(Box::new(EthTxAggregatorTask {
eth_tx_aggregator_actor,
}));

let gas_adjuster = context.get_resource::<L1TxParamsResource>().await?.0;

let eth_tx_manager_actor = EthTxManager::new(
pool,
config,
gas_adjuster,
eth_client,
eth_client_blobs.map(|c| Arc::new(c) as Arc<dyn BoundEthInterface>),
);

context.add_task(Box::new(EthTxManagerTask {
eth_tx_manager_actor,
}));

Ok(())
}
}

#[derive(Debug)]
struct EthTxAggregatorTask {
eth_tx_aggregator_actor: EthTxAggregator,
}

#[async_trait::async_trait]
impl Task for EthTxAggregatorTask {
fn name(&self) -> &'static str {
"eth_tx_aggregator"
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.eth_tx_aggregator_actor.run(stop_receiver.0).await
}
}

#[derive(Debug)]
struct EthTxManagerTask {
eth_tx_manager_actor: EthTxManager,
}

#[async_trait::async_trait]
impl Task for EthTxManagerTask {
fn name(&self) -> &'static str {
"eth_tx_manager"
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.eth_tx_manager_actor.run(stop_receiver.0).await
}
}
Expand Up @@ -11,20 +11,21 @@ use zksync_types::fee_model::FeeModelConfig;
use crate::{
implementations::resources::{
eth_interface::EthInterfaceResource, fee_input::FeeInputResource,
l1_tx_params::L1TxParamsResource,
},
service::{ServiceContext, StopReceiver},
task::Task,
wiring_layer::{WiringError, WiringLayer},
};

#[derive(Debug)]
pub struct SequencerFeeInputLayer {
pub struct SequencerL1GasLayer {
gas_adjuster_config: GasAdjusterConfig,
state_keeper_config: StateKeeperConfig,
pubdata_sending_mode: PubdataSendingMode,
}

impl SequencerFeeInputLayer {
impl SequencerL1GasLayer {
pub fn new(
gas_adjuster_config: GasAdjusterConfig,
state_keeper_config: StateKeeperConfig,
Expand All @@ -39,9 +40,9 @@ impl SequencerFeeInputLayer {
}

#[async_trait::async_trait]
impl WiringLayer for SequencerFeeInputLayer {
impl WiringLayer for SequencerL1GasLayer {
fn layer_name(&self) -> &'static str {
"sequencer_fee_input_layer"
"sequencer_l1_gas_layer"
}

async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
Expand All @@ -58,6 +59,8 @@ impl WiringLayer for SequencerFeeInputLayer {
));
context.insert_resource(FeeInputResource(batch_fee_input_provider))?;

context.insert_resource(L1TxParamsResource(gas_adjuster.clone()))?;

context.add_task(Box::new(GasAdjusterTask { gas_adjuster }));
Ok(())
}
Expand Down
3 changes: 2 additions & 1 deletion core/node/node_framework/src/implementations/layers/mod.rs
@@ -1,8 +1,9 @@
pub mod commitment_generator;
pub mod eth_sender;
pub mod eth_watch;
pub mod fee_input;
pub mod healtcheck_server;
pub mod house_keeper;
pub mod l1_gas;
pub mod metadata_calculator;
pub mod object_store;
pub mod pk_signing_eth_client;
Expand Down
Expand Up @@ -4,7 +4,7 @@ use zksync_core::fee_model::BatchFeeModelInputProvider;

use crate::resource::{Resource, ResourceId};

/// Wrapper for the object store.
/// Wrapper for the batch fee model input provider.
#[derive(Debug, Clone)]
pub struct FeeInputResource(pub Arc<dyn BatchFeeModelInputProvider>);

Expand Down

0 comments on commit 0affdf8

Please sign in to comment.