moves shreds deduper to shred-sigverify stage #30786

Merged · 1 commit · Mar 22, 2023
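For context, the deduper this PR relocates is a probabilistic filter over raw shred payload bytes that is periodically reset to keep its false-positive rate bounded; the PR removes it from `ShredFetchStage` and re-creates it inside the shred-sigverify thread. Below is a minimal sketch of that pattern, assuming the `solana_perf::deduper::Deduper` API exactly as it appears in the hunks that follow; `dedup_payloads` is a hypothetical helper, not code from this PR.

```rust
use {solana_perf::deduper::Deduper, std::time::Duration};

// Constants match the ones this PR moves from shred_fetch_stage.rs to sigverify_shreds.rs.
const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);

// Hypothetical helper: keeps only the first occurrence of each payload,
// resetting the filter periodically so its false-positive rate stays bounded.
fn dedup_payloads(batches: impl IntoIterator<Item = Vec<Vec<u8>>>) -> Vec<Vec<u8>> {
    let mut rng = rand::thread_rng();
    let mut deduper = Deduper::<2, [u8]>::new(&mut rng, DEDUPER_NUM_BITS);
    let mut kept = Vec::new();
    for batch in batches {
        // Reset once per batch if the filter has saturated or the reset cycle elapsed.
        if deduper.maybe_reset(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_RESET_CYCLE) {
            eprintln!("deduper saturated and was reset"); // the real stages bump num_deduper_saturations
        }
        for payload in batch {
            // dedup() returns true for payloads it has (probably) seen before.
            if !deduper.dedup(&payload) {
                kept.push(payload);
            }
        }
    }
    kept
}
```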
93 changes: 12 additions & 81 deletions core/src/shred_fetch_stage.rs
@@ -5,10 +5,7 @@ use {
crossbeam_channel::{unbounded, Sender},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::shred::{should_discard_shred, ShredFetchStats},
solana_perf::{
deduper::Deduper,
packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags},
},
solana_perf::packet::{PacketBatch, PacketBatchRecycler, PacketFlags},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT},
@@ -26,10 +23,6 @@ use {
},
};

const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);

pub(crate) struct ShredFetchStage {
thread_hdls: Vec<JoinHandle<()>>,
}
@@ -47,8 +40,6 @@ impl ShredFetchStage {
turbine_disabled: Arc<AtomicBool>,
) {
const STATS_SUBMIT_CADENCE: Duration = Duration::from_secs(1);
let mut rng = rand::thread_rng();
let mut deduper = Deduper::<2, [u8]>::new(&mut rng, DEDUPER_NUM_BITS);
let mut last_updated = Instant::now();
let mut keypair = repair_context
.as_ref()
@@ -63,9 +54,6 @@
let mut stats = ShredFetchStats::default();

for mut packet_batch in recvr {
if deduper.maybe_reset(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_RESET_CYCLE) {
stats.num_deduper_saturations += 1;
}
if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
last_updated = Instant::now();
{
@@ -102,12 +90,11 @@
let turbine_disabled = turbine_disabled.load(Ordering::Relaxed);
for packet in packet_batch.iter_mut() {
if turbine_disabled
|| should_discard_packet(
|| should_discard_shred(
packet,
last_root,
max_slot,
shred_version,
&deduper,
should_drop_merkle_shreds,
&mut stats,
)
@@ -245,39 +232,6 @@
}
}

// Returns true if the packet should be marked as discard.
#[must_use]
fn should_discard_packet<const K: usize>(
packet: &Packet,
root: Slot,
max_slot: Slot, // Max slot to ingest shreds for.
shred_version: u16,
deduper: &Deduper<K, [u8]>,
should_drop_merkle_shreds: impl Fn(Slot) -> bool,
stats: &mut ShredFetchStats,
) -> bool {
if should_discard_shred(
packet,
root,
max_slot,
shred_version,
should_drop_merkle_shreds,
stats,
) {
return true;
}
if packet
.data(..)
.map(|data| deduper.dedup(data))
.unwrap_or(true)
{
stats.duplicate_shred += 1;
true
} else {
false
}
}

#[must_use]
fn should_drop_merkle_shreds(shred_slot: Slot, root_bank: &Bank) -> bool {
check_feature_activation(
@@ -299,13 +253,12 @@ mod tests {
blockstore::MAX_DATA_SHREDS_PER_SLOT,
shred::{ReedSolomonCache, Shred, ShredFlags},
},
solana_sdk::packet::Packet,
};

#[test]
fn test_data_code_same_index() {
solana_logger::setup();
let mut rng = rand::thread_rng();
let deduper = Deduper::<2, [u8]>::new(&mut rng, /*num_bits:*/ 640_007);
let mut packet = Packet::default();
let mut stats = ShredFetchStats::default();

@@ -327,12 +280,11 @@
let last_slot = 100;
let slots_per_epoch = 10;
let max_slot = last_slot + 2 * slots_per_epoch;
assert!(!should_discard_packet(
assert!(!should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
@@ -342,12 +294,11 @@
&ReedSolomonCache::default(),
);
coding[0].copy_to_packet(&mut packet);
assert!(!should_discard_packet(
assert!(!should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
@@ -356,8 +307,6 @@
#[test]
fn test_shred_filter() {
solana_logger::setup();
let mut rng = rand::thread_rng();
let deduper = Deduper::<2, [u8]>::new(&mut rng, /*num_bits:*/ 640_007);
let mut packet = Packet::default();
let mut stats = ShredFetchStats::default();
let last_root = 0;
@@ -367,12 +316,11 @@
let max_slot = last_slot + 2 * slots_per_epoch;

// packet size is 0, so cannot get index
assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
@@ -390,50 +338,35 @@
shred.copy_to_packet(&mut packet);

// rejected slot is 2, root is 3
assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
3,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
assert_eq!(stats.slot_out_of_range, 1);

assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
last_root,
max_slot,
345, // shred_version
&deduper,
345, // shred_version
|_| false, // should_drop_merkle_shreds
&mut stats,
));
assert_eq!(stats.shred_version_mismatch, 1);

// Accepted for 1,3
assert!(!should_discard_packet(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));

// deduper should filter duplicate
assert!(should_discard_packet(
assert!(!should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
assert_eq!(stats.duplicate_shred, 1);

let shred = Shred::new_from_data(
1_000_000,
@@ -448,25 +381,23 @@
shred.copy_to_packet(&mut packet);

// Slot 1 million is too high
assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));

let index = MAX_DATA_SHREDS_PER_SLOT as u32;
let shred = Shred::new_from_data(5, index, 0, &[], ShredFlags::LAST_SHRED_IN_SLOT, 0, 0, 0);
shred.copy_to_packet(&mut packet);
assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
39 changes: 35 additions & 4 deletions core/src/sigverify_shreds.rs
@@ -1,11 +1,11 @@
use {
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
rayon::{ThreadPool, ThreadPoolBuilder},
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
leader_schedule_cache::LeaderScheduleCache, shred, sigverify_shreds::verify_shreds_gpu,
},
solana_perf::{self, packet::PacketBatch, recycler_cache::RecyclerCache},
solana_perf::{self, deduper::Deduper, packet::PacketBatch, recycler_cache::RecyclerCache},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{clock::Slot, pubkey::Pubkey},
@@ -17,6 +17,10 @@ use {
},
};

const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);

#[allow(clippy::enum_variant_names)]
enum Error {
RecvDisconnected,
@@ -40,7 +44,12 @@ pub(crate) fn spawn_shred_sigverify(
.build()
.unwrap();
let run_shred_sigverify = move || {
let mut rng = rand::thread_rng();
let mut deduper = Deduper::<2, [u8]>::new(&mut rng, DEDUPER_NUM_BITS);
loop {
if deduper.maybe_reset(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_RESET_CYCLE) {
stats.num_deduper_saturations += 1;
}
match run_shred_sigverify(
&thread_pool,
// We can't store the pubkey outside the loop
@@ -49,6 +58,7 @@
&bank_forks,
&leader_schedule_cache,
&recycler_cache,
&deduper,
&shred_fetch_receiver,
&retransmit_sender,
&verified_sender,
@@ -69,12 +79,13 @@
}

#[allow(clippy::too_many_arguments)]
fn run_shred_sigverify(
fn run_shred_sigverify<const K: usize>(
thread_pool: &ThreadPool,
self_pubkey: &Pubkey,
bank_forks: &RwLock<BankForks>,
leader_schedule_cache: &LeaderScheduleCache,
recycler_cache: &RecyclerCache,
deduper: &Deduper<K, [u8]>,
shred_fetch_receiver: &Receiver<PacketBatch>,
retransmit_sender: &Sender<Vec</*shred:*/ Vec<u8>>>,
verified_sender: &Sender<Vec<PacketBatch>>,
@@ -89,6 +100,20 @@ fn run_shred_sigverify(
stats.num_iters += 1;
stats.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
stats.num_discards_pre += count_discards(&packets);
stats.num_duplicates += thread_pool.install(|| {
packets
.par_iter_mut()
.flatten()
.filter(|packet| {
!packet.meta().discard()
&& packet
.data(..)
.map(|data| deduper.dedup(data))
.unwrap_or(true)
})
.map(|packet| packet.meta_mut().set_discard(true))
.count()
});
verify_packets(
thread_pool,
self_pubkey,
@@ -197,8 +222,10 @@ struct ShredSigVerifyStats {
since: Instant,
num_iters: usize,
num_packets: usize,
num_discards_pre: usize,
num_deduper_saturations: usize,
num_discards_post: usize,
num_discards_pre: usize,
num_duplicates: usize,
num_retransmit_shreds: usize,
elapsed_micros: u64,
}
@@ -212,7 +239,9 @@ impl ShredSigVerifyStats {
num_iters: 0usize,
num_packets: 0usize,
num_discards_pre: 0usize,
num_deduper_saturations: 0usize,
num_discards_post: 0usize,
num_duplicates: 0usize,
num_retransmit_shreds: 0usize,
elapsed_micros: 0u64,
}
@@ -227,7 +256,9 @@
("num_iters", self.num_iters, i64),
("num_packets", self.num_packets, i64),
("num_discards_pre", self.num_discards_pre, i64),
("num_deduper_saturations", self.num_deduper_saturations, i64),
("num_discards_post", self.num_discards_post, i64),
("num_duplicates", self.num_duplicates, i64),
("num_retransmit_shreds", self.num_retransmit_shreds, i64),
("elapsed_micros", self.elapsed_micros, i64),
);
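For reference, the duplicate-marking that this file's diff adds to `run_shred_sigverify` follows a filter-then-set-discard shape over all packet batches in parallel. Here is a simplified sketch of that shape; `SketchPacket`, `mark_duplicates`, and the `is_duplicate` closure are hypothetical stand-ins for the real `Packet` and `Deduper` types (the real code checks `packet.meta().discard()`, treats unreadable payloads as duplicates, and runs inside `thread_pool.install`).

```rust
use rayon::prelude::*;

// Hypothetical stand-in for solana_perf::packet::Packet, which exposes
// meta()/meta_mut() and data(..) instead of plain fields.
struct SketchPacket {
    data: Vec<u8>,
    discard: bool,
}

// Flags duplicates across all batches in parallel and returns how many packets
// were newly marked for discard (the value added to stats.num_duplicates).
// Usage in the spirit of the hunk above: mark_duplicates(&mut batches, |d| deduper.dedup(d)).
fn mark_duplicates(
    batches: &mut [Vec<SketchPacket>],
    is_duplicate: impl Fn(&[u8]) -> bool + Sync,
) -> usize {
    batches
        .par_iter_mut()
        .flatten()
        .filter(|packet| !packet.discard && is_duplicate(&packet.data))
        .map(|packet| packet.discard = true)
        .count()
}
```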
4 changes: 0 additions & 4 deletions ledger/src/shred/stats.rs
@@ -32,15 +32,13 @@ pub struct ProcessShredsStats {
pub struct ShredFetchStats {
pub index_overrun: usize,
pub shred_count: usize,
pub num_deduper_saturations: usize,
pub(crate) num_shreds_merkle_code: usize,
pub(crate) num_shreds_merkle_data: usize,
pub ping_count: usize,
pub ping_err_verify_count: usize,
pub(crate) index_bad_deserialize: usize,
pub(crate) index_out_of_bounds: usize,
pub(crate) slot_bad_deserialize: usize,
pub duplicate_shred: usize,
pub slot_out_of_range: usize,
pub(crate) bad_shred_type: usize,
pub shred_version_mismatch: usize,
@@ -118,7 +116,6 @@ impl ShredFetchStats {
name,
("index_overrun", self.index_overrun, i64),
("shred_count", self.shred_count, i64),
("num_deduper_saturations", self.num_deduper_saturations, i64),
("num_shreds_merkle_code", self.num_shreds_merkle_code, i64),
("num_shreds_merkle_data", self.num_shreds_merkle_data, i64),
("ping_count", self.ping_count, i64),
Expand All @@ -127,7 +124,6 @@ impl ShredFetchStats {
("index_bad_deserialize", self.index_bad_deserialize, i64),
("index_out_of_bounds", self.index_out_of_bounds, i64),
("slot_out_of_range", self.slot_out_of_range, i64),
("duplicate_shred", self.duplicate_shred, i64),
("bad_shred_type", self.bad_shred_type, i64),
("shred_version_mismatch", self.shred_version_mismatch, i64),
("bad_parent_offset", self.bad_parent_offset, i64),