Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use std::{
cmp::Reverse,
sync::{Arc, OnceLock},
time::Duration,
};

use magicblock_metrics::metrics::TRANSACTION_COUNT;
use scc::{ebr::Guard, TreeIndex};
use solana_rpc_client_api::response::RpcPerfSample;
use tokio::time;
use tokio_util::sync::CancellationToken;

use super::prelude::*;

/// 60 seconds per sample
const PERIOD_SECS: u64 = 60;

/// Keep 12 hours of history (720 minutes)
const MAX_PERF_SAMPLES: usize = 720;

/// Estimated blocks per minute:
/// Nominal = 1200 (20 blocks/sec * 60s).
/// We use 1500 (25% buffer) to ensure the cleanup
/// logic never accidentally prunes valid history
const ESTIMATED_SLOTS_PER_SAMPLE: u64 = 1500;

static PERF_SAMPLES: OnceLock<TreeIndex<Reverse<Slot>, Sample>> =
OnceLock::new();

#[derive(Clone, Copy)]
struct Sample {
transactions: u64,
slots: u64,
}

impl HttpDispatcher {
pub(crate) fn get_recent_performance_samples(
&self,
request: &mut JsonRequest,
) -> HandlerResult {
let count = parse_params!(request.params()?, usize);
let mut count: usize = some_or_err!(count);

// Cap request at max history size (12h)
count = count.min(MAX_PERF_SAMPLES);

let index = PERF_SAMPLES.get_or_init(TreeIndex::default);
let mut samples = Vec::with_capacity(count);

// Index is keyed by Reverse(Slot), so iter() yields Newest -> Oldest
for (slot, &sample) in index.iter(&Guard::new()).take(count) {
samples.push(RpcPerfSample {
slot: slot.0,
num_slots: sample.slots,
num_transactions: sample.transactions,
num_non_vote_transactions: None,
sample_period_secs: PERIOD_SECS as u16,
});
}

Ok(ResponsePayload::encode_no_context(&request.id, samples))
}

pub(crate) async fn run_perf_samples_collector(
self: Arc<Self>,
cancel: CancellationToken,
) {
let mut interval = time::interval(Duration::from_secs(PERIOD_SECS));

let mut last_slot = self.blocks.block_height();
let mut last_tx_count = TRANSACTION_COUNT.get();

loop {
tokio::select! {
_ = interval.tick() => {
// Capture current state
let current_slot = self.blocks.block_height();
let current_tx_count = TRANSACTION_COUNT.get();

// Calculate Deltas (Activity within the last 60s)
let slots_delta = current_slot.saturating_sub(last_slot).max(1);
let tx_delta = current_tx_count.saturating_sub(last_tx_count);

let index = PERF_SAMPLES.get_or_init(TreeIndex::default);
let sample = Sample {
slots: slots_delta,
transactions: tx_delta,
};
let _ = index.insert_async(Reverse(current_slot), sample).await;

// Prune old history
if index.len() > MAX_PERF_SAMPLES {
// Calculate cutoff: 720 samples * 1500 blocks = ~1.08M blocks history
let retention_range = MAX_PERF_SAMPLES as u64 * ESTIMATED_SLOTS_PER_SAMPLE;
let cutoff_slot = current_slot.saturating_sub(retention_range);

// Remove everything OLDER than the cutoff.
// In Reverse(), "Older" (Smaller Slot) == "Greater Value".
// RangeFrom (cutoff..) removes the tail of the tree.
index.remove_range_async(Reverse(cutoff_slot)..).await;
}

// Update baseline for next tick
last_slot = current_slot;
last_tx_count = current_tx_count;
}
_ = cancel.cancelled() => {
break;
}
}
}
}
}
35 changes: 26 additions & 9 deletions magicblock-aperture/src/requests/http/mocked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@
//! returning default or empty responses, rather than 'method not found' errors.

use magicblock_core::link::blocks::BlockHash;
use magicblock_metrics::metrics::TRANSACTION_COUNT;
use solana_account_decoder::parse_token::UiTokenAmount;
use solana_rpc_client_api::response::{
RpcBlockCommitment, RpcContactInfo, RpcSnapshotSlotInfo, RpcSupply,
RpcVoteAccountStatus,
};

