Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ OpenTelemetry Prometheus Remote Write Exporter
This package contains an exporter to send metrics from the OpenTelemetry Python SDK directly to a Prometheus Remote Write integrated backend
(such as Cortex or Thanos) without having to run an instance of the Prometheus server.

Key features
------------

* Bounded retries with exponential backoff and jitter for retryable HTTP/network failures (enabled by default with ``max_retries=3``; pass ``max_retries=0`` to disable).


Installation
------------
Expand All @@ -21,6 +26,12 @@ Installation
.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/
.. _Prometheus Remote Write integrated backend: https://prometheus.io/docs/operating/integrations/

Configuration highlights
------------------------

* ``max_retries`` (default ``3``), ``retry_backoff_factor`` (default ``0.5``), ``retry_backoff_max`` (default ``5.0``), and ``retry_jitter_ratio`` (default ``0.1``) tune the retry policy. ``retry_status_codes`` selects which HTTP statuses are retried (default: 429, 408 and every 5xx); connection errors and timeouts are retried as well. The retry adapter is built once, when the exporter is instantiated, so set these values at construction time; changing them afterwards does not affect the existing session.
* Total request time can grow to roughly ``(max_retries + 1) * timeout`` plus backoff; server ``Retry-After`` hints are ignored (``respect_retry_after_header=False``).


References
----------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
# limitations under the License.

import logging
import random
import re
from collections import defaultdict
from itertools import chain
from typing import Dict, Mapping, Sequence
from typing import Dict, Mapping, Optional, Sequence

import requests
import snappy
from requests.adapters import HTTPAdapter
from urllib3.util import Retry

