Skip to content

Commit

Permalink
feat: query batching (#2166)
Browse files Browse the repository at this point in the history
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: meskill <8974488+meskill@users.noreply.github.com>
Co-authored-by: Tushar Mathur <tusharmath@gmail.com>
  • Loading branch information
4 people authored Jun 12, 2024
1 parent 82aed52 commit 2234777
Show file tree
Hide file tree
Showing 12 changed files with 279 additions and 15 deletions.
5 changes: 5 additions & 0 deletions generated/.tailcallrc.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ directive @server(
"""
apolloTracing: Boolean
"""
When set to `true`, it will ensure no graphQL execution is made more than once if
similar query is being executed across the server's lifetime.
"""
batchExecution: Boolean
"""
`batchRequests` combines multiple requests into one, improving performance but potentially
introducing latency and complicating debugging. Use judiciously. @default `false`.
"""
Expand Down
7 changes: 7 additions & 0 deletions generated/.tailcallrc.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,13 @@
"null"
]
},
"batchExecution": {
"description": "When set to `true`, it will ensure no graphQL execution is made more than once if similar query is being executed across the server's lifetime.",
"type": [
"boolean",
"null"
]
},
"batchRequests": {
"description": "`batchRequests` combines multiple requests into one, improving performance but potentially introducing latency and complicating debugging. Use judiciously. @default `false`.",
"type": [
Expand Down
9 changes: 6 additions & 3 deletions src/core/app_context.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use std::sync::Arc;

use async_graphql::dynamic::{self, DynamicRequest};
use async_graphql::Response;
use async_graphql_value::ConstValue;
use hyper::body::Bytes;

use crate::core::async_graphql_hyper::OperationId;
use crate::core::auth::context::GlobalAuthContext;
use crate::core::blueprint::Type::ListType;
use crate::core::blueprint::{Blueprint, Definition, SchemaModifiers};
use crate::core::data_loader::{DataLoader, DedupeResult};
use crate::core::graphql::GraphqlDataLoader;
use crate::core::grpc;
use crate::core::grpc::data_loader::GrpcDataLoader;
use crate::core::http::{DataLoaderRequest, HttpDataLoader};
use crate::core::http::{DataLoaderRequest, HttpDataLoader, Response};
use crate::core::ir::{DataLoaderId, EvaluationError, IoId, IO, IR};
use crate::core::rest::{Checked, EndpointSet};
use crate::core::runtime::TargetRuntime;
Expand All @@ -26,6 +27,7 @@ pub struct AppContext {
pub endpoints: EndpointSet<Checked>,
pub auth_ctx: Arc<GlobalAuthContext>,
pub dedupe_handler: Arc<DedupeResult<IoId, ConstValue, EvaluationError>>,
pub dedupe_operation_handler: DedupeResult<OperationId, Response<Bytes>, EvaluationError>,
}

impl AppContext {
Expand Down Expand Up @@ -131,10 +133,11 @@ impl AppContext {
endpoints,
auth_ctx: Arc::new(auth_ctx),
dedupe_handler: Arc::new(DedupeResult::new(false)),
dedupe_operation_handler: DedupeResult::new(false),
}
}

pub async fn execute(&self, request: impl Into<DynamicRequest>) -> Response {
/// Runs `request` against `self.schema` and returns the resulting
/// `async_graphql::Response`.
///
/// The return type is spelled out in full because the unqualified `Response`
/// name in this module is imported from `crate::core::http`.
pub async fn execute(&self, request: impl Into<DynamicRequest>) -> async_graphql::Response {
self.schema.execute(request).await
}
}
60 changes: 56 additions & 4 deletions src/core/async_graphql_hyper.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,70 @@
use std::any::Any;
use std::hash::{Hash, Hasher};

use anyhow::Result;
use async_graphql::parser::types::ExecutableDocument;
use async_graphql::parser::types::{ExecutableDocument, OperationType};
use async_graphql::{BatchResponse, Executor, Value};
use headers::HeaderMap;
use hyper::header::{HeaderValue, CACHE_CONTROL, CONTENT_TYPE};
use hyper::{Body, Response, StatusCode};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use tailcall_hasher::TailcallHasher;

/// Opaque 64-bit identifier for a GraphQL operation.
///
/// The wrapped value is private to this module, so an id can only be produced
/// here (see `GraphQLRequestLike::operation_id`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OperationId(u64);

#[async_trait::async_trait]
pub trait GraphQLRequestLike {
pub trait GraphQLRequestLike: Hash + Send {
fn data<D: Any + Clone + Send + Sync>(self, data: D) -> Self;
async fn execute<E>(self, executor: &E) -> GraphQLResponse
where
E: Executor;

fn parse_query(&mut self) -> Option<&ExecutableDocument>;

/// Reports whether the request document consists solely of `query`
/// operations.
///
/// Returns `false` when `parse_query` yields `None`, when the document
/// contains no operations, or when any operation is not a query.
///
/// Every operation is checked. Letting the last visited operation decide
/// would allow a document that also contains a mutation or subscription to
/// be reported as a query.
fn is_query(&mut self) -> bool {
    self.parse_query()
        .map(|document| {
            let mut operations = document.operations.iter().peekable();
            // `all` is vacuously true on an empty iterator; keep the
            // "no operations => not a query" answer explicit.
            operations.peek().is_some()
                && operations.all(|(_, operation)| operation.node.ty == OperationType::Query)
        })
        .unwrap_or(false)
}

/// Computes the `OperationId` for this request together with `headers`.
///
/// Every header name and value is fed into a `TailcallHasher`, followed by
/// the request's own `Hash` implementation; the finished hash becomes the
/// id.
fn operation_id(&self, headers: &HeaderMap) -> OperationId {
    let mut hasher = TailcallHasher::default();
    headers.iter().for_each(|(header_name, header_value)| {
        header_name.hash(&mut hasher);
        header_value.hash(&mut hasher);
    });
    self.hash(&mut hasher);
    OperationId(hasher.finish())
}
}

/// Newtype wrapper around `async_graphql::BatchRequest` that can be
/// deserialized with serde.
#[derive(Debug, Deserialize)]
pub struct GraphQLBatchRequest(pub async_graphql::BatchRequest);
impl GraphQLBatchRequest {}

/// Hashes the batch as a whole: for each contained request, in iteration
/// order, the query text, the operation name and every variable (name plus
/// the value's string rendering) are fed into the hasher.
impl Hash for GraphQLBatchRequest {
    // TODO: Fix the Hash implementation for BatchRequest. Ideally the
    // individual requests would be batched for execution separately, rather
    // than treating the whole chunk of requests as one unit.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.0.iter().for_each(|single| {
            single.query.hash(state);
            single.operation_name.hash(state);
            single.variables.iter().for_each(|(key, val)| {
                key.hash(state);
                val.to_string().hash(state);
            });
        });
    }
}
#[async_trait::async_trait]
impl GraphQLRequestLike for GraphQLBatchRequest {
fn data<D: Any + Clone + Send + Sync>(mut self, data: D) -> Self {
Expand All @@ -47,7 +90,16 @@ impl GraphQLRequestLike for GraphQLBatchRequest {
pub struct GraphQLRequest(pub async_graphql::Request);

impl GraphQLRequest {}

/// Hashes the wrapped request by its query text, its operation name and
/// every variable (name plus the value's string rendering).
impl Hash for GraphQLRequest {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        let inner = &self.0;
        inner.query.hash(state);
        inner.operation_name.hash(state);
        inner.variables.iter().for_each(|(key, val)| {
            key.hash(state);
            val.to_string().hash(state);
        });
    }
}
#[async_trait::async_trait]
impl GraphQLRequestLike for GraphQLRequest {
#[must_use]
Expand Down
2 changes: 2 additions & 0 deletions src/core/blueprint/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub struct Server {
pub cors: Option<Cors>,
pub experimental_headers: HashSet<HeaderName>,
pub auth: Option<Auth>,
pub batch_execution: bool,
}

/// Mimic of mini_v8::Script that's wasm compatible
Expand Down Expand Up @@ -150,6 +151,7 @@ impl TryFrom<crate::core::config::ConfigModule> for Server {
script,
cors,
auth,
batch_execution: config_server.get_batch_execution(),
}
},
)
Expand Down
10 changes: 10 additions & 0 deletions src/core/config/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ pub struct Server {
/// debugging. Use judiciously. @default `false`.
pub batch_requests: Option<bool>,

#[serde(default, skip_serializing_if = "is_default")]
/// When set to `true`, it ensures that no GraphQL execution is performed
/// more than once when a similar query is executed across the server's
/// lifetime.
pub batch_execution: Option<bool>,

#[serde(default, skip_serializing_if = "is_default")]
/// `headers` contains key-value pairs that are included as default headers
/// in server responses, allowing for consistent header management across
Expand Down Expand Up @@ -198,6 +204,10 @@ impl Server {
/// Returns the configured `pipeline_flush` flag, treating an unset value as
/// `true`.
pub fn get_pipeline_flush(&self) -> bool {
    // Only an explicit `Some(false)` turns the flag off.
    self.pipeline_flush != Some(false)
}

/// Returns the configured `batch_execution` flag, treating an unset value as
/// `false`.
pub fn get_batch_execution(&self) -> bool {
    // `bool::default()` is `false`, matching the documented default.
    self.batch_execution.unwrap_or_default()
}
}

#[cfg(test)]
Expand Down
38 changes: 30 additions & 8 deletions src/core/http/request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,26 @@ pub async fn graphql_request<T: DeserializeOwned + GraphQLRequestLike>(
) -> Result<Response<Body>> {
req_counter.set_http_route("/graphql");
let req_ctx = Arc::new(create_request_context(&req, app_ctx));
let bytes = hyper::body::to_bytes(req.into_body()).await?;
let (req, body) = req.into_parts();
let bytes = hyper::body::to_bytes(body).await?;
let graphql_request = serde_json::from_slice::<T>(&bytes);
match graphql_request {
Ok(mut request) => {
let _ = request.parse_query();
let mut response = request.data(req_ctx.clone()).execute(&app_ctx.schema).await;

response = update_cache_control_header(response, app_ctx, req_ctx.clone());
let mut resp = response.into_response()?;
update_response_headers(&mut resp, &req_ctx, app_ctx);
Ok(resp)
if !(app_ctx.blueprint.server.batch_execution && request.is_query()) {
Ok(execute_query(&app_ctx, &req_ctx, request).await?)
} else {
let operation_id = request.operation_id(&req.headers);
let out = app_ctx
.dedupe_operation_handler
.dedupe(&operation_id, || {
Box::pin(async move {
let resp = execute_query(&app_ctx, &req_ctx, request).await?;
Ok(crate::core::http::Response::from_hyper(resp).await?)
})
})
.await?;
Ok(hyper::Response::from(out))
}
}
Err(err) => {
tracing::error!(
Expand All @@ -130,6 +139,19 @@ pub async fn graphql_request<T: DeserializeOwned + GraphQLRequestLike>(
}
}

/// Executes `request` against the application schema and converts the result
/// into an HTTP response.
///
/// The request context is attached as request data before execution. The
/// GraphQL response is then passed through `update_cache_control_header`,
/// converted with `into_response`, and finally handed to
/// `update_response_headers`.
///
/// # Errors
/// Propagates the error returned by `into_response`.
async fn execute_query<T: DeserializeOwned + GraphQLRequestLike>(
    app_ctx: &&AppContext,
    req_ctx: &Arc<RequestContext>,
    request: T,
) -> anyhow::Result<Response<Body>> {
    let executed = request.data(req_ctx.clone()).execute(&app_ctx.schema).await;
    let with_cache_control = update_cache_control_header(executed, app_ctx, req_ctx.clone());

    let mut http_response = with_cache_control.into_response()?;
    update_response_headers(&mut http_response, req_ctx, app_ctx);
    Ok(http_response)
}

fn create_allowed_headers(headers: &HeaderMap, allowed: &BTreeSet<String>) -> HeaderMap {
let mut new_headers = HeaderMap::new();
for (k, v) in headers.iter() {
Expand Down
17 changes: 17 additions & 0 deletions src/core/http/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use anyhow::Result;
use async_graphql_value::{ConstValue, Name};
use derive_setters::Setters;
use hyper::body::Bytes;
use hyper::Body;
use indexmap::IndexMap;
use prost::Message;
use tonic::Status;
Expand Down Expand Up @@ -54,6 +55,13 @@ impl Response<Bytes> {
Ok(Response { status, headers, body })
}

/// Builds a `Response<Bytes>` from a `hyper::Response` by reading the whole
/// body into memory.
///
/// The status and headers are moved out of the incoming response rather
/// than cloned.
///
/// # Errors
/// Returns an error if reading the body fails.
pub async fn from_hyper(resp: hyper::Response<hyper::Body>) -> Result<Self> {
    // Splitting into parts hands over ownership of the header map, avoiding
    // a full `HeaderMap` clone right before the response is consumed.
    let (parts, body) = resp.into_parts();
    let body = hyper::body::to_bytes(body).await?;
    Ok(Response { status: parts.status, headers: parts.headers, body })
}

pub fn empty() -> Self {
Response {
status: reqwest::StatusCode::OK,
Expand Down Expand Up @@ -151,3 +159,12 @@ impl Response<Bytes> {
})
}
}

impl From<Response<Bytes>> for hyper::Response<Body> {
fn from(resp: Response<Bytes>) -> Self {
let mut response = hyper::Response::new(Body::from(resp.body));
*response.headers_mut() = resp.headers;
*response.status_mut() = resp.status;
response
}
}
24 changes: 24 additions & 0 deletions tests/core/snapshots/dedupe_batch_query_execution.md_0.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
source: tests/core/spec.rs
expression: response
---
{
"status": 200,
"headers": {
"content-type": "application/json"
},
"body": {
"data": {
"posts": [
{
"id": 1,
"userId": 1
},
{
"id": 2,
"userId": 2
}
]
}
}
}
52 changes: 52 additions & 0 deletions tests/core/snapshots/dedupe_batch_query_execution.md_client.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
---
source: tests/core/spec.rs
expression: formatted
---
scalar Bytes

scalar Date

scalar Email

scalar Empty

scalar Int128

scalar Int16

scalar Int32

scalar Int64

scalar Int8

scalar JSON

scalar PhoneNumber

type Post {
body: String
id: Int
title: String
userId: Int!
}

type Query {
posts: [Post]
}

scalar UInt128

scalar UInt16

scalar UInt32

scalar UInt64

scalar UInt8

scalar Url

schema {
query: Query
}
25 changes: 25 additions & 0 deletions tests/core/snapshots/dedupe_batch_query_execution.md_merged.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
source: tests/core/spec.rs
expression: formatter
---
schema
@server(batchExecution: true, port: 8000, queryValidation: false)
@upstream(baseURL: "http://jsonplaceholder.typicode.com", dedupeInFlight: true) {
query: Query
}

type Post {
body: String
id: Int
title: String
userId: Int!
}

type Query {
posts: [Post] @http(path: "/posts?id=1")
}

type User {
id: Int
name: String
}
Loading

1 comment on commit 2234777

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Running 30s test @ http://localhost:8000/graphql

4 threads and 100 connections

Thread Stats Avg Stdev Max +/- Stdev
Latency 6.61ms 2.99ms 84.58ms 72.08%
Req/Sec 3.83k 195.20 4.18k 92.08%

457117 requests in 30.01s, 2.29GB read

Requests/sec: 15231.92

Transfer/sec: 78.18MB

Please sign in to comment.