Skip to content

Commit

Permalink
moves turbine-disabled check to shred-fetch-stage (#30799)
Browse files Browse the repository at this point in the history
If turbine_disabled is true, the commit discards turbine packets
earlier in the pipeline so that they won't interfere with the deduper
and the packets can get through once turbine is enabled again.

This is a prerequisite of:
#30786
so that local-cluster tests pass.

(cherry picked from commit e66edeb)

# Conflicts:
#	core/src/shred_fetch_stage.rs
  • Loading branch information
behzadnouri authored and mergify[bot] committed Mar 21, 2023
1 parent 11a24f2 commit 4fefc45
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 13 deletions.
29 changes: 28 additions & 1 deletion core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use {
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats},
std::{
net::UdpSocket,
sync::{atomic::AtomicBool, Arc, RwLock},
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
},
Expand All @@ -40,6 +43,7 @@ impl ShredFetchStage {
name: &'static str,
flags: PacketFlags,
repair_context: Option<(&UdpSocket, &ClusterInfo)>,
turbine_disabled: Arc<AtomicBool>,
) {
const STATS_SUBMIT_CADENCE: Duration = Duration::from_secs(1);
let mut shreds_received = LruCache::new(DEFAULT_LRU_SIZE);
Expand Down Expand Up @@ -93,7 +97,9 @@ impl ShredFetchStage {
let max_slot = last_slot + 2 * slots_per_epoch;
let should_drop_merkle_shreds =
|shred_slot| should_drop_merkle_shreds(shred_slot, &root_bank);
let turbine_disabled = turbine_disabled.load(Ordering::Relaxed);
for packet in packet_batch.iter_mut() {
<<<<<<< HEAD
if should_discard_packet(
packet,
last_root,
Expand All @@ -105,6 +111,20 @@ impl ShredFetchStage {
&mut stats,
) {
packet.meta.set_discard(true);
=======
if turbine_disabled
|| should_discard_packet(
packet,
last_root,
max_slot,
shred_version,
&deduper,
should_drop_merkle_shreds,
&mut stats,
)
{
packet.meta_mut().set_discard(true);
>>>>>>> e66edeb18 (moves turbine-disabled check to shred-fetch-stage (#30799))
} else {
packet.meta.flags.insert(flags);
}
Expand All @@ -116,6 +136,7 @@ impl ShredFetchStage {
}
}

#[allow(clippy::too_many_arguments)]
fn packet_modifier(
sockets: Vec<Arc<UdpSocket>>,
exit: &Arc<AtomicBool>,
Expand All @@ -126,6 +147,7 @@ impl ShredFetchStage {
name: &'static str,
flags: PacketFlags,
repair_context: Option<(Arc<UdpSocket>, Arc<ClusterInfo>)>,
turbine_disabled: Arc<AtomicBool>,
) -> (Vec<JoinHandle<()>>, JoinHandle<()>) {
let (packet_sender, packet_receiver) = unbounded();
let streamers = sockets
Expand Down Expand Up @@ -157,6 +179,7 @@ impl ShredFetchStage {
name,
flags,
repair_context,
turbine_disabled,
)
})
.unwrap();
Expand All @@ -171,6 +194,7 @@ impl ShredFetchStage {
shred_version: u16,
bank_forks: Arc<RwLock<BankForks>>,
cluster_info: Arc<ClusterInfo>,
turbine_disabled: Arc<AtomicBool>,
exit: &Arc<AtomicBool>,
) -> Self {
let recycler = PacketBatchRecycler::warmed(100, 1024);
Expand All @@ -185,6 +209,7 @@ impl ShredFetchStage {
"shred_fetch",
PacketFlags::empty(),
None, // repair_context
turbine_disabled.clone(),
);

let (tvu_forwards_threads, fwd_thread_hdl) = Self::packet_modifier(
Expand All @@ -197,6 +222,7 @@ impl ShredFetchStage {
"shred_fetch_tvu_forwards",
PacketFlags::FORWARDED,
None, // repair_context
turbine_disabled.clone(),
);

let (repair_receiver, repair_handler) = Self::packet_modifier(
Expand All @@ -209,6 +235,7 @@ impl ShredFetchStage {
"shred_fetch_repair",
PacketFlags::REPAIR,
Some((repair_socket, cluster_info)),
turbine_disabled,
);

tvu_threads.extend(tvu_forwards_threads.into_iter());
Expand Down
14 changes: 3 additions & 11 deletions core/src/sigverify_shreds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ use {
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signer},
std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
sync::{Arc, RwLock},
thread::{Builder, JoinHandle},
time::{Duration, Instant},
},
Expand All @@ -32,7 +29,6 @@ pub(crate) fn spawn_shred_sigverify(
shred_fetch_receiver: Receiver<PacketBatch>,
retransmit_sender: Sender<Vec</*shred:*/ Vec<u8>>>,
verified_sender: Sender<Vec<PacketBatch>>,
turbine_disabled: Arc<AtomicBool>,
) -> JoinHandle<()> {
let recycler_cache = RecyclerCache::warmed();
let mut stats = ShredSigVerifyStats::new(Instant::now());
Expand All @@ -50,7 +46,6 @@ pub(crate) fn spawn_shred_sigverify(
&shred_fetch_receiver,
&retransmit_sender,
&verified_sender,
&turbine_disabled,
&mut stats,
) {
Ok(()) => (),
Expand All @@ -71,7 +66,6 @@ fn run_shred_sigverify(
shred_fetch_receiver: &Receiver<PacketBatch>,
retransmit_sender: &Sender<Vec</*shred:*/ Vec<u8>>>,
verified_sender: &Sender<Vec<PacketBatch>>,
turbine_disabled: &AtomicBool,
stats: &mut ShredSigVerifyStats,
) -> Result<(), Error> {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
Expand Down Expand Up @@ -100,10 +94,8 @@ fn run_shred_sigverify(
.map(<[u8]>::to_vec)
.collect();
stats.num_retransmit_shreds += shreds.len();
if !turbine_disabled.load(Ordering::Relaxed) {
retransmit_sender.send(shreds)?;
verified_sender.send(packets)?;
}
retransmit_sender.send(shreds)?;
verified_sender.send(packets)?;
stats.elapsed_micros += now.elapsed().as_micros() as u64;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ impl Tvu {
tvu_config.shred_version,
bank_forks.clone(),
cluster_info.clone(),
turbine_disabled,
exit,
);

Expand All @@ -166,7 +167,6 @@ impl Tvu {
fetch_receiver,
retransmit_sender.clone(),
verified_sender,
turbine_disabled,
);

let retransmit_stage = RetransmitStage::new(
Expand Down

0 comments on commit 4fefc45

Please sign in to comment.