Skip to content

Commit

Permalink
Add access logging as requested in linkerd/linkerd2#1913
Browse files Browse the repository at this point in the history
Signed-off-by: Raphael Taylor-Davies <r.taylordavies@googlemail.com>
  • Loading branch information
tustvold committed Nov 16, 2020
1 parent 5c7db67 commit e5b3901
Show file tree
Hide file tree
Showing 12 changed files with 336 additions and 11 deletions.
64 changes: 58 additions & 6 deletions Cargo.lock
Expand Up @@ -189,13 +189,15 @@ checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"

[[package]]
name = "chrono"
version = "0.4.10"
version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31850b4a4d6bae316f7a09e691c944c28299298837edc0a03f755618c23cbc01"
checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73"
dependencies = [
"libc",
"num-integer",
"num-traits 0.2.6",
"time",
"winapi 0.3.8",
]

[[package]]
Expand Down Expand Up @@ -225,6 +227,16 @@ dependencies = [
"build_const",
]

[[package]]
name = "crossbeam-channel"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87"
dependencies = [
"crossbeam-utils",
"maybe-uninit",
]

[[package]]
name = "crossbeam-utils"
version = "0.7.2"
Expand Down Expand Up @@ -441,7 +453,7 @@ checksum = "e7db7ca94ed4cd01190ceee0d8a8052f08a247aa1b469a7f68c6a3b71afcf407"
dependencies = [
"cfg-if",
"libc",
"wasi",
"wasi 0.7.0",
]

[[package]]
Expand Down Expand Up @@ -727,6 +739,20 @@ version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a"

[[package]]
name = "linkerd2-access-log"
version = "0.1.0"
dependencies = [
"base64 0.10.1",
"chrono",
"futures 0.3.5",
"http 0.2.1",
"pin-project",
"tower",
"tracing",
"tracing-subscriber",
]

