diff --git a/opentelemetry-otlp/CHANGELOG.md b/opentelemetry-otlp/CHANGELOG.md index fcb26673c7..5295ae9ee0 100644 --- a/opentelemetry-otlp/CHANGELOG.md +++ b/opentelemetry-otlp/CHANGELOG.md @@ -19,6 +19,8 @@ now use `.with_resource(RESOURCE::default())` to configure Resource when using - Fixing the OTLP HTTP/JSON exporter. [#1882](https://github.com/open-telemetry/opentelemetry-rust/pull/1882) - The exporter was broken in the previous release. - **Breaking** [1869](https://github.com/open-telemetry/opentelemetry-rust/pull/1869) The OTLP logs exporter now overrides the [InstrumentationScope::name](https://github.com/open-telemetry/opentelemetry-proto/blob/b3060d2104df364136d75a35779e6bd48bac449a/opentelemetry/proto/common/v1/common.proto#L73) field with the `target` from `LogRecord`, if target is populated. +- Groups batch of `LogRecord` and `Span` by their resource and instrumentation scope before exporting, for better efficiency [#1873](https://github.com/open-telemetry/opentelemetry-rust/pull/1873). + ## v0.16.0 diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 3e59ffdd8e..396dec680d 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -25,7 +25,7 @@ impl LogExporter for OtlpHttpClient { .map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData .collect::>(); - let (body, content_type) = { self.build_logs_export_body(owned_batch, &self.resource)? }; + let (body, content_type) = { self.build_logs_export_body(owned_batch)? }; let mut request = http::Request::builder() .method(Method::POST) .uri(&self.collector_endpoint) diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 4a45379121..2fa3ff851b 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -9,7 +9,10 @@ use crate::{ use http::{HeaderName, HeaderValue, Uri}; use opentelemetry_http::HttpClient; use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema; - +#[cfg(feature = "logs")] +use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; +#[cfg(feature = "trace")] +use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope; #[cfg(feature = "logs")] use opentelemetry_sdk::export::logs::LogData; #[cfg(feature = "trace")] @@ -307,16 +310,9 @@ impl OtlpHttpClient { fn build_trace_export_body( &self, spans: Vec, - resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, ) -> opentelemetry::trace::TraceResult<(Vec, &'static str)> { - use opentelemetry_proto::tonic::{ - collector::trace::v1::ExportTraceServiceRequest, trace::v1::ResourceSpans, - }; - - let resource_spans = spans - .into_iter() - .map(|span| ResourceSpans::new(span, resource)) - .collect::>(); + use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; + let resource_spans = group_spans_by_resource_and_scope(spans, &self.resource); let req = ExportTraceServiceRequest { resource_spans }; match self.protocol { @@ -333,13 +329,9 @@ impl OtlpHttpClient { fn build_logs_export_body( &self, logs: Vec, - resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, ) -> opentelemetry::logs::LogResult<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; - let resource_logs = logs - .into_iter() - .map(|log_event| (log_event, resource).into()) - .collect::>(); + let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource); let req = ExportLogsServiceRequest { resource_logs }; match self.protocol { diff --git a/opentelemetry-otlp/src/exporter/http/trace.rs b/opentelemetry-otlp/src/exporter/http/trace.rs index 8d6c3116cd..cc7894c266 100644 --- a/opentelemetry-otlp/src/exporter/http/trace.rs +++ b/opentelemetry-otlp/src/exporter/http/trace.rs @@ -21,7 +21,7 @@ impl SpanExporter for OtlpHttpClient { Err(err) => return Box::pin(std::future::ready(Err(err))), }; - let (body, content_type) = match self.build_trace_export_body(batch, &self.resource) { + let (body, content_type) = match self.build_trace_export_body(batch) { Ok(body) => body, Err(e) => return Box::pin(std::future::ready(Err(e))), }; diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 6cefd611ff..b529eda511 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -7,6 +7,8 @@ use opentelemetry_proto::tonic::collector::logs::v1::{ use opentelemetry_sdk::export::logs::{LogData, LogExporter}; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; +use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; + use super::BoxInterceptor; pub(crate) struct TonicLogsClient { @@ -65,15 +67,13 @@ impl LogExporter for TonicLogsClient { None => return Err(LogError::Other("exporter is already shut down".into())), }; - // TODO: Avoid cloning here. - let resource_logs = { - batch - .into_iter() - .map(|log_data_cow| (log_data_cow.into_owned())) - .map(|log_data| (log_data, &self.resource)) - .map(Into::into) - .collect() - }; + //TODO: avoid cloning here. + let owned_batch = batch + .into_iter() + .map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData + .collect::>(); + + let resource_logs = group_logs_by_resource_and_scope(owned_batch, &self.resource); client .export(Request::from_parts( diff --git a/opentelemetry-otlp/src/exporter/tonic/trace.rs b/opentelemetry-otlp/src/exporter/tonic/trace.rs index a0dbe0e76b..a4d12ebde7 100644 --- a/opentelemetry-otlp/src/exporter/tonic/trace.rs +++ b/opentelemetry-otlp/src/exporter/tonic/trace.rs @@ -5,10 +5,11 @@ use opentelemetry::trace::TraceError; use opentelemetry_proto::tonic::collector::trace::v1::{ trace_service_client::TraceServiceClient, ExportTraceServiceRequest, }; -use opentelemetry_proto::tonic::trace::v1::ResourceSpans; use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter}; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; +use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope; + use super::BoxInterceptor; pub(crate) struct TonicTracesClient { @@ -71,13 +72,7 @@ impl SpanExporter for TonicTracesClient { } }; - // TODO: Avoid cloning here. - let resource_spans = { - batch - .into_iter() - .map(|log_data| ResourceSpans::new(log_data, &self.resource)) - .collect() - }; + let resource_spans = group_spans_by_resource_and_scope(batch, &self.resource); Box::pin(async move { client diff --git a/opentelemetry-proto/CHANGELOG.md b/opentelemetry-proto/CHANGELOG.md index afcccc6ea4..666dd30571 100644 --- a/opentelemetry-proto/CHANGELOG.md +++ b/opentelemetry-proto/CHANGELOG.md @@ -3,6 +3,8 @@ ## vNext - Bump MSRV to 1.70 [1864](https://github.com/open-telemetry/opentelemetry-rust/pull/1874) +- Group log and Span batch by their resource and instrumentation scope before exporting [#1873](https://github.com/open-telemetry/opentelemetry-rust/pull/1873). + - Introduced `group_logs_by_resource_and_scope()` and `group_spans_by_resource_and_scope()` methods to group logs and spans by the resource and scope respectively. ## v0.6.0 diff --git a/opentelemetry-proto/Cargo.toml b/opentelemetry-proto/Cargo.toml index 626542884d..a462812e77 100644 --- a/opentelemetry-proto/Cargo.toml +++ b/opentelemetry-proto/Cargo.toml @@ -29,7 +29,7 @@ path = "tests/json_deserialize.rs" [features] -default = [] +default = ["full"] full = ["gen-tonic", "trace", "logs", "metrics", "zpages", "with-serde"] @@ -42,6 +42,7 @@ trace = ["opentelemetry/trace", "opentelemetry_sdk/trace"] metrics = ["opentelemetry/metrics", "opentelemetry_sdk/metrics"] logs = ["opentelemetry/logs", "opentelemetry_sdk/logs"] zpages = ["trace"] +testing = ["opentelemetry/testing"] # add ons with-schemars = ["schemars"] @@ -57,7 +58,8 @@ serde = { workspace = true, optional = true, features = ["serde_derive"] } hex = { version = "0.4.3", optional = true } [dev-dependencies] +opentelemetry = { version = "0.23", features = ["testing"], path = "../opentelemetry" } tonic-build = { workspace = true } prost-build = { workspace = true } tempfile = "3.3.0" -serde_json = { workspace = true } \ No newline at end of file +serde_json = { workspace = true } diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index 88d296e1d0..0a672d3d51 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -2,7 +2,10 @@ pub mod tonic { use crate::{ tonic::{ - common::v1::{any_value::Value, AnyValue, ArrayValue, KeyValue, KeyValueList}, + common::v1::{ + any_value::Value, AnyValue, ArrayValue, InstrumentationScope, KeyValue, + KeyValueList, + }, logs::v1::{LogRecord, ResourceLogs, ScopeLogs, SeverityNumber}, resource::v1::Resource, Attributes, @@ -10,6 +13,8 @@ pub mod tonic { transform::common::{to_nanos, tonic::ResourceAttributesWithSchema}, }; use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity}; + use std::borrow::Cow; + use std::collections::HashMap; impl From for AnyValue { fn from(value: LogsAnyValue) -> Self { @@ -143,4 +148,123 @@ pub mod tonic { } } } + + pub fn group_logs_by_resource_and_scope( + logs: Vec, + resource: &ResourceAttributesWithSchema, + ) -> Vec { + // Group logs by target or instrumentation name + let scope_map = logs.iter().fold( + HashMap::new(), + |mut scope_map: HashMap< + Cow<'static, str>, + Vec<&opentelemetry_sdk::export::logs::LogData>, + >, + log| { + let key = log + .record + .target + .clone() + .unwrap_or_else(|| log.instrumentation.name.clone()); + scope_map.entry(key).or_default().push(log); + scope_map + }, + ); + + let scope_logs = scope_map + .into_iter() + .map(|(key, log_data)| ScopeLogs { + scope: Some(InstrumentationScope::from(( + &log_data.first().unwrap().instrumentation, + Some(key), + ))), + schema_url: resource.schema_url.clone().unwrap_or_default(), + log_records: log_data + .into_iter() + .map(|log_data| log_data.record.clone().into()) + .collect(), + }) + .collect(); + + vec![ResourceLogs { + resource: Some(Resource { + attributes: resource.attributes.0.clone(), + dropped_attributes_count: 0, + }), + scope_logs, + schema_url: resource.schema_url.clone().unwrap_or_default(), + }] + } +} + +#[cfg(test)] +mod tests { + use crate::transform::common::tonic::ResourceAttributesWithSchema; + use opentelemetry::logs::LogRecord as _; + use opentelemetry_sdk::export::logs::LogData; + use opentelemetry_sdk::{logs::LogRecord, Resource}; + use std::time::SystemTime; + + fn create_test_log_data(instrumentation_name: &str, _message: &str) -> LogData { + let mut logrecord = LogRecord::default(); + logrecord.set_timestamp(SystemTime::now()); + logrecord.set_observed_timestamp(SystemTime::now()); + LogData { + instrumentation: opentelemetry_sdk::InstrumentationLibrary::builder( + instrumentation_name.to_string(), + ) + .build(), + record: logrecord, + } + } + + #[test] + fn test_group_logs_by_resource_and_scope_single_scope() { + let resource = Resource::default(); + let log1 = create_test_log_data("test-lib", "Log 1"); + let log2 = create_test_log_data("test-lib", "Log 2"); + + let logs = vec![log1, log2]; + let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema + + let grouped_logs = + crate::transform::logs::tonic::group_logs_by_resource_and_scope(logs, &resource); + + assert_eq!(grouped_logs.len(), 1); + let resource_logs = &grouped_logs[0]; + assert_eq!(resource_logs.scope_logs.len(), 1); + + let scope_logs = &resource_logs.scope_logs[0]; + assert_eq!(scope_logs.log_records.len(), 2); + } + + #[test] + fn test_group_logs_by_resource_and_scope_multiple_scopes() { + let resource = Resource::default(); + let log1 = create_test_log_data("lib1", "Log 1"); + let log2 = create_test_log_data("lib2", "Log 2"); + + let logs = vec![log1, log2]; + let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema + let grouped_logs = + crate::transform::logs::tonic::group_logs_by_resource_and_scope(logs, &resource); + + assert_eq!(grouped_logs.len(), 1); + let resource_logs = &grouped_logs[0]; + assert_eq!(resource_logs.scope_logs.len(), 2); + + let scope_logs_1 = &resource_logs + .scope_logs + .iter() + .find(|scope| scope.scope.as_ref().unwrap().name == "lib1") + .unwrap(); + let scope_logs_2 = &resource_logs + .scope_logs + .iter() + .find(|scope| scope.scope.as_ref().unwrap().name == "lib2") + .unwrap(); + + assert_eq!(scope_logs_1.log_records.len(), 1); + assert_eq!(scope_logs_2.log_records.len(), 1); + } } diff --git a/opentelemetry-proto/src/transform/trace.rs b/opentelemetry-proto/src/transform/trace.rs index 5c534301e3..12871b33aa 100644 --- a/opentelemetry-proto/src/transform/trace.rs +++ b/opentelemetry-proto/src/transform/trace.rs @@ -9,6 +9,7 @@ pub mod tonic { use opentelemetry::trace; use opentelemetry::trace::{Link, SpanId, SpanKind}; use opentelemetry_sdk::export::trace::SpanData; + use std::collections::HashMap; impl From for span::SpanKind { fn from(span_kind: SpanKind) -> Self { @@ -44,6 +45,50 @@ pub mod tonic { } } } + impl From for Span { + fn from(source_span: opentelemetry_sdk::export::trace::SpanData) -> Self { + let span_kind: span::SpanKind = source_span.span_kind.into(); + Span { + trace_id: source_span.span_context.trace_id().to_bytes().to_vec(), + span_id: source_span.span_context.span_id().to_bytes().to_vec(), + trace_state: source_span.span_context.trace_state().header(), + parent_span_id: { + if source_span.parent_span_id != SpanId::INVALID { + source_span.parent_span_id.to_bytes().to_vec() + } else { + vec![] + } + }, + flags: source_span.span_context.trace_flags().to_u8() as u32, + name: source_span.name.into_owned(), + kind: span_kind as i32, + start_time_unix_nano: to_nanos(source_span.start_time), + end_time_unix_nano: to_nanos(source_span.end_time), + dropped_attributes_count: source_span.dropped_attributes_count, + attributes: Attributes::from(source_span.attributes).0, + dropped_events_count: source_span.events.dropped_count, + events: source_span + .events + .into_iter() + .map(|event| span::Event { + time_unix_nano: to_nanos(event.timestamp), + name: event.name.into(), + attributes: Attributes::from(event.attributes).0, + dropped_attributes_count: event.dropped_attributes_count, + }) + .collect(), + dropped_links_count: source_span.links.dropped_count, + links: source_span.links.into_iter().map(Into::into).collect(), + status: Some(Status { + code: status::StatusCode::from(&source_span.status).into(), + message: match source_span.status { + trace::Status::Error { description } => description.to_string(), + _ => Default::default(), + }, + }), + } + } + } impl ResourceSpans { pub fn new(source_span: SpanData, resource: &ResourceAttributesWithSchema) -> Self { @@ -105,4 +150,201 @@ pub mod tonic { } } } + + pub fn group_spans_by_resource_and_scope( + spans: Vec, + resource: &ResourceAttributesWithSchema, + ) -> Vec { + // Group spans by their instrumentation library + let scope_map = spans.iter().fold( + HashMap::new(), + |mut scope_map: HashMap<&opentelemetry_sdk::InstrumentationLibrary, Vec<&SpanData>>, + span| { + let instrumentation = &span.instrumentation_lib; + scope_map.entry(instrumentation).or_default().push(span); + scope_map + }, + ); + + // Convert the grouped spans into ScopeSpans + let scope_spans = scope_map + .into_iter() + .map(|(instrumentation, span_records)| ScopeSpans { + scope: Some((instrumentation, None).into()), + schema_url: resource.schema_url.clone().unwrap_or_default(), + spans: span_records + .into_iter() + .map(|span_data| span_data.clone().into()) + .collect(), + }) + .collect(); + + // Wrap ScopeSpans into a single ResourceSpans + vec![ResourceSpans { + resource: Some(Resource { + attributes: resource.attributes.0.clone(), + dropped_attributes_count: 0, + }), + scope_spans, + schema_url: resource.schema_url.clone().unwrap_or_default(), + }] + } +} + +#[cfg(test)] +mod tests { + use crate::tonic::common::v1::any_value::Value; + use crate::transform::common::tonic::ResourceAttributesWithSchema; + use opentelemetry::trace::{ + SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState, + }; + use opentelemetry::KeyValue; + use opentelemetry_sdk::export::trace::SpanData; + use opentelemetry_sdk::resource::Resource; + use opentelemetry_sdk::trace::{SpanEvents, SpanLinks}; + use opentelemetry_sdk::InstrumentationLibrary; + use std::borrow::Cow; + use std::time::{Duration, SystemTime}; + + fn create_test_span_data(instrumentation_name: &'static str) -> SpanData { + let span_context = SpanContext::new( + TraceId::from_u128(123), + SpanId::from_u64(456), + TraceFlags::default(), + false, + TraceState::default(), + ); + + SpanData { + span_context, + parent_span_id: SpanId::from_u64(0), + span_kind: SpanKind::Internal, + name: Cow::Borrowed("test_span"), + start_time: SystemTime::now(), + end_time: SystemTime::now() + Duration::from_secs(1), + attributes: vec![KeyValue::new("key", "value")], + dropped_attributes_count: 0, + events: SpanEvents::default(), + links: SpanLinks::default(), + status: Status::Unset, + instrumentation_lib: InstrumentationLibrary::builder(instrumentation_name).build(), + } + } + + #[test] + fn test_group_spans_by_resource_and_scope_single_scope() { + let resource = Resource::new(vec![KeyValue::new("resource_key", "resource_value")]); + let span_data = create_test_span_data("lib1"); + + let spans = vec![span_data.clone()]; + let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema + + let grouped_spans = + crate::transform::trace::tonic::group_spans_by_resource_and_scope(spans, &resource); + + assert_eq!(grouped_spans.len(), 1); + + let resource_spans = &grouped_spans[0]; + assert_eq!( + resource_spans.resource.as_ref().unwrap().attributes.len(), + 1 + ); + assert_eq!( + resource_spans.resource.as_ref().unwrap().attributes[0].key, + "resource_key" + ); + assert_eq!( + resource_spans.resource.as_ref().unwrap().attributes[0] + .value + .clone() + .unwrap() + .value + .unwrap(), + Value::StringValue("resource_value".to_string()) + ); + + let scope_spans = &resource_spans.scope_spans; + assert_eq!(scope_spans.len(), 1); + + let scope_span = &scope_spans[0]; + assert_eq!(scope_span.scope.as_ref().unwrap().name, "lib1"); + assert_eq!(scope_span.spans.len(), 1); + + assert_eq!( + scope_span.spans[0].trace_id, + span_data.span_context.trace_id().to_bytes().to_vec() + ); + } + + #[test] + fn test_group_spans_by_resource_and_scope_multiple_scopes() { + let resource = Resource::new(vec![KeyValue::new("resource_key", "resource_value")]); + let span_data1 = create_test_span_data("lib1"); + let span_data2 = create_test_span_data("lib1"); + let span_data3 = create_test_span_data("lib2"); + + let spans = vec![span_data1.clone(), span_data2.clone(), span_data3.clone()]; + let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema + + let grouped_spans = + crate::transform::trace::tonic::group_spans_by_resource_and_scope(spans, &resource); + + assert_eq!(grouped_spans.len(), 1); + + let resource_spans = &grouped_spans[0]; + assert_eq!( + resource_spans.resource.as_ref().unwrap().attributes.len(), + 1 + ); + assert_eq!( + resource_spans.resource.as_ref().unwrap().attributes[0].key, + "resource_key" + ); + assert_eq!( + resource_spans.resource.as_ref().unwrap().attributes[0] + .value + .clone() + .unwrap() + .value + .unwrap(), + Value::StringValue("resource_value".to_string()) + ); + + let scope_spans = &resource_spans.scope_spans; + assert_eq!(scope_spans.len(), 2); + + // Check the scope spans for both lib1 and lib2 + let mut lib1_scope_span = None; + let mut lib2_scope_span = None; + + for scope_span in scope_spans { + match scope_span.scope.as_ref().unwrap().name.as_str() { + "lib1" => lib1_scope_span = Some(scope_span), + "lib2" => lib2_scope_span = Some(scope_span), + _ => {} + } + } + + let lib1_scope_span = lib1_scope_span.expect("lib1 scope span not found"); + let lib2_scope_span = lib2_scope_span.expect("lib2 scope span not found"); + + assert_eq!(lib1_scope_span.scope.as_ref().unwrap().name, "lib1"); + assert_eq!(lib2_scope_span.scope.as_ref().unwrap().name, "lib2"); + + assert_eq!(lib1_scope_span.spans.len(), 2); + assert_eq!(lib2_scope_span.spans.len(), 1); + + assert_eq!( + lib1_scope_span.spans[0].trace_id, + span_data1.span_context.trace_id().to_bytes().to_vec() + ); + assert_eq!( + lib1_scope_span.spans[1].trace_id, + span_data2.span_context.trace_id().to_bytes().to_vec() + ); + assert_eq!( + lib2_scope_span.spans[0].trace_id, + span_data3.span_context.trace_id().to_bytes().to_vec() + ); + } }