diff --git a/core/lib/dal/.sqlx/query-982a7566aebc4f94b37c9cbe32f0f6d8b037d7a5fade524a4faadb421399a00b.json b/core/lib/dal/.sqlx/.query-05be1a2c5cefcb1a58af2e5113e89003638d26219c13bc176e8cfee696d4e9f6.json.nPBbNl similarity index 61% rename from core/lib/dal/.sqlx/query-982a7566aebc4f94b37c9cbe32f0f6d8b037d7a5fade524a4faadb421399a00b.json rename to core/lib/dal/.sqlx/.query-05be1a2c5cefcb1a58af2e5113e89003638d26219c13bc176e8cfee696d4e9f6.json.nPBbNl index a0567d0917d..69a1077452d 100644 --- a/core/lib/dal/.sqlx/query-982a7566aebc4f94b37c9cbe32f0f6d8b037d7a5fade524a4faadb421399a00b.json +++ b/core/lib/dal/.sqlx/.query-05be1a2c5cefcb1a58af2e5113e89003638d26219c13bc176e8cfee696d4e9f6.json.nPBbNl @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n transactions.hash AS tx_hash,\n transactions.index_in_block AS index_in_block,\n transactions.miniblock_number AS block_number,\n transactions.nonce AS nonce,\n transactions.signature AS signature,\n transactions.initiator_address AS initiator_address,\n transactions.tx_format AS tx_format,\n transactions.value AS value,\n transactions.gas_limit AS gas_limit,\n transactions.max_fee_per_gas AS max_fee_per_gas,\n transactions.max_priority_fee_per_gas AS max_priority_fee_per_gas,\n transactions.effective_gas_price AS effective_gas_price,\n transactions.l1_batch_number AS l1_batch_number,\n transactions.l1_batch_tx_index AS l1_batch_tx_index,\n transactions.data->'contractAddress' AS \"execute_contract_address\",\n transactions.data->'calldata' AS \"calldata\",\n miniblocks.hash AS \"block_hash\"\n FROM transactions\n LEFT JOIN miniblocks ON miniblocks.number = transactions.miniblock_number\n WHERE\n transactions.miniblock_number = $1 AND transactions.index_in_block = $2 AND transactions.data != '{}'::jsonb", + "query": "\n SELECT\n transactions.hash AS tx_hash,\n transactions.index_in_block AS index_in_block,\n miniblocks.number AS block_number,\n transactions.nonce AS nonce,\n transactions.signature AS signature,\n transactions.initiator_address AS initiator_address,\n transactions.tx_format AS tx_format,\n transactions.value AS value,\n transactions.gas_limit AS gas_limit,\n transactions.max_fee_per_gas AS max_fee_per_gas,\n transactions.max_priority_fee_per_gas AS max_priority_fee_per_gas,\n transactions.effective_gas_price AS effective_gas_price,\n transactions.l1_batch_number AS l1_batch_number,\n transactions.l1_batch_tx_index AS l1_batch_tx_index,\n transactions.data->'contractAddress' AS \"execute_contract_address\",\n transactions.data->'calldata' AS \"calldata\",\n miniblocks.hash AS \"block_hash\"\n FROM transactions\n LEFT JOIN miniblocks ON miniblocks.number = transactions.miniblock_number\n WHERE\n miniblocks.number = $1 AND transactions.index_in_block = $2 AND transactions.data != '{}'::jsonb", "describe": { "columns": [ { @@ -98,7 +98,7 @@ "nullable": [ false, true, - true, + false, true, true, false, @@ -115,5 +115,5 @@ false ] }, - "hash": "982a7566aebc4f94b37c9cbe32f0f6d8b037d7a5fade524a4faadb421399a00b" + "hash": "05be1a2c5cefcb1a58af2e5113e89003638d26219c13bc176e8cfee696d4e9f6" } diff --git a/core/lib/dal/.sqlx/query-05be1a2c5cefcb1a58af2e5113e89003638d26219c13bc176e8cfee696d4e9f6.json b/core/lib/dal/.sqlx/query-05be1a2c5cefcb1a58af2e5113e89003638d26219c13bc176e8cfee696d4e9f6.json new file mode 100644 index 00000000000..69a1077452d --- /dev/null +++ b/core/lib/dal/.sqlx/query-05be1a2c5cefcb1a58af2e5113e89003638d26219c13bc176e8cfee696d4e9f6.json @@ -0,0 +1,119 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n transactions.hash AS tx_hash,\n transactions.index_in_block AS index_in_block,\n miniblocks.number AS block_number,\n transactions.nonce AS nonce,\n transactions.signature AS signature,\n transactions.initiator_address AS initiator_address,\n transactions.tx_format AS tx_format,\n transactions.value AS value,\n transactions.gas_limit AS gas_limit,\n transactions.max_fee_per_gas AS max_fee_per_gas,\n transactions.max_priority_fee_per_gas AS max_priority_fee_per_gas,\n transactions.effective_gas_price AS effective_gas_price,\n transactions.l1_batch_number AS l1_batch_number,\n transactions.l1_batch_tx_index AS l1_batch_tx_index,\n transactions.data->'contractAddress' AS \"execute_contract_address\",\n transactions.data->'calldata' AS \"calldata\",\n miniblocks.hash AS \"block_hash\"\n FROM transactions\n LEFT JOIN miniblocks ON miniblocks.number = transactions.miniblock_number\n WHERE\n miniblocks.number = $1 AND transactions.index_in_block = $2 AND transactions.data != '{}'::jsonb", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "tx_hash", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "index_in_block", + "type_info": "Int4" + }, + { + "ordinal": 2, + "name": "block_number", + "type_info": "Int8" + }, + { + "ordinal": 3, + "name": "nonce", + "type_info": "Int8" + }, + { + "ordinal": 4, + "name": "signature", + "type_info": "Bytea" + }, + { + "ordinal": 5, + "name": "initiator_address", + "type_info": "Bytea" + }, + { + "ordinal": 6, + "name": "tx_format", + "type_info": "Int4" + }, + { + "ordinal": 7, + "name": "value", + "type_info": "Numeric" + }, + { + "ordinal": 8, + "name": "gas_limit", + "type_info": "Numeric" + }, + { + "ordinal": 9, + "name": "max_fee_per_gas", + "type_info": "Numeric" + }, + { + "ordinal": 10, + "name": "max_priority_fee_per_gas", + "type_info": "Numeric" + }, + { + "ordinal": 11, + "name": "effective_gas_price", + "type_info": "Numeric" + }, + { + "ordinal": 12, + "name": "l1_batch_number", + "type_info": "Int8" + }, + { + "ordinal": 13, + "name": "l1_batch_tx_index", + "type_info": "Int4" + }, + { + "ordinal": 14, + "name": "execute_contract_address", + "type_info": "Jsonb" + }, + { + "ordinal": 15, + "name": "calldata", + "type_info": "Jsonb" + }, + { + "ordinal": 16, + "name": "block_hash", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [ + "Int8", + "Int4" + ] + }, + "nullable": [ + false, + true, + false, + true, + true, + false, + true, + false, + true, + true, + true, + true, + true, + true, + null, + null, + false + ] + }, + "hash": "05be1a2c5cefcb1a58af2e5113e89003638d26219c13bc176e8cfee696d4e9f6" +} diff --git a/core/lib/dal/.sqlx/query-1dcb3afb0c1947f92981f61d95c099c4591ce3f8d51f3df99db0165e086f96af.json b/core/lib/dal/.sqlx/query-1dcb3afb0c1947f92981f61d95c099c4591ce3f8d51f3df99db0165e086f96af.json deleted file mode 100644 index dd142601d00..00000000000 --- a/core/lib/dal/.sqlx/query-1dcb3afb0c1947f92981f61d95c099c4591ce3f8d51f3df99db0165e086f96af.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n bytecode\n FROM\n factory_deps\n WHERE\n bytecode_hash = $1\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "bytecode", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [ - "Bytea" - ] - }, - "nullable": [ - false - ] - }, - "hash": "1dcb3afb0c1947f92981f61d95c099c4591ce3f8d51f3df99db0165e086f96af" -} diff --git a/core/lib/dal/.sqlx/query-55e6f5ea121090034c8920dffb7cfb3ba0659b6fcc582e174ee80601b54f89d9.json b/core/lib/dal/.sqlx/query-2d8da95804a7a300ff2b756e6785c0665ee3ea4086ddb91428fa1b6a00760737.json similarity index 51% rename from core/lib/dal/.sqlx/query-55e6f5ea121090034c8920dffb7cfb3ba0659b6fcc582e174ee80601b54f89d9.json rename to core/lib/dal/.sqlx/query-2d8da95804a7a300ff2b756e6785c0665ee3ea4086ddb91428fa1b6a00760737.json index 117d1bad9bf..8b8cef7dd77 100644 --- a/core/lib/dal/.sqlx/query-55e6f5ea121090034c8920dffb7cfb3ba0659b6fcc582e174ee80601b54f89d9.json +++ b/core/lib/dal/.sqlx/query-2d8da95804a7a300ff2b756e6785c0665ee3ea4086ddb91428fa1b6a00760737.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT DISTINCT\n ON (hashed_key) hashed_key,\n miniblock_number,\n value\n FROM\n storage_logs\n WHERE\n hashed_key = ANY ($1)\n AND miniblock_number <= $2\n ORDER BY\n hashed_key,\n miniblock_number DESC,\n operation_number DESC\n ", + "query": "\n SELECT DISTINCT\n ON (hashed_key) hashed_key,\n miniblock_number,\n value\n FROM\n storage_logs\n WHERE\n hashed_key = ANY ($1)\n AND miniblock_number <= $2\n AND miniblock_number <= COALESCE(\n (\n SELECT\n MAX(number)\n FROM\n miniblocks\n ),\n (\n SELECT\n miniblock_number\n FROM\n snapshot_recovery\n )\n )\n ORDER BY\n hashed_key,\n miniblock_number DESC,\n operation_number DESC\n ", "describe": { "columns": [ { @@ -31,5 +31,5 @@ false ] }, - "hash": "55e6f5ea121090034c8920dffb7cfb3ba0659b6fcc582e174ee80601b54f89d9" + "hash": "2d8da95804a7a300ff2b756e6785c0665ee3ea4086ddb91428fa1b6a00760737" } diff --git a/core/lib/dal/.sqlx/query-66b3f476cf4487239190175d2940fd5e1330393d8b04c7cf03b8c7ab2edad5d1.json b/core/lib/dal/.sqlx/query-66b3f476cf4487239190175d2940fd5e1330393d8b04c7cf03b8c7ab2edad5d1.json new file mode 100644 index 00000000000..0cf4a1e3200 --- /dev/null +++ b/core/lib/dal/.sqlx/query-66b3f476cf4487239190175d2940fd5e1330393d8b04c7cf03b8c7ab2edad5d1.json @@ -0,0 +1,32 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO\n transactions (\n hash,\n is_priority,\n initiator_address,\n gas_limit,\n max_fee_per_gas,\n gas_per_pubdata_limit,\n data,\n upgrade_id,\n contract_address,\n l1_block_number,\n value,\n paymaster,\n paymaster_input,\n tx_format,\n l1_tx_mint,\n l1_tx_refund_recipient,\n miniblock_number,\n index_in_block,\n error,\n execution_info,\n refunded_gas,\n effective_gas_price,\n received_at,\n created_at,\n updated_at\n )\n SELECT\n data_table.hash,\n TRUE,\n data_table.initiator_address,\n data_table.gas_limit,\n data_table.max_fee_per_gas,\n data_table.gas_per_pubdata_limit,\n data_table.data,\n data_table.upgrade_id,\n data_table.contract_address,\n data_table.l1_block_number,\n data_table.value,\n '\\x0000000000000000000000000000000000000000'::bytea,\n '\\x'::bytea,\n data_table.tx_format,\n data_table.l1_tx_mint,\n data_table.l1_tx_refund_recipient,\n $19,\n data_table.index_in_block,\n NULLIF(data_table.error, ''),\n data_table.execution_info,\n data_table.refunded_gas,\n data_table.effective_gas_price,\n NOW(),\n NOW(),\n NOW()\n FROM\n (\n SELECT\n UNNEST($1::BYTEA[]) AS hash,\n UNNEST($2::BYTEA[]) AS initiator_address,\n UNNEST($3::NUMERIC[]) AS gas_limit,\n UNNEST($4::NUMERIC[]) AS max_fee_per_gas,\n UNNEST($5::NUMERIC[]) AS gas_per_pubdata_limit,\n UNNEST($6::JSONB[]) AS data,\n UNNEST($7::INT[]) AS upgrade_id,\n UNNEST($8::BYTEA[]) AS contract_address,\n UNNEST($9::INT[]) AS l1_block_number,\n UNNEST($10::NUMERIC[]) AS value,\n UNNEST($11::INTEGER[]) AS tx_format,\n UNNEST($12::NUMERIC[]) AS l1_tx_mint,\n UNNEST($13::BYTEA[]) AS l1_tx_refund_recipient,\n UNNEST($14::INT[]) AS index_in_block,\n UNNEST($15::VARCHAR[]) AS error,\n UNNEST($16::JSONB[]) AS execution_info,\n UNNEST($17::BIGINT[]) AS refunded_gas,\n UNNEST($18::NUMERIC[]) AS effective_gas_price\n ) AS data_table\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "ByteaArray", + "ByteaArray", + "NumericArray", + "NumericArray", + "NumericArray", + "JsonbArray", + "Int4Array", + "ByteaArray", + "Int4Array", + "NumericArray", + "Int4Array", + "NumericArray", + "ByteaArray", + "Int4Array", + "VarcharArray", + "JsonbArray", + "Int8Array", + "NumericArray", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "66b3f476cf4487239190175d2940fd5e1330393d8b04c7cf03b8c7ab2edad5d1" +} diff --git a/core/lib/dal/.sqlx/query-69c885498b186f3b7cbb215112ec86783d7da0ec1d008680872f3619cf217923.json b/core/lib/dal/.sqlx/query-69c885498b186f3b7cbb215112ec86783d7da0ec1d008680872f3619cf217923.json new file mode 100644 index 00000000000..82575c807fb --- /dev/null +++ b/core/lib/dal/.sqlx/query-69c885498b186f3b7cbb215112ec86783d7da0ec1d008680872f3619cf217923.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM transactions\n WHERE\n hash = ANY ($1)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "ByteaArray" + ] + }, + "nullable": [] + }, + "hash": "69c885498b186f3b7cbb215112ec86783d7da0ec1d008680872f3619cf217923" +} diff --git a/core/lib/dal/.sqlx/query-ac47b807af0441cd522a41879f25679d90947c0af172c9a199cf9280aa557e95.json b/core/lib/dal/.sqlx/query-6e3a3ef443ce8aab55b10eea55f9c8ff11775885aebaf457075c6825305244e5.json similarity index 54% rename from core/lib/dal/.sqlx/query-ac47b807af0441cd522a41879f25679d90947c0af172c9a199cf9280aa557e95.json rename to core/lib/dal/.sqlx/query-6e3a3ef443ce8aab55b10eea55f9c8ff11775885aebaf457075c6825305244e5.json index be3f6ca3c77..de474897307 100644 --- a/core/lib/dal/.sqlx/query-ac47b807af0441cd522a41879f25679d90947c0af172c9a199cf9280aa557e95.json +++ b/core/lib/dal/.sqlx/query-6e3a3ef443ce8aab55b10eea55f9c8ff11775885aebaf457075c6825305244e5.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n transactions.is_priority,\n transactions.initiator_address,\n transactions.gas_limit,\n transactions.gas_per_pubdata_limit,\n transactions.received_at,\n transactions.miniblock_number,\n transactions.error,\n transactions.effective_gas_price,\n transactions.refunded_gas,\n commit_tx.tx_hash AS \"eth_commit_tx_hash?\",\n prove_tx.tx_hash AS \"eth_prove_tx_hash?\",\n execute_tx.tx_hash AS \"eth_execute_tx_hash?\"\n FROM\n transactions\n LEFT JOIN miniblocks ON miniblocks.number = transactions.miniblock_number\n LEFT JOIN l1_batches ON l1_batches.number = miniblocks.l1_batch_number\n LEFT JOIN eth_txs_history AS commit_tx ON (\n l1_batches.eth_commit_tx_id = commit_tx.eth_tx_id\n AND commit_tx.confirmed_at IS NOT NULL\n )\n LEFT JOIN eth_txs_history AS prove_tx ON (\n l1_batches.eth_prove_tx_id = prove_tx.eth_tx_id\n AND prove_tx.confirmed_at IS NOT NULL\n )\n LEFT JOIN eth_txs_history AS execute_tx ON (\n l1_batches.eth_execute_tx_id = execute_tx.eth_tx_id\n AND execute_tx.confirmed_at IS NOT NULL\n )\n WHERE\n transactions.hash = $1\n AND transactions.data != '{}'::jsonb\n ", + "query": "\n SELECT\n transactions.is_priority,\n transactions.initiator_address,\n transactions.gas_limit,\n transactions.gas_per_pubdata_limit,\n transactions.received_at,\n miniblocks.number AS \"miniblock_number?\",\n transactions.error,\n transactions.effective_gas_price,\n transactions.refunded_gas,\n commit_tx.tx_hash AS \"eth_commit_tx_hash?\",\n prove_tx.tx_hash AS \"eth_prove_tx_hash?\",\n execute_tx.tx_hash AS \"eth_execute_tx_hash?\"\n FROM\n transactions\n LEFT JOIN miniblocks ON miniblocks.number = transactions.miniblock_number\n LEFT JOIN l1_batches ON l1_batches.number = miniblocks.l1_batch_number\n LEFT JOIN eth_txs_history AS commit_tx ON (\n l1_batches.eth_commit_tx_id = commit_tx.eth_tx_id\n AND commit_tx.confirmed_at IS NOT NULL\n )\n LEFT JOIN eth_txs_history AS prove_tx ON (\n l1_batches.eth_prove_tx_id = prove_tx.eth_tx_id\n AND prove_tx.confirmed_at IS NOT NULL\n )\n LEFT JOIN eth_txs_history AS execute_tx ON (\n l1_batches.eth_execute_tx_id = execute_tx.eth_tx_id\n AND execute_tx.confirmed_at IS NOT NULL\n )\n WHERE\n transactions.hash = $1\n AND transactions.data != '{}'::jsonb\n ", "describe": { "columns": [ { @@ -30,7 +30,7 @@ }, { "ordinal": 5, - "name": "miniblock_number", + "name": "miniblock_number?", "type_info": "Int8" }, { @@ -75,7 +75,7 @@ true, true, false, - true, + false, true, true, false, @@ -84,5 +84,5 @@ false ] }, - "hash": "ac47b807af0441cd522a41879f25679d90947c0af172c9a199cf9280aa557e95" + "hash": "6e3a3ef443ce8aab55b10eea55f9c8ff11775885aebaf457075c6825305244e5" } diff --git a/core/lib/dal/.sqlx/query-2dd7dbaeb2572404451e78a96f540e73a2778633bbf9d8e591ec912634639af9.json b/core/lib/dal/.sqlx/query-a1829ef4532c8db6c1c907026e8643b7b722e0e467ad03978e9efe652c92a975.json similarity index 91% rename from core/lib/dal/.sqlx/query-2dd7dbaeb2572404451e78a96f540e73a2778633bbf9d8e591ec912634639af9.json rename to core/lib/dal/.sqlx/query-a1829ef4532c8db6c1c907026e8643b7b722e0e467ad03978e9efe652c92a975.json index bb81e7c3194..605b6c1f025 100644 --- a/core/lib/dal/.sqlx/query-2dd7dbaeb2572404451e78a96f540e73a2778633bbf9d8e591ec912634639af9.json +++ b/core/lib/dal/.sqlx/query-a1829ef4532c8db6c1c907026e8643b7b722e0e467ad03978e9efe652c92a975.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n *\n FROM\n transactions\n WHERE\n miniblock_number = $1\n ORDER BY\n index_in_block\n ", + "query": "\n SELECT\n transactions.*\n FROM\n transactions\n INNER JOIN miniblocks ON miniblocks.number = transactions.miniblock_number\n WHERE\n miniblocks.number = $1\n ORDER BY\n index_in_block\n ", "describe": { "columns": [ { @@ -228,5 +228,5 @@ true ] }, - "hash": "2dd7dbaeb2572404451e78a96f540e73a2778633bbf9d8e591ec912634639af9" + "hash": "a1829ef4532c8db6c1c907026e8643b7b722e0e467ad03978e9efe652c92a975" } diff --git a/core/lib/dal/.sqlx/query-a8d55f14de2f489fdc12c8da540e0d51f86d30d5ad92496e4aa087a3c4aae09f.json b/core/lib/dal/.sqlx/query-a8d55f14de2f489fdc12c8da540e0d51f86d30d5ad92496e4aa087a3c4aae09f.json new file mode 100644 index 00000000000..b00999f2144 --- /dev/null +++ b/core/lib/dal/.sqlx/query-a8d55f14de2f489fdc12c8da540e0d51f86d30d5ad92496e4aa087a3c4aae09f.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO\n transactions (\n hash,\n is_priority,\n initiator_address,\n gas_limit,\n max_fee_per_gas,\n gas_per_pubdata_limit,\n data,\n priority_op_id,\n full_fee,\n layer_2_tip_fee,\n contract_address,\n l1_block_number,\n value,\n paymaster,\n paymaster_input,\n tx_format,\n l1_tx_mint,\n l1_tx_refund_recipient,\n miniblock_number,\n index_in_block,\n error,\n execution_info,\n refunded_gas,\n effective_gas_price,\n received_at,\n created_at,\n updated_at\n )\n SELECT\n data_table.hash,\n TRUE,\n data_table.initiator_address,\n data_table.gas_limit,\n data_table.max_fee_per_gas,\n data_table.gas_per_pubdata_limit,\n data_table.data,\n data_table.priority_op_id,\n data_table.full_fee,\n data_table.layer_2_tip_fee,\n data_table.contract_address,\n data_table.l1_block_number,\n data_table.value,\n '\\x0000000000000000000000000000000000000000'::bytea,\n '\\x'::bytea,\n data_table.tx_format,\n data_table.l1_tx_mint,\n data_table.l1_tx_refund_recipient,\n $21,\n data_table.index_in_block,\n NULLIF(data_table.error, ''),\n data_table.execution_info,\n data_table.refunded_gas,\n data_table.effective_gas_price,\n NOW(),\n NOW(),\n NOW()\n FROM\n (\n SELECT\n UNNEST($1::BYTEA[]) AS hash,\n UNNEST($2::BYTEA[]) AS initiator_address,\n UNNEST($3::NUMERIC[]) AS gas_limit,\n UNNEST($4::NUMERIC[]) AS max_fee_per_gas,\n UNNEST($5::NUMERIC[]) AS gas_per_pubdata_limit,\n UNNEST($6::JSONB[]) AS data,\n UNNEST($7::BIGINT[]) AS priority_op_id,\n UNNEST($8::NUMERIC[]) AS full_fee,\n UNNEST($9::NUMERIC[]) AS layer_2_tip_fee,\n UNNEST($10::BYTEA[]) AS contract_address,\n UNNEST($11::INT[]) AS l1_block_number,\n UNNEST($12::NUMERIC[]) AS value,\n UNNEST($13::INTEGER[]) AS tx_format,\n UNNEST($14::NUMERIC[]) AS l1_tx_mint,\n UNNEST($15::BYTEA[]) AS l1_tx_refund_recipient,\n UNNEST($16::INT[]) AS index_in_block,\n UNNEST($17::VARCHAR[]) AS error,\n UNNEST($18::JSONB[]) AS execution_info,\n UNNEST($19::BIGINT[]) AS refunded_gas,\n UNNEST($20::NUMERIC[]) AS effective_gas_price\n ) AS data_table\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "ByteaArray", + "ByteaArray", + "NumericArray", + "NumericArray", + "NumericArray", + "JsonbArray", + "Int8Array", + "NumericArray", + "NumericArray", + "ByteaArray", + "Int4Array", + "NumericArray", + "Int4Array", + "NumericArray", + "ByteaArray", + "Int4Array", + "VarcharArray", + "JsonbArray", + "Int8Array", + "NumericArray", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "a8d55f14de2f489fdc12c8da540e0d51f86d30d5ad92496e4aa087a3c4aae09f" +} diff --git a/core/lib/dal/.sqlx/query-ae8050de1fc30824e15346f9ac426783113663c125882bc185190a5d733363bb.json b/core/lib/dal/.sqlx/query-ae8050de1fc30824e15346f9ac426783113663c125882bc185190a5d733363bb.json new file mode 100644 index 00000000000..23e695483db --- /dev/null +++ b/core/lib/dal/.sqlx/query-ae8050de1fc30824e15346f9ac426783113663c125882bc185190a5d733363bb.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n bytecode\n FROM\n factory_deps\n WHERE\n bytecode_hash = $1\n AND miniblock_number <= COALESCE(\n (\n SELECT\n MAX(number)\n FROM\n miniblocks\n ),\n (\n SELECT\n miniblock_number\n FROM\n snapshot_recovery\n )\n )\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "bytecode", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [ + "Bytea" + ] + }, + "nullable": [ + false + ] + }, + "hash": "ae8050de1fc30824e15346f9ac426783113663c125882bc185190a5d733363bb" +} diff --git a/core/lib/dal/.sqlx/query-4c9df15553b3add049d5756bfa96cd97cb00335b1ef8c613824ea7b55c076b16.json b/core/lib/dal/.sqlx/query-b98e3790de305017c8fa5fba4c0c783b3710ee47f88edce1b17c2b8fa21dadd3.json similarity index 63% rename from core/lib/dal/.sqlx/query-4c9df15553b3add049d5756bfa96cd97cb00335b1ef8c613824ea7b55c076b16.json rename to core/lib/dal/.sqlx/query-b98e3790de305017c8fa5fba4c0c783b3710ee47f88edce1b17c2b8fa21dadd3.json index 23b98142557..81981683e89 100644 --- a/core/lib/dal/.sqlx/query-4c9df15553b3add049d5756bfa96cd97cb00335b1ef8c613824ea7b55c076b16.json +++ b/core/lib/dal/.sqlx/query-b98e3790de305017c8fa5fba4c0c783b3710ee47f88edce1b17c2b8fa21dadd3.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n protocol_version\n FROM\n transactions\n LEFT JOIN miniblocks ON transactions.miniblock_number = miniblocks.number\n WHERE\n transactions.hash = $1\n ", + "query": "\n SELECT\n protocol_version\n FROM\n transactions\n INNER JOIN miniblocks ON transactions.miniblock_number = miniblocks.number\n WHERE\n transactions.hash = $1\n ", "describe": { "columns": [ { @@ -18,5 +18,5 @@ true ] }, - "hash": "4c9df15553b3add049d5756bfa96cd97cb00335b1ef8c613824ea7b55c076b16" + "hash": "b98e3790de305017c8fa5fba4c0c783b3710ee47f88edce1b17c2b8fa21dadd3" } diff --git a/core/lib/dal/.sqlx/query-bf481c2b498420f80765b837059cab02b0656d863dbfce4b5dad7bc72e52b05d.json b/core/lib/dal/.sqlx/query-bf481c2b498420f80765b837059cab02b0656d863dbfce4b5dad7bc72e52b05d.json new file mode 100644 index 00000000000..c353a35ec5e --- /dev/null +++ b/core/lib/dal/.sqlx/query-bf481c2b498420f80765b837059cab02b0656d863dbfce4b5dad7bc72e52b05d.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO\n transactions (\n hash,\n is_priority,\n initiator_address,\n nonce,\n signature,\n gas_limit,\n max_fee_per_gas,\n max_priority_fee_per_gas,\n gas_per_pubdata_limit,\n input,\n data,\n tx_format,\n contract_address,\n value,\n paymaster,\n paymaster_input,\n execution_info,\n miniblock_number,\n index_in_block,\n error,\n effective_gas_price,\n refunded_gas,\n received_at,\n created_at,\n updated_at\n )\n SELECT\n data_table.hash,\n FALSE,\n data_table.initiator_address,\n data_table.nonce,\n data_table.signature,\n data_table.gas_limit,\n data_table.max_fee_per_gas,\n data_table.max_priority_fee_per_gas,\n data_table.gas_per_pubdata_limit,\n data_table.input,\n data_table.data,\n data_table.tx_format,\n data_table.contract_address,\n data_table.value,\n data_table.paymaster,\n data_table.paymaster_input,\n data_table.new_execution_info,\n $21,\n data_table.index_in_block,\n NULLIF(data_table.error, ''),\n data_table.effective_gas_price,\n data_table.refunded_gas,\n NOW(),\n NOW(),\n NOW()\n FROM\n (\n SELECT\n UNNEST($1::bytea[]) AS hash,\n UNNEST($2::bytea[]) AS initiator_address,\n UNNEST($3::INT[]) AS nonce,\n UNNEST($4::bytea[]) AS signature,\n UNNEST($5::NUMERIC[]) AS gas_limit,\n UNNEST($6::NUMERIC[]) AS max_fee_per_gas,\n UNNEST($7::NUMERIC[]) AS max_priority_fee_per_gas,\n UNNEST($8::NUMERIC[]) AS gas_per_pubdata_limit,\n UNNEST($9::bytea[]) AS input,\n UNNEST($10::jsonb[]) AS data,\n UNNEST($11::INT[]) AS tx_format,\n UNNEST($12::bytea[]) AS contract_address,\n UNNEST($13::NUMERIC[]) AS value,\n UNNEST($14::bytea[]) AS paymaster,\n UNNEST($15::bytea[]) AS paymaster_input,\n UNNEST($16::jsonb[]) AS new_execution_info,\n UNNEST($17::INTEGER[]) AS index_in_block,\n UNNEST($18::VARCHAR[]) AS error,\n UNNEST($19::NUMERIC[]) AS effective_gas_price,\n UNNEST($20::BIGINT[]) AS refunded_gas\n ) AS data_table\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "ByteaArray", + "ByteaArray", + "Int4Array", + "ByteaArray", + "NumericArray", + "NumericArray", + "NumericArray", + "NumericArray", + "ByteaArray", + "JsonbArray", + "Int4Array", + "ByteaArray", + "NumericArray", + "ByteaArray", + "ByteaArray", + "JsonbArray", + "Int4Array", + "VarcharArray", + "NumericArray", + "Int8Array", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "bf481c2b498420f80765b837059cab02b0656d863dbfce4b5dad7bc72e52b05d" +} diff --git a/core/lib/dal/.sqlx/query-2003dcf7bc807c7d345368538accd9b0128f82306e27e4c7258116082a54ab95.json b/core/lib/dal/.sqlx/query-bf6eac8dce87063b971724e85d20f288a7a8867c55e28467cddeea8d1b0397cf.json similarity index 60% rename from core/lib/dal/.sqlx/query-2003dcf7bc807c7d345368538accd9b0128f82306e27e4c7258116082a54ab95.json rename to core/lib/dal/.sqlx/query-bf6eac8dce87063b971724e85d20f288a7a8867c55e28467cddeea8d1b0397cf.json index 77177e405f1..6c42b6fa6d4 100644 --- a/core/lib/dal/.sqlx/query-2003dcf7bc807c7d345368538accd9b0128f82306e27e4c7258116082a54ab95.json +++ b/core/lib/dal/.sqlx/query-bf6eac8dce87063b971724e85d20f288a7a8867c55e28467cddeea8d1b0397cf.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n transactions.hash,\n transactions.received_at\n FROM\n transactions\n LEFT JOIN miniblocks ON miniblocks.number = miniblock_number\n WHERE\n received_at > $1\n ORDER BY\n received_at ASC\n LIMIT\n $2\n ", + "query": "\n SELECT\n transactions.hash,\n transactions.received_at\n FROM\n transactions\n WHERE\n received_at > $1\n ORDER BY\n received_at ASC\n LIMIT\n $2\n ", "describe": { "columns": [ { @@ -25,5 +25,5 @@ false ] }, - "hash": "2003dcf7bc807c7d345368538accd9b0128f82306e27e4c7258116082a54ab95" + "hash": "bf6eac8dce87063b971724e85d20f288a7a8867c55e28467cddeea8d1b0397cf" } diff --git a/core/lib/dal/.sqlx/query-c809f42a221b18a767e9dd0286503d8bd356f2f9cc249cd8b90caa5a8b5918e3.json b/core/lib/dal/.sqlx/query-c809f42a221b18a767e9dd0286503d8bd356f2f9cc249cd8b90caa5a8b5918e3.json deleted file mode 100644 index b85f4c542bf..00000000000 --- a/core/lib/dal/.sqlx/query-c809f42a221b18a767e9dd0286503d8bd356f2f9cc249cd8b90caa5a8b5918e3.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n COUNT(*) AS \"count!\"\n FROM\n (\n SELECT\n *\n FROM\n storage_logs\n WHERE\n storage_logs.hashed_key = $1\n ORDER BY\n storage_logs.miniblock_number DESC,\n storage_logs.operation_number DESC\n LIMIT\n 1\n ) sl\n WHERE\n sl.value != $2\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "count!", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Bytea", - "Bytea" - ] - }, - "nullable": [ - null - ] - }, - "hash": "c809f42a221b18a767e9dd0286503d8bd356f2f9cc249cd8b90caa5a8b5918e3" -} diff --git a/core/lib/dal/.sqlx/query-61bc330d6d1b5fddec78342c1b0f00e82b0b3ad9ae36bf4fe44d7e85b74c6f49.json b/core/lib/dal/.sqlx/query-cf3a9acaeff6d30aff772db93bce1edd4dc242a6b32e61d779b7ec65f6e7b2b1.json similarity index 56% rename from core/lib/dal/.sqlx/query-61bc330d6d1b5fddec78342c1b0f00e82b0b3ad9ae36bf4fe44d7e85b74c6f49.json rename to core/lib/dal/.sqlx/query-cf3a9acaeff6d30aff772db93bce1edd4dc242a6b32e61d779b7ec65f6e7b2b1.json index 2c0454b0dd8..624a1366471 100644 --- a/core/lib/dal/.sqlx/query-61bc330d6d1b5fddec78342c1b0f00e82b0b3ad9ae36bf4fe44d7e85b74c6f49.json +++ b/core/lib/dal/.sqlx/query-cf3a9acaeff6d30aff772db93bce1edd4dc242a6b32e61d779b7ec65f6e7b2b1.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n MAX(priority_op_id) AS \"op_id\"\n FROM\n transactions\n WHERE\n is_priority = TRUE\n AND miniblock_number IS NOT NULL\n ", + "query": "\n SELECT\n MAX(priority_op_id) AS \"op_id\"\n FROM\n transactions\n WHERE\n is_priority = TRUE\n AND transactions.miniblock_number <= (\n SELECT\n MAX(number)\n FROM\n miniblocks\n )\n ", "describe": { "columns": [ { @@ -16,5 +16,5 @@ null ] }, - "hash": "61bc330d6d1b5fddec78342c1b0f00e82b0b3ad9ae36bf4fe44d7e85b74c6f49" + "hash": "cf3a9acaeff6d30aff772db93bce1edd4dc242a6b32e61d779b7ec65f6e7b2b1" } diff --git a/core/lib/dal/.sqlx/query-cf3c7b918a3f82476543841d4dc5393ec02458104c483a2023b24881ae0c6716.json b/core/lib/dal/.sqlx/query-cf3c7b918a3f82476543841d4dc5393ec02458104c483a2023b24881ae0c6716.json new file mode 100644 index 00000000000..59bfa4858c0 --- /dev/null +++ b/core/lib/dal/.sqlx/query-cf3c7b918a3f82476543841d4dc5393ec02458104c483a2023b24881ae0c6716.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n COUNT(*) AS \"count!\"\n FROM\n (\n SELECT\n *\n FROM\n storage_logs\n WHERE\n hashed_key = $1\n AND miniblock_number <= COALESCE(\n (\n SELECT\n MAX(number)\n FROM\n miniblocks\n ),\n (\n SELECT\n miniblock_number\n FROM\n snapshot_recovery\n )\n )\n ORDER BY\n miniblock_number DESC,\n operation_number DESC\n LIMIT\n 1\n ) sl\n WHERE\n sl.value != $2\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Bytea", + "Bytea" + ] + }, + "nullable": [ + null + ] + }, + "hash": "cf3c7b918a3f82476543841d4dc5393ec02458104c483a2023b24881ae0c6716" +} diff --git a/core/lib/dal/.sqlx/query-3596a70433e4e27fcda18f37073b51dea32e37cb8c51f1dd5d82c15eddc48e6b.json b/core/lib/dal/.sqlx/query-d0636ad46d8978f18292b3e66209bcc9e940c555a8629afa0960d99ca177f220.json similarity index 66% rename from core/lib/dal/.sqlx/query-3596a70433e4e27fcda18f37073b51dea32e37cb8c51f1dd5d82c15eddc48e6b.json rename to core/lib/dal/.sqlx/query-d0636ad46d8978f18292b3e66209bcc9e940c555a8629afa0960d99ca177f220.json index 7fc673c7c22..c9f08e92810 100644 --- a/core/lib/dal/.sqlx/query-3596a70433e4e27fcda18f37073b51dea32e37cb8c51f1dd5d82c15eddc48e6b.json +++ b/core/lib/dal/.sqlx/query-d0636ad46d8978f18292b3e66209bcc9e940c555a8629afa0960d99ca177f220.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n miniblocks.number,\n COALESCE(\n miniblocks.l1_batch_number,\n (\n SELECT\n (MAX(number) + 1)\n FROM\n l1_batches\n ),\n (\n SELECT\n MAX(l1_batch_number) + 1\n FROM\n snapshot_recovery\n )\n ) AS \"l1_batch_number!\",\n (\n SELECT\n MAX(m2.number)\n FROM\n miniblocks m2\n WHERE\n miniblocks.l1_batch_number = m2.l1_batch_number\n ) AS \"last_batch_miniblock?\",\n miniblocks.timestamp,\n miniblocks.l1_gas_price,\n miniblocks.l2_fair_gas_price,\n miniblocks.fair_pubdata_price,\n miniblocks.bootloader_code_hash,\n miniblocks.default_aa_code_hash,\n miniblocks.virtual_blocks,\n miniblocks.hash,\n miniblocks.protocol_version AS \"protocol_version!\",\n miniblocks.fee_account_address AS \"fee_account_address!\"\n FROM\n miniblocks\n WHERE\n miniblocks.number = $1\n ", + "query": "\n SELECT\n miniblocks.number,\n COALESCE(\n miniblocks.l1_batch_number,\n (\n SELECT\n (MAX(number) + 1)\n FROM\n l1_batches\n ),\n (\n SELECT\n MAX(l1_batch_number) + 1\n FROM\n snapshot_recovery\n )\n ) AS \"l1_batch_number!\",\n (miniblocks.l1_tx_count + miniblocks.l2_tx_count) AS \"tx_count!\",\n miniblocks.timestamp,\n miniblocks.l1_gas_price,\n miniblocks.l2_fair_gas_price,\n miniblocks.fair_pubdata_price,\n miniblocks.bootloader_code_hash,\n miniblocks.default_aa_code_hash,\n miniblocks.virtual_blocks,\n miniblocks.hash,\n miniblocks.protocol_version AS \"protocol_version!\",\n miniblocks.fee_account_address AS \"fee_account_address!\"\n FROM\n miniblocks\n WHERE\n miniblocks.number = $1\n ", "describe": { "columns": [ { @@ -15,8 +15,8 @@ }, { "ordinal": 2, - "name": "last_batch_miniblock?", - "type_info": "Int8" + "name": "tx_count!", + "type_info": "Int4" }, { "ordinal": 3, @@ -90,5 +90,5 @@ false ] }, - "hash": "3596a70433e4e27fcda18f37073b51dea32e37cb8c51f1dd5d82c15eddc48e6b" + "hash": "d0636ad46d8978f18292b3e66209bcc9e940c555a8629afa0960d99ca177f220" } diff --git a/core/lib/dal/.sqlx/query-f406091f793e2eb09d9490f2f8f7ac942fc88835d4bd925e4144cfbb7bc1cf2c.json b/core/lib/dal/.sqlx/query-faef06e2c44922a25eede5ff7f28f79ad3b5bab2de3c616fa38a9abd583b47a1.json similarity index 62% rename from core/lib/dal/.sqlx/query-f406091f793e2eb09d9490f2f8f7ac942fc88835d4bd925e4144cfbb7bc1cf2c.json rename to core/lib/dal/.sqlx/query-faef06e2c44922a25eede5ff7f28f79ad3b5bab2de3c616fa38a9abd583b47a1.json index eb09b438c68..49b37075487 100644 --- a/core/lib/dal/.sqlx/query-f406091f793e2eb09d9490f2f8f7ac942fc88835d4bd925e4144cfbb7bc1cf2c.json +++ b/core/lib/dal/.sqlx/query-faef06e2c44922a25eede5ff7f28f79ad3b5bab2de3c616fa38a9abd583b47a1.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n transactions.hash AS tx_hash,\n transactions.index_in_block AS index_in_block,\n transactions.miniblock_number AS block_number,\n transactions.nonce AS nonce,\n transactions.signature AS signature,\n transactions.initiator_address AS initiator_address,\n transactions.tx_format AS tx_format,\n transactions.value AS value,\n transactions.gas_limit AS gas_limit,\n transactions.max_fee_per_gas AS max_fee_per_gas,\n transactions.max_priority_fee_per_gas AS max_priority_fee_per_gas,\n transactions.effective_gas_price AS effective_gas_price,\n transactions.l1_batch_number AS l1_batch_number,\n transactions.l1_batch_tx_index AS l1_batch_tx_index,\n transactions.data->'contractAddress' AS \"execute_contract_address\",\n transactions.data->'calldata' AS \"calldata\",\n miniblocks.hash AS \"block_hash\"\n FROM transactions\n LEFT JOIN miniblocks ON miniblocks.number = transactions.miniblock_number\n WHERE\n transactions.hash = ANY($1) AND transactions.data != '{}'::jsonb", + "query": "\n SELECT\n transactions.hash AS tx_hash,\n transactions.index_in_block AS index_in_block,\n miniblocks.number AS block_number,\n transactions.nonce AS nonce,\n transactions.signature AS signature,\n transactions.initiator_address AS initiator_address,\n transactions.tx_format AS tx_format,\n transactions.value AS value,\n transactions.gas_limit AS gas_limit,\n transactions.max_fee_per_gas AS max_fee_per_gas,\n transactions.max_priority_fee_per_gas AS max_priority_fee_per_gas,\n transactions.effective_gas_price AS effective_gas_price,\n transactions.l1_batch_number AS l1_batch_number,\n transactions.l1_batch_tx_index AS l1_batch_tx_index,\n transactions.data->'contractAddress' AS \"execute_contract_address\",\n transactions.data->'calldata' AS \"calldata\",\n miniblocks.hash AS \"block_hash\"\n FROM transactions\n LEFT JOIN miniblocks ON miniblocks.number = transactions.miniblock_number\n WHERE\n transactions.hash = ANY($1) AND transactions.data != '{}'::jsonb", "describe": { "columns": [ { @@ -114,5 +114,5 @@ true ] }, - "hash": "f406091f793e2eb09d9490f2f8f7ac942fc88835d4bd925e4144cfbb7bc1cf2c" + "hash": "faef06e2c44922a25eede5ff7f28f79ad3b5bab2de3c616fa38a9abd583b47a1" } diff --git a/core/lib/dal/migrations/20240415161211_drop_miniblock_constraints.down.sql b/core/lib/dal/migrations/20240415161211_drop_miniblock_constraints.down.sql new file mode 100644 index 00000000000..6337be1598a --- /dev/null +++ b/core/lib/dal/migrations/20240415161211_drop_miniblock_constraints.down.sql @@ -0,0 +1,4 @@ +ALTER TABLE events + ADD FOREIGN KEY (miniblock_number) REFERENCES miniblocks; +ALTER TABLE l2_to_l1_logs + ADD FOREIGN KEY (miniblock_number) REFERENCES miniblocks; diff --git a/core/lib/dal/migrations/20240415161211_drop_miniblock_constraints.up.sql b/core/lib/dal/migrations/20240415161211_drop_miniblock_constraints.up.sql new file mode 100644 index 00000000000..310c7994552 --- /dev/null +++ b/core/lib/dal/migrations/20240415161211_drop_miniblock_constraints.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE events DROP CONSTRAINT IF EXISTS events_miniblock_number_fkey; +ALTER TABLE l2_to_l1_logs DROP CONSTRAINT IF EXISTS l2_to_l1_logs_miniblock_number_fkey; diff --git a/core/lib/dal/src/blocks_dal.rs b/core/lib/dal/src/blocks_dal.rs index 4e39a7bcea3..b8da3e202ce 100644 --- a/core/lib/dal/src/blocks_dal.rs +++ b/core/lib/dal/src/blocks_dal.rs @@ -2060,29 +2060,6 @@ impl BlocksDal<'_, '_> { Ok(()) } - pub async fn get_l2_block_protocol_version_id( - &mut self, - l2_block_number: L2BlockNumber, - ) -> DalResult> { - Ok(sqlx::query!( - r#" - SELECT - protocol_version - FROM - miniblocks - WHERE - number = $1 - "#, - i64::from(l2_block_number.0) - ) - .try_map(|row| row.protocol_version.map(parse_protocol_version).transpose()) - .instrument("get_l2_block_protocol_version_id") - .with_arg("l2_block_number", &l2_block_number) - .fetch_optional(self.storage) - .await? - .flatten()) - } - pub async fn get_fee_address_for_l2_block( &mut self, number: L2BlockNumber, diff --git a/core/lib/dal/src/blocks_web3_dal.rs b/core/lib/dal/src/blocks_web3_dal.rs index 81a28879571..8fbfabf0fba 100644 --- a/core/lib/dal/src/blocks_web3_dal.rs +++ b/core/lib/dal/src/blocks_web3_dal.rs @@ -14,13 +14,14 @@ use zksync_utils::bigdecimal_to_u256; use crate::{ models::{ + parse_protocol_version, storage_block::{ ResolvedL1BatchForL2Block, StorageBlockDetails, StorageL1BatchDetails, LEGACY_BLOCK_GAS_LIMIT, }, storage_transaction::CallTrace, }, - Core, CoreDal, + Core, }; #[derive(Debug)] @@ -486,12 +487,27 @@ impl BlocksWeb3Dal<'_, '_> { &mut self, block_number: L2BlockNumber, ) -> DalResult> { - let protocol_version = self - .storage - .blocks_dal() - .get_l2_block_protocol_version_id(block_number) - .await? - .unwrap_or_else(ProtocolVersionId::last_potentially_undefined); + let protocol_version = sqlx::query!( + r#" + SELECT + protocol_version + FROM + miniblocks + WHERE + number = $1 + "#, + i64::from(block_number.0) + ) + .try_map(|row| row.protocol_version.map(parse_protocol_version).transpose()) + .instrument("get_traces_for_l2_block#get_l2_block_protocol_version_id") + .with_arg("l2_block_number", &block_number) + .fetch_optional(self.storage) + .await?; + let Some(protocol_version) = protocol_version else { + return Ok(Vec::new()); + }; + let protocol_version = + protocol_version.unwrap_or_else(ProtocolVersionId::last_potentially_undefined); Ok(sqlx::query_as!( CallTrace, @@ -694,7 +710,7 @@ mod tests { create_l2_block_header, create_snapshot_recovery, mock_execution_result, mock_l2_transaction, }, - ConnectionPool, Core, + ConnectionPool, Core, CoreDal, }; #[tokio::test] @@ -912,7 +928,13 @@ mod tests { tx_results.push(tx_result); } conn.transactions_dal() - .mark_txs_as_executed_in_l2_block(L2BlockNumber(1), &tx_results, 1.into()) + .mark_txs_as_executed_in_l2_block( + L2BlockNumber(1), + &tx_results, + 1.into(), + ProtocolVersionId::latest(), + false, + ) .await .unwrap(); diff --git a/core/lib/dal/src/events_web3_dal.rs b/core/lib/dal/src/events_web3_dal.rs index de973bacb79..1a182f6052d 100644 --- a/core/lib/dal/src/events_web3_dal.rs +++ b/core/lib/dal/src/events_web3_dal.rs @@ -81,7 +81,7 @@ impl EventsWeb3Dal<'_, '_> { ) SELECT miniblocks.hash as "block_hash", miniblocks.l1_batch_number as "l1_batch_number", events_select.* FROM events_select - LEFT JOIN miniblocks ON events_select.miniblock_number = miniblocks.number + INNER JOIN miniblocks ON events_select.miniblock_number = miniblocks.number ORDER BY miniblock_number ASC, event_index_in_block ASC "#, where_sql, arg_index diff --git a/core/lib/dal/src/factory_deps_dal.rs b/core/lib/dal/src/factory_deps_dal.rs index d846a233a3b..02ce32306cf 100644 --- a/core/lib/dal/src/factory_deps_dal.rs +++ b/core/lib/dal/src/factory_deps_dal.rs @@ -56,7 +56,8 @@ impl FactoryDepsDal<'_, '_> { } /// Returns bytecode for a factory dependency with the specified bytecode `hash`. - pub async fn get_factory_dep(&mut self, hash: H256) -> DalResult>> { + /// Returns bytecodes only from sealed miniblocks. + pub async fn get_sealed_factory_dep(&mut self, hash: H256) -> DalResult>> { Ok(sqlx::query!( r#" SELECT @@ -65,10 +66,24 @@ impl FactoryDepsDal<'_, '_> { factory_deps WHERE bytecode_hash = $1 + AND miniblock_number <= COALESCE( + ( + SELECT + MAX(number) + FROM + miniblocks + ), + ( + SELECT + miniblock_number + FROM + snapshot_recovery + ) + ) "#, hash.as_bytes(), ) - .instrument("get_factory_dep") + .instrument("get_sealed_factory_dep") .with_arg("hash", &hash) .fetch_optional(self.storage) .await? @@ -81,7 +96,7 @@ impl FactoryDepsDal<'_, '_> { default_aa_hash: H256, ) -> anyhow::Result { let bootloader_bytecode = self - .get_factory_dep(bootloader_hash) + .get_sealed_factory_dep(bootloader_hash) .await .context("failed loading bootloader code")? .with_context(|| format!("bootloader code with hash {bootloader_hash:?} should be present in the database"))?; @@ -91,7 +106,7 @@ impl FactoryDepsDal<'_, '_> { }; let default_aa_bytecode = self - .get_factory_dep(default_aa_hash) + .get_sealed_factory_dep(default_aa_hash) .await .context("failed loading default account code")? .with_context(|| format!("default account code with hash {default_aa_hash:?} should be present in the database"))?; diff --git a/core/lib/dal/src/models/storage_sync.rs b/core/lib/dal/src/models/storage_sync.rs index 4bd00f478dc..688a6f99790 100644 --- a/core/lib/dal/src/models/storage_sync.rs +++ b/core/lib/dal/src/models/storage_sync.rs @@ -13,7 +13,7 @@ use crate::{ pub(crate) struct StorageSyncBlock { pub number: i64, pub l1_batch_number: i64, - pub last_batch_miniblock: Option, + pub tx_count: i32, pub timestamp: i64, // L1 gas price assumed in the corresponding batch pub l1_gas_price: i64, @@ -55,7 +55,7 @@ impl TryFrom for SyncBlock { .try_into() .decode_column("l1_batch_number")?, ), - last_in_batch: block.last_batch_miniblock == Some(block.number), + last_in_batch: block.tx_count == 0, timestamp: block.timestamp.try_into().decode_column("timestamp")?, l1_gas_price: block .l1_gas_price diff --git a/core/lib/dal/src/pruning_dal/tests.rs b/core/lib/dal/src/pruning_dal/tests.rs index a328870d60b..ab976f52d21 100644 --- a/core/lib/dal/src/pruning_dal/tests.rs +++ b/core/lib/dal/src/pruning_dal/tests.rs @@ -486,6 +486,8 @@ async fn transactions_are_handled_correctly_after_pruning() { L2BlockNumber(1), &[mock_execution_result(tx.clone())], 1.into(), + ProtocolVersionId::latest(), + false, ) .await .unwrap(); diff --git a/core/lib/dal/src/storage_logs_dal.rs b/core/lib/dal/src/storage_logs_dal.rs index 9a483ce2b44..7546812ae6c 100644 --- a/core/lib/dal/src/storage_logs_dal.rs +++ b/core/lib/dal/src/storage_logs_dal.rs @@ -200,10 +200,24 @@ impl StorageLogsDal<'_, '_> { FROM storage_logs WHERE - storage_logs.hashed_key = $1 + hashed_key = $1 + AND miniblock_number <= COALESCE( + ( + SELECT + MAX(number) + FROM + miniblocks + ), + ( + SELECT + miniblock_number + FROM + snapshot_recovery + ) + ) ORDER BY - storage_logs.miniblock_number DESC, - storage_logs.operation_number DESC + miniblock_number DESC, + operation_number DESC LIMIT 1 ) sl @@ -247,6 +261,20 @@ impl StorageLogsDal<'_, '_> { WHERE hashed_key = ANY ($1) AND miniblock_number <= $2 + AND miniblock_number <= COALESCE( + ( + SELECT + MAX(number) + FROM + miniblocks + ), + ( + SELECT + miniblock_number + FROM + snapshot_recovery + ) + ) ORDER BY hashed_key, miniblock_number DESC, @@ -1049,15 +1077,13 @@ mod tests { let pool = ConnectionPool::::test_pool().await; let mut conn = pool.connection().await.unwrap(); - // If deployment fails then two writes are issued, one that writes `bytecode_hash` to the "correct" value, - // and the next write reverts its value back to `FAILED_CONTRACT_DEPLOYMENT_BYTECODE_HASH`. - conn.storage_logs_dal() - .insert_storage_logs( - L2BlockNumber(1), - &[(H256::zero(), vec![successful_deployment, failed_deployment])], - ) + conn.protocol_versions_dal() + .save_protocol_version_with_tx(&ProtocolVersion::default()) .await .unwrap(); + // If deployment fails then two writes are issued, one that writes `bytecode_hash` to the "correct" value, + // and the next write reverts its value back to `FAILED_CONTRACT_DEPLOYMENT_BYTECODE_HASH`. + insert_l2_block(&mut conn, 1, vec![successful_deployment, failed_deployment]).await; let tested_l2_blocks = [ None, @@ -1080,13 +1106,7 @@ mod tests { ); } - conn.storage_logs_dal() - .insert_storage_logs( - L2BlockNumber(2), - &[(H256::zero(), vec![successful_deployment])], - ) - .await - .unwrap(); + insert_l2_block(&mut conn, 2, vec![successful_deployment]).await; for old_l2_block in [L2BlockNumber(0), L2BlockNumber(1)] { let deployed_map = conn @@ -1121,13 +1141,7 @@ mod tests { get_code_key(&other_contract_address), H256::repeat_byte(0xff), ); - conn.storage_logs_dal() - .insert_storage_logs( - L2BlockNumber(3), - &[(H256::zero(), vec![other_successful_deployment])], - ) - .await - .unwrap(); + insert_l2_block(&mut conn, 3, vec![other_successful_deployment]).await; for old_l2_block in [L2BlockNumber(0), L2BlockNumber(1)] { let deployed_map = conn diff --git a/core/lib/dal/src/storage_web3_dal.rs b/core/lib/dal/src/storage_web3_dal.rs index 9b6e332c99c..843752360ef 100644 --- a/core/lib/dal/src/storage_web3_dal.rs +++ b/core/lib/dal/src/storage_web3_dal.rs @@ -1,6 +1,10 @@ use std::collections::HashMap; -use zksync_db_connection::{connection::Connection, error::DalResult, instrument::InstrumentExt}; +use zksync_db_connection::{ + connection::Connection, + error::DalResult, + instrument::{InstrumentExt, Instrumented}, +}; use zksync_types::{ get_code_key, get_nonce_key, utils::{decompose_full_nonce, storage_key_for_standard_token_balance}, @@ -67,19 +71,43 @@ impl StorageWeb3Dal<'_, '_> { Ok(h256_to_u256(balance)) } - /// Gets the current value for the specified `key`. + /// Gets the current value for the specified `key`. Uses state of the latest sealed L2 block. + /// Returns error if there is no sealed L2 blocks. pub async fn get_value(&mut self, key: &StorageKey) -> DalResult { - self.get_historical_value_unchecked(key, L2BlockNumber(u32::MAX)) + let Some(l2_block_number) = self + .storage + .blocks_dal() + .get_sealed_l2_block_number() + .await? + else { + let err = Instrumented::new("get_value") + .with_arg("key", &key) + .constraint_error(anyhow::anyhow!("no sealed l2 blocks")); + return Err(err); + }; + self.get_historical_value_unchecked(key, l2_block_number) .await } /// Gets the current values for the specified `hashed_keys`. The returned map has requested hashed keys as keys - /// and current storage values as values. + /// and current storage values as values. Uses state of the latest sealed L2 block. + /// Returns error if there is no sealed L2 blocks. pub async fn get_values(&mut self, hashed_keys: &[H256]) -> DalResult> { + let Some(l2_block_number) = self + .storage + .blocks_dal() + .get_sealed_l2_block_number() + .await? + else { + let err = Instrumented::new("get_values") + .with_arg("hashed_keys", &hashed_keys) + .constraint_error(anyhow::anyhow!("no sealed l2 blocks")); + return Err(err); + }; let storage_map = self .storage .storage_logs_dal() - .get_storage_values(hashed_keys, L2BlockNumber(u32::MAX)) + .get_storage_values(hashed_keys, l2_block_number) .await?; Ok(storage_map .into_iter() diff --git a/core/lib/dal/src/sync_dal.rs b/core/lib/dal/src/sync_dal.rs index ccee0c53d08..1296cb6e24a 100644 --- a/core/lib/dal/src/sync_dal.rs +++ b/core/lib/dal/src/sync_dal.rs @@ -39,14 +39,7 @@ impl SyncDal<'_, '_> { snapshot_recovery ) ) AS "l1_batch_number!", - ( - SELECT - MAX(m2.number) - FROM - miniblocks m2 - WHERE - miniblocks.l1_batch_number = m2.l1_batch_number - ) AS "last_batch_miniblock?", + (miniblocks.l1_tx_count + miniblocks.l2_tx_count) AS "tx_count!", miniblocks.timestamp, miniblocks.l1_gas_price, miniblocks.l2_fair_gas_price, @@ -152,6 +145,7 @@ mod tests { // Insert another block in the store. let miniblock_header = L2BlockHeader { fee_account_address: Address::repeat_byte(0x42), + l2_tx_count: 1, ..create_l2_block_header(1) }; let tx = mock_l2_transaction(); @@ -168,6 +162,8 @@ mod tests { L2BlockNumber(1), &[mock_execution_result(tx.clone())], 1.into(), + ProtocolVersionId::latest(), + false, ) .await .unwrap(); @@ -210,6 +206,16 @@ mod tests { let transactions = block.transactions.unwrap(); assert_eq!(transactions, [Transaction::from(tx)]); + let miniblock_header = L2BlockHeader { + fee_account_address: Address::repeat_byte(0x42), + l2_tx_count: 0, + ..create_l2_block_header(2) + }; + conn.blocks_dal() + .insert_l2_block(&miniblock_header) + .await + .unwrap(); + l1_batch_header.number = L1BatchNumber(1); l1_batch_header.timestamp = 1; conn.blocks_dal() @@ -223,7 +229,7 @@ mod tests { let block = conn .sync_dal() - .sync_block(L2BlockNumber(1), true) + .sync_block(L2BlockNumber(2), true) .await .unwrap() .expect("no sync block"); diff --git a/core/lib/dal/src/tests/mod.rs b/core/lib/dal/src/tests/mod.rs index 6cc415f000f..246578f4584 100644 --- a/core/lib/dal/src/tests/mod.rs +++ b/core/lib/dal/src/tests/mod.rs @@ -300,6 +300,8 @@ async fn remove_stuck_txs() { L2BlockNumber(1), &[mock_execution_result(executed_tx.clone())], U256::from(1), + ProtocolVersionId::latest(), + false, ) .await .unwrap(); diff --git a/core/lib/dal/src/tokens_dal.rs b/core/lib/dal/src/tokens_dal.rs index 9a892c20bb5..cf0b89c950b 100644 --- a/core/lib/dal/src/tokens_dal.rs +++ b/core/lib/dal/src/tokens_dal.rs @@ -91,11 +91,20 @@ impl TokensDal<'_, '_> { let token_deployment_data = self .storage .storage_logs_dal() - .filter_deployed_contracts(all_token_addresses.into_iter(), None) + .filter_deployed_contracts(all_token_addresses.iter().copied(), None) .await?; - let token_addresses_to_be_removed: Vec<_> = token_deployment_data + let token_addresses_to_be_removed: Vec<_> = all_token_addresses .into_iter() - .filter_map(|(address, deployed_at)| (deployed_at > block_number).then_some(address.0)) + .filter_map(|address| { + if address.is_zero() { + None + } else if let Some(deployed_at) = token_deployment_data.get(&address) { + (deployed_at > &block_number).then_some(address.0) + } else { + // Token belongs to a "pending" L2 block that's not yet fully inserted to the database. + Some(address.0) + } + }) .collect(); sqlx::query!( r#" @@ -123,10 +132,10 @@ mod tests { use std::{collections::HashSet, slice}; use zksync_system_constants::FAILED_CONTRACT_DEPLOYMENT_BYTECODE_HASH; - use zksync_types::{get_code_key, tokens::TokenMetadata, StorageLog, H256}; + use zksync_types::{get_code_key, tokens::TokenMetadata, ProtocolVersion, StorageLog, H256}; use super::*; - use crate::{ConnectionPool, Core, CoreDal}; + use crate::{tests::create_l2_block_header, ConnectionPool, Core, CoreDal}; fn test_token_info() -> TokenInfo { TokenInfo { @@ -152,13 +161,43 @@ mod tests { } } + async fn insert_l2_block(conn: &mut Connection<'_, Core>, number: u32, logs: Vec) { + conn.blocks_dal() + .insert_l2_block(&create_l2_block_header(number)) + .await + .unwrap(); + + let logs = [(H256::zero(), logs)]; + conn.storage_logs_dal() + .insert_storage_logs(L2BlockNumber(number), &logs) + .await + .unwrap(); + } + #[tokio::test] async fn adding_and_getting_tokens() { let pool = ConnectionPool::::test_pool().await; let mut storage = pool.connection().await.unwrap(); + storage + .protocol_versions_dal() + .save_protocol_version_with_tx(&ProtocolVersion::default()) + .await + .unwrap(); + let tokens = [test_token_info(), eth_token_info()]; storage.tokens_dal().add_tokens(&tokens).await.unwrap(); + let storage_logs: Vec<_> = tokens + .iter() + .map(|token_info| { + StorageLog::new_write_log( + get_code_key(&token_info.l2_address), + H256::repeat_byte(0xff), + ) + }) + .collect(); + insert_l2_block(&mut storage, 1, storage_logs).await; + let token_addresses = storage .tokens_dal() .get_all_l2_token_addresses() @@ -203,40 +242,31 @@ mod tests { async fn rolling_back_tokens() { let pool = ConnectionPool::::test_pool().await; let mut storage = pool.connection().await.unwrap(); + storage + .protocol_versions_dal() + .save_protocol_version_with_tx(&ProtocolVersion::default()) + .await + .unwrap(); let eth_info = eth_token_info(); let eth_deployment_log = StorageLog::new_write_log(get_code_key(ð_info.l2_address), H256::repeat_byte(1)); - storage - .storage_logs_dal() - .insert_storage_logs( - L2BlockNumber(0), - &[(H256::zero(), vec![eth_deployment_log])], - ) - .await - .unwrap(); storage .tokens_dal() .add_tokens(slice::from_ref(ð_info)) .await .unwrap(); + insert_l2_block(&mut storage, 0, vec![eth_deployment_log]).await; let test_info = test_token_info(); let test_deployment_log = StorageLog::new_write_log(get_code_key(&test_info.l2_address), H256::repeat_byte(2)); - storage - .storage_logs_dal() - .insert_storage_logs( - L2BlockNumber(2), - &[(H256::zero(), vec![test_deployment_log])], - ) - .await - .unwrap(); storage .tokens_dal() .add_tokens(slice::from_ref(&test_info)) .await .unwrap(); + insert_l2_block(&mut storage, 2, vec![test_deployment_log]).await; test_getting_all_tokens(&mut storage).await; @@ -283,10 +313,10 @@ mod tests { assert!(all_tokens.contains(&test_token_info())); } - for at_miniblock in [L2BlockNumber(0), L2BlockNumber(1)] { + for at_l2_block in [L2BlockNumber(0), L2BlockNumber(1)] { let all_tokens = storage .tokens_web3_dal() - .get_all_tokens(Some(at_miniblock)) + .get_all_tokens(Some(at_l2_block)) .await .unwrap(); assert_eq!(all_tokens, [eth_token_info()]); diff --git a/core/lib/dal/src/tokens_web3_dal.rs b/core/lib/dal/src/tokens_web3_dal.rs index 092847cc4f2..00c7a69385d 100644 --- a/core/lib/dal/src/tokens_web3_dal.rs +++ b/core/lib/dal/src/tokens_web3_dal.rs @@ -58,7 +58,30 @@ impl TokensWeb3Dal<'_, '_> { .fetch_all(self.storage) .await?; - Ok(records.into_iter().map(Into::into).collect()) + let l2_token_addresses = records + .iter() + .map(|storage_token_info| Address::from_slice(&storage_token_info.l2_address)); + let token_deployment_data = self + .storage + .storage_logs_dal() + .filter_deployed_contracts(l2_token_addresses, None) + .await?; + + let tokens = records + .into_iter() + .filter_map(|storage_token_info| { + let l2_token_address = Address::from_slice(&storage_token_info.l2_address); + if !l2_token_address.is_zero() + && !token_deployment_data.contains_key(&l2_token_address) + { + return None; + } + + Some(TokenInfo::from(storage_token_info)) + }) + .collect(); + + Ok(tokens) } /// Returns information about all tokens. @@ -88,18 +111,17 @@ impl TokensWeb3Dal<'_, '_> { .await?; let mut all_tokens: Vec<_> = records.into_iter().map(TokenInfo::from).collect(); - let Some(at_l2_block) = at_l2_block else { - return Ok(all_tokens); // No additional filtering is required - }; let token_addresses = all_tokens.iter().map(|token| token.l2_address); let filtered_addresses = self .storage .storage_logs_dal() - .filter_deployed_contracts(token_addresses, Some(at_l2_block)) + .filter_deployed_contracts(token_addresses, at_l2_block) .await?; - all_tokens.retain(|token| filtered_addresses.contains_key(&token.l2_address)); + all_tokens.retain(|token| { + token.l2_address.is_zero() || filtered_addresses.contains_key(&token.l2_address) + }); Ok(all_tokens) } } diff --git a/core/lib/dal/src/transactions_dal.rs b/core/lib/dal/src/transactions_dal.rs index 23c162d24c5..5eadc2b9d16 100644 --- a/core/lib/dal/src/transactions_dal.rs +++ b/core/lib/dal/src/transactions_dal.rs @@ -510,13 +510,12 @@ impl TransactionsDal<'_, '_> { l2_block_number: L2BlockNumber, transactions: &[TransactionExecutionResult], block_base_fee_per_gas: U256, + protocol_version: ProtocolVersionId, + // On the main node, transactions are inserted into the DB by API servers. + // However on the EN, they need to be inserted after they are executed by the state keeper. + insert_txs: bool, ) -> DalResult<()> { let mut transaction = self.storage.start_transaction().await?; - let protocol_version = transaction - .blocks_dal() - .get_l2_block_protocol_version_id(l2_block_number) - .await? - .unwrap_or_else(ProtocolVersionId::last_potentially_undefined); let mut call_traces_tx_hashes = Vec::with_capacity(transactions.len()); let mut bytea_call_traces = Vec::with_capacity(transactions.len()); @@ -528,19 +527,58 @@ impl TransactionsDal<'_, '_> { } } - transaction - .transactions_dal() - .handle_executed_l2_transactions(l2_block_number, block_base_fee_per_gas, transactions) - .await?; - transaction - .transactions_dal() - .handle_executed_l1_transactions(l2_block_number, transactions) - .await?; - transaction - .transactions_dal() - .handle_executed_upgrade_transactions(l2_block_number, transactions) + if insert_txs { + // There can be transactions in the DB in case of block rollback or if the DB was restored from a dump. + let tx_hashes: Vec<_> = transactions.iter().map(|tx| tx.hash.as_bytes()).collect(); + sqlx::query!( + r#" + DELETE FROM transactions + WHERE + hash = ANY ($1) + "#, + &tx_hashes as &[&[u8]], + ) + .instrument("mark_txs_as_executed_in_l2_block#remove_old_txs") + .execute(&mut transaction) .await?; + // Different transaction types have different sets of fields to insert so we handle them separately. + transaction + .transactions_dal() + .insert_executed_l2_transactions( + l2_block_number, + block_base_fee_per_gas, + transactions, + ) + .await?; + transaction + .transactions_dal() + .insert_executed_l1_transactions(l2_block_number, transactions) + .await?; + transaction + .transactions_dal() + .insert_executed_upgrade_transactions(l2_block_number, transactions) + .await?; + } else { + // Different transaction types have different sets of fields to update so we handle them separately. + transaction + .transactions_dal() + .update_executed_l2_transactions( + l2_block_number, + block_base_fee_per_gas, + transactions, + ) + .await?; + transaction + .transactions_dal() + .update_executed_l1_transactions(l2_block_number, transactions) + .await?; + transaction + .transactions_dal() + .update_executed_upgrade_transactions(l2_block_number, transactions) + .await?; + } + if !bytea_call_traces.is_empty() { sqlx::query!( r#" @@ -575,7 +613,211 @@ impl TransactionsDal<'_, '_> { } } - async fn handle_executed_l2_transactions( + async fn insert_executed_l2_transactions( + &mut self, + l2_block_number: L2BlockNumber, + block_base_fee_per_gas: U256, + transactions: &[TransactionExecutionResult], + ) -> DalResult<()> { + let l2_txs_len = transactions + .iter() + .filter(|tx_res| { + matches!( + tx_res.transaction.common_data, + ExecuteTransactionCommon::L2(_) + ) + }) + .count(); + if l2_txs_len == 0 { + return Ok(()); + } + + let instrumentation = Instrumented::new("mark_txs_as_executed_in_l2_block#insert_l2_txs") + .with_arg("l2_block_number", &l2_block_number) + .with_arg("l2_txs.len", &l2_txs_len); + + let mut l2_hashes = Vec::with_capacity(l2_txs_len); + let mut l2_initiators = Vec::with_capacity(l2_txs_len); + let mut l2_nonces = Vec::with_capacity(l2_txs_len); + let mut l2_signatures = Vec::with_capacity(l2_txs_len); + let mut l2_gas_limits = Vec::with_capacity(l2_txs_len); + let mut l2_max_fees_per_gas = Vec::with_capacity(l2_txs_len); + let mut l2_max_priority_fees_per_gas = Vec::with_capacity(l2_txs_len); + let mut l2_gas_per_pubdata_limit = Vec::with_capacity(l2_txs_len); + let mut l2_inputs = Vec::with_capacity(l2_txs_len); + let mut l2_datas = Vec::with_capacity(l2_txs_len); + let mut l2_tx_formats = Vec::with_capacity(l2_txs_len); + let mut l2_contract_addresses = Vec::with_capacity(l2_txs_len); + let mut l2_values = Vec::with_capacity(l2_txs_len); + let mut l2_paymaster = Vec::with_capacity(l2_txs_len); + let mut l2_paymaster_input = Vec::with_capacity(l2_txs_len); + let mut l2_execution_infos = Vec::with_capacity(l2_txs_len); + let mut l2_indices_in_block = Vec::with_capacity(l2_txs_len); + let mut l2_errors = Vec::with_capacity(l2_txs_len); + let mut l2_effective_gas_prices = Vec::with_capacity(l2_txs_len); + let mut l2_refunded_gas = Vec::with_capacity(l2_txs_len); + + for (index_in_block, tx_res) in transactions.iter().enumerate() { + let transaction = &tx_res.transaction; + let ExecuteTransactionCommon::L2(common_data) = &transaction.common_data else { + continue; + }; + + let data = serde_json::to_value(&transaction.execute).map_err(|err| { + instrumentation.arg_error( + &format!("transactions[{index_in_block}].transaction.execute"), + err, + ) + })?; + let l2_execution_info = serde_json::to_value(tx_res.execution_info).map_err(|err| { + instrumentation.arg_error( + &format!("transactions[{index_in_block}].execution_info"), + err, + ) + })?; + let refunded_gas = i64::try_from(tx_res.refunded_gas).map_err(|err| { + instrumentation + .arg_error(&format!("transactions[{index_in_block}].refunded_gas"), err) + })?; + + l2_values.push(u256_to_big_decimal(transaction.execute.value)); + l2_contract_addresses.push(transaction.execute.contract_address.as_bytes()); + l2_paymaster_input.push(&common_data.paymaster_params.paymaster_input[..]); + l2_paymaster.push(common_data.paymaster_params.paymaster.as_bytes()); + l2_hashes.push(tx_res.hash.as_bytes()); + l2_indices_in_block.push(index_in_block as i32); + l2_initiators.push(transaction.initiator_account().0); + l2_nonces.push(common_data.nonce.0 as i32); + l2_signatures.push(&common_data.signature[..]); + l2_tx_formats.push(common_data.transaction_type as i32); + l2_errors.push(Self::map_transaction_error(tx_res)); + let l2_effective_gas_price = common_data + .fee + .get_effective_gas_price(block_base_fee_per_gas); + l2_effective_gas_prices.push(u256_to_big_decimal(l2_effective_gas_price)); + l2_execution_infos.push(l2_execution_info); + // Normally input data is mandatory + l2_inputs.push(common_data.input_data().unwrap_or_default()); + l2_datas.push(data); + l2_gas_limits.push(u256_to_big_decimal(common_data.fee.gas_limit)); + l2_max_fees_per_gas.push(u256_to_big_decimal(common_data.fee.max_fee_per_gas)); + l2_max_priority_fees_per_gas.push(u256_to_big_decimal( + common_data.fee.max_priority_fee_per_gas, + )); + l2_gas_per_pubdata_limit + .push(u256_to_big_decimal(common_data.fee.gas_per_pubdata_limit)); + l2_refunded_gas.push(refunded_gas); + } + + let query = sqlx::query!( + r#" + INSERT INTO + transactions ( + hash, + is_priority, + initiator_address, + nonce, + signature, + gas_limit, + max_fee_per_gas, + max_priority_fee_per_gas, + gas_per_pubdata_limit, + input, + data, + tx_format, + contract_address, + value, + paymaster, + paymaster_input, + execution_info, + miniblock_number, + index_in_block, + error, + effective_gas_price, + refunded_gas, + received_at, + created_at, + updated_at + ) + SELECT + data_table.hash, + FALSE, + data_table.initiator_address, + data_table.nonce, + data_table.signature, + data_table.gas_limit, + data_table.max_fee_per_gas, + data_table.max_priority_fee_per_gas, + data_table.gas_per_pubdata_limit, + data_table.input, + data_table.data, + data_table.tx_format, + data_table.contract_address, + data_table.value, + data_table.paymaster, + data_table.paymaster_input, + data_table.new_execution_info, + $21, + data_table.index_in_block, + NULLIF(data_table.error, ''), + data_table.effective_gas_price, + data_table.refunded_gas, + NOW(), + NOW(), + NOW() + FROM + ( + SELECT + UNNEST($1::bytea[]) AS hash, + UNNEST($2::bytea[]) AS initiator_address, + UNNEST($3::INT[]) AS nonce, + UNNEST($4::bytea[]) AS signature, + UNNEST($5::NUMERIC[]) AS gas_limit, + UNNEST($6::NUMERIC[]) AS max_fee_per_gas, + UNNEST($7::NUMERIC[]) AS max_priority_fee_per_gas, + UNNEST($8::NUMERIC[]) AS gas_per_pubdata_limit, + UNNEST($9::bytea[]) AS input, + UNNEST($10::jsonb[]) AS data, + UNNEST($11::INT[]) AS tx_format, + UNNEST($12::bytea[]) AS contract_address, + UNNEST($13::NUMERIC[]) AS value, + UNNEST($14::bytea[]) AS paymaster, + UNNEST($15::bytea[]) AS paymaster_input, + UNNEST($16::jsonb[]) AS new_execution_info, + UNNEST($17::INTEGER[]) AS index_in_block, + UNNEST($18::VARCHAR[]) AS error, + UNNEST($19::NUMERIC[]) AS effective_gas_price, + UNNEST($20::BIGINT[]) AS refunded_gas + ) AS data_table + "#, + &l2_hashes as &[&[u8]], + &l2_initiators as &[[u8; 20]], + &l2_nonces, + &l2_signatures as &[&[u8]], + &l2_gas_limits, + &l2_max_fees_per_gas, + &l2_max_priority_fees_per_gas, + &l2_gas_per_pubdata_limit, + &l2_inputs as &[&[u8]], + &l2_datas, + &l2_tx_formats, + &l2_contract_addresses as &[&[u8]], + &l2_values, + &l2_paymaster as &[&[u8]], + &l2_paymaster_input as &[&[u8]], + &l2_execution_infos, + &l2_indices_in_block, + &l2_errors as &[&str], + &l2_effective_gas_prices, + &l2_refunded_gas, + l2_block_number.0 as i32, + ); + + instrumentation.with(query).execute(self.storage).await?; + Ok(()) + } + + async fn update_executed_l2_transactions( &mut self, l2_block_number: L2BlockNumber, block_base_fee_per_gas: U256, @@ -764,7 +1006,205 @@ impl TransactionsDal<'_, '_> { Ok(()) } - async fn handle_executed_l1_transactions( + async fn insert_executed_l1_transactions( + &mut self, + l2_block_number: L2BlockNumber, + transactions: &[TransactionExecutionResult], + ) -> DalResult<()> { + let l1_txs_len = transactions + .iter() + .filter(|tx_res| { + matches!( + tx_res.transaction.common_data, + ExecuteTransactionCommon::L1(_) + ) + }) + .count(); + if l1_txs_len == 0 { + return Ok(()); + } + + let instrumentation = Instrumented::new("mark_txs_as_executed_in_l2_block#insert_l1_txs") + .with_arg("l2_block_number", &l2_block_number) + .with_arg("l1_txs.len", &l1_txs_len); + + let mut l1_hashes = Vec::with_capacity(l1_txs_len); + let mut l1_initiator_address = Vec::with_capacity(l1_txs_len); + let mut l1_gas_limit = Vec::with_capacity(l1_txs_len); + let mut l1_max_fee_per_gas = Vec::with_capacity(l1_txs_len); + let mut l1_gas_per_pubdata_limit = Vec::with_capacity(l1_txs_len); + let mut l1_data = Vec::with_capacity(l1_txs_len); + let mut l1_priority_op_id = Vec::with_capacity(l1_txs_len); + let mut l1_full_fee = Vec::with_capacity(l1_txs_len); + let mut l1_layer_2_tip_fee = Vec::with_capacity(l1_txs_len); + let mut l1_contract_address = Vec::with_capacity(l1_txs_len); + let mut l1_l1_block_number = Vec::with_capacity(l1_txs_len); + let mut l1_value = Vec::with_capacity(l1_txs_len); + let mut l1_tx_format = Vec::with_capacity(l1_txs_len); + let mut l1_tx_mint = Vec::with_capacity(l1_txs_len); + let mut l1_tx_refund_recipient = Vec::with_capacity(l1_txs_len); + let mut l1_indices_in_block = Vec::with_capacity(l1_txs_len); + let mut l1_errors = Vec::with_capacity(l1_txs_len); + let mut l1_execution_infos = Vec::with_capacity(l1_txs_len); + let mut l1_refunded_gas = Vec::with_capacity(l1_txs_len); + let mut l1_effective_gas_prices = Vec::with_capacity(l1_txs_len); + + for (index_in_block, tx_res) in transactions.iter().enumerate() { + let transaction = &tx_res.transaction; + let ExecuteTransactionCommon::L1(common_data) = &transaction.common_data else { + continue; + }; + + let l1_execution_info = serde_json::to_value(tx_res.execution_info).map_err(|err| { + instrumentation.arg_error( + &format!("transactions[{index_in_block}].execution_info"), + err, + ) + })?; + let refunded_gas = i64::try_from(tx_res.refunded_gas).map_err(|err| { + instrumentation + .arg_error(&format!("transactions[{index_in_block}].refunded_gas"), err) + })?; + + let tx = &tx_res.transaction; + l1_hashes.push(tx_res.hash.as_bytes()); + l1_initiator_address.push(common_data.sender.as_bytes()); + l1_gas_limit.push(u256_to_big_decimal(common_data.gas_limit)); + l1_max_fee_per_gas.push(u256_to_big_decimal(common_data.max_fee_per_gas)); + l1_gas_per_pubdata_limit.push(u256_to_big_decimal(common_data.gas_per_pubdata_limit)); + l1_data.push( + serde_json::to_value(&tx.execute) + .unwrap_or_else(|_| panic!("cannot serialize tx {:?} to json", tx.hash())), + ); + l1_priority_op_id.push(common_data.serial_id.0 as i64); + l1_full_fee.push(u256_to_big_decimal(common_data.full_fee)); + l1_layer_2_tip_fee.push(u256_to_big_decimal(common_data.layer_2_tip_fee)); + l1_contract_address.push(tx.execute.contract_address.as_bytes()); + l1_l1_block_number.push(common_data.eth_block as i32); + l1_value.push(u256_to_big_decimal(tx.execute.value)); + l1_tx_format.push(common_data.tx_format() as i32); + l1_tx_mint.push(u256_to_big_decimal(common_data.to_mint)); + l1_tx_refund_recipient.push(common_data.refund_recipient.as_bytes()); + l1_indices_in_block.push(index_in_block as i32); + l1_errors.push(Self::map_transaction_error(tx_res)); + l1_execution_infos.push(l1_execution_info); + l1_refunded_gas.push(refunded_gas); + l1_effective_gas_prices.push(u256_to_big_decimal(common_data.max_fee_per_gas)); + } + + let query = sqlx::query!( + r#" + INSERT INTO + transactions ( + hash, + is_priority, + initiator_address, + gas_limit, + max_fee_per_gas, + gas_per_pubdata_limit, + data, + priority_op_id, + full_fee, + layer_2_tip_fee, + contract_address, + l1_block_number, + value, + paymaster, + paymaster_input, + tx_format, + l1_tx_mint, + l1_tx_refund_recipient, + miniblock_number, + index_in_block, + error, + execution_info, + refunded_gas, + effective_gas_price, + received_at, + created_at, + updated_at + ) + SELECT + data_table.hash, + TRUE, + data_table.initiator_address, + data_table.gas_limit, + data_table.max_fee_per_gas, + data_table.gas_per_pubdata_limit, + data_table.data, + data_table.priority_op_id, + data_table.full_fee, + data_table.layer_2_tip_fee, + data_table.contract_address, + data_table.l1_block_number, + data_table.value, + '\x0000000000000000000000000000000000000000'::bytea, + '\x'::bytea, + data_table.tx_format, + data_table.l1_tx_mint, + data_table.l1_tx_refund_recipient, + $21, + data_table.index_in_block, + NULLIF(data_table.error, ''), + data_table.execution_info, + data_table.refunded_gas, + data_table.effective_gas_price, + NOW(), + NOW(), + NOW() + FROM + ( + SELECT + UNNEST($1::BYTEA[]) AS hash, + UNNEST($2::BYTEA[]) AS initiator_address, + UNNEST($3::NUMERIC[]) AS gas_limit, + UNNEST($4::NUMERIC[]) AS max_fee_per_gas, + UNNEST($5::NUMERIC[]) AS gas_per_pubdata_limit, + UNNEST($6::JSONB[]) AS data, + UNNEST($7::BIGINT[]) AS priority_op_id, + UNNEST($8::NUMERIC[]) AS full_fee, + UNNEST($9::NUMERIC[]) AS layer_2_tip_fee, + UNNEST($10::BYTEA[]) AS contract_address, + UNNEST($11::INT[]) AS l1_block_number, + UNNEST($12::NUMERIC[]) AS value, + UNNEST($13::INTEGER[]) AS tx_format, + UNNEST($14::NUMERIC[]) AS l1_tx_mint, + UNNEST($15::BYTEA[]) AS l1_tx_refund_recipient, + UNNEST($16::INT[]) AS index_in_block, + UNNEST($17::VARCHAR[]) AS error, + UNNEST($18::JSONB[]) AS execution_info, + UNNEST($19::BIGINT[]) AS refunded_gas, + UNNEST($20::NUMERIC[]) AS effective_gas_price + ) AS data_table + "#, + &l1_hashes as &[&[u8]], + &l1_initiator_address as &[&[u8]], + &l1_gas_limit, + &l1_max_fee_per_gas, + &l1_gas_per_pubdata_limit, + &l1_data, + &l1_priority_op_id, + &l1_full_fee, + &l1_layer_2_tip_fee, + &l1_contract_address as &[&[u8]], + &l1_l1_block_number, + &l1_value, + &l1_tx_format, + &l1_tx_mint, + &l1_tx_refund_recipient as &[&[u8]], + &l1_indices_in_block, + &l1_errors as &[&str], + &l1_execution_infos, + &l1_refunded_gas, + &l1_effective_gas_prices, + l2_block_number.0 as i32, + ); + + instrumentation.with(query).execute(self.storage).await?; + Ok(()) + } + + async fn update_executed_l1_transactions( &mut self, l2_block_number: L2BlockNumber, transactions: &[TransactionExecutionResult], @@ -856,7 +1296,196 @@ impl TransactionsDal<'_, '_> { Ok(()) } - async fn handle_executed_upgrade_transactions( + async fn insert_executed_upgrade_transactions( + &mut self, + l2_block_number: L2BlockNumber, + transactions: &[TransactionExecutionResult], + ) -> DalResult<()> { + let upgrade_txs_len = transactions + .iter() + .filter(|tx_res| { + matches!( + tx_res.transaction.common_data, + ExecuteTransactionCommon::ProtocolUpgrade(_) + ) + }) + .count(); + if upgrade_txs_len == 0 { + return Ok(()); + } + + let instrumentation = + Instrumented::new("mark_txs_as_executed_in_l2_block#insert_upgrade_txs") + .with_arg("l2_block_number", &l2_block_number) + .with_arg("upgrade_txs.len", &upgrade_txs_len); + + let mut upgrade_hashes = Vec::with_capacity(upgrade_txs_len); + let mut upgrade_initiator_address = Vec::with_capacity(upgrade_txs_len); + let mut upgrade_gas_limit = Vec::with_capacity(upgrade_txs_len); + let mut upgrade_max_fee_per_gas = Vec::with_capacity(upgrade_txs_len); + let mut upgrade_gas_per_pubdata_limit = Vec::with_capacity(upgrade_txs_len); + let mut upgrade_data = Vec::with_capacity(upgrade_txs_len); + let mut upgrade_upgrade_id = Vec::with_capacity(upgrade_txs_len); + let mut upgrade_contract_address = Vec::with_capacity(upgrade_txs_len); + let mut upgrade_l1_block_number = Vec::with_capacity(upgrade_txs_len); + let mut upgrade_value = Vec::with_capacity(upgrade_txs_len); + let mut upgrade_tx_format = Vec::with_capacity(upgrade_txs_len); + let mut upgrade_tx_mint = Vec::with_capacity(upgrade_txs_len); + let mut upgrade_tx_refund_recipient = Vec::with_capacity(upgrade_txs_len); + let mut upgrade_indices_in_block = Vec::with_capacity(upgrade_txs_len); + let mut upgrade_errors = Vec::with_capacity(upgrade_txs_len); + let mut upgrade_execution_infos = Vec::with_capacity(upgrade_txs_len); + let mut upgrade_refunded_gas = Vec::with_capacity(upgrade_txs_len); + let mut upgrade_effective_gas_prices = Vec::with_capacity(upgrade_txs_len); + + for (index_in_block, tx_res) in transactions.iter().enumerate() { + let transaction = &tx_res.transaction; + let ExecuteTransactionCommon::ProtocolUpgrade(common_data) = &transaction.common_data + else { + continue; + }; + + let execution_info = serde_json::to_value(tx_res.execution_info).map_err(|err| { + instrumentation.arg_error( + &format!("transactions[{index_in_block}].execution_info"), + err, + ) + })?; + let refunded_gas = i64::try_from(tx_res.refunded_gas).map_err(|err| { + instrumentation + .arg_error(&format!("transactions[{index_in_block}].refunded_gas"), err) + })?; + + let tx = &tx_res.transaction; + upgrade_hashes.push(tx_res.hash.as_bytes()); + upgrade_initiator_address.push(common_data.sender.as_bytes()); + upgrade_gas_limit.push(u256_to_big_decimal(common_data.gas_limit)); + upgrade_max_fee_per_gas.push(u256_to_big_decimal(common_data.max_fee_per_gas)); + upgrade_gas_per_pubdata_limit + .push(u256_to_big_decimal(common_data.gas_per_pubdata_limit)); + upgrade_data.push( + serde_json::to_value(&tx.execute) + .unwrap_or_else(|_| panic!("cannot serialize tx {:?} to json", tx.hash())), + ); + upgrade_upgrade_id.push(common_data.upgrade_id as i32); + upgrade_contract_address.push(tx.execute.contract_address.as_bytes()); + upgrade_l1_block_number.push(common_data.eth_block as i32); + upgrade_value.push(u256_to_big_decimal(tx.execute.value)); + upgrade_tx_format.push(common_data.tx_format() as i32); + upgrade_tx_mint.push(u256_to_big_decimal(common_data.to_mint)); + upgrade_tx_refund_recipient.push(common_data.refund_recipient.as_bytes()); + upgrade_indices_in_block.push(index_in_block as i32); + upgrade_errors.push(Self::map_transaction_error(tx_res)); + upgrade_execution_infos.push(execution_info); + upgrade_refunded_gas.push(refunded_gas); + upgrade_effective_gas_prices.push(u256_to_big_decimal(common_data.max_fee_per_gas)); + } + + let query = sqlx::query!( + r#" + INSERT INTO + transactions ( + hash, + is_priority, + initiator_address, + gas_limit, + max_fee_per_gas, + gas_per_pubdata_limit, + data, + upgrade_id, + contract_address, + l1_block_number, + value, + paymaster, + paymaster_input, + tx_format, + l1_tx_mint, + l1_tx_refund_recipient, + miniblock_number, + index_in_block, + error, + execution_info, + refunded_gas, + effective_gas_price, + received_at, + created_at, + updated_at + ) + SELECT + data_table.hash, + TRUE, + data_table.initiator_address, + data_table.gas_limit, + data_table.max_fee_per_gas, + data_table.gas_per_pubdata_limit, + data_table.data, + data_table.upgrade_id, + data_table.contract_address, + data_table.l1_block_number, + data_table.value, + '\x0000000000000000000000000000000000000000'::bytea, + '\x'::bytea, + data_table.tx_format, + data_table.l1_tx_mint, + data_table.l1_tx_refund_recipient, + $19, + data_table.index_in_block, + NULLIF(data_table.error, ''), + data_table.execution_info, + data_table.refunded_gas, + data_table.effective_gas_price, + NOW(), + NOW(), + NOW() + FROM + ( + SELECT + UNNEST($1::BYTEA[]) AS hash, + UNNEST($2::BYTEA[]) AS initiator_address, + UNNEST($3::NUMERIC[]) AS gas_limit, + UNNEST($4::NUMERIC[]) AS max_fee_per_gas, + UNNEST($5::NUMERIC[]) AS gas_per_pubdata_limit, + UNNEST($6::JSONB[]) AS data, + UNNEST($7::INT[]) AS upgrade_id, + UNNEST($8::BYTEA[]) AS contract_address, + UNNEST($9::INT[]) AS l1_block_number, + UNNEST($10::NUMERIC[]) AS value, + UNNEST($11::INTEGER[]) AS tx_format, + UNNEST($12::NUMERIC[]) AS l1_tx_mint, + UNNEST($13::BYTEA[]) AS l1_tx_refund_recipient, + UNNEST($14::INT[]) AS index_in_block, + UNNEST($15::VARCHAR[]) AS error, + UNNEST($16::JSONB[]) AS execution_info, + UNNEST($17::BIGINT[]) AS refunded_gas, + UNNEST($18::NUMERIC[]) AS effective_gas_price + ) AS data_table + "#, + &upgrade_hashes as &[&[u8]], + &upgrade_initiator_address as &[&[u8]], + &upgrade_gas_limit, + &upgrade_max_fee_per_gas, + &upgrade_gas_per_pubdata_limit, + &upgrade_data, + &upgrade_upgrade_id, + &upgrade_contract_address as &[&[u8]], + &upgrade_l1_block_number, + &upgrade_value, + &upgrade_tx_format, + &upgrade_tx_mint, + &upgrade_tx_refund_recipient as &[&[u8]], + &upgrade_indices_in_block, + &upgrade_errors as &[&str], + &upgrade_execution_infos, + &upgrade_refunded_gas, + &upgrade_effective_gas_prices, + l2_block_number.0 as i32, + ); + + instrumentation.with(query).execute(self.storage).await?; + Ok(()) + } + + async fn update_executed_upgrade_transactions( &mut self, l2_block_number: L2BlockNumber, transactions: &[TransactionExecutionResult], @@ -1206,6 +1835,8 @@ impl TransactionsDal<'_, '_> { .map(|op_id| PriorityOpId(op_id as u64))) } + /// Returns the next ID after the ID of the last sealed priority operation. + /// Doesn't work if node was recovered from snapshot because transaction history is not recovered. pub async fn next_priority_id(&mut self) -> PriorityOpId { { sqlx::query!( @@ -1216,7 +1847,12 @@ impl TransactionsDal<'_, '_> { transactions WHERE is_priority = TRUE - AND miniblock_number IS NOT NULL + AND transactions.miniblock_number <= ( + SELECT + MAX(number) + FROM + miniblocks + ) "# ) .fetch_optional(self.storage.conn()) @@ -1435,13 +2071,13 @@ impl TransactionsDal<'_, '_> { } pub async fn get_call_trace(&mut self, tx_hash: H256) -> DalResult> { - let protocol_version: ProtocolVersionId = sqlx::query!( + let row = sqlx::query!( r#" SELECT protocol_version FROM transactions - LEFT JOIN miniblocks ON transactions.miniblock_number = miniblocks.number + INNER JOIN miniblocks ON transactions.miniblock_number = miniblocks.number WHERE transactions.hash = $1 "#, @@ -1450,9 +2086,16 @@ impl TransactionsDal<'_, '_> { .instrument("get_call_trace") .with_arg("tx_hash", &tx_hash) .fetch_optional(self.storage) - .await? - .and_then(|row| row.protocol_version.map(|v| (v as u16).try_into().unwrap())) - .unwrap_or_else(ProtocolVersionId::last_potentially_undefined); + .await?; + + let Some(row) = row else { + return Ok(None); + }; + + let protocol_version = row + .protocol_version + .map(|v| (v as u16).try_into().unwrap()) + .unwrap_or_else(ProtocolVersionId::last_potentially_undefined); Ok(sqlx::query_as!( CallTrace, @@ -1532,7 +2175,13 @@ mod tests { }); let expected_call_trace = tx_result.call_trace().unwrap(); conn.transactions_dal() - .mark_txs_as_executed_in_l2_block(L2BlockNumber(1), &[tx_result], 1.into()) + .mark_txs_as_executed_in_l2_block( + L2BlockNumber(1), + &[tx_result], + 1.into(), + ProtocolVersionId::latest(), + false, + ) .await .unwrap(); @@ -1544,4 +2193,35 @@ mod tests { .expect("no call trace"); assert_eq!(call_trace, expected_call_trace); } + + #[tokio::test] + async fn insert_l2_block_executed_txs() { + let connection_pool = ConnectionPool::::test_pool().await; + let mut conn = connection_pool.connection().await.unwrap(); + conn.protocol_versions_dal() + .save_protocol_version_with_tx(&ProtocolVersion::default()) + .await + .unwrap(); + + let tx = mock_l2_transaction(); + let tx_hash = tx.hash(); + let tx_result = mock_execution_result(tx); + conn.transactions_dal() + .mark_txs_as_executed_in_l2_block( + L2BlockNumber(1), + &[tx_result], + 1.into(), + ProtocolVersionId::latest(), + true, + ) + .await + .unwrap(); + + let tx_from_db = conn + .transactions_web3_dal() + .get_transactions(&[tx_hash], Default::default()) + .await + .unwrap(); + assert_eq!(tx_from_db[0].hash, tx_hash); + } } diff --git a/core/lib/dal/src/transactions_web3_dal.rs b/core/lib/dal/src/transactions_web3_dal.rs index f855b8bbc86..54bdb9da632 100644 --- a/core/lib/dal/src/transactions_web3_dal.rs +++ b/core/lib/dal/src/transactions_web3_dal.rs @@ -163,7 +163,7 @@ impl TransactionsWeb3Dal<'_, '_> { SELECT transactions.hash AS tx_hash, transactions.index_in_block AS index_in_block, - transactions.miniblock_number AS block_number, + miniblocks.number AS block_number, transactions.nonce AS nonce, transactions.signature AS signature, transactions.initiator_address AS initiator_address, @@ -193,7 +193,7 @@ impl TransactionsWeb3Dal<'_, '_> { &hashes.iter().map(H256::as_bytes).collect::>() as &[&[u8]] ), TransactionSelector::Position(block_number, idx) => ( - "transactions.miniblock_number = $1 AND transactions.index_in_block = $2"; + "miniblocks.number = $1 AND transactions.index_in_block = $2"; i64::from(block_number.0), idx as i32 ), @@ -249,7 +249,7 @@ impl TransactionsWeb3Dal<'_, '_> { transactions.gas_limit, transactions.gas_per_pubdata_limit, transactions.received_at, - transactions.miniblock_number, + miniblocks.number AS "miniblock_number?", transactions.error, transactions.effective_gas_price, transactions.refunded_gas, @@ -301,7 +301,6 @@ impl TransactionsWeb3Dal<'_, '_> { transactions.received_at FROM transactions - LEFT JOIN miniblocks ON miniblocks.number = miniblock_number WHERE received_at > $1 ORDER BY @@ -388,11 +387,12 @@ impl TransactionsWeb3Dal<'_, '_> { StorageTransaction, r#" SELECT - * + transactions.* FROM transactions + INNER JOIN miniblocks ON miniblocks.number = transactions.miniblock_number WHERE - miniblock_number = $1 + miniblocks.number = $1 ORDER BY index_in_block "#, @@ -411,7 +411,9 @@ impl TransactionsWeb3Dal<'_, '_> { mod tests { use std::collections::HashMap; - use zksync_types::{fee::TransactionExecutionMetrics, l2::L2Tx, Nonce, ProtocolVersion}; + use zksync_types::{ + fee::TransactionExecutionMetrics, l2::L2Tx, Nonce, ProtocolVersion, ProtocolVersionId, + }; use super::*; use crate::{ @@ -448,7 +450,13 @@ mod tests { .collect::>(); conn.transactions_dal() - .mark_txs_as_executed_in_l2_block(L2BlockNumber(1), &tx_results, U256::from(1)) + .mark_txs_as_executed_in_l2_block( + L2BlockNumber(1), + &tx_results, + U256::from(1), + ProtocolVersionId::latest(), + false, + ) .await .unwrap(); } @@ -619,7 +627,13 @@ mod tests { mock_execution_result(tx_by_nonce[&1].clone()), ]; conn.transactions_dal() - .mark_txs_as_executed_in_l2_block(l2_block.number, &executed_txs, 1.into()) + .mark_txs_as_executed_in_l2_block( + l2_block.number, + &executed_txs, + 1.into(), + ProtocolVersionId::latest(), + false, + ) .await .unwrap(); diff --git a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs index 37ba7755fd6..463d2a914ab 100644 --- a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs +++ b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs @@ -5,6 +5,7 @@ use std::{ time::Duration, }; +use anyhow::Context; use tokio::sync::{watch, RwLock}; use zksync_dal::{transactions_dal::L2TxSubmissionResult, ConnectionPool, Core, CoreDal}; use zksync_shared_metrics::{TxStage, APP_METRICS}; @@ -21,6 +22,7 @@ use zksync_web3_decl::{ }; use super::{tx_sink::TxSink, SubmitTxError}; +use crate::utils::wait_for_l1_batch; #[derive(Debug, Clone, Default)] pub(crate) struct TxCache { @@ -65,10 +67,28 @@ impl TxCache { async fn run_updates( self, pool: ConnectionPool, - stop_receiver: watch::Receiver, + mut stop_receiver: watch::Receiver, ) -> anyhow::Result<()> { const UPDATE_INTERVAL: Duration = Duration::from_secs(1); + tracing::info!( + "Waiting for at least one L1 batch in Postgres to start TxCache::run_updates" + ); + // Starting the updater before L1 batches are present in Postgres can lead to some invariants the server logic + // implicitly assumes not being upheld. The only case when we'll actually wait here is immediately after snapshot recovery. + let earliest_l1_batch_number = + wait_for_l1_batch(&pool, UPDATE_INTERVAL, &mut stop_receiver) + .await + .context("error while waiting for L1 batch in Postgres")?; + if let Some(number) = earliest_l1_batch_number { + tracing::info!("Successfully waited for at least one L1 batch in Postgres; the earliest one is #{number}"); + } else { + tracing::info!( + "Received shutdown signal before TxCache::run_updates is started; shutting down" + ); + return Ok(()); + } + loop { if *stop_receiver.borrow() { return Ok(()); diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs index 45e938cf216..b00cd0dd0c6 100644 --- a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs +++ b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs @@ -766,7 +766,7 @@ impl EthNamespace { return Err(Web3Error::LogsLimitExceeded( self.state.api_config.req_entities_limit, from_block.0, - l2_block_number.0 - 1, + from_block.0.max(l2_block_number.0 - 1), )); } } diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs index 339961f7dd6..7c07db46ccd 100644 --- a/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs +++ b/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs @@ -438,7 +438,7 @@ impl ZksNamespace { let mut storage = self.state.acquire_connection().await?; Ok(storage .factory_deps_dal() - .get_factory_dep(hash) + .get_sealed_factory_dep(hash) .await .map_err(DalError::generalize)?) } diff --git a/core/lib/zksync_core/src/api_server/web3/tests/mod.rs b/core/lib/zksync_core/src/api_server/web3/tests/mod.rs index a0088c6cb3e..07bc4b745d1 100644 --- a/core/lib/zksync_core/src/api_server/web3/tests/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/tests/mod.rs @@ -44,7 +44,8 @@ use zksync_types::{ TransactionExecutionResult, }, utils::{storage_key_for_eth_balance, storage_key_for_standard_token_balance}, - AccountTreeId, Address, L1BatchNumber, Nonce, StorageKey, StorageLog, VmEvent, H256, U64, + AccountTreeId, Address, L1BatchNumber, Nonce, ProtocolVersionId, StorageKey, StorageLog, + VmEvent, H256, U64, }; use zksync_utils::u256_to_h256; use zksync_web3_decl::{ @@ -427,7 +428,13 @@ async fn store_l2_block( storage.blocks_dal().insert_l2_block(&new_l2_block).await?; storage .transactions_dal() - .mark_txs_as_executed_in_l2_block(new_l2_block.number, transaction_results, 1.into()) + .mark_txs_as_executed_in_l2_block( + new_l2_block.number, + transaction_results, + 1.into(), + ProtocolVersionId::latest(), + false, + ) .await?; Ok(new_l2_block) } diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs index 48cb17c51a6..d2ed6469919 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -76,8 +76,9 @@ use crate::{ }, metadata_calculator::{MetadataCalculator, MetadataCalculatorConfig}, state_keeper::{ - create_state_keeper, AsyncRocksdbCache, MempoolFetcher, MempoolGuard, OutputHandler, - SequencerSealer, StateKeeperPersistence, + create_state_keeper, io::seal_logic::l2_block_seal_subtasks::L2BlockSealProcess, + AsyncRocksdbCache, MempoolFetcher, MempoolGuard, OutputHandler, SequencerSealer, + StateKeeperPersistence, }, tee_verifier_input_producer::TeeVerifierInputProducer, utils::L1BatchCommitmentModeValidationTask, @@ -824,10 +825,14 @@ async fn add_state_keeper_to_task_futures( mempool }; - let l2_block_sealer_pool = ConnectionPool::::singleton(postgres_config.master_url()?) - .build() - .await - .context("failed to build l2_block_sealer_pool")?; + // L2 Block sealing process is parallelized, so we have to provide enough pooled connections. + let l2_block_sealer_pool = ConnectionPool::::builder( + postgres_config.master_url()?, + L2BlockSealProcess::subtasks_len(), + ) + .build() + .await + .context("failed to build l2_block_sealer_pool")?; let (persistence, l2_block_sealer) = StateKeeperPersistence::new( l2_block_sealer_pool, contracts_config diff --git a/core/lib/zksync_core/src/state_keeper/io/common/tests.rs b/core/lib/zksync_core/src/state_keeper/io/common/tests.rs index 0382dd99007..59eae1bb7e6 100644 --- a/core/lib/zksync_core/src/state_keeper/io/common/tests.rs +++ b/core/lib/zksync_core/src/state_keeper/io/common/tests.rs @@ -370,7 +370,13 @@ async fn store_pending_l2_blocks( let tx_result = execute_l2_transaction(tx); storage .transactions_dal() - .mark_txs_as_executed_in_l2_block(new_l2_block.number, &[tx_result], 1.into()) + .mark_txs_as_executed_in_l2_block( + new_l2_block.number, + &[tx_result], + 1.into(), + ProtocolVersionId::latest(), + false, + ) .await .unwrap(); } diff --git a/core/lib/zksync_core/src/state_keeper/io/mempool.rs b/core/lib/zksync_core/src/state_keeper/io/mempool.rs index d017716f82e..8a7ee77be08 100644 --- a/core/lib/zksync_core/src/state_keeper/io/mempool.rs +++ b/core/lib/zksync_core/src/state_keeper/io/mempool.rs @@ -24,6 +24,7 @@ use zksync_utils::time::millis_since_epoch; use crate::state_keeper::{ io::{ common::{load_pending_batch, poll_iters, IoCursor}, + seal_logic::l2_block_seal_subtasks::L2BlockSealProcess, L1BatchParams, L2BlockParams, PendingBatchData, StateKeeperIO, }, mempool_actor::l2_tx_filter, @@ -79,6 +80,8 @@ impl StateKeeperIO for MempoolIO { let mut storage = self.pool.connection_tagged("state_keeper").await?; let cursor = IoCursor::new(&mut storage).await?; + L2BlockSealProcess::clear_pending_l2_block(&mut storage, cursor.next_l2_block - 1).await?; + let pending_l2_block_header = self .l1_batch_params_provider .load_first_l2_block_in_batch(&mut storage, cursor.l1_batch) diff --git a/core/lib/zksync_core/src/state_keeper/io/persistence.rs b/core/lib/zksync_core/src/state_keeper/io/persistence.rs index 7e206514bec..a45a0d40a12 100644 --- a/core/lib/zksync_core/src/state_keeper/io/persistence.rs +++ b/core/lib/zksync_core/src/state_keeper/io/persistence.rs @@ -10,7 +10,9 @@ use zksync_shared_metrics::{BlockStage, APP_METRICS}; use zksync_types::Address; use crate::state_keeper::{ - io::StateKeeperOutputHandler, + io::{ + seal_logic::l2_block_seal_subtasks::L2BlockSealProcess, IoCursor, StateKeeperOutputHandler, + }, metrics::{L2BlockQueueStage, L2_BLOCK_METRICS}, updates::{L2BlockSealCommand, UpdatesManager}, }; @@ -148,6 +150,11 @@ impl StateKeeperPersistence { #[async_trait] impl StateKeeperOutputHandler for StateKeeperPersistence { + async fn initialize(&mut self, cursor: &IoCursor) -> anyhow::Result<()> { + let mut connection = self.pool.connection_tagged("state_keeper").await?; + L2BlockSealProcess::clear_pending_l2_block(&mut connection, cursor.next_l2_block - 1).await + } + async fn handle_l2_block(&mut self, updates_manager: &UpdatesManager) -> anyhow::Result<()> { let command = updates_manager.seal_l2_block_command(self.l2_shared_bridge_addr, self.pre_insert_txs); @@ -159,12 +166,10 @@ impl StateKeeperOutputHandler for StateKeeperPersistence { // We cannot start sealing an L1 batch until we've sealed all L2 blocks included in it. self.wait_for_all_commands().await; - let pool = self.pool.clone(); - let mut storage = pool.connection_tagged("state_keeper").await?; let batch_number = updates_manager.l1_batch.number; updates_manager .seal_l1_batch( - &mut storage, + self.pool.clone(), self.l2_shared_bridge_addr, self.insert_protective_reads, ) @@ -204,8 +209,7 @@ impl L2BlockSealerTask { // Commands must be processed sequentially: a later L2 block cannot be saved before // an earlier one. while let Some(completable) = self.next_command().await { - let mut storage = self.pool.connection_tagged("state_keeper").await?; - completable.command.seal(&mut storage).await?; + completable.command.seal(self.pool.clone()).await?; if let Some(delta) = l2_block_seal_delta { L2_BLOCK_METRICS.seal_delta.observe(delta.elapsed()); } diff --git a/core/lib/zksync_core/src/state_keeper/io/seal_logic/l2_block_seal_subtasks.rs b/core/lib/zksync_core/src/state_keeper/io/seal_logic/l2_block_seal_subtasks.rs new file mode 100644 index 00000000000..2e6e41e61ce --- /dev/null +++ b/core/lib/zksync_core/src/state_keeper/io/seal_logic/l2_block_seal_subtasks.rs @@ -0,0 +1,563 @@ +use anyhow::Context; +use async_trait::async_trait; +use zksync_dal::{Core, CoreDal}; +use zksync_db_connection::connection::Connection; +use zksync_types::{event::extract_added_tokens, L2BlockNumber}; + +use crate::state_keeper::{ + io::seal_logic::SealStrategy, + metrics::{L2BlockSealStage, L2_BLOCK_METRICS}, + updates::L2BlockSealCommand, +}; + +/// Helper struct that encapsulates parallel l2 block sealing logic. +#[derive(Debug)] +pub(crate) struct L2BlockSealProcess; + +impl L2BlockSealProcess { + pub fn all_subtasks() -> Vec> { + vec![ + Box::new(MarkTransactionsInL2BlockSubtask), + Box::new(InsertStorageLogsSubtask), + Box::new(InsertFactoryDepsSubtask), + Box::new(InsertTokensSubtask), + Box::new(InsertEventsSubtask), + Box::new(InsertL2ToL1LogsSubtask), + ] + } + + pub fn subtasks_len() -> u32 { + Self::all_subtasks().len() as u32 + } + + pub async fn run_subtasks( + command: &L2BlockSealCommand, + strategy: &mut SealStrategy<'_>, + ) -> anyhow::Result<()> { + let subtasks = Self::all_subtasks(); + match strategy { + SealStrategy::Sequential(connection) => { + for subtask in subtasks { + let subtask_name = subtask.name(); + subtask + .run(command, connection) + .await + .context(subtask_name)?; + } + } + SealStrategy::Parallel(pool) => { + let pool = &*pool; + let handles = subtasks.into_iter().map(|subtask| { + let subtask_name = subtask.name(); + async move { + let mut connection = pool.connection_tagged("state_keeper").await?; + subtask + .run(command, &mut connection) + .await + .context(subtask_name) + } + }); + futures::future::try_join_all(handles).await?; + } + } + + Ok(()) + } + + /// Clears pending l2 block data from the database. + pub async fn clear_pending_l2_block( + connection: &mut Connection<'_, Core>, + last_sealed_l2_block: L2BlockNumber, + ) -> anyhow::Result<()> { + let seal_subtasks = L2BlockSealProcess::all_subtasks(); + for subtask in seal_subtasks { + subtask.rollback(connection, last_sealed_l2_block).await?; + } + + Ok(()) + } +} + +/// An abstraction that represents l2 block seal sub-task that can be run in parallel with other sub-tasks. +#[async_trait::async_trait] +pub(crate) trait L2BlockSealSubtask: Send + Sync + 'static { + /// Returns sub-task name. + fn name(&self) -> &'static str; + + /// Runs seal process. + async fn run( + self: Box, + command: &L2BlockSealCommand, + connection: &mut Connection<'_, Core>, + ) -> anyhow::Result<()>; + + /// Rollbacks data that was saved to database for the pending L2 block. + async fn rollback( + &self, + storage: &mut Connection<'_, Core>, + last_sealed_l2_block: L2BlockNumber, + ) -> anyhow::Result<()>; +} + +#[derive(Debug)] +pub(super) struct MarkTransactionsInL2BlockSubtask; + +#[async_trait] +impl L2BlockSealSubtask for MarkTransactionsInL2BlockSubtask { + fn name(&self) -> &'static str { + "mark_transactions_in_l2_block" + } + + async fn run( + self: Box, + command: &L2BlockSealCommand, + connection: &mut Connection<'_, Core>, + ) -> anyhow::Result<()> { + let progress = L2_BLOCK_METRICS.start( + L2BlockSealStage::MarkTransactionsInL2Block, + command.is_l2_block_fictive(), + ); + + connection + .transactions_dal() + .mark_txs_as_executed_in_l2_block( + command.l2_block.number, + &command.l2_block.executed_transactions, + command.base_fee_per_gas.into(), + command.l2_block.protocol_version, + command.pre_insert_txs, + ) + .await?; + + progress.observe(command.l2_block.executed_transactions.len()); + Ok(()) + } + + async fn rollback( + &self, + storage: &mut Connection<'_, Core>, + last_sealed_l2_block: L2BlockNumber, + ) -> anyhow::Result<()> { + storage + .transactions_dal() + .reset_transactions_state(last_sealed_l2_block) + .await?; + Ok(()) + } +} + +#[derive(Debug)] +pub(super) struct InsertStorageLogsSubtask; + +#[async_trait] +impl L2BlockSealSubtask for InsertStorageLogsSubtask { + fn name(&self) -> &'static str { + "insert_storage_logs" + } + + async fn run( + self: Box, + command: &L2BlockSealCommand, + connection: &mut Connection<'_, Core>, + ) -> anyhow::Result<()> { + let is_fictive = command.is_l2_block_fictive(); + let write_logs = command.extract_deduplicated_write_logs(is_fictive); + + let progress = L2_BLOCK_METRICS.start(L2BlockSealStage::InsertStorageLogs, is_fictive); + + let write_log_count: usize = write_logs.iter().map(|(_, logs)| logs.len()).sum(); + connection + .storage_logs_dal() + .insert_storage_logs(command.l2_block.number, &write_logs) + .await?; + + progress.observe(write_log_count); + Ok(()) + } + + async fn rollback( + &self, + storage: &mut Connection<'_, Core>, + last_sealed_l2_block: L2BlockNumber, + ) -> anyhow::Result<()> { + storage + .storage_logs_dal() + .roll_back_storage_logs(last_sealed_l2_block) + .await?; + Ok(()) + } +} + +#[derive(Debug)] +pub(super) struct InsertFactoryDepsSubtask; + +#[async_trait] +impl L2BlockSealSubtask for InsertFactoryDepsSubtask { + fn name(&self) -> &'static str { + "insert_factory_deps" + } + + async fn run( + self: Box, + command: &L2BlockSealCommand, + connection: &mut Connection<'_, Core>, + ) -> anyhow::Result<()> { + let progress = L2_BLOCK_METRICS.start( + L2BlockSealStage::InsertFactoryDeps, + command.is_l2_block_fictive(), + ); + + if !command.l2_block.new_factory_deps.is_empty() { + connection + .factory_deps_dal() + .insert_factory_deps(command.l2_block.number, &command.l2_block.new_factory_deps) + .await?; + progress.observe(command.l2_block.new_factory_deps.len()); + } + + Ok(()) + } + + async fn rollback( + &self, + storage: &mut Connection<'_, Core>, + last_sealed_l2_block: L2BlockNumber, + ) -> anyhow::Result<()> { + storage + .factory_deps_dal() + .roll_back_factory_deps(last_sealed_l2_block) + .await?; + Ok(()) + } +} + +#[derive(Debug)] +pub(super) struct InsertTokensSubtask; + +#[async_trait] +impl L2BlockSealSubtask for InsertTokensSubtask { + fn name(&self) -> &'static str { + "insert_tokens" + } + + async fn run( + self: Box, + command: &L2BlockSealCommand, + connection: &mut Connection<'_, Core>, + ) -> anyhow::Result<()> { + let is_fictive = command.is_l2_block_fictive(); + let progress = L2_BLOCK_METRICS.start(L2BlockSealStage::ExtractAddedTokens, is_fictive); + let added_tokens = + extract_added_tokens(command.l2_shared_bridge_addr, &command.l2_block.events); + progress.observe(added_tokens.len()); + + if !added_tokens.is_empty() { + let progress = L2_BLOCK_METRICS.start(L2BlockSealStage::InsertTokens, is_fictive); + let added_tokens_len = added_tokens.len(); + connection.tokens_dal().add_tokens(&added_tokens).await?; + progress.observe(added_tokens_len); + } + + Ok(()) + } + + async fn rollback( + &self, + storage: &mut Connection<'_, Core>, + last_sealed_l2_block: L2BlockNumber, + ) -> anyhow::Result<()> { + storage + .tokens_dal() + .roll_back_tokens(last_sealed_l2_block) + .await?; + Ok(()) + } +} + +#[derive(Debug)] +pub(super) struct InsertEventsSubtask; + +#[async_trait] +impl L2BlockSealSubtask for InsertEventsSubtask { + fn name(&self) -> &'static str { + "insert_events" + } + + async fn run( + self: Box, + command: &L2BlockSealCommand, + connection: &mut Connection<'_, Core>, + ) -> anyhow::Result<()> { + let is_fictive = command.is_l2_block_fictive(); + let progress = L2_BLOCK_METRICS.start(L2BlockSealStage::ExtractEvents, is_fictive); + let l2_block_events = command.extract_events(is_fictive); + let l2_block_event_count: usize = + l2_block_events.iter().map(|(_, events)| events.len()).sum(); + progress.observe(l2_block_event_count); + + let progress = L2_BLOCK_METRICS.start(L2BlockSealStage::InsertEvents, is_fictive); + connection + .events_dal() + .save_events(command.l2_block.number, &l2_block_events) + .await?; + progress.observe(l2_block_event_count); + Ok(()) + } + + async fn rollback( + &self, + storage: &mut Connection<'_, Core>, + last_sealed_l2_block: L2BlockNumber, + ) -> anyhow::Result<()> { + storage + .events_dal() + .roll_back_events(last_sealed_l2_block) + .await?; + Ok(()) + } +} + +#[derive(Debug)] +pub(super) struct InsertL2ToL1LogsSubtask; + +#[async_trait] +impl L2BlockSealSubtask for InsertL2ToL1LogsSubtask { + fn name(&self) -> &'static str { + "insert_l2_to_l1_logs" + } + + async fn run( + self: Box, + command: &L2BlockSealCommand, + connection: &mut Connection<'_, Core>, + ) -> anyhow::Result<()> { + let is_fictive = command.is_l2_block_fictive(); + let progress = L2_BLOCK_METRICS.start(L2BlockSealStage::ExtractL2ToL1Logs, is_fictive); + + let user_l2_to_l1_logs = command.extract_user_l2_to_l1_logs(is_fictive); + let user_l2_to_l1_log_count: usize = user_l2_to_l1_logs + .iter() + .map(|(_, l2_to_l1_logs)| l2_to_l1_logs.len()) + .sum(); + + progress.observe(user_l2_to_l1_log_count); + + let progress = L2_BLOCK_METRICS.start(L2BlockSealStage::InsertL2ToL1Logs, is_fictive); + connection + .events_dal() + .save_user_l2_to_l1_logs(command.l2_block.number, &user_l2_to_l1_logs) + .await?; + progress.observe(user_l2_to_l1_log_count); + Ok(()) + } + + async fn rollback( + &self, + storage: &mut Connection<'_, Core>, + last_sealed_l2_block: L2BlockNumber, + ) -> anyhow::Result<()> { + storage + .events_dal() + .roll_back_l2_to_l1_logs(last_sealed_l2_block) + .await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use multivm::{ + utils::{get_max_batch_gas_limit, get_max_gas_per_pubdata_byte}, + zk_evm_latest::ethereum_types::H256, + VmVersion, + }; + use zksync_dal::Core; + use zksync_db_connection::connection_pool::ConnectionPool; + use zksync_node_test_utils::create_l2_transaction; + use zksync_types::{ + block::L2BlockHeader, + l2_to_l1_log::{L2ToL1Log, UserL2ToL1Log}, + tx::{tx_execution_info::TxExecutionStatus, TransactionExecutionResult}, + zk_evm_types::{LogQuery, Timestamp}, + AccountTreeId, Address, L1BatchNumber, ProtocolVersionId, StorageKey, StorageLogQuery, + StorageLogQueryType, VmEvent, U256, + }; + use zksync_utils::h256_to_u256; + + use super::*; + use crate::state_keeper::updates::L2BlockUpdates; + + #[tokio::test] + async fn rollback_pending_l2_block() { + let pool = + ConnectionPool::::constrained_test_pool(L2BlockSealProcess::subtasks_len()).await; + + // Prepare data. + let tx = create_l2_transaction(1, 1); + pool.connection() + .await + .unwrap() + .transactions_dal() + .insert_transaction_l2(&tx, Default::default()) + .await + .unwrap(); + let tx_hash = tx.hash(); + let executed_transactions = vec![TransactionExecutionResult { + transaction: tx.into(), + hash: tx_hash, + execution_info: Default::default(), + execution_status: TxExecutionStatus::Success, + refunded_gas: 0, + operator_suggested_refund: 0, + compressed_bytecodes: Vec::new(), + call_traces: Vec::new(), + revert_reason: None, + }]; + let events = vec![VmEvent { + location: (L1BatchNumber(1), 0), + address: Address::zero(), + indexed_topics: Vec::new(), + value: Vec::new(), + }]; + let storage_key = StorageKey::new(AccountTreeId::new(Address::zero()), H256::zero()); + let storage_value = H256::from_low_u64_be(1); + let storage_logs = vec![StorageLogQuery { + log_query: LogQuery { + timestamp: Timestamp(0), + tx_number_in_block: 0, + aux_byte: 0, + shard_id: 0, + address: *storage_key.address(), + key: h256_to_u256(*storage_key.key()), + read_value: U256::zero(), + written_value: h256_to_u256(storage_value), + rw_flag: true, + rollback: false, + is_service: false, + }, + log_type: StorageLogQueryType::InitialWrite, + }]; + let user_l2_to_l1_logs = vec![UserL2ToL1Log(L2ToL1Log { + shard_id: 0, + is_service: false, + tx_number_in_block: 0, + sender: Address::zero(), + key: H256::zero(), + value: H256::zero(), + })]; + + let bytecode_hash = H256::repeat_byte(0x12); + let bytecode = vec![0u8; 32]; + let new_factory_deps = vec![(bytecode_hash, bytecode)].into_iter().collect(); + let l2_block_seal_command = L2BlockSealCommand { + l1_batch_number: L1BatchNumber(1), + l2_block: L2BlockUpdates { + executed_transactions, + events, + storage_logs, + user_l2_to_l1_logs, + system_l2_to_l1_logs: Default::default(), + new_factory_deps, + l1_gas_count: Default::default(), + block_execution_metrics: Default::default(), + txs_encoding_size: Default::default(), + payload_encoding_size: Default::default(), + timestamp: 1, + number: L2BlockNumber(1), + prev_block_hash: Default::default(), + virtual_blocks: Default::default(), + protocol_version: ProtocolVersionId::latest(), + }, + first_tx_index: 0, + fee_account_address: Default::default(), + fee_input: Default::default(), + base_fee_per_gas: Default::default(), + base_system_contracts_hashes: Default::default(), + protocol_version: Some(ProtocolVersionId::latest()), + l2_shared_bridge_addr: Default::default(), + pre_insert_txs: false, + }; + + // Run. + let mut strategy = SealStrategy::Parallel(&pool); + L2BlockSealProcess::run_subtasks(&l2_block_seal_command, &mut strategy) + .await + .unwrap(); + + // Check factory dependency is saved. + let mut connection = pool.connection().await.unwrap(); + let factory_deps = connection + .factory_deps_dal() + .get_factory_deps(&vec![bytecode_hash].into_iter().collect()) + .await; + assert!(factory_deps.contains_key(&h256_to_u256(bytecode_hash))); + + // Rollback. + L2BlockSealProcess::clear_pending_l2_block(&mut connection, L2BlockNumber(0)) + .await + .unwrap(); + + // Check factory dependency was removed. + let factory_deps = connection + .factory_deps_dal() + .get_factory_deps(&vec![bytecode_hash].into_iter().collect()) + .await; + assert!(factory_deps.is_empty()); + drop(connection); + + // Run again. + let mut strategy = SealStrategy::Parallel(&pool); + L2BlockSealProcess::run_subtasks(&l2_block_seal_command, &mut strategy) + .await + .unwrap(); + + // Check DAL doesn't return tx receipt before block header is saved. + let mut connection = pool.connection().await.unwrap(); + let tx_receipt = connection + .transactions_web3_dal() + .get_transaction_receipts(&[tx_hash]) + .await + .unwrap() + .get(0) + .cloned(); + assert!(tx_receipt.is_none()); + + // Insert block header. + let l2_block_header = L2BlockHeader { + number: l2_block_seal_command.l2_block.number, + timestamp: l2_block_seal_command.l2_block.timestamp, + hash: l2_block_seal_command.l2_block.get_l2_block_hash(), + l1_tx_count: 0, + l2_tx_count: 1, + fee_account_address: l2_block_seal_command.fee_account_address, + base_fee_per_gas: l2_block_seal_command.base_fee_per_gas, + batch_fee_input: l2_block_seal_command.fee_input, + base_system_contracts_hashes: l2_block_seal_command.base_system_contracts_hashes, + protocol_version: l2_block_seal_command.protocol_version, + gas_per_pubdata_limit: get_max_gas_per_pubdata_byte(VmVersion::latest()), + virtual_blocks: l2_block_seal_command.l2_block.virtual_blocks, + gas_limit: get_max_batch_gas_limit(VmVersion::latest()), + }; + connection + .protocol_versions_dal() + .save_protocol_version_with_tx(&Default::default()) + .await + .unwrap(); + connection + .blocks_dal() + .insert_l2_block(&l2_block_header) + .await + .unwrap(); + + // Check tx receipt. + let tx_receipt = connection + .transactions_web3_dal() + .get_transaction_receipts(&[tx_hash]) + .await + .unwrap() + .remove(0); + assert_eq!(tx_receipt.block_number.as_u32(), 1); + assert_eq!(tx_receipt.logs.len(), 1); + assert_eq!(tx_receipt.l2_to_l1_logs.len(), 1); + } +} diff --git a/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs b/core/lib/zksync_core/src/state_keeper/io/seal_logic/mod.rs similarity index 74% rename from core/lib/zksync_core/src/state_keeper/io/seal_logic.rs rename to core/lib/zksync_core/src/state_keeper/io/seal_logic/mod.rs index d04f17b2c22..2b9f922ade9 100644 --- a/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs +++ b/core/lib/zksync_core/src/state_keeper/io/seal_logic/mod.rs @@ -1,21 +1,22 @@ //! This module is a source-of-truth on what is expected to be done when sealing a block. //! It contains the logic of the block sealing, which is used by both the mempool-based and external node IO. -use std::time::{Duration, Instant}; +use std::{ + ops, + time::{Duration, Instant}, +}; use anyhow::Context as _; use itertools::Itertools; use multivm::utils::{get_max_batch_gas_limit, get_max_gas_per_pubdata_byte}; -use zksync_dal::{Connection, Core, CoreDal}; +use zksync_dal::{Core, CoreDal}; +use zksync_db_connection::{connection::Connection, connection_pool::ConnectionPool}; use zksync_shared_metrics::{BlockStage, L2BlockStage, APP_METRICS}; use zksync_types::{ block::{L1BatchHeader, L2BlockHeader}, - event::{extract_added_tokens, extract_long_l2_to_l1_messages}, + event::extract_long_l2_to_l1_messages, helpers::unix_timestamp_ms, - l1::L1Tx, - l2::L2Tx, - l2_to_l1_log::{SystemL2ToL1Log, UserL2ToL1Log}, - protocol_upgrade::ProtocolUpgradeTx, + l2_to_l1_log::UserL2ToL1Log, storage_writes_deduplicator::{ModifiedSlot, StorageWritesDeduplicator}, tx::{ tx_execution_info::DeduplicatedWritesMetrics, IncludedTxLocation, @@ -23,12 +24,13 @@ use zksync_types::{ }, utils::display_timestamp, zk_evm_types::LogQuery, - AccountTreeId, Address, ExecuteTransactionCommon, L1BlockNumber, ProtocolVersionId, StorageKey, - StorageLog, Transaction, VmEvent, H256, + AccountTreeId, Address, ExecuteTransactionCommon, ProtocolVersionId, StorageKey, StorageLog, + Transaction, VmEvent, H256, }; use zksync_utils::u256_to_h256; use crate::state_keeper::{ + io::seal_logic::l2_block_seal_subtasks::L2BlockSealProcess, metrics::{ L1BatchSealStage, L2BlockSealStage, TxExecutionType, KEEPER_METRICS, L1_BATCH_METRICS, L2_BLOCK_METRICS, @@ -36,13 +38,15 @@ use crate::state_keeper::{ updates::{L2BlockSealCommand, UpdatesManager}, }; +pub(crate) mod l2_block_seal_subtasks; + impl UpdatesManager { /// Persists an L1 batch in the storage. /// This action includes a creation of an empty "fictive" L2 block that contains /// the events generated during the bootloader "tip phase". Returns updates for this fictive L2 block. pub(super) async fn seal_l1_batch( &self, - storage: &mut Connection<'_, Core>, + pool: ConnectionPool, l2_shared_bridge_addr: Address, insert_protective_reads: bool, ) -> anyhow::Result<()> { @@ -52,7 +56,6 @@ impl UpdatesManager { .finished .as_ref() .context("L1 batch is not actually finished")?; - let mut transaction = storage.start_transaction().await?; let progress = L1_BATCH_METRICS.start(L1BatchSealStage::FictiveL2Block); // Seal fictive L2 block with last events and storage logs. @@ -60,10 +63,19 @@ impl UpdatesManager { l2_shared_bridge_addr, false, // fictive L2 blocks don't have txs, so it's fine to pass `false` here. ); + + let mut connection = pool.connection_tagged("state_keeper").await?; + let transaction = connection.start_transaction().await?; + + // We rely on the fact that fictive L2 block and L1 batch data is saved in the same transaction. + let mut strategy = SealStrategy::Sequential(transaction); l2_block_command - .seal_inner(&mut transaction, true) + .seal_inner(&mut strategy, true) .await .context("failed persisting fictive L2 block")?; + let SealStrategy::Sequential(mut transaction) = strategy else { + panic!("Sealing L2 block should not mutate type of strategy"); + }; progress.observe(None); let progress = L1_BATCH_METRICS.start(L1BatchSealStage::LogDeduplication); @@ -127,6 +139,7 @@ impl UpdatesManager { .final_bootloader_memory .clone() .unwrap_or_default(); + transaction .blocks_dal() .insert_l1_batch( @@ -260,56 +273,53 @@ impl UpdatesManager { } } -impl L2BlockSealCommand { - pub(super) async fn seal(&self, storage: &mut Connection<'_, Core>) -> anyhow::Result<()> { - self.seal_inner(storage, false) - .await - .with_context(|| format!("failed sealing L2 block #{}", self.l2_block.number)) +#[derive(Debug)] +pub(crate) enum SealStrategy<'pool> { + Sequential(Connection<'pool, Core>), + Parallel(&'pool ConnectionPool), +} + +// As opposed to `Cow` from `std`; a union of an owned type and a mutable ref to it +enum Goat<'a, T> { + Owned(T), + Borrowed(&'a mut T), +} + +impl ops::Deref for Goat<'_, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + match self { + Self::Owned(value) => value, + Self::Borrowed(value) => value, + } } +} - async fn insert_transactions( - &self, - transaction: &mut Connection<'_, Core>, - ) -> anyhow::Result<()> { - for tx_result in &self.l2_block.executed_transactions { - let tx = tx_result.transaction.clone(); - let tx_hash = tx.hash(); - - match &tx.common_data { - ExecuteTransactionCommon::L1(_) => { - // `unwrap` is safe due to the check above - let l1_tx = L1Tx::try_from(tx).unwrap(); - let l1_block_number = L1BlockNumber(l1_tx.common_data.eth_block as u32); - transaction - .transactions_dal() - .insert_transaction_l1(&l1_tx, l1_block_number) - .await - .with_context(|| format!("failed persisting L1 transaction {tx_hash:?}"))?; - } - ExecuteTransactionCommon::L2(_) => { - // `unwrap` is safe due to the check above - let l2_tx = L2Tx::try_from(tx).unwrap(); - // Using `Default` for execution metrics should be OK here, since this data is not used on the EN. - transaction - .transactions_dal() - .insert_transaction_l2(&l2_tx, Default::default()) - .await - .with_context(|| format!("failed persisting L2 transaction {tx_hash:?}"))?; - } - ExecuteTransactionCommon::ProtocolUpgrade(_) => { - // `unwrap` is safe due to the check above - let protocol_system_upgrade_tx = ProtocolUpgradeTx::try_from(tx).unwrap(); - transaction - .transactions_dal() - .insert_system_transaction(&protocol_system_upgrade_tx) - .await - .with_context(|| { - format!("failed persisting protocol upgrade transaction {tx_hash:?}") - })?; - } - } +impl ops::DerefMut for Goat<'_, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + match self { + Self::Owned(value) => value, + Self::Borrowed(value) => value, } - Ok(()) + } +} + +impl<'pool> SealStrategy<'pool> { + async fn connection(&mut self) -> anyhow::Result>> { + Ok(match self { + Self::Parallel(pool) => Goat::Owned(pool.connection_tagged("state_keeper").await?), + Self::Sequential(conn) => Goat::Borrowed(conn), + }) + } +} + +impl L2BlockSealCommand { + pub(super) async fn seal(&self, pool: ConnectionPool) -> anyhow::Result<()> { + let l2_block_number = self.l2_block.number; + self.seal_inner(&mut SealStrategy::Parallel(&pool), false) + .await + .with_context(|| format!("failed sealing L2 block #{l2_block_number}")) } /// Seals an L2 block with the given number. @@ -323,42 +333,36 @@ impl L2BlockSealCommand { /// `l2_shared_bridge_addr` is required to extract the information on newly added tokens. async fn seal_inner( &self, - storage: &mut Connection<'_, Core>, + strategy: &mut SealStrategy<'_>, is_fictive: bool, ) -> anyhow::Result<()> { + let started_at = Instant::now(); self.ensure_valid_l2_block(is_fictive) .context("L2 block is invalid")?; - let mut transaction = storage.start_transaction().await?; - if self.pre_insert_txs { - let progress = L2_BLOCK_METRICS.start(L2BlockSealStage::PreInsertTxs, is_fictive); - self.insert_transactions(&mut transaction) - .await - .context("failed persisting transactions in L2 block")?; - progress.observe(Some(self.l2_block.executed_transactions.len())); - } - - let l1_batch_number = self.l1_batch_number; - let l2_block_number = self.l2_block.number; - let started_at = Instant::now(); - let progress = L2_BLOCK_METRICS.start(L2BlockSealStage::InsertL2BlockHeader, is_fictive); - let (l1_tx_count, l2_tx_count) = l1_l2_tx_count(&self.l2_block.executed_transactions); tracing::info!( "Sealing L2 block {l2_block_number} with timestamp {ts} (L1 batch {l1_batch_number}) \ with {total_tx_count} ({l2_tx_count} L2 + {l1_tx_count} L1) txs, {event_count} events", + l2_block_number = self.l2_block.number, + l1_batch_number = self.l1_batch_number, ts = display_timestamp(self.l2_block.timestamp), total_tx_count = l1_tx_count + l2_tx_count, event_count = self.l2_block.events.len() ); + // Run sub-tasks in parallel. + L2BlockSealProcess::run_subtasks(self, strategy).await?; + + // Seal block header at the last step. + let progress = L2_BLOCK_METRICS.start(L2BlockSealStage::InsertL2BlockHeader, is_fictive); let definite_vm_version = self .protocol_version .unwrap_or_else(ProtocolVersionId::last_potentially_undefined) .into(); let l2_block_header = L2BlockHeader { - number: l2_block_number, + number: self.l2_block.number, timestamp: self.l2_block.timestamp, hash: self.l2_block.get_l2_block_hash(), l1_tx_count: l1_tx_count as u16, @@ -373,94 +377,14 @@ impl L2BlockSealCommand { gas_limit: get_max_batch_gas_limit(definite_vm_version), }; - transaction + let mut connection = strategy.connection().await?; + connection .blocks_dal() .insert_l2_block(&l2_block_header) .await?; progress.observe(None); - let progress = - L2_BLOCK_METRICS.start(L2BlockSealStage::MarkTransactionsInL2Block, is_fictive); - transaction - .transactions_dal() - .mark_txs_as_executed_in_l2_block( - l2_block_number, - &self.l2_block.executed_transactions, - self.base_fee_per_gas.into(), - ) - .await?; - progress.observe(self.l2_block.executed_transactions.len()); - - let progress = L2_BLOCK_METRICS.start(L2BlockSealStage::InsertStorageLogs, is_fictive); - let write_logs = self.extract_deduplicated_write_logs(is_fictive); - let write_log_count: usize = write_logs.iter().map(|(_, logs)| logs.len()).sum(); - transaction - .storage_logs_dal() - .insert_storage_logs(l2_block_number, &write_logs) - .await?; - progress.observe(write_log_count); - - let progress = L2_BLOCK_METRICS.start(L2BlockSealStage::InsertFactoryDeps, is_fictive); - let new_factory_deps = &self.l2_block.new_factory_deps; - let new_factory_deps_count = new_factory_deps.len(); - if !new_factory_deps.is_empty() { - transaction - .factory_deps_dal() - .insert_factory_deps(l2_block_number, new_factory_deps) - .await?; - } - progress.observe(new_factory_deps_count); - - let progress = L2_BLOCK_METRICS.start(L2BlockSealStage::ExtractAddedTokens, is_fictive); - let added_tokens = extract_added_tokens(self.l2_shared_bridge_addr, &self.l2_block.events); - progress.observe(added_tokens.len()); - let progress = L2_BLOCK_METRICS.start(L2BlockSealStage::InsertTokens, is_fictive); - let added_tokens_len = added_tokens.len(); - if !added_tokens.is_empty() { - transaction.tokens_dal().add_tokens(&added_tokens).await?; - } - progress.observe(added_tokens_len); - - let progress = L2_BLOCK_METRICS.start(L2BlockSealStage::ExtractEvents, is_fictive); - let l2_block_events = self.extract_events(is_fictive); - let l2_block_event_count: usize = - l2_block_events.iter().map(|(_, events)| events.len()).sum(); - progress.observe(l2_block_event_count); - let progress = L2_BLOCK_METRICS.start(L2BlockSealStage::InsertEvents, is_fictive); - transaction - .events_dal() - .save_events(l2_block_number, &l2_block_events) - .await?; - progress.observe(l2_block_event_count); - - let progress = L2_BLOCK_METRICS.start(L2BlockSealStage::ExtractL2ToL1Logs, is_fictive); - - let system_l2_to_l1_logs = self.extract_system_l2_to_l1_logs(is_fictive); - let user_l2_to_l1_logs = self.extract_user_l2_to_l1_logs(is_fictive); - - let system_l2_to_l1_log_count: usize = system_l2_to_l1_logs - .iter() - .map(|(_, l2_to_l1_logs)| l2_to_l1_logs.len()) - .sum(); - let user_l2_to_l1_log_count: usize = user_l2_to_l1_logs - .iter() - .map(|(_, l2_to_l1_logs)| l2_to_l1_logs.len()) - .sum(); - - progress.observe(system_l2_to_l1_log_count + user_l2_to_l1_log_count); - - let progress = L2_BLOCK_METRICS.start(L2BlockSealStage::InsertL2ToL1Logs, is_fictive); - transaction - .events_dal() - .save_user_l2_to_l1_logs(l2_block_number, &user_l2_to_l1_logs) - .await?; - progress.observe(user_l2_to_l1_log_count); - - let progress = L2_BLOCK_METRICS.start(L2BlockSealStage::CommitL2Block, is_fictive); - - transaction.commit().await?; - progress.observe(None); - + // Report metrics. let progress = L2_BLOCK_METRICS.start(L2BlockSealStage::ReportTxMetrics, is_fictive); self.report_transaction_metrics(); progress.observe(Some(self.l2_block.executed_transactions.len())); @@ -585,15 +509,6 @@ impl L2BlockSealCommand { grouped_entries.collect() } - fn extract_system_l2_to_l1_logs( - &self, - is_fictive: bool, - ) -> Vec<(IncludedTxLocation, Vec<&SystemL2ToL1Log>)> { - self.group_by_tx_location(&self.l2_block.system_l2_to_l1_logs, is_fictive, |log| { - u32::from(log.0.tx_number_in_block) - }) - } - fn extract_user_l2_to_l1_logs( &self, is_fictive: bool, @@ -653,6 +568,10 @@ impl L2BlockSealCommand { started_at.elapsed() ); } + + fn is_l2_block_fictive(&self) -> bool { + self.l2_block.executed_transactions.is_empty() + } } fn l1_l2_tx_count(executed_transactions: &[TransactionExecutionResult]) -> (usize, usize) { diff --git a/core/lib/zksync_core/src/state_keeper/io/tests/mod.rs b/core/lib/zksync_core/src/state_keeper/io/tests/mod.rs index 20050fd8f85..40dd8571f37 100644 --- a/core/lib/zksync_core/src/state_keeper/io/tests/mod.rs +++ b/core/lib/zksync_core/src/state_keeper/io/tests/mod.rs @@ -19,7 +19,7 @@ use zksync_utils::time::seconds_since_epoch; use self::tester::Tester; use crate::state_keeper::{ - io::StateKeeperIO, + io::{seal_logic::l2_block_seal_subtasks::L2BlockSealProcess, StateKeeperIO}, mempool_actor::l2_tx_filter, tests::{create_execution_result, create_transaction, Query, BASE_SYSTEM_CONTRACTS}, updates::{L2BlockSealCommand, L2BlockUpdates, UpdatesManager}, @@ -217,7 +217,8 @@ async fn l1_batch_timestamp_respects_prev_l2_block_with_clock_skew( #[tokio::test] async fn processing_storage_logs_when_sealing_l2_block() { - let connection_pool = ConnectionPool::::constrained_test_pool(1).await; + let connection_pool = + ConnectionPool::::constrained_test_pool(L2BlockSealProcess::subtasks_len()).await; let mut l2_block = L2BlockUpdates::new( 0, L2BlockNumber(3), @@ -284,12 +285,16 @@ async fn processing_storage_logs_when_sealing_l2_block() { l2_shared_bridge_addr: Address::default(), pre_insert_txs: false, }; - let mut conn = connection_pool.connection().await.unwrap(); - conn.protocol_versions_dal() + connection_pool + .connection() + .await + .unwrap() + .protocol_versions_dal() .save_protocol_version_with_tx(&ProtocolVersion::default()) .await .unwrap(); - seal_command.seal(&mut conn).await.unwrap(); + seal_command.seal(connection_pool.clone()).await.unwrap(); + let mut conn = connection_pool.connection().await.unwrap(); // Manually mark the L2 block as executed so that getting touched slots from it works conn.blocks_dal() @@ -320,11 +325,13 @@ async fn processing_storage_logs_when_sealing_l2_block() { #[tokio::test] async fn processing_events_when_sealing_l2_block() { - let pool = ConnectionPool::::constrained_test_pool(1).await; + let pool = + ConnectionPool::::constrained_test_pool(L2BlockSealProcess::subtasks_len()).await; let l1_batch_number = L1BatchNumber(2); + let l2_block_number = L2BlockNumber(3); let mut l2_block = L2BlockUpdates::new( 0, - L2BlockNumber(3), + l2_block_number, H256::zero(), 1, ProtocolVersionId::latest(), @@ -367,16 +374,19 @@ async fn processing_events_when_sealing_l2_block() { l2_shared_bridge_addr: Address::default(), pre_insert_txs: false, }; - let mut conn = pool.connection().await.unwrap(); - conn.protocol_versions_dal() + pool.connection() + .await + .unwrap() + .protocol_versions_dal() .save_protocol_version_with_tx(&ProtocolVersion::default()) .await .unwrap(); - seal_command.seal(&mut conn).await.unwrap(); + seal_command.seal(pool.clone()).await.unwrap(); + let mut conn = pool.connection().await.unwrap(); let logs = conn .events_web3_dal() - .get_all_logs(seal_command.l2_block.number - 1) + .get_all_logs(l2_block_number - 1) .await .unwrap(); diff --git a/core/lib/zksync_core/src/state_keeper/io/tests/tester.rs b/core/lib/zksync_core/src/state_keeper/io/tests/tester.rs index 2189247201b..fb1e1f08bc0 100644 --- a/core/lib/zksync_core/src/state_keeper/io/tests/tester.rs +++ b/core/lib/zksync_core/src/state_keeper/io/tests/tester.rs @@ -175,6 +175,8 @@ impl Tester { L2BlockNumber(number), slice::from_ref(&tx_result), 1.into(), + ProtocolVersionId::latest(), + false, ) .await .unwrap(); diff --git a/core/lib/zksync_core/src/state_keeper/mempool_actor.rs b/core/lib/zksync_core/src/state_keeper/mempool_actor.rs index 8bc2498f94a..fb91fdfb117 100644 --- a/core/lib/zksync_core/src/state_keeper/mempool_actor.rs +++ b/core/lib/zksync_core/src/state_keeper/mempool_actor.rs @@ -181,6 +181,9 @@ mod tests { async fn getting_transaction_nonces() { let pool = ConnectionPool::::test_pool().await; let mut storage = pool.connection().await.unwrap(); + insert_genesis_batch(&mut storage, &GenesisParams::mock()) + .await + .unwrap(); let transaction = create_l2_transaction(10, 100); let transaction_initiator = transaction.initiator_account(); diff --git a/core/lib/zksync_core/src/state_keeper/metrics.rs b/core/lib/zksync_core/src/state_keeper/metrics.rs index df8d8360691..6da54abd854 100644 --- a/core/lib/zksync_core/src/state_keeper/metrics.rs +++ b/core/lib/zksync_core/src/state_keeper/metrics.rs @@ -254,7 +254,6 @@ pub(super) enum L2BlockQueueStage { #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue)] #[metrics(rename_all = "snake_case")] pub(super) enum L2BlockSealStage { - PreInsertTxs, #[metrics(name = "insert_miniblock_header")] InsertL2BlockHeader, #[metrics(name = "mark_transactions_in_miniblock")] @@ -267,8 +266,6 @@ pub(super) enum L2BlockSealStage { InsertEvents, ExtractL2ToL1Logs, InsertL2ToL1Logs, - #[metrics(name = "commit_miniblock")] - CommitL2Block, ReportTxMetrics, } diff --git a/core/lib/zksync_core/src/sync_layer/external_io.rs b/core/lib/zksync_core/src/sync_layer/external_io.rs index 482c0fbe2de..c725741f50f 100644 --- a/core/lib/zksync_core/src/sync_layer/external_io.rs +++ b/core/lib/zksync_core/src/sync_layer/external_io.rs @@ -18,6 +18,7 @@ use super::{ use crate::state_keeper::{ io::{ common::{load_pending_batch, IoCursor}, + seal_logic::l2_block_seal_subtasks::L2BlockSealProcess, L1BatchParams, L2BlockParams, PendingBatchData, StateKeeperIO, }, metrics::KEEPER_METRICS, @@ -72,7 +73,7 @@ impl ExternalIO { .connection_tagged("sync_layer") .await? .factory_deps_dal() - .get_factory_dep(hash) + .get_sealed_factory_dep(hash) .await?; Ok(match bytecode { @@ -141,6 +142,7 @@ impl StateKeeperIO for ExternalIO { cursor.next_l2_block, ); + L2BlockSealProcess::clear_pending_l2_block(&mut storage, cursor.next_l2_block - 1).await?; let pending_l2_block_header = self .l1_batch_params_provider .load_first_l2_block_in_batch(&mut storage, cursor.l1_batch) diff --git a/core/node/vm_runner/src/tests/mod.rs b/core/node/vm_runner/src/tests/mod.rs index 292aa7d8d4d..8450acd419f 100644 --- a/core/node/vm_runner/src/tests/mod.rs +++ b/core/node/vm_runner/src/tests/mod.rs @@ -199,7 +199,13 @@ async fn store_l2_blocks( conn.blocks_dal().insert_l2_block(&new_l2_block).await?; let tx_result = execute_l2_transaction(tx); conn.transactions_dal() - .mark_txs_as_executed_in_l2_block(new_l2_block.number, &[tx_result], 1.into()) + .mark_txs_as_executed_in_l2_block( + new_l2_block.number, + &[tx_result], + 1.into(), + ProtocolVersionId::latest(), + false, + ) .await?; // Insert a fictive L2 block at the end of the batch diff --git a/prover/witness_generator/src/basic_circuits.rs b/prover/witness_generator/src/basic_circuits.rs index a8fe1ab7d13..62c3d913ba2 100644 --- a/prover/witness_generator/src/basic_circuits.rs +++ b/prover/witness_generator/src/basic_circuits.rs @@ -485,14 +485,14 @@ async fn generate_witness( let bootloader_code_bytes = connection .factory_deps_dal() - .get_factory_dep(header.base_system_contracts_hashes.bootloader) + .get_sealed_factory_dep(header.base_system_contracts_hashes.bootloader) .await .expect("Failed fetching bootloader bytecode from DB") .expect("Bootloader bytecode should exist"); let bootloader_code = bytes_to_chunks(&bootloader_code_bytes); let account_bytecode_bytes = connection .factory_deps_dal() - .get_factory_dep(header.base_system_contracts_hashes.default_aa) + .get_sealed_factory_dep(header.base_system_contracts_hashes.default_aa) .await .expect("Failed fetching default account bytecode from DB") .expect("Default account bytecode should exist");