Skip to content

Commit

Permalink
feat(api): Make Web3 API server work with pruned data (#838)
Browse files Browse the repository at this point in the history
## What ❔

Modifies the Web3 API server so that it works with pruned node data
during snapshot recovery.

## Why ❔

Part of preparations of EN code to support snapshot recovery.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `cargo spellcheck
--cfg=./spellcheck/era.cfg --code 1`.
  • Loading branch information
slowli committed Jan 18, 2024
1 parent 12974fc commit 0b7cd0b
Show file tree
Hide file tree
Showing 48 changed files with 3,483 additions and 1,325 deletions.
3 changes: 2 additions & 1 deletion core/bin/storage_logs_dedup_migration/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ async fn main() {
.blocks_dal()
.get_sealed_miniblock_number()
.await
.unwrap();
.unwrap()
.expect("Cannot start migration for Postgres recovered from snapshot");
println!(
"Migration started for miniblock range {}..={}",
opt.start_from_miniblock, sealed_miniblock
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 5 additions & 6 deletions core/lib/dal/src/blocks_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ impl BlocksDal<'_, '_> {
Ok(row.number.map(|num| L1BatchNumber(num as u32)))
}

pub async fn get_sealed_miniblock_number(&mut self) -> sqlx::Result<MiniblockNumber> {
let number: i64 = sqlx::query!(
pub async fn get_sealed_miniblock_number(&mut self) -> sqlx::Result<Option<MiniblockNumber>> {
let row = sqlx::query!(
r#"
SELECT
MAX(number) AS "number"
Expand All @@ -73,10 +73,9 @@ impl BlocksDal<'_, '_> {
.instrument("get_sealed_miniblock_number")
.report_latency()
.fetch_one(self.storage.conn())
.await?
.number
.unwrap_or(0);
Ok(MiniblockNumber(number as u32))
.await?;

Ok(row.number.map(|number| MiniblockNumber(number as u32)))
}

/// Returns the number of the earliest L1 batch present in the DB, or `None` if there are no L1 batches.
Expand Down
147 changes: 82 additions & 65 deletions core/lib/dal/src/blocks_web3_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
models::{
storage_block::{
bind_block_where_sql_params, web3_block_number_to_sql, web3_block_where_sql,
StorageBlockDetails, StorageL1BatchDetails,
ResolvedL1BatchForMiniblock, StorageBlockDetails, StorageL1BatchDetails,
},
storage_transaction::{extract_web3_transaction, web3_transaction_select_sql, CallTrace},
},
Expand All @@ -32,42 +32,6 @@ pub struct BlocksWeb3Dal<'a, 'c> {
}

impl BlocksWeb3Dal<'_, '_> {
pub async fn get_sealed_miniblock_number(&mut self) -> sqlx::Result<MiniblockNumber> {
let number = sqlx::query!(
r#"
SELECT
MAX(number) AS "number"
FROM
miniblocks
"#
)
.instrument("get_sealed_block_number")
.report_latency()
.fetch_one(self.storage.conn())
.await?
.number
.expect("DAL invocation before genesis");
Ok(MiniblockNumber(number as u32))
}

pub async fn get_sealed_l1_batch_number(&mut self) -> sqlx::Result<L1BatchNumber> {
let number = sqlx::query!(
r#"
SELECT
MAX(number) AS "number"
FROM
l1_batches
"#
)
.instrument("get_sealed_block_number")
.report_latency()
.fetch_one(self.storage.conn())
.await?
.number
.expect("DAL invocation before genesis");
Ok(L1BatchNumber(number as u32))
}

pub async fn get_block_by_web3_block_id(
&mut self,
block_id: api::BlockId,
Expand Down Expand Up @@ -258,21 +222,26 @@ impl BlocksWeb3Dal<'_, '_> {
&mut self,
block_id: api::BlockId,
) -> sqlx::Result<Option<MiniblockNumber>> {
let query_string = match block_id {
api::BlockId::Hash(_) => "SELECT number FROM miniblocks WHERE hash = $1".to_owned(),
let query_string;
let query_str = match block_id {
api::BlockId::Hash(_) => "SELECT number FROM miniblocks WHERE hash = $1",
api::BlockId::Number(api::BlockNumber::Number(_)) => {
// The reason why instead of returning the `block_number` directly we use query is
// to handle numbers of blocks that are not created yet.
// the `SELECT number FROM miniblocks WHERE number=block_number` for
// non-existing block number will returns zero.
"SELECT number FROM miniblocks WHERE number = $1".to_owned()
// to handle numbers of blocks that are not created yet or were pruned.
// The query below will return NULL for non-existing block numbers.
"SELECT number FROM miniblocks WHERE number = $1"
}
api::BlockId::Number(api::BlockNumber::Earliest) => {
return Ok(Some(MiniblockNumber(0)));
// Similarly to `BlockNumber::Number`, we may be missing the earliest block
// if the storage was recovered from a snapshot.
"SELECT number FROM miniblocks WHERE number = 0"
}
api::BlockId::Number(block_number) => {
query_string = web3_block_number_to_sql(block_number);
&query_string
}
api::BlockId::Number(block_number) => web3_block_number_to_sql(block_number),
};
let row = bind_block_where_sql_params(&block_id, sqlx::query(&query_string))
let row = bind_block_where_sql_params(&block_id, sqlx::query(query_str))
.fetch_optional(self.storage.conn())
.await?;

Expand All @@ -283,31 +252,33 @@ impl BlocksWeb3Dal<'_, '_> {
}

/// Returns L1 batch timestamp for either sealed or pending L1 batch.
///
/// The correctness of the current implementation depends on the timestamp of an L1 batch always
/// being equal to the timestamp of the first miniblock in the batch.
pub async fn get_expected_l1_batch_timestamp(
&mut self,
l1_batch_number: L1BatchNumber,
l1_batch_number: &ResolvedL1BatchForMiniblock,
) -> sqlx::Result<Option<u64>> {
let first_miniblock_of_batch = if l1_batch_number.0 == 0 {
MiniblockNumber(0)
} else {
match self
.get_miniblock_range_of_l1_batch(l1_batch_number - 1)
.await?
{
Some((_, miniblock_number)) => miniblock_number + 1,
None => return Ok(None),
}
};
let timestamp = sqlx::query!(
r#"
SELECT
timestamp
FROM
miniblocks
WHERE
number = $1
(
$1::BIGINT IS NULL
AND l1_batch_number IS NULL
)
OR (l1_batch_number = $1::BIGINT)
ORDER BY
number
LIMIT
1
"#,
first_miniblock_of_batch.0 as i64
l1_batch_number
.miniblock_l1_batch
.map(|number| i64::from(number.0))
)
.fetch_optional(self.storage.conn())
.await?
Expand Down Expand Up @@ -629,6 +600,7 @@ impl BlocksWeb3Dal<'_, '_> {
mod tests {
use zksync_types::{
block::{MiniblockHasher, MiniblockHeader},
snapshots::SnapshotRecoveryStatus,
MiniblockNumber, ProtocolVersion, ProtocolVersionId,
};

Expand Down Expand Up @@ -698,8 +670,18 @@ mod tests {
async fn resolving_earliest_block_id() {
let connection_pool = ConnectionPool::test_pool().await;
let mut conn = connection_pool.access_storage().await.unwrap();

let miniblock_number = conn
.blocks_web3_dal()
.resolve_block_id(api::BlockId::Number(api::BlockNumber::Earliest))
.await;
assert_eq!(miniblock_number.unwrap(), None);

conn.protocol_versions_dal()
.save_protocol_version_with_tx(ProtocolVersion::default())
.await;
conn.blocks_dal()
.delete_miniblocks(MiniblockNumber(0))
.insert_miniblock(&create_miniblock_header(0))
.await
.unwrap();

Expand All @@ -714,13 +696,23 @@ mod tests {
async fn resolving_latest_block_id() {
let connection_pool = ConnectionPool::test_pool().await;
let mut conn = connection_pool.access_storage().await.unwrap();
conn.blocks_dal()
.delete_miniblocks(MiniblockNumber(0))
.await
.unwrap();
conn.protocol_versions_dal()
.save_protocol_version_with_tx(ProtocolVersion::default())
.await;

let miniblock_number = conn
.blocks_web3_dal()
.resolve_block_id(api::BlockId::Number(api::BlockNumber::Latest))
.await
.unwrap();
assert_eq!(miniblock_number, None);
let miniblock_number = conn
.blocks_web3_dal()
.resolve_block_id(api::BlockId::Number(api::BlockNumber::Pending))
.await
.unwrap();
assert_eq!(miniblock_number, Some(MiniblockNumber(0)));

conn.blocks_dal()
.insert_miniblock(&create_miniblock_header(0))
.await
Expand Down Expand Up @@ -766,6 +758,31 @@ mod tests {
assert_eq!(miniblock_number.unwrap(), Some(MiniblockNumber(1)));
}

#[tokio::test]
async fn resolving_pending_block_id_for_snapshot_recovery() {
let connection_pool = ConnectionPool::test_pool().await;
let mut conn = connection_pool.access_storage().await.unwrap();
let snapshot_recovery = SnapshotRecoveryStatus {
l1_batch_number: L1BatchNumber(23),
l1_batch_root_hash: H256::zero(),
miniblock_number: MiniblockNumber(42),
miniblock_root_hash: H256::zero(),
last_finished_chunk_id: None,
total_chunk_count: 100,
};
conn.snapshot_recovery_dal()
.set_applied_snapshot_status(&snapshot_recovery)
.await
.unwrap();

let miniblock_number = conn
.blocks_web3_dal()
.resolve_block_id(api::BlockId::Number(api::BlockNumber::Pending))
.await
.unwrap();
assert_eq!(miniblock_number, Some(MiniblockNumber(43)));
}

#[tokio::test]
async fn resolving_block_by_hash() {
let connection_pool = ConnectionPool::test_pool().await;
Expand Down

0 comments on commit 0b7cd0b

Please sign in to comment.