From 84455a23a8b7fa0f8c751efa0db5cb7748b85661 Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Fri, 5 Jun 2026 10:53:38 -0700 Subject: [PATCH 1/2] feat(auth-cache): add hit/miss/eviction metrics to authentication cache Instrument AsyncTTLCache so cache effectiveness is observable per flow. Reads emit auth_cache.access tagged cache: + result:hit|miss_expired| miss_absent; capacity evictions emit auth_cache.eviction. Emission mirrors the dual OTel + StatsD pattern in db_metrics.py and no-ops when neither backend is configured. This makes it possible to distinguish a low hit rate caused by a short TTL (miss_expired) from one caused by non-repeating keys or a cold per-worker cache (miss_absent), and to surface capacity-driven eviction. --- agentex/src/api/authentication_cache.py | 33 ++++-- agentex/src/utils/cache_metrics.py | 100 ++++++++++++++++++ .../api/test_authentication_cache_metrics.py | 92 ++++++++++++++++ .../tests/unit/utils/test_cache_metrics.py | 58 ++++++++++ 4 files changed, 277 insertions(+), 6 deletions(-) create mode 100644 agentex/src/utils/cache_metrics.py create mode 100644 agentex/tests/unit/api/test_authentication_cache_metrics.py create mode 100644 agentex/tests/unit/utils/test_cache_metrics.py diff --git a/agentex/src/api/authentication_cache.py b/agentex/src/api/authentication_cache.py index 12a4bbe0..066788f2 100644 --- a/agentex/src/api/authentication_cache.py +++ b/agentex/src/api/authentication_cache.py @@ -7,6 +7,7 @@ from collections import OrderedDict from typing import Any +from src.utils.cache_metrics import record_cache_access, record_cache_eviction from src.utils.logging import make_logger logger = make_logger(__name__) @@ -27,14 +28,16 @@ class AsyncTTLCache: """Async-safe TTL cache implementation using OrderedDict with asyncio locks.""" - def __init__(self, max_size: int = 1000, ttl_seconds: int = 300): + def __init__(self, name: str, max_size: int = 1000, ttl_seconds: int = 300): """ Initialize async-safe TTL cache. Args: + name: Logical cache name, used as the ``cache`` tag on emitted metrics max_size: Maximum number of entries in the cache ttl_seconds: Time-to-live for cache entries in seconds """ + self.name = name self.cache: OrderedDict[str, tuple[Any, float]] = OrderedDict() self.max_size = max_size self.ttl_seconds = ttl_seconds @@ -44,6 +47,10 @@ async def get(self, key: str) -> Any | None: """Get value from cache if it exists and hasn't expired.""" async with self._lock: if key not in self.cache: + # MISS: the key was never cached (or was already evicted). In a + # load test this dominating means the key never repeats (unique + # creds/cookies per request) or the cache is cold per-worker. + record_cache_access(self.name, "miss_absent") return None value, timestamp = self.cache[key] @@ -51,10 +58,15 @@ async def get(self, key: str) -> Any | None: # Check if entry has expired if time.time() - timestamp > self.ttl_seconds: del self.cache[key] + # MISS: the key was present but past its TTL. This dominating + # means the TTL is too short for the request rate (churn). + record_cache_access(self.name, "miss_expired") return None # Move to end (most recently used) self.cache.move_to_end(key) + # HIT: present and fresh. + record_cache_access(self.name, "hit") return value async def set(self, key: str, value: Any) -> None: @@ -63,6 +75,7 @@ async def set(self, key: str, value: Any) -> None: # Remove oldest entry if cache is full if len(self.cache) >= self.max_size and key not in self.cache: self.cache.popitem(last=False) + record_cache_eviction(self.name) self.cache[key] = (value, time.time()) self.cache.move_to_end(key) @@ -117,12 +130,20 @@ def __init__( authorization_cache_ttl: TTL for authorization checks in seconds max_cache_size: Maximum number of entries per cache """ - # Separate async-safe caches for different authentication types - self.agent_identity_cache = AsyncTTLCache(max_cache_size, agent_cache_ttl) - self.agent_api_key_cache = AsyncTTLCache(max_cache_size, agent_cache_ttl) - self.auth_gateway_cache = AsyncTTLCache(max_cache_size, auth_gateway_cache_ttl) + # Separate async-safe caches for different authentication types. + # The name is used as the ``cache`` tag on emitted metrics so hit rate + # can be broken down per flow. + self.agent_identity_cache = AsyncTTLCache( + "agent_identity", max_cache_size, agent_cache_ttl + ) + self.agent_api_key_cache = AsyncTTLCache( + "agent_api_key", max_cache_size, agent_cache_ttl + ) + self.auth_gateway_cache = AsyncTTLCache( + "auth_gateway", max_cache_size, auth_gateway_cache_ttl + ) self.authorization_check_cache = AsyncTTLCache( - max_cache_size, authorization_cache_ttl + "authorization_check", max_cache_size, authorization_cache_ttl ) logger.info( diff --git a/agentex/src/utils/cache_metrics.py b/agentex/src/utils/cache_metrics.py new file mode 100644 index 00000000..6bc53377 --- /dev/null +++ b/agentex/src/utils/cache_metrics.py @@ -0,0 +1,100 @@ +""" +Metrics instrumentation for the in-process authentication/authorization caches. + +Mirrors the dual-emit pattern in ``src/utils/db_metrics.py``: + +- When an OTLP endpoint is configured (``OTEL_EXPORTER_OTLP_ENDPOINT``), counters + are recorded through the OpenTelemetry SDK. +- When the Datadog Agent is reachable (``DD_AGENT_HOST``), the same events are + emitted as StatsD counters. +- When neither is configured, every function here is a cheap no-op. + +The goal is to make cache effectiveness observable: hit rate per cache, the +reason for misses (expired vs never-seen), and capacity-driven evictions. These +are exactly the signals needed to tell whether a low hit rate is a TTL problem, +a key-cardinality problem, or a capacity problem. +""" + +from __future__ import annotations + +import os +from typing import TYPE_CHECKING, Literal + +from datadog import statsd + +from src.utils.logging import make_logger +from src.utils.otel_metrics import get_meter + +if TYPE_CHECKING: + from opentelemetry.metrics import Counter + +logger = make_logger(__name__) + +# StatsD is only emitted if the Datadog Agent host is configured. +_STATSD_ENABLED = bool(os.environ.get("DD_AGENT_HOST")) + +# Outcome of a single cache read. "hit" = present and fresh; "miss_expired" = +# present but past its TTL (TTL too short / churn); "miss_absent" = never cached +# or evicted (cold cache, key never repeats, or capacity eviction). +CacheResult = Literal["hit", "miss_expired", "miss_absent"] + +# Lazily-created OTel instruments (created once, on first use). +_access_counter: Counter | None = None +_eviction_counter: Counter | None = None +_instruments_initialized = False + + +def _ensure_instruments() -> None: + """Create OTel counters on first use. No-op if OTel is not configured.""" + global _access_counter, _eviction_counter, _instruments_initialized + + if _instruments_initialized: + return + _instruments_initialized = True + + meter = get_meter("agentex.auth_cache") + if meter is None: + # OTel not configured; OTel path stays disabled. StatsD may still emit. + return + + _access_counter = meter.create_counter( + name="auth_cache.access", + description="Authentication/authorization cache reads, tagged by cache and result", + unit="{access}", + ) + _eviction_counter = meter.create_counter( + name="auth_cache.eviction", + description="LRU evictions from authentication/authorization caches", + unit="{eviction}", + ) + + +def record_cache_access(cache_name: str, result: CacheResult) -> None: + """ + Record a single cache read. + + Args: + cache_name: Logical cache name (e.g. "auth_gateway", "agent_api_key"). + result: One of "hit", "miss_expired", "miss_absent". + """ + _ensure_instruments() + + if _access_counter is not None: + _access_counter.add(1, {"cache": cache_name, "result": result}) + + if _STATSD_ENABLED: + statsd.increment( + "auth_cache.access", + tags=[f"cache:{cache_name}", f"result:{result}"], + ) + + +def record_cache_eviction(cache_name: str) -> None: + """Record a single capacity-driven (LRU) eviction.""" + _ensure_instruments() + + if _eviction_counter is not None: + _eviction_counter.add(1, {"cache": cache_name}) + + if _STATSD_ENABLED: + statsd.increment("auth_cache.eviction", tags=[f"cache:{cache_name}"]) diff --git a/agentex/tests/unit/api/test_authentication_cache_metrics.py b/agentex/tests/unit/api/test_authentication_cache_metrics.py new file mode 100644 index 00000000..8b9dfc5b --- /dev/null +++ b/agentex/tests/unit/api/test_authentication_cache_metrics.py @@ -0,0 +1,92 @@ +"""Tests for cache hit/miss/eviction instrumentation in ``authentication_cache``. + +Asserts that ``AsyncTTLCache`` records the correct metric for each of the three +read outcomes (hit / miss_expired / miss_absent) and emits an eviction metric on +capacity-driven LRU eviction. The emission backend (OTel / StatsD) is covered +separately; here we patch the recorder functions at their import site in +``authentication_cache`` and assert the calls. +""" + +from __future__ import annotations + +from unittest.mock import patch + +import pytest +from src.api.authentication_cache import AsyncTTLCache, AuthenticationCache + + +@pytest.mark.unit +@pytest.mark.asyncio +class TestAsyncTTLCacheMetrics: + async def test_hit_records_hit(self): + cache = AsyncTTLCache(name="agent_api_key", ttl_seconds=300) + await cache.set("k", "v") + + with patch("src.api.authentication_cache.record_cache_access") as record_access: + result = await cache.get("k") + + assert result == "v" + record_access.assert_called_once_with("agent_api_key", "hit") + + async def test_absent_key_records_miss_absent(self): + cache = AsyncTTLCache(name="auth_gateway", ttl_seconds=300) + + with patch("src.api.authentication_cache.record_cache_access") as record_access: + result = await cache.get("never-set") + + assert result is None + record_access.assert_called_once_with("auth_gateway", "miss_absent") + + async def test_expired_entry_records_miss_expired(self): + cache = AsyncTTLCache(name="auth_gateway", ttl_seconds=60) + await cache.set("k", "v") + # Force expiry deterministically by backdating the stored timestamp, + # avoiding any reliance on wall-clock timing in the test. + value, _ = cache.cache["k"] + cache.cache["k"] = (value, 0.0) + + with patch("src.api.authentication_cache.record_cache_access") as record_access: + result = await cache.get("k") + + assert result is None + record_access.assert_called_once_with("auth_gateway", "miss_expired") + # The expired entry should also have been purged from the cache. + assert "k" not in cache.cache + + async def test_capacity_eviction_records_eviction(self): + cache = AsyncTTLCache(name="authorization_check", max_size=1, ttl_seconds=300) + await cache.set("first", "v1") + + with patch( + "src.api.authentication_cache.record_cache_eviction" + ) as record_eviction: + # Inserting a second distinct key evicts the oldest (LRU). + await cache.set("second", "v2") + + record_eviction.assert_called_once_with("authorization_check") + assert "first" not in cache.cache + assert "second" in cache.cache + + async def test_overwriting_existing_key_does_not_evict(self): + cache = AsyncTTLCache(name="authorization_check", max_size=1, ttl_seconds=300) + await cache.set("k", "v1") + + with patch( + "src.api.authentication_cache.record_cache_eviction" + ) as record_eviction: + # Re-setting an existing key is an update, not a capacity eviction. + await cache.set("k", "v2") + + record_eviction.assert_not_called() + assert await cache.get("k") == "v2" + + +@pytest.mark.unit +def test_authentication_cache_assigns_distinct_names(): + """Each sub-cache carries the name used as the ``cache`` metric tag.""" + cache = AuthenticationCache() + + assert cache.agent_identity_cache.name == "agent_identity" + assert cache.agent_api_key_cache.name == "agent_api_key" + assert cache.auth_gateway_cache.name == "auth_gateway" + assert cache.authorization_check_cache.name == "authorization_check" diff --git a/agentex/tests/unit/utils/test_cache_metrics.py b/agentex/tests/unit/utils/test_cache_metrics.py new file mode 100644 index 00000000..241fa6b2 --- /dev/null +++ b/agentex/tests/unit/utils/test_cache_metrics.py @@ -0,0 +1,58 @@ +"""Tests for the auth-cache metrics emitter. + +Covers the two paths that matter operationally: the no-op path (neither OTel nor +StatsD configured, which is the default in tests and local dev) must never +raise, and the StatsD path must emit a counter with the expected name and tags. +""" + +from __future__ import annotations + +from unittest.mock import patch + +import pytest +from src.utils import cache_metrics + + +@pytest.mark.unit +def test_record_functions_are_noop_when_unconfigured(): + # With no OTLP endpoint and no DD_AGENT_HOST, both calls must be harmless. + with ( + patch.object(cache_metrics, "_STATSD_ENABLED", False), + patch.object(cache_metrics, "_access_counter", None), + patch.object(cache_metrics, "_eviction_counter", None), + patch.object(cache_metrics, "_instruments_initialized", True), + ): + cache_metrics.record_cache_access("auth_gateway", "hit") + cache_metrics.record_cache_eviction("auth_gateway") + + +@pytest.mark.unit +def test_record_cache_access_emits_statsd_when_enabled(): + with ( + patch.object(cache_metrics, "_STATSD_ENABLED", True), + patch.object(cache_metrics, "_instruments_initialized", True), + patch.object(cache_metrics, "_access_counter", None), + patch.object(cache_metrics, "statsd") as mock_statsd, + ): + cache_metrics.record_cache_access("auth_gateway", "miss_absent") + + mock_statsd.increment.assert_called_once_with( + "auth_cache.access", + tags=["cache:auth_gateway", "result:miss_absent"], + ) + + +@pytest.mark.unit +def test_record_cache_eviction_emits_statsd_when_enabled(): + with ( + patch.object(cache_metrics, "_STATSD_ENABLED", True), + patch.object(cache_metrics, "_instruments_initialized", True), + patch.object(cache_metrics, "_eviction_counter", None), + patch.object(cache_metrics, "statsd") as mock_statsd, + ): + cache_metrics.record_cache_eviction("agent_api_key") + + mock_statsd.increment.assert_called_once_with( + "auth_cache.eviction", + tags=["cache:agent_api_key"], + ) From 79a404ea495803cd1f6d7d4e0ea8e43ee959c6ef Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Fri, 5 Jun 2026 11:01:47 -0700 Subject: [PATCH 2/2] fix(auth-cache): address greptile review findings - Guard metric emission so instrumentation never raises into the caller. record_cache_access/record_cache_eviction now wrap their full body in try/except (centralized in the emitter rather than at each call site in get()/set()), matching the defensive pattern in db_metrics.py. A StatsD UDP error or OTel SDK fault can no longer propagate up the critical auth path. - Add a test asserting both emitters swallow backend errors. --- agentex/src/utils/cache_metrics.py | 42 ++++++++++++------- .../tests/unit/utils/test_cache_metrics.py | 17 ++++++++ 2 files changed, 45 insertions(+), 14 deletions(-) diff --git a/agentex/src/utils/cache_metrics.py b/agentex/src/utils/cache_metrics.py index 6bc53377..e736a8f3 100644 --- a/agentex/src/utils/cache_metrics.py +++ b/agentex/src/utils/cache_metrics.py @@ -76,25 +76,39 @@ def record_cache_access(cache_name: str, result: CacheResult) -> None: Args: cache_name: Logical cache name (e.g. "auth_gateway", "agent_api_key"). result: One of "hit", "miss_expired", "miss_absent". + + Never raises: emission failures (e.g. a StatsD UDP socket error or an OTel + SDK fault) are swallowed so instrumentation can never disrupt a caller on + the critical auth path. """ - _ensure_instruments() + try: + _ensure_instruments() - if _access_counter is not None: - _access_counter.add(1, {"cache": cache_name, "result": result}) + if _access_counter is not None: + _access_counter.add(1, {"cache": cache_name, "result": result}) - if _STATSD_ENABLED: - statsd.increment( - "auth_cache.access", - tags=[f"cache:{cache_name}", f"result:{result}"], - ) + if _STATSD_ENABLED: + statsd.increment( + "auth_cache.access", + tags=[f"cache:{cache_name}", f"result:{result}"], + ) + except Exception: + logger.debug("Failed to emit auth_cache.access metric", exc_info=True) def record_cache_eviction(cache_name: str) -> None: - """Record a single capacity-driven (LRU) eviction.""" - _ensure_instruments() + """ + Record a single capacity-driven (LRU) eviction. + + Never raises: see ``record_cache_access``. + """ + try: + _ensure_instruments() - if _eviction_counter is not None: - _eviction_counter.add(1, {"cache": cache_name}) + if _eviction_counter is not None: + _eviction_counter.add(1, {"cache": cache_name}) - if _STATSD_ENABLED: - statsd.increment("auth_cache.eviction", tags=[f"cache:{cache_name}"]) + if _STATSD_ENABLED: + statsd.increment("auth_cache.eviction", tags=[f"cache:{cache_name}"]) + except Exception: + logger.debug("Failed to emit auth_cache.eviction metric", exc_info=True) diff --git a/agentex/tests/unit/utils/test_cache_metrics.py b/agentex/tests/unit/utils/test_cache_metrics.py index 241fa6b2..a64a9fd7 100644 --- a/agentex/tests/unit/utils/test_cache_metrics.py +++ b/agentex/tests/unit/utils/test_cache_metrics.py @@ -26,6 +26,23 @@ def test_record_functions_are_noop_when_unconfigured(): cache_metrics.record_cache_eviction("auth_gateway") +@pytest.mark.unit +def test_record_functions_swallow_emission_errors(): + # A failing backend must never propagate to the caller (critical auth path). + with ( + patch.object(cache_metrics, "_STATSD_ENABLED", True), + patch.object(cache_metrics, "_instruments_initialized", True), + patch.object(cache_metrics, "_access_counter", None), + patch.object(cache_metrics, "_eviction_counter", None), + patch.object(cache_metrics, "statsd") as mock_statsd, + ): + mock_statsd.increment.side_effect = OSError("socket in a bad state") + + # Neither call should raise despite the backend blowing up. + cache_metrics.record_cache_access("auth_gateway", "hit") + cache_metrics.record_cache_eviction("auth_gateway") + + @pytest.mark.unit def test_record_cache_access_emits_statsd_when_enabled(): with (