Skip to content

Commit

Permalink
Add access logging as requested in linkerd/linkerd2#1913
Browse files Browse the repository at this point in the history
Will write space-delimited access logs to a file specified by the LINKERD2_PROXY_ACCESS_LOG_FILE environment variable in a best-effort fashion

Signed-off-by: Raphael Taylor-Davies <r.taylordavies@googlemail.com>
  • Loading branch information
tustvold committed Jul 25, 2020
1 parent 95d950f commit 3ce298d
Show file tree
Hide file tree
Showing 13 changed files with 498 additions and 5 deletions.
19 changes: 19 additions & 0 deletions Cargo.lock
Expand Up @@ -716,6 +716,22 @@ version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a"

[[package]]
name = "linkerd2-access-log"
version = "0.1.0"
dependencies = [
"base64 0.10.1",
"bytes 0.5.4",
"futures 0.3.5",
"hex",
"http 0.2.1",
"linkerd2-error",
"pin-project",
"tokio",
"tower",
"tracing",
]

[[package]]
name = "linkerd2-addr"
version = "0.1.0"
Expand All @@ -739,6 +755,7 @@ name = "linkerd2-app"
version = "0.1.0"
dependencies = [
"bytes 0.5.4",
"chrono",
"futures 0.3.5",
"h2 0.2.5",
"http 0.2.1",
Expand All @@ -755,6 +772,7 @@ dependencies = [
"linkerd2-opencensus",
"linkerd2-proxy-api",
"net2",
"pin-project",
"quickcheck",
"regex 1.0.0",
"ring",
Expand Down Expand Up @@ -783,6 +801,7 @@ dependencies = [
"hyper",
"indexmap",
"libc",
"linkerd2-access-log",
"linkerd2-addr",
"linkerd2-admit",
"linkerd2-buffer",
Expand Down
3 changes: 2 additions & 1 deletion Cargo.toml
@@ -1,6 +1,7 @@
[workspace]
members = [
"hyper-balance",
"linkerd/access-log",
"linkerd/addr",
"linkerd/admit",
"linkerd/app/core",
Expand Down Expand Up @@ -59,7 +60,7 @@ members = [
[profile.dev]
debug = false
[profile.test]
debug = false
#debug = false

[patch.crates-io]
webpki = { git = "https://github.com/linkerd/webpki", branch = "cert-dns-names-0.21" }
Expand Down
18 changes: 18 additions & 0 deletions linkerd/access-log/Cargo.toml
@@ -0,0 +1,18 @@
[package]
name = "linkerd2-access-log"
version = "0.1.0"
authors = ["Linkerd Developers <cncf-linkerd-dev@lists.cncf.io>"]
edition = "2018"
publish = false

[dependencies]
base64 = "0.10.1"
bytes = "0.5"
futures = "0.3"
hex = "0.3.2"
http = "0.2"
linkerd2-error = { path = "../error" }
tower = { version = "0.3", default-features = false }
tracing = "0.1.2"
tokio = {version = "0.2", features = ["sync"]}
pin-project = "0.4"
122 changes: 122 additions & 0 deletions linkerd/access-log/src/layer.rs
@@ -0,0 +1,122 @@
use crate::AccessLog;
use futures::{ready, TryFuture};
use pin_project::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::SystemTime;
use tokio::sync::mpsc;
use tracing::warn;

/// A layer that adds access logging
#[derive(Clone)]
pub struct AccessLogLayer {
sink: Option<mpsc::Sender<AccessLog>>,
}

#[derive(Clone)]
pub struct AccessLogContext<Svc> {
inner: Svc,
sink: Option<mpsc::Sender<AccessLog>>,
}

#[pin_project]
pub struct ResponseFuture<F> {
state: Option<(AccessLog, mpsc::Sender<AccessLog>)>,

#[pin]
inner: F,
}

impl<Svc> tower::layer::Layer<Svc> for AccessLogLayer {
type Service = AccessLogContext<Svc>;

fn layer(&self, inner: Svc) -> Self::Service {
Self::Service {
inner,
sink: self.sink.clone(),
}
}
}

impl<Svc, B1, B2> tower::Service<http::Request<B1>> for AccessLogContext<Svc>
where
Svc: tower::Service<http::Request<B1>, Response = http::Response<B2>>,
{
type Response = Svc::Response;
type Error = Svc::Error;
type Future = ResponseFuture<Svc::Future>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Svc::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, request: http::Request<B1>) -> Self::Future {
let sink = match &self.sink {
Some(sink) => sink,
None => {
return ResponseFuture {
state: None,
inner: self.inner.call(request),
}
}
};

let t0 = SystemTime::now();

let host = request.headers().get("Host").map(|x| x.clone());

let trace_id = request
.headers()
.get("X-Amzn-Trace-Id")
.or_else(|| request.headers().get("X-Request-ID"))
.map(|x| x.clone());

let user_agent = request.headers().get("User-Agent").map(|x| x.clone());

let log = AccessLog {
uri: request.uri().clone(),
method: request.method().clone(),
status: Default::default(),
host,
user_agent,
trace_id,
start_time: t0,
end_time: t0,
};

ResponseFuture {
state: Some((log, sink.clone())),
inner: self.inner.call(request),
}
}
}

impl AccessLogLayer {
pub fn new(sink: Option<mpsc::Sender<AccessLog>>) -> Self {
Self { sink }
}
}

impl<F, B2> Future for ResponseFuture<F>
where
F: TryFuture<Ok = http::Response<B2>>,
{
type Output = Result<F::Ok, F::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let response: http::Response<B2> = ready!(this.inner.try_poll(cx))?;

if let Some((mut log, mut sink)) = this.state.take() {
log.end_time = SystemTime::now();
log.status = response.status().clone();

if let Err(error) = sink.try_send(log) {
warn!(message = "access log dropped", %error);
}
}

Poll::Ready(Ok(response))
}
}
32 changes: 32 additions & 0 deletions linkerd/access-log/src/lib.rs
@@ -0,0 +1,32 @@
#![deny(warnings, rust_2018_idioms)]

use linkerd2_error::Error;
use tokio::sync::mpsc;

pub mod layer;

use http::{HeaderValue, Method, StatusCode, Uri};
pub use layer::{AccessLogContext, AccessLogLayer};
use std::time::SystemTime;

#[derive(Debug)]
pub struct AccessLog {
pub uri: Uri,
pub method: Method,
pub status: StatusCode,
pub host: Option<HeaderValue>,
pub user_agent: Option<HeaderValue>,
pub trace_id: Option<HeaderValue>,
pub start_time: SystemTime,
pub end_time: SystemTime,
}

pub trait AccessLogSink {
fn try_send(&mut self, log: AccessLog) -> Result<(), Error>;
}

impl AccessLogSink for mpsc::Sender<AccessLog> {
fn try_send(&mut self, span: AccessLog) -> Result<(), Error> {
self.try_send(span).map_err(Into::into)
}
}
4 changes: 3 additions & 1 deletion linkerd/app/Cargo.toml
Expand Up @@ -14,6 +14,7 @@ This is used by tests and the executable.
mock-orig-dst = ["linkerd2-app-core/mock-orig-dst"]

[dependencies]
chrono = "0.4"
futures = { version = "0.3" }
http-body = "0.3"
indexmap = "1.0"
Expand All @@ -24,8 +25,9 @@ linkerd2-app-inbound = { path = "./inbound" }
linkerd2-app-outbound = { path = "./outbound" }
linkerd2-opencensus = { path = "../opencensus" }
linkerd2-error = { path = "../error" }
pin-project = "0.4"
regex = "1.0.0"
tokio = { version = "0.2", features = ["rt-util"] }
tokio = { version = "0.2", features = ["rt-util", "fs"] }
tonic = { version = "0.2", default-features = false, features = ["prost"] }
tower = "0.3"
tracing = "0.1.9"
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/core/Cargo.toml
Expand Up @@ -22,6 +22,7 @@ http-body = "0.3"
hyper = "0.13"
futures = "0.3"
indexmap = "1.0"
linkerd2-access-log = { path = "../../access-log" }
linkerd2-addr = { path = "../../addr" }
linkerd2-admit = { path = "../../admit" }
linkerd2-cache = { path = "../../cache" }
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/core/src/lib.rs
Expand Up @@ -9,6 +9,7 @@
#![type_length_limit = "1586225"]
#![deny(warnings, rust_2018_idioms)]

pub use linkerd2_access_log::{AccessLog, AccessLogLayer};
pub use linkerd2_addr::{self as addr, Addr, NameAddr};
pub use linkerd2_admit as admit;
pub use linkerd2_cache as cache;
Expand Down
8 changes: 6 additions & 2 deletions linkerd/app/inbound/src/lib.rs
Expand Up @@ -27,8 +27,8 @@ use linkerd2_app_core::{
spans::SpanConverter,
svc::{self, NewService},
transport::{self, io::BoxedIo, tls},
Error, ProxyMetrics, TraceContextLayer, DST_OVERRIDE_HEADER, L5D_CLIENT_ID, L5D_REMOTE_IP,
L5D_SERVER_ID,
AccessLog, AccessLogLayer, Error, ProxyMetrics, TraceContextLayer, DST_OVERRIDE_HEADER,
L5D_CLIENT_ID, L5D_REMOTE_IP, L5D_SERVER_ID,
};
use std::collections::HashMap;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -61,6 +61,7 @@ impl Config {
tap_layer: tap::Layer,
metrics: ProxyMetrics,
span_sink: Option<mpsc::Sender<oc::Span>>,
access_log_sink: Option<mpsc::Sender<AccessLog>>,
drain: drain::Watch,
) -> Result<(), Error>
where
Expand Down Expand Up @@ -98,6 +99,7 @@ impl Config {
local_identity,
metrics,
span_sink,
access_log_sink,
drain,
)
.await
Expand Down Expand Up @@ -304,6 +306,7 @@ impl Config {
local_identity: tls::Conditional<identity::Local>,
metrics: ProxyMetrics,
span_sink: Option<mpsc::Sender<oc::Span>>,
access_log_sink: Option<mpsc::Sender<AccessLog>>,
drain: drain::Watch,
) -> Result<(), Error>
where
Expand Down Expand Up @@ -363,6 +366,7 @@ impl Config {
.push(errors::layer());

let http_server_observability = svc::layers()
.push(AccessLogLayer::new(access_log_sink))
.push(TraceContextLayer::new(span_sink.map(|span_sink| {
SpanConverter::server(span_sink, trace_labels())
})))
Expand Down

0 comments on commit 3ce298d

Please sign in to comment.