Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leader Scheduler Plumbing #1437

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 84 additions & 19 deletions src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
use bincode::deserialize;
use bincode::serialize;
use budget_program::BudgetState;
use budget_transaction::BudgetTransaction;
use counter::Counter;
use dynamic_program::DynamicProgram;
use entry::Entry;
use hash::{hash, Hash};
use itertools::Itertools;
use leader_scheduler::LeaderScheduler;
use ledger::Block;
use log::Level;
use mint::Mint;
Expand Down Expand Up @@ -731,12 +733,36 @@ impl Bank {
results
}

pub fn process_entry(&self, entry: &Entry) -> Result<()> {
pub fn process_entry(
&self,
entry: &Entry,
entry_height_option: Option<u64>,
leader_scheduler_option: &mut Option<&mut LeaderScheduler>,
) -> Result<()> {
if !entry.transactions.is_empty() {
for result in self.process_transactions(&entry.transactions) {
for (i, result) in self
.process_transactions(&entry.transactions)
.into_iter()
.enumerate()
{
if let Some(ref mut leader_scheduler) = leader_scheduler_option {
if result.is_ok() {
let tx = &entry.transactions[i];
if tx.vote().is_some() {
// Update the active set in the leader scheduler
leader_scheduler.push_vote(*tx.from(), entry_height_option.expect("entry_height_option can't be None if leader_scheduler_option isn't None"));
}
}
}
result?;
}
}

if let Some(ref mut leader_scheduler) = leader_scheduler_option {
// Update the leader schedule based on entry height
leader_scheduler.update_height(entry_height_option.unwrap(), self);
}

self.register_entry_id(&entry.id);
Ok(())
}
Expand All @@ -748,6 +774,8 @@ impl Bank {
entries: Vec<Entry>,
tail: &mut Vec<Entry>,
tail_idx: &mut usize,
leader_scheduler_option: &mut Option<&mut LeaderScheduler>,
entry_height: u64,
) -> Result<u64> {
let mut entry_count = 0;

Expand All @@ -760,16 +788,29 @@ impl Bank {
*tail_idx = (*tail_idx + 1) % WINDOW_SIZE as usize;

entry_count += 1;
self.process_entry(&entry)?;
self.process_entry(
&entry,
Some(entry_count + entry_height),
leader_scheduler_option,
)?;
}

Ok(entry_count)
}

/// Process an ordered list of entries.
pub fn process_entries(&self, entries: &[Entry]) -> Result<()> {
for entry in entries {
self.process_entry(&entry)?;
pub fn process_entries(
&self,
entries: &[Entry],
start_entry_height: Option<u64>,
leader_scheduler_option: &mut Option<&mut LeaderScheduler>,
) -> Result<()> {
for (i, entry) in entries.iter().enumerate() {
self.process_entry(
&entry,
start_entry_height.map(|x| x + i as u64 + 1),
leader_scheduler_option,
)?;
}
Ok(())
}
Expand All @@ -781,6 +822,7 @@ impl Bank {
entries: I,
tail: &mut Vec<Entry>,
tail_idx: &mut usize,
mut leader_scheduler_option: Option<&mut LeaderScheduler>,
) -> Result<u64>
where
I: IntoIterator<Item = Entry>,
Expand All @@ -796,13 +838,23 @@ impl Bank {
return Err(BankError::LedgerVerificationFailed);
}
id = block.last().unwrap().id;
entry_count += self.process_entries_tail(block, tail, tail_idx)?;
entry_count += self.process_entries_tail(
block,
tail,
tail_idx,
&mut leader_scheduler_option,
entry_count,
)?;
}
Ok(entry_count)
}

/// Process a full ledger.
pub fn process_ledger<I>(&self, entries: I) -> Result<(u64, Vec<Entry>)>
pub fn process_ledger<I>(
&self,
entries: I,
leader_scheduler_option: Option<&mut LeaderScheduler>,
) -> Result<(u64, Vec<Entry>)>
where
I: IntoIterator<Item = Entry>,
{
Expand Down Expand Up @@ -844,9 +896,15 @@ impl Bank {
tail.push(entry0);
tail.push(entry1);
let mut tail_idx = 2;
let entry_count = self.process_blocks(entry1_id, entries, &mut tail, &mut tail_idx)?;

// check f we need to rotate tail
let entry_count = self.process_blocks(
entry1_id,
entries,
&mut tail,
&mut tail_idx,
leader_scheduler_option,
)?;

// check if we need to rotate tail
if tail.len() == WINDOW_SIZE as usize {
tail.rotate_left(tail_idx)
}
Expand Down Expand Up @@ -936,6 +994,13 @@ impl Bank {
pub fn set_finality(&self, finality: usize) {
self.finality_time.store(finality, Ordering::Relaxed);
}

#[cfg(test)]
// Used to access accounts for things like controlling stake to control
// the eligible set of nodes for leader selection
pub fn accounts(&self) -> &RwLock<HashMap<Pubkey, Account>> {
&self.accounts
}
}

#[cfg(test)]
Expand Down Expand Up @@ -1284,7 +1349,7 @@ mod tests {
);

// Now ensure the TX is accepted despite pointing to the ID of an empty entry.
bank.process_entries(&[entry]).unwrap();
bank.process_entries(&[entry], None, &mut None).unwrap();
assert_eq!(bank.process_transaction(&tx), Ok(()));
}

Expand All @@ -1293,7 +1358,7 @@ mod tests {
let mint = Mint::new(1);
let genesis = mint.create_entries();
let bank = Bank::default();
bank.process_ledger(genesis).unwrap();
bank.process_ledger(genesis, None).unwrap();
assert_eq!(bank.get_balance(&mint.pubkey()), 1);
}

Expand Down Expand Up @@ -1351,7 +1416,7 @@ mod tests {
let (ledger, pubkey) = create_sample_ledger(1);
let (ledger, dup) = ledger.tee();
let bank = Bank::default();
let (ledger_height, tail) = bank.process_ledger(ledger).unwrap();
let (ledger_height, tail) = bank.process_ledger(ledger, None).unwrap();
assert_eq!(bank.get_balance(&pubkey), 1);
assert_eq!(ledger_height, 3);
assert_eq!(tail.len(), 3);
Expand All @@ -1373,7 +1438,7 @@ mod tests {
for entry_count in window_size - 3..window_size + 2 {
let (ledger, pubkey) = create_sample_ledger(entry_count);
let bank = Bank::default();
let (ledger_height, tail) = bank.process_ledger(ledger).unwrap();
let (ledger_height, tail) = bank.process_ledger(ledger, None).unwrap();
assert_eq!(bank.get_balance(&pubkey), 1);
assert_eq!(ledger_height, entry_count as u64 + 2);
assert!(tail.len() <= window_size);
Expand All @@ -1398,7 +1463,7 @@ mod tests {
let ledger = to_file_iter(ledger);

let bank = Bank::default();
bank.process_ledger(ledger).unwrap();
bank.process_ledger(ledger, None).unwrap();
assert_eq!(bank.get_balance(&pubkey), 1);
}

Expand All @@ -1409,7 +1474,7 @@ mod tests {
let block = to_file_iter(create_sample_block(&mint, 1));

let bank = Bank::default();
bank.process_ledger(genesis.chain(block)).unwrap();
bank.process_ledger(genesis.chain(block), None).unwrap();
assert_eq!(bank.get_balance(&mint.pubkey()), 1);
}

Expand All @@ -1432,9 +1497,9 @@ mod tests {
let ledger1 = create_sample_ledger_with_mint_and_keypairs(&mint, &keypairs);

let bank0 = Bank::default();
bank0.process_ledger(ledger0).unwrap();
bank0.process_ledger(ledger0, None).unwrap();
let bank1 = Bank::default();
bank1.process_ledger(ledger1).unwrap();
bank1.process_ledger(ledger1, None).unwrap();

let initial_state = bank0.hash_internal_state();

Expand Down
1 change: 0 additions & 1 deletion src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ impl BankingStage {

// Many banks that process transactions in parallel.
let mut thread_hdls: Vec<JoinHandle<()>> = (0..NUM_THREADS)
.into_iter()
.map(|_| {
let thread_bank = bank.clone();
let thread_verified_receiver = shared_verified_receiver.clone();
Expand Down
3 changes: 2 additions & 1 deletion src/bin/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ fn main() -> () {
loop {
let status = fullnode.handle_role_transition();
match status {
Ok(Some(FullnodeReturnType::LeaderRotation)) => (),
Ok(Some(FullnodeReturnType::LeaderToValidatorRotation)) => (),
Ok(Some(FullnodeReturnType::ValidatorToLeaderRotation)) => (),
_ => {
// Fullnode tpu/tvu exited for some unexpected
// reason, so exit
Expand Down
4 changes: 2 additions & 2 deletions src/bin/ledger-tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ fn main() {

let genesis = genesis.take(2).map(|e| e.unwrap());

if let Err(e) = bank.process_ledger(genesis) {
if let Err(e) = bank.process_ledger(genesis, None) {
eprintln!("verify failed at genesis err: {:?}", e);
if !matches.is_present("continue") {
exit(1);
Expand All @@ -137,7 +137,7 @@ fn main() {
exit(1);
}
}
if let Err(e) = bank.process_entry(&entry) {
if let Err(e) = bank.process_entry(&entry, None, &mut None) {
eprintln!("verify failed at entry[{}], err: {:?}", i + 2, e);
if !matches.is_present("continue") {
exit(1);
Expand Down
Loading