fix: PR feedback
tomg10 committed Dec 6, 2023
1 parent 9336b8c commit f8bdaf4
Showing 4 changed files with 81 additions and 71 deletions.
16 changes: 12 additions & 4 deletions core/bin/snapshots_creator/src/chunking.rs
@@ -1,6 +1,7 @@
 use std::cmp::min;
+use zksync_types::H256;
 
-pub fn get_chunk_hashed_keys_range(chunk_id: u64, chunks_count: u64) -> ([u8; 2], [u8; 2]) {
+pub fn get_chunk_hashed_keys_range(chunk_id: u64, chunks_count: u64) -> std::ops::Range<H256> {
     //we don't need whole [u8; 32] range of H256, first two bytes are already enough to evenly divide work
     // as two bytes = 65536 buckets and the chunks count would go in thousands
     let buckets = (u16::MAX as u64) + 1;
@@ -15,7 +16,14 @@ pub fn get_chunk_hashed_keys_range(chunk_id: u64, chunks_count: u64) -> ([u8; 2], [u8; 2]) {
     let chunk_start = chunk_id * chunk_size + min(chunk_id, buckets % chunks_count);
     let chunk_end = (chunk_id + 1) * chunk_size + min(chunk_id + 1, buckets % chunks_count) - 1;
 
-    let start_bytes = (chunk_start as u16).to_be_bytes();
-    let end_bytes = (chunk_end as u16).to_be_bytes();
-    (start_bytes, end_bytes)
+    let mut start_bytes = (chunk_start as u16).to_be_bytes().to_vec();
+    let mut end_bytes = (chunk_end as u16).to_be_bytes().to_vec();
+
+    start_bytes.resize(32, 0);
+    end_bytes.resize(32, 0);
+
+    std::ops::Range {
+        start: H256::from_slice(&start_bytes),
+        end: H256::from_slice(&end_bytes),
+    }
 }
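
A note for readers skimming the diff: the function divides the 65,536 two-byte prefixes of the H256 key space into contiguous, near-equal chunks, and now returns a half-open Range<H256> instead of a pair of two-byte arrays. Below is a minimal, self-contained sketch of the same arithmetic; a plain [u8; 32] stands in for zksync_types::H256, and the demo values are illustrative rather than taken from the commit. Note also that because each chunk's inclusive end bucket is zero-padded into an exclusive H256 bound (and the query in this commit compares hashed_key < $3), keys in a chunk's last bucket with a nonzero tail appear to fall outside every chunk; the sketch mirrors the commit's behavior rather than second-guessing it.

    use std::cmp::min;
    use std::ops::Range;

    // Stand-in for zksync_types::H256 so the sketch compiles on its own.
    type Hash = [u8; 32];

    fn chunk_hashed_keys_range(chunk_id: u64, chunks_count: u64) -> Range<Hash> {
        // Two leading bytes give 65,536 buckets -- plenty when chunk counts
        // are in the thousands.
        let buckets = (u16::MAX as u64) + 1;
        let chunk_size = buckets / chunks_count;
        // The first (buckets % chunks_count) chunks absorb one leftover
        // bucket each, so every bucket is assigned exactly once.
        let chunk_start = chunk_id * chunk_size + min(chunk_id, buckets % chunks_count);
        let chunk_end = (chunk_id + 1) * chunk_size + min(chunk_id + 1, buckets % chunks_count) - 1;

        // Zero-pad the two-byte bucket indices to full 32-byte keys, as the
        // commit does with Vec::resize.
        let mut start: Hash = [0; 32];
        let mut end: Hash = [0; 32];
        start[..2].copy_from_slice(&(chunk_start as u16).to_be_bytes());
        end[..2].copy_from_slice(&(chunk_end as u16).to_be_bytes());
        start..end
    }

    fn main() {
        // 65536 % 3 == 1, so chunk 0 gets one extra bucket; the prefixes
        // come out as 0000..5555, 5556..aaaa, aaab..ffff.
        for id in 0..3 {
            let r = chunk_hashed_keys_range(id, 3);
            println!(
                "chunk {id}: prefix {:02x}{:02x} through {:02x}{:02x}",
                r.start[0], r.start[1], r.end[0], r.end[1]
            );
        }
    }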
4 changes: 2 additions & 2 deletions core/bin/snapshots_creator/src/main.rs
@@ -71,12 +71,12 @@ async fn process_storage_logs_single_chunk(
     chunk_id: u64,
     chunks_count: u64,
 ) -> anyhow::Result<String> {
-    let (min_hashed_key, max_hashed_key) = get_chunk_hashed_keys_range(chunk_id, chunks_count);
+    let hashed_keys_range = get_chunk_hashed_keys_range(chunk_id, chunks_count);
     let latency = METRICS.storage_logs_processing_duration.start();
     let mut conn = pool.access_storage_tagged("snapshots_creator").await?;
     let logs = conn
         .snapshots_creator_dal()
-        .get_storage_logs_chunk(miniblock_number, &min_hashed_key, &max_hashed_key)
+        .get_storage_logs_chunk(miniblock_number, hashed_keys_range)
         .await
         .context("Error fetching storage logs count")?;
     drop(conn);
104 changes: 52 additions & 52 deletions core/lib/dal/sqlx-data.json
@@ -5804,58 +5804,6 @@
     },
     "query": "INSERT INTO miniblocks ( number, timestamp, hash, l1_tx_count, l2_tx_count, base_fee_per_gas, l1_gas_price, l2_fair_gas_price, gas_per_pubdata_limit, bootloader_code_hash, default_aa_code_hash, protocol_version, virtual_blocks, created_at, updated_at ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, now(), now())"
   },
-  "6c68925cc6eb422d8c9f04cd353990c995e948f7031da654739852621d14fcea": {
-    "describe": {
-      "columns": [
-        {
-          "name": "key",
-          "ordinal": 0,
-          "type_info": "Bytea"
-        },
-        {
-          "name": "value",
-          "ordinal": 1,
-          "type_info": "Bytea"
-        },
-        {
-          "name": "address",
-          "ordinal": 2,
-          "type_info": "Bytea"
-        },
-        {
-          "name": "miniblock_number",
-          "ordinal": 3,
-          "type_info": "Int8"
-        },
-        {
-          "name": "l1_batch_number",
-          "ordinal": 4,
-          "type_info": "Int8"
-        },
-        {
-          "name": "index",
-          "ordinal": 5,
-          "type_info": "Int8"
-        }
-      ],
-      "nullable": [
-        true,
-        true,
-        true,
-        true,
-        true,
-        true
-      ],
-      "parameters": {
-        "Left": [
-          "Int8",
-          "Bytea",
-          "Bytea"
-        ]
-      }
-    },
-    "query": "\n SELECT storage_logs.key,\n storage_logs.value,\n storage_logs.address,\n storage_logs.miniblock_number,\n initial_writes.l1_batch_number,\n initial_writes.index\n FROM (SELECT hashed_key,\n max(ARRAY [miniblock_number, operation_number]::int[]) AS op\n FROM storage_logs\n WHERE miniblock_number <= $1 and hashed_key >= $2 and hashed_key < $3\n GROUP BY hashed_key\n ORDER BY hashed_key) AS keys\n INNER JOIN storage_logs ON keys.hashed_key = storage_logs.hashed_key\n AND storage_logs.miniblock_number = keys.op[1]\n AND storage_logs.operation_number = keys.op[2]\n INNER JOIN initial_writes ON keys.hashed_key = initial_writes.hashed_key;\n "
-  },
   "6d142503d0d8682992a0353bae4a6b25ec82e7cadf0b2bbadcfd23c27f646bae": {
     "describe": {
       "columns": [],
@@ -10746,6 +10694,58 @@
     },
     "query": "SELECT l1_batch_number FROM witness_inputs WHERE length(merkle_tree_paths) <> 0 ORDER BY l1_batch_number DESC LIMIT $1"
   },
+  "dd650c06788a1c47b201e768382320fded2b8950ab836b2e5660f15b71dd11a0": {
+    "describe": {
+      "columns": [
+        {
+          "name": "key!",
+          "ordinal": 0,
+          "type_info": "Bytea"
+        },
+        {
+          "name": "value!",
+          "ordinal": 1,
+          "type_info": "Bytea"
+        },
+        {
+          "name": "address!",
+          "ordinal": 2,
+          "type_info": "Bytea"
+        },
+        {
+          "name": "miniblock_number!",
+          "ordinal": 3,
+          "type_info": "Int8"
+        },
+        {
+          "name": "l1_batch_number!",
+          "ordinal": 4,
+          "type_info": "Int8"
+        },
+        {
+          "name": "index",
+          "ordinal": 5,
+          "type_info": "Int8"
+        }
+      ],
+      "nullable": [
+        true,
+        true,
+        true,
+        true,
+        true,
+        true
+      ],
+      "parameters": {
+        "Left": [
+          "Int8",
+          "Bytea",
+          "Bytea"
+        ]
+      }
+    },
+    "query": "\n SELECT storage_logs.key as \"key!\",\n storage_logs.value as \"value!\",\n storage_logs.address as \"address!\",\n storage_logs.miniblock_number as \"miniblock_number!\",\n initial_writes.l1_batch_number as \"l1_batch_number!\",\n initial_writes.index\n FROM (SELECT hashed_key,\n max(ARRAY [miniblock_number, operation_number]::int[]) AS op\n FROM storage_logs\n WHERE miniblock_number <= $1 and hashed_key >= $2 and hashed_key < $3\n GROUP BY hashed_key\n ORDER BY hashed_key) AS keys\n INNER JOIN storage_logs ON keys.hashed_key = storage_logs.hashed_key\n AND storage_logs.miniblock_number = keys.op[1]\n AND storage_logs.operation_number = keys.op[2]\n INNER JOIN initial_writes ON keys.hashed_key = initial_writes.hashed_key;\n "
+  },
   "dd8aa1c9d4dcea22c9a13cca5ae45e951cf963b0608046b88be40309d7379ec2": {
     "describe": {
       "columns": [],
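
Some context for the churn in this file: sqlx-data.json is regenerated by "cargo sqlx prepare" for offline compile-time query checking, and each entry is keyed by a hash of the query text, so rewording the query drops one entry and adds another. The substantive change is the "!" suffix on the column aliases: sqlx's query! macro treats an alias such as "key!" as a non-null override, so the generated row field is Vec<u8> rather than Option<Vec<u8>>. A minimal sketch of that mechanism against a hypothetical kv table (not part of this repository; compiling query! also assumes a DATABASE_URL or prepared offline data):

    use sqlx::PgPool;

    // Hypothetical schema for illustration:
    //   CREATE TABLE kv (key BYTEA PRIMARY KEY, value BYTEA NOT NULL);
    // sqlx can still infer Option for columns coming out of joins or
    // expressions, so the "!" alias asserts non-nullability explicitly.
    async fn fetch_value(pool: &PgPool, key: &[u8]) -> sqlx::Result<Vec<u8>> {
        let row = sqlx::query!(
            r#"SELECT value AS "value!" FROM kv WHERE key = $1"#,
            key,
        )
        .fetch_one(pool)
        .await?;
        Ok(row.value) // already Vec<u8>; no Option, no .unwrap()
    }

This is what lets the DAL diff below drop its as_ref().unwrap() calls.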
28 changes: 15 additions & 13 deletions core/lib/dal/src/snapshots_creator_dal.rs
@@ -34,16 +34,15 @@ impl SnapshotsCreatorDal<'_, '_> {
     pub async fn get_storage_logs_chunk(
         &mut self,
         miniblock_number: MiniblockNumber,
-        min_hashed_key: &[u8],
-        max_hashed_key: &[u8],
+        hashed_keys_range: std::ops::Range<H256>,
     ) -> sqlx::Result<Vec<SnapshotStorageLog>> {
         let storage_logs = sqlx::query!(
             r#"
-            SELECT storage_logs.key,
-                   storage_logs.value,
-                   storage_logs.address,
-                   storage_logs.miniblock_number,
-                   initial_writes.l1_batch_number,
+            SELECT storage_logs.key as "key!",
+                   storage_logs.value as "value!",
+                   storage_logs.address as "address!",
+                   storage_logs.miniblock_number as "miniblock_number!",
+                   initial_writes.l1_batch_number as "l1_batch_number!",
                    initial_writes.index
             FROM (SELECT hashed_key,
                          max(ARRAY [miniblock_number, operation_number]::int[]) AS op
@@ -57,21 +56,24 @@ impl SnapshotsCreatorDal<'_, '_> {
             INNER JOIN initial_writes ON keys.hashed_key = initial_writes.hashed_key;
             "#,
             miniblock_number.0 as i64,
-            min_hashed_key,
-            max_hashed_key,
+            hashed_keys_range.start.0.as_slice(),
+            hashed_keys_range.end.0.as_slice(),
         )
         .instrument("get_storage_logs_chunk")
+        .with_arg("miniblock_number", &miniblock_number)
+        .with_arg("min_hashed_key", &hashed_keys_range.start)
+        .with_arg("max_hashed_key", &hashed_keys_range.end)
         .report_latency()
         .fetch_all(self.storage.conn())
         .await?
         .iter()
         .map(|row| SnapshotStorageLog {
             key: StorageKey::new(
-                AccountTreeId::new(Address::from_slice(row.address.as_ref().unwrap())),
-                H256::from_slice(row.key.as_ref().unwrap()),
+                AccountTreeId::new(Address::from_slice(&row.address)),
+                H256::from_slice(&row.key),
             ),
-            value: H256::from_slice(row.value.as_ref().unwrap()),
-            l1_batch_number_of_initial_write: L1BatchNumber(row.l1_batch_number.unwrap() as u32),
+            value: H256::from_slice(&row.value),
+            l1_batch_number_of_initial_write: L1BatchNumber(row.l1_batch_number as u32),
             enumeration_index: row.index.unwrap() as u64,
         })
         .collect();
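
The query itself deserves a pause: Postgres compares int[] arrays lexicographically, so max(ARRAY [miniblock_number, operation_number]::int[]) keeps, per hashed_key, the write with the largest (miniblock, operation) pair, i.e. the latest one, which the outer joins then match back via keys.op[1] and keys.op[2]. A self-contained Rust analogy of that selection rule, with made-up data rather than the production types:

    use std::collections::HashMap;

    fn main() {
        // (hashed_key, miniblock_number, operation_number, value) -- illustrative only.
        let logs: [(&str, i64, i64, &str); 4] = [
            ("key-a", 10, 0, "old"),
            ("key-a", 12, 3, "newest"),
            ("key-a", 12, 1, "newer"),
            ("key-b", 11, 7, "only"),
        ];

        let mut latest: HashMap<&str, (i64, i64, &str)> = HashMap::new();
        for (key, miniblock, op, value) in logs {
            let entry = latest.entry(key).or_insert((miniblock, op, value));
            // Lexicographic comparison on (miniblock, op), mirroring how
            // Postgres compares the two-element int[] arrays.
            if (miniblock, op) > (entry.0, entry.1) {
                *entry = (miniblock, op, value);
            }
        }

        assert_eq!(latest["key-a"].2, "newest"); // (12, 3) beats (12, 1) and (10, 0)
        assert_eq!(latest["key-b"].2, "only");
    }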
