Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retransmits shreds recovered from erasure codes #19233

Merged
merged 6 commits into from
Aug 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 14 additions & 15 deletions core/benches/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ use {
shred::Shredder,
},
solana_measure::measure::Measure,
solana_perf::packet::{Packet, Packets},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
hash::Hash,
pubkey,
pubkey::Pubkey,
signature::{Keypair, Signer},
system_transaction,
timing::timestamp,
Expand All @@ -40,6 +39,13 @@ use {
test::Bencher,
};

// TODO: The benchmark is ignored as it currently may indefinitely block.
// The code incorrectly expects that the node receiving the shred on tvu socket
// retransmits that to other nodes in its neighborhood. But that is no longer
// the case since https://github.com/solana-labs/solana/pull/17716.
// So depending on shred seed, peers may not receive packets and the receive
// threads loop indefinitely.
#[ignore]
#[bench]
#[allow(clippy::same_item_push)]
fn bench_retransmitter(bencher: &mut Bencher) {
Expand All @@ -52,12 +58,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
const NUM_PEERS: usize = 4;
let mut peer_sockets = Vec::new();
for _ in 0..NUM_PEERS {
// This ensures that cluster_info.id() is the root of turbine
// retransmit tree and so the shreds are retransmited to all other
// nodes in the cluster.
let id = std::iter::repeat_with(pubkey::new_rand)
.find(|pk| cluster_info.id() < *pk)
.unwrap();
let id = Pubkey::new_unique();
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut contact_info = ContactInfo::new_localhost(&id, timestamp());
contact_info.tvu = socket.local_addr().unwrap();
Expand All @@ -76,8 +77,8 @@ fn bench_retransmitter(bencher: &mut Bencher) {
let bank_forks = BankForks::new(bank0);
let bank = bank_forks.working_bank();
let bank_forks = Arc::new(RwLock::new(bank_forks));
let (packet_sender, packet_receiver) = channel();
let packet_receiver = Arc::new(Mutex::new(packet_receiver));
let (shreds_sender, shreds_receiver) = channel();
let shreds_receiver = Arc::new(Mutex::new(shreds_receiver));
const NUM_THREADS: usize = 2;
let sockets = (0..NUM_THREADS)
.map(|_| UdpSocket::bind("0.0.0.0:0").unwrap())
Expand Down Expand Up @@ -107,9 +108,9 @@ fn bench_retransmitter(bencher: &mut Bencher) {
let retransmitter_handles = retransmitter(
Arc::new(sockets),
bank_forks,
&leader_schedule_cache,
leader_schedule_cache,
cluster_info,
packet_receiver,
shreds_receiver,
Arc::default(), // solana_rpc::max_slots::MaxSlots
None,
);
Expand Down Expand Up @@ -148,9 +149,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
shred.set_index(index);
index += 1;
index %= 200;
let mut p = Packet::default();
shred.copy_to_packet(&mut p);
let _ = packet_sender.send(Packets::new(vec![p]));
let _ = shreds_sender.send(vec![shred.clone()]);
}
slot += 1;

Expand Down
24 changes: 18 additions & 6 deletions core/src/packet_hasher.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
// Get a unique hash value for a packet
// Used in retransmit and shred fetch to prevent dos with same packet data.

use ahash::AHasher;
use rand::{thread_rng, Rng};
use solana_perf::packet::Packet;
use std::hash::Hasher;
use {
ahash::AHasher,
rand::{thread_rng, Rng},
solana_ledger::shred::Shred,
solana_perf::packet::Packet,
std::hash::Hasher,
};

#[derive(Clone)]
pub struct PacketHasher {
Expand All @@ -22,9 +25,18 @@ impl Default for PacketHasher {
}

impl PacketHasher {
pub fn hash_packet(&self, packet: &Packet) -> u64 {
pub(crate) fn hash_packet(&self, packet: &Packet) -> u64 {
let size = packet.data.len().min(packet.meta.size);
self.hash_data(&packet.data[..size])
}

pub(crate) fn hash_shred(&self, shred: &Shred) -> u64 {
self.hash_data(&shred.payload)
}

fn hash_data(&self, data: &[u8]) -> u64 {
let mut hasher = AHasher::new_with_keys(self.seed1, self.seed2);
hasher.write(&packet.data[0..packet.meta.size]);
hasher.write(data);
hasher.finish()
}

Expand Down
Loading