Skip to content

Commit

Permalink
exporter takes slice to reference to logdata
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb committed May 23, 2024
1 parent b50fd5f commit 6a444d3
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 33 deletions.
2 changes: 1 addition & 1 deletion opentelemetry-appender-tracing/benches/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ struct NoopExporter {

#[async_trait]
impl LogExporter for NoopExporter {
async fn export<'a>(&mut self, _: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
async fn export<'a>(&mut self, _: &'a [&'a LogData]) -> LogResult<()> {
LogResult::Ok(())
}

Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::OtlpHttpClient;

#[async_trait]
impl LogExporter for OtlpHttpClient {
async fn export<'a>(&mut self, batch: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
async fn export<'a>(&mut self, batch: &'a [&'a LogData]) -> LogResult<()> {
let client = self
.client
.lock()
Expand All @@ -21,9 +21,9 @@ impl LogExporter for OtlpHttpClient {

//TODO :avoid cloning when logdata is borrowed?
let owned_batch = batch
.into_iter()
.map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData
.collect::<Vec<LogData>>();
.iter()
.map(|&log_data| log_data.clone()) // Converts Cow to owned LogData
.collect();

let (body, content_type) = { self.build_logs_export_body(owned_batch, &self.resource)? };
let mut request = http::Request::builder()
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl TonicLogsClient {

#[async_trait]
impl LogExporter for TonicLogsClient {
async fn export<'a>(&mut self, batch: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
async fn export<'a>(&mut self, batch: &'a [&'a LogData]) -> LogResult<()> {
// clone if batch is borrowed
let (mut client, metadata, extensions) = match &mut self.inner {
Some(inner) => {
Expand All @@ -69,8 +69,8 @@ impl LogExporter for TonicLogsClient {
// TODO: Avoid cloning when logdata is borrowed?
let resource_logs = {
batch
.into_iter()
.map(|log_data_cow| (log_data_cow.into_owned()))
.iter()
.map(|&log_data| (log_data.clone()))
.map(|log_data| (log_data, &self.resource))
.map(Into::into)
.collect()
Expand Down
5 changes: 1 addition & 4 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,7 @@ impl LogExporter {

#[async_trait]
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
async fn export<'a>(
&mut self,
batch: Vec<std::borrow::Cow<'a, LogData>>,
) -> opentelemetry::logs::LogResult<()> {
async fn export<'a>(&mut self, batch: &'a [&'a LogData]) -> opentelemetry::logs::LogResult<()> {
self.client.export(batch).await
}

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/benches/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ struct VoidExporter;

#[async_trait]
impl LogExporter for VoidExporter {
async fn export<'a>(&mut self, _batch: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
async fn export<'a>(&mut self, _batch: &'a [&'a LogData]) -> LogResult<()> {
LogResult::Ok(())
}
}
Expand Down
4 changes: 1 addition & 3 deletions opentelemetry-sdk/src/export/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@ use opentelemetry::{
logs::{LogError, LogResult},
InstrumentationLibrary,
};
use std::borrow::Cow;
use std::fmt::Debug;

/// `LogExporter` defines the interface that log exporters should implement.
#[async_trait]
pub trait LogExporter: Send + Sync + Debug {
/// Exports a batch of [`LogData`].
//async fn export(&mut self, batch: Vec<LogData>) -> LogResult<()>;
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> LogResult<()>;
async fn export<'a>(&mut self, batch: &'a [&'a LogData]) -> LogResult<()>;

/// Shuts down the exporter.
fn shutdown(&mut self) {}
Expand Down
21 changes: 10 additions & 11 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use opentelemetry::{
global,
logs::{LogError, LogResult},
};
use std::borrow::Cow;
use std::sync::atomic::AtomicBool;
use std::{cmp::min, env, sync::Mutex};
use std::{
Expand Down Expand Up @@ -100,9 +99,7 @@ impl LogProcessor for SimpleLogProcessor {
.exporter
.lock()
.map_err(|_| LogError::Other("simple logprocessor mutex poison".into()))
.and_then(|mut exporter| {
futures_executor::block_on(exporter.export(vec![Cow::Borrowed(data)]))
});
.and_then(|mut exporter| futures_executor::block_on(exporter.export(&[data])));
if let Err(err) = result {
global::handle_error(err);
}
Expand Down Expand Up @@ -215,14 +212,15 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
match message {
// Log has finished, add to buffer of pending logs.
BatchMessage::ExportLog(log) => {
logs.push(Cow::Owned(log));
logs.push(log);

if logs.len() == config.max_export_batch_size {
let log_refs: Vec<&LogData> = logs.iter().collect();
let result = export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&timeout_runtime,
logs.split_off(0),
&log_refs,
)
.await;

Expand All @@ -233,11 +231,12 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
}
// Log batch interval time reached or a force flush has been invoked, export current spans.
BatchMessage::Flush(res_channel) => {
let log_refs: Vec<&LogData> = logs.iter().collect();
let result = export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&timeout_runtime,
logs.split_off(0),
&log_refs,
)
.await;

Expand All @@ -254,11 +253,12 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
}
// Stream has terminated or processor is shutdown, return to finish execution.
BatchMessage::Shutdown(ch) => {
let log_refs: Vec<&LogData> = logs.iter().collect();
let result = export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&timeout_runtime,
logs.split_off(0),
&log_refs,
)
.await;

Expand Down Expand Up @@ -303,7 +303,7 @@ async fn export_with_timeout<'a, R, E>(
time_out: Duration,
exporter: &mut E,
runtime: &R,
batch: Vec<Cow<'a, LogData>>,
batch: &'a [&'a LogData],
) -> ExportResult
where
R: RuntimeChannel,
Expand Down Expand Up @@ -531,7 +531,6 @@ mod tests {
use opentelemetry::logs::{Logger, LoggerProvider as _};
use opentelemetry::Key;
use opentelemetry::{logs::LogResult, KeyValue};
use std::borrow::Cow;
use std::sync::{Arc, Mutex};
use std::time::Duration;

Expand All @@ -542,7 +541,7 @@ mod tests {

#[async_trait]
impl LogExporter for MockLogExporter {
async fn export<'a>(&mut self, _batch: Vec<Cow<'a, LogData>>) -> LogResult<()> {
async fn export<'a>(&mut self, _batch: &'a [&'a LogData]) -> LogResult<()> {
Ok(())
}

Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,10 @@ impl InMemoryLogsExporter {

#[async_trait]
impl LogExporter for InMemoryLogsExporter {
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> LogResult<()> {
async fn export<'a>(&mut self, batch: &'a [&'a LogData]) -> LogResult<()> {
let mut logs_guard = self.logs.lock().map_err(LogError::from)?;
for log in batch.into_iter() {
logs_guard.push(log.into_owned());
for &log in batch.iter() {
logs_guard.push(log.clone());
}
Ok(())
}
Expand Down
5 changes: 2 additions & 3 deletions opentelemetry-stdout/src/logs/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use opentelemetry::{
};
use opentelemetry_sdk::export::logs::{ExportResult, LogData};
use opentelemetry_sdk::Resource;
use std::borrow::Cow;
use std::io::{stdout, Write};

type Encoder =
Expand Down Expand Up @@ -45,11 +44,11 @@ impl fmt::Debug for LogExporter {
#[async_trait]
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
/// Export spans to stdout
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> ExportResult {
async fn export<'a>(&mut self, batch: &'a [&'a LogData]) -> ExportResult {
if let Some(writer) = &mut self.writer {
// TODO - Avoid cloning logdata if it is borrowed.
let log_data = crate::logs::transform::LogData::from((
batch.into_iter().map(Cow::into_owned).collect(),
batch.iter().map(|&log_data| log_data.clone()).collect(),
&self.resource,
));
let result = (self.encoder)(writer, log_data) as LogResult<()>;
Expand Down

0 comments on commit 6a444d3

Please sign in to comment.