Skip to content

Commit

Permalink
feat(node-framework): Add housekeeper layer (#1409)
Browse files Browse the repository at this point in the history
## What ❔
Adds housekeeper layer to the node-framework. Worth to note that
code(added in this PR) in the layer is reused from
[add_house_keeper_to_task_futures](https://github.com/matter-labs/zksync-era/blob/a3c2fae94c813feb1b55c466a39addd0fca79a13/core/lib/zksync_core/src/lib.rs#L999)

## Why ❔
To run the Main node within the framework, we have to add housekeeper
layer to it.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.

---------

Co-authored-by: Igor Aleksanov <popzxc@yandex.ru>
  • Loading branch information
AnastasiiaVashchuk and popzxc committed Mar 13, 2024
1 parent cd748e9 commit 702e739
Show file tree
Hide file tree
Showing 3 changed files with 355 additions and 1 deletion.
25 changes: 24 additions & 1 deletion core/node/node_framework/examples/main_node.rs
Expand Up @@ -6,7 +6,10 @@ use anyhow::Context;
use zksync_config::{
configs::{
chain::{MempoolConfig, NetworkConfig, OperationsManagerConfig, StateKeeperConfig},
ObservabilityConfig, ProofDataHandlerConfig,
fri_prover_group::FriProverGroupConfig,
house_keeper::HouseKeeperConfig,
FriProofCompressorConfig, FriProverConfig, FriWitnessGeneratorConfig, ObservabilityConfig,
ProofDataHandlerConfig,
},
ApiConfig, ContractsConfig, DBConfig, ETHClientConfig, ETHSenderConfig, ETHWatchConfig,
GasAdjusterConfig, ObjectStoreConfig, PostgresConfig,
Expand All @@ -25,6 +28,7 @@ use zksync_node_framework::{
eth_watch::EthWatchLayer,
fee_input::SequencerFeeInputLayer,
healtcheck_server::HealthCheckLayer,
house_keeper::HouseKeeperLayer,
metadata_calculator::MetadataCalculatorLayer,
object_store::ObjectStoreLayer,
pools_layer::PoolsLayerBuilder,
Expand Down Expand Up @@ -238,6 +242,24 @@ impl MainNodeBuilder {
Ok(self)
}

fn add_house_keeper_layer(mut self) -> anyhow::Result<Self> {
let house_keeper_config = HouseKeeperConfig::from_env()?;
let fri_prover_config = FriProverConfig::from_env()?;
let fri_witness_generator_config = FriWitnessGeneratorConfig::from_env()?;
let fri_prover_group_config = FriProverGroupConfig::from_env()?;
let fri_proof_compressor_config = FriProofCompressorConfig::from_env()?;

self.node.add_layer(HouseKeeperLayer::new(
house_keeper_config,
fri_prover_config,
fri_witness_generator_config,
fri_prover_group_config,
fri_proof_compressor_config,
));

Ok(self)
}

fn add_commitment_generator_layer(mut self) -> anyhow::Result<Self> {
self.node.add_layer(CommitmentGeneratorLayer);

Expand Down Expand Up @@ -274,6 +296,7 @@ fn main() -> anyhow::Result<()> {
.add_tree_api_client_layer()?
.add_http_web3_api_layer()?
.add_ws_web3_api_layer()?
.add_house_keeper_layer()?
.add_commitment_generator_layer()?
.build()
.run()?;
Expand Down
330 changes: 330 additions & 0 deletions core/node/node_framework/src/implementations/layers/house_keeper.rs
@@ -0,0 +1,330 @@
use std::time::Duration;

use zksync_config::configs::{
fri_prover_group::FriProverGroupConfig, house_keeper::HouseKeeperConfig,
FriProofCompressorConfig, FriProverConfig, FriWitnessGeneratorConfig,
};
use zksync_core::house_keeper::{
blocks_state_reporter::L1BatchMetricsReporter,
fri_proof_compressor_job_retry_manager::FriProofCompressorJobRetryManager,
fri_proof_compressor_queue_monitor::FriProofCompressorStatsReporter,
fri_prover_job_retry_manager::FriProverJobRetryManager,
fri_prover_queue_monitor::FriProverStatsReporter,
fri_scheduler_circuit_queuer::SchedulerCircuitQueuer,
fri_witness_generator_jobs_retry_manager::FriWitnessGeneratorJobRetryManager,
fri_witness_generator_queue_monitor::FriWitnessGeneratorStatsReporter,
periodic_job::PeriodicJob,
waiting_to_queued_fri_witness_job_mover::WaitingToQueuedFriWitnessJobMover,
};
use zksync_dal::ConnectionPool;

use crate::{
implementations::resources::pools::{ProverPoolResource, ReplicaPoolResource},
service::{ServiceContext, StopReceiver},
task::Task,
wiring_layer::{WiringError, WiringLayer},
};

const SCRAPE_INTERVAL: Duration = Duration::from_secs(60);

#[derive(Debug)]
pub struct HouseKeeperLayer {
house_keeper_config: HouseKeeperConfig,
fri_prover_config: FriProverConfig,
fri_witness_generator_config: FriWitnessGeneratorConfig,
fri_prover_group_config: FriProverGroupConfig,
fri_proof_compressor_config: FriProofCompressorConfig,
}

impl HouseKeeperLayer {
pub fn new(
house_keeper_config: HouseKeeperConfig,
fri_prover_config: FriProverConfig,
fri_witness_generator_config: FriWitnessGeneratorConfig,
fri_prover_group_config: FriProverGroupConfig,
fri_proof_compressor_config: FriProofCompressorConfig,
) -> Self {
Self {
house_keeper_config,
fri_prover_config,
fri_witness_generator_config,
fri_prover_group_config,
fri_proof_compressor_config,
}
}
}

#[async_trait::async_trait]
impl WiringLayer for HouseKeeperLayer {
fn layer_name(&self) -> &'static str {
"house_keeper_layer"
}

async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
// initialize resources
let replica_pool_resource = context.get_resource::<ReplicaPoolResource>().await?;
let replica_pool = replica_pool_resource.get().await?;

let prover_pool_resource = context.get_resource::<ProverPoolResource>().await?;
let prover_pool = prover_pool_resource.get().await?;

// initialize and add tasks
let pool_for_metrics = replica_pool.clone();
context.add_task(Box::new(PoolForMetricsTask { pool_for_metrics }));

let l1_batch_metrics_reporter = L1BatchMetricsReporter::new(
self.house_keeper_config
.l1_batch_metrics_reporting_interval_ms,
replica_pool.clone(),
);
context.add_task(Box::new(L1BatchMetricsReporterTask {
l1_batch_metrics_reporter,
}));

let fri_prover_job_retry_manager = FriProverJobRetryManager::new(
self.fri_prover_config.max_attempts,
self.fri_prover_config.proof_generation_timeout(),
self.house_keeper_config.fri_prover_job_retrying_interval_ms,
prover_pool.clone(),
);
context.add_task(Box::new(FriProverJobRetryManagerTask {
fri_prover_job_retry_manager,
}));

let fri_witness_gen_job_retry_manager = FriWitnessGeneratorJobRetryManager::new(
self.fri_witness_generator_config.max_attempts,
self.fri_witness_generator_config
.witness_generation_timeout(),
self.house_keeper_config
.fri_witness_generator_job_retrying_interval_ms,
prover_pool.clone(),
);
context.add_task(Box::new(FriWitnessGeneratorJobRetryManagerTask {
fri_witness_gen_job_retry_manager,
}));

let waiting_to_queued_fri_witness_job_mover = WaitingToQueuedFriWitnessJobMover::new(
self.house_keeper_config.fri_witness_job_moving_interval_ms,
prover_pool.clone(),
);
context.add_task(Box::new(WaitingToQueuedFriWitnessJobMoverTask {
waiting_to_queued_fri_witness_job_mover,
}));

let scheduler_circuit_queuer = SchedulerCircuitQueuer::new(
self.house_keeper_config.fri_witness_job_moving_interval_ms,
prover_pool.clone(),
);
context.add_task(Box::new(SchedulerCircuitQueuerTask {
scheduler_circuit_queuer,
}));

let fri_witness_generator_stats_reporter = FriWitnessGeneratorStatsReporter::new(
prover_pool.clone(),
self.house_keeper_config
.witness_generator_stats_reporting_interval_ms,
);
context.add_task(Box::new(FriWitnessGeneratorStatsReporterTask {
fri_witness_generator_stats_reporter,
}));

let fri_prover_stats_reporter = FriProverStatsReporter::new(
self.house_keeper_config
.fri_prover_stats_reporting_interval_ms,
prover_pool.clone(),
replica_pool.clone(),
self.fri_prover_group_config,
);
context.add_task(Box::new(FriProverStatsReporterTask {
fri_prover_stats_reporter,
}));

let fri_proof_compressor_stats_reporter = FriProofCompressorStatsReporter::new(
self.house_keeper_config
.fri_proof_compressor_stats_reporting_interval_ms,
prover_pool.clone(),
);
context.add_task(Box::new(FriProofCompressorStatsReporterTask {
fri_proof_compressor_stats_reporter,
}));

let fri_proof_compressor_retry_manager = FriProofCompressorJobRetryManager::new(
self.fri_proof_compressor_config.max_attempts,
self.fri_proof_compressor_config.generation_timeout(),
self.house_keeper_config
.fri_proof_compressor_job_retrying_interval_ms,
prover_pool.clone(),
);
context.add_task(Box::new(FriProofCompressorJobRetryManagerTask {
fri_proof_compressor_retry_manager,
}));

Ok(())
}
}

// TODO (QIT-29): Support stop receivers for house keeper related tasks.

#[derive(Debug)]
struct PoolForMetricsTask {
pool_for_metrics: ConnectionPool,
}

#[async_trait::async_trait]
impl Task for PoolForMetricsTask {
fn name(&self) -> &'static str {
"pool_for_metrics"
}

async fn run(self: Box<Self>, _stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.pool_for_metrics
.run_postgres_metrics_scraping(SCRAPE_INTERVAL)
.await;

Ok(())
}
}

#[derive(Debug)]
struct L1BatchMetricsReporterTask {
l1_batch_metrics_reporter: L1BatchMetricsReporter,
}

#[async_trait::async_trait]
impl Task for L1BatchMetricsReporterTask {
fn name(&self) -> &'static str {
"l1_batch_metrics_reporter"
}

async fn run(self: Box<Self>, _stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.l1_batch_metrics_reporter.run().await
}
}

#[derive(Debug)]
struct FriProverJobRetryManagerTask {
fri_prover_job_retry_manager: FriProverJobRetryManager,
}

#[async_trait::async_trait]
impl Task for FriProverJobRetryManagerTask {
fn name(&self) -> &'static str {
"fri_prover_job_retry_manager"
}

async fn run(self: Box<Self>, _stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.fri_prover_job_retry_manager.run().await
}
}

#[derive(Debug)]
struct FriWitnessGeneratorJobRetryManagerTask {
fri_witness_gen_job_retry_manager: FriWitnessGeneratorJobRetryManager,
}

#[async_trait::async_trait]
impl Task for FriWitnessGeneratorJobRetryManagerTask {
fn name(&self) -> &'static str {
"fri_witness_generator_job_retry_manager"
}

async fn run(self: Box<Self>, _stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.fri_witness_gen_job_retry_manager.run().await
}
}

