From acc1052882f57e9ea14e00dbd53c75ba38455fdc Mon Sep 17 00:00:00 2001 From: Piotr Mikulski Date: Sun, 28 Nov 2021 12:55:31 -0800 Subject: [PATCH] Use `lru` instead of `cached` in `near-network` --- Cargo.lock | 11 ++++- chain/network/Cargo.toml | 2 +- chain/network/src/peer/peer_actor.rs | 10 ++-- chain/network/src/routing/routing.rs | 68 ++++++++++++++-------------- chain/network/src/routing/utils.rs | 6 +-- 5 files changed, 53 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fd10dec0885..7bb7be864bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2362,6 +2362,15 @@ dependencies = [ "syn", ] +[[package]] +name = "lru" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f374d42cdfc1d7dbf3d3dec28afab2eb97ffbf43a3234d795b5986dbf4b90ba" +dependencies = [ + "hashbrown", +] + [[package]] name = "mach" version = "0.3.2" @@ -2854,13 +2863,13 @@ dependencies = [ "byteorder", "bytes", "bytesize", - "cached", "chrono", "conqueue", "deepsize", "delay-detector", "futures", "lazy_static", + "lru", "near-actix-test-utils", "near-chain", "near-crypto", diff --git a/chain/network/Cargo.toml b/chain/network/Cargo.toml index 46a833c0fff..b195692fcbc 100644 --- a/chain/network/Cargo.toml +++ b/chain/network/Cargo.toml @@ -11,11 +11,11 @@ borsh = { version = "0.9", features = ["rc"]} byteorder = "1.2" bytes = "1" bytesize = "1.1" -cached = "0.23" chrono = { version = "0.4.4", features = ["serde"] } conqueue = "0.4.0" futures = "0.3" lazy_static = "1.4" +lru = "0.6.5" near-rust-allocator-proxy = "0.3.0" once_cell = "1.5.2" rand = "0.7" diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index 4bb528dea99..674003e4979 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -16,7 +16,7 @@ use actix::{ }; use borsh::BorshDeserialize; use borsh::BorshSerialize; -use cached::{Cached, SizedCache}; +use lru::LruCache; use near_crypto::Signature; use near_network_primitives::types::{ Ban, NetworkViewClientMessages, NetworkViewClientResponses, PeerChainInfo, PeerChainInfoV2, @@ -103,7 +103,7 @@ pub struct PeerActor { /// How many peer actors are created peer_counter: Arc, /// Cache of recently routed messages, this allows us to drop duplicates - routed_message_cache: SizedCache<(PeerId, PeerIdOrHash, Signature), Instant>, + routed_message_cache: LruCache<(PeerId, PeerIdOrHash, Signature), Instant>, /// A helper data structure for limiting reading #[allow(unused)] throttle_controller: ThrottleController, @@ -152,7 +152,7 @@ impl PeerActor { network_metrics, txns_since_last_block, peer_counter, - routed_message_cache: SizedCache::with_size(ROUTED_MESSAGE_CACHE_SIZE), + routed_message_cache: LruCache::new(ROUTED_MESSAGE_CACHE_SIZE), throttle_controller, } } @@ -681,13 +681,13 @@ impl StreamHandler, ReasonForBan>> for PeerActor { if let PeerMessage::Routed(msg) = &peer_msg { let key = (msg.author.clone(), msg.target.clone(), msg.signature.clone()); let now = Clock::instant(); - if let Some(time) = self.routed_message_cache.cache_get(&key) { + if let Some(time) = self.routed_message_cache.get(&key) { if now.saturating_duration_since(*time) <= DROP_DUPLICATED_MESSAGES_PERIOD { debug!(target: "network", "Dropping duplicated message from {} to {:?}", msg.author, msg.target); return; } } - self.routed_message_cache.cache_set(key, now); + self.routed_message_cache.put(key, now); } if let PeerMessage::Routed(RoutedMessage { body: RoutedMessageBody::ForwardTx(_), .. diff --git a/chain/network/src/routing/routing.rs b/chain/network/src/routing/routing.rs index 53f85d3db61..802557e7d7f 100644 --- a/chain/network/src/routing/routing.rs +++ b/chain/network/src/routing/routing.rs @@ -4,7 +4,7 @@ use crate::routing::utils::cache_to_hashmap; use crate::PeerInfo; use actix::dev::{MessageResponse, ResponseChannel}; use actix::{Actor, Message}; -use cached::{Cached, SizedCache}; +use lru::LruCache; use near_network_primitives::types::{PeerIdOrHash, Ping, Pong}; use near_primitives::hash::CryptoHash; use near_primitives::network::{AnnounceAccount, PeerId}; @@ -57,7 +57,7 @@ pub struct RoutingTableView { /// PeerId associated with this instance. my_peer_id: PeerId, /// PeerId associated for every known account id. - account_peers: SizedCache, + account_peers: LruCache, /// Active PeerId that are part of the shortest path to each PeerId. pub peer_forwarding: Arc>>, /// Store last update for known edges. This is limited to list of adjacent edges to `my_peer_id`. @@ -69,15 +69,15 @@ pub struct RoutingTableView { /// Number of times each active connection was used to route a message. /// If there are several options use route with minimum nonce. /// New routes are added with minimum nonce. - route_nonce: SizedCache, + route_nonce: LruCache, /// Ping received by nonce. - ping_info: SizedCache, + ping_info: LruCache, /// Ping received by nonce. - pong_info: SizedCache, + pong_info: LruCache, /// List of pings sent for which we haven't received any pong yet. - waiting_pong: SizedCache>, + waiting_pong: LruCache>, /// Last nonce sent to each peer through pings. - last_ping_nonce: SizedCache, + last_ping_nonce: LruCache, } #[derive(Debug)] @@ -94,16 +94,16 @@ impl RoutingTableView { Self { my_peer_id, - account_peers: SizedCache::with_size(ANNOUNCE_ACCOUNT_CACHE_SIZE), + account_peers: LruCache::new(ANNOUNCE_ACCOUNT_CACHE_SIZE), peer_forwarding: Default::default(), local_edges_info: Default::default(), route_back: RouteBackCache::default(), store, - route_nonce: SizedCache::with_size(ROUND_ROBIN_NONCE_CACHE_SIZE), - ping_info: SizedCache::with_size(PING_PONG_CACHE_SIZE), - pong_info: SizedCache::with_size(PING_PONG_CACHE_SIZE), - waiting_pong: SizedCache::with_size(PING_PONG_CACHE_SIZE), - last_ping_nonce: SizedCache::with_size(PING_PONG_CACHE_SIZE), + route_nonce: LruCache::new(ROUND_ROBIN_NONCE_CACHE_SIZE), + ping_info: LruCache::new(PING_PONG_CACHE_SIZE), + pong_info: LruCache::new(PING_PONG_CACHE_SIZE), + waiting_pong: LruCache::new(PING_PONG_CACHE_SIZE), + last_ping_nonce: LruCache::new(PING_PONG_CACHE_SIZE), } } @@ -132,7 +132,7 @@ impl RoutingTableView { // max nonce - threshold. let nonce_peer = routes .iter() - .map(|peer_id| (self.route_nonce.cache_get(peer_id).cloned().unwrap_or(0), peer_id)) + .map(|peer_id| (self.route_nonce.get(peer_id).cloned().unwrap_or(0), peer_id)) .collect::>(); // Neighbor with minimum and maximum nonce respectively. @@ -141,12 +141,12 @@ impl RoutingTableView { if min_v.0 + ROUND_ROBIN_MAX_NONCE_DIFFERENCE_ALLOWED < max_v.0 { self.route_nonce - .cache_set(min_v.1.clone(), max_v.0 - ROUND_ROBIN_MAX_NONCE_DIFFERENCE_ALLOWED); + .put(min_v.1.clone(), max_v.0 - ROUND_ROBIN_MAX_NONCE_DIFFERENCE_ALLOWED); } let next_hop = min_v.1; - let nonce = self.route_nonce.cache_get(next_hop).cloned(); - self.route_nonce.cache_set(next_hop.clone(), nonce.map_or(1, |nonce| nonce + 1)); + let nonce = self.route_nonce.get(next_hop).cloned(); + self.route_nonce.put(next_hop.clone(), nonce.map_or(1, |nonce| nonce + 1)); Ok(next_hop.clone()) } else { Err(FindRouteError::PeerNotFound) @@ -173,7 +173,7 @@ impl RoutingTableView { /// Note: There is at most on peer id per account id. pub fn add_account(&mut self, announce_account: AnnounceAccount) { let account_id = announce_account.account_id.clone(); - self.account_peers.cache_set(account_id.clone(), announce_account.clone()); + self.account_peers.put(account_id.clone(), announce_account.clone()); // Add account to store let mut update = self.store.store_update(); @@ -213,46 +213,46 @@ impl RoutingTableView { } pub fn add_ping(&mut self, ping: Ping) { - let cnt = self.ping_info.cache_get(&(ping.nonce as usize)).map(|v| v.1).unwrap_or(0); + let cnt = self.ping_info.get(&(ping.nonce as usize)).map(|v| v.1).unwrap_or(0); - self.ping_info.cache_set(ping.nonce as usize, (ping, cnt + 1)); + self.ping_info.put(ping.nonce as usize, (ping, cnt + 1)); } /// Return time of the round trip of ping + pong pub fn add_pong(&mut self, pong: Pong) -> Option { let mut res = None; - if let Some(nonces) = self.waiting_pong.cache_get_mut(&pong.source) { - res = nonces.cache_remove(&(pong.nonce as usize)).map(|sent| { + if let Some(nonces) = self.waiting_pong.get_mut(&pong.source) { + res = nonces.pop(&(pong.nonce as usize)).map(|sent| { Clock::instant().saturating_duration_since(sent).as_secs_f64() * 1000f64 }); } - let cnt = self.pong_info.cache_get(&(pong.nonce as usize)).map(|v| v.1).unwrap_or(0); + let cnt = self.pong_info.get(&(pong.nonce as usize)).map(|v| v.1).unwrap_or(0); - self.pong_info.cache_set(pong.nonce as usize, (pong, (cnt + 1))); + self.pong_info.put(pong.nonce as usize, (pong, (cnt + 1))); res } // for unit tests pub fn sending_ping(&mut self, nonce: usize, target: PeerId) { - let entry = if let Some(entry) = self.waiting_pong.cache_get_mut(&target) { + let entry = if let Some(entry) = self.waiting_pong.get_mut(&target) { entry } else { - self.waiting_pong.cache_set(target.clone(), SizedCache::with_size(10)); - self.waiting_pong.cache_get_mut(&target).unwrap() + self.waiting_pong.put(target.clone(), LruCache::new(10)); + self.waiting_pong.get_mut(&target).unwrap() }; - entry.cache_set(nonce, Clock::instant()); + entry.put(nonce, Clock::instant()); } pub fn get_ping(&mut self, peer_id: PeerId) -> usize { - if let Some(entry) = self.last_ping_nonce.cache_get_mut(&peer_id) { + if let Some(entry) = self.last_ping_nonce.get_mut(&peer_id) { *entry += 1; *entry - 1 } else { - self.last_ping_nonce.cache_set(peer_id, 1); + self.last_ping_nonce.put(peer_id, 1); 0 } } @@ -277,22 +277,22 @@ impl RoutingTableView { /// /// Get keys currently on cache. pub fn get_accounts_keys(&mut self) -> Vec { - self.account_peers.key_order().cloned().collect() + self.account_peers.iter().map(|(k, _v)| (k.clone())).collect() } /// Get announce accounts on cache. pub fn get_announce_accounts(&mut self) -> Vec { - self.account_peers.value_order().cloned().collect() + self.account_peers.iter().map(|(_k, v)| v).cloned().collect() } /// Get number of accounts pub fn get_announce_accounts_size(&mut self) -> usize { - self.account_peers.cache_size() + self.account_peers.len() } /// Get account announce from pub fn get_announce(&mut self, account_id: &AccountId) -> Option { - if let Some(announce_account) = self.account_peers.cache_get(account_id) { + if let Some(announce_account) = self.account_peers.get(account_id) { Some(announce_account.clone()) } else { self.store diff --git a/chain/network/src/routing/utils.rs b/chain/network/src/routing/utils.rs index 6d11b54a3cc..be9929a197a 100644 --- a/chain/network/src/routing/utils.rs +++ b/chain/network/src/routing/utils.rs @@ -1,8 +1,8 @@ -use cached::SizedCache; +use lru::LruCache; use std::collections::HashMap; use std::hash::Hash; /// `cache_to_hashmap` - converts SizedCache to HashMap -pub fn cache_to_hashmap(cache: &SizedCache) -> HashMap { - cache.key_order().cloned().zip(cache.value_order().cloned()).collect() +pub fn cache_to_hashmap(cache: &LruCache) -> HashMap { + cache.iter().map(|(k, v)| (k.clone(), v.clone())).collect() }