diff --git a/checks-config/era.dic b/checks-config/era.dic
index 36256d8e34a..d8248e0afa4 100644
--- a/checks-config/era.dic
+++ b/checks-config/era.dic
@@ -902,4 +902,6 @@ reimplementation
 composability
 md5
 shivini
-balancer
\ No newline at end of file
+balancer
+lookups
+stateful
diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs
index b36e62e1612..735df320e7b 100644
--- a/core/bin/external_node/src/main.rs
+++ b/core/bin/external_node/src/main.rs
@@ -13,7 +13,7 @@ use zksync_core::{
     api_server::{
         execution_sandbox::VmConcurrencyLimiter,
         healthcheck::HealthCheckHandle,
-        tx_sender::{ApiContracts, TxSenderBuilder},
+        tx_sender::{proxy::TxProxy, ApiContracts, TxSenderBuilder},
         web3::{ApiBuilder, Namespace},
     },
     block_reverter::{BlockReverter, BlockReverterFlags, L1ExecutedBatchesRevert},
@@ -275,11 +275,22 @@ async fn init_tasks(
     let fee_params_fetcher_handle =
         tokio::spawn(fee_params_fetcher.clone().run(stop_receiver.clone()));
 
-    let (tx_sender, vm_barrier, cache_update_handle) = {
-        let tx_sender_builder =
-            TxSenderBuilder::new(config.clone().into(), connection_pool.clone())
-                .with_main_connection_pool(connection_pool.clone())
-                .with_tx_proxy(main_node_client);
+    let (tx_sender, vm_barrier, cache_update_handle, proxy_cache_updater_handle) = {
+        let tx_proxy = TxProxy::new(main_node_client);
+        let proxy_cache_updater_pool = singleton_pool_builder
+            .build()
+            .await
+            .context("failed to build a proxy_cache_updater_pool")?;
+        let proxy_cache_updater_handle = tokio::spawn(
+            tx_proxy
+                .run_account_nonce_sweeper(proxy_cache_updater_pool.clone(), stop_receiver.clone()),
+        );
+
+        let tx_sender_builder = TxSenderBuilder::new(
+            config.clone().into(),
+            connection_pool.clone(),
+            Arc::new(tx_proxy),
+        );
 
         if config.optional.transactions_per_sec_limit.is_some() {
             tracing::warn!("`transactions_per_sec_limit` option is deprecated and ignored");
@@ -308,7 +319,12 @@ async fn init_tasks(
             storage_caches,
         )
         .await;
-        (tx_sender, vm_barrier, cache_update_handle)
+        (
+            tx_sender,
+            vm_barrier,
+            cache_update_handle,
+            proxy_cache_updater_handle,
+        )
     };
 
     let http_server_handles =
@@ -359,6 +375,7 @@ async fn init_tasks(
     task_handles.extend(http_server_handles.tasks);
     task_handles.extend(ws_server_handles.tasks);
    task_handles.extend(cache_update_handle);
+    task_handles.push(proxy_cache_updater_handle);
     task_handles.extend([
         sk_handle,
         fee_address_migration_handle,
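Note on the API change above: `TxSenderBuilder::new` now receives the transaction sink as a third constructor argument, replacing the removed `with_main_connection_pool` / `with_tx_proxy` setters. A condensed, hypothetical sketch of the two wirings is shown below (it assumes `tx_sender_config`, `replica_pool`, `master_pool`, `config`, `connection_pool`, and `main_node_client` are already constructed as in the surrounding code; the main-node variant appears later in `lib.rs`):

```rust
use std::sync::Arc;

use zksync_core::api_server::tx_sender::{
    master_pool_sink::MasterPoolSink, proxy::TxProxy, TxSenderBuilder,
};

// Main node: persist incoming transactions directly into the master Postgres pool.
let main_node_builder = TxSenderBuilder::new(
    tx_sender_config.clone(),
    replica_pool.clone(),
    Arc::new(MasterPoolSink::new(master_pool)),
);

// External node: forward incoming transactions to the main node over JSON-RPC,
// caching them locally until the main node reports them back.
let external_node_builder = TxSenderBuilder::new(
    config.clone().into(),
    connection_pool.clone(),
    Arc::new(TxProxy::new(main_node_client)),
);
```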
diff --git a/core/lib/zksync_core/src/api_server/tx_sender/master_pool_sink.rs b/core/lib/zksync_core/src/api_server/tx_sender/master_pool_sink.rs
new file mode 100644
index 00000000000..f6fc7e72937
--- /dev/null
+++ b/core/lib/zksync_core/src/api_server/tx_sender/master_pool_sink.rs
@@ -0,0 +1,37 @@
+use zksync_dal::{transactions_dal::L2TxSubmissionResult, ConnectionPool};
+use zksync_types::{fee::TransactionExecutionMetrics, l2::L2Tx};
+
+use super::{tx_sink::TxSink, SubmitTxError};
+use crate::metrics::{TxStage, APP_METRICS};
+
+/// Wrapper for the master DB pool that allows submitting transactions to the mempool.
+#[derive(Debug)]
+pub struct MasterPoolSink {
+    master_pool: ConnectionPool,
+}
+
+impl MasterPoolSink {
+    pub fn new(master_pool: ConnectionPool) -> Self {
+        Self { master_pool }
+    }
+}
+
+#[async_trait::async_trait]
+impl TxSink for MasterPoolSink {
+    async fn submit_tx(
+        &self,
+        tx: L2Tx,
+        execution_metrics: TransactionExecutionMetrics,
+    ) -> Result<L2TxSubmissionResult, SubmitTxError> {
+        let submission_res_handle = self
+            .master_pool
+            .access_storage_tagged("api")
+            .await?
+            .transactions_dal()
+            .insert_transaction_l2(tx, execution_metrics)
+            .await;
+
+        APP_METRICS.processed_txs[&TxStage::Mempool(submission_res_handle)].inc();
+        Ok(submission_res_handle)
+    }
+}
diff --git a/core/lib/zksync_core/src/api_server/tx_sender/mod.rs b/core/lib/zksync_core/src/api_server/tx_sender/mod.rs
index 8fe0380e751..ba146303827 100644
--- a/core/lib/zksync_core/src/api_server/tx_sender/mod.rs
+++ b/core/lib/zksync_core/src/api_server/tx_sender/mod.rs
@@ -25,9 +25,9 @@ use zksync_types::{
     MAX_NEW_FACTORY_DEPS, U256,
 };
 use zksync_utils::h256_to_u256;
-use zksync_web3_decl::jsonrpsee::http_client::HttpClient;
 
-pub(super) use self::{proxy::TxProxy, result::SubmitTxError};
+pub(super) use self::result::SubmitTxError;
+use self::tx_sink::TxSink;
 use crate::{
     api_server::{
         execution_sandbox::{
@@ -38,15 +38,16 @@ use crate::{
         tx_sender::result::ApiCallResult,
     },
     fee_model::BatchFeeModelInputProvider,
-    metrics::{TxStage, APP_METRICS},
     state_keeper::seal_criteria::{ConditionalSealer, NoopSealer, SealData},
     utils::pending_protocol_version,
 };
 
-mod proxy;
+pub mod master_pool_sink;
+pub mod proxy;
 mod result;
 #[cfg(test)]
 pub(crate) mod tests;
+pub mod tx_sink;
 
 #[derive(Debug, Clone)]
 pub struct MultiVMBaseSystemContracts {
@@ -141,21 +142,22 @@ pub struct TxSenderBuilder {
     config: TxSenderConfig,
     /// Connection pool for read requests.
     replica_connection_pool: ConnectionPool,
-    /// Connection pool for write requests. If not set, `proxy` must be set.
-    master_connection_pool: Option<ConnectionPool>,
-    /// Proxy to submit transactions to the network. If not set, `master_connection_pool` must be set.
-    proxy: Option<TxProxy>,
+    /// Sink to be used to persist transactions.
+    tx_sink: Arc<dyn TxSink>,
     /// Batch sealer used to check whether transaction can be executed by the sequencer.
     sealer: Option<Arc<dyn ConditionalSealer>>,
 }
 
 impl TxSenderBuilder {
-    pub fn new(config: TxSenderConfig, replica_connection_pool: ConnectionPool) -> Self {
+    pub fn new(
+        config: TxSenderConfig,
+        replica_connection_pool: ConnectionPool,
+        tx_sink: Arc<dyn TxSink>,
+    ) -> Self {
         Self {
             config,
             replica_connection_pool,
-            master_connection_pool: None,
-            proxy: None,
+            tx_sink,
             sealer: None,
         }
     }
@@ -165,16 +167,6 @@ impl TxSenderBuilder {
         self
     }
 
-    pub fn with_tx_proxy(mut self, client: HttpClient) -> Self {
-        self.proxy = Some(TxProxy::new(client));
-        self
-    }
-
-    pub fn with_main_connection_pool(mut self, master_connection_pool: ConnectionPool) -> Self {
-        self.master_connection_pool = Some(master_connection_pool);
-        self
-    }
-
     pub async fn build(
         self,
         batch_fee_input_provider: Arc<dyn BatchFeeModelInputProvider>,
@@ -182,21 +174,15 @@ impl TxSenderBuilder {
         api_contracts: ApiContracts,
         storage_caches: PostgresStorageCaches,
     ) -> TxSender {
-        assert!(
-            self.master_connection_pool.is_some() || self.proxy.is_some(),
-            "Either master connection pool or proxy must be set"
-        );
-
         // Use noop sealer if no sealer was explicitly provided.
         let sealer = self.sealer.unwrap_or_else(|| Arc::new(NoopSealer));
 
         TxSender(Arc::new(TxSenderInner {
             sender_config: self.config,
-            master_connection_pool: self.master_connection_pool,
+            tx_sink: self.tx_sink,
             replica_connection_pool: self.replica_connection_pool,
             batch_fee_input_provider,
             api_contracts,
-            proxy: self.proxy,
             vm_concurrency_limiter,
             storage_caches,
             sealer,
@@ -244,13 +230,12 @@ impl TxSenderConfig {
 
 pub struct TxSenderInner {
     pub(super) sender_config: TxSenderConfig,
-    pub master_connection_pool: Option<ConnectionPool>,
+    /// Sink to be used to persist transactions.
+    pub tx_sink: Arc<dyn TxSink>,
     pub replica_connection_pool: ConnectionPool,
     // Used to keep track of gas prices for the fee ticker.
     pub batch_fee_input_provider: Arc<dyn BatchFeeModelInputProvider>,
     pub(super) api_contracts: ApiContracts,
-    /// Optional transaction proxy to be used for transaction submission.
-    pub(super) proxy: Option<TxProxy>,
     /// Used to limit the amount of VMs that can be executed simultaneously.
     pub(super) vm_concurrency_limiter: Arc<VmConcurrencyLimiter>,
     // Caches used in VM execution.
@@ -348,41 +333,14 @@ impl TxSender {
         let stage_started_at = Instant::now();
         self.ensure_tx_executable(tx.clone().into(), &execution_output.metrics, true)?;
 
-        if let Some(proxy) = &self.0.proxy {
-            // We're running an external node: we have to proxy the transaction to the main node.
-            // But before we do that, save the tx to cache in case someone will request it
-            // Before it reaches the main node.
-            proxy.save_tx(tx.clone()).await;
-            proxy.submit_tx(&tx).await?;
-            // Now, after we are sure that the tx is on the main node, remove it from cache
-            // since we don't want to store txs that might have been replaced or otherwise removed
-            // from the mempool.
-            proxy.forget_tx(tx.hash()).await;
-            SANDBOX_METRICS.submit_tx[&SubmitTxStage::TxProxy].observe(stage_started_at.elapsed());
-            APP_METRICS.processed_txs[&TxStage::Proxied].inc();
-            return Ok(L2TxSubmissionResult::Proxied);
-        } else {
-            assert!(
-                self.0.master_connection_pool.is_some(),
-                "TxSender is instantiated without both master connection pool and tx proxy"
-            );
-        }
-
         let nonce = tx.common_data.nonce.0;
         let hash = tx.hash();
         let initiator_account = tx.initiator_account();
         let submission_res_handle = self
             .0
-            .master_connection_pool
-            .as_ref()
-            .unwrap() // Checked above
-            .access_storage_tagged("api")
-            .await?
-            .transactions_dal()
-            .insert_transaction_l2(tx, execution_output.metrics)
-            .await;
-
-        APP_METRICS.processed_txs[&TxStage::Mempool(submission_res_handle)].inc();
+            .tx_sink
+            .submit_tx(tx, execution_output.metrics)
+            .await?;
 
         match submission_res_handle {
             L2TxSubmissionResult::AlreadyExecuted => {
@@ -399,6 +357,11 @@ impl TxSender {
                 ))
             }
             L2TxSubmissionResult::Duplicate => Err(SubmitTxError::IncorrectTx(TxDuplication(hash))),
+            L2TxSubmissionResult::Proxied => {
+                SANDBOX_METRICS.submit_tx[&SubmitTxStage::TxProxy]
+                    .observe(stage_started_at.elapsed());
+                Ok(submission_res_handle)
+            }
             _ => {
                 SANDBOX_METRICS.submit_tx[&SubmitTxStage::DbInsert]
                     .observe(stage_started_at.elapsed());
diff --git a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs
index 5cd22874598..4a702fbbeed 100644
--- a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs
+++ b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs
@@ -6,18 +6,25 @@ use std::{
 };
 
 use tokio::sync::{watch, RwLock};
-use zksync_dal::ConnectionPool;
+use zksync_dal::{transactions_dal::L2TxSubmissionResult, ConnectionPool};
 use zksync_types::{
     api::{BlockId, Transaction, TransactionDetails, TransactionId},
+    fee::TransactionExecutionMetrics,
     l2::L2Tx,
     Address, Nonce, H256,
 };
 use zksync_web3_decl::{
-    error::{ClientRpcContext, EnrichedClientResult},
+    error::{ClientRpcContext, EnrichedClientResult, Web3Error},
     jsonrpsee::http_client::HttpClient,
     namespaces::{EthNamespaceClient, ZksNamespaceClient},
 };
 
+use super::{tx_sink::TxSink, SubmitTxError};
+use crate::{
+    api_server::web3::backend_jsonrpsee::internal_error,
+    metrics::{TxStage, APP_METRICS},
+};
+
 #[derive(Debug, Clone, Default)]
 pub(crate) struct TxCache {
     inner: Arc<RwLock<TxCacheInner>>,
@@ -116,29 +123,37 @@ impl TxProxy {
         }
     }
 
-    pub async fn find_tx(&self, tx_hash: H256) -> Option<L2Tx> {
-        self.tx_cache.get_tx(tx_hash).await
+    async fn submit_tx_impl(&self, tx: &L2Tx) -> EnrichedClientResult<H256> {
+        let input_data = tx.common_data.input_data().expect("raw tx is absent");
+        let raw_tx = zksync_types::Bytes(input_data.to_vec());
+        let tx_hash = tx.hash();
+        tracing::info!("Proxying tx {tx_hash:?}");
+        self.client
+            .send_raw_transaction(raw_tx)
+            .rpc_context("send_raw_transaction")
+            .with_arg("tx_hash", &tx_hash)
+            .await
     }
 
-    pub async fn forget_tx(&self, tx_hash: H256) {
-        self.tx_cache.remove_tx(tx_hash).await;
+    async fn save_tx(&self, tx: L2Tx) {
+        self.tx_cache.push(tx).await;
     }
 
-    pub async fn save_tx(&self, tx: L2Tx) {
-        self.tx_cache.push(tx).await;
+    async fn find_tx(&self, tx_hash: H256) -> Option<L2Tx> {
+        self.tx_cache.get_tx(tx_hash).await
     }
 
-    pub async fn get_nonces_by_account(&self, account_address: Address) -> BTreeSet<Nonce> {
-        self.tx_cache.get_nonces_for_account(account_address).await
+    async fn forget_tx(&self, tx_hash: H256) {
+        self.tx_cache.remove_tx(tx_hash).await;
     }
 
-    pub async fn next_nonce_by_initiator_account(
+    async fn next_nonce_by_initiator_account(
         &self,
         account_address: Address,
         current_nonce: u32,
     ) -> Nonce {
         let mut pending_nonce = Nonce(current_nonce);
-        let nonces = self.get_nonces_by_account(account_address).await;
+        let nonces = self.tx_cache.get_nonces_for_account(account_address).await;
         for nonce in nonces.range(pending_nonce + 1..) {
             // If nonce is not sequential, then we should not increment nonce.
             if nonce == &pending_nonce {
@@ -151,19 +166,7 @@ impl TxProxy {
         pending_nonce
     }
 
-    pub async fn submit_tx(&self, tx: &L2Tx) -> EnrichedClientResult<H256> {
-        let input_data = tx.common_data.input_data().expect("raw tx is absent");
-        let raw_tx = zksync_types::Bytes(input_data.to_vec());
-        let tx_hash = tx.hash();
-        tracing::info!("Proxying tx {tx_hash:?}");
-        self.client
-            .send_raw_transaction(raw_tx)
-            .rpc_context("send_raw_transaction")
-            .with_arg("tx_hash", &tx_hash)
-            .await
-    }
-
-    pub async fn request_tx(&self, id: TransactionId) -> EnrichedClientResult<Option<Transaction>> {
+    async fn request_tx(&self, id: TransactionId) -> EnrichedClientResult<Option<Transaction>> {
         match id {
             TransactionId::Block(BlockId::Hash(block), index) => {
                 self.client
@@ -191,7 +194,7 @@ impl TxProxy {
         }
     }
 
-    pub async fn request_tx_details(
+    async fn request_tx_details(
         &self,
         hash: H256,
     ) -> EnrichedClientResult<Option<TransactionDetails>> {
@@ -211,3 +214,67 @@ impl TxProxy {
         tx_cache.run_updates(pool, stop_receiver)
     }
 }
+
+#[async_trait::async_trait]
+impl TxSink for TxProxy {
+    async fn submit_tx(
+        &self,
+        tx: L2Tx,
+        _execution_metrics: TransactionExecutionMetrics,
+    ) -> Result<L2TxSubmissionResult, SubmitTxError> {
+        // We're running an external node: we have to proxy the transaction to the main node.
+        // But before we do that, save the tx to the cache in case someone requests it
+        // before it reaches the main node.
+        self.save_tx(tx.clone()).await;
+        self.submit_tx_impl(&tx).await?;
+        // Now that we are sure the tx is on the main node, remove it from the cache,
+        // since we don't want to store txs that might have been replaced or otherwise removed
+        // from the mempool.
+        self.forget_tx(tx.hash()).await;
+        APP_METRICS.processed_txs[&TxStage::Proxied].inc();
+        Ok(L2TxSubmissionResult::Proxied)
+    }
+
+    async fn lookup_pending_nonce(
+        &self,
+        _method_name: &'static str,
+        account_address: Address,
+        last_known_nonce: u32,
+    ) -> Result<Option<Nonce>, Web3Error> {
+        // EN: get pending nonces from the transaction cache.
+        // We don't have a mempool on the EN, so it's safe to use the proxy cache as a mempool.
+        Ok(Some(
+            self.next_nonce_by_initiator_account(account_address, last_known_nonce)
+                .await
+                .0
+                .into(),
+        ))
+    }
+
+    async fn lookup_tx(
+        &self,
+        method_name: &'static str,
+        id: TransactionId,
+    ) -> Result<Option<Transaction>, Web3Error> {
+        if let TransactionId::Hash(hash) = id {
+            // If the transaction is not in the db, check the cache
+            if let Some(tx) = self.find_tx(hash).await {
+                return Ok(Some(tx.into()));
+            }
+        }
+        // If the transaction is not in the cache, query the main node
+        self.request_tx(id)
+            .await
+            .map_err(|err| internal_error(method_name, err))
+    }
+
+    async fn lookup_tx_details(
+        &self,
+        method_name: &'static str,
+        hash: H256,
+    ) -> Result<Option<TransactionDetails>, Web3Error> {
+        self.request_tx_details(hash)
+            .await
+            .map_err(|err| internal_error(method_name, err))
+    }
+}
diff --git a/core/lib/zksync_core/src/api_server/tx_sender/tx_sink.rs b/core/lib/zksync_core/src/api_server/tx_sender/tx_sink.rs
new file mode 100644
index 00000000000..0cf353470ab
--- /dev/null
+++ b/core/lib/zksync_core/src/api_server/tx_sender/tx_sink.rs
@@ -0,0 +1,63 @@
+use zksync_dal::transactions_dal::L2TxSubmissionResult;
+use zksync_types::{
+    api::{Transaction, TransactionDetails, TransactionId},
+    fee::TransactionExecutionMetrics,
+    l2::L2Tx,
+    Address, Nonce, H256,
+};
+use zksync_web3_decl::error::Web3Error;
+
+use super::SubmitTxError;
+
+/// An abstraction of a "destination" for transactions that should be propagated to the mempool.
+///
+/// By default, `TxSender` always has access to the Postgres replica pool, but this only provides read-only access.
+/// However, `TxSender` should be able to propagate transactions as well, and for that purpose the implementation may
+/// be different. For example, the main node has access to the master pool, while the external node has a proxy that
+/// submits transactions to the main node.
+///
+/// Both approaches represent different implementations of the `TxSink` trait.
+///
+/// Additionally, a `TxSink` may be stateful: e.g. if the effects of transaction submission are not immediately visible
+/// through the replica pool, `TxSink` may implement methods to allow cache-like lookups. These methods are not
+/// mandatory and may be implemented as no-ops.
+#[async_trait::async_trait]
+pub trait TxSink: std::fmt::Debug + Send + Sync + 'static {
+    /// Ensures that the transaction is propagated to the mempool.
+    async fn submit_tx(
+        &self,
+        tx: L2Tx,
+        execution_metrics: TransactionExecutionMetrics,
+    ) -> Result<L2TxSubmissionResult, SubmitTxError>;
+
+    /// Attempts to look up the pending nonce for the account in the sink-specific storage.
+    /// By default, returns `Ok(None)`.
+    async fn lookup_pending_nonce(
+        &self,
+        _method_name: &'static str,
+        _account_address: Address,
+        _last_known_nonce: u32,
+    ) -> Result<Option<Nonce>, Web3Error> {
+        Ok(None)
+    }
+
+    /// Attempts to look up the transaction by its API ID in the sink-specific storage.
+    /// By default, returns `Ok(None)`.
+    async fn lookup_tx(
+        &self,
+        _method_name: &'static str,
+        _id: TransactionId,
+    ) -> Result<Option<Transaction>, Web3Error> {
+        Ok(None)
+    }
+
+    /// Attempts to look up the transaction details by its hash in the sink-specific storage.
+    /// By default, returns `Ok(None)`.
+    async fn lookup_tx_details(
+        &self,
+        _method_name: &'static str,
+        _hash: H256,
+    ) -> Result<Option<TransactionDetails>, Web3Error> {
+        Ok(None)
+    }
+}
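For illustration only (not part of this patch): a minimal extra `TxSink` implementation could define only `submit_tx` and rely on the default `lookup_*` methods. The `CountingSink` type below is hypothetical, and the use of the `L2TxSubmissionResult::Added` variant is an assumption made for the sketch:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

use zksync_dal::transactions_dal::L2TxSubmissionResult;
use zksync_types::{fee::TransactionExecutionMetrics, l2::L2Tx};

use super::{tx_sink::TxSink, SubmitTxError};

/// Hypothetical sink that only counts submissions; all `lookup_*` methods keep
/// their default `Ok(None)` bodies, so API reads fall back to the replica DB.
#[derive(Debug, Default)]
pub struct CountingSink {
    submitted: AtomicUsize,
}

#[async_trait::async_trait]
impl TxSink for CountingSink {
    async fn submit_tx(
        &self,
        _tx: L2Tx,
        _execution_metrics: TransactionExecutionMetrics,
    ) -> Result<L2TxSubmissionResult, SubmitTxError> {
        // Record the submission and report the transaction as accepted.
        self.submitted.fetch_add(1, Ordering::Relaxed);
        Ok(L2TxSubmissionResult::Added)
    }
}
```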
diff --git a/core/lib/zksync_core/src/api_server/web3/mod.rs b/core/lib/zksync_core/src/api_server/web3/mod.rs
index 2e007d94673..cf055b946c0 100644
--- a/core/lib/zksync_core/src/api_server/web3/mod.rs
+++ b/core/lib/zksync_core/src/api_server/web3/mod.rs
@@ -427,11 +427,6 @@ impl FullApiParams {
         );
         let mut tasks = vec![tokio::spawn(update_task)];
 
-        if let Some(tx_proxy) = &self.tx_sender.0.proxy {
-            let task = tx_proxy
-                .run_account_nonce_sweeper(self.updaters_pool.clone(), stop_receiver.clone());
-            tasks.push(tokio::spawn(task));
-        }
         let pub_sub = if matches!(transport, ApiTransport::WebSocket(_))
             && self.namespaces.contains(&Namespace::Pubsub)
         {
diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs
index 76780075f03..0f3590a54fc 100644
--- a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs
+++ b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs
@@ -485,16 +485,17 @@ impl EthNamespace {
         if matches!(block_id, BlockId::Number(BlockNumber::Pending)) {
             let account_nonce_u64 = u64::try_from(account_nonce)
                 .map_err(|err| internal_error(method_name, anyhow::anyhow!(err)))?;
-            account_nonce = if let Some(proxy) = &self.state.tx_sender.0.proxy {
-                // EN: get pending nonces from the transaction cache
-                // We don't have mempool in EN, it's safe to use the proxy cache as a mempool
-                proxy
-                    .next_nonce_by_initiator_account(address, account_nonce_u64 as u32)
-                    .await
-                    .0
-                    .into()
+            account_nonce = if let Some(account_nonce) = self
+                .state
+                .tx_sender
+                .0
+                .tx_sink
+                .lookup_pending_nonce(method_name, address, account_nonce_u64 as u32)
+                .await?
+            {
+                account_nonce.0.into()
             } else {
-                // Main node: get pending nonces from the mempool
+                // No nonce hint in the sink: get pending nonces from the mempool
                 connection
                     .transactions_web3_dal()
                     .next_nonce_by_initiator_account(address, account_nonce_u64)
@@ -527,27 +528,14 @@ impl EthNamespace {
             .await
             .map_err(|err| internal_error(METHOD_NAME, err));
 
-        if let Some(proxy) = &self.state.tx_sender.0.proxy {
-            // We're running an external node - check the proxy cache in
-            // case the transaction was proxied but not yet synced back to us
-            if let Ok(Some(tx)) = &transaction {
-                // If the transaction is already in the db, remove it from cache
-                proxy.forget_tx(tx.hash).await
-            } else {
-                if let TransactionId::Hash(hash) = id {
-                    // If the transaction is not in the db, check the cache
-                    if let Some(tx) = proxy.find_tx(hash).await {
-                        transaction = Ok(Some(tx.into()));
-                    }
-                }
-                if !matches!(transaction, Ok(Some(_))) {
-                    // If the transaction is not in the db or cache, query main node
-                    transaction = proxy
-                        .request_tx(id)
-                        .await
-                        .map_err(|err| internal_error(METHOD_NAME, err));
-                }
-            }
+        if let Ok(None) = transaction {
+            transaction = self
+                .state
+                .tx_sender
+                .0
+                .tx_sink
+                .lookup_tx(METHOD_NAME, id)
+                .await;
         }
 
         method_latency.observe();
diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs
index 6543e37e393..6bef4b5e234 100644
--- a/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs
+++ b/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs
@@ -480,16 +480,14 @@ impl ZksNamespace {
             .map_err(|err| internal_error(METHOD_NAME, err));
         drop(storage);
 
-        if let Some(proxy) = &self.state.tx_sender.0.proxy {
-            // We're running an external node - we should query the main node directly
-            // in case the transaction was proxied but not yet synced back to us
-            if matches!(tx_details, Ok(None)) {
-                // If the transaction is not in the db, query main node for details
-                tx_details = proxy
-                    .request_tx_details(hash)
-                    .await
-                    .map_err(|err| internal_error(METHOD_NAME, err));
-            }
+        if let Ok(None) = tx_details {
+            tx_details = self
+                .state
+                .tx_sender
+                .0
+                .tx_sink
+                .lookup_tx_details(METHOD_NAME, hash)
+                .await;
         }
 
         method_latency.observe();
diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs
index f7b1f9fb4a7..92416d6a686 100644
--- a/core/lib/zksync_core/src/lib.rs
+++ b/core/lib/zksync_core/src/lib.rs
@@ -3,6 +3,7 @@
 use std::{net::Ipv4Addr, str::FromStr, sync::Arc, time::Instant};
 
 use anyhow::Context as _;
+use api_server::tx_sender::master_pool_sink::MasterPoolSink;
 use fee_model::{ApiFeeInputProvider, BatchFeeModelInputProvider, MainNodeFeeInputProvider};
 use futures::channel::oneshot;
 use prometheus_exporter::PrometheusExporterConfig;
@@ -1101,9 +1102,13 @@ async fn build_tx_sender(
     storage_caches: PostgresStorageCaches,
 ) -> (TxSender, VmConcurrencyBarrier) {
     let sequencer_sealer = SequencerSealer::new(state_keeper_config.clone());
-    let tx_sender_builder = TxSenderBuilder::new(tx_sender_config.clone(), replica_pool.clone())
-        .with_main_connection_pool(master_pool)
-        .with_sealer(Arc::new(sequencer_sealer));
+    let master_pool_sink = MasterPoolSink::new(master_pool);
+    let tx_sender_builder = TxSenderBuilder::new(
+        tx_sender_config.clone(),
+        replica_pool.clone(),
+        Arc::new(master_pool_sink),
+    )
+    .with_sealer(Arc::new(sequencer_sealer));
 
     let max_concurrency = web3_json_config.vm_concurrency_limit();
     let (vm_concurrency_limiter, vm_barrier) =
         VmConcurrencyLimiter::new(max_concurrency);
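Taken together, the namespace changes above converge on a single read pattern: consult the replica DB first, and fall back to the sink only on a miss (a no-op by default on the main node, a cache/main-node query on the external node). A hypothetical sketch of that pattern, with the DB query assumed to be done by the caller:

```rust
use zksync_types::api::{Transaction, TransactionId};
use zksync_web3_decl::error::Web3Error;

use crate::api_server::tx_sender::tx_sink::TxSink;

// Hypothetical helper mirroring the lookup flow in `eth.rs`/`zks.rs` above.
async fn lookup_tx_with_fallback(
    tx_sink: &dyn TxSink,
    method_name: &'static str,
    db_result: Option<Transaction>, // outcome of the replica-pool query, performed by the caller
    id: TransactionId,
) -> Result<Option<Transaction>, Web3Error> {
    if db_result.is_some() {
        // The replica DB already knows the transaction; no need to ask the sink.
        return Ok(db_result);
    }
    // DB miss: let the sink try its own storage (proxy cache and/or main node on the EN).
    tx_sink.lookup_tx(method_name, id).await
}
```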