diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/README.rst b/exporter/opentelemetry-exporter-prometheus-remote-write/README.rst index 6ed5c5ebcf..a654038467 100644 --- a/exporter/opentelemetry-exporter-prometheus-remote-write/README.rst +++ b/exporter/opentelemetry-exporter-prometheus-remote-write/README.rst @@ -9,6 +9,11 @@ OpenTelemetry Prometheus Remote Write Exporter This package contains an exporter to send metrics from the OpenTelemetry Python SDK directly to a Prometheus Remote Write integrated backend (such as Cortex or Thanos) without having to run an instance of the Prometheus server. +Key features +------------ + +* Optional bounded retries with exponential backoff and jitter for retryable HTTP statuses and network failures. + Installation ------------ @@ -21,6 +26,12 @@ Installation .. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/ .. _Prometheus Remote Write integrated backend: https://prometheus.io/docs/operating/integrations/ +Configuration highlights +------------------------ + +* ``max_retries`` (default ``3``), ``retry_backoff_factor`` (default ``0.5``), ``retry_backoff_max`` (default ``5.0``), and ``retry_jitter_ratio`` (default ``0.1``) tune the retry policy for retryable statuses (408, 429 and 5xx by default; override with ``retry_status_codes``) and for connection errors and timeouts. The retry adapter is built once, when the exporter is instantiated, so pass these values to the constructor; changing the attributes afterwards does not reconfigure the existing session. +* A single export can therefore take up to roughly ``(max_retries + 1) * timeout`` plus the accumulated backoff delays; server ``Retry-After`` hints are ignored (``respect_retry_after_header=False``). 
+
 References
 ----------
 
diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py
index cfb1d9ea75..bf5e22ff9d 100644
--- a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py
+++ b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py
@@ -13,13 +13,16 @@
 # limitations under the License.
 
 import logging
+import random
 import re
 from collections import defaultdict
 from itertools import chain
-from typing import Dict, Mapping, Sequence
+from typing import Dict, Mapping, Optional, Sequence
 
 import requests
 import snappy
