add timing metrics, some renaming
tao-stones committed Mar 18, 2022
1 parent fd51509 commit c478fe2
Showing 4 changed files with 136 additions and 106 deletions.
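The headline change below adds a `FindPacketSenderStakeStats` struct that accumulates per-phase timings in the stage's main loop and reports them as a datapoint every two seconds. Here is a minimal sketch of the timing pattern the diff applies, assuming only `solana_measure`; the `Stats` struct is a stand-in for the real `FindPacketSenderStakeStats` shown in full in find_packet_sender_stake_stage.rs below:

```rust
use solana_measure::measure::Measure;

#[derive(Default)]
struct Stats {
    apply_sender_stakes_time: u64, // accumulated microseconds across loop iterations
}

fn timed_phase(stats: &mut Stats) {
    // Start a named timer around the phase being measured...
    let mut apply_sender_stakes_time = Measure::start("apply_sender_stakes_time");
    // ... the work being timed goes here ...
    apply_sender_stakes_time.stop();
    // ...then accumulate with saturating arithmetic so the counter
    // cannot wrap between reports.
    stats.apply_sender_stakes_time = stats
        .apply_sender_stakes_time
        .saturating_add(apply_sender_stakes_time.as_us());
}
```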
69 changes: 5 additions & 64 deletions core/src/cluster_nodes.rs
@@ -1,9 +1,5 @@
 use {
-    crate::{
-        broadcast_stage::BroadcastStage,
-        find_packet_sender_stake_stage::FindPacketSenderStakeStage,
-        retransmit_stage::RetransmitStage,
-    },
+    crate::{broadcast_stage::BroadcastStage, retransmit_stage::RetransmitStage},
     itertools::Itertools,
     lru::LruCache,
     rand::{seq::SliceRandom, Rng, SeedableRng},
@@ -32,7 +28,7 @@ use {
         collections::HashMap,
         iter::repeat_with,
         marker::PhantomData,
-        net::{IpAddr, Ipv4Addr, SocketAddr},
+        net::SocketAddr,
         ops::Deref,
         sync::{Arc, Mutex},
         time::{Duration, Instant},
@@ -317,19 +313,6 @@ impl ClusterNodes<RetransmitStage> {
     }
 }
 
-impl ClusterNodes<FindPacketSenderStakeStage> {
-    pub(crate) fn get_ip_to_stakes(&self) -> HashMap<IpAddr, u64> {
-        self.compat_index
-            .iter()
-            .filter_map(|(_, i)| {
-                let node = &self.nodes[*i];
-                let contact_info = node.contact_info()?;
-                Some((contact_info.tvu.ip(), node.stake))
-            })
-            .collect()
-    }
-}
-
 pub fn new_cluster_nodes<T: 'static>(
     cluster_info: &ClusterInfo,
     stakes: &HashMap<Pubkey, u64>,
@@ -505,20 +488,9 @@ pub fn make_test_cluster<R: Rng>(
     ClusterInfo,
 ) {
     let (unstaked_numerator, unstaked_denominator) = unstaked_ratio.unwrap_or((1, 7));
-    let mut ip_addr_octet: usize = 0;
-    let mut nodes: Vec<_> = repeat_with(|| {
-        let mut contact_info = ContactInfo::new_rand(rng, None);
-        contact_info.tvu.set_ip(IpAddr::V4(Ipv4Addr::new(
-            127,
-            0,
-            0,
-            (ip_addr_octet % 256) as u8,
-        )));
-        ip_addr_octet += 1;
-        contact_info
-    })
-    .take(num_nodes)
-    .collect();
+    let mut nodes: Vec<_> = repeat_with(|| ContactInfo::new_rand(rng, None))
+        .take(num_nodes)
+        .collect();
     nodes.shuffle(rng);
     let this_node = nodes[0].clone();
     let mut stakes: HashMap<Pubkey, u64> = nodes
@@ -713,35 +685,4 @@ mod tests {
             assert_eq!(*peer, peers[index]);
         }
     }
-
-    #[test]
-    fn test_cluster_nodes_transaction_weight() {
-        solana_logger::setup();
-        let mut rng = rand::thread_rng();
-        let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 14, None);
-        let cluster_nodes = new_cluster_nodes::<FindPacketSenderStakeStage>(&cluster_info, &stakes);
-
-        // All nodes with contact-info should be in the index.
-        assert_eq!(cluster_nodes.compat_index.len(), nodes.len());
-        // Staked nodes with no contact-info should be included.
-        assert!(cluster_nodes.nodes.len() > nodes.len());
-
-        let ip_to_stake = cluster_nodes.get_ip_to_stakes();
-
-        // Only staked nodes with contact_info should be in the ip_to_stake
-        let stacked_nodes_with_contact_info: HashMap<_, _> = stakes
-            .iter()
-            .filter_map(|(pubkey, stake)| {
-                let node = nodes.iter().find(|node| node.id == *pubkey)?;
-                Some((node.tvu.ip(), stake))
-            })
-            .collect();
-        ip_to_stake.iter().for_each(|(ip, stake)| {
-            // ignoring the 0 staked, because non-stacked nodes are defaulted into 0 stake.
-            if *stake > 0 {
-                let expected_stake = stacked_nodes_with_contact_info.get(ip).unwrap();
-                assert_eq!(stake, *expected_stake);
-            }
-        });
-    }
 }
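With `ClusterNodes::<FindPacketSenderStakeStage>::get_ip_to_stakes()` removed, the IP-to-stake map is now built directly from gossip TVU peers and the root bank's staked nodes (see `try_refresh_ip_to_stake` in the next file). A standalone sketch of that mapping with simplified stand-in types; `Peer` and the string-keyed stake map are illustrative, not the real `ContactInfo`/`Pubkey` types:

```rust
use std::{collections::HashMap, net::IpAddr};

// Illustrative stand-in for ContactInfo: an identity plus a TVU socket IP.
struct Peer {
    id: String,
    tvu_ip: IpAddr,
}

// Peers without a stake entry are dropped from the map, so packets from
// them later fall back to a default stake of zero.
fn ip_to_stake(peers: &[Peer], staked_nodes: &HashMap<String, u64>) -> HashMap<IpAddr, u64> {
    peers
        .iter()
        .filter_map(|peer| Some((peer.tvu_ip, *staked_nodes.get(&peer.id)?)))
        .collect()
}
```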
132 changes: 110 additions & 22 deletions core/src/find_packet_sender_stake_stage.rs
@@ -1,11 +1,12 @@
 use {
-    crate::cluster_nodes::ClusterNodesCache,
     crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
     rayon::{prelude::*, ThreadPool},
     solana_gossip::cluster_info::ClusterInfo,
+    solana_measure::measure::Measure,
     solana_perf::packet::PacketBatch,
     solana_rayon_threadlimit::get_thread_count,
     solana_runtime::bank_forks::BankForks,
+    solana_sdk::timing::timestamp,
     solana_streamer::streamer::{self, StreamerError},
     std::{
         cell::RefCell,
@@ -17,66 +18,153 @@ use {
     },
 };
 
-const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8;
-const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
-const STAKES_REFRESH_PERIOD_IN_MS: u128 = 1000;
+const IP_TO_STAKE_REFRESH_DURATION: Duration = Duration::from_secs(5);
 
 thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
     .num_threads(get_thread_count())
     .thread_name(|ix| format!("transaction_sender_stake_stage_{}", ix))
     .build()
     .unwrap()));
 
+pub type FindPacketSenderStakeSender = Sender<Vec<PacketBatch>>;
+pub type FindPacketSenderStakeReceiver = Receiver<Vec<PacketBatch>>;
+
+#[derive(Debug, Default)]
+struct FindPacketSenderStakeStats {
+    last_print: u64,
+    refresh_ip_to_stake_time: u64,
+    apply_sender_stakes_time: u64,
+    send_batches_time: u64,
+    receive_batches_time: u64,
+    total_batches: u64,
+    total_packets: u64,
+}
+
+impl FindPacketSenderStakeStats {
+    fn report(&mut self) {
+        let now = timestamp();
+        let elapsed_ms = now - self.last_print;
+        if elapsed_ms > 2000 {
+            datapoint_info!(
+                "find_packet_sender_stake-services_stats",
+                (
+                    "refresh_ip_to_stake_time",
+                    self.refresh_ip_to_stake_time as i64,
+                    i64
+                ),
+                (
+                    "apply_sender_stakes_time",
+                    self.apply_sender_stakes_time as i64,
+                    i64
+                ),
+                ("send_batches_time", self.send_batches_time as i64, i64),
+                (
+                    "receive_batches_time",
+                    self.receive_batches_time as i64,
+                    i64
+                ),
+                ("total_batches", self.total_batches as i64, i64),
+                ("total_packets", self.total_packets as i64, i64),
+            );
+            *self = FindPacketSenderStakeStats::default();
+            self.last_print = now;
+        }
+    }
+}
+
 pub struct FindPacketSenderStakeStage {
     thread_hdl: JoinHandle<()>,
 }
 
 impl FindPacketSenderStakeStage {
     pub fn new(
-        packet_receiver: Receiver<PacketBatch>,
-        sender: Sender<Vec<PacketBatch>>,
+        packet_receiver: streamer::PacketBatchReceiver,
+        sender: FindPacketSenderStakeSender,
         bank_forks: Arc<RwLock<BankForks>>,
         cluster_info: Arc<ClusterInfo>,
     ) -> Self {
-        let cluster_nodes_cache = Arc::new(ClusterNodesCache::<FindPacketSenderStakeStage>::new(
-            CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
-            CLUSTER_NODES_CACHE_TTL,
-        ));
+        let mut stats = FindPacketSenderStakeStats::default();
         let thread_hdl = Builder::new()
-            .name("sol-tx-sender_stake".to_string())
+            .name("find-packet-sender-stake".to_string())
            .spawn(move || {
                let mut last_stakes = Instant::now();
                let mut ip_to_stake: HashMap<IpAddr, u64> = HashMap::new();
                loop {
-                    if last_stakes.elapsed().as_millis() > STAKES_REFRESH_PERIOD_IN_MS {
-                        let (root_bank, working_bank) = {
-                            let bank_forks = bank_forks.read().unwrap();
-                            (bank_forks.root_bank(), bank_forks.working_bank())
-                        };
-                        ip_to_stake = cluster_nodes_cache
-                            .get(root_bank.slot(), &root_bank, &working_bank, &cluster_info)
-                            .get_ip_to_stakes();
-                        last_stakes = Instant::now();
-                    }
+                    let mut refresh_ip_to_stake_time = Measure::start("refresh_ip_to_stake_time");
+                    Self::try_refresh_ip_to_stake(
+                        &mut last_stakes,
+                        &mut ip_to_stake,
+                        bank_forks.clone(),
+                        cluster_info.clone(),
+                    );
+                    refresh_ip_to_stake_time.stop();
+                    stats.refresh_ip_to_stake_time = stats
+                        .refresh_ip_to_stake_time
+                        .saturating_add(refresh_ip_to_stake_time.as_us());
+
                    match streamer::recv_packet_batches(&packet_receiver) {
-                        Ok((mut batches, _num_packets, _recv_duration)) => {
+                        Ok((mut batches, num_packets, recv_duration)) => {
+                            let num_batches = batches.len();
+                            let mut apply_sender_stakes_time =
+                                Measure::start("apply_sender_stakes_time");
                            Self::apply_sender_stakes(&mut batches, &ip_to_stake);
+                            apply_sender_stakes_time.stop();
+
+                            let mut send_batches_time = Measure::start("send_batches_time");
                            if let Err(e) = sender.send(batches) {
                                info!("Sender error: {:?}", e);
                            }
+                            send_batches_time.stop();
+
+                            stats.apply_sender_stakes_time = stats
+                                .apply_sender_stakes_time
+                                .saturating_add(apply_sender_stakes_time.as_us());
+                            stats.send_batches_time = stats
+                                .send_batches_time
+                                .saturating_add(send_batches_time.as_us());
+                            stats.receive_batches_time = stats
+                                .receive_batches_time
+                                .saturating_add(recv_duration.as_nanos() as u64);
+                            stats.total_batches =
+                                stats.total_batches.saturating_add(num_batches as u64);
+                            stats.total_packets =
+                                stats.total_packets.saturating_add(num_packets as u64);
                        }
                        Err(e) => match e {
                            StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break,
                            StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (),
                            _ => error!("error: {:?}", e),
                        },
                    }
+
+                    stats.report();
                }
            })
            .unwrap();
        Self { thread_hdl }
    }
 
+    fn try_refresh_ip_to_stake(
+        last_stakes: &mut Instant,
+        ip_to_stake: &mut HashMap<IpAddr, u64>,
+        bank_forks: Arc<RwLock<BankForks>>,
+        cluster_info: Arc<ClusterInfo>,
+    ) {
+        if last_stakes.elapsed() > IP_TO_STAKE_REFRESH_DURATION {
+            let root_bank = bank_forks.read().unwrap().root_bank();
+            let staked_nodes = root_bank.staked_nodes();
+            *ip_to_stake = cluster_info
+                .tvu_peers()
+                .into_iter()
+                .filter_map(|node| {
+                    let stake = staked_nodes.get(&node.id)?;
+                    Some((node.tvu.ip(), *stake))
+                })
+                .collect();
+            *last_stakes = Instant::now();
+        }
+    }
+
    fn apply_sender_stakes(batches: &mut [PacketBatch], ip_to_stake: &HashMap<IpAddr, u64>) {
        PAR_THREAD_POOL.with(|thread_pool| {
            thread_pool.borrow().install(|| {
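The hunk above is truncated before the body of `apply_sender_stakes`. Judging from the map it receives, it presumably stamps each packet's metadata with its sender's stake, roughly as in this hedged sketch; it is not the verbatim body, the `sender_stake` field and `addr()` accessor on packet metadata are assumptions, and the real code fans the work out over `PAR_THREAD_POOL` as the visible lines show:

```rust
use {
    solana_perf::packet::PacketBatch,
    std::{collections::HashMap, net::IpAddr},
};

// Hedged, serial sketch: look up each packet's source IP in the map and
// record the stake on the packet, defaulting to zero for unknown senders.
fn apply_sender_stakes(batches: &mut [PacketBatch], ip_to_stake: &HashMap<IpAddr, u64>) {
    for batch in batches.iter_mut() {
        for packet in batch.packets.iter_mut() {
            // `sender_stake` and `addr()` are assumed metadata accessors,
            // not taken from this diff.
            packet.meta.sender_stake =
                *ip_to_stake.get(&packet.meta.addr().ip()).unwrap_or(&0);
        }
    }
}
```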
12 changes: 6 additions & 6 deletions core/src/sigverify_stage.rs
@@ -6,9 +6,9 @@
 //! if perf-libs are available
 
 use {
-    crate::sigverify,
+    crate::{find_packet_sender_stake_stage, sigverify},
     core::time::Duration,
-    crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
+    crossbeam_channel::{RecvTimeoutError, SendError, Sender},
     itertools::Itertools,
     solana_measure::measure::Measure,
     solana_perf::{
@@ -192,7 +192,7 @@ impl SigVerifier for DisabledSigVerifier {
 impl SigVerifyStage {
     #[allow(clippy::new_ret_no_self)]
     pub fn new<T: SigVerifier + 'static + Send + Clone>(
-        packet_receiver: Receiver<Vec<PacketBatch>>,
+        packet_receiver: find_packet_sender_stake_stage::FindPacketSenderStakeReceiver,
         verified_sender: Sender<Vec<PacketBatch>>,
         verifier: T,
     ) -> Self {
@@ -227,7 +227,7 @@ impl SigVerifyStage {
 
     fn verifier<T: SigVerifier>(
         deduper: &Deduper,
-        recvr: &Receiver<Vec<PacketBatch>>,
+        recvr: &find_packet_sender_stake_stage::FindPacketSenderStakeReceiver,
         sendr: &Sender<Vec<PacketBatch>>,
         verifier: &T,
         stats: &mut SigVerifierStats,
@@ -312,7 +312,7 @@ impl SigVerifyStage {
     }
 
     fn verifier_service<T: SigVerifier + 'static + Send + Clone>(
-        packet_receiver: Receiver<Vec<PacketBatch>>,
+        packet_receiver: find_packet_sender_stake_stage::FindPacketSenderStakeReceiver,
         verified_sender: Sender<Vec<PacketBatch>>,
         verifier: &T,
     ) -> JoinHandle<()> {
@@ -358,7 +358,7 @@ impl SigVerifyStage {
     }
 
     fn verifier_services<T: SigVerifier + 'static + Send + Clone>(
-        packet_receiver: Receiver<Vec<PacketBatch>>,
+        packet_receiver: find_packet_sender_stake_stage::FindPacketSenderStakeReceiver,
         verified_sender: Sender<Vec<PacketBatch>>,
         verifier: T,
     ) -> JoinHandle<()> {
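Taken together, the renamed types spell out the pipeline: the stage consumes a `streamer::PacketBatchReceiver` and feeds `SigVerifyStage` through the aliased channel pair. A hypothetical wiring sketch under the signatures shown in this commit; the channel construction and surrounding plumbing are assumptions, not part of the diff, and the crate's types (`SigVerifier`, `BankForks`, `ClusterInfo`, `streamer`) are assumed in scope:

```rust
use {
    crossbeam_channel::{unbounded, Receiver},
    std::sync::{Arc, RwLock},
};

// Hypothetical wiring: fetch stage -> FindPacketSenderStakeStage -> SigVerifyStage.
fn wire_stages<T: SigVerifier + 'static + Send + Clone>(
    packet_batch_receiver: streamer::PacketBatchReceiver,
    bank_forks: Arc<RwLock<BankForks>>,
    cluster_info: Arc<ClusterInfo>,
    verifier: T,
) -> (FindPacketSenderStakeStage, SigVerifyStage, Receiver<Vec<PacketBatch>>) {
    // The aliased channel pair introduced in this commit.
    let (stake_sender, stake_receiver): (
        FindPacketSenderStakeSender,
        FindPacketSenderStakeReceiver,
    ) = unbounded();
    let (verified_sender, verified_receiver) = unbounded();
    let find_stake_stage = FindPacketSenderStakeStage::new(
        packet_batch_receiver,
        stake_sender,
        bank_forks,
        cluster_info,
    );
    let sigverify_stage = SigVerifyStage::new(stake_receiver, verified_sender, verifier);
    (find_stake_stage, sigverify_stage, verified_receiver)
}
```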
