Skip to content

Commit

Permalink
moves shred-fetch deduper to shred-sigverify stage
Browse files Browse the repository at this point in the history
Shreds arriving at tvu/tvu_forward/repair sockets are each processed in
a separate thread, and since each thread has its own deduper, the
duplicates across these sockets are not filtered out.
Using a common deduper across these threads will require an RwLock
wrapper and may introduce lock contention.
The commit instead moves the shred-deduper to shred-sigverify-stage
where all these shreds arrive through the same channel.
  • Loading branch information
behzadnouri committed Mar 18, 2023
1 parent acbab11 commit 2336ac2
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 137 deletions.
88 changes: 12 additions & 76 deletions core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ use {
crossbeam_channel::{unbounded, Sender},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::shred::{should_discard_shred, ShredFetchStats},
solana_perf::{
deduper::Deduper,
packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags},
},
solana_perf::packet::{PacketBatch, PacketBatchRecycler, PacketFlags},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT},
Expand All @@ -23,10 +20,6 @@ use {
},
};

const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);

pub(crate) struct ShredFetchStage {
thread_hdls: Vec<JoinHandle<()>>,
}
Expand All @@ -43,9 +36,6 @@ impl ShredFetchStage {
repair_context: Option<(&UdpSocket, &ClusterInfo)>,
) {
const STATS_SUBMIT_CADENCE: Duration = Duration::from_secs(1);
let mut rng = rand::thread_rng();
let mut deduper =
Deduper::<2, [u8]>::new(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_NUM_BITS);
let mut last_updated = Instant::now();
let mut keypair = repair_context
.as_ref()
Expand All @@ -60,7 +50,6 @@ impl ShredFetchStage {
let mut stats = ShredFetchStats::default();

for mut packet_batch in recvr {
deduper.maybe_reset(&mut rng, &DEDUPER_RESET_CYCLE);
if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
last_updated = Instant::now();
{
Expand Down Expand Up @@ -95,12 +84,11 @@ impl ShredFetchStage {
let should_drop_merkle_shreds =
|shred_slot| should_drop_merkle_shreds(shred_slot, &root_bank);
for packet in packet_batch.iter_mut() {
if should_discard_packet(
if should_discard_shred(
packet,
last_root,
max_slot,
shred_version,
&deduper,
should_drop_merkle_shreds,
&mut stats,
) {
Expand Down Expand Up @@ -230,35 +218,6 @@ impl ShredFetchStage {
}
}

// Returns true if the packet should be marked as discard.
#[must_use]
fn should_discard_packet<const K: usize>(
packet: &Packet,
root: Slot,
max_slot: Slot, // Max slot to ingest shreds for.
shred_version: u16,
deduper: &Deduper<K, [u8]>,
should_drop_merkle_shreds: impl Fn(Slot) -> bool,
stats: &mut ShredFetchStats,
) -> bool {
if should_discard_shred(
packet,
root,
max_slot,
shred_version,
should_drop_merkle_shreds,
stats,
) {
return true;
}
if deduper.check_duplicate(packet.data(..).unwrap_or_default()) {
stats.duplicate_shred += 1;
true
} else {
false
}
}

#[must_use]
fn should_drop_merkle_shreds(shred_slot: Slot, root_bank: &Bank) -> bool {
check_feature_activation(
Expand All @@ -280,13 +239,12 @@ mod tests {
blockstore::MAX_DATA_SHREDS_PER_SLOT,
shred::{ReedSolomonCache, Shred, ShredFlags},
},
solana_sdk::packet::Packet,
};

#[test]
fn test_data_code_same_index() {
solana_logger::setup();
let mut rng = rand::thread_rng();
let deduper = Deduper::<2, [u8]>::new(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, 640_007);
let mut packet = Packet::default();
let mut stats = ShredFetchStats::default();

Expand All @@ -308,12 +266,11 @@ mod tests {
let last_slot = 100;
let slots_per_epoch = 10;
let max_slot = last_slot + 2 * slots_per_epoch;
assert!(!should_discard_packet(
assert!(!should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
Expand All @@ -323,12 +280,11 @@ mod tests {
&ReedSolomonCache::default(),
);
coding[0].copy_to_packet(&mut packet);
assert!(!should_discard_packet(
assert!(!should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
Expand All @@ -337,8 +293,6 @@ mod tests {
#[test]
fn test_shred_filter() {
solana_logger::setup();
let mut rng = rand::thread_rng();
let deduper = Deduper::<2, [u8]>::new(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, 640_007);
let mut packet = Packet::default();
let mut stats = ShredFetchStats::default();
let last_root = 0;
Expand All @@ -348,12 +302,11 @@ mod tests {
let max_slot = last_slot + 2 * slots_per_epoch;

// packet size is 0, so cannot get index
assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
Expand All @@ -371,50 +324,35 @@ mod tests {
shred.copy_to_packet(&mut packet);

// rejected slot is 2, root is 3
assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
3,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
assert_eq!(stats.slot_out_of_range, 1);

assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
last_root,
max_slot,
345, // shred_version
&deduper,
345, // shred_version
|_| false, // should_drop_merkle_shreds
&mut stats,
));
assert_eq!(stats.shred_version_mismatch, 1);

// Accepted for 1,3
assert!(!should_discard_packet(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));

// deduper should filter duplicate
assert!(should_discard_packet(
assert!(!should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
assert_eq!(stats.duplicate_shred, 1);

let shred = Shred::new_from_data(
1_000_000,
Expand All @@ -429,25 +367,23 @@ mod tests {
shred.copy_to_packet(&mut packet);

// Slot 1 million is too high
assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));

let index = MAX_DATA_SHREDS_PER_SLOT as u32;
let shred = Shred::new_from_data(5, index, 0, &[], ShredFlags::LAST_SHRED_IN_SLOT, 0, 0, 0);
shred.copy_to_packet(&mut packet);
assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
Expand Down
56 changes: 49 additions & 7 deletions core/src/sigverify_shreds.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use {
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
leader_schedule_cache::LeaderScheduleCache, shred, sigverify_shreds::verify_shreds_gpu,
},
solana_perf::{self, packet::PacketBatch, recycler_cache::RecyclerCache},
solana_perf::{self, deduper::Deduper, packet::PacketBatch, recycler_cache::RecyclerCache},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{clock::Slot, pubkey::Pubkey},
std::{
Expand All @@ -18,6 +20,10 @@ use {
},
};

const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);

#[allow(clippy::enum_variant_names)]
enum Error {
RecvDisconnected,
Expand All @@ -36,16 +42,26 @@ pub(crate) fn spawn_shred_sigverify(
) -> JoinHandle<()> {
let recycler_cache = RecyclerCache::warmed();
let mut stats = ShredSigVerifyStats::new(Instant::now());
Builder::new()
.name("solShredVerifr".to_string())
.spawn(move || loop {
let thread_pool = ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.thread_name(|i| format!("solSvrfyShred{i:02}"))
.build()
.unwrap();
let run_shred_sigverify = move || {
let mut rng = rand::thread_rng();
let mut deduper =
Deduper::<2, [u8]>::new(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_NUM_BITS);
loop {
deduper.maybe_reset(&mut rng, &DEDUPER_RESET_CYCLE);
match run_shred_sigverify(
&thread_pool,
// We can't store the pubkey outside the loop
// because the identity might be hot swapped.
&cluster_info.id(),
&bank_forks,
&leader_schedule_cache,
&recycler_cache,
&deduper,
&shred_fetch_receiver,
&retransmit_sender,
&verified_sender,
Expand All @@ -58,15 +74,21 @@ pub(crate) fn spawn_shred_sigverify(
Err(Error::SendError) => break,
}
stats.maybe_submit();
})
}
};
Builder::new()
.name("solShredVerifr".to_string())
.spawn(run_shred_sigverify)
.unwrap()
}

fn run_shred_sigverify(
fn run_shred_sigverify<const K: usize>(
thread_pool: &ThreadPool,
self_pubkey: &Pubkey,
bank_forks: &RwLock<BankForks>,
leader_schedule_cache: &LeaderScheduleCache,
recycler_cache: &RecyclerCache,
deduper: &Deduper<K, [u8]>,
shred_fetch_receiver: &Receiver<PacketBatch>,
retransmit_sender: &Sender<Vec</*shred:*/ Vec<u8>>>,
verified_sender: &Sender<Vec<PacketBatch>>,
Expand All @@ -82,7 +104,21 @@ fn run_shred_sigverify(
stats.num_iters += 1;
stats.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
stats.num_discards_pre += count_discards(&packets);
stats.num_duplicates += thread_pool.install(|| {
packets
.par_iter_mut()
.flatten()
.filter(|packet| {
!packet.meta().discard()
&& shred::layout::get_shred(packet)
.map(|shred| deduper.check_duplicate(shred))
.unwrap_or(true)
})
.map(|packet| packet.meta_mut().set_discard(true))
.count()
});
verify_packets(
thread_pool,
self_pubkey,
bank_forks,
leader_schedule_cache,
Expand All @@ -108,6 +144,7 @@ fn run_shred_sigverify(
}

fn verify_packets(
thread_pool: &ThreadPool,
self_pubkey: &Pubkey,
bank_forks: &RwLock<BankForks>,
leader_schedule_cache: &LeaderScheduleCache,
Expand All @@ -121,7 +158,7 @@ fn verify_packets(
.filter_map(|(slot, pubkey)| Some((slot, pubkey?.to_bytes())))
.chain(std::iter::once((Slot::MAX, [0u8; 32])))
.collect();
let out = verify_shreds_gpu(packets, &leader_slots, recycler_cache);
let out = verify_shreds_gpu(thread_pool, packets, &leader_slots, recycler_cache);
solana_perf::sigverify::mark_disabled(packets, &out);
}

Expand Down Expand Up @@ -192,6 +229,7 @@ struct ShredSigVerifyStats {
num_packets: usize,
num_discards_pre: usize,
num_discards_post: usize,
num_duplicates: usize,
num_retransmit_shreds: usize,
elapsed_micros: u64,
}
Expand All @@ -206,6 +244,7 @@ impl ShredSigVerifyStats {
num_packets: 0usize,
num_discards_pre: 0usize,
num_discards_post: 0usize,
num_duplicates: 0usize,
num_retransmit_shreds: 0usize,
elapsed_micros: 0u64,
}
Expand All @@ -221,6 +260,7 @@ impl ShredSigVerifyStats {
("num_packets", self.num_packets, i64),
("num_discards_pre", self.num_discards_pre, i64),
("num_discards_post", self.num_discards_post, i64),
("num_duplicates", self.num_duplicates, i64),
("num_retransmit_shreds", self.num_retransmit_shreds, i64),
("elapsed_micros", self.elapsed_micros, i64),
);
Expand Down Expand Up @@ -284,7 +324,9 @@ mod tests {
batches[0][1].buffer_mut()[..shred.payload().len()].copy_from_slice(shred.payload());
batches[0][1].meta_mut().size = shred.payload().len();

let thread_pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
verify_packets(
&thread_pool,
&Pubkey::new_unique(), // self_pubkey
&bank_forks,
&leader_schedule_cache,
Expand Down
Loading

0 comments on commit 2336ac2

Please sign in to comment.