diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/CHANGELOG.md b/exporter/opentelemetry-exporter-prometheus-remote-write/CHANGELOG.md index 885f3f6096..2da493673b 100644 --- a/exporter/opentelemetry-exporter-prometheus-remote-write/CHANGELOG.md +++ b/exporter/opentelemetry-exporter-prometheus-remote-write/CHANGELOG.md @@ -9,3 +9,5 @@ ((#206)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/206]) - Add conversion to TimeSeries methods ((#207)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/207]) +- Add request methods + ((#212)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/212]) diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py index d039565953..f807d6c6dc 100644 --- a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py +++ b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py @@ -12,9 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import re from typing import Dict, Sequence +import requests +import snappy + from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import ( WriteRequest, ) @@ -36,6 +40,8 @@ ValueObserverAggregator, ) +logger = logging.getLogger(__name__) + class PrometheusRemoteWriteMetricsExporter(MetricsExporter): """ @@ -108,7 +114,7 @@ def timeout(self, timeout: int): @property def tls_config(self): - return self.tls_config + return self._tls_config @tls_config.setter def tls_config(self, tls_config: Dict): @@ -148,10 +154,13 @@ def headers(self, headers: Dict): def export( self, export_records: Sequence[ExportRecord] ) -> MetricsExportResult: - raise NotImplementedError() + timeseries = self.convert_to_timeseries(export_records) + message = self.build_message(timeseries) + headers = self.get_headers() + return self.send_message(message, headers) def shutdown(self) -> None: - raise NotImplementedError() + pass def convert_to_timeseries( self, export_records: Sequence[ExportRecord] @@ -280,12 +289,66 @@ def create_label(self, name: str, value: str) -> Label: return label def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes: - raise NotImplementedError() + write_request = WriteRequest() + write_request.timeseries.extend(timeseries) + serialized_message = write_request.SerializeToString() + return snappy.compress(serialized_message) def get_headers(self) -> Dict: - raise NotImplementedError() + headers = { + "Content-Encoding": "snappy", + "Content-Type": "application/x-protobuf", + "X-Prometheus-Remote-Write-Version": "0.1.0", + } + if self.headers: + for header_name, header_value in self.headers.items(): + headers[header_name] = header_value + return headers def send_message( self, message: bytes, headers: Dict ) -> MetricsExportResult: - raise NotImplementedError() + auth = None + if self.basic_auth: + basic_auth = self.basic_auth + if "password" in basic_auth: + auth = (basic_auth.username, basic_auth.password) + else: + with open(basic_auth.password_file) as file: + auth = (basic_auth.username, file.readline()) + + cert = None + verify = True + if self.tls_config: + if "ca_file" in self.tls_config: + verify = self.tls_config["ca_file"] + elif "insecure_skip_verify" in self.tls_config: + verify = self.tls_config["insecure_skip_verify"] + + if ( + "cert_file" in self.tls_config + and "key_file" in self.tls_config + ): + cert = ( + self.tls_config["cert_file"], + self.tls_config["key_file"], + ) + response = requests.post( + self.endpoint, + data=message, + headers=headers, + auth=auth, + timeout=self.timeout, + proxies=self.proxies, + cert=cert, + verify=verify, + ) + if response.status_code != 200: + logger.warning( + "POST request failed with status %s with reason: %s and content: %s", + str(response.status_code), + response.reason, + str(response.content), + ) + return MetricsExportResult.FAILURE + return MetricsExportResult.SUCCESS diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py index 5edf322c5c..d81ff3df66 100644 --- a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py +++ b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py @@ -309,25 +309,65 @@ def test_create_timeseries(self): self.assertEqual(timeseries, expected_timeseries) +class ResponseStub: + def __init__(self, status_code): + self.status_code = status_code + self.reason = "dummy_reason" + self.content = "dummy_content" + + class TestExport(unittest.TestCase): # Initializes test data that is reused across tests def setUp(self): - pass + self._exporter = PrometheusRemoteWriteMetricsExporter( + endpoint="/prom/test_endpoint" + ) # Ensures export is successful with valid export_records and config - def test_export(self): - pass - - def test_valid_send_message(self): - pass - - def test_invalid_send_message(self): - pass + @mock.patch("requests.post", return_value=ResponseStub(200)) + def test_export(self, mock_post): + test_metric = Counter("testname", "testdesc", "testunit", int, None) + labels = {"environment": "testing"} + record = ExportRecord( + test_metric, labels, SumAggregator(), Resource({}), + ) + result = self._exporter.export([record]) + self.assertIs(result, MetricsExportResult.SUCCESS) + self.assertEqual(mock_post.call_count, 1) + + @mock.patch("requests.post", return_value=ResponseStub(200)) + def test_valid_send_message(self, mock_post): + result = self._exporter.send_message(bytes(), {}) + self.assertEqual(mock_post.call_count, 1) + self.assertEqual(result, MetricsExportResult.SUCCESS) + + @mock.patch("requests.post", return_value=ResponseStub(404)) + def test_invalid_send_message(self, mock_post): + result = self._exporter.send_message(bytes(), {}) + self.assertEqual(mock_post.call_count, 1) + self.assertEqual(result, MetricsExportResult.FAILURE) # Verifies that build_message calls snappy.compress and returns SerializedString - def test_build_message(self): - pass + @mock.patch("snappy.compress", return_value=bytes()) + def test_build_message(self, mock_compress): + test_timeseries = [ + TimeSeries(), + TimeSeries(), + ] + message = self._exporter.build_message(test_timeseries) + self.assertEqual(mock_compress.call_count, 1) + self.assertIsInstance(message, bytes) # Ensure correct headers are added when valid config is provided def test_get_headers(self): - pass + self._exporter.headers = {"Custom Header": "test_header"} + + headers = self._exporter.get_headers() + self.assertEqual(headers.get("Content-Encoding", ""), "snappy") + self.assertEqual( + headers.get("Content-Type", ""), "application/x-protobuf" + ) + self.assertEqual( + headers.get("X-Prometheus-Remote-Write-Version", ""), "0.1.0" + ) + self.assertEqual(headers.get("Custom Header", ""), "test_header")