Skip to content

Commit

Permalink
Vote optimization.
Browse files Browse the repository at this point in the history
  • Loading branch information
bji committed Sep 4, 2024
1 parent aeb3a2e commit 915909f
Show file tree
Hide file tree
Showing 6 changed files with 722 additions and 143 deletions.
246 changes: 242 additions & 4 deletions core/src/consensus.rs
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ use {
std::{ std::{
cmp::Ordering, cmp::Ordering,
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
fs::read_to_string,
ops::{ ops::{
Bound::{Included, Unbounded}, Bound::{Included, Unbounded},
Deref, Deref,
}, },
path::Path,
time::SystemTime,
}, },
thiserror::Error, thiserror::Error,
}; };
Expand Down Expand Up @@ -191,6 +194,11 @@ impl TowerVersions {
last_timestamp: tower.last_timestamp, last_timestamp: tower.last_timestamp,
stray_restored_slot: tower.stray_restored_slot, stray_restored_slot: tower.stray_restored_slot,
last_switch_threshold_check: tower.last_switch_threshold_check, last_switch_threshold_check: tower.last_switch_threshold_check,
mostly_confirmed_threshold: None,
threshold_ahead_count: None,
after_skip_threshold: None,
threshold_escape_count: None,
last_config_check_seconds: 0,
} }
} }
TowerVersions::V1_14_11(tower) => Tower { TowerVersions::V1_14_11(tower) => Tower {
Expand All @@ -204,6 +212,11 @@ impl TowerVersions {
last_timestamp: tower.last_timestamp, last_timestamp: tower.last_timestamp,
stray_restored_slot: tower.stray_restored_slot, stray_restored_slot: tower.stray_restored_slot,
last_switch_threshold_check: tower.last_switch_threshold_check, last_switch_threshold_check: tower.last_switch_threshold_check,
mostly_confirmed_threshold: None,
threshold_ahead_count: None,
after_skip_threshold: None,
threshold_escape_count: None,
last_config_check_seconds: 0,
}, },
TowerVersions::Current(tower) => tower, TowerVersions::Current(tower) => tower,
} }
Expand Down Expand Up @@ -236,6 +249,16 @@ pub struct Tower {
stray_restored_slot: Option<Slot>, stray_restored_slot: Option<Slot>,
#[serde(skip)] #[serde(skip)]
pub last_switch_threshold_check: Option<(Slot, SwitchForkDecision)>, pub last_switch_threshold_check: Option<(Slot, SwitchForkDecision)>,
#[serde(skip)]
mostly_confirmed_threshold: Option<f64>,
#[serde(skip)]
threshold_ahead_count: Option<u8>,
#[serde(skip)]
after_skip_threshold: Option<u8>,
#[serde(skip)]
threshold_escape_count: Option<u8>,
#[serde(skip)]
last_config_check_seconds: u64,
} }


impl Default for Tower { impl Default for Tower {
Expand All @@ -250,6 +273,11 @@ impl Default for Tower {
last_vote_tx_blockhash: None, last_vote_tx_blockhash: None,
stray_restored_slot: Option::default(), stray_restored_slot: Option::default(),
last_switch_threshold_check: Option::default(), last_switch_threshold_check: Option::default(),
mostly_confirmed_threshold: None,
threshold_ahead_count: None,
after_skip_threshold: None,
threshold_escape_count: None,
last_config_check_seconds: 0,
}; };
// VoteState::root_slot is ensured to be Some in Tower // VoteState::root_slot is ensured to be Some in Tower
tower.vote_state.root_slot = Some(Slot::default()); tower.vote_state.root_slot = Some(Slot::default());
Expand Down Expand Up @@ -504,6 +532,32 @@ impl Tower {
.unwrap_or(false) .unwrap_or(false)
} }


/// Returns `true` when a `mostly_confirmed_threshold` value is currently set
/// on this tower (the field is `Some`), and `false` otherwise.
pub fn is_mostly_confirmed_threshold_enabled(&self) -> bool {
    self.mostly_confirmed_threshold.is_some()
}

