diff --git a/core/lib/basic_types/src/protocol_version.rs b/core/lib/basic_types/src/protocol_version.rs index 19f4b668a87..805953e9d02 100644 --- a/core/lib/basic_types/src/protocol_version.rs +++ b/core/lib/basic_types/src/protocol_version.rs @@ -56,6 +56,10 @@ impl ProtocolVersionId { Self::Version24 } + pub fn current_prover_version() -> Self { + Self::Version23 + } + pub fn next() -> Self { Self::Version25 } diff --git a/prover/proof_fri_compressor/src/compressor.rs b/prover/proof_fri_compressor/src/compressor.rs index b85979e05e8..6f933aaf4a2 100644 --- a/prover/proof_fri_compressor/src/compressor.rs +++ b/prover/proof_fri_compressor/src/compressor.rs @@ -29,7 +29,7 @@ use zksync_prover_fri_types::{ }; use zksync_prover_interface::outputs::L1BatchProofForL1; use zksync_queued_job_processor::JobProcessor; -use zksync_types::L1BatchNumber; +use zksync_types::{L1BatchNumber, ProtocolVersionId}; use zksync_vk_setup_data_server_fri::keystore::Keystore; use crate::metrics::METRICS; @@ -40,6 +40,7 @@ pub struct ProofCompressor { compression_mode: u8, verify_wrapper_proof: bool, max_attempts: u32, + protocol_version: ProtocolVersionId, } impl ProofCompressor { @@ -49,6 +50,7 @@ impl ProofCompressor { compression_mode: u8, verify_wrapper_proof: bool, max_attempts: u32, + protocol_version: ProtocolVersionId, ) -> Self { Self { blob_store, @@ -56,6 +58,7 @@ impl ProofCompressor { compression_mode, verify_wrapper_proof, max_attempts, + protocol_version, } } @@ -135,7 +138,7 @@ impl JobProcessor for ProofCompressor { let pod_name = get_current_pod_name(); let Some(l1_batch_number) = conn .fri_proof_compressor_dal() - .get_next_proof_compression_job(&pod_name) + .get_next_proof_compression_job(&pod_name, &self.protocol_version) .await else { return Ok(None); diff --git a/prover/proof_fri_compressor/src/main.rs b/prover/proof_fri_compressor/src/main.rs index 2ae19423d45..065b5cdc6b2 100644 --- a/prover/proof_fri_compressor/src/main.rs +++ 
b/prover/proof_fri_compressor/src/main.rs @@ -9,6 +9,7 @@ use zksync_config::configs::{FriProofCompressorConfig, ObservabilityConfig, Post use zksync_env_config::{object_store::ProverObjectStoreConfig, FromEnv}; use zksync_object_store::ObjectStoreFactory; use zksync_queued_job_processor::JobProcessor; +use zksync_types::ProtocolVersionId; use zksync_utils::wait_for_tasks::ManagedTasks; use crate::{ @@ -69,12 +70,16 @@ async fn main() -> anyhow::Result<()> { let blob_store = ObjectStoreFactory::new(object_store_config.0) .create_store() .await; + + let protocol_version = ProtocolVersionId::current_prover_version(); + let proof_compressor = ProofCompressor::new( blob_store, pool, config.compression_mode, config.verify_wrapper_proof, config.max_attempts, + protocol_version, ); let (stop_sender, stop_receiver) = watch::channel(false); diff --git a/prover/prover_dal/.sqlx/query-02bbd0a1c01747fb24a68f0ecf447e0574419632a40119b661a31ec70f0f950b.json b/prover/prover_dal/.sqlx/query-02bbd0a1c01747fb24a68f0ecf447e0574419632a40119b661a31ec70f0f950b.json deleted file mode 100644 index 013d7db9cad..00000000000 --- a/prover/prover_dal/.sqlx/query-02bbd0a1c01747fb24a68f0ecf447e0574419632a40119b661a31ec70f0f950b.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n WITH deleted AS (\n DELETE FROM gpu_prover_queue_fri\n WHERE\n instance_status = 'dead'\n AND updated_at < NOW() - $1::INTERVAL\n RETURNING *, NOW() AS archived_at\n ),\n inserted_count AS (\n INSERT INTO gpu_prover_queue_fri_archive\n SELECT * FROM deleted\n )\n SELECT COUNT(*) FROM deleted\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "count", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Interval" - ] - }, - "nullable": [ - null - ] - }, - "hash": "02bbd0a1c01747fb24a68f0ecf447e0574419632a40119b661a31ec70f0f950b" -} diff --git a/prover/prover_dal/.sqlx/query-b4304b9afb9f838eee1fe95af5fd964d4bb39b9dcd18fb03bc11ce2fb32b7fb3.json 
b/prover/prover_dal/.sqlx/query-058ecac4aa3d2109606738de4bdba2cff712010267460dd28339472b9a7d8c9d.json similarity index 82% rename from prover/prover_dal/.sqlx/query-b4304b9afb9f838eee1fe95af5fd964d4bb39b9dcd18fb03bc11ce2fb32b7fb3.json rename to prover/prover_dal/.sqlx/query-058ecac4aa3d2109606738de4bdba2cff712010267460dd28339472b9a7d8c9d.json index fa6f91edfb3..a74d698ff20 100644 --- a/prover/prover_dal/.sqlx/query-b4304b9afb9f838eee1fe95af5fd964d4bb39b9dcd18fb03bc11ce2fb32b7fb3.json +++ b/prover/prover_dal/.sqlx/query-058ecac4aa3d2109606738de4bdba2cff712010267460dd28339472b9a7d8c9d.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE scheduler_witness_jobs_fri\n SET\n status = 'in_progress',\n attempts = attempts + 1,\n updated_at = NOW(),\n processing_started_at = NOW(),\n picked_by = $2\n WHERE\n l1_batch_number = (\n SELECT\n l1_batch_number\n FROM\n scheduler_witness_jobs_fri\n WHERE\n status = 'queued'\n AND protocol_version = ANY ($1)\n ORDER BY\n l1_batch_number ASC\n LIMIT\n 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n scheduler_witness_jobs_fri.*\n ", + "query": "\n UPDATE scheduler_witness_jobs_fri\n SET\n status = 'in_progress',\n attempts = attempts + 1,\n updated_at = NOW(),\n processing_started_at = NOW(),\n picked_by = $2\n WHERE\n l1_batch_number = (\n SELECT\n l1_batch_number\n FROM\n scheduler_witness_jobs_fri\n WHERE\n status = 'queued'\n AND protocol_version = $1\n ORDER BY\n l1_batch_number ASC\n LIMIT\n 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n scheduler_witness_jobs_fri.*\n ", "describe": { "columns": [ { @@ -61,7 +61,7 @@ ], "parameters": { "Left": [ - "Int4Array", + "Int4", "Text" ] }, @@ -79,5 +79,5 @@ true ] }, - "hash": "b4304b9afb9f838eee1fe95af5fd964d4bb39b9dcd18fb03bc11ce2fb32b7fb3" + "hash": "058ecac4aa3d2109606738de4bdba2cff712010267460dd28339472b9a7d8c9d" } diff --git a/prover/prover_dal/.sqlx/query-01ac5343beb09ec5bd45b39d560e57a83f37da8999849377dfad60b44989be39.json 
b/prover/prover_dal/.sqlx/query-08dfe2267bf93d164c649e93f5355b403f1438679167ff218489e2c6d0c359a3.json similarity index 83% rename from prover/prover_dal/.sqlx/query-01ac5343beb09ec5bd45b39d560e57a83f37da8999849377dfad60b44989be39.json rename to prover/prover_dal/.sqlx/query-08dfe2267bf93d164c649e93f5355b403f1438679167ff218489e2c6d0c359a3.json index 8ca4bb693c2..a464eafa683 100644 --- a/prover/prover_dal/.sqlx/query-01ac5343beb09ec5bd45b39d560e57a83f37da8999849377dfad60b44989be39.json +++ b/prover/prover_dal/.sqlx/query-08dfe2267bf93d164c649e93f5355b403f1438679167ff218489e2c6d0c359a3.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE node_aggregation_witness_jobs_fri\n SET\n status = 'in_progress',\n attempts = attempts + 1,\n updated_at = NOW(),\n processing_started_at = NOW(),\n picked_by = $2\n WHERE\n id = (\n SELECT\n id\n FROM\n node_aggregation_witness_jobs_fri\n WHERE\n status = 'queued'\n AND protocol_version = ANY ($1)\n ORDER BY\n l1_batch_number ASC,\n depth ASC,\n id ASC\n LIMIT\n 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n node_aggregation_witness_jobs_fri.*\n ", + "query": "\n UPDATE node_aggregation_witness_jobs_fri\n SET\n status = 'in_progress',\n attempts = attempts + 1,\n updated_at = NOW(),\n processing_started_at = NOW(),\n picked_by = $2\n WHERE\n id = (\n SELECT\n id\n FROM\n node_aggregation_witness_jobs_fri\n WHERE\n status = 'queued'\n AND protocol_version = $1\n ORDER BY\n l1_batch_number ASC,\n depth ASC,\n id ASC\n LIMIT\n 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n node_aggregation_witness_jobs_fri.*\n ", "describe": { "columns": [ { @@ -81,7 +81,7 @@ ], "parameters": { "Left": [ - "Int4Array", + "Int4", "Text" ] }, @@ -103,5 +103,5 @@ true ] }, - "hash": "01ac5343beb09ec5bd45b39d560e57a83f37da8999849377dfad60b44989be39" + "hash": "08dfe2267bf93d164c649e93f5355b403f1438679167ff218489e2c6d0c359a3" } diff --git a/prover/prover_dal/.sqlx/query-4d263992ed6d5abbd7d3ca43af9d772d8801b0ae673b7173ae08a1fa6cbf67b2.json 
b/prover/prover_dal/.sqlx/query-0e7f17dd9c10b779d62de504a9cc41d3d4edb2d28d2a1fdf919f234a9ab9c43a.json similarity index 64% rename from prover/prover_dal/.sqlx/query-4d263992ed6d5abbd7d3ca43af9d772d8801b0ae673b7173ae08a1fa6cbf67b2.json rename to prover/prover_dal/.sqlx/query-0e7f17dd9c10b779d62de504a9cc41d3d4edb2d28d2a1fdf919f234a9ab9c43a.json index b0fb8d4be23..6d967c5e79c 100644 --- a/prover/prover_dal/.sqlx/query-4d263992ed6d5abbd7d3ca43af9d772d8801b0ae673b7173ae08a1fa6cbf67b2.json +++ b/prover/prover_dal/.sqlx/query-0e7f17dd9c10b779d62de504a9cc41d3d4edb2d28d2a1fdf919f234a9ab9c43a.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE prover_jobs_fri\n SET\n status = 'in_progress',\n attempts = attempts + 1,\n updated_at = NOW(),\n processing_started_at = NOW(),\n picked_by = $2\n WHERE\n id = (\n SELECT\n id\n FROM\n prover_jobs_fri\n WHERE\n status = 'queued'\n AND protocol_version = ANY ($1)\n ORDER BY\n aggregation_round DESC,\n l1_batch_number ASC,\n id ASC\n LIMIT\n 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n prover_jobs_fri.id,\n prover_jobs_fri.l1_batch_number,\n prover_jobs_fri.circuit_id,\n prover_jobs_fri.aggregation_round,\n prover_jobs_fri.sequence_number,\n prover_jobs_fri.depth,\n prover_jobs_fri.is_node_final_proof\n ", + "query": "\n UPDATE prover_jobs_fri\n SET\n status = 'in_progress',\n attempts = attempts + 1,\n updated_at = NOW(),\n processing_started_at = NOW(),\n picked_by = $2\n WHERE\n id = (\n SELECT\n id\n FROM\n prover_jobs_fri\n WHERE\n status = 'queued'\n AND protocol_version = $1\n ORDER BY\n aggregation_round DESC,\n l1_batch_number ASC,\n id ASC\n LIMIT\n 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n prover_jobs_fri.id,\n prover_jobs_fri.l1_batch_number,\n prover_jobs_fri.circuit_id,\n prover_jobs_fri.aggregation_round,\n prover_jobs_fri.sequence_number,\n prover_jobs_fri.depth,\n prover_jobs_fri.is_node_final_proof\n ", "describe": { "columns": [ { @@ -41,7 +41,7 @@ ], "parameters": { "Left": [ - "Int4Array", 
+ "Int4", "Text" ] }, @@ -55,5 +55,5 @@ false ] }, - "hash": "4d263992ed6d5abbd7d3ca43af9d772d8801b0ae673b7173ae08a1fa6cbf67b2" + "hash": "0e7f17dd9c10b779d62de504a9cc41d3d4edb2d28d2a1fdf919f234a9ab9c43a" } diff --git a/prover/prover_dal/.sqlx/query-2ab2f83b273c5aa88c1eefc8f70a8ea23052f714cd74c1d28ae1203ce8f0eaa9.json b/prover/prover_dal/.sqlx/query-2ab2f83b273c5aa88c1eefc8f70a8ea23052f714cd74c1d28ae1203ce8f0eaa9.json index 3441906e0ce..529709763eb 100644 --- a/prover/prover_dal/.sqlx/query-2ab2f83b273c5aa88c1eefc8f70a8ea23052f714cd74c1d28ae1203ce8f0eaa9.json +++ b/prover/prover_dal/.sqlx/query-2ab2f83b273c5aa88c1eefc8f70a8ea23052f714cd74c1d28ae1203ce8f0eaa9.json @@ -57,6 +57,11 @@ "ordinal": 10, "name": "picked_by", "type_info": "Text" + }, + { + "ordinal": 11, + "name": "protocol_version", + "type_info": "Int4" } ], "parameters": { @@ -75,6 +80,7 @@ false, true, true, + true, true ] }, diff --git a/prover/prover_dal/.sqlx/query-e9adf5b5a1ab84c20a514a7775f91a9984685eaaaa0a8b223410d560a15a3034.json b/prover/prover_dal/.sqlx/query-384e70c7f7b302b90a9ce69752fb7f87115848d883ace09ead493637a303cbb2.json similarity index 61% rename from prover/prover_dal/.sqlx/query-e9adf5b5a1ab84c20a514a7775f91a9984685eaaaa0a8b223410d560a15a3034.json rename to prover/prover_dal/.sqlx/query-384e70c7f7b302b90a9ce69752fb7f87115848d883ace09ead493637a303cbb2.json index 975c061632a..b7c222927cd 100644 --- a/prover/prover_dal/.sqlx/query-e9adf5b5a1ab84c20a514a7775f91a9984685eaaaa0a8b223410d560a15a3034.json +++ b/prover/prover_dal/.sqlx/query-384e70c7f7b302b90a9ce69752fb7f87115848d883ace09ead493637a303cbb2.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE prover_jobs_fri\n SET\n status = 'in_progress',\n attempts = attempts + 1,\n processing_started_at = NOW(),\n updated_at = NOW(),\n picked_by = $4\n WHERE\n id = (\n SELECT\n pj.id\n FROM\n (\n SELECT\n *\n FROM\n UNNEST($1::SMALLINT[], $2::SMALLINT[])\n ) AS tuple (circuit_id, ROUND)\n JOIN LATERAL (\n SELECT\n *\n FROM\n 
prover_jobs_fri AS pj\n WHERE\n pj.status = 'queued'\n AND pj.protocol_version = ANY ($3)\n AND pj.circuit_id = tuple.circuit_id\n AND pj.aggregation_round = tuple.round\n ORDER BY\n pj.l1_batch_number ASC,\n pj.id ASC\n LIMIT\n 1\n ) AS pj ON TRUE\n ORDER BY\n pj.l1_batch_number ASC,\n pj.aggregation_round DESC,\n pj.id ASC\n LIMIT\n 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n prover_jobs_fri.id,\n prover_jobs_fri.l1_batch_number,\n prover_jobs_fri.circuit_id,\n prover_jobs_fri.aggregation_round,\n prover_jobs_fri.sequence_number,\n prover_jobs_fri.depth,\n prover_jobs_fri.is_node_final_proof\n ", + "query": "\n UPDATE prover_jobs_fri\n SET\n status = 'in_progress',\n attempts = attempts + 1,\n processing_started_at = NOW(),\n updated_at = NOW(),\n picked_by = $4\n WHERE\n id = (\n SELECT\n pj.id\n FROM\n (\n SELECT\n *\n FROM\n UNNEST($1::SMALLINT[], $2::SMALLINT[])\n ) AS tuple (circuit_id, ROUND)\n JOIN LATERAL (\n SELECT\n *\n FROM\n prover_jobs_fri AS pj\n WHERE\n pj.status = 'queued'\n AND pj.protocol_version = $3\n AND pj.circuit_id = tuple.circuit_id\n AND pj.aggregation_round = tuple.round\n ORDER BY\n pj.l1_batch_number ASC,\n pj.id ASC\n LIMIT\n 1\n ) AS pj ON TRUE\n ORDER BY\n pj.l1_batch_number ASC,\n pj.aggregation_round DESC,\n pj.id ASC\n LIMIT\n 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n prover_jobs_fri.id,\n prover_jobs_fri.l1_batch_number,\n prover_jobs_fri.circuit_id,\n prover_jobs_fri.aggregation_round,\n prover_jobs_fri.sequence_number,\n prover_jobs_fri.depth,\n prover_jobs_fri.is_node_final_proof\n ", "describe": { "columns": [ { @@ -43,7 +43,7 @@ "Left": [ "Int2Array", "Int2Array", - "Int4Array", + "Int4", "Text" ] }, @@ -57,5 +57,5 @@ false ] }, - "hash": "e9adf5b5a1ab84c20a514a7775f91a9984685eaaaa0a8b223410d560a15a3034" + "hash": "384e70c7f7b302b90a9ce69752fb7f87115848d883ace09ead493637a303cbb2" } diff --git a/prover/prover_dal/.sqlx/query-d3fa0501fee53eb382e41b60b8c53d2dbe11869d86fc8b351aa2b2d47f295d7b.json 
b/prover/prover_dal/.sqlx/query-495a71634bcd8828bfaad5c4c542a172d47a65601b92f75da8f62ec2b18b9f4f.json similarity index 65% rename from prover/prover_dal/.sqlx/query-d3fa0501fee53eb382e41b60b8c53d2dbe11869d86fc8b351aa2b2d47f295d7b.json rename to prover/prover_dal/.sqlx/query-495a71634bcd8828bfaad5c4c542a172d47a65601b92f75da8f62ec2b18b9f4f.json index 6ca16a92778..fb16df5866d 100644 --- a/prover/prover_dal/.sqlx/query-d3fa0501fee53eb382e41b60b8c53d2dbe11869d86fc8b351aa2b2d47f295d7b.json +++ b/prover/prover_dal/.sqlx/query-495a71634bcd8828bfaad5c4c542a172d47a65601b92f75da8f62ec2b18b9f4f.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE recursion_tip_witness_jobs_fri\n SET\n status = 'in_progress',\n attempts = attempts + 1,\n updated_at = NOW(),\n processing_started_at = NOW(),\n picked_by = $2\n WHERE\n l1_batch_number = (\n SELECT\n l1_batch_number\n FROM\n recursion_tip_witness_jobs_fri\n WHERE\n status = 'queued'\n AND protocol_version = ANY ($1)\n ORDER BY\n l1_batch_number ASC\n LIMIT\n 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n recursion_tip_witness_jobs_fri.l1_batch_number\n ", + "query": "\n UPDATE recursion_tip_witness_jobs_fri\n SET\n status = 'in_progress',\n attempts = attempts + 1,\n updated_at = NOW(),\n processing_started_at = NOW(),\n picked_by = $2\n WHERE\n l1_batch_number = (\n SELECT\n l1_batch_number\n FROM\n recursion_tip_witness_jobs_fri\n WHERE\n status = 'queued'\n AND protocol_version = $1\n ORDER BY\n l1_batch_number ASC\n LIMIT\n 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n recursion_tip_witness_jobs_fri.l1_batch_number\n ", "describe": { "columns": [ { @@ -11,7 +11,7 @@ ], "parameters": { "Left": [ - "Int4Array", + "Int4", "Text" ] }, @@ -19,5 +19,5 @@ false ] }, - "hash": "d3fa0501fee53eb382e41b60b8c53d2dbe11869d86fc8b351aa2b2d47f295d7b" + "hash": "495a71634bcd8828bfaad5c4c542a172d47a65601b92f75da8f62ec2b18b9f4f" } diff --git 
a/prover/prover_dal/.sqlx/query-f717ca5d0890759496739a678955e6f8b7f88a0894a7f9e27fc26f93997d37c7.json b/prover/prover_dal/.sqlx/query-5e4d784a3436335e9995a11f4c761ffb42bb2b325ba9206abbffe0dc74664566.json similarity index 61% rename from prover/prover_dal/.sqlx/query-f717ca5d0890759496739a678955e6f8b7f88a0894a7f9e27fc26f93997d37c7.json rename to prover/prover_dal/.sqlx/query-5e4d784a3436335e9995a11f4c761ffb42bb2b325ba9206abbffe0dc74664566.json index e6e12748d0d..6f8252d9998 100644 --- a/prover/prover_dal/.sqlx/query-f717ca5d0890759496739a678955e6f8b7f88a0894a7f9e27fc26f93997d37c7.json +++ b/prover/prover_dal/.sqlx/query-5e4d784a3436335e9995a11f4c761ffb42bb2b325ba9206abbffe0dc74664566.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE proof_compression_jobs_fri\n SET\n status = $1,\n attempts = attempts + 1,\n updated_at = NOW(),\n processing_started_at = NOW(),\n picked_by = $3\n WHERE\n l1_batch_number = (\n SELECT\n l1_batch_number\n FROM\n proof_compression_jobs_fri\n WHERE\n status = $2\n ORDER BY\n l1_batch_number ASC\n LIMIT\n 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n proof_compression_jobs_fri.l1_batch_number\n ", + "query": "\n UPDATE proof_compression_jobs_fri\n SET\n status = $1,\n attempts = attempts + 1,\n updated_at = NOW(),\n processing_started_at = NOW(),\n picked_by = $3\n WHERE\n l1_batch_number = (\n SELECT\n l1_batch_number\n FROM\n proof_compression_jobs_fri\n WHERE\n status = $2\n AND protocol_version = $4\n ORDER BY\n l1_batch_number ASC\n LIMIT\n 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n proof_compression_jobs_fri.l1_batch_number\n ", "describe": { "columns": [ { @@ -13,12 +13,13 @@ "Left": [ "Text", "Text", - "Text" + "Text", + "Int4" ] }, "nullable": [ false ] }, - "hash": "f717ca5d0890759496739a678955e6f8b7f88a0894a7f9e27fc26f93997d37c7" + "hash": "5e4d784a3436335e9995a11f4c761ffb42bb2b325ba9206abbffe0dc74664566" } diff --git 
a/prover/prover_dal/.sqlx/query-75f6eaa518e7840374c4e44b0788bf92c7f2c55386c8208e3a82b30456abd5b4.json b/prover/prover_dal/.sqlx/query-5e9618d3e1aa40639f2d5ad5cf5564eddf84760477518981c7acffc8bc4acf76.json similarity index 82% rename from prover/prover_dal/.sqlx/query-75f6eaa518e7840374c4e44b0788bf92c7f2c55386c8208e3a82b30456abd5b4.json rename to prover/prover_dal/.sqlx/query-5e9618d3e1aa40639f2d5ad5cf5564eddf84760477518981c7acffc8bc4acf76.json index 74a6c2724b6..ca90e154456 100644 --- a/prover/prover_dal/.sqlx/query-75f6eaa518e7840374c4e44b0788bf92c7f2c55386c8208e3a82b30456abd5b4.json +++ b/prover/prover_dal/.sqlx/query-5e9618d3e1aa40639f2d5ad5cf5564eddf84760477518981c7acffc8bc4acf76.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE witness_inputs_fri\n SET\n status = 'in_progress',\n attempts = attempts + 1,\n updated_at = NOW(),\n processing_started_at = NOW(),\n picked_by = $3\n WHERE\n l1_batch_number = (\n SELECT\n l1_batch_number\n FROM\n witness_inputs_fri\n WHERE\n l1_batch_number <= $1\n AND status = 'queued'\n AND protocol_version = ANY ($2)\n ORDER BY\n l1_batch_number ASC\n LIMIT\n 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n witness_inputs_fri.*\n ", + "query": "\n UPDATE witness_inputs_fri\n SET\n status = 'in_progress',\n attempts = attempts + 1,\n updated_at = NOW(),\n processing_started_at = NOW(),\n picked_by = $3\n WHERE\n l1_batch_number = (\n SELECT\n l1_batch_number\n FROM\n witness_inputs_fri\n WHERE\n l1_batch_number <= $1\n AND status = 'queued'\n AND protocol_version = $2\n ORDER BY\n l1_batch_number ASC\n LIMIT\n 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n witness_inputs_fri.*\n ", "describe": { "columns": [ { @@ -72,7 +72,7 @@ "parameters": { "Left": [ "Int8", - "Int4Array", + "Int4", "Text" ] }, @@ -92,5 +92,5 @@ true ] }, - "hash": "75f6eaa518e7840374c4e44b0788bf92c7f2c55386c8208e3a82b30456abd5b4" + "hash": "5e9618d3e1aa40639f2d5ad5cf5564eddf84760477518981c7acffc8bc4acf76" } diff --git 
a/prover/prover_dal/.sqlx/query-6ae2ed34230beae0e86c584e293e7ee767e4c98706246eb113498c0f817f5f38.json b/prover/prover_dal/.sqlx/query-6ae2ed34230beae0e86c584e293e7ee767e4c98706246eb113498c0f817f5f38.json deleted file mode 100644 index 08dff439a7c..00000000000 --- a/prover/prover_dal/.sqlx/query-6ae2ed34230beae0e86c584e293e7ee767e4c98706246eb113498c0f817f5f38.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO\n gpu_prover_queue_fri (\n instance_host,\n instance_port,\n instance_status,\n specialized_prover_group_id,\n zone,\n created_at,\n updated_at\n )\n VALUES\n (CAST($1::TEXT AS inet), $2, 'available', $3, $4, NOW(), NOW())\n ON CONFLICT (instance_host, instance_port, zone) DO\n UPDATE\n SET\n instance_status = 'available',\n specialized_prover_group_id = $3,\n zone = $4,\n updated_at = NOW()\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Text", - "Int4", - "Int2", - "Text" - ] - }, - "nullable": [] - }, - "hash": "6ae2ed34230beae0e86c584e293e7ee767e4c98706246eb113498c0f817f5f38" -} diff --git a/prover/prover_dal/.sqlx/query-0d13b8947b1bafa9e5bc6fdc70a986511265c541d81b1d21f0a751ae1399c626.json b/prover/prover_dal/.sqlx/query-7dd14c5f887d6716a8f98414bddd562e556a712ba041237e4cb3dea27e89314e.json similarity index 66% rename from prover/prover_dal/.sqlx/query-0d13b8947b1bafa9e5bc6fdc70a986511265c541d81b1d21f0a751ae1399c626.json rename to prover/prover_dal/.sqlx/query-7dd14c5f887d6716a8f98414bddd562e556a712ba041237e4cb3dea27e89314e.json index 8b5605f078a..eb7984cc8e2 100644 --- a/prover/prover_dal/.sqlx/query-0d13b8947b1bafa9e5bc6fdc70a986511265c541d81b1d21f0a751ae1399c626.json +++ b/prover/prover_dal/.sqlx/query-7dd14c5f887d6716a8f98414bddd562e556a712ba041237e4cb3dea27e89314e.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE gpu_prover_queue_fri\n SET\n instance_status = 'reserved',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n id IN (\n SELECT\n id\n FROM\n 
gpu_prover_queue_fri\n WHERE\n specialized_prover_group_id = $2\n AND zone = $3\n AND (\n instance_status = 'available'\n OR (\n instance_status = 'reserved'\n AND processing_started_at < NOW() - $1::INTERVAL\n )\n )\n ORDER BY\n updated_at ASC\n LIMIT\n 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n gpu_prover_queue_fri.*\n ", + "query": "\n UPDATE gpu_prover_queue_fri\n SET\n instance_status = 'reserved',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n id IN (\n SELECT\n id\n FROM\n gpu_prover_queue_fri\n WHERE\n specialized_prover_group_id = $2\n AND zone = $3\n AND protocol_version = $4\n AND (\n instance_status = 'available'\n OR (\n instance_status = 'reserved'\n AND processing_started_at < NOW() - $1::INTERVAL\n )\n )\n ORDER BY\n updated_at ASC\n LIMIT\n 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n gpu_prover_queue_fri.*\n ", "describe": { "columns": [ { @@ -47,13 +47,19 @@ "ordinal": 8, "name": "processing_started_at", "type_info": "Timestamp" + }, + { + "ordinal": 9, + "name": "protocol_version", + "type_info": "Int4" } ], "parameters": { "Left": [ "Interval", "Int2", - "Text" + "Text", + "Int4" ] }, "nullable": [ @@ -65,8 +71,9 @@ true, false, false, + true, true ] }, - "hash": "0d13b8947b1bafa9e5bc6fdc70a986511265c541d81b1d21f0a751ae1399c626" + "hash": "7dd14c5f887d6716a8f98414bddd562e556a712ba041237e4cb3dea27e89314e" } diff --git a/prover/prover_dal/.sqlx/query-832a1caa2808f49a5572cb782eca5bfec2a8272906eb1ed195f96c2508b9a3ef.json b/prover/prover_dal/.sqlx/query-832a1caa2808f49a5572cb782eca5bfec2a8272906eb1ed195f96c2508b9a3ef.json new file mode 100644 index 00000000000..c5a2a59aa20 --- /dev/null +++ b/prover/prover_dal/.sqlx/query-832a1caa2808f49a5572cb782eca5bfec2a8272906eb1ed195f96c2508b9a3ef.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO\n proof_compression_jobs_fri (l1_batch_number, fri_proof_blob_url, status, created_at, updated_at, protocol_version)\n VALUES\n ($1, $2, $3, NOW(), NOW(), $4)\n ON 
CONFLICT (l1_batch_number) DO NOTHING\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Text", + "Text", + "Int4" + ] + }, + "nullable": [] + }, + "hash": "832a1caa2808f49a5572cb782eca5bfec2a8272906eb1ed195f96c2508b9a3ef" +} diff --git a/prover/prover_dal/.sqlx/query-83d7409bedec3db527f6179e4baaa1b7d32b51659569fde755218d42da660b2f.json b/prover/prover_dal/.sqlx/query-83d7409bedec3db527f6179e4baaa1b7d32b51659569fde755218d42da660b2f.json new file mode 100644 index 00000000000..f3a919e3d98 --- /dev/null +++ b/prover/prover_dal/.sqlx/query-83d7409bedec3db527f6179e4baaa1b7d32b51659569fde755218d42da660b2f.json @@ -0,0 +1,40 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n recursion_scheduler_level_vk_hash,\n recursion_node_level_vk_hash,\n recursion_leaf_level_vk_hash,\n recursion_circuits_set_vks_hash\n FROM\n prover_fri_protocol_versions\n WHERE\n id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "recursion_scheduler_level_vk_hash", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "recursion_node_level_vk_hash", + "type_info": "Bytea" + }, + { + "ordinal": 2, + "name": "recursion_leaf_level_vk_hash", + "type_info": "Bytea" + }, + { + "ordinal": 3, + "name": "recursion_circuits_set_vks_hash", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [ + "Int4" + ] + }, + "nullable": [ + false, + false, + false, + false + ] + }, + "hash": "83d7409bedec3db527f6179e4baaa1b7d32b51659569fde755218d42da660b2f" +} diff --git a/prover/prover_dal/.sqlx/query-9da0a96bf42ef7b60ec3e39056942cb36fcaf1679bf49d7741305e8bc6e5e318.json b/prover/prover_dal/.sqlx/query-9da0a96bf42ef7b60ec3e39056942cb36fcaf1679bf49d7741305e8bc6e5e318.json new file mode 100644 index 00000000000..5e2d4603317 --- /dev/null +++ b/prover/prover_dal/.sqlx/query-9da0a96bf42ef7b60ec3e39056942cb36fcaf1679bf49d7741305e8bc6e5e318.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO\n gpu_prover_queue_fri (\n 
instance_host,\n instance_port,\n instance_status,\n specialized_prover_group_id,\n zone,\n created_at,\n updated_at,\n protocol_version\n )\n VALUES\n (CAST($1::TEXT AS inet), $2, 'available', $3, $4, NOW(), NOW(), $5)\n ON CONFLICT (instance_host, instance_port, zone) DO\n UPDATE\n SET\n instance_status = 'available',\n specialized_prover_group_id = $3,\n zone = $4,\n updated_at = NOW(),\n protocol_version = $5\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Int4", + "Int2", + "Text", + "Int4" + ] + }, + "nullable": [] + }, + "hash": "9da0a96bf42ef7b60ec3e39056942cb36fcaf1679bf49d7741305e8bc6e5e318" +} diff --git a/prover/prover_dal/.sqlx/query-7a8fffe8d4e3085e00c98f770d250d625f057acf1440b6550375ce5509a816a6.json b/prover/prover_dal/.sqlx/query-d286520139c1f5daa90b20efffa515afcaedf541533f218ca6e167bdc7f6ea7f.json similarity index 84% rename from prover/prover_dal/.sqlx/query-7a8fffe8d4e3085e00c98f770d250d625f057acf1440b6550375ce5509a816a6.json rename to prover/prover_dal/.sqlx/query-d286520139c1f5daa90b20efffa515afcaedf541533f218ca6e167bdc7f6ea7f.json index da78974f61a..5024a94f4c3 100644 --- a/prover/prover_dal/.sqlx/query-7a8fffe8d4e3085e00c98f770d250d625f057acf1440b6550375ce5509a816a6.json +++ b/prover/prover_dal/.sqlx/query-d286520139c1f5daa90b20efffa515afcaedf541533f218ca6e167bdc7f6ea7f.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE leaf_aggregation_witness_jobs_fri\n SET\n status = 'in_progress',\n attempts = attempts + 1,\n updated_at = NOW(),\n processing_started_at = NOW(),\n picked_by = $2\n WHERE\n id = (\n SELECT\n id\n FROM\n leaf_aggregation_witness_jobs_fri\n WHERE\n status = 'queued'\n AND protocol_version = ANY ($1)\n ORDER BY\n l1_batch_number ASC,\n id ASC\n LIMIT\n 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n leaf_aggregation_witness_jobs_fri.*\n ", + "query": "\n UPDATE leaf_aggregation_witness_jobs_fri\n SET\n status = 'in_progress',\n attempts = attempts + 1,\n updated_at = NOW(),\n 
processing_started_at = NOW(),\n picked_by = $2\n WHERE\n id = (\n SELECT\n id\n FROM\n leaf_aggregation_witness_jobs_fri\n WHERE\n status = 'queued'\n AND protocol_version = $1\n ORDER BY\n l1_batch_number ASC,\n id ASC\n LIMIT\n 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n leaf_aggregation_witness_jobs_fri.*\n ", "describe": { "columns": [ { @@ -81,7 +81,7 @@ ], "parameters": { "Left": [ - "Int4Array", + "Int4", "Text" ] }, @@ -103,5 +103,5 @@ true ] }, - "hash": "7a8fffe8d4e3085e00c98f770d250d625f057acf1440b6550375ce5509a816a6" + "hash": "d286520139c1f5daa90b20efffa515afcaedf541533f218ca6e167bdc7f6ea7f" } diff --git a/prover/prover_dal/.sqlx/query-d7b6196cfc17182b5280d0a13f873281bc865cc67b824af6ca3a76ae6065f151.json b/prover/prover_dal/.sqlx/query-d7b6196cfc17182b5280d0a13f873281bc865cc67b824af6ca3a76ae6065f151.json new file mode 100644 index 00000000000..d0e366aee00 --- /dev/null +++ b/prover/prover_dal/.sqlx/query-d7b6196cfc17182b5280d0a13f873281bc865cc67b824af6ca3a76ae6065f151.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH deleted AS (\n DELETE FROM gpu_prover_queue_fri\n WHERE\n instance_status = 'dead'\n AND updated_at < NOW() - $1::INTERVAL\n RETURNING id,\n instance_host,\n instance_port,\n instance_status,\n specialized_prover_group_id,\n zone,\n created_at,\n updated_at,\n processing_started_at,\n NOW() as archived_at,\n protocol_version\n ),\n inserted_count AS (\n INSERT INTO gpu_prover_queue_fri_archive\n SELECT * FROM deleted\n )\n SELECT COUNT(*) FROM deleted\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Interval" + ] + }, + "nullable": [ + null + ] + }, + "hash": "d7b6196cfc17182b5280d0a13f873281bc865cc67b824af6ca3a76ae6065f151" +} diff --git a/prover/prover_dal/.sqlx/query-e62407c355594b87c7caee2396f1d14910604ddd7eadc29db3634dc873254569.json 
b/prover/prover_dal/.sqlx/query-e62407c355594b87c7caee2396f1d14910604ddd7eadc29db3634dc873254569.json new file mode 100644 index 00000000000..acd2e7e9c50 --- /dev/null +++ b/prover/prover_dal/.sqlx/query-e62407c355594b87c7caee2396f1d14910604ddd7eadc29db3634dc873254569.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT protocol_version\n FROM prover_jobs_fri\n WHERE id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "protocol_version", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + true + ] + }, + "hash": "e62407c355594b87c7caee2396f1d14910604ddd7eadc29db3634dc873254569" +} diff --git a/prover/prover_dal/.sqlx/query-ef687be83e496d6647e4dfef9eabae63443c51deb818dd0affd1a0949b161737.json b/prover/prover_dal/.sqlx/query-ef687be83e496d6647e4dfef9eabae63443c51deb818dd0affd1a0949b161737.json deleted file mode 100644 index 79b20fabb28..00000000000 --- a/prover/prover_dal/.sqlx/query-ef687be83e496d6647e4dfef9eabae63443c51deb818dd0affd1a0949b161737.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO\n proof_compression_jobs_fri (l1_batch_number, fri_proof_blob_url, status, created_at, updated_at)\n VALUES\n ($1, $2, $3, NOW(), NOW())\n ON CONFLICT (l1_batch_number) DO NOTHING\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Text", - "Text" - ] - }, - "nullable": [] - }, - "hash": "ef687be83e496d6647e4dfef9eabae63443c51deb818dd0affd1a0949b161737" -} diff --git a/prover/prover_dal/migrations/20240410141719_add-protocol-versions-to-tables.down.sql b/prover/prover_dal/migrations/20240410141719_add-protocol-versions-to-tables.down.sql new file mode 100644 index 00000000000..66bce23e35f --- /dev/null +++ b/prover/prover_dal/migrations/20240410141719_add-protocol-versions-to-tables.down.sql @@ -0,0 +1,8 @@ +ALTER TABLE proof_compression_jobs_fri + DROP IF EXISTS protocol_version; + +ALTER TABLE gpu_prover_queue_fri + 
DROP IF EXISTS protocol_version; + +ALTER TABLE gpu_prover_queue_fri_archive + DROP IF EXISTS protocol_version; diff --git a/prover/prover_dal/migrations/20240410141719_add-protocol-versions-to-tables.up.sql b/prover/prover_dal/migrations/20240410141719_add-protocol-versions-to-tables.up.sql new file mode 100644 index 00000000000..6966f0da818 --- /dev/null +++ b/prover/prover_dal/migrations/20240410141719_add-protocol-versions-to-tables.up.sql @@ -0,0 +1,8 @@ +ALTER TABLE proof_compression_jobs_fri + ADD COLUMN protocol_version INTEGER REFERENCES prover_fri_protocol_versions (id) DEFAULT NULL; + +ALTER TABLE gpu_prover_queue_fri + ADD COLUMN protocol_version INTEGER REFERENCES prover_fri_protocol_versions (id) DEFAULT NULL; + +ALTER TABLE gpu_prover_queue_fri_archive + ADD COLUMN protocol_version INTEGER REFERENCES prover_fri_protocol_versions (id) DEFAULT NULL; diff --git a/prover/prover_dal/src/fri_gpu_prover_queue_dal.rs b/prover/prover_dal/src/fri_gpu_prover_queue_dal.rs index 731d74df2ca..7d666baef0f 100644 --- a/prover/prover_dal/src/fri_gpu_prover_queue_dal.rs +++ b/prover/prover_dal/src/fri_gpu_prover_queue_dal.rs @@ -1,6 +1,9 @@ use std::{str::FromStr, time::Duration}; -use zksync_basic_types::prover_dal::{GpuProverInstanceStatus, SocketAddress}; +use zksync_basic_types::{ + protocol_version::ProtocolVersionId, + prover_dal::{GpuProverInstanceStatus, SocketAddress}, +}; use zksync_db_connection::connection::Connection; use crate::{pg_interval_from_duration, Prover}; @@ -16,6 +19,7 @@ impl FriGpuProverQueueDal<'_, '_> { processing_timeout: Duration, specialized_prover_group_id: u8, zone: String, + protocol_version: ProtocolVersionId, ) -> Option { let processing_timeout = pg_interval_from_duration(processing_timeout); let result: Option = sqlx::query!( @@ -34,6 +38,7 @@ impl FriGpuProverQueueDal<'_, '_> { WHERE specialized_prover_group_id = $2 AND zone = $3 + AND protocol_version = $4 AND ( instance_status = 'available' OR ( @@ -53,7 +58,8 @@ impl 
FriGpuProverQueueDal<'_, '_> { "#, &processing_timeout, i16::from(specialized_prover_group_id), - zone + zone, + protocol_version as i32 ) .fetch_optional(self.storage.conn()) .await @@ -71,6 +77,7 @@ impl FriGpuProverQueueDal<'_, '_> { address: SocketAddress, specialized_prover_group_id: u8, zone: String, + protocol_version: ProtocolVersionId, ) { sqlx::query!( r#" @@ -82,22 +89,25 @@ impl FriGpuProverQueueDal<'_, '_> { specialized_prover_group_id, zone, created_at, - updated_at + updated_at, + protocol_version ) VALUES - (CAST($1::TEXT AS inet), $2, 'available', $3, $4, NOW(), NOW()) + (CAST($1::TEXT AS inet), $2, 'available', $3, $4, NOW(), NOW(), $5) ON CONFLICT (instance_host, instance_port, zone) DO UPDATE SET instance_status = 'available', specialized_prover_group_id = $3, zone = $4, - updated_at = NOW() + updated_at = NOW(), + protocol_version = $5 "#, address.host.to_string(), i32::from(address.port), i16::from(specialized_prover_group_id), - zone + zone, + protocol_version as i32 ) .execute(self.storage.conn()) .await @@ -194,7 +204,17 @@ impl FriGpuProverQueueDal<'_, '_> { WHERE instance_status = 'dead' AND updated_at < NOW() - $1::INTERVAL - RETURNING *, NOW() AS archived_at + RETURNING id, + instance_host, + instance_port, + instance_status, + specialized_prover_group_id, + zone, + created_at, + updated_at, + processing_started_at, + NOW() as archived_at, + protocol_version ), inserted_count AS ( INSERT INTO gpu_prover_queue_fri_archive diff --git a/prover/prover_dal/src/fri_proof_compressor_dal.rs b/prover/prover_dal/src/fri_proof_compressor_dal.rs index 7016fcd64dd..7971d39acd6 100644 --- a/prover/prover_dal/src/fri_proof_compressor_dal.rs +++ b/prover/prover_dal/src/fri_proof_compressor_dal.rs @@ -3,6 +3,7 @@ use std::{collections::HashMap, str::FromStr, time::Duration}; use sqlx::Row; use zksync_basic_types::{ + protocol_version::ProtocolVersionId, prover_dal::{ JobCountStatistics, ProofCompressionJobInfo, ProofCompressionJobStatus, StuckJobs, }, 
@@ -22,18 +23,20 @@ impl FriProofCompressorDal<'_, '_> { &mut self, block_number: L1BatchNumber, fri_proof_blob_url: &str, + protocol_version: ProtocolVersionId, ) { sqlx::query!( r#" INSERT INTO - proof_compression_jobs_fri (l1_batch_number, fri_proof_blob_url, status, created_at, updated_at) + proof_compression_jobs_fri (l1_batch_number, fri_proof_blob_url, status, created_at, updated_at, protocol_version) VALUES - ($1, $2, $3, NOW(), NOW()) + ($1, $2, $3, NOW(), NOW(), $4) ON CONFLICT (l1_batch_number) DO NOTHING "#, i64::from(block_number.0), fri_proof_blob_url, - ProofCompressionJobStatus::Queued.to_string(), + ProofCompressionJobStatus::Queued.to_string(), + protocol_version as i32 ) .fetch_optional(self.storage.conn()) .await @@ -60,6 +63,7 @@ impl FriProofCompressorDal<'_, '_> { pub async fn get_next_proof_compression_job( &mut self, picked_by: &str, + protocol_version: &ProtocolVersionId, ) -> Option { sqlx::query!( r#" @@ -78,6 +82,7 @@ impl FriProofCompressorDal<'_, '_> { proof_compression_jobs_fri WHERE status = $2 + AND protocol_version = $4 ORDER BY l1_batch_number ASC LIMIT @@ -91,6 +96,7 @@ impl FriProofCompressorDal<'_, '_> { ProofCompressionJobStatus::InProgress.to_string(), ProofCompressionJobStatus::Queued.to_string(), picked_by, + *protocol_version as i32 ) .fetch_optional(self.storage.conn()) .await diff --git a/prover/prover_dal/src/fri_protocol_versions_dal.rs b/prover/prover_dal/src/fri_protocol_versions_dal.rs index 3d9b0b783ee..2fc0e12cad6 100644 --- a/prover/prover_dal/src/fri_protocol_versions_dal.rs +++ b/prover/prover_dal/src/fri_protocol_versions_dal.rs @@ -1,6 +1,9 @@ use std::convert::TryFrom; -use zksync_basic_types::protocol_version::{L1VerifierConfig, ProtocolVersionId}; +use zksync_basic_types::{ + protocol_version::{L1VerifierConfig, ProtocolVersionId, VerifierParams}, + H256, +}; use zksync_db_connection::connection::Connection; use crate::Prover; @@ -53,7 +56,7 @@ impl FriProtocolVersionsDal<'_, '_> { .unwrap(); } - pub 
async fn protocol_version_for( + pub async fn protocol_versions_for( &mut self, vk_commitments: &L1VerifierConfig, ) -> Vec { @@ -90,4 +93,39 @@ impl FriProtocolVersionsDal<'_, '_> { .map(|row| ProtocolVersionId::try_from(row.id as u16).unwrap()) .collect() } + + pub async fn vk_commitments_for( + &mut self, + protocol_version: ProtocolVersionId, + ) -> Option { + sqlx::query!( + r#" + SELECT + recursion_scheduler_level_vk_hash, + recursion_node_level_vk_hash, + recursion_leaf_level_vk_hash, + recursion_circuits_set_vks_hash + FROM + prover_fri_protocol_versions + WHERE + id = $1 + "#, + protocol_version as i32 + ) + .fetch_optional(self.storage.conn()) + .await + .unwrap() + .map(|row| L1VerifierConfig { + params: VerifierParams { + recursion_node_level_vk_hash: H256::from_slice(&row.recursion_node_level_vk_hash), + recursion_leaf_level_vk_hash: H256::from_slice(&row.recursion_leaf_level_vk_hash), + recursion_circuits_set_vks_hash: H256::from_slice( + &row.recursion_circuits_set_vks_hash, + ), + }, + recursion_scheduler_level_vk_hash: H256::from_slice( + &row.recursion_scheduler_level_vk_hash, + ), + }) + } } diff --git a/prover/prover_dal/src/fri_prover_dal.rs b/prover/prover_dal/src/fri_prover_dal.rs index 942d1c06612..d887bd0d515 100644 --- a/prover/prover_dal/src/fri_prover_dal.rs +++ b/prover/prover_dal/src/fri_prover_dal.rs @@ -50,10 +50,9 @@ impl FriProverDal<'_, '_> { pub async fn get_next_job( &mut self, - protocol_versions: &[ProtocolVersionId], + protocol_version: &ProtocolVersionId, picked_by: &str, ) -> Option { - let protocol_versions: Vec = protocol_versions.iter().map(|&id| id as i32).collect(); sqlx::query!( r#" UPDATE prover_jobs_fri @@ -71,7 +70,7 @@ impl FriProverDal<'_, '_> { prover_jobs_fri WHERE status = 'queued' - AND protocol_version = ANY ($1) + AND protocol_version = $1 ORDER BY aggregation_round DESC, l1_batch_number ASC, @@ -90,7 +89,7 @@ impl FriProverDal<'_, '_> { prover_jobs_fri.depth, prover_jobs_fri.is_node_final_proof "#, - 
&protocol_versions[..], + *protocol_version as i32, picked_by, ) .fetch_optional(self.storage.conn()) @@ -111,14 +110,13 @@ impl FriProverDal<'_, '_> { pub async fn get_next_job_for_circuit_id_round( &mut self, circuits_to_pick: &[CircuitIdRoundTuple], - protocol_versions: &[ProtocolVersionId], + protocol_version: &ProtocolVersionId, picked_by: &str, ) -> Option { let circuit_ids: Vec<_> = circuits_to_pick .iter() .map(|tuple| i16::from(tuple.circuit_id)) .collect(); - let protocol_versions: Vec = protocol_versions.iter().map(|&id| id as i32).collect(); let aggregation_rounds: Vec<_> = circuits_to_pick .iter() .map(|tuple| i16::from(tuple.aggregation_round)) @@ -150,7 +148,7 @@ impl FriProverDal<'_, '_> { prover_jobs_fri AS pj WHERE pj.status = 'queued' - AND pj.protocol_version = ANY ($3) + AND pj.protocol_version = $3 AND pj.circuit_id = tuple.circuit_id AND pj.aggregation_round = tuple.round ORDER BY @@ -179,7 +177,7 @@ impl FriProverDal<'_, '_> { "#, &circuit_ids[..], &aggregation_rounds[..], - &protocol_versions[..], + *protocol_version as i32, picked_by, ) .fetch_optional(self.storage.conn()) @@ -692,4 +690,21 @@ impl FriProverDal<'_, '_> { }) .collect() } + + pub async fn protocol_version_for_job(&mut self, job_id: u32) -> ProtocolVersionId { + sqlx::query!( + r#" + SELECT protocol_version + FROM prover_jobs_fri + WHERE id = $1 + "#, + job_id as i32 + ) + .fetch_one(self.storage.conn()) + .await + .unwrap() + .protocol_version + .map(|id| ProtocolVersionId::try_from(id as u16).unwrap()) + .unwrap() + } } diff --git a/prover/prover_dal/src/fri_witness_generator_dal.rs b/prover/prover_dal/src/fri_witness_generator_dal.rs index 947adf535db..fcbba5c5b7c 100644 --- a/prover/prover_dal/src/fri_witness_generator_dal.rs +++ b/prover/prover_dal/src/fri_witness_generator_dal.rs @@ -75,10 +75,9 @@ impl FriWitnessGeneratorDal<'_, '_> { pub async fn get_next_basic_circuit_witness_job( &mut self, last_l1_batch_to_process: u32, - protocol_versions: &[ProtocolVersionId], + 
protocol_version: ProtocolVersionId, picked_by: &str, ) -> Option<(L1BatchNumber, Eip4844Blobs)> { - let protocol_versions: Vec = protocol_versions.iter().map(|&id| id as i32).collect(); sqlx::query!( r#" UPDATE witness_inputs_fri @@ -97,7 +96,7 @@ impl FriWitnessGeneratorDal<'_, '_> { WHERE l1_batch_number <= $1 AND status = 'queued' - AND protocol_version = ANY ($2) + AND protocol_version = $2 ORDER BY l1_batch_number ASC LIMIT @@ -109,7 +108,7 @@ impl FriWitnessGeneratorDal<'_, '_> { witness_inputs_fri.* "#, i64::from(last_l1_batch_to_process), - &protocol_versions[..], + protocol_version as i32, picked_by, ) .fetch_optional(self.storage.conn()) @@ -421,10 +420,9 @@ impl FriWitnessGeneratorDal<'_, '_> { pub async fn get_next_leaf_aggregation_job( &mut self, - protocol_versions: &[ProtocolVersionId], + protocol_version: ProtocolVersionId, picked_by: &str, ) -> Option { - let protocol_versions: Vec = protocol_versions.iter().map(|&id| id as i32).collect(); let row = sqlx::query!( r#" UPDATE leaf_aggregation_witness_jobs_fri @@ -442,7 +440,7 @@ impl FriWitnessGeneratorDal<'_, '_> { leaf_aggregation_witness_jobs_fri WHERE status = 'queued' - AND protocol_version = ANY ($1) + AND protocol_version = $1 ORDER BY l1_batch_number ASC, id ASC @@ -454,7 +452,7 @@ impl FriWitnessGeneratorDal<'_, '_> { RETURNING leaf_aggregation_witness_jobs_fri.* "#, - &protocol_versions[..], + protocol_version as i32, picked_by, ) .fetch_optional(self.storage.conn()) @@ -608,10 +606,9 @@ impl FriWitnessGeneratorDal<'_, '_> { pub async fn get_next_node_aggregation_job( &mut self, - protocol_versions: &[ProtocolVersionId], + protocol_version: ProtocolVersionId, picked_by: &str, ) -> Option { - let protocol_versions: Vec = protocol_versions.iter().map(|&id| id as i32).collect(); let row = sqlx::query!( r#" UPDATE node_aggregation_witness_jobs_fri @@ -629,7 +626,7 @@ impl FriWitnessGeneratorDal<'_, '_> { node_aggregation_witness_jobs_fri WHERE status = 'queued' - AND protocol_version = ANY 
($1) + AND protocol_version = $1 ORDER BY l1_batch_number ASC, depth ASC, @@ -642,7 +639,7 @@ impl FriWitnessGeneratorDal<'_, '_> { RETURNING node_aggregation_witness_jobs_fri.* "#, - &protocol_versions[..], + protocol_version as i32, picked_by, ) .fetch_optional(self.storage.conn()) @@ -1059,10 +1056,9 @@ impl FriWitnessGeneratorDal<'_, '_> { pub async fn get_next_recursion_tip_witness_job( &mut self, - protocol_versions: &[ProtocolVersionId], + protocol_version: ProtocolVersionId, picked_by: &str, ) -> Option { - let protocol_versions: Vec = protocol_versions.iter().map(|&id| id as i32).collect(); sqlx::query!( r#" UPDATE recursion_tip_witness_jobs_fri @@ -1080,7 +1076,7 @@ impl FriWitnessGeneratorDal<'_, '_> { recursion_tip_witness_jobs_fri WHERE status = 'queued' - AND protocol_version = ANY ($1) + AND protocol_version = $1 ORDER BY l1_batch_number ASC LIMIT @@ -1091,7 +1087,7 @@ impl FriWitnessGeneratorDal<'_, '_> { RETURNING recursion_tip_witness_jobs_fri.l1_batch_number "#, - &protocol_versions[..], + protocol_version as i32, picked_by, ) .fetch_optional(self.storage.conn()) @@ -1163,10 +1159,9 @@ impl FriWitnessGeneratorDal<'_, '_> { pub async fn get_next_scheduler_witness_job( &mut self, - protocol_versions: &[ProtocolVersionId], + protocol_version: ProtocolVersionId, picked_by: &str, ) -> Option { - let protocol_versions: Vec = protocol_versions.iter().map(|&id| id as i32).collect(); sqlx::query!( r#" UPDATE scheduler_witness_jobs_fri @@ -1184,7 +1179,7 @@ impl FriWitnessGeneratorDal<'_, '_> { scheduler_witness_jobs_fri WHERE status = 'queued' - AND protocol_version = ANY ($1) + AND protocol_version = $1 ORDER BY l1_batch_number ASC LIMIT @@ -1195,7 +1190,7 @@ impl FriWitnessGeneratorDal<'_, '_> { RETURNING scheduler_witness_jobs_fri.* "#, - &protocol_versions[..], + protocol_version as i32, picked_by, ) .fetch_optional(self.storage.conn()) diff --git a/prover/prover_fri/src/gpu_prover_job_processor.rs b/prover/prover_fri/src/gpu_prover_job_processor.rs 
index f2f6b99244e..cdd3c060c34 100644 --- a/prover/prover_fri/src/gpu_prover_job_processor.rs +++ b/prover/prover_fri/src/gpu_prover_job_processor.rs @@ -29,7 +29,9 @@ pub mod gpu_prover { CircuitWrapper, FriProofWrapper, ProverServiceDataKey, WitnessVectorArtifacts, }; use zksync_queued_job_processor::{async_trait, JobProcessor}; - use zksync_types::{basic_fri_types::CircuitIdRoundTuple, prover_dal::SocketAddress}; + use zksync_types::{ + basic_fri_types::CircuitIdRoundTuple, prover_dal::SocketAddress, ProtocolVersionId, + }; use zksync_vk_setup_data_server_fri::{keystore::Keystore, GoldilocksGpuProverSetupData}; use crate::{ @@ -62,6 +64,7 @@ pub mod gpu_prover { prover_context: ProverContext, address: SocketAddress, zone: String, + protocol_version: ProtocolVersionId, } impl Prover { @@ -76,6 +79,7 @@ pub mod gpu_prover { witness_vector_queue: SharedWitnessVectorQueue, address: SocketAddress, zone: String, + protocol_version: ProtocolVersionId, ) -> Self { Prover { blob_store, @@ -89,6 +93,7 @@ pub mod gpu_prover { .expect("failed initializing gpu prover context"), address, zone, + protocol_version, } } @@ -287,6 +292,7 @@ pub mod gpu_prover { self.public_blob_store.as_deref(), self.config.shall_save_to_public_bucket, &mut storage_processor, + self.protocol_version, ) .await; Ok(()) diff --git a/prover/prover_fri/src/main.rs b/prover/prover_fri/src/main.rs index 5d5d11a091a..c90b4ced147 100644 --- a/prover/prover_fri/src/main.rs +++ b/prover/prover_fri/src/main.rs @@ -23,6 +23,7 @@ use zksync_queued_job_processor::JobProcessor; use zksync_types::{ basic_fri_types::CircuitIdRoundTuple, prover_dal::{GpuProverInstanceStatus, SocketAddress}, + ProtocolVersionId, }; use zksync_utils::wait_for_tasks::ManagedTasks; @@ -192,15 +193,13 @@ async fn get_prover_tasks( circuit_ids_for_round_to_be_proven: Vec, _init_notifier: Arc, ) -> anyhow::Result>>> { - use zksync_vk_setup_data_server_fri::commitment_utils::get_cached_commitments; - use 
crate::prover_job_processor::{load_setup_data_cache, Prover}; - let vk_commitments = get_cached_commitments(); + let protocol_version = ProtocolVersionId::current_prover_version(); tracing::info!( - "Starting CPU FRI proof generation for with vk_commitments: {:?}", - vk_commitments + "Starting CPU FRI proof generation with protocol_version: {:?}", + protocol_version ); let setup_load_mode = @@ -212,7 +211,7 @@ async fn get_prover_tasks( pool, setup_load_mode, circuit_ids_for_round_to_be_proven, - vk_commitments, + protocol_version, ); Ok(vec![tokio::spawn(prover.run(stop_receiver, None))]) } @@ -247,6 +246,9 @@ async fn get_prover_tasks( host: local_ip, port: prover_config.witness_vector_receiver_port, }; + + let protocol_version = ProtocolVersionId::current_prover_version(); + let prover = gpu_prover::Prover::new( store_factory.create_store().await, public_blob_store, @@ -257,6 +259,7 @@ consumer, address.clone(), zone.clone(), + protocol_version, ); let producer = shared_witness_vector_queue.clone(); @@ -271,6 +274,7 @@ pool.clone(), prover_config.specialized_group_id, zone.clone(), + protocol_version, ); let mut tasks = vec![ diff --git a/prover/prover_fri/src/prover_job_processor.rs b/prover/prover_fri/src/prover_job_processor.rs index b54205204ad..7805d27f55b 100644 --- a/prover/prover_fri/src/prover_job_processor.rs +++ b/prover/prover_fri/src/prover_job_processor.rs @@ -21,7 +21,7 @@ use zksync_prover_fri_types::{ }; use zksync_prover_fri_utils::fetch_next_circuit; use zksync_queued_job_processor::{async_trait, JobProcessor}; -use zksync_types::{basic_fri_types::CircuitIdRoundTuple, protocol_version::L1VerifierConfig}; +use zksync_types::{basic_fri_types::CircuitIdRoundTuple, ProtocolVersionId}; use zksync_vk_setup_data_server_fri::{keystore::Keystore, GoldilocksProverSetupData}; use crate::{ @@ -46,7 +46,7 @@ pub struct Prover { // Only pick jobs for the configured circuit id and aggregation rounds.
// Empty means all jobs are picked. circuit_ids_for_round_to_be_proven: Vec, - vk_commitments: L1VerifierConfig, + protocol_version: ProtocolVersionId, } impl Prover { @@ -58,7 +58,7 @@ impl Prover { prover_connection_pool: ConnectionPool, setup_load_mode: SetupLoadMode, circuit_ids_for_round_to_be_proven: Vec, - vk_commitments: L1VerifierConfig, + protocol_version: ProtocolVersionId, ) -> Self { Prover { blob_store, @@ -67,7 +67,7 @@ impl Prover { prover_connection_pool, setup_load_mode, circuit_ids_for_round_to_be_proven, - vk_commitments, + protocol_version, } } @@ -194,7 +194,7 @@ impl JobProcessor for Prover { &mut storage, &*self.blob_store, &self.circuit_ids_for_round_to_be_proven, - &self.vk_commitments, + &self.protocol_version, ) .await else { @@ -249,6 +249,7 @@ impl JobProcessor for Prover { self.public_blob_store.as_deref(), self.config.shall_save_to_public_bucket, &mut storage_processor, + self.protocol_version, ) .await; Ok(()) diff --git a/prover/prover_fri/src/socket_listener.rs b/prover/prover_fri/src/socket_listener.rs index 7fd67290e23..44e8236308a 100644 --- a/prover/prover_fri/src/socket_listener.rs +++ b/prover/prover_fri/src/socket_listener.rs @@ -11,7 +11,10 @@ pub mod gpu_socket_listener { }; use zksync_object_store::bincode; use zksync_prover_fri_types::WitnessVectorArtifacts; - use zksync_types::prover_dal::{GpuProverInstanceStatus, SocketAddress}; + use zksync_types::{ + prover_dal::{GpuProverInstanceStatus, SocketAddress}, + ProtocolVersionId, + }; use crate::{ metrics::METRICS, @@ -24,6 +27,7 @@ pub mod gpu_socket_listener { pool: ConnectionPool, specialized_prover_group_id: u8, zone: String, + protocol_version: ProtocolVersionId, } impl SocketListener { @@ -33,6 +37,7 @@ pub mod gpu_socket_listener { pool: ConnectionPool, specialized_prover_group_id: u8, zone: String, + protocol_version: ProtocolVersionId, ) -> Self { Self { address, @@ -40,6 +45,7 @@ pub mod gpu_socket_listener { pool, specialized_prover_group_id, zone, + 
protocol_version, } } async fn init(&self, init_notifier: Arc) -> anyhow::Result { @@ -63,6 +69,7 @@ pub mod gpu_socket_listener { self.address.clone(), self.specialized_prover_group_id, self.zone.clone(), + self.protocol_version, ) .await; init_notifier.notify_one(); diff --git a/prover/prover_fri/src/utils.rs b/prover/prover_fri/src/utils.rs index 815613ac484..503f22da6d9 100644 --- a/prover/prover_fri/src/utils.rs +++ b/prover/prover_fri/src/utils.rs @@ -24,7 +24,7 @@ use zksync_prover_fri_types::{ }; use zksync_types::{ basic_fri_types::{AggregationRound, CircuitIdRoundTuple}, - L1BatchNumber, + L1BatchNumber, ProtocolVersionId, }; use crate::metrics::METRICS; @@ -55,6 +55,7 @@ pub struct GpuProverJob { pub witness_vector_artifacts: WitnessVectorArtifacts, } +#[allow(clippy::too_many_arguments)] pub async fn save_proof( job_id: u32, started_at: Instant, @@ -62,7 +63,8 @@ pub async fn save_proof( blob_store: &dyn ObjectStore, public_blob_store: Option<&dyn ObjectStore>, shall_save_to_public_bucket: bool, - storage_processor: &mut Connection<'_, Prover>, + connection: &mut Connection<'_, Prover>, + protocol_version: ProtocolVersionId, ) { tracing::info!( "Successfully proven job: {}, total time taken: {:?}", @@ -95,7 +97,7 @@ pub async fn save_proof( METRICS.blob_save_time[&circuit_type.to_string()].observe(blob_save_started_at.elapsed()); - let mut transaction = storage_processor.start_transaction().await.unwrap(); + let mut transaction = connection.start_transaction().await.unwrap(); transaction .fri_prover_jobs_dal() .save_proof(job_id, started_at.elapsed(), &blob_url) @@ -103,7 +105,7 @@ pub async fn save_proof( if is_scheduler_proof { transaction .fri_proof_compressor_dal() - .insert_proof_compression_job(artifacts.block_number, &blob_url) + .insert_proof_compression_job(artifacts.block_number, &blob_url, protocol_version) .await; } transaction.commit().await.unwrap(); diff --git a/prover/prover_fri_utils/src/lib.rs b/prover/prover_fri_utils/src/lib.rs index 
fb8c31a1cf5..39955fb5597 100644 --- a/prover/prover_fri_utils/src/lib.rs +++ b/prover/prover_fri_utils/src/lib.rs @@ -15,7 +15,7 @@ use zksync_prover_fri_types::{ }; use zksync_types::{ basic_fri_types::{AggregationRound, CircuitIdRoundTuple}, - protocol_version::L1VerifierConfig, + ProtocolVersionId, }; use crate::metrics::{CircuitLabels, PROVER_FRI_UTILS_METRICS}; @@ -28,12 +28,8 @@ pub async fn fetch_next_circuit( storage: &mut Connection<'_, Prover>, blob_store: &dyn ObjectStore, circuit_ids_for_round_to_be_proven: &[CircuitIdRoundTuple], - vk_commitments: &L1VerifierConfig, + protocol_version: &ProtocolVersionId, ) -> Option { - let protocol_versions = storage - .fri_protocol_versions_dal() - .protocol_version_for(vk_commitments) - .await; let pod_name = get_current_pod_name(); let prover_job = match &circuit_ids_for_round_to_be_proven.is_empty() { false => { @@ -42,7 +38,7 @@ pub async fn fetch_next_circuit( .fri_prover_jobs_dal() .get_next_job_for_circuit_id_round( circuit_ids_for_round_to_be_proven, - &protocol_versions, + protocol_version, &pod_name, ) .await @@ -51,7 +47,7 @@ pub async fn fetch_next_circuit( // Generalized prover: proving all circuits. 
storage .fri_prover_jobs_dal() - .get_next_job(&protocol_versions, &pod_name) + .get_next_job(protocol_version, &pod_name) .await } }?; diff --git a/prover/witness_generator/src/basic_circuits.rs b/prover/witness_generator/src/basic_circuits.rs index 52c508e9a6f..a8fe1ab7d13 100644 --- a/prover/witness_generator/src/basic_circuits.rs +++ b/prover/witness_generator/src/basic_circuits.rs @@ -89,7 +89,7 @@ pub struct BasicWitnessGenerator { public_blob_store: Option>, connection_pool: ConnectionPool, prover_connection_pool: ConnectionPool, - protocol_versions: Vec, + protocol_version: ProtocolVersionId, } impl BasicWitnessGenerator { @@ -99,7 +99,7 @@ impl BasicWitnessGenerator { public_blob_store: Option>, connection_pool: ConnectionPool, prover_connection_pool: ConnectionPool, - protocol_versions: Vec, + protocol_version: ProtocolVersionId, ) -> Self { Self { config: Arc::new(config), @@ -107,7 +107,7 @@ impl BasicWitnessGenerator { public_blob_store, connection_pool, prover_connection_pool, - protocol_versions, + protocol_version, } } @@ -160,7 +160,7 @@ impl JobProcessor for BasicWitnessGenerator { .fri_witness_generator_dal() .get_next_basic_circuit_witness_job( last_l1_batch_to_process, - &self.protocol_versions, + self.protocol_version, &pod_name, ) .await diff --git a/prover/witness_generator/src/leaf_aggregation.rs b/prover/witness_generator/src/leaf_aggregation.rs index c7159ba646a..181408c2e11 100644 --- a/prover/witness_generator/src/leaf_aggregation.rs +++ b/prover/witness_generator/src/leaf_aggregation.rs @@ -76,7 +76,7 @@ pub struct LeafAggregationWitnessGenerator { config: FriWitnessGeneratorConfig, object_store: Arc, prover_connection_pool: ConnectionPool, - protocol_versions: Vec, + protocol_version: ProtocolVersionId, } impl LeafAggregationWitnessGenerator { @@ -84,13 +84,13 @@ impl LeafAggregationWitnessGenerator { config: FriWitnessGeneratorConfig, store_factory: &ObjectStoreFactory, prover_connection_pool: ConnectionPool, - protocol_versions: 
Vec, + protocol_version: ProtocolVersionId, ) -> Self { Self { config, object_store: store_factory.create_store().await, prover_connection_pool, - protocol_versions, + protocol_version, } } @@ -121,7 +121,7 @@ impl JobProcessor for LeafAggregationWitnessGenerator { let pod_name = get_current_pod_name(); let Some(metadata) = prover_connection .fri_witness_generator_dal() - .get_next_leaf_aggregation_job(&self.protocol_versions, &pod_name) + .get_next_leaf_aggregation_job(self.protocol_version, &pod_name) .await else { return Ok(None); diff --git a/prover/witness_generator/src/main.rs b/prover/witness_generator/src/main.rs index 1e625b409f1..f234a80b481 100644 --- a/prover/witness_generator/src/main.rs +++ b/prover/witness_generator/src/main.rs @@ -15,7 +15,9 @@ use zksync_config::{ use zksync_env_config::{object_store::ProverObjectStoreConfig, FromEnv}; use zksync_object_store::ObjectStoreFactory; use zksync_queued_job_processor::JobProcessor; -use zksync_types::{basic_fri_types::AggregationRound, web3::futures::StreamExt}; +use zksync_types::{ + basic_fri_types::AggregationRound, web3::futures::StreamExt, ProtocolVersionId, +}; use zksync_utils::wait_for_tasks::ManagedTasks; use zksync_vk_setup_data_server_fri::commitment_utils::get_cached_commitments; @@ -119,25 +121,24 @@ async fn main() -> anyhow::Result<()> { .await .context("failed to build a prover_connection_pool")?; let (stop_sender, stop_receiver) = watch::channel(false); - let vk_commitments = get_cached_commitments(); - let protocol_versions = prover_connection_pool + + let protocol_version = ProtocolVersionId::current_prover_version(); + let vk_commitments_in_db = match prover_connection_pool .connection() .await .unwrap() .fri_protocol_versions_dal() - .protocol_version_for(&vk_commitments) - .await; - - // If `batch_size` is none, it means that the job is 'looping forever' (this is the usual setup in local network). 
- // At the same time, we're reading the `protocol_version` only once at startup - so if there is no protocol version - // read (this is often due to the fact, that the gateway was started too late, and it didn't put the updated protocol - // versions into the database) - then the job will simply 'hang forever' and not pick any tasks. - if opt.batch_size.is_none() && protocol_versions.is_empty() { - panic!( - "Could not find a protocol version for my commitments. Is gateway running? Maybe you started this job before gateway updated the database? Commitments: {:?}", - vk_commitments - ); - } + .vk_commitments_for(protocol_version) + .await + { + Some(commitments) => commitments, + None => { + panic!( + "No vk commitments available in database for a protocol version {:?}.", + protocol_version + ); + } + }; let rounds = match (opt.round, opt.all_rounds) { (Some(round), false) => vec![round], @@ -164,10 +165,10 @@ async fn main() -> anyhow::Result<()> { for (i, round) in rounds.iter().enumerate() { tracing::info!( - "initializing the {:?} witness generator, batch size: {:?} with protocol_versions: {:?}", + "initializing the {:?} witness generator, batch size: {:?} with protocol_version: {:?}", round, opt.batch_size, - &protocol_versions + &protocol_version ); let prometheus_config = if use_push_gateway { @@ -183,6 +184,13 @@ async fn main() -> anyhow::Result<()> { let witness_generator_task = match round { AggregationRound::BasicCircuits => { + let vk_commitments = get_cached_commitments(); + assert_eq!( + vk_commitments, + vk_commitments_in_db, + "VK commitments didn't match commitments from DB for protocol version {protocol_version:?}. 
Cached commitments: {vk_commitments:?}, commitments in database: {vk_commitments_in_db:?}" + ); + let public_blob_store = match config.shall_save_to_public_bucket { false => None, true => Some( @@ -200,7 +208,7 @@ async fn main() -> anyhow::Result<()> { public_blob_store, connection_pool.clone(), prover_connection_pool.clone(), - protocol_versions.clone(), + protocol_version, ) .await; generator.run(stop_receiver.clone(), opt.batch_size) @@ -210,7 +218,7 @@ async fn main() -> anyhow::Result<()> { config.clone(), &store_factory, prover_connection_pool.clone(), - protocol_versions.clone(), + protocol_version, ) .await; generator.run(stop_receiver.clone(), opt.batch_size) @@ -220,7 +228,7 @@ async fn main() -> anyhow::Result<()> { config.clone(), &store_factory, prover_connection_pool.clone(), - protocol_versions.clone(), + protocol_version, ) .await; generator.run(stop_receiver.clone(), opt.batch_size) @@ -230,7 +238,7 @@ async fn main() -> anyhow::Result<()> { config.clone(), &store_factory, prover_connection_pool.clone(), - protocol_versions.clone(), + protocol_version, ) .await; generator.run(stop_receiver.clone(), opt.batch_size) @@ -240,7 +248,7 @@ async fn main() -> anyhow::Result<()> { config.clone(), &store_factory, prover_connection_pool.clone(), - protocol_versions.clone(), + protocol_version, ) .await; generator.run(stop_receiver.clone(), opt.batch_size) diff --git a/prover/witness_generator/src/node_aggregation.rs b/prover/witness_generator/src/node_aggregation.rs index 5e1cbbd0d8c..95255f79ece 100644 --- a/prover/witness_generator/src/node_aggregation.rs +++ b/prover/witness_generator/src/node_aggregation.rs @@ -76,7 +76,7 @@ pub struct NodeAggregationWitnessGenerator { config: FriWitnessGeneratorConfig, object_store: Arc, prover_connection_pool: ConnectionPool, - protocol_versions: Vec, + protocol_version: ProtocolVersionId, } impl NodeAggregationWitnessGenerator { @@ -84,13 +84,13 @@ impl NodeAggregationWitnessGenerator { config: 
FriWitnessGeneratorConfig, store_factory: &ObjectStoreFactory, prover_connection_pool: ConnectionPool, - protocol_versions: Vec, + protocol_version: ProtocolVersionId, ) -> Self { Self { config, object_store: store_factory.create_store().await, prover_connection_pool, - protocol_versions, + protocol_version, } } @@ -152,7 +152,7 @@ impl JobProcessor for NodeAggregationWitnessGenerator { let pod_name = get_current_pod_name(); let Some(metadata) = prover_connection .fri_witness_generator_dal() - .get_next_node_aggregation_job(&self.protocol_versions, &pod_name) + .get_next_node_aggregation_job(self.protocol_version, &pod_name) .await else { return Ok(None); diff --git a/prover/witness_generator/src/recursion_tip.rs b/prover/witness_generator/src/recursion_tip.rs index 77dc82ca432..f4681c6e366 100644 --- a/prover/witness_generator/src/recursion_tip.rs +++ b/prover/witness_generator/src/recursion_tip.rs @@ -73,7 +73,7 @@ pub struct RecursionTipWitnessGenerator { config: FriWitnessGeneratorConfig, object_store: Arc, prover_connection_pool: ConnectionPool, - protocol_versions: Vec, + protocol_version: ProtocolVersionId, } impl RecursionTipWitnessGenerator { @@ -81,13 +81,13 @@ impl RecursionTipWitnessGenerator { config: FriWitnessGeneratorConfig, store_factory: &ObjectStoreFactory, prover_connection_pool: ConnectionPool, - protocol_versions: Vec, + protocol_version: ProtocolVersionId, ) -> Self { Self { config, object_store: store_factory.create_store().await, prover_connection_pool, - protocol_versions, + protocol_version, } } @@ -143,7 +143,7 @@ impl JobProcessor for RecursionTipWitnessGenerator { let pod_name = get_current_pod_name(); let Some(l1_batch_number) = prover_connection .fri_witness_generator_dal() - .get_next_recursion_tip_witness_job(&self.protocol_versions, &pod_name) + .get_next_recursion_tip_witness_job(self.protocol_version, &pod_name) .await else { return Ok(None); diff --git a/prover/witness_generator/src/scheduler.rs 
b/prover/witness_generator/src/scheduler.rs index e02fc2742f3..946c8cabaca 100644 --- a/prover/witness_generator/src/scheduler.rs +++ b/prover/witness_generator/src/scheduler.rs @@ -57,7 +57,7 @@ pub struct SchedulerWitnessGenerator { config: FriWitnessGeneratorConfig, object_store: Arc, prover_connection_pool: ConnectionPool, - protocol_versions: Vec, + protocol_version: ProtocolVersionId, } impl SchedulerWitnessGenerator { @@ -65,13 +65,13 @@ impl SchedulerWitnessGenerator { config: FriWitnessGeneratorConfig, store_factory: &ObjectStoreFactory, prover_connection_pool: ConnectionPool, - protocol_versions: Vec, + protocol_version: ProtocolVersionId, ) -> Self { Self { config, object_store: store_factory.create_store().await, prover_connection_pool, - protocol_versions, + protocol_version, } } @@ -128,7 +128,7 @@ impl JobProcessor for SchedulerWitnessGenerator { let pod_name = get_current_pod_name(); let Some(l1_batch_number) = prover_connection .fri_witness_generator_dal() - .get_next_scheduler_witness_job(&self.protocol_versions, &pod_name) + .get_next_scheduler_witness_job(self.protocol_version, &pod_name) .await else { return Ok(None); diff --git a/prover/witness_vector_generator/src/generator.rs b/prover/witness_vector_generator/src/generator.rs index 42fd4210f6f..60d0df58794 100644 --- a/prover/witness_vector_generator/src/generator.rs +++ b/prover/witness_vector_generator/src/generator.rs @@ -19,8 +19,7 @@ use zksync_prover_fri_utils::{ }; use zksync_queued_job_processor::JobProcessor; use zksync_types::{ - basic_fri_types::CircuitIdRoundTuple, protocol_version::L1VerifierConfig, - prover_dal::GpuProverInstanceStatus, + basic_fri_types::CircuitIdRoundTuple, prover_dal::GpuProverInstanceStatus, ProtocolVersionId, }; use zksync_vk_setup_data_server_fri::keystore::Keystore; @@ -32,7 +31,7 @@ pub struct WitnessVectorGenerator { circuit_ids_for_round_to_be_proven: Vec, zone: String, config: FriWitnessVectorGeneratorConfig, - vk_commitments: L1VerifierConfig, + 
protocol_version: ProtocolVersionId, max_attempts: u32, } @@ -43,7 +42,7 @@ impl WitnessVectorGenerator { circuit_ids_for_round_to_be_proven: Vec, zone: String, config: FriWitnessVectorGeneratorConfig, - vk_commitments: L1VerifierConfig, + protocol_version: ProtocolVersionId, max_attempts: u32, ) -> Self { Self { @@ -52,7 +51,7 @@ impl WitnessVectorGenerator { circuit_ids_for_round_to_be_proven, zone, config, - vk_commitments, + protocol_version, max_attempts, } } @@ -91,7 +90,7 @@ impl JobProcessor for WitnessVectorGenerator { &mut storage, &*self.blob_store, &self.circuit_ids_for_round_to_be_proven, - &self.vk_commitments, + &self.protocol_version, ) .await else { @@ -157,6 +156,7 @@ impl JobProcessor for WitnessVectorGenerator { self.config.max_prover_reservation_duration(), self.config.specialized_group_id, self.zone.clone(), + self.protocol_version, ) .await; diff --git a/prover/witness_vector_generator/src/main.rs b/prover/witness_vector_generator/src/main.rs index 3eaf8eda11e..2260648d3f1 100644 --- a/prover/witness_vector_generator/src/main.rs +++ b/prover/witness_vector_generator/src/main.rs @@ -15,8 +15,8 @@ use zksync_env_config::{object_store::ProverObjectStoreConfig, FromEnv}; use zksync_object_store::ObjectStoreFactory; use zksync_prover_fri_utils::{get_all_circuit_id_round_tuples_for, region_fetcher::get_zone}; use zksync_queued_job_processor::JobProcessor; +use zksync_types::ProtocolVersionId; use zksync_utils::wait_for_tasks::ManagedTasks; -use zksync_vk_setup_data_server_fri::commitment_utils::get_cached_commitments; use crate::generator::WitnessVectorGenerator; @@ -86,14 +86,16 @@ async fn main() -> anyhow::Result<()> { let fri_prover_config = FriProverConfig::from_env().context("FriProverConfig::from_env()")?; let zone_url = &fri_prover_config.zone_read_url; let zone = get_zone(zone_url).await.context("get_zone()")?; - let vk_commitments = get_cached_commitments(); + + let protocol_version = ProtocolVersionId::current_prover_version(); + let 
witness_vector_generator = WitnessVectorGenerator::new( blob_store, pool, circuit_ids_for_round_to_be_proven.clone(), zone.clone(), config, - vk_commitments, + protocol_version, fri_prover_config.max_attempts, ); @@ -108,7 +110,7 @@ async fn main() -> anyhow::Result<()> { }) .expect("Error setting Ctrl+C handler"); - tracing::info!("Starting witness vector generation for group: {} with circuits: {:?} in zone: {} with vk_commitments: {:?}", specialized_group_id, circuit_ids_for_round_to_be_proven, zone, vk_commitments); + tracing::info!("Starting witness vector generation for group: {} with circuits: {:?} in zone: {} with protocol_version: {:?}", specialized_group_id, circuit_ids_for_round_to_be_proven, zone, protocol_version); let tasks = vec![ tokio::spawn(exporter_config.run(stop_receiver.clone())),