Skip to content

Commit 584e540

Browse files
committed
initial
1 parent 0e221c1 commit 584e540

File tree

2 files changed

+25
-18
lines changed

2 files changed

+25
-18
lines changed

opentelemetry-sdk/src/export/logs/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
use crate::logs::LogRecord;
33
use crate::logs::{LogError, LogResult};
44
use crate::Resource;
5-
use async_trait::async_trait;
65
#[cfg(feature = "spec_unstable_logs_enabled")]
76
use opentelemetry::logs::Severity;
87
use opentelemetry::InstrumentationScope;
@@ -63,7 +62,6 @@ impl LogBatch<'_> {
6362
}
6463

6564
/// `LogExporter` defines the interface that log exporters should implement.
66-
#[async_trait]
6765
pub trait LogExporter: Send + Sync + Debug {
6866
/// Exports a batch of log records and their associated instrumentation scopes.
6967
///
@@ -82,7 +80,10 @@ pub trait LogExporter: Send + Sync + Debug {
8280
/// A `LogResult<()>`, which is a result type indicating either a successful export (with
8381
/// `Ok(())`) or an error (`Err(LogError)`) if the export operation failed.
8482
///
85-
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()>;
83+
fn export<'a>(
84+
&mut self,
85+
batch: LogBatch<'a>,
86+
) -> impl std::future::Future<Output = LogResult<()>> + Send;
8687
/// Shuts down the exporter.
8788
fn shutdown(&mut self) {}
8889
#[cfg(feature = "spec_unstable_logs_enabled")]

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -77,21 +77,21 @@ pub trait LogProcessor: Send + Sync + Debug {
7777
/// debugging and testing. For scenarios requiring higher
7878
/// performance/throughput, consider using [BatchLogProcessor].
7979
#[derive(Debug)]
80-
pub struct SimpleLogProcessor {
81-
exporter: Mutex<Box<dyn LogExporter>>,
80+
pub struct SimpleLogProcessor<T: LogExporter> {
81+
exporter: Mutex<T>,
8282
is_shutdown: AtomicBool,
8383
}
8484

85-
impl SimpleLogProcessor {
86-
pub(crate) fn new(exporter: Box<dyn LogExporter>) -> Self {
85+
impl<T: LogExporter> SimpleLogProcessor<T> {
86+
pub(crate) fn new(exporter: T) -> Self {
8787
SimpleLogProcessor {
8888
exporter: Mutex::new(exporter),
8989
is_shutdown: AtomicBool::new(false),
9090
}
9191
}
9292
}
9393

94-
impl LogProcessor for SimpleLogProcessor {
94+
impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
9595
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) {
9696
// noop after shutdown
9797
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
@@ -152,19 +152,20 @@ impl LogProcessor for SimpleLogProcessor {
152152

153153
/// A [`LogProcessor`] that asynchronously buffers log records and reports
154154
/// them at a pre-configured interval.
155-
pub struct BatchLogProcessor<R: RuntimeChannel> {
155+
pub struct BatchLogProcessor<T: LogExporter, R: RuntimeChannel> {
156+
exporter: Mutex<T>,
156157
message_sender: R::Sender<BatchMessage>,
157158
}
158159

159-
impl<R: RuntimeChannel> Debug for BatchLogProcessor<R> {
160+
impl<T: LogExporter, R: RuntimeChannel> Debug for BatchLogProcessor<T, R> {
160161
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
161162
f.debug_struct("BatchLogProcessor")
162163
.field("message_sender", &self.message_sender)
163164
.finish()
164165
}
165166
}
166167

167-
impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
168+
impl<T: LogExporter, R: RuntimeChannel> LogProcessor for BatchLogProcessor<T, R> {
168169
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) {
169170
let result = self.message_sender.try_send(BatchMessage::ExportLog((
170171
record.clone(),
@@ -210,10 +211,11 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
210211
}
211212
}
212213

213-
impl<R: RuntimeChannel> BatchLogProcessor<R> {
214-
pub(crate) fn new(mut exporter: Box<dyn LogExporter>, config: BatchConfig, runtime: R) -> Self {
214+
impl<T: LogExporter, R: RuntimeChannel> BatchLogProcessor<T, R> {
215+
pub(crate) fn new(exporter: T, config: BatchConfig, runtime: R) -> Self {
215216
let (message_sender, message_receiver) =
216217
runtime.batch_message_channel(config.max_queue_size);
218+
let exporter = Arc::new(Mutex::new(exporter));
217219
let inner_runtime = runtime.clone();
218220

219221
// Spawn worker process via user-defined spawn function.
@@ -234,9 +236,10 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
234236
BatchMessage::ExportLog(log) => {
235237
logs.push(log);
236238
if logs.len() == config.max_export_batch_size {
239+
let mut locked_exporter = exporter.lock().unwrap(); // Get the MutexGuard
237240
let result = export_with_timeout(
238241
config.max_export_timeout,
239-
exporter.as_mut(),
242+
&mut *locked_exporter,
240243
&timeout_runtime,
241244
logs.split_off(0),
242245
)
@@ -252,9 +255,10 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
252255
}
253256
// Log batch interval time reached or a force flush has been invoked, export current spans.
254257
BatchMessage::Flush(res_channel) => {
258+
let mut locked_exporter = exporter.lock().unwrap(); // Get the MutexGuard
255259
let result = export_with_timeout(
256260
config.max_export_timeout,
257-
exporter.as_mut(),
261+
&mut *locked_exporter,
258262
&timeout_runtime,
259263
logs.split_off(0),
260264
)
@@ -271,15 +275,16 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
271275
}
272276
// Stream has terminated or processor is shutdown, return to finish execution.
273277
BatchMessage::Shutdown(ch) => {
278+
let mut locked_exporter = exporter.lock().unwrap();
274279
let result = export_with_timeout(
275280
config.max_export_timeout,
276-
exporter.as_mut(),
281+
&mut *locked_exporter,
277282
&timeout_runtime,
278283
logs.split_off(0),
279284
)
280285
.await;
281286

282-
exporter.shutdown();
287+
locked_exporter.shutdown();
283288

284289
if let Err(send_error) = ch.send(result) {
285290
otel_debug!(
@@ -291,7 +296,8 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
291296
}
292297
// propagate the resource
293298
BatchMessage::SetResource(resource) => {
294-
exporter.set_resource(&resource);
299+
let mut locked_exporter = exporter.lock().unwrap();
300+
locked_exporter.set_resource(&resource);
295301
}
296302
}
297303
}

0 commit comments

Comments
 (0)