Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dedups gossip addresses, taking the one with highest weight #29421

Merged
merged 1 commit into from Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion gossip/src/contact_info.rs
Expand Up @@ -112,7 +112,9 @@ impl ContactInfo {
let delay = 10 * 60 * 1000; // 10 minutes
let now = timestamp() - delay + rng.gen_range(0, 2 * delay);
let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand);
ContactInfo::new_localhost(&pubkey, now)
let mut node = ContactInfo::new_localhost(&pubkey, now);
node.gossip.set_port(rng.gen_range(1024, u16::MAX));
node
}

#[cfg(test)]
Expand Down
17 changes: 17 additions & 0 deletions gossip/src/crds_gossip.rs
Expand Up @@ -17,6 +17,7 @@ use {
duplicate_shred::{self, DuplicateShredIndex, LeaderScheduleFn, MAX_DUPLICATE_SHREDS},
ping_pong::PingCache,
},
itertools::Itertools,
rayon::ThreadPool,
solana_ledger::shred::Shred,
solana_sdk::{
Expand Down Expand Up @@ -353,6 +354,22 @@ pub fn get_weight(max_weight: f32, time_since_last_selected: u32, stake: f32) ->
1.0_f32.max(weight.min(max_weight))
}

// Dedups gossip addresses, keeping only the one with the highest weight.
pub(crate) fn dedup_gossip_addresses<I, T: PartialOrd>(
nodes: I,
) -> HashMap</*gossip:*/ SocketAddr, (/*weight:*/ T, ContactInfo)>
where
I: IntoIterator<Item = (/*weight:*/ T, ContactInfo)>,
{
nodes
.into_iter()
.into_grouping_map_by(|(_weight, node)| node.gossip)
.aggregate(|acc, _node_gossip, (weight, node)| match acc {
Some((ref w, _)) if w >= &weight => acc,
Some(_) | None => Some((weight, node)),
})
}

#[cfg(test)]
mod test {
use {
Expand Down
13 changes: 8 additions & 5 deletions gossip/src/crds_gossip_pull.rs
Expand Up @@ -17,7 +17,7 @@ use {
cluster_info_metrics::GossipStats,
contact_info::ContactInfo,
crds::{Crds, GossipRoute, VersionedCrdsValue},
crds_gossip::{get_stake, get_weight},
crds_gossip::{self, get_stake, get_weight},
crds_gossip_error::CrdsGossipError,
crds_value::CrdsValue,
ping_pong::PingCache,
Expand Down Expand Up @@ -244,22 +244,25 @@ impl CrdsGossipPull {
);
// Check for nodes which have responded to ping messages.
let mut rng = rand::thread_rng();
let (weights, peers): (Vec<_>, Vec<_>) = {
let peers: Vec<_> = {
let mut ping_cache = ping_cache.lock().unwrap();
let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok();
let now = Instant::now();
peers
.into_iter()
.filter_map(|(weight, peer)| {
.filter(|(_weight, peer)| {
let node = (peer.id, peer.gossip);
let (check, ping) = ping_cache.check(now, node, &mut pingf);
if let Some(ping) = ping {
pings.push((peer.gossip, ping));
}
check.then_some((weight, peer))
check
})
.unzip()
.collect()
};
let (weights, peers): (Vec<_>, Vec<_>) = crds_gossip::dedup_gossip_addresses(peers)
.into_values()
.unzip();
if peers.is_empty() {
return Err(CrdsGossipError::NoPeers);
}
Expand Down
22 changes: 14 additions & 8 deletions gossip/src/crds_gossip_push.rs
Expand Up @@ -16,7 +16,7 @@ use {
cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY},
contact_info::ContactInfo,
crds::{Crds, CrdsError, Cursor, GossipRoute},
crds_gossip::{get_stake, get_weight},
crds_gossip::{self, get_stake, get_weight},
crds_value::CrdsValue,
ping_pong::PingCache,
received_cache::ReceivedCache,
Expand Down Expand Up @@ -299,22 +299,26 @@ impl CrdsGossipPush {
socket_addr_space,
);
// Check for nodes which have responded to ping messages.
let (weights, peers): (Vec<_>, Vec<_>) = {
let peers: Vec<_> = {
let mut ping_cache = ping_cache.lock().unwrap();
let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok();
let now = Instant::now();
peers
.into_iter()
.filter_map(|(weight, peer)| {
.filter(|(_weight, peer)| {
let node = (peer.id, peer.gossip);
let (check, ping) = ping_cache.check(now, node, &mut pingf);
if let Some(ping) = ping {
pings.push((peer.gossip, ping));
}
check.then_some((weight, peer.id))
check
})
.unzip()
.collect()
};
let (weights, peers): (Vec<_>, Vec<_>) = crds_gossip::dedup_gossip_addresses(peers)
.into_values()
.map(|(weight, node)| (weight, node.id))
.unzip();
t-nelson marked this conversation as resolved.
Show resolved Hide resolved
if peers.is_empty() {
return;
}
Expand Down Expand Up @@ -572,7 +576,8 @@ mod tests {

let active_set = push.active_set.read().unwrap();
assert!(active_set.get(&value1.label().pubkey()).is_some());
let value2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
let mut value2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
value2.gossip.set_port(1245);
ping_cache
.lock()
.unwrap()
Expand Down Expand Up @@ -608,8 +613,9 @@ mod tests {
let active_set = push.active_set.read().unwrap();
assert!(active_set.get(&value2.label().pubkey()).is_some());
}
for _ in 0..push.num_active {
let value2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
for k in 0..push.num_active {
let mut value2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
value2.gossip.set_port(1246 + k as u16);
ping_cache
.lock()
.unwrap()
Expand Down
15 changes: 9 additions & 6 deletions gossip/src/crds_value.rs
Expand Up @@ -148,7 +148,8 @@ impl CrdsData {
// the mainnet crds table.
match kind {
0 => CrdsData::ContactInfo(ContactInfo::new_rand(rng, pubkey)),
1 => CrdsData::LowestSlot(rng.gen(), LowestSlot::new_rand(rng, pubkey)),
// Index for LowestSlot is deprecated and should be zero.
1 => CrdsData::LowestSlot(0, LowestSlot::new_rand(rng, pubkey)),
2 => CrdsData::SnapshotHashes(SnapshotHashes::new_rand(rng, pubkey)),
3 => CrdsData::AccountsHashes(SnapshotHashes::new_rand(rng, pubkey)),
4 => CrdsData::Version(Version::new_rand(rng, pubkey)),
Expand Down Expand Up @@ -864,7 +865,7 @@ mod test {
let index = rng.gen_range(0, keys.len());
CrdsValue::new_rand(&mut rng, Some(&keys[index]))
})
.take(2048)
.take(1 << 12)
t-nelson marked this conversation as resolved.
Show resolved Hide resolved
.collect();
let mut currents = HashMap::new();
for value in filter_current(&values) {
Expand All @@ -888,10 +889,12 @@ mod test {
}
}
assert_eq!(count, currents.len());
// Currently CrdsData::new_rand is only implemented for 5 different
// kinds and excludes EpochSlots, and so the unique labels cannot be
// more than (5 + MAX_VOTES) times number of keys.
assert!(currents.len() <= keys.len() * (5 + MAX_VOTES as usize));
// Currently CrdsData::new_rand is implemented for:
// AccountsHashes, ContactInfo, LowestSlot, SnapshotHashes, Version
// EpochSlots x MAX_EPOCH_SLOTS
// Vote x MAX_VOTES
let num_kinds = 5 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize;
assert!(currents.len() <= keys.len() * num_kinds);
}

#[test]
Expand Down