Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed missing Root notifications via geyser plugin framework #31180

Merged
merged 11 commits into from
May 3, 2023
32 changes: 26 additions & 6 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use {
solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
solana_program_runtime::timings::ExecuteTimings,
solana_rpc::{
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender},
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig},
rpc_subscriptions::RpcSubscriptions,
},
solana_rpc_client_api::response::SlotUpdate,
Expand Down Expand Up @@ -237,7 +237,7 @@ pub struct ReplayStageConfig {
pub transaction_status_sender: Option<TransactionStatusSender>,
pub rewards_recorder_sender: Option<RewardsRecorderSender>,
pub cache_block_meta_sender: Option<CacheBlockMetaSender>,
pub bank_notification_sender: Option<BankNotificationSender>,
pub bank_notification_sender: Option<BankNotificationSenderConfig>,
pub wait_for_vote_to_start_leader: bool,
pub ancestor_hashes_replay_update_sender: AncestorHashesReplayUpdateSender,
pub tower_storage: Arc<dyn TowerStorage>,
Expand Down Expand Up @@ -1995,7 +1995,7 @@ impl ReplayStage {
rpc_subscriptions: &Arc<RpcSubscriptions>,
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
bank_notification_sender: &Option<BankNotificationSender>,
bank_notification_sender: &Option<BankNotificationSenderConfig>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots,
unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes,
Expand All @@ -2021,8 +2021,19 @@ impl ReplayStage {
.get(new_root)
.expect("Root bank doesn't exist");
let mut rooted_banks = root_bank.parents();
let oldest_parent = rooted_banks.last().map(|last| last.parent_slot());
rooted_banks.push(root_bank.clone());
let rooted_slots: Vec<_> = rooted_banks.iter().map(|bank| bank.slot()).collect();
// The following differs from rooted_slots by including the parent slot of the oldest parent bank.
let rooted_slots_with_parents = bank_notification_sender
.as_ref()
.map_or(false, |sender| sender.should_send_parents)
.then(|| {
let mut new_chain = rooted_slots.clone();
new_chain.push(oldest_parent.unwrap_or(bank.parent_slot()));
new_chain
});

// Call leader schedule_cache.set_root() before blockstore.set_root() because
// bank_forks.root is consumed by repair_service to update gossip, so we don't want to
// get shreds for repair on gossip before we update leader schedule, otherwise they may
Expand Down Expand Up @@ -2058,8 +2069,16 @@ impl ReplayStage {
rpc_subscriptions.notify_roots(rooted_slots);
if let Some(sender) = bank_notification_sender {
sender
.send(BankNotification::Root(root_bank))
.sender
.send(BankNotification::NewRootBank(root_bank))
.unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));

if let Some(new_chain) = rooted_slots_with_parents {
sender
.sender
.send(BankNotification::NewRootedChain(new_chain))
.unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));
}
}
latest_root_senders.iter().for_each(|s| {
if let Err(e) = s.send(new_root) {
Expand Down Expand Up @@ -2573,7 +2592,7 @@ impl ReplayStage {
transaction_status_sender: Option<&TransactionStatusSender>,
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
bank_notification_sender: &Option<BankNotificationSender>,
bank_notification_sender: &Option<BankNotificationSenderConfig>,
rewards_recorder_sender: &Option<RewardsRecorderSender>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
Expand Down Expand Up @@ -2697,6 +2716,7 @@ impl ReplayStage {
);
if let Some(sender) = bank_notification_sender {
sender
.sender
.send(BankNotification::Frozen(bank.clone()))
.unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));
}
Expand Down Expand Up @@ -2765,7 +2785,7 @@ impl ReplayStage {
verify_recyclers: &VerifyRecyclers,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
replay_vote_sender: &ReplayVoteSender,
bank_notification_sender: &Option<BankNotificationSender>,
bank_notification_sender: &Option<BankNotificationSenderConfig>,
rewards_recorder_sender: &Option<RewardsRecorderSender>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
Expand Down
4 changes: 2 additions & 2 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use {
},
solana_poh::poh_recorder::PohRecorder,
solana_rpc::{
max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSender,
max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSenderConfig,
rpc_subscriptions::RpcSubscriptions,
},
solana_runtime::{
Expand Down Expand Up @@ -126,7 +126,7 @@ impl Tvu {
verified_vote_receiver: VerifiedVoteReceiver,
replay_vote_sender: ReplayVoteSender,
completed_data_sets_sender: CompletedDataSetsSender,
bank_notification_sender: Option<BankNotificationSender>,
bank_notification_sender: Option<BankNotificationSenderConfig>,
gossip_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver,
tvu_config: TvuConfig,
max_slots: &Arc<MaxSlots>,
Expand Down
10 changes: 7 additions & 3 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ use {
solana_rpc::{
max_slots::MaxSlots,
optimistically_confirmed_bank_tracker::{
OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker,
BankNotificationSenderConfig, OptimisticallyConfirmedBank,
OptimisticallyConfirmedBankTracker,
},
rpc::JsonRpcConfig,
rpc_completed_slots_service::RpcCompletedSlotsService,
Expand Down Expand Up @@ -945,7 +946,10 @@ impl Validator {
rpc_subscriptions.clone(),
confirmed_bank_subscribers,
)),
Some(bank_notification_sender),
Some(BankNotificationSenderConfig {
sender: bank_notification_sender,
should_send_parents: geyser_plugin_service.is_some(),
}),
)
} else {
(None, None, None, None)
Expand Down Expand Up @@ -1145,7 +1149,7 @@ impl Validator {
gossip_verified_vote_hash_sender,
replay_vote_receiver,
replay_vote_sender,
bank_notification_sender,
bank_notification_sender.map(|sender| sender.sender),
config.tpu_coalesce,
cluster_confirmed_slot_sender,
&connection_cache,
Expand Down
6 changes: 3 additions & 3 deletions geyser-plugin-manager/src/geyser_plugin_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use {
log::*,
solana_rpc::{
entry_notifier_interface::EntryNotifierLock,
optimistically_confirmed_bank_tracker::BankNotification,
optimistically_confirmed_bank_tracker::SlotNotification,
transaction_notifier_interface::TransactionNotifierLock,
},
solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier,
Expand Down Expand Up @@ -50,14 +50,14 @@ impl GeyserPluginService {
/// The rest of the JSON fields' definition is up to to the concrete plugin implementation
/// It is usually used to configure the connection information for the external data store.
pub fn new(
confirmed_bank_receiver: Receiver<BankNotification>,
confirmed_bank_receiver: Receiver<SlotNotification>,
geyser_plugin_config_files: &[PathBuf],
) -> Result<Self, GeyserPluginServiceError> {
Self::new_with_receiver(confirmed_bank_receiver, geyser_plugin_config_files, None)
}

pub fn new_with_receiver(
confirmed_bank_receiver: Receiver<BankNotification>,
confirmed_bank_receiver: Receiver<SlotNotification>,
geyser_plugin_config_files: &[PathBuf],
rpc_to_plugin_manager_receiver_and_exit: Option<(
Receiver<GeyserPluginManagerRequest>,
Expand Down
16 changes: 8 additions & 8 deletions geyser-plugin-manager/src/slot_status_observer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
crate::slot_status_notifier::SlotStatusNotifier,
crossbeam_channel::Receiver,
solana_rpc::optimistically_confirmed_bank_tracker::BankNotification,
solana_rpc::optimistically_confirmed_bank_tracker::SlotNotification,
std::{
sync::{
atomic::{AtomicBool, Ordering},
Expand All @@ -19,7 +19,7 @@ pub(crate) struct SlotStatusObserver {

impl SlotStatusObserver {
pub fn new(
bank_notification_receiver: Receiver<BankNotification>,
bank_notification_receiver: Receiver<SlotNotification>,
slot_status_notifier: SlotStatusNotifier,
) -> Self {
let exit_updated_slot_server = Arc::new(AtomicBool::new(false));
Expand All @@ -43,7 +43,7 @@ impl SlotStatusObserver {
}

fn run_bank_notification_receiver(
bank_notification_receiver: Receiver<BankNotification>,
bank_notification_receiver: Receiver<SlotNotification>,
exit: Arc<AtomicBool>,
slot_status_notifier: SlotStatusNotifier,
) -> JoinHandle<()> {
Expand All @@ -53,23 +53,23 @@ impl SlotStatusObserver {
while !exit.load(Ordering::Relaxed) {
if let Ok(slot) = bank_notification_receiver.recv() {
match slot {
BankNotification::OptimisticallyConfirmed(slot) => {
SlotNotification::OptimisticallyConfirmed(slot) => {
slot_status_notifier
.read()
.unwrap()
.notify_slot_confirmed(slot, None);
}
BankNotification::Frozen(bank) => {
SlotNotification::Frozen((slot, parent)) => {
slot_status_notifier
.read()
.unwrap()
.notify_slot_processed(bank.slot(), Some(bank.parent_slot()));
.notify_slot_processed(slot, Some(parent));
}
BankNotification::Root(bank) => {
SlotNotification::Root((slot, parent)) => {
slot_status_notifier
.read()
.unwrap()
.notify_slot_rooted(bank.slot(), Some(bank.parent_slot()));
.notify_slot_rooted(slot, Some(parent));
}
}
}
Expand Down