From ac8e4e580a1c2403007fa1cab9767feaa30a4857 Mon Sep 17 00:00:00 2001 From: boxdot Date: Sat, 28 Nov 2020 21:14:05 +0100 Subject: [PATCH] Add wasm support to jaeger (#365) --- opentelemetry-jaeger/Cargo.toml | 29 ++ opentelemetry-jaeger/src/collector.rs | 285 ++++++++++++++---- opentelemetry-jaeger/src/lib.rs | 41 ++- opentelemetry-jaeger/src/uploader.rs | 6 +- opentelemetry/Cargo.toml | 3 + opentelemetry/src/api/trace/event.rs | 2 +- opentelemetry/src/api/trace/span.rs | 4 +- opentelemetry/src/exporter/metrics/stdout.rs | 2 +- opentelemetry/src/exporter/trace/mod.rs | 4 +- opentelemetry/src/lib.rs | 14 + .../src/sdk/metrics/aggregators/last_value.rs | 2 +- .../src/sdk/metrics/controllers/pull.rs | 4 +- .../src/sdk/metrics/processors/basic.rs | 8 +- .../src/sdk/trace/id_generator/aws.rs | 8 +- opentelemetry/src/sdk/trace/span.rs | 14 +- opentelemetry/src/sdk/trace/tracer.rs | 3 +- opentelemetry/src/testing/trace.rs | 5 +- 17 files changed, 323 insertions(+), 111 deletions(-) diff --git a/opentelemetry-jaeger/Cargo.toml b/opentelemetry-jaeger/Cargo.toml index dc023b2c4c..104f20a3c3 100644 --- a/opentelemetry-jaeger/Cargo.toml +++ b/opentelemetry-jaeger/Cargo.toml @@ -22,12 +22,41 @@ rustdoc-args = ["--cfg", "docsrs"] [dependencies] async-std = { version = "1.6", optional = true } async-trait = "0.1" +base64 = { version = "0.13", optional = true } +futures-util = { version = "0.3", optional = true } http = { version = "0.2", optional = true } isahc = { version = "0.9", default-features = false, optional = true } +js-sys = { version = "0.3", optional = true } opentelemetry = { version = "0.10", default-features = false, features = ["trace"], path = "../opentelemetry" } +pin-project = { version = "1.0", optional = true } thrift = "0.13" tokio = { version = "0.2", features = ["udp", "sync"], optional = true } +wasm-bindgen = { version = "0.2", optional = true } +wasm-bindgen-futures = { version = "0.4.18", optional = true } + +[dependencies.web-sys] +version = "0.3.4" +features = [ + 'Headers', + 'Request', + 'RequestCredentials', + 'RequestInit', + 'RequestMode', + 'Response', + 'Window', +] +optional = true [features] default = [] collector_client = ["isahc", "http"] +wasm_collector_client = [ + "base64", + "futures-util", + "http", + "js-sys", + "pin-project", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] diff --git a/opentelemetry-jaeger/src/collector.rs b/opentelemetry-jaeger/src/collector.rs index 70be612ac8..d502fb2683 100644 --- a/opentelemetry-jaeger/src/collector.rs +++ b/opentelemetry-jaeger/src/collector.rs @@ -1,88 +1,247 @@ //! # HTTP Jaeger Collector Client -use crate::thrift::jaeger; -use http::{Request, Uri}; -use isahc::{ - auth::{Authentication, Credentials}, - config::Configurable, - HttpClient, -}; -use std::io::{self, Cursor}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use thrift::protocol::TBinaryOutputProtocol; +use http::Uri; +use std::sync::atomic::AtomicUsize; /// `CollectorAsyncClientHttp` implements an async version of the /// `TCollectorSyncClient` interface over HTTP #[derive(Debug)] pub(crate) struct CollectorAsyncClientHttp { endpoint: Uri, - client: HttpClient, + #[cfg(feature = "collector_client")] + client: isahc::HttpClient, + #[cfg(all(feature = "wasm_collector_client", not(feature = "collector_client")))] + client: WasmHttpClient, payload_size_estimate: AtomicUsize, } -impl CollectorAsyncClientHttp { - /// Create a new HTTP collector client - pub(crate) fn new( - endpoint: Uri, - username: Option, - password: Option, - ) -> thrift::Result { - let mut builder = HttpClient::builder(); - if let (Some(username), Some(password)) = (username, password) { - builder = builder - .authentication(Authentication::basic()) - .credentials(Credentials::new(username, password)); +#[cfg(feature = "wasm_collector_client")] +#[derive(Debug)] +struct WasmHttpClient { + auth: Option, +} + +#[cfg(feature = "collector_client")] +mod collector_client { + use super::*; + use crate::thrift::jaeger; + use http::{Request, Uri}; + use isahc::{ + auth::{Authentication, Credentials}, + config::Configurable, + HttpClient, + }; + use std::io::{self, Cursor}; + use std::sync::atomic::{AtomicUsize, Ordering}; + use thrift::protocol::TBinaryOutputProtocol; + + impl CollectorAsyncClientHttp { + /// Create a new HTTP collector client + pub(crate) fn new( + endpoint: Uri, + username: Option, + password: Option, + ) -> thrift::Result { + let mut builder = HttpClient::builder(); + if let (Some(username), Some(password)) = (username, password) { + builder = builder + .authentication(Authentication::basic()) + .credentials(Credentials::new(username, password)); + } + let client = builder + .build() + .map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?; + let payload_size_estimate = AtomicUsize::new(512); + + Ok(CollectorAsyncClientHttp { + endpoint, + client, + payload_size_estimate, + }) + } + + /// Submit list of Jaeger batches + pub(crate) async fn submit_batch( + &self, + batch: jaeger::Batch, + ) -> thrift::Result { + // estimate transport capacity based on last request + let estimate = self.payload_size_estimate.load(Ordering::Relaxed); + + // Write payload to transport buffer + let transport = Cursor::new(Vec::with_capacity(estimate)); + let mut protocol = TBinaryOutputProtocol::new(transport, true); + batch.write_to_out_protocol(&mut protocol)?; + + // Use current batch capacity as new estimate + self.payload_size_estimate + .store(protocol.transport.get_ref().len(), Ordering::Relaxed); + + // Build collector request + let req = Request::builder() + .method("POST") + .uri(&self.endpoint) + .header("Content-Type", "application/vnd.apache.thrift.binary") + .body(protocol.transport.into_inner()) + .expect("request should always be valid"); + + // Send request to collector + let res = self + .client + .send_async(req) + .await + .map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?; + + if !res.status().is_success() { + return Err(thrift::Error::from(io::Error::new( + io::ErrorKind::Other, + format!("Expected success response, got {:?}", res.status()), + ))); + } + + Ok(jaeger::BatchSubmitResponse { ok: true }) } - let client = builder - .build() - .map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?; - let payload_size_estimate = AtomicUsize::new(512); - - Ok(CollectorAsyncClientHttp { - endpoint, - client, - payload_size_estimate, - }) } +} + +#[cfg(all(feature = "wasm_collector_client", not(feature = "collector_client")))] +mod wasm_collector_client { + use super::*; + use crate::thrift::jaeger; + use futures_util::future; + use http::Uri; + use js_sys::Uint8Array; + use std::future::Future; + use std::io::{self, Cursor}; + use std::pin::Pin; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::task::{Context, Poll}; + use thrift::protocol::TBinaryOutputProtocol; + use wasm_bindgen::JsCast; + use wasm_bindgen_futures::JsFuture; + use web_sys::{Request, RequestCredentials, RequestInit, RequestMode, Response}; + + impl CollectorAsyncClientHttp { + /// Create a new HTTP collector client + pub(crate) fn new( + endpoint: Uri, + username: Option, + password: Option, + ) -> thrift::Result { + let auth = if let (Some(username), Some(password)) = (username, password) { + let mut auth = String::from("Basic "); + base64::encode_config_buf(username, base64::STANDARD, &mut auth); + auth.push(':'); + base64::encode_config_buf(password, base64::STANDARD, &mut auth); + Some(auth) + } else { + None + }; + let payload_size_estimate = AtomicUsize::new(512); + + Ok(Self { + endpoint, + client: WasmHttpClient { auth }, + payload_size_estimate, + }) + } - /// Submit list of Jaeger batches - pub(crate) async fn submit_batch( - &self, - batch: jaeger::Batch, - ) -> thrift::Result { - // estimate transport capacity based on last request - let estimate = self.payload_size_estimate.load(Ordering::Relaxed); - - // Write payload to transport buffer - let transport = Cursor::new(Vec::with_capacity(estimate)); - let mut protocol = TBinaryOutputProtocol::new(transport, true); - batch.write_to_out_protocol(&mut protocol)?; - - // Use current batch capacity as new estimate - self.payload_size_estimate - .store(protocol.transport.get_ref().len(), Ordering::Relaxed); - - // Build collector request - let req = Request::builder() - .method("POST") - .uri(&self.endpoint) - .header("Content-Type", "application/vnd.apache.thrift.binary") - .body(protocol.transport.into_inner()) - .expect("request should always be valid"); + /// Submit list of Jaeger batches + pub(crate) fn submit_batch( + &self, + batch: jaeger::Batch, + ) -> impl Future> + Send + 'static + { + self.build_request(batch) + .map(post_request) + .map(|fut| future::Either::Left(SubmitBatchFuture(fut))) + .unwrap_or_else(|e| future::Either::Right(future::err(e))) + } + + fn build_request(&self, batch: jaeger::Batch) -> thrift::Result { + // estimate transport capacity based on last request + let estimate = self.payload_size_estimate.load(Ordering::Relaxed); + + // Write payload to transport buffer + let transport = Cursor::new(Vec::with_capacity(estimate)); + let mut protocol = TBinaryOutputProtocol::new(transport, true); + batch.write_to_out_protocol(&mut protocol)?; + + // Use current batch capacity as new estimate + self.payload_size_estimate + .store(protocol.transport.get_ref().len(), Ordering::Relaxed); + + // Build collector request + let mut options = RequestInit::new(); + options.method("POST"); + options.mode(RequestMode::Cors); + + let body: Uint8Array = protocol.transport.get_ref().as_slice().into(); + options.body(Some(body.as_ref())); + + if self.client.auth.is_some() { + options.credentials(RequestCredentials::Include); + } + + let request = Request::new_with_str_and_init(&self.endpoint.to_string(), &options) + .map_err(jsvalue_into_ioerror)?; + let headers = request.headers(); + headers + .set("Content-Type", "application/vnd.apache.thrift.binary") + .map_err(jsvalue_into_ioerror)?; + if let Some(auth) = self.client.auth.as_ref() { + headers + .set("Authorization", auth) + .map_err(jsvalue_into_ioerror)?; + } + + Ok(request) + } + } + async fn post_request(request: Request) -> thrift::Result { // Send request to collector - let res = self - .client - .send_async(req) + let window = web_sys::window().unwrap(); + let res_value = JsFuture::from(window.fetch_with_request(&request)) .await - .map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?; + .map_err(jsvalue_into_ioerror)?; + let res: Response = res_value.dyn_into().unwrap(); - if !res.status().is_success() { + if !res.ok() { return Err(thrift::Error::from(io::Error::new( io::ErrorKind::Other, - format!("Expected success response, got {:?}", res.status()), + format!( + "Expected success response, got {} ({})", + res.status(), + res.status_text() + ), ))); } Ok(jaeger::BatchSubmitResponse { ok: true }) } + + /// Wrapper of web fetch API future marked as Send. + /// + /// At the moment, the web APIs are single threaded. Since all opentelemetry futures are + /// required to be Send, we mark this future as Send. + #[pin_project::pin_project] + struct SubmitBatchFuture(#[pin] F); + + unsafe impl Send for SubmitBatchFuture {} + + impl Future for SubmitBatchFuture { + type Output = F::Output; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().0.poll(cx) + } + } + + fn jsvalue_into_ioerror(value: wasm_bindgen::JsValue) -> io::Error { + io::Error::new( + io::ErrorKind::Other, + js_sys::JSON::stringify(&value) + .map(String::from) + .unwrap_or_else(|_| "unknown error".to_string()), + ) + } } diff --git a/opentelemetry-jaeger/src/lib.rs b/opentelemetry-jaeger/src/lib.rs index d9f00acb2c..f7fb1f007a 100644 --- a/opentelemetry-jaeger/src/lib.rs +++ b/opentelemetry-jaeger/src/lib.rs @@ -180,7 +180,7 @@ #![cfg_attr(test, deny(warnings))] mod agent; -#[cfg(feature = "collector_client")] +#[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] mod collector; #[allow(clippy::all, unreachable_pub, dead_code)] #[rustfmt::skip] @@ -192,7 +192,7 @@ mod uploader; use self::thrift::jaeger; use agent::AgentAsyncClientUDP; use async_trait::async_trait; -#[cfg(feature = "collector_client")] +#[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] use collector::CollectorAsyncClientHttp; use opentelemetry::{ exporter::trace, @@ -288,11 +288,11 @@ impl trace::SpanExporter for Exporter { #[derive(Debug)] pub struct PipelineBuilder { agent_endpoint: Vec, - #[cfg(feature = "collector_client")] + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] collector_endpoint: Option, - #[cfg(feature = "collector_client")] + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] collector_username: Option, - #[cfg(feature = "collector_client")] + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] collector_password: Option, export_instrument_library: bool, process: Process, @@ -304,11 +304,11 @@ impl Default for PipelineBuilder { fn default() -> Self { PipelineBuilder { agent_endpoint: vec![DEFAULT_AGENT_ENDPOINT.parse().unwrap()], - #[cfg(feature = "collector_client")] + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] collector_endpoint: None, - #[cfg(feature = "collector_client")] + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] collector_username: None, - #[cfg(feature = "collector_client")] + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] collector_password: None, export_instrument_library: true, process: Process { @@ -354,8 +354,11 @@ impl PipelineBuilder { /// Assign the collector endpoint. /// /// E.g. "http://localhost:14268/api/traces" - #[cfg(feature = "collector_client")] - #[cfg_attr(docsrs, doc(cfg(feature = "collector_client")))] + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + #[cfg_attr( + docsrs, + doc(cfg(any(feature = "collector_client", feature = "wasm_collector_client"))) + )] pub fn with_collector_endpoint(self, collector_endpoint: T) -> Self where http::Uri: core::convert::TryFrom, @@ -367,8 +370,11 @@ impl PipelineBuilder { } /// Assign the collector username - #[cfg(feature = "collector_client")] - #[cfg_attr(docsrs, doc(cfg(feature = "collector_client")))] + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + #[cfg_attr( + docsrs, + doc(any(feature = "collector_client", feature = "wasm_collector_client")) + )] pub fn with_collector_username>(self, collector_username: S) -> Self { PipelineBuilder { collector_username: Some(collector_username.into()), @@ -377,8 +383,11 @@ impl PipelineBuilder { } /// Assign the collector password - #[cfg(feature = "collector_client")] - #[cfg_attr(docsrs, doc(cfg(feature = "collector_client")))] + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] + #[cfg_attr( + docsrs, + doc(any(feature = "collector_client", feature = "wasm_collector_client")) + )] pub fn with_collector_password>(self, collector_password: S) -> Self { PipelineBuilder { collector_password: Some(collector_password.into()), @@ -449,7 +458,7 @@ impl PipelineBuilder { }) } - #[cfg(not(feature = "collector_client"))] + #[cfg(not(any(feature = "collector_client", feature = "wasm_collector_client")))] fn init_uploader( self, ) -> Result<(Process, BatchUploader), Box> { @@ -457,7 +466,7 @@ impl PipelineBuilder { Ok((self.process, BatchUploader::Agent(agent))) } - #[cfg(feature = "collector_client")] + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] fn init_uploader( self, ) -> Result<(Process, uploader::BatchUploader), Box> { diff --git a/opentelemetry-jaeger/src/uploader.rs b/opentelemetry-jaeger/src/uploader.rs index dc4cef7317..a8eb468503 100644 --- a/opentelemetry-jaeger/src/uploader.rs +++ b/opentelemetry-jaeger/src/uploader.rs @@ -1,5 +1,5 @@ //! # Jaeger Span Uploader -#[cfg(feature = "collector_client")] +#[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] use crate::collector; use crate::{agent, jaeger}; use opentelemetry::exporter::trace; @@ -10,7 +10,7 @@ pub(crate) enum BatchUploader { /// Agent sync client Agent(agent::AgentAsyncClientUDP), /// Collector sync client - #[cfg(feature = "collector_client")] + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] Collector(collector::CollectorAsyncClientHttp), } @@ -22,7 +22,7 @@ impl BatchUploader { // TODO Implement retry behaviour client.emit_batch(batch).await?; } - #[cfg(feature = "collector_client")] + #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] BatchUploader::Collector(collector) => { // TODO Implement retry behaviour collector.submit_batch(batch).await?; diff --git a/opentelemetry/Cargo.toml b/opentelemetry/Cargo.toml index 8aa495134e..9cafb15292 100644 --- a/opentelemetry/Cargo.toml +++ b/opentelemetry/Cargo.toml @@ -40,6 +40,9 @@ tonic = { version = "0.3", default-features = false, optional = true } reqwest = { version = "0.10", default-features = false, features = ["blocking"], optional = true } surf = { version = "2.0", default-features = false, optional = true } +[target.'cfg(target_arch = "wasm32")'.dependencies] +js-sys = "0.3" + [dev-dependencies] criterion = "0.3.1" rand_distr = "0.3.0" diff --git a/opentelemetry/src/api/trace/event.rs b/opentelemetry/src/api/trace/event.rs index df4a46bacb..4b2dc88f72 100644 --- a/opentelemetry/src/api/trace/event.rs +++ b/opentelemetry/src/api/trace/event.rs @@ -32,7 +32,7 @@ impl Event { pub fn with_name(name: String) -> Self { Event { name, - timestamp: SystemTime::now(), + timestamp: crate::time::now(), attributes: Vec::new(), } } diff --git a/opentelemetry/src/api/trace/span.rs b/opentelemetry/src/api/trace/span.rs index 5f45bf487f..098a99a419 100644 --- a/opentelemetry/src/api/trace/span.rs +++ b/opentelemetry/src/api/trace/span.rs @@ -36,7 +36,7 @@ pub trait Span: fmt::Debug + 'static + Send + Sync { /// keys"](https://github.com/open-telemetry/opentelemetry-specification/tree/v0.5.0/specification/trace/semantic_conventions/README.md) /// which have prescribed semantic meanings. fn add_event(&self, name: String, attributes: Vec) { - self.add_event_with_timestamp(name, SystemTime::now(), attributes) + self.add_event_with_timestamp(name, crate::time::now(), attributes) } /// Convenience method to record an exception/error as an `Event` @@ -154,7 +154,7 @@ pub trait Span: fmt::Debug + 'static + Send + Sync { /// /// This API MUST be non-blocking. fn end(&self) { - self.end_with_timestamp(SystemTime::now()); + self.end_with_timestamp(crate::time::now()); } /// Finishes the `Span` with given timestamp diff --git a/opentelemetry/src/exporter/metrics/stdout.rs b/opentelemetry/src/exporter/metrics/stdout.rs index 8590c4810f..0dc1180cdd 100644 --- a/opentelemetry/src/exporter/metrics/stdout.rs +++ b/opentelemetry/src/exporter/metrics/stdout.rs @@ -125,7 +125,7 @@ where fn export(&self, checkpoint_set: &mut dyn CheckpointSet) -> Result<()> { let mut batch = ExportBatch::default(); if !self.do_not_print_time { - batch.timestamp = Some(SystemTime::now()); + batch.timestamp = Some(crate::time::now()); } checkpoint_set.try_for_each(self, &mut |record| { let agg = record.aggregator().ok_or(MetricsError::NoDataCollected)?; diff --git a/opentelemetry/src/exporter/trace/mod.rs b/opentelemetry/src/exporter/trace/mod.rs index 549a448c42..393809d700 100644 --- a/opentelemetry/src/exporter/trace/mod.rs +++ b/opentelemetry/src/exporter/trace/mod.rs @@ -180,8 +180,8 @@ mod tests { let parent_span_id = 1; let span_kind = SpanKind::Client; let name = "foo/bar baz 人?!".to_string(); - let start_time = SystemTime::now(); - let end_time = SystemTime::now(); + let start_time = crate::time::now(); + let end_time = crate::time::now(); let capacity = 3; let attributes = sdk::trace::EvictedHashMap::new(capacity, 0); diff --git a/opentelemetry/src/lib.rs b/opentelemetry/src/lib.rs index b03883dc08..19ea431502 100644 --- a/opentelemetry/src/lib.rs +++ b/opentelemetry/src/lib.rs @@ -180,3 +180,17 @@ pub use api::{ core::{Array, Key, KeyValue, Unit, Value}, propagation, }; + +pub(crate) mod time { + use std::time::SystemTime; + + #[cfg(not(target_arch = "wasm32"))] + pub(crate) fn now() -> SystemTime { + SystemTime::now() + } + + #[cfg(target_arch = "wasm32")] + pub(crate) fn now() -> SystemTime { + SystemTime::UNIX_EPOCH + std::time::Duration::from_millis(js_sys::Date::now() as u64) + } +} diff --git a/opentelemetry/src/sdk/metrics/aggregators/last_value.rs b/opentelemetry/src/sdk/metrics/aggregators/last_value.rs index 0f2a750a03..4d937965e8 100644 --- a/opentelemetry/src/sdk/metrics/aggregators/last_value.rs +++ b/opentelemetry/src/sdk/metrics/aggregators/last_value.rs @@ -22,7 +22,7 @@ impl Aggregator for LastValueAggregator { self.inner.lock().map_err(Into::into).map(|mut inner| { inner.state = Some(LastValueData { value: number.clone(), - timestamp: SystemTime::now(), + timestamp: crate::time::now(), }); }) } diff --git a/opentelemetry/src/sdk/metrics/controllers/pull.rs b/opentelemetry/src/sdk/metrics/controllers/pull.rs index 08466be489..845d7911fa 100644 --- a/opentelemetry/src/sdk/metrics/controllers/pull.rs +++ b/opentelemetry/src/sdk/metrics/controllers/pull.rs @@ -50,7 +50,7 @@ impl PullController { .elapsed() .map_or(true, |elapsed| elapsed > self.period) { - self.last_collect = SystemTime::now(); + self.last_collect = crate::time::now(); self.processor.lock().and_then(|mut checkpointer| { checkpointer.start_collection(); self.accumulator.0.collect(&mut checkpointer); @@ -159,7 +159,7 @@ impl PullControllerBuilder { processor, provider, period: self.cache_period.unwrap_or(DEFAULT_CACHE_DURATION), - last_collect: SystemTime::now(), + last_collect: crate::time::now(), } } } diff --git a/opentelemetry/src/sdk/metrics/processors/basic.rs b/opentelemetry/src/sdk/metrics/processors/basic.rs index c83a104f04..6a9df37fba 100644 --- a/opentelemetry/src/sdk/metrics/processors/basic.rs +++ b/opentelemetry/src/sdk/metrics/processors/basic.rs @@ -188,7 +188,7 @@ impl Checkpointer for BasicLockedProcessor<'_> { } fn finish_collection(&mut self) -> Result<()> { - self.state.interval_end = SystemTime::now(); + self.state.interval_end = crate::time::now(); if self.state.started_collection != self.state.finished_collection.wrapping_add(1) { return Err(MetricsError::InconsistentState); } @@ -291,9 +291,9 @@ impl Default for BasicProcessorState { BasicProcessorState { config: BasicProcessorConfig::default(), values: HashMap::default(), - process_start: SystemTime::now(), - interval_start: SystemTime::now(), - interval_end: SystemTime::now(), + process_start: crate::time::now(), + interval_start: crate::time::now(), + interval_end: crate::time::now(), started_collection: 0, finished_collection: 0, } diff --git a/opentelemetry/src/sdk/trace/id_generator/aws.rs b/opentelemetry/src/sdk/trace/id_generator/aws.rs index a87b0f94c5..65581fa69e 100644 --- a/opentelemetry/src/sdk/trace/id_generator/aws.rs +++ b/opentelemetry/src/sdk/trace/id_generator/aws.rs @@ -1,6 +1,6 @@ use crate::sdk; use crate::trace::{IdGenerator, SpanId, TraceId}; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, UNIX_EPOCH}; /// Generates AWS X-Ray compliant Trace and Span ids. /// @@ -53,7 +53,7 @@ impl IdGenerator for XrayIdGenerator { default_trace_id.truncate(24); - let epoch_time_seconds: u64 = SystemTime::now() + let epoch_time_seconds: u64 = crate::time::now() .duration_since(UNIX_EPOCH) .unwrap_or_else(|_| Duration::from_secs(0)) .as_secs(); @@ -74,7 +74,7 @@ mod tests { #[test] fn test_trace_id_generation() { - let before: u64 = SystemTime::now() + let before: u64 = crate::time::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(); @@ -84,7 +84,7 @@ mod tests { let trace_id: TraceId = generator.new_trace_id(); sleep(Duration::from_secs(1)); - let after: u64 = SystemTime::now() + let after: u64 = crate::time::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(); diff --git a/opentelemetry/src/sdk/trace/span.rs b/opentelemetry/src/sdk/trace/span.rs index 34da759fc4..7625062986 100644 --- a/opentelemetry/src/sdk/trace/span.rs +++ b/opentelemetry/src/sdk/trace/span.rs @@ -159,7 +159,7 @@ impl SpanInner { if let Some(timestamp) = timestamp { span_data.end_time = timestamp; } else if span_data.end_time == span_data.start_time { - span_data.end_time = SystemTime::now(); + span_data.end_time = crate::time::now(); } } @@ -232,8 +232,8 @@ mod tests { parent_span_id: SpanId::from_u64(0), span_kind: api::trace::SpanKind::Internal, name: "opentelemetry".to_string(), - start_time: SystemTime::now(), - end_time: SystemTime::now(), + start_time: crate::time::now(), + end_time: crate::time::now(), attributes: sdk::trace::EvictedHashMap::new(config.max_attributes_per_span, 0), message_events: sdk::trace::EvictedQueue::new(config.max_events_per_span), links: sdk::trace::EvictedQueue::new(config.max_links_per_span), @@ -284,7 +284,7 @@ mod tests { let span = create_span(); let name = "some_event".to_string(); let attributes = vec![KeyValue::new("k", "v")]; - let timestamp = SystemTime::now(); + let timestamp = crate::time::now(); span.add_event_with_timestamp(name.clone(), timestamp, attributes.clone()); span.with_data(|data| { if let Some(event) = data.message_events.iter().next() { @@ -382,7 +382,7 @@ mod tests { #[test] fn end_with_timestamp() { let span = create_span(); - let timestamp = SystemTime::now(); + let timestamp = crate::time::now(); span.end_with_timestamp(timestamp); span.with_data(|data| assert_eq!(data.end_time, timestamp)); } @@ -404,7 +404,7 @@ mod tests { #[test] fn end_only_once() { let span = create_span(); - let timestamp = SystemTime::now(); + let timestamp = crate::time::now(); span.end_with_timestamp(timestamp); span.end_with_timestamp(timestamp.checked_add(Duration::from_secs(10)).unwrap()); span.with_data(|data| assert_eq!(data.end_time, timestamp)); @@ -418,7 +418,7 @@ mod tests { span.add_event("some_event".to_string(), vec![KeyValue::new("k", "v")]); span.add_event_with_timestamp( "some_event".to_string(), - SystemTime::now(), + crate::time::now(), vec![KeyValue::new("k", "v")], ); let err = std::io::Error::from(std::io::ErrorKind::Other); diff --git a/opentelemetry/src/sdk/trace/tracer.rs b/opentelemetry/src/sdk/trace/tracer.rs index e885d0c6c1..1951f45e5e 100644 --- a/opentelemetry/src/sdk/trace/tracer.rs +++ b/opentelemetry/src/sdk/trace/tracer.rs @@ -22,7 +22,6 @@ use crate::trace::{ use crate::{Context, KeyValue}; use std::fmt; use std::sync::Weak; -use std::time::SystemTime; /// `Tracer` implementation to create and manage spans #[derive(Clone)] @@ -246,7 +245,7 @@ impl crate::trace::Tracer for Tracer { if let Some(link_options) = &mut link_options { links.append_vec(link_options); } - let start_time = builder.start_time.unwrap_or_else(SystemTime::now); + let start_time = builder.start_time.unwrap_or_else(crate::time::now); let end_time = builder.end_time.unwrap_or(start_time); let mut message_events = EvictedQueue::new(config.max_events_per_span); if let Some(mut events) = builder.message_events { diff --git a/opentelemetry/src/testing/trace.rs b/opentelemetry/src/testing/trace.rs index da86e7f065..6d519588e2 100644 --- a/opentelemetry/src/testing/trace.rs +++ b/opentelemetry/src/testing/trace.rs @@ -10,7 +10,6 @@ use crate::{ }; use async_trait::async_trait; use std::sync::mpsc::{channel, Receiver, Sender}; -use std::time::SystemTime; #[derive(Debug)] pub struct TestSpan(pub SpanContext); @@ -42,8 +41,8 @@ pub fn new_test_export_span_data() -> exporter::SpanData { parent_span_id: SpanId::from_u64(0), span_kind: SpanKind::Internal, name: "opentelemetry".to_string(), - start_time: SystemTime::now(), - end_time: SystemTime::now(), + start_time: crate::time::now(), + end_time: crate::time::now(), attributes: EvictedHashMap::new(config.max_attributes_per_span, 0), message_events: EvictedQueue::new(config.max_events_per_span), links: EvictedQueue::new(config.max_links_per_span),