Skip to content

Commit

Permalink
Remove unneeded checks from broadcast stage
Browse files Browse the repository at this point in the history
  • Loading branch information
carllin committed Oct 5, 2018
1 parent f8b6305 commit 6486887
Showing 1 changed file with 0 additions and 196 deletions.
196 changes: 0 additions & 196 deletions src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,31 +200,6 @@ impl BroadcastStage {
Some(leader_scheduler.read().unwrap().leader_rotation_interval);
}
loop {
if let Some(leader_rotation_interval) = leader_rotation_interval_option {
if transmit_index.data % (leader_rotation_interval as u64) == 0 {
let my_id = me.id;
let leader_scheduler_lock = leader_scheduler_option.as_ref().expect(
"leader_scheduler_option cannot be None if
leader_rotation_interval_option is not None",
);

let rlock = leader_scheduler_lock.read().unwrap();

match rlock.get_scheduled_leader(transmit_index.data) {
Some(leader_id) if leader_id == my_id => (),
// In this case, the write_stage moved the schedule too far
// ahead and we no longer are in the known window. In that case,
// just continue broadcasting until we catch up.
None => (),
// If the leader stays in power for the next
// round as well, then we don't exit. Otherwise, exit.
_ => {
return BroadcastStageReturnType::LeaderRotation;
}
}
}
}

let broadcast_table = crdt.read().unwrap().compute_broadcast_table();
if let Err(e) = broadcast(
&leader_rotation_interval_option,
Expand Down Expand Up @@ -301,174 +276,3 @@ impl Service for BroadcastStage {
self.thread_hdl.join()
}
}

#[cfg(test)]
mod tests {
use bank::Bank;
use broadcast_stage::{BroadcastStage, BroadcastStageReturnType};
use crdt::{Crdt, Node};
use entry::Entry;
use leader_scheduler::{set_new_leader, LeaderScheduler, LeaderSchedulerConfig};
use ledger::next_entries_mut;
use mint::Mint;
use service::Service;
use signature::{Keypair, KeypairUtil};
use std::cmp;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, RwLock};
use window::{new_window_from_entries, SharedWindow};

struct DummyBroadcastStage {
bank: Bank,
broadcast_stage: BroadcastStage,
shared_window: SharedWindow,
entry_sender: Sender<Vec<Entry>>,
entries: Vec<Entry>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
}

fn setup_dummy_broadcast_stage(
leader_keypair: Keypair,
leader_scheduler_config: &LeaderSchedulerConfig,
) -> DummyBroadcastStage {
// Setup dummy leader info
let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey());

// Give the leader somebody to broadcast to so he isn't lonely
let buddy_keypair = Keypair::new();
let broadcast_buddy = Node::new_localhost_with_pubkey(buddy_keypair.pubkey());

// Fill the crdt with the buddy's info
let mut crdt = Crdt::new(leader_info.info.clone()).expect("Crdt::new");
crdt.insert(&broadcast_buddy.info);
let crdt = Arc::new(RwLock::new(crdt));

// Make dummy initial entries
let mint = Mint::new(10000);
let entries = mint.create_entries();
let entry_height = entries.len() as u64;

// Setup a window
let window = new_window_from_entries(&entries, entry_height, &leader_info.info);

let shared_window = Arc::new(RwLock::new(window));

let (entry_sender, entry_receiver) = channel();
let exit_sender = Arc::new(AtomicBool::new(false));

// Make a leader scheduler to manipulate in later tests
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::new(leader_scheduler_config)));

// Make a bank to manipulate the stakes for leader selection
let bank = Bank::new_default(true);

// Start up the broadcast stage
let broadcast_stage = BroadcastStage::new(
leader_info.sockets.broadcast,
crdt.clone(),
shared_window.clone(),
entry_height,
entry_receiver,
Some(leader_scheduler.clone()),
exit_sender,
);

DummyBroadcastStage {
broadcast_stage,
shared_window,
entry_sender,
bank,
entries,
leader_scheduler,
}
}

fn find_highest_window_index(shared_window: &SharedWindow) -> u64 {
let window = shared_window.read().unwrap();
window.iter().fold(0, |m, w_slot| {
if let Some(ref blob) = w_slot.data {
cmp::max(m, blob.read().unwrap().get_index().unwrap())
} else {
m
}
})
}

#[test]
fn test_broadcast_stage_leader_rotation_exit() {
let leader_keypair = Keypair::new();
let leader_rotation_interval = 10;
let bootstrap_height = 20;
let seed_rotation_interval = 3 * leader_rotation_interval;
let active_window = bootstrap_height + 3 * seed_rotation_interval;
let leader_scheduler_config = LeaderSchedulerConfig::new(
leader_keypair.pubkey(),
Some(bootstrap_height),
Some(leader_rotation_interval),
Some(seed_rotation_interval),
// The active window needs to >= the cumulative entry_height reached by
// any of the below test cases
Some(active_window),
);

let broadcast_info = setup_dummy_broadcast_stage(leader_keypair, &leader_scheduler_config);
let genesis_len = broadcast_info.entries.len() as u64;
assert!(genesis_len < bootstrap_height);
let mut last_id = broadcast_info
.entries
.last()
.expect("Ledger should not be empty")
.id;
let mut num_hashes = 0;

// Input enough entries to make exactly leader_rotation_interval entries, which will
// trigger a check for leader rotation. Because the next scheduled leader
// is ourselves, we won't exit
for entry_height in (genesis_len + 1)..(bootstrap_height + 1) {
broadcast_info
.leader_scheduler
.write()
.unwrap()
.update_height(entry_height, &broadcast_info.bank);
let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]);
broadcast_info.entry_sender.send(new_entry).unwrap();
}

// Set the scheduled leader for the next seed_rotation_interval to somebody else
let next_leader_height = bootstrap_height + seed_rotation_interval;
set_new_leader(
&broadcast_info.bank,
&mut (*broadcast_info.leader_scheduler.write().unwrap()),
next_leader_height,
);

// Input another leader_rotation_interval dummy entries, which will take us
// past the point of the leader rotation. The broadcast_stage will see that
// it's no longer the leader after checking the schedule, and exit
for entry_height in (bootstrap_height + 1)..(next_leader_height + 1) {
broadcast_info
.leader_scheduler
.write()
.unwrap()
.update_height(entry_height, &broadcast_info.bank);
let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]);
match broadcast_info.entry_sender.send(new_entry) {
// We disconnected, break out of loop and check the results
Err(_) => break,
_ => (),
};
}

// Make sure the threads closed cleanly
assert_eq!(
broadcast_info.broadcast_stage.join().unwrap(),
BroadcastStageReturnType::LeaderRotation
);

let highest_index = find_highest_window_index(&broadcast_info.shared_window);
// The blob index is zero indexed, so it will always be one behind the entry height
// which starts at one.
assert_eq!(highest_index, bootstrap_height + seed_rotation_interval - 1);
}
}

0 comments on commit 6486887

Please sign in to comment.