Skip to content

Commit

Permalink
feat: Archive old prover jobs (#1516)
Browse files Browse the repository at this point in the history
## What ❔

Add recurring job that moves fri prover jobs that are older than set
time to fri_prover_jobs_archive

## Why ❔

To make querying the jobs faster

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
  • Loading branch information
Artemka374 committed Mar 31, 2024
1 parent 0a13602 commit 201476c
Show file tree
Hide file tree
Showing 12 changed files with 203 additions and 26 deletions.
12 changes: 12 additions & 0 deletions core/lib/config/src/configs/house_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,16 @@ pub struct HouseKeeperConfig {
pub fri_prover_stats_reporting_interval_ms: u64,
pub fri_proof_compressor_job_retrying_interval_ms: u64,
pub fri_proof_compressor_stats_reporting_interval_ms: u64,
// TODO(PLA-862): Make these 2 variables required
pub fri_prover_job_archiver_reporting_interval_ms: Option<u64>,
pub fri_prover_job_archiver_archiving_interval_secs: Option<u64>,
}

impl HouseKeeperConfig {
pub fn fri_prover_job_archiver_enabled(&self) -> bool {
self.fri_prover_job_archiver_reporting_interval_ms.is_some()
&& self
.fri_prover_job_archiver_archiving_interval_secs
.is_some()
}
}
2 changes: 2 additions & 0 deletions core/lib/config/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,8 @@ impl Distribution<configs::house_keeper::HouseKeeperConfig> for EncodeDist {
fri_prover_stats_reporting_interval_ms: self.sample(rng),
fri_proof_compressor_job_retrying_interval_ms: self.sample(rng),
fri_proof_compressor_stats_reporting_interval_ms: self.sample(rng),
fri_prover_job_archiver_reporting_interval_ms: self.sample(rng),
fri_prover_job_archiver_archiving_interval_secs: self.sample(rng),
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions core/lib/env_config/src/house_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ mod tests {
fri_prover_stats_reporting_interval_ms: 30_000,
fri_proof_compressor_job_retrying_interval_ms: 30_000,
fri_proof_compressor_stats_reporting_interval_ms: 30_000,
fri_prover_job_archiver_reporting_interval_ms: Some(1_800_000),
fri_prover_job_archiver_archiving_interval_secs: Some(172_800),
}
}

Expand All @@ -50,6 +52,8 @@ mod tests {
HOUSE_KEEPER_FRI_PROVER_STATS_REPORTING_INTERVAL_MS="30000"
HOUSE_KEEPER_FRI_PROOF_COMPRESSOR_STATS_REPORTING_INTERVAL_MS="30000"
HOUSE_KEEPER_FRI_PROOF_COMPRESSOR_JOB_RETRYING_INTERVAL_MS="30000"
HOUSE_KEEPER_FRI_PROVER_JOB_ARCHIVER_REPORTING_INTERVAL_MS="1800000"
HOUSE_KEEPER_FRI_PROVER_JOB_ARCHIVER_ARCHIVING_INTERVAL_SECS="172800"
"#;
lock.set_env(config);

Expand Down
9 changes: 9 additions & 0 deletions core/lib/protobuf_config/src/house_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ impl ProtoRepr for proto::HouseKeeper {
&self.fri_proof_compressor_stats_reporting_interval_ms,
)
.context("fri_proof_compressor_stats_reporting_interval_ms")?,
// TODO(PLA-862): Make these 2 variables required
fri_prover_job_archiver_reporting_interval_ms: self
.fri_prover_job_archiver_reporting_interval_ms,
fri_prover_job_archiver_archiving_interval_secs: self
.fri_prover_job_archiver_archiving_interval_secs,
})
}

Expand Down Expand Up @@ -82,6 +87,10 @@ impl ProtoRepr for proto::HouseKeeper {
fri_proof_compressor_stats_reporting_interval_ms: Some(
this.fri_proof_compressor_stats_reporting_interval_ms,
),
fri_prover_job_archiver_reporting_interval_ms: this
.fri_prover_job_archiver_reporting_interval_ms,
fri_prover_job_archiver_archiving_interval_secs: this
.fri_prover_job_archiver_archiving_interval_secs,
}
}
}
28 changes: 15 additions & 13 deletions core/lib/protobuf_config/src/proto/house_keeper.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@ syntax = "proto3";
package zksync.config.house_keeper;