from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import ( # pylint: disable=no-name-in-module
WriteRequest,
Expand Down Expand Up @@ -55,6 +58,25 @@
UNDERSCORE_REGEX = re.compile(r"_+")


class _JitterRetry(Retry):
    """``urllib3`` retry policy with symmetric jitter and a backoff ceiling.

    On top of the stock ``Retry`` behaviour this class:

    * perturbs every computed backoff by up to ``+/- jitter_ratio`` of its
      value, so that many clients failing at once do not retry in lockstep;
    * clamps the final delay to the range ``[0, backoff_max]``.

    :param backoff_max: upper bound applied to every backoff delay.
    :param jitter_ratio: fraction of the backoff used as the jitter
        amplitude; ``0`` disables jitter.

    All other positional and keyword arguments are forwarded unchanged to
    ``urllib3.util.Retry``.
    """

    def __init__(
        self,
        *args,
        backoff_max: float = 5.0,
        jitter_ratio: float = 0.1,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        # Assigned after the base initialiser so these values win over any
        # default the base class may set under the same attribute name.
        self.backoff_max = backoff_max
        self.jitter_ratio = jitter_ratio

    def new(self, **kw):
        """Return a copy of this policy, keeping the custom settings.

        ``Retry.increment()`` creates the policy object for the next attempt
        through ``new()``, which only forwards the constructor arguments the
        base class knows about. Without this override the copy would be
        built with the default ``jitter_ratio`` (and, depending on the
        urllib3 version, the default ``backoff_max``), silently dropping the
        configured values after the first retry.
        """
        kw.setdefault("backoff_max", self.backoff_max)
        kw.setdefault("jitter_ratio", self.jitter_ratio)
        return super().new(**kw)

    def get_backoff_time(self) -> float:
        """Return the base backoff with jitter applied, clamped to the cap."""
        backoff = super().get_backoff_time()
        if self.jitter_ratio:
            # Symmetric jitter: scale by a random factor in
            # [1 - jitter_ratio, 1 + jitter_ratio].
            backoff += backoff * self.jitter_ratio * random.uniform(-1, 1)
        # A ratio above 1 could push the delay negative; never sleep for a
        # negative time and never exceed the configured ceiling.
        return min(max(backoff, 0.0), self.backoff_max)


class PrometheusRemoteWriteMetricsExporter(MetricExporter):
"""
Prometheus remote write metric exporter for OpenTelemetry.
Expand All @@ -79,6 +101,11 @@ def __init__(
resources_as_labels: bool = True,
preferred_temporality: Dict[type, AggregationTemporality] = None,
preferred_aggregation: Dict = None,
max_retries: int = 3,
retry_backoff_factor: float = 0.5,
retry_backoff_max: float = 5.0,
retry_jitter_ratio: float = 0.1,
retry_status_codes: Optional[Sequence[int]] = None,
):
self.endpoint = endpoint
self.basic_auth = basic_auth
Expand All @@ -87,6 +114,11 @@ def __init__(
self.tls_config = tls_config
self.proxies = proxies
self.resources_as_labels = resources_as_labels
self.max_retries = max_retries
self.retry_backoff_factor = retry_backoff_factor
self.retry_backoff_max = retry_backoff_max
self.retry_jitter_ratio = retry_jitter_ratio
self.retry_status_codes = retry_status_codes

if not preferred_temporality:
preferred_temporality = {
Expand All @@ -98,6 +130,7 @@ def __init__(
ObservableGauge: AggregationTemporality.CUMULATIVE,
}

self._session = self._create_session()
super().__init__(preferred_temporality, preferred_aggregation)

@property
Expand Down Expand Up @@ -181,6 +214,56 @@ def headers(self):
def headers(self, headers: Dict):
self._headers = headers

@property
def max_retries(self) -> int:
    """Retry budget handed to the retry policy; never negative."""
    return self._max_retries

@max_retries.setter
def max_retries(self, max_retries: int):
    """Store the retry budget.

    :raises ValueError: if ``max_retries`` is negative (zero is accepted).
    """
    requested = max_retries
    if requested < 0:
        raise ValueError("max_retries must be greater than or equal to 0")
    self._max_retries = requested

@property
def retry_backoff_factor(self) -> float:
    """Backoff factor handed to the retry policy; always positive."""
    return self._retry_backoff_factor

@retry_backoff_factor.setter
def retry_backoff_factor(self, retry_backoff_factor: float):
    """Store the backoff factor.

    :raises ValueError: if ``retry_backoff_factor`` is zero or negative.
    """
    factor = retry_backoff_factor
    if factor <= 0:
        raise ValueError("retry_backoff_factor must be greater than 0")
    self._retry_backoff_factor = factor

@property
def retry_backoff_max(self) -> float:
    """Ceiling applied to every retry backoff delay; always positive."""
    return self._retry_backoff_max

@retry_backoff_max.setter
def retry_backoff_max(self, retry_backoff_max: float):
    """Store the backoff ceiling.

    :raises ValueError: if ``retry_backoff_max`` is zero or negative.
    """
    ceiling = retry_backoff_max
    if ceiling <= 0:
        raise ValueError("retry_backoff_max must be greater than 0")
    self._retry_backoff_max = ceiling

@property
def retry_jitter_ratio(self) -> float:
    """Jitter amplitude, as a fraction of the backoff; never negative."""
    return self._retry_jitter_ratio

@retry_jitter_ratio.setter
def retry_jitter_ratio(self, retry_jitter_ratio: float):
    """Store the jitter ratio.

    :raises ValueError: if ``retry_jitter_ratio`` is negative (zero is
        accepted).
    """
    ratio = retry_jitter_ratio
    if ratio < 0:
        raise ValueError("retry_jitter_ratio must be greater than or equal to 0")
    self._retry_jitter_ratio = ratio

@property
def retry_status_codes(self) -> Sequence[int]:
    """HTTP status codes that trigger a retry, stored as a tuple."""
    return self._retry_status_codes

@retry_status_codes.setter
def retry_status_codes(self, retry_status_codes: Optional[Sequence[int]]):
    """Store the retryable status codes.

    ``None`` selects the default set: 429, 408 and every 5xx code. Any
    other iterable is copied into a tuple, so later changes to the
    caller's sequence do not affect the exporter.
    """
    codes = retry_status_codes
    if codes is None:
        # Same order as the documented default: 429, 408, then 500..599.
        codes = (429, 408, *range(500, 600))
    self._retry_status_codes = tuple(codes)

def export(
self,
metrics_data: MetricsData,
Expand Down Expand Up @@ -253,7 +336,9 @@ def _parse_metric(
return self._convert_to_timeseries(sample_sets, resource_labels)

def _convert_to_timeseries(
self, sample_sets: Mapping[tuple, Sequence], resource_labels: Sequence
self,
sample_sets: Mapping[tuple, Sequence],
resource_labels: Sequence,
) -> Sequence[TimeSeries]:
timeseries = []
for labels, samples in sample_sets.items():
Expand Down Expand Up @@ -339,6 +424,7 @@ def handle_bucket(value, bound=None, name_override=None):
sample_attr_pairs.append(
handle_bucket(data_point.count, name_override=f"{name}_count")
)

return sample_attr_pairs

def _parse_data_point(self, data_point, name=None):
Expand Down Expand Up @@ -366,6 +452,27 @@ def _build_headers(self) -> Dict:
headers[header_name] = header_value
return headers

def _create_session(self) -> requests.Session:
    """Build a ``requests`` session whose adapters retry failed POSTs.

    The retry policy is configured from the exporter's ``max_retries``,
    ``retry_status_codes``, ``retry_backoff_factor``, ``retry_backoff_max``
    and ``retry_jitter_ratio`` values as they are at call time. The same
    adapter is mounted for both ``https://`` and ``http://`` URLs.

    :return: a new session with the retrying adapter mounted.
    """
    # One budget is shared by the overall, connect, read and status counters.
    budget = self.max_retries
    policy = _JitterRetry(
        total=budget,
        connect=budget,
        read=budget,
        status=budget,
        # POST is stated explicitly as the only method to retry.
        allowed_methods=frozenset(["POST"]),
        status_forcelist=self.retry_status_codes,
        backoff_factor=self.retry_backoff_factor,
        # Hand back the last response instead of raising once the budget is
        # spent; the caller inspects the response itself.
        raise_on_status=False,
        raise_on_redirect=False,
        # Server-provided Retry-After hints are deliberately ignored.
        respect_retry_after_header=False,
        backoff_max=self.retry_backoff_max,
        jitter_ratio=self.retry_jitter_ratio,
    )
    retrying_adapter = HTTPAdapter(max_retries=policy)

    session = requests.Session()
    for scheme in ("https://", "http://"):
        session.mount(scheme, retrying_adapter)
    return session

def _send_message(
self, message: bytes, headers: Dict
) -> MetricExportResult:
Expand All @@ -390,7 +497,7 @@ def _send_message(
self.tls_config["key_file"],
)
try:
response = requests.post(
response = self._session.post(
self.endpoint,
data=message,
headers=headers,
Expand All @@ -400,8 +507,9 @@ def _send_message(
cert=cert,
verify=verify,
)
if not response.ok:
response.raise_for_status()
if response.ok:
return MetricExportResult.SUCCESS
response.raise_for_status()
except requests.exceptions.RequestException as err:
logger.error("Export POST request failed with reason: %s", err)
return MetricExportResult.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import unittest
from unittest.mock import patch

import requests

import pytest

# pylint: disable=no-name-in-module
Expand Down Expand Up @@ -283,9 +285,10 @@ def test_invalid_tls_config_key_only_param(self):


# Ensures export is successful with valid export_records and config
@patch("requests.post")
def test_valid_export(mock_post, prom_rw, metric):
mock_post.return_value.configure_mock(**{"status_code": 200})
def test_valid_export(prom_rw, metric):
mock_post = unittest.mock.Mock()
mock_post.return_value.configure_mock(ok=True, status_code=200)
prom_rw._session.post = mock_post

# Assumed a "None" for Scope or Resource aren't valid, so build them here
scope = ScopeMetrics(
Expand Down Expand Up @@ -314,12 +317,16 @@ def test_invalid_export(prom_rw):
def test_valid_send_message(prom_rw):
    """A successful POST through the exporter's session yields SUCCESS."""
    # The exporter sends through its own session rather than the module-level
    # ``requests.post``, so the mock is installed directly on the session.
    mock_post = unittest.mock.Mock()
    mock_post.return_value.configure_mock(ok=True)
    prom_rw._session.post = mock_post

    result = prom_rw._send_message(bytes(), {})

    assert mock_post.call_count == 1
    assert result == MetricExportResult.SUCCESS


def test_invalid_send_message(prom_rw):
    """A request exception raised by the session maps to FAILURE."""
    failing_post = unittest.mock.Mock(
        side_effect=requests.exceptions.RequestException("boom")
    )
    prom_rw._session.post = failing_post

    outcome = prom_rw._send_message(bytes(), {})

    assert outcome == MetricExportResult.FAILURE

Expand All @@ -341,3 +348,25 @@ def test_build_headers(prom_rw):
assert headers["Content-Type"] == "application/x-protobuf"
assert headers["X-Prometheus-Remote-Write-Version"] == "0.1.0"
assert headers["Custom Header"] == "test_header"


def test_session_retry_configuration(prom_rw):
    """The HTTPS adapter carries a retry policy matching the exporter config."""
    https_adapter = prom_rw._session.adapters["https://"]
    policy = https_adapter.max_retries

    # The retry budget mirrors the exporter's setting.
    assert policy.total == prom_rw.max_retries
    # POST is retryable, and 500 is among the forced-retry statuses.
    assert "POST" in policy.allowed_methods
    assert 500 in policy.status_forcelist


def test_non_retryable_status(prom_rw):
    """A 400 response produces FAILURE after exactly one POST call."""
    bad_request = unittest.mock.Mock()
    bad_request.configure_mock(ok=False, status_code=400)
    bad_request.raise_for_status.side_effect = requests.exceptions.HTTPError(
        response=bad_request
    )
    post = unittest.mock.Mock(return_value=bad_request)
    prom_rw._session.post = post

    outcome = prom_rw._send_message(bytes(), {})

    assert outcome == MetricExportResult.FAILURE
    assert post.call_count == 1