feat(api): Implement TxSink abstraction (#1204)
## What ❔

Implements a new abstraction, `TxSink`, to be used in `TxSender`.
Right now we have to use either `TxProxy` or the master connection pool
(i.e. exactly one of the two components), but nothing prevents invalid
configurations beyond a runtime assertion. Moreover, mixing the two
approaches at the `TxSender` level is unreasonably clunky and complex.
Having a single abstraction is a better approach.

Additionally, it's a prerequisite for implementing an API layer for the
framework.

⚠️ There are quite a few possibilities for improvement here, but I
suggest not going down the rabbit hole. As long as this is perceived as
an incremental improvement, let's not focus on the adjacent code. We can
always revisit this area the next time it becomes problematic.
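
For context, the `TxSink` trait itself lives in the new `tx_sink.rs` module, which is not among the files shown below. Judging from the `MasterPoolSink` implementation and the new `TxSenderBuilder::new` signature in this commit, it presumably boils down to a single async submission hook; the sketch below is inferred from those call sites, and the trait bounds are an assumption needed for the `Arc<dyn TxSink>` usage rather than something taken from the source.

```rust
use zksync_dal::transactions_dal::L2TxSubmissionResult;
use zksync_types::{fee::TransactionExecutionMetrics, l2::L2Tx};

use super::SubmitTxError;

/// Sketch of the `TxSink` abstraction: anything that can accept a validated
/// L2 transaction and report how it was handled (inserted into the mempool,
/// proxied to the main node, found to be a duplicate, etc.).
#[async_trait::async_trait]
pub trait TxSink: std::fmt::Debug + Send + Sync + 'static {
    async fn submit_tx(
        &self,
        tx: L2Tx,
        execution_metrics: TransactionExecutionMetrics,
    ) -> Result<L2TxSubmissionResult, SubmitTxError>;
}
```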
popzxc committed Feb 22, 2024
1 parent 66cdefc commit 11a34d4
Showing 10 changed files with 278 additions and 143 deletions.
4 changes: 3 additions & 1 deletion checks-config/era.dic
@@ -902,4 +902,6 @@ reimplementation
composability
md5
shivini
balancer
balancer
lookups
stateful
31 changes: 24 additions & 7 deletions core/bin/external_node/src/main.rs
@@ -13,7 +13,7 @@ use zksync_core::{
api_server::{
execution_sandbox::VmConcurrencyLimiter,
healthcheck::HealthCheckHandle,
tx_sender::{ApiContracts, TxSenderBuilder},
tx_sender::{proxy::TxProxy, ApiContracts, TxSenderBuilder},
web3::{ApiBuilder, Namespace},
},
block_reverter::{BlockReverter, BlockReverterFlags, L1ExecutedBatchesRevert},
@@ -275,11 +275,22 @@ async fn init_tasks(
let fee_params_fetcher_handle =
tokio::spawn(fee_params_fetcher.clone().run(stop_receiver.clone()));

let (tx_sender, vm_barrier, cache_update_handle) = {
let tx_sender_builder =
TxSenderBuilder::new(config.clone().into(), connection_pool.clone())
.with_main_connection_pool(connection_pool.clone())
.with_tx_proxy(main_node_client);
let (tx_sender, vm_barrier, cache_update_handle, proxy_cache_updater_handle) = {
let tx_proxy = TxProxy::new(main_node_client);
let proxy_cache_updater_pool = singleton_pool_builder
.build()
.await
.context("failed to build a tree_pool")?;
let proxy_cache_updater_handle = tokio::spawn(
tx_proxy
.run_account_nonce_sweeper(proxy_cache_updater_pool.clone(), stop_receiver.clone()),
);

let tx_sender_builder = TxSenderBuilder::new(
config.clone().into(),
connection_pool.clone(),
Arc::new(tx_proxy),
);

if config.optional.transactions_per_sec_limit.is_some() {
tracing::warn!("`transactions_per_sec_limit` option is deprecated and ignored");
@@ -308,7 +319,12 @@
storage_caches,
)
.await;
(tx_sender, vm_barrier, cache_update_handle)
(
tx_sender,
vm_barrier,
cache_update_handle,
proxy_cache_updater_handle,
)
};

let http_server_handles =
@@ -359,6 +375,7 @@ async fn init_tasks(
task_handles.extend(http_server_handles.tasks);
task_handles.extend(ws_server_handles.tasks);
task_handles.extend(cache_update_handle);
task_handles.push(proxy_cache_updater_handle);
task_handles.extend([
sk_handle,
fee_address_migration_handle,
37 changes: 37 additions & 0 deletions core/lib/zksync_core/src/api_server/tx_sender/master_pool_sink.rs
@@ -0,0 +1,37 @@
use zksync_dal::{transactions_dal::L2TxSubmissionResult, ConnectionPool};
use zksync_types::{fee::TransactionExecutionMetrics, l2::L2Tx};

use super::{tx_sink::TxSink, SubmitTxError};
use crate::metrics::{TxStage, APP_METRICS};

/// Wrapper for the master DB pool that allows submitting transactions to the mempool.
#[derive(Debug)]
pub struct MasterPoolSink {
master_pool: ConnectionPool,
}

impl MasterPoolSink {
pub fn new(master_pool: ConnectionPool) -> Self {
Self { master_pool }
}
}

#[async_trait::async_trait]
impl TxSink for MasterPoolSink {
async fn submit_tx(
&self,
tx: L2Tx,
execution_metrics: TransactionExecutionMetrics,
) -> Result<L2TxSubmissionResult, SubmitTxError> {
let submission_res_handle = self
.master_pool
.access_storage_tagged("api")
.await?
.transactions_dal()
.insert_transaction_l2(tx, execution_metrics)
.await;

APP_METRICS.processed_txs[&TxStage::Mempool(submission_res_handle)].inc();
Ok(submission_res_handle)
}
}
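
The main-node wiring that pairs this sink with the builder is not among the changed files shown here; presumably it now looks roughly like the sketch below, where the function and variable names are illustrative rather than taken from the commit.

```rust
use std::sync::Arc;

use zksync_dal::ConnectionPool;

use crate::api_server::tx_sender::{
    master_pool_sink::MasterPoolSink, TxSenderBuilder, TxSenderConfig,
};

/// Illustrative sketch: wrap the master pool in a `MasterPoolSink` and hand it
/// to `TxSenderBuilder::new`, replacing the old `with_main_connection_pool` call.
fn master_node_tx_sender_builder(
    config: TxSenderConfig,
    replica_pool: ConnectionPool,
    master_pool: ConnectionPool,
) -> TxSenderBuilder {
    let tx_sink = Arc::new(MasterPoolSink::new(master_pool));
    TxSenderBuilder::new(config, replica_pool, tx_sink)
}
```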
85 changes: 24 additions & 61 deletions core/lib/zksync_core/src/api_server/tx_sender/mod.rs
@@ -25,9 +25,9 @@ use zksync_types::{
MAX_NEW_FACTORY_DEPS, U256,
};
use zksync_utils::h256_to_u256;
use zksync_web3_decl::jsonrpsee::http_client::HttpClient;

pub(super) use self::{proxy::TxProxy, result::SubmitTxError};
pub(super) use self::result::SubmitTxError;
use self::tx_sink::TxSink;
use crate::{
api_server::{
execution_sandbox::{
@@ -38,15 +38,16 @@ use crate::{
tx_sender::result::ApiCallResult,
},
fee_model::BatchFeeModelInputProvider,
metrics::{TxStage, APP_METRICS},
state_keeper::seal_criteria::{ConditionalSealer, NoopSealer, SealData},
utils::pending_protocol_version,
};

mod proxy;
pub mod master_pool_sink;
pub mod proxy;
mod result;
#[cfg(test)]
pub(crate) mod tests;
pub mod tx_sink;

#[derive(Debug, Clone)]
pub struct MultiVMBaseSystemContracts {
@@ -141,21 +142,22 @@ pub struct TxSenderBuilder {
config: TxSenderConfig,
/// Connection pool for read requests.
replica_connection_pool: ConnectionPool,
/// Connection pool for write requests. If not set, `proxy` must be set.
master_connection_pool: Option<ConnectionPool>,
/// Proxy to submit transactions to the network. If not set, `master_connection_pool` must be set.
proxy: Option<TxProxy>,
/// Sink to be used to persist transactions.
tx_sink: Arc<dyn TxSink>,
/// Batch sealer used to check whether transaction can be executed by the sequencer.
sealer: Option<Arc<dyn ConditionalSealer>>,
}

impl TxSenderBuilder {
pub fn new(config: TxSenderConfig, replica_connection_pool: ConnectionPool) -> Self {
pub fn new(
config: TxSenderConfig,
replica_connection_pool: ConnectionPool,
tx_sink: Arc<dyn TxSink>,
) -> Self {
Self {
config,
replica_connection_pool,
master_connection_pool: None,
proxy: None,
tx_sink,
sealer: None,
}
}
@@ -165,38 +167,22 @@ impl TxSenderBuilder {
self
}

pub fn with_tx_proxy(mut self, client: HttpClient) -> Self {
self.proxy = Some(TxProxy::new(client));
self
}

pub fn with_main_connection_pool(mut self, master_connection_pool: ConnectionPool) -> Self {
self.master_connection_pool = Some(master_connection_pool);
self
}

pub async fn build(
self,
batch_fee_input_provider: Arc<dyn BatchFeeModelInputProvider>,
vm_concurrency_limiter: Arc<VmConcurrencyLimiter>,
api_contracts: ApiContracts,
storage_caches: PostgresStorageCaches,
) -> TxSender {
assert!(
self.master_connection_pool.is_some() || self.proxy.is_some(),
"Either master connection pool or proxy must be set"
);

// Use noop sealer if no sealer was explicitly provided.
let sealer = self.sealer.unwrap_or_else(|| Arc::new(NoopSealer));

TxSender(Arc::new(TxSenderInner {
sender_config: self.config,
master_connection_pool: self.master_connection_pool,
tx_sink: self.tx_sink,
replica_connection_pool: self.replica_connection_pool,
batch_fee_input_provider,
api_contracts,
proxy: self.proxy,
vm_concurrency_limiter,
storage_caches,
sealer,
@@ -244,13 +230,12 @@ impl TxSenderConfig {

pub struct TxSenderInner {
pub(super) sender_config: TxSenderConfig,
pub master_connection_pool: Option<ConnectionPool>,
/// Sink to be used to persist transactions.
pub tx_sink: Arc<dyn TxSink>,
pub replica_connection_pool: ConnectionPool,
// Used to keep track of gas prices for the fee ticker.
pub batch_fee_input_provider: Arc<dyn BatchFeeModelInputProvider>,
pub(super) api_contracts: ApiContracts,
/// Optional transaction proxy to be used for transaction submission.
pub(super) proxy: Option<TxProxy>,
/// Used to limit the amount of VMs that can be executed simultaneously.
pub(super) vm_concurrency_limiter: Arc<VmConcurrencyLimiter>,
// Caches used in VM execution.
@@ -348,41 +333,14 @@ impl TxSender {
let stage_started_at = Instant::now();
self.ensure_tx_executable(tx.clone().into(), &execution_output.metrics, true)?;

if let Some(proxy) = &self.0.proxy {
// We're running an external node: we have to proxy the transaction to the main node.
// But before we do that, save the tx to cache in case someone will request it
// Before it reaches the main node.
proxy.save_tx(tx.clone()).await;
proxy.submit_tx(&tx).await?;
// Now, after we are sure that the tx is on the main node, remove it from cache
// since we don't want to store txs that might have been replaced or otherwise removed
// from the mempool.
proxy.forget_tx(tx.hash()).await;
SANDBOX_METRICS.submit_tx[&SubmitTxStage::TxProxy].observe(stage_started_at.elapsed());
APP_METRICS.processed_txs[&TxStage::Proxied].inc();
return Ok(L2TxSubmissionResult::Proxied);
} else {
assert!(
self.0.master_connection_pool.is_some(),
"TxSender is instantiated without both master connection pool and tx proxy"
);
}

let nonce = tx.common_data.nonce.0;
let hash = tx.hash();
let initiator_account = tx.initiator_account();
let submission_res_handle = self
.0
.master_connection_pool
.as_ref()
.unwrap() // Checked above
.access_storage_tagged("api")
.await?
.transactions_dal()
.insert_transaction_l2(tx, execution_output.metrics)
.await;

APP_METRICS.processed_txs[&TxStage::Mempool(submission_res_handle)].inc();
.tx_sink
.submit_tx(tx, execution_output.metrics)
.await?;

match submission_res_handle {
L2TxSubmissionResult::AlreadyExecuted => {
@@ -399,6 +357,11 @@
))
}
L2TxSubmissionResult::Duplicate => Err(SubmitTxError::IncorrectTx(TxDuplication(hash))),
L2TxSubmissionResult::Proxied => {
SANDBOX_METRICS.submit_tx[&SubmitTxStage::TxProxy]
.observe(stage_started_at.elapsed());
Ok(submission_res_handle)
}
_ => {
SANDBOX_METRICS.submit_tx[&SubmitTxStage::DbInsert]
.observe(stage_started_at.elapsed());
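
The proxying branch deleted from `submit_tx` above presumably moves behind the same interface: `proxy.rs` (not shown in this diff) would implement `TxSink` for `TxProxy` roughly as sketched below. The inner forwarding call is written as a hypothetical `submit_tx_impl` helper to avoid clashing with the trait method name, and the placement of the `Proxied` counter here is an assumption based on the removed code.

```rust
use zksync_dal::transactions_dal::L2TxSubmissionResult;
use zksync_types::{fee::TransactionExecutionMetrics, l2::L2Tx};

use super::{tx_sink::TxSink, SubmitTxError};
use crate::metrics::{TxStage, APP_METRICS};

#[async_trait::async_trait]
impl TxSink for TxProxy {
    async fn submit_tx(
        &self,
        tx: L2Tx,
        _execution_metrics: TransactionExecutionMetrics,
    ) -> Result<L2TxSubmissionResult, SubmitTxError> {
        // We're running an external node, so the transaction is forwarded to the
        // main node. Cache it first in case someone requests it before it
        // reaches the main node.
        self.save_tx(tx.clone()).await;
        self.submit_tx_impl(&tx).await?; // hypothetical name for the raw RPC call
        // Once the tx is known to the main node, drop it from the cache so we
        // don't keep transactions that may later be replaced or removed from the
        // mempool.
        self.forget_tx(tx.hash()).await;
        APP_METRICS.processed_txs[&TxStage::Proxied].inc();
        Ok(L2TxSubmissionResult::Proxied)
    }
}
```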