Skip to content

Commit

Permalink
moves shred-fetch deduper to shred-sigverify stage
Browse files Browse the repository at this point in the history
Shreds arriving at tvu/tvu_forward/repair sockets are each processed in
a separate thread, and since each thread has its own deduper, the
duplicates across these sockets are not filtered out.
Using a common deduper across these threads would require an RwLock
wrapper and might introduce lock contention.
This commit instead moves the shred deduper to the shred-sigverify stage,
where all these shreds arrive through the same channel.
  • Loading branch information
behzadnouri committed Mar 18, 2023
1 parent 3ca85f7 commit 4ba3396
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 81 deletions.
88 changes: 12 additions & 76 deletions core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ use {
crossbeam_channel::{unbounded, Sender},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::shred::{should_discard_shred, ShredFetchStats},
solana_perf::{
deduper::Deduper,
packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags},
},
solana_perf::packet::{PacketBatch, PacketBatchRecycler, PacketFlags},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT},
Expand All @@ -23,10 +20,6 @@ use {
},
};

// Parameters of the per-thread shred deduper constructed in this file.
// False-positive rate handed to `Deduper::new`.
const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
// Number of bits handed to `Deduper::new`; 637_534_199 bits / 8 is roughly
// 76 MiB (about 79.7 million bytes).
const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
// Reset cycle (5 minutes) handed to `Deduper::maybe_reset` once per received
// packet batch.
const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);

