Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,23 @@ diode-replay-dryrun \
my_app_92722156890707.json
```

### OTLP client

`DiodeOTLPClient` converts ingestion entities into OpenTelemetry log records and exports them to an OTLP endpoint (gRPC). This is useful when a collector ingests log data and forwards it to Diode.

```python
from netboxlabs.diode.sdk import Entity, DiodeOTLPClient

with DiodeOTLPClient(
target="grpc://localhost:4317",
app_name="my-producer",
app_version="0.0.1",
) as client:
client.ingest([Entity(site="Site1")])
```

Each entity is serialised to JSON and sent as a log record with producer metadata so downstream collectors can enrich and forward the payload. The client raises `OTLPClientError` when the export fails. TLS behaviour honours the existing `DIODE_SKIP_TLS_VERIFY` and `DIODE_CERT_FILE` environment variables.

## Supported entities (object types)

* ASN
Expand Down
2 changes: 2 additions & 0 deletions netboxlabs/diode/sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
from netboxlabs.diode.sdk.client import (
DiodeClient,
DiodeDryRunClient,
DiodeOTLPClient,
load_dryrun_entities,
)

assert DiodeClient
assert DiodeDryRunClient
assert DiodeOTLPClient
assert load_dryrun_entities
253 changes: 246 additions & 7 deletions netboxlabs/diode/sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,19 @@
import grpc
import sentry_sdk
from google.protobuf.json_format import MessageToJson, ParseDict
from opentelemetry.proto.collector.logs.v1 import (
logs_service_pb2,
logs_service_pb2_grpc,
)
from opentelemetry.proto.common.v1 import common_pb2
from opentelemetry.proto.logs.v1 import logs_pb2

from netboxlabs.diode.sdk.diode.v1 import ingester_pb2, ingester_pb2_grpc
from netboxlabs.diode.sdk.exceptions import DiodeClientError, DiodeConfigError
from netboxlabs.diode.sdk.exceptions import (
DiodeClientError,
DiodeConfigError,
OTLPClientError,
)
from netboxlabs.diode.sdk.ingester import Entity
from netboxlabs.diode.sdk.version import version_semver

Expand All @@ -38,6 +48,7 @@
_LOGGER = logging.getLogger(__name__)
_MAX_RETRIES_ENVVAR_NAME = "DIODE_MAX_AUTH_RETRIES"


def load_dryrun_entities(file_path: str | Path) -> Iterable[Entity]:
"""Yield entities from a file with concatenated JSON messages."""
path = Path(file_path)
Expand Down Expand Up @@ -79,7 +90,9 @@ def parse_target(target: str) -> tuple[str, str, bool]:
parsed_target = urlparse(target)

if parsed_target.scheme not in ["grpc", "grpcs", "http", "https"]:
raise ValueError("target should start with grpc://, grpcs://, http:// or https://")
raise ValueError(
"target should start with grpc://, grpcs://, http:// or https://"
)

# Determine if TLS verification should be enabled
tls_verify = _should_verify_tls(parsed_target.scheme)
Expand Down Expand Up @@ -149,16 +162,21 @@ def __init__(
log_level = os.getenv(_DIODE_SDK_LOG_LEVEL_ENVVAR_NAME, "INFO").upper()
logging.basicConfig(level=log_level)

self._max_auth_retries = int(_get_optional_config_value(
_MAX_RETRIES_ENVVAR_NAME, str(max_auth_retries)
) or max_auth_retries)
self._max_auth_retries = int(
_get_optional_config_value(_MAX_RETRIES_ENVVAR_NAME, str(max_auth_retries))
or max_auth_retries
)
self._cert_file = _get_optional_config_value(
_DIODE_CERT_FILE_ENVVAR_NAME, cert_file
)
self._target, self._path, self._tls_verify = parse_target(target)

# Load certificates once if needed
self._certificates = _load_certs(self._cert_file) if (self._tls_verify or self._cert_file) else None
self._certificates = (
_load_certs(self._cert_file)
if (self._tls_verify or self._cert_file)
else None
)
self._app_name = app_name
self._app_version = app_version
self._platform = platform.platform()
Expand Down Expand Up @@ -406,6 +424,227 @@ def ingest(
return ingester_pb2.IngestResponse()


class DiodeOTLPClient(DiodeClientInterface):
"""Diode OTLP client that exports ingestion entities as OTLP logs."""

_name = "diode-sdk-python-otlp"
_version = version_semver()

def __init__(
self,
target: str,
app_name: str,
app_version: str,
*,
timeout: float = 10.0,
metadata: dict[str, str] | Iterable[tuple[str, str]] | None = None,
cert_file: str | None = None,
):
"""Initiate a new Diode OTLP client."""
log_level = os.getenv(_DIODE_SDK_LOG_LEVEL_ENVVAR_NAME, "INFO").upper()
logging.basicConfig(level=log_level)

self._app_name = app_name
self._app_version = app_version
self._platform = platform.platform()
self._python_version = platform.python_version()
self._timeout = timeout

self._target, self._path, self._tls_verify = parse_target(target)
self._cert_file = _get_optional_config_value(
_DIODE_CERT_FILE_ENVVAR_NAME, cert_file
)
self._certificates = (
_load_certs(self._cert_file)
if (self._tls_verify or self._cert_file)
else None
)

channel_opts = (
(
"grpc.primary_user_agent",
f"{self._name}/{self._version} {self._app_name}/{self._app_version}",
),
)

if self._tls_verify:
credentials = (
grpc.ssl_channel_credentials(root_certificates=self._certificates)
if self._certificates
else grpc.ssl_channel_credentials()
)
base_channel = grpc.secure_channel(
self._target,
credentials,
options=channel_opts,
)
else:
base_channel = grpc.insecure_channel(
target=self._target,
options=channel_opts,
)

self._base_channel = base_channel
channel = base_channel
if self._path:
interceptor = DiodeMethodClientInterceptor(subpath=self._path)
channel = grpc.intercept_channel(base_channel, interceptor)

self._channel = channel
self._stub = logs_service_pb2_grpc.LogsServiceStub(channel)
self._metadata = self._prepare_metadata(metadata)

@staticmethod
def _prepare_metadata(
metadata: dict[str, str] | Iterable[tuple[str, str]] | None,
) -> tuple[tuple[str, str], ...] | None:
if metadata is None:
return None
if isinstance(metadata, dict):
return tuple(metadata.items())
return tuple(metadata)

@property
def name(self) -> str:
"""Retrieve the client name."""
return self._name

@property
def version(self) -> str:
"""Retrieve the client version."""
return self._version

@property
def app_name(self) -> str:
"""Retrieve the producer application name."""
return self._app_name

@property
def app_version(self) -> str:
"""Retrieve the producer application version."""
return self._app_version

@property
def timeout(self) -> float:
"""Retrieve the export timeout."""
return self._timeout

@property
def target(self) -> str:
"""Retrieve the export target."""
return self._target

def __enter__(self):
"""Enter the runtime context."""
return self

def __exit__(self, exc_type, exc_value, exc_traceback):
"""Exit the runtime context."""
self.close()

def close(self):
"""Close the underlying channel."""
if getattr(self, "_base_channel", None):
self._base_channel.close()

def ingest(
self,
entities: Iterable[Entity | ingester_pb2.Entity | None],
stream: str | None = _DEFAULT_STREAM,
) -> ingester_pb2.IngestResponse:
"""Export entities as OTLP logs."""
stream = stream or _DEFAULT_STREAM
log_records = [
self._entity_to_log_record(entity)
for entity in self._normalize_entities(entities)
]

if not log_records:
return ingester_pb2.IngestResponse()

request = self._build_export_request(log_records, stream)

try:
self._stub.Export(
request,
timeout=self._timeout,
metadata=self._metadata,
)
except grpc.RpcError as err:
raise OTLPClientError(err) from err

return ingester_pb2.IngestResponse()

def _normalize_entities(
self, entities: Iterable[Entity | ingester_pb2.Entity | None]
) -> list[ingester_pb2.Entity]:
normalized: list[ingester_pb2.Entity] = []
for entity in entities:
if entity is None:
continue
if not isinstance(entity, ingester_pb2.Entity):
raise TypeError("DiodeOTLPClient expects ingester_pb2.Entity instances")
normalized.append(entity)
return normalized

def _build_export_request(
self,
log_records: list[logs_pb2.LogRecord],
stream: str | None,
) -> logs_service_pb2.ExportLogsServiceRequest:
resource_logs = logs_pb2.ResourceLogs()
resource_logs.resource.attributes.extend(self._resource_attributes())
resource_logs.resource.attributes.append(
self._string_kv("diode.stream", stream)
)
scope_logs = resource_logs.scope_logs.add()
scope_logs.scope.CopyFrom(
common_pb2.InstrumentationScope(
name=self._name,
version=self._version,
)
)
scope_logs.log_records.extend(log_records)

request = logs_service_pb2.ExportLogsServiceRequest()
request.resource_logs.append(resource_logs)
return request

def _resource_attributes(self) -> list[common_pb2.KeyValue]:
return [
self._string_kv("service.name", self._app_name),
self._string_kv("service.version", self._app_version),
self._string_kv("os.description", self._platform),
self._string_kv("process.runtime.version", self._python_version),
]

def _entity_to_log_record(
self,
entity: ingester_pb2.Entity,
) -> logs_pb2.LogRecord:
body_json = MessageToJson(entity, preserving_proto_field_name=True)
entity_type = entity.WhichOneof("entity") or "unknown"

log_record = logs_pb2.LogRecord(
time_unix_nano=time.time_ns(),
severity_number=logs_pb2.SeverityNumber.SEVERITY_NUMBER_INFO,
severity_text="INFO",
)
log_record.body.CopyFrom(common_pb2.AnyValue(string_value=body_json))
log_record.attributes.extend(
[
self._string_kv("diode.entity", entity_type),
]
)
return log_record

@staticmethod
def _string_kv(key: str, value: str) -> common_pb2.KeyValue:
return common_pb2.KeyValue(
key=key, value=common_pb2.AnyValue(string_value=value)
)


class _DiodeAuthentication:
def __init__(
self,
Expand All @@ -429,7 +668,7 @@ def authenticate(self) -> str:
"""Request an OAuth2 token using client credentials and return it."""
if self._tls_verify and self._certificates:
context = ssl.create_default_context()
context.load_verify_locations(cadata=self._certificates.decode('utf-8'))
context.load_verify_locations(cadata=self._certificates.decode("utf-8"))
conn = http.client.HTTPSConnection(
self._target,
context=context,
Expand Down
39 changes: 39 additions & 0 deletions netboxlabs/diode/sdk/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,42 @@ def details(self):
def __repr__(self):
"""Return string representation."""
return f"<DiodeClientError status code: {self._status_code}, details: {self._details}>"


class OTLPClientError(BaseError):
"""Raised when the OTLP client fails to export log data."""

def __init__(self, error: Exception, message: str | None = None):
"""Initialize OTLPClientError."""
self._message = message or "OTLP export failed"
self.status_code = None
self.details = None

if isinstance(error, grpc.RpcError):
try:
self.status_code = error.code()
except Exception: # pragma: no cover - defensive
self.status_code = None
try:
self.details = error.details()
except Exception: # pragma: no cover - defensive
self.details = None
else:
self.details = str(error)

parts: list[str] = [self._message]
if self.status_code is not None:
status_name = getattr(self.status_code, "name", str(self.status_code))
parts.append(f"status={status_name}")
if self.details:
parts.append(f"details={self.details}")

super().__init__(", ".join(parts))

def __repr__(self):
"""Return string representation."""
status = getattr(self.status_code, "name", self.status_code)
return (
f"<OTLPClientError message={self._message!r}, "
f"status_code={status!r}, details={self.details!r}>"
)
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies = [
"grpcio>=1.68.1",
"grpcio-status>=1.68.1",
"sentry-sdk>=2.2.1",
"opentelemetry-proto>=1.26.0",
]

[project.optional-dependencies] # Optional
Expand Down
Loading
Loading