diff --git a/core/lib/config/src/configs/house_keeper.rs b/core/lib/config/src/configs/house_keeper.rs index 75cd919e2f8..a9e2c7cc7b3 100644 --- a/core/lib/config/src/configs/house_keeper.rs +++ b/core/lib/config/src/configs/house_keeper.rs @@ -16,4 +16,16 @@ pub struct HouseKeeperConfig { pub fri_prover_stats_reporting_interval_ms: u64, pub fri_proof_compressor_job_retrying_interval_ms: u64, pub fri_proof_compressor_stats_reporting_interval_ms: u64, + // TODO(PLA-862): Make these 2 variables required + pub fri_prover_job_archiver_reporting_interval_ms: Option<u64>, + pub fri_prover_job_archiver_archiving_interval_secs: Option<u64>, +} + +impl HouseKeeperConfig { + pub fn fri_prover_job_archiver_enabled(&self) -> bool { + self.fri_prover_job_archiver_reporting_interval_ms.is_some() + && self + .fri_prover_job_archiver_archiving_interval_secs + .is_some() + } } diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs index 47274c606f0..6ac8ef24993 100644 --- a/core/lib/config/src/testonly.rs +++ b/core/lib/config/src/testonly.rs @@ -599,6 +599,8 @@ impl Distribution for EncodeDist { fri_prover_stats_reporting_interval_ms: self.sample(rng), fri_proof_compressor_job_retrying_interval_ms: self.sample(rng), fri_proof_compressor_stats_reporting_interval_ms: self.sample(rng), + fri_prover_job_archiver_reporting_interval_ms: self.sample(rng), + fri_prover_job_archiver_archiving_interval_secs: self.sample(rng), } } } diff --git a/core/lib/env_config/src/house_keeper.rs b/core/lib/env_config/src/house_keeper.rs index bf995f8238b..9ce6063ce89 100644 --- a/core/lib/env_config/src/house_keeper.rs +++ b/core/lib/env_config/src/house_keeper.rs @@ -30,6 +30,8 @@ mod tests { fri_prover_stats_reporting_interval_ms: 30_000, fri_proof_compressor_job_retrying_interval_ms: 30_000, fri_proof_compressor_stats_reporting_interval_ms: 30_000, + fri_prover_job_archiver_reporting_interval_ms: Some(1_800_000), + fri_prover_job_archiver_archiving_interval_secs: Some(172_800), } } @@ 
-50,6 +52,8 @@ mod tests { HOUSE_KEEPER_FRI_PROVER_STATS_REPORTING_INTERVAL_MS="30000" HOUSE_KEEPER_FRI_PROOF_COMPRESSOR_STATS_REPORTING_INTERVAL_MS="30000" HOUSE_KEEPER_FRI_PROOF_COMPRESSOR_JOB_RETRYING_INTERVAL_MS="30000" + HOUSE_KEEPER_FRI_PROVER_JOB_ARCHIVER_REPORTING_INTERVAL_MS="1800000" + HOUSE_KEEPER_FRI_PROVER_JOB_ARCHIVER_ARCHIVING_INTERVAL_SECS="172800" "#; lock.set_env(config); diff --git a/core/lib/protobuf_config/src/house_keeper.rs b/core/lib/protobuf_config/src/house_keeper.rs index f83be249ad3..316136f7efb 100644 --- a/core/lib/protobuf_config/src/house_keeper.rs +++ b/core/lib/protobuf_config/src/house_keeper.rs @@ -50,6 +50,11 @@ impl ProtoRepr for proto::HouseKeeper { &self.fri_proof_compressor_stats_reporting_interval_ms, ) .context("fri_proof_compressor_stats_reporting_interval_ms")?, + // TODO(PLA-862): Make these 2 variables required + fri_prover_job_archiver_reporting_interval_ms: self + .fri_prover_job_archiver_reporting_interval_ms, + fri_prover_job_archiver_archiving_interval_secs: self + .fri_prover_job_archiver_archiving_interval_secs, }) } @@ -82,6 +87,10 @@ impl ProtoRepr for proto::HouseKeeper { fri_proof_compressor_stats_reporting_interval_ms: Some( this.fri_proof_compressor_stats_reporting_interval_ms, ), + fri_prover_job_archiver_reporting_interval_ms: this + .fri_prover_job_archiver_reporting_interval_ms, + fri_prover_job_archiver_archiving_interval_secs: this + .fri_prover_job_archiver_archiving_interval_secs, } } } diff --git a/core/lib/protobuf_config/src/proto/house_keeper.proto b/core/lib/protobuf_config/src/proto/house_keeper.proto index 6c186aba554..c701656f9f7 100644 --- a/core/lib/protobuf_config/src/proto/house_keeper.proto +++ b/core/lib/protobuf_config/src/proto/house_keeper.proto @@ -3,17 +3,19 @@ syntax = "proto3"; package zksync.config.house_keeper; message HouseKeeper { - optional uint64 l1_batch_metrics_reporting_interval_ms = 1; // required; ms - optional uint64 gpu_prover_queue_reporting_interval_ms = 2; // 
required; ms - optional uint64 prover_job_retrying_interval_ms = 3; // required; ms - optional uint64 prover_stats_reporting_interval_ms = 4; // required ms - optional uint64 witness_job_moving_interval_ms = 5; // required; ms - optional uint64 witness_generator_stats_reporting_interval_ms = 6; // required; ms - optional uint64 fri_witness_job_moving_interval_ms = 7; // required; ms - optional uint64 fri_prover_job_retrying_interval_ms = 8; // required; ms - optional uint64 fri_witness_generator_job_retrying_interval_ms = 9; // required; ms - optional uint32 prover_db_pool_size = 10; // required - optional uint64 fri_prover_stats_reporting_interval_ms = 11; // required; ms - optional uint64 fri_proof_compressor_job_retrying_interval_ms = 12; // required; ms - optional uint64 fri_proof_compressor_stats_reporting_interval_ms = 13; // required; ms + optional uint64 l1_batch_metrics_reporting_interval_ms = 1; // required; ms + optional uint64 gpu_prover_queue_reporting_interval_ms = 2; // required; ms + optional uint64 prover_job_retrying_interval_ms = 3; // required; ms + optional uint64 prover_stats_reporting_interval_ms = 4; // required; ms + optional uint64 witness_job_moving_interval_ms = 5; // required; ms + optional uint64 witness_generator_stats_reporting_interval_ms = 6; // required; ms + optional uint64 fri_witness_job_moving_interval_ms = 7; // required; ms + optional uint64 fri_prover_job_retrying_interval_ms = 8; // required; ms + optional uint64 fri_witness_generator_job_retrying_interval_ms = 9; // required; ms + optional uint32 prover_db_pool_size = 10; // required + optional uint64 fri_prover_stats_reporting_interval_ms = 11; // required; ms + optional uint64 fri_proof_compressor_job_retrying_interval_ms = 12; // required; ms + optional uint64 fri_proof_compressor_stats_reporting_interval_ms = 13; // required; ms + optional uint64 fri_prover_job_archiver_reporting_interval_ms = 14; // optional; ms + optional uint64 
fri_prover_job_archiver_archiving_interval_secs = 15; // optional; seconds } diff --git a/core/lib/zksync_core/src/house_keeper/fri_prover_jobs_archiver.rs b/core/lib/zksync_core/src/house_keeper/fri_prover_jobs_archiver.rs new file mode 100644 index 00000000000..28fadd104ed --- /dev/null +++ b/core/lib/zksync_core/src/house_keeper/fri_prover_jobs_archiver.rs @@ -0,0 +1,48 @@ +use prover_dal::{Prover, ProverDal}; +use zksync_db_connection::connection_pool::ConnectionPool; + +use crate::house_keeper::periodic_job::PeriodicJob; + +#[derive(Debug)] +pub struct FriProverJobArchiver { + pool: ConnectionPool<Prover>, + reporting_interval_ms: u64, + archiving_interval_secs: u64, +} + +impl FriProverJobArchiver { + pub fn new( + pool: ConnectionPool<Prover>, + reporting_interval_ms: u64, + archiving_interval_secs: u64, + ) -> Self { + Self { + pool, + reporting_interval_ms, + archiving_interval_secs, + } + } +} + +#[async_trait::async_trait] +impl PeriodicJob for FriProverJobArchiver { + const SERVICE_NAME: &'static str = "FriProverJobArchiver"; + + async fn run_routine_task(&mut self) -> anyhow::Result<()> { + let archived_jobs = self + .pool + .connection() + .await + .unwrap() + .fri_prover_jobs_dal() + .archive_old_jobs(self.archiving_interval_secs) + .await; + tracing::info!("Archived {:?} fri prover jobs", archived_jobs); + metrics::counter!("server.prover_fri.archived_jobs", archived_jobs as u64); + Ok(()) + } + + fn polling_interval_ms(&self) -> u64 { + self.reporting_interval_ms + } +} diff --git a/core/lib/zksync_core/src/house_keeper/mod.rs b/core/lib/zksync_core/src/house_keeper/mod.rs index 2c029d42e77..f3b5b202ee1 100644 --- a/core/lib/zksync_core/src/house_keeper/mod.rs +++ b/core/lib/zksync_core/src/house_keeper/mod.rs @@ -2,6 +2,7 @@ pub mod blocks_state_reporter; pub mod fri_proof_compressor_job_retry_manager; pub mod fri_proof_compressor_queue_monitor; pub mod fri_prover_job_retry_manager; +pub mod fri_prover_jobs_archiver; pub mod fri_prover_queue_monitor; pub mod 
fri_scheduler_circuit_queuer; pub mod fri_witness_generator_jobs_retry_manager; diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs index 8484987eb12..3bc6f13c9ec 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -71,6 +71,7 @@ use crate::{ fri_proof_compressor_job_retry_manager::FriProofCompressorJobRetryManager, fri_proof_compressor_queue_monitor::FriProofCompressorStatsReporter, fri_prover_job_retry_manager::FriProverJobRetryManager, + fri_prover_jobs_archiver::FriProverJobArchiver, fri_prover_queue_monitor::FriProverStatsReporter, fri_scheduler_circuit_queuer::SchedulerCircuitQueuer, fri_witness_generator_jobs_retry_manager::FriWitnessGeneratorJobRetryManager, @@ -1074,6 +1075,21 @@ async fn add_house_keeper_to_task_futures( let task = fri_witness_generator_stats_reporter.run(stop_receiver.clone()); task_futures.push(tokio::spawn(task)); + // TODO(PLA-862): remove after fields become required + if house_keeper_config.fri_prover_job_archiver_enabled() { + let fri_prover_jobs_archiver = FriProverJobArchiver::new( + prover_connection_pool.clone(), + house_keeper_config + .fri_prover_job_archiver_reporting_interval_ms + .unwrap(), + house_keeper_config + .fri_prover_job_archiver_archiving_interval_secs + .unwrap(), + ); + let task = fri_prover_jobs_archiver.run(stop_receiver.clone()); + task_futures.push(tokio::spawn(task)); + } + let fri_prover_group_config = configs .fri_prover_group_config .clone() diff --git a/core/node/node_framework/src/implementations/layers/house_keeper.rs b/core/node/node_framework/src/implementations/layers/house_keeper.rs index 22e9281f104..507dcd49038 100644 --- a/core/node/node_framework/src/implementations/layers/house_keeper.rs +++ b/core/node/node_framework/src/implementations/layers/house_keeper.rs @@ -9,6 +9,7 @@ use zksync_core::house_keeper::{ fri_proof_compressor_job_retry_manager::FriProofCompressorJobRetryManager, 
fri_proof_compressor_queue_monitor::FriProofCompressorStatsReporter, fri_prover_job_retry_manager::FriProverJobRetryManager, + fri_prover_jobs_archiver::FriProverJobArchiver, fri_prover_queue_monitor::FriProverStatsReporter, fri_scheduler_circuit_queuer::SchedulerCircuitQueuer, fri_witness_generator_jobs_retry_manager::FriWitnessGeneratorJobRetryManager, @@ -111,6 +112,21 @@ impl WiringLayer for HouseKeeperLayer { waiting_to_queued_fri_witness_job_mover, })); + if self.house_keeper_config.fri_prover_job_archiver_enabled() { + let fri_prover_job_archiver = FriProverJobArchiver::new( + prover_pool.clone(), + self.house_keeper_config + .fri_prover_job_archiver_reporting_interval_ms + .unwrap(), + self.house_keeper_config + .fri_prover_job_archiver_archiving_interval_secs + .unwrap(), + ); + context.add_task(Box::new(FriProverJobArchiverTask { + fri_prover_job_archiver, + })); + } + let scheduler_circuit_queuer = SchedulerCircuitQueuer::new( self.house_keeper_config.fri_witness_job_moving_interval_ms, prover_pool.clone(), @@ -333,3 +349,19 @@ impl Task for FriProofCompressorJobRetryManagerTask { .await } } + +#[derive(Debug)] +struct FriProverJobArchiverTask { + fri_prover_job_archiver: FriProverJobArchiver, +} + +#[async_trait::async_trait] +impl Task for FriProverJobArchiverTask { + fn name(&self) -> &'static str { + "fri_prover_job_archiver" + } + + async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> { + self.fri_prover_job_archiver.run(stop_receiver.0).await + } +} diff --git a/etc/env/base/house_keeper.toml b/etc/env/base/house_keeper.toml index d79a93867f3..3391fe54aa9 100644 --- a/etc/env/base/house_keeper.toml +++ b/etc/env/base/house_keeper.toml @@ -1,14 +1,16 @@ [house_keeper] -l1_batch_metrics_reporting_interval_ms=10000 -gpu_prover_queue_reporting_interval_ms=10000 -prover_job_retrying_interval_ms=300000 -prover_stats_reporting_interval_ms=5000 -witness_job_moving_interval_ms=30000 -witness_generator_stats_reporting_interval_ms=10000 
-fri_witness_job_moving_interval_ms=40000 -fri_prover_job_retrying_interval_ms=30000 -fri_witness_generator_job_retrying_interval_ms=30000 -prover_db_pool_size=2 -fri_prover_stats_reporting_interval_ms=30000 -fri_proof_compressor_job_retrying_interval_ms=30000 -fri_proof_compressor_stats_reporting_interval_ms=10000 +l1_batch_metrics_reporting_interval_ms = 10000 +gpu_prover_queue_reporting_interval_ms = 10000 +prover_job_retrying_interval_ms = 300000 +prover_stats_reporting_interval_ms = 5000 +witness_job_moving_interval_ms = 30000 +witness_generator_stats_reporting_interval_ms = 10000 +fri_witness_job_moving_interval_ms = 40000 +fri_prover_job_retrying_interval_ms = 30000 +fri_witness_generator_job_retrying_interval_ms = 30000 +prover_db_pool_size = 2 +fri_prover_stats_reporting_interval_ms = 30000 +fri_proof_compressor_job_retrying_interval_ms = 30000 +fri_proof_compressor_stats_reporting_interval_ms = 10000 +fri_prover_job_archiver_reporting_interval_ms = 1800000 +fri_prover_job_archiver_archiving_interval_secs = 172800 \ No newline at end of file diff --git a/prover/prover_dal/.sqlx/query-6cfc59d2fc039c706f30ae91b7d9d0c658093dede5eb61489205aa751ad5b8ec.json b/prover/prover_dal/.sqlx/query-6cfc59d2fc039c706f30ae91b7d9d0c658093dede5eb61489205aa751ad5b8ec.json new file mode 100644 index 00000000000..02b7862517f --- /dev/null +++ b/prover/prover_dal/.sqlx/query-6cfc59d2fc039c706f30ae91b7d9d0c658093dede5eb61489205aa751ad5b8ec.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH deleted AS (\n DELETE FROM prover_jobs_fri\n WHERE\n status NOT IN ('queued', 'in_progress', 'in_gpu_proof', 'failed')\n AND updated_at < NOW() - $1::INTERVAL\n RETURNING *\n ),\n inserted_count AS (\n INSERT INTO prover_jobs_fri_archive\n SELECT * FROM deleted\n )\n SELECT COUNT(*) FROM deleted\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Interval" + ] + }, + "nullable": [ + null + 
] + }, + "hash": "6cfc59d2fc039c706f30ae91b7d9d0c658093dede5eb61489205aa751ad5b8ec" +} diff --git a/prover/prover_dal/src/fri_prover_dal.rs b/prover/prover_dal/src/fri_prover_dal.rs index c5996b739b5..2da9f69d311 100644 --- a/prover/prover_dal/src/fri_prover_dal.rs +++ b/prover/prover_dal/src/fri_prover_dal.rs @@ -567,4 +567,31 @@ impl FriProverDal<'_, '_> { .ok()? .map(|row| row.id as u32) } + + pub async fn archive_old_jobs(&mut self, archiving_interval_secs: u64) -> usize { + let archiving_interval_secs = + pg_interval_from_duration(Duration::from_secs(archiving_interval_secs)); + + sqlx::query_scalar!( + r#" + WITH deleted AS ( + DELETE FROM prover_jobs_fri + WHERE + status NOT IN ('queued', 'in_progress', 'in_gpu_proof', 'failed') + AND updated_at < NOW() - $1::INTERVAL + RETURNING * + ), + inserted_count AS ( + INSERT INTO prover_jobs_fri_archive + SELECT * FROM deleted + ) + SELECT COUNT(*) FROM deleted + "#, + &archiving_interval_secs, + ) + .fetch_one(self.storage.conn()) + .await + .unwrap() + .unwrap_or(0) as usize + } }