diff --git a/opentelemetry-otlp/src/metric.rs b/opentelemetry-otlp/src/metric.rs index caca5a0af7..686de5dde5 100644 --- a/opentelemetry-otlp/src/metric.rs +++ b/opentelemetry-otlp/src/metric.rs @@ -139,7 +139,6 @@ impl Debug for MetricExporter { } } -#[async_trait] impl PushMetricExporter for MetricExporter { async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult { self.client.export(metrics).await diff --git a/opentelemetry-otlp/tests/integration_test/src/metric_helpers.rs b/opentelemetry-otlp/tests/integration_test/src/metric_helpers.rs index 7368fa4f21..047ae81f69 100644 --- a/opentelemetry-otlp/tests/integration_test/src/metric_helpers.rs +++ b/opentelemetry-otlp/tests/integration_test/src/metric_helpers.rs @@ -3,7 +3,7 @@ use crate::test_utils; use anyhow::Result; use anyhow::{Context, Ok}; use opentelemetry_otlp::MetricExporter; -use opentelemetry_sdk::metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider}; +use opentelemetry_sdk::metrics::{MeterProviderBuilder, SdkMeterProvider}; use opentelemetry_sdk::Resource; use serde_json::Value; use std::fs; diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index 2d2e55ed7e..861638526d 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -13,7 +13,6 @@ rust-version = "1.75.0" opentelemetry = { version = "0.28", path = "../opentelemetry/" } opentelemetry-http = { version = "0.28", path = "../opentelemetry-http", optional = true } async-std = { workspace = true, features = ["unstable"], optional = true } -async-trait = { workspace = true, optional = true } futures-channel = "0.3" futures-executor = { workspace = true } futures-util = { workspace = true, features = ["std", "sink", "async-await-macro"] } @@ -47,7 +46,7 @@ trace = ["opentelemetry/trace", "rand", "percent-encoding"] jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url"] logs = ["opentelemetry/logs", "serde_json"] spec_unstable_logs_enabled = ["logs", "opentelemetry/spec_unstable_logs_enabled"] -metrics = ["opentelemetry/metrics", "glob", "async-trait"] +metrics = ["opentelemetry/metrics", "glob"] testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"] experimental_async_runtime = [] rt-tokio = ["tokio", "tokio-stream", "experimental_async_runtime"] diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs index 0e2272d49d..5fd4ac40dc 100644 --- a/opentelemetry-sdk/benches/log_exporter.rs +++ b/opentelemetry-sdk/benches/log_exporter.rs @@ -14,7 +14,6 @@ use opentelemetry::time::now; use opentelemetry_sdk::error::OTelSdkResult; use std::sync::Mutex; -use async_trait::async_trait; use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::logs::{LogRecord as _, Logger, LoggerProvider, Severity}; @@ -29,9 +28,8 @@ use std::fmt::Debug; // Run this benchmark with: // cargo bench --bench log_exporter -#[async_trait] pub trait LogExporterWithFuture: Send + Sync + Debug { - async fn export(&mut self, batch: LogBatch<'_>); + fn export(&mut self, batch: LogBatch<'_>) -> impl std::future::Future + Send; } pub trait LogExporterWithoutFuture: Send + Sync + Debug { @@ -41,7 +39,6 @@ pub trait LogExporterWithoutFuture: Send + Sync + Debug { #[derive(Debug)] struct NoOpExporterWithFuture {} -#[async_trait] impl LogExporterWithFuture for NoOpExporterWithFuture { async fn export(&mut self, _batch: LogBatch<'_>) {} } diff --git a/opentelemetry-sdk/src/metrics/exporter.rs b/opentelemetry-sdk/src/metrics/exporter.rs index d657c7238b..32a29e44d1 100644 --- a/opentelemetry-sdk/src/metrics/exporter.rs +++ b/opentelemetry-sdk/src/metrics/exporter.rs @@ -1,6 +1,4 @@ //! Interfaces for exporting metrics -use async_trait::async_trait; - use crate::error::OTelSdkResult; use crate::metrics::data::ResourceMetrics; @@ -10,17 +8,19 @@ use super::Temporality; /// Exporter handles the delivery of metric data to external receivers. /// /// This is the final component in the metric push pipeline. -#[async_trait] pub trait PushMetricExporter: Send + Sync + 'static { /// Export serializes and transmits metric data to a receiver. /// /// All retry logic must be contained in this function. The SDK does not /// implement any retry logic. All errors returned by this function are /// considered unrecoverable and will be logged. - async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult; + fn export( + &self, + metrics: &mut ResourceMetrics, + ) -> impl std::future::Future + Send; /// Flushes any metric data held by an exporter. - async fn force_flush(&self) -> OTelSdkResult; + fn force_flush(&self) -> impl std::future::Future + Send; /// Releases any held computational resources. /// diff --git a/opentelemetry-sdk/src/metrics/in_memory_exporter.rs b/opentelemetry-sdk/src/metrics/in_memory_exporter.rs index eeaf640c45..89574e3219 100644 --- a/opentelemetry-sdk/src/metrics/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/metrics/in_memory_exporter.rs @@ -5,7 +5,6 @@ use crate::metrics::exporter::PushMetricExporter; use crate::metrics::MetricError; use crate::metrics::MetricResult; use crate::metrics::Temporality; -use async_trait::async_trait; use std::collections::VecDeque; use std::fmt; use std::sync::{Arc, Mutex}; @@ -263,7 +262,6 @@ impl InMemoryMetricExporter { } } -#[async_trait] impl PushMetricExporter for InMemoryMetricExporter { async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult { self.metrics diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index dda7f51fa2..41eba0a2e1 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -59,7 +59,7 @@ where } /// Create a [PeriodicReader] with the given config. - pub fn build(self) -> PeriodicReader { + pub fn build(self) -> PeriodicReader { PeriodicReader::new(self.exporter, self.interval) } } @@ -124,24 +124,25 @@ where /// # drop(reader); /// # } /// ``` -#[derive(Clone)] -pub struct PeriodicReader { - inner: Arc, +pub struct PeriodicReader { + inner: Arc>, } -impl PeriodicReader { +impl Clone for PeriodicReader { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + } + } +} + +impl PeriodicReader { /// Configuration options for a periodic reader with own thread - pub fn builder(exporter: E) -> PeriodicReaderBuilder - where - E: PushMetricExporter, - { + pub fn builder(exporter: E) -> PeriodicReaderBuilder { PeriodicReaderBuilder::new(exporter) } - fn new(exporter: E, interval: Duration) -> Self - where - E: PushMetricExporter, - { + fn new(exporter: E, interval: Duration) -> Self { let (message_sender, message_receiver): (Sender, Receiver) = mpsc::channel(); let exporter_arc = Arc::new(exporter); @@ -333,19 +334,19 @@ impl PeriodicReader { } } -impl fmt::Debug for PeriodicReader { +impl fmt::Debug for PeriodicReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("PeriodicReader").finish() } } -struct PeriodicReaderInner { - exporter: Arc, +struct PeriodicReaderInner { + exporter: Arc, message_sender: mpsc::Sender, producer: Mutex>>, } -impl PeriodicReaderInner { +impl PeriodicReaderInner { fn register_pipeline(&self, producer: Weak) { let mut inner = self.producer.lock().expect("lock poisoned"); *inner = Some(producer); @@ -472,7 +473,7 @@ enum Message { Shutdown(Sender), } -impl MetricReader for PeriodicReader { +impl MetricReader for PeriodicReader { fn register_pipeline(&self, pipeline: Weak) { self.inner.register_pipeline(pipeline); } @@ -516,7 +517,6 @@ mod tests { }, Resource, }; - use async_trait::async_trait; use opentelemetry::metrics::MeterProvider; use std::{ sync::{ @@ -548,7 +548,6 @@ mod tests { } } - #[async_trait] impl PushMetricExporter for MetricExporterThatFailsOnlyOnFirst { async fn export(&self, _metrics: &mut ResourceMetrics) -> OTelSdkResult { if self.count.fetch_add(1, Ordering::Relaxed) == 0 { @@ -576,7 +575,6 @@ mod tests { is_shutdown: Arc, } - #[async_trait] impl PushMetricExporter for MockMetricExporter { async fn export(&self, _metrics: &mut ResourceMetrics) -> OTelSdkResult { Ok(()) diff --git a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs index 9315d38b91..5ba1de731f 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs @@ -103,10 +103,10 @@ where } /// Create a [PeriodicReader] with the given config. - pub fn build(self) -> PeriodicReader { + pub fn build(self) -> PeriodicReader { let (message_sender, message_receiver) = mpsc::channel(256); - let worker = move |reader: &PeriodicReader| { + let worker = move |reader: &PeriodicReader| { let runtime = self.runtime.clone(); let reader = reader.clone(); self.runtime.spawn(Box::pin(async move { @@ -184,33 +184,40 @@ where /// # drop(reader); /// # } /// ``` -#[derive(Clone)] -pub struct PeriodicReader { - exporter: Arc, - inner: Arc>, +pub struct PeriodicReader { + exporter: Arc, + inner: Arc>>, } -impl PeriodicReader { +impl Clone for PeriodicReader { + fn clone(&self) -> Self { + Self { + exporter: Arc::clone(&self.exporter), + inner: Arc::clone(&self.inner), + } + } +} + +impl PeriodicReader { /// Configuration options for a periodic reader - pub fn builder(exporter: E, runtime: RT) -> PeriodicReaderBuilder + pub fn builder(exporter: E, runtime: RT) -> PeriodicReaderBuilder where - E: PushMetricExporter, RT: Runtime, { PeriodicReaderBuilder::new(exporter, runtime) } } -impl fmt::Debug for PeriodicReader { +impl fmt::Debug for PeriodicReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("PeriodicReader").finish() } } -struct PeriodicReaderInner { +struct PeriodicReaderInner { message_sender: mpsc::Sender, is_shutdown: bool, - sdk_producer_or_worker: ProducerOrWorker, + sdk_producer_or_worker: ProducerOrWorker, } #[derive(Debug)] @@ -220,19 +227,20 @@ enum Message { Shutdown(oneshot::Sender), } -enum ProducerOrWorker { +enum ProducerOrWorker { Producer(Weak), - Worker(Box), + #[allow(clippy::type_complexity)] + Worker(Box) + Send + Sync>), } -struct PeriodicReaderWorker { - reader: PeriodicReader, +struct PeriodicReaderWorker { + reader: PeriodicReader, timeout: Duration, runtime: RT, rm: ResourceMetrics, } -impl PeriodicReaderWorker { +impl PeriodicReaderWorker { async fn collect_and_export(&mut self) -> OTelSdkResult { self.reader .collect(&mut self.rm) @@ -323,7 +331,7 @@ impl PeriodicReaderWorker { } } -impl MetricReader for PeriodicReader { +impl MetricReader for PeriodicReader { fn register_pipeline(&self, pipeline: Weak) { let mut inner = match self.inner.lock() { Ok(guard) => guard, diff --git a/opentelemetry-stdout/Cargo.toml b/opentelemetry-stdout/Cargo.toml index 80b539e642..48ad6bfafd 100644 --- a/opentelemetry-stdout/Cargo.toml +++ b/opentelemetry-stdout/Cargo.toml @@ -22,23 +22,19 @@ rustdoc-args = ["--cfg", "docsrs"] [features] default = ["trace", "metrics", "logs"] trace = ["opentelemetry/trace", "opentelemetry_sdk/trace", "futures-util"] -metrics = ["async-trait", "opentelemetry/metrics", "opentelemetry_sdk/metrics"] -logs = ["opentelemetry/logs", "opentelemetry_sdk/logs", "async-trait", "thiserror", "opentelemetry_sdk/spec_unstable_logs_enabled"] +metrics = ["opentelemetry/metrics", "opentelemetry_sdk/metrics"] +logs = ["opentelemetry/logs", "opentelemetry_sdk/logs", "opentelemetry_sdk/spec_unstable_logs_enabled"] [dependencies] -async-trait = { workspace = true, optional = true } chrono = { version = "0.4.34", default-features = false, features = ["now"] } -thiserror = { workspace = true, optional = true } futures-util = { workspace = true, optional = true } opentelemetry = { version = "0.28", path = "../opentelemetry" } opentelemetry_sdk = { version = "0.28", path = "../opentelemetry-sdk" } -serde = { workspace = true, features = ["derive"] } [dev-dependencies] opentelemetry = { path = "../opentelemetry", features = ["metrics"] } opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["rt-tokio", "metrics"] } opentelemetry-appender-tracing = { path = "../opentelemetry-appender-tracing"} -opentelemetry-semantic-conventions = { path = "../opentelemetry-semantic-conventions" } tracing = { workspace = true, features = ["std"]} tracing-subscriber = { workspace = true, features = ["registry", "std"] } tokio = { workspace = true, features = ["full"] } diff --git a/opentelemetry-stdout/src/metrics/exporter.rs b/opentelemetry-stdout/src/metrics/exporter.rs index 005be41d61..daa975c783 100644 --- a/opentelemetry-stdout/src/metrics/exporter.rs +++ b/opentelemetry-stdout/src/metrics/exporter.rs @@ -1,4 +1,3 @@ -use async_trait::async_trait; use chrono::{DateTime, Utc}; use core::{f64, fmt}; use opentelemetry_sdk::metrics::Temporality; @@ -39,7 +38,6 @@ impl fmt::Debug for MetricExporter { } } -#[async_trait] impl PushMetricExporter for MetricExporter { /// Write Metrics to stdout async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {