Skip to content

Commit

Permalink
Use lru instead of cached in near-network
Browse files Browse the repository at this point in the history
  • Loading branch information
pmnoxx committed Dec 3, 2021
1 parent 513e199 commit f192942
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 44 deletions.
11 changes: 10 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion chain/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ actix = "=0.11.0-beta.2"
borsh = { version = "0.9", features = ["rc"] }
bytes = "1"
bytesize = "1.1"
cached = "0.23"
conqueue = "0.4.0"
futures = "0.3"
lru = "0.6.5"
near-rust-allocator-proxy = "0.3.0"
once_cell = "1.5.2"
rand = "0.7"
Expand Down
10 changes: 5 additions & 5 deletions chain/network/src/peer/peer_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use actix::{
};
use borsh::BorshDeserialize;
use borsh::BorshSerialize;
use cached::{Cached, SizedCache};
use lru::LruCache;
use near_crypto::Signature;
use near_network_primitives::types::{
Ban, NetworkViewClientMessages, NetworkViewClientResponses, PeerChainInfo, PeerChainInfoV2,
Expand Down Expand Up @@ -103,7 +103,7 @@ pub struct PeerActor {
/// How many peer actors are created
peer_counter: Arc<AtomicUsize>,
/// Cache of recently routed messages, this allows us to drop duplicates
routed_message_cache: SizedCache<(PeerId, PeerIdOrHash, Signature), Instant>,
routed_message_cache: LruCache<(PeerId, PeerIdOrHash, Signature), Instant>,
/// A helper data structure for limiting reading
#[allow(unused)]
throttle_controller: ThrottleController,
Expand Down Expand Up @@ -152,7 +152,7 @@ impl PeerActor {
network_metrics,
txns_since_last_block,
peer_counter,
routed_message_cache: SizedCache::with_size(ROUTED_MESSAGE_CACHE_SIZE),
routed_message_cache: LruCache::new(ROUTED_MESSAGE_CACHE_SIZE),
throttle_controller,
}
}
Expand Down Expand Up @@ -683,13 +683,13 @@ impl StreamHandler<Result<Vec<u8>, ReasonForBan>> for PeerActor {
if let PeerMessage::Routed(msg) = &peer_msg {
let key = (msg.author.clone(), msg.target.clone(), msg.signature.clone());
let now = Clock::instant();
if let Some(time) = self.routed_message_cache.cache_get(&key) {
if let Some(time) = self.routed_message_cache.get(&key) {
if now.saturating_duration_since(*time) <= DROP_DUPLICATED_MESSAGES_PERIOD {
debug!(target: "network", "Dropping duplicated message from {} to {:?}", msg.author, msg.target);
return;
}
}
self.routed_message_cache.cache_set(key, now);
self.routed_message_cache.put(key, now);
}
if let PeerMessage::Routed(RoutedMessage {
body: RoutedMessageBody::ForwardTx(_), ..
Expand Down
68 changes: 34 additions & 34 deletions chain/network/src/routing/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::routing::utils::cache_to_hashmap;
use crate::PeerInfo;
use actix::dev::{MessageResponse, ResponseChannel};
use actix::{Actor, Message};
use cached::{Cached, SizedCache};
use lru::LruCache;
use near_network_primitives::types::{PeerIdOrHash, Ping, Pong};
use near_primitives::hash::CryptoHash;
use near_primitives::network::{AnnounceAccount, PeerId};
Expand Down Expand Up @@ -57,7 +57,7 @@ pub struct RoutingTableView {
/// PeerId associated with this instance.
my_peer_id: PeerId,
/// PeerId associated for every known account id.
account_peers: SizedCache<AccountId, AnnounceAccount>,
account_peers: LruCache<AccountId, AnnounceAccount>,
/// Active PeerId that are part of the shortest path to each PeerId.
pub peer_forwarding: Arc<HashMap<PeerId, Vec<PeerId>>>,
/// Store last update for known edges. This is limited to list of adjacent edges to `my_peer_id`.
Expand All @@ -69,15 +69,15 @@ pub struct RoutingTableView {
/// Number of times each active connection was used to route a message.
/// If there are several options use route with minimum nonce.
/// New routes are added with minimum nonce.
route_nonce: SizedCache<PeerId, usize>,
route_nonce: LruCache<PeerId, usize>,
/// Ping received by nonce.
ping_info: SizedCache<usize, (Ping, usize)>,
ping_info: LruCache<usize, (Ping, usize)>,
/// Ping received by nonce.
pong_info: SizedCache<usize, (Pong, usize)>,
pong_info: LruCache<usize, (Pong, usize)>,
/// List of pings sent for which we haven't received any pong yet.
waiting_pong: SizedCache<PeerId, SizedCache<usize, Instant>>,
waiting_pong: LruCache<PeerId, LruCache<usize, Instant>>,
/// Last nonce sent to each peer through pings.
last_ping_nonce: SizedCache<PeerId, usize>,
last_ping_nonce: LruCache<PeerId, usize>,
}

#[derive(Debug)]
Expand All @@ -94,16 +94,16 @@ impl RoutingTableView {

Self {
my_peer_id,
account_peers: SizedCache::with_size(ANNOUNCE_ACCOUNT_CACHE_SIZE),
account_peers: LruCache::new(ANNOUNCE_ACCOUNT_CACHE_SIZE),
peer_forwarding: Default::default(),
local_edges_info: Default::default(),
route_back: RouteBackCache::default(),
store,
route_nonce: SizedCache::with_size(ROUND_ROBIN_NONCE_CACHE_SIZE),
ping_info: SizedCache::with_size(PING_PONG_CACHE_SIZE),
pong_info: SizedCache::with_size(PING_PONG_CACHE_SIZE),
waiting_pong: SizedCache::with_size(PING_PONG_CACHE_SIZE),
last_ping_nonce: SizedCache::with_size(PING_PONG_CACHE_SIZE),
route_nonce: LruCache::new(ROUND_ROBIN_NONCE_CACHE_SIZE),
ping_info: LruCache::new(PING_PONG_CACHE_SIZE),
pong_info: LruCache::new(PING_PONG_CACHE_SIZE),
waiting_pong: LruCache::new(PING_PONG_CACHE_SIZE),
last_ping_nonce: LruCache::new(PING_PONG_CACHE_SIZE),
}
}

Expand Down Expand Up @@ -132,7 +132,7 @@ impl RoutingTableView {
// max nonce - threshold.
let nonce_peer = routes
.iter()
.map(|peer_id| (self.route_nonce.cache_get(peer_id).cloned().unwrap_or(0), peer_id))
.map(|peer_id| (self.route_nonce.get(peer_id).cloned().unwrap_or(0), peer_id))
.collect::<Vec<_>>();

// Neighbor with minimum and maximum nonce respectively.
Expand All @@ -141,12 +141,12 @@ impl RoutingTableView {

if min_v.0 + ROUND_ROBIN_MAX_NONCE_DIFFERENCE_ALLOWED < max_v.0 {
self.route_nonce
.cache_set(min_v.1.clone(), max_v.0 - ROUND_ROBIN_MAX_NONCE_DIFFERENCE_ALLOWED);
.put(min_v.1.clone(), max_v.0 - ROUND_ROBIN_MAX_NONCE_DIFFERENCE_ALLOWED);
}

let next_hop = min_v.1;
let nonce = self.route_nonce.cache_get(next_hop).cloned();
self.route_nonce.cache_set(next_hop.clone(), nonce.map_or(1, |nonce| nonce + 1));
let nonce = self.route_nonce.get(next_hop).cloned();
self.route_nonce.put(next_hop.clone(), nonce.map_or(1, |nonce| nonce + 1));
Ok(next_hop.clone())
} else {
Err(FindRouteError::PeerNotFound)
Expand All @@ -173,7 +173,7 @@ impl RoutingTableView {
/// Note: There is at most on peer id per account id.
pub fn add_account(&mut self, announce_account: AnnounceAccount) {
let account_id = announce_account.account_id.clone();
self.account_peers.cache_set(account_id.clone(), announce_account.clone());
self.account_peers.put(account_id.clone(), announce_account.clone());

// Add account to store
let mut update = self.store.store_update();
Expand Down Expand Up @@ -213,46 +213,46 @@ impl RoutingTableView {
}

pub fn add_ping(&mut self, ping: Ping) {
let cnt = self.ping_info.cache_get(&(ping.nonce as usize)).map(|v| v.1).unwrap_or(0);
let cnt = self.ping_info.get(&(ping.nonce as usize)).map(|v| v.1).unwrap_or(0);

self.ping_info.cache_set(ping.nonce as usize, (ping, cnt + 1));
self.ping_info.put(ping.nonce as usize, (ping, cnt + 1));
}

/// Return time of the round trip of ping + pong
pub fn add_pong(&mut self, pong: Pong) -> Option<f64> {
let mut res = None;

if let Some(nonces) = self.waiting_pong.cache_get_mut(&pong.source) {
res = nonces.cache_remove(&(pong.nonce as usize)).map(|sent| {
if let Some(nonces) = self.waiting_pong.get_mut(&pong.source) {
res = nonces.pop(&(pong.nonce as usize)).map(|sent| {
Clock::instant().saturating_duration_since(sent).as_secs_f64() * 1000f64
});
}

let cnt = self.pong_info.cache_get(&(pong.nonce as usize)).map(|v| v.1).unwrap_or(0);
let cnt = self.pong_info.get(&(pong.nonce as usize)).map(|v| v.1).unwrap_or(0);

self.pong_info.cache_set(pong.nonce as usize, (pong, (cnt + 1)));
self.pong_info.put(pong.nonce as usize, (pong, (cnt + 1)));

res
}

// for unit tests
pub fn sending_ping(&mut self, nonce: usize, target: PeerId) {
let entry = if let Some(entry) = self.waiting_pong.cache_get_mut(&target) {
let entry = if let Some(entry) = self.waiting_pong.get_mut(&target) {
entry
} else {
self.waiting_pong.cache_set(target.clone(), SizedCache::with_size(10));
self.waiting_pong.cache_get_mut(&target).unwrap()
self.waiting_pong.put(target.clone(), LruCache::new(10));
self.waiting_pong.get_mut(&target).unwrap()
};

entry.cache_set(nonce, Clock::instant());
entry.put(nonce, Clock::instant());
}

pub fn get_ping(&mut self, peer_id: PeerId) -> usize {
if let Some(entry) = self.last_ping_nonce.cache_get_mut(&peer_id) {
if let Some(entry) = self.last_ping_nonce.get_mut(&peer_id) {
*entry += 1;
*entry - 1
} else {
self.last_ping_nonce.cache_set(peer_id, 1);
self.last_ping_nonce.put(peer_id, 1);
0
}
}
Expand All @@ -277,22 +277,22 @@ impl RoutingTableView {
///
/// Get keys currently on cache.
pub fn get_accounts_keys(&mut self) -> Vec<AccountId> {
self.account_peers.key_order().cloned().collect()
self.account_peers.iter().map(|(k, _v)| (k.clone())).collect()
}

/// Get announce accounts on cache.
pub fn get_announce_accounts(&mut self) -> Vec<AnnounceAccount> {
self.account_peers.value_order().cloned().collect()
self.account_peers.iter().map(|(_k, v)| v).cloned().collect()
}

/// Get number of accounts
pub fn get_announce_accounts_size(&mut self) -> usize {
self.account_peers.cache_size()
self.account_peers.len()
}

/// Get account announce from
pub fn get_announce(&mut self, account_id: &AccountId) -> Option<AnnounceAccount> {
if let Some(announce_account) = self.account_peers.cache_get(account_id) {
if let Some(announce_account) = self.account_peers.get(account_id) {
Some(announce_account.clone())
} else {
self.store
Expand Down
6 changes: 3 additions & 3 deletions chain/network/src/routing/utils.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use cached::SizedCache;
use lru::LruCache;
use std::collections::HashMap;
use std::hash::Hash;

/// `cache_to_hashmap` - converts SizedCache<K, V> to HashMap<K, V>
pub fn cache_to_hashmap<K: Hash + Eq + Clone, V: Clone>(cache: &SizedCache<K, V>) -> HashMap<K, V> {
cache.key_order().cloned().zip(cache.value_order().cloned()).collect()
pub fn cache_to_hashmap<K: Hash + Eq + Clone, V: Clone>(cache: &LruCache<K, V>) -> HashMap<K, V> {
cache.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
}

0 comments on commit f192942

Please sign in to comment.