feat: Add various metrics to the Prover subsystems (#541)
## What ❔

1. Add various metrics to the Prover subsystems, in particular:
   * the oldest batch that hasn't been sent to a prover yet
     (`fri_prover.oldest_unprocessed_batch`)
   * the oldest batch that hasn't yet gone through the basic/leaf/node
     aggregation levels (`fri_prover.oldest_unprocessed_block_by_round`)
   * how much time is spent waiting for an available prover to send data to
     (`prover_fri_witness_vector_generator.prover_waiting_time`)
   * the number of attempts to send data to a prover
     (`prover_fri_witness_vector_generator.prover_attempts_count`)
2. Refactor the prover metrics to use `vise` (a rough sketch follows below).
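
For context on item 2, here is a minimal sketch of what a `vise`-based definition of such metrics could look like. The struct name, field layout, and the single `fri_prover` prefix are assumptions made for this illustration only; in the PR itself the metrics live in their respective components and keep the names listed above.

```rust
use std::time::Duration;

use vise::{Buckets, Counter, Gauge, Histogram, Metrics};

// Illustrative sketch only; not the exact structs introduced by this PR.
#[derive(Debug, Metrics)]
#[metrics(prefix = "fri_prover")]
pub(crate) struct FriProverMetrics {
    /// Oldest L1 batch that hasn't been picked up by a prover yet.
    pub oldest_unprocessed_batch: Gauge<u64>,
    /// Number of attempts to hand data over to a prover.
    pub prover_attempts_count: Counter,
    /// Time spent waiting for an available prover.
    #[metrics(buckets = Buckets::LATENCIES)]
    pub prover_waiting_time: Histogram<Duration>,
}

#[vise::register]
pub(crate) static METRICS: vise::Global<FriProverMetrics> = vise::Global::new();

// Reporting values then looks like:
//   METRICS.oldest_unprocessed_batch.set(l1_batch_number.0 as u64);
//   METRICS.prover_attempts_count.inc();
//   METRICS.prover_waiting_time.observe(elapsed);
```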

## Why ❔

We already have some metric coverage of the prover subsystems, but it's
incomplete.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
Artemka374 committed Dec 4, 2023
1 parent 19c84ce commit 58a4e6c
Showing 51 changed files with 623 additions and 249 deletions.
74 changes: 74 additions & 0 deletions core/lib/dal/sqlx-data.json
@@ -2744,6 +2744,24 @@
},
"query": "UPDATE transactions\n SET in_mempool = TRUE\n FROM (\n SELECT hash FROM (\n SELECT hash\n FROM transactions\n WHERE miniblock_number IS NULL AND in_mempool = FALSE AND error IS NULL\n AND (is_priority = TRUE OR (max_fee_per_gas >= $2 and gas_per_pubdata_limit >= $3))\n AND tx_format != $4\n ORDER BY is_priority DESC, priority_op_id, received_at\n LIMIT $1\n ) as subquery1\n ORDER BY hash\n ) as subquery2\n WHERE transactions.hash = subquery2.hash\n RETURNING transactions.*"
},
"2cc57497090a97bcb453036f7b5e2139b590699aa1a2df4d6fd2b19e27e06251": {
"describe": {
"columns": [
{
"name": "l1_batch_number",
"ordinal": 0,
"type_info": "Int8"
}
],
"nullable": [
false
],
"parameters": {
"Left": []
}
},
"query": "SELECT l1_batch_number FROM proof_generation_details WHERE status <> 'generated' ORDER BY l1_batch_number ASC LIMIT 1"
},
"2e3f116ca05ae70b7c83ac550302194c91f57b69902ff8e42140fde732ae5e6a": {
"describe": {
"columns": [],
@@ -3971,6 +3989,24 @@
},
"query": "VACUUM storage_logs"
},
"4860c1118485da8673963a260ded76eb8e13989936f9ab17e23687a1103132cb": {
"describe": {
"columns": [
{
"name": "l1_batch_number",
"ordinal": 0,
"type_info": "Int8"
}
],
"nullable": [
false
],
"parameters": {
"Left": []
}
},
"query": "SELECT l1_batch_number FROM proof_generation_details WHERE status = 'ready_to_be_proven' ORDER BY l1_batch_number ASC LIMIT 1"
},
"4ab8a25620b5400d836e1b847320d4e176629a27e1a6cb0666ab02bb55371769": {
"describe": {
"columns": [
@@ -8814,6 +8850,26 @@
},
"query": "SELECT MAX(operation_number) as \"max?\" FROM storage_logs WHERE miniblock_number = $1"
},
"a7c7e8f036404d24dc6bfa184a84b92d8f73ca034970481af34b6163e66dc59a": {
"describe": {
"columns": [
{
"name": "l1_batch_number",
"ordinal": 0,
"type_info": "Int8"
}
],
"nullable": [
false
],
"parameters": {
"Left": [
"Int2"
]
}
},
"query": "\n SELECT l1_batch_number \n FROM prover_jobs_fri \n WHERE status <> 'skipped'\n AND status <> 'successful'\n AND aggregation_round = $1 \n ORDER BY l1_batch_number ASC \n LIMIT 1\n "
},
"a8b32073a67ad77caab11e73a5cac5aa5b5382648ff95d6787a309eb3f64d434": {
"describe": {
"columns": [],
@@ -11541,6 +11597,24 @@
},
"query": "\n UPDATE node_aggregation_witness_jobs\n SET status='queued'\n WHERE l1_batch_number IN\n (SELECT prover_jobs.l1_batch_number\n FROM prover_jobs\n JOIN node_aggregation_witness_jobs nawj ON prover_jobs.l1_batch_number = nawj.l1_batch_number\n WHERE nawj.status = 'waiting_for_proofs'\n AND prover_jobs.status = 'successful'\n AND prover_jobs.aggregation_round = 1\n GROUP BY prover_jobs.l1_batch_number, nawj.number_of_leaf_circuits\n HAVING COUNT(*) = nawj.number_of_leaf_circuits)\n RETURNING l1_batch_number;\n "
},
"f15f0848cfd830ec5d5b479fdcdd36c6a4439495b7680614ac1b0e4d73fb992f": {
"describe": {
"columns": [
{
"name": "l1_batch_number",
"ordinal": 0,
"type_info": "Int8"
}
],
"nullable": [
false
],
"parameters": {
"Left": []
}
},
"query": "SELECT l1_batch_number FROM proof_compression_jobs_fri WHERE status <> 'successful' ORDER BY l1_batch_number ASC LIMIT 1"
},
"f1defa140e20b9c250d3212602dc259c0a35598c2e69d1c42746a8fab6dd8d3e": {
"describe": {
"columns": [],
16 changes: 16 additions & 0 deletions core/lib/dal/src/fri_proof_compressor_dal.rs
@@ -206,6 +206,22 @@ impl FriProofCompressorDal<'_, '_> {
}
}

pub async fn get_oldest_not_compressed_batch(&mut self) -> Option<L1BatchNumber> {
let result: Option<L1BatchNumber> = sqlx::query!(
"SELECT l1_batch_number \
FROM proof_compression_jobs_fri \
WHERE status <> 'successful' \
ORDER BY l1_batch_number ASC \
LIMIT 1",
)
.fetch_optional(self.storage.conn())
.await
.unwrap()
.map(|row| L1BatchNumber(row.l1_batch_number as u32));

result
}

pub async fn requeue_stuck_jobs(
&mut self,
processing_timeout: Duration,
22 changes: 22 additions & 0 deletions core/lib/dal/src/fri_prover_dal.rs
@@ -341,6 +341,28 @@ impl FriProverDal<'_, '_> {
}
}

pub async fn min_unproved_l1_batch_number_for_aggregation_round(
&mut self,
aggregation_round: AggregationRound,
) -> Option<L1BatchNumber> {
sqlx::query!(
r#"
SELECT l1_batch_number
FROM prover_jobs_fri
WHERE status <> 'skipped'
AND status <> 'successful'
AND aggregation_round = $1
ORDER BY l1_batch_number ASC
LIMIT 1
"#,
aggregation_round as i16
)
.fetch_optional(self.storage.conn())
.await
.unwrap()
.map(|row| L1BatchNumber(row.l1_batch_number as u32))
}

pub async fn update_status(&mut self, id: u32, status: &str) {
sqlx::query!(
"UPDATE prover_jobs_fri \
32 changes: 32 additions & 0 deletions core/lib/dal/src/proof_generation_dal.rs
@@ -109,4 +109,36 @@ impl ProofGenerationDal<'_, '_> {
.then_some(())
.ok_or(sqlx::Error::RowNotFound)
}

pub async fn get_oldest_unprocessed_batch(&mut self) -> Option<L1BatchNumber> {
let result: Option<L1BatchNumber> = sqlx::query!(
"SELECT l1_batch_number \
FROM proof_generation_details \
WHERE status = 'ready_to_be_proven' \
ORDER BY l1_batch_number ASC \
LIMIT 1",
)
.fetch_optional(self.storage.conn())
.await
.unwrap()
.map(|row| L1BatchNumber(row.l1_batch_number as u32));

result
}

pub async fn get_oldest_not_generated_batch(&mut self) -> Option<L1BatchNumber> {
let result: Option<L1BatchNumber> = sqlx::query!(
"SELECT l1_batch_number \
FROM proof_generation_details \
WHERE status <> 'generated' \
ORDER BY l1_batch_number ASC \
LIMIT 1",
)
.fetch_optional(self.storage.conn())
.await
.unwrap()
.map(|row| L1BatchNumber(row.l1_batch_number as u32));

result
}
}
11 changes: 11 additions & 0 deletions core/lib/types/src/proofs.rs
@@ -98,6 +98,17 @@ impl AggregationRound {
}
}

impl std::fmt::Display for AggregationRound {
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter.write_str(match self {
Self::BasicCircuits => "basic_circuits",
Self::LeafAggregation => "leaf_aggregation",
Self::NodeAggregation => "node_aggregation",
Self::Scheduler => "scheduler",
})
}
}

impl FromStr for AggregationRound {
type Err = String;

@@ -58,6 +58,26 @@ impl PeriodicJob for FriProofCompressorStatsReporter {
stats.in_progress as f64,
"type" => "in_progress"
);

let oldest_not_compressed_batch = self
.pool
.access_storage()
.await
.unwrap()
.fri_proof_compressor_dal()
.get_oldest_not_compressed_batch()
.await;

if let Some(l1_batch_number) = oldest_not_compressed_batch {
metrics::gauge!(
format!(
"prover_fri.{}.oldest_not_compressed_batch",
PROOF_COMPRESSOR_SERVICE_NAME
),
l1_batch_number.0 as f64
);
}

Ok(())
}

39 changes: 39 additions & 0 deletions core/lib/zksync_core/src/house_keeper/fri_prover_queue_monitor.rs
@@ -82,6 +82,45 @@ impl PeriodicJob for FriProverStatsReporter {
"circuit_id" => circuit_id.to_string(),
"aggregation_round" => aggregation_round.to_string());
}

// FIXME: refactor metrics here

if let Some(l1_batch_number) = conn
.proof_generation_dal()
.get_oldest_unprocessed_batch()
.await
{
metrics::gauge!(
"fri_prover.oldest_unprocessed_batch",
l1_batch_number.0 as f64
)
}

if let Some(l1_batch_number) = conn
.proof_generation_dal()
.get_oldest_not_generated_batch()
.await
{
metrics::gauge!(
"fri_prover.oldest_not_generated_batch",
l1_batch_number.0 as f64
)
}

for aggregation_round in 0..2 {
if let Some(l1_batch_number) = conn
.fri_prover_jobs_dal()
.min_unproved_l1_batch_number_for_aggregation_round(aggregation_round.into())
.await
{
metrics::gauge!(
"fri_prover.oldest_unprocessed_block_by_round",
l1_batch_number.0 as f64,
"aggregation_round" => aggregation_round.to_string()
)
}
}

Ok(())
}

7 changes: 1 addition & 6 deletions core/lib/zksync_core/src/witness_generator/mod.rs
@@ -66,12 +66,7 @@ impl From<AggregationRound> for StageLabel {

impl fmt::Display for StageLabel {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.write_str(match self.0 {
AggregationRound::BasicCircuits => "basic_circuits",
AggregationRound::LeafAggregation => "leaf_aggregation",
AggregationRound::NodeAggregation => "node_aggregation",
AggregationRound::Scheduler => "scheduler",
})
self.0.fmt(formatter)
}
}
