bank_send_loop: Get feature flag from root bank #31954

Merged
merged 2 commits into from
Jun 13, 2023
26 changes: 20 additions & 6 deletions core/src/cluster_info_vote_listener.rs
Expand Up @@ -36,6 +36,7 @@ use {
},
solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SLOT},
feature_set::allow_votes_to_directly_update_vote_state,
hash::Hash,
pubkey::Pubkey,
signature::Signature,
Expand Down Expand Up @@ -264,6 +265,7 @@ impl ClusterInfoVoteListener {
})
.unwrap()
};
let bank_forks_clone = bank_forks.clone();
let bank_send_thread = {
let exit = exit.clone();
Builder::new()
Expand All @@ -274,6 +276,7 @@ impl ClusterInfoVoteListener {
verified_vote_label_packets_receiver,
poh_recorder,
&verified_packets_sender,
bank_forks_clone,
);
})
.unwrap()
Expand Down Expand Up @@ -379,10 +382,17 @@ impl ClusterInfoVoteListener {
verified_vote_label_packets_receiver: VerifiedLabelVotePacketsReceiver,
poh_recorder: Arc<RwLock<PohRecorder>>,
verified_packets_sender: &BankingPacketSender,
bank_forks: Arc<RwLock<BankForks>>,
) -> Result<()> {
let mut verified_vote_packets = VerifiedVotePackets::default();
let mut time_since_lock = Instant::now();
let mut bank_vote_sender_state_option: Option<BankVoteSenderState> = None;
let mut is_tower_full_vote_enabled = bank_forks
.read()
.unwrap()
.root_bank()
.feature_set
.is_active(&allow_votes_to_directly_update_vote_state::id());

loop {
if exit.load(Ordering::Relaxed) {
Expand All @@ -393,16 +403,11 @@ impl ClusterInfoVoteListener {
.read()
.unwrap()
.would_be_leader(3 * slot_hashes::MAX_ENTRIES as u64 * DEFAULT_TICKS_PER_SLOT);
let feature_set = poh_recorder
.read()
.unwrap()
.bank()
.map(|bank| bank.feature_set.clone());

if let Err(e) = verified_vote_packets.receive_and_process_vote_packets(
&verified_vote_label_packets_receiver,
would_be_leader,
feature_set,
is_tower_full_vote_enabled,
) {
match e {
Error::RecvTimeout(RecvTimeoutError::Disconnected)
Expand All @@ -426,6 +431,15 @@ impl ClusterInfoVoteListener {
&verified_vote_packets,
)?;
}
// Check if we've crossed the feature boundary
Contributor

@steviez steviez Jun 3, 2023

Reviewer note: Better to have this here a second time (in addition to before the loop) to avoid grabbing a read lock on bank_forks every iteration

Contributor

can we factor this optimization out to a separate PR to reduce the backport surface?

Contributor

> can we factor this optimization out to a separate PR to reduce the backport surface?

if you gimme write on your branch, i have this change made locally

Contributor

> can we factor this optimization out to a separate PR

To confirm, you're only talking about removing the if statement that skips reading the feature set if we know the feature is already activated? My initial comment was pointing out that we were reading in two places, but that this was better in order to both 1) start with a valid value on the first couple of iterations of the loop and 2) avoid spamming the lock.

That being said, receive_and_process_vote_packets() does recv_timeout(200ms), so my concern about having the feature set read happen on every iteration of the loop is mitigated by receive_and_process_vote_packets() taking a nontrivial amount of time to return.
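
The pacing argument above can be illustrated with a minimal sketch. It assumes `std::sync::mpsc` in place of whatever channel type the real code uses, and the names `receive_batch` and `run_loop` are hypothetical stand-ins, not functions from the codebase. It only shows that a blocking `recv_timeout` bounds how often the loop body, and therefore any per-iteration lock read, can run when the channel is idle.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical stand-in for the blocking receive at the top of
/// `receive_and_process_vote_packets()`. Returns how many batches were drained.
fn receive_batch(receiver: &Receiver<u32>, timeout: Duration) -> Result<usize, RecvTimeoutError> {
    // Block until the first batch arrives or the timeout elapses.
    let first = receiver.recv_timeout(timeout)?;
    // Then drain whatever is already queued, without blocking again.
    let drained = std::iter::once(first).chain(receiver.try_iter()).count();
    Ok(drained)
}

/// Loops until the sender disconnects. Returns (loop iterations, batches seen).
fn run_loop(receiver: &Receiver<u32>, timeout: Duration) -> (usize, usize) {
    let mut iterations = 0;
    let mut batches = 0;
    loop {
        iterations += 1;
        match receive_batch(receiver, timeout) {
            Ok(n) => batches += n,
            // Idle channel: this iteration cost one full timeout, so any
            // per-iteration work runs at most once per timeout period.
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    (iterations, batches)
}
```

With a 200ms timeout and an idle channel, a loop shaped like this would spin at most about five times per second, which is the mitigation described in the comment.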

Contributor Author

just gave you write access. if you're talking about the refactor to pass the flag rather than the feature set that was to avoid cloning or holding the bank forks lock for too long.
the if statement could probably be removed taking into consideration the 200ms sleep that steve pointed out.

Contributor

nope. the whole swap from Option<FeatureSet> to bool. master...t-nelson:solana:feature-flag-fix

we're already doing the inefficient thing and that inefficiency did not contribute to the testnet outage, so the optimization doesn't need to be backported
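
For readers following the thread, here is a rough sketch of the two shapes being discussed. `MockFeatureSet`, `FULL_TOWER_VOTE`, `flag_from_optional`, and `flag_from_root` are all hypothetical stand-ins for illustration and are not the `solana_sdk` API. The point is only that an `Option` of a feature set collapses `None` to "disabled" inside the callee, while a flag resolved by the caller from the root bank always has a definite value.

```rust
use std::collections::HashSet;
use std::sync::Arc;

/// Hypothetical, simplified stand-in for a feature set (not the real type).
#[derive(Default)]
struct MockFeatureSet {
    active: HashSet<&'static str>,
}

impl MockFeatureSet {
    fn activate(&mut self, feature_id: &'static str) {
        self.active.insert(feature_id);
    }

    fn is_active(&self, feature_id: &str) -> bool {
        self.active.contains(feature_id)
    }
}

/// Illustrative feature key; the real code uses a feature id, not a string.
const FULL_TOWER_VOTE: &str = "allow_votes_to_directly_update_vote_state";

/// Option-based shape: the callee derives the flag, and a missing feature
/// set (for example, no working bank available) silently means "disabled".
fn flag_from_optional(feature_set: Option<Arc<MockFeatureSet>>) -> bool {
    feature_set.map_or(false, |fs| fs.is_active(FULL_TOWER_VOTE))
}

/// Bool-based shape: the caller resolves the flag from a feature set that is
/// always present (the root bank's, in the PR) and passes the bool along.
fn flag_from_root(root_feature_set: &MockFeatureSet) -> bool {
    root_feature_set.is_active(FULL_TOWER_VOTE)
}
```

Under these assumptions, `flag_from_optional(None)` returns `false` even when the feature is active on the cluster, whereas `flag_from_root` cannot hit that case.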

Contributor Author

sounds good, i'm fine with the clone.

Contributor

> sounds good, i'm fine with the clone.

hrmf, seems like github has some new default branch protection rules or something. it won't let me push my changes 'cause i rewrote your tip. mind pulling my branch and pushing up the change?

ping me when we're ready and i'll re-enable rebase+merge for this so we don't need a separate PR

Contributor Author

nice thanks, should now be 2 commits. hopefully github does the right thing and only backports the first one.

Contributor

> nice thanks, should now be 2 commits. hopefully github does the right thing and only backports the first one.

it'll try to do them both. trivial to drop the last one tho

if !is_tower_full_vote_enabled {
is_tower_full_vote_enabled = bank_forks
.read()
.unwrap()
.root_bank()
.feature_set
.is_active(&allow_votes_to_directly_update_vote_state::id());
steviez marked this conversation as resolved.
}
}
}
}
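
The read-once-then-latch pattern in the loop above can be sketched in isolation. This is a simplified model, not the validator code: `MockBankForks`, `is_feature_active`, and `run_latched` are hypothetical names, and the sketch assumes a feature never deactivates once it is active at the root, which is what makes skipping further reads safe.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for `BankForks`: tracks only whether the root bank
/// has the feature active, and counts how often the flag is read.
struct MockBankForks {
    feature_active_at_root: bool,
    flag_reads: AtomicUsize,
}

impl MockBankForks {
    fn is_feature_active(&self) -> bool {
        self.flag_reads.fetch_add(1, Ordering::Relaxed);
        self.feature_active_at_root
    }
}

/// Runs `iterations` loop passes. On pass `activate_at` the mock root gains
/// the feature. Returns the flag value each pass worked with.
fn run_latched(
    bank_forks: &Arc<RwLock<MockBankForks>>,
    iterations: usize,
    activate_at: usize,
) -> Vec<bool> {
    // Initial read so the first pass starts from a valid value.
    let mut enabled = bank_forks.read().unwrap().is_feature_active();
    let mut seen = Vec::with_capacity(iterations);
    for i in 0..iterations {
        if i == activate_at {
            // Simulate a new root on which the feature is activated.
            bank_forks.write().unwrap().feature_active_at_root = true;
        }
        seen.push(enabled);
        // Re-check only until the flag latches; afterwards the lock is
        // never taken again for this purpose.
        if !enabled {
            enabled = bank_forks.read().unwrap().is_feature_active();
        }
    }
    seen
}
```

In this model the pass that observes the activation still works with the old value, and the new value takes effect on the following pass, mirroring the check placed at the end of the loop body in the diff.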
Expand Down
76 changes: 26 additions & 50 deletions core/src/verified_vote_packets.rs
Expand Up @@ -9,7 +9,6 @@ use {
solana_sdk::{
account::from_account,
clock::{Slot, UnixTimestamp},
feature_set::{allow_votes_to_directly_update_vote_state, FeatureSet},
hash::Hash,
pubkey::Pubkey,
signature::Signature,
Expand Down Expand Up @@ -211,17 +210,12 @@ impl VerifiedVotePackets {
&mut self,
vote_packets_receiver: &VerifiedLabelVotePacketsReceiver,
would_be_leader: bool,
feature_set: Option<Arc<FeatureSet>>,
is_full_tower_vote_enabled: bool,
) -> Result<()> {
use SingleValidatorVotes::*;
const RECV_TIMEOUT: Duration = Duration::from_millis(200);
let vote_packets = vote_packets_receiver.recv_timeout(RECV_TIMEOUT)?;
let vote_packets = std::iter::once(vote_packets).chain(vote_packets_receiver.try_iter());
let mut is_full_tower_vote_enabled = false;
if let Some(feature_set) = feature_set {
is_full_tower_vote_enabled =
feature_set.is_active(&allow_votes_to_directly_update_vote_state::id());
}

for gossip_votes in vote_packets {
if would_be_leader {
Expand Down Expand Up @@ -348,7 +342,7 @@ mod tests {
}])
.unwrap();
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
assert_eq!(
verified_vote_packets
Expand All @@ -368,7 +362,7 @@ mod tests {
}])
.unwrap();
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
assert_eq!(
verified_vote_packets
Expand All @@ -390,7 +384,7 @@ mod tests {
}])
.unwrap();
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
assert_eq!(
verified_vote_packets
Expand All @@ -413,7 +407,7 @@ mod tests {
}])
.unwrap();
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
assert_eq!(
verified_vote_packets
Expand All @@ -426,7 +420,7 @@ mod tests {

// No new messages, should time out
assert_matches!(
verified_vote_packets.receive_and_process_vote_packets(&r, true, None),
verified_vote_packets.receive_and_process_vote_packets(&r, true, false),
Err(Error::RecvTimeout(_))
);
}
Expand Down Expand Up @@ -455,7 +449,7 @@ mod tests {

// At most `MAX_VOTES_PER_VALIDATOR` should be stored per validator
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
assert_eq!(
verified_vote_packets
Expand Down Expand Up @@ -493,7 +487,7 @@ mod tests {
// Ingest the votes into the buffer
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();

// Create tracker for previously sent bank votes
Expand Down Expand Up @@ -548,7 +542,7 @@ mod tests {
// Ingest the votes into the buffer
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();

// One batch of vote packets per validator
Expand Down Expand Up @@ -608,7 +602,7 @@ mod tests {
.unwrap();
// Ingest the votes into the buffer
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
let mut gossip_votes_iterator = ValidatorGossipVotesIterator::new(
my_leader_bank,
Expand Down Expand Up @@ -640,11 +634,8 @@ mod tests {
}

let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
let mut feature_set = FeatureSet::default();
feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0);
let feature_set = Some(Arc::new(feature_set));
verified_vote_packets
.receive_and_process_vote_packets(&r, true, feature_set.clone())
.receive_and_process_vote_packets(&r, true, true)
.unwrap();

// second_vote should be kept and first_vote ignored
Expand All @@ -665,7 +656,7 @@ mod tests {
.unwrap();

verified_vote_packets
.receive_and_process_vote_packets(&r, true, feature_set)
.receive_and_process_vote_packets(&r, true, true)
.unwrap();
let slot = verified_vote_packets
.0
Expand All @@ -685,9 +676,8 @@ mod tests {
.unwrap();
}
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
let feature_set = FeatureSet::default();
verified_vote_packets
.receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set)))
.receive_and_process_vote_packets(&r, true, false)
.unwrap();

assert_eq!(
Expand All @@ -705,7 +695,7 @@ mod tests {
r: &Receiver<Vec<VerifiedVoteMetadata>>,
vote: VoteStateUpdate,
vote_account_key: Pubkey,
feature_set: Option<Arc<FeatureSet>>,
is_tower_full_vote_enabled: bool,
verified_vote_packets: &mut VerifiedVotePackets,
) -> GossipVote {
s.send(vec![VerifiedVoteMetadata {
Expand All @@ -716,7 +706,7 @@ mod tests {
}])
.unwrap();
verified_vote_packets
.receive_and_process_vote_packets(r, true, feature_set)
.receive_and_process_vote_packets(r, true, is_tower_full_vote_enabled)
.unwrap();
match verified_vote_packets.0.get(&vote_account_key).unwrap() {
SingleValidatorVotes::FullTowerVote(gossip_vote) => gossip_vote.clone(),
Expand All @@ -743,9 +733,6 @@ mod tests {
vote_no_ts.timestamp = None;

let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
let mut feature_set = FeatureSet::default();
feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0);
let feature_set = Some(Arc::new(feature_set));

// Original vote
let GossipVote {
Expand All @@ -755,7 +742,7 @@ mod tests {
&r,
vote.clone(),
vote_account_key,
feature_set.clone(),
true,
&mut verified_vote_packets,
);
assert_eq!(slot, vote.last_voted_slot().unwrap());
Expand All @@ -769,7 +756,7 @@ mod tests {
&r,
vote_later_ts.clone(),
vote_account_key,
feature_set.clone(),
true,
&mut verified_vote_packets,
);
assert_eq!(slot, vote_later_ts.last_voted_slot().unwrap());
Expand All @@ -783,7 +770,7 @@ mod tests {
&r,
vote_earlier_ts,
vote_account_key,
feature_set.clone(),
true,
&mut verified_vote_packets,
);
assert_eq!(slot, vote_later_ts.last_voted_slot().unwrap());
Expand All @@ -797,7 +784,7 @@ mod tests {
&r,
vote_no_ts,
vote_account_key,
feature_set,
true,
&mut verified_vote_packets,
);
assert_eq!(slot, vote_later_ts.last_voted_slot().unwrap());
Expand All @@ -823,9 +810,8 @@ mod tests {

let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
// Receive votes without the feature active
let feature_set = Some(Arc::new(FeatureSet::default()));
verified_vote_packets
.receive_and_process_vote_packets(&r, true, feature_set)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
assert_eq!(
100,
Expand Down Expand Up @@ -858,10 +844,8 @@ mod tests {
}

// Receive votes with the feature active
let mut feature_set = FeatureSet::default();
feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0);
verified_vote_packets
.receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set)))
.receive_and_process_vote_packets(&r, true, true)
.unwrap();
if let FullTowerVote(vote) = verified_vote_packets.0.get(&vote_account_key).unwrap() {
assert_eq!(200, vote.slot);
Expand All @@ -887,10 +871,8 @@ mod tests {
.unwrap();

// Receive incremental votes with the feature active
let mut feature_set = FeatureSet::default();
feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0);
verified_vote_packets
.receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set)))
.receive_and_process_vote_packets(&r, true, true)
.unwrap();

// Should still store as incremental votes
Expand Down Expand Up @@ -918,10 +900,8 @@ mod tests {
.unwrap();

// Receive full votes
let mut feature_set = FeatureSet::default();
feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0);
verified_vote_packets
.receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set)))
.receive_and_process_vote_packets(&r, true, true)
.unwrap();
assert_eq!(
42,
Expand All @@ -932,7 +912,7 @@ mod tests {
.get_latest_gossip_slot()
);

// Try to send an old ibncremental vote from pre feature activation
// Try to send an old incremental vote from pre feature activation
let vote = VoteTransaction::from(Vote::new(vec![34], Hash::new_unique()));
s.send(vec![VerifiedVoteMetadata {
vote_account_key,
Expand All @@ -943,10 +923,8 @@ mod tests {
.unwrap();

// Try to receive nothing should happen
let mut feature_set = FeatureSet::default();
feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0);
verified_vote_packets
.receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set)))
.receive_and_process_vote_packets(&r, true, true)
.unwrap();
if let FullTowerVote(vote) = verified_vote_packets.0.get(&vote_account_key).unwrap() {
assert_eq!(42, vote.slot);
Expand All @@ -966,10 +944,8 @@ mod tests {
.unwrap();

// Try to receive and vote lands as well as the conversion back to incremental votes
let mut feature_set = FeatureSet::default();
feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0);
verified_vote_packets
.receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set)))
.receive_and_process_vote_packets(&r, true, true)
.unwrap();
if let IncrementalVotes(votes) = verified_vote_packets.0.get(&vote_account_key).unwrap() {
assert!(votes.contains_key(&(42, hash_42)));
Expand Down