use super::prelude::*;
const SLOTS_IN_EPOCH: u64 = 432_000;

impl HttpDispatcher {
/// Handles the `getSlotLeader` RPC request.
Expand All @@ -37,7 +40,8 @@ impl HttpDispatcher {
&self,
request: &JsonRequest,
) -> HandlerResult {
Ok(ResponsePayload::encode_no_context(&request.id, 0))
let count = TRANSACTION_COUNT.get();
Ok(ResponsePayload::encode_no_context(&request.id, count))
}

/// Handles the `getSlotLeaders` RPC request.
Expand Down Expand Up @@ -161,13 +165,15 @@ impl HttpDispatcher {
&self,
request: &JsonRequest,
) -> HandlerResult {
let slot = self.blocks.block_height();
let transaction_count = self.ledger.count_transactions()?;
let info = json::json! {{
"epoch": 0,
"slotIndex": 0,
"slotsInEpoch": u64::MAX,
"absoluteSlot": 0,
"blockHeight": 0,
"transactionCount": Some(0),
"epoch": slot / SLOTS_IN_EPOCH,
"slotIndex": slot % SLOTS_IN_EPOCH,
"slotsInEpoch": SLOTS_IN_EPOCH,
"absoluteSlot": slot,
"blockHeight": slot,
"transactionCount": Some(transaction_count),
}};
Ok(ResponsePayload::encode_no_context(&request.id, info))
}
Expand All @@ -182,8 +188,8 @@ impl HttpDispatcher {
"firstNormalEpoch": 0,
"firstNormalSlot": 0,
"leaderScheduleSlotOffset": 0,
"slotsPerEpoch": u64::MAX,
"warmup": true
"slotsPerEpoch": SLOTS_IN_EPOCH,
"warmup": false
}};
Ok(ResponsePayload::encode_no_context(&request.id, schedule))
}
Expand Down Expand Up @@ -226,4 +232,15 @@ impl HttpDispatcher {
};
Ok(ResponsePayload::encode_no_context(&request.id, [info]))
}

