Skip to content

Commit

Permalink
Fix handling of empty metric collection cycles (#3335)
Browse files Browse the repository at this point in the history
  • Loading branch information
ocelotl committed Sep 7, 2023
1 parent 6070a0d commit 3a651c7
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 73 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Fix handling of empty metric collection cycles
([#3335](https://github.com/open-telemetry/opentelemetry-python/pull/3335))
- Fix error when no LoggerProvider configured for LoggingHandler
([#3423](https://github.com/open-telemetry/opentelemetry-python/pull/3423))



## Version 1.20.0/0.41b0 (2023-09-04)

- Modify Prometheus exporter to translate non-monotonic Sums into Gauges
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from logging import getLogger
from threading import Lock
from time import time_ns
from typing import Dict, List, Sequence
from typing import Dict, List, Optional, Sequence

from opentelemetry.metrics import Instrument
from opentelemetry.sdk.metrics._internal.aggregation import (
Expand Down Expand Up @@ -126,7 +126,7 @@ def collect(
self,
aggregation_temporality: AggregationTemporality,
collection_start_nanos: int,
) -> Sequence[DataPointT]:
) -> Optional[Sequence[DataPointT]]:

data_points: List[DataPointT] = []
with self._lock:
Expand All @@ -136,4 +136,8 @@ def collect(
)
if data_point is not None:
data_points.append(data_point)
return data_points

# Returning here None instead of an empty list because the caller
# does not consume a sequence and to be consistent with the rest of
# collect methods that also return None.
return data_points or None
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,14 @@ def collect(self, timeout_millis: float = 10_000) -> None:
)
return

self._receive_metrics(
self._collect(self, timeout_millis=timeout_millis),
timeout_millis=timeout_millis,
)
metrics = self._collect(self, timeout_millis=timeout_millis)

if metrics is not None:

self._receive_metrics(
metrics,
timeout_millis=timeout_millis,
)

@final
def _set_collect_callback(
Expand Down Expand Up @@ -515,8 +519,7 @@ def _receive_metrics(
timeout_millis: float = 10_000,
**kwargs,
) -> None:
if metrics_data is None:
return

token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
try:
with self._export_lock:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from abc import ABC, abstractmethod
from threading import Lock
from time import time_ns
from typing import Iterable, List, Mapping
from typing import Iterable, List, Mapping, Optional

# This kind of import is needed to avoid Sphinx errors.
import opentelemetry.sdk.metrics
Expand Down Expand Up @@ -51,7 +51,7 @@ def collect(
self,
metric_reader: "opentelemetry.sdk.metrics.MetricReader",
timeout_millis: float = 10_000,
) -> Iterable[Metric]:
) -> Optional[Iterable[Metric]]:
pass


Expand Down Expand Up @@ -94,7 +94,7 @@ def collect(
self,
metric_reader: "opentelemetry.sdk.metrics.MetricReader",
timeout_millis: float = 10_000,
) -> Iterable[Metric]:
) -> Optional[Iterable[Metric]]:

with self._lock:
metric_reader_storage = self._reader_storages[metric_reader]
Expand Down Expand Up @@ -123,4 +123,6 @@ def collect(
for measurement in measurements:
metric_reader_storage.consume_measurement(measurement)

return self._reader_storages[metric_reader].collect()
result = self._reader_storages[metric_reader].collect()

return result
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from logging import getLogger
from threading import RLock
from time import time_ns
from typing import Dict, List
from typing import Dict, List, Optional

from opentelemetry.metrics import (
Asynchronous,
Expand Down Expand Up @@ -119,7 +119,7 @@ def consume_measurement(self, measurement: Measurement) -> None:
):
view_instrument_match.consume_measurement(measurement)

def collect(self) -> MetricsData:
def collect(self) -> Optional[MetricsData]:
# Use a list instead of yielding to prevent a slow reader from holding
# SDK locks

Expand Down Expand Up @@ -152,16 +152,21 @@ def collect(self) -> MetricsData:

for view_instrument_match in view_instrument_matches:

data_points = view_instrument_match.collect(
aggregation_temporality, collection_start_nanos
)

if data_points is None:
continue

if isinstance(
# pylint: disable=protected-access
view_instrument_match._aggregation,
_SumAggregation,
):
data = Sum(
aggregation_temporality=aggregation_temporality,
data_points=view_instrument_match.collect(
aggregation_temporality, collection_start_nanos
),
data_points=data_points,
is_monotonic=isinstance(
instrument, (Counter, ObservableCounter)
),
Expand All @@ -171,20 +176,14 @@ def collect(self) -> MetricsData:
view_instrument_match._aggregation,
_LastValueAggregation,
):
data = Gauge(
data_points=view_instrument_match.collect(
aggregation_temporality, collection_start_nanos
)
)
data = Gauge(data_points=data_points)
elif isinstance(
# pylint: disable=protected-access
view_instrument_match._aggregation,
_ExplicitBucketHistogramAggregation,
):
data = Histogram(
data_points=view_instrument_match.collect(
aggregation_temporality, collection_start_nanos
),
data_points=data_points,
aggregation_temporality=aggregation_temporality,
)
elif isinstance(
Expand All @@ -200,9 +199,7 @@ def collect(self) -> MetricsData:
_ExponentialBucketHistogramAggregation,
):
data = ExponentialHistogram(
data_points=view_instrument_match.collect(
aggregation_temporality, collection_start_nanos
),
data_points=data_points,
aggregation_temporality=aggregation_temporality,
)

Expand All @@ -216,32 +213,38 @@ def collect(self) -> MetricsData:
)
)

if instrument.instrumentation_scope not in (
instrumentation_scope_scope_metrics
):
instrumentation_scope_scope_metrics[
instrument.instrumentation_scope
] = ScopeMetrics(
scope=instrument.instrumentation_scope,
metrics=metrics,
schema_url=instrument.instrumentation_scope.schema_url,
)
else:
instrumentation_scope_scope_metrics[
instrument.instrumentation_scope
].metrics.extend(metrics)

return MetricsData(
resource_metrics=[
ResourceMetrics(
resource=self._sdk_config.resource,
scope_metrics=list(
instrumentation_scope_scope_metrics.values()
),
schema_url=self._sdk_config.resource.schema_url,
if metrics:

if instrument.instrumentation_scope not in (
instrumentation_scope_scope_metrics
):
instrumentation_scope_scope_metrics[
instrument.instrumentation_scope
] = ScopeMetrics(
scope=instrument.instrumentation_scope,
metrics=metrics,
schema_url=instrument.instrumentation_scope.schema_url,
)
else:
instrumentation_scope_scope_metrics[
instrument.instrumentation_scope
].metrics.extend(metrics)

if instrumentation_scope_scope_metrics:

return MetricsData(
resource_metrics=[
ResourceMetrics(
resource=self._sdk_config.resource,
scope_metrics=list(
instrumentation_scope_scope_metrics.values()
),
schema_url=self._sdk_config.resource.schema_url,
)
]
)
]
)

return None

def _handle_view_instrument_match(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,19 @@ def test_console_exporter(self):

self.assertEqual(metrics["attributes"], {"a": "b"})
self.assertEqual(metrics["value"], 1)

def test_console_exporter_no_export(self):

output = StringIO()
exporter = ConsoleMetricExporter(out=output)
reader = PeriodicExportingMetricReader(
exporter, export_interval_millis=100
)
provider = MeterProvider(metric_readers=[reader])
provider.shutdown()

output.seek(0)
actual = "".join(output.readlines())
expected = ""

self.assertEqual(actual, expected)
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,7 @@ def test_disable_default_views(self):
counter.add(10, {"label": "value1"})
counter.add(10, {"label": "value2"})
counter.add(10, {"label": "value3"})
self.assertEqual(
(
reader.get_metrics_data()
.resource_metrics[0]
.scope_metrics[0]
.metrics
),
[],
)
self.assertIsNone(reader.get_metrics_data())

def test_disable_default_views_add_custom(self):
reader = InMemoryMetricReader()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ class TestExporterConcurrency(ConcurrencyTestBase):
> be called again only after the current call returns.
https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exportbatch
This test also tests that a thread that calls the a
``MetricReader.collect`` method using an asynchronous instrument is able
to perform two actions in the same thread lock space (without it being
interrupted by another thread):
1. Consume the measurement produced by the callback associated to the
asynchronous instrument.
2. Export the measurement mentioned in the step above.
"""

def test_exporter_not_called_concurrently(self):
Expand All @@ -84,7 +93,11 @@ def test_exporter_not_called_concurrently(self):
)
meter_provider = MeterProvider(metric_readers=[reader])

counter_cb_counter = 0

def counter_cb(options: CallbackOptions):
nonlocal counter_cb_counter
counter_cb_counter += 1
yield Observation(2)

meter_provider.get_meter(__name__).create_observable_counter(
Expand All @@ -97,6 +110,7 @@ def test_many_threads():

self.run_with_many_threads(test_many_threads, num_threads=100)

self.assertEqual(counter_cb_counter, 100)
# no thread should be in export() now
self.assertEqual(exporter.count_in_export, 0)
# should be one call for each thread
Expand Down

0 comments on commit 3a651c7

Please sign in to comment.