Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make SetResource return status, and call it synchronously in Batch[Log|Span]Processor #1898

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@
let _ = self.client.lock().map(|mut c| c.take());
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) -> LogResult<()> {

Check warning on line 60 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L60

Added line #L60 was not covered by tests
self.resource = resource.into();
Ok(())

Check warning on line 62 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L62

Added line #L62 was not covered by tests
}
}
3 changes: 2 additions & 1 deletion opentelemetry-otlp/src/exporter/http/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@
let _ = self.client.lock().map(|mut c| c.take());
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) -> ExportResult {

Check warning on line 70 in opentelemetry-otlp/src/exporter/http/trace.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/trace.rs#L70

Added line #L70 was not covered by tests
self.resource = resource.into();
Ok(())

Check warning on line 72 in opentelemetry-otlp/src/exporter/http/trace.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/trace.rs#L72

Added line #L72 was not covered by tests
}
}
3 changes: 2 additions & 1 deletion opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@
let _ = self.inner.take();
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) -> LogResult<()> {

Check warning on line 94 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L94

Added line #L94 was not covered by tests
self.resource = resource.into();
Ok(())

Check warning on line 96 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L96

Added line #L96 was not covered by tests
}
}
3 changes: 2 additions & 1 deletion opentelemetry-otlp/src/exporter/tonic/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ impl SpanExporter for TonicTracesClient {
let _ = self.inner.take();
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) -> ExportResult {
self.resource = resource.into();
Ok(())
}
}
7 changes: 5 additions & 2 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,11 @@
self.client.export(batch).await
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.client.set_resource(resource);
fn set_resource(
&mut self,
resource: &opentelemetry_sdk::Resource,
) -> opentelemetry::logs::LogResult<()> {
self.client.set_resource(resource)

Check warning on line 112 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L108-L112

Added lines #L108 - L112 were not covered by tests
}
}

Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter {
self.0.export(batch)
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.0.set_resource(resource);
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) -> ExportResult {
self.0.set_resource(resource)
}
}
32 changes: 29 additions & 3 deletions opentelemetry-sdk/src/export/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,43 @@
/// `LogExporter` defines the interface that log exporters should implement.
#[async_trait]
pub trait LogExporter: Send + Sync + Debug {
/// Exports a batch of [`LogData`].
/// Exports a batch of readable logs. Protocol exporters that will
/// implement this function are typically expected to serialize and transmit
/// the data to the destination.
///
/// This function will never be called concurrently for the same exporter
/// instance. It can be called again only after the current call returns.
///
/// This function must not block indefinitely, there must be a reasonable
/// upper limit after which the call must time out with an error result.
///
/// Any retry logic that is required by the exporter is the responsibility
/// of the exporter.
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> LogResult<()>;
/// Shuts down the exporter.
/// Shuts down the exporter. Called when SDK is shut down. This is an
/// opportunity for exporter to do any cleanup required.
///
/// This function should be called only once for each `LogExporter`
/// instance. After the call to `shutdown`, subsequent calls to `export` are
/// not allowed and should return an error.
///
/// This function should not block indefinitely (e.g. if it attempts to
/// flush the data and the destination is unavailable). SDK authors
/// can decide if they want to make the shutdown timeout
/// configurable.
fn shutdown(&mut self) {}
#[cfg(feature = "logs_level_enabled")]
/// Chek if logs are enabled.
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
true
}
/// Set the resource for the exporter.
fn set_resource(&mut self, _resource: &Resource) {}
/// This function SHOULD only be called once during the initialization of the exporter.
/// This function SHOULD complete or abort within some timeout. This function SHOULD be
/// implemented as a blocking API
fn set_resource(&mut self, _resource: &Resource) -> LogResult<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does set_resource need to return a Result or impose timeout limit on it? Shouldn't it always be a matter of assigning/copying a vector?

Copy link
Member Author

@lalitb lalitb Jun 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't want to make assumptions on what the exporter does within set_resource(). The exporter may choose to do some custom serialization for resource within this method, and then use this serialized data with every export. So better to have the status.

Ok(())
}

Check warning on line 53 in opentelemetry-sdk/src/export/logs/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/export/logs/mod.rs#L51-L53

Added lines #L51 - L53 were not covered by tests
}

/// `LogData` represents a single log event without resource context.
Expand Down
7 changes: 6 additions & 1 deletion opentelemetry-sdk/src/export/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,12 @@ pub trait SpanExporter: Send + Sync + Debug {
}

/// Set the resource for the exporter.
fn set_resource(&mut self, _resource: &Resource) {}
/// This function SHOULD only be called once during the initialization of the exporter.
/// This function SHOULD complete or abort within some timeout. This function SHOULD be
/// implemented as a blocking API
fn set_resource(&mut self, _resource: &Resource) -> ExportResult {
Ok(())
}
}

