Skip to content

Commit

Permalink
Optimize Resource Sharing Across Exporters with Arc Implementation (#…
Browse files Browse the repository at this point in the history
…1526)

Co-authored-by: Cijo Thomas <cijo.thomas@gmail.com>
Co-authored-by: Cijo Thomas <cithomas@microsoft.com>
  • Loading branch information
3 people committed Feb 15, 2024
1 parent 650904f commit 476f2c1
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 90 deletions.
3 changes: 3 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
- [#1471](https://github.com/open-telemetry/opentelemetry-rust/pull/1471) Configure batch log record processor via [`OTEL_BLRP_*`](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/configuration/sdk-environment-variables.md#batch-logrecord-processor) environment variables and via `OtlpLogPipeline::with_batch_config`
- [#1503](https://github.com/open-telemetry/opentelemetry-rust/pull/1503) Make the documentation for In-Memory exporters visible.

- [#1526](https://github.com/open-telemetry/opentelemetry-rust/pull/1526)
Performance improvement: creating Spans and LogRecords is now faster, because the expensive cloning of `Resource` for every Span/LogRecord is avoided.

### Changed

- **Breaking**
Expand Down
193 changes: 103 additions & 90 deletions opentelemetry-sdk/src/resource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,22 @@ use opentelemetry::{Key, KeyValue, Value};
use std::borrow::Cow;
use std::collections::{hash_map, HashMap};
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;

/// Inner structure of `Resource` holding the actual data.
/// This structure is designed to be shared among `Resource` instances via `Arc`.
#[derive(Debug, Clone, PartialEq)]
struct ResourceInner {
/// Attributes of the resource, stored as a map from attribute key to value.
attrs: HashMap<Key, Value>,
/// Schema URL of the resource; `None` when no schema URL is set.
schema_url: Option<Cow<'static, str>>,
}

/// An immutable representation of the entity producing telemetry as attributes.
/// Utilizes `Arc` for efficient sharing and cloning.
#[derive(Clone, Debug, PartialEq)]
pub struct Resource {
attrs: HashMap<Key, Value>,
schema_url: Option<Cow<'static, str>>,
inner: Arc<ResourceInner>,
}

impl Default for Resource {
Expand All @@ -58,10 +67,13 @@ impl Default for Resource {

impl Resource {
/// Creates an empty resource.
/// This is the basic constructor that initializes a resource with no attributes and no schema URL.
pub fn empty() -> Self {
Self {
attrs: Default::default(),
schema_url: None,
Resource {
inner: Arc::new(ResourceInner {
attrs: HashMap::new(),
schema_url: None,
}),
}
}

Expand All @@ -70,13 +82,17 @@ impl Resource {
/// Values are de-duplicated by key; when the same key appears more than once, the last
/// key-value pair is retained.
pub fn new<T: IntoIterator<Item = KeyValue>>(kvs: T) -> Self {
let mut resource = Resource::empty();

for kv in kvs.into_iter() {
resource.attrs.insert(kv.key, kv.value);
let mut attrs = HashMap::new();
for kv in kvs {
attrs.insert(kv.key, kv.value);
}

resource
Resource {
inner: Arc::new(ResourceInner {
attrs,
schema_url: None,
}),
}
}

/// Create a new `Resource` from key-value pairs and a [schema url].
Expand All @@ -92,9 +108,22 @@ impl Resource {
KV: IntoIterator<Item = KeyValue>,
S: Into<Cow<'static, str>>,
{
let mut resource = Self::new(kvs);
resource.schema_url = Some(schema_url.into());
resource
let schema_url_str = schema_url.into();
let normalized_schema_url = if schema_url_str.is_empty() {
None
} else {
Some(schema_url_str)
};
let mut attrs = HashMap::new();
for kv in kvs {
attrs.insert(kv.key, kv.value);
}
Resource {
inner: Arc::new(ResourceInner {
attrs,
schema_url: normalized_schema_url,
}),
}
}

/// Create a new `Resource` from resource detectors.
Expand All @@ -104,9 +133,12 @@ impl Resource {
let mut resource = Resource::empty();
for detector in detectors {
let detected_res = detector.detect(timeout);
// This call ensures that if the Arc is not uniquely owned,
// the data is cloned before modification, preserving safety.
// If the Arc is uniquely owned, it simply returns a mutable reference to the data.
let inner = Arc::make_mut(&mut resource.inner);
for (key, value) in detected_res.into_iter() {
// using insert instead of merge to avoid clone.
resource.attrs.insert(key, value);
inner.attrs.insert(Key::new(key.clone()), value.clone());
}
}

Expand All @@ -129,87 +161,62 @@ impl Resource {
///
/// [Schema url]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/schemas/overview.md#schema-url
pub fn merge<T: Deref<Target = Self>>(&self, other: T) -> Self {
if self.attrs.is_empty() {
if self.is_empty() {
return other.clone();
}
if other.attrs.is_empty() {
if other.is_empty() {
return self.clone();
}

let mut resource = Resource::empty();

// attrs from self take lower priority, even when the new value is empty.
for (k, v) in self.attrs.iter() {
resource.attrs.insert(k.clone(), v.clone());
}
for (k, v) in other.attrs.iter() {
resource.attrs.insert(k.clone(), v.clone());
let mut combined_attrs = self.inner.attrs.clone();
for (k, v) in other.inner.attrs.iter() {
combined_attrs.insert(k.clone(), v.clone());
}

if self.schema_url == other.schema_url {
resource.schema_url = self.schema_url.clone();
} else if self.schema_url.is_none() {
// if the other resource has schema url, use it.
if other.schema_url.is_some() {
resource.schema_url = other.schema_url.clone();
}
// else empty schema url.
} else {
// if self has schema url, use it.
if other.schema_url.is_none() {
resource.schema_url = self.schema_url.clone();
}
// Resolve the schema URL according to the precedence rules
let combined_schema_url = match (&self.inner.schema_url, &other.inner.schema_url) {
// If both resources have a schema URL and it's the same, use it
(Some(url1), Some(url2)) if url1 == url2 => Some(url1.clone()),
// If both resources have a schema URL but they are not the same, the schema URL will be empty
(Some(_), Some(_)) => None,
// If this resource does not have a schema URL, and the other resource has a schema URL, it will be used
(None, Some(url)) => Some(url.clone()),
// If this resource has a schema URL and the other resource does not, this resource's schema URL will be used
(Some(url), _) => Some(url.clone()),
// If both resources do not have a schema URL, the schema URL will be empty
(None, None) => None,
};
Resource {
inner: Arc::new(ResourceInner {
attrs: combined_attrs,
schema_url: combined_schema_url,
}),
}

resource
}

/// Return the [schema url] of the resource. If the resource does not have a schema url, return `None`.
///
/// [schema url]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/schemas/overview.md#schema-url
pub fn schema_url(&self) -> Option<&str> {
self.schema_url.as_ref().map(|s| s.as_ref())
self.inner.schema_url.as_ref().map(|s| s.as_ref())
}

/// Returns the number of attributes for this resource
pub fn len(&self) -> usize {
self.attrs.len()
self.inner.attrs.len()
}

/// Returns `true` if the resource contains no attributes.
pub fn is_empty(&self) -> bool {
self.attrs.is_empty()
self.inner.attrs.is_empty()
}

/// Gets an iterator over the attributes of this resource. The attributes are stored in a `HashMap`, so no particular order is guaranteed.
pub fn iter(&self) -> Iter<'_> {
self.into_iter()
Iter(self.inner.attrs.iter())
}

/// Retrieves the value associated with the given key from the resource.
pub fn get(&self, key: Key) -> Option<Value> {
self.attrs.get(&key).cloned()
}
}

/// An owned iterator over the entries of a `Resource`.
#[derive(Debug)]
pub struct IntoIter(hash_map::IntoIter<Key, Value>);

impl Iterator for IntoIter {
type Item = (Key, Value);

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}

impl IntoIterator for Resource {
type Item = (Key, Value);
type IntoIter = IntoIter;

fn into_iter(self) -> Self::IntoIter {
IntoIter(self.attrs.into_iter())
self.inner.attrs.get(&key).cloned()
}
}

Expand All @@ -230,7 +237,7 @@ impl<'a> IntoIterator for &'a Resource {
type IntoIter = Iter<'a>;

fn into_iter(self) -> Self::IntoIter {
Iter(self.attrs.iter())
Iter(self.inner.attrs.iter())
}
}

Expand Down Expand Up @@ -264,13 +271,10 @@ mod tests {
let mut expected_attrs = HashMap::new();
expected_attrs.insert(Key::new("a"), Value::from("final"));

assert_eq!(
Resource::new(args_with_dupe_keys),
Resource {
attrs: expected_attrs,
schema_url: None,
}
);
let resource = Resource::new(args_with_dupe_keys);
let resource_inner = Arc::try_unwrap(resource.inner).expect("Failed to unwrap Arc");
assert_eq!(resource_inner.attrs, expected_attrs);
assert_eq!(resource_inner.schema_url, None);
}

#[test]
Expand All @@ -293,13 +297,14 @@ mod tests {
expected_attrs.insert(Key::new("c"), Value::from("c-value"));
expected_attrs.insert(Key::new("d"), Value::from(""));

assert_eq!(
resource_a.merge(&resource_b),
Resource {
let expected_resource = Resource {
inner: Arc::new(ResourceInner {
attrs: expected_attrs,
schema_url: None,
}
);
schema_url: None, // Assuming schema_url handling if needed
}),
};

assert_eq!(resource_a.merge(&resource_b), expected_resource);
}

#[test]
Expand All @@ -317,24 +322,32 @@ mod tests {
(None, None, None),
];

for (schema_url, other_schema_url, expect_schema_url) in test_cases.into_iter() {
let mut resource = Resource::new(vec![KeyValue::new("key", "")]);
resource.schema_url = schema_url.map(Into::into);
for (schema_url_a, schema_url_b, expected_schema_url) in test_cases.into_iter() {
let resource_a = Resource::from_schema_url(
vec![KeyValue::new("key", "")],
schema_url_a.unwrap_or(""),
);
let resource_b = Resource::from_schema_url(
vec![KeyValue::new("key", "")],
schema_url_b.unwrap_or(""),
);

let mut other_resource = Resource::new(vec![KeyValue::new("key", "")]);
other_resource.schema_url = other_schema_url.map(Into::into);
let merged_resource = resource_a.merge(&resource_b);
let result_schema_url = merged_resource.schema_url();

assert_eq!(
resource.merge(&other_resource).schema_url,
expect_schema_url.map(Into::into)
result_schema_url.map(|s| s as &str),
expected_schema_url,
"Merging schema_url_a {:?} with schema_url_b {:?} did not yield expected result {:?}",
schema_url_a, schema_url_b, expected_schema_url
);
}

// if only one resource contains key value pairs
let resource = Resource::from_schema_url(vec![], "http://schema/a");
let other_resource = Resource::new(vec![KeyValue::new("key", "")]);

assert_eq!(resource.merge(&other_resource).schema_url, None);
assert_eq!(resource.merge(&other_resource).schema_url(), None);
}

#[test]
Expand Down

0 comments on commit 476f2c1

Please sign in to comment.