Skip to content

Commit

Permalink
feat: json logging
Browse files Browse the repository at this point in the history
  • Loading branch information
talzion12 committed Apr 25, 2023
1 parent 8ff8f66 commit d2352f1
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 71 deletions.
3 changes: 2 additions & 1 deletion .env
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
RUST_LOG=http_cache=debug,info
RUST_LOG=http_cache=debug,info
LOG_FORMAT=pretty
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ hyper-rustls = { version = "0.24.0", features = ["http2"] }
tokio-util = { version = "0.7.7", features = ["compat"] }
tokio = { version = "1.27", features = ["rt-multi-thread", "macros"] }
tower = { version = "0.4.13", features = ["make", "util"] }
tower-http = "0.4.0"
tower-http = { version = "0.4.0", features = ["trace"] }
tracing = "0.1.37"
tracing-error = "0.2.0"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter"] }
sha2 = "0.10.6"
hex = "0.4.3"
serde = { version = "1.0", features = ["derive"] }
Expand Down
4 changes: 4 additions & 0 deletions charts/http-cache/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ spec:
value: {{ .Values.upstream.url | required "upstream.url is required" | quote }}
- name: CACHE_URL
value: {{ .Values.cache.url | required "cache.url is required" | quote }}
- name: RUST_LOG
value: {{ .Values.log.level | quote }}
- name: LOG_FORMAT
value: {{ .Values.log.format | quote }}

{{ if .Values.gcs.serviceAccountKey }}
- name: GOOGLE_APPLICATION_CREDENTIALS
Expand Down
3 changes: 3 additions & 0 deletions charts/http-cache/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ upstream:
url: null
cache:
url: file:///var/http-cache/fs-cache
log:
level: info
format: json

gcs:
serviceAccountKey: null
Expand Down
59 changes: 31 additions & 28 deletions src/cache/layer.rs
Original file line number Diff line number Diff line change
@@ -1,63 +1,67 @@
use std::{
marker::PhantomData,
sync::Arc,
task::{Context, Poll},
};

use futures::{channel::mpsc::channel, future::BoxFuture, FutureExt, SinkExt, StreamExt};
use http::{Request, Response, StatusCode};
use hyper::{body::Bytes, Body};
use hyper::{
body::{Bytes, HttpBody},
Body,
};
use phf::phf_set;
use url::Url;

use crate::cache::metadata::CacheMetadata;

use super::{create_cache_storage_from_url, storage::Cache, GetBody};

