Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion opentelemetry-otlp/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ impl Debug for MetricExporter {
}
}

#[async_trait]
impl PushMetricExporter for MetricExporter {
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
self.client.export(metrics).await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::test_utils;
use anyhow::Result;
use anyhow::{Context, Ok};
use opentelemetry_otlp::MetricExporter;
use opentelemetry_sdk::metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::metrics::{MeterProviderBuilder, SdkMeterProvider};
use opentelemetry_sdk::Resource;
use serde_json::Value;
use std::fs;
Expand Down
3 changes: 1 addition & 2 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ rust-version = "1.75.0"
opentelemetry = { version = "0.28", path = "../opentelemetry/" }
opentelemetry-http = { version = "0.28", path = "../opentelemetry-http", optional = true }
async-std = { workspace = true, features = ["unstable"], optional = true }
async-trait = { workspace = true, optional = true }
futures-channel = "0.3"
futures-executor = { workspace = true }
futures-util = { workspace = true, features = ["std", "sink", "async-await-macro"] }
Expand Down Expand Up @@ -47,7 +46,7 @@ trace = ["opentelemetry/trace", "rand", "percent-encoding"]
jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url"]
logs = ["opentelemetry/logs", "serde_json"]
spec_unstable_logs_enabled = ["logs", "opentelemetry/spec_unstable_logs_enabled"]
metrics = ["opentelemetry/metrics", "glob", "async-trait"]
metrics = ["opentelemetry/metrics", "glob"]
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]
experimental_async_runtime = []
rt-tokio = ["tokio", "tokio-stream", "experimental_async_runtime"]
Expand Down
5 changes: 1 addition & 4 deletions opentelemetry-sdk/benches/log_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use opentelemetry::time::now;
use opentelemetry_sdk::error::OTelSdkResult;
use std::sync::Mutex;

use async_trait::async_trait;
use criterion::{criterion_group, criterion_main, Criterion};

use opentelemetry::logs::{LogRecord as _, Logger, LoggerProvider, Severity};
Expand All @@ -29,9 +28,8 @@ use std::fmt::Debug;

