diff --git a/checks-config/era.dic b/checks-config/era.dic index fdd961f601e..78da85c1d90 100644 --- a/checks-config/era.dic +++ b/checks-config/era.dic @@ -927,3 +927,4 @@ StorageMarker SIGINT opentelemetry PubdataSendingMode +FriGpuProverArchiver diff --git a/core/lib/basic_types/src/prover_dal.rs b/core/lib/basic_types/src/prover_dal.rs index 827d1942b6a..41ab439a15f 100644 --- a/core/lib/basic_types/src/prover_dal.rs +++ b/core/lib/basic_types/src/prover_dal.rs @@ -1,5 +1,5 @@ //! Types exposed by the prover DAL for general-purpose use. -use std::{net::IpAddr, ops::Add}; +use std::{net::IpAddr, ops::Add, str::FromStr}; use chrono::{DateTime, Duration, Utc}; @@ -204,7 +204,7 @@ pub struct JobExtendedStatistics { pub active_area: Vec, } -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum GpuProverInstanceStatus { // The instance is available for processing. Available, @@ -215,3 +215,17 @@ pub enum GpuProverInstanceStatus { // The instance is not alive anymore. 
Dead, } + +impl FromStr for GpuProverInstanceStatus { + type Err = (); + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + "available" => Ok(Self::Available), + "full" => Ok(Self::Full), + "reserved" => Ok(Self::Reserved), + "dead" => Ok(Self::Dead), + _ => Err(()), + } + } +} diff --git a/core/lib/config/src/configs/fri_prover.rs b/core/lib/config/src/configs/fri_prover.rs index 958b8c5fec2..f8b9b8adf1c 100644 --- a/core/lib/config/src/configs/fri_prover.rs +++ b/core/lib/config/src/configs/fri_prover.rs @@ -25,6 +25,8 @@ pub struct FriProverConfig { pub queue_capacity: usize, pub witness_vector_receiver_port: u16, pub zone_read_url: String, + pub availability_check_interval_in_secs: u32, + // whether to write to public GCS bucket for https://github.com/matter-labs/era-boojum-validator-cli pub shall_save_to_public_bucket: bool, pub object_store: Option, diff --git a/core/lib/config/src/configs/house_keeper.rs b/core/lib/config/src/configs/house_keeper.rs index d2f85df9bec..e1eb1337566 100644 --- a/core/lib/config/src/configs/house_keeper.rs +++ b/core/lib/config/src/configs/house_keeper.rs @@ -13,13 +13,20 @@ pub struct HouseKeeperConfig { pub prover_db_pool_size: u32, pub proof_compressor_job_retrying_interval_ms: u64, pub proof_compressor_stats_reporting_interval_ms: u64, - pub prover_job_archiver_reporting_interval_ms: Option<u64>, - pub prover_job_archiver_archiving_interval_secs: Option<u64>, + pub prover_job_archiver_archiving_interval_ms: Option<u64>, + pub prover_job_archiver_archive_after_secs: Option<u64>, + pub fri_gpu_prover_archiver_archiving_interval_ms: Option<u64>, + pub fri_gpu_prover_archiver_archive_after_secs: Option<u64>, } impl HouseKeeperConfig { - pub fn prover_job_archiver_enabled(&self) -> bool { - self.prover_job_archiver_reporting_interval_ms.is_some() - && self.prover_job_archiver_archiving_interval_secs.is_some() + pub fn prover_job_archiver_params(&self) -> Option<(u64, u64)> { + self.prover_job_archiver_archiving_interval_ms +
.zip(self.prover_job_archiver_archive_after_secs) + } + + pub fn fri_gpu_prover_archiver_params(&self) -> Option<(u64, u64)> { + self.fri_gpu_prover_archiver_archiving_interval_ms + .zip(self.fri_gpu_prover_archiver_archive_after_secs) } } diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs index d92590f466b..097e1af6dbc 100644 --- a/core/lib/config/src/testonly.rs +++ b/core/lib/config/src/testonly.rs @@ -430,6 +430,7 @@ impl Distribution for EncodeDist { witness_vector_receiver_port: self.sample(rng), zone_read_url: self.sample(rng), shall_save_to_public_bucket: self.sample(rng), + availability_check_interval_in_secs: self.sample(rng), object_store: self.sample(rng), } } @@ -563,8 +564,10 @@ impl Distribution for EncodeDist { witness_generator_job_retrying_interval_ms: self.sample(rng), proof_compressor_job_retrying_interval_ms: self.sample(rng), proof_compressor_stats_reporting_interval_ms: self.sample(rng), - prover_job_archiver_reporting_interval_ms: self.sample(rng), - prover_job_archiver_archiving_interval_secs: self.sample(rng), + prover_job_archiver_archiving_interval_ms: self.sample(rng), + prover_job_archiver_archive_after_secs: self.sample(rng), + fri_gpu_prover_archiver_archiving_interval_ms: self.sample(rng), + fri_gpu_prover_archiver_archive_after_secs: self.sample(rng), } } } diff --git a/core/lib/dal/.sqlx/query-37069b0fbe07f12f6ac93d434b7be812a6d7e76df4e545d639a564992f12bbe1.json b/core/lib/dal/.sqlx/query-37069b0fbe07f12f6ac93d434b7be812a6d7e76df4e545d639a564992f12bbe1.json deleted file mode 100644 index a830db9f4c9..00000000000 --- a/core/lib/dal/.sqlx/query-37069b0fbe07f12f6ac93d434b7be812a6d7e76df4e545d639a564992f12bbe1.json +++ /dev/null @@ -1,100 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n miniblocks.number,\n COALESCE(\n miniblocks.l1_batch_number,\n (\n SELECT\n (MAX(number) + 1)\n FROM\n l1_batches\n ),\n (\n SELECT\n MAX(l1_batch_number) + 1\n FROM\n snapshot_recovery\n )\n ) AS 
\"l1_batch_number!\",\n (\n SELECT\n MAX(m2.number)\n FROM\n miniblocks m2\n WHERE\n miniblocks.l1_batch_number = m2.l1_batch_number\n ) AS \"last_batch_miniblock?\",\n miniblocks.timestamp,\n miniblocks.l1_gas_price,\n miniblocks.l2_fair_gas_price,\n miniblocks.fair_pubdata_price,\n miniblocks.bootloader_code_hash,\n miniblocks.default_aa_code_hash,\n miniblocks.virtual_blocks,\n miniblocks.hash,\n miniblocks.gas_limit,\n miniblocks.protocol_version AS \"protocol_version!\",\n miniblocks.fee_account_address AS \"fee_account_address!\"\n FROM\n miniblocks\n WHERE\n miniblocks.number = $1\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "number", - "type_info": "Int8" - }, - { - "ordinal": 1, - "name": "l1_batch_number!", - "type_info": "Int8" - }, - { - "ordinal": 2, - "name": "last_batch_miniblock?", - "type_info": "Int8" - }, - { - "ordinal": 3, - "name": "timestamp", - "type_info": "Int8" - }, - { - "ordinal": 4, - "name": "l1_gas_price", - "type_info": "Int8" - }, - { - "ordinal": 5, - "name": "l2_fair_gas_price", - "type_info": "Int8" - }, - { - "ordinal": 6, - "name": "fair_pubdata_price", - "type_info": "Int8" - }, - { - "ordinal": 7, - "name": "bootloader_code_hash", - "type_info": "Bytea" - }, - { - "ordinal": 8, - "name": "default_aa_code_hash", - "type_info": "Bytea" - }, - { - "ordinal": 9, - "name": "virtual_blocks", - "type_info": "Int8" - }, - { - "ordinal": 10, - "name": "hash", - "type_info": "Bytea" - }, - { - "ordinal": 11, - "name": "gas_limit", - "type_info": "Int8" - }, - { - "ordinal": 12, - "name": "protocol_version!", - "type_info": "Int4" - }, - { - "ordinal": 13, - "name": "fee_account_address!", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [ - false, - null, - null, - false, - false, - false, - true, - true, - true, - false, - false, - true, - true, - false - ] - }, - "hash": "37069b0fbe07f12f6ac93d434b7be812a6d7e76df4e545d639a564992f12bbe1" -} diff --git 
a/core/lib/env_config/src/fri_prover.rs b/core/lib/env_config/src/fri_prover.rs index f888436dc71..373d1e6f990 100644 --- a/core/lib/env_config/src/fri_prover.rs +++ b/core/lib/env_config/src/fri_prover.rs @@ -45,6 +45,7 @@ mod tests { }, max_retries: 5, }), + availability_check_interval_in_secs: 1_800, } } @@ -65,6 +66,7 @@ mod tests { FRI_PROVER_WITNESS_VECTOR_RECEIVER_PORT="3316" FRI_PROVER_ZONE_READ_URL="http://metadata.google.internal/computeMetadata/v1/instance/zone" FRI_PROVER_SHALL_SAVE_TO_PUBLIC_BUCKET=true + FRI_PROVER_AVAILABILITY_CHECK_INTERVAL_IN_SECS="1800" OBJECT_STORE_BUCKET_BASE_URL="/base/url" OBJECT_STORE_MODE="GCSWithCredentialFile" OBJECT_STORE_GCS_CREDENTIAL_FILE_PATH="/path/to/credentials.json" diff --git a/core/lib/env_config/src/house_keeper.rs b/core/lib/env_config/src/house_keeper.rs index 5adccc5f0d4..f23d2705bd0 100644 --- a/core/lib/env_config/src/house_keeper.rs +++ b/core/lib/env_config/src/house_keeper.rs @@ -27,8 +27,12 @@ mod tests { prover_db_pool_size: 2, proof_compressor_job_retrying_interval_ms: 30_000, proof_compressor_stats_reporting_interval_ms: 10_000, - prover_job_archiver_reporting_interval_ms: Some(1_800_000), - prover_job_archiver_archiving_interval_secs: Some(172_800), + prover_job_archiver_archiving_interval_ms: Some(1_800_000), + prover_job_archiver_archive_after_secs: Some(172_800), + // 24 hours + fri_gpu_prover_archiver_archiving_interval_ms: Some(86_400_000), + // 48 hours + fri_gpu_prover_archiver_archive_after_secs: Some(172_800), } } @@ -48,8 +52,10 @@ mod tests { HOUSE_KEEPER_PROVER_STATS_REPORTING_INTERVAL_MS="5000" HOUSE_KEEPER_PROOF_COMPRESSOR_STATS_REPORTING_INTERVAL_MS="10000" HOUSE_KEEPER_PROOF_COMPRESSOR_JOB_RETRYING_INTERVAL_MS="30000" - HOUSE_KEEPER_PROVER_JOB_ARCHIVER_REPORTING_INTERVAL_MS="1800000" - HOUSE_KEEPER_PROVER_JOB_ARCHIVER_ARCHIVING_INTERVAL_SECS="172800" + HOUSE_KEEPER_PROVER_JOB_ARCHIVER_ARCHIVING_INTERVAL_MS="1800000" + HOUSE_KEEPER_PROVER_JOB_ARCHIVER_ARCHIVE_AFTER_SECS="172800" + 
HOUSE_KEEPER_FRI_GPU_PROVER_ARCHIVER_ARCHIVING_INTERVAL_MS="86400000" + HOUSE_KEEPER_FRI_GPU_PROVER_ARCHIVER_ARCHIVE_AFTER_SECS="172800" "#; lock.set_env(config); diff --git a/core/lib/protobuf_config/src/house_keeper.rs b/core/lib/protobuf_config/src/house_keeper.rs index ef6f659fd6b..b6871de853f 100644 --- a/core/lib/protobuf_config/src/house_keeper.rs +++ b/core/lib/protobuf_config/src/house_keeper.rs @@ -42,10 +42,13 @@ impl ProtoRepr for proto::HouseKeeper { .context("proof_compressor_stats_reporting_interval_ms")?, // TODO(PLA-862): Make these 2 variables required - prover_job_archiver_reporting_interval_ms: self - .prover_job_archiver_reporting_interval_ms, - prover_job_archiver_archiving_interval_secs: self - .prover_job_archiver_archiving_interval_secs, + prover_job_archiver_archiving_interval_ms: self + .prover_job_archiver_archiving_interval_ms, + prover_job_archiver_archive_after_secs: self.prover_job_archiver_archive_after_secs, + fri_gpu_prover_archiver_archiving_interval_ms: self + .fri_gpu_prover_archiver_archiving_interval_ms, + fri_gpu_prover_archiver_archive_after_secs: self + .fri_gpu_prover_archiver_archive_after_secs, }) } @@ -73,10 +76,13 @@ impl ProtoRepr for proto::HouseKeeper { proof_compressor_stats_reporting_interval_ms: Some( this.proof_compressor_stats_reporting_interval_ms, ), - prover_job_archiver_reporting_interval_ms: this - .prover_job_archiver_reporting_interval_ms, - prover_job_archiver_archiving_interval_secs: this - .prover_job_archiver_archiving_interval_secs, + prover_job_archiver_archiving_interval_ms: this + .prover_job_archiver_archiving_interval_ms, + prover_job_archiver_archive_after_secs: this.prover_job_archiver_archive_after_secs, + fri_gpu_prover_archiver_archiving_interval_ms: this + .fri_gpu_prover_archiver_archiving_interval_ms, + fri_gpu_prover_archiver_archive_after_secs: this + .fri_gpu_prover_archiver_archive_after_secs, } } } diff --git a/core/lib/protobuf_config/src/proto/house_keeper.proto 
b/core/lib/protobuf_config/src/proto/house_keeper.proto index 05f68680c4e..dce4af95b80 100644 --- a/core/lib/protobuf_config/src/proto/house_keeper.proto +++ b/core/lib/protobuf_config/src/proto/house_keeper.proto @@ -3,16 +3,18 @@ syntax = "proto3"; package zksync.config.house_keeper; message HouseKeeper { - optional uint64 l1_batch_metrics_reporting_interval_ms = 1; // required; ms - optional uint64 gpu_prover_queue_reporting_interval_ms = 2; // required; ms - optional uint64 prover_job_retrying_interval_ms = 3; // required; ms - optional uint64 prover_stats_reporting_interval_ms = 4; // required ms - optional uint64 witness_job_moving_interval_ms = 5; // required; ms - optional uint64 witness_generator_stats_reporting_interval_ms = 6; // required; ms - optional uint64 witness_generator_job_retrying_interval_ms = 9; // required; ms - optional uint32 prover_db_pool_size = 10; // required - optional uint64 proof_compressor_job_retrying_interval_ms = 12; // required; ms - optional uint64 proof_compressor_stats_reporting_interval_ms = 13; // required; ms - optional uint64 prover_job_archiver_reporting_interval_ms = 14; // optional; ms - optional uint64 prover_job_archiver_archiving_interval_secs = 15; // optional; seconds + optional uint64 l1_batch_metrics_reporting_interval_ms = 1; // required; ms + optional uint64 gpu_prover_queue_reporting_interval_ms = 2; // required; ms + optional uint64 prover_job_retrying_interval_ms = 3; // required; ms + optional uint64 prover_stats_reporting_interval_ms = 4; // required ms + optional uint64 witness_job_moving_interval_ms = 5; // required; ms + optional uint64 witness_generator_stats_reporting_interval_ms = 6; // required; ms + optional uint64 witness_generator_job_retrying_interval_ms = 9; // required; ms + optional uint32 prover_db_pool_size = 10; // required + optional uint64 proof_compressor_job_retrying_interval_ms = 12; // required; ms + optional uint64 proof_compressor_stats_reporting_interval_ms = 13; // required; ms 
+ optional uint64 prover_job_archiver_archiving_interval_ms = 14; // optional; ms + optional uint64 prover_job_archiver_archive_after_secs = 15; // optional; seconds + optional uint64 fri_gpu_prover_archiver_archiving_interval_ms = 16; // optional; ms + optional uint64 fri_gpu_prover_archiver_archive_after_secs = 17; // optional; seconds } diff --git a/core/lib/protobuf_config/src/proto/prover.proto b/core/lib/protobuf_config/src/proto/prover.proto index da1f295a582..36700cd555e 100644 --- a/core/lib/protobuf_config/src/proto/prover.proto +++ b/core/lib/protobuf_config/src/proto/prover.proto @@ -1,99 +1,101 @@ syntax = "proto3"; + import "zksync/config/object_store.proto"; package zksync.config.prover; message ProofCompressor { - optional uint32 compression_mode = 1; // required; u8 - optional uint32 prometheus_listener_port = 2; // required; u16 - optional string prometheus_pushgateway_url = 3; // required - optional uint64 prometheus_push_interval_ms = 4; // optional; ms - optional uint32 generation_timeout_in_secs = 5; // required; s - optional uint32 max_attempts = 6; // required - optional string universal_setup_path = 7; // required; fs path - optional string universal_setup_download_url = 8; // required - optional bool verify_wrapper_proof = 9; // required + optional uint32 compression_mode = 1; // required; u8 + optional uint32 prometheus_listener_port = 2; // required; u16 + optional string prometheus_pushgateway_url = 3; // required + optional uint64 prometheus_push_interval_ms = 4; // optional; ms + optional uint32 generation_timeout_in_secs = 5; // required; s + optional uint32 max_attempts = 6; // required + optional string universal_setup_path = 7; // required; fs path + optional string universal_setup_download_url = 8; // required + optional bool verify_wrapper_proof = 9; // required } enum SetupLoadMode { - FROM_DISK = 0; - FROM_MEMORY = 1; + FROM_DISK = 0; + FROM_MEMORY = 1; } message Prover { - optional string setup_data_path = 1; // required; fs 
path? - optional uint32 prometheus_port = 2; // required; u16 - optional uint32 max_attempts = 3; // required - optional uint32 generation_timeout_in_secs = 4; // required; s - repeated uint32 base_layer_circuit_ids_to_be_verified = 5; // required - repeated uint32 recursive_layer_circuit_ids_to_be_verified = 6; // required - optional SetupLoadMode setup_load_mode = 7; // required - optional uint32 specialized_group_id = 8; // required; u8 - optional uint64 witness_vector_generator_thread_count = 9; // optional - optional uint64 queue_capacity = 10; // required - optional uint32 witness_vector_receiver_port = 11; // required; u16 - optional string zone_read_url = 12; // required - optional bool shall_save_to_public_bucket = 13; // required - optional config.object_store.ObjectStore object_store = 20; + optional string setup_data_path = 1; // required; fs path? + optional uint32 prometheus_port = 2; // required; u16 + optional uint32 max_attempts = 3; // required + optional uint32 generation_timeout_in_secs = 4; // required; s + repeated uint32 base_layer_circuit_ids_to_be_verified = 5; // required + repeated uint32 recursive_layer_circuit_ids_to_be_verified = 6; // required + optional SetupLoadMode setup_load_mode = 7; // required + optional uint32 specialized_group_id = 8; // required; u8 + optional uint64 witness_vector_generator_thread_count = 9; // optional + optional uint64 queue_capacity = 10; // required + optional uint32 witness_vector_receiver_port = 11; // required; u16 + optional string zone_read_url = 12; // required + optional uint32 availability_check_interval_in_secs = 21; // required; s + optional bool shall_save_to_public_bucket = 13; // required + optional config.object_store.ObjectStore object_store = 20; } message CircuitIdRoundTuple { - optional uint32 circuit_id = 1; // required; u8 - optional uint32 aggregation_round = 2; // required; u8 + optional uint32 circuit_id = 1; // required; u8 + optional uint32 aggregation_round = 2; // required; u8 
} message ProverGroup { - repeated CircuitIdRoundTuple group_0 = 1; - repeated CircuitIdRoundTuple group_1 = 2; - repeated CircuitIdRoundTuple group_2 = 3; - repeated CircuitIdRoundTuple group_3 = 4; - repeated CircuitIdRoundTuple group_4 = 5; - repeated CircuitIdRoundTuple group_5 = 6; - repeated CircuitIdRoundTuple group_6 = 7; - repeated CircuitIdRoundTuple group_7 = 8; - repeated CircuitIdRoundTuple group_8 = 9; - repeated CircuitIdRoundTuple group_9 = 10; - repeated CircuitIdRoundTuple group_10 = 11; - repeated CircuitIdRoundTuple group_11 = 12; - repeated CircuitIdRoundTuple group_12 = 13; + repeated CircuitIdRoundTuple group_0 = 1; + repeated CircuitIdRoundTuple group_1 = 2; + repeated CircuitIdRoundTuple group_2 = 3; + repeated CircuitIdRoundTuple group_3 = 4; + repeated CircuitIdRoundTuple group_4 = 5; + repeated CircuitIdRoundTuple group_5 = 6; + repeated CircuitIdRoundTuple group_6 = 7; + repeated CircuitIdRoundTuple group_7 = 8; + repeated CircuitIdRoundTuple group_8 = 9; + repeated CircuitIdRoundTuple group_9 = 10; + repeated CircuitIdRoundTuple group_10 = 11; + repeated CircuitIdRoundTuple group_11 = 12; + repeated CircuitIdRoundTuple group_12 = 13; } message ProverGateway { - optional string api_url = 1; // required - optional uint32 api_poll_duration_secs = 2; // required; s - optional uint32 prometheus_listener_port = 3; // required; u16 - optional string prometheus_pushgateway_url = 4; // required - optional uint64 prometheus_push_interval_ms = 5; // optional; ms + optional string api_url = 1; // required + optional uint32 api_poll_duration_secs = 2; // required; s + optional uint32 prometheus_listener_port = 3; // required; u16 + optional string prometheus_pushgateway_url = 4; // required + optional uint64 prometheus_push_interval_ms = 5; // optional; ms } message WitnessGenerator { - optional uint32 generation_timeout_in_secs = 1; // required; - optional uint32 max_attempts = 2; // required - optional uint32 blocks_proving_percentage = 3; // 
optional; 0-100 - repeated uint32 dump_arguments_for_blocks = 4; - optional uint32 last_l1_batch_to_process = 5; // optional - optional uint32 force_process_block = 6; // optional - optional bool shall_save_to_public_bucket = 7; // required - optional uint32 basic_generation_timeout_in_secs = 8; // optional; - optional uint32 leaf_generation_timeout_in_secs = 9; // optional; - optional uint32 node_generation_timeout_in_secs = 10; // optional; - optional uint32 scheduler_generation_timeout_in_secs = 11; // optional; + optional uint32 generation_timeout_in_secs = 1; // required; + optional uint32 max_attempts = 2; // required + optional uint32 blocks_proving_percentage = 3; // optional; 0-100 + repeated uint32 dump_arguments_for_blocks = 4; + optional uint32 last_l1_batch_to_process = 5; // optional + optional uint32 force_process_block = 6; // optional + optional bool shall_save_to_public_bucket = 7; // required + optional uint32 basic_generation_timeout_in_secs = 8; // optional; + optional uint32 leaf_generation_timeout_in_secs = 9; // optional; + optional uint32 node_generation_timeout_in_secs = 10; // optional; + optional uint32 scheduler_generation_timeout_in_secs = 11; // optional; } message WitnessVectorGenerator { - optional uint32 max_prover_reservation_duration_in_secs = 1; // required; s - optional uint32 prover_instance_wait_timeout_in_secs = 2; // required; s - optional uint32 prover_instance_poll_time_in_milli_secs = 3; // required; ms - optional uint32 prometheus_listener_port = 4; // required; u16 - optional string prometheus_pushgateway_url = 5; // required - optional uint64 prometheus_push_interval_ms = 6; // optional; ms - optional uint32 specialized_group_id = 7; // required; u8 + optional uint32 max_prover_reservation_duration_in_secs = 1; // required; s + optional uint32 prover_instance_wait_timeout_in_secs = 2; // required; s + optional uint32 prover_instance_poll_time_in_milli_secs = 3; // required; ms + optional uint32 
prometheus_listener_port = 4; // required; u16 + optional string prometheus_pushgateway_url = 5; // required + optional uint64 prometheus_push_interval_ms = 6; // optional; ms + optional uint32 specialized_group_id = 7; // required; u8 } message ProofDataHandler { - optional uint32 http_port = 1; // required; u16 - optional uint32 proof_generation_timeout_in_secs = 2; // required; s + optional uint32 http_port = 1; // required; u16 + optional uint32 proof_generation_timeout_in_secs = 2; // required; s } diff --git a/core/lib/protobuf_config/src/prover.rs b/core/lib/protobuf_config/src/prover.rs index 3dd582fb206..12ff2378251 100644 --- a/core/lib/protobuf_config/src/prover.rs +++ b/core/lib/protobuf_config/src/prover.rs @@ -335,6 +335,10 @@ impl ProtoRepr for proto::Prover { zone_read_url: required(&self.zone_read_url) .context("zone_read_url")? .clone(), + availability_check_interval_in_secs: *required( + &self.availability_check_interval_in_secs, + ) + .context("availability_check_interval_in_secs")?, shall_save_to_public_bucket: *required(&self.shall_save_to_public_bucket) .context("shall_save_to_public_bucket")?, object_store, @@ -365,6 +369,7 @@ impl ProtoRepr for proto::Prover { queue_capacity: Some(this.queue_capacity.try_into().unwrap()), witness_vector_receiver_port: Some(this.witness_vector_receiver_port.into()), zone_read_url: Some(this.zone_read_url.clone()), + availability_check_interval_in_secs: Some(this.availability_check_interval_in_secs), shall_save_to_public_bucket: Some(this.shall_save_to_public_bucket), object_store: this.object_store.as_ref().map(ProtoRepr::build), } diff --git a/core/lib/zksync_core/src/house_keeper/fri_gpu_prover_archiver.rs b/core/lib/zksync_core/src/house_keeper/fri_gpu_prover_archiver.rs new file mode 100644 index 00000000000..0297dc9c328 --- /dev/null +++ b/core/lib/zksync_core/src/house_keeper/fri_gpu_prover_archiver.rs @@ -0,0 +1,52 @@ +use prover_dal::{Prover, ProverDal}; +use 
zksync_db_connection::connection_pool::ConnectionPool; + +use crate::{house_keeper::periodic_job::PeriodicJob, metrics::HOUSE_KEEPER_METRICS}; + +/// FriGpuProverArchiver is a task that periodically archives old fri GPU prover records. +/// The task will archive the `dead` prover records that have not been updated for a certain amount of time. +#[derive(Debug)] +pub struct FriGpuProverArchiver { + pool: ConnectionPool<Prover>, + archiving_interval_ms: u64, + archive_prover_after_secs: u64, +} + +impl FriGpuProverArchiver { + pub fn new( + pool: ConnectionPool<Prover>, + archiving_interval_ms: u64, + archive_prover_after_secs: u64, + ) -> Self { + Self { + pool, + archiving_interval_ms, + archive_prover_after_secs, + } + } +} + +#[async_trait::async_trait] +impl PeriodicJob for FriGpuProverArchiver { + const SERVICE_NAME: &'static str = "FriGpuProverArchiver"; + + async fn run_routine_task(&mut self) -> anyhow::Result<()> { + // Propagate a connection failure to the caller instead of panicking. + let archived_provers = self + .pool + .connection() + .await? + .fri_gpu_prover_queue_dal() + .archive_old_provers(self.archive_prover_after_secs) + .await; + tracing::info!("Archived {:?} fri gpu prover records", archived_provers); + HOUSE_KEEPER_METRICS + .gpu_prover_archived + .inc_by(archived_provers as u64); + Ok(()) + } + + fn polling_interval_ms(&self) -> u64 { + self.archiving_interval_ms + } +} diff --git a/core/lib/zksync_core/src/house_keeper/fri_prover_jobs_archiver.rs b/core/lib/zksync_core/src/house_keeper/fri_prover_jobs_archiver.rs index 28fadd104ed..afbd7d8c005 100644 --- a/core/lib/zksync_core/src/house_keeper/fri_prover_jobs_archiver.rs +++ b/core/lib/zksync_core/src/house_keeper/fri_prover_jobs_archiver.rs @@ -1,7 +1,7 @@ use prover_dal::{Prover, ProverDal}; use zksync_db_connection::connection_pool::ConnectionPool; -use crate::house_keeper::periodic_job::PeriodicJob; +use crate::{house_keeper::periodic_job::PeriodicJob, metrics::HOUSE_KEEPER_METRICS}; #[derive(Debug)] pub struct FriProverJobArchiver { @@ -38,7 +38,9 @@ impl
PeriodicJob for FriProverJobArchiver { .archive_old_jobs(self.archiving_interval_secs) .await; tracing::info!("Archived {:?} fri prover jobs", archived_jobs); - metrics::counter!("server.prover_fri.archived_jobs", archived_jobs as u64); + HOUSE_KEEPER_METRICS + .prover_job_archived + .inc_by(archived_jobs as u64); Ok(()) } diff --git a/core/lib/zksync_core/src/house_keeper/mod.rs b/core/lib/zksync_core/src/house_keeper/mod.rs index f3b5b202ee1..ca28384fab1 100644 --- a/core/lib/zksync_core/src/house_keeper/mod.rs +++ b/core/lib/zksync_core/src/house_keeper/mod.rs @@ -1,4 +1,5 @@ pub mod blocks_state_reporter; +pub mod fri_gpu_prover_archiver; pub mod fri_proof_compressor_job_retry_manager; pub mod fri_proof_compressor_queue_monitor; pub mod fri_prover_job_retry_manager; diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs index bc5f7d20503..2ae7a4e64bf 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -71,6 +71,7 @@ use crate::{ genesis::GenesisParams, house_keeper::{ blocks_state_reporter::L1BatchMetricsReporter, + fri_gpu_prover_archiver::FriGpuProverArchiver, fri_proof_compressor_job_retry_manager::FriProofCompressorJobRetryManager, fri_proof_compressor_queue_monitor::FriProofCompressorStatsReporter, fri_prover_job_retry_manager::FriProverJobRetryManager, @@ -1104,20 +1105,30 @@ async fn add_house_keeper_to_task_futures( task_futures.push(tokio::spawn(task)); // TODO(PLA-862): remove after fields become required - if house_keeper_config.prover_job_archiver_enabled() { + if let Some((archiving_interval, archive_after)) = + house_keeper_config.prover_job_archiver_params() + { let fri_prover_jobs_archiver = FriProverJobArchiver::new( prover_connection_pool.clone(), - house_keeper_config - .prover_job_archiver_reporting_interval_ms - .unwrap(), - house_keeper_config - .prover_job_archiver_archiving_interval_secs - .unwrap(), + archiving_interval, + archive_after, ); let task = 
fri_prover_jobs_archiver.run(stop_receiver.clone()); task_futures.push(tokio::spawn(task)); } + if let Some((archiving_interval, archive_after)) = + house_keeper_config.fri_gpu_prover_archiver_params() + { + let fri_gpu_prover_archiver = FriGpuProverArchiver::new( + prover_connection_pool.clone(), + archiving_interval, + archive_after, + ); + let task = fri_gpu_prover_archiver.run(stop_receiver.clone()); + task_futures.push(tokio::spawn(task)); + } + let fri_prover_group_config = configs .prover_group_config .clone() diff --git a/core/lib/zksync_core/src/metrics.rs b/core/lib/zksync_core/src/metrics.rs index 56e8223b893..066f8f3e251 100644 --- a/core/lib/zksync_core/src/metrics.rs +++ b/core/lib/zksync_core/src/metrics.rs @@ -183,3 +183,13 @@ pub(crate) struct ExternalNodeMetrics { #[vise::register] pub(crate) static EN_METRICS: vise::Global<ExternalNodeMetrics> = vise::Global::new(); + +#[derive(Debug, Metrics)] +#[metrics(prefix = "house_keeper")] +pub(crate) struct HouseKeeperMetrics { + pub prover_job_archived: Counter, + pub gpu_prover_archived: Counter, +} + +#[vise::register] +pub(crate) static HOUSE_KEEPER_METRICS: vise::Global<HouseKeeperMetrics> = vise::Global::new(); diff --git a/core/node/node_framework/src/implementations/layers/house_keeper.rs b/core/node/node_framework/src/implementations/layers/house_keeper.rs index 43e35103da2..f6257808fc8 100644 --- a/core/node/node_framework/src/implementations/layers/house_keeper.rs +++ b/core/node/node_framework/src/implementations/layers/house_keeper.rs @@ -5,7 +5,7 @@ use zksync_config::configs::{ FriProofCompressorConfig, FriProverConfig, FriWitnessGeneratorConfig, }; use zksync_core::house_keeper::{ - blocks_state_reporter::L1BatchMetricsReporter, + blocks_state_reporter::L1BatchMetricsReporter, fri_gpu_prover_archiver::FriGpuProverArchiver, fri_proof_compressor_job_retry_manager::FriProofCompressorJobRetryManager, fri_proof_compressor_queue_monitor::FriProofCompressorStatsReporter, fri_prover_job_retry_manager::FriProverJobRetryManager,
@@ -112,21 +112,26 @@ impl WiringLayer for HouseKeeperLayer { waiting_to_queued_fri_witness_job_mover, })); - if self.house_keeper_config.prover_job_archiver_enabled() { - let fri_prover_job_archiver = FriProverJobArchiver::new( - prover_pool.clone(), - self.house_keeper_config - .prover_job_archiver_reporting_interval_ms - .unwrap(), - self.house_keeper_config - .prover_job_archiver_archiving_interval_secs - .unwrap(), - ); + if let Some((archiving_interval, archive_after)) = + self.house_keeper_config.prover_job_archiver_params() + { + let fri_prover_job_archiver = + FriProverJobArchiver::new(prover_pool.clone(), archiving_interval, archive_after); context.add_task(Box::new(FriProverJobArchiverTask { fri_prover_job_archiver, })); } + if let Some((archiving_interval, archive_after)) = + self.house_keeper_config.fri_gpu_prover_archiver_params() + { + let fri_prover_gpu_archiver = + FriGpuProverArchiver::new(prover_pool.clone(), archiving_interval, archive_after); + context.add_task(Box::new(FriProverGpuArchiverTask { + fri_prover_gpu_archiver, + })); + } + let scheduler_circuit_queuer = SchedulerCircuitQueuer::new( self.house_keeper_config.witness_job_moving_interval_ms, prover_pool.clone(), @@ -364,3 +369,18 @@ impl Task for FriProverJobArchiverTask { self.fri_prover_job_archiver.run(stop_receiver.0).await } } + +struct FriProverGpuArchiverTask { + fri_prover_gpu_archiver: FriGpuProverArchiver, +} + +#[async_trait::async_trait] +impl Task for FriProverGpuArchiverTask { + fn name(&self) -> &'static str { + "fri_prover_gpu_archiver" + } + + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + self.fri_prover_gpu_archiver.run(stop_receiver.0).await + } +} diff --git a/etc/env/base/fri_prover.toml b/etc/env/base/fri_prover.toml index 94af27417ae..fc99e756cf5 100644 --- a/etc/env/base/fri_prover.toml +++ b/etc/env/base/fri_prover.toml @@ -1,14 +1,15 @@ [fri_prover] -setup_data_path="/usr/src/setup-data" -prometheus_port=3315 -max_attempts=10 
-generation_timeout_in_secs=600 -base_layer_circuit_ids_to_be_verified="1" -recursive_layer_circuit_ids_to_be_verified="1" -setup_load_mode="FromDisk" -specialized_group_id=100 -witness_vector_generator_thread_count=5 -queue_capacity=10 -witness_vector_receiver_port=3316 -zone_read_url="http://metadata.google.internal/computeMetadata/v1/instance/zone" -shall_save_to_public_bucket=true +setup_data_path = "/usr/src/setup-data" +prometheus_port = 3315 +max_attempts = 10 +generation_timeout_in_secs = 600 +base_layer_circuit_ids_to_be_verified = "1" +recursive_layer_circuit_ids_to_be_verified = "1" +setup_load_mode = "FromDisk" +specialized_group_id = 100 +witness_vector_generator_thread_count = 5 +queue_capacity = 10 +witness_vector_receiver_port = 3316 +zone_read_url = "http://metadata.google.internal/computeMetadata/v1/instance/zone" +availability_check_interval_in_secs = 10 +shall_save_to_public_bucket = true diff --git a/etc/env/base/house_keeper.toml b/etc/env/base/house_keeper.toml index 9043455491b..9596f63d062 100644 --- a/etc/env/base/house_keeper.toml +++ b/etc/env/base/house_keeper.toml @@ -9,5 +9,7 @@ prover_db_pool_size = 2 prover_stats_reporting_interval_ms = 50000 proof_compressor_job_retrying_interval_ms = 30000 proof_compressor_stats_reporting_interval_ms = 10000 -prover_job_archiver_reporting_interval_ms = 1800000 -prover_job_archiver_archiving_interval_secs = 172800 \ No newline at end of file +prover_job_archiver_archiving_interval_ms = 1800000 +prover_job_archiver_archive_after_secs = 172800 +fri_gpu_prover_archiver_archiving_interval_ms = 86400000 +fri_gpu_prover_archiver_archive_after_secs = 172800 \ No newline at end of file diff --git a/etc/env/file_based/general.yaml b/etc/env/file_based/general.yaml index eb52db083a1..5415c70e572 100644 --- a/etc/env/file_based/general.yaml +++ b/etc/env/file_based/general.yaml @@ -170,6 +170,7 @@ prover: witness_vector_generator_thread_count: 5 queue_capacity: 10 witness_vector_receiver_port: 3316 + 
availability_check_interval_in_secs: 10000 zone_read_url: http://metadata.google.internal/computeMetadata/v1/instance/zone shall_save_to_public_bucket: true witness_generator: @@ -286,8 +287,10 @@ house_keeper: prover_stats_reporting_interval_ms: 5000 proof_compressor_job_retrying_interval_ms: 30000 proof_compressor_stats_reporting_interval_ms: 10000 - prover_job_archiver_reporting_interval_ms: 1800000 - prover_job_archiver_archiving_interval_secs: 15 + prover_job_archiver_archiving_interval_ms: 1800000 + prover_job_archiver_archive_after_secs: 172800 + fri_gpu_prover_archiver_archiving_interval_ms: 86400000 + fri_gpu_prover_archiver_archive_after_secs: 172800 prometheus: listener_port: 3312 diff --git a/prover/prover_dal/.sqlx/query-02bbd0a1c01747fb24a68f0ecf447e0574419632a40119b661a31ec70f0f950b.json b/prover/prover_dal/.sqlx/query-02bbd0a1c01747fb24a68f0ecf447e0574419632a40119b661a31ec70f0f950b.json new file mode 100644 index 00000000000..013d7db9cad --- /dev/null +++ b/prover/prover_dal/.sqlx/query-02bbd0a1c01747fb24a68f0ecf447e0574419632a40119b661a31ec70f0f950b.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH deleted AS (\n DELETE FROM gpu_prover_queue_fri\n WHERE\n instance_status = 'dead'\n AND updated_at < NOW() - $1::INTERVAL\n RETURNING *, NOW() AS archived_at\n ),\n inserted_count AS (\n INSERT INTO gpu_prover_queue_fri_archive\n SELECT * FROM deleted\n )\n SELECT COUNT(*) FROM deleted\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Interval" + ] + }, + "nullable": [ + null + ] + }, + "hash": "02bbd0a1c01747fb24a68f0ecf447e0574419632a40119b661a31ec70f0f950b" +} diff --git a/prover/prover_dal/.sqlx/query-2095e5646c382ccbc6e3bafdeddaae31358088e142dff51c9f0bde8f386900d3.json b/prover/prover_dal/.sqlx/query-2095e5646c382ccbc6e3bafdeddaae31358088e142dff51c9f0bde8f386900d3.json new file mode 100644 index 00000000000..fd7c7c7874d --- /dev/null +++ 
b/prover/prover_dal/.sqlx/query-2095e5646c382ccbc6e3bafdeddaae31358088e142dff51c9f0bde8f386900d3.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n instance_status\n FROM\n gpu_prover_queue_fri\n WHERE\n instance_host = $1::TEXT::inet\n AND instance_port = $2\n AND zone = $3\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "instance_status", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Text", + "Int4", + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "2095e5646c382ccbc6e3bafdeddaae31358088e142dff51c9f0bde8f386900d3" +} diff --git a/prover/prover_dal/migrations/20240403070124_add_archived_at_column_to_prover_queue_archive.down.sql b/prover/prover_dal/migrations/20240403070124_add_archived_at_column_to_prover_queue_archive.down.sql new file mode 100644 index 00000000000..557f6f6e175 --- /dev/null +++ b/prover/prover_dal/migrations/20240403070124_add_archived_at_column_to_prover_queue_archive.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE gpu_prover_queue_fri_archive + DROP COLUMN IF EXISTS archived_at; diff --git a/prover/prover_dal/migrations/20240403070124_add_archived_at_column_to_prover_queue_archive.up.sql b/prover/prover_dal/migrations/20240403070124_add_archived_at_column_to_prover_queue_archive.up.sql new file mode 100644 index 00000000000..fa8b61e5599 --- /dev/null +++ b/prover/prover_dal/migrations/20240403070124_add_archived_at_column_to_prover_queue_archive.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE gpu_prover_queue_fri_archive + ADD COLUMN archived_at TIMESTAMP DEFAULT NULL; diff --git a/prover/prover_dal/src/fri_gpu_prover_queue_dal.rs b/prover/prover_dal/src/fri_gpu_prover_queue_dal.rs index 231f1b6599d..731d74df2ca 100644 --- a/prover/prover_dal/src/fri_gpu_prover_queue_dal.rs +++ b/prover/prover_dal/src/fri_gpu_prover_queue_dal.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{str::FromStr, time::Duration}; use zksync_basic_types::prover_dal::{GpuProverInstanceStatus, SocketAddress}; use 
zksync_db_connection::connection::Connection; @@ -156,4 +156,57 @@ impl FriGpuProverQueueDal<'_, '_> { .await .unwrap(); } + + pub async fn get_prover_instance_status( + &mut self, + address: SocketAddress, + zone: String, + ) -> Option { + sqlx::query!( + r#" + SELECT + instance_status + FROM + gpu_prover_queue_fri + WHERE + instance_host = $1::TEXT::inet + AND instance_port = $2 + AND zone = $3 + "#, + address.host.to_string(), + i32::from(address.port), + zone + ) + .fetch_optional(self.storage.conn()) + .await + .unwrap() + .map(|row| GpuProverInstanceStatus::from_str(&row.instance_status).unwrap()) + } + + pub async fn archive_old_provers(&mut self, archive_prover_after_secs: u64) -> usize { + let prover_max_age = + pg_interval_from_duration(Duration::from_secs(archive_prover_after_secs)); + + sqlx::query_scalar!( + r#" + WITH deleted AS ( + DELETE FROM gpu_prover_queue_fri + WHERE + instance_status = 'dead' + AND updated_at < NOW() - $1::INTERVAL + RETURNING *, NOW() AS archived_at + ), + inserted_count AS ( + INSERT INTO gpu_prover_queue_fri_archive + SELECT * FROM deleted + ) + SELECT COUNT(*) FROM deleted + "#, + &prover_max_age + ) + .fetch_one(self.storage.conn()) + .await + .unwrap() + .unwrap_or(0) as usize + } } diff --git a/prover/prover_fri/src/gpu_prover_availability_checker.rs b/prover/prover_fri/src/gpu_prover_availability_checker.rs new file mode 100644 index 00000000000..515919cff5b --- /dev/null +++ b/prover/prover_fri/src/gpu_prover_availability_checker.rs @@ -0,0 +1,81 @@ +#[cfg(feature = "gpu")] +pub mod availability_checker { + use std::time::Duration; + + use prover_dal::{ConnectionPool, Prover, ProverDal}; + use zksync_types::prover_dal::{GpuProverInstanceStatus, SocketAddress}; + + use crate::metrics::{KillingReason, METRICS}; + + /// Availability checker is a task that periodically checks the status of the prover instance in the database. 
+ /// If the prover instance is not found in the database or marked as dead, the availability checker will shut down the prover. + pub struct AvailabilityChecker { + address: SocketAddress, + zone: String, + polling_interval: Duration, + pool: ConnectionPool, + } + + impl AvailabilityChecker { + pub fn new( + address: SocketAddress, + zone: String, + polling_interval_secs: u32, + pool: ConnectionPool, + ) -> Self { + Self { + address, + zone, + polling_interval: Duration::from_secs(polling_interval_secs as u64), + pool, + } + } + + pub async fn run( + self, + stop_receiver: tokio::sync::watch::Receiver, + ) -> anyhow::Result<()> { + while !*stop_receiver.borrow() { + let status = self + .pool + .connection() + .await + .unwrap() + .fri_gpu_prover_queue_dal() + .get_prover_instance_status(self.address.clone(), self.zone.clone()) + .await; + + // If the prover instance is not found in the database or marked as dead, we should shut down the prover + match status { + None => { + METRICS.zombie_prover_instances_count[&KillingReason::Absent].inc(); + tracing::info!( + "Prover instance at address {:?}, availability zone {} was not found in the database, shutting down", + self.address, + self.zone + ); + // After returning from the task, it will shut down all the other tasks + return Ok(()); + } + Some(GpuProverInstanceStatus::Dead) => { + METRICS.zombie_prover_instances_count[&KillingReason::Dead].inc(); + tracing::info!( + "Prover instance at address {:?}, availability zone {} was found marked as dead, shutting down", + self.address, + self.zone + ); + // After returning from the task, it will shut down all the other tasks + return Ok(()); + } + Some(_) => (), + } + + tokio::time::sleep(self.polling_interval).await; + } + + tracing::info!("Availability checker was shut down"); + + Ok(()) + } + } +} diff --git a/prover/prover_fri/src/main.rs b/prover/prover_fri/src/main.rs index 841406c2833..efc552d50c2 100644 --- a/prover/prover_fri/src/main.rs +++ 
b/prover/prover_fri/src/main.rs @@ -26,6 +26,7 @@ use zksync_types::{ }; use zksync_utils::wait_for_tasks::ManagedTasks; +mod gpu_prover_availability_checker; mod gpu_prover_job_processor; mod metrics; mod prover_job_processor; @@ -256,14 +257,23 @@ async fn get_prover_tasks( zone.clone() ); let socket_listener = gpu_socket_listener::SocketListener::new( - address, + address.clone(), producer, pool.clone(), prover_config.specialized_group_id, - zone, + zone.clone(), ); + let availability_checker = + gpu_prover_availability_checker::availability_checker::AvailabilityChecker::new( + address, + zone, + prover_config.availability_check_interval_in_secs, + pool, + ); + Ok(vec![ tokio::spawn(socket_listener.listen_incoming_connections(stop_receiver.clone())), - tokio::spawn(prover.run(stop_receiver, None)), + tokio::spawn(prover.run(stop_receiver.clone(), None)), + tokio::spawn(availability_checker.run(stop_receiver.clone())), ]) } diff --git a/prover/prover_fri/src/metrics.rs b/prover/prover_fri/src/metrics.rs index b03f0233e7a..1bff9161c49 100644 --- a/prover/prover_fri/src/metrics.rs +++ b/prover/prover_fri/src/metrics.rs @@ -1,6 +1,8 @@ use std::time::Duration; -use vise::{Buckets, EncodeLabelSet, EncodeLabelValue, Family, Histogram, LabeledFamily, Metrics}; +use vise::{ + Buckets, Counter, EncodeLabelSet, EncodeLabelValue, Family, Histogram, LabeledFamily, Metrics, +}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet)] pub(crate) struct CircuitLabels { @@ -15,6 +17,16 @@ pub(crate) enum Layer { Base, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet, EncodeLabelValue)] +#[metrics(label = "reason", rename_all = "snake_case")] +#[allow(dead_code)] +pub(crate) enum KillingReason { + /// Prover was found with Dead status in the database + Dead, + /// Prover was not found in the database + Absent, +} + #[derive(Debug, Metrics)] #[metrics(prefix = "prover_fri_prover")] pub(crate) struct ProverFriMetrics { @@ -36,6 +48,7 @@ pub(crate) 
struct ProverFriMetrics { pub witness_vector_blob_time: LabeledFamily>, #[metrics(buckets = Buckets::LATENCIES, labels = ["circuit_type"])] pub blob_save_time: LabeledFamily>, + pub zombie_prover_instances_count: Family, } #[vise::register]