diff --git a/CHANGELOG.md b/CHANGELOG.md index d6ff166ec8..7e2017006f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2528](https://github.com/open-telemetry/opentelemetry-python/pull/2528)) - Fix delta histogram sum not being reset on collection ([#2533](https://github.com/open-telemetry/opentelemetry-python/pull/2533)) +- Add InMemoryMetricReader to metrics SDK + ([#2540](https://github.com/open-telemetry/opentelemetry-python/pull/2540)) ## [1.10.0-0.29b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.10.0-0.29b0) - 2022-03-10 diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py index 284629330f..dc172a48af 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py @@ -18,8 +18,8 @@ from enum import Enum from os import environ, linesep from sys import stdout -from threading import Event, Thread -from typing import IO, Callable, Iterable, Optional, Sequence +from threading import Event, RLock, Thread +from typing import IO, Callable, Iterable, List, Optional, Sequence from opentelemetry.context import ( _SUPPRESS_INSTRUMENTATION_KEY, @@ -96,6 +96,36 @@ def shutdown(self) -> None: pass +class InMemoryMetricReader(MetricReader): + """Implementation of :class:`MetricReader` that returns its metrics from :func:`metrics`. + + This is useful for e.g. unit tests. + """ + + def __init__( + self, + preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE, + ) -> None: + super().__init__(preferred_temporality=preferred_temporality) + self._lock = RLock() + self._metrics: List[Metric] = [] + + def get_metrics(self) -> List[Metric]: + """Reads and returns current metrics from the SDK""" + with self._lock: + self.collect() + metrics = self._metrics + self._metrics = [] + return metrics + + def _receive_metrics(self, metrics: Iterable[Metric]): + with self._lock: + self._metrics = list(metrics) + + def shutdown(self) -> bool: + return True + + class PeriodicExportingMetricReader(MetricReader): """`PeriodicExportingMetricReader` is an implementation of `MetricReader` that collects metrics based on a user-configurable time interval, and passes the diff --git a/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py new file mode 100644 index 0000000000..f69fcea9e5 --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py @@ -0,0 +1,80 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import TestCase +from unittest.mock import Mock + +from opentelemetry._metrics.measurement import Measurement +from opentelemetry.sdk._metrics import MeterProvider +from opentelemetry.sdk._metrics.export import InMemoryMetricReader +from opentelemetry.sdk._metrics.point import ( + AggregationTemporality, + Metric, + Sum, +) +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util.instrumentation import InstrumentationInfo + + +class TestInMemoryMetricReader(TestCase): + def test_no_metrics(self): + mock_collect_callback = Mock(return_value=[]) + reader = InMemoryMetricReader() + reader._set_collect_callback(mock_collect_callback) + self.assertEqual(reader.get_metrics(), []) + mock_collect_callback.assert_called_once() + + def test_converts_metrics_to_list(self): + metric = Metric( + attributes={"myattr": "baz"}, + description="", + instrumentation_info=InstrumentationInfo("testmetrics"), + name="foo", + resource=Resource.create(), + unit="", + point=Sum( + start_time_unix_nano=1647626444152947792, + time_unix_nano=1647626444153163239, + value=72.3309814450449, + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + ), + ) + mock_collect_callback = Mock(return_value=(metric,)) + reader = InMemoryMetricReader() + reader._set_collect_callback(mock_collect_callback) + + returned_metrics = reader.get_metrics() + mock_collect_callback.assert_called_once() + self.assertIsInstance(returned_metrics, list) + self.assertEqual(len(returned_metrics), 1) + self.assertIs(returned_metrics[0], metric) + + def test_shutdown(self): + # shutdown should always be successful + self.assertTrue(InMemoryMetricReader().shutdown()) + + def test_integration(self): + reader = InMemoryMetricReader() + meter = MeterProvider(metric_readers=[reader]).get_meter("test_meter") + counter1 = meter.create_counter("counter1") + meter.create_observable_gauge( + "observable_gauge1", lambda: [Measurement(value=12)] + ) + counter1.add(1, {"foo": "1"}) + counter1.add(1, {"foo": "2"}) + + metrics = reader.get_metrics() + # should be 3 metrics, one from the observable gauge and one for each labelset from the counter + self.assertEqual(len(metrics), 3)