From d24885f512ba54e90ac589c90352dea2a924008e Mon Sep 17 00:00:00 2001 From: Babur Makhmudov Date: Thu, 20 Nov 2025 11:47:26 +0400 Subject: [PATCH 1/3] fix(aperture): prevent racy getLatestBlockhash This fixes the issue when the blockhash returned to the client didn't exist in the cache do to timing differences between block update and cache update. --- magicblock-aperture/src/processor.rs | 20 ++++------ magicblock-aperture/src/state/blocks.rs | 51 ++++++++++++++++++++----- magicblock-aperture/src/state/mod.rs | 3 +- 3 files changed, 51 insertions(+), 23 deletions(-) diff --git a/magicblock-aperture/src/processor.rs b/magicblock-aperture/src/processor.rs index 44faf1717..ca6035164 100644 --- a/magicblock-aperture/src/processor.rs +++ b/magicblock-aperture/src/processor.rs @@ -83,16 +83,20 @@ impl EventProcessor { } /// The main event processing loop for a single worker instance. - /// - /// This function listens on all event channels concurrently and processes messages - /// as they arrive. The `tokio::select!` macro is biased to prioritize account - /// processing, as it is typically the most frequent and time-sensitive event. async fn run(self, id: usize, cancel: CancellationToken) { info!("event processor {id} is running"); loop { tokio::select! { biased; + // Process a new block. + Ok(latest) = self.block_update_rx.recv_async() => { + // Notify subscribers waiting on slot updates. + self.subscriptions.send_slot(latest.meta.slot); + // Update the global blocks cache with the latest block. + self.blocks.set_latest(latest); + } + // Process a new account state update. Ok(state) = self.account_update_rx.recv_async() => { // Notify subscribers for this specific account. @@ -121,14 +125,6 @@ impl EventProcessor { self.transactions.push(status.signature, Some(result)); } - // Process a new block. - Ok(latest) = self.block_update_rx.recv_async() => { - // Notify subscribers waiting on slot updates. - self.subscriptions.send_slot(latest.meta.slot); - // Update the global blocks cache with the latest block. - self.blocks.set_latest(latest); - } - // Listen for the cancellation signal to gracefully shut down. _ = cancel.cancelled() => { break; diff --git a/magicblock-aperture/src/state/blocks.rs b/magicblock-aperture/src/state/blocks.rs index f5d44c1da..3185fe5a1 100644 --- a/magicblock-aperture/src/state/blocks.rs +++ b/magicblock-aperture/src/state/blocks.rs @@ -1,10 +1,14 @@ -use std::{ops::Deref, time::Duration}; +use std::{ + ops::Deref, + sync::atomic::{AtomicPtr, Ordering}, + time::Duration, +}; +use log::*; use magicblock_core::{ link::blocks::{BlockHash, BlockMeta, BlockUpdate}, Slot, }; -use magicblock_ledger::LatestBlock; use solana_rpc_client_api::response::RpcBlockhash; use super::ExpiringCache; @@ -23,12 +27,20 @@ pub(crate) struct BlocksCache { /// The number of slots for which a blockhash is considered valid. /// This is calculated based on the host ER's block time relative to Solana's. block_validity: u64, - /// The most recent block update received, protected by a `RwLock` for concurrent access. - latest: LatestBlock, + /// Latest observed block (updated whenever the ledger transitions to new slot) + latest: AtomicPtr, /// An underlying time-based cache for storing `BlockHash` to `BlockMeta` mappings. cache: ExpiringCache, } +/// Last produced block that has been put into cache. We need to keep this separately, +/// as there's no way to access the cache efficiently to find the latest inserted entry +#[derive(Default, Debug)] +pub(crate) struct LastCachedBlock { + pub(crate) blockhash: BlockHash, + pub(crate) slot: Slot, +} + impl Deref for BlocksCache { type Target = ExpiringCache; fn deref(&self) -> &Self::Target { @@ -44,7 +56,7 @@ impl BlocksCache { /// /// # Panics /// Panics if `blocktime` is zero. - pub(crate) fn new(blocktime: u64, latest: LatestBlock) -> Self { + pub(crate) fn new(blocktime: u64) -> Self { const BLOCK_CACHE_TTL: Duration = Duration::from_secs(60); assert!(blocktime != 0, "blocktime cannot be zero"); @@ -54,7 +66,7 @@ impl BlocksCache { let block_validity = blocktime_ratio * MAX_VALID_BLOCKHASH_SLOTS; let cache = ExpiringCache::new(BLOCK_CACHE_TTL); Self { - latest, + latest: AtomicPtr::default(), block_validity: block_validity as u64, cache, } @@ -62,13 +74,26 @@ impl BlocksCache { /// Updates the latest block information in the cache. pub(crate) fn set_latest(&self, latest: BlockUpdate) { - // The `push` method adds the blockhash to the underlying expiring cache. + // Allocate a 'static memory for the last observed block + let last = Box::leak(Box::new(LastCachedBlock { + blockhash: latest.hash, + slot: latest.meta.slot, + })) as *mut _; + + // Register the block in the expiring cache self.cache.push(latest.hash, latest.meta); + // And mark it as latest observed + let prev = self.latest.swap(last, Ordering::Release); + // Reclaim the allocation of the previous block + (!prev.is_null()).then(|| unsafe { Box::from_raw(prev) }); } /// Retrieves information about the latest block, including its calculated validity period. pub(crate) fn get_latest(&self) -> BlockHashInfo { - let block = self.latest.load(); + let Some(block) = self.load_latest() else { + warn!("Failed to load latest cached block, the cache is empty"); + return Default::default(); + }; BlockHashInfo { hash: block.blockhash, validity: block.slot + self.block_validity, @@ -78,11 +103,19 @@ impl BlocksCache { /// Returns the slot number of the most recent block, also known as the block height. pub(crate) fn block_height(&self) -> Slot { - self.latest.load().slot + self.load_latest().map(|b| b.slot).unwrap_or_default() + } + + // Helper function to load latest observed block (if any) + #[inline] + fn load_latest(&self) -> Option<&LastCachedBlock> { + let latest = self.latest.load(Ordering::Relaxed); + (!latest.is_null()).then(|| unsafe { &*latest }) } } /// A data structure containing essential details about a blockhash for RPC responses. +#[derive(Default)] pub(crate) struct BlockHashInfo { /// The blockhash. pub(crate) hash: BlockHash, diff --git a/magicblock-aperture/src/state/mod.rs b/magicblock-aperture/src/state/mod.rs index 641316321..6c422ab16 100644 --- a/magicblock-aperture/src/state/mod.rs +++ b/magicblock-aperture/src/state/mod.rs @@ -81,12 +81,11 @@ impl SharedState { blocktime: u64, ) -> Self { const TRANSACTIONS_CACHE_TTL: Duration = Duration::from_secs(75); - let latest = ledger.latest_block().clone(); Self { context, accountsdb, transactions: ExpiringCache::new(TRANSACTIONS_CACHE_TTL).into(), - blocks: BlocksCache::new(blocktime, latest).into(), + blocks: BlocksCache::new(blocktime).into(), ledger, chainlink, subscriptions: Default::default(), From 62b47f3667d7caa26901833910ae38c6a9bbb764 Mon Sep 17 00:00:00 2001 From: Babur Makhmudov Date: Thu, 20 Nov 2025 12:28:33 +0400 Subject: [PATCH 2/3] fix: UB with use after free --- Cargo.lock | 1 + magicblock-aperture/Cargo.toml | 1 + magicblock-aperture/src/state/blocks.rs | 37 +++++++------------------ 3 files changed, 12 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0afa4a4d2..8b366cf74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3540,6 +3540,7 @@ dependencies = [ name = "magicblock-aperture" version = "0.2.3" dependencies = [ + "arc-swap", "base64 0.21.7", "bincode", "bs58", diff --git a/magicblock-aperture/Cargo.toml b/magicblock-aperture/Cargo.toml index 50ce6c6db..153a956a4 100644 --- a/magicblock-aperture/Cargo.toml +++ b/magicblock-aperture/Cargo.toml @@ -20,6 +20,7 @@ tokio = { workspace = true } tokio-util = { workspace = true } # containers +arc-swap = { workspace = true } scc = { workspace = true } # sync diff --git a/magicblock-aperture/src/state/blocks.rs b/magicblock-aperture/src/state/blocks.rs index 3185fe5a1..665175923 100644 --- a/magicblock-aperture/src/state/blocks.rs +++ b/magicblock-aperture/src/state/blocks.rs @@ -1,10 +1,6 @@ -use std::{ - ops::Deref, - sync::atomic::{AtomicPtr, Ordering}, - time::Duration, -}; +use std::{ops::Deref, sync::Arc, time::Duration}; -use log::*; +use arc_swap::ArcSwapAny; use magicblock_core::{ link::blocks::{BlockHash, BlockMeta, BlockUpdate}, Slot, @@ -28,14 +24,14 @@ pub(crate) struct BlocksCache { /// This is calculated based on the host ER's block time relative to Solana's. block_validity: u64, /// Latest observed block (updated whenever the ledger transitions to new slot) - latest: AtomicPtr, + latest: ArcSwapAny>, /// An underlying time-based cache for storing `BlockHash` to `BlockMeta` mappings. cache: ExpiringCache, } /// Last produced block that has been put into cache. We need to keep this separately, /// as there's no way to access the cache efficiently to find the latest inserted entry -#[derive(Default, Debug)] +#[derive(Default, Debug, Clone, Copy)] pub(crate) struct LastCachedBlock { pub(crate) blockhash: BlockHash, pub(crate) slot: Slot, @@ -66,7 +62,7 @@ impl BlocksCache { let block_validity = blocktime_ratio * MAX_VALID_BLOCKHASH_SLOTS; let cache = ExpiringCache::new(BLOCK_CACHE_TTL); Self { - latest: AtomicPtr::default(), + latest: ArcSwapAny::default(), block_validity: block_validity as u64, cache, } @@ -74,26 +70,20 @@ impl BlocksCache { /// Updates the latest block information in the cache. pub(crate) fn set_latest(&self, latest: BlockUpdate) { - // Allocate a 'static memory for the last observed block - let last = Box::leak(Box::new(LastCachedBlock { + let last = LastCachedBlock { blockhash: latest.hash, slot: latest.meta.slot, - })) as *mut _; + }; // Register the block in the expiring cache self.cache.push(latest.hash, latest.meta); // And mark it as latest observed - let prev = self.latest.swap(last, Ordering::Release); - // Reclaim the allocation of the previous block - (!prev.is_null()).then(|| unsafe { Box::from_raw(prev) }); + self.latest.swap(last.into()); } /// Retrieves information about the latest block, including its calculated validity period. pub(crate) fn get_latest(&self) -> BlockHashInfo { - let Some(block) = self.load_latest() else { - warn!("Failed to load latest cached block, the cache is empty"); - return Default::default(); - }; + let block = self.latest.load(); BlockHashInfo { hash: block.blockhash, validity: block.slot + self.block_validity, @@ -103,14 +93,7 @@ impl BlocksCache { /// Returns the slot number of the most recent block, also known as the block height. pub(crate) fn block_height(&self) -> Slot { - self.load_latest().map(|b| b.slot).unwrap_or_default() - } - - // Helper function to load latest observed block (if any) - #[inline] - fn load_latest(&self) -> Option<&LastCachedBlock> { - let latest = self.latest.load(Ordering::Relaxed); - (!latest.is_null()).then(|| unsafe { &*latest }) + self.latest.load().slot } } From 68a6e26ebc380a28a0b4b32daf857c5a2f4de890 Mon Sep 17 00:00:00 2001 From: Babur Makhmudov Date: Thu, 20 Nov 2025 14:45:27 +0400 Subject: [PATCH 3/3] fix: init the cache with latest blockhash --- magicblock-aperture/src/state/blocks.rs | 4 ++-- magicblock-aperture/src/state/mod.rs | 9 +++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/magicblock-aperture/src/state/blocks.rs b/magicblock-aperture/src/state/blocks.rs index 665175923..263606ee7 100644 --- a/magicblock-aperture/src/state/blocks.rs +++ b/magicblock-aperture/src/state/blocks.rs @@ -52,7 +52,7 @@ impl BlocksCache { /// /// # Panics /// Panics if `blocktime` is zero. - pub(crate) fn new(blocktime: u64) -> Self { + pub(crate) fn new(blocktime: u64, latest: LastCachedBlock) -> Self { const BLOCK_CACHE_TTL: Duration = Duration::from_secs(60); assert!(blocktime != 0, "blocktime cannot be zero"); @@ -62,7 +62,7 @@ impl BlocksCache { let block_validity = blocktime_ratio * MAX_VALID_BLOCKHASH_SLOTS; let cache = ExpiringCache::new(BLOCK_CACHE_TTL); Self { - latest: ArcSwapAny::default(), + latest: ArcSwapAny::new(latest.into()), block_validity: block_validity as u64, cache, } diff --git a/magicblock-aperture/src/state/mod.rs b/magicblock-aperture/src/state/mod.rs index 6c422ab16..5fb7c0928 100644 --- a/magicblock-aperture/src/state/mod.rs +++ b/magicblock-aperture/src/state/mod.rs @@ -1,6 +1,6 @@ use std::{sync::Arc, time::Duration}; -use blocks::BlocksCache; +use blocks::{BlocksCache, LastCachedBlock}; use cache::ExpiringCache; use magicblock_account_cloner::ChainlinkCloner; use magicblock_accounts_db::AccountsDb; @@ -81,11 +81,16 @@ impl SharedState { blocktime: u64, ) -> Self { const TRANSACTIONS_CACHE_TTL: Duration = Duration::from_secs(75); + let block = ledger.latest_block().load(); + let latest = LastCachedBlock { + blockhash: block.blockhash, + slot: block.slot, + }; Self { context, accountsdb, transactions: ExpiringCache::new(TRANSACTIONS_CACHE_TTL).into(), - blocks: BlocksCache::new(blocktime).into(), + blocks: BlocksCache::new(blocktime, latest).into(), ledger, chainlink, subscriptions: Default::default(),