// Run this benchmark with:
// cargo bench --bench log_exporter
#[async_trait]
pub trait LogExporterWithFuture: Send + Sync + Debug {
async fn export(&mut self, batch: LogBatch<'_>);
fn export(&mut self, batch: LogBatch<'_>) -> impl std::future::Future<Output = ()> + Send;
}

pub trait LogExporterWithoutFuture: Send + Sync + Debug {
Expand All @@ -41,7 +39,6 @@ pub trait LogExporterWithoutFuture: Send + Sync + Debug {
#[derive(Debug)]
struct NoOpExporterWithFuture {}

#[async_trait]
impl LogExporterWithFuture for NoOpExporterWithFuture {
async fn export(&mut self, _batch: LogBatch<'_>) {}
}
Expand Down
10 changes: 5 additions & 5 deletions opentelemetry-sdk/src/metrics/exporter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
//! Interfaces for exporting metrics
use async_trait::async_trait;

use crate::error::OTelSdkResult;

use crate::metrics::data::ResourceMetrics;
Expand All @@ -10,17 +8,19 @@ use super::Temporality;
/// Exporter handles the delivery of metric data to external receivers.
///
/// This is the final component in the metric push pipeline.
#[async_trait]
pub trait PushMetricExporter: Send + Sync + 'static {
/// Export serializes and transmits metric data to a receiver.
///
/// All retry logic must be contained in this function. The SDK does not
/// implement any retry logic. All errors returned by this function are
/// considered unrecoverable and will be logged.
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult;
fn export(
&self,
metrics: &mut ResourceMetrics,
) -> impl std::future::Future<Output = OTelSdkResult> + Send;

/// Flushes any metric data held by an exporter.
async fn force_flush(&self) -> OTelSdkResult;
fn force_flush(&self) -> impl std::future::Future<Output = OTelSdkResult> + Send;

/// Releases any held computational resources.
///
Expand Down
2 changes: 0 additions & 2 deletions opentelemetry-sdk/src/metrics/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::metrics::exporter::PushMetricExporter;
use crate::metrics::MetricError;
use crate::metrics::MetricResult;
use crate::metrics::Temporality;
use async_trait::async_trait;
use std::collections::VecDeque;
use std::fmt;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -263,7 +262,6 @@ impl InMemoryMetricExporter {
}
}

#[async_trait]
impl PushMetricExporter for InMemoryMetricExporter {
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
self.metrics
Expand Down
40 changes: 19 additions & 21 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ where
}

/// Create a [PeriodicReader] with the given config.
pub fn build(self) -> PeriodicReader {
pub fn build(self) -> PeriodicReader<E> {
PeriodicReader::new(self.exporter, self.interval)
}
}
Expand Down Expand Up @@ -124,24 +124,25 @@ where
/// # drop(reader);
/// # }
/// ```
#[derive(Clone)]
pub struct PeriodicReader {
inner: Arc<PeriodicReaderInner>,
pub struct PeriodicReader<E: PushMetricExporter> {
inner: Arc<PeriodicReaderInner<E>>,
}

impl PeriodicReader {
impl<E: PushMetricExporter> Clone for PeriodicReader<E> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}

impl<E: PushMetricExporter> PeriodicReader<E> {
/// Configuration options for a periodic reader with own thread
pub fn builder<E>(exporter: E) -> PeriodicReaderBuilder<E>
where
E: PushMetricExporter,
{
pub fn builder(exporter: E) -> PeriodicReaderBuilder<E> {
PeriodicReaderBuilder::new(exporter)
}

fn new<E>(exporter: E, interval: Duration) -> Self
where
E: PushMetricExporter,
{
fn new(exporter: E, interval: Duration) -> Self {
let (message_sender, message_receiver): (Sender<Message>, Receiver<Message>) =
mpsc::channel();
let exporter_arc = Arc::new(exporter);
Expand Down Expand Up @@ -333,19 +334,19 @@ impl PeriodicReader {
}
}

impl fmt::Debug for PeriodicReader {
impl<E: PushMetricExporter> fmt::Debug for PeriodicReader<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PeriodicReader").finish()
}
}

struct PeriodicReaderInner {
exporter: Arc<dyn PushMetricExporter>,
struct PeriodicReaderInner<E: PushMetricExporter> {
exporter: Arc<E>,
message_sender: mpsc::Sender<Message>,
producer: Mutex<Option<Weak<dyn SdkProducer>>>,
}

impl PeriodicReaderInner {
impl<E: PushMetricExporter> PeriodicReaderInner<E> {
fn register_pipeline(&self, producer: Weak<dyn SdkProducer>) {
let mut inner = self.producer.lock().expect("lock poisoned");
*inner = Some(producer);
Expand Down Expand Up @@ -472,7 +473,7 @@ enum Message {
Shutdown(Sender<bool>),
}

impl MetricReader for PeriodicReader {
impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {
fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
self.inner.register_pipeline(pipeline);
}
Expand Down Expand Up @@ -516,7 +517,6 @@ mod tests {
},
Resource,
};
use async_trait::async_trait;
use opentelemetry::metrics::MeterProvider;
use std::{
sync::{
Expand Down Expand Up @@ -548,7 +548,6 @@ mod tests {
}
}

#[async_trait]
impl PushMetricExporter for MetricExporterThatFailsOnlyOnFirst {
async fn export(&self, _metrics: &mut ResourceMetrics) -> OTelSdkResult {
if self.count.fetch_add(1, Ordering::Relaxed) == 0 {
Expand Down Expand Up @@ -576,7 +575,6 @@ mod tests {
is_shutdown: Arc<AtomicBool>,
}

#[async_trait]
impl PushMetricExporter for MockMetricExporter {
async fn export(&self, _metrics: &mut ResourceMetrics) -> OTelSdkResult {
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@ where
}

/// Create a [PeriodicReader] with the given config.
pub fn build(self) -> PeriodicReader {
pub fn build(self) -> PeriodicReader<E> {
let (message_sender, message_receiver) = mpsc::channel(256);

let worker = move |reader: &PeriodicReader| {
let worker = move |reader: &PeriodicReader<E>| {
let runtime = self.runtime.clone();
let reader = reader.clone();
self.runtime.spawn(Box::pin(async move {
Expand Down Expand Up @@ -184,33 +184,40 @@ where
/// # drop(reader);
/// # }
/// ```
#[derive(Clone)]
pub struct PeriodicReader {
exporter: Arc<dyn PushMetricExporter>,
inner: Arc<Mutex<PeriodicReaderInner>>,
pub struct PeriodicReader<E: PushMetricExporter> {
exporter: Arc<E>,
inner: Arc<Mutex<PeriodicReaderInner<E>>>,
}

impl PeriodicReader {
impl<E: PushMetricExporter> Clone for PeriodicReader<E> {
fn clone(&self) -> Self {
Self {
exporter: Arc::clone(&self.exporter),
inner: Arc::clone(&self.inner),
}
}
}

impl<E: PushMetricExporter> PeriodicReader<E> {
/// Configuration options for a periodic reader
pub fn builder<E, RT>(exporter: E, runtime: RT) -> PeriodicReaderBuilder<E, RT>
pub fn builder<RT>(exporter: E, runtime: RT) -> PeriodicReaderBuilder<E, RT>
where
E: PushMetricExporter,
RT: Runtime,
{
PeriodicReaderBuilder::new(exporter, runtime)
}
}

impl fmt::Debug for PeriodicReader {
impl<E: PushMetricExporter> fmt::Debug for PeriodicReader<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PeriodicReader").finish()
}
}

struct PeriodicReaderInner {
struct PeriodicReaderInner<E: PushMetricExporter> {
message_sender: mpsc::Sender<Message>,
is_shutdown: bool,
sdk_producer_or_worker: ProducerOrWorker,
sdk_producer_or_worker: ProducerOrWorker<E>,
}

#[derive(Debug)]
Expand All @@ -220,19 +227,20 @@ enum Message {
Shutdown(oneshot::Sender<OTelSdkResult>),
}

enum ProducerOrWorker {
enum ProducerOrWorker<E: PushMetricExporter> {
Producer(Weak<dyn SdkProducer>),
Worker(Box<dyn FnOnce(&PeriodicReader) + Send + Sync>),
#[allow(clippy::type_complexity)]
Worker(Box<dyn FnOnce(&PeriodicReader<E>) + Send + Sync>),
}

struct PeriodicReaderWorker<RT: Runtime> {
reader: PeriodicReader,
struct PeriodicReaderWorker<E: PushMetricExporter, RT: Runtime> {
reader: PeriodicReader<E>,
timeout: Duration,
runtime: RT,
rm: ResourceMetrics,
}

impl<RT: Runtime> PeriodicReaderWorker<RT> {
impl<E: PushMetricExporter, RT: Runtime> PeriodicReaderWorker<E, RT> {
async fn collect_and_export(&mut self) -> OTelSdkResult {
self.reader
.collect(&mut self.rm)
Expand Down Expand Up @@ -323,7 +331,7 @@ impl<RT: Runtime> PeriodicReaderWorker<RT> {
}
}

impl MetricReader for PeriodicReader {
impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {
fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
let mut inner = match self.inner.lock() {
Ok(guard) => guard,
Expand Down
8 changes: 2 additions & 6 deletions opentelemetry-stdout/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,19 @@ rustdoc-args = ["--cfg", "docsrs"]
[features]
default = ["trace", "metrics", "logs"]
trace = ["opentelemetry/trace", "opentelemetry_sdk/trace", "futures-util"]
metrics = ["async-trait", "opentelemetry/metrics", "opentelemetry_sdk/metrics"]
logs = ["opentelemetry/logs", "opentelemetry_sdk/logs", "async-trait", "thiserror", "opentelemetry_sdk/spec_unstable_logs_enabled"]
metrics = ["opentelemetry/metrics", "opentelemetry_sdk/metrics"]
logs = ["opentelemetry/logs", "opentelemetry_sdk/logs", "opentelemetry_sdk/spec_unstable_logs_enabled"]

[dependencies]
async-trait = { workspace = true, optional = true }
chrono = { version = "0.4.34", default-features = false, features = ["now"] }
thiserror = { workspace = true, optional = true }
futures-util = { workspace = true, optional = true }
opentelemetry = { version = "0.28", path = "../opentelemetry" }
opentelemetry_sdk = { version = "0.28", path = "../opentelemetry-sdk" }
serde = { workspace = true, features = ["derive"] }

[dev-dependencies]
opentelemetry = { path = "../opentelemetry", features = ["metrics"] }
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["rt-tokio", "metrics"] }
opentelemetry-appender-tracing = { path = "../opentelemetry-appender-tracing"}
opentelemetry-semantic-conventions = { path = "../opentelemetry-semantic-conventions" }
tracing = { workspace = true, features = ["std"]}
tracing-subscriber = { workspace = true, features = ["registry", "std"] }
tokio = { workspace = true, features = ["full"] }
Expand Down
2 changes: 0 additions & 2 deletions opentelemetry-stdout/src/metrics/exporter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use core::{f64, fmt};
use opentelemetry_sdk::metrics::Temporality;
Expand Down Expand Up @@ -39,7 +38,6 @@ impl fmt::Debug for MetricExporter {
}
}

#[async_trait]
impl PushMetricExporter for MetricExporter {
/// Write Metrics to stdout
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
Expand Down