diff --git a/README.md b/README.md index 5226e1c..134a6dc 100644 --- a/README.md +++ b/README.md @@ -149,6 +149,23 @@ diode-replay-dryrun \ my_app_92722156890707.json ``` +### OTLP client + +`DiodeOTLPClient` converts ingestion entities into OpenTelemetry log records and exports them to an OTLP endpoint (gRPC). This is useful when a collector ingests log data and forwards it to Diode. + +```python +from netboxlabs.diode.sdk import Entity, DiodeOTLPClient + +with DiodeOTLPClient( + target="grpc://localhost:4317", + app_name="my-producer", + app_version="0.0.1", +) as client: + client.ingest([Entity(site="Site1")]) +``` + +Each entity is serialised to JSON and sent as a log record with producer metadata so downstream collectors can enrich and forward the payload. The client raises `OTLPClientError` when the export fails. TLS behaviour honours the existing `DIODE_SKIP_TLS_VERIFY` and `DIODE_CERT_FILE` environment variables. + ## Supported entities (object types) * ASN diff --git a/netboxlabs/diode/sdk/__init__.py b/netboxlabs/diode/sdk/__init__.py index 89a22e9..ac8d2ff 100644 --- a/netboxlabs/diode/sdk/__init__.py +++ b/netboxlabs/diode/sdk/__init__.py @@ -5,9 +5,11 @@ from netboxlabs.diode.sdk.client import ( DiodeClient, DiodeDryRunClient, + DiodeOTLPClient, load_dryrun_entities, ) assert DiodeClient assert DiodeDryRunClient +assert DiodeOTLPClient assert load_dryrun_entities diff --git a/netboxlabs/diode/sdk/client.py b/netboxlabs/diode/sdk/client.py index 295514b..e41aa09 100644 --- a/netboxlabs/diode/sdk/client.py +++ b/netboxlabs/diode/sdk/client.py @@ -20,9 +20,19 @@ import grpc import sentry_sdk from google.protobuf.json_format import MessageToJson, ParseDict +from opentelemetry.proto.collector.logs.v1 import ( + logs_service_pb2, + logs_service_pb2_grpc, +) +from opentelemetry.proto.common.v1 import common_pb2 +from opentelemetry.proto.logs.v1 import logs_pb2 from netboxlabs.diode.sdk.diode.v1 import ingester_pb2, ingester_pb2_grpc -from netboxlabs.diode.sdk.exceptions import DiodeClientError, DiodeConfigError +from netboxlabs.diode.sdk.exceptions import ( + DiodeClientError, + DiodeConfigError, + OTLPClientError, +) from netboxlabs.diode.sdk.ingester import Entity from netboxlabs.diode.sdk.version import version_semver @@ -38,6 +48,7 @@ _LOGGER = logging.getLogger(__name__) _MAX_RETRIES_ENVVAR_NAME = "DIODE_MAX_AUTH_RETRIES" + def load_dryrun_entities(file_path: str | Path) -> Iterable[Entity]: """Yield entities from a file with concatenated JSON messages.""" path = Path(file_path) @@ -79,7 +90,9 @@ def parse_target(target: str) -> tuple[str, str, bool]: parsed_target = urlparse(target) if parsed_target.scheme not in ["grpc", "grpcs", "http", "https"]: - raise ValueError("target should start with grpc://, grpcs://, http:// or https://") + raise ValueError( + "target should start with grpc://, grpcs://, http:// or https://" + ) # Determine if TLS verification should be enabled tls_verify = _should_verify_tls(parsed_target.scheme) @@ -149,16 +162,21 @@ def __init__( log_level = os.getenv(_DIODE_SDK_LOG_LEVEL_ENVVAR_NAME, "INFO").upper() logging.basicConfig(level=log_level) - self._max_auth_retries = int(_get_optional_config_value( - _MAX_RETRIES_ENVVAR_NAME, str(max_auth_retries) - ) or max_auth_retries) + self._max_auth_retries = int( + _get_optional_config_value(_MAX_RETRIES_ENVVAR_NAME, str(max_auth_retries)) + or max_auth_retries + ) self._cert_file = _get_optional_config_value( _DIODE_CERT_FILE_ENVVAR_NAME, cert_file ) self._target, self._path, self._tls_verify = parse_target(target) # Load certificates once if needed - self._certificates = _load_certs(self._cert_file) if (self._tls_verify or self._cert_file) else None + self._certificates = ( + _load_certs(self._cert_file) + if (self._tls_verify or self._cert_file) + else None + ) self._app_name = app_name self._app_version = app_version self._platform = platform.platform() @@ -406,6 +424,227 @@ def ingest( return ingester_pb2.IngestResponse() +class DiodeOTLPClient(DiodeClientInterface): + """Diode OTLP client that exports ingestion entities as OTLP logs.""" + + _name = "diode-sdk-python-otlp" + _version = version_semver() + + def __init__( + self, + target: str, + app_name: str, + app_version: str, + *, + timeout: float = 10.0, + metadata: dict[str, str] | Iterable[tuple[str, str]] | None = None, + cert_file: str | None = None, + ): + """Initiate a new Diode OTLP client.""" + log_level = os.getenv(_DIODE_SDK_LOG_LEVEL_ENVVAR_NAME, "INFO").upper() + logging.basicConfig(level=log_level) + + self._app_name = app_name + self._app_version = app_version + self._platform = platform.platform() + self._python_version = platform.python_version() + self._timeout = timeout + + self._target, self._path, self._tls_verify = parse_target(target) + self._cert_file = _get_optional_config_value( + _DIODE_CERT_FILE_ENVVAR_NAME, cert_file + ) + self._certificates = ( + _load_certs(self._cert_file) + if (self._tls_verify or self._cert_file) + else None + ) + + channel_opts = ( + ( + "grpc.primary_user_agent", + f"{self._name}/{self._version} {self._app_name}/{self._app_version}", + ), + ) + + if self._tls_verify: + credentials = ( + grpc.ssl_channel_credentials(root_certificates=self._certificates) + if self._certificates + else grpc.ssl_channel_credentials() + ) + base_channel = grpc.secure_channel( + self._target, + credentials, + options=channel_opts, + ) + else: + base_channel = grpc.insecure_channel( + target=self._target, + options=channel_opts, + ) + + self._base_channel = base_channel + channel = base_channel + if self._path: + interceptor = DiodeMethodClientInterceptor(subpath=self._path) + channel = grpc.intercept_channel(base_channel, interceptor) + + self._channel = channel + self._stub = logs_service_pb2_grpc.LogsServiceStub(channel) + self._metadata = self._prepare_metadata(metadata) + + @staticmethod + def _prepare_metadata( + metadata: dict[str, str] | Iterable[tuple[str, str]] | None, + ) -> tuple[tuple[str, str], ...] | None: + if metadata is None: + return None + if isinstance(metadata, dict): + return tuple(metadata.items()) + return tuple(metadata) + + @property + def name(self) -> str: + """Retrieve the client name.""" + return self._name + + @property + def version(self) -> str: + """Retrieve the client version.""" + return self._version + + @property + def app_name(self) -> str: + """Retrieve the producer application name.""" + return self._app_name + + @property + def app_version(self) -> str: + """Retrieve the producer application version.""" + return self._app_version + + @property + def timeout(self) -> float: + """Retrieve the export timeout.""" + return self._timeout + + @property + def target(self) -> str: + """Retrieve the export target.""" + return self._target + + def __enter__(self): + """Enter the runtime context.""" + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + """Exit the runtime context.""" + self.close() + + def close(self): + """Close the underlying channel.""" + if getattr(self, "_base_channel", None): + self._base_channel.close() + + def ingest( + self, + entities: Iterable[Entity | ingester_pb2.Entity | None], + stream: str | None = _DEFAULT_STREAM, + ) -> ingester_pb2.IngestResponse: + """Export entities as OTLP logs.""" + stream = stream or _DEFAULT_STREAM + log_records = [ + self._entity_to_log_record(entity) + for entity in self._normalize_entities(entities) + ] + + if not log_records: + return ingester_pb2.IngestResponse() + + request = self._build_export_request(log_records, stream) + + try: + self._stub.Export( + request, + timeout=self._timeout, + metadata=self._metadata, + ) + except grpc.RpcError as err: + raise OTLPClientError(err) from err + + return ingester_pb2.IngestResponse() + + def _normalize_entities( + self, entities: Iterable[Entity | ingester_pb2.Entity | None] + ) -> list[ingester_pb2.Entity]: + normalized: list[ingester_pb2.Entity] = [] + for entity in entities: + if entity is None: + continue + if not isinstance(entity, ingester_pb2.Entity): + raise TypeError("DiodeOTLPClient expects ingester_pb2.Entity instances") + normalized.append(entity) + return normalized + + def _build_export_request( + self, + log_records: list[logs_pb2.LogRecord], + stream: str | None, + ) -> logs_service_pb2.ExportLogsServiceRequest: + resource_logs = logs_pb2.ResourceLogs() + resource_logs.resource.attributes.extend(self._resource_attributes()) + resource_logs.resource.attributes.append( + self._string_kv("diode.stream", stream) + ) + scope_logs = resource_logs.scope_logs.add() + scope_logs.scope.CopyFrom( + common_pb2.InstrumentationScope( + name=self._name, + version=self._version, + ) + ) + scope_logs.log_records.extend(log_records) + + request = logs_service_pb2.ExportLogsServiceRequest() + request.resource_logs.append(resource_logs) + return request + + def _resource_attributes(self) -> list[common_pb2.KeyValue]: + return [ + self._string_kv("service.name", self._app_name), + self._string_kv("service.version", self._app_version), + self._string_kv("os.description", self._platform), + self._string_kv("process.runtime.version", self._python_version), + ] + + def _entity_to_log_record( + self, + entity: ingester_pb2.Entity, + ) -> logs_pb2.LogRecord: + body_json = MessageToJson(entity, preserving_proto_field_name=True) + entity_type = entity.WhichOneof("entity") or "unknown" + + log_record = logs_pb2.LogRecord( + time_unix_nano=time.time_ns(), + severity_number=logs_pb2.SeverityNumber.SEVERITY_NUMBER_INFO, + severity_text="INFO", + ) + log_record.body.CopyFrom(common_pb2.AnyValue(string_value=body_json)) + log_record.attributes.extend( + [ + self._string_kv("diode.entity", entity_type), + ] + ) + return log_record + + @staticmethod + def _string_kv(key: str, value: str) -> common_pb2.KeyValue: + return common_pb2.KeyValue( + key=key, value=common_pb2.AnyValue(string_value=value) + ) + + class _DiodeAuthentication: def __init__( self, @@ -429,7 +668,7 @@ def authenticate(self) -> str: """Request an OAuth2 token using client credentials and return it.""" if self._tls_verify and self._certificates: context = ssl.create_default_context() - context.load_verify_locations(cadata=self._certificates.decode('utf-8')) + context.load_verify_locations(cadata=self._certificates.decode("utf-8")) conn = http.client.HTTPSConnection( self._target, context=context, diff --git a/netboxlabs/diode/sdk/exceptions.py b/netboxlabs/diode/sdk/exceptions.py index a831f1c..d7eee68 100644 --- a/netboxlabs/diode/sdk/exceptions.py +++ b/netboxlabs/diode/sdk/exceptions.py @@ -45,3 +45,42 @@ def details(self): def __repr__(self): """Return string representation.""" return f"" + + +class OTLPClientError(BaseError): + """Raised when the OTLP client fails to export log data.""" + + def __init__(self, error: Exception, message: str | None = None): + """Initialize OTLPClientError.""" + self._message = message or "OTLP export failed" + self.status_code = None + self.details = None + + if isinstance(error, grpc.RpcError): + try: + self.status_code = error.code() + except Exception: # pragma: no cover - defensive + self.status_code = None + try: + self.details = error.details() + except Exception: # pragma: no cover - defensive + self.details = None + else: + self.details = str(error) + + parts: list[str] = [self._message] + if self.status_code is not None: + status_name = getattr(self.status_code, "name", str(self.status_code)) + parts.append(f"status={status_name}") + if self.details: + parts.append(f"details={self.details}") + + super().__init__(", ".join(parts)) + + def __repr__(self): + """Return string representation.""" + status = getattr(self.status_code, "name", self.status_code) + return ( + f"" + ) diff --git a/pyproject.toml b/pyproject.toml index a6ac5a0..7025a26 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,7 @@ dependencies = [ "grpcio>=1.68.1", "grpcio-status>=1.68.1", "sentry-sdk>=2.2.1", + "opentelemetry-proto>=1.26.0", ] [project.optional-dependencies] # Optional diff --git a/tests/test_client.py b/tests/test_client.py index 4652040..09f288a 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -16,6 +16,7 @@ DiodeClient, DiodeDryRunClient, DiodeMethodClientInterceptor, + DiodeOTLPClient, _ClientCallDetails, _DiodeAuthentication, _get_sentry_dsn, @@ -24,7 +25,11 @@ parse_target, ) from netboxlabs.diode.sdk.diode.v1 import ingester_pb2 -from netboxlabs.diode.sdk.exceptions import DiodeClientError, DiodeConfigError +from netboxlabs.diode.sdk.exceptions import ( + DiodeClientError, + DiodeConfigError, + OTLPClientError, +) from netboxlabs.diode.sdk.ingester import Entity from netboxlabs.diode.sdk.version import version_semver @@ -741,6 +746,105 @@ def test_load_dryrun_entities_from_fixture(message_path, tmp_path): assert entities[-1].wireless_link.ssid == "P2P-Link-1" +def test_otlp_client_exports_entities(): + """Ensure DiodeOTLPClient serializes entities and exports them as logs.""" + with ( + patch("netboxlabs.diode.sdk.client.grpc.insecure_channel") as mock_insecure_channel, + patch("netboxlabs.diode.sdk.client.logs_service_pb2_grpc.LogsServiceStub") as mock_stub_cls, + ): + mock_insecure_channel.return_value = mock.Mock() + stub_instance = mock_stub_cls.return_value + + client = DiodeOTLPClient( + target="grpc://collector:4317", + app_name="orb-producer", + app_version="1.2.3", + ) + + response = client.ingest( + entities=[Entity(site="Site1"), Entity(device="Device1")] + ) + + stub_instance.Export.assert_called_once() + export_args, export_kwargs = stub_instance.Export.call_args + request = export_args[0] + resource_logs = request.resource_logs[0] + scope_logs = resource_logs.scope_logs[0] + log_records = scope_logs.log_records + assert len(log_records) == 2 + body = json.loads(log_records[0].body.string_value) + assert body["site"]["name"] == "Site1" + attributes = {kv.key: kv.value.string_value for kv in log_records[0].attributes} + assert attributes["diode.entity"] == "site" + assert export_kwargs["timeout"] == client.timeout + assert isinstance(response, ingester_pb2.IngestResponse) + + +def test_otlp_client_raises_on_rpc_error(): + """Ensure DiodeOTLPClient wraps gRPC errors in OTLPClientError.""" + + class DummyRpcError(grpc.RpcError): + def __init__(self, code, details): + self._code = code + self._details = details + + def code(self): + return self._code + + def details(self): + return self._details + + with ( + patch("netboxlabs.diode.sdk.client.grpc.insecure_channel") as mock_insecure_channel, + patch("netboxlabs.diode.sdk.client.logs_service_pb2_grpc.LogsServiceStub") as mock_stub_cls, + ): + mock_insecure_channel.return_value = mock.Mock() + stub_instance = mock_stub_cls.return_value + stub_instance.Export.side_effect = DummyRpcError( + grpc.StatusCode.UNAVAILABLE, "endpoint offline" + ) + + client = DiodeOTLPClient( + target="grpc://collector:4317", + app_name="orb-producer", + app_version="1.2.3", + ) + + with pytest.raises(OTLPClientError) as excinfo: + client.ingest(entities=[Entity(site="Site1")]) + + assert excinfo.value.status_code == grpc.StatusCode.UNAVAILABLE + assert "details=endpoint offline" in str(excinfo.value) + + +def test_otlp_client_grpcs_uses_secure_channel(): + """Ensure DiodeOTLPClient configures SSL credentials for secure targets.""" + with ( + patch("netboxlabs.diode.sdk.client.grpc.ssl_channel_credentials") as mock_ssl_credentials, + patch("netboxlabs.diode.sdk.client.grpc.secure_channel") as mock_secure_channel, + patch("netboxlabs.diode.sdk.client.grpc.intercept_channel") as mock_intercept_channel, + patch("netboxlabs.diode.sdk.client.logs_service_pb2_grpc.LogsServiceStub"), + ): + base_channel = mock.Mock() + mock_secure_channel.return_value = base_channel + intercept_channel = mock.Mock() + mock_intercept_channel.return_value = intercept_channel + mock_ssl_credentials.return_value = mock.Mock() + + client = DiodeOTLPClient( + target="grpcs://collector.example:4317/custom", + app_name="orb-producer", + app_version="1.2.3", + ) + + mock_ssl_credentials.assert_called_once() + mock_secure_channel.assert_called_once() + mock_intercept_channel.assert_called_once() + + client.close() + base_channel.close.assert_called_once() + + def test_diode_authentication_with_custom_certificates(): """Test _DiodeAuthentication with custom certificates - covers SSL context creation.""" # Create test certificate content