#[derive(Debug)]
struct WaitingToQueuedFriWitnessJobMoverTask {
waiting_to_queued_fri_witness_job_mover: WaitingToQueuedFriWitnessJobMover,
}

#[async_trait::async_trait]
impl Task for WaitingToQueuedFriWitnessJobMoverTask {
fn name(&self) -> &'static str {
"waiting_to_queued_fri_witness_job_mover"
}

async fn run(self: Box<Self>, _stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.waiting_to_queued_fri_witness_job_mover.run().await
}
}

#[derive(Debug)]
struct SchedulerCircuitQueuerTask {
scheduler_circuit_queuer: SchedulerCircuitQueuer,
}

#[async_trait::async_trait]
impl Task for SchedulerCircuitQueuerTask {
fn name(&self) -> &'static str {
"scheduler_circuit_queuer"
}

async fn run(self: Box<Self>, _stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.scheduler_circuit_queuer.run().await
}
}

#[derive(Debug)]
struct FriWitnessGeneratorStatsReporterTask {
fri_witness_generator_stats_reporter: FriWitnessGeneratorStatsReporter,
}

#[async_trait::async_trait]
impl Task for FriWitnessGeneratorStatsReporterTask {
fn name(&self) -> &'static str {
"fri_witness_generator_stats_reporter"
}

