From 476f2c11a9bbd93ba842e1945acf22a49afbadc0 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 15 Feb 2024 09:43:48 -0800 Subject: [PATCH] Optimize Resource Sharing Across Exporters with Arc Implementation (#1526) Co-authored-by: Cijo Thomas Co-authored-by: Cijo Thomas --- opentelemetry-sdk/CHANGELOG.md | 3 + opentelemetry-sdk/src/resource/mod.rs | 193 ++++++++++++++------------ 2 files changed, 106 insertions(+), 90 deletions(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index da91a7151f..91814f20ef 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -8,6 +8,9 @@ - [#1471](https://github.com/open-telemetry/opentelemetry-rust/pull/1471) Configure batch log record processor via [`OTEL_BLRP_*`](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/configuration/sdk-environment-variables.md#batch-logrecord-processor) environment variables and via `OtlpLogPipeline::with_batch_config` - [#1503](https://github.com/open-telemetry/opentelemetry-rust/pull/1503) Make the documentation for In-Memory exporters visible. +- [#1526](https://github.com/open-telemetry/opentelemetry-rust/pull/1526) +Performance Improvement : Creating Spans and LogRecords are now faster, by avoiding expensive cloning of `Resource` for every Span/LogRecord. + ### Changed - **Breaking** diff --git a/opentelemetry-sdk/src/resource/mod.rs b/opentelemetry-sdk/src/resource/mod.rs index 79ce0122eb..f43fe1c1e0 100644 --- a/opentelemetry-sdk/src/resource/mod.rs +++ b/opentelemetry-sdk/src/resource/mod.rs @@ -34,13 +34,22 @@ use opentelemetry::{Key, KeyValue, Value}; use std::borrow::Cow; use std::collections::{hash_map, HashMap}; use std::ops::Deref; +use std::sync::Arc; use std::time::Duration; +/// Inner structure of `Resource` holding the actual data. +/// This structure is designed to be shared among `Resource` instances via `Arc`. 
+#[derive(Debug, Clone, PartialEq)] +struct ResourceInner { + attrs: HashMap<Key, Value>, + schema_url: Option<Cow<'static, str>>, +} + /// An immutable representation of the entity producing telemetry as attributes. +/// Utilizes `Arc` for efficient sharing and cloning. #[derive(Clone, Debug, PartialEq)] pub struct Resource { - attrs: HashMap<Key, Value>, - schema_url: Option<Cow<'static, str>>, + inner: Arc<ResourceInner>, } impl Default for Resource { @@ -58,10 +67,13 @@ impl Default for Resource { impl Resource { /// Creates an empty resource. + /// This is the basic constructor that initializes a resource with no attributes and no schema URL. pub fn empty() -> Self { - Self { - attrs: Default::default(), - schema_url: None, + Resource { + inner: Arc::new(ResourceInner { + attrs: HashMap::new(), + schema_url: None, + }), } } @@ -70,13 +82,17 @@ impl Resource { /// Values are de-duplicated by key, and the first key-value pair with a non-empty string value /// will be retained pub fn new<T: IntoIterator<Item = KeyValue>>(kvs: T) -> Self { - let mut resource = Resource::empty(); - - for kv in kvs.into_iter() { - resource.attrs.insert(kv.key, kv.value); + let mut attrs = HashMap::new(); + for kv in kvs { + attrs.insert(kv.key, kv.value); } - resource + Resource { + inner: Arc::new(ResourceInner { + attrs, + schema_url: None, + }), + } } /// Create a new `Resource` from a key value pairs and [schema url]. @@ -92,9 +108,22 @@ impl Resource { KV: IntoIterator<Item = KeyValue>, S: Into<Cow<'static, str>>, { - let mut resource = Self::new(kvs); - resource.schema_url = Some(schema_url.into()); - resource + let schema_url_str = schema_url.into(); + let normalized_schema_url = if schema_url_str.is_empty() { + None + } else { + Some(schema_url_str) + }; + let mut attrs = HashMap::new(); + for kv in kvs { + attrs.insert(kv.key, kv.value); + } + Resource { + inner: Arc::new(ResourceInner { + attrs, + schema_url: normalized_schema_url, + }), + } } /// Create a new `Resource` from resource detectors. 
@@ -104,9 +133,12 @@ impl Resource { let mut resource = Resource::empty(); for detector in detectors { let detected_res = detector.detect(timeout); + // This call ensures that if the Arc is not uniquely owned, + // the data is cloned before modification, preserving safety. + // If the Arc is uniquely owned, it simply returns a mutable reference to the data. + let inner = Arc::make_mut(&mut resource.inner); for (key, value) in detected_res.into_iter() { - // using insert instead of merge to avoid clone. - resource.attrs.insert(key, value); + inner.attrs.insert(Key::new(key.clone()), value.clone()); } } @@ -129,87 +161,62 @@ impl Resource { /// /// [Schema url]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/schemas/overview.md#schema-url pub fn merge<T: Deref<Target = Self>>(&self, other: T) -> Self { - if self.attrs.is_empty() { + if self.is_empty() { return other.clone(); } - if other.attrs.is_empty() { + if other.is_empty() { return self.clone(); } - - let mut resource = Resource::empty(); - - // attrs from self take the less priority, even when the new value is empty. - for (k, v) in self.attrs.iter() { - resource.attrs.insert(k.clone(), v.clone()); - } - for (k, v) in other.attrs.iter() { - resource.attrs.insert(k.clone(), v.clone()); + let mut combined_attrs = self.inner.attrs.clone(); + for (k, v) in other.inner.attrs.iter() { + combined_attrs.insert(k.clone(), v.clone()); } - - if self.schema_url == other.schema_url { - resource.schema_url = self.schema_url.clone(); - } else if self.schema_url.is_none() { - // if the other resource has schema url, use it. - if other.schema_url.is_some() { - resource.schema_url = other.schema_url.clone(); - } - // else empty schema url. - } else { - // if self has schema url, use it. 
- if other.schema_url.is_none() { - resource.schema_url = self.schema_url.clone(); - } + // Resolve the schema URL according to the precedence rules + let combined_schema_url = match (&self.inner.schema_url, &other.inner.schema_url) { + // If both resources have a schema URL and it's the same, use it + (Some(url1), Some(url2)) if url1 == url2 => Some(url1.clone()), + // If both resources have a schema URL but they are not the same, the schema URL will be empty + (Some(_), Some(_)) => None, + // If this resource does not have a schema URL, and the other resource has a schema URL, it will be used + (None, Some(url)) => Some(url.clone()), + // If this resource has a schema URL, it will be used (covers case 1 and any other cases where `self` has a schema URL) + (Some(url), _) => Some(url.clone()), + // If both resources do not have a schema URL, the schema URL will be empty + (None, None) => None, + }; + Resource { + inner: Arc::new(ResourceInner { + attrs: combined_attrs, + schema_url: combined_schema_url, + }), } - - resource } /// Return the [schema url] of the resource. If the resource does not have a schema url, return `None`. /// /// [schema url]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/schemas/overview.md#schema-url pub fn schema_url(&self) -> Option<&str> { - self.schema_url.as_ref().map(|s| s.as_ref()) + self.inner.schema_url.as_ref().map(|s| s.as_ref()) } /// Returns the number of attributes for this resource pub fn len(&self) -> usize { - self.attrs.len() + self.inner.attrs.len() } /// Returns `true` if the resource contains no attributes. pub fn is_empty(&self) -> bool { - self.attrs.is_empty() + self.inner.attrs.is_empty() } /// Gets an iterator over the attributes of this resource, sorted by key. pub fn iter(&self) -> Iter<'_> { - self.into_iter() + Iter(self.inner.attrs.iter()) } /// Retrieve the value from resource associate with given key. 
 pub fn get(&self, key: Key) -> Option<Value> { - self.attrs.get(&key).cloned() - } -} - -/// An owned iterator over the entries of a `Resource`. -#[derive(Debug)] -pub struct IntoIter(hash_map::IntoIter<Key, Value>); - -impl Iterator for IntoIter { - type Item = (Key, Value); - - fn next(&mut self) -> Option<Self::Item> { - self.0.next() - } -} - -impl IntoIterator for Resource { - type Item = (Key, Value); - type IntoIter = IntoIter; - - fn into_iter(self) -> Self::IntoIter { - IntoIter(self.attrs.into_iter()) + self.inner.attrs.get(&key).cloned() } } @@ -230,7 +237,7 @@ impl<'a> IntoIterator for &'a Resource { type IntoIter = Iter<'a>; fn into_iter(self) -> Self::IntoIter { - Iter(self.attrs.iter()) + Iter(self.inner.attrs.iter()) } } @@ -264,13 +271,10 @@ mod tests { let mut expected_attrs = HashMap::new(); expected_attrs.insert(Key::new("a"), Value::from("final")); - assert_eq!( - Resource::new(args_with_dupe_keys), - Resource { - attrs: expected_attrs, - schema_url: None, - } - ); + let resource = Resource::new(args_with_dupe_keys); + let resource_inner = Arc::try_unwrap(resource.inner).expect("Failed to unwrap Arc"); + assert_eq!(resource_inner.attrs, expected_attrs); + assert_eq!(resource_inner.schema_url, None); } #[test] @@ -293,13 +297,14 @@ mod tests { expected_attrs.insert(Key::new("c"), Value::from("c-value")); expected_attrs.insert(Key::new("d"), Value::from("")); - assert_eq!( - resource_a.merge(&resource_b), - Resource { + let expected_resource = Resource { + inner: Arc::new(ResourceInner { attrs: expected_attrs, - schema_url: None, - } - ); + schema_url: None, // Assuming schema_url handling if needed + }), + }; + + assert_eq!(resource_a.merge(&resource_b), expected_resource); } #[test] @@ -317,16 +322,24 @@ mod tests { (None, None, None), ]; - for (schema_url, other_schema_url, expect_schema_url) in test_cases.into_iter() { - let mut resource = Resource::new(vec![KeyValue::new("key", "")]); - resource.schema_url = schema_url.map(Into::into); + for (schema_url_a, schema_url_b, 
expected_schema_url) in test_cases.into_iter() { + let resource_a = Resource::from_schema_url( + vec![KeyValue::new("key", "")], + schema_url_a.unwrap_or(""), + ); + let resource_b = Resource::from_schema_url( + vec![KeyValue::new("key", "")], + schema_url_b.unwrap_or(""), + ); - let mut other_resource = Resource::new(vec![KeyValue::new("key", "")]); - other_resource.schema_url = other_schema_url.map(Into::into); + let merged_resource = resource_a.merge(&resource_b); + let result_schema_url = merged_resource.schema_url(); assert_eq!( - resource.merge(&other_resource).schema_url, - expect_schema_url.map(Into::into) + result_schema_url.map(|s| s as &str), + expected_schema_url, + "Merging schema_url_a {:?} with schema_url_b {:?} did not yield expected result {:?}", + schema_url_a, schema_url_b, expected_schema_url ); } @@ -334,7 +347,7 @@ mod tests { let resource = Resource::from_schema_url(vec![], "http://schema/a"); let other_resource = Resource::new(vec![KeyValue::new("key", "")]); - assert_eq!(resource.merge(&other_resource).schema_url, None); + assert_eq!(resource.merge(&other_resource).schema_url(), None); } #[test]