// Crate-internal handle to the shred-fetch stage.
// NOTE(review): presumably owns the threads that receive shreds from the
// tvu/tvu_forward/repair sockets; confirm against the constructor.
pub(crate) struct ShredFetchStage {
// Join handles of the threads belonging to this stage.
thread_hdls: Vec<JoinHandle<()>>,
}
Expand All @@ -43,9 +36,6 @@ impl ShredFetchStage {
repair_context: Option<(&UdpSocket, &ClusterInfo)>,
) {
const STATS_SUBMIT_CADENCE: Duration = Duration::from_secs(1);
let mut rng = rand::thread_rng();
let mut deduper =
Deduper::<2, [u8]>::new(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_NUM_BITS);
let mut last_updated = Instant::now();
let mut keypair = repair_context
.as_ref()
Expand All @@ -60,7 +50,6 @@ impl ShredFetchStage {
let mut stats = ShredFetchStats::default();

for mut packet_batch in recvr {
deduper.maybe_reset(&mut rng, &DEDUPER_RESET_CYCLE);
if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
last_updated = Instant::now();
{
Expand Down Expand Up @@ -95,12 +84,11 @@ impl ShredFetchStage {
let should_drop_merkle_shreds =
|shred_slot| should_drop_merkle_shreds(shred_slot, &root_bank);
for packet in packet_batch.iter_mut() {
if should_discard_packet(
if should_discard_shred(
packet,
last_root,
max_slot,
shred_version,
&deduper,
should_drop_merkle_shreds,
&mut stats,
) {
Expand Down Expand Up @@ -230,35 +218,6 @@ impl ShredFetchStage {
}
}

// Decides whether `packet` has to be flagged as discard.
//
// The packet is first run through `should_discard_shred` with the given
// root, max slot, shred version and merkle-shred predicate; a rejection
// there short-circuits and the deduper is never consulted. Otherwise the
// packet bytes (or an empty slice when `packet.data(..)` yields nothing)
// are handed to `deduper.check_duplicate`; a positive answer bumps
// `stats.duplicate_shred` and the packet is discarded as well.
#[must_use]
fn should_discard_packet<const K: usize>(
    packet: &Packet,
    root: Slot,
    max_slot: Slot, // Max slot to ingest shreds for.
    shred_version: u16,
    deduper: &Deduper<K, [u8]>,
    should_drop_merkle_shreds: impl Fn(Slot) -> bool,
    stats: &mut ShredFetchStats,
) -> bool {
    // Shred-level filter goes first so that rejected packets never reach
    // the deduper.
    let rejected_as_shred = should_discard_shred(
        packet,
        root,
        max_slot,
        shred_version,
        should_drop_merkle_shreds,
        stats,
    );
    if rejected_as_shred {
        return true;
    }

    let payload = packet.data(..).unwrap_or_default();
    let is_duplicate = deduper.check_duplicate(payload);
    if is_duplicate {
        stats.duplicate_shred += 1;
    }
    is_duplicate
}

#[must_use]
fn should_drop_merkle_shreds(shred_slot: Slot, root_bank: &Bank) -> bool {
check_feature_activation(
Expand All @@ -280,13 +239,12 @@ mod tests {
blockstore::MAX_DATA_SHREDS_PER_SLOT,
shred::{ReedSolomonCache, Shred, ShredFlags},
},
solana_sdk::packet::Packet,
};

#[test]
fn test_data_code_same_index() {
solana_logger::setup();
let mut rng = rand::thread_rng();
let deduper = Deduper::<2, [u8]>::new(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, 640_007);
let mut packet = Packet::default();
let mut stats = ShredFetchStats::default();

Expand All @@ -308,12 +266,11 @@ mod tests {
let last_slot = 100;
let slots_per_epoch = 10;
let max_slot = last_slot + 2 * slots_per_epoch;
assert!(!should_discard_packet(
assert!(!should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
Expand All @@ -323,12 +280,11 @@ mod tests {
&ReedSolomonCache::default(),
);
coding[0].copy_to_packet(&mut packet);
assert!(!should_discard_packet(
assert!(!should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
Expand All @@ -337,8 +293,6 @@ mod tests {
#[test]
fn test_shred_filter() {
solana_logger::setup();
let mut rng = rand::thread_rng();
let deduper = Deduper::<2, [u8]>::new(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, 640_007);
let mut packet = Packet::default();
let mut stats = ShredFetchStats::default();
let last_root = 0;
Expand All @@ -348,12 +302,11 @@ mod tests {
let max_slot = last_slot + 2 * slots_per_epoch;

// packet size is 0, so cannot get index
assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
Expand All @@ -371,50 +324,35 @@ mod tests {
shred.copy_to_packet(&mut packet);

// rejected slot is 2, root is 3
assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
3,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
assert_eq!(stats.slot_out_of_range, 1);

assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
last_root,
max_slot,
345, // shred_version
&deduper,
345, // shred_version
|_| false, // should_drop_merkle_shreds
&mut stats,
));
assert_eq!(stats.shred_version_mismatch, 1);

// Accepted for 1,3
assert!(!should_discard_packet(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));

// deduper should filter duplicate
assert!(should_discard_packet(
assert!(!should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
assert_eq!(stats.duplicate_shred, 1);

let shred = Shred::new_from_data(
1_000_000,
Expand All @@ -429,25 +367,23 @@ mod tests {
shred.copy_to_packet(&mut packet);

// Slot 1 million is too high
assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));

let index = MAX_DATA_SHREDS_PER_SLOT as u32;
let shred = Shred::new_from_data(5, index, 0, &[], ShredFlags::LAST_SHRED_IN_SLOT, 0, 0, 0);
shred.copy_to_packet(&mut packet);
assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
Expand Down
32 changes: 29 additions & 3 deletions core/src/sigverify_shreds.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use {
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
rayon::{ThreadPool, ThreadPoolBuilder},
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
leader_schedule_cache::LeaderScheduleCache, shred, sigverify_shreds::verify_shreds_gpu,
},
solana_perf::{self, packet::PacketBatch, recycler_cache::RecyclerCache},
solana_perf::{self, deduper::Deduper, packet::PacketBatch, recycler_cache::RecyclerCache},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{clock::Slot, pubkey::Pubkey},
Expand All @@ -20,6 +20,10 @@ use {
},
};

// Parameters of the shred deduper built inside the shred-sigverify thread.
// False-positive rate handed to `Deduper::new`.
const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
// Number of bits handed to `Deduper::new`; 637_534_199 bits / 8 is roughly
// 76 MiB (about 79.7 million bytes).
const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
// Reset cycle (5 minutes) handed to `Deduper::maybe_reset` at the top of
// every iteration of the sigverify loop.
const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);

#[allow(clippy::enum_variant_names)]
enum Error {
RecvDisconnected,
Expand All @@ -44,7 +48,11 @@ pub(crate) fn spawn_shred_sigverify(
.build()
.unwrap();
let run_shred_sigverify = move || {
let mut rng = rand::thread_rng();
let mut deduper =
Deduper::<2, [u8]>::new(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_NUM_BITS);
loop {
deduper.maybe_reset(&mut rng, &DEDUPER_RESET_CYCLE);
match run_shred_sigverify(
&thread_pool,
// We can't store the pubkey outside the loop
Expand All @@ -53,6 +61,7 @@ pub(crate) fn spawn_shred_sigverify(
&bank_forks,
&leader_schedule_cache,
&recycler_cache,
&deduper,
&shred_fetch_receiver,
&retransmit_sender,
&verified_sender,
Expand All @@ -74,12 +83,13 @@ pub(crate) fn spawn_shred_sigverify(
}

#[allow(clippy::too_many_arguments)]
fn run_shred_sigverify(
fn run_shred_sigverify<const K: usize>(
thread_pool: &ThreadPool,
self_pubkey: &Pubkey,
bank_forks: &RwLock<BankForks>,
leader_schedule_cache: &LeaderScheduleCache,
recycler_cache: &RecyclerCache,
deduper: &Deduper<K, [u8]>,
shred_fetch_receiver: &Receiver<PacketBatch>,
retransmit_sender: &Sender<Vec</*shred:*/ Vec<u8>>>,
verified_sender: &Sender<Vec<PacketBatch>>,
Expand All @@ -95,6 +105,19 @@ fn run_shred_sigverify(
stats.num_iters += 1;
stats.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
stats.num_discards_pre += count_discards(&packets);
stats.num_duplicates += thread_pool.install(|| {
packets
.par_iter_mut()
.flatten()
.filter(|packet| {
!packet.meta().discard()
&& shred::layout::get_shred(packet)
.map(|shred| deduper.check_duplicate(shred))
.unwrap_or(true)
})
.map(|packet| packet.meta_mut().set_discard(true))
.count()
});
verify_packets(
thread_pool,
self_pubkey,
Expand Down Expand Up @@ -207,6 +230,7 @@ struct ShredSigVerifyStats {
num_packets: usize,
num_discards_pre: usize,
num_discards_post: usize,
num_duplicates: usize,
num_retransmit_shreds: usize,
elapsed_micros: u64,
}
Expand All @@ -221,6 +245,7 @@ impl ShredSigVerifyStats {
num_packets: 0usize,
num_discards_pre: 0usize,
num_discards_post: 0usize,
num_duplicates: 0usize,
num_retransmit_shreds: 0usize,
elapsed_micros: 0u64,
}
Expand All @@ -236,6 +261,7 @@ impl ShredSigVerifyStats {
("num_packets", self.num_packets, i64),
("num_discards_pre", self.num_discards_pre, i64),
("num_discards_post", self.num_discards_post, i64),
("num_duplicates", self.num_duplicates, i64),
("num_retransmit_shreds", self.num_retransmit_shreds, i64),
("elapsed_micros", self.elapsed_micros, i64),
);
Expand Down
2 changes: 0 additions & 2 deletions ledger/src/shred/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ pub struct ShredFetchStats {
pub(crate) index_bad_deserialize: usize,
pub(crate) index_out_of_bounds: usize,
pub(crate) slot_bad_deserialize: usize,
pub duplicate_shred: usize,
pub slot_out_of_range: usize,
pub(crate) bad_shred_type: usize,
pub shred_version_mismatch: usize,
Expand Down Expand Up @@ -125,7 +124,6 @@ impl ShredFetchStats {
("index_bad_deserialize", self.index_bad_deserialize, i64),
("index_out_of_bounds", self.index_out_of_bounds, i64),
("slot_out_of_range", self.slot_out_of_range, i64),
("duplicate_shred", self.duplicate_shred, i64),
("bad_shred_type", self.bad_shred_type, i64),
("shred_version_mismatch", self.shred_version_mismatch, i64),
("bad_parent_offset", self.bad_parent_offset, i64),
Expand Down

0 comments on commit 4ba3396

Please sign in to comment.