Skip to content

Commit

Permalink
v1.14: Fixed missing Root notifications via geyser plugin framework (…
Browse files Browse the repository at this point in the history
…backport of #31180) (#31650)

Problem

It is reported that Geyser is missing some Root notifications for slots. #31124
The Root notification is sent from replay_stage's code in handle_votable_bank. https://github.com/solana-labs/solana/blob/master/core/src/replay_stage.rs#L1981. However, the validator does not necessarily vote on every slot on the rooted chain. From @carllin

For instance if the rooted chain is 1->2->3->4

You might only vote on 1 and 4
But when 4 is rooted, 2 is also rooted
But handle_votavle_bank is not called on 2

As result of this, we may miss notifications for slot 2 and 3.

Summary of Changes

Enhanced BankNotification to add NewRootedChain enum to send the chains of parent roots.
Renamed BankNotification::Root -> BankNotification::NewRootBank
Introduced SlotNotification for SlotStatusObserver interfaces to send slot status without Bank.
In the OptimisticallyConfirmedBankTracker notify parents of a new root if these parents were not notified.
Modified and added unit test cases to verify the logic.
  • Loading branch information
mergify[bot] committed May 22, 2023
1 parent 4e51391 commit 2d610ee
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 49 deletions.
32 changes: 26 additions & 6 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use {
solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
solana_program_runtime::timings::ExecuteTimings,
solana_rpc::{
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender},
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig},
rpc_subscriptions::RpcSubscriptions,
},
solana_runtime::{
Expand Down Expand Up @@ -160,7 +160,7 @@ pub struct ReplayStageConfig {
pub transaction_status_sender: Option<TransactionStatusSender>,
pub rewards_recorder_sender: Option<RewardsRecorderSender>,
pub cache_block_meta_sender: Option<CacheBlockMetaSender>,
pub bank_notification_sender: Option<BankNotificationSender>,
pub bank_notification_sender: Option<BankNotificationSenderConfig>,
pub wait_for_vote_to_start_leader: bool,
pub ancestor_hashes_replay_update_sender: AncestorHashesReplayUpdateSender,
pub tower_storage: Arc<dyn TowerStorage>,
Expand Down Expand Up @@ -1901,7 +1901,7 @@ impl ReplayStage {
rpc_subscriptions: &Arc<RpcSubscriptions>,
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
bank_notification_sender: &Option<BankNotificationSender>,
bank_notification_sender: &Option<BankNotificationSenderConfig>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots,
unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes,
Expand All @@ -1927,8 +1927,19 @@ impl ReplayStage {
.get(new_root)
.expect("Root bank doesn't exist");
let mut rooted_banks = root_bank.parents();
let oldest_parent = rooted_banks.last().map(|last| last.parent_slot());
rooted_banks.push(root_bank.clone());
let rooted_slots: Vec<_> = rooted_banks.iter().map(|bank| bank.slot()).collect();
// The following differs from rooted_slots by including the parent slot of the oldest parent bank.
let rooted_slots_with_parents = bank_notification_sender
.as_ref()
.map_or(false, |sender| sender.should_send_parents)
.then(|| {
let mut new_chain = rooted_slots.clone();
new_chain.push(oldest_parent.unwrap_or_else(|| bank.parent_slot()));
new_chain
});

// Call leader schedule_cache.set_root() before blockstore.set_root() because
// bank_forks.root is consumed by repair_service to update gossip, so we don't want to
// get shreds for repair on gossip before we update leader schedule, otherwise they may
Expand Down Expand Up @@ -1964,8 +1975,16 @@ impl ReplayStage {
rpc_subscriptions.notify_roots(rooted_slots);
if let Some(sender) = bank_notification_sender {
sender
.send(BankNotification::Root(root_bank))
.sender
.send(BankNotification::NewRootBank(root_bank))
.unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));

if let Some(new_chain) = rooted_slots_with_parents {
sender
.sender
.send(BankNotification::NewRootedChain(new_chain))
.unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));
}
}
latest_root_senders.iter().for_each(|s| {
if let Err(e) = s.send(new_root) {
Expand Down Expand Up @@ -2483,7 +2502,7 @@ impl ReplayStage {
transaction_status_sender: Option<&TransactionStatusSender>,
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
bank_notification_sender: &Option<BankNotificationSender>,
bank_notification_sender: &Option<BankNotificationSenderConfig>,
rewards_recorder_sender: &Option<RewardsRecorderSender>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
Expand Down Expand Up @@ -2606,6 +2625,7 @@ impl ReplayStage {
);
if let Some(sender) = bank_notification_sender {
sender
.sender
.send(BankNotification::Frozen(bank.clone()))
.unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));
}
Expand Down Expand Up @@ -2678,7 +2698,7 @@ impl ReplayStage {
verify_recyclers: &VerifyRecyclers,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
replay_vote_sender: &ReplayVoteSender,
bank_notification_sender: &Option<BankNotificationSender>,
bank_notification_sender: &Option<BankNotificationSenderConfig>,
rewards_recorder_sender: &Option<RewardsRecorderSender>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
Expand Down
4 changes: 2 additions & 2 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use {
},
solana_poh::poh_recorder::PohRecorder,
solana_rpc::{
max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSender,
max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSenderConfig,
rpc_subscriptions::RpcSubscriptions,
},
solana_runtime::{
Expand Down Expand Up @@ -119,7 +119,7 @@ impl Tvu {
verified_vote_receiver: VerifiedVoteReceiver,
replay_vote_sender: ReplayVoteSender,
completed_data_sets_sender: CompletedDataSetsSender,
bank_notification_sender: Option<BankNotificationSender>,
bank_notification_sender: Option<BankNotificationSenderConfig>,
gossip_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver,
tvu_config: TvuConfig,
max_slots: &Arc<MaxSlots>,
Expand Down
10 changes: 7 additions & 3 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ use {
solana_rpc::{
max_slots::MaxSlots,
optimistically_confirmed_bank_tracker::{
OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker,
BankNotificationSenderConfig, OptimisticallyConfirmedBank,
OptimisticallyConfirmedBankTracker,
},
rpc::JsonRpcConfig,
rpc_completed_slots_service::RpcCompletedSlotsService,
Expand Down Expand Up @@ -879,7 +880,10 @@ impl Validator {
rpc_subscriptions.clone(),
confirmed_bank_subscribers,
)),
Some(bank_notification_sender),
Some(BankNotificationSenderConfig {
sender: bank_notification_sender,
should_send_parents: geyser_plugin_service.is_some(),
}),
)
} else {
(None, None, None, None)
Expand Down Expand Up @@ -1063,7 +1067,7 @@ impl Validator {
gossip_verified_vote_hash_sender,
replay_vote_receiver,
replay_vote_sender,
bank_notification_sender,
bank_notification_sender.map(|sender| sender.sender),
config.tpu_coalesce_ms,
cluster_confirmed_slot_sender,
&cost_model,
Expand Down
4 changes: 2 additions & 2 deletions geyser-plugin-manager/src/geyser_plugin_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
crossbeam_channel::Receiver,
log::*,
solana_rpc::{
optimistically_confirmed_bank_tracker::BankNotification,
optimistically_confirmed_bank_tracker::SlotNotification,
transaction_notifier_interface::TransactionNotifierLock,
},
solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier,
Expand Down Expand Up @@ -68,7 +68,7 @@ impl GeyserPluginService {
/// It is usually used to configure the connection information for the external data store.

pub fn new(
confirmed_bank_receiver: Receiver<BankNotification>,
confirmed_bank_receiver: Receiver<SlotNotification>,
geyser_plugin_config_files: &[PathBuf],
) -> Result<Self, GeyserPluginServiceError> {
info!(
Expand Down
16 changes: 8 additions & 8 deletions geyser-plugin-manager/src/slot_status_observer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
crate::slot_status_notifier::SlotStatusNotifier,
crossbeam_channel::Receiver,
solana_rpc::optimistically_confirmed_bank_tracker::BankNotification,
solana_rpc::optimistically_confirmed_bank_tracker::SlotNotification,
std::{
sync::{
atomic::{AtomicBool, Ordering},
Expand All @@ -19,7 +19,7 @@ pub(crate) struct SlotStatusObserver {

impl SlotStatusObserver {
pub fn new(
bank_notification_receiver: Receiver<BankNotification>,
bank_notification_receiver: Receiver<SlotNotification>,
slot_status_notifier: SlotStatusNotifier,
) -> Self {
let exit_updated_slot_server = Arc::new(AtomicBool::new(false));
Expand All @@ -43,7 +43,7 @@ impl SlotStatusObserver {
}

fn run_bank_notification_receiver(
bank_notification_receiver: Receiver<BankNotification>,
bank_notification_receiver: Receiver<SlotNotification>,
exit: Arc<AtomicBool>,
slot_status_notifier: SlotStatusNotifier,
) -> JoinHandle<()> {
Expand All @@ -53,23 +53,23 @@ impl SlotStatusObserver {
while !exit.load(Ordering::Relaxed) {
if let Ok(slot) = bank_notification_receiver.recv() {
match slot {
BankNotification::OptimisticallyConfirmed(slot) => {
SlotNotification::OptimisticallyConfirmed(slot) => {
slot_status_notifier
.read()
.unwrap()
.notify_slot_confirmed(slot, None);
}
BankNotification::Frozen(bank) => {
SlotNotification::Frozen((slot, parent)) => {
slot_status_notifier
.read()
.unwrap()
.notify_slot_processed(bank.slot(), Some(bank.parent_slot()));
.notify_slot_processed(slot, Some(parent));
}
BankNotification::Root(bank) => {
SlotNotification::Root((slot, parent)) => {
slot_status_notifier
.read()
.unwrap()
.notify_slot_rooted(bank.slot(), Some(bank.parent_slot()));
.notify_slot_rooted(slot, Some(parent));
}
}
}
Expand Down

0 comments on commit 2d610ee

Please sign in to comment.