diff --git a/Cargo.lock b/Cargo.lock index c4614768c..1b2bd2804 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3561,6 +3561,7 @@ dependencies = [ "magicblock-version", "parking_lot 0.12.4", "rand 0.9.1", + "reqwest", "scc", "serde", "solana-account", diff --git a/Cargo.toml b/Cargo.toml index bcc7f085c..8b90dd076 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -142,6 +142,7 @@ protobuf-src = "1.1" quote = "1.0" rand = "0.8.5" rayon = "1.10.0" +reqwest = "0.11" # bundled sqlite 3.44 rusqlite = { version = "0.37.0", features = ["bundled"] } rustc_version = "0.4" diff --git a/magicblock-aperture/Cargo.toml b/magicblock-aperture/Cargo.toml index 153a956a4..63f9c8b59 100644 --- a/magicblock-aperture/Cargo.toml +++ b/magicblock-aperture/Cargo.toml @@ -69,5 +69,6 @@ serde = { workspace = true } [dev-dependencies] rand = "0.9" test-kit = { workspace = true } +reqwest = { workspace = true } solana-rpc-client = { workspace = true } solana-pubsub-client = { workspace = true } diff --git a/magicblock-aperture/src/requests/http/mod.rs b/magicblock-aperture/src/requests/http/mod.rs index b0432a365..7fa56ec0f 100644 --- a/magicblock-aperture/src/requests/http/mod.rs +++ b/magicblock-aperture/src/requests/http/mod.rs @@ -1,3 +1,4 @@ +use core::str; use std::{mem::size_of, ops::Range}; use base64::{prelude::BASE64_STANDARD, Engine}; @@ -19,7 +20,7 @@ use solana_transaction::{ }; use solana_transaction_status::UiTransactionEncoding; -use super::JsonHttpRequest; +use super::RpcRequest; use crate::{ error::RpcError, server::http::dispatch::HttpDispatcher, RpcResult, }; @@ -45,15 +46,22 @@ impl Data { } /// Deserializes the raw request body bytes into a structured `JsonHttpRequest`. -pub(crate) fn parse_body(body: Data) -> RpcResult { +pub(crate) fn parse_body(body: Data) -> RpcResult { let body_bytes = match &body { Data::Empty => { return Err(RpcError::invalid_request("missing request body")) } Data::SingleChunk(slice) => slice.as_ref(), Data::MultiChunk(vec) => vec.as_ref(), - }; - json::from_slice(body_bytes).map_err(Into::into) + } + .trim_ascii_start(); + // Hacky/cheap way to detect single request vs an array of requests + if body_bytes.first().map(|&b| b == b'{').unwrap_or_default() { + json::from_slice(body_bytes).map(RpcRequest::Single) + } else { + json::from_slice(body_bytes).map(RpcRequest::Multi) + } + .map_err(Into::into) } /// Asynchronously reads all data from an HTTP request body, correctly handling chunked transfers. diff --git a/magicblock-aperture/src/requests/mod.rs b/magicblock-aperture/src/requests/mod.rs index e5dc3e99a..1b4c14e77 100644 --- a/magicblock-aperture/src/requests/mod.rs +++ b/magicblock-aperture/src/requests/mod.rs @@ -15,6 +15,11 @@ pub(crate) struct JsonRequest { /// An optional array of positional parameter values for the method. pub(crate) params: Option, } +/// Represents either a single JSON-RPC request or a batch of multiple requests. +pub enum RpcRequest { + Single(JsonHttpRequest), + Multi(Vec), +} impl JsonRequest { /// A helper method to get a mutable reference to the diff --git a/magicblock-aperture/src/server/http/dispatch.rs b/magicblock-aperture/src/server/http/dispatch.rs index 583609cd0..32955d2e7 100644 --- a/magicblock-aperture/src/server/http/dispatch.rs +++ b/magicblock-aperture/src/server/http/dispatch.rs @@ -1,5 +1,7 @@ +use core::str; use std::{convert::Infallible, sync::Arc}; +use futures::{stream::FuturesOrdered, StreamExt}; use hyper::{ body::Incoming, header::{ @@ -22,7 +24,7 @@ use crate::{ requests::{ http::{extract_bytes, parse_body, HandlerResult}, payload::ResponseErrorPayload, - JsonHttpRequest, + JsonHttpRequest, RpcRequest, }, state::{ blocks::BlocksCache, transactions::TransactionsCache, ChainlinkImpl, @@ -105,7 +107,19 @@ impl HttpDispatcher { match $result { Ok(r) => r, Err(error) => { - return Ok(ResponseErrorPayload::encode($id, error)); + let mut resp = ResponseErrorPayload::encode($id, error); + Self::set_access_control_headers(&mut resp); + return Ok(resp); + } + } + }; + (@noret, $result:expr, $id: expr) => { + match $result { + Ok(r) => r, + Err(error) => { + let mut resp = ResponseErrorPayload::encode($id, error); + Self::set_access_control_headers(&mut resp); + resp } } }; @@ -113,11 +127,42 @@ impl HttpDispatcher { // Extract and parse the request body. let body = unwrap!(extract_bytes(request).await, None); - let mut request = unwrap!(parse_body(body), None); + let request = unwrap!(parse_body(body), None); + // Resolve the handler for request and process it - let response = self.process(&mut request).await; - // Handle any errors from the execution stage - let response = unwrap!(response, Some(&request.id)); + let (response, id) = match request { + RpcRequest::Single(mut r) => { + let response = self.process(&mut r).await; + (response, Some(r.id)) + } + RpcRequest::Multi(requests) => { + const COMA: u8 = b','; + const OPEN_BR: u8 = b'['; + const CLOSE_BR: u8 = b']'; + let mut jobs = FuturesOrdered::new(); + for mut r in requests { + let j = async { + let response = self.process(&mut r).await; + (response, r) + }; + jobs.push_back(j); + } + let mut body = vec![OPEN_BR]; + while let Some((response, request)) = jobs.next().await { + if body.len() != 1 { + body.push(COMA); + } + let response = unwrap!(@noret, response, Some(&request.id)); + body.extend_from_slice(&response.into_body().0); + } + body.push(CLOSE_BR); + (Ok(Response::new(JsonBody(body))), None) + } + }; + + // Handle any errors from the handling stage + let mut response = unwrap!(response, id.as_ref()); + Self::set_access_control_headers(&mut response); Ok(response) } @@ -130,7 +175,7 @@ impl HttpDispatcher { .with_label_values(&[method]) .start_timer(); - let mut result = match request.method { + match request.method { GetAccountInfo => self.get_account_info(request).await, GetBalance => self.get_balance(request).await, GetBlock => self.get_block(request), @@ -181,11 +226,7 @@ impl HttpDispatcher { RequestAirdrop => self.request_airdrop(request).await, SendTransaction => self.send_transaction(request).await, SimulateTransaction => self.simulate_transaction(request).await, - }; - if let Ok(response) = &mut result { - Self::set_access_control_headers(response); } - result } /// Set CORS/Access control related headers (required by explorers/web apps) diff --git a/magicblock-aperture/tests/batches.rs b/magicblock-aperture/tests/batches.rs new file mode 100644 index 000000000..9a7ab3e00 --- /dev/null +++ b/magicblock-aperture/tests/batches.rs @@ -0,0 +1,62 @@ +use json::{JsonContainerTrait, JsonValueTrait, Value}; +use setup::RpcTestEnv; + +mod setup; + +#[tokio::test] +async fn test_batch_requests() { + let env = RpcTestEnv::new().await; + let client = reqwest::Client::new(); + let rpc_url = env.rpc.url(); + + // Construct a batch request using serde_json macro + let batch_request = json::json!([ + {"jsonrpc": "2.0", "method": "getVersion", "id": 1}, + {"jsonrpc": "2.0", "method": "getIdentity", "id": 2} + ]); + + let response = client + .post(rpc_url) + .json(&batch_request) + .send() + .await + .expect("Failed to send batch request"); + + assert!( + response.status().is_success(), + "HTTP request failed status: {}", + response.status() + ); + let text = response.text().await.unwrap(); + let body: Value = json::from_str(&text).unwrap(); + + assert!(body.is_array(), "Response should be a JSON array"); + let results = body.as_array().unwrap(); + assert_eq!(results.len(), 2, "Should return exactly 2 results"); + + // Helper to find result by ID since batch responses can be out of order + let get_result = |id: u64| { + results + .iter() + .find(|v| v["id"] == id) + .expect("Result for id not found") + }; + + // Verify getVersion result (ID 1) + let res1 = get_result(1); + assert!( + res1.get("result").is_some(), + "Should contain a result object" + ); + assert!( + res1["result"]["solana-core"].is_str(), + "Should contain solana-core version" + ); + + // Verify getIdentity result (ID 2) + let res2 = get_result(2); + assert!( + res2["result"].is_object(), + "getIdentity should return an object" + ); +}