From 406f143b7ed9288dac66b09d835a614f3d813b67 Mon Sep 17 00:00:00 2001 From: Babur Makhmudov Date: Sat, 22 Nov 2025 09:20:50 +0400 Subject: [PATCH 1/4] feat: handle batched RPC requests --- magicblock-aperture/src/requests/http/mod.rs | 6 ++-- magicblock-aperture/src/requests/mod.rs | 7 ++++ .../src/server/http/dispatch.rs | 33 ++++++++++++++++--- 3 files changed, 40 insertions(+), 6 deletions(-) diff --git a/magicblock-aperture/src/requests/http/mod.rs b/magicblock-aperture/src/requests/http/mod.rs index ea599e999..7f0a455df 100644 --- a/magicblock-aperture/src/requests/http/mod.rs +++ b/magicblock-aperture/src/requests/http/mod.rs @@ -19,7 +19,7 @@ use solana_transaction::{ }; use solana_transaction_status::UiTransactionEncoding; -use super::JsonHttpRequest; +use super::{JsonRpcHttpMethod, RpcRequest}; use crate::{ error::RpcError, server::http::dispatch::HttpDispatcher, RpcResult, }; @@ -45,7 +45,9 @@ impl Data { } /// Deserializes the raw request body bytes into a structured `JsonHttpRequest`. -pub(crate) fn parse_body(body: Data) -> RpcResult { +pub(crate) fn parse_body( + body: Data, +) -> RpcResult> { let body_bytes = match &body { Data::Empty => { return Err(RpcError::invalid_request("missing request body")) diff --git a/magicblock-aperture/src/requests/mod.rs b/magicblock-aperture/src/requests/mod.rs index 7528a74c3..4418a8c18 100644 --- a/magicblock-aperture/src/requests/mod.rs +++ b/magicblock-aperture/src/requests/mod.rs @@ -16,6 +16,13 @@ pub(crate) struct JsonRequest { pub(crate) params: Option, } +#[derive(Deserialize)] +#[serde(untagged)] +pub enum RpcRequest { + Single(JsonRequest), + Multi(Vec>), +} + impl JsonRequest { /// A helper method to get a mutable reference to the /// `params` array, returning an error if it is `None`. diff --git a/magicblock-aperture/src/server/http/dispatch.rs b/magicblock-aperture/src/server/http/dispatch.rs index 37238b9fc..6ab42c648 100644 --- a/magicblock-aperture/src/server/http/dispatch.rs +++ b/magicblock-aperture/src/server/http/dispatch.rs @@ -1,5 +1,6 @@ use std::{convert::Infallible, sync::Arc}; +use futures::{stream::FuturesOrdered, StreamExt}; use hyper::{body::Incoming, Method, Request, Response}; use magicblock_accounts_db::AccountsDb; use magicblock_core::link::{ @@ -14,7 +15,7 @@ use crate::{ requests::{ http::{extract_bytes, parse_body, HandlerResult}, payload::ResponseErrorPayload, - JsonHttpRequest, + JsonHttpRequest, RpcRequest, }, state::{ blocks::BlocksCache, transactions::TransactionsCache, ChainlinkImpl, @@ -103,11 +104,35 @@ impl HttpDispatcher { // Extract and parse the request body. let body = unwrap!(extract_bytes(request).await, None); - let mut request = unwrap!(parse_body(body), None); + let request = unwrap!(parse_body(body), None); + // Resolve the handler for request and process it - let response = self.process(&mut request).await; + let (response, id) = match request { + RpcRequest::Single(mut r) => { + let response = self.process(&mut r).await; + (response, Some(r.id)) + } + RpcRequest::Multi(requests) => { + let mut jobs = FuturesOrdered::new(); + for mut r in requests { + let j = async { + let response = self.process(&mut r).await; + (response, r) + }; + jobs.push_back(j); + } + let mut body = vec![b'[']; + while let Some((response, request)) = jobs.next().await { + let response = unwrap!(response, Some(&request.id)); + body.extend_from_slice(&response.into_body().0); + } + body.push(b']'); + (Ok(Response::new(body.into())), None) + } + }; + // Handle any errors from the execution stage - let response = unwrap!(response, Some(&request.id)); + let response = unwrap!(response, id.as_ref()); Ok(response) } From 77ac55f73b83f324e5b5f3b88c4d8c019aa161c1 Mon Sep 17 00:00:00 2001 From: Babur Makhmudov Date: Sat, 22 Nov 2025 09:25:22 +0400 Subject: [PATCH 2/4] chore: fix clippy --- magicblock-processor/src/executor/processing.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/magicblock-processor/src/executor/processing.rs b/magicblock-processor/src/executor/processing.rs index 0bb638c3c..c58c4c200 100644 --- a/magicblock-processor/src/executor/processing.rs +++ b/magicblock-processor/src/executor/processing.rs @@ -373,7 +373,6 @@ impl super::TransactionExecutor { .get_or_insert_default(); let msg = "Feepayer balance has been modified illegally".into(); logs.push(msg); - return; } } } From bf6aee59554d9bc8626765c2470e50e065fb477f Mon Sep 17 00:00:00 2001 From: Babur Makhmudov Date: Sat, 22 Nov 2025 17:29:39 +0400 Subject: [PATCH 3/4] feat: add batched JSON RPC requests --- Cargo.lock | 1 + Cargo.toml | 1 + magicblock-aperture/Cargo.toml | 1 + magicblock-aperture/src/requests/http/mod.rs | 15 +++-- magicblock-aperture/src/requests/mod.rs | 8 +-- .../src/server/http/dispatch.rs | 17 ++--- magicblock-aperture/tests/batches.rs | 62 +++++++++++++++++++ 7 files changed, 88 insertions(+), 17 deletions(-) create mode 100644 magicblock-aperture/tests/batches.rs diff --git a/Cargo.lock b/Cargo.lock index b828863cc..9c83cc6ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3561,6 +3561,7 @@ dependencies = [ "magicblock-version", "parking_lot 0.12.4", "rand 0.9.1", + "reqwest", "scc", "serde", "solana-account", diff --git a/Cargo.toml b/Cargo.toml index 3f5b534d3..fd525f680 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -142,6 +142,7 @@ protobuf-src = "1.1" quote = "1.0" rand = "0.8.5" rayon = "1.10.0" +reqwest = "0.11" # bundled sqlite 3.44 rusqlite = { version = "0.37.0", features = ["bundled"] } rustc_version = "0.4" diff --git a/magicblock-aperture/Cargo.toml b/magicblock-aperture/Cargo.toml index 153a956a4..63f9c8b59 100644 --- a/magicblock-aperture/Cargo.toml +++ b/magicblock-aperture/Cargo.toml @@ -69,5 +69,6 @@ serde = { workspace = true } [dev-dependencies] rand = "0.9" test-kit = { workspace = true } +reqwest = { workspace = true } solana-rpc-client = { workspace = true } solana-pubsub-client = { workspace = true } diff --git a/magicblock-aperture/src/requests/http/mod.rs b/magicblock-aperture/src/requests/http/mod.rs index fb98edf37..5c5336727 100644 --- a/magicblock-aperture/src/requests/http/mod.rs +++ b/magicblock-aperture/src/requests/http/mod.rs @@ -1,3 +1,4 @@ +use core::str; use std::{mem::size_of, ops::Range}; use base64::{prelude::BASE64_STANDARD, Engine}; @@ -19,7 +20,7 @@ use solana_transaction::{ }; use solana_transaction_status::UiTransactionEncoding; -use super::{JsonRpcHttpMethod, RpcRequest}; +use super::RpcRequest; use crate::{ error::RpcError, server::http::dispatch::HttpDispatcher, RpcResult, }; @@ -45,9 +46,7 @@ impl Data { } /// Deserializes the raw request body bytes into a structured `JsonHttpRequest`. -pub(crate) fn parse_body( - body: Data, -) -> RpcResult> { +pub(crate) fn parse_body(body: Data) -> RpcResult { let body_bytes = match &body { Data::Empty => { return Err(RpcError::invalid_request("missing request body")) @@ -55,7 +54,13 @@ pub(crate) fn parse_body( Data::SingleChunk(slice) => slice.as_ref(), Data::MultiChunk(vec) => vec.as_ref(), }; - json::from_slice(body_bytes).map_err(Into::into) + // Hacky/cheap way to detect single request vs an array of requests + if body_bytes.first().map(|&b| b == b'{').unwrap_or_default() { + json::from_slice(body_bytes).map(RpcRequest::Single) + } else { + json::from_slice(body_bytes).map(RpcRequest::Multi) + } + .map_err(Into::into) } /// Asynchronously reads all data from an HTTP request body, correctly handling chunked transfers. diff --git a/magicblock-aperture/src/requests/mod.rs b/magicblock-aperture/src/requests/mod.rs index 30deee348..428e58317 100644 --- a/magicblock-aperture/src/requests/mod.rs +++ b/magicblock-aperture/src/requests/mod.rs @@ -16,11 +16,9 @@ pub(crate) struct JsonRequest { pub(crate) params: Option, } -#[derive(Deserialize)] -#[serde(untagged)] -pub enum RpcRequest { - Single(JsonRequest), - Multi(Vec>), +pub enum RpcRequest { + Single(JsonHttpRequest), + Multi(Vec), } impl JsonRequest { diff --git a/magicblock-aperture/src/server/http/dispatch.rs b/magicblock-aperture/src/server/http/dispatch.rs index faa9c17ce..d67b87288 100644 --- a/magicblock-aperture/src/server/http/dispatch.rs +++ b/magicblock-aperture/src/server/http/dispatch.rs @@ -1,3 +1,4 @@ +use core::str; use std::{convert::Infallible, sync::Arc}; use futures::{stream::FuturesOrdered, StreamExt}; @@ -106,10 +107,9 @@ impl HttpDispatcher { match $result { Ok(r) => r, Err(error) => { - let mut response = - ResponseErrorPayload::encode($id, error); - Self::set_access_control_headers(&mut response); - return Ok(response); + let mut resp = ResponseErrorPayload::encode($id, error); + Self::set_access_control_headers(&mut resp); + return Ok(resp); } } }; @@ -138,13 +138,16 @@ impl HttpDispatcher { while let Some((response, request)) = jobs.next().await { let response = unwrap!(response, Some(&request.id)); body.extend_from_slice(&response.into_body().0); + body.push(b','); } - body.push(b']'); - (Ok(Response::new(body.into())), None) + if let Some(b) = body.last_mut() { + *b = b']' + } + (Ok(Response::new(JsonBody(body))), None) } }; - // Handle any errors from the execution stage + // Handle any errors from the handling stage let mut response = unwrap!(response, id.as_ref()); Self::set_access_control_headers(&mut response); Ok(response) diff --git a/magicblock-aperture/tests/batches.rs b/magicblock-aperture/tests/batches.rs new file mode 100644 index 000000000..66cba3211 --- /dev/null +++ b/magicblock-aperture/tests/batches.rs @@ -0,0 +1,62 @@ +use json::{JsonContainerTrait, JsonValueTrait, Value}; +use setup::RpcTestEnv; + +mod setup; + +#[tokio::test] +async fn test_batch_requests() { + let env = RpcTestEnv::new().await; + let client = reqwest::Client::new(); + let rpc_url = env.rpc.url(); + + // Construct a batch request using serde_json macro + let batch_request = json::json!([ + {"jsonrpc": "2.0", "method": "getVersion", "id": 1}, + {"jsonrpc": "2.0", "method": "getIdentity", "id": 2} + ]); + + let response = client + .post(rpc_url) + .json(&batch_request) + .send() + .await + .expect("Failed to send batch request"); + + assert!( + response.status().is_success(), + "HTTP request failed status: {}", + response.status() + ); + let text = response.text().await.unwrap(); + let body: Value = json::from_str(&text).unwrap(); + + assert!(body.is_array(), "Response should be a JSON array"); + let results = body.as_array().unwrap(); + assert_eq!(results.len(), 2, "Should return exactly 2 results"); + + // Helper to find result by ID since batch responses can be out of order + let get_result = |id: u64| { + results + .iter() + .find(|v| v["id"] == id) + .expect("Result for id not found") + }; + + // Verify getVersion result (ID 1) + let res1 = get_result(1); + assert!( + res1.get("result").is_some(), + "Should contain a result object" + ); + assert!( + res1["result"]["solana-core"].is_str(), + "Should contain solana-core version" + ); + + // Verify getIdentity result (ID 2) + let res2 = get_result(2); + assert!( + res2["result"].is_object(), + "getIdentity should return adn object" + ); +} From 15049378d8178c489b738502b12d9739e192b577 Mon Sep 17 00:00:00 2001 From: Babur Makhmudov Date: Mon, 24 Nov 2025 12:41:57 +0400 Subject: [PATCH 4/4] fix: proper edge case handling when processing batches --- magicblock-aperture/src/requests/http/mod.rs | 3 ++- magicblock-aperture/src/requests/mod.rs | 2 +- .../src/server/http/dispatch.rs | 25 ++++++++++++++----- magicblock-aperture/tests/batches.rs | 2 +- 4 files changed, 23 insertions(+), 9 deletions(-) diff --git a/magicblock-aperture/src/requests/http/mod.rs b/magicblock-aperture/src/requests/http/mod.rs index 5c5336727..f9f9a21cb 100644 --- a/magicblock-aperture/src/requests/http/mod.rs +++ b/magicblock-aperture/src/requests/http/mod.rs @@ -53,7 +53,8 @@ pub(crate) fn parse_body(body: Data) -> RpcResult { } Data::SingleChunk(slice) => slice.as_ref(), Data::MultiChunk(vec) => vec.as_ref(), - }; + } + .trim_ascii_start(); // Hacky/cheap way to detect single request vs an array of requests if body_bytes.first().map(|&b| b == b'{').unwrap_or_default() { json::from_slice(body_bytes).map(RpcRequest::Single) diff --git a/magicblock-aperture/src/requests/mod.rs b/magicblock-aperture/src/requests/mod.rs index 428e58317..1b4c14e77 100644 --- a/magicblock-aperture/src/requests/mod.rs +++ b/magicblock-aperture/src/requests/mod.rs @@ -15,7 +15,7 @@ pub(crate) struct JsonRequest { /// An optional array of positional parameter values for the method. pub(crate) params: Option, } - +/// Represents either a single JSON-RPC request or a batch of multiple requests. pub enum RpcRequest { Single(JsonHttpRequest), Multi(Vec), diff --git a/magicblock-aperture/src/server/http/dispatch.rs b/magicblock-aperture/src/server/http/dispatch.rs index d67b87288..32955d2e7 100644 --- a/magicblock-aperture/src/server/http/dispatch.rs +++ b/magicblock-aperture/src/server/http/dispatch.rs @@ -113,6 +113,16 @@ impl HttpDispatcher { } } }; + (@noret, $result:expr, $id: expr) => { + match $result { + Ok(r) => r, + Err(error) => { + let mut resp = ResponseErrorPayload::encode($id, error); + Self::set_access_control_headers(&mut resp); + resp + } + } + }; } // Extract and parse the request body. @@ -126,6 +136,9 @@ impl HttpDispatcher { (response, Some(r.id)) } RpcRequest::Multi(requests) => { + const COMA: u8 = b','; + const OPEN_BR: u8 = b'['; + const CLOSE_BR: u8 = b']'; let mut jobs = FuturesOrdered::new(); for mut r in requests { let j = async { @@ -134,15 +147,15 @@ impl HttpDispatcher { }; jobs.push_back(j); } - let mut body = vec![b'[']; + let mut body = vec![OPEN_BR]; while let Some((response, request)) = jobs.next().await { - let response = unwrap!(response, Some(&request.id)); + if body.len() != 1 { + body.push(COMA); + } + let response = unwrap!(@noret, response, Some(&request.id)); body.extend_from_slice(&response.into_body().0); - body.push(b','); - } - if let Some(b) = body.last_mut() { - *b = b']' } + body.push(CLOSE_BR); (Ok(Response::new(JsonBody(body))), None) } }; diff --git a/magicblock-aperture/tests/batches.rs b/magicblock-aperture/tests/batches.rs index 66cba3211..9a7ab3e00 100644 --- a/magicblock-aperture/tests/batches.rs +++ b/magicblock-aperture/tests/batches.rs @@ -57,6 +57,6 @@ async fn test_batch_requests() { let res2 = get_result(2); assert!( res2["result"].is_object(), - "getIdentity should return adn object" + "getIdentity should return an object" ); }