diff --git a/CHANGELOG.md b/CHANGELOG.md index 587ca0a61b..b0741653da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1855](https://github.com/open-telemetry/opentelemetry-python/pull/1855)) - Fixed exporter OTLP header parsing to match baggage header formatting. ([#1869](https://github.com/open-telemetry/opentelemetry-python/pull/1869)) +- Added optional `schema_url` field to `Resource` class + ([#1871](https://github.com/open-telemetry/opentelemetry-python/pull/1871)) ## [1.2.0, 0.21b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.2.0-0.21b0) - 2021-05-11 diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py index 8755b2d1f9..24e5321ce9 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py @@ -141,16 +141,25 @@ class Resource: """A Resource is an immutable representation of the entity producing telemetry as Attributes.""" - def __init__(self, attributes: Attributes): + def __init__( + self, attributes: Attributes, schema_url: typing.Optional[str] = None + ): _filter_attributes(attributes) self._attributes = attributes.copy() + if schema_url is None: + schema_url = "" + self._schema_url = schema_url @staticmethod - def create(attributes: typing.Optional[Attributes] = None) -> "Resource": + def create( + attributes: typing.Optional[Attributes] = None, + schema_url: typing.Optional[str] = None, + ) -> "Resource": """Creates a new `Resource` from attributes. Args: attributes: Optional zero or more key-value pairs. + schema_url: Optional URL pointing to the schema Returns: The newly-created Resource. @@ -159,7 +168,7 @@ def create(attributes: typing.Optional[Attributes] = None) -> "Resource": attributes = {} resource = _DEFAULT_RESOURCE.merge( OTELResourceDetector().detect() - ).merge(Resource(attributes)) + ).merge(Resource(attributes, schema_url)) if not resource.attributes.get(SERVICE_NAME, None): default_service_name = "unknown_service" process_executable_name = resource.attributes.get( @@ -168,7 +177,7 @@ def create(attributes: typing.Optional[Attributes] = None) -> "Resource": if process_executable_name: default_service_name += ":" + process_executable_name resource = resource.merge( - Resource({SERVICE_NAME: default_service_name}) + Resource({SERVICE_NAME: default_service_name}, schema_url) ) return resource @@ -180,12 +189,21 @@ def get_empty() -> "Resource": def attributes(self) -> Attributes: return self._attributes.copy() + @property + def schema_url(self) -> str: + return self._schema_url + def merge(self, other: "Resource") -> "Resource": """Merges this resource and an updating resource into a new `Resource`. If a key exists on both the old and updating resource, the value of the updating resource will override the old resource value. + The updating resource's `schema_url` will be used only if the old + `schema_url` is empty. Attempting to merge two resources with + different, non-empty values for `schema_url` will result in an error + and return the old resource. + Args: other: The other resource to be merged. @@ -194,15 +212,35 @@ def merge(self, other: "Resource") -> "Resource": """ merged_attributes = self.attributes merged_attributes.update(other.attributes) - return Resource(merged_attributes) + + if self.schema_url == "": + schema_url = other.schema_url + elif other.schema_url == "": + schema_url = self.schema_url + elif self.schema_url == other.schema_url: + schema_url = other.schema_url + else: + logger.error( + "Failed to merge resources: The two schemas %s and %s are incompatible", + self.schema_url, + other.schema_url, + ) + return self + + return Resource(merged_attributes, schema_url) def __eq__(self, other: object) -> bool: if not isinstance(other, Resource): return False - return self._attributes == other._attributes + return ( + self._attributes == other._attributes + and self._schema_url == other._schema_url + ) def __hash__(self): - return hash(dumps(self._attributes, sort_keys=True)) + return hash( + f"{dumps(self._attributes, sort_keys=True)}|{self._schema_url}" + ) _EMPTY_RESOURCE = Resource({}) diff --git a/opentelemetry-sdk/tests/resources/test_resources.py b/opentelemetry-sdk/tests/resources/test_resources.py index 87b4006ce7..36eb099cd2 100644 --- a/opentelemetry-sdk/tests/resources/test_resources.py +++ b/opentelemetry-sdk/tests/resources/test_resources.py @@ -17,6 +17,7 @@ import os import unittest import uuid +from logging import ERROR from unittest import mock from opentelemetry.sdk import resources @@ -51,6 +52,14 @@ def test_create(self): resource = resources.Resource.create(attributes) self.assertIsInstance(resource, resources.Resource) self.assertEqual(resource.attributes, expected_attributes) + self.assertEqual(resource.schema_url, "") + + schema_url = "https://opentelemetry.io/schemas/1.3.0" + + resource = resources.Resource.create(attributes, schema_url) + self.assertIsInstance(resource, resources.Resource) + self.assertEqual(resource.attributes, expected_attributes) + self.assertEqual(resource.schema_url, schema_url) os.environ[resources.OTEL_RESOURCE_ATTRIBUTES] = "key=value" resource = resources.Resource.create(attributes) @@ -67,17 +76,45 @@ def test_create(self): self.assertEqual( resource, resources._DEFAULT_RESOURCE.merge( - resources.Resource({resources.SERVICE_NAME: "unknown_service"}) + resources.Resource( + {resources.SERVICE_NAME: "unknown_service"}, "" + ) ), ) + self.assertEqual(resource.schema_url, "") + + resource = resources.Resource.create(None, None) + self.assertEqual( + resource, + resources._DEFAULT_RESOURCE.merge( + resources.Resource( + {resources.SERVICE_NAME: "unknown_service"}, "" + ) + ), + ) + self.assertEqual(resource.schema_url, "") resource = resources.Resource.create({}) self.assertEqual( resource, resources._DEFAULT_RESOURCE.merge( - resources.Resource({resources.SERVICE_NAME: "unknown_service"}) + resources.Resource( + {resources.SERVICE_NAME: "unknown_service"}, "" + ) + ), + ) + self.assertEqual(resource.schema_url, "") + + resource = resources.Resource.create({}, None) + self.assertEqual( + resource, + resources._DEFAULT_RESOURCE.merge( + resources.Resource( + {resources.SERVICE_NAME: "unknown_service"}, "" + ) ), ) + self.assertEqual(resource.schema_url, "") def test_resource_merge(self): left = resources.Resource({"service": "ui"}) @@ -86,6 +123,33 @@ def test_resource_merge(self): left.merge(right), resources.Resource({"service": "ui", "host": "service-host"}), ) + schema_urls = ( + "https://opentelemetry.io/schemas/1.2.0", + "https://opentelemetry.io/schemas/1.3.0", + ) + + left = resources.Resource.create({}, None) + right = resources.Resource.create({}, None) + self.assertEqual(left.merge(right).schema_url, "") + + left = resources.Resource.create({}, None) + right = resources.Resource.create({}, schema_urls[0]) + self.assertEqual(left.merge(right).schema_url, schema_urls[0]) + + left = resources.Resource.create({}, schema_urls[0]) + right = resources.Resource.create({}, None) + self.assertEqual(left.merge(right).schema_url, schema_urls[0]) + + left = resources.Resource.create({}, schema_urls[0]) + right = resources.Resource.create({}, schema_urls[0]) + self.assertEqual(left.merge(right).schema_url, schema_urls[0]) + + left = resources.Resource.create({}, schema_urls[0]) + right = resources.Resource.create({}, schema_urls[1]) + with self.assertLogs(level=ERROR) as log_entry: + self.assertEqual(left.merge(right), left) + self.assertIn(schema_urls[0], log_entry.output[0]) + self.assertIn(schema_urls[1], log_entry.output[0]) def test_resource_merge_empty_string(self): """Verify Resource.merge behavior with the empty string. @@ -130,6 +194,11 @@ def test_immutability(self): attributes["cost"] = 999.91 self.assertEqual(resource.attributes, attributes_copy) + with self.assertRaises(AttributeError): + resource.schema_url = "bug" + + self.assertEqual(resource.schema_url, "") + def test_service_name_using_process_name(self): resource = resources.Resource.create( {resources.PROCESS_EXECUTABLE_NAME: "test"} @@ -220,6 +289,76 @@ def test_aggregated_resources_multiple_detectors(self): ), ) + def test_aggregated_resources_different_schema_urls(self): + resource_detector1 = mock.Mock(spec=resources.ResourceDetector) + resource_detector1.detect.return_value = resources.Resource( + {"key1": "value1"}, "" + ) + resource_detector2 = mock.Mock(spec=resources.ResourceDetector) + resource_detector2.detect.return_value = resources.Resource( + {"key2": "value2", "key3": "value3"}, "url1" + ) + resource_detector3 = mock.Mock(spec=resources.ResourceDetector) + resource_detector3.detect.return_value = resources.Resource( + { + "key2": "try_to_overwrite_existing_value", + "key3": "try_to_overwrite_existing_value", + "key4": "value4", + }, + "url2", + ) + resource_detector4 = mock.Mock(spec=resources.ResourceDetector) + resource_detector4.detect.return_value = resources.Resource( + { + "key2": "try_to_overwrite_existing_value", + "key3": "try_to_overwrite_existing_value", + "key4": "value4", + }, + "url1", + ) + self.assertEqual( + resources.get_aggregated_resources( + [resource_detector1, resource_detector2] + ), + resources.Resource( + {"key1": "value1", "key2": "value2", "key3": "value3"}, + "url1", + ), + ) + with self.assertLogs(level=ERROR) as log_entry: + self.assertEqual( + resources.get_aggregated_resources( + [resource_detector2, resource_detector3] + ), + resources.Resource( + {"key2": "value2", "key3": "value3"}, "url1" + ), + ) + self.assertIn("url1", log_entry.output[0]) + self.assertIn("url2", log_entry.output[0]) + with self.assertLogs(level=ERROR): + self.assertEqual( + resources.get_aggregated_resources( + [ + resource_detector2, + resource_detector3, + resource_detector4, + resource_detector1, + ] + ), + resources.Resource( + { + "key1": "value1", + "key2": "try_to_overwrite_existing_value", + "key3": "try_to_overwrite_existing_value", + "key4": "value4", + }, + "url1", + ), + ) + self.assertIn("url1", log_entry.output[0]) + self.assertIn("url2", log_entry.output[0]) + def test_resource_detector_ignore_error(self): resource_detector = mock.Mock(spec=resources.ResourceDetector) resource_detector.detect.side_effect = Exception() diff --git a/opentelemetry-sdk/tests/trace/test_trace.py b/opentelemetry-sdk/tests/trace/test_trace.py index c9c523c3fa..a5113ffe09 100644 --- a/opentelemetry-sdk/tests/trace/test_trace.py +++ b/opentelemetry-sdk/tests/trace/test_trace.py @@ -869,7 +869,7 @@ def test_span_override_start_and_end_time(self): self.assertEqual(end_time, span.end_time) def test_ended_span(self): - """"Events, attributes are not allowed after span is ended""" + """Events, attributes are not allowed after span is ended""" root = self.tracer.start_span("root")