message HouseKeeper {
optional uint64 l1_batch_metrics_reporting_interval_ms = 1; // required; ms
optional uint64 gpu_prover_queue_reporting_interval_ms = 2; // required; ms
optional uint64 prover_job_retrying_interval_ms = 3; // required; ms
optional uint64 prover_stats_reporting_interval_ms = 4; // required ms
optional uint64 witness_job_moving_interval_ms = 5; // required; ms
optional uint64 witness_generator_stats_reporting_interval_ms = 6; // required; ms
optional uint64 fri_witness_job_moving_interval_ms = 7; // required; ms
optional uint64 fri_prover_job_retrying_interval_ms = 8; // required; ms
optional uint64 fri_witness_generator_job_retrying_interval_ms = 9; // required; ms
optional uint32 prover_db_pool_size = 10; // required
optional uint64 fri_prover_stats_reporting_interval_ms = 11; // required; ms
optional uint64 fri_proof_compressor_job_retrying_interval_ms = 12; // required; ms
optional uint64 fri_proof_compressor_stats_reporting_interval_ms = 13; // required; ms
optional uint64 l1_batch_metrics_reporting_interval_ms = 1; // required; ms
optional uint64 gpu_prover_queue_reporting_interval_ms = 2; // required; ms
optional uint64 prover_job_retrying_interval_ms = 3; // required; ms
optional uint64 prover_stats_reporting_interval_ms = 4; // required ms
optional uint64 witness_job_moving_interval_ms = 5; // required; ms
optional uint64 witness_generator_stats_reporting_interval_ms = 6; // required; ms
optional uint64 fri_witness_job_moving_interval_ms = 7; // required; ms
optional uint64 fri_prover_job_retrying_interval_ms = 8; // required; ms
optional uint64 fri_witness_generator_job_retrying_interval_ms = 9; // required; ms
optional uint32 prover_db_pool_size = 10; // required
optional uint64 fri_prover_stats_reporting_interval_ms = 11; // required; ms
optional uint64 fri_proof_compressor_job_retrying_interval_ms = 12; // required; ms
optional uint64 fri_proof_compressor_stats_reporting_interval_ms = 13; // required; ms
optional uint64 fri_prover_job_archiver_reporting_interval_ms = 14; // optional; ms
optional uint64 fri_prover_job_archiver_archiving_interval_secs = 15; // optional; seconds
}
48 changes: 48 additions & 0 deletions core/lib/zksync_core/src/house_keeper/fri_prover_jobs_archiver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use prover_dal::{Prover, ProverDal};
use zksync_db_connection::connection_pool::ConnectionPool;

use crate::house_keeper::periodic_job::PeriodicJob;

#[derive(Debug)]
pub struct FriProverJobArchiver {
pool: ConnectionPool<Prover>,
reporting_interval_ms: u64,
archiving_interval_secs: u64,
}

impl FriProverJobArchiver {
pub fn new(
pool: ConnectionPool<Prover>,
reporting_interval_ms: u64,
archiving_interval_secs: u64,
) -> Self {
Self {
pool,
reporting_interval_ms,
archiving_interval_secs,
}
}
}

