Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 94 additions & 1 deletion crates/core/src/runloops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use std::{
};

use agave_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, ReplicaTransactionInfoV3, ReplicaTransactionInfoVersions,
GeyserPlugin, ReplicaBlockInfoV4, ReplicaBlockInfoVersions, ReplicaEntryInfoV2,
ReplicaEntryInfoVersions, ReplicaTransactionInfoV3, ReplicaTransactionInfoVersions, SlotStatus,
};
use chrono::{Local, Utc};
use crossbeam::select;
Expand All @@ -32,6 +33,7 @@ use solana_geyser_plugin_manager::geyser_plugin_manager::{
};
use solana_message::SimpleAddressLoader;
use solana_transaction::sanitized::{MessageHash, SanitizedTransaction};
use solana_transaction_status::RewardsAndNumPartitions;
#[cfg(feature = "subgraph")]
use surfpool_subgraph::SurfpoolSubgraphPlugin;
use surfpool_types::{
Expand Down Expand Up @@ -171,6 +173,9 @@ pub async fn start_local_surfnet_runloop(
let initial_transactions = svm_locker.with_svm_reader(|svm| svm.transactions_processed);
let _ = simnet_events_tx_cc.send(SimnetEvent::Ready(initial_transactions));

// Notify geyser plugins that startup is complete
let _ = svm_locker.with_svm_reader(|svm| svm.geyser_events_tx.send(GeyserEvent::EndOfStartup));

start_block_production_runloop(
clock_event_rx,
clock_command_tx,
Expand Down Expand Up @@ -792,6 +797,94 @@ fn start_geyser_runloop(
}
}
}
Ok(GeyserEvent::EndOfStartup) => {
for plugin in surfpool_plugin_manager.iter() {
if let Err(e) = plugin.notify_end_of_startup() {
let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify end of startup to Geyser plugin: {:?}", e)));
}
}

#[cfg(feature = "geyser_plugin")]
for plugin in plugin_manager.plugins.iter() {
if let Err(e) = plugin.notify_end_of_startup() {
let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify end of startup to Geyser plugin: {:?}", e)));
}
}
}
Ok(GeyserEvent::UpdateSlotStatus { slot, parent, status }) => {
let slot_status = match status {
crate::surfnet::GeyserSlotStatus::Processed => SlotStatus::Processed,
crate::surfnet::GeyserSlotStatus::Confirmed => SlotStatus::Confirmed,
crate::surfnet::GeyserSlotStatus::Rooted => SlotStatus::Rooted,
};

for plugin in surfpool_plugin_manager.iter() {
if let Err(e) = plugin.update_slot_status(slot, parent, &slot_status) {
let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to update slot status in Geyser plugin: {:?}", e)));
}
}

#[cfg(feature = "geyser_plugin")]
for plugin in plugin_manager.plugins.iter() {
if let Err(e) = plugin.update_slot_status(slot, parent, &slot_status) {
let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to update slot status in Geyser plugin: {:?}", e)));
}
}
}
Ok(GeyserEvent::NotifyBlockMetadata(block_metadata)) => {
let rewards = RewardsAndNumPartitions {
rewards: vec![],
num_partitions: None,
};

let block_info = ReplicaBlockInfoV4 {
slot: block_metadata.slot,
blockhash: &block_metadata.blockhash,
rewards: &rewards,
block_time: block_metadata.block_time,
block_height: block_metadata.block_height,
parent_slot: block_metadata.parent_slot,
parent_blockhash: &block_metadata.parent_blockhash,
executed_transaction_count: block_metadata.executed_transaction_count,
entry_count: block_metadata.entry_count,
};

for plugin in surfpool_plugin_manager.iter() {
if let Err(e) = plugin.notify_block_metadata(ReplicaBlockInfoVersions::V0_0_4(&block_info)) {
let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify block metadata to Geyser plugin: {:?}", e)));
}
}

#[cfg(feature = "geyser_plugin")]
for plugin in plugin_manager.plugins.iter() {
if let Err(e) = plugin.notify_block_metadata(ReplicaBlockInfoVersions::V0_0_4(&block_info)) {
let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify block metadata to Geyser plugin: {:?}", e)));
}
}
}
Ok(GeyserEvent::NotifyEntry(entry_info)) => {
let entry_replica = ReplicaEntryInfoV2 {
slot: entry_info.slot,
index: entry_info.index,
num_hashes: entry_info.num_hashes,
hash: &entry_info.hash,
executed_transaction_count: entry_info.executed_transaction_count,
starting_transaction_index: entry_info.starting_transaction_index,
};

for plugin in surfpool_plugin_manager.iter() {
if let Err(e) = plugin.notify_entry(ReplicaEntryInfoVersions::V0_0_2(&entry_replica)) {
let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify entry to Geyser plugin: {:?}", e)));
}
}

#[cfg(feature = "geyser_plugin")]
for plugin in plugin_manager.plugins.iter() {
if let Err(e) = plugin.notify_entry(ReplicaEntryInfoVersions::V0_0_2(&entry_replica)) {
let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify entry to Geyser plugin: {:?}", e)));
}
}
}
}
}
};
Expand Down
50 changes: 49 additions & 1 deletion crates/core/src/surfnet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,62 @@ pub const SLOTS_PER_EPOCH: u64 = 432000;

pub type AccountFactory = Box<dyn Fn(SurfnetSvmLocker) -> GetAccountResult + Send + Sync>;

