Skip to content

Commit

Permalink
feat: Follow-up for DAL split (#1464)
Browse files Browse the repository at this point in the history
## What ❔

Rename types
* Server -> Core
* StorageProcessor -> Connection
* StorageProcessorTags -> ConnectionTags
* PooledStorageProcessor -> PooledConnection
* StorageProcessorInner -> ConnectionInner
* StorageMarker -> DbMarker
* ServerDals -> CoreDal
* ProverDals -> ProverDal
* access_storage -> connection

## Why ❔

For better intuitive understanding of what is going on in code.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
  • Loading branch information
Artemka374 committed Mar 20, 2024
1 parent 1757412 commit c072288
Show file tree
Hide file tree
Showing 178 changed files with 1,955 additions and 2,151 deletions.
4 changes: 2 additions & 2 deletions core/bin/block_reverter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use zksync_config::{
use zksync_core::block_reverter::{
BlockReverter, BlockReverterEthConfig, BlockReverterFlags, L1ExecutedBatchesRevert, NodeRole,
};
use zksync_dal::{ConnectionPool, Server};
use zksync_dal::{ConnectionPool, Core};
use zksync_env_config::FromEnv;
use zksync_types::{L1BatchNumber, U256};

Expand Down Expand Up @@ -96,7 +96,7 @@ async fn main() -> anyhow::Result<()> {
let postgres_config = PostgresConfig::from_env().context("PostgresConfig::from_env()")?;
let config = BlockReverterEthConfig::new(eth_sender, contracts, eth_client.web3_url.clone());

let connection_pool = ConnectionPool::<Server>::builder(
let connection_pool = ConnectionPool::<Core>::builder(
postgres_config.master_url()?,
postgres_config.max_connections()?,
)
Expand Down
8 changes: 4 additions & 4 deletions core/bin/contract-verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use zksync_config::{
configs::{ObservabilityConfig, PrometheusConfig},
ApiConfig, ContractVerifierConfig, PostgresConfig,
};
use zksync_dal::{ConnectionPool, Server, ServerDals};
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_env_config::FromEnv;
use zksync_queued_job_processor::JobProcessor;
use zksync_utils::wait_for_tasks::wait_for_tasks;
Expand All @@ -20,8 +20,8 @@ pub mod verifier;
pub mod zksolc_utils;
pub mod zkvyper_utils;

async fn update_compiler_versions(connection_pool: &ConnectionPool<Server>) {
let mut storage = connection_pool.access_storage().await.unwrap();
async fn update_compiler_versions(connection_pool: &ConnectionPool<Core>) {
let mut storage = connection_pool.connection().await.unwrap();
let mut transaction = storage.start_transaction().await.unwrap();

let zksync_home = std::env::var("ZKSYNC_HOME").unwrap_or_else(|_| ".".into());
Expand Down Expand Up @@ -134,7 +134,7 @@ async fn main() -> anyhow::Result<()> {
..ApiConfig::from_env().context("ApiConfig")?.prometheus
};
let postgres_config = PostgresConfig::from_env().context("PostgresConfig")?;
let pool = ConnectionPool::<Server>::singleton(
let pool = ConnectionPool::<Core>::singleton(
postgres_config
.master_url()
.context("Master DB URL is absent")?,
Expand Down
16 changes: 8 additions & 8 deletions core/bin/contract-verifier/src/verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use lazy_static::lazy_static;
use regex::Regex;
use tokio::time;
use zksync_config::ContractVerifierConfig;
use zksync_dal::{ConnectionPool, Server, ServerDals, StorageProcessor};
use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
use zksync_env_config::FromEnv;
use zksync_queued_job_processor::{async_trait, JobProcessor};
use zksync_types::{
Expand Down Expand Up @@ -42,19 +42,19 @@ enum ConstructorArgs {
#[derive(Debug)]
pub struct ContractVerifier {
config: ContractVerifierConfig,
connection_pool: ConnectionPool<Server>,
connection_pool: ConnectionPool<Core>,
}

impl ContractVerifier {
pub fn new(config: ContractVerifierConfig, connection_pool: ConnectionPool<Server>) -> Self {
pub fn new(config: ContractVerifierConfig, connection_pool: ConnectionPool<Core>) -> Self {
Self {
config,
connection_pool,
}
}

async fn verify(
storage: &mut StorageProcessor<'_, Server>,
storage: &mut Connection<'_, Core>,
mut request: VerificationRequest,
config: ContractVerifierConfig,
) -> Result<VerificationInfo, ContractVerifierError> {
Expand Down Expand Up @@ -429,7 +429,7 @@ impl ContractVerifier {
}

async fn process_result(
storage: &mut StorageProcessor<'_, Server>,
storage: &mut Connection<'_, Core>,
request_id: usize,
verification_result: Result<VerificationInfo, ContractVerifierError>,
) {
Expand Down Expand Up @@ -471,7 +471,7 @@ impl JobProcessor for ContractVerifier {
const BACKOFF_MULTIPLIER: u64 = 1;

async fn get_next_job(&self) -> anyhow::Result<Option<(Self::JobId, Self::Job)>> {
let mut connection = self.connection_pool.access_storage().await.unwrap();
let mut connection = self.connection_pool.connection().await.unwrap();

// Time overhead for all operations except for compilation.
const TIME_OVERHEAD: Duration = Duration::from_secs(10);
Expand All @@ -489,7 +489,7 @@ impl JobProcessor for ContractVerifier {
}

async fn save_failure(&self, job_id: usize, _started_at: Instant, error: String) {
let mut connection = self.connection_pool.access_storage().await.unwrap();
let mut connection = self.connection_pool.connection().await.unwrap();

connection
.contract_verification_dal()
Expand All @@ -515,7 +515,7 @@ impl JobProcessor for ContractVerifier {

let config: ContractVerifierConfig =
ContractVerifierConfig::from_env().context("ContractVerifierConfig")?;
let mut connection = connection_pool.access_storage().await.unwrap();
let mut connection = connection_pool.connection().await.unwrap();

let job_id = job.id;
let verification_result = Self::verify(&mut connection, job, config).await;
Expand Down
8 changes: 4 additions & 4 deletions core/bin/external_node/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use anyhow::Context as _;
use zksync_basic_types::{L1BatchNumber, L2ChainId};
use zksync_core::sync_layer::genesis::perform_genesis_if_needed;
use zksync_dal::{ConnectionPool, Server, ServerDals};
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_health_check::AppHealthCheck;
use zksync_object_store::ObjectStoreFactory;
use zksync_snapshots_applier::SnapshotsApplierConfig;
Expand All @@ -20,13 +20,13 @@ enum InitDecision {
}

pub(crate) async fn ensure_storage_initialized(
pool: &ConnectionPool<Server>,
pool: &ConnectionPool<Core>,
main_node_client: &HttpClient,
app_health: &AppHealthCheck,
l2_chain_id: L2ChainId,
consider_snapshot_recovery: bool,
) -> anyhow::Result<()> {
let mut storage = pool.access_storage_tagged("en").await?;
let mut storage = pool.connection_tagged("en").await?;
let genesis_l1_batch = storage
.blocks_dal()
.get_l1_batch_header(L1BatchNumber(0))
Expand Down Expand Up @@ -67,7 +67,7 @@ pub(crate) async fn ensure_storage_initialized(
tracing::info!("Chosen node initialization strategy: {decision:?}");
match decision {
InitDecision::Genesis => {
let mut storage = pool.access_storage_tagged("en").await?;
let mut storage = pool.connection_tagged("en").await?;
perform_genesis_if_needed(&mut storage, l2_chain_id, main_node_client)
.await
.context("performing genesis failed")?;
Expand Down
18 changes: 9 additions & 9 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use zksync_core::{
MainNodeClient, SyncState,
},
};
use zksync_dal::{metrics::PostgresMetrics, ConnectionPool, Server, ServerDals};
use zksync_dal::{metrics::PostgresMetrics, ConnectionPool, Core, CoreDal};
use zksync_db_connection::healthcheck::ConnectionPoolHealthCheck;
use zksync_health_check::{AppHealthCheck, HealthStatus, ReactiveHealthCheck};
use zksync_state::PostgresStorageCaches;
Expand Down Expand Up @@ -61,7 +61,7 @@ async fn build_state_keeper(
action_queue: ActionQueue,
state_keeper_db_path: String,
config: &ExternalNodeConfig,
connection_pool: ConnectionPool<Server>,
connection_pool: ConnectionPool<Core>,
sync_state: SyncState,
l2_erc20_bridge_addr: Address,
miniblock_sealer_handle: MiniblockSealerHandle,
Expand Down Expand Up @@ -113,7 +113,7 @@ async fn build_state_keeper(

async fn init_tasks(
config: &ExternalNodeConfig,
connection_pool: ConnectionPool<Server>,
connection_pool: ConnectionPool<Core>,
main_node_client: HttpClient,
task_handles: &mut Vec<task::JoinHandle<anyhow::Result<()>>>,
app_health: &AppHealthCheck,
Expand Down Expand Up @@ -143,7 +143,7 @@ async fn init_tasks(
task_handles.push(tokio::spawn(async move {
loop {
let protocol_version = pool
.access_storage()
.connection()
.await
.unwrap()
.protocol_versions_dal()
Expand Down Expand Up @@ -222,7 +222,7 @@ async fn init_tasks(
}
}));

let singleton_pool_builder = ConnectionPool::<Server>::singleton(&config.postgres.database_url);
let singleton_pool_builder = ConnectionPool::<Core>::singleton(&config.postgres.database_url);

let metadata_calculator_config = MetadataCalculatorConfig {
db_path: config.required.merkle_tree_path.clone(),
Expand Down Expand Up @@ -497,13 +497,13 @@ async fn main() -> anyhow::Result<()> {
config.consensus = None;
}
if let Some(threshold) = config.optional.slow_query_threshold() {
ConnectionPool::<Server>::global_config().set_slow_query_threshold(threshold)?;
ConnectionPool::<Core>::global_config().set_slow_query_threshold(threshold)?;
}
if let Some(threshold) = config.optional.long_connection_threshold() {
ConnectionPool::<Server>::global_config().set_long_connection_threshold(threshold)?;
ConnectionPool::<Core>::global_config().set_long_connection_threshold(threshold)?;
}

let connection_pool = ConnectionPool::<Server>::builder(
let connection_pool = ConnectionPool::<Core>::builder(
&config.postgres.database_url,
config.postgres.max_connections,
)
Expand Down Expand Up @@ -598,7 +598,7 @@ async fn main() -> anyhow::Result<()> {
}
if opt.revert_pending_l1_batch {
tracing::info!("Rolling pending L1 batch back..");
let mut connection = connection_pool.access_storage().await?;
let mut connection = connection_pool.connection().await?;
let sealed_l1_batch_number = connection
.blocks_dal()
.get_sealed_l1_batch_number()
Expand Down
6 changes: 3 additions & 3 deletions core/bin/external_node/src/version_sync_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::cmp::Ordering;

use anyhow::Context;
use zksync_basic_types::{L1BatchNumber, MiniblockNumber};
use zksync_dal::{ConnectionPool, Server, ServerDals};
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_types::ProtocolVersionId;
use zksync_web3_decl::{
jsonrpsee::http_client::HttpClient,
Expand All @@ -27,12 +27,12 @@ pub async fn get_l1_batch_remote_protocol_version(

// Synchronizes protocol version in `l1_batches` and `miniblocks` tables between EN and main node.
pub async fn sync_versions(
connection_pool: ConnectionPool<Server>,
connection_pool: ConnectionPool<Core>,
main_node_client: HttpClient,
) -> anyhow::Result<()> {
tracing::info!("Starting syncing protocol version of blocks");

let mut connection = connection_pool.access_storage().await?;
let mut connection = connection_pool.connection().await?;

// Load the first local batch number with version 22.
let Some(local_first_v22_l1_batch) = connection
Expand Down
18 changes: 9 additions & 9 deletions core/bin/snapshots_creator/src/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;
use anyhow::Context as _;
use tokio::sync::Semaphore;
use zksync_config::SnapshotsCreatorConfig;
use zksync_dal::{ConnectionPool, Server, ServerDals, StorageProcessor};
use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
use zksync_object_store::ObjectStore;
use zksync_types::{
snapshots::{
Expand Down Expand Up @@ -60,16 +60,16 @@ impl SnapshotProgress {
#[derive(Debug)]
pub(crate) struct SnapshotCreator {
pub blob_store: Arc<dyn ObjectStore>,
pub master_pool: ConnectionPool<Server>,
pub replica_pool: ConnectionPool<Server>,
pub master_pool: ConnectionPool<Core>,
pub replica_pool: ConnectionPool<Core>,
#[cfg(test)]
pub event_listener: Box<dyn HandleEvent>,
}

impl SnapshotCreator {
async fn connect_to_replica(&self) -> anyhow::Result<StorageProcessor<'_, Server>> {
async fn connect_to_replica(&self) -> anyhow::Result<Connection<'_, Core>> {
self.replica_pool
.access_storage_tagged("snapshots_creator")
.connection_tagged("snapshots_creator")
.await
}

Expand Down Expand Up @@ -124,7 +124,7 @@ impl SnapshotCreator {

let mut master_conn = self
.master_pool
.access_storage_tagged("snapshots_creator")
.connection_tagged("snapshots_creator")
.await?;
master_conn
.snapshots_dal()
Expand Down Expand Up @@ -192,7 +192,7 @@ impl SnapshotCreator {
config: &SnapshotsCreatorConfig,
min_chunk_count: u64,
latest_snapshot: Option<&SnapshotMetadata>,
conn: &mut StorageProcessor<'_, Server>,
conn: &mut Connection<'_, Core>,
) -> anyhow::Result<Option<SnapshotProgress>> {
// We subtract 1 so that after restore, EN node has at least one L1 batch to fetch
let sealed_l1_batch_number = conn.blocks_dal().get_sealed_l1_batch_number().await?;
Expand Down Expand Up @@ -237,7 +237,7 @@ impl SnapshotCreator {
) -> anyhow::Result<Option<SnapshotProgress>> {
let mut master_conn = self
.master_pool
.access_storage_tagged("snapshots_creator")
.connection_tagged("snapshots_creator")
.await?;
let latest_snapshot = master_conn
.snapshots_dal()
Expand Down Expand Up @@ -302,7 +302,7 @@ impl SnapshotCreator {

let mut master_conn = self
.master_pool
.access_storage_tagged("snapshots_creator")
.connection_tagged("snapshots_creator")
.await?;
master_conn
.snapshots_dal()
Expand Down
6 changes: 3 additions & 3 deletions core/bin/snapshots_creator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use zksync_config::{
configs::{ObservabilityConfig, PrometheusConfig},
PostgresConfig, SnapshotsCreatorConfig,
};
use zksync_dal::{ConnectionPool, Server};
use zksync_dal::{ConnectionPool, Core};
use zksync_env_config::{object_store::SnapshotsObjectStoreConfig, FromEnv};
use zksync_object_store::ObjectStoreFactory;

Expand Down Expand Up @@ -81,14 +81,14 @@ async fn main() -> anyhow::Result<()> {
let creator_config =
SnapshotsCreatorConfig::from_env().context("SnapshotsCreatorConfig::from_env")?;

let replica_pool = ConnectionPool::<Server>::builder(
let replica_pool = ConnectionPool::<Core>::builder(
postgres_config.replica_url()?,
creator_config.concurrent_queries_count,
)
.build()
.await?;

let master_pool = ConnectionPool::<Server>::singleton(postgres_config.master_url()?)
let master_pool = ConnectionPool::<Core>::singleton(postgres_config.master_url()?)
.build()
.await?;

Expand Down

0 comments on commit c072288

Please sign in to comment.