#[async_trait::async_trait]
impl PeriodicJob for FriProverJobArchiver {
const SERVICE_NAME: &'static str = "FriProverJobArchiver";

async fn run_routine_task(&mut self) -> anyhow::Result<()> {
let archived_jobs = self
.pool
.connection()
.await
.unwrap()
.fri_prover_jobs_dal()
.archive_old_jobs(self.archiving_interval_secs)
.await;
tracing::info!("Archived {:?} fri prover jobs", archived_jobs);
metrics::counter!("server.prover_fri.archived_jobs", archived_jobs as u64);
Ok(())
}

fn polling_interval_ms(&self) -> u64 {
self.reporting_interval_ms
}
}
1 change: 1 addition & 0 deletions core/lib/zksync_core/src/house_keeper/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod blocks_state_reporter;
pub mod fri_proof_compressor_job_retry_manager;
pub mod fri_proof_compressor_queue_monitor;
pub mod fri_prover_job_retry_manager;
pub mod fri_prover_jobs_archiver;
pub mod fri_prover_queue_monitor;
pub mod fri_scheduler_circuit_queuer;
pub mod fri_witness_generator_jobs_retry_manager;
Expand Down
16 changes: 16 additions & 0 deletions core/lib/zksync_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ use crate::{
fri_proof_compressor_job_retry_manager::FriProofCompressorJobRetryManager,
fri_proof_compressor_queue_monitor::FriProofCompressorStatsReporter,
fri_prover_job_retry_manager::FriProverJobRetryManager,
fri_prover_jobs_archiver::FriProverJobArchiver,
fri_prover_queue_monitor::FriProverStatsReporter,
fri_scheduler_circuit_queuer::SchedulerCircuitQueuer,
fri_witness_generator_jobs_retry_manager::FriWitnessGeneratorJobRetryManager,
Expand Down Expand Up @@ -1074,6 +1075,21 @@ async fn add_house_keeper_to_task_futures(
let task = fri_witness_generator_stats_reporter.run(stop_receiver.clone());
task_futures.push(tokio::spawn(task));

// TODO(PLA-862): remove after fields become required
if house_keeper_config.fri_prover_job_archiver_enabled() {
let fri_prover_jobs_archiver = FriProverJobArchiver::new(
prover_connection_pool.clone(),
house_keeper_config
.fri_prover_job_archiver_reporting_interval_ms
.unwrap(),
house_keeper_config
.fri_prover_job_archiver_archiving_interval_secs
.unwrap(),
);
let task = fri_prover_jobs_archiver.run(stop_receiver.clone());
task_futures.push(tokio::spawn(task));
}

let fri_prover_group_config = configs
.fri_prover_group_config
.clone()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use zksync_core::house_keeper::{
fri_proof_compressor_job_retry_manager::FriProofCompressorJobRetryManager,
fri_proof_compressor_queue_monitor::FriProofCompressorStatsReporter,
fri_prover_job_retry_manager::FriProverJobRetryManager,
fri_prover_jobs_archiver::FriProverJobArchiver,
fri_prover_queue_monitor::FriProverStatsReporter,
fri_scheduler_circuit_queuer::SchedulerCircuitQueuer,
fri_witness_generator_jobs_retry_manager::FriWitnessGeneratorJobRetryManager,
Expand Down Expand Up @@ -111,6 +112,21 @@ impl WiringLayer for HouseKeeperLayer {
waiting_to_queued_fri_witness_job_mover,
}));

if self.house_keeper_config.fri_prover_job_archiver_enabled() {
let fri_prover_job_archiver = FriProverJobArchiver::new(
prover_pool.clone(),
self.house_keeper_config
.fri_prover_job_archiver_reporting_interval_ms
.unwrap(),
self.house_keeper_config
.fri_prover_job_archiver_archiving_interval_secs
.unwrap(),
);
context.add_task(Box::new(FriProverJobArchiverTask {
fri_prover_job_archiver,
}));
}

let scheduler_circuit_queuer = SchedulerCircuitQueuer::new(
self.house_keeper_config.fri_witness_job_moving_interval_ms,
prover_pool.clone(),
Expand Down Expand Up @@ -333,3 +349,19 @@ impl Task for FriProofCompressorJobRetryManagerTask {
.await
}
}

#[derive(Debug)]
struct FriProverJobArchiverTask {
fri_prover_job_archiver: FriProverJobArchiver,
}

#[async_trait::async_trait]
impl Task for FriProverJobArchiverTask {
fn name(&self) -> &'static str {
"fri_prover_job_archiver"
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.fri_prover_job_archiver.run(stop_receiver.0).await
}
}
28 changes: 15 additions & 13 deletions etc/env/base/house_keeper.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
[house_keeper]
l1_batch_metrics_reporting_interval_ms=10000
gpu_prover_queue_reporting_interval_ms=10000
prover_job_retrying_interval_ms=300000
prover_stats_reporting_interval_ms=5000
witness_job_moving_interval_ms=30000
witness_generator_stats_reporting_interval_ms=10000
fri_witness_job_moving_interval_ms=40000
fri_prover_job_retrying_interval_ms=30000
fri_witness_generator_job_retrying_interval_ms=30000
prover_db_pool_size=2
fri_prover_stats_reporting_interval_ms=30000
fri_proof_compressor_job_retrying_interval_ms=30000
fri_proof_compressor_stats_reporting_interval_ms=10000
l1_batch_metrics_reporting_interval_ms = 10000
gpu_prover_queue_reporting_interval_ms = 10000
prover_job_retrying_interval_ms = 300000
prover_stats_reporting_interval_ms = 5000
witness_job_moving_interval_ms = 30000
witness_generator_stats_reporting_interval_ms = 10000
fri_witness_job_moving_interval_ms = 40000
fri_prover_job_retrying_interval_ms = 30000
fri_witness_generator_job_retrying_interval_ms = 30000
prover_db_pool_size = 2
fri_prover_stats_reporting_interval_ms = 30000
fri_proof_compressor_job_retrying_interval_ms = 30000
fri_proof_compressor_stats_reporting_interval_ms = 10000
fri_prover_job_archiver_reporting_interval_ms = 1800000
fri_prover_job_archiver_archiving_interval_ms = 172800

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions prover/prover_dal/src/fri_prover_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,4 +567,31 @@ impl FriProverDal<'_, '_> {
.ok()?
.map(|row| row.id as u32)
}

pub async fn archive_old_jobs(&mut self, archiving_interval_secs: u64) -> usize {
let archiving_interval_secs =
pg_interval_from_duration(Duration::from_secs(archiving_interval_secs));

sqlx::query_scalar!(
r#"
WITH deleted AS (
DELETE FROM prover_jobs_fri
WHERE
status NOT IN ('queued', 'in_progress', 'in_gpu_proof', 'failed')
AND updated_at < NOW() - $1::INTERVAL
RETURNING *
),
inserted_count AS (
INSERT INTO prover_jobs_fri_archive
SELECT * FROM deleted
)
SELECT COUNT(*) FROM deleted
"#,
&archiving_interval_secs,
)
.fetch_one(self.storage.conn())
.await
.unwrap()
.unwrap_or(0) as usize
}
}

0 comments on commit 201476c

Please sign in to comment.