Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions magicblock-aperture/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ tokio = { workspace = true }
tokio-util = { workspace = true }

# containers
arc-swap = { workspace = true }
scc = { workspace = true }

# sync
Expand Down
20 changes: 8 additions & 12 deletions magicblock-aperture/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,20 @@ impl EventProcessor {
}

/// The main event processing loop for a single worker instance.
///
/// This function listens on all event channels concurrently and processes messages
/// as they arrive. The `tokio::select!` macro is biased to prioritize account
/// processing, as it is typically the most frequent and time-sensitive event.
async fn run(self, id: usize, cancel: CancellationToken) {
info!("event processor {id} is running");
loop {
tokio::select! {
biased;

// Process a new block.
Ok(latest) = self.block_update_rx.recv_async() => {
// Notify subscribers waiting on slot updates.
self.subscriptions.send_slot(latest.meta.slot);
// Update the global blocks cache with the latest block.
self.blocks.set_latest(latest);
}

// Process a new account state update.
Ok(state) = self.account_update_rx.recv_async() => {
// Notify subscribers for this specific account.
Expand Down Expand Up @@ -121,14 +125,6 @@ impl EventProcessor {
self.transactions.push(status.signature, Some(result));
}

// Process a new block.
Ok(latest) = self.block_update_rx.recv_async() => {
// Notify subscribers waiting on slot updates.
self.subscriptions.send_slot(latest.meta.slot);
// Update the global blocks cache with the latest block.
self.blocks.set_latest(latest);
}

// Listen for the cancellation signal to gracefully shut down.
_ = cancel.cancelled() => {
break;
Expand Down
30 changes: 23 additions & 7 deletions magicblock-aperture/src/state/blocks.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::{ops::Deref, time::Duration};
use std::{ops::Deref, sync::Arc, time::Duration};

use arc_swap::ArcSwapAny;
use magicblock_core::{
link::blocks::{BlockHash, BlockMeta, BlockUpdate},
Slot,
};
use magicblock_ledger::LatestBlock;
use solana_rpc_client_api::response::RpcBlockhash;

use super::ExpiringCache;
Expand All @@ -23,12 +23,20 @@ pub(crate) struct BlocksCache {
/// The number of slots for which a blockhash is considered valid.
/// This is calculated based on the host ER's block time relative to Solana's.
block_validity: u64,
/// The most recent block update received, protected by a `RwLock` for concurrent access.
latest: LatestBlock,
/// Latest observed block (updated whenever the ledger transitions to new slot)
latest: ArcSwapAny<Arc<LastCachedBlock>>,
/// An underlying time-based cache for storing `BlockHash` to `BlockMeta` mappings.
cache: ExpiringCache<BlockHash, BlockMeta>,
}

/// Last produced block that has been put into cache. We need to keep this separately,
/// as there's no way to access the cache efficiently to find the latest inserted entry
#[derive(Default, Debug, Clone, Copy)]
pub(crate) struct LastCachedBlock {
pub(crate) blockhash: BlockHash,
pub(crate) slot: Slot,
}

impl Deref for BlocksCache {
type Target = ExpiringCache<BlockHash, BlockMeta>;
fn deref(&self) -> &Self::Target {
Expand All @@ -44,7 +52,7 @@ impl BlocksCache {
///
/// # Panics
/// Panics if `blocktime` is zero.
pub(crate) fn new(blocktime: u64, latest: LatestBlock) -> Self {
pub(crate) fn new(blocktime: u64, latest: LastCachedBlock) -> Self {
const BLOCK_CACHE_TTL: Duration = Duration::from_secs(60);
assert!(blocktime != 0, "blocktime cannot be zero");

Expand All @@ -54,16 +62,23 @@ impl BlocksCache {
let block_validity = blocktime_ratio * MAX_VALID_BLOCKHASH_SLOTS;
let cache = ExpiringCache::new(BLOCK_CACHE_TTL);
Self {
latest,
latest: ArcSwapAny::new(latest.into()),
block_validity: block_validity as u64,
cache,
}
}

/// Updates the latest block information in the cache.
pub(crate) fn set_latest(&self, latest: BlockUpdate) {
// The `push` method adds the blockhash to the underlying expiring cache.
let last = LastCachedBlock {
blockhash: latest.hash,
slot: latest.meta.slot,
};

// Register the block in the expiring cache
self.cache.push(latest.hash, latest.meta);
// And mark it as latest observed
self.latest.swap(last.into());
}

/// Retrieves information about the latest block, including its calculated validity period.
Expand All @@ -83,6 +98,7 @@ impl BlocksCache {
}

/// A data structure containing essential details about a blockhash for RPC responses.
#[derive(Default)]
pub(crate) struct BlockHashInfo {
/// The blockhash.
pub(crate) hash: BlockHash,
Expand Down
8 changes: 6 additions & 2 deletions magicblock-aperture/src/state/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{sync::Arc, time::Duration};

use blocks::BlocksCache;
use blocks::{BlocksCache, LastCachedBlock};
use cache::ExpiringCache;
use magicblock_account_cloner::ChainlinkCloner;
use magicblock_accounts_db::AccountsDb;
Expand Down Expand Up @@ -81,7 +81,11 @@ impl SharedState {
blocktime: u64,
) -> Self {
const TRANSACTIONS_CACHE_TTL: Duration = Duration::from_secs(75);
let latest = ledger.latest_block().clone();
let block = ledger.latest_block().load();
let latest = LastCachedBlock {
blockhash: block.blockhash,
slot: block.slot,
};
Self {
context,
accountsdb,
Expand Down