pub struct CachingLayer<C: ?Sized> {
pub struct CachingLayer<C: ?Sized, B> {
cache: Arc<C>,
phantom: PhantomData<B>,
}

impl<C: ?Sized> CachingLayer<C> {
impl<C: ?Sized, B> CachingLayer<C, B> {
pub fn new(cache: impl Into<Arc<C>>) -> Self {
Self {
cache: cache.into(),
phantom: PhantomData,
}
}
}

impl CachingLayer<dyn Cache> {
impl<B> CachingLayer<dyn Cache, B> {
pub async fn from_url(url: &Url) -> color_eyre::Result<Self> {
let storage = create_cache_storage_from_url(url).await?;
Ok(Self::new(storage))
}
}

impl<S: Clone, C: Cache + ?Sized> tower::Layer<S> for CachingLayer<C> {
type Service = CachingService<S, C>;
impl<S: Clone, C: Cache + ?Sized, B> tower::Layer<S> for CachingLayer<C, B> {
type Service = CachingService<S, C, B>;

fn layer(&self, inner: S) -> Self::Service {
CachingService {
inner,
cache: self.cache.clone(),
phantom: PhantomData,
}
}
}

pub struct CachingService<S, C: ?Sized> {
pub struct CachingService<S, C: ?Sized, B> {
inner: S,
cache: Arc<C>,
phantom: PhantomData<B>,
}

impl<S, C> CachingService<S, C>
impl<S, C, B> CachingService<S, C, B>
where
S: tower::Service<Request<Body>, Response = Response<Body>, Error = hyper::Error> + Send + Sync,
S: tower::Service<Request<B>, Response = Response<Body>, Error = hyper::Error> + Send + Sync,
C: Cache + 'static + ?Sized,
{
async fn on_request(&mut self, request: Request<Body>) -> Result<Response<Body>, hyper::Error>
where
S: tower::Service<Request<Body>, Response = Response<Body>, Error = hyper::Error>,
C: Cache + 'static,
{
async fn on_request(&mut self, request: Request<B>) -> Result<Response<Body>, hyper::Error> {
let uri = request.uri();
tracing::debug!("Received request for {uri}");
let cache_result = self.cache.get(uri).await;
Expand All @@ -80,7 +84,7 @@ where
metadata: CacheMetadata,
body: GetBody,
) -> Result<Response<Body>, hyper::Error> {
tracing::debug!("Cache hit");
tracing::info!("Cache hit");

let mut builder = Response::builder().status(metadata.status);

Expand All @@ -102,17 +106,14 @@ where
Ok(body)
}

async fn on_cache_miss(
&mut self,
request: Request<Body>,
) -> Result<Response<Body>, hyper::Error> {
tracing::debug!("Cache miss");
async fn on_cache_miss(&mut self, request: Request<B>) -> Result<Response<Body>, hyper::Error> {
tracing::info!("Cache miss");

let uri = request.uri().clone();
let response = self.inner.call(request).await?;

if !response.status().is_success() {
tracing::debug!(
tracing::info!(
"Not caching response because the status code is {}",
response.status()
);
Expand All @@ -136,7 +137,7 @@ where

tokio::spawn(async move {
match cache_cloned.set(&uri, receiver, metadata).await {
Ok(()) => tracing::debug!("Wrote to cache"),
Ok(()) => tracing::info!("Wrote to cache"),
Err(err) => tracing::error!("Failed to write to cache {err:?}"),
}
});
Expand All @@ -150,21 +151,23 @@ where
}
}

impl<S: Clone, C: ?Sized> Clone for CachingService<S, C> {
impl<S: Clone, C: ?Sized, B> Clone for CachingService<S, C, B> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
cache: self.cache.clone(),
phantom: PhantomData,
}
}
}

impl<S, C> tower::Service<Request<Body>> for CachingService<S, C>
impl<S, C, B> tower::Service<Request<B>> for CachingService<S, C, B>
where
S: tower::Service<Request<Body>, Response = Response<Body>, Error = hyper::Error> + Send + Sync,
S: tower::Service<Request<B>, Response = Response<Body>, Error = hyper::Error> + Send + Sync,
S: Clone + 'static,
S::Future: Send,
C: Cache + Send + Sync + 'static + ?Sized,
B: HttpBody + Send + Sync + 'static,
{
type Response = S::Response;
type Error = S::Error;
Expand All @@ -174,8 +177,7 @@ where
self.inner.poll_ready(cx)
}

#[tracing::instrument(skip(self))]
fn call(&mut self, request: Request<Body>) -> Self::Future {
fn call(&mut self, request: Request<B>) -> Self::Future {
let mut c = self.clone();
async move { c.on_request(request).await }.boxed()
}
Expand All @@ -199,5 +201,6 @@ async fn send_part(

static HEADERS_TO_KEEP: phf::Set<&'static str> = phf_set! {
"content-encoding",
"content-type"
"content-type",
"cache-control"
};
80 changes: 41 additions & 39 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,21 @@
use std::{
io::ErrorKind,
net::{IpAddr, SocketAddr},
};
use std::{io::ErrorKind, net::SocketAddr};

use cache::CachingLayer;
use clap::Parser;
use eyre::Context;
use hyper::{client::HttpConnector, Body, Uri};
use http::Request;
use hyper::{client::HttpConnector, Body};
use hyper_rustls::HttpsConnector;
use proxy::ProxyService;
use tower::{make::Shared, ServiceBuilder};
use tracing_error::ErrorLayer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
use url::Url;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};

mod cache;
mod options;
mod proxy;

#[derive(clap::Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Args {
#[clap(long, env = "UPSTREAM_URL")]
upstream: Uri,

#[clap(long, env = "CACHE_URL")]
cache_url: Url,

#[clap(long, env = "CACHE_URL_2")]
cache_url_2: Option<Url>,

#[clap(long, env = "HOST", default_value = "0.0.0.0")]
host: IpAddr,

#[clap(long, env = "PORT", default_value = "8080")]
port: u16,
}
use options::{LogFormat, Options};

#[tokio::main]
async fn main() -> eyre::Result<()> {
Expand All @@ -48,23 +29,20 @@ async fn main() -> eyre::Result<()> {
Err(error) => return Err(error).context("Failed to load .env"),
};

let args = Args::parse();
let options = Options::parse();

tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(EnvFilter::from_default_env())
.with(ErrorLayer::default())
.init();
init_tracing(&options);

color_eyre::install()?;

let cache_layer = CachingLayer::from_url(&args.cache_url).await?;
let cache_layer = CachingLayer::from_url(&options.cache_url).await?;

let cache_layer_2 = tower::util::option_layer(if let Some(cache_url_2) = &args.cache_url_2 {
Some(CachingLayer::from_url(cache_url_2).await?)
} else {
None
});
let cache_layer_2 =
tower::util::option_layer(if let Some(cache_url_2) = &options.cache_url_2 {
Some(CachingLayer::from_url(cache_url_2).await?)
} else {
None
});

let client = hyper::Client::builder().build(
hyper_rustls::HttpsConnectorBuilder::new()
Expand All @@ -74,15 +52,27 @@ async fn main() -> eyre::Result<()> {
.enable_http2()
.build(),
);
let proxy = ProxyService::<HttpsConnector<HttpConnector>, Body>::new(args.upstream, client);
let proxy = ProxyService::<HttpsConnector<HttpConnector>, Body>::new(options.upstream, client);

let service = ServiceBuilder::new()
.layer(
tower_http::trace::TraceLayer::new_for_http().make_span_with(
|request: &Request<Body>| {
tracing::info_span!(
"http-request",
scheme = request.uri().scheme_str(),
path = request.uri().path(),
query = request.uri().query()
)
},
),
)
.layer(cache_layer_2)
.layer(cache_layer)
.service(proxy);
let make_service = Shared::new(service);

let listen_addr = SocketAddr::new(args.host, args.port);
let listen_addr = SocketAddr::new(options.host, options.port);

tracing::info!("Listening on {listen_addr}");

Expand All @@ -92,3 +82,15 @@ async fn main() -> eyre::Result<()> {

Ok(())
}

fn init_tracing(opts: &Options) {
tracing_subscriber::registry()
.with(match opts.log_format {
LogFormat::Json => tracing_subscriber::fmt::layer().json().boxed(),
LogFormat::Pretty => tracing_subscriber::fmt::layer().pretty().boxed(),
LogFormat::Text => tracing_subscriber::fmt::layer().boxed(),
})
.with(ErrorLayer::default())
.with(EnvFilter::from_default_env())
.init()
}
Loading

0 comments on commit d2352f1

Please sign in to comment.