From 64868876fbcbd0155bf33e0fa3258a31ee2b5831 Mon Sep 17 00:00:00 2001
From: Carl
Date: Fri, 5 Oct 2018 15:00:01 -0700
Subject: [PATCH] Remove unneeded checks from broadcast stage

---
 src/broadcast_stage.rs | 196 -----------------------------------------
 1 file changed, 196 deletions(-)

diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs
index 7173874baa2932..ea6a0961706b30 100644
--- a/src/broadcast_stage.rs
+++ b/src/broadcast_stage.rs
@@ -200,31 +200,6 @@ impl BroadcastStage {
                 Some(leader_scheduler.read().unwrap().leader_rotation_interval);
         }
         loop {
-            if let Some(leader_rotation_interval) = leader_rotation_interval_option {
-                if transmit_index.data % (leader_rotation_interval as u64) == 0 {
-                    let my_id = me.id;
-                    let leader_scheduler_lock = leader_scheduler_option.as_ref().expect(
-                        "leader_scheduler_option cannot be None if
-                        leader_rotation_interval_option is not None",
-                    );
-
-                    let rlock = leader_scheduler_lock.read().unwrap();
-
-                    match rlock.get_scheduled_leader(transmit_index.data) {
-                        Some(leader_id) if leader_id == my_id => (),
-                        // In this case, the write_stage moved the schedule too far
-                        // ahead and we no longer are in the known window. In that case,
-                        // just continue broadcasting until we catch up.
-                        None => (),
-                        // If the leader stays in power for the next
-                        // round as well, then we don't exit. Otherwise, exit.
-                        _ => {
-                            return BroadcastStageReturnType::LeaderRotation;
-                        }
-                    }
-                }
-            }
-
             let broadcast_table = crdt.read().unwrap().compute_broadcast_table();
             if let Err(e) = broadcast(
                 &leader_rotation_interval_option,
@@ -301,174 +276,3 @@ impl Service for BroadcastStage {
         self.thread_hdl.join()
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use bank::Bank;
-    use broadcast_stage::{BroadcastStage, BroadcastStageReturnType};
-    use crdt::{Crdt, Node};
-    use entry::Entry;
-    use leader_scheduler::{set_new_leader, LeaderScheduler, LeaderSchedulerConfig};
-    use ledger::next_entries_mut;
-    use mint::Mint;
-    use service::Service;
-    use signature::{Keypair, KeypairUtil};
-    use std::cmp;
-    use std::sync::atomic::AtomicBool;
-    use std::sync::mpsc::{channel, Sender};
-    use std::sync::{Arc, RwLock};
-    use window::{new_window_from_entries, SharedWindow};
-
-    struct DummyBroadcastStage {
-        bank: Bank,
-        broadcast_stage: BroadcastStage,
-        shared_window: SharedWindow,
-        entry_sender: Sender<Vec<Entry>>,
-        entries: Vec<Entry>,
-        leader_scheduler: Arc<RwLock<LeaderScheduler>>,
-    }
-
-    fn setup_dummy_broadcast_stage(
-        leader_keypair: Keypair,
-        leader_scheduler_config: &LeaderSchedulerConfig,
-    ) -> DummyBroadcastStage {
-        // Setup dummy leader info
-        let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
-
-        // Give the leader somebody to broadcast to so he isn't lonely
-        let buddy_keypair = Keypair::new();
-        let broadcast_buddy = Node::new_localhost_with_pubkey(buddy_keypair.pubkey());
-
-        // Fill the crdt with the buddy's info
-        let mut crdt = Crdt::new(leader_info.info.clone()).expect("Crdt::new");
-        crdt.insert(&broadcast_buddy.info);
-        let crdt = Arc::new(RwLock::new(crdt));
-
-        // Make dummy initial entries
-        let mint = Mint::new(10000);
-        let entries = mint.create_entries();
-        let entry_height = entries.len() as u64;
-
-        // Setup a window
-        let window = new_window_from_entries(&entries, entry_height, &leader_info.info);
-
-        let shared_window = Arc::new(RwLock::new(window));
-
-        let (entry_sender, entry_receiver) = channel();
-        let exit_sender = Arc::new(AtomicBool::new(false));
-
-        // Make a leader scheduler to manipulate in later tests
-        let leader_scheduler =
-            Arc::new(RwLock::new(LeaderScheduler::new(leader_scheduler_config)));
-
-        // Make a bank to manipulate the stakes for leader selection
-        let bank = Bank::new_default(true);
-
-        // Start up the broadcast stage
-        let broadcast_stage = BroadcastStage::new(
-            leader_info.sockets.broadcast,
-            crdt.clone(),
-            shared_window.clone(),
-            entry_height,
-            entry_receiver,
-            Some(leader_scheduler.clone()),
-            exit_sender,
-        );
-
-        DummyBroadcastStage {
-            broadcast_stage,
-            shared_window,
-            entry_sender,
-            bank,
-            entries,
-            leader_scheduler,
-        }
-    }
-
-    fn find_highest_window_index(shared_window: &SharedWindow) -> u64 {
-        let window = shared_window.read().unwrap();
-        window.iter().fold(0, |m, w_slot| {
-            if let Some(ref blob) = w_slot.data {
-                cmp::max(m, blob.read().unwrap().get_index().unwrap())
-            } else {
-                m
-            }
-        })
-    }
-
-    #[test]
-    fn test_broadcast_stage_leader_rotation_exit() {
-        let leader_keypair = Keypair::new();
-        let leader_rotation_interval = 10;
-        let bootstrap_height = 20;
-        let seed_rotation_interval = 3 * leader_rotation_interval;
-        let active_window = bootstrap_height + 3 * seed_rotation_interval;
-        let leader_scheduler_config = LeaderSchedulerConfig::new(
-            leader_keypair.pubkey(),
-            Some(bootstrap_height),
-            Some(leader_rotation_interval),
-            Some(seed_rotation_interval),
-            // The active window needs to be >= the cumulative entry_height reached by
-            // any of the below test cases
-            Some(active_window),
-        );
-
-        let broadcast_info = setup_dummy_broadcast_stage(leader_keypair, &leader_scheduler_config);
-        let genesis_len = broadcast_info.entries.len() as u64;
-        assert!(genesis_len < bootstrap_height);
-        let mut last_id = broadcast_info
-            .entries
-            .last()
-            .expect("Ledger should not be empty")
-            .id;
-        let mut num_hashes = 0;
-
-        // Input enough entries to make exactly leader_rotation_interval entries, which will
-        // trigger a check for leader rotation. Because the next scheduled leader
-        // is ourselves, we won't exit
-        for entry_height in (genesis_len + 1)..(bootstrap_height + 1) {
-            broadcast_info
-                .leader_scheduler
-                .write()
-                .unwrap()
-                .update_height(entry_height, &broadcast_info.bank);
-            let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]);
-            broadcast_info.entry_sender.send(new_entry).unwrap();
-        }
-
-        // Set the scheduled leader for the next seed_rotation_interval to somebody else
-        let next_leader_height = bootstrap_height + seed_rotation_interval;
-        set_new_leader(
-            &broadcast_info.bank,
-            &mut (*broadcast_info.leader_scheduler.write().unwrap()),
-            next_leader_height,
-        );
-
-        // Input another leader_rotation_interval dummy entries, which will take us
-        // past the point of the leader rotation. The broadcast_stage will see that
-        // it's no longer the leader after checking the schedule, and exit
-        for entry_height in (bootstrap_height + 1)..(next_leader_height + 1) {
-            broadcast_info
-                .leader_scheduler
-                .write()
-                .unwrap()
-                .update_height(entry_height, &broadcast_info.bank);
-            let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]);
-            match broadcast_info.entry_sender.send(new_entry) {
-                // We disconnected, break out of loop and check the results
-                Err(_) => break,
-                _ => (),
-            };
-        }
-
-        // Make sure the threads closed cleanly
-        assert_eq!(
-            broadcast_info.broadcast_stage.join().unwrap(),
-            BroadcastStageReturnType::LeaderRotation
-        );
-
-        let highest_index = find_highest_window_index(&broadcast_info.shared_window);
-        // The blob index is zero indexed, so it will always be one behind the entry height
-        // which starts at one.
-        assert_eq!(highest_index, bootstrap_height + seed_rotation_interval - 1);
-    }
-}