diff --git a/Cargo.lock b/Cargo.lock index 0afa4a4d2..8b366cf74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3540,6 +3540,7 @@ dependencies = [ name = "magicblock-aperture" version = "0.2.3" dependencies = [ + "arc-swap", "base64 0.21.7", "bincode", "bs58", diff --git a/magicblock-aperture/Cargo.toml b/magicblock-aperture/Cargo.toml index 50ce6c6db..153a956a4 100644 --- a/magicblock-aperture/Cargo.toml +++ b/magicblock-aperture/Cargo.toml @@ -20,6 +20,7 @@ tokio = { workspace = true } tokio-util = { workspace = true } # containers +arc-swap = { workspace = true } scc = { workspace = true } # sync diff --git a/magicblock-aperture/src/processor.rs b/magicblock-aperture/src/processor.rs index 44faf1717..ca6035164 100644 --- a/magicblock-aperture/src/processor.rs +++ b/magicblock-aperture/src/processor.rs @@ -83,16 +83,20 @@ impl EventProcessor { } /// The main event processing loop for a single worker instance. - /// - /// This function listens on all event channels concurrently and processes messages - /// as they arrive. The `tokio::select!` macro is biased to prioritize account - /// processing, as it is typically the most frequent and time-sensitive event. async fn run(self, id: usize, cancel: CancellationToken) { info!("event processor {id} is running"); loop { tokio::select! { biased; + // Process a new block. + Ok(latest) = self.block_update_rx.recv_async() => { + // Notify subscribers waiting on slot updates. + self.subscriptions.send_slot(latest.meta.slot); + // Update the global blocks cache with the latest block. + self.blocks.set_latest(latest); + } + // Process a new account state update. Ok(state) = self.account_update_rx.recv_async() => { // Notify subscribers for this specific account. @@ -121,14 +125,6 @@ impl EventProcessor { self.transactions.push(status.signature, Some(result)); } - // Process a new block. - Ok(latest) = self.block_update_rx.recv_async() => { - // Notify subscribers waiting on slot updates. - self.subscriptions.send_slot(latest.meta.slot); - // Update the global blocks cache with the latest block. - self.blocks.set_latest(latest); - } - // Listen for the cancellation signal to gracefully shut down. _ = cancel.cancelled() => { break; diff --git a/magicblock-aperture/src/state/blocks.rs b/magicblock-aperture/src/state/blocks.rs index f5d44c1da..263606ee7 100644 --- a/magicblock-aperture/src/state/blocks.rs +++ b/magicblock-aperture/src/state/blocks.rs @@ -1,10 +1,10 @@ -use std::{ops::Deref, time::Duration}; +use std::{ops::Deref, sync::Arc, time::Duration}; +use arc_swap::ArcSwapAny; use magicblock_core::{ link::blocks::{BlockHash, BlockMeta, BlockUpdate}, Slot, }; -use magicblock_ledger::LatestBlock; use solana_rpc_client_api::response::RpcBlockhash; use super::ExpiringCache; @@ -23,12 +23,20 @@ pub(crate) struct BlocksCache { /// The number of slots for which a blockhash is considered valid. /// This is calculated based on the host ER's block time relative to Solana's. block_validity: u64, - /// The most recent block update received, protected by a `RwLock` for concurrent access. - latest: LatestBlock, + /// Latest observed block (updated whenever the ledger transitions to new slot) + latest: ArcSwapAny>, /// An underlying time-based cache for storing `BlockHash` to `BlockMeta` mappings. cache: ExpiringCache, } +/// Last produced block that has been put into cache. We need to keep this separately, +/// as there's no way to access the cache efficiently to find the latest inserted entry +#[derive(Default, Debug, Clone, Copy)] +pub(crate) struct LastCachedBlock { + pub(crate) blockhash: BlockHash, + pub(crate) slot: Slot, +} + impl Deref for BlocksCache { type Target = ExpiringCache; fn deref(&self) -> &Self::Target { @@ -44,7 +52,7 @@ impl BlocksCache { /// /// # Panics /// Panics if `blocktime` is zero. - pub(crate) fn new(blocktime: u64, latest: LatestBlock) -> Self { + pub(crate) fn new(blocktime: u64, latest: LastCachedBlock) -> Self { const BLOCK_CACHE_TTL: Duration = Duration::from_secs(60); assert!(blocktime != 0, "blocktime cannot be zero"); @@ -54,7 +62,7 @@ impl BlocksCache { let block_validity = blocktime_ratio * MAX_VALID_BLOCKHASH_SLOTS; let cache = ExpiringCache::new(BLOCK_CACHE_TTL); Self { - latest, + latest: ArcSwapAny::new(latest.into()), block_validity: block_validity as u64, cache, } @@ -62,8 +70,15 @@ impl BlocksCache { /// Updates the latest block information in the cache. pub(crate) fn set_latest(&self, latest: BlockUpdate) { - // The `push` method adds the blockhash to the underlying expiring cache. + let last = LastCachedBlock { + blockhash: latest.hash, + slot: latest.meta.slot, + }; + + // Register the block in the expiring cache self.cache.push(latest.hash, latest.meta); + // And mark it as latest observed + self.latest.swap(last.into()); } /// Retrieves information about the latest block, including its calculated validity period. @@ -83,6 +98,7 @@ impl BlocksCache { } /// A data structure containing essential details about a blockhash for RPC responses. +#[derive(Default)] pub(crate) struct BlockHashInfo { /// The blockhash. pub(crate) hash: BlockHash, diff --git a/magicblock-aperture/src/state/mod.rs b/magicblock-aperture/src/state/mod.rs index 641316321..5fb7c0928 100644 --- a/magicblock-aperture/src/state/mod.rs +++ b/magicblock-aperture/src/state/mod.rs @@ -1,6 +1,6 @@ use std::{sync::Arc, time::Duration}; -use blocks::BlocksCache; +use blocks::{BlocksCache, LastCachedBlock}; use cache::ExpiringCache; use magicblock_account_cloner::ChainlinkCloner; use magicblock_accounts_db::AccountsDb; @@ -81,7 +81,11 @@ impl SharedState { blocktime: u64, ) -> Self { const TRANSACTIONS_CACHE_TTL: Duration = Duration::from_secs(75); - let latest = ledger.latest_block().clone(); + let block = ledger.latest_block().load(); + let latest = LastCachedBlock { + blockhash: block.blockhash, + slot: block.slot, + }; Self { context, accountsdb,