/// Reports whether the stake recorded for `slot` in `voted_stakes`, taken as a
/// fraction of `total_stake`, is strictly greater than the active threshold.
///
/// The threshold is `mostly_confirmed_threshold` when one is set and
/// `SWITCH_FORK_THRESHOLD` otherwise. A slot with no entry in `voted_stakes`
/// is never considered mostly confirmed.
pub fn is_slot_mostly_confirmed(
    &self,
    slot: Slot,
    voted_stakes: &VotedStakes,
    total_stake: Stake,
) -> bool {
    let required_fraction = self
        .mostly_confirmed_threshold
        .unwrap_or(SWITCH_FORK_THRESHOLD);

    match voted_stakes.get(&slot) {
        Some(slot_stake) => (*slot_stake as f64 / total_stake as f64) > required_fraction,
        None => false,
    }
}

pub fn tower_slots(&self) -> Vec<Slot> { pub fn tower_slots(&self) -> Vec<Slot> {
self.vote_state.tower() self.vote_state.tower()
} }
Expand Down Expand Up @@ -561,10 +615,10 @@ impl Tower {
vote_state.as_ref().ok()?.last_voted_slot() vote_state.as_ref().ok()?.last_voted_slot()
} }


pub fn record_bank_vote(&mut self, bank: &Bank) -> Option<Slot> { pub fn record_bank_vote(&mut self, bank: &Bank, pop_expired: bool) -> Option<Slot> {
// Returns the new root if one is made after applying a vote for the given bank to // Returns the new root if one is made after applying a vote for the given bank to
// `self.vote_state` // `self.vote_state`
self.record_bank_vote_and_update_lockouts(bank.slot(), bank.hash()) self.record_bank_vote_and_update_lockouts(bank.slot(), bank.hash(), pop_expired)
} }


/// If we've recently updated the vote state by applying a new vote /// If we've recently updated the vote state by applying a new vote
Expand All @@ -584,16 +638,143 @@ impl Tower {
self.last_vote = new_vote; self.last_vote = new_vote;
} }


/// Reloads the vote-threshold tuning values from the file
/// `./mostly_confirmed_threshold` (resolved against the process's current
/// working directory).
///
/// The file is consulted at most once per 60 seconds of wall-clock time,
/// tracked in `last_config_check_seconds`. If the system clock reports a time
/// before the UNIX epoch the current time is treated as 0.
///
/// File format: up to four whitespace-separated tokens, `a b c d`:
///
/// * `a` (float) is the threshold - no slot that hasn't already achieved this
///   vote weight will be voted on, except for slots in the "vote ahead of
///   threshold" region, unless the escape hatch distance has been reached.
/// * `b` (int) is "vote ahead of threshold" - how many slots ahead of the
///   threshold slot to vote, regardless of vote weight. Reduces vote latency.
/// * `c` (int) controls what stake weighted vote percentage is required on a
///   slot after there have been skips. It must be one of these values:
///   * 0 -- no restriction
///   * 1 -- a slot after a skip has to have mostly_confirmed_threshold before
///     it will be voted on
///   * 2 -- a slot after a skip has to be confirmed already before it will be
///     voted on
/// * `d` (int) is "escape hatch distance". This is the number of slots of
///   non-voting while waiting for threshold to just vote anyway. This is an
///   escape hatch to allow network progress even if threshold is not being
///   achieved. Without this, there could be deadlock if all validators ran
///   this voting strategy because if multiple forks happen at once, it's
///   possible for all forks to end up with less than the threshold vote and no
///   validator would ever switch forks.
///
/// A token that is missing or does not parse clears the corresponding setting
/// (`None`). If the file cannot be read, all four settings are cleared.
pub fn update_config(&mut self) {
    // Applies one config token to `current`: a successfully parsed value is
    // stored (and logged) only when it differs from what is already set; a
    // missing or unparsable token clears the setting.
    fn apply_token<T>(current: &mut Option<T>, token: Option<&str>, name: &str)
    where
        T: std::str::FromStr + PartialEq + Copy + std::fmt::Display,
    {
        match token.and_then(|text| text.parse::<T>().ok()) {
            Some(value) => {
                if *current != Some(value) {
                    *current = Some(value);
                    warn!("Using new {}: {}", name, value);
                }
            }
            None => {
                warn!("Using NO {}", name);
                *current = None;
            }
        }
    }

    let now_seconds = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs());
    // Rate limit: do nothing until 60 seconds have passed since the last check.
    if now_seconds < self.last_config_check_seconds.saturating_add(60) {
        return;
    }

    warn!("Checking for change to mostly_confirmed_threshold");
    self.last_config_check_seconds = now_seconds;

    match read_to_string(Path::new("./mostly_confirmed_threshold")) {
        Ok(contents) => {
            // `split_whitespace` already ignores leading/trailing whitespace,
            // so the file is accepted with or without a trailing newline.
            let mut tokens = contents.split_whitespace();
            apply_token(
                &mut self.mostly_confirmed_threshold,
                tokens.next(),
                "mostly_confirmed_threshold",
            );
            apply_token(
                &mut self.threshold_ahead_count,
                tokens.next(),
                "threshold_ahead_count",
            );
            apply_token(
                &mut self.after_skip_threshold,
                tokens.next(),
                "after_skip_threshold",
            );
            apply_token(
                &mut self.threshold_escape_count,
                tokens.next(),
                "threshold_escape_count",
            );
        }
        Err(_) => {
            warn!("Using NO mostly_confirmed_threshold, threshold_ahead_count, after_skip_threshold, or threshold_escape_count");
            self.mostly_confirmed_threshold = None;
            self.threshold_ahead_count = None;
            self.after_skip_threshold = None;
            self.threshold_escape_count = None;
        }
    }
}

