Skip to content

Commit

Permalink
ok
Browse files Browse the repository at this point in the history
  • Loading branch information
dotansimha committed Feb 12, 2024
1 parent 6d43971 commit 14690b9
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 84 deletions.
2 changes: 2 additions & 0 deletions plugins/http_caching/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use conductor_common::vrl_utils::VrlConfigReference;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

Expand All @@ -7,6 +8,7 @@ pub struct HttpCachePluginConfig {
pub store_id: String,
#[serde(default = "defualt_max_age")]
pub max_age: u64,
pub session_builder: Option<VrlConfigReference>,
}

fn defualt_max_age() -> u64 {
Expand Down
99 changes: 15 additions & 84 deletions plugins/http_caching/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,21 @@ use crate::config::HttpCachePluginConfig;
use conductor_cache::cache_manager::{CacheManager, CacheStoreProxy};
use conductor_common::{
execute::RequestExecutionContext,
http::{Bytes, ConductorHttpRequest, ConductorHttpResponse, HttpHeadersMap},
http::{ConductorHttpRequest, ConductorHttpResponse},
plugin::{CreatablePlugin, Plugin, PluginError},
};
use hex::encode as hex_encode;
use reqwest::{
header::{CACHE_CONTROL, ETAG, IF_NONE_MATCH},
StatusCode,
};
use sha2::{Digest, Sha256};
use web_time::SystemTime;
#[derive(Debug)]
pub struct HttpCachingPlugin {
store_id: String,
max_age: u64,
config: HttpCachePluginConfig,
store: Option<CacheStoreProxy<ConductorHttpResponse>>,
}

impl HttpCachingPlugin {
pub fn new(config: HttpCachePluginConfig) -> Self {
Self {
store_id: config.store_id,
max_age: config.max_age,
config,
store: None,
}
}
Expand All @@ -36,12 +29,12 @@ impl HttpCachingPlugin {
}

pub fn configure_caching(&mut self, mgr: Arc<CacheManager>) -> Result<(), PluginError> {
if let Some(store) = mgr.get_store(&self.store_id) {
if let Some(store) = mgr.get_store(&self.config.store_id) {
self.store = Some(store);
Ok(())
} else {
Err(PluginError::InitError {
source: anyhow::anyhow!("Cache store not found: {}", self.store_id),
source: anyhow::anyhow!("Cache store not found: {}", self.config.store_id),
})
}
}
Expand All @@ -56,87 +49,25 @@ impl CreatablePlugin for HttpCachingPlugin {
}
}

impl HttpCachingPlugin {
fn build_session_from_request(&self, ctx: &RequestExecutionContext) -> Option<String> {
None
}
}

#[async_trait::async_trait(?Send)]
impl Plugin for HttpCachingPlugin {
async fn on_downstream_http_request(&self, ctx: &mut RequestExecutionContext) {
if let Some(store) = &self.store {
let key = Self::generate_cache_key(&ctx.downstream_http_request);

if let Some(mut cached_response) = store.get(&key).await {
if let Some(cache_timestamp) = cached_response.headers.get("Cache-Timestamp") {
if let Ok(timestamp) = cache_timestamp.to_str() {
if let Ok(cached_time) = timestamp.parse::<u64>() {
let current_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();

if current_time - cached_time > self.max_age {
// cached response is stale, fetch a new one
return;
}
}
}
}

if let Some(etag_header) = ctx.downstream_http_request.headers.get(IF_NONE_MATCH) {
if let Some(cached_etag) = cached_response.headers.get(ETAG) {
if etag_header == cached_etag {
ctx.short_circuit(ConductorHttpResponse {
status: StatusCode::NOT_MODIFIED,
body: Bytes::new(),
headers: HttpHeadersMap::new(),
});
return;
}
}
}

cached_response
.headers
.insert(ETAG, generate_etag(&cached_response.body).parse().unwrap());
cached_response.headers.insert(
CACHE_CONTROL,
format!("max-age={}", self.max_age).parse().unwrap(),
);
ctx.short_circuit(cached_response);
}
let session = self.build_session_from_request(ctx);
// ok
} else {
tracing::warn!(
"Cache store is not configured correctly for http_caching plugin, plugin is skipped"
"Cache store '{}' is not configured correctly for http_caching plugin, plugin is skipped.",
self.config.store_id
);
}
}

fn on_downstream_http_response(
&self,
ctx: &mut RequestExecutionContext,
response: &mut ConductorHttpResponse,
) {
// if !ctx.was_cached {
// let etag = generate_etag(&response.body);
// response.headers.insert(ETAG, etag.parse().unwrap());
// response.headers.insert(
// CACHE_CONTROL,
// format!("max-age={}", self.max_age).parse().unwrap(),
// );

// let current_time = SystemTime::now()
// .duration_since(UNIX_EPOCH)
// .expect("time somehow went backwards")
// .as_secs()
// .to_string();
// response
// .headers
// .insert("Cache-Timestamp", current_time.parse().unwrap());

// let key = Self::generate_cache_key(&ctx.downstream_http_request);
// if let Some(stores) = &ctx.cache_stores {
// stores.cache_response_if_absent(key, response.clone(), &self.store_id);
// }
// }
// }
}
}

fn generate_etag(body: &[u8]) -> String {
Expand Down

0 comments on commit 14690b9

Please sign in to comment.