Skip to content

Commit

Permalink
feat(api): introduce mempool cache (#1460)
Browse files Browse the repository at this point in the history
## What ❔

This introduces an in-memory cache on ws API servers that caters for
the `eth_newPendingTransactionFilter` filter. Currently, for each
`eth_getFilterChanges` request to such filter, we do a DB query. This
query dominates the DB load of ws DBs.

The cache is updated periodically, loading the new mempool transactions
to a `Deque`. To return all the transactions that happened after a
particular timestamp, we binary-search this data structure - avoiding
additional DB calls.

## Why ❔

To relieve some load from the read replica DB.
  • Loading branch information
RomanBrodetski committed Mar 21, 2024
1 parent 4a393dc commit c5d6c4b
Show file tree
Hide file tree
Showing 23 changed files with 560 additions and 152 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions core/bin/external_node/src/config/mod.rs
Expand Up @@ -237,6 +237,13 @@ pub struct OptionalENConfig {
/// 0 means that sealing is synchronous; this is mostly useful for performance comparison, testing etc.
#[serde(default = "OptionalENConfig::default_miniblock_seal_queue_capacity")]
pub miniblock_seal_queue_capacity: usize,
/// Polling period for mempool cache update - how often the mempool cache is updated from the database.
/// In milliseconds. Default is 50 milliseconds.
#[serde(default = "OptionalENConfig::default_mempool_cache_update_interval")]
pub mempool_cache_update_interval: u64,
/// Maximum number of transactions to be stored in the mempool cache. Default is 10000.
#[serde(default = "OptionalENConfig::default_mempool_cache_size")]
pub mempool_cache_size: usize,
/// Address of the L1 diamond proxy contract used by the consistency checker to match with the origin of logs emitted
/// by commit transactions. If not set, it will not be verified.
// This is intentionally not a part of `RemoteENConfig` because fetching this info from the main node would defeat
Expand Down Expand Up @@ -348,6 +355,14 @@ impl OptionalENConfig {
10
}

const fn default_mempool_cache_update_interval() -> u64 {
50
}

const fn default_mempool_cache_size() -> usize {
10_000
}

pub fn polling_interval(&self) -> Duration {
Duration::from_millis(self.polling_interval)
}
Expand Down Expand Up @@ -415,6 +430,10 @@ impl OptionalENConfig {
self.healthcheck_hard_time_limit_ms
.map(Duration::from_millis)
}

pub fn mempool_cache_update_interval(&self) -> Duration {
Duration::from_millis(self.mempool_cache_update_interval)
}
}

/// This part of the external node config is required for its operation.
Expand Down Expand Up @@ -619,6 +638,8 @@ impl From<ExternalNodeConfig> for InternalApiConfig {
req_entities_limit: config.optional.req_entities_limit,
fee_history_limit: config.optional.fee_history_limit,
filters_disabled: config.optional.filters_disabled,
mempool_cache_update_interval: config.optional.mempool_cache_update_interval(),
mempool_cache_size: config.optional.mempool_cache_size,
}
}
}
Expand Down
15 changes: 15 additions & 0 deletions core/lib/config/src/configs/api.rs
Expand Up @@ -96,6 +96,11 @@ pub struct Web3JsonRpcConfig {
pub websocket_requests_per_minute_limit: Option<NonZeroU32>,
/// Tree API url, currently used to proxy `getProof` calls to the tree
pub tree_api_url: Option<String>,
/// Polling period for mempool cache update - how often the mempool cache is updated from the database.
/// In milliseconds. Default is 50 milliseconds.
pub mempool_cache_update_interval: Option<u64>,
/// Maximum number of transactions to be stored in the mempool cache. Default is 10000.
pub mempool_cache_size: Option<usize>,
}

impl Web3JsonRpcConfig {
Expand Down Expand Up @@ -130,6 +135,8 @@ impl Web3JsonRpcConfig {
max_batch_request_size: Default::default(),
max_response_body_size_mb: Default::default(),
websocket_requests_per_minute_limit: Default::default(),
mempool_cache_update_interval: Default::default(),
mempool_cache_size: Default::default(),
tree_api_url: None,
}
}
Expand Down Expand Up @@ -210,6 +217,14 @@ impl Web3JsonRpcConfig {
pub fn tree_api_url(&self) -> Option<&str> {
self.tree_api_url.as_deref()
}

pub fn mempool_cache_update_interval(&self) -> Duration {
Duration::from_millis(self.mempool_cache_update_interval.unwrap_or(50))
}

pub fn mempool_cache_size(&self) -> usize {
self.mempool_cache_size.unwrap_or(10_000)
}
}

