diff --git a/Cargo.lock b/Cargo.lock index 2ed5e948..68bafeb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2501,6 +2501,7 @@ dependencies = [ "serde_json", "sha2", "tracing", + "vrl", "web-time", ] diff --git a/plugins/http_caching/Cargo.toml b/plugins/http_caching/Cargo.toml index e5f4663c..efd12fad 100644 --- a/plugins/http_caching/Cargo.toml +++ b/plugins/http_caching/Cargo.toml @@ -8,6 +8,7 @@ path = "src/lib.rs" [dependencies] async-trait = { workspace = true } +vrl = { workspace = true } conductor_common = { path = "../../libs/common" } conductor_cache = { path = "../../libs/cache" } reqwest = { workspace = true } diff --git a/plugins/http_caching/src/plugin.rs b/plugins/http_caching/src/plugin.rs index bf640575..61f0a4dc 100644 --- a/plugins/http_caching/src/plugin.rs +++ b/plugins/http_caching/src/plugin.rs @@ -1,28 +1,25 @@ use std::sync::Arc; use crate::config::HttpCachePluginConfig; +use anyhow::anyhow; use conductor_cache::cache_manager::{CacheManager, CacheStoreProxy}; use conductor_common::{ execute::RequestExecutionContext, http::{ConductorHttpRequest, ConductorHttpResponse}, plugin::{CreatablePlugin, Plugin, PluginError}, + vrl_utils::{conductor_request_to_value, VrlProgramProxy}, }; use hex::encode as hex_encode; use sha2::{Digest, Sha256}; +use vrl::value; #[derive(Debug)] pub struct HttpCachingPlugin { config: HttpCachePluginConfig, + session_builder: Option, store: Option>, } impl HttpCachingPlugin { - pub fn new(config: HttpCachePluginConfig) -> Self { - Self { - config, - store: None, - } - } - fn generate_cache_key(request: &ConductorHttpRequest) -> String { let query_body = String::from_utf8_lossy(&request.body); format!("{}:{}", request.uri, query_body) @@ -45,22 +42,110 @@ impl CreatablePlugin for HttpCachingPlugin { type Config = HttpCachePluginConfig; async fn create(config: Self::Config) -> Result, PluginError> { - Ok(Box::new(Self::new(config))) + let session_builder = match &config.session_builder { + Some(condition) => match condition.program() { + Ok(program) => Some(program), + Err(e) => { + return Err(PluginError::InitError { + source: anyhow::anyhow!("vrl compiler error: {:?}", e), + }) + } + }, + None => None, + }; + + Ok(Box::new(Self { + config, + store: None, + session_builder, + })) } } impl HttpCachingPlugin { - fn build_session_from_request(&self, ctx: &RequestExecutionContext) -> Option { + fn default_session_builder(ctx: &mut RequestExecutionContext) -> String { + "".to_string() + } + + fn build_session_from_request(&self, ctx: &mut RequestExecutionContext) -> Option { + if let Some(session_builder) = &self.session_builder { + let downstream_http_req = conductor_request_to_value(&ctx.downstream_http_request); + + let maybe_session = match session_builder.resolve_with_state( + value::Value::Null, + value!({ + downstream_http_req: downstream_http_req, + }), + ctx.vrl_shared_state(), + ) { + Ok(ret) => { + let t = match ret { + vrl::value::Value::Bytes(v) => String::from_utf8(v.to_vec()).ok(), + _ => { + tracing::error!("HttpCachingPlugin::vrl::session_builder must return a string, but returned a non-string value: {:?}, ignoring...", ret); + + None + } + }; + + t + } + Err(err) => { + tracing::error!( + "HttpCachingPlugin::vrl::session_builder resolve error: {:?}, ignoring", + err + ); + + None + } + }; + + return maybe_session; + } + None } + + fn build_cache_key(ctx: &RequestExecutionContext, session_id: String) -> String { + sha256( + format!( + "{}|{}|{}|{}", + ctx + .downstream_graphql_request + .as_ref() + .map(|v| v.request.operation.clone()) + .unwrap_or_default(), + ctx + .downstream_graphql_request + .as_ref() + .and_then(|v| v.request.operation_name.clone()) + .unwrap_or_default(), + ctx + .downstream_graphql_request + .as_ref() + .map(|v| format!("{:?}", v.request.variables)) + .unwrap_or_default(), + session_id + ) + .as_bytes(), + ) + } } #[async_trait::async_trait(?Send)] impl Plugin for HttpCachingPlugin { async fn on_downstream_http_request(&self, ctx: &mut RequestExecutionContext) { if let Some(store) = &self.store { - let session = self.build_session_from_request(ctx); - // ok + let session_id = self + .build_session_from_request(ctx) + .unwrap_or_else(|| Self::default_session_builder(ctx)); + let cache_key = Self::build_cache_key(ctx, session_id); + + if let Some(record) = store.get(&cache_key).await { + ctx.short_circuit(record); + + return; + } } else { tracing::warn!( "Cache store '{}' is not configured correctly for http_caching plugin, plugin is skipped.", @@ -70,7 +155,7 @@ impl Plugin for HttpCachingPlugin { } } -fn generate_etag(body: &[u8]) -> String { +fn sha256(body: &[u8]) -> String { let mut hasher = Sha256::new(); hasher.update(body); let result = hasher.finalize();