/// `SpanData` contains all the information collected by a `Span` and can be used
Expand Down
4 changes: 3 additions & 1 deletion opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,9 @@

// invoke set_resource on all the processors
for processor in logger_provider.log_processors() {
processor.set_resource(logger_provider.resource());
if let Err(err) = processor.set_resource(logger_provider.resource()) {
global::handle_error(err);

Check warning on line 210 in opentelemetry-sdk/src/logs/log_emitter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_emitter.rs#L210

Added line #L210 was not covered by tests
}
}
logger_provider
}
Expand Down
53 changes: 35 additions & 18 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@
fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool;

/// Set the resource for the log processor.
fn set_resource(&self, _resource: &Resource) {}
fn set_resource(&self, _resource: &Resource) -> LogResult<()> {
Ok(())
}
}

/// A [LogProcessor] that passes logs to the configured `LogExporter`, as soon
Expand Down Expand Up @@ -125,9 +127,13 @@
}
}

fn set_resource(&self, resource: &Resource) {
fn set_resource(&self, resource: &Resource) -> LogResult<()> {
if let Ok(mut exporter) = self.exporter.lock() {
exporter.set_resource(resource);
exporter.set_resource(resource)
} else {
Err(LogError::Other(
"simple logprocessor mutex poison during set_resource".into(),
))

Check warning on line 136 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L134-L136

Added lines #L134 - L136 were not covered by tests
}
}

Expand Down Expand Up @@ -189,11 +195,16 @@
.and_then(std::convert::identity)
}

fn set_resource(&self, resource: &Resource) {
fn set_resource(&self, resource: &Resource) -> LogResult<()> {
let (res_sender, res_receiver) = oneshot::channel();
let resource = Arc::new(resource.clone());
let _ = self
.message_sender
.try_send(BatchMessage::SetResource(resource));
self.message_sender
.try_send(BatchMessage::SetResource(resource, res_sender))
.map_err(|err| LogError::Other(err.into()))?;

futures_executor::block_on(res_receiver)
.map_err(|err| LogError::Other(err.into()))
.and_then(std::convert::identity)
}
}

Expand Down Expand Up @@ -275,8 +286,14 @@
}

// propagate the resource
BatchMessage::SetResource(resource) => {
exporter.set_resource(&resource);
BatchMessage::SetResource(resource, res_sender) => {
let result = exporter.set_resource(&resource);
if let Err(result) = res_sender.send(result) {
global::handle_error(LogError::from(format!(
"failed to send set resource result: {:?}",
result
)));

Check warning on line 295 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L292-L295

Added lines #L292 - L295 were not covered by tests
}
}
}
}
Expand Down Expand Up @@ -500,7 +517,7 @@
/// Shut down the worker thread, push all logs in buffer to the backend.
Shutdown(oneshot::Sender<ExportResult>),
/// Set the resource for the exporter.
SetResource(Arc<Resource>),
SetResource(Arc<Resource>, oneshot::Sender<LogResult<()>>),
}

#[cfg(all(test, feature = "testing", feature = "logs"))]
Expand All @@ -527,7 +544,7 @@
use opentelemetry::logs::AnyValue;
#[cfg(feature = "logs_level_enabled")]
use opentelemetry::logs::Severity;
use opentelemetry::logs::{Logger, LoggerProvider as _};
use opentelemetry::logs::{LogError, Logger, LoggerProvider as _};
use opentelemetry::Key;
use opentelemetry::{logs::LogResult, KeyValue};
use std::borrow::Cow;
Expand All @@ -547,13 +564,14 @@

fn shutdown(&mut self) {}

fn set_resource(&mut self, resource: &Resource) {
self.resource
.lock()
.map(|mut res_opt| {
fn set_resource(&mut self, resource: &Resource) -> LogResult<()> {
match self.resource.lock() {
Ok(mut res_opt) => {
res_opt.replace(resource.clone());
})
.expect("mock log exporter shouldn't error when setting resource");
Ok(())
}
Err(_) => Err(LogError::Other("mock log exporter mutex poison".into())),

Check warning on line 573 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L573

Added line #L573 was not covered by tests
}
}
}

Expand Down Expand Up @@ -753,7 +771,6 @@
KeyValue::new("k5", "v5"),
]))
.build();
tokio::time::sleep(Duration::from_secs(2)).await; // set resource in batch span processor is not blocking. Should we make it blocking?
assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
let _ = provider.shutdown();
}
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,9 @@ impl LogExporter for InMemoryLogsExporter {
}
}

