-
Notifications
You must be signed in to change notification settings - Fork 23
feat: support batched json rpc requests #675
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
406f143
77ac55f
60caea1
73f3ccb
bf6aee5
1504937
90d0fee
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| use core::str; | ||
| use std::{mem::size_of, ops::Range}; | ||
|
|
||
| use base64::{prelude::BASE64_STANDARD, Engine}; | ||
|
|
@@ -19,7 +20,7 @@ use solana_transaction::{ | |
| }; | ||
| use solana_transaction_status::UiTransactionEncoding; | ||
|
|
||
| use super::JsonHttpRequest; | ||
| use super::RpcRequest; | ||
| use crate::{ | ||
| error::RpcError, server::http::dispatch::HttpDispatcher, RpcResult, | ||
| }; | ||
|
|
@@ -45,15 +46,22 @@ impl Data { | |
| } | ||
|
|
||
| /// Deserializes the raw request body bytes into a structured `JsonHttpRequest`. | ||
| pub(crate) fn parse_body(body: Data) -> RpcResult<JsonHttpRequest> { | ||
| pub(crate) fn parse_body(body: Data) -> RpcResult<RpcRequest> { | ||
| let body_bytes = match &body { | ||
| Data::Empty => { | ||
| return Err(RpcError::invalid_request("missing request body")) | ||
| } | ||
| Data::SingleChunk(slice) => slice.as_ref(), | ||
| Data::MultiChunk(vec) => vec.as_ref(), | ||
| }; | ||
| json::from_slice(body_bytes).map_err(Into::into) | ||
| } | ||
| .trim_ascii_start(); | ||
| // Hacky/cheap way to detect single request vs an array of requests | ||
| if body_bytes.first().map(|&b| b == b'{').unwrap_or_default() { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if the user sends an invalid payload, i.e. Maybe we should also check specifically for I ran such an invalid request against our current implementation and it returns: {
"jsonrpc": "2.0",
"error": {
"code": -32700,
"message": "error parsing request body: invalid type: string \"Hello World\", expected struct JsonRequest at line 1 column 13\n\n\tlo World\"\n\t........^\n"
}
}Running against stock solana validator returns: {
"jsonrpc": "2.0",
"error": {
"code": -32700,
"message": "Parse error"
},
"id": null
}So we already do a better job here it seems. I double checked that we still return the same message on this branch (while writing this comment), so it seems fine either way, just something to consider to make things more explicit. |
||
| json::from_slice(body_bytes).map(RpcRequest::Single) | ||
| } else { | ||
| json::from_slice(body_bytes).map(RpcRequest::Multi) | ||
| } | ||
| .map_err(Into::into) | ||
| } | ||
|
|
||
| /// Asynchronously reads all data from an HTTP request body, correctly handling chunked transfers. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,7 @@ | ||
| use core::str; | ||
| use std::{convert::Infallible, sync::Arc}; | ||
|
|
||
| use futures::{stream::FuturesOrdered, StreamExt}; | ||
| use hyper::{ | ||
| body::Incoming, | ||
| header::{ | ||
|
|
@@ -22,7 +24,7 @@ use crate::{ | |
| requests::{ | ||
| http::{extract_bytes, parse_body, HandlerResult}, | ||
| payload::ResponseErrorPayload, | ||
| JsonHttpRequest, | ||
| JsonHttpRequest, RpcRequest, | ||
| }, | ||
| state::{ | ||
| blocks::BlocksCache, transactions::TransactionsCache, ChainlinkImpl, | ||
|
|
@@ -105,19 +107,62 @@ impl HttpDispatcher { | |
| match $result { | ||
| Ok(r) => r, | ||
| Err(error) => { | ||
| return Ok(ResponseErrorPayload::encode($id, error)); | ||
| let mut resp = ResponseErrorPayload::encode($id, error); | ||
| Self::set_access_control_headers(&mut resp); | ||
| return Ok(resp); | ||
| } | ||
| } | ||
| }; | ||
| (@noret, $result:expr, $id: expr) => { | ||
| match $result { | ||
| Ok(r) => r, | ||
| Err(error) => { | ||
| let mut resp = ResponseErrorPayload::encode($id, error); | ||
| Self::set_access_control_headers(&mut resp); | ||
| resp | ||
| } | ||
| } | ||
| }; | ||
| } | ||
|
|
||
| // Extract and parse the request body. | ||
| let body = unwrap!(extract_bytes(request).await, None); | ||
| let mut request = unwrap!(parse_body(body), None); | ||
| let request = unwrap!(parse_body(body), None); | ||
|
|
||
| // Resolve the handler for request and process it | ||
| let response = self.process(&mut request).await; | ||
| // Handle any errors from the execution stage | ||
| let response = unwrap!(response, Some(&request.id)); | ||
| let (response, id) = match request { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we handrolling JSON generators? Why not use a proper crate to do this? We should use If this is done for speed then we should be sure the difference is worth it. |
||
| RpcRequest::Single(mut r) => { | ||
| let response = self.process(&mut r).await; | ||
| (response, Some(r.id)) | ||
| } | ||
| RpcRequest::Multi(requests) => { | ||
| const COMA: u8 = b','; | ||
| const OPEN_BR: u8 = b'['; | ||
| const CLOSE_BR: u8 = b']'; | ||
| let mut jobs = FuturesOrdered::new(); | ||
| for mut r in requests { | ||
| let j = async { | ||
| let response = self.process(&mut r).await; | ||
| (response, r) | ||
| }; | ||
| jobs.push_back(j); | ||
| } | ||
| let mut body = vec![OPEN_BR]; | ||
| while let Some((response, request)) = jobs.next().await { | ||
| if body.len() != 1 { | ||
| body.push(COMA); | ||
| } | ||
| let response = unwrap!(@noret, response, Some(&request.id)); | ||
| body.extend_from_slice(&response.into_body().0); | ||
| } | ||
| body.push(CLOSE_BR); | ||
| (Ok(Response::new(JsonBody(body))), None) | ||
| } | ||
| }; | ||
|
|
||
| // Handle any errors from the handling stage | ||
| let mut response = unwrap!(response, id.as_ref()); | ||
| Self::set_access_control_headers(&mut response); | ||
| Ok(response) | ||
| } | ||
|
|
||
|
|
@@ -130,7 +175,7 @@ impl HttpDispatcher { | |
| .with_label_values(&[method]) | ||
| .start_timer(); | ||
|
|
||
| let mut result = match request.method { | ||
| match request.method { | ||
| GetAccountInfo => self.get_account_info(request).await, | ||
| GetBalance => self.get_balance(request).await, | ||
| GetBlock => self.get_block(request), | ||
|
|
@@ -181,11 +226,7 @@ impl HttpDispatcher { | |
| RequestAirdrop => self.request_airdrop(request).await, | ||
| SendTransaction => self.send_transaction(request).await, | ||
| SimulateTransaction => self.simulate_transaction(request).await, | ||
| }; | ||
| if let Ok(response) = &mut result { | ||
| Self::set_access_control_headers(response); | ||
| } | ||
| result | ||
| } | ||
|
|
||
| /// Set CORS/Access control related headers (required by explorers/web apps) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| use json::{JsonContainerTrait, JsonValueTrait, Value}; | ||
| use setup::RpcTestEnv; | ||
|
|
||
| mod setup; | ||
|
|
||
| #[tokio::test] | ||
| async fn test_batch_requests() { | ||
| let env = RpcTestEnv::new().await; | ||
| let client = reqwest::Client::new(); | ||
| let rpc_url = env.rpc.url(); | ||
|
|
||
| // Construct a batch request using serde_json macro | ||
| let batch_request = json::json!([ | ||
| {"jsonrpc": "2.0", "method": "getVersion", "id": 1}, | ||
| {"jsonrpc": "2.0", "method": "getIdentity", "id": 2} | ||
| ]); | ||
|
|
||
| let response = client | ||
| .post(rpc_url) | ||
| .json(&batch_request) | ||
| .send() | ||
| .await | ||
| .expect("Failed to send batch request"); | ||
|
|
||
| assert!( | ||
| response.status().is_success(), | ||
| "HTTP request failed status: {}", | ||
| response.status() | ||
| ); | ||
| let text = response.text().await.unwrap(); | ||
| let body: Value = json::from_str(&text).unwrap(); | ||
|
|
||
| assert!(body.is_array(), "Response should be a JSON array"); | ||
| let results = body.as_array().unwrap(); | ||
| assert_eq!(results.len(), 2, "Should return exactly 2 results"); | ||
|
|
||
| // Helper to find result by ID since batch responses can be out of order | ||
| let get_result = |id: u64| { | ||
| results | ||
| .iter() | ||
| .find(|v| v["id"] == id) | ||
| .expect("Result for id not found") | ||
| }; | ||
|
|
||
| // Verify getVersion result (ID 1) | ||
| let res1 = get_result(1); | ||
| assert!( | ||
| res1.get("result").is_some(), | ||
| "Should contain a result object" | ||
| ); | ||
| assert!( | ||
| res1["result"]["solana-core"].is_str(), | ||
| "Should contain solana-core version" | ||
| ); | ||
|
|
||
| // Verify getIdentity result (ID 2) | ||
| let res2 = get_result(2); | ||
| assert!( | ||
| res2["result"].is_object(), | ||
| "getIdentity should return an object" | ||
| ); | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.