Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: multiple onRequest handlers #1863

Merged
merged 26 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions benches/data_loader_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use criterion::Criterion;
use hyper::body::Bytes;
use reqwest::Request;
use tailcall::core::config::Batch;
use tailcall::core::http::{DataLoaderRequest, HttpDataLoader, Response};
use tailcall::core::http::{DataLoaderRequest, HttpDataLoader, HttpFilter, Response};
use tailcall::core::runtime::TargetRuntime;
use tailcall::core::{EnvIO, FileIO, HttpIO};

Expand All @@ -26,6 +26,15 @@ impl HttpIO for MockHttpClient {
async fn execute(&self, _req: Request) -> anyhow::Result<Response<Bytes>> {
Ok(Response::empty())
}

// just to satisfy trait
async fn execute_with(
&self,
_req: Request,
_http_filter: &'life0 HttpFilter,
) -> anyhow::Result<Response<Bytes>> {
Ok(Response::empty())
}
}

struct Env {}
Expand Down Expand Up @@ -99,8 +108,10 @@ pub fn benchmark_data_loader(c: &mut Criterion) {
let key1 = DataLoaderRequest::new(request1, headers_to_consider.clone());
let key2 = DataLoaderRequest::new(request2, headers_to_consider);

let futures1 = (0..100).map(|_| loader.load_one(key1.clone()));
let futures2 = (0..100).map(|_| loader.load_one(key2.clone()));
let futures1 =
(0..100).map(|_| loader.load_one(key1.clone(), HttpFilter::default()));
let futures2 =
(0..100).map(|_| loader.load_one(key2.clone(), HttpFilter::default()));
let _ = join_all(futures1.chain(futures2)).await;
assert_eq!(
client.request_count.load(Ordering::SeqCst),
Expand Down
15 changes: 14 additions & 1 deletion benches/impl_path_string_for_evaluation_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use reqwest::{Client, Request};
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use tailcall::core::blueprint::{Server, Upstream};
use tailcall::core::cache::InMemoryCache;
use tailcall::core::http::{RequestContext, Response};
use tailcall::core::http::{HttpFilter, RequestContext, Response};
use tailcall::core::lambda::{EvaluationContext, ResolverContextLike};
use tailcall::core::path::PathString;
use tailcall::core::runtime::TargetRuntime;
Expand Down Expand Up @@ -76,6 +76,19 @@ impl HttpIO for Http {
let resp = self.client.execute(request).await?;
Response::from_reqwest(resp).await
}

// just to satisfy the trait
async fn execute_with(
&self,
mut request: Request,
_http_filter: &'life0 HttpFilter,
) -> anyhow::Result<Response<Bytes>> {
if self.http2_only {
*request.version_mut() = reqwest::Version::HTTP_2;
}
let resp = self.client.execute(request).await?;
Response::from_reqwest(resp).await
}
}

struct Env {}
Expand Down
14 changes: 14 additions & 0 deletions generated/.tailcallrc.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ directive @http(
"""
method: Method
"""
onRequest field in @http directive gives the ability to specify the request interception
handler.
"""
onRequest: String
"""
Schema of the output of the API call. It is automatically inferred in most cases.
"""
output: Schema
Expand Down Expand Up @@ -382,6 +387,10 @@ directive @upstream(
"""
keepAliveWhileIdle: Boolean
"""
onRequest field gives the ability to specify the global request interception handler.
"""
onRequest: String
"""
The time in seconds that the connection pool will wait before closing idle connections.
"""
poolIdleTimeout: Int
Expand Down Expand Up @@ -646,6 +655,11 @@ input Http {
"""
method: Method
"""
onRequest field in @http directive gives the ability to specify the request interception
handler.
"""
onRequest: String
"""
Schema of the output of the API call. It is automatically inferred in most cases.
"""
output: Schema
Expand Down
14 changes: 14 additions & 0 deletions generated/.tailcallrc.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,13 @@
}
]
},
"onRequest": {
"description": "onRequest field in @http directive gives the ability to specify the request interception handler.",
"type": [
"string",
"null"
]
},
"output": {
"description": "Schema of the output of the API call. It is automatically inferred in most cases.",
"anyOf": [
Expand Down Expand Up @@ -1397,6 +1404,13 @@
"null"
]
},
"onRequest": {
"description": "onRequest field gives the ability to specify the global request interception handler.",
"type": [
"string",
"null"
]
},
"poolIdleTimeout": {
"description": "The time in seconds that the connection pool will wait before closing idle connections.",
"type": [
Expand Down
26 changes: 20 additions & 6 deletions src/cli/javascript/request_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use rquickjs::FromJs;

use super::{JsRequest, JsResponse};
use crate::core::http::Response;
use crate::core::http::{HttpFilter, Response};
use crate::core::{HttpIO, WorkerIO};

#[derive(Debug)]
Expand Down Expand Up @@ -60,10 +60,19 @@
}

#[async_recursion::async_recursion]
async fn on_request(&self, mut request: reqwest::Request) -> anyhow::Result<Response<Bytes>> {
async fn on_request(
&self,
mut request: reqwest::Request,
http_filter: HttpFilter,
) -> anyhow::Result<Response<Bytes>> {
let js_request = JsRequest::try_from(&request)?;
let event = Event::Request(js_request);
let command = self.worker.call("onRequest".to_string(), event).await?;

let mut command = None;
if let Some(ref on_request) = http_filter.on_request {
command = self.worker.call(on_request.clone(), event).await?;
}

match command {
Some(command) => match command {
Command::Request(js_request) => {
Expand All @@ -78,7 +87,7 @@
request
.url_mut()
.set_path(js_response.headers()["location"].as_str());
self.on_request(request).await
self.on_request(request, HttpFilter::default()).await

Check warning on line 90 in src/cli/javascript/request_filter.rs

View check run for this annotation

Codecov / codecov/patch

src/cli/javascript/request_filter.rs#L90

Added line #L90 was not covered by tests
} else {
Ok(js_response.try_into()?)
}
Expand All @@ -91,11 +100,16 @@

#[async_trait::async_trait]
impl HttpIO for RequestFilter {
async fn execute(
async fn execute_with(
&self,
request: reqwest::Request,
http_filter: &'life0 HttpFilter,
) -> anyhow::Result<Response<hyper::body::Bytes>> {
self.on_request(request).await
if http_filter.on_request.is_some() {
self.on_request(request, http_filter.clone()).await
} else {
self.client.execute(request).await
}
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/cli/runtime/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tracing_opentelemetry::OpenTelemetrySpanExt;
use super::HttpIO;
use crate::core::blueprint::telemetry::Telemetry;
use crate::core::blueprint::Upstream;
use crate::core::http::Response;
use crate::core::http::{HttpFilter, Response};

static HTTP_CLIENT_REQUEST_COUNT: Lazy<Counter<u64>> = Lazy::new(|| {
let meter = opentelemetry::global::meter("http_request");
Expand Down Expand Up @@ -141,7 +141,11 @@ impl HttpIO for NativeHttp {
network.protocol.version = ?request.version()
)
)]
async fn execute(&self, mut request: reqwest::Request) -> Result<Response<Bytes>> {
async fn execute_with(
&self,
mut request: reqwest::Request,
_: &'life0 HttpFilter,
) -> Result<Response<Bytes>> {
if self.http2_only {
*request.version_mut() = reqwest::Version::HTTP_2;
}
Expand Down
3 changes: 2 additions & 1 deletion src/core/app_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl AppContext {
field.map_expr(|expr| {
expr.modify(|expr| match expr {
Expression::IO(io) => match io {
IO::Http { req_template, group_by, .. } => {
IO::Http { req_template, group_by, http_filter, .. } => {
let data_loader = HttpDataLoader::new(
runtime.clone(),
group_by.clone(),
Expand All @@ -56,6 +56,7 @@ impl AppContext {
req_template: req_template.clone(),
group_by: group_by.clone(),
dl_id: Some(DataLoaderId(http_data_loaders.len())),
http_filter: http_filter.clone(),
}));

http_data_loaders.push(data_loader);
Expand Down
13 changes: 11 additions & 2 deletions src/core/blueprint/operators/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::core::blueprint::*;
use crate::core::config::group_by::GroupBy;
use crate::core::config::Field;
use crate::core::endpoint::Endpoint;
use crate::core::http::{Method, RequestTemplate};
use crate::core::http::{HttpFilter, Method, RequestTemplate};
use crate::core::lambda::{Expression, IO};
use crate::core::try_fold::TryFold;
use crate::core::valid::{Valid, ValidationError, Validator};
Expand Down Expand Up @@ -59,14 +59,23 @@ pub fn compile_http(
.into()
})
.map(|req_template| {
// marge http and upstream on_request
let on_request = http
.on_request
.clone()
.or(config_module.upstream.on_request.clone())
.or(Some("onRequest".to_string()));
let http_filter = HttpFilter { on_request };

if !http.group_by.is_empty() && http.method == Method::GET {
Expression::IO(IO::Http {
req_template,
group_by: Some(GroupBy::new(http.group_by.clone())),
dl_id: None,
http_filter,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filter should be optional

})
} else {
Expression::IO(IO::Http { req_template, group_by: None, dl_id: None })
Expression::IO(IO::Http { req_template, group_by: None, dl_id: None, http_filter })
}
})
}
Expand Down
2 changes: 2 additions & 0 deletions src/core/blueprint/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub struct Upstream {
pub batch: Option<Batch>,
pub http2_only: bool,
pub dedupe: bool,
pub on_request: Option<String>,
}

impl Upstream {
Expand Down Expand Up @@ -82,6 +83,7 @@ impl TryFrom<&ConfigModule> for Upstream {
batch,
http2_only: (config_upstream).get_http_2_only(),
dedupe: (config_upstream).get_dedupe(),
on_request: (config_upstream).get_on_request(),
})
.to_result()
}
Expand Down
5 changes: 5 additions & 0 deletions src/core/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,11 @@ pub struct Enum {
/// REST API. In this scenario, the GraphQL server will make a GET request to
/// the API endpoint specified when the `users` field is queried.
pub struct Http {
#[serde(rename = "onRequest", default, skip_serializing_if = "is_default")]
/// onRequest field in @http directive gives the ability to specify the
/// request interception handler.
pub on_request: Option<String>,

#[serde(rename = "baseURL", default, skip_serializing_if = "is_default")]
/// This refers to the base URL of the API. If not specified, the default
/// base URL is the one specified in the `@upstream` operator.
Expand Down
9 changes: 9 additions & 0 deletions src/core/config/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ pub struct Proxy {
/// upstream server connection. This includes settings like connection timeouts,
/// keep-alive intervals, and more. If not specified, default values are used.
pub struct Upstream {
#[serde(rename = "onRequest", default, skip_serializing_if = "is_default")]
/// onRequest field gives the ability to specify the global request
/// interception handler.
pub on_request: Option<String>,

#[serde(default, skip_serializing_if = "is_default")]
/// `allowedHeaders` defines the HTTP headers allowed to be forwarded to
/// upstream services. If not set, no headers are forwarded, enhancing
Expand Down Expand Up @@ -195,6 +200,10 @@ impl Upstream {
pub fn get_dedupe(&self) -> bool {
self.dedupe.unwrap_or(false)
}

pub fn get_on_request(&self) -> Option<String> {
self.on_request.clone()
}
}

#[cfg(test)]
Expand Down
Loading
Loading