Skip to content

Commit

Permalink
Group Logs and Spans by Resource and Instrumentation Scope in OTLP Ex…
Browse files Browse the repository at this point in the history
…porter (#1873)

Co-authored-by: Cijo Thomas <cithomas@microsoft.com>
  • Loading branch information
lalitb and cijothomas committed Jun 26, 2024
1 parent b3315f2 commit 9b746e0
Show file tree
Hide file tree
Showing 10 changed files with 396 additions and 37 deletions.
2 changes: 2 additions & 0 deletions opentelemetry-otlp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ now use `.with_resource(RESOURCE::default())` to configure Resource when using
- Fixing the OTLP HTTP/JSON exporter. [#1882](https://github.com/open-telemetry/opentelemetry-rust/pull/1882) - The exporter was broken in the
previous release.
- **Breaking** [1869](https://github.com/open-telemetry/opentelemetry-rust/pull/1869) The OTLP logs exporter now overrides the [InstrumentationScope::name](https://github.com/open-telemetry/opentelemetry-proto/blob/b3060d2104df364136d75a35779e6bd48bac449a/opentelemetry/proto/common/v1/common.proto#L73) field with the `target` from `LogRecord`, if target is populated.
- Groups batch of `LogRecord` and `Span` by their resource and instrumentation scope before exporting, for better efficiency [#1873](https://github.com/open-telemetry/opentelemetry-rust/pull/1873).


## v0.16.0

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl LogExporter for OtlpHttpClient {
.map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData
.collect::<Vec<LogData>>();

let (body, content_type) = { self.build_logs_export_body(owned_batch, &self.resource)? };
let (body, content_type) = { self.build_logs_export_body(owned_batch)? };
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
Expand Down
22 changes: 7 additions & 15 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use crate::{
use http::{HeaderName, HeaderValue, Uri};
use opentelemetry_http::HttpClient;
use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;

#[cfg(feature = "logs")]
use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
#[cfg(feature = "trace")]
use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
#[cfg(feature = "logs")]
use opentelemetry_sdk::export::logs::LogData;
#[cfg(feature = "trace")]
Expand Down Expand Up @@ -307,16 +310,9 @@ impl OtlpHttpClient {
fn build_trace_export_body(
&self,
spans: Vec<SpanData>,
resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
) -> opentelemetry::trace::TraceResult<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::{
collector::trace::v1::ExportTraceServiceRequest, trace::v1::ResourceSpans,
};

let resource_spans = spans
.into_iter()
.map(|span| ResourceSpans::new(span, resource))
.collect::<Vec<_>>();
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
let resource_spans = group_spans_by_resource_and_scope(spans, &self.resource);

let req = ExportTraceServiceRequest { resource_spans };
match self.protocol {
Expand All @@ -333,13 +329,9 @@ impl OtlpHttpClient {
fn build_logs_export_body(
&self,
logs: Vec<LogData>,
resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
) -> opentelemetry::logs::LogResult<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
let resource_logs = logs
.into_iter()
.map(|log_event| (log_event, resource).into())
.collect::<Vec<_>>();
let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource);
let req = ExportLogsServiceRequest { resource_logs };

match self.protocol {
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/http/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl SpanExporter for OtlpHttpClient {
Err(err) => return Box::pin(std::future::ready(Err(err))),
};

let (body, content_type) = match self.build_trace_export_body(batch, &self.resource) {
let (body, content_type) = match self.build_trace_export_body(batch) {
Ok(body) => body,
Err(e) => return Box::pin(std::future::ready(Err(e))),
};
Expand Down
18 changes: 9 additions & 9 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use opentelemetry_proto::tonic::collector::logs::v1::{
use opentelemetry_sdk::export::logs::{LogData, LogExporter};
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;

use super::BoxInterceptor;

pub(crate) struct TonicLogsClient {
Expand Down Expand Up @@ -65,15 +67,13 @@ impl LogExporter for TonicLogsClient {
None => return Err(LogError::Other("exporter is already shut down".into())),
};

// TODO: Avoid cloning here.
let resource_logs = {
batch
.into_iter()
.map(|log_data_cow| (log_data_cow.into_owned()))
.map(|log_data| (log_data, &self.resource))
.map(Into::into)
.collect()
};
//TODO: avoid cloning here.
let owned_batch = batch
.into_iter()
.map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData
.collect::<Vec<LogData>>();

let resource_logs = group_logs_by_resource_and_scope(owned_batch, &self.resource);

client
.export(Request::from_parts(
Expand Down
11 changes: 3 additions & 8 deletions opentelemetry-otlp/src/exporter/tonic/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use opentelemetry::trace::TraceError;
use opentelemetry_proto::tonic::collector::trace::v1::{
trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
};
use opentelemetry_proto::tonic::trace::v1::ResourceSpans;
use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter};
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;

use super::BoxInterceptor;

pub(crate) struct TonicTracesClient {
Expand Down Expand Up @@ -71,13 +72,7 @@ impl SpanExporter for TonicTracesClient {
}
};

// TODO: Avoid cloning here.
let resource_spans = {
batch
.into_iter()
.map(|log_data| ResourceSpans::new(log_data, &self.resource))
.collect()
};
let resource_spans = group_spans_by_resource_and_scope(batch, &self.resource);

Box::pin(async move {
client
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-proto/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
## vNext

- Bump MSRV to 1.70 [1864](https://github.com/open-telemetry/opentelemetry-rust/pull/1874)
- Group log and Span batch by their resource and instrumentation scope before exporting [#1873](https://github.com/open-telemetry/opentelemetry-rust/pull/1873).
- Introduced `group_logs_by_resource_and_scope()` and `group_spans_by_resource_and_scope()` methods to group logs and spans by the resource and scope respectively.

## v0.6.0

Expand Down
6 changes: 4 additions & 2 deletions opentelemetry-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ path = "tests/json_deserialize.rs"


[features]
default = []
default = ["full"]

full = ["gen-tonic", "trace", "logs", "metrics", "zpages", "with-serde"]

Expand All @@ -42,6 +42,7 @@ trace = ["opentelemetry/trace", "opentelemetry_sdk/trace"]
metrics = ["opentelemetry/metrics", "opentelemetry_sdk/metrics"]
logs = ["opentelemetry/logs", "opentelemetry_sdk/logs"]
zpages = ["trace"]
testing = ["opentelemetry/testing"]

# add ons
with-schemars = ["schemars"]
Expand All @@ -57,7 +58,8 @@ serde = { workspace = true, optional = true, features = ["serde_derive"] }
hex = { version = "0.4.3", optional = true }

[dev-dependencies]
opentelemetry = { version = "0.23", features = ["testing"], path = "../opentelemetry" }
tonic-build = { workspace = true }
prost-build = { workspace = true }
tempfile = "3.3.0"
serde_json = { workspace = true }
serde_json = { workspace = true }
126 changes: 125 additions & 1 deletion opentelemetry-proto/src/transform/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@
pub mod tonic {
use crate::{
tonic::{
common::v1::{any_value::Value, AnyValue, ArrayValue, KeyValue, KeyValueList},
common::v1::{
any_value::Value, AnyValue, ArrayValue, InstrumentationScope, KeyValue,
KeyValueList,
},
logs::v1::{LogRecord, ResourceLogs, ScopeLogs, SeverityNumber},
resource::v1::Resource,
Attributes,
},
transform::common::{to_nanos, tonic::ResourceAttributesWithSchema},
};
use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity};
use std::borrow::Cow;
use std::collections::HashMap;

impl From<LogsAnyValue> for AnyValue {
fn from(value: LogsAnyValue) -> Self {
Expand Down Expand Up @@ -143,4 +148,123 @@ pub mod tonic {
}
}
}

pub fn group_logs_by_resource_and_scope(
logs: Vec<opentelemetry_sdk::export::logs::LogData>,
resource: &ResourceAttributesWithSchema,
) -> Vec<ResourceLogs> {
// Group logs by target or instrumentation name
let scope_map = logs.iter().fold(
HashMap::new(),
|mut scope_map: HashMap<
Cow<'static, str>,
Vec<&opentelemetry_sdk::export::logs::LogData>,
>,
log| {
let key = log
.record
.target
.clone()
.unwrap_or_else(|| log.instrumentation.name.clone());
scope_map.entry(key).or_default().push(log);
scope_map
},
);

let scope_logs = scope_map
.into_iter()
.map(|(key, log_data)| ScopeLogs {
scope: Some(InstrumentationScope::from((
&log_data.first().unwrap().instrumentation,
Some(key),
))),
schema_url: resource.schema_url.clone().unwrap_or_default(),
log_records: log_data
.into_iter()
.map(|log_data| log_data.record.clone().into())
.collect(),
})
.collect();

vec![ResourceLogs {
resource: Some(Resource {
attributes: resource.attributes.0.clone(),
dropped_attributes_count: 0,
}),
scope_logs,
schema_url: resource.schema_url.clone().unwrap_or_default(),
}]
}
}

#[cfg(test)]
mod tests {
use crate::transform::common::tonic::ResourceAttributesWithSchema;
use opentelemetry::logs::LogRecord as _;
use opentelemetry_sdk::export::logs::LogData;
use opentelemetry_sdk::{logs::LogRecord, Resource};
use std::time::SystemTime;

fn create_test_log_data(instrumentation_name: &str, _message: &str) -> LogData {
let mut logrecord = LogRecord::default();
logrecord.set_timestamp(SystemTime::now());
logrecord.set_observed_timestamp(SystemTime::now());
LogData {
instrumentation: opentelemetry_sdk::InstrumentationLibrary::builder(
instrumentation_name.to_string(),
)
.build(),
record: logrecord,
}
}

#[test]
fn test_group_logs_by_resource_and_scope_single_scope() {
let resource = Resource::default();
let log1 = create_test_log_data("test-lib", "Log 1");
let log2 = create_test_log_data("test-lib", "Log 2");

let logs = vec![log1, log2];
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema

let grouped_logs =
crate::transform::logs::tonic::group_logs_by_resource_and_scope(logs, &resource);

assert_eq!(grouped_logs.len(), 1);
let resource_logs = &grouped_logs[0];
assert_eq!(resource_logs.scope_logs.len(), 1);

let scope_logs = &resource_logs.scope_logs[0];
assert_eq!(scope_logs.log_records.len(), 2);
}

#[test]
fn test_group_logs_by_resource_and_scope_multiple_scopes() {
let resource = Resource::default();
let log1 = create_test_log_data("lib1", "Log 1");
let log2 = create_test_log_data("lib2", "Log 2");

let logs = vec![log1, log2];
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
let grouped_logs =
crate::transform::logs::tonic::group_logs_by_resource_and_scope(logs, &resource);

assert_eq!(grouped_logs.len(), 1);
let resource_logs = &grouped_logs[0];
assert_eq!(resource_logs.scope_logs.len(), 2);

let scope_logs_1 = &resource_logs
.scope_logs
.iter()
.find(|scope| scope.scope.as_ref().unwrap().name == "lib1")
.unwrap();
let scope_logs_2 = &resource_logs
.scope_logs
.iter()
.find(|scope| scope.scope.as_ref().unwrap().name == "lib2")
.unwrap();

assert_eq!(scope_logs_1.log_records.len(), 1);
assert_eq!(scope_logs_2.log_records.len(), 1);
}
}
Loading

0 comments on commit 9b746e0

Please sign in to comment.