Skip to content

Commit

Permalink
bank_send_loop: Reduce feature flag polling frequency
Browse files Browse the repository at this point in the history
  • Loading branch information
t-nelson committed Jun 13, 2023
1 parent dd379bf commit 077e29a
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 52 deletions.
19 changes: 17 additions & 2 deletions core/src/cluster_info_vote_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use {
},
solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SLOT},
feature_set::allow_votes_to_directly_update_vote_state,
hash::Hash,
pubkey::Pubkey,
signature::Signature,
Expand Down Expand Up @@ -386,6 +387,12 @@ impl ClusterInfoVoteListener {
let mut verified_vote_packets = VerifiedVotePackets::default();
let mut time_since_lock = Instant::now();
let mut bank_vote_sender_state_option: Option<BankVoteSenderState> = None;
let mut is_tower_full_vote_enabled = bank_forks
.read()
.unwrap()
.root_bank()
.feature_set
.is_active(&allow_votes_to_directly_update_vote_state::id());

loop {
if exit.load(Ordering::Relaxed) {
Expand All @@ -396,12 +403,11 @@ impl ClusterInfoVoteListener {
.read()
.unwrap()
.would_be_leader(3 * slot_hashes::MAX_ENTRIES as u64 * DEFAULT_TICKS_PER_SLOT);
let feature_set = Some(bank_forks.read().unwrap().root_bank().feature_set.clone());

if let Err(e) = verified_vote_packets.receive_and_process_vote_packets(
&verified_vote_label_packets_receiver,
would_be_leader,
feature_set,
is_tower_full_vote_enabled,
) {
match e {
Error::RecvTimeout(RecvTimeoutError::Disconnected)
Expand All @@ -425,6 +431,15 @@ impl ClusterInfoVoteListener {
&verified_vote_packets,
)?;
}
// Check if we've crossed the feature boundary
if !is_tower_full_vote_enabled {
is_tower_full_vote_enabled = bank_forks
.read()
.unwrap()
.root_bank()
.feature_set
.is_active(&allow_votes_to_directly_update_vote_state::id());
}
}
}
}
Expand Down
76 changes: 26 additions & 50 deletions core/src/verified_vote_packets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use {
solana_sdk::{
account::from_account,
clock::{Slot, UnixTimestamp},
feature_set::{allow_votes_to_directly_update_vote_state, FeatureSet},
hash::Hash,
pubkey::Pubkey,
signature::Signature,
Expand Down Expand Up @@ -211,17 +210,12 @@ impl VerifiedVotePackets {
&mut self,
vote_packets_receiver: &VerifiedLabelVotePacketsReceiver,
would_be_leader: bool,
feature_set: Option<Arc<FeatureSet>>,
is_full_tower_vote_enabled: bool,
) -> Result<()> {
use SingleValidatorVotes::*;
const RECV_TIMEOUT: Duration = Duration::from_millis(200);
let vote_packets = vote_packets_receiver.recv_timeout(RECV_TIMEOUT)?;
let vote_packets = std::iter::once(vote_packets).chain(vote_packets_receiver.try_iter());
let mut is_full_tower_vote_enabled = false;
if let Some(feature_set) = feature_set {
is_full_tower_vote_enabled =
feature_set.is_active(&allow_votes_to_directly_update_vote_state::id());
}

for gossip_votes in vote_packets {
if would_be_leader {
Expand Down Expand Up @@ -348,7 +342,7 @@ mod tests {
}])
.unwrap();
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
assert_eq!(
verified_vote_packets
Expand All @@ -368,7 +362,7 @@ mod tests {
}])
.unwrap();
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
assert_eq!(
verified_vote_packets
Expand All @@ -390,7 +384,7 @@ mod tests {
}])
.unwrap();
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
assert_eq!(
verified_vote_packets
Expand All @@ -413,7 +407,7 @@ mod tests {
}])
.unwrap();
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
assert_eq!(
verified_vote_packets
Expand All @@ -426,7 +420,7 @@ mod tests {

// No new messages, should time out
assert_matches!(
verified_vote_packets.receive_and_process_vote_packets(&r, true, None),
verified_vote_packets.receive_and_process_vote_packets(&r, true, false),
Err(Error::RecvTimeout(_))
);
}
Expand Down Expand Up @@ -455,7 +449,7 @@ mod tests {

// At most `MAX_VOTES_PER_VALIDATOR` should be stored per validator
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
assert_eq!(
verified_vote_packets
Expand Down Expand Up @@ -493,7 +487,7 @@ mod tests {
// Ingest the votes into the buffer
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();

// Create tracker for previously sent bank votes
Expand Down Expand Up @@ -548,7 +542,7 @@ mod tests {
// Ingest the votes into the buffer
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();

// One batch of vote packets per validator
Expand Down Expand Up @@ -608,7 +602,7 @@ mod tests {
.unwrap();
// Ingest the votes into the buffer
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
let mut gossip_votes_iterator = ValidatorGossipVotesIterator::new(
my_leader_bank,
Expand Down Expand Up @@ -640,11 +634,8 @@ mod tests {
}

let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
let mut feature_set = FeatureSet::default();
feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0);
let feature_set = Some(Arc::new(feature_set));
verified_vote_packets
.receive_and_process_vote_packets(&r, true, feature_set.clone())
.receive_and_process_vote_packets(&r, true, true)
.unwrap();

// second_vote should be kept and first_vote ignored
Expand All @@ -665,7 +656,7 @@ mod tests {
.unwrap();

verified_vote_packets
.receive_and_process_vote_packets(&r, true, feature_set)
.receive_and_process_vote_packets(&r, true, true)
.unwrap();
let slot = verified_vote_packets
.0
Expand All @@ -685,9 +676,8 @@ mod tests {
.unwrap();
}
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
let feature_set = FeatureSet::default();
verified_vote_packets
.receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set)))
.receive_and_process_vote_packets(&r, true, false)
.unwrap();

assert_eq!(
Expand All @@ -705,7 +695,7 @@ mod tests {
r: &Receiver<Vec<VerifiedVoteMetadata>>,
vote: VoteStateUpdate,
vote_account_key: Pubkey,
feature_set: Option<Arc<FeatureSet>>,
is_tower_full_vote_enabled: bool,
verified_vote_packets: &mut VerifiedVotePackets,
) -> GossipVote {
s.send(vec![VerifiedVoteMetadata {
Expand All @@ -716,7 +706,7 @@ mod tests {
}])
.unwrap();
verified_vote_packets
.receive_and_process_vote_packets(r, true, feature_set)
.receive_and_process_vote_packets(r, true, is_tower_full_vote_enabled)
.unwrap();
match verified_vote_packets.0.get(&vote_account_key).unwrap() {
SingleValidatorVotes::FullTowerVote(gossip_vote) => gossip_vote.clone(),
Expand All @@ -743,9 +733,6 @@ mod tests {
vote_no_ts.timestamp = None;

let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
let mut feature_set = FeatureSet::default();
feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0);
let feature_set = Some(Arc::new(feature_set));

// Original vote
let GossipVote {
Expand All @@ -755,7 +742,7 @@ mod tests {
&r,
vote.clone(),
vote_account_key,
feature_set.clone(),
true,
&mut verified_vote_packets,
);
assert_eq!(slot, vote.last_voted_slot().unwrap());
Expand All @@ -769,7 +756,7 @@ mod tests {
&r,
vote_later_ts.clone(),
vote_account_key,
feature_set.clone(),
true,
&mut verified_vote_packets,
);
assert_eq!(slot, vote_later_ts.last_voted_slot().unwrap());
Expand All @@ -783,7 +770,7 @@ mod tests {
&r,
vote_earlier_ts,
vote_account_key,
feature_set.clone(),
true,
&mut verified_vote_packets,
);
assert_eq!(slot, vote_later_ts.last_voted_slot().unwrap());
Expand All @@ -797,7 +784,7 @@ mod tests {
&r,
vote_no_ts,
vote_account_key,
feature_set,
true,
&mut verified_vote_packets,
);
assert_eq!(slot, vote_later_ts.last_voted_slot().unwrap());
Expand All @@ -823,9 +810,8 @@ mod tests {

let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
// Receive votes without the feature active
let feature_set = Some(Arc::new(FeatureSet::default()));
verified_vote_packets
.receive_and_process_vote_packets(&r, true, feature_set)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
assert_eq!(
100,
Expand Down Expand Up @@ -858,10 +844,8 @@ mod tests {
}

// Receive votes with the feature active
let mut feature_set = FeatureSet::default();
feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0);
verified_vote_packets
.receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set)))
.receive_and_process_vote_packets(&r, true, true)
.unwrap();
if let FullTowerVote(vote) = verified_vote_packets.0.get(&vote_account_key).unwrap() {
assert_eq!(200, vote.slot);
Expand All @@ -887,10 +871,8 @@ mod tests {
.unwrap();

// Receive incremental votes with the feature active
let mut feature_set = FeatureSet::default();
feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0);
verified_vote_packets
.receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set)))
.receive_and_process_vote_packets(&r, true, true)
.unwrap();

// Should still store as incremental votes
Expand Down Expand Up @@ -918,10 +900,8 @@ mod tests {
.unwrap();

// Receive full votes
let mut feature_set = FeatureSet::default();
feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0);
verified_vote_packets
.receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set)))
.receive_and_process_vote_packets(&r, true, true)
.unwrap();
assert_eq!(
42,
Expand All @@ -932,7 +912,7 @@ mod tests {
.get_latest_gossip_slot()
);

// Try to send an old ibncremental vote from pre feature activation
// Try to send an old incremental vote from pre feature activation
let vote = VoteTransaction::from(Vote::new(vec![34], Hash::new_unique()));
s.send(vec![VerifiedVoteMetadata {
vote_account_key,
Expand All @@ -943,10 +923,8 @@ mod tests {
.unwrap();

// Try to receive nothing should happen
let mut feature_set = FeatureSet::default();
feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0);
verified_vote_packets
.receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set)))
.receive_and_process_vote_packets(&r, true, true)
.unwrap();
if let FullTowerVote(vote) = verified_vote_packets.0.get(&vote_account_key).unwrap() {
assert_eq!(42, vote.slot);
Expand All @@ -966,10 +944,8 @@ mod tests {
.unwrap();

// Try to receive and vote lands as well as the conversion back to incremental votes
let mut feature_set = FeatureSet::default();
feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0);
verified_vote_packets
.receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set)))
.receive_and_process_vote_packets(&r, true, true)
.unwrap();
if let IncrementalVotes(votes) = verified_vote_packets.0.get(&vote_account_key).unwrap() {
assert!(votes.contains_key(&(42, hash_42)));
Expand Down

0 comments on commit 077e29a

Please sign in to comment.