+from requests.adapters import HTTPAdapter
+from urllib3.util import Retry
 
 from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import (  # pylint: disable=no-name-in-module
     WriteRequest,
@@ -55,6 +58,45 @@
 UNDERSCORE_REGEX = re.compile(r"_+")
 
 
+class _JitterRetry(Retry):
+    """``urllib3`` ``Retry`` policy with jittered, capped backoff.
+
+    ``jitter_ratio`` spreads every delay by up to +/- that fraction and
+    ``backoff_max`` caps the resulting delay, in seconds.
+    """
+
+    def __init__(
+        self,
+        *args,
+        backoff_max: float = 5.0,
+        jitter_ratio: float = 0.1,
+        **kwargs,
+    ):
+        super().__init__(*args, **kwargs)
+        self.backoff_max = backoff_max
+        self.jitter_ratio = jitter_ratio
+
+    def new(self, **kw):
+        """Copy this policy, carrying the jitter settings forward.
+
+        ``Retry.increment`` creates the policy for the next attempt via
+        ``new()``. The base implementation only forwards the constructor
+        arguments it knows about, which never includes ``jitter_ratio``,
+        so without this override custom values are lost after one retry.
+        """
+        kw.setdefault("backoff_max", self.backoff_max)
+        kw.setdefault("jitter_ratio", self.jitter_ratio)
+        return super().new(**kw)
+
+    def get_backoff_time(self) -> float:
+        """Return the next delay in seconds, jittered and clamped."""
+        backoff = super().get_backoff_time()
+        if self.jitter_ratio:
+            # Symmetric jitter: +/- jitter_ratio of the computed delay.
+            backoff += backoff * self.jitter_ratio * random.uniform(-1, 1)
+        return min(max(backoff, 0.0), self.backoff_max)
+
+
 class PrometheusRemoteWriteMetricsExporter(MetricExporter):
     """
     Prometheus remote write metric exporter for OpenTelemetry.
@@ -79,6 +101,11 @@ def __init__(
         resources_as_labels: bool = True,
         preferred_temporality: Dict[type, AggregationTemporality] = None,
         preferred_aggregation: Dict = None,
+        max_retries: int = 3,
+        retry_backoff_factor: float = 0.5,
+        retry_backoff_max: float = 5.0,
+        retry_jitter_ratio: float = 0.1,
+        retry_status_codes: Optional[Sequence[int]] = None,
     ):
         self.endpoint = endpoint
         self.basic_auth = basic_auth
@@ -87,6 +114,11 @@ def __init__(
         self.tls_config = tls_config
         self.proxies = proxies
         self.resources_as_labels = resources_as_labels
+        self.max_retries = max_retries
+        self.retry_backoff_factor = retry_backoff_factor
+        self.retry_backoff_max = retry_backoff_max
+        self.retry_jitter_ratio = retry_jitter_ratio
+        self.retry_status_codes = retry_status_codes
 
         if not preferred_temporality:
             preferred_temporality = {
@@ -98,6 +130,7 @@ def __init__(
                 ObservableGauge: AggregationTemporality.CUMULATIVE,
             }
 
+        self._session = self._create_session()
         super().__init__(preferred_temporality, preferred_aggregation)
 
     @property
@@ -181,6 +214,56 @@ def headers(self):
     def headers(self, headers: Dict):
         self._headers = headers
 
+    @property
+    def max_retries(self) -> int:
+        return self._max_retries
+
+    @max_retries.setter
+    def max_retries(self, max_retries: int):
+        if max_retries < 0:
+            raise ValueError("max_retries must be greater than or equal to 0")
+        self._max_retries = max_retries
+
+    @property
+    def retry_backoff_factor(self) -> float:
+        return self._retry_backoff_factor
+
+    @retry_backoff_factor.setter
+    def retry_backoff_factor(self, retry_backoff_factor: float):
+        if retry_backoff_factor <= 0:
+            raise ValueError("retry_backoff_factor must be greater than 0")
+        self._retry_backoff_factor = retry_backoff_factor
+
+    @property
+    def retry_backoff_max(self) -> float:
+        return self._retry_backoff_max
+
+    @retry_backoff_max.setter
+    def retry_backoff_max(self, retry_backoff_max: float):
+        if retry_backoff_max <= 0:
+            raise ValueError("retry_backoff_max must be greater than 0")
+        self._retry_backoff_max = retry_backoff_max
+
+    @property
+    def retry_jitter_ratio(self) -> float:
+        return self._retry_jitter_ratio
+
+    @retry_jitter_ratio.setter
+    def retry_jitter_ratio(self, retry_jitter_ratio: float):
+        if retry_jitter_ratio < 0:
+            raise ValueError("retry_jitter_ratio must be greater than or equal to 0")
+        self._retry_jitter_ratio = retry_jitter_ratio
+
+    @property
+    def retry_status_codes(self) -> Sequence[int]:
+        return self._retry_status_codes
+
+    @retry_status_codes.setter
+    def retry_status_codes(self, retry_status_codes: Optional[Sequence[int]]):
+        if retry_status_codes is None:
+            retry_status_codes = chain([429, 408], range(500, 600))
+        self._retry_status_codes = tuple(retry_status_codes)
+
     def export(
         self,
         metrics_data: MetricsData,
@@ -253,7 +336,9 @@ def _parse_metric(
         return self._convert_to_timeseries(sample_sets, resource_labels)
 
     def _convert_to_timeseries(
-        self, sample_sets: Mapping[tuple, Sequence], resource_labels: Sequence
+        self,
+        sample_sets: Mapping[tuple, Sequence],
+        resource_labels: Sequence,
     ) -> Sequence[TimeSeries]:
         timeseries = []
         for labels, samples in sample_sets.items():
@@ -339,6 +424,7 @@ def handle_bucket(value, bound=None, name_override=None):
         sample_attr_pairs.append(
             handle_bucket(data_point.count, name_override=f"{name}_count")
         )
+
         return sample_attr_pairs
 
     def _parse_data_point(self, data_point, name=None):
@@ -366,6 +452,36 @@ def _build_headers(self) -> Dict:
             headers[header_name] = header_value
         return headers
 
+    def _create_session(self) -> requests.Session:
+        """Build the ``requests.Session`` used for every export POST.
+
+        One ``HTTPAdapter`` carrying a ``_JitterRetry`` policy is mounted
+        for both ``https://`` and ``http://``. The policy is assembled
+        from the exporter's retry settings as they are at call time.
+        """
+        session = requests.Session()
+        retry = _JitterRetry(
+            total=self.max_retries,
+            connect=self.max_retries,
+            read=self.max_retries,
+            status=self.max_retries,
+            # urllib3 does not retry POST by default; opt in explicitly.
+            allowed_methods=frozenset(["POST"]),
+            status_forcelist=self.retry_status_codes,
+            backoff_factor=self.retry_backoff_factor,
+            # Hand back the final response once status retries run out;
+            # _send_message reports it as FAILURE via raise_for_status().
+            raise_on_status=False,
+            raise_on_redirect=False,
+            respect_retry_after_header=False,
+            backoff_max=self.retry_backoff_max,
+            jitter_ratio=self.retry_jitter_ratio,
+        )
+        adapter = HTTPAdapter(max_retries=retry)
+        session.mount("https://", adapter)
+        session.mount("http://", adapter)
+        return session
+
     def _send_message(
         self, message: bytes, headers: Dict
     ) -> MetricExportResult:
@@ -390,7 +506,7 @@ def _send_message(
                 self.tls_config["key_file"],
             )
         try:
-            response = requests.post(
+            response = self._session.post(
                 self.endpoint,
                 data=message,
                 headers=headers,
@@ -400,8 +516,9 @@ def _send_message(
                 cert=cert,
                 verify=verify,
             )
-            if not response.ok:
-                response.raise_for_status()
+            if response.ok:
+                return MetricExportResult.SUCCESS
+            response.raise_for_status()
         except requests.exceptions.RequestException as err:
             logger.error("Export POST request failed with reason: %s", err)
             return MetricExportResult.FAILURE
diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py
index 814de75be4..116cbbd409 100644
--- a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py
+++ b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py
@@ -15,6 +15,8 @@
 import unittest
 from unittest.mock import patch
 
+import requests
+
 import pytest
 
 # pylint: disable=no-name-in-module
@@ -283,9 +285,10 @@ def test_invalid_tls_config_key_only_param(self):
 
 
 # Ensures export is successful with valid export_records and config
-@patch("requests.post")
-def test_valid_export(mock_post, prom_rw, metric):
-    mock_post.return_value.configure_mock(**{"status_code": 200})
+def test_valid_export(prom_rw, metric):
+    mock_post = unittest.mock.Mock()
+    mock_post.return_value.configure_mock(ok=True, status_code=200)
+    prom_rw._session.post = mock_post
 
     # Assumed a "None" for Scope or Resource aren't valid, so build them here
     scope = ScopeMetrics(
@@ -314,12 +317,16 @@ def test_invalid_export(prom_rw):
-@patch("requests.post")
-def test_valid_send_message(mock_post, prom_rw):
-    mock_post.return_value.configure_mock(**{"ok": True})
+def test_valid_send_message(prom_rw):
+    mock_post = unittest.mock.Mock()
+    mock_post.return_value.configure_mock(ok=True)
+    prom_rw._session.post = mock_post
     result = prom_rw._send_message(bytes(), {})
     assert mock_post.call_count == 1
     assert result == MetricExportResult.SUCCESS
 
 
 def test_invalid_send_message(prom_rw):
+    prom_rw._session.post = unittest.mock.Mock(
+        side_effect=requests.exceptions.RequestException("boom")
+    )
     result = prom_rw._send_message(bytes(), {})
     assert result == MetricExportResult.FAILURE
 
@@ -341,3 +348,29 @@ def test_build_headers(prom_rw):
     assert headers["Content-Type"] == "application/x-protobuf"
     assert headers["X-Prometheus-Remote-Write-Version"] == "0.1.0"
     assert headers["Custom Header"] == "test_header"
+
+
+def test_session_retry_configuration(prom_rw):
+    adapter = prom_rw._session.adapters["https://"]
+    # The same adapter instance must serve plain-HTTP endpoints as well.
+    assert prom_rw._session.adapters["http://"] is adapter
+    retry = adapter.max_retries
+    assert retry.total == prom_rw.max_retries
+    assert retry.backoff_max == prom_rw.retry_backoff_max
+    assert retry.jitter_ratio == prom_rw.retry_jitter_ratio
+    assert "POST" in retry.allowed_methods
+    assert 500 in retry.status_forcelist
+
+
+def test_non_retryable_status(prom_rw):
+    response = unittest.mock.Mock()
+    response.ok = False
+    response.status_code = 400
+    response.raise_for_status.side_effect = requests.exceptions.HTTPError(
+        response=response
+    )
+    prom_rw._session.post = unittest.mock.Mock(return_value=response)
+
+    result = prom_rw._send_message(bytes(), {})
+    assert result == MetricExportResult.FAILURE
+    assert prom_rw._session.post.call_count == 1