pub(crate) fn get_vote_accounts(
&self,
request: &JsonRequest,
) -> HandlerResult {
let status = RpcVoteAccountStatus {
current: Vec::new(),
delinquent: Vec::new(),
};
Ok(ResponsePayload::encode_no_context(&request.id, status))
}
}
1 change: 1 addition & 0 deletions magicblock-aperture/src/requests/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ pub(crate) mod get_identity;
pub(crate) mod get_latest_blockhash;
pub(crate) mod get_multiple_accounts;
pub(crate) mod get_program_accounts;
pub(crate) mod get_recent_performance_samples;
pub(crate) mod get_signature_statuses;
pub(crate) mod get_signatures_for_address;
pub(crate) mod get_slot;
Expand Down
118 changes: 54 additions & 64 deletions magicblock-aperture/src/requests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub(crate) enum JsonRpcHttpMethod {
GetLatestBlockhash,
GetMultipleAccounts,
GetProgramAccounts,
GetRecentPerformanceSamples,
GetSignatureStatuses,
GetSignaturesForAddress,
GetSlot,
Expand All @@ -65,6 +66,7 @@ pub(crate) enum JsonRpcHttpMethod {
GetTransaction,
GetTransactionCount,
GetVersion,
GetVoteAccounts,
IsBlockhashValid,
MinimumLedgerSlot,
RequestAirdrop,
Expand All @@ -91,77 +93,65 @@ pub(crate) enum JsonRpcWsMethod {
impl JsonRpcHttpMethod {
pub(crate) fn as_str(&self) -> &'static str {
match self {
JsonRpcHttpMethod::GetAccountInfo => "getAccountInfo",
JsonRpcHttpMethod::GetBalance => "getBalance",
JsonRpcHttpMethod::GetBlock => "getBlock",
JsonRpcHttpMethod::GetBlockCommitment => "getBlockCommitment",
JsonRpcHttpMethod::GetBlockHeight => "getBlockHeight",
JsonRpcHttpMethod::GetBlockTime => "getBlockTime",
JsonRpcHttpMethod::GetBlocks => "getBlocks",
JsonRpcHttpMethod::GetBlocksWithLimit => "getBlocksWithLimit",
JsonRpcHttpMethod::GetClusterNodes => "getClusterNodes",
JsonRpcHttpMethod::GetEpochInfo => "getEpochInfo",
JsonRpcHttpMethod::GetEpochSchedule => "getEpochSchedule",
JsonRpcHttpMethod::GetFeeForMessage => "getFeeForMessage",
JsonRpcHttpMethod::GetFirstAvailableBlock => {
"getFirstAvailableBlock"
}
JsonRpcHttpMethod::GetGenesisHash => "getGenesisHash",
JsonRpcHttpMethod::GetHealth => "getHealth",
JsonRpcHttpMethod::GetHighestSnapshotSlot => {
"getHighestSnapshotSlot"
}
JsonRpcHttpMethod::GetIdentity => "getIdentity",
JsonRpcHttpMethod::GetLargestAccounts => "getLargestAccounts",
JsonRpcHttpMethod::GetLatestBlockhash => "getLatestBlockhash",
JsonRpcHttpMethod::GetMultipleAccounts => "getMultipleAccounts",
JsonRpcHttpMethod::GetProgramAccounts => "getProgramAccounts",
JsonRpcHttpMethod::GetSignatureStatuses => "getSignatureStatuses",
JsonRpcHttpMethod::GetSignaturesForAddress => {
"getSignaturesForAddress"
}
JsonRpcHttpMethod::GetSlot => "getSlot",
JsonRpcHttpMethod::GetSlotLeader => "getSlotLeader",
JsonRpcHttpMethod::GetSlotLeaders => "getSlotLeaders",
JsonRpcHttpMethod::GetSupply => "getSupply",
JsonRpcHttpMethod::GetTokenAccountBalance => {
"getTokenAccountBalance"
}
JsonRpcHttpMethod::GetTokenAccountsByDelegate => {
"getTokenAccountsByDelegate"
}
JsonRpcHttpMethod::GetTokenAccountsByOwner => {
"getTokenAccountsByOwner"
}
JsonRpcHttpMethod::GetTokenLargestAccounts => {
"getTokenLargestAccounts"
}
JsonRpcHttpMethod::GetTokenSupply => "getTokenSupply",
JsonRpcHttpMethod::GetTransaction => "getTransaction",
JsonRpcHttpMethod::GetTransactionCount => "getTransactionCount",
JsonRpcHttpMethod::GetVersion => "getVersion",
JsonRpcHttpMethod::IsBlockhashValid => "isBlockhashValid",
JsonRpcHttpMethod::MinimumLedgerSlot => "minimumLedgerSlot",
JsonRpcHttpMethod::RequestAirdrop => "requestAirdrop",
JsonRpcHttpMethod::SendTransaction => "sendTransaction",
JsonRpcHttpMethod::SimulateTransaction => "simulateTransaction",
Self::GetAccountInfo => "getAccountInfo",
Self::GetBalance => "getBalance",
Self::GetBlock => "getBlock",
Self::GetBlockCommitment => "getBlockCommitment",
Self::GetBlockHeight => "getBlockHeight",
Self::GetBlockTime => "getBlockTime",
Self::GetBlocks => "getBlocks",
Self::GetBlocksWithLimit => "getBlocksWithLimit",
Self::GetClusterNodes => "getClusterNodes",
Self::GetEpochInfo => "getEpochInfo",
Self::GetEpochSchedule => "getEpochSchedule",
Self::GetFeeForMessage => "getFeeForMessage",
Self::GetFirstAvailableBlock => "getFirstAvailableBlock",
Self::GetGenesisHash => "getGenesisHash",
Self::GetHealth => "getHealth",
Self::GetHighestSnapshotSlot => "getHighestSnapshotSlot",
Self::GetIdentity => "getIdentity",
Self::GetLargestAccounts => "getLargestAccounts",
Self::GetLatestBlockhash => "getLatestBlockhash",
Self::GetMultipleAccounts => "getMultipleAccounts",
Self::GetProgramAccounts => "getProgramAccounts",
Self::GetRecentPerformanceSamples => "getRecentPerformanceSamples",
Self::GetSignatureStatuses => "getSignatureStatuses",
Self::GetSignaturesForAddress => "getSignaturesForAddress",
Self::GetSlot => "getSlot",
Self::GetSlotLeader => "getSlotLeader",
Self::GetSlotLeaders => "getSlotLeaders",
Self::GetSupply => "getSupply",
Self::GetTokenAccountBalance => "getTokenAccountBalance",
Self::GetTokenAccountsByDelegate => "getTokenAccountsByDelegate",
Self::GetTokenAccountsByOwner => "getTokenAccountsByOwner",
Self::GetTokenLargestAccounts => "getTokenLargestAccounts",
Self::GetTokenSupply => "getTokenSupply",
Self::GetTransaction => "getTransaction",
Self::GetTransactionCount => "getTransactionCount",
Self::GetVersion => "getVersion",
Self::GetVoteAccounts => "getVoteAccounts",
Self::IsBlockhashValid => "isBlockhashValid",
Self::MinimumLedgerSlot => "minimumLedgerSlot",
Self::RequestAirdrop => "requestAirdrop",
Self::SendTransaction => "sendTransaction",
Self::SimulateTransaction => "simulateTransaction",
}
}
}

impl JsonRpcWsMethod {
pub(crate) fn as_str(&self) -> &'static str {
match self {
JsonRpcWsMethod::AccountSubscribe => "accountSubscribe",
JsonRpcWsMethod::AccountUnsubscribe => "accountUnsubscribe",
JsonRpcWsMethod::LogsSubscribe => "logsSubscribe",
JsonRpcWsMethod::LogsUnsubscribe => "logsUnsubscribe",
JsonRpcWsMethod::ProgramSubscribe => "programSubscribe",
JsonRpcWsMethod::ProgramUnsubscribe => "programUnsubscribe",
JsonRpcWsMethod::SignatureSubscribe => "signatureSubscribe",
JsonRpcWsMethod::SignatureUnsubscribe => "signatureUnsubscribe",
JsonRpcWsMethod::SlotSubscribe => "slotSubscribe",
JsonRpcWsMethod::SlotUnsubscribe => "slotUnsubscribe",
Self::AccountSubscribe => "accountSubscribe",
Self::AccountUnsubscribe => "accountUnsubscribe",
Self::LogsSubscribe => "logsSubscribe",
Self::LogsUnsubscribe => "logsUnsubscribe",
Self::ProgramSubscribe => "programSubscribe",
Self::ProgramUnsubscribe => "programUnsubscribe",
Self::SignatureSubscribe => "signatureSubscribe",
Self::SignatureUnsubscribe => "signatureUnsubscribe",
Self::SlotSubscribe => "slotSubscribe",
Self::SlotUnsubscribe => "slotUnsubscribe",
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions magicblock-aperture/src/requests/payload.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use hyper::{body::Bytes, Response};
use hyper::{body::Bytes, header::CONTENT_TYPE, Response};
use json::{Serialize, Value};
use magicblock_core::Slot;

Expand Down Expand Up @@ -160,10 +160,8 @@ impl<'id, T: Serialize> ResponsePayload<'id, T> {

/// Builds a standard `200 OK` JSON HTTP response with appropriate headers.
fn build_json_response<T: Serialize>(payload: T) -> Response<JsonBody> {
use hyper::header::{ACCESS_CONTROL_ALLOW_ORIGIN, CONTENT_TYPE};
Response::builder()
.header(CONTENT_TYPE, "application/json")
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
.body(JsonBody::from(payload))
// SAFETY: Safe with static values
.expect("Building JSON response failed")
Expand Down
Loading
Loading