Skip to content

Commit

Permalink
PeriodicExportingMetricReader will continue collection times out (#3098
Browse files Browse the repository at this point in the history
…) (#3100)

In cases where collection times out, the period exporting reader thread should
not terminate, but instead catch, log, and continue on after the regular interval
seconds.

Prior to this commit, a metric collection timeout would terminate the thread and
stop reporting metrics to the wrapped exporter resulting in the appearance in
observability tooling of metrics just stopping without reason.
  • Loading branch information
markallanson committed Feb 23, 2023
1 parent 54a1003 commit 559e06d
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 6 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
- PeriodicExportingMetricReader will continue if collection times out
([#3100](https://github.com/open-telemetry/opentelemetry-python/pull/3100))


## Version 1.16.0/0.37b0 (2023-02-17)
- Change ``__all__`` to be statically defined.
Expand All @@ -28,10 +31,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3156](https://github.com/open-telemetry/opentelemetry-python/pull/3156))
- deprecate jaeger exporters
([#3158](https://github.com/open-telemetry/opentelemetry-python/pull/3158))

- Create a single resource instance
([#3118](https://github.com/open-telemetry/opentelemetry-python/pull/3118))


## Version 1.15.0/0.36b0 (2022-12-09)

- PeriodicExportingMetricsReader with +Inf interval
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
AggregationTemporality,
DefaultAggregation,
)
from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError
from opentelemetry.sdk.metrics._internal.instrument import (
Counter,
Histogram,
Expand Down Expand Up @@ -497,7 +498,14 @@ def _at_fork_reinit(self):
def _ticker(self) -> None:
interval_secs = self._export_interval_millis / 1e3
while not self._shutdown_event.wait(interval_secs):
self.collect(timeout_millis=self._export_timeout_millis)
try:
self.collect(timeout_millis=self._export_timeout_millis)
except MetricsTimeoutError:
_logger.warning(
"Metric collection timed out. Will try again after %s seconds",
interval_secs,
exc_info=True,
)
# one last collection below before shutting down completely
self.collect(timeout_millis=self._export_interval_millis)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

import math
from time import sleep, time_ns
from typing import Sequence
from typing import Optional, Sequence
from unittest.mock import Mock

from flaky import flaky

from opentelemetry.sdk.metrics import Counter
from opentelemetry.sdk.metrics import Counter, MetricsTimeoutError
from opentelemetry.sdk.metrics._internal import _Counter
from opentelemetry.sdk.metrics.export import (
AggregationTemporality,
Expand Down Expand Up @@ -67,6 +67,25 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool:
return True


class ExceptionAtCollectionPeriodicExportingMetricReader(
PeriodicExportingMetricReader
):
def __init__(
self,
exporter: MetricExporter,
exception: Exception,
export_interval_millis: Optional[float] = None,
export_timeout_millis: Optional[float] = None,
) -> None:
super().__init__(
exporter, export_interval_millis, export_timeout_millis
)
self._collect_exception = exception

def collect(self, timeout_millis: float = 10_000) -> None:
raise self._collect_exception


metrics_list = [
Metric(
name="sum_name",
Expand Down Expand Up @@ -111,11 +130,13 @@ def test_defaults(self):
pmr.shutdown()

def _create_periodic_reader(
self, metrics, exporter, collect_wait=0, interval=60000
self, metrics, exporter, collect_wait=0, interval=60000, timeout=30000
):

pmr = PeriodicExportingMetricReader(
exporter, export_interval_millis=interval
exporter,
export_interval_millis=interval,
export_timeout_millis=timeout,
)

def _collect(reader, timeout_millis):
Expand Down Expand Up @@ -219,3 +240,15 @@ def test_exporter_aggregation_preference(self):
self.assertTrue(isinstance(value, DefaultAggregation))
else:
self.assertTrue(isinstance(value, LastValueAggregation))

def test_metric_timeout_does_not_kill_worker_thread(self):
exporter = FakeMetricsExporter()
pmr = ExceptionAtCollectionPeriodicExportingMetricReader(
exporter,
MetricsTimeoutError("test timeout"),
export_timeout_millis=1,
)

sleep(0.1)
self.assertTrue(pmr._daemon_thread.is_alive())
pmr.shutdown()

0 comments on commit 559e06d

Please sign in to comment.