From 9a4aa19d7d136ec7b746c969c169cc50e0ba37ac Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 26 Dec 2022 15:05:47 -0500 Subject: [PATCH] dedups gossip addresses, keeping only the one with the highest weight In order to avoid traffic congestion or sending duplicate packets, when sampling gossip nodes if several nodes have the same gossip address (because they are behind a relayer or whatever), they need to be deduplicated into one. --- gossip/src/contact_info.rs | 4 +++- gossip/src/crds_gossip.rs | 17 +++++++++++++++++ gossip/src/crds_gossip_pull.rs | 13 ++++++++----- gossip/src/crds_gossip_push.rs | 22 ++++++++++++++-------- gossip/src/crds_value.rs | 15 +++++++++------ 5 files changed, 51 insertions(+), 20 deletions(-) diff --git a/gossip/src/contact_info.rs b/gossip/src/contact_info.rs index 1a5a33aa79d256..eb261607cd5970 100644 --- a/gossip/src/contact_info.rs +++ b/gossip/src/contact_info.rs @@ -112,7 +112,9 @@ impl ContactInfo { let delay = 10 * 60 * 1000; // 10 minutes let now = timestamp() - delay + rng.gen_range(0, 2 * delay); let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand); - ContactInfo::new_localhost(&pubkey, now) + let mut node = ContactInfo::new_localhost(&pubkey, now); + node.gossip.set_port(rng.gen_range(1024, u16::MAX)); + node } #[cfg(test)] diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index 34d867e38b713b..6110f862a778ef 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -17,6 +17,7 @@ use { duplicate_shred::{self, DuplicateShredIndex, LeaderScheduleFn, MAX_DUPLICATE_SHREDS}, ping_pong::PingCache, }, + itertools::Itertools, rayon::ThreadPool, solana_ledger::shred::Shred, solana_sdk::{ @@ -353,6 +354,22 @@ pub fn get_weight(max_weight: f32, time_since_last_selected: u32, stake: f32) -> 1.0_f32.max(weight.min(max_weight)) } +// Dedups gossip addresses, keeping only the one with the highest weight. +pub(crate) fn dedup_gossip_addresses( + nodes: I, +) -> HashMap +where + I: IntoIterator, +{ + nodes + .into_iter() + .into_grouping_map_by(|(_weight, node)| node.gossip) + .aggregate(|acc, _node_gossip, (weight, node)| match acc { + Some((ref w, _)) if w >= &weight => acc, + Some(_) | None => Some((weight, node)), + }) +} + #[cfg(test)] mod test { use { diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index b6cf917f5ca47b..734d35fee0833b 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -17,7 +17,7 @@ use { cluster_info_metrics::GossipStats, contact_info::ContactInfo, crds::{Crds, GossipRoute, VersionedCrdsValue}, - crds_gossip::{get_stake, get_weight}, + crds_gossip::{self, get_stake, get_weight}, crds_gossip_error::CrdsGossipError, crds_value::CrdsValue, ping_pong::PingCache, @@ -244,22 +244,25 @@ impl CrdsGossipPull { ); // Check for nodes which have responded to ping messages. let mut rng = rand::thread_rng(); - let (weights, peers): (Vec<_>, Vec<_>) = { + let peers: Vec<_> = { let mut ping_cache = ping_cache.lock().unwrap(); let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok(); let now = Instant::now(); peers .into_iter() - .filter_map(|(weight, peer)| { + .filter(|(_weight, peer)| { let node = (peer.id, peer.gossip); let (check, ping) = ping_cache.check(now, node, &mut pingf); if let Some(ping) = ping { pings.push((peer.gossip, ping)); } - check.then_some((weight, peer)) + check }) - .unzip() + .collect() }; + let (weights, peers): (Vec<_>, Vec<_>) = crds_gossip::dedup_gossip_addresses(peers) + .into_values() + .unzip(); if peers.is_empty() { return Err(CrdsGossipError::NoPeers); } diff --git a/gossip/src/crds_gossip_push.rs b/gossip/src/crds_gossip_push.rs index d98489363c06d2..e92d211cb2e361 100644 --- a/gossip/src/crds_gossip_push.rs +++ b/gossip/src/crds_gossip_push.rs @@ -16,7 +16,7 @@ use { cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY}, contact_info::ContactInfo, crds::{Crds, CrdsError, Cursor, GossipRoute}, - crds_gossip::{get_stake, get_weight}, + crds_gossip::{self, get_stake, get_weight}, crds_value::CrdsValue, ping_pong::PingCache, received_cache::ReceivedCache, @@ -299,22 +299,26 @@ impl CrdsGossipPush { socket_addr_space, ); // Check for nodes which have responded to ping messages. - let (weights, peers): (Vec<_>, Vec<_>) = { + let peers: Vec<_> = { let mut ping_cache = ping_cache.lock().unwrap(); let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok(); let now = Instant::now(); peers .into_iter() - .filter_map(|(weight, peer)| { + .filter(|(_weight, peer)| { let node = (peer.id, peer.gossip); let (check, ping) = ping_cache.check(now, node, &mut pingf); if let Some(ping) = ping { pings.push((peer.gossip, ping)); } - check.then_some((weight, peer.id)) + check }) - .unzip() + .collect() }; + let (weights, peers): (Vec<_>, Vec<_>) = crds_gossip::dedup_gossip_addresses(peers) + .into_values() + .map(|(weight, node)| (weight, node.id)) + .unzip(); if peers.is_empty() { return; } @@ -572,7 +576,8 @@ mod tests { let active_set = push.active_set.read().unwrap(); assert!(active_set.get(&value1.label().pubkey()).is_some()); - let value2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); + let mut value2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); + value2.gossip.set_port(1245); ping_cache .lock() .unwrap() @@ -608,8 +613,9 @@ mod tests { let active_set = push.active_set.read().unwrap(); assert!(active_set.get(&value2.label().pubkey()).is_some()); } - for _ in 0..push.num_active { - let value2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); + for k in 0..push.num_active { + let mut value2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); + value2.gossip.set_port(1246 + k as u16); ping_cache .lock() .unwrap() diff --git a/gossip/src/crds_value.rs b/gossip/src/crds_value.rs index 9d1fe6e14190b9..1be6127df85314 100644 --- a/gossip/src/crds_value.rs +++ b/gossip/src/crds_value.rs @@ -148,7 +148,8 @@ impl CrdsData { // the mainnet crds table. match kind { 0 => CrdsData::ContactInfo(ContactInfo::new_rand(rng, pubkey)), - 1 => CrdsData::LowestSlot(rng.gen(), LowestSlot::new_rand(rng, pubkey)), + // Index for LowestSlot is deprecated and should be zero. + 1 => CrdsData::LowestSlot(0, LowestSlot::new_rand(rng, pubkey)), 2 => CrdsData::SnapshotHashes(SnapshotHashes::new_rand(rng, pubkey)), 3 => CrdsData::AccountsHashes(SnapshotHashes::new_rand(rng, pubkey)), 4 => CrdsData::Version(Version::new_rand(rng, pubkey)), @@ -864,7 +865,7 @@ mod test { let index = rng.gen_range(0, keys.len()); CrdsValue::new_rand(&mut rng, Some(&keys[index])) }) - .take(2048) + .take(1 << 12) .collect(); let mut currents = HashMap::new(); for value in filter_current(&values) { @@ -888,10 +889,12 @@ mod test { } } assert_eq!(count, currents.len()); - // Currently CrdsData::new_rand is only implemented for 5 different - // kinds and excludes EpochSlots, and so the unique labels cannot be - // more than (5 + MAX_VOTES) times number of keys. - assert!(currents.len() <= keys.len() * (5 + MAX_VOTES as usize)); + // Currently CrdsData::new_rand is implemented for: + // AccountsHashes, ContactInfo, LowestSlot, SnapshotHashes, Version + // EpochSlots x MAX_EPOCH_SLOTS + // Vote x MAX_VOTES + let num_kinds = 5 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize; + assert!(currents.len() <= keys.len() * num_kinds); } #[test]