[[package]]
name = "linkerd2-addr"
version = "0.1.0"
Expand Down Expand Up @@ -768,6 +794,7 @@ dependencies = [
"indexmap",
"ipnet 1.0.0",
"libc",
"linkerd2-access-log",
"linkerd2-addr",
"linkerd2-buffer",
"linkerd2-cache",
Expand Down Expand Up @@ -1507,11 +1534,13 @@ dependencies = [
"html-escape",
"http 0.2.1",
"hyper",
"linkerd2-access-log",
"linkerd2-error",
"serde_json",
"tokio-timer",
"tokio-trace",
"tracing",
"tracing-appender",
"tracing-log",
"tracing-subscriber",
]
Expand Down Expand Up @@ -1577,6 +1606,12 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "100aabe6b8ff4e4a7e32c1c13523379802df0772b82466207ac25b013f193376"

[[package]]
name = "maybe-uninit"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"

[[package]]
name = "memchr"
version = "0.1.11"
Expand Down Expand Up @@ -2444,12 +2479,12 @@ dependencies = [

[[package]]
name = "time"
version = "0.1.39"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a15375f1df02096fb3317256ce2cee6a1f42fc84ea5ad5fc8c421cfe40c73098"
checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255"
dependencies = [
"libc",
"redox_syscall",
"wasi 0.10.0+wasi-snapshot-preview1",
"winapi 0.3.8",
]

Expand Down Expand Up @@ -2697,6 +2732,17 @@ dependencies = [
"tracing-core",
]

[[package]]
name = "tracing-appender"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7aa52d56cc0d79ab604e8a022a1cebc4de33cf09dc9933c94353bea2e00d6e88"
dependencies = [
"chrono",
"crossbeam-channel",
"tracing-subscriber",
]

[[package]]
name = "tracing-attributes"
version = "0.1.10"
Expand Down Expand Up @@ -2895,6 +2941,12 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b89c3ce4ce14bdc6fb6beaf9ec7928ca331de5df7e5ea278375642a2f478570d"

[[package]]
name = "wasi"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"

[[package]]
name = "wasm-bindgen"
version = "0.2.68"
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
@@ -1,6 +1,7 @@
[workspace]
members = [
"hyper-balance",
"linkerd/access-log",
"linkerd/addr",
"linkerd/app/core",
"linkerd/app/gateway",
Expand Down
16 changes: 16 additions & 0 deletions linkerd/access-log/Cargo.toml
@@ -0,0 +1,16 @@
[package]
name = "linkerd2-access-log"
version = "0.1.0"
authors = ["Linkerd Developers <cncf-linkerd-dev@lists.cncf.io>"]
edition = "2018"
publish = false

[dependencies]
base64 = "0.10.1"
chrono = "0.4.15"
futures = "0.3"
http = "0.2"
tower = { version = "0.3", default-features = false }
tracing = "0.1.19"
tracing-subscriber = "0.2.12"
pin-project = "0.4"
4 changes: 4 additions & 0 deletions linkerd/access-log/src/lib.rs
@@ -0,0 +1,4 @@
#![deny(warnings, rust_2018_idioms)]

pub mod tower;
pub mod tracing;
163 changes: 163 additions & 0 deletions linkerd/access-log/src/tower.rs
@@ -0,0 +1,163 @@
use futures::TryFuture;
use pin_project::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tracing::{field, span, Level, Span};

/// A tower layer that associates a tokio-tracing Span with each request
#[derive(Clone)]
pub struct AccessLogLayer {}

#[derive(Clone)]
pub struct AccessLogContext<Svc> {
inner: Svc,
}

struct ResponseFutureInner {
span: Span,
start: Instant,
processing: Duration,
}

#[pin_project]
pub struct AccessLogFuture<F> {
data: Option<ResponseFutureInner>,

#[pin]
inner: F,
}

impl<Svc> tower::layer::Layer<Svc> for AccessLogLayer {
type Service = AccessLogContext<Svc>;

fn layer(&self, inner: Svc) -> Self::Service {
Self::Service { inner }
}
}

impl<Svc, B1, B2> tower::Service<http::Request<B1>> for AccessLogContext<Svc>
where
Svc: tower::Service<http::Request<B1>, Response = http::Response<B2>>,
{
type Response = Svc::Response;
type Error = Svc::Error;
type Future = AccessLogFuture<Svc::Future>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Svc::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, request: http::Request<B1>) -> Self::Future {
let span: Span = span!(target: "access_log", Level::TRACE, "http",
timestamp=field::Empty, processing_ns=field::Empty, total_ns=field::Empty,
method=field::Empty, uri=field::Empty, version=field::Empty, user_agent=field::Empty,
host=field::Empty, trace_id=field::Empty, status=field::Empty,
request_bytes=field::Empty, response_bytes=field::Empty);

if span.is_disabled() {
return AccessLogFuture {
data: None,
inner: self.inner.call(request),
};
}

// Delay formatting to avoid an intermediate `String`
let delayed_format = chrono::Utc::now().format_with_items(
[chrono::format::Item::Fixed(chrono::format::Fixed::RFC3339)].iter(),
);

span.record("timestamp", &field::display(&delayed_format));
span.record("method", &request.method().as_str());
span.record("uri", &field::display(&request.uri()));
span.record("version", &field::debug(&request.version()));

request
.headers()
.get("Host")
.and_then(|x| x.to_str().ok())
.map(|x| span.record("host", &x));

request
.headers()
.get("User-Agent")
.and_then(|x| x.to_str().ok())
.map(|x| span.record("user_agent", &x));

request
.headers()
.get("Content-Length")
.and_then(|x| x.to_str().ok())
.map(|x| span.record("request_bytes", &x));

request
.headers()
.get("x-b3-traceid")
.or_else(|| request.headers().get("X-Request-ID"))
.or_else(|| request.headers().get("X-Amzn-Trace-Id"))
.and_then(|x| x.to_str().ok())
.map(|x| span.record("trace_id", &x));

AccessLogFuture {
data: Some(ResponseFutureInner {
span,
start: Instant::now(),
processing: Duration::from_secs(0),
}),
inner: self.inner.call(request),
}
}
}

impl AccessLogLayer {
pub fn new() -> Self {
Self {}
}
}

impl<F, B2> Future for AccessLogFuture<F>
where
F: TryFuture<Ok = http::Response<B2>>,
{
type Output = Result<F::Ok, F::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

let data: &mut ResponseFutureInner = match &mut this.data {
Some(data) => data,
None => return this.inner.try_poll(cx),
};

let _enter = data.span.enter();
let poll_start = Instant::now();

let response: http::Response<B2> = match this.inner.try_poll(cx) {
Poll::Pending => {
data.processing += Instant::now().duration_since(poll_start);
return Poll::Pending;
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Ready(Ok(response)) => response,
};

let now = Instant::now();
let total_ns = now.duration_since(data.start).as_nanos();
let processing_ns = (now.duration_since(poll_start) + data.processing).as_nanos();

let span = &data.span;

response
.headers()
.get("Content-Length")
.and_then(|x| x.to_str().ok())
.map(|x| span.record("response_bytes", &x));

span.record("status", &response.status().as_u16());
span.record("total_ns", &field::display(total_ns));
span.record("processing_ns", &field::display(processing_ns));

Poll::Ready(Ok(response))
}
}

0 comments on commit e5b3901

Please sign in to comment.