/// Slot status for geyser plugin notifications.
/// Mirrors `agave_geyser_plugin_interface::geyser_plugin_interface::SlotStatus`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GeyserSlotStatus {
/// Slot is being processed
Processed,
/// Slot has been rooted (finalized)
Rooted,
/// Slot has been confirmed
Confirmed,
}

/// Block metadata for geyser plugin notifications.
#[derive(Debug, Clone)]
pub struct GeyserBlockMetadata {
pub slot: Slot,
pub blockhash: String,
pub parent_slot: Slot,
pub parent_blockhash: String,
pub block_time: Option<i64>,
pub block_height: Option<u64>,
pub executed_transaction_count: u64,
pub entry_count: u64,
}

/// Entry info for geyser plugin notifications.
/// Surfpool emits one entry per block (simplified model).
#[derive(Debug, Clone)]
pub struct GeyserEntryInfo {
pub slot: Slot,
pub index: usize,
pub num_hashes: u64,
pub hash: Vec<u8>,
pub executed_transaction_count: u64,
pub starting_transaction_index: usize,
}

#[allow(clippy::large_enum_variant)]
pub enum GeyserEvent {
NotifyTransaction(TransactionWithStatusMeta, Option<VersionedTransaction>),
UpdateAccount(GeyserAccountUpdate),
/// Account update sent at startup (before block production begins).
/// These updates should be sent to geyser plugins with is_startup=true.
StartupAccountUpdate(GeyserAccountUpdate),
// todo: add more events
/// Notify plugins that startup is complete.
EndOfStartup,
/// Update slot status (processed, confirmed, rooted/finalized).
UpdateSlotStatus {
slot: Slot,
parent: Option<Slot>,
status: GeyserSlotStatus,
},
/// Notify plugins of block metadata.
NotifyBlockMetadata(GeyserBlockMetadata),
/// Notify plugins of entry execution.
NotifyEntry(GeyserEntryInfo),
}

#[derive(Debug, Eq, PartialEq, Hash, Clone)]
Expand Down
59 changes: 56 additions & 3 deletions crates/core/src/surfnet/svm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ use uuid::Uuid;

use super::{
AccountSubscriptionData, BlockHeader, BlockIdentifier, FINALIZATION_SLOT_THRESHOLD,
GetAccountResult, GeyserEvent, SLOTS_PER_EPOCH, SignatureSubscriptionData,
SignatureSubscriptionType, remote::SurfnetRemoteClient,
GetAccountResult, GeyserBlockMetadata, GeyserEntryInfo, GeyserEvent, GeyserSlotStatus,
SLOTS_PER_EPOCH, SignatureSubscriptionData, SignatureSubscriptionType,
remote::SurfnetRemoteClient,
};
use crate::{
error::{SurfpoolError, SurfpoolResult},
Expand Down Expand Up @@ -1961,7 +1962,7 @@ impl SurfnetSvm {
slot,
BlockHeader {
hash: self.chain_tip.hash.clone(),
previous_blockhash: previous_chain_tip.hash,
previous_blockhash: previous_chain_tip.hash.clone(),
block_time: self.updated_at as i64 / 1_000,
block_height: self.chain_tip.index,
parent_slot: slot,
Expand Down Expand Up @@ -2004,6 +2005,46 @@ impl SurfnetSvm {
let root = new_slot.saturating_sub(FINALIZATION_SLOT_THRESHOLD);
self.notify_slot_subscribers(new_slot, parent_slot, root);

// Notify geyser plugins of slot status (Confirmed)
self.geyser_events_tx
.send(GeyserEvent::UpdateSlotStatus {
slot: new_slot,
parent: Some(parent_slot),
status: GeyserSlotStatus::Confirmed,
})
.ok();

// Notify geyser plugins of block metadata
let block_metadata = GeyserBlockMetadata {
slot: new_slot,
blockhash: self.chain_tip.hash.clone(),
parent_slot,
parent_blockhash: previous_chain_tip.hash.clone(),
block_time: Some(self.updated_at as i64 / 1_000),
block_height: Some(self.chain_tip.index),
executed_transaction_count: num_transactions,
entry_count: 1, // Surfpool produces 1 entry per block
};
self.geyser_events_tx
.send(GeyserEvent::NotifyBlockMetadata(block_metadata))
.ok();

// Notify geyser plugins of entry (Surfpool emits 1 entry per block)
let entry_hash = solana_hash::Hash::from_str(&self.chain_tip.hash)
.map(|h| h.to_bytes().to_vec())
.unwrap_or_else(|_| vec![0u8; 32]);
let entry_info = GeyserEntryInfo {
slot: new_slot,
index: 0, // Single entry per block
num_hashes: 1,
hash: entry_hash,
executed_transaction_count: num_transactions,
starting_transaction_index: 0,
};
self.geyser_events_tx
.send(GeyserEvent::NotifyEntry(entry_info))
.ok();

let clock: Clock = Clock {
slot: self.latest_epoch_info.absolute_slot,
epoch: self.latest_epoch_info.epoch,
Expand All @@ -2019,6 +2060,18 @@ impl SurfnetSvm {

self.finalize_transactions()?;

// Notify geyser plugins of newly rooted (finalized) slot
// Only emit if root is a valid slot (greater than genesis)
if root >= self.genesis_slot {
self.geyser_events_tx
.send(GeyserEvent::UpdateSlotStatus {
slot: root,
parent: root.checked_sub(1),
status: GeyserSlotStatus::Rooted,
})
.ok();
}

// Evict the accounts marked as streamed from cache to enforce them to be fetched again
let accounts_to_reset: Vec<_> = self.streamed_accounts.into_iter()?.collect();
for (pubkey_str, include_owned_accounts) in accounts_to_reset {
Expand Down