Skip to content

Commit

Permalink
Fixed missing Root notifications via geyser plugin framework (#31180)
Browse files Browse the repository at this point in the history
* Fixed missing Root notifications via geyser plugin framework

* Renamed a variable

* fmt issue

* Do not try the loop if no subscribers.

* Addressing some feedback -- passing parent roots from replay_stage to avoid race conditions

* clippy issue

* Address some reviewing findings

* Addressed some feedback from Carl

* fix a clippy issue

* Added comments on optimistically_confirmed_bank_tracker module to explain the workflow

* Addressed Trent's review
  • Loading branch information
lijunwangs committed May 3, 2023
1 parent 74315d2 commit 7cf50e6
Show file tree
Hide file tree
Showing 8 changed files with 252 additions and 50 deletions.
32 changes: 26 additions & 6 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use {
solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
solana_program_runtime::timings::ExecuteTimings,
solana_rpc::{
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender},
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig},
rpc_subscriptions::RpcSubscriptions,
},
solana_rpc_client_api::response::SlotUpdate,
Expand Down Expand Up @@ -237,7 +237,7 @@ pub struct ReplayStageConfig {
pub transaction_status_sender: Option<TransactionStatusSender>,
pub rewards_recorder_sender: Option<RewardsRecorderSender>,
pub cache_block_meta_sender: Option<CacheBlockMetaSender>,
pub bank_notification_sender: Option<BankNotificationSender>,
pub bank_notification_sender: Option<BankNotificationSenderConfig>,
pub wait_for_vote_to_start_leader: bool,
pub ancestor_hashes_replay_update_sender: AncestorHashesReplayUpdateSender,
pub tower_storage: Arc<dyn TowerStorage>,
Expand Down Expand Up @@ -1996,7 +1996,7 @@ impl ReplayStage {
rpc_subscriptions: &Arc<RpcSubscriptions>,
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
bank_notification_sender: &Option<BankNotificationSender>,
bank_notification_sender: &Option<BankNotificationSenderConfig>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots,
unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes,
Expand All @@ -2022,8 +2022,19 @@ impl ReplayStage {
.get(new_root)
.expect("Root bank doesn't exist");
let mut rooted_banks = root_bank.parents();
let oldest_parent = rooted_banks.last().map(|last| last.parent_slot());
rooted_banks.push(root_bank.clone());
let rooted_slots: Vec<_> = rooted_banks.iter().map(|bank| bank.slot()).collect();
// The following differs from rooted_slots by including the parent slot of the oldest parent bank.
let rooted_slots_with_parents = bank_notification_sender
.as_ref()
.map_or(false, |sender| sender.should_send_parents)
.then(|| {
let mut new_chain = rooted_slots.clone();
new_chain.push(oldest_parent.unwrap_or(bank.parent_slot()));
new_chain
});

// Call leader schedule_cache.set_root() before blockstore.set_root() because
// bank_forks.root is consumed by repair_service to update gossip, so we don't want to
// get shreds for repair on gossip before we update leader schedule, otherwise they may
Expand Down Expand Up @@ -2059,8 +2070,16 @@ impl ReplayStage {
rpc_subscriptions.notify_roots(rooted_slots);
if let Some(sender) = bank_notification_sender {
sender
.send(BankNotification::Root(root_bank))
.sender
.send(BankNotification::NewRootBank(root_bank))
.unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));

if let Some(new_chain) = rooted_slots_with_parents {
sender
.sender
.send(BankNotification::NewRootedChain(new_chain))
.unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));
}
}
latest_root_senders.iter().for_each(|s| {
if let Err(e) = s.send(new_root) {
Expand Down Expand Up @@ -2574,7 +2593,7 @@ impl ReplayStage {
transaction_status_sender: Option<&TransactionStatusSender>,
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
bank_notification_sender: &Option<BankNotificationSender>,
bank_notification_sender: &Option<BankNotificationSenderConfig>,
rewards_recorder_sender: &Option<RewardsRecorderSender>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
Expand Down Expand Up @@ -2698,6 +2717,7 @@ impl ReplayStage {
);
if let Some(sender) = bank_notification_sender {
sender
.sender
.send(BankNotification::Frozen(bank.clone()))
.unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));
}
Expand Down Expand Up @@ -2766,7 +2786,7 @@ impl ReplayStage {
verify_recyclers: &VerifyRecyclers,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
replay_vote_sender: &ReplayVoteSender,
bank_notification_sender: &Option<BankNotificationSender>,
bank_notification_sender: &Option<BankNotificationSenderConfig>,
rewards_recorder_sender: &Option<RewardsRecorderSender>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
Expand Down
4 changes: 2 additions & 2 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use {
},
solana_poh::poh_recorder::PohRecorder,
solana_rpc::{
max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSender,
max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSenderConfig,
rpc_subscriptions::RpcSubscriptions,
},
solana_runtime::{
Expand Down Expand Up @@ -126,7 +126,7 @@ impl Tvu {
verified_vote_receiver: VerifiedVoteReceiver,
replay_vote_sender: ReplayVoteSender,
completed_data_sets_sender: CompletedDataSetsSender,
bank_notification_sender: Option<BankNotificationSender>,
bank_notification_sender: Option<BankNotificationSenderConfig>,
gossip_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver,
tvu_config: TvuConfig,
max_slots: &Arc<MaxSlots>,
Expand Down
10 changes: 7 additions & 3 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ use {
solana_rpc::{
max_slots::MaxSlots,
optimistically_confirmed_bank_tracker::{
OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker,
BankNotificationSenderConfig, OptimisticallyConfirmedBank,
OptimisticallyConfirmedBankTracker,
},
rpc::JsonRpcConfig,
rpc_completed_slots_service::RpcCompletedSlotsService,
Expand Down Expand Up @@ -945,7 +946,10 @@ impl Validator {
rpc_subscriptions.clone(),
confirmed_bank_subscribers,
)),
Some(bank_notification_sender),
Some(BankNotificationSenderConfig {
sender: bank_notification_sender,
should_send_parents: geyser_plugin_service.is_some(),
}),
)
} else {
(None, None, None, None)
Expand Down Expand Up @@ -1145,7 +1149,7 @@ impl Validator {
gossip_verified_vote_hash_sender,
replay_vote_receiver,
replay_vote_sender,
bank_notification_sender,
bank_notification_sender.map(|sender| sender.sender),
config.tpu_coalesce,
cluster_confirmed_slot_sender,
&connection_cache,
Expand Down
6 changes: 3 additions & 3 deletions geyser-plugin-manager/src/geyser_plugin_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use {
log::*,
solana_rpc::{
entry_notifier_interface::EntryNotifierLock,
optimistically_confirmed_bank_tracker::BankNotification,
optimistically_confirmed_bank_tracker::SlotNotification,
transaction_notifier_interface::TransactionNotifierLock,
},
solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier,
Expand Down Expand Up @@ -50,14 +50,14 @@ impl GeyserPluginService {
/// The rest of the JSON fields' definition is up to to the concrete plugin implementation
/// It is usually used to configure the connection information for the external data store.
pub fn new(
confirmed_bank_receiver: Receiver<BankNotification>,
confirmed_bank_receiver: Receiver<SlotNotification>,
geyser_plugin_config_files: &[PathBuf],
) -> Result<Self, GeyserPluginServiceError> {
Self::new_with_receiver(confirmed_bank_receiver, geyser_plugin_config_files, None)
}

pub fn new_with_receiver(
confirmed_bank_receiver: Receiver<BankNotification>,
confirmed_bank_receiver: Receiver<SlotNotification>,
geyser_plugin_config_files: &[PathBuf],
rpc_to_plugin_manager_receiver_and_exit: Option<(
Receiver<GeyserPluginManagerRequest>,
Expand Down
16 changes: 8 additions & 8 deletions geyser-plugin-manager/src/slot_status_observer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
crate::slot_status_notifier::SlotStatusNotifier,
crossbeam_channel::Receiver,
solana_rpc::optimistically_confirmed_bank_tracker::BankNotification,
solana_rpc::optimistically_confirmed_bank_tracker::SlotNotification,
std::{
sync::{
atomic::{AtomicBool, Ordering},
Expand All @@ -19,7 +19,7 @@ pub(crate) struct SlotStatusObserver {

impl SlotStatusObserver {
pub fn new(
bank_notification_receiver: Receiver<BankNotification>,
bank_notification_receiver: Receiver<SlotNotification>,
slot_status_notifier: SlotStatusNotifier,
) -> Self {
let exit_updated_slot_server = Arc::new(AtomicBool::new(false));
Expand All @@ -43,7 +43,7 @@ impl SlotStatusObserver {
}

fn run_bank_notification_receiver(
bank_notification_receiver: Receiver<BankNotification>,
bank_notification_receiver: Receiver<SlotNotification>,
exit: Arc<AtomicBool>,
slot_status_notifier: SlotStatusNotifier,
) -> JoinHandle<()> {
Expand All @@ -53,23 +53,23 @@ impl SlotStatusObserver {
while !exit.load(Ordering::Relaxed) {
if let Ok(slot) = bank_notification_receiver.recv() {
match slot {
BankNotification::OptimisticallyConfirmed(slot) => {
SlotNotification::OptimisticallyConfirmed(slot) => {
slot_status_notifier
.read()
.unwrap()
.notify_slot_confirmed(slot, None);
}
BankNotification::Frozen(bank) => {
SlotNotification::Frozen((slot, parent)) => {
slot_status_notifier
.read()
.unwrap()
.notify_slot_processed(bank.slot(), Some(bank.parent_slot()));
.notify_slot_processed(slot, Some(parent));
}
BankNotification::Root(bank) => {
SlotNotification::Root((slot, parent)) => {
slot_status_notifier
.read()
.unwrap()
.notify_slot_rooted(bank.slot(), Some(bank.parent_slot()));
.notify_slot_rooted(slot, Some(parent));
}
}
}
Expand Down

0 comments on commit 7cf50e6

Please sign in to comment.