Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 26 additions & 15 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext};
use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource};
use opentelemetry::otel_warn;
use opentelemetry::{
global,
logs::{LogError, LogResult},
otel_debug,
trace::TraceContextExt,
Context, InstrumentationLibrary,
};
Expand Down Expand Up @@ -126,17 +125,10 @@ impl LoggerProvider {
if errs.is_empty() {
Ok(())
} else {
otel_warn!(
name: "logger_provider_shutdown_error",
error = format!("{:?}", errs)
);
Err(LogError::Other(format!("{:?}", errs).into()))
Err(LogError::Other(format!("{errs:?}").into()))
}
} else {
otel_warn!(
name: "logger_provider_already_shutdown"
);
Err(LogError::Other("logger provider already shut down".into()))
Err(LogError::AlreadyShutdown("LoggerProvider".to_string()))
}
}
}
Expand All @@ -154,6 +146,24 @@ impl LoggerProviderInner {
let mut errs = vec![];
for processor in &self.processors {
if let Err(err) = processor.shutdown() {
// Log at debug level because:
// - The error is also returned to the user for handling (if applicable)
// - Or the error occurs during `LoggerProviderInner::Drop` as part of telemetry shutdown,
// which is non-actionable by the user
match err {
// specific handling for mutex poisioning
LogError::MutexPoisoned(_) => {
otel_debug!(
name: "LoggerProvider.Drop.ShutdownMutexPoisoned",
);
}
_ => {
otel_debug!(
name: "LoggerProvider.Drop.ShutdownError",
error = format!("{err}")
);
}
}
errs.push(err);
}
}
Expand All @@ -164,10 +174,11 @@ impl LoggerProviderInner {
impl Drop for LoggerProviderInner {
fn drop(&mut self) {
if !self.is_shutdown.load(Ordering::Relaxed) {
let errs = self.shutdown();
if !errs.is_empty() {
global::handle_error(LogError::Other(format!("{:?}", errs).into()));
}
let _ = self.shutdown(); // errors are handled within shutdown
} else {
otel_debug!(
name: "LoggerProvider.Drop.AlreadyShutdown"
);
}
}
}
Expand Down
90 changes: 34 additions & 56 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ use futures_util::{
#[cfg(feature = "logs_level_enabled")]
use opentelemetry::logs::Severity;
use opentelemetry::{
global,
logs::{LogError, LogResult},
otel_error, otel_warn, InstrumentationLibrary,
otel_debug, otel_error, otel_warn, InstrumentationLibrary,
};

use std::sync::atomic::AtomicBool;
Expand Down Expand Up @@ -99,26 +98,36 @@ impl LogProcessor for SimpleLogProcessor {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
// noop after shutdown
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
// this is a warning, as the user is trying to log after the processor has been shutdown
otel_warn!(
name: "simple_log_processor_emit_after_shutdown"
name: "SimpleLogProcessor.Emit.ProcessorShutdown",
);
return;
}

let result = self
.exporter
.lock()
.map_err(|_| LogError::Other("simple logprocessor mutex poison".into()))
.map_err(|_| LogError::MutexPoisoned("SimpleLogProcessor".into()))
.and_then(|mut exporter| {
let log_tuple = &[(record as &LogRecord, instrumentation)];
futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
});
if let Err(err) = result {
otel_error!(
name: "simple_log_processor_emit_error",
error = format!("{:?}", err)
);
global::handle_error(err);
// Handle errors with specific static names
match result {
Err(LogError::MutexPoisoned(_)) => {
// logging as debug as this is not a user error
otel_debug!(
name: "SimpleLogProcessor.Emit.MutexPoisoning",
);
}
Err(err) => {
otel_error!(
name: "SimpleLogProcessor.Emit.ExportError",
error = format!("{}",err)
);
}
_ => {}
}
}

Expand All @@ -133,12 +142,7 @@ impl LogProcessor for SimpleLogProcessor {
exporter.shutdown();
Ok(())
} else {
otel_error!(
name: "simple_log_processor_shutdown_error"
);
Err(LogError::Other(
"simple logprocessor mutex poison during shutdown".into(),
))
Err(LogError::MutexPoisoned("SimpleLogProcessor".into()))
}
}

Expand Down Expand Up @@ -170,12 +174,12 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
instrumentation.clone(),
)));

// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
if let Err(err) = result {
otel_error!(
name: "batch_log_processor_emit_error",
error = format!("{:?}", err)
name: "BatchLogProcessor.Export.Error",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this be triggered when channel is full? If yes, we need to rethink this, as this can spam the log output.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this error only triggers when channel is full or closed. We need to add some throttling or logic to prevent flooding - have added the TODO for now, as we need common strategy for such flooding.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree we need a common strategy, but lets remove the error log from here. It'll flood as-is when buffer is full.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we remove altogether, or make it otel_debug for now - with comment to change it to otel_error once throttling is ready.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

either of them are fine with me, though I slightly prefer removing altogether, as I don't know if we can ship a throttling solution for next release.

error = format!("{}", err)
);
global::handle_error(LogError::Other(err.into()));
}
}

Expand Down Expand Up @@ -243,10 +247,9 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {

if let Err(err) = result {
otel_error!(
name: "batch_log_processor_export_error",
error = format!("{:?}", err)
name: "BatchLogProcessor.Export.Error",
error = format!("{}", err)
);
global::handle_error(err);
}
}
}
Expand All @@ -261,24 +264,12 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
.await;

if let Some(channel) = res_channel {
if let Err(result) = channel.send(result) {
global::handle_error(LogError::from(format!(
"failed to send flush result: {:?}",
result
)));
otel_error!(
name: "batch_log_processor_flush_error",
error = format!("{:?}", result),
message = "Failed to send flush result"
if let Err(send_error) = channel.send(result) {
otel_debug!(
name: "BatchLogProcessor.Flush.SendResultError",
error = format!("{:?}", send_error),
);
}
} else if let Err(err) = result {
otel_error!(
name: "batch_log_processor_flush_error",
error = format!("{:?}", err),
message = "Flush failed"
);
global::handle_error(err);
}
}
// Stream has terminated or processor is shutdown, return to finish execution.
Expand All @@ -293,21 +284,14 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {

exporter.shutdown();

if let Err(result) = ch.send(result) {
otel_error!(
name: "batch_log_processor_shutdown_error",
error = format!("{:?}", result),
message = "Failed to send shutdown result"
if let Err(send_error) = ch.send(result) {
otel_debug!(
name: "BatchLogProcessor.Shutdown.SendResultError",
error = format!("{:?}", send_error),
);
global::handle_error(LogError::from(format!(
"failed to send batch processor shutdown result: {:?}",
result
)));
}

break;
}

// propagate the resource
BatchMessage::SetResource(resource) => {
exporter.set_resource(&resource);
Expand Down Expand Up @@ -357,13 +341,7 @@ where
pin_mut!(timeout);
match future::select(export, timeout).await {
Either::Left((export_res, _)) => export_res,
Either::Right((_, _)) => {
otel_error!(
name: "export_with_timeout_timeout",
timeout_duration = time_out.as_millis()
);
ExportResult::Err(LogError::ExportTimedOut(time_out))
}
Either::Right((_, _)) => ExportResult::Err(LogError::ExportTimedOut(time_out)),
}
}

Expand Down
16 changes: 14 additions & 2 deletions opentelemetry/src/global/internal_logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,13 @@ macro_rules! otel_warn {
(name: $name:expr, $($key:ident = $value:expr),+ $(,)?) => {
#[cfg(feature = "internal-logs")]
{
tracing::warn!(name: $name, target: env!("CARGO_PKG_NAME"), $($key = $value),+, "");
tracing::warn!(name: $name,
target: env!("CARGO_PKG_NAME"),
$($key = {
$value
}),+,
""
)
}
#[cfg(not(feature = "internal-logs"))]
{
Expand Down Expand Up @@ -136,7 +142,13 @@ macro_rules! otel_error {
(name: $name:expr, $($key:ident = $value:expr),+ $(,)?) => {
#[cfg(feature = "internal-logs")]
{
tracing::error!(name: $name, target: env!("CARGO_PKG_NAME"), $($key = $value),+, "");
tracing::error!(name: $name,
target: env!("CARGO_PKG_NAME"),
$($key = {
$value
}),+,
""
)
}
#[cfg(not(feature = "internal-logs"))]
{
Expand Down
8 changes: 8 additions & 0 deletions opentelemetry/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ pub enum LogError {
#[error("Exporter timed out after {} seconds", .0.as_secs())]
ExportTimedOut(Duration),

/// Processor is already shutdown
#[error("{0} already shutdown")]
AlreadyShutdown(String),

/// Mutex lock poisoning
#[error("mutex lock poisioning for {0}")]
MutexPoisoned(String),

/// Other errors propagated from log SDK that weren't covered above.
#[error(transparent)]
Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
Expand Down