Skip to content

Commit

Permalink
fix: wrap with filter layer
Browse files Browse the repository at this point in the history
  • Loading branch information
ymgyt committed Aug 5, 2023
1 parent 802809c commit 3cfc96f
Showing 1 changed file with 138 additions and 21 deletions.
159 changes: 138 additions & 21 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::{collections::HashMap, fmt, sync::RwLock};
use tracing::{field::Visit, Subscriber};
use tracing_core::{Field, Metadata};
use tracing_core::{Field, Interest, Metadata};

use opentelemetry::{
metrics::{Counter, Histogram, Meter, MeterProvider, UpDownCounter},
KeyValue, Value,
};
use tracing_subscriber::{
filter::Filtered,
layer::{Context, Filter},
registry::LookupSpan,
Layer,
Expand Down Expand Up @@ -319,14 +320,16 @@ impl<'a> Visit for MetricVisitor<'a> {
/// its callsite, eliminating the need for any maps.
///
#[cfg_attr(docsrs, doc(cfg(feature = "metrics")))]
pub struct MetricsLayer {
meter: Meter,
instruments: Instruments,
pub struct MetricsLayer<S> {
inner: Filtered<InstrumentLayer, MetricsFilter, S>,
}

impl MetricsLayer {
impl<S> MetricsLayer<S>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
/// Create a new instance of MetricsLayer.
pub fn new<M>(meter_provider: M) -> Self
pub fn new<M>(meter_provider: M) -> MetricsLayer<S>
where
M: MeterProvider,
{
Expand All @@ -337,14 +340,51 @@ impl MetricsLayer {
None,
);

MetricsLayer {
let layer = InstrumentLayer {
meter,
instruments: Default::default(),
};

MetricsLayer {
inner: layer.with_filter(MetricsFilter),
}
}
}

impl<S> Layer<S> for MetricsLayer
struct MetricsFilter;

impl MetricsFilter {
fn has_metrics_fields(&self, meta: &Metadata<'_>) -> bool {
meta.is_event()
&& meta.fields().iter().any(|field| {
let name = field.name();
name.starts_with(METRIC_PREFIX_COUNTER)
|| name.starts_with(METRIC_PREFIX_MONOTONIC_COUNTER)
|| name.starts_with(METRIC_PREFIX_HISTOGRAM)
})
}
}

impl<S> Filter<S> for MetricsFilter {
fn enabled(&self, meta: &Metadata<'_>, _cx: &Context<'_, S>) -> bool {
self.has_metrics_fields(meta)
}

fn callsite_enabled(&self, meta: &'static Metadata<'static>) -> Interest {
if self.has_metrics_fields(meta) {
Interest::always()
} else {
Interest::never()
}
}
}

struct InstrumentLayer {
meter: Meter,
instruments: Instruments,
}

impl<S> Layer<S> for InstrumentLayer
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
Expand All @@ -371,28 +411,105 @@ where
}
}

impl<S> Filter<S> for MetricsLayer {
fn enabled(&self, meta: &Metadata<'_>, _cx: &Context<'_, S>) -> bool {
meta.is_event()
&& meta.fields().iter().any(|field| {
let name = field.name();
name.starts_with(METRIC_PREFIX_COUNTER)
|| name.starts_with(METRIC_PREFIX_MONOTONIC_COUNTER)
|| name.starts_with(METRIC_PREFIX_HISTOGRAM)
})
impl<S> Layer<S> for MetricsLayer<S>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
fn on_register_dispatch(&self, collector: &tracing_core::Dispatch) {
self.inner.on_register_dispatch(collector)
}

fn on_layer(&mut self, subscriber: &mut S) {
self.inner.on_layer(subscriber)
}

fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
self.inner.register_callsite(metadata)
}

fn enabled(&self, metadata: &Metadata<'_>, ctx: Context<'_, S>) -> bool {
self.inner.enabled(metadata, ctx)
}

fn on_new_span(
&self,
attrs: &tracing_core::span::Attributes<'_>,
id: &tracing_core::span::Id,
ctx: Context<'_, S>,
) {
self.inner.on_new_span(attrs, id, ctx)
}

fn max_level_hint(&self) -> Option<tracing_core::LevelFilter> {
self.inner.max_level_hint()
}

fn on_record(
&self,
span: &tracing_core::span::Id,
values: &tracing_core::span::Record<'_>,
ctx: Context<'_, S>,
) {
self.inner.on_record(span, values, ctx)
}

fn on_follows_from(
&self,
span: &tracing_core::span::Id,
follows: &tracing_core::span::Id,
ctx: Context<'_, S>,
) {
self.inner.on_follows_from(span, follows, ctx)
}

fn event_enabled(&self, event: &tracing_core::Event<'_>, ctx: Context<'_, S>) -> bool {
self.inner.event_enabled(event, ctx)
}

fn on_event(&self, event: &tracing_core::Event<'_>, ctx: Context<'_, S>) {
self.inner.on_event(event, ctx)
}

fn on_enter(&self, id: &tracing_core::span::Id, ctx: Context<'_, S>) {
self.inner.on_enter(id, ctx)
}

fn on_exit(&self, id: &tracing_core::span::Id, ctx: Context<'_, S>) {
self.inner.on_exit(id, ctx)
}

fn on_close(&self, id: tracing_core::span::Id, ctx: Context<'_, S>) {
self.inner.on_close(id, ctx)
}

fn on_id_change(
&self,
old: &tracing_core::span::Id,
new: &tracing_core::span::Id,
ctx: Context<'_, S>,
) {
self.inner.on_id_change(old, new, ctx)
}
}

#[cfg(test)]
mod tests {
use super::*;
use opentelemetry::metrics::noop::NoopMeterProvider;
use tracing_subscriber::layer::SubscriberExt;

struct PanicLayer;
impl<S> Layer<S> for PanicLayer
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
fn on_event(&self, _event: &tracing_core::Event<'_>, _ctx: Context<'_, S>) {
panic!("panic");
}
}

#[test]
fn layer_should_not_panic_on_non_metrics_event() {
let noop = NoopMeterProvider::new();
let layer = MetricsLayer::new(noop);
fn filter_layer_should_filter_non_metrics_event() {
let layer = PanicLayer.with_filter(MetricsFilter);
let subscriber = tracing_subscriber::registry().with(layer);

tracing::subscriber::with_default(subscriber, || {
Expand Down

0 comments on commit 3cfc96f

Please sign in to comment.