Skip to content

Commit

Permalink
Add --log-format json (#948)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Dec 8, 2022
1 parent 5fa9075 commit 071a6e2
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 42 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion shotover-proxy/Cargo.toml
Expand Up @@ -57,7 +57,7 @@ base64 = "0.13.0"
metrics = "0.20.0"
metrics-exporter-prometheus = "0.11.0"
tracing = { version = "0.1.15", features = ["release_max_level_info"] }
tracing-subscriber = { version = "0.3.1", features = ["env-filter"] }
tracing-subscriber = { version = "0.3.1", features = ["env-filter", "json"] }
tracing-appender = "0.2.0"
hyper = { version = "0.14.14", features = ["server"] }
halfbrown = "0.1.11"
Expand Down
3 changes: 3 additions & 0 deletions shotover-proxy/src/lib.rs
Expand Up @@ -18,6 +18,9 @@
//! * [`transforms::TransformsConfig`], the enum to register with (add a variant) for configuring your own transform.

#![allow(clippy::derive_partial_eq_without_eq)]
// Accidentally printing would break json log output
#![deny(clippy::print_stdout)]
#![deny(clippy::print_stderr)]

pub mod codec;
pub mod config;
Expand Down
34 changes: 14 additions & 20 deletions shotover-proxy/src/observability/mod.rs
@@ -1,3 +1,4 @@
use crate::runner::ReloadHandle;
use anyhow::{anyhow, Result};
use bytes::Bytes;
use hyper::{
Expand All @@ -9,27 +10,20 @@ use std::convert::Infallible;
use std::str;
use std::{net::SocketAddr, sync::Arc};
use tracing::{error, trace};
use tracing_subscriber::reload::Handle;
use tracing_subscriber::EnvFilter;

/// Exports metrics over HTTP.
pub struct LogFilterHttpExporter<S> {
pub struct LogFilterHttpExporter {
recorder_handle: PrometheusHandle,
address: SocketAddr,
tracing_handle: Handle<EnvFilter, S>,
tracing_handle: ReloadHandle,
}

/// Sets the `tracing_suscriber` filter level to the value of `bytes` on `handle`
fn set_filter<S>(bytes: Bytes, handle: &Handle<EnvFilter, S>) -> Result<(), String>
where
S: tracing::Subscriber + 'static,
{
let body = str::from_utf8(bytes.as_ref()).map_err(|e| format!("{e}"))?;
fn set_filter(bytes: Bytes, handle: &ReloadHandle) -> Result<()> {
let body = str::from_utf8(bytes.as_ref())?;
trace!(request.body = ?body);
let new_filter = body
.parse::<tracing_subscriber::filter::EnvFilter>()
.map_err(|e| format!("{e}"))?;
handle.reload(new_filter).map_err(|e| format!("{e}"))
let new_filter = body.parse::<tracing_subscriber::filter::EnvFilter>()?;
handle.reload(new_filter)
}

fn rsp(status: StatusCode, body: impl Into<Body>) -> Response<Body> {
Expand All @@ -39,17 +33,14 @@ fn rsp(status: StatusCode, body: impl Into<Body>) -> Response<Body> {
.expect("builder with known status code must not fail")
}

impl<S> LogFilterHttpExporter<S>
where
S: tracing::Subscriber + 'static,
{
impl LogFilterHttpExporter {
/// Creates a new [`LogFilterHttpExporter`] that listens on the given `address`.
///
/// Observers expose their output by being converted into strings.
pub fn new(
recorder_handle: PrometheusHandle,
address: SocketAddr,
tracing_handle: Handle<EnvFilter, S>,
tracing_handle: ReloadHandle,
) -> Self {
LogFilterHttpExporter {
recorder_handle,
Expand Down Expand Up @@ -89,8 +80,11 @@ where
match hyper::body::to_bytes(req).await {
Ok(body) => match set_filter(body, &tracing_handle) {
Err(error) => {
error!(%error, "setting filter failed!");
rsp(StatusCode::INTERNAL_SERVER_ERROR, error)
error!(?error, "setting filter failed!");
rsp(
StatusCode::INTERNAL_SERVER_ERROR,
format!("{:?}", error),
)
}
Ok(()) => rsp(StatusCode::NO_CONTENT, Body::empty()),
},
Expand Down
87 changes: 66 additions & 21 deletions shotover-proxy/src/runner.rs
Expand Up @@ -15,7 +15,11 @@ use tokio::task::JoinHandle;
use tracing::{debug, error, info};
use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
use tracing_subscriber::filter::Directive;
use tracing_subscriber::fmt::format::{DefaultFields, Format};
use tracing_subscriber::fmt::format::DefaultFields;
use tracing_subscriber::fmt::format::Format;
use tracing_subscriber::fmt::format::Full;
use tracing_subscriber::fmt::format::Json;
use tracing_subscriber::fmt::format::JsonFields;
use tracing_subscriber::fmt::Layer;
use tracing_subscriber::layer::Layered;
use tracing_subscriber::reload::Handle;
Expand All @@ -35,6 +39,15 @@ pub struct ConfigOpts {
// 2,097,152 = 2 * 1024 * 1024 (2MiB)
#[clap(long, default_value = "2097152")]
pub stack_size: usize,

#[arg(long, value_enum, default_value = "human")]
pub log_format: LogFormat,
}

#[derive(clap::ValueEnum, Clone)]
pub enum LogFormat {
Human,
Json,
}

impl Default for ConfigOpts {
Expand All @@ -44,6 +57,7 @@ impl Default for ConfigOpts {
config_file: "config/config.yaml".into(),
core_threads: 4,
stack_size: 2097152,
log_format: LogFormat::Human,
}
}
}
Expand All @@ -61,7 +75,7 @@ impl Runner {
let config = Config::from_file(params.config_file)?;
let topology = Topology::from_file(params.topology_file)?;

let tracing = TracingState::new(config.main_log_level.as_str())?;
let tracing = TracingState::new(config.main_log_level.as_str(), params.log_format)?;

let (runtime_handle, runtime) = Runner::get_runtime(params.stack_size, params.core_threads);

Expand Down Expand Up @@ -150,13 +164,10 @@ impl Runner {
}
}

type TracingStateHandle =
Handle<EnvFilter, Layered<Layer<Registry, DefaultFields, Format, NonBlocking>, Registry>>;

struct TracingState {
/// Once this is dropped tracing logs are ignored
guard: WorkerGuard,
handle: TracingStateHandle,
handle: ReloadHandle,
}

/// Returns a new `EnvFilter` by parsing each directive string, or an error if any directive is invalid.
Expand All @@ -181,28 +192,62 @@ fn try_parse_log_directives(directives: &[Option<&str>]) -> Result<EnvFilter> {
}

impl TracingState {
fn new(log_level: &str) -> Result<Self> {
fn new(log_level: &str, format: LogFormat) -> Result<Self> {
let (non_blocking, guard) = tracing_appender::non_blocking(std::io::stdout());

let builder = tracing_subscriber::fmt()
.with_writer(non_blocking)
.with_env_filter({
// Load log directives from shotover config and then from the RUST_LOG env var, with the latter taking priority.
// In the future we might be able to simplify the implementation if work is done on tokio-rs/tracing#1466.
let overrides = env::var(EnvFilter::DEFAULT_ENV).ok();
try_parse_log_directives(&[Some(log_level), overrides.as_deref()])?
})
.with_filter_reloading();
let handle = builder.reload_handle();

// To avoid unit tests that run in the same excutable from blowing up when they try to reinitialize tracing we ignore the result returned by try_init.
// Currently the implementation of try_init will only fail when it is called multiple times.
builder.try_init().ok();
// Load log directives from shotover config and then from the RUST_LOG env var, with the latter taking priority.
// In the future we might be able to simplify the implementation if work is done on tokio-rs/tracing#1466.
let overrides = env::var(EnvFilter::DEFAULT_ENV).ok();
let env_filter = try_parse_log_directives(&[Some(log_level), overrides.as_deref()])?;

let handle = match format {
LogFormat::Json => {
let builder = tracing_subscriber::fmt()
.json()
.with_writer(non_blocking)
.with_env_filter(env_filter)
.with_filter_reloading();
let handle = ReloadHandle::Json(builder.reload_handle());
// To avoid unit tests that run in the same excutable from blowing up when they try to reinitialize tracing we ignore the result returned by try_init.
// Currently the implementation of try_init will only fail when it is called multiple times.
builder.try_init().ok();
handle
}
LogFormat::Human => {
let builder = tracing_subscriber::fmt()
.with_writer(non_blocking)
.with_env_filter(env_filter)
.with_filter_reloading();
let handle = ReloadHandle::Human(builder.reload_handle());
builder.try_init().ok();
handle
}
};

Ok(TracingState { guard, handle })
}
}

type Formatter<A, B> = Layered<Layer<Registry, A, Format<B>, NonBlocking>, Registry>;

// TODO: We will be able to remove this and just directly use the handle once tracing 0.2 is released. See:
// * https://github.com/tokio-rs/tracing/pull/1035
// * https://github.com/linkerd/linkerd2-proxy/blob/6c484f6dcdeebda18b68c800b4494263bf98fcdc/linkerd/app/core/src/trace.rs#L19-L36
#[derive(Clone)]
pub enum ReloadHandle {
Json(Handle<EnvFilter, Formatter<JsonFields, Json>>),
Human(Handle<EnvFilter, Formatter<DefaultFields, Full>>),
}

impl ReloadHandle {
pub fn reload(&self, filter: EnvFilter) -> Result<()> {
match self {
ReloadHandle::Json(handle) => handle.reload(filter).map_err(|e| anyhow!(e)),
ReloadHandle::Human(handle) => handle.reload(filter).map_err(|e| anyhow!(e)),
}
}
}

pub struct RunnerSpawned {
pub runtime: Option<Runtime>,
pub runtime_handle: RuntimeHandle,
Expand Down

0 comments on commit 071a6e2

Please sign in to comment.