async fn run(self: Box<Self>, _stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.fri_witness_generator_stats_reporter.run().await
}
}

#[derive(Debug)]
struct FriProverStatsReporterTask {
fri_prover_stats_reporter: FriProverStatsReporter,
}

#[async_trait::async_trait]
impl Task for FriProverStatsReporterTask {
fn name(&self) -> &'static str {
"fri_prover_stats_reporter"
}

async fn run(self: Box<Self>, _stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.fri_prover_stats_reporter.run().await
}
}

#[derive(Debug)]
struct FriProofCompressorStatsReporterTask {
fri_proof_compressor_stats_reporter: FriProofCompressorStatsReporter,
}

#[async_trait::async_trait]
impl Task for FriProofCompressorStatsReporterTask {
fn name(&self) -> &'static str {
"fri_proof_compressor_stats_reporter"
}

async fn run(self: Box<Self>, _stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.fri_proof_compressor_stats_reporter.run().await
}
}

#[derive(Debug)]
struct FriProofCompressorJobRetryManagerTask {
fri_proof_compressor_retry_manager: FriProofCompressorJobRetryManager,
}

#[async_trait::async_trait]
impl Task for FriProofCompressorJobRetryManagerTask {
fn name(&self) -> &'static str {
"fri_proof_compressor_job_retry_manager"
}

async fn run(self: Box<Self>, _stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.fri_proof_compressor_retry_manager.run().await
}
}
1 change: 1 addition & 0 deletions core/node/node_framework/src/implementations/layers/mod.rs
Expand Up @@ -2,6 +2,7 @@ pub mod commitment_generator;
pub mod eth_watch;
pub mod fee_input;
pub mod healtcheck_server;
pub mod house_keeper;
pub mod metadata_calculator;
pub mod object_store;
pub mod pk_signing_eth_client;
Expand Down

0 comments on commit 702e739

Please sign in to comment.