From 5a25786aa94da251a42f2450452336a19ec81036 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 29 May 2026 09:45:08 +0300 Subject: [PATCH 1/2] refactor: metrics-seam review followups MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Flip OutboxTelemetryMiddleware include_messages_counters default to False to match upstream and the recorder seam. * Centralize optional-extra probes in _import_checker.py (find_spec booleans) — refactor client.py, metrics/{opentelemetry,prometheus}.py, opentelemetry/middleware.py, prometheus/middleware.py onto it. Closes the previously-unguarded middleware paths and unifies three different guard styles. Friendly install-hint ImportError moves to __init__. * Extract BROKER_SYSTEM = "outbox" to metrics/__init__.py; recorder adapters and middleware providers import the single constant. * Add _safe_emit helper; replace four `with suppress(Exception):` blocks in testing.py with it so the test-broker publish path gets the same DEBUG-log shape as the real producer. * Align Prometheus _published_total to OTel's "successes only" — errors (count=0) and timer-id no-ops no longer increment the counter. Document the choice next to the `published` event vocabulary. * Add end-to-end tests for nacked_retried / nacked_terminal flowing through PrometheusRecorder + OpenTelemetryRecorder via TestOutboxBroker. * Add negative tests confirming publish-scope middleware does NOT fire under TestOutboxBroker — the documented bypass is now regression-proofed. Co-Authored-By: Claude Opus 4.7 (1M context) --- faststream_outbox/_import_checker.py | 30 ++++++ faststream_outbox/client.py | 24 +++-- faststream_outbox/metrics/__init__.py | 32 ++++++- faststream_outbox/metrics/opentelemetry.py | 20 ++-- faststream_outbox/metrics/prometheus.py | 51 +++++++---- faststream_outbox/opentelemetry/middleware.py | 24 +++-- faststream_outbox/opentelemetry/provider.py | 9 +- faststream_outbox/prometheus/middleware.py | 7 ++ faststream_outbox/prometheus/provider.py | 8 +- faststream_outbox/testing.py | 91 ++++++++++--------- tests/test_metrics_opentelemetry.py | 66 ++++++++++++++ tests/test_metrics_prometheus.py | 90 +++++++++++++++++- tests/test_middleware_opentelemetry.py | 27 ++++++ tests/test_middleware_prometheus.py | 35 +++++++ tests/test_unit.py | 10 +- 15 files changed, 416 insertions(+), 108 deletions(-) create mode 100644 faststream_outbox/_import_checker.py diff --git a/faststream_outbox/_import_checker.py b/faststream_outbox/_import_checker.py new file mode 100644 index 0000000..a70d049 --- /dev/null +++ b/faststream_outbox/_import_checker.py @@ -0,0 +1,30 @@ +""" +Centralized probes for optional-extra imports. + +Each ``is_*_installed`` is a module-level boolean derived from +``importlib.util.find_spec`` so consumers can guard runtime imports without a +``try/except ImportError`` ladder, and so diagnostic surfaces (e.g. a `/health` +endpoint) can report which extras are available. Modeled after the +``import_checker`` module in the sister project ``lite-bootstrap``. + +The booleans are evaluated at import time and cached — extras can't be +installed mid-process, so re-probing would be wasted work. +""" + +from importlib.util import find_spec + + +is_alembic_installed = find_spec("alembic") is not None +is_asyncpg_installed = find_spec("asyncpg") is not None +is_fastapi_installed = find_spec("fastapi") is not None +is_opentelemetry_installed = find_spec("opentelemetry") is not None +is_prometheus_client_installed = find_spec("prometheus_client") is not None + + +__all__ = [ + "is_alembic_installed", + "is_asyncpg_installed", + "is_fastapi_installed", + "is_opentelemetry_installed", + "is_prometheus_client_installed", +] diff --git a/faststream_outbox/client.py b/faststream_outbox/client.py index 04ff968..19085d9 100644 --- a/faststream_outbox/client.py +++ b/faststream_outbox/client.py @@ -33,29 +33,27 @@ update, ) +# Optional dependency: alembic backs validate_schema() only. The probe lives in +# ``_import_checker`` so every optional-extra site uses the same shape. Users who +# don't call validate_schema() never trigger the runtime import path. +from faststream_outbox._import_checker import is_alembic_installed from faststream_outbox.message import OutboxInnerMessage from faststream_outbox.schema import make_outbox_table -# Optional dependency: alembic backs validate_schema() only. Importing at module -# level (per the project's "no inline imports" convention) means we resolve once -# at import time; users who don't call validate_schema() never trigger the path -# and never need the dependency installed. -try: - from alembic.autogenerate import compare_metadata as _alembic_compare_metadata - from alembic.migration import MigrationContext as _AlembicMigrationContext -except ImportError: # pragma: no cover # alembic is in the dev group so the except branch is unreachable in CI - _alembic_compare_metadata = None # ty: ignore[invalid-assignment] - _AlembicMigrationContext = None # ty: ignore[invalid-assignment] - - if TYPE_CHECKING: import typing from collections.abc import Mapping, Sequence + from alembic.autogenerate import compare_metadata as _alembic_compare_metadata + from alembic.migration import MigrationContext as _AlembicMigrationContext from sqlalchemy import Connection, Table from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine +if is_alembic_installed: + from alembic.autogenerate import compare_metadata as _alembic_compare_metadata + from alembic.migration import MigrationContext as _AlembicMigrationContext + class AbstractOutboxClient(abc.ABC): """ @@ -333,7 +331,7 @@ def _validate_schema_sync(connection: "Connection", table: "Table") -> list[str] additional server defaults, add a targeted ``information_schema`` probe rather than flipping this flag. """ - if _alembic_compare_metadata is None or _AlembicMigrationContext is None: + if not is_alembic_installed: msg = "validate_schema() requires alembic. Install with `pip install faststream-outbox[validate]`." raise ImportError(msg) diff --git a/faststream_outbox/metrics/__init__.py b/faststream_outbox/metrics/__init__.py index b7708a9..3b340cb 100644 --- a/faststream_outbox/metrics/__init__.py +++ b/faststream_outbox/metrics/__init__.py @@ -20,6 +20,10 @@ * ``lease_lost`` — terminal flush found a foreign lease. Tags include ``phase`` (``terminal`` | ``retry``). * ``published`` — producer-side insert. Tags include ``status`` (``success`` | ``error``), ``count, size_bytes, duration_seconds``. No ``subscriber`` tag. + ``count`` is **messages landed**, not publish attempts — errors and ``timer_id`` + no-ops carry ``count=0``. Counter-style adapters should `inc(count)` so totals + reflect messages-on-the-wire; duration histograms record every attempt + (including failures) so failed-publish latency stays observable. Recorders run on the event loop and **must not block**. Synchronous ``prometheus_client.Counter.inc()`` is fine (microseconds); a blocking HTTP/StatsD @@ -27,6 +31,7 @@ ordering and explode the task graph. """ +import logging import typing from collections.abc import Callable, Mapping @@ -34,8 +39,33 @@ MetricsRecorder = Callable[[str, Mapping[str, typing.Any]], None] +# Canonical broker-system identifier — stamped as ``messaging.system`` (OTel) +# and ``broker`` (Prometheus) by both the recorder-seam adapters in this package +# and the native middleware providers in ``faststream_outbox.{opentelemetry, +# prometheus}.provider``. Centralized here so dashboards see a single value +# across both seams and a rename is a one-line change. +BROKER_SYSTEM = "outbox" + + +_logger = logging.getLogger(__name__) + + def _noop_recorder(_event: str, _tags: Mapping[str, typing.Any]) -> None: """Default recorder — does nothing; lets instrumentation sites call unconditionally.""" -__all__ = ["MetricsRecorder", "_noop_recorder"] +def _safe_emit(recorder: MetricsRecorder, event: str, tags: Mapping[str, typing.Any]) -> None: + """ + Invoke ``recorder`` swallowing exceptions and logging at DEBUG. + + Shared by every call site that emits metrics from the test broker. A broken + user-supplied recorder must never poison the dispatch path — DEBUG-level + logging surfaces the failure to operators without flooding production logs. + """ + try: + recorder(event, tags) + except Exception: # noqa: BLE001 + _logger.log(logging.DEBUG, "metrics recorder raised", exc_info=True) + + +__all__ = ["BROKER_SYSTEM", "MetricsRecorder", "_noop_recorder", "_safe_emit"] diff --git a/faststream_outbox/metrics/opentelemetry.py b/faststream_outbox/metrics/opentelemetry.py index 232a9fd..27c1054 100644 --- a/faststream_outbox/metrics/opentelemetry.py +++ b/faststream_outbox/metrics/opentelemetry.py @@ -39,12 +39,15 @@ import typing from collections.abc import Mapping +from faststream_outbox._import_checker import is_opentelemetry_installed +from faststream_outbox.metrics import BROKER_SYSTEM -try: + +if typing.TYPE_CHECKING: + from opentelemetry import metrics as ot_metrics + +if is_opentelemetry_installed: from opentelemetry import metrics as ot_metrics -except ImportError as e: # pragma: no cover - msg = "OpenTelemetryRecorder requires the 'opentelemetry' extra: pip install 'faststream-outbox[opentelemetry]'" - raise ImportError(msg) from e # Mirror FastStream's opentelemetry/consts.py keys verbatim. We bake the keys as @@ -55,7 +58,6 @@ _ATTR_DEST = "messaging.destination.name" _ATTR_OPERATION = "messaging.operation" _ATTR_ERROR_TYPE = "error.type" -_MESSAGING_SYSTEM = "outbox" # Outbox-specific extension attributes — namespaced under ``messaging.outbox.*`` # so they don't collide with stock messaging-semconv keys. @@ -87,6 +89,12 @@ def __init__( meter: "ot_metrics.Meter | None" = None, include_messages_counters: bool = False, ) -> None: + if not is_opentelemetry_installed: # pragma: no cover # opentelemetry is in the dev group + msg = ( + "OpenTelemetryRecorder requires the 'opentelemetry' extra: " + "pip install 'faststream-outbox[opentelemetry]'" + ) + raise ImportError(msg) if meter is not None: chosen_meter = meter elif meter_provider is not None: @@ -135,7 +143,7 @@ def __init__( def _attrs(self, tags: Mapping[str, typing.Any], *, operation: str) -> dict[str, typing.Any]: attrs: dict[str, typing.Any] = { - _ATTR_SYSTEM: _MESSAGING_SYSTEM, + _ATTR_SYSTEM: BROKER_SYSTEM, _ATTR_DEST: tags["queue"], _ATTR_OPERATION: operation, } diff --git a/faststream_outbox/metrics/prometheus.py b/faststream_outbox/metrics/prometheus.py index 86db787..886ed6a 100644 --- a/faststream_outbox/metrics/prometheus.py +++ b/faststream_outbox/metrics/prometheus.py @@ -29,26 +29,29 @@ import typing from collections.abc import Callable, Mapping, Sequence +from faststream._internal.constants import EMPTY + +from faststream_outbox._import_checker import is_prometheus_client_installed +from faststream_outbox.metrics import BROKER_SYSTEM + -try: +if typing.TYPE_CHECKING: from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram -except ImportError as e: # pragma: no cover - msg = "PrometheusRecorder requires the 'prometheus' extra: pip install 'faststream-outbox[prometheus]'" - raise ImportError(msg) from e -from faststream._internal.constants import EMPTY +if is_prometheus_client_installed: + from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram +# ``DEFAULT_SIZE_BUCKETS`` is a class attribute on FastStream's MetricsContainer, +# not a module-level constant. Re-export through the attribute access so any +# future upstream tweak (different bucket spacing, additional buckets) flows +# in automatically. Upstream ``faststream.prometheus`` is part of the dev group; +# the import is unconditional rather than guarded behind an extra probe because +# the canonical source of buckets lives in upstream FastStream, not in a separate +# package. +from faststream.prometheus.container import MetricsContainer as _UpstreamContainer -try: - # ``DEFAULT_SIZE_BUCKETS`` is a class attribute on FastStream's MetricsContainer, - # not a module-level constant. Re-export through the attribute access so any - # future upstream tweak (different bucket spacing, additional buckets) flows - # in automatically. - from faststream.prometheus.container import MetricsContainer as _UpstreamContainer - _UPSTREAM_SIZE_BUCKETS: tuple[float, ...] = tuple(_UpstreamContainer.DEFAULT_SIZE_BUCKETS) -except ImportError: # pragma: no cover - _UPSTREAM_SIZE_BUCKETS = tuple(2.0**n for n in range(4, 25)) +_UPSTREAM_SIZE_BUCKETS: tuple[float, ...] = tuple(_UpstreamContainer.DEFAULT_SIZE_BUCKETS) # Mirror FastStream's PrometheusMiddleware duration histogram boundaries verbatim # so dashboards comparing process duration across brokers use the same buckets. @@ -69,7 +72,6 @@ 10.0, float("inf"), ) -_BROKER_LABEL = "outbox" class PrometheusRecorder: @@ -95,12 +97,15 @@ class PrometheusRecorder: def __init__( self, *, - registry: CollectorRegistry, + registry: "CollectorRegistry", app_name: str = EMPTY, metrics_prefix: str = "faststream", received_messages_size_buckets: Sequence[float] | None = None, custom_labels: dict[str, str | Callable[[Mapping[str, typing.Any]], str]] | None = None, ) -> None: + if not is_prometheus_client_installed: # pragma: no cover # prometheus_client is in the dev group + msg = "PrometheusRecorder requires the 'prometheus' extra: pip install 'faststream-outbox[prometheus]'" + raise ImportError(msg) self._app_name = "" if app_name is EMPTY else app_name self._custom_label_keys = list((custom_labels or {}).keys()) self._custom_label_resolvers = list((custom_labels or {}).values()) @@ -209,7 +214,7 @@ def _consume_values(self, tags: Mapping[str, typing.Any]) -> tuple[str, ...]: # producer): map it to the empty string so the ``handler`` label still # has a stable value rather than KeyError-ing the metric lookup. handler = tags.get("subscriber", "") - return (self._app_name, _BROKER_LABEL, handler, *self._resolve_custom_values(tags)) + return (self._app_name, BROKER_SYSTEM, handler, *self._resolve_custom_values(tags)) def _publish_values(self, tags: Mapping[str, typing.Any]) -> tuple[str, ...]: # Upstream tags publish-side metrics by ``destination`` (the queue @@ -218,7 +223,7 @@ def _publish_values(self, tags: Mapping[str, typing.Any]) -> tuple[str, ...]: # series consistent when ``OutboxPrometheusMiddleware`` is registered # alongside this recorder. destination = tags.get("queue", "") - return (self._app_name, _BROKER_LABEL, destination, *self._resolve_custom_values(tags)) + return (self._app_name, BROKER_SYSTEM, destination, *self._resolve_custom_values(tags)) def __call__(self, event: str, tags: Mapping[str, typing.Any]) -> None: # noqa: C901 consume_base = self._consume_values(tags) @@ -262,7 +267,15 @@ def __call__(self, event: str, tags: Mapping[str, typing.Any]) -> None: # noqa: if event == "published": publish_base = self._publish_values(tags) status = tags.get("status", "success") - self._published_total.labels(*publish_base, status).inc() + # Count = messages landed. Errors (count=0) and timer_id conflicts + # (count=0) don't increment the totals — aligns with the OTel adapter's + # ``messaging.publish.messages`` semantics. ``_published_exceptions`` + # below is the canonical error counter; ``_published_duration`` records + # every attempt (with the status label) so failed-publish latency stays + # observable. + count = tags.get("count", 1) + if count: + self._published_total.labels(*publish_base, status).inc(count) duration = tags.get("duration_seconds") if duration is not None: self._published_duration.labels(*publish_base).observe(duration) diff --git a/faststream_outbox/opentelemetry/middleware.py b/faststream_outbox/opentelemetry/middleware.py index 56bc9d0..cab33dd 100644 --- a/faststream_outbox/opentelemetry/middleware.py +++ b/faststream_outbox/opentelemetry/middleware.py @@ -7,25 +7,37 @@ (``fetched``, ``lease_lost``), use the ``MetricsRecorder`` seam alongside. """ +import typing + from faststream.opentelemetry.middleware import TelemetryMiddleware -from opentelemetry.metrics import Meter, MeterProvider -from opentelemetry.trace import TracerProvider +from faststream_outbox._import_checker import is_opentelemetry_installed from faststream_outbox.opentelemetry.provider import OutboxTelemetrySettingsProvider from faststream_outbox.response import OutboxPublishCommand +if typing.TYPE_CHECKING: + from opentelemetry.metrics import Meter, MeterProvider + from opentelemetry.trace import TracerProvider + + class OutboxTelemetryMiddleware(TelemetryMiddleware[OutboxPublishCommand]): """Drop-in `TelemetryMiddleware` for the outbox broker.""" def __init__( self, *, - tracer_provider: TracerProvider | None = None, - meter_provider: MeterProvider | None = None, - meter: Meter | None = None, - include_messages_counters: bool = True, + tracer_provider: "TracerProvider | None" = None, + meter_provider: "MeterProvider | None" = None, + meter: "Meter | None" = None, + include_messages_counters: bool = False, ) -> None: + if not is_opentelemetry_installed: # pragma: no cover # opentelemetry is in the dev group + msg = ( + "OutboxTelemetryMiddleware requires the 'opentelemetry' extra: " + "pip install 'faststream-outbox[opentelemetry]'" + ) + raise ImportError(msg) super().__init__( settings_provider_factory=lambda _: OutboxTelemetrySettingsProvider(), tracer_provider=tracer_provider, diff --git a/faststream_outbox/opentelemetry/provider.py b/faststream_outbox/opentelemetry/provider.py index d33f314..0454f33 100644 --- a/faststream_outbox/opentelemetry/provider.py +++ b/faststream_outbox/opentelemetry/provider.py @@ -6,6 +6,7 @@ from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME from faststream_outbox.message import OutboxInnerMessage +from faststream_outbox.metrics import BROKER_SYSTEM from faststream_outbox.response import OutboxPublishCommand @@ -32,10 +33,10 @@ class OutboxTelemetrySettingsProvider( __slots__ = ("messaging_system",) def __init__(self) -> None: - # Canonical value — must match ``messaging.metrics.opentelemetry._MESSAGING_SYSTEM`` - # and ``messaging.metrics.prometheus._BROKER_LABEL`` so dashboards see one - # ``messaging.system`` / ``broker`` value across both seams. - self.messaging_system = "outbox" + # Canonical value — shared with the recorder-seam adapters via the + # ``BROKER_SYSTEM`` constant so dashboards see one ``messaging.system`` + # / ``broker`` value across both seams. + self.messaging_system = BROKER_SYSTEM def get_consume_attrs_from_message( self, diff --git a/faststream_outbox/prometheus/middleware.py b/faststream_outbox/prometheus/middleware.py index f0ede22..4bad3ce 100644 --- a/faststream_outbox/prometheus/middleware.py +++ b/faststream_outbox/prometheus/middleware.py @@ -14,6 +14,7 @@ from faststream._internal.constants import EMPTY from faststream.prometheus.middleware import PrometheusMiddleware +from faststream_outbox._import_checker import is_prometheus_client_installed from faststream_outbox.message import OutboxInnerMessage from faststream_outbox.prometheus.provider import OutboxMetricsSettingsProvider from faststream_outbox.response import OutboxPublishCommand @@ -37,6 +38,12 @@ def __init__( received_messages_size_buckets: Sequence[float] | None = None, custom_labels: dict[str, str | Callable[[typing.Any], str]] | None = None, ) -> None: + if not is_prometheus_client_installed: # pragma: no cover # prometheus_client is in the dev group + msg = ( + "OutboxPrometheusMiddleware requires the 'prometheus' extra: " + "pip install 'faststream-outbox[prometheus]'" + ) + raise ImportError(msg) super().__init__( settings_provider_factory=lambda _: OutboxMetricsSettingsProvider(), registry=registry, diff --git a/faststream_outbox/prometheus/provider.py b/faststream_outbox/prometheus/provider.py index 11023d6..c0e5e5d 100644 --- a/faststream_outbox/prometheus/provider.py +++ b/faststream_outbox/prometheus/provider.py @@ -5,6 +5,7 @@ from faststream.prometheus import ConsumeAttrs, MetricsSettingsProvider from faststream_outbox.message import OutboxInnerMessage +from faststream_outbox.metrics import BROKER_SYSTEM from faststream_outbox.response import OutboxPublishCommand @@ -20,10 +21,9 @@ class OutboxMetricsSettingsProvider( __slots__ = ("messaging_system",) def __init__(self) -> None: - # Canonical value — must match ``metrics.prometheus._BROKER_LABEL`` and - # ``opentelemetry.provider.messaging_system`` so the ``broker`` label is - # the same value across recorder + middleware seams. - self.messaging_system = "outbox" + # Canonical value — shared via the ``BROKER_SYSTEM`` constant so the + # ``broker`` label is the same value across recorder + middleware seams. + self.messaging_system = BROKER_SYSTEM def get_consume_attrs_from_message( self, diff --git a/faststream_outbox/testing.py b/faststream_outbox/testing.py index 9242005..6245b79 100644 --- a/faststream_outbox/testing.py +++ b/faststream_outbox/testing.py @@ -15,7 +15,7 @@ import datetime as _dt import typing import uuid -from contextlib import contextmanager, suppress +from contextlib import contextmanager from dataclasses import dataclass, field from unittest import mock @@ -29,6 +29,7 @@ from faststream_outbox.client import AbstractOutboxClient from faststream_outbox.envelope import _encode_payload from faststream_outbox.message import OutboxInnerMessage +from faststream_outbox.metrics import _safe_emit from faststream_outbox.response import OutboxPublishCommand @@ -283,17 +284,17 @@ async def publish(self, cmd: OutboxPublishCommand) -> int | None: timer_id=cmd.timer_id, ) recorder = self._broker.config.broker_config.metrics_recorder - with suppress(Exception): - recorder( - "published", - { - "queue": cmd.queue, - "status": "success", - "count": 0 if row_id is None else 1, - "size_bytes": len(payload), - "duration_seconds": 0.0, - }, - ) + _safe_emit( + recorder, + "published", + { + "queue": cmd.queue, + "status": "success", + "count": 0 if row_id is None else 1, + "size_bytes": len(payload), + "duration_seconds": 0.0, + }, + ) if not self._run_loops and row_id is not None: await _sync_dispatch(self._fake_client, self._broker, cmd.queue, row_id) return row_id @@ -320,17 +321,17 @@ async def publish_batch(self, cmd: OutboxPublishCommand) -> None: landed += 1 if not self._run_loops and row_id is not None: await _sync_dispatch(self._fake_client, self._broker, cmd.queue, row_id) - with suppress(Exception): - recorder( - "published", - { - "queue": cmd.queue, - "status": "success", - "count": landed, - "size_bytes": total_size, - "duration_seconds": 0.0, - }, - ) + _safe_emit( + recorder, + "published", + { + "queue": cmd.queue, + "status": "success", + "count": landed, + "size_bytes": total_size, + "duration_seconds": 0.0, + }, + ) async def request(self, cmd: OutboxPublishCommand) -> typing.NoReturn: msg = "OutboxBroker does not support request-reply" @@ -383,17 +384,17 @@ async def fake_publish( # success/no-op; reproduce that here so test-broker users can assert on # publish-side metrics without exercising the full Postgres path. recorder = broker.config.broker_config.metrics_recorder - with suppress(Exception): - recorder( - "published", - { - "queue": queue, - "status": "success", - "count": 0 if row_id is None else 1, - "size_bytes": len(payload), - "duration_seconds": 0.0, - }, - ) + _safe_emit( + recorder, + "published", + { + "queue": queue, + "status": "success", + "count": 0 if row_id is None else 1, + "size_bytes": len(payload), + "duration_seconds": 0.0, + }, + ) # Sync dispatch ignores next_attempt_at — timers fire immediately in test mode. # Skip only when loop mode is on (loops would re-dispatch) or the insert was a # timer-dedup no-op. @@ -440,17 +441,17 @@ async def fake_publish_batch( landed += 1 if not run_loops and row_id is not None: await _sync_dispatch(fake_client, broker, queue, row_id) - with suppress(Exception): - recorder( - "published", - { - "queue": queue, - "status": "success", - "count": landed, - "size_bytes": total_size, - "duration_seconds": 0.0, - }, - ) + _safe_emit( + recorder, + "published", + { + "queue": queue, + "status": "success", + "count": landed, + "size_bytes": total_size, + "duration_seconds": 0.0, + }, + ) return fake_publish_batch diff --git a/tests/test_metrics_opentelemetry.py b/tests/test_metrics_opentelemetry.py index c9927dd..bb70e56 100644 --- a/tests/test_metrics_opentelemetry.py +++ b/tests/test_metrics_opentelemetry.py @@ -1,6 +1,7 @@ """Unit tests for ``OpenTelemetryRecorder`` — drop-in adapter for the seam.""" import typing +from unittest.mock import AsyncMock import pytest @@ -9,7 +10,10 @@ from opentelemetry import metrics as ot_metrics from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from sqlalchemy import MetaData +from sqlalchemy.ext.asyncio import AsyncSession +from faststream_outbox import NoRetry, OutboxBroker, TestOutboxBroker, make_outbox_table from faststream_outbox.metrics.opentelemetry import OpenTelemetryRecorder @@ -172,3 +176,65 @@ def test_otel_published_error_stamps_error_type_attribute() -> None: attrs = dict(metrics["messaging.publish.duration"].data_points[0].attributes) assert attrs["error.type"] == "IntegrityError" assert attrs["messaging.outbox.status"] == "error" + + +# ----- end-to-end recorder coverage (handler raises → nacked events flow) ----- +# Mirrors the Prometheus E2E tests: prove the contract between subscriber emission +# sites and the OTel adapter without hand-crafting tag dicts. + + +def _e2e_session() -> AsyncMock: + return AsyncMock(spec=AsyncSession) + + +def _e2e_broker(reader: InMemoryMetricReader, **subscriber_kwargs: typing.Any) -> OutboxBroker: + del subscriber_kwargs # consumed by caller's decorator + metadata = MetaData() + table = make_outbox_table(metadata) + provider = MeterProvider(metric_readers=[reader]) + return OutboxBroker( + outbox_table=table, + metrics_recorder=OpenTelemetryRecorder(meter_provider=provider, include_messages_counters=True), + ) + + +async def test_otel_e2e_handler_raises_emits_nacked_status_and_error_type_attrs() -> None: + """Default retry → handler raise schedules retry → process.duration carries status="nacked", error.type.""" + reader = InMemoryMetricReader() + broker = _e2e_broker(reader) + + @broker.subscriber("orders") + async def handle(body: dict) -> None: + del body + msg = "boom" + raise ValueError(msg) + + async with TestOutboxBroker(broker): + await broker.publish({"x": 1}, queue="orders", session=_e2e_session()) + + metrics = _collect_metrics(reader) + assert "messaging.process.duration" in metrics + attrs = dict(metrics["messaging.process.duration"].data_points[0].attributes) + assert attrs["messaging.outbox.status"] == "nacked" + assert attrs["error.type"] == "ValueError" + + +async def test_otel_e2e_handler_raises_with_noretry_stamps_terminal_reason_attr() -> None: + """NoRetry → handler raise terminates → process.duration carries terminal_reason="retry_terminal".""" + reader = InMemoryMetricReader() + broker = _e2e_broker(reader) + + @broker.subscriber("orders", retry_strategy=NoRetry()) + async def handle(body: dict) -> None: + del body + msg = "boom" + raise RuntimeError(msg) + + async with TestOutboxBroker(broker): + await broker.publish({"x": 1}, queue="orders", session=_e2e_session()) + + metrics = _collect_metrics(reader) + attrs = dict(metrics["messaging.process.duration"].data_points[0].attributes) + assert attrs["messaging.outbox.terminal_reason"] == "retry_terminal" + assert attrs["error.type"] == "RuntimeError" + assert attrs["messaging.outbox.status"] == "nacked" diff --git a/tests/test_metrics_prometheus.py b/tests/test_metrics_prometheus.py index 8121bf3..8342700 100644 --- a/tests/test_metrics_prometheus.py +++ b/tests/test_metrics_prometheus.py @@ -1,14 +1,18 @@ """Unit tests for ``PrometheusRecorder`` — drop-in adapter for the seam.""" import typing +from unittest.mock import AsyncMock import pytest -prometheus_client = pytest.importorskip("prometheus_client") -from prometheus_client import CollectorRegistry # noqa: E402 +pytest.importorskip("prometheus_client") +from prometheus_client import CollectorRegistry +from sqlalchemy import MetaData +from sqlalchemy.ext.asyncio import AsyncSession -from faststream_outbox.metrics.prometheus import PrometheusRecorder # noqa: E402 +from faststream_outbox import NoRetry, OutboxBroker, TestOutboxBroker, make_outbox_table +from faststream_outbox.metrics.prometheus import PrometheusRecorder def _sample(reg: CollectorRegistry, name: str, labels: dict[str, str]) -> float | None: @@ -199,3 +203,83 @@ def test_prometheus_custom_metrics_prefix_renames_series() -> None: # The default prefix metric must NOT exist. assert _sample(reg, "faststream_outbox_fetch_batches_total", {**_base_labels(), "non_empty": "true"}) is None assert _sample(reg, "my_app_outbox_fetch_batches_total", {**_base_labels(), "non_empty": "true"}) == 1.0 + + +# ----- end-to-end recorder coverage (handler raises → nacked events flow) ----- +# These tests prove the contract between subscriber emission sites and the +# Prometheus adapter end-to-end. Unit tests above hand-craft tag dicts and would +# not catch a rename like ``exception_type`` → ``exc_type`` in the emission code. + + +def _e2e_session() -> AsyncMock: + return AsyncMock(spec=AsyncSession) + + +async def test_prometheus_e2e_handler_raises_emits_nacked_retried_with_exception_type() -> None: + """Default retry strategy → handler raise schedules a retry → nacked_retried event.""" + reg = CollectorRegistry() + metadata = MetaData() + table = make_outbox_table(metadata) + broker = OutboxBroker(outbox_table=table, metrics_recorder=PrometheusRecorder(registry=reg)) + + @broker.subscriber("orders") + async def handle(body: dict) -> None: + del body + msg = "boom" + raise ValueError(msg) + + async with TestOutboxBroker(broker): + await broker.publish({"x": 1}, queue="orders", session=_e2e_session()) + + # The subscriber's `call_name` is "handle". Status must be "nacked". + assert ( + _sample( + reg, + "faststream_received_processed_messages_total", + {**_base_labels("Handle"), "status": "nacked"}, + ) + == 1.0 + ) + assert ( + _sample( + reg, + "faststream_received_processed_messages_exceptions_total", + {**_base_labels("Handle"), "exception_type": "ValueError"}, + ) + == 1.0 + ) + + +async def test_prometheus_e2e_handler_raises_with_noretry_emits_nacked_terminal_with_reason() -> None: + """NoRetry strategy → handler raise terminates → nacked_terminal(reason="retry_terminal").""" + reg = CollectorRegistry() + metadata = MetaData() + table = make_outbox_table(metadata) + broker = OutboxBroker(outbox_table=table, metrics_recorder=PrometheusRecorder(registry=reg)) + + @broker.subscriber("orders", retry_strategy=NoRetry()) + async def handle(body: dict) -> None: + del body + msg = "boom" + raise RuntimeError(msg) + + async with TestOutboxBroker(broker): + await broker.publish({"x": 1}, queue="orders", session=_e2e_session()) + + assert ( + _sample( + reg, + "faststream_outbox_terminal_total", + {**_base_labels("Handle"), "reason": "retry_terminal"}, + ) + == 1.0 + ) + # Terminal nacked path also bumps the upstream ``status="nacked"`` counter. + assert ( + _sample( + reg, + "faststream_received_processed_messages_total", + {**_base_labels("Handle"), "status": "nacked"}, + ) + == 1.0 + ) diff --git a/tests/test_middleware_opentelemetry.py b/tests/test_middleware_opentelemetry.py index 53e52f5..53e63c3 100644 --- a/tests/test_middleware_opentelemetry.py +++ b/tests/test_middleware_opentelemetry.py @@ -203,3 +203,30 @@ def test_outbox_telemetry_provider_publish_destination_name() -> None: session=AsyncMock(spec=AsyncSession), ) assert provider.get_publish_destination_name(cmd) == "dst-q" + + +async def test_outbox_telemetry_middleware_publish_scope_does_not_fire_under_test_broker() -> None: + """ + ``TestOutboxBroker`` patches ``broker.publish`` directly, bypassing ``_basic_publish``. + + The middleware's ``publish_scope`` therefore must not fire. The recorder seam + (via ``FakeOutboxProducer`` / ``_build_fake_publish``) is the publish-side + metrics path in test mode. Negative-assert here so a future refactor that + routes test-broker publishes through ``_basic_publish`` (and thus through + the publish-scope middleware) trips this guardrail. + """ + reader = InMemoryMetricReader() + broker = _make_broker(reader) + + @broker.subscriber("orders") + async def handle(body: dict) -> None: + pass + + async with TestOutboxBroker(broker): + await broker.publish({"x": 1}, queue="orders", session=_session_mock()) + + instruments = _instruments(reader) + # Publish-scope middleware would create messaging.publish.duration; under + # the test broker the publish path bypasses _basic_publish, so the instrument + # must be absent. + assert "messaging.publish.duration" not in instruments diff --git a/tests/test_middleware_prometheus.py b/tests/test_middleware_prometheus.py index 29dc0a8..1fabb96 100644 --- a/tests/test_middleware_prometheus.py +++ b/tests/test_middleware_prometheus.py @@ -228,3 +228,38 @@ def _acked_from(reg: CollectorRegistry) -> float: assert _acked_from(middleware_reg) == 1.0 assert _acked_from(recorder_reg) == 1.0 + + +async def test_outbox_prometheus_middleware_publish_scope_does_not_fire_under_test_broker() -> None: + """ + ``TestOutboxBroker`` patches ``broker.publish`` directly, bypassing ``_basic_publish``. + + The middleware's ``publish_scope`` therefore must not fire. The recorder seam + (via ``FakeOutboxProducer`` / ``_build_fake_publish``) is the publish-side + metrics path in test mode. Negative-assert here so a future refactor that + routes test-broker publishes through ``_basic_publish`` (and thus through + the publish-scope middleware) trips this guardrail instead of silently + double-counting in production users that test through ``TestOutboxBroker``. + """ + reg = CollectorRegistry() + broker = _make_broker(reg) + + @broker.subscriber("orders") + async def handle(body: dict) -> None: + pass + + async with TestOutboxBroker(broker): + await broker.publish({"x": 1}, queue="orders", session=_session_mock()) + + # A Counter that's never been ``.labels(...).inc()``-ed produces zero samples; + # any non-zero sample on the publish-side series means middleware fired. + non_zero_publish_samples = [ + (sample.name, dict(sample.labels), sample.value) + for metric in reg.collect() + if metric.name == "faststream_published_messages" + for sample in metric.samples + if sample.value > 0.0 + ] + assert non_zero_publish_samples == [], ( + f"publish_scope middleware fired under TestOutboxBroker: {non_zero_publish_samples}" + ) diff --git a/tests/test_unit.py b/tests/test_unit.py index 01173bb..8b36000 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -994,14 +994,10 @@ def test_validate_schema_sync_raises_when_alembic_missing() -> None: """Without alembic installed the validator must raise ImportError with the install hint.""" metadata = MetaData() t = make_outbox_table(metadata) - # Alembic is imported at module load; simulate "not installed" by zeroing the - # sentinels the function checks — matches what client.py's except-ImportError does. + # Alembic is probed via ``_import_checker.is_alembic_installed`` at import time; + # simulate "not installed" by flipping the boolean the function checks. with ( - patch.multiple( - "faststream_outbox.client", - _alembic_compare_metadata=None, - _AlembicMigrationContext=None, - ), + patch("faststream_outbox.client.is_alembic_installed", new=False), pytest.raises(ImportError, match=r"pip install faststream-outbox\[validate\]"), ): _validate_schema_sync(MagicMock(), t) From 7caed714a069f4ce9b1e07966a3c5a0d2e23a707 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 29 May 2026 10:13:11 +0300 Subject: [PATCH 2/2] refactor: cover optional-extra ImportError branches; review nits MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Drop `# pragma: no cover` on the four `if not is_X_installed: raise ImportError` branches. Add tests that patch the `is_*_installed` booleans (`OpenTelemetryRecorder`, `PrometheusRecorder`, `OutboxTelemetryMiddleware`, `OutboxPrometheusMiddleware`) and assert the friendly install-hint surfaces. Mirrors the existing `test_validate_schema_sync_raises_when_alembic_missing` shape. * Subscriber `_emit_metric` now passes `exc_info=exc` to `self._log`, matching the producer's swallow-and-log shape so operators see the same traceback whether the recorder raises on a consume or publish event. * `metrics/prometheus.py` `published` block: `if count:` → `if count > 0:` for explicit intent (per review nit). --- faststream_outbox/metrics/opentelemetry.py | 2 +- faststream_outbox/metrics/prometheus.py | 4 ++-- faststream_outbox/opentelemetry/middleware.py | 2 +- faststream_outbox/prometheus/middleware.py | 2 +- faststream_outbox/subscriber/usecase.py | 6 +++++- tests/test_metrics_opentelemetry.py | 11 ++++++++++- tests/test_metrics_prometheus.py | 11 ++++++++++- tests/test_middleware_opentelemetry.py | 11 ++++++++++- tests/test_middleware_prometheus.py | 11 ++++++++++- 9 files changed, 50 insertions(+), 10 deletions(-) diff --git a/faststream_outbox/metrics/opentelemetry.py b/faststream_outbox/metrics/opentelemetry.py index 27c1054..9faa711 100644 --- a/faststream_outbox/metrics/opentelemetry.py +++ b/faststream_outbox/metrics/opentelemetry.py @@ -89,7 +89,7 @@ def __init__( meter: "ot_metrics.Meter | None" = None, include_messages_counters: bool = False, ) -> None: - if not is_opentelemetry_installed: # pragma: no cover # opentelemetry is in the dev group + if not is_opentelemetry_installed: msg = ( "OpenTelemetryRecorder requires the 'opentelemetry' extra: " "pip install 'faststream-outbox[opentelemetry]'" diff --git a/faststream_outbox/metrics/prometheus.py b/faststream_outbox/metrics/prometheus.py index 886ed6a..90360a3 100644 --- a/faststream_outbox/metrics/prometheus.py +++ b/faststream_outbox/metrics/prometheus.py @@ -103,7 +103,7 @@ def __init__( received_messages_size_buckets: Sequence[float] | None = None, custom_labels: dict[str, str | Callable[[Mapping[str, typing.Any]], str]] | None = None, ) -> None: - if not is_prometheus_client_installed: # pragma: no cover # prometheus_client is in the dev group + if not is_prometheus_client_installed: msg = "PrometheusRecorder requires the 'prometheus' extra: pip install 'faststream-outbox[prometheus]'" raise ImportError(msg) self._app_name = "" if app_name is EMPTY else app_name @@ -274,7 +274,7 @@ def __call__(self, event: str, tags: Mapping[str, typing.Any]) -> None: # noqa: # every attempt (with the status label) so failed-publish latency stays # observable. count = tags.get("count", 1) - if count: + if count > 0: self._published_total.labels(*publish_base, status).inc(count) duration = tags.get("duration_seconds") if duration is not None: diff --git a/faststream_outbox/opentelemetry/middleware.py b/faststream_outbox/opentelemetry/middleware.py index cab33dd..c3d607b 100644 --- a/faststream_outbox/opentelemetry/middleware.py +++ b/faststream_outbox/opentelemetry/middleware.py @@ -32,7 +32,7 @@ def __init__( meter: "Meter | None" = None, include_messages_counters: bool = False, ) -> None: - if not is_opentelemetry_installed: # pragma: no cover # opentelemetry is in the dev group + if not is_opentelemetry_installed: msg = ( "OutboxTelemetryMiddleware requires the 'opentelemetry' extra: " "pip install 'faststream-outbox[opentelemetry]'" diff --git a/faststream_outbox/prometheus/middleware.py b/faststream_outbox/prometheus/middleware.py index 4bad3ce..227b082 100644 --- a/faststream_outbox/prometheus/middleware.py +++ b/faststream_outbox/prometheus/middleware.py @@ -38,7 +38,7 @@ def __init__( received_messages_size_buckets: Sequence[float] | None = None, custom_labels: dict[str, str | Callable[[typing.Any], str]] | None = None, ) -> None: - if not is_prometheus_client_installed: # pragma: no cover # prometheus_client is in the dev group + if not is_prometheus_client_installed: msg = ( "OutboxPrometheusMiddleware requires the 'prometheus' extra: " "pip install 'faststream-outbox[prometheus]'" diff --git a/faststream_outbox/subscriber/usecase.py b/faststream_outbox/subscriber/usecase.py index b254f8c..8d129d8 100644 --- a/faststream_outbox/subscriber/usecase.py +++ b/faststream_outbox/subscriber/usecase.py @@ -146,10 +146,14 @@ def _base_tags(self, queue: str) -> dict[str, typing.Any]: def _emit_metric(self, event: str, tags: Mapping[str, typing.Any]) -> None: try: self._outer_config.metrics_recorder(event, tags) - except Exception: # noqa: BLE001 + except Exception as exc: # noqa: BLE001 + # Match the producer's swallow-and-log shape (with ``exc_info``) so + # operators see the same traceback whether the recorder raised on a + # subscriber or publisher event. self._log( log_level=logging.DEBUG, message="metrics recorder raised", + exc_info=exc, ) @typing.override diff --git a/tests/test_metrics_opentelemetry.py b/tests/test_metrics_opentelemetry.py index bb70e56..ee7ccbd 100644 --- a/tests/test_metrics_opentelemetry.py +++ b/tests/test_metrics_opentelemetry.py @@ -1,7 +1,7 @@ """Unit tests for ``OpenTelemetryRecorder`` — drop-in adapter for the seam.""" import typing -from unittest.mock import AsyncMock +from unittest.mock import AsyncMock, patch import pytest @@ -238,3 +238,12 @@ async def handle(body: dict) -> None: assert attrs["messaging.outbox.terminal_reason"] == "retry_terminal" assert attrs["error.type"] == "RuntimeError" assert attrs["messaging.outbox.status"] == "nacked" + + +def test_otel_recorder_raises_friendly_error_when_extra_missing() -> None: + """Emulating ``opentelemetry`` as not installed must surface the install-hint ImportError.""" + with ( + patch("faststream_outbox.metrics.opentelemetry.is_opentelemetry_installed", new=False), + pytest.raises(ImportError, match=r"pip install 'faststream-outbox\[opentelemetry\]'"), + ): + OpenTelemetryRecorder() diff --git a/tests/test_metrics_prometheus.py b/tests/test_metrics_prometheus.py index 8342700..5d0d0bd 100644 --- a/tests/test_metrics_prometheus.py +++ b/tests/test_metrics_prometheus.py @@ -1,7 +1,7 @@ """Unit tests for ``PrometheusRecorder`` — drop-in adapter for the seam.""" import typing -from unittest.mock import AsyncMock +from unittest.mock import AsyncMock, patch import pytest @@ -283,3 +283,12 @@ async def handle(body: dict) -> None: ) == 1.0 ) + + +def test_prometheus_recorder_raises_friendly_error_when_extra_missing() -> None: + """Emulating ``prometheus_client`` as not installed must surface the install-hint ImportError.""" + with ( + patch("faststream_outbox.metrics.prometheus.is_prometheus_client_installed", new=False), + pytest.raises(ImportError, match=r"pip install 'faststream-outbox\[prometheus\]'"), + ): + PrometheusRecorder(registry=CollectorRegistry()) diff --git a/tests/test_middleware_opentelemetry.py b/tests/test_middleware_opentelemetry.py index 53e63c3..b59c340 100644 --- a/tests/test_middleware_opentelemetry.py +++ b/tests/test_middleware_opentelemetry.py @@ -10,7 +10,7 @@ import datetime as _dt import typing import uuid -from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -230,3 +230,12 @@ async def handle(body: dict) -> None: # the test broker the publish path bypasses _basic_publish, so the instrument # must be absent. assert "messaging.publish.duration" not in instruments + + +def test_outbox_telemetry_middleware_raises_friendly_error_when_extra_missing() -> None: + """Emulating ``opentelemetry`` as not installed must surface the install-hint ImportError.""" + with ( + patch("faststream_outbox.opentelemetry.middleware.is_opentelemetry_installed", new=False), + pytest.raises(ImportError, match=r"pip install 'faststream-outbox\[opentelemetry\]'"), + ): + OutboxTelemetryMiddleware() diff --git a/tests/test_middleware_prometheus.py b/tests/test_middleware_prometheus.py index 1fabb96..9675782 100644 --- a/tests/test_middleware_prometheus.py +++ b/tests/test_middleware_prometheus.py @@ -8,7 +8,7 @@ import datetime as _dt import uuid -from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -263,3 +263,12 @@ async def handle(body: dict) -> None: assert non_zero_publish_samples == [], ( f"publish_scope middleware fired under TestOutboxBroker: {non_zero_publish_samples}" ) + + +def test_outbox_prometheus_middleware_raises_friendly_error_when_extra_missing() -> None: + """Emulating ``prometheus_client`` as not installed must surface the install-hint ImportError.""" + with ( + patch("faststream_outbox.prometheus.middleware.is_prometheus_client_installed", new=False), + pytest.raises(ImportError, match=r"pip install 'faststream-outbox\[prometheus\]'"), + ): + OutboxPrometheusMiddleware(registry=CollectorRegistry())