moves shreds deduper to shred-sigverify stage
Shreds arriving at the tvu, tvu_forward, and repair sockets are each
processed in a separate thread, and since each thread has its own
deduper, duplicates arriving on different sockets are not filtered out.
Sharing a single deduper across these threads would require an RwLock
wrapper and could introduce lock contention.
This commit instead moves the shred deduper to the shred-sigverify
stage, where all of these shreds arrive through the same channel.
behzadnouri committed Mar 20, 2023
1 parent fcf83ac commit 6af63ec
Showing 3 changed files with 40 additions and 86 deletions.
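
The deduper involved is a probabilistic (false-positive) filter, as the DEDUPER_FALSE_POSITIVE_RATE constant in the diff below suggests: shred bytes are hashed into a shared bit array, a packet is treated as a duplicate only if every selected bit was already set, and the filter is cleared periodically so saturation cannot drive the false-positive rate up forever. The following is a minimal, self-contained Rust sketch of that idea applied at a single point on one channel, as this commit does in the sigverify stage. SimpleDeduper, its methods, and the sample payloads are illustrative assumptions only, not the actual solana_perf::deduper::Deduper API; the production constants (2 hash functions, 637_534_199 bits, a 5-minute reset cycle) appear in the diff.

// Sketch only: a simplified K-hash bit-set deduper sitting on one channel.
// Not the real solana_perf::deduper::Deduper implementation.
use std::{
    collections::hash_map::RandomState,
    hash::{BuildHasher, Hasher},
    time::{Duration, Instant},
};

// A payload is flagged as a duplicate only if all K bits chosen by K
// independent hash functions were already set; false positives are possible,
// false negatives are not (until the filter is reset).
struct SimpleDeduper<const K: usize> {
    num_bits: u64,
    bits: Vec<u64>,
    hashers: [RandomState; K],
    created: Instant,
}

impl<const K: usize> SimpleDeduper<K> {
    fn new(num_bits: u64) -> Self {
        Self {
            num_bits,
            bits: vec![0u64; (num_bits as usize + 63) / 64],
            hashers: std::array::from_fn(|_| RandomState::new()),
            created: Instant::now(),
        }
    }

    // Returns true if `data` was (probably) seen before; records it either way.
    fn dedup(&mut self, data: &[u8]) -> bool {
        let mut duplicate = true;
        for state in &self.hashers {
            let mut hasher = state.build_hasher();
            hasher.write(data);
            let bit = hasher.finish() % self.num_bits;
            let (word, mask) = ((bit / 64) as usize, 1u64 << (bit % 64));
            duplicate &= (self.bits[word] & mask) != 0;
            self.bits[word] |= mask;
        }
        duplicate
    }

    // Clear the filter after a fixed interval so the false-positive rate
    // stays bounded (the diff uses a 5-minute cycle).
    fn maybe_reset(&mut self, reset_cycle: Duration) {
        if self.created.elapsed() >= reset_cycle {
            self.bits.fill(0);
            self.created = Instant::now();
        }
    }
}

fn main() {
    // One filter sees every shred because, after this commit, packets from the
    // tvu, tvu_forward, and repair sockets all flow through the same sigverify
    // channel. 1 << 20 bits keeps the demo small; the production constant in
    // the diff is 637_534_199 bits (~76MB).
    let mut deduper = SimpleDeduper::<2>::new(1 << 20);
    deduper.maybe_reset(Duration::from_secs(5 * 60));
    for shred in [&b"shred-a"[..], &b"shred-b"[..], &b"shred-a"[..]] {
        println!("{:?} duplicate: {}", shred, deduper.dedup(shred));
    }
}

Keeping the filter in a single stage also sidesteps the RwLock the commit message mentions: the sigverify loop is the only owner, so dedup stays lock-free while still seeing shreds from every socket.
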
93 changes: 12 additions & 81 deletions core/src/shred_fetch_stage.rs
@@ -5,10 +5,7 @@ use {
crossbeam_channel::{unbounded, Sender},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::shred::{should_discard_shred, ShredFetchStats},
solana_perf::{
deduper::Deduper,
packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags},
},
solana_perf::packet::{PacketBatch, PacketBatchRecycler, PacketFlags},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT},
@@ -26,10 +23,6 @@ use {
},
};

const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);

pub(crate) struct ShredFetchStage {
thread_hdls: Vec<JoinHandle<()>>,
}
@@ -47,8 +40,6 @@ impl ShredFetchStage {
turbine_disabled: Arc<AtomicBool>,
) {
const STATS_SUBMIT_CADENCE: Duration = Duration::from_secs(1);
let mut rng = rand::thread_rng();
let mut deduper = Deduper::<2, [u8]>::new(&mut rng, DEDUPER_NUM_BITS);
let mut last_updated = Instant::now();
let mut keypair = repair_context
.as_ref()
@@ -63,9 +54,6 @@ impl ShredFetchStage {
let mut stats = ShredFetchStats::default();

for mut packet_batch in recvr {
if deduper.maybe_reset(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_RESET_CYCLE) {
stats.num_deduper_saturations += 1;
}
if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
last_updated = Instant::now();
{
@@ -102,12 +90,11 @@
let turbine_disabled = turbine_disabled.load(Ordering::Relaxed);
for packet in packet_batch.iter_mut() {
if turbine_disabled
|| should_discard_packet(
|| should_discard_shred(
packet,
last_root,
max_slot,
shred_version,
&deduper,
should_drop_merkle_shreds,
&mut stats,
)
@@ -245,39 +232,6 @@ impl ShredFetchStage {
}
}

// Returns true if the packet should be marked as discard.
#[must_use]
fn should_discard_packet<const K: usize>(
packet: &Packet,
root: Slot,
max_slot: Slot, // Max slot to ingest shreds for.
shred_version: u16,
deduper: &Deduper<K, [u8]>,
should_drop_merkle_shreds: impl Fn(Slot) -> bool,
stats: &mut ShredFetchStats,
) -> bool {
if should_discard_shred(
packet,
root,
max_slot,
shred_version,
should_drop_merkle_shreds,
stats,
) {
return true;
}
if packet
.data(..)
.map(|data| deduper.dedup(data))
.unwrap_or(true)
{
stats.duplicate_shred += 1;
true
} else {
false
}
}

#[must_use]
fn should_drop_merkle_shreds(shred_slot: Slot, root_bank: &Bank) -> bool {
check_feature_activation(
@@ -299,13 +253,12 @@ mod tests {
blockstore::MAX_DATA_SHREDS_PER_SLOT,
shred::{ReedSolomonCache, Shred, ShredFlags},
},
solana_sdk::packet::Packet,
};

#[test]
fn test_data_code_same_index() {
solana_logger::setup();
let mut rng = rand::thread_rng();
let deduper = Deduper::<2, [u8]>::new(&mut rng, /*num_bits:*/ 640_007);
let mut packet = Packet::default();
let mut stats = ShredFetchStats::default();

@@ -327,12 +280,11 @@
let last_slot = 100;
let slots_per_epoch = 10;
let max_slot = last_slot + 2 * slots_per_epoch;
assert!(!should_discard_packet(
assert!(!should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
@@ -342,12 +294,11 @@
&ReedSolomonCache::default(),
);
coding[0].copy_to_packet(&mut packet);
assert!(!should_discard_packet(
assert!(!should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
@@ -356,8 +307,6 @@
#[test]
fn test_shred_filter() {
solana_logger::setup();
let mut rng = rand::thread_rng();
let deduper = Deduper::<2, [u8]>::new(&mut rng, /*num_bits:*/ 640_007);
let mut packet = Packet::default();
let mut stats = ShredFetchStats::default();
let last_root = 0;
@@ -367,12 +316,11 @@
let max_slot = last_slot + 2 * slots_per_epoch;

// packet size is 0, so cannot get index
assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
@@ -390,50 +338,35 @@
shred.copy_to_packet(&mut packet);

// rejected slot is 2, root is 3
assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
3,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
assert_eq!(stats.slot_out_of_range, 1);

assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
last_root,
max_slot,
345, // shred_version
&deduper,
345, // shred_version
|_| false, // should_drop_merkle_shreds
&mut stats,
));
assert_eq!(stats.shred_version_mismatch, 1);

// Accepted for 1,3
assert!(!should_discard_packet(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));

// deduper should filter duplicate
assert!(should_discard_packet(
assert!(!should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
assert_eq!(stats.duplicate_shred, 1);

let shred = Shred::new_from_data(
1_000_000,
@@ -448,25 +381,23 @@
shred.copy_to_packet(&mut packet);

// Slot 1 million is too high
assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));

let index = MAX_DATA_SHREDS_PER_SLOT as u32;
let shred = Shred::new_from_data(5, index, 0, &[], ShredFlags::LAST_SHRED_IN_SLOT, 0, 0, 0);
shred.copy_to_packet(&mut packet);
assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
31 changes: 28 additions & 3 deletions core/src/sigverify_shreds.rs
@@ -1,11 +1,11 @@
use {
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
rayon::{ThreadPool, ThreadPoolBuilder},
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
leader_schedule_cache::LeaderScheduleCache, shred, sigverify_shreds::verify_shreds_gpu,
},
solana_perf::{self, packet::PacketBatch, recycler_cache::RecyclerCache},
solana_perf::{self, deduper::Deduper, packet::PacketBatch, recycler_cache::RecyclerCache},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{clock::Slot, pubkey::Pubkey},
@@ -17,6 +17,10 @@ use {
},
};

const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);

#[allow(clippy::enum_variant_names)]
enum Error {
RecvDisconnected,
@@ -40,7 +44,10 @@ pub(crate) fn spawn_shred_sigverify(
.build()
.unwrap();
let run_shred_sigverify = move || {
let mut rng = rand::thread_rng();
let mut deduper = Deduper::<2, [u8]>::new(&mut rng, DEDUPER_NUM_BITS);
loop {
deduper.maybe_reset(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_RESET_CYCLE);
match run_shred_sigverify(
&thread_pool,
// We can't store the pubkey outside the loop
@@ -49,6 +56,7 @@ pub(crate) fn spawn_shred_sigverify(
&bank_forks,
&leader_schedule_cache,
&recycler_cache,
&deduper,
&shred_fetch_receiver,
&retransmit_sender,
&verified_sender,
@@ -69,12 +77,13 @@
}

#[allow(clippy::too_many_arguments)]
fn run_shred_sigverify(
fn run_shred_sigverify<const K: usize>(
thread_pool: &ThreadPool,
self_pubkey: &Pubkey,
bank_forks: &RwLock<BankForks>,
leader_schedule_cache: &LeaderScheduleCache,
recycler_cache: &RecyclerCache,
deduper: &Deduper<K, [u8]>,
shred_fetch_receiver: &Receiver<PacketBatch>,
retransmit_sender: &Sender<Vec</*shred:*/ Vec<u8>>>,
verified_sender: &Sender<Vec<PacketBatch>>,
@@ -89,6 +98,19 @@ fn run_shred_sigverify(
stats.num_iters += 1;
stats.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
stats.num_discards_pre += count_discards(&packets);
stats.num_duplicates += thread_pool.install(|| {
packets
.par_iter_mut()
.flatten()
.filter(|packet| {
!packet.meta().discard()
&& shred::layout::get_shred(packet)
.map(|shred| deduper.dedup(shred))
.unwrap_or(true)
})
.map(|packet| packet.meta_mut().set_discard(true))
.count()
});
verify_packets(
thread_pool,
self_pubkey,
@@ -199,6 +221,7 @@ struct ShredSigVerifyStats {
num_packets: usize,
num_discards_pre: usize,
num_discards_post: usize,
num_duplicates: usize,
num_retransmit_shreds: usize,
elapsed_micros: u64,
}
@@ -213,6 +236,7 @@ impl ShredSigVerifyStats {
num_packets: 0usize,
num_discards_pre: 0usize,
num_discards_post: 0usize,
num_duplicates: 0usize,
num_retransmit_shreds: 0usize,
elapsed_micros: 0u64,
}
@@ -228,6 +252,7 @@ impl ShredSigVerifyStats {
("num_packets", self.num_packets, i64),
("num_discards_pre", self.num_discards_pre, i64),
("num_discards_post", self.num_discards_post, i64),
("num_duplicates", self.num_duplicates, i64),
("num_retransmit_shreds", self.num_retransmit_shreds, i64),
("elapsed_micros", self.elapsed_micros, i64),
);
2 changes: 0 additions & 2 deletions ledger/src/shred/stats.rs
@@ -40,7 +40,6 @@ pub struct ShredFetchStats {
pub(crate) index_bad_deserialize: usize,
pub(crate) index_out_of_bounds: usize,
pub(crate) slot_bad_deserialize: usize,
pub duplicate_shred: usize,
pub slot_out_of_range: usize,
pub(crate) bad_shred_type: usize,
pub shred_version_mismatch: usize,
@@ -127,7 +126,6 @@ impl ShredFetchStats {
("index_bad_deserialize", self.index_bad_deserialize, i64),
("index_out_of_bounds", self.index_out_of_bounds, i64),
("slot_out_of_range", self.slot_out_of_range, i64),
("duplicate_shred", self.duplicate_shred, i64),
("bad_shred_type", self.bad_shred_type, i64),
("shred_version_mismatch", self.shred_version_mismatch, i64),
("bad_parent_offset", self.bad_parent_offset, i64),