/// Returns the current `threshold_ahead_count` setting, or `None` when it is
/// not set.
pub fn get_threshold_ahead_count(&self) -> Option<u8> {
    self.threshold_ahead_count
}

/// Returns the current `after_skip_threshold` setting, or `None` when it is
/// not set.
pub fn get_after_skip_threshold(&self) -> Option<u8> {
    self.after_skip_threshold
}

/// Returns the current `threshold_escape_count` setting, or `None` when it is
/// not set.
pub fn get_threshold_escape_count(&self) -> Option<u8> {
    self.threshold_escape_count
}

fn record_bank_vote_and_update_lockouts( fn record_bank_vote_and_update_lockouts(
&mut self, &mut self,
vote_slot: Slot, vote_slot: Slot,
vote_hash: Hash, vote_hash: Hash,
pop_expired: bool,
) -> Option<Slot> { ) -> Option<Slot> {
trace!("{} record_vote for {}", self.node_pubkey, vote_slot); trace!("{} record_vote for {}", self.node_pubkey, vote_slot);
let old_root = self.root(); let old_root = self.root();


let vote = Vote::new(vec![vote_slot], vote_hash); let vote = Vote::new(vec![vote_slot], vote_hash);
let result = process_vote_unchecked(&mut self.vote_state, vote); let result = process_vote_unchecked(&mut self.vote_state, vote, pop_expired);
if result.is_err() { if result.is_err() {
panic!( panic!(
"Error while recording vote {} {} in local tower {:?}", "Error while recording vote {} {} in local tower {:?}",
Expand All @@ -618,7 +799,7 @@ impl Tower {


#[cfg(test)] #[cfg(test)]
pub fn record_vote(&mut self, slot: Slot, hash: Hash) -> Option<Slot> { pub fn record_vote(&mut self, slot: Slot, hash: Hash) -> Option<Slot> {
self.record_bank_vote_and_update_lockouts(slot, hash) self.record_bank_vote_and_update_lockouts(slot, hash, true)
} }


/// Used for tests /// Used for tests
Expand Down Expand Up @@ -738,6 +919,63 @@ impl Tower {
false false
} }


/// Lockout check for `slot` that first applies a vote for every slot in
/// `including`, in order, to a scratch copy of this tower's vote state before
/// evaluating `slot`. The tower itself is not modified.
///
/// Returns `true` (locked out) when `self.is_recent(slot)` is false, or when,
/// after the simulated votes, any remaining voted slot is neither `slot`
/// itself nor a member of `ancestors`. Returns `false` otherwise.
///
/// # Panics
///
/// Panics if the simulated vote state has a root that is neither `slot` nor
/// contained in `ancestors`.
pub fn is_locked_out_including(
    &self,
    slot: Slot,
    ancestors: &HashSet<Slot>,
    including: &[Slot],
) -> bool {
    if !self.is_recent(slot) {
        return true;
    }

    // Check if a slot is locked out by simulating adding a vote for that
    // slot to the current lockouts to pop any expired votes. If any of the
    // remaining voted slots are on a different fork from the checked slot,
    // it's still locked out.
    let mut vote_state = self.vote_state.clone();

    for &prior_slot in including {
        process_slot_vote_unchecked(&mut vote_state, prior_slot);
    }
    process_slot_vote_unchecked(&mut vote_state, slot);

    let votes_on_other_fork = vote_state.votes.iter().any(|vote| {
        let voted_slot = vote.slot();
        slot != voted_slot && !ancestors.contains(&voted_slot)
    });
    if votes_on_other_fork {
        return true;
    }

    if let Some(root_slot) = vote_state.root_slot {
        if slot != root_slot {
            // This case should never happen because bank forks purges all
            // non-descendants of the root every time root is set
            assert!(
                ancestors.contains(&root_slot),
                "ancestors: {ancestors:?}, slot: {slot} root: {root_slot}"
            );
        }
    }

    false
}

/// Trims `new_votes` so that it stops just before the first vote whose
/// resulting lockout is still in force at `slot`.
///
/// The votes are applied one at a time, in order, to a scratch copy of this
/// tower's vote state. As soon as the most recent lockout after applying a
/// vote reports `is_locked_out_at_slot(slot)`, that vote and everything after
/// it are removed from `new_votes`. If no vote triggers this, `new_votes` is
/// left unchanged. The tower itself is not modified.
pub fn pop_votes_locked_out_at(&self, new_votes: &mut Vec<Slot>, slot: Slot) {
    let mut scratch_state = self.vote_state.clone();

    let first_locked_out = new_votes.iter().position(|&candidate| {
        process_slot_vote_unchecked(&mut scratch_state, candidate);
        scratch_state
            .last_lockout()
            .map_or(false, |lockout| lockout.is_locked_out_at_slot(slot))
    });

    if let Some(cutoff) = first_locked_out {
        // New votes cannot include this or any subsequent slots
        new_votes.truncate(cutoff);
    }
}

fn is_candidate_slot_descendant_of_last_vote( fn is_candidate_slot_descendant_of_last_vote(
candidate_slot: Slot, candidate_slot: Slot,
last_voted_slot: Slot, last_voted_slot: Slot,
Expand Down
6 changes: 6 additions & 0 deletions core/src/consensus/progress_map.rs
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ pub struct ForkStats {
pub is_locked_out: bool, pub is_locked_out: bool,
pub voted_stakes: VotedStakes, pub voted_stakes: VotedStakes,
pub is_supermajority_confirmed: bool, pub is_supermajority_confirmed: bool,
pub is_mostly_confirmed: bool,
pub computed: bool, pub computed: bool,
pub lockout_intervals: LockoutIntervals, pub lockout_intervals: LockoutIntervals,
pub bank_hash: Option<Hash>, pub bank_hash: Option<Hash>,
Expand Down Expand Up @@ -489,6 +490,11 @@ impl ProgressMap {
slot_progress.fork_stats.is_supermajority_confirmed = true; slot_progress.fork_stats.is_supermajority_confirmed = true;
} }


/// Marks `slot`'s fork stats as mostly confirmed.
///
/// # Panics
///
/// Panics if `slot` has no entry in this progress map.
pub fn set_mostly_confirmed_slot(&mut self, slot: Slot) {
    self.get_mut(&slot).unwrap().fork_stats.is_mostly_confirmed = true;
}

pub fn is_supermajority_confirmed(&self, slot: Slot) -> Option<bool> { pub fn is_supermajority_confirmed(&self, slot: Slot) -> Option<bool> {
self.progress_map self.progress_map
.get(&slot) .get(&slot)
Expand Down
Loading

0 comments on commit 915909f

Please sign in to comment.