From abfe6098e3a851e3701bf8e5e0d4327c931c14f9 Mon Sep 17 00:00:00 2001 From: Leonardo Parente <23251360+leoparente@users.noreply.github.com> Date: Wed, 22 Oct 2025 13:28:59 -0300 Subject: [PATCH 1/7] feat: add Diode QueueClient --- README.md | 18 +++ netboxlabs/diode/sdk/__init__.py | 2 + netboxlabs/diode/sdk/client.py | 218 ++++++++++++++++++++++++++++- netboxlabs/diode/sdk/exceptions.py | 26 ++++ tests/test_client.py | 92 +++++++++++- 5 files changed, 353 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 5226e1c..4630b67 100644 --- a/README.md +++ b/README.md @@ -149,6 +149,24 @@ diode-replay-dryrun \ my_app_92722156890707.json ``` +### Queue client + +`QueueClient` serializes ingestion payloads to JSON and posts them to an orb-agent endpoint via HTTP(S). This is useful when the orb-agent (or another intermediary) exposes a lightweight transport instead of gRPC. + +```python +from netboxlabs.diode.sdk import Entity, QueueClient + +with QueueClient( + target="http://localhost:9000/queue", + app_name="my-producer", + app_version="0.0.1", + queue="devices", +) as client: + client.ingest([Entity(site="Site1")]) +``` + +The request body mirrors the gRPC ingest payload, augmented with SDK and producer metadata so orb-agent can enrich and forward the message. The client raises `QueueClientError` when the endpoint returns a non-2xx status. TLS behaviour honours the existing `DIODE_SKIP_TLS_VERIFY` and `DIODE_CERT_FILE` environment variables. + ## Supported entities (object types) * ASN diff --git a/netboxlabs/diode/sdk/__init__.py b/netboxlabs/diode/sdk/__init__.py index 89a22e9..8280a8f 100644 --- a/netboxlabs/diode/sdk/__init__.py +++ b/netboxlabs/diode/sdk/__init__.py @@ -5,9 +5,11 @@ from netboxlabs.diode.sdk.client import ( DiodeClient, DiodeDryRunClient, + QueueClient, load_dryrun_entities, ) assert DiodeClient assert DiodeDryRunClient +assert QueueClient assert load_dryrun_entities diff --git a/netboxlabs/diode/sdk/client.py b/netboxlabs/diode/sdk/client.py index 295514b..953fe90 100644 --- a/netboxlabs/diode/sdk/client.py +++ b/netboxlabs/diode/sdk/client.py @@ -19,10 +19,14 @@ import certifi import grpc import sentry_sdk -from google.protobuf.json_format import MessageToJson, ParseDict +from google.protobuf.json_format import MessageToDict, MessageToJson, ParseDict from netboxlabs.diode.sdk.diode.v1 import ingester_pb2, ingester_pb2_grpc -from netboxlabs.diode.sdk.exceptions import DiodeClientError, DiodeConfigError +from netboxlabs.diode.sdk.exceptions import ( + DiodeClientError, + DiodeConfigError, + QueueClientError, +) from netboxlabs.diode.sdk.ingester import Entity from netboxlabs.diode.sdk.version import version_semver @@ -406,6 +410,216 @@ def ingest( return ingester_pb2.IngestResponse() +class QueueClient(DiodeClientInterface): + """Client that forwards ingestion payloads to orb-agent via HTTP.""" + + _name = "diode-sdk-python-queue" + _version = version_semver() + + def __init__( + self, + target: str, + app_name: str, + app_version: str, + *, + queue: str | None = None, + timeout: float = 10.0, + headers: dict[str, str] | None = None, + cert_file: str | None = None, + ): + """Initiate a new queue client.""" + log_level = os.getenv(_DIODE_SDK_LOG_LEVEL_ENVVAR_NAME, "INFO").upper() + logging.basicConfig(level=log_level) + + parsed_target = urlparse(target) + if parsed_target.scheme not in ["http", "https"]: + raise ValueError("QueueClient target should start with http:// or https://") + if not parsed_target.hostname: + raise ValueError("QueueClient target must include a hostname") + + self._raw_target = target + self._app_name = app_name + self._app_version = app_version + self._scheme = parsed_target.scheme + self._host = parsed_target.hostname + self._port = parsed_target.port or (443 if self._scheme == "https" else 80) + path = parsed_target.path or "" + if path and not path.startswith("/"): + path = f"/{path}" + if not path: + path = "/ingest" + if parsed_target.query: + path = f"{path}?{parsed_target.query}" + self._path = path + self._queue = queue + self._timeout = timeout + self._platform = platform.platform() + self._python_version = platform.python_version() + self._tls_verify = _should_verify_tls(parsed_target.scheme) + self._cert_file = _get_optional_config_value( + _DIODE_CERT_FILE_ENVVAR_NAME, cert_file + ) + self._certificates = ( + _load_certs(self._cert_file) + if self._cert_file and self._tls_verify + else None + ) + + default_headers = { + "Content-Type": "application/json", + "Accept": "application/json", + "User-Agent": f"{self._name}/{self._version} {self._app_name}/{self._app_version}", + } + if headers: + default_headers.update(headers) + self._headers = default_headers + + @property + def name(self) -> str: + """Retrieve the client name.""" + return self._name + + @property + def version(self) -> str: + """Retrieve the client version.""" + return self._version + + @property + def app_name(self) -> str: + """Retrieve the producer application name.""" + return self._app_name + + @property + def app_version(self) -> str: + """Retrieve the producer application version.""" + return self._app_version + + @property + def queue(self) -> str | None: + """Retrieve the target queue name.""" + return self._queue + + @property + def timeout(self) -> float: + """Retrieve the HTTP timeout.""" + return self._timeout + + @property + def target(self) -> str: + """Retrieve the original target.""" + return self._raw_target + + def __enter__(self): + """Enter the runtime context.""" + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + """Exit the runtime context.""" + self.close() + + def close(self): + """Queue client maintains no persistent connections.""" + + def ingest( + self, + entities: Iterable[Entity | ingester_pb2.Entity | None], + stream: str | None = _DEFAULT_STREAM, + ) -> ingester_pb2.IngestResponse: + """Serialize entities and enqueue them via HTTP.""" + payload = self._serialize_payload(entities, stream or _DEFAULT_STREAM) + status_code, response_body = self._send( + json.dumps(payload, separators=(",", ":")) + ) + if status_code >= 400: + text_body = ( + response_body.decode("utf-8", errors="ignore") if response_body else None + ) + raise QueueClientError(status_code, "Queue request failed", text_body) + + ingest_response = ingester_pb2.IngestResponse() + if response_body: + decoded_body = response_body.decode("utf-8", errors="ignore") + try: + response_data = json.loads(decoded_body) + except json.JSONDecodeError: + return ingest_response + try: + ParseDict(response_data, ingest_response) + except ValueError: + _LOGGER.debug( + "Unable to parse queue response body into IngestResponse" + ) + return ingest_response + + def _serialize_payload( + self, + entities: Iterable[Entity | ingester_pb2.Entity | None], + stream: str, + ) -> dict: + """Serialize the ingestion payload for orb-agent.""" + serialized_entities: list[dict] = [] + for entity in entities: + if entity is None: + continue + if not isinstance(entity, ingester_pb2.Entity): + raise TypeError("QueueClient expects ingester_pb2.Entity instances") + serialized_entities.append( + MessageToDict(entity, preserving_proto_field_name=True) + ) + + payload: dict[str, object] = { + "id": str(uuid.uuid4()), + "stream": stream, + "sdk": {"name": self._name, "version": self._version}, + "producer": { + "app_name": self._app_name, + "app_version": self._app_version, + }, + "metadata": { + "platform": self._platform, + "python_version": self._python_version, + }, + "entities": serialized_entities, + } + if self._queue: + payload["queue"] = self._queue + return payload + + def _send(self, payload: str) -> tuple[int, bytes]: + """Send the serialized payload to orb-agent.""" + if self._scheme == "https": + context = self._build_ssl_context() + connection = http.client.HTTPSConnection( + self._host, + self._port, + timeout=self._timeout, + context=context, + ) + else: + connection = http.client.HTTPConnection( + self._host, + self._port, + timeout=self._timeout, + ) + + try: + connection.request("POST", self._path, payload, headers=self._headers) + response = connection.getresponse() + body = response.read() + return response.status, body + finally: + connection.close() + + def _build_ssl_context(self) -> ssl.SSLContext: + """Return the SSL context honouring TLS verification settings.""" + if not self._tls_verify: + return ssl._create_unverified_context() + context = ssl.create_default_context() + if self._certificates: + context.load_verify_locations(cadata=self._certificates.decode("utf-8")) + return context + + class _DiodeAuthentication: def __init__( self, diff --git a/netboxlabs/diode/sdk/exceptions.py b/netboxlabs/diode/sdk/exceptions.py index a831f1c..d14781b 100644 --- a/netboxlabs/diode/sdk/exceptions.py +++ b/netboxlabs/diode/sdk/exceptions.py @@ -45,3 +45,29 @@ def details(self): def __repr__(self): """Return string representation.""" return f"" + + +class QueueClientError(BaseError): + """Raised when the queue client fails to enqueue a payload.""" + + def __init__( + self, + status_code: int, + message: str, + response_body: str | None = None, + ): + self.status_code = status_code + self.message = message + self.response_body = response_body + detail = message + if response_body: + detail = f"{message}: {response_body}" + super().__init__(f"{status_code} {detail}") + + def __repr__(self): + """Return string representation.""" + body = f", response_body={self.response_body!r}" if self.response_body else "" + return ( + f"" + ) diff --git a/tests/test_client.py b/tests/test_client.py index 4652040..24e29b5 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -15,6 +15,7 @@ _DIODE_SENTRY_DSN_ENVVAR_NAME, DiodeClient, DiodeDryRunClient, + QueueClient, DiodeMethodClientInterceptor, _ClientCallDetails, _DiodeAuthentication, @@ -24,7 +25,11 @@ parse_target, ) from netboxlabs.diode.sdk.diode.v1 import ingester_pb2 -from netboxlabs.diode.sdk.exceptions import DiodeClientError, DiodeConfigError +from netboxlabs.diode.sdk.exceptions import ( + DiodeClientError, + DiodeConfigError, + QueueClientError, +) from netboxlabs.diode.sdk.ingester import Entity from netboxlabs.diode.sdk.version import version_semver @@ -741,6 +746,91 @@ def test_load_dryrun_entities_from_fixture(message_path, tmp_path): assert entities[-1].wireless_link.ssid == "P2P-Link-1" +def test_queue_client_posts_serialized_entities(): + """Ensure QueueClient serializes entities and posts them to orb-agent.""" + with patch("http.client.HTTPConnection") as mock_http_conn: + mock_conn_instance = mock_http_conn.return_value + mock_response = mock.Mock() + mock_response.status = 202 + mock_response.read.return_value = b'{"errors": []}' + mock_conn_instance.getresponse.return_value = mock_response + + client = QueueClient( + target="http://orb-agent:8080/queue", + app_name="orb-producer", + app_version="1.2.3", + queue="orb", + timeout=2.0, + ) + + response = client.ingest( + entities=[Entity(site="Site1"), Entity(device="Device1")] + ) + + args, kwargs = mock_conn_instance.request.call_args + assert args[0] == "POST" + assert args[1] == "/queue" + payload = json.loads(args[2]) + assert payload["queue"] == "orb" + assert payload["stream"] == "latest" + assert len(payload["entities"]) == 2 + assert payload["entities"][0]["site"]["name"] == "Site1" + assert kwargs["headers"]["Content-Type"] == "application/json" + assert len(response.errors) == 0 + + +def test_queue_client_raises_on_http_error(): + """Ensure QueueClient raises a QueueClientError on HTTP failure.""" + with patch("http.client.HTTPConnection") as mock_http_conn: + mock_conn_instance = mock_http_conn.return_value + mock_response = mock.Mock() + mock_response.status = 500 + mock_response.read.return_value = b'{"detail": "failed"}' + mock_conn_instance.getresponse.return_value = mock_response + + client = QueueClient( + target="http://orb-agent:8080/queue", + app_name="orb-producer", + app_version="1.2.3", + ) + + with pytest.raises(QueueClientError) as excinfo: + client.ingest(entities=[Entity(site="Site1")]) + + assert excinfo.value.status_code == 500 + assert "Queue request failed" in str(excinfo.value) + + +def test_queue_client_https_uses_ssl_context(): + """Ensure QueueClient configures SSL context for HTTPS targets.""" + with ( + patch("http.client.HTTPSConnection") as mock_https_conn, + patch("ssl.create_default_context") as mock_default_context, + ): + context_instance = mock.Mock() + mock_default_context.return_value = context_instance + + mock_conn_instance = mock_https_conn.return_value + mock_response = mock.Mock() + mock_response.status = 200 + mock_response.read.return_value = b"" + mock_conn_instance.getresponse.return_value = mock_response + + client = QueueClient( + target="https://orb-agent.local/queue", + app_name="orb-producer", + app_version="1.2.3", + ) + client.ingest(entities=[Entity(site="Site1")]) + + mock_default_context.assert_called_once() + args, kwargs = mock_https_conn.call_args + assert args[0] == "orb-agent.local" + assert kwargs["context"] is context_instance + request_args, _ = mock_conn_instance.request.call_args + assert request_args[1] == "/queue" + + def test_diode_authentication_with_custom_certificates(): """Test _DiodeAuthentication with custom certificates - covers SSL context creation.""" # Create test certificate content From cb5fd97fcfa8689f1258eab62cf8abc3aab7e35a Mon Sep 17 00:00:00 2001 From: Leonardo Parente <23251360+leoparente@users.noreply.github.com> Date: Wed, 22 Oct 2025 13:34:33 -0300 Subject: [PATCH 2/7] ruff check fixes --- netboxlabs/diode/sdk/exceptions.py | 1 + netboxlabs/diode/sdk/ingester.py | 2 ++ tests/test_client.py | 2 +- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/netboxlabs/diode/sdk/exceptions.py b/netboxlabs/diode/sdk/exceptions.py index d14781b..f7f8222 100644 --- a/netboxlabs/diode/sdk/exceptions.py +++ b/netboxlabs/diode/sdk/exceptions.py @@ -56,6 +56,7 @@ def __init__( message: str, response_body: str | None = None, ): + """Initialize QueueClientError.""" self.status_code = status_code self.message = message self.response_body = response_body diff --git a/netboxlabs/diode/sdk/ingester.py b/netboxlabs/diode/sdk/ingester.py index 09ff3d7..a6172d4 100644 --- a/netboxlabs/diode/sdk/ingester.py +++ b/netboxlabs/diode/sdk/ingester.py @@ -11,7 +11,9 @@ import datetime import re from typing import Any + from google.protobuf import timestamp_pb2 as _timestamp_pb2 + import netboxlabs.diode.sdk.diode.v1.ingester_pb2 as pb PRIMARY_VALUE_MAP = { diff --git a/tests/test_client.py b/tests/test_client.py index 24e29b5..de31c0f 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -15,8 +15,8 @@ _DIODE_SENTRY_DSN_ENVVAR_NAME, DiodeClient, DiodeDryRunClient, - QueueClient, DiodeMethodClientInterceptor, + QueueClient, _ClientCallDetails, _DiodeAuthentication, _get_sentry_dsn, From 1ebb12a279d06332643e1f671dfc5ae7b73a8304 Mon Sep 17 00:00:00 2001 From: Leonardo Parente <23251360+leoparente@users.noreply.github.com> Date: Thu, 23 Oct 2025 10:51:42 -0300 Subject: [PATCH 3/7] refactor QueueClient to be OTLPClient --- README.md | 13 +- netboxlabs/diode/sdk/__init__.py | 4 +- netboxlabs/diode/sdk/client.py | 282 ++++++++++++++++------------- netboxlabs/diode/sdk/exceptions.py | 52 ++++-- netboxlabs/diode/sdk/ingester.py | 2 - pyproject.toml | 1 + tests/test_client.py | 138 +++++++------- 7 files changed, 269 insertions(+), 223 deletions(-) diff --git a/README.md b/README.md index 4630b67..f2f8d04 100644 --- a/README.md +++ b/README.md @@ -149,23 +149,22 @@ diode-replay-dryrun \ my_app_92722156890707.json ``` -### Queue client +### OTLP client -`QueueClient` serializes ingestion payloads to JSON and posts them to an orb-agent endpoint via HTTP(S). This is useful when the orb-agent (or another intermediary) exposes a lightweight transport instead of gRPC. +`OtlpClient` converts ingestion entities into OpenTelemetry log records and exports them to an OTLP endpoint (gRPC). This is useful when a collector ingests log data and forwards it to Diode. ```python -from netboxlabs.diode.sdk import Entity, QueueClient +from netboxlabs.diode.sdk import Entity, OtlpClient -with QueueClient( - target="http://localhost:9000/queue", +with OtlpClient( + target="grpc://localhost:4317", app_name="my-producer", app_version="0.0.1", - queue="devices", ) as client: client.ingest([Entity(site="Site1")]) ``` -The request body mirrors the gRPC ingest payload, augmented with SDK and producer metadata so orb-agent can enrich and forward the message. The client raises `QueueClientError` when the endpoint returns a non-2xx status. TLS behaviour honours the existing `DIODE_SKIP_TLS_VERIFY` and `DIODE_CERT_FILE` environment variables. +Each entity is serialised to JSON and sent as a log record with producer metadata so downstream collectors can enrich and forward the payload. The client raises `OtlpClientError` when the export fails. TLS behaviour honours the existing `DIODE_SKIP_TLS_VERIFY` and `DIODE_CERT_FILE` environment variables. ## Supported entities (object types) diff --git a/netboxlabs/diode/sdk/__init__.py b/netboxlabs/diode/sdk/__init__.py index 8280a8f..004022b 100644 --- a/netboxlabs/diode/sdk/__init__.py +++ b/netboxlabs/diode/sdk/__init__.py @@ -5,11 +5,11 @@ from netboxlabs.diode.sdk.client import ( DiodeClient, DiodeDryRunClient, - QueueClient, + OtlpClient, load_dryrun_entities, ) assert DiodeClient assert DiodeDryRunClient -assert QueueClient +assert OtlpClient assert load_dryrun_entities diff --git a/netboxlabs/diode/sdk/client.py b/netboxlabs/diode/sdk/client.py index 953fe90..bbba1de 100644 --- a/netboxlabs/diode/sdk/client.py +++ b/netboxlabs/diode/sdk/client.py @@ -20,12 +20,15 @@ import grpc import sentry_sdk from google.protobuf.json_format import MessageToDict, MessageToJson, ParseDict +from opentelemetry.proto.collector.logs.v1 import logs_service_pb2, logs_service_pb2_grpc +from opentelemetry.proto.common.v1 import common_pb2 +from opentelemetry.proto.logs.v1 import logs_pb2 from netboxlabs.diode.sdk.diode.v1 import ingester_pb2, ingester_pb2_grpc from netboxlabs.diode.sdk.exceptions import ( DiodeClientError, DiodeConfigError, - QueueClientError, + OtlpClientError, ) from netboxlabs.diode.sdk.ingester import Entity from netboxlabs.diode.sdk.version import version_semver @@ -410,10 +413,10 @@ def ingest( return ingester_pb2.IngestResponse() -class QueueClient(DiodeClientInterface): - """Client that forwards ingestion payloads to orb-agent via HTTP.""" +class OtlpClient(DiodeClientInterface): + """Client that exports ingestion entities as OTLP logs.""" - _name = "diode-sdk-python-queue" + _name = "diode-sdk-python-otlp" _version = version_semver() def __init__( @@ -422,57 +425,73 @@ def __init__( app_name: str, app_version: str, *, - queue: str | None = None, timeout: float = 10.0, - headers: dict[str, str] | None = None, + metadata: dict[str, str] | Iterable[tuple[str, str]] | None = None, cert_file: str | None = None, ): - """Initiate a new queue client.""" + """Initiate a new OTLP client.""" log_level = os.getenv(_DIODE_SDK_LOG_LEVEL_ENVVAR_NAME, "INFO").upper() logging.basicConfig(level=log_level) - parsed_target = urlparse(target) - if parsed_target.scheme not in ["http", "https"]: - raise ValueError("QueueClient target should start with http:// or https://") - if not parsed_target.hostname: - raise ValueError("QueueClient target must include a hostname") - - self._raw_target = target self._app_name = app_name self._app_version = app_version - self._scheme = parsed_target.scheme - self._host = parsed_target.hostname - self._port = parsed_target.port or (443 if self._scheme == "https" else 80) - path = parsed_target.path or "" - if path and not path.startswith("/"): - path = f"/{path}" - if not path: - path = "/ingest" - if parsed_target.query: - path = f"{path}?{parsed_target.query}" - self._path = path - self._queue = queue - self._timeout = timeout self._platform = platform.platform() self._python_version = platform.python_version() - self._tls_verify = _should_verify_tls(parsed_target.scheme) + self._timeout = timeout + + self._target, self._path, self._tls_verify = parse_target(target) self._cert_file = _get_optional_config_value( _DIODE_CERT_FILE_ENVVAR_NAME, cert_file ) self._certificates = ( _load_certs(self._cert_file) - if self._cert_file and self._tls_verify + if (self._tls_verify or self._cert_file) else None ) - default_headers = { - "Content-Type": "application/json", - "Accept": "application/json", - "User-Agent": f"{self._name}/{self._version} {self._app_name}/{self._app_version}", - } - if headers: - default_headers.update(headers) - self._headers = default_headers + channel_opts = ( + ( + "grpc.primary_user_agent", + f"{self._name}/{self._version} {self._app_name}/{self._app_version}", + ), + ) + + if self._tls_verify: + credentials = ( + grpc.ssl_channel_credentials(root_certificates=self._certificates) + if self._certificates + else grpc.ssl_channel_credentials() + ) + base_channel = grpc.secure_channel( + self._target, + credentials, + options=channel_opts, + ) + else: + base_channel = grpc.insecure_channel( + target=self._target, + options=channel_opts, + ) + + self._base_channel = base_channel + channel = base_channel + if self._path: + interceptor = DiodeMethodClientInterceptor(subpath=self._path) + channel = grpc.intercept_channel(base_channel, interceptor) + + self._channel = channel + self._stub = logs_service_pb2_grpc.LogsServiceStub(channel) + self._metadata = self._prepare_metadata(metadata) + + @staticmethod + def _prepare_metadata( + metadata: dict[str, str] | Iterable[tuple[str, str]] | None, + ) -> tuple[tuple[str, str], ...] | None: + if metadata is None: + return None + if isinstance(metadata, dict): + return tuple(metadata.items()) + return tuple(metadata) @property def name(self) -> str: @@ -494,20 +513,15 @@ def app_version(self) -> str: """Retrieve the producer application version.""" return self._app_version - @property - def queue(self) -> str | None: - """Retrieve the target queue name.""" - return self._queue - @property def timeout(self) -> float: - """Retrieve the HTTP timeout.""" + """Retrieve the export timeout.""" return self._timeout @property def target(self) -> str: - """Retrieve the original target.""" - return self._raw_target + """Retrieve the export target.""" + return self._target def __enter__(self): """Enter the runtime context.""" @@ -518,106 +532,114 @@ def __exit__(self, exc_type, exc_value, exc_traceback): self.close() def close(self): - """Queue client maintains no persistent connections.""" + """Close the underlying channel.""" + if getattr(self, "_base_channel", None): + self._base_channel.close() def ingest( self, entities: Iterable[Entity | ingester_pb2.Entity | None], stream: str | None = _DEFAULT_STREAM, ) -> ingester_pb2.IngestResponse: - """Serialize entities and enqueue them via HTTP.""" - payload = self._serialize_payload(entities, stream or _DEFAULT_STREAM) - status_code, response_body = self._send( - json.dumps(payload, separators=(",", ":")) - ) - if status_code >= 400: - text_body = ( - response_body.decode("utf-8", errors="ignore") if response_body else None + """Export entities as OTLP logs.""" + stream = stream or _DEFAULT_STREAM + log_records = [ + self._entity_to_log_record(entity, stream) + for entity in self._normalize_entities(entities) + ] + + if not log_records: + return ingester_pb2.IngestResponse() + + request = self._build_export_request(log_records) + + try: + self._stub.Export( + request, + timeout=self._timeout, + metadata=self._metadata, ) - raise QueueClientError(status_code, "Queue request failed", text_body) + except grpc.RpcError as err: + raise OtlpClientError(err) from err - ingest_response = ingester_pb2.IngestResponse() - if response_body: - decoded_body = response_body.decode("utf-8", errors="ignore") - try: - response_data = json.loads(decoded_body) - except json.JSONDecodeError: - return ingest_response - try: - ParseDict(response_data, ingest_response) - except ValueError: - _LOGGER.debug( - "Unable to parse queue response body into IngestResponse" - ) - return ingest_response + return ingester_pb2.IngestResponse() - def _serialize_payload( - self, - entities: Iterable[Entity | ingester_pb2.Entity | None], - stream: str, - ) -> dict: - """Serialize the ingestion payload for orb-agent.""" - serialized_entities: list[dict] = [] + def _normalize_entities( + self, entities: Iterable[Entity | ingester_pb2.Entity | None] + ) -> list[ingester_pb2.Entity]: + normalized: list[ingester_pb2.Entity] = [] for entity in entities: if entity is None: continue if not isinstance(entity, ingester_pb2.Entity): - raise TypeError("QueueClient expects ingester_pb2.Entity instances") - serialized_entities.append( - MessageToDict(entity, preserving_proto_field_name=True) - ) - - payload: dict[str, object] = { - "id": str(uuid.uuid4()), - "stream": stream, - "sdk": {"name": self._name, "version": self._version}, - "producer": { - "app_name": self._app_name, - "app_version": self._app_version, - }, - "metadata": { - "platform": self._platform, - "python_version": self._python_version, - }, - "entities": serialized_entities, - } - if self._queue: - payload["queue"] = self._queue - return payload - - def _send(self, payload: str) -> tuple[int, bytes]: - """Send the serialized payload to orb-agent.""" - if self._scheme == "https": - context = self._build_ssl_context() - connection = http.client.HTTPSConnection( - self._host, - self._port, - timeout=self._timeout, - context=context, - ) - else: - connection = http.client.HTTPConnection( - self._host, - self._port, - timeout=self._timeout, + raise TypeError("OtlpClient expects ingester_pb2.Entity instances") + normalized.append(entity) + return normalized + + def _build_export_request( + self, log_records: list[logs_pb2.LogRecord] + ) -> logs_service_pb2.ExportLogsServiceRequest: + resource_logs = logs_pb2.ResourceLogs() + resource_logs.resource.attributes.extend(self._resource_attributes()) + + scope_logs = resource_logs.scope_logs.add() + scope_logs.scope.CopyFrom( + common_pb2.InstrumentationScope( + name=self._name, + version=self._version, ) + ) + scope_logs.log_records.extend(log_records) + + request = logs_service_pb2.ExportLogsServiceRequest() + request.resource_logs.append(resource_logs) + return request + + def _resource_attributes(self) -> list[common_pb2.KeyValue]: + return [ + self._string_kv("service.name", self._app_name), + self._string_kv("service.version", self._app_version), + self._string_kv("telemetry.sdk.name", self._name), + self._string_kv("telemetry.sdk.language", "python"), + self._string_kv("telemetry.sdk.version", self._version), + self._string_kv("os.description", self._platform), + self._string_kv("process.runtime.version", self._python_version), + ] + + def _entity_to_log_record( + self, entity: ingester_pb2.Entity, stream: str + ) -> logs_pb2.LogRecord: + entity_dict = MessageToDict(entity, preserving_proto_field_name=True) + body_json = json.dumps(entity_dict, separators=(",", ":")) + now = time.time_ns() + entity_type = entity.WhichOneof("entity") or "unknown" + + log_record = logs_pb2.LogRecord( + time_unix_nano=now, + observed_time_unix_nano=now, + severity_number=logs_pb2.SeverityNumber.SEVERITY_NUMBER_INFO, + severity_text="INFO", + ) + log_record.body.CopyFrom( + common_pb2.AnyValue(string_value=body_json) + ) + log_record.trace_id = uuid.uuid4().bytes + log_record.span_id = uuid.uuid4().bytes[:8] + log_record.attributes.extend( + [ + self._string_kv("diode.entity_type", entity_type), + self._string_kv("diode.stream", stream), + self._string_kv("diode.sdk.name", self._name), + self._string_kv("diode.sdk.version", self._version), + self._string_kv("diode.producer.app_name", self._app_name), + self._string_kv("diode.producer.app_version", self._app_version), + ] + ) + return log_record - try: - connection.request("POST", self._path, payload, headers=self._headers) - response = connection.getresponse() - body = response.read() - return response.status, body - finally: - connection.close() - - def _build_ssl_context(self) -> ssl.SSLContext: - """Return the SSL context honouring TLS verification settings.""" - if not self._tls_verify: - return ssl._create_unverified_context() - context = ssl.create_default_context() - if self._certificates: - context.load_verify_locations(cadata=self._certificates.decode("utf-8")) - return context + @staticmethod + def _string_kv(key: str, value: str) -> common_pb2.KeyValue: + return common_pb2.KeyValue(key=key, value=common_pb2.AnyValue(string_value=value)) class _DiodeAuthentication: diff --git a/netboxlabs/diode/sdk/exceptions.py b/netboxlabs/diode/sdk/exceptions.py index f7f8222..98d7178 100644 --- a/netboxlabs/diode/sdk/exceptions.py +++ b/netboxlabs/diode/sdk/exceptions.py @@ -47,28 +47,40 @@ def __repr__(self): return f"" -class QueueClientError(BaseError): - """Raised when the queue client fails to enqueue a payload.""" - - def __init__( - self, - status_code: int, - message: str, - response_body: str | None = None, - ): - """Initialize QueueClientError.""" - self.status_code = status_code - self.message = message - self.response_body = response_body - detail = message - if response_body: - detail = f"{message}: {response_body}" - super().__init__(f"{status_code} {detail}") +class OtlpClientError(BaseError): + """Raised when the OTLP client fails to export log data.""" + + def __init__(self, error: Exception, message: str | None = None): + """Initialize OtlpClientError.""" + self._message = message or "OTLP export failed" + self.status_code = None + self.details = None + + if isinstance(error, grpc.RpcError): + try: + self.status_code = error.code() + except Exception: # pragma: no cover - defensive + self.status_code = None + try: + self.details = error.details() + except Exception: # pragma: no cover - defensive + self.details = None + else: + self.details = str(error) + + parts: list[str] = [self._message] + if self.status_code is not None: + status_name = getattr(self.status_code, "name", str(self.status_code)) + parts.append(f"status={status_name}") + if self.details: + parts.append(f"details={self.details}") + + super().__init__(", ".join(parts)) def __repr__(self): """Return string representation.""" - body = f", response_body={self.response_body!r}" if self.response_body else "" + status = getattr(self.status_code, "name", self.status_code) return ( - f"" + f"" ) diff --git a/netboxlabs/diode/sdk/ingester.py b/netboxlabs/diode/sdk/ingester.py index a6172d4..09ff3d7 100644 --- a/netboxlabs/diode/sdk/ingester.py +++ b/netboxlabs/diode/sdk/ingester.py @@ -11,9 +11,7 @@ import datetime import re from typing import Any - from google.protobuf import timestamp_pb2 as _timestamp_pb2 - import netboxlabs.diode.sdk.diode.v1.ingester_pb2 as pb PRIMARY_VALUE_MAP = { diff --git a/pyproject.toml b/pyproject.toml index a6ac5a0..7025a26 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,7 @@ dependencies = [ "grpcio>=1.68.1", "grpcio-status>=1.68.1", "sentry-sdk>=2.2.1", + "opentelemetry-proto>=1.26.0", ] [project.optional-dependencies] # Optional diff --git a/tests/test_client.py b/tests/test_client.py index de31c0f..3d78985 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -16,7 +16,7 @@ DiodeClient, DiodeDryRunClient, DiodeMethodClientInterceptor, - QueueClient, + OtlpClient, _ClientCallDetails, _DiodeAuthentication, _get_sentry_dsn, @@ -28,7 +28,7 @@ from netboxlabs.diode.sdk.exceptions import ( DiodeClientError, DiodeConfigError, - QueueClientError, + OtlpClientError, ) from netboxlabs.diode.sdk.ingester import Entity from netboxlabs.diode.sdk.version import version_semver @@ -746,89 +746,103 @@ def test_load_dryrun_entities_from_fixture(message_path, tmp_path): assert entities[-1].wireless_link.ssid == "P2P-Link-1" -def test_queue_client_posts_serialized_entities(): - """Ensure QueueClient serializes entities and posts them to orb-agent.""" - with patch("http.client.HTTPConnection") as mock_http_conn: - mock_conn_instance = mock_http_conn.return_value - mock_response = mock.Mock() - mock_response.status = 202 - mock_response.read.return_value = b'{"errors": []}' - mock_conn_instance.getresponse.return_value = mock_response +def test_otlp_client_exports_entities(): + """Ensure OtlpClient serializes entities and exports them as logs.""" + with ( + patch("netboxlabs.diode.sdk.client.grpc.insecure_channel") as mock_insecure_channel, + patch("netboxlabs.diode.sdk.client.logs_service_pb2_grpc.LogsServiceStub") as mock_stub_cls, + ): + mock_insecure_channel.return_value = mock.Mock() + stub_instance = mock_stub_cls.return_value - client = QueueClient( - target="http://orb-agent:8080/queue", + client = OtlpClient( + target="grpc://collector:4317", app_name="orb-producer", app_version="1.2.3", - queue="orb", - timeout=2.0, ) response = client.ingest( entities=[Entity(site="Site1"), Entity(device="Device1")] ) - args, kwargs = mock_conn_instance.request.call_args - assert args[0] == "POST" - assert args[1] == "/queue" - payload = json.loads(args[2]) - assert payload["queue"] == "orb" - assert payload["stream"] == "latest" - assert len(payload["entities"]) == 2 - assert payload["entities"][0]["site"]["name"] == "Site1" - assert kwargs["headers"]["Content-Type"] == "application/json" - assert len(response.errors) == 0 - - -def test_queue_client_raises_on_http_error(): - """Ensure QueueClient raises a QueueClientError on HTTP failure.""" - with patch("http.client.HTTPConnection") as mock_http_conn: - mock_conn_instance = mock_http_conn.return_value - mock_response = mock.Mock() - mock_response.status = 500 - mock_response.read.return_value = b'{"detail": "failed"}' - mock_conn_instance.getresponse.return_value = mock_response + stub_instance.Export.assert_called_once() + export_args, export_kwargs = stub_instance.Export.call_args + request = export_args[0] + resource_logs = request.resource_logs[0] + scope_logs = resource_logs.scope_logs[0] + log_records = scope_logs.log_records + assert len(log_records) == 2 + body = json.loads(log_records[0].body.string_value) + assert body["site"]["name"] == "Site1" + attributes = {kv.key: kv.value.string_value for kv in log_records[0].attributes} + assert attributes["diode.stream"] == "latest" + assert export_kwargs["timeout"] == client.timeout + assert isinstance(response, ingester_pb2.IngestResponse) + + +def test_otlp_client_raises_on_rpc_error(): + """Ensure OtlpClient wraps gRPC errors in OtlpClientError.""" + + class DummyRpcError(grpc.RpcError): + def __init__(self, code, details): + self._code = code + self._details = details - client = QueueClient( - target="http://orb-agent:8080/queue", + def code(self): + return self._code + + def details(self): + return self._details + + with ( + patch("netboxlabs.diode.sdk.client.grpc.insecure_channel") as mock_insecure_channel, + patch("netboxlabs.diode.sdk.client.logs_service_pb2_grpc.LogsServiceStub") as mock_stub_cls, + ): + mock_insecure_channel.return_value = mock.Mock() + stub_instance = mock_stub_cls.return_value + stub_instance.Export.side_effect = DummyRpcError( + grpc.StatusCode.UNAVAILABLE, "endpoint offline" + ) + + client = OtlpClient( + target="grpc://collector:4317", app_name="orb-producer", app_version="1.2.3", ) - with pytest.raises(QueueClientError) as excinfo: + with pytest.raises(OtlpClientError) as excinfo: client.ingest(entities=[Entity(site="Site1")]) - assert excinfo.value.status_code == 500 - assert "Queue request failed" in str(excinfo.value) + assert excinfo.value.status_code == grpc.StatusCode.UNAVAILABLE + assert "details=endpoint offline" in str(excinfo.value) -def test_queue_client_https_uses_ssl_context(): - """Ensure QueueClient configures SSL context for HTTPS targets.""" +def test_otlp_client_grpcs_uses_secure_channel(): + """Ensure OtlpClient configures SSL credentials for secure targets.""" with ( - patch("http.client.HTTPSConnection") as mock_https_conn, - patch("ssl.create_default_context") as mock_default_context, + patch("netboxlabs.diode.sdk.client.grpc.ssl_channel_credentials") as mock_ssl_credentials, + patch("netboxlabs.diode.sdk.client.grpc.secure_channel") as mock_secure_channel, + patch("netboxlabs.diode.sdk.client.grpc.intercept_channel") as mock_intercept_channel, + patch("netboxlabs.diode.sdk.client.logs_service_pb2_grpc.LogsServiceStub"), ): - context_instance = mock.Mock() - mock_default_context.return_value = context_instance - - mock_conn_instance = mock_https_conn.return_value - mock_response = mock.Mock() - mock_response.status = 200 - mock_response.read.return_value = b"" - mock_conn_instance.getresponse.return_value = mock_response - - client = QueueClient( - target="https://orb-agent.local/queue", + base_channel = mock.Mock() + mock_secure_channel.return_value = base_channel + intercept_channel = mock.Mock() + mock_intercept_channel.return_value = intercept_channel + mock_ssl_credentials.return_value = mock.Mock() + + client = OtlpClient( + target="grpcs://collector.example:4317/custom", app_name="orb-producer", app_version="1.2.3", ) - client.ingest(entities=[Entity(site="Site1")]) - - mock_default_context.assert_called_once() - args, kwargs = mock_https_conn.call_args - assert args[0] == "orb-agent.local" - assert kwargs["context"] is context_instance - request_args, _ = mock_conn_instance.request.call_args - assert request_args[1] == "/queue" + + mock_ssl_credentials.assert_called_once() + mock_secure_channel.assert_called_once() + mock_intercept_channel.assert_called_once() + + client.close() + base_channel.close.assert_called_once() def test_diode_authentication_with_custom_certificates(): From ad1e9cce2c04161275393f191eadd5be1cbfd110 Mon Sep 17 00:00:00 2001 From: Leonardo Parente <23251360+leoparente@users.noreply.github.com> Date: Thu, 23 Oct 2025 13:54:00 -0300 Subject: [PATCH 4/7] rename client --- README.md | 8 ++++---- netboxlabs/diode/sdk/__init__.py | 4 ++-- netboxlabs/diode/sdk/client.py | 12 ++++++------ netboxlabs/diode/sdk/exceptions.py | 6 +++--- tests/test_client.py | 18 +++++++++--------- 5 files changed, 24 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index f2f8d04..134a6dc 100644 --- a/README.md +++ b/README.md @@ -151,12 +151,12 @@ diode-replay-dryrun \ ### OTLP client -`OtlpClient` converts ingestion entities into OpenTelemetry log records and exports them to an OTLP endpoint (gRPC). This is useful when a collector ingests log data and forwards it to Diode. +`DiodeOTLPClient` converts ingestion entities into OpenTelemetry log records and exports them to an OTLP endpoint (gRPC). This is useful when a collector ingests log data and forwards it to Diode. ```python -from netboxlabs.diode.sdk import Entity, OtlpClient +from netboxlabs.diode.sdk import Entity, DiodeOTLPClient -with OtlpClient( +with DiodeOTLPClient( target="grpc://localhost:4317", app_name="my-producer", app_version="0.0.1", @@ -164,7 +164,7 @@ with OtlpClient( client.ingest([Entity(site="Site1")]) ``` -Each entity is serialised to JSON and sent as a log record with producer metadata so downstream collectors can enrich and forward the payload. The client raises `OtlpClientError` when the export fails. TLS behaviour honours the existing `DIODE_SKIP_TLS_VERIFY` and `DIODE_CERT_FILE` environment variables. +Each entity is serialised to JSON and sent as a log record with producer metadata so downstream collectors can enrich and forward the payload. The client raises `OTLPClientError` when the export fails. TLS behaviour honours the existing `DIODE_SKIP_TLS_VERIFY` and `DIODE_CERT_FILE` environment variables. ## Supported entities (object types) diff --git a/netboxlabs/diode/sdk/__init__.py b/netboxlabs/diode/sdk/__init__.py index 004022b..ac8d2ff 100644 --- a/netboxlabs/diode/sdk/__init__.py +++ b/netboxlabs/diode/sdk/__init__.py @@ -5,11 +5,11 @@ from netboxlabs.diode.sdk.client import ( DiodeClient, DiodeDryRunClient, - OtlpClient, + DiodeOTLPClient, load_dryrun_entities, ) assert DiodeClient assert DiodeDryRunClient -assert OtlpClient +assert DiodeOTLPClient assert load_dryrun_entities diff --git a/netboxlabs/diode/sdk/client.py b/netboxlabs/diode/sdk/client.py index bbba1de..fa33b02 100644 --- a/netboxlabs/diode/sdk/client.py +++ b/netboxlabs/diode/sdk/client.py @@ -28,7 +28,7 @@ from netboxlabs.diode.sdk.exceptions import ( DiodeClientError, DiodeConfigError, - OtlpClientError, + OTLPClientError, ) from netboxlabs.diode.sdk.ingester import Entity from netboxlabs.diode.sdk.version import version_semver @@ -413,8 +413,8 @@ def ingest( return ingester_pb2.IngestResponse() -class OtlpClient(DiodeClientInterface): - """Client that exports ingestion entities as OTLP logs.""" +class DiodeOTLPClient(DiodeClientInterface): + """Diode OTLP client that exports ingestion entities as OTLP logs.""" _name = "diode-sdk-python-otlp" _version = version_semver() @@ -429,7 +429,7 @@ def __init__( metadata: dict[str, str] | Iterable[tuple[str, str]] | None = None, cert_file: str | None = None, ): - """Initiate a new OTLP client.""" + """Initiate a new Diode OTLP client.""" log_level = os.getenv(_DIODE_SDK_LOG_LEVEL_ENVVAR_NAME, "INFO").upper() logging.basicConfig(level=log_level) @@ -560,7 +560,7 @@ def ingest( metadata=self._metadata, ) except grpc.RpcError as err: - raise OtlpClientError(err) from err + raise OTLPClientError(err) from err return ingester_pb2.IngestResponse() @@ -572,7 +572,7 @@ def _normalize_entities( if entity is None: continue if not isinstance(entity, ingester_pb2.Entity): - raise TypeError("OtlpClient expects ingester_pb2.Entity instances") + raise TypeError("DiodeOTLPClient expects ingester_pb2.Entity instances") normalized.append(entity) return normalized diff --git a/netboxlabs/diode/sdk/exceptions.py b/netboxlabs/diode/sdk/exceptions.py index 98d7178..d7eee68 100644 --- a/netboxlabs/diode/sdk/exceptions.py +++ b/netboxlabs/diode/sdk/exceptions.py @@ -47,11 +47,11 @@ def __repr__(self): return f"" -class OtlpClientError(BaseError): +class OTLPClientError(BaseError): """Raised when the OTLP client fails to export log data.""" def __init__(self, error: Exception, message: str | None = None): - """Initialize OtlpClientError.""" + """Initialize OTLPClientError.""" self._message = message or "OTLP export failed" self.status_code = None self.details = None @@ -81,6 +81,6 @@ def __repr__(self): """Return string representation.""" status = getattr(self.status_code, "name", self.status_code) return ( - f"" ) diff --git a/tests/test_client.py b/tests/test_client.py index 3d78985..d694404 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -16,7 +16,7 @@ DiodeClient, DiodeDryRunClient, DiodeMethodClientInterceptor, - OtlpClient, + DiodeOTLPClient, _ClientCallDetails, _DiodeAuthentication, _get_sentry_dsn, @@ -28,7 +28,7 @@ from netboxlabs.diode.sdk.exceptions import ( DiodeClientError, DiodeConfigError, - OtlpClientError, + OTLPClientError, ) from netboxlabs.diode.sdk.ingester import Entity from netboxlabs.diode.sdk.version import version_semver @@ -747,7 +747,7 @@ def test_load_dryrun_entities_from_fixture(message_path, tmp_path): def test_otlp_client_exports_entities(): - """Ensure OtlpClient serializes entities and exports them as logs.""" + """Ensure DiodeOTLPClient serializes entities and exports them as logs.""" with ( patch("netboxlabs.diode.sdk.client.grpc.insecure_channel") as mock_insecure_channel, patch("netboxlabs.diode.sdk.client.logs_service_pb2_grpc.LogsServiceStub") as mock_stub_cls, @@ -755,7 +755,7 @@ def test_otlp_client_exports_entities(): mock_insecure_channel.return_value = mock.Mock() stub_instance = mock_stub_cls.return_value - client = OtlpClient( + client = DiodeOTLPClient( target="grpc://collector:4317", app_name="orb-producer", app_version="1.2.3", @@ -781,7 +781,7 @@ def test_otlp_client_exports_entities(): def test_otlp_client_raises_on_rpc_error(): - """Ensure OtlpClient wraps gRPC errors in OtlpClientError.""" + """Ensure DiodeOTLPClient wraps gRPC errors in OTLPClientError.""" class DummyRpcError(grpc.RpcError): def __init__(self, code, details): @@ -804,13 +804,13 @@ def details(self): grpc.StatusCode.UNAVAILABLE, "endpoint offline" ) - client = OtlpClient( + client = DiodeOTLPClient( target="grpc://collector:4317", app_name="orb-producer", app_version="1.2.3", ) - with pytest.raises(OtlpClientError) as excinfo: + with pytest.raises(OTLPClientError) as excinfo: client.ingest(entities=[Entity(site="Site1")]) assert excinfo.value.status_code == grpc.StatusCode.UNAVAILABLE @@ -818,7 +818,7 @@ def details(self): def test_otlp_client_grpcs_uses_secure_channel(): - """Ensure OtlpClient configures SSL credentials for secure targets.""" + """Ensure DiodeOTLPClient configures SSL credentials for secure targets.""" with ( patch("netboxlabs.diode.sdk.client.grpc.ssl_channel_credentials") as mock_ssl_credentials, patch("netboxlabs.diode.sdk.client.grpc.secure_channel") as mock_secure_channel, @@ -831,7 +831,7 @@ def test_otlp_client_grpcs_uses_secure_channel(): mock_intercept_channel.return_value = intercept_channel mock_ssl_credentials.return_value = mock.Mock() - client = OtlpClient( + client = DiodeOTLPClient( target="grpcs://collector.example:4317/custom", app_name="orb-producer", app_version="1.2.3", From a112b102eec03f9509464b7ef7feab769bccdb28 Mon Sep 17 00:00:00 2001 From: Leonardo Parente <23251360+leoparente@users.noreply.github.com> Date: Tue, 28 Oct 2025 13:42:15 -0300 Subject: [PATCH 5/7] reduce repeated code --- netboxlabs/diode/sdk/client.py | 51 ++++++++++++++++++---------------- tests/test_client.py | 2 +- 2 files changed, 28 insertions(+), 25 deletions(-) diff --git a/netboxlabs/diode/sdk/client.py b/netboxlabs/diode/sdk/client.py index fa33b02..5c376b1 100644 --- a/netboxlabs/diode/sdk/client.py +++ b/netboxlabs/diode/sdk/client.py @@ -20,7 +20,10 @@ import grpc import sentry_sdk from google.protobuf.json_format import MessageToDict, MessageToJson, ParseDict -from opentelemetry.proto.collector.logs.v1 import logs_service_pb2, logs_service_pb2_grpc +from opentelemetry.proto.collector.logs.v1 import ( + logs_service_pb2, + logs_service_pb2_grpc, +) from opentelemetry.proto.common.v1 import common_pb2 from opentelemetry.proto.logs.v1 import logs_pb2 @@ -45,6 +48,7 @@ _LOGGER = logging.getLogger(__name__) _MAX_RETRIES_ENVVAR_NAME = "DIODE_MAX_AUTH_RETRIES" + def load_dryrun_entities(file_path: str | Path) -> Iterable[Entity]: """Yield entities from a file with concatenated JSON messages.""" path = Path(file_path) @@ -86,7 +90,9 @@ def parse_target(target: str) -> tuple[str, str, bool]: parsed_target = urlparse(target) if parsed_target.scheme not in ["grpc", "grpcs", "http", "https"]: - raise ValueError("target should start with grpc://, grpcs://, http:// or https://") + raise ValueError( + "target should start with grpc://, grpcs://, http:// or https://" + ) # Determine if TLS verification should be enabled tls_verify = _should_verify_tls(parsed_target.scheme) @@ -156,16 +162,21 @@ def __init__( log_level = os.getenv(_DIODE_SDK_LOG_LEVEL_ENVVAR_NAME, "INFO").upper() logging.basicConfig(level=log_level) - self._max_auth_retries = int(_get_optional_config_value( - _MAX_RETRIES_ENVVAR_NAME, str(max_auth_retries) - ) or max_auth_retries) + self._max_auth_retries = int( + _get_optional_config_value(_MAX_RETRIES_ENVVAR_NAME, str(max_auth_retries)) + or max_auth_retries + ) self._cert_file = _get_optional_config_value( _DIODE_CERT_FILE_ENVVAR_NAME, cert_file ) self._target, self._path, self._tls_verify = parse_target(target) # Load certificates once if needed - self._certificates = _load_certs(self._cert_file) if (self._tls_verify or self._cert_file) else None + self._certificates = ( + _load_certs(self._cert_file) + if (self._tls_verify or self._cert_file) + else None + ) self._app_name = app_name self._app_version = app_version self._platform = platform.platform() @@ -597,11 +608,10 @@ def _build_export_request( def _resource_attributes(self) -> list[common_pb2.KeyValue]: return [ - self._string_kv("service.name", self._app_name), - self._string_kv("service.version", self._app_version), - self._string_kv("telemetry.sdk.name", self._name), - self._string_kv("telemetry.sdk.language", "python"), - self._string_kv("telemetry.sdk.version", self._version), + self._string_kv("sdk.name", self._name), + self._string_kv("sdk.version", self._version), + self._string_kv("producer.app_name", self._app_name), + self._string_kv("producer.app_version", self._app_version), self._string_kv("os.description", self._platform), self._string_kv("process.runtime.version", self._python_version), ] @@ -620,26 +630,19 @@ def _entity_to_log_record( severity_number=logs_pb2.SeverityNumber.SEVERITY_NUMBER_INFO, severity_text="INFO", ) - log_record.body.CopyFrom( - common_pb2.AnyValue(string_value=body_json) - ) - log_record.trace_id = uuid.uuid4().bytes - log_record.span_id = uuid.uuid4().bytes[:8] + log_record.body.CopyFrom(common_pb2.AnyValue(string_value=body_json)) log_record.attributes.extend( [ - self._string_kv("diode.entity_type", entity_type), - self._string_kv("diode.stream", stream), - self._string_kv("diode.sdk.name", self._name), - self._string_kv("diode.sdk.version", self._version), - self._string_kv("diode.producer.app_name", self._app_name), - self._string_kv("diode.producer.app_version", self._app_version), + self._string_kv("diode.entity", entity_type), ] ) return log_record @staticmethod def _string_kv(key: str, value: str) -> common_pb2.KeyValue: - return common_pb2.KeyValue(key=key, value=common_pb2.AnyValue(string_value=value)) + return common_pb2.KeyValue( + key=key, value=common_pb2.AnyValue(string_value=value) + ) class _DiodeAuthentication: @@ -665,7 +668,7 @@ def authenticate(self) -> str: """Request an OAuth2 token using client credentials and return it.""" if self._tls_verify and self._certificates: context = ssl.create_default_context() - context.load_verify_locations(cadata=self._certificates.decode('utf-8')) + context.load_verify_locations(cadata=self._certificates.decode("utf-8")) conn = http.client.HTTPSConnection( self._target, context=context, diff --git a/tests/test_client.py b/tests/test_client.py index d694404..09f288a 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -775,7 +775,7 @@ def test_otlp_client_exports_entities(): body = json.loads(log_records[0].body.string_value) assert body["site"]["name"] == "Site1" attributes = {kv.key: kv.value.string_value for kv in log_records[0].attributes} - assert attributes["diode.stream"] == "latest" + assert attributes["diode.entity"] == "site" assert export_kwargs["timeout"] == client.timeout assert isinstance(response, ingester_pb2.IngestResponse) From d89995cf4c5b42ee3cfae4c727287a8b04471237 Mon Sep 17 00:00:00 2001 From: Leonardo Parente <23251360+leoparente@users.noreply.github.com> Date: Tue, 28 Oct 2025 16:05:51 -0300 Subject: [PATCH 6/7] simplify --- netboxlabs/diode/sdk/client.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/netboxlabs/diode/sdk/client.py b/netboxlabs/diode/sdk/client.py index 5c376b1..9621e18 100644 --- a/netboxlabs/diode/sdk/client.py +++ b/netboxlabs/diode/sdk/client.py @@ -19,7 +19,7 @@ import certifi import grpc import sentry_sdk -from google.protobuf.json_format import MessageToDict, MessageToJson, ParseDict +from google.protobuf.json_format import MessageToJson, ParseDict from opentelemetry.proto.collector.logs.v1 import ( logs_service_pb2, logs_service_pb2_grpc, @@ -555,14 +555,14 @@ def ingest( """Export entities as OTLP logs.""" stream = stream or _DEFAULT_STREAM log_records = [ - self._entity_to_log_record(entity, stream) + self._entity_to_log_record(entity) for entity in self._normalize_entities(entities) ] if not log_records: return ingester_pb2.IngestResponse() - request = self._build_export_request(log_records) + request = self._build_export_request(log_records, stream) try: self._stub.Export( @@ -588,11 +588,15 @@ def _normalize_entities( return normalized def _build_export_request( - self, log_records: list[logs_pb2.LogRecord] + self, + log_records: list[logs_pb2.LogRecord], + stream: str | None, ) -> logs_service_pb2.ExportLogsServiceRequest: resource_logs = logs_pb2.ResourceLogs() resource_logs.resource.attributes.extend(self._resource_attributes()) - + resource_logs.resource.attributes.append( + self._string_kv("diode.stream", stream) + ) scope_logs = resource_logs.scope_logs.add() scope_logs.scope.CopyFrom( common_pb2.InstrumentationScope( @@ -608,19 +612,17 @@ def _build_export_request( def _resource_attributes(self) -> list[common_pb2.KeyValue]: return [ - self._string_kv("sdk.name", self._name), - self._string_kv("sdk.version", self._version), - self._string_kv("producer.app_name", self._app_name), - self._string_kv("producer.app_version", self._app_version), + self._string_kv("service.name", self._app_name), + self._string_kv("service.version", self._app_version), self._string_kv("os.description", self._platform), self._string_kv("process.runtime.version", self._python_version), ] def _entity_to_log_record( - self, entity: ingester_pb2.Entity, stream: str + self, + entity: ingester_pb2.Entity, ) -> logs_pb2.LogRecord: - entity_dict = MessageToDict(entity, preserving_proto_field_name=True) - body_json = json.dumps(entity_dict, separators=(",", ":")) + body_json = MessageToJson(entity, preserving_proto_field_name=True) now = time.time_ns() entity_type = entity.WhichOneof("entity") or "unknown" From 8bcaa5477465951a47e8d48629d68b8bd479f312 Mon Sep 17 00:00:00 2001 From: Leonardo Parente <23251360+leoparente@users.noreply.github.com> Date: Tue, 28 Oct 2025 16:09:07 -0300 Subject: [PATCH 7/7] remove observed --- netboxlabs/diode/sdk/client.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/netboxlabs/diode/sdk/client.py b/netboxlabs/diode/sdk/client.py index 9621e18..e41aa09 100644 --- a/netboxlabs/diode/sdk/client.py +++ b/netboxlabs/diode/sdk/client.py @@ -623,12 +623,10 @@ def _entity_to_log_record( entity: ingester_pb2.Entity, ) -> logs_pb2.LogRecord: body_json = MessageToJson(entity, preserving_proto_field_name=True) - now = time.time_ns() entity_type = entity.WhichOneof("entity") or "unknown" log_record = logs_pb2.LogRecord( - time_unix_nano=now, - observed_time_unix_nano=now, + time_unix_nano=time.time_ns(), severity_number=logs_pb2.SeverityNumber.SEVERITY_NUMBER_INFO, severity_text="INFO", )