diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index ceb7c1ffb76e14..0d43bbe48aeebc 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -396,21 +396,38 @@ impl BankingStage { data_budget: &DataBudget, ) -> std::io::Result<()> { let packets = Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter()); - inc_new_counter_info!("banking_stage-forwarded_packets", packets.len()); const INTERVAL_MS: u64 = 100; const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200; const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000; const MAX_BYTES_BUDGET: usize = MAX_BYTES_PER_INTERVAL * 5; data_budget.update(INTERVAL_MS, |bytes| { - std::cmp::min(bytes + MAX_BYTES_PER_INTERVAL, MAX_BYTES_BUDGET) + std::cmp::min( + bytes.saturating_add(MAX_BYTES_PER_INTERVAL), + MAX_BYTES_BUDGET, + ) }); + + let mut forwarded_packet_count: usize = 0; + let mut forward_result = Ok(()); for p in packets { - if data_budget.take(p.meta.size) { - socket.send_to(&p.data[..p.meta.size], &tpu_forwards)?; + if !p.meta.forwarded && data_budget.take(p.meta.size) { + match socket.send_to(&p.data[..p.meta.size], &tpu_forwards) { + Ok(_) => { + forwarded_packet_count = forwarded_packet_count.saturating_add(1); + } + Err(err) => { + forward_result = Err(err); + break; + } + } } } - Ok(()) + if forwarded_packet_count > 0 { + inc_new_counter_info!("banking_stage-forwarded_packets", forwarded_packet_count); + } + + forward_result } // Returns whether the given `PacketBatch` has any more remaining unprocessed @@ -1696,7 +1713,7 @@ mod tests { system_transaction, transaction::TransactionError, }, - solana_streamer::socket::SocketAddrSpace, + solana_streamer::{recvmmsg::recv_mmsg, socket::SocketAddrSpace}, solana_transaction_status::TransactionWithStatusMeta, solana_vote_program::vote_transaction, std::{ @@ -2978,16 +2995,15 @@ mod tests { fn test_forwarder_budget() { solana_logger::setup(); // Create `PacketBatch` with 1 unprocessed packet - let single_packet_batch = PacketBatch::new(vec![Packet::default()]); - let mut unprocessed_packets: UnprocessedPacketBatches = - vec![(single_packet_batch, vec![0], false)] - .into_iter() - .collect(); - - let cluster_info = new_test_cluster_info(Node::new_localhost().info); + let packet = Packet::from_data(None, &[0]).unwrap(); + let single_packet_batch = PacketBatch::new(vec![packet]); let genesis_config_info = create_slow_genesis_config(10_000); - let GenesisConfigInfo { genesis_config, .. } = &genesis_config_info; + let GenesisConfigInfo { + genesis_config, + validator_pubkey, + .. + } = &genesis_config_info; let bank = Arc::new(Bank::new_no_wallclock_throttle(genesis_config)); let ledger_path = get_tmp_ledger_path!(); @@ -3006,17 +3022,155 @@ mod tests { let (exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank, &blockstore, Some(poh_config)); - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let data_budget = DataBudget::default(); - BankingStage::handle_forwarding( - &ForwardOption::ForwardTransaction, - &cluster_info, - &mut unprocessed_packets, - &poh_recorder, - &socket, - false, - &data_budget, + let local_node = Node::new_localhost_with_pubkey(validator_pubkey); + let cluster_info = new_test_cluster_info(local_node.info); + let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let recv_socket = &local_node.sockets.tpu_forwards[0]; + + let test_cases = vec![ + ("budget-restricted", DataBudget::restricted(), 0), + ("budget-available", DataBudget::default(), 1), + ]; + + for (name, data_budget, expected_num_forwarded) in test_cases { + let mut unprocessed_packet_batches: UnprocessedPacketBatches = + vec![(single_packet_batch.clone(), vec![0], false)] + .into_iter() + .collect(); + BankingStage::handle_forwarding( + &ForwardOption::ForwardTransaction, + &cluster_info, + &mut unprocessed_packet_batches, + &poh_recorder, + &send_socket, + true, + &data_budget, + ); + + recv_socket + .set_nonblocking(expected_num_forwarded == 0) + .unwrap(); + + let mut packets = vec![Packet::default(); 2]; + let (_, num_received) = + recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); + assert_eq!(num_received, expected_num_forwarded, "{}", name); + } + + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } + Blockstore::destroy(&ledger_path).unwrap(); + } + + #[test] + fn test_handle_forwarding() { + solana_logger::setup(); + + const FWD_PACKET: u8 = 1; + let forwarded_packet = { + let mut packet = Packet::from_data(None, &[FWD_PACKET]).unwrap(); + packet.meta.forwarded = true; + packet + }; + + const NORMAL_PACKET: u8 = 2; + let normal_packet = Packet::from_data(None, &[NORMAL_PACKET]).unwrap(); + + let packet_batch = PacketBatch::new(vec![forwarded_packet, normal_packet]); + let mut unprocessed_packet_batches: UnprocessedPacketBatches = + vec![(packet_batch, vec![0, 1], false)] + .into_iter() + .collect(); + + let genesis_config_info = create_slow_genesis_config(10_000); + let GenesisConfigInfo { + genesis_config, + validator_pubkey, + .. + } = &genesis_config_info; + let bank = Arc::new(Bank::new_no_wallclock_throttle(genesis_config)); + let ledger_path = get_tmp_ledger_path!(); + { + let blockstore = Arc::new( + Blockstore::open(&ledger_path) + .expect("Expected to be able to open database ledger"), ); + let poh_config = PohConfig { + // limit tick count to avoid clearing working_bank at + // PohRecord then PohRecorderError(MaxHeightReached) at BankingStage + target_tick_count: Some(bank.max_tick_height() - 1), + ..PohConfig::default() + }; + + let (exit, poh_recorder, poh_service, _entry_receiver) = + create_test_recorder(&bank, &blockstore, Some(poh_config)); + + let local_node = Node::new_localhost_with_pubkey(validator_pubkey); + let cluster_info = new_test_cluster_info(local_node.info); + let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let recv_socket = &local_node.sockets.tpu_forwards[0]; + + let test_cases = vec![ + ("not-forward", ForwardOption::NotForward, true, vec![], 2), + ( + "fwd-normal", + ForwardOption::ForwardTransaction, + true, + vec![NORMAL_PACKET], + 2, + ), + ( + "fwd-no-op", + ForwardOption::ForwardTransaction, + true, + vec![], + 2, + ), + ( + "fwd-no-hold", + ForwardOption::ForwardTransaction, + false, + vec![], + 0, + ), + ]; + + for (name, forward_option, hold, expected_ids, expected_num_unprocessed) in test_cases { + BankingStage::handle_forwarding( + &forward_option, + &cluster_info, + &mut unprocessed_packet_batches, + &poh_recorder, + &send_socket, + hold, + &DataBudget::default(), + ); + + recv_socket + .set_nonblocking(expected_ids.is_empty()) + .unwrap(); + + let mut packets = vec![Packet::default(); 2]; + let (_, num_received) = + recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); + assert_eq!(num_received, expected_ids.len(), "{}", name); + for (i, expected_id) in expected_ids.iter().enumerate() { + assert_eq!(packets[i].meta.size, 1); + assert_eq!(packets[i].data[0], *expected_id, "{}", name); + } + + let num_unprocessed_packets: usize = unprocessed_packet_batches + .iter() + .map(|(b, ..)| b.packets.len()) + .sum(); + assert_eq!( + num_unprocessed_packets, expected_num_unprocessed, + "{}", + name + ); + } + exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); } diff --git a/core/src/commitment_service.rs b/core/src/commitment_service.rs index 6d2dc74ce46c64..322ea233b0ba1f 100644 --- a/core/src/commitment_service.rs +++ b/core/src/commitment_service.rs @@ -506,11 +506,7 @@ mod tests { let validator_vote_keypairs = ValidatorVoteKeypairs::new_rand(); let validator_keypairs = vec![&validator_vote_keypairs]; - let GenesisConfigInfo { - genesis_config, - mint_keypair: _, - voting_keypair: _, - } = create_genesis_config_with_vote_accounts( + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts( 1_000_000_000, &validator_keypairs, vec![100; 1], diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 6785b3ae611888..d705fc601100b7 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -1689,7 +1689,7 @@ pub mod test { let GenesisConfigInfo { genesis_config, mint_keypair, - voting_keypair: _, + .. } = create_genesis_config_with_vote_accounts( 1_000_000_000, &validator_keypairs, diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index d890a9ed35444b..a1af7e7160dfd2 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -8,7 +8,7 @@ use { solana_metrics::{inc_new_counter_debug, inc_new_counter_info}, solana_perf::{packet::PacketBatchRecycler, recycler::Recycler}, solana_poh::poh_recorder::PohRecorder, - solana_sdk::clock::DEFAULT_TICKS_PER_SLOT, + solana_sdk::{clock::DEFAULT_TICKS_PER_SLOT, packet::Packet}, solana_streamer::streamer::{self, PacketBatchReceiver, PacketBatchSender}, std::{ net::UdpSocket, @@ -83,10 +83,16 @@ impl FetchStage { sendr: &PacketBatchSender, poh_recorder: &Arc>, ) -> Result<()> { - let packet_batch = recvr.recv()?; + let mark_forwarded = |packet: &mut Packet| { + packet.meta.forwarded = true; + }; + + let mut packet_batch = recvr.recv()?; let mut num_packets = packet_batch.packets.len(); + packet_batch.packets.iter_mut().for_each(mark_forwarded); let mut packet_batches = vec![packet_batch]; - while let Ok(packet_batch) = recvr.try_recv() { + while let Ok(mut packet_batch) = recvr.try_recv() { + packet_batch.packets.iter_mut().for_each(mark_forwarded); num_packets += packet_batch.packets.len(); packet_batches.push(packet_batch); // Read at most 1K transactions in a loop @@ -114,7 +120,7 @@ impl FetchStage { } fn new_multi_socket( - sockets: Vec>, + tpu_sockets: Vec>, tpu_forwards_sockets: Vec>, tpu_vote_sockets: Vec>, exit: &Arc, @@ -125,7 +131,7 @@ impl FetchStage { ) -> Self { let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024); - let tpu_threads = sockets.into_iter().map(|socket| { + let tpu_threads = tpu_sockets.into_iter().map(|socket| { streamer::receiver( socket, exit, diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index ae93816d72fc41..0c8b7a6f3cef78 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -192,7 +192,7 @@ impl ShredFetchStage { recycler.clone(), bank_forks.clone(), "shred_fetch_tvu_forwards", - |p| p.meta.forward = true, + |p| p.meta.forwarded = true, ); let (repair_receiver, repair_handler) = Self::packet_modifier( diff --git a/perf/src/data_budget.rs b/perf/src/data_budget.rs index 24eb0bb84ec5cc..4c35fc6ce35caa 100644 --- a/perf/src/data_budget.rs +++ b/perf/src/data_budget.rs @@ -10,6 +10,14 @@ pub struct DataBudget { } impl DataBudget { + /// Create a data budget with max bytes, used for tests + pub fn restricted() -> Self { + Self { + bytes: AtomicUsize::default(), + last_timestamp_ms: AtomicU64::new(u64::MAX), + } + } + // If there are enough bytes in the budget, consumes from // the budget and returns true. Otherwise returns false. #[must_use] diff --git a/program-test/src/lib.rs b/program-test/src/lib.rs index 92cb627f9e368c..ca6cfa7d3d9f92 100644 --- a/program-test/src/lib.rs +++ b/program-test/src/lib.rs @@ -830,6 +830,7 @@ impl ProgramTest { genesis_config, mint_keypair, voting_keypair, + validator_pubkey: bootstrap_validator_pubkey, }, ) } diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 2a045cce31df6b..ee448d89156c5c 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -6035,6 +6035,7 @@ pub mod tests { mut genesis_config, mint_keypair, voting_keypair, + .. } = create_genesis_config(TEST_MINT_LAMPORTS); genesis_config.rent.lamports_per_byte_year = 50; diff --git a/rpc/src/send_transaction_service.rs b/rpc/src/send_transaction_service.rs index 0e72a9f41b97c7..9ad8d321c4e9ca 100644 --- a/rpc/src/send_transaction_service.rs +++ b/rpc/src/send_transaction_service.rs @@ -953,11 +953,7 @@ mod test { &validator_vote_keypairs1, &validator_vote_keypairs2, ]; - let GenesisConfigInfo { - genesis_config, - mint_keypair: _, - voting_keypair: _, - } = create_genesis_config_with_vote_accounts( + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts( 1_000_000_000, &validator_keypairs, vec![10_000; 3], diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index d60fe2de73a894..f951d29403c020 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -13659,11 +13659,7 @@ pub(crate) mod tests { let validator_vote_keypairs0 = ValidatorVoteKeypairs::new_rand(); let validator_vote_keypairs1 = ValidatorVoteKeypairs::new_rand(); let validator_keypairs = vec![&validator_vote_keypairs0, &validator_vote_keypairs1]; - let GenesisConfigInfo { - genesis_config, - mint_keypair: _, - voting_keypair: _, - } = create_genesis_config_with_vote_accounts( + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts( 1_000_000_000, &validator_keypairs, vec![10_000; 2], diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index a283c0fd29d4c2..87098d2b881c0d 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -629,8 +629,8 @@ mod tests { let leader_keypair = Keypair::new(); let GenesisConfigInfo { mut genesis_config, - mint_keypair: _, voting_keypair, + .. } = create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1_000); let slots_in_epoch = 32; genesis_config.epoch_schedule = EpochSchedule::new(slots_in_epoch); diff --git a/runtime/src/genesis_utils.rs b/runtime/src/genesis_utils.rs index 71222fdf441b31..7db7a4f8a07cbd 100644 --- a/runtime/src/genesis_utils.rs +++ b/runtime/src/genesis_utils.rs @@ -52,6 +52,7 @@ pub struct GenesisConfigInfo { pub genesis_config: GenesisConfig, pub mint_keypair: Keypair, pub voting_keypair: Keypair, + pub validator_pubkey: Pubkey, } pub fn create_genesis_config(mint_lamports: u64) -> GenesisConfigInfo { @@ -84,10 +85,11 @@ pub fn create_genesis_config_with_vote_accounts_and_cluster_type( let voting_keypair = Keypair::from_bytes(&voting_keypairs[0].borrow().vote_keypair.to_bytes()).unwrap(); + let validator_pubkey = voting_keypairs[0].borrow().node_keypair.pubkey(); let genesis_config = create_genesis_config_with_leader_ex( mint_lamports, &mint_keypair.pubkey(), - &voting_keypairs[0].borrow().node_keypair.pubkey(), + &validator_pubkey, &voting_keypairs[0].borrow().vote_keypair.pubkey(), &voting_keypairs[0].borrow().stake_keypair.pubkey(), stakes[0], @@ -102,6 +104,7 @@ pub fn create_genesis_config_with_vote_accounts_and_cluster_type( genesis_config, mint_keypair, voting_keypair, + validator_pubkey, }; for (validator_voting_keypairs, stake) in voting_keypairs[1..].iter().zip(&stakes[1..]) { @@ -159,6 +162,7 @@ pub fn create_genesis_config_with_leader( genesis_config, mint_keypair, voting_keypair, + validator_pubkey: *validator_pubkey, } } diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index 27435b3ae677fb..19b0bae996d493 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -18,7 +18,7 @@ pub const PACKET_DATA_SIZE: usize = 1280 - 40 - 8; #[repr(C)] pub struct Meta { pub size: usize, - pub forward: bool, + pub forwarded: bool, pub repair: bool, pub discard: bool, pub addr: [u16; 8],