fn set_resource(&mut self, resource: &Resource) {
let mut res_guard = self.resource.lock().expect("Resource lock poisoned");
fn set_resource(&mut self, resource: &Resource) -> LogResult<()> {
let mut res_guard = self.resource.lock().map_err(LogError::from)?;
*res_guard = resource.clone();
Ok(())
}
}
9 changes: 4 additions & 5 deletions opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,9 @@ impl SpanExporter for InMemorySpanExporter {
self.reset()
}

fn set_resource(&mut self, resource: &Resource) {
self.resource
.lock()
.map(|mut res_guard| *res_guard = resource.clone())
.expect("Resource lock poisoned");
fn set_resource(&mut self, resource: &Resource) -> ExportResult {
let mut res_guard = self.resource.lock().map_err(TraceError::from)?;
*res_guard = resource.clone();
Ok(())
}
}
4 changes: 3 additions & 1 deletion opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,9 @@

// Set the resource for each processor
for p in &mut processors {
p.set_resource(config.resource.as_ref());
if let Err(err) = p.set_resource(config.resource.as_ref()) {
global::handle_error(err);

Check warning on line 293 in opentelemetry-sdk/src/trace/provider.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/provider.rs#L293

Added line #L293 was not covered by tests
}
}

TracerProvider::new(TracerProviderInner { processors, config })
Expand Down
36 changes: 25 additions & 11 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@
/// Implementation should make sure shutdown can be called multiple times.
fn shutdown(&self) -> TraceResult<()>;
/// Set the resource for the log processor.
fn set_resource(&mut self, _resource: &Resource) {}
fn set_resource(&mut self, _resource: &Resource) -> TraceResult<()> {
Ok(())
}

Check warning on line 101 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L99-L101

Added lines #L99 - L101 were not covered by tests
}

/// A [SpanProcessor] that passes finished spans to the configured
Expand Down Expand Up @@ -153,9 +155,10 @@
}
}

fn set_resource(&mut self, resource: &Resource) {
if let Ok(mut exporter) = self.exporter.lock() {
exporter.set_resource(resource);
fn set_resource(&mut self, resource: &Resource) -> ExportResult {
match self.exporter.lock() {
Ok(mut exporter) => exporter.set_resource(resource),
Err(_) => Err(TraceError::Other("SimpleSpanProcessor mutex poison".into())),

Check warning on line 161 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L161

Added line #L161 was not covered by tests
}
}
}
Expand Down Expand Up @@ -271,11 +274,16 @@
.and_then(|identity| identity)
}

fn set_resource(&mut self, resource: &Resource) {
fn set_resource(&mut self, resource: &Resource) -> ExportResult {
let (res_sender, res_receiver) = oneshot::channel();
let resource = Arc::new(resource.clone());
let _ = self
.message_sender
.try_send(BatchMessage::SetResource(resource));
self.message_sender
.try_send(BatchMessage::SetResource(resource, res_sender))
.map_err(|err| TraceError::Other(err.into()))?;

futures_executor::block_on(res_receiver)
.map_err(|err| TraceError::Other(err.into()))
.and_then(|identity| identity)
}
}

Expand All @@ -294,7 +302,7 @@
/// Shut down the worker thread, push all spans in buffer to the backend.
Shutdown(oneshot::Sender<ExportResult>),
/// Set the resource for the exporter.
SetResource(Arc<Resource>),
SetResource(Arc<Resource>, oneshot::Sender<ExportResult>),
}

struct BatchSpanProcessorInternal<R> {
Expand Down Expand Up @@ -396,8 +404,14 @@
return false;
}
// propagate the resource
BatchMessage::SetResource(resource) => {
self.exporter.set_resource(&resource);
BatchMessage::SetResource(resource, res_sender) => {
let result = self.exporter.set_resource(&resource);
if let Err(result) = res_sender.send(result) {
global::handle_error(TraceError::from(format!(
"failed to send set resource result: {:?}",
result
)));

Check warning on line 413 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L410-L413

Added lines #L410 - L413 were not covered by tests
}
}
}
true
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-stdout/src/logs/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@
self.writer.take();
}

fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) {
fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) -> LogResult<()> {

Check warning on line 66 in opentelemetry-stdout/src/logs/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/logs/exporter.rs#L66

Added line #L66 was not covered by tests
self.resource = res.clone();
Ok(())

Check warning on line 68 in opentelemetry-stdout/src/logs/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/logs/exporter.rs#L68

Added line #L68 was not covered by tests
}
}

Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-stdout/src/trace/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@
self.writer.take();
}

fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) {
fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) -> ExportResult {

Check warning on line 59 in opentelemetry-stdout/src/trace/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/trace/exporter.rs#L59

Added line #L59 was not covered by tests
self.resource = res.clone();
Ok(())

Check warning on line 61 in opentelemetry-stdout/src/trace/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/trace/exporter.rs#L61

Added line #L61 was not covered by tests
}
}

Expand Down
Loading