Comments and cleanup
carllin committed Oct 5, 2018
1 parent 650e35b commit 4886e5d
Showing 8 changed files with 39 additions and 32 deletions.
4 changes: 4 additions & 0 deletions src/bank.rs
@@ -749,6 +749,7 @@ impl Bank {
if result.is_ok() {
let tx = &entry.transactions[i];
if tx.vote().is_some() {
// Update the active set in the leader scheduler
leader_scheduler.push_vote(*tx.from(), entry_height_option.expect("entry_height_option can't be None if leader_scheduler_option isn't None"));
}
}
@@ -758,6 +759,7 @@
}

if let Some(ref mut leader_scheduler) = leader_scheduler_option {
// Update the leader schedule based on entry height
leader_scheduler.update_height(entry_height_option.unwrap(), self);
}

@@ -994,6 +996,8 @@ impl Bank {
}

#[cfg(test)]
// Used in tests to access accounts, e.g. to adjust stake and thereby control
// the set of nodes eligible for leader selection
pub fn accounts(&self) -> &RwLock<HashMap<Pubkey, Account>> {
&self.accounts
}
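Taken together, the two hooks above mean entry processing feeds the leader scheduler in two ways: each vote transaction updates the active set via push_vote(voter, entry_height), and update_height(entry_height, bank) recomputes the schedule afterwards. A minimal sketch of that flow, with placeholder Entry/Tx types and simplified method bodies (the real methods operate on the actual Transaction type and the Bank itself):

use std::collections::HashSet;

// Placeholder types; only the fields the scheduler hooks need.
struct Tx { from: [u8; 32], is_vote: bool }
struct Entry { transactions: Vec<Tx> }

struct LeaderScheduler {
    active_set: HashSet<[u8; 32]>,
    seed_rotation_interval: u64,
}

impl LeaderScheduler {
    // Record that `id` voted at `height`, keeping it in the active set.
    fn push_vote(&mut self, id: [u8; 32], _height: u64) {
        self.active_set.insert(id);
    }

    // Recompute the schedule whenever `height` crosses a rotation boundary.
    fn update_height(&mut self, height: u64) {
        if height % self.seed_rotation_interval == 0 {
            // ... reseed and rank the active set by stake ...
        }
    }
}

// Mirrors the shape of the bank.rs change: votes update the active set, and
// update_height runs after the entry's transactions have been applied.
fn process_entry(entry: &Entry, height: u64, scheduler: &mut Option<LeaderScheduler>) {
    if let Some(scheduler) = scheduler.as_mut() {
        for tx in &entry.transactions {
            if tx.is_vote {
                scheduler.push_vote(tx.from, height);
            }
        }
        scheduler.update_height(height);
    }
}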
9 changes: 1 addition & 8 deletions src/bin/fullnode.rs
@@ -83,14 +83,7 @@ fn main() -> () {
let node_info = node.info.clone();
let pubkey = keypair.pubkey();

let mut fullnode = Fullnode::new(
node,
ledger_path,
keypair,
network,
false,
None,
);
let mut fullnode = Fullnode::new(node, ledger_path, keypair, network, false, None);

// airdrop stuff, probably goes away at some point
let leader = match network {
11 changes: 7 additions & 4 deletions src/broadcast_stage.rs
@@ -184,7 +184,7 @@ impl BroadcastStage {
window: &SharedWindow,
entry_height: u64,
receiver: &Receiver<Vec<Entry>>,
leader_scheduler_option: Option<Arc<RwLock<LeaderScheduler>>>,
leader_scheduler_option: &Option<Arc<RwLock<LeaderScheduler>>>,
) -> BroadcastStageReturnType {
let mut transmit_index = WindowIndex {
data: entry_height,
@@ -193,15 +193,17 @@
let mut receive_index = entry_height;
let me = crdt.read().unwrap().my_data().clone();
let mut leader_rotation_interval_option = None;
if let Some(ref leader_scheduler) = leader_scheduler_option {
if let Some(leader_scheduler) = leader_scheduler_option {
// Keep track of the leader_rotation_interval, so we don't have to
// grab the leader_scheduler lock every iteration of the loop.
leader_rotation_interval_option =
Some(leader_scheduler.read().unwrap().leader_rotation_interval);
}
loop {
let broadcast_table = crdt.read().unwrap().compute_broadcast_table();
if let Err(e) = broadcast(
&leader_rotation_interval_option,
&leader_scheduler_option,
leader_scheduler_option,
&me,
&broadcast_table,
&window,
@@ -252,14 +254,15 @@
let thread_hdl = Builder::new()
.name("solana-broadcaster".to_string())
.spawn(move || {
let leader_scheduler_option_ = leader_scheduler_option;
let _exit = Finalizer::new(exit_sender);
Self::run(
&sock,
&crdt,
&window,
entry_height,
&receiver,
leader_scheduler_option,
&leader_scheduler_option_,
)
}).unwrap();

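The rebinding to leader_scheduler_option_ inside the spawned closure is what makes the new by-reference signature work: the owned Option is moved into the thread, and run() then borrows it for the life of the broadcast loop. A condensed sketch of the pattern, with LeaderScheduler reduced to the single field read up front and the loop body elided:

use std::sync::{Arc, RwLock};
use std::thread::{Builder, JoinHandle};

struct LeaderScheduler {
    leader_rotation_interval: u64,
}

fn run(leader_scheduler_option: &Option<Arc<RwLock<LeaderScheduler>>>) {
    // Read the interval once so the lock isn't taken on every loop iteration.
    let _interval = leader_scheduler_option
        .as_ref()
        .map(|ls| ls.read().unwrap().leader_rotation_interval);
    // ... broadcast loop elided ...
}

fn spawn_broadcaster(
    leader_scheduler_option: Option<Arc<RwLock<LeaderScheduler>>>,
) -> JoinHandle<()> {
    Builder::new()
        .name("solana-broadcaster".to_string())
        .spawn(move || {
            // Move the owned Option into the thread, then lend it to run().
            let leader_scheduler_option_ = leader_scheduler_option;
            run(&leader_scheduler_option_)
        }).unwrap()
}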
13 changes: 10 additions & 3 deletions src/crdt.rs
@@ -549,9 +549,16 @@ impl Crdt {
.unwrap()
.get_scheduled_leader(entry_height);
// In the case the next scheduled leader is None, then the write_stage moved
// the schedule too far ahead and we no longer are in the known window. But that's
// fine because that means we're still in power, in which case, just continue
// broadcasting until we catch up.
// the schedule too far ahead and we are no longer in the known window
// (this happens when the next set of slots is calculated each epoch, i.e. every
// seed_rotation_interval heights, when we move the window forward in the
// LeaderScheduler). For correctness this is fine, because write_stage will never send
// blobs past the point at which this node should stop being leader, so we just
// continue broadcasting until we catch up to write_stage. The downside is we
// can't guarantee the current leader will broadcast the last entry to the next
// scheduled leader, so the next leader will have to rely on avalanche/repairs
// to get this last blob, which could cause slowdowns during leader handoffs.
// See corresponding issue for repairs in repair() function in window.rs.
if next_leader_id.is_some() && next_leader_id != Some(me.id) {
let info_result = broadcast_table
.iter()
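In code terms, the decision the expanded comment describes reduces to: forward the blob only when the scheduler positively names someone other than this node as the next leader; a None answer means the schedule window has not been extended that far, so this node keeps broadcasting. A sketch of that check, with the contact-info struct trimmed to the two fields the lookup needs (a stand-in for the real NodeInfo):

use std::net::SocketAddr;

// Trimmed-down stand-in for the real NodeInfo in crdt.rs.
struct NodeInfo {
    id: [u8; 32],
    tvu_addr: SocketAddr,
}

// Returns the next leader's tvu address only if the scheduler names a node
// other than us for `entry_height`. A None from the scheduler (schedule window
// not yet extended) means we are still in power and keep broadcasting.
fn next_leader_tvu(
    next_leader_id: Option<[u8; 32]>,
    my_id: [u8; 32],
    broadcast_table: &[NodeInfo],
) -> Option<SocketAddr> {
    if next_leader_id.is_some() && next_leader_id != Some(my_id) {
        broadcast_table
            .iter()
            .find(|info| Some(info.id) == next_leader_id)
            .map(|info| info.tvu_addr)
    } else {
        None
    }
}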
14 changes: 5 additions & 9 deletions src/fullnode.rs
@@ -345,9 +345,7 @@ impl Fullnode {
ledger_path,
sigverify_disabled,
entry_height,
leader_scheduler_option
.as_ref()
.map(|arc_ls| arc_ls.clone()),
leader_scheduler_option.as_ref().cloned(),
);

let broadcast_stage = BroadcastStage::new(
Expand All @@ -359,9 +357,7 @@ impl Fullnode {
shared_window.clone(),
entry_height,
entry_receiver,
leader_scheduler_option
.as_ref()
.map(|arc_ls| arc_ls.clone()),
leader_scheduler_option.as_ref().cloned(),
tpu_exit,
);
let leader_state = LeaderServices::new(tpu, broadcast_stage);
@@ -456,7 +452,7 @@
.try_clone()
.expect("Failed to clone retransmit socket"),
Some(&self.ledger_path),
self.leader_scheduler_option.as_ref().map(|ls| ls.clone()),
self.leader_scheduler_option.as_ref().cloned(),
);
let validator_state = ValidatorServices::new(tvu);
self.node_role = Some(NodeRole::Validator(validator_state));
@@ -477,7 +473,7 @@
&self.ledger_path,
self.sigverify_disabled,
entry_height,
self.leader_scheduler_option.as_ref().map(|ls| ls.clone()),
self.leader_scheduler_option.as_ref().cloned(),
);

let broadcast_stage = BroadcastStage::new(
Expand All @@ -488,7 +484,7 @@ impl Fullnode {
self.shared_window.clone(),
entry_height,
blob_receiver,
self.leader_scheduler_option.as_ref().map(|ls| ls.clone()),
self.leader_scheduler_option.as_ref().cloned(),
tpu_exit,
);
let leader_state = LeaderServices::new(tpu, broadcast_stage);
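The repeated cleanup in this file replaces .as_ref().map(|ls| ls.clone()) with the equivalent .as_ref().cloned(): both clone the inner Arc (bumping its reference count) and yield an owned Option while leaving the original field untouched. A small illustration, using a u64 in place of the real LeaderScheduler:

use std::sync::{Arc, RwLock};

fn main() {
    let leader_scheduler_option: Option<Arc<RwLock<u64>>> = Some(Arc::new(RwLock::new(0)));

    // Two equivalent ways to hand out a new Arc handle while keeping the original:
    let verbose = leader_scheduler_option.as_ref().map(|ls| ls.clone());
    let concise = leader_scheduler_option.as_ref().cloned();

    // The original plus the two clones share one allocation.
    assert_eq!(Arc::strong_count(leader_scheduler_option.as_ref().unwrap()), 3);
    assert!(verbose.is_some() && concise.is_some());
}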
2 changes: 1 addition & 1 deletion src/replicate_stage.rs
@@ -199,7 +199,7 @@ impl ReplicateStage {
}
}

return None;
None
}).unwrap();

ReplicateStage {
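The change here is purely stylistic: the last expression of a Rust block is its value, so a trailing return None; at the end of the closure can be written as just None. For instance:

fn find_even(values: &[i32]) -> Option<i32> {
    for v in values {
        if v % 2 == 0 {
            return Some(*v); // early returns still need `return`
        }
    }
    None // final expression; no `return` needed
}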
4 changes: 1 addition & 3 deletions src/tvu.rs
@@ -105,9 +105,7 @@ impl Tvu {
Arc::new(retransmit_socket),
repair_socket,
blob_fetch_receiver,
leader_scheduler_option
.as_ref()
.map(|arc_ls| arc_ls.clone()),
leader_scheduler_option.as_ref().cloned(),
);

let replicate_stage = ReplicateStage::new(
14 changes: 10 additions & 4 deletions src/window.rs
@@ -127,10 +127,16 @@ impl WindowUtil for Window {
// 1) The replicate stage hasn't caught up to the "consumed" entries we sent,
// in which case it will eventually catch up
//
// 2) drops that entry, then everybody will blocking waiting for that "nth"
// entry instead of repairing, until we hit "times" >= the max times in
// calculate_max_repair(). If max times is not large, this shouldn't be a
// big issue.
// 2) We are on the border between seed_rotation_intervals, so the
// schedule won't be known until the entry on that cusp is received
// by the replicate stage (which comes after this stage). Hence, the next
// leader at the beginning of that next epoch will not know he is the
// leader until he receives that last "cusp" entry. He also won't ask for repairs
// for that entry because "is_next_leader" won't be set here. In this case,
// everybody will be blocking waiting for that "cusp" entry instead of repairing,
// until the leader hits "times" >= the max times in calculate_max_repair().
// The impact of this, along with the similar problem from broadcast for the transitioning
// leader, can be observed in the multinode test test_full_leader_validator_network().
None => (),
_ => (),
}
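The hunk above annotates the match on the scheduler's answer: is_next_leader is only set when the scheduler positively identifies this node, so an unknown (None) next leader at the seed_rotation_interval cusp is treated the same as "someone else is next", and no early repair is requested for the cusp entry. A condensed sketch of that logic (the real match sets state rather than returning a bool):

// is_next_leader only becomes true when the scheduler names this node.
// A None answer at the seed_rotation_interval cusp (schedule not yet computed)
// falls through, so the cusp entry is not repaired early and everyone waits
// until the repair backoff in calculate_max_repair() kicks in.
fn compute_is_next_leader(scheduled_leader: Option<[u8; 32]>, my_id: [u8; 32]) -> bool {
    match scheduled_leader {
        Some(id) if id == my_id => true,
        None => false,
        _ => false,
    }
}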
