diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index f5defc814456bb..501ed1386bf4a3 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -159,7 +159,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher) { for _ in 0..batches.len() { if let Some(batch) = batches.pop() { sent_len += batch.packets.len(); - packet_s.send(batch).unwrap(); + packet_s.send(vec![batch]).unwrap(); } } let mut received = 0; diff --git a/core/src/find_packet_sender_stake_stage.rs b/core/src/find_packet_sender_stake_stage.rs new file mode 100644 index 00000000000000..d584f74b7068d0 --- /dev/null +++ b/core/src/find_packet_sender_stake_stage.rs @@ -0,0 +1,185 @@ +use { + crossbeam_channel::{Receiver, RecvTimeoutError, Sender}, + rayon::{prelude::*, ThreadPool}, + solana_gossip::cluster_info::ClusterInfo, + solana_measure::measure::Measure, + solana_perf::packet::PacketBatch, + solana_rayon_threadlimit::get_thread_count, + solana_runtime::bank_forks::BankForks, + solana_sdk::timing::timestamp, + solana_streamer::streamer::{self, StreamerError}, + std::{ + cell::RefCell, + collections::HashMap, + net::IpAddr, + sync::{Arc, RwLock}, + thread::{self, Builder, JoinHandle}, + time::{Duration, Instant}, + }, +}; + +const IP_TO_STAKE_REFRESH_DURATION: Duration = Duration::from_secs(5); + +thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() + .num_threads(get_thread_count()) + .thread_name(|ix| format!("transaction_sender_stake_stage_{}", ix)) + .build() + .unwrap())); + +pub type FindPacketSenderStakeSender = Sender>; +pub type FindPacketSenderStakeReceiver = Receiver>; + +#[derive(Debug, Default)] +struct FindPacketSenderStakeStats { + last_print: u64, + refresh_ip_to_stake_time: u64, + apply_sender_stakes_time: u64, + send_batches_time: u64, + receive_batches_time: u64, + total_batches: u64, + total_packets: u64, +} + +impl FindPacketSenderStakeStats { + fn report(&mut self) { + let now = timestamp(); + let elapsed_ms = now - self.last_print; + if elapsed_ms > 2000 { + datapoint_info!( + "find_packet_sender_stake-services_stats", + ( + "refresh_ip_to_stake_time", + self.refresh_ip_to_stake_time as i64, + i64 + ), + ( + "apply_sender_stakes_time", + self.apply_sender_stakes_time as i64, + i64 + ), + ("send_batches_time", self.send_batches_time as i64, i64), + ( + "receive_batches_time", + self.receive_batches_time as i64, + i64 + ), + ("total_batches", self.total_batches as i64, i64), + ("total_packets", self.total_packets as i64, i64), + ); + *self = FindPacketSenderStakeStats::default(); + self.last_print = now; + } + } +} + +pub struct FindPacketSenderStakeStage { + thread_hdl: JoinHandle<()>, +} + +impl FindPacketSenderStakeStage { + pub fn new( + packet_receiver: streamer::PacketBatchReceiver, + sender: FindPacketSenderStakeSender, + bank_forks: Arc>, + cluster_info: Arc, + ) -> Self { + let mut stats = FindPacketSenderStakeStats::default(); + let thread_hdl = Builder::new() + .name("find-packet-sender-stake".to_string()) + .spawn(move || { + let mut last_stakes = Instant::now(); + let mut ip_to_stake: HashMap = HashMap::new(); + loop { + let mut refresh_ip_to_stake_time = Measure::start("refresh_ip_to_stake_time"); + Self::try_refresh_ip_to_stake( + &mut last_stakes, + &mut ip_to_stake, + bank_forks.clone(), + cluster_info.clone(), + ); + refresh_ip_to_stake_time.stop(); + stats.refresh_ip_to_stake_time = stats + .refresh_ip_to_stake_time + .saturating_add(refresh_ip_to_stake_time.as_us()); + + match streamer::recv_packet_batches(&packet_receiver) { + Ok((mut batches, num_packets, recv_duration)) => { + let num_batches = batches.len(); + let mut apply_sender_stakes_time = + Measure::start("apply_sender_stakes_time"); + Self::apply_sender_stakes(&mut batches, &ip_to_stake); + apply_sender_stakes_time.stop(); + + let mut send_batches_time = Measure::start("send_batches_time"); + if let Err(e) = sender.send(batches) { + info!("Sender error: {:?}", e); + } + send_batches_time.stop(); + + stats.apply_sender_stakes_time = stats + .apply_sender_stakes_time + .saturating_add(apply_sender_stakes_time.as_us()); + stats.send_batches_time = stats + .send_batches_time + .saturating_add(send_batches_time.as_us()); + stats.receive_batches_time = stats + .receive_batches_time + .saturating_add(recv_duration.as_nanos() as u64); + stats.total_batches = + stats.total_batches.saturating_add(num_batches as u64); + stats.total_packets = + stats.total_packets.saturating_add(num_packets as u64); + } + Err(e) => match e { + StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break, + StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (), + _ => error!("error: {:?}", e), + }, + } + + stats.report(); + } + }) + .unwrap(); + Self { thread_hdl } + } + + fn try_refresh_ip_to_stake( + last_stakes: &mut Instant, + ip_to_stake: &mut HashMap, + bank_forks: Arc>, + cluster_info: Arc, + ) { + if last_stakes.elapsed() > IP_TO_STAKE_REFRESH_DURATION { + let root_bank = bank_forks.read().unwrap().root_bank(); + let staked_nodes = root_bank.staked_nodes(); + *ip_to_stake = cluster_info + .tvu_peers() + .into_iter() + .filter_map(|node| { + let stake = staked_nodes.get(&node.id)?; + Some((node.tvu.ip(), *stake)) + }) + .collect(); + *last_stakes = Instant::now(); + } + } + + fn apply_sender_stakes(batches: &mut [PacketBatch], ip_to_stake: &HashMap) { + PAR_THREAD_POOL.with(|thread_pool| { + thread_pool.borrow().install(|| { + batches + .into_par_iter() + .flat_map(|batch| batch.packets.par_iter_mut()) + .for_each(|packet| { + packet.meta.sender_stake = + *ip_to_stake.get(&packet.meta.addr().ip()).unwrap_or(&0); + }); + }) + }); + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 8f8e9adc3da66c..e4032def450816 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -24,6 +24,7 @@ pub mod cost_update_service; pub mod drop_bank_service; pub mod duplicate_repair_status; pub mod fetch_stage; +pub mod find_packet_sender_stake_stage; pub mod fork_choice; pub mod gen_keys; pub mod heaviest_subtree_fork_choice; diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index ff3db68b13fc5b..b3e4c2d6bd87b0 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -2,17 +2,17 @@ use { crate::packet_hasher::PacketHasher, - crossbeam_channel::unbounded, + crossbeam_channel::{unbounded, Sender}, lru::LruCache, solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats}, solana_perf::{ cuda_runtime::PinnedVec, - packet::{Packet, PacketBatchRecycler, PacketFlags}, + packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags}, recycler::Recycler, }, solana_runtime::bank_forks::BankForks, solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT}, - solana_streamer::streamer::{self, PacketBatchReceiver, PacketBatchSender}, + solana_streamer::streamer::{self, PacketBatchReceiver}, std::{ net::UdpSocket, sync::{atomic::AtomicBool, Arc, RwLock}, @@ -65,7 +65,7 @@ impl ShredFetchStage { // updates packets received on a channel and sends them on another channel fn modify_packets( recvr: PacketBatchReceiver, - sendr: PacketBatchSender, + sendr: Sender>, bank_forks: Option>>, name: &'static str, modify: F, @@ -125,7 +125,7 @@ impl ShredFetchStage { stats = ShredFetchStats::default(); last_stats = Instant::now(); } - if sendr.send(packet_batch).is_err() { + if sendr.send(vec![packet_batch]).is_err() { break; } } @@ -134,7 +134,7 @@ impl ShredFetchStage { fn packet_modifier( sockets: Vec>, exit: &Arc, - sender: PacketBatchSender, + sender: Sender>, recycler: Recycler>, bank_forks: Option>>, name: &'static str, @@ -170,7 +170,7 @@ impl ShredFetchStage { sockets: Vec>, forward_sockets: Vec>, repair_socket: Arc, - sender: &PacketBatchSender, + sender: &Sender>, bank_forks: Option>>, exit: &Arc, ) -> Self { diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 03bf6005b4a47d..b1388271666c70 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -6,9 +6,9 @@ //! if perf-libs are available use { - crate::sigverify, + crate::{find_packet_sender_stake_stage, sigverify}, core::time::Duration, - crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender}, + crossbeam_channel::{RecvTimeoutError, SendError, Sender}, itertools::Itertools, solana_measure::measure::Measure, solana_perf::{ @@ -16,7 +16,7 @@ use { sigverify::{count_valid_packets, shrink_batches, Deduper}, }, solana_sdk::timing, - solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError}, + solana_streamer::streamer::{self, StreamerError}, std::{ thread::{self, Builder, JoinHandle}, time::Instant, @@ -192,7 +192,7 @@ impl SigVerifier for DisabledSigVerifier { impl SigVerifyStage { #[allow(clippy::new_ret_no_self)] pub fn new( - packet_receiver: Receiver, + packet_receiver: find_packet_sender_stake_stage::FindPacketSenderStakeReceiver, verified_sender: Sender>, verifier: T, ) -> Self { @@ -227,12 +227,12 @@ impl SigVerifyStage { fn verifier( deduper: &Deduper, - recvr: &PacketBatchReceiver, + recvr: &find_packet_sender_stake_stage::FindPacketSenderStakeReceiver, sendr: &Sender>, verifier: &T, stats: &mut SigVerifierStats, ) -> Result<()> { - let (mut batches, num_packets, recv_duration) = streamer::recv_packet_batches(recvr)?; + let (mut batches, num_packets, recv_duration) = streamer::recv_vec_packet_batches(recvr)?; let batches_len = batches.len(); debug!( @@ -312,7 +312,7 @@ impl SigVerifyStage { } fn verifier_service( - packet_receiver: PacketBatchReceiver, + packet_receiver: find_packet_sender_stake_stage::FindPacketSenderStakeReceiver, verified_sender: Sender>, verifier: &T, ) -> JoinHandle<()> { @@ -358,7 +358,7 @@ impl SigVerifyStage { } fn verifier_services( - packet_receiver: PacketBatchReceiver, + packet_receiver: find_packet_sender_stake_stage::FindPacketSenderStakeReceiver, verified_sender: Sender>, verifier: T, ) -> JoinHandle<()> { @@ -445,7 +445,7 @@ mod tests { for _ in 0..batches.len() { if let Some(batch) = batches.pop() { sent_len += batch.packets.len(); - packet_s.send(batch).unwrap(); + packet_s.send(vec![batch]).unwrap(); } } let mut received = 0; diff --git a/core/src/tpu.rs b/core/src/tpu.rs index a1deaf571041fb..929bbb4d254070 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -10,6 +10,7 @@ use { GossipVerifiedVoteHashSender, VerifiedVoteSender, VoteTracker, }, fetch_stage::FetchStage, + find_packet_sender_stake_stage::FindPacketSenderStakeStage, sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage, }, @@ -55,6 +56,8 @@ pub struct Tpu { cluster_info_vote_listener: ClusterInfoVoteListener, broadcast_stage: BroadcastStage, tpu_quic_t: thread::JoinHandle<()>, + find_packet_sender_stake_stage: FindPacketSenderStakeStage, + vote_find_packet_sender_stake_stage: FindPacketSenderStakeStage, } impl Tpu { @@ -103,6 +106,26 @@ impl Tpu { poh_recorder, tpu_coalesce_ms, ); + + let (find_packet_sender_stake_sender, find_packet_sender_stake_receiver) = unbounded(); + + let find_packet_sender_stake_stage = FindPacketSenderStakeStage::new( + packet_receiver, + find_packet_sender_stake_sender, + bank_forks.clone(), + cluster_info.clone(), + ); + + let (vote_find_packet_sender_stake_sender, vote_find_packet_sender_stake_receiver) = + unbounded(); + + let vote_find_packet_sender_stake_stage = FindPacketSenderStakeStage::new( + vote_packet_receiver, + vote_find_packet_sender_stake_sender, + bank_forks.clone(), + cluster_info.clone(), + ); + let (verified_sender, verified_receiver) = unbounded(); let tpu_quic_t = solana_streamer::quic::spawn_server( @@ -117,7 +140,7 @@ impl Tpu { let sigverify_stage = { let verifier = TransactionSigVerifier::default(); - SigVerifyStage::new(packet_receiver, verified_sender, verifier) + SigVerifyStage::new(find_packet_sender_stake_receiver, verified_sender, verifier) }; let (verified_tpu_vote_packets_sender, verified_tpu_vote_packets_receiver) = unbounded(); @@ -125,7 +148,7 @@ impl Tpu { let vote_sigverify_stage = { let verifier = TransactionSigVerifier::new_reject_non_vote(); SigVerifyStage::new( - vote_packet_receiver, + vote_find_packet_sender_stake_receiver, verified_tpu_vote_packets_sender, verifier, ) @@ -179,6 +202,8 @@ impl Tpu { cluster_info_vote_listener, broadcast_stage, tpu_quic_t, + find_packet_sender_stake_stage, + vote_find_packet_sender_stake_stage, } } @@ -189,6 +214,8 @@ impl Tpu { self.vote_sigverify_stage.join(), self.cluster_info_vote_listener.join(), self.banking_stage.join(), + self.find_packet_sender_stake_stage.join(), + self.vote_find_packet_sender_stake_stage.join(), ]; self.tpu_quic_t.join()?; let broadcast_result = self.broadcast_stage.join(); diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index efea21904364c4..b73590da192138 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -32,6 +32,7 @@ pub struct Meta { pub addr: IpAddr, pub port: u16, pub flags: PacketFlags, + pub sender_stake: u64, } #[derive(Clone)] @@ -145,6 +146,7 @@ impl Default for Meta { addr: IpAddr::V4(Ipv4Addr::UNSPECIFIED), port: 0, flags: PacketFlags::empty(), + sender_stake: 0, } } } diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 81535c8d2c5d89..b16a9eff6e2233 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -256,6 +256,34 @@ fn recv_send( Ok(()) } +pub fn recv_vec_packet_batches( + recvr: &Receiver>, +) -> Result<(Vec, usize, Duration)> { + let timer = Duration::new(1, 0); + let mut packet_batches = recvr.recv_timeout(timer)?; + let recv_start = Instant::now(); + trace!("got packets"); + let mut num_packets = packet_batches + .iter() + .map(|packets| packets.packets.len()) + .sum::(); + while let Ok(packet_batch) = recvr.try_recv() { + trace!("got more packets"); + num_packets += packet_batch + .iter() + .map(|packets| packets.packets.len()) + .sum::(); + packet_batches.extend(packet_batch); + } + let recv_duration = recv_start.elapsed(); + trace!( + "packet batches len: {}, num packets: {}", + packet_batches.len(), + num_packets + ); + Ok((packet_batches, num_packets, recv_duration)) +} + pub fn recv_packet_batches( recvr: &PacketBatchReceiver, ) -> Result<(Vec, usize, Duration)> {