#[derive(Debug, Deserialize, Clone, PartialEq)]
Expand Down
2 changes: 2 additions & 0 deletions core/lib/config/src/testonly.rs
Expand Up @@ -210,6 +210,8 @@ impl RandomConfig for configs::api::Web3JsonRpcConfig {
max_response_body_size_mb: g.gen(),
websocket_requests_per_minute_limit: g.gen(),
tree_api_url: g.gen(),
mempool_cache_update_interval: g.gen(),
mempool_cache_size: g.gen(),
}
}
}
Expand Down
7 changes: 3 additions & 4 deletions core/lib/dal/src/transactions_web3_dal.rs
Expand Up @@ -281,7 +281,7 @@ impl TransactionsWeb3Dal<'_, '_> {
&mut self,
from_timestamp: NaiveDateTime,
limit: Option<usize>,
) -> Result<(Vec<H256>, Option<NaiveDateTime>), SqlxError> {
) -> Result<Vec<(NaiveDateTime, H256)>, SqlxError> {
let records = sqlx::query!(
r#"
SELECT
Expand All @@ -303,12 +303,11 @@ impl TransactionsWeb3Dal<'_, '_> {
.fetch_all(self.storage.conn())
.await?;

let last_loc = records.last().map(|record| record.received_at);
let hashes = records
.into_iter()
.map(|record| H256::from_slice(&record.hash))
.map(|record| (record.received_at, H256::from_slice(&record.hash)))
.collect();
Ok((hashes, last_loc))
Ok(hashes)
}

/// `committed_next_nonce` should equal the nonce for `initiator_address` in the storage.
Expand Down
4 changes: 4 additions & 0 deletions core/lib/env_config/src/api.rs
Expand Up @@ -88,6 +88,8 @@ mod tests {
max_response_body_size_mb: Some(10),
websocket_requests_per_minute_limit: Some(NonZeroU32::new(10).unwrap()),
tree_api_url: None,
mempool_cache_update_interval: Some(50),
mempool_cache_size: Some(10000),
},
contract_verification: ContractVerificationApiConfig {
port: 3070,
Expand Down Expand Up @@ -135,6 +137,8 @@ mod tests {
API_WEB3_JSON_RPC_FEE_HISTORY_LIMIT=100
API_WEB3_JSON_RPC_MAX_BATCH_REQUEST_SIZE=200
API_WEB3_JSON_RPC_WEBSOCKET_REQUESTS_PER_MINUTE_LIMIT=10
API_WEB3_JSON_RPC_MEMPOOL_CACHE_SIZE=10000
API_WEB3_JSON_RPC_MEMPOOL_CACHE_UPDATE_INTERVAL=50
API_CONTRACT_VERIFICATION_PORT="3070"
API_CONTRACT_VERIFICATION_URL="http://127.0.0.1:3070"
API_WEB3_JSON_RPC_MAX_RESPONSE_BODY_SIZE_MB=10
Expand Down
8 changes: 8 additions & 0 deletions core/lib/protobuf_config/src/api.rs
Expand Up @@ -120,6 +120,12 @@ impl ProtoRepr for proto::Web3JsonRpc {
.transpose()
.context("websocket_requests_per_minute_limit")?,
tree_api_url: self.tree_api_url.clone(),
mempool_cache_update_interval: self.mempool_cache_update_interval,
mempool_cache_size: self
.mempool_cache_size
.map(|x| x.try_into())
.transpose()
.context("mempool_cache_size")?,
})
}
fn build(this: &Self::Type) -> Self {
Expand All @@ -130,6 +136,8 @@ impl ProtoRepr for proto::Web3JsonRpc {
ws_url: Some(this.ws_url.clone()),
req_entities_limit: this.req_entities_limit,
filters_disabled: Some(this.filters_disabled),
mempool_cache_update_interval: this.mempool_cache_update_interval,
mempool_cache_size: this.mempool_cache_size.map(|x| x.try_into().unwrap()),
filters_limit: this.filters_limit,
subscriptions_limit: this.subscriptions_limit,
pubsub_polling_interval: this.pubsub_polling_interval,
Expand Down
2 changes: 2 additions & 0 deletions core/lib/protobuf_config/src/proto/api.proto
Expand Up @@ -36,6 +36,8 @@ message Web3JsonRpc {
optional uint32 websocket_requests_per_minute_limit = 25; // optional
optional string tree_api_url = 26; // optional
optional bool filters_disabled = 27; // optional
optional uint64 mempool_cache_update_interval = 28; // optional
optional uint64 mempool_cache_size = 29; // optional
}

message ContractVerificationApi {
Expand Down
1 change: 1 addition & 0 deletions core/lib/state/Cargo.toml
Expand Up @@ -21,6 +21,7 @@ mini-moka = "0.10.0"
tokio = { version = "1", features = ["rt"] }
tracing = "0.1"
itertools = "0.10.3"
chrono = "0.4.31"

[dev-dependencies]
assert_matches = "1.5.0"
Expand Down
119 changes: 119 additions & 0 deletions core/lib/state/src/cache/lru_cache.rs
@@ -0,0 +1,119 @@
use std::hash::Hash;

use crate::cache::{
metrics::{Method, RequestOutcome, METRICS},
CacheValue, MokaBase,
};
/// Cache implementation that uses LRU eviction policy.
#[derive(Debug, Clone)]
pub struct LruCache<K: Eq + Hash, V> {
name: &'static str,
cache: Option<MokaBase<K, V>>,
}

impl<K, V> LruCache<K, V>
where
    K: Eq + Hash + Send + Sync + 'static,
    V: CacheValue<K> + 'static,
{
    /// Creates a new cache.
    ///
    /// # Panics
    ///
    /// Panics if an invalid cache capacity is provided.
    pub fn new(name: &'static str, capacity: u64) -> Self {
        // A zero capacity disables caching entirely. We represent this as `None`
        // instead of a zero-capacity Moka cache, because the latter can temporarily
        // retain inserted items (demonstrated by the test below).
        let cache = if capacity == 0 {
            None
        } else {
            Some(
                MokaBase::<K, V>::builder()
                    .weigher(|_, value| value.cache_weight())
                    .max_capacity(capacity)
                    .build(),
            )
        };

        Self { name, cache }
    }

    /// Gets an entry and pulls it to the front if it exists.
    pub fn get(&self, key: &K) -> Option<V> {
        let latency = METRICS.latency[&(self.name, Method::Get)].start();
        let entry = self.cache.as_ref()?.get(key);
        // ^ We intentionally don't report metrics if there's no real cache.

        latency.observe();
        // Use the shared helper so that hit / miss classification stays consistent
        // with other cache implementations reporting to the same metrics.
        let request_outcome = RequestOutcome::from_hit(entry.is_some());
        METRICS.requests[&(self.name, request_outcome)].inc();

        entry
    }

    /// Pushes an entry and performs LRU cache operations.
    pub fn insert(&self, key: K, value: V) {
        let latency = METRICS.latency[&(self.name, Method::Insert)].start();
        let Some(cache) = self.cache.as_ref() else {
            return;
        };
        // ^ We intentionally don't report metrics if there's no real cache.
        cache.insert(key, value);

        latency.observe();
        self.report_size();
    }

    /// Reports the current entry count and weighted memory usage to metrics.
    /// No-op if caching is disabled.
    pub(crate) fn report_size(&self) {
        if let Some(cache) = &self.cache {
            METRICS.len[&self.name].set(cache.entry_count());
            METRICS.used_memory[&self.name].set(cache.weighted_size());
        }
    }

    /// Removes the specified key from this cache.
    pub fn remove(&self, key: &K) {
        if let Some(cache) = &self.cache {
            cache.invalidate(key);
        }
    }

    /// Removes all entries from this cache.
    pub fn clear(&self) {
        if let Some(cache) = &self.cache {
            cache.invalidate_all();
            self.report_size();
        }
    }

    /// Estimated number of cached entries; `0` if caching is disabled.
    #[cfg(test)]
    pub(crate) fn estimated_len(&self) -> u64 {
        self.cache.as_ref().map_or(0, MokaBase::entry_count)
    }
}

#[cfg(test)]
mod tests {
    use zksync_types::H256;

    use crate::cache::{lru_cache::LruCache, *};

    #[test]
    fn cache_with_zero_capacity() {
        // A zero-capacity `LruCache` must never retain or return entries.
        let disabled_cache = LruCache::<H256, Vec<u8>>::new("test", 0);
        disabled_cache.insert(H256::zero(), vec![1, 2, 3]);
        assert_eq!(disabled_cache.get(&H256::zero()), None);

        // The zero-capacity `MokaBase` cache can actually contain items temporarily!
        let raw_zero_capacity_cache = MokaBase::<H256, Vec<u8>>::builder()
            .weigher(|_, value| value.cache_weight())
            .max_capacity(0)
            .build();
        raw_zero_capacity_cache.insert(H256::zero(), vec![1, 2, 3]);
        assert_eq!(
            raw_zero_capacity_cache.get(&H256::zero()),
            Some(vec![1, 2, 3])
        );
        // The item is evicted after the first access.
        assert_eq!(raw_zero_capacity_cache.get(&H256::zero()), None);
    }
}
10 changes: 10 additions & 0 deletions core/lib/state/src/cache/metrics.rs
Expand Up @@ -18,6 +18,16 @@ pub(super) enum RequestOutcome {
Miss,
}

impl RequestOutcome {
    /// Maps a cache lookup result (`true` = entry was found) to the corresponding outcome label.
    pub fn from_hit(hit: bool) -> Self {
        match hit {
            true => Self::Hit,
            false => Self::Miss,
        }
    }
}

/// Buckets for small latencies: from 10 ns to 1 ms.
const SMALL_LATENCIES: Buckets = Buckets::values(&[
1e-8, 2.5e-8, 5e-8, 1e-7, 2.5e-7, 5e-7, 1e-6, 2.5e-6, 5e-6, 1e-5, 2.5e-5, 5e-5, 1e-4, 1e-3,
Expand Down

0 comments on commit c5d6c4b

Please sign in to comment.