Skip to content

Commit

Permalink
fix: PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
tomg10 committed Dec 7, 2023
1 parent c29521a commit 1220fe8
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 75 deletions.
5 changes: 1 addition & 4 deletions core/bin/snapshots_creator/src/chunking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,5 @@ pub fn get_chunk_hashed_keys_range(chunk_id: u64, chunks_count: u64) -> std::ops
start_bytes.resize(32, 0);
end_bytes.resize(32, 0);

std::ops::Range {
start: H256::from_slice(&start_bytes),
end: H256::from_slice(&end_bytes),
}
H256::from_slice(&start_bytes)..H256::from_slice(&end_bytes)
}
70 changes: 29 additions & 41 deletions core/bin/snapshots_creator/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
mod chunking;

use anyhow::Context as _;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use prometheus_exporter::PrometheusExporterConfig;
use std::cmp::max;
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use tokio::sync::watch;
use tokio::sync::watch::Receiver;
use tokio::sync::{watch, Semaphore};
use vise::Unit;
use vise::{Buckets, Gauge, Histogram, Metrics};
use zksync_config::configs::PrometheusConfig;
Expand Down Expand Up @@ -47,7 +42,9 @@ struct SnapshotsCreatorMetrics {
#[vise::register]
pub(crate) static METRICS: vise::Global<SnapshotsCreatorMetrics> = vise::Global::new();

async fn maybe_enable_prometheus_metrics(stop_receiver: Receiver<bool>) -> anyhow::Result<()> {
async fn maybe_enable_prometheus_metrics(
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
let prometheus_config = PrometheusConfig::from_env().ok();
if let Some(prometheus_config) = prometheus_config {
let exporter_config = PrometheusExporterConfig::push(
Expand All @@ -66,11 +63,13 @@ async fn maybe_enable_prometheus_metrics(stop_receiver: Receiver<bool>) -> anyho
async fn process_storage_logs_single_chunk(
blob_store: &dyn ObjectStore,
pool: &ConnectionPool,
semaphore: &Semaphore,
miniblock_number: MiniblockNumber,
l1_batch_number: L1BatchNumber,
chunk_id: u64,
chunks_count: u64,
) -> anyhow::Result<String> {
let _permit = semaphore.acquire().await?;
let hashed_keys_range = get_chunk_hashed_keys_range(chunk_id, chunks_count);
let latency = METRICS.storage_logs_processing_duration.start();
let mut conn = pool.access_storage_tagged("snapshots_creator").await?;
Expand All @@ -94,10 +93,13 @@ async fn process_storage_logs_single_chunk(
let output_filepath = format!("{output_filepath_prefix}/{filename}");

let elapsed = latency.observe();
let tasks_left = METRICS.storage_logs_chunks_left_to_process.dec_by(1) - 1;
tracing::info!(
"Finished storage logs chunk {}/{chunks_count}, step took {elapsed:?}, output stored in {output_filepath}",
chunk_id + 1
);
"Finished chunk number {chunk_id}, overall_progress {}/{}, step took {elapsed:?}, output stored in {output_filepath}",
chunks_count - tasks_left,
chunks_count
);

Ok(output_filepath)
}

Expand Down Expand Up @@ -192,46 +194,32 @@ async fn run(
)
.await?;

let mut storage_logs_output_files = vec![];

METRICS
.storage_logs_chunks_left_to_process
.set(chunks_count);
let mut tasks =
FuturesUnordered::<Pin<Box<dyn Future<Output = anyhow::Result<String>>>>>::new();
let mut last_chunk_id = 0;
while last_chunk_id < chunks_count || !tasks.is_empty() {
while (tasks.len() as u32) < config.concurrent_queries_count && last_chunk_id < chunks_count
{
tasks.push(Box::pin(process_storage_logs_single_chunk(
&*blob_store,
&replica_pool,
last_miniblock_number_in_batch,
l1_batch_number,
last_chunk_id,
chunks_count,
)));
last_chunk_id += 1;
}
if let Some(result) = tasks.next().await {
tracing::info!(
"Completed chunk {}/{}, {} chunks are still in progress",
last_chunk_id - tasks.len() as u64,
chunks_count,
tasks.len()
);
storage_logs_output_files.push(result.context("Chunk task failed")?);
METRICS
.storage_logs_chunks_left_to_process
.set(chunks_count - last_chunk_id - tasks.len() as u64);
}
}

let semaphore = Semaphore::new(config.concurrent_queries_count as usize);
let tasks = (0..chunks_count).map(|chunk_id| {
process_storage_logs_single_chunk(
&*blob_store,
&replica_pool,
&semaphore,
last_miniblock_number_in_batch,
l1_batch_number,
chunk_id,
chunks_count,
)
});
let mut storage_logs_output_files = futures::future::try_join_all(tasks).await?;
tracing::info!("Finished generating snapshot, storing progress in db");

let mut master_conn = master_pool
.access_storage_tagged("snapshots_creator")
.await?;

storage_logs_output_files.sort();
//sanity check
assert_eq!(storage_logs_output_files.len(), chunks_count as usize);
master_conn
.snapshots_dal()
.add_snapshot(
Expand Down
2 changes: 1 addition & 1 deletion core/lib/config/src/configs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ pub use self::{
proof_data_handler::ProofDataHandlerConfig,
prover::{ProverConfig, ProverConfigs},
prover_group::ProverGroupConfig,
snapshots_creator::SnapshotsCreatorConfig,
utils::PrometheusConfig,
witness_generator::WitnessGeneratorConfig,
snapshots_creator::SnapshotsCreatorConfig,
};

pub mod alerts;
Expand Down
3 changes: 1 addition & 2 deletions core/lib/object_store/src/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@

use flate2::{read::GzDecoder, write::GzEncoder, Compression};
use std::io::Read;
use zksync_types::aggregated_operations::L1BatchProofForL1;
use zksync_types::snapshots::{SnapshotFactoryDependencies, SnapshotStorageLogsStorageKey};
use zksync_types::{
aggregated_operations::L1BatchProofForL1,
proofs::{AggregationRound, PrepareBasicCircuitsJob},
snapshots::SnapshotStorageLogsChunk,
snapshots::{SnapshotFactoryDependencies, SnapshotStorageLogsStorageKey},
storage::witness_block_state::WitnessBlockState,
zkevm_test_harness::{
abstract_zksync_circuit::concrete_circuits::ZkSyncCircuit,
Expand Down
16 changes: 0 additions & 16 deletions core/lib/zksync_core/src/api_server/web3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,6 @@ pub mod state;
#[cfg(test)]
pub(crate) mod tests;

use self::backend_jsonrpc::{
batch_limiter_middleware::{LimitMiddleware, Transport},
error::internal_error,
namespaces::{
debug::DebugNamespaceT, en::EnNamespaceT, eth::EthNamespaceT, net::NetNamespaceT,
web3::Web3NamespaceT, zks::ZksNamespaceT,
},
pub_sub::Web3PubSub,
};
use self::metrics::API_METRICS;
use self::namespaces::{
DebugNamespace, EnNamespace, EthNamespace, NetNamespace, Web3Namespace, ZksNamespace,
};
use self::pubsub::{EthSubscribe, PubSubEvent};
use self::state::{Filters, InternalApiConfig, RpcState, SealedMiniblockNumber};

/// Timeout for graceful shutdown logic within API servers.
const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use zksync_web3_decl::error::Web3Error;

#[derive(Debug)]
pub struct SnapshotsNamespace<G> {
pub state: RpcState<G>,
state: RpcState<G>,
}

impl<G> Clone for SnapshotsNamespace<G> {
Expand All @@ -22,6 +22,7 @@ impl<G: L1GasPriceProvider> SnapshotsNamespace<G> {
pub fn new(state: RpcState<G>) -> Self {
Self { state }
}

pub async fn get_all_snapshots_impl(&self) -> Result<AllSnapshots, Web3Error> {
let method_name = "get_all_snapshots";
let method_latency = API_METRICS.start_call(method_name);
Expand Down
21 changes: 11 additions & 10 deletions core/tests/ts-integration/tests/api/snapshots-creator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { TestMaster } from '../../src/index';
import fs from 'fs';
import * as zlib from 'zlib';
import { snapshots_creator } from 'zk/build/run/run';

describe('Snapshots API tests', () => {
let testMaster: TestMaster;

Expand Down Expand Up @@ -46,15 +47,15 @@ describe('Snapshots API tests', () => {
});
}
async function createAndValidateSnapshot() {
let existingBatchNumbers = (await getAllSnapshots()).snapshotsL1BatchNumbers as number[];
const existingBatchNumbers = (await getAllSnapshots()).snapshotsL1BatchNumbers as number[];
await runCreator();
let newBatchNumbers = (await getAllSnapshots()).snapshotsL1BatchNumbers as number[];
let addedSnapshots = newBatchNumbers.filter((x) => existingBatchNumbers.indexOf(x) === -1);
const newBatchNumbers = (await getAllSnapshots()).snapshotsL1BatchNumbers as number[];
const addedSnapshots = newBatchNumbers.filter((x) => existingBatchNumbers.indexOf(x) === -1);
expect(addedSnapshots.length).toEqual(1);

let l1BatchNumber = addedSnapshots[0];
let fullSnapshot = await getSnapshot(l1BatchNumber);
let miniblockNumber = fullSnapshot.miniblockNumber;
const l1BatchNumber = addedSnapshots[0];
const fullSnapshot = await getSnapshot(l1BatchNumber);
const miniblockNumber = fullSnapshot.miniblockNumber;

expect(fullSnapshot.l1BatchNumber).toEqual(l1BatchNumber);
for (let chunkMetadata of fullSnapshot.storageLogsChunks) {
Expand All @@ -65,10 +66,10 @@ describe('Snapshots API tests', () => {
expect(output['storageLogs'].length > 0);

for (const storageLog of output['storageLogs'] as any[]) {
let snapshotAccountAddress = storageLog['key']['account']['address'];
let snapshotKey = storageLog['key']['key'];
let snapshotValue = storageLog['value'];
let snapshotL1BatchNumber = storageLog['l1BatchNumberOfInitialWrite'];
const snapshotAccountAddress = storageLog['key']['account']['address'];
const snapshotKey = storageLog['key']['key'];
const snapshotValue = storageLog['value'];
const snapshotL1BatchNumber = storageLog['l1BatchNumberOfInitialWrite'];
const valueOnBlockchain = await testMaster
.mainAccount()
.provider.getStorageAt(snapshotAccountAddress, snapshotKey, miniblockNumber);
Expand Down

0 comments on commit 1220fe8

Please sign in to comment.