Skip to content

Commit

Permalink
Start PeriodicReader worker after provider is up
Browse files Browse the repository at this point in the history
  • Loading branch information
izquierdo committed Jan 26, 2024
1 parent 433c1b3 commit ebb7b7e
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 42 deletions.
4 changes: 4 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
| OTEL_BSP_SCHEDULE_DELAY_MILLIS | OTEL_BSP_SCHEDULE_DELAY |
| OTEL_BSP_EXPORT_TIMEOUT_MILLIS | OTEL_BSP_EXPORT_TIMEOUT |

### Fixed

- [#1481](https://github.com/open-telemetry/opentelemetry-rust/pull/1481) Fix error message caused by race condition when using PeriodicReader

## v0.21.2

### Fixed
Expand Down
118 changes: 76 additions & 42 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
env, fmt,
env, fmt, mem,
sync::{Arc, Mutex, Weak},
time::Duration,
};
Expand Down Expand Up @@ -127,37 +127,39 @@ where
/// Create a [PeriodicReader] with the given config.
pub fn build(self) -> PeriodicReader {
let (message_sender, message_receiver) = mpsc::channel(256);
let ticker = self
.runtime
.interval(self.interval)
.map(|_| Message::Export);

let messages = Box::pin(stream::select(message_receiver, ticker));
let reader = PeriodicReader {
let worker = move |reader: &PeriodicReader| {
let ticker = self
.runtime
.interval(self.interval)
.map(|_| Message::Export);

let messages = Box::pin(stream::select(message_receiver, ticker));

let runtime = self.runtime.clone();
self.runtime.spawn(Box::pin(
PeriodicReaderWorker {
reader: reader.clone(),
timeout: self.timeout,
runtime,
rm: ResourceMetrics {
resource: Resource::empty(),
scope_metrics: Vec::new(),
},
}
.run(messages),
));
};

PeriodicReader {
exporter: Arc::new(self.exporter),
inner: Arc::new(Mutex::new(PeriodicReaderInner {
message_sender,
sdk_producer: None,
is_shutdown: false,
external_producers: self.producers,
sdk_producer_or_worker: ProducerOrWorker::Worker(Box::new(worker)),
})),
};

let runtime = self.runtime.clone();
self.runtime.spawn(Box::pin(
PeriodicReaderWorker {
reader: reader.clone(),
timeout: self.timeout,
runtime,
rm: ResourceMetrics {
resource: Resource::empty(),
scope_metrics: Vec::new(),
},
}
.run(messages),
));

reader
}
}
}

Expand Down Expand Up @@ -223,9 +225,9 @@ impl fmt::Debug for PeriodicReader {

struct PeriodicReaderInner {
message_sender: mpsc::Sender<Message>,
sdk_producer: Option<Weak<dyn SdkProducer>>,
is_shutdown: bool,
external_producers: Vec<Box<dyn MetricProducer>>,
sdk_producer_or_worker: ProducerOrWorker,
}

#[derive(Debug)]
Expand All @@ -235,6 +237,11 @@ enum Message {
Shutdown(oneshot::Sender<Result<()>>),
}

enum ProducerOrWorker {
Producer(Weak<dyn SdkProducer>),
Worker(Box<dyn FnOnce(&PeriodicReader) + Send + Sync>),
}

struct PeriodicReaderWorker<RT: Runtime> {
reader: PeriodicReader,
timeout: Duration,
Expand Down Expand Up @@ -311,14 +318,19 @@ impl MetricReader for PeriodicReader {
Err(_) => return,
};

// Only register once. If producer is already set, do nothing.
if inner.sdk_producer.is_none() {
inner.sdk_producer = Some(pipeline);
} else {
global::handle_error(MetricsError::Other(
"duplicate meter registration, did not register manual reader".into(),
))
}
let worker = match &mut inner.sdk_producer_or_worker {
ProducerOrWorker::Producer(_) => {
// Only register once. If producer is already set, do nothing.
global::handle_error(MetricsError::Other(
"duplicate meter registration, did not register manual reader".into(),
));
return;

Check warning on line 327 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L324-L327

Added lines #L324 - L327 were not covered by tests
}
ProducerOrWorker::Worker(w) => mem::replace(w, Box::new(|_| {})),
};

inner.sdk_producer_or_worker = ProducerOrWorker::Producer(pipeline);
worker(self);
}

fn collect(&self, rm: &mut ResourceMetrics) -> Result<()> {
Expand All @@ -327,14 +339,14 @@ impl MetricReader for PeriodicReader {
return Err(MetricsError::Other("reader is shut down".into()));
}

match &inner.sdk_producer.as_ref().and_then(|w| w.upgrade()) {
Some(producer) => producer.produce(rm)?,
None => {
return Err(MetricsError::Other(
"reader is shut down or not registered".into(),
))
}
};
if let Some(producer) = match &inner.sdk_producer_or_worker {
ProducerOrWorker::Producer(sdk_producer) => sdk_producer.upgrade(),
ProducerOrWorker::Worker(_) => None,
} {
producer.produce(rm)?;
} else {
return Err(MetricsError::Other("reader is not registered".into()));
}

let mut errs = vec![];
for producer in &inner.external_producers {
Expand Down Expand Up @@ -392,3 +404,25 @@ impl MetricReader for PeriodicReader {
shutdown_result
}
}

#[cfg(all(test, feature = "testing"))]
mod tests {
use super::PeriodicReader;
use crate::{
metrics::data::ResourceMetrics, metrics::reader::MetricReader, runtime,
testing::metrics::InMemoryMetricsExporter, Resource,
};

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn unregistered_collect() {
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();

let mut rm = ResourceMetrics {
resource: Resource::empty(),
scope_metrics: Vec::new(),
};
let result = reader.collect(&mut rm);
result.expect_err("error expected when reader is not registered");
}
}

0 comments on commit ebb7b7e

Please sign in to comment.