diff --git a/core/lib/dal/.sqlx/query-24fa393b7c3a0178806a223ad474e13d46b5ea54a144ec2345a76b865c85f84b.json b/core/lib/dal/.sqlx/query-24fa393b7c3a0178806a223ad474e13d46b5ea54a144ec2345a76b865c85f84b.json
new file mode 100644
index 00000000000..d28ef3efdc2
--- /dev/null
+++ b/core/lib/dal/.sqlx/query-24fa393b7c3a0178806a223ad474e13d46b5ea54a144ec2345a76b865c85f84b.json
@@ -0,0 +1,28 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n SELECT\n initiator_address,\n MAX(nonce)\n FROM\n transactions\n WHERE\n initiator_address = ANY ($1)\n AND is_priority = FALSE\n AND (\n miniblock_number IS NOT NULL\n OR error IS NULL\n )\n GROUP BY\n initiator_address\n ",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "initiator_address",
+        "type_info": "Bytea"
+      },
+      {
+        "ordinal": 1,
+        "name": "max",
+        "type_info": "Int8"
+      }
+    ],
+    "parameters": {
+      "Left": [
+        "ByteaArray"
+      ]
+    },
+    "nullable": [
+      false,
+      null
+    ]
+  },
+  "hash": "24fa393b7c3a0178806a223ad474e13d46b5ea54a144ec2345a76b865c85f84b"
+}
diff --git a/core/lib/dal/src/storage_web3_dal.rs b/core/lib/dal/src/storage_web3_dal.rs
index dbb4f2a58b8..573fbe9f53a 100644
--- a/core/lib/dal/src/storage_web3_dal.rs
+++ b/core/lib/dal/src/storage_web3_dal.rs
@@ -3,7 +3,7 @@ use std::{collections::HashMap, ops};
 use zksync_types::{
     get_code_key, get_nonce_key,
     utils::{decompose_full_nonce, storage_key_for_standard_token_balance},
-    AccountTreeId, Address, L1BatchNumber, MiniblockNumber, StorageKey,
+    AccountTreeId, Address, L1BatchNumber, MiniblockNumber, Nonce, StorageKey,
     FAILED_CONTRACT_DEPLOYMENT_BYTECODE_HASH, H256, U256,
 };
 use zksync_utils::h256_to_u256;
@@ -32,6 +32,30 @@ impl StorageWeb3Dal<'_, '_> {
         Ok(decompose_full_nonce(full_nonce).0)
     }

+    /// Returns the current *stored* nonces (i.e., w/o accounting for pending transactions) for the specified accounts.
+    pub async fn get_nonces_for_addresses(
+        &mut self,
+        addresses: &[Address],
+    ) -> sqlx::Result<HashMap<Address, Nonce>> {
+        let nonce_keys: HashMap<_, _> = addresses
+            .iter()
+            .map(|address| (get_nonce_key(address).hashed_key(), *address))
+            .collect();
+
+        let res = self
+            .get_values(&nonce_keys.keys().copied().collect::<Vec<_>>())
+            .await?
+            .into_iter()
+            .filter_map(|(hashed_key, value)| {
+                let address = nonce_keys.get(&hashed_key)?;
+                let full_nonce = h256_to_u256(value);
+                let (nonce, _) = decompose_full_nonce(full_nonce);
+                Some((*address, Nonce(nonce.as_u32())))
+            })
+            .collect();
+        Ok(res)
+    }
+
     pub async fn standard_token_historical_balance(
         &mut self,
         token_id: AccountTreeId,
diff --git a/core/lib/zksync_core/src/api_server/tx_sender/mod.rs b/core/lib/zksync_core/src/api_server/tx_sender/mod.rs
index 51c817b8fbe..8cf1dd3a96c 100644
--- a/core/lib/zksync_core/src/api_server/tx_sender/mod.rs
+++ b/core/lib/zksync_core/src/api_server/tx_sender/mod.rs
@@ -351,7 +351,7 @@ impl TxSender {
             // We're running an external node: we have to proxy the transaction to the main node.
             // But before we do that, save the tx to the cache in case someone requests it
             // before it reaches the main node.
-            proxy.save_tx(tx.hash(), tx.clone()).await;
+            proxy.save_tx(tx.clone()).await;
             proxy.submit_tx(&tx).await?;
             // Now, after we are sure that the tx is on the main node, remove it from cache
             // since we don't want to store txs that might have been replaced or otherwise removed
diff --git a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs
index e99899d7cff..a16ee2d07bb 100644
--- a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs
+++ b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs
@@ -1,10 +1,16 @@
-use std::collections::HashMap;
+use std::{
+    collections::{BTreeSet, HashMap},
+    future::Future,
+    sync::Arc,
+    time::Duration,
+};

-use tokio::sync::RwLock;
+use tokio::sync::{watch, RwLock};
+use zksync_dal::ConnectionPool;
 use zksync_types::{
     api::{BlockId, Transaction, TransactionDetails, TransactionId},
     l2::L2Tx,
-    H256,
+    Address, Nonce, H256,
 };
 use zksync_web3_decl::{
     error::{ClientRpcContext, EnrichedClientResult},
@@ -12,11 +18,93 @@ use zksync_web3_decl::{
     namespaces::{EthNamespaceClient, ZksNamespaceClient},
 };

+#[derive(Debug, Clone, Default)]
+pub(crate) struct TxCache {
+    inner: Arc<RwLock<TxCacheInner>>,
+}
+
+#[derive(Debug, Default)]
+struct TxCacheInner {
+    tx_cache: HashMap<H256, L2Tx>,
+    nonces_by_account: HashMap<Address, BTreeSet<Nonce>>,
+}
+
+impl TxCache {
+    async fn push(&self, tx: L2Tx) {
+        let mut inner = self.inner.write().await;
+        inner
+            .nonces_by_account
+            .entry(tx.initiator_account())
+            .or_default()
+            .insert(tx.nonce());
+        inner.tx_cache.insert(tx.hash(), tx);
+    }
+
+    async fn get_tx(&self, tx_hash: H256) -> Option<L2Tx> {
+        self.inner.read().await.tx_cache.get(&tx_hash).cloned()
+    }
+
+    async fn get_nonces_for_account(&self, account_address: Address) -> BTreeSet<Nonce> {
+        let inner = self.inner.read().await;
+        if let Some(nonces) = inner.nonces_by_account.get(&account_address) {
+            nonces.clone()
+        } else {
+            BTreeSet::new()
+        }
+    }
+
+    async fn remove_tx(&self, tx_hash: H256) {
+        self.inner.write().await.tx_cache.remove(&tx_hash);
+        // We intentionally don't change `nonces_by_account`; they should only be changed in response to new miniblocks
+    }
+
+    async fn run_updates(
+        self,
+        pool: ConnectionPool,
+        stop_receiver: watch::Receiver<bool>,
+    ) -> anyhow::Result<()> {
+        const UPDATE_INTERVAL: Duration = Duration::from_secs(1);
+
+        loop {
+            if *stop_receiver.borrow() {
+                return Ok(());
+            }
+
+            let addresses: Vec<_> = {
+                // Split into 2 statements for readability.
+                let inner = self.inner.read().await;
+                inner.nonces_by_account.keys().copied().collect()
+            };
+            let mut storage = pool.access_storage_tagged("api").await?;
+            let nonces_for_accounts = storage
+                .storage_web3_dal()
+                .get_nonces_for_addresses(&addresses)
+                .await?;
+            drop(storage); // Don't hold both `storage` and lock on `inner` at the same time.
+
+            let mut inner = self.inner.write().await;
+            inner.nonces_by_account.retain(|address, account_nonces| {
+                let stored_nonce = nonces_for_accounts
+                    .get(address)
+                    .copied()
+                    .unwrap_or(Nonce(0));
+                // Retain only nonces starting from the stored one.
+                *account_nonces = account_nonces.split_off(&stored_nonce);
+                // If we've removed all nonces, drop the account entry so we don't request stored nonces for it later.
+                !account_nonces.is_empty()
+            });
+            drop(inner);
+
+            tokio::time::sleep(UPDATE_INTERVAL).await;
+        }
+    }
+}
+
 /// Used by the external node to proxy transactions to the main node
 /// and store them while they're not yet synced back
 #[derive(Debug)]
 pub struct TxProxy {
-    tx_cache: RwLock<HashMap<H256, L2Tx>>,
+    tx_cache: TxCache,
     client: HttpClient,
 }

@@ -25,20 +113,43 @@ impl TxProxy {
         let client = HttpClientBuilder::default().build(main_node_url).unwrap();
         Self {
             client,
-            tx_cache: RwLock::new(HashMap::new()),
+            tx_cache: TxCache::default(),
         }
     }

     pub async fn find_tx(&self, tx_hash: H256) -> Option<L2Tx> {
-        self.tx_cache.read().await.get(&tx_hash).cloned()
+        self.tx_cache.get_tx(tx_hash).await
     }

     pub async fn forget_tx(&self, tx_hash: H256) {
-        self.tx_cache.write().await.remove(&tx_hash);
+        self.tx_cache.remove_tx(tx_hash).await;
+    }
+
+    pub async fn save_tx(&self, tx: L2Tx) {
+        self.tx_cache.push(tx).await;
     }

-    pub async fn save_tx(&self, tx_hash: H256, tx: L2Tx) {
-        self.tx_cache.write().await.insert(tx_hash, tx);
+    pub async fn get_nonces_by_account(&self, account_address: Address) -> BTreeSet<Nonce> {
+        self.tx_cache.get_nonces_for_account(account_address).await
+    }
+
+    pub async fn next_nonce_by_initiator_account(
+        &self,
+        account_address: Address,
+        current_nonce: u32,
+    ) -> Nonce {
+        let mut pending_nonce = Nonce(current_nonce);
+        let nonces = self.get_nonces_by_account(account_address).await;
+        for nonce in nonces.range(pending_nonce..) {
+            // Increment while the cached nonces are sequential; stop at the first gap.
+            if nonce == &pending_nonce {
+                pending_nonce += 1;
+            } else {
+                break;
+            }
+        }
+
+        pending_nonce
     }

     pub async fn submit_tx(&self, tx: &L2Tx) -> EnrichedClientResult<H256> {
@@ -91,4 +202,13 @@ impl TxProxy {
             .with_arg("hash", &hash)
             .await
     }
+
+    pub fn run_account_nonce_sweeper(
+        &self,
+        pool: ConnectionPool,
+        stop_receiver: watch::Receiver<bool>,
+    ) -> impl Future<Output = anyhow::Result<()>> {
+        let tx_cache = self.tx_cache.clone();
+        tx_cache.run_updates(pool, stop_receiver)
+    }
 }
diff --git a/core/lib/zksync_core/src/api_server/web3/mod.rs b/core/lib/zksync_core/src/api_server/web3/mod.rs
index 7b304f4f2df..97eeb790c31 100644
--- a/core/lib/zksync_core/src/api_server/web3/mod.rs
+++ b/core/lib/zksync_core/src/api_server/web3/mod.rs
@@ -122,7 +122,7 @@ struct OptionalApiParams {
 #[derive(Debug)]
 struct FullApiParams {
     pool: ConnectionPool,
-    last_miniblock_pool: ConnectionPool,
+    updaters_pool: ConnectionPool,
     config: InternalApiConfig,
     transport: ApiTransport,
     tx_sender: TxSender,
@@ -135,7 +135,7 @@ struct FullApiParams {
 #[derive(Debug)]
 pub struct ApiBuilder {
     pool: ConnectionPool,
-    last_miniblock_pool: ConnectionPool,
+    updaters_pool: ConnectionPool,
     config: InternalApiConfig,
     polling_interval: Duration,
     // Mandatory params that must be set using builder methods.
@@ -153,7 +153,7 @@ impl ApiBuilder {
     pub fn jsonrpsee_backend(config: InternalApiConfig, pool: ConnectionPool) -> Self {
         Self {
-            last_miniblock_pool: pool.clone(),
+            updaters_pool: pool.clone(),
             pool,
             config,
             polling_interval: Self::DEFAULT_POLLING_INTERVAL,
@@ -175,11 +175,12 @@ impl ApiBuilder {
         self
     }

-    /// Configures a dedicated DB pool to be used for updating the latest miniblock information
+    /// Configures a dedicated DB pool to be used for updating different information,
+    /// such as the last mined block number or account nonces. These updates are executed
     /// in a background task. If not called, the main pool will be used. If the API server is under high load,
     /// it may make sense to supply a single-connection pool to reduce pool contention with the API methods.
-    pub fn with_last_miniblock_pool(mut self, pool: ConnectionPool) -> Self {
-        self.last_miniblock_pool = pool;
+    pub fn with_updaters_pool(mut self, pool: ConnectionPool) -> Self {
+        self.updaters_pool = pool;
         self
     }
@@ -247,7 +248,7 @@ impl ApiBuilder {
     fn into_full_params(self) -> anyhow::Result<FullApiParams> {
         Ok(FullApiParams {
             pool: self.pool,
-            last_miniblock_pool: self.last_miniblock_pool,
+            updaters_pool: self.updaters_pool,
             config: self.config,
             transport: self.transport.context("API transport not set")?,
             tx_sender: self.tx_sender.context("Transaction sender not set")?,
@@ -278,10 +279,7 @@ impl FullApiParams {
         self,
         last_sealed_miniblock: SealedMiniblockNumber,
     ) -> anyhow::Result<RpcState> {
-        let mut storage = self
-            .last_miniblock_pool
-            .access_storage_tagged("api")
-            .await?;
+        let mut storage = self.updaters_pool.access_storage_tagged("api").await?;
         let start_info = BlockStartInfo::new(&mut storage).await?;
         drop(storage);
@@ -409,12 +407,17 @@
         let (health_check, health_updater) = ReactiveHealthCheck::new(health_check_name);
         let (last_sealed_miniblock, update_task) = SealedMiniblockNumber::new(
-            self.last_miniblock_pool.clone(),
+            self.updaters_pool.clone(),
             SEALED_MINIBLOCK_UPDATE_INTERVAL,
             stop_receiver.clone(),
         );
-        let mut tasks = vec![tokio::spawn(update_task)];
+        let mut tasks = vec![tokio::spawn(update_task)];
+        if let Some(tx_proxy) = &self.tx_sender.0.proxy {
+            let task = tx_proxy
+                .run_account_nonce_sweeper(self.updaters_pool.clone(), stop_receiver.clone());
+            tasks.push(tokio::spawn(task));
+        }

         let pub_sub = if matches!(transport, ApiTransport::WebSocket(_))
             && self.namespaces.contains(&Namespace::Pubsub)
         {
diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs
index 86f7e8a5676..5ffbabf834d 100644
--- a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs
+++ b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs
@@ -7,8 +7,10 @@ use zksync_types::{
     l2::{L2Tx, TransactionType},
     transaction_request::CallRequest,
     utils::decompose_full_nonce,
-    web3,
-    web3::types::{FeeHistory, SyncInfo, SyncState},
+    web3::{
+        self,
+        types::{FeeHistory, SyncInfo, SyncState},
+    },
     AccountTreeId, Bytes, MiniblockNumber, StorageKey, H256, L2_ETH_TOKEN_ADDRESS, U256,
 };
 use zksync_utils::u256_to_h256;
@@ -302,6 +304,7 @@ impl EthNamespace {
         const METHOD_NAME: &str = "get_block_transaction_count";

         let method_latency = API_METRICS.start_block_call(METHOD_NAME, block_id);
+        self.state.start_info.ensure_not_pruned(block_id)?;

         let tx_count = self
             .state
@@ -482,11 +485,22 @@ impl EthNamespace {
         if matches!(block_id, BlockId::Number(BlockNumber::Pending)) {
             let account_nonce_u64 = u64::try_from(account_nonce)
                 .map_err(|err| internal_error(method_name, anyhow::anyhow!(err)))?;
-            account_nonce = connection
-                .transactions_web3_dal()
-                .next_nonce_by_initiator_account(address, account_nonce_u64)
-                .await
-                .map_err(|err| internal_error(method_name, err))?;
+            account_nonce = if let Some(proxy) = &self.state.tx_sender.0.proxy {
+                // EN: get pending nonces from the transaction cache.
+                // The EN doesn't have a mempool, so it's safe to use the proxy cache in its place.
+                proxy
+                    .next_nonce_by_initiator_account(address, account_nonce_u64 as u32)
+                    .await
+                    .0
+                    .into()
+            } else {
+                // Main node: get pending nonces from the mempool
+                connection
+                    .transactions_web3_dal()
+                    .next_nonce_by_initiator_account(address, account_nonce_u64)
+                    .await
+                    .map_err(|err| internal_error(method_name, err))?
+            };
         }

         let block_diff = self.state.last_sealed_miniblock.diff(block_number);
diff --git a/core/lib/zksync_core/src/consensus/testonly.rs b/core/lib/zksync_core/src/consensus/testonly.rs
index 8af73f13444..d0ed762f5e1 100644
--- a/core/lib/zksync_core/src/consensus/testonly.rs
+++ b/core/lib/zksync_core/src/consensus/testonly.rs
@@ -427,6 +427,7 @@ impl StateKeeperRunner {
         let (stop_sender, stop_receiver) = sync::watch::channel(false);
         let (miniblock_sealer, miniblock_sealer_handle) =
             MiniblockSealer::new(self.pool.clone(), 5);
+
         let io = ExternalIO::new(
             miniblock_sealer_handle,
             self.pool.clone(),
diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs
index 4bbbc7551b5..baf622375b3 100644
--- a/core/lib/zksync_core/src/lib.rs
+++ b/core/lib/zksync_core/src/lib.rs
@@ -1154,7 +1154,7 @@ async fn run_http_api(
     }
     namespaces.push(Namespace::Snapshots);

-    let last_miniblock_pool = ConnectionPool::singleton(postgres_config.replica_url()?)
+    let updaters_pool = ConnectionPool::builder(postgres_config.replica_url()?, 2)
         .build()
         .await
-        .context("failed to build last_miniblock_pool")?;
+        .context("failed to build updaters_pool")?;
@@ -1162,7 +1162,7 @@
     let api_builder =
         web3::ApiBuilder::jsonrpsee_backend(internal_api.clone(), replica_connection_pool)
             .http(api_config.web3_json_rpc.http_port)
-            .with_last_miniblock_pool(last_miniblock_pool)
+            .with_updaters_pool(updaters_pool)
             .with_filter_limit(api_config.web3_json_rpc.filters_limit())
             .with_tree_api(api_config.web3_json_rpc.tree_api_url())
             .with_batch_request_size_limit(api_config.web3_json_rpc.max_batch_request_size())
@@ -1206,7 +1206,7 @@ async fn run_ws_api(
     let api_builder =
         web3::ApiBuilder::jsonrpsee_backend(internal_api.clone(), replica_connection_pool)
             .ws(api_config.web3_json_rpc.ws_port)
-            .with_last_miniblock_pool(last_miniblock_pool)
+            .with_updaters_pool(last_miniblock_pool)
             .with_filter_limit(api_config.web3_json_rpc.filters_limit())
             .with_subscriptions_limit(api_config.web3_json_rpc.subscriptions_limit())
             .with_batch_request_size_limit(api_config.web3_json_rpc.max_batch_request_size())
diff --git a/core/lib/zksync_core/src/sync_layer/tests.rs b/core/lib/zksync_core/src/sync_layer/tests.rs
index f78a3de8437..eb4745156fc 100644
--- a/core/lib/zksync_core/src/sync_layer/tests.rs
+++ b/core/lib/zksync_core/src/sync_layer/tests.rs
@@ -71,7 +71,6 @@ impl StateKeeperHandles {
         let sync_state = SyncState::default();
         let (miniblock_sealer, miniblock_sealer_handle) = MiniblockSealer::new(pool.clone(), 5);
         tokio::spawn(miniblock_sealer.run());
-
         let io = ExternalIO::new(
             miniblock_sealer_handle,
             pool,
diff --git a/core/tests/ts-integration/tests/mempool.test.ts b/core/tests/ts-integration/tests/mempool.test.ts
index 160b2a2b81a..b4fdafd54a6 100644
--- a/core/tests/ts-integration/tests/mempool.test.ts
+++ b/core/tests/ts-integration/tests/mempool.test.ts
@@ -81,14 +81,15 @@ describe('Tests for the mempool behavior', () => {
             nonce: startNonce + 1,
             to: bob.address
         });
-        // First transaction should disappear from the server.
-        await expect(alice.provider.getTransaction(tx2.hash)).resolves.toBeNull();

         // Now fill the gap and see what gets executed
         await sendTxWithNonce(alice, startNonce).then((tx) => tx.wait());

         const replacedReceipt = await replacedTx2.wait();
         expect(replacedReceipt.to).toEqual(bob.address);
+
+        // First transaction should disappear from the server.
+ await expect(alice.provider.getTransaction(tx2.hash)).resolves.toBeNull(); }); test('Should reject a pre-sent transaction with not enough balance', async () => { diff --git a/etc/tokens/sepolia.json b/etc/tokens/sepolia.json index 899571df9e5..e69de29bb2d 100644 --- a/etc/tokens/sepolia.json +++ b/etc/tokens/sepolia.json @@ -1,14 +0,0 @@ -[ - { - "name": "DAI", - "symbol": "DAI", - "decimals": 18, - "address": "0x35EfF6eA96571ff475136117FdD92A9ba25b1f37" - }, - { - "name": "Wrapped Ether", - "symbol": "WETH", - "decimals": 18, - "address": "0x7b79995e5f793A07Bc00c21412e50Ecae098E7f9" - } -]
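---

A note on the nonce logic above, since it is the core of this change: `TxProxy::next_nonce_by_initiator_account` starts from the stored account nonce and advances it only while the cached (pending) nonces are consecutive; the first gap stops the scan, because transactions past a gap cannot execute yet. Below is a minimal, self-contained sketch of that logic using a plain `BTreeSet<u32>` in place of `BTreeSet<Nonce>`; the function and variable names are illustrative, not part of this diff:

```rust
use std::collections::BTreeSet;

/// Stand-in for `TxProxy::next_nonce_by_initiator_account`: starting from the
/// stored nonce, advance past consecutive cached (pending) nonces and stop at
/// the first gap.
fn next_nonce(cached: &BTreeSet<u32>, stored_nonce: u32) -> u32 {
    let mut pending = stored_nonce;
    for &nonce in cached.range(pending..) {
        if nonce == pending {
            pending += 1; // This nonce is taken by a cached tx; try the next one.
        } else {
            break; // Gap in the cached nonces; txs past it can't execute yet.
        }
    }
    pending
}

fn main() {
    // Cached txs with nonces 5, 6 and 8; the stored account nonce is 5.
    let cached: BTreeSet<u32> = [5, 6, 8].into_iter().collect();
    assert_eq!(next_nonce(&cached, 5), 7); // 5 and 6 are consumed; 7 is free.
    assert_eq!(next_nonce(&cached, 9), 9); // Nothing cached at or above 9.
    println!("ok");
}
```

`BTreeSet` is the right container here precisely because `range(..)` iterates nonces in ascending order, which makes the sequential scan possible.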
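Likewise, the account-nonce sweeper in `TxCache::run_updates` hinges on `BTreeSet::split_off`, which keeps the elements below the key in the original set and returns those at or above it; reassigning the returned set is what discards nonces below the freshly fetched stored nonce. A self-contained illustration under the same plain-`u32` assumption:

```rust
use std::collections::{BTreeSet, HashMap};

fn main() {
    // Cached pending nonces for one account, plus the nonce stored in Postgres.
    let mut account_nonces: BTreeSet<u32> = [3, 4, 7].into_iter().collect();
    let stored_nonce = 5;

    // `split_off` returns all elements >= `stored_nonce` and leaves the smaller
    // ones behind; the reassignment therefore discards 3 and 4, which
    // correspond to txs that have already been mined.
    account_nonces = account_nonces.split_off(&stored_nonce);
    assert_eq!(account_nonces, BTreeSet::from([7]));

    // An account whose nonces were all swept yields an empty set and is
    // dropped by the `retain` in `run_updates`.
    let mut nonces_by_account: HashMap<&str, BTreeSet<u32>> =
        HashMap::from([("alice", account_nonces), ("bob", BTreeSet::new())]);
    nonces_by_account.retain(|_, nonces| !nonces.is_empty());
    assert!(nonces_by_account.contains_key("alice"));
    assert!(!nonces_by_account.contains_key("bob"));
    println!("ok");
}
```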