Skip to content

fix(streams): add sliding TTL to Redis stream keys#215

Merged
smoreinis merged 8 commits intomainfrom
fix/redis-stream-ttl
May 4, 2026
Merged

fix(streams): add sliding TTL to Redis stream keys#215
smoreinis merged 8 commits intomainfrom
fix/redis-stream-ttl

Conversation

@smoreinis
Copy link
Copy Markdown
Collaborator

@smoreinis smoreinis commented May 1, 2026

Summary

  • Adds env-var-configurable sliding TTL on per-task Redis stream keys via pipelined XADD + EXPIRE.
  • Default REDIS_STREAM_TTL_SECONDS=3600 (1h sliding window).
  • Set REDIS_STREAM_TTL_SECONDS=0 to disable (preserves pre-change behavior).

Fixes orphan stream accumulation when the SSE-disconnect cleanup path doesn't run (e.g., consumer never connects, backend crashes mid-task). Active streams stay alive indefinitely — every write refreshes the TTL.

Test plan

  • test_send_data_sets_ttl_on_stream_key — TTL ~3600s set on key after send_data
  • test_send_data_skips_ttl_when_disabled — no TTL set when REDIS_STREAM_TTL_SECONDS=0
  • test_send_data_still_applies_maxlen — MAXLEN trimming still works
  • Full make test suite passes (389 passed locally)
  • Operators with long-idle workloads can override the env var pre-deploy

Greptile Summary

This PR adds a configurable sliding TTL to Redis stream keys by pipelining XADD + EXPIRE in a single round-trip, defaulting to 3600 s and disabling cleanly at REDIS_STREAM_TTL_SECONDS=0. The implementation correctly handles pipeline errors with raise_on_error=False and per-result type checks, and is covered by focused integration tests.

Confidence Score: 5/5

Safe to merge — no blocking issues; logic is correct, error handling is sound, and tests provide good coverage.

The previous P1 concern about raise_on_error has been fully addressed. Pipeline results are inspected individually, XADD failures propagate correctly, and EXPIRE failures degrade gracefully. The TTL-disabled path preserves the original behaviour exactly. No new P0 or P1 findings.

No files require special attention.

Important Files Changed

Filename Overview
agentex/src/adapters/streams/adapter_redis.py Core change: branches on TTL env var to use a non-transactional pipeline (XADD + EXPIRE) with per-result error handling; XADD failures are re-raised, EXPIRE failures are logged as warnings — logic is sound and well-commented.
agentex/src/config/environment_variables.py Adds REDIS_STREAM_TTL_SECONDS enum key, model field (default 3600), and from_env parsing — consistent with existing pattern for REDIS_STREAM_MAXLEN.
agentex/tests/integration/test_redis_stream_ttl.py New integration test file covering TTL set, TTL disabled, and MAXLEN still applied; uses try/finally for cleanup and restoring mutated env vars.
agentex/tests/fixtures/repositories.py Adds REDIS_STREAM_TTL_SECONDS=3600 to the stub env class — mirrors the integration_client fixture change.
agentex/tests/integration/fixtures/integration_client.py Adds REDIS_STREAM_TTL_SECONDS=3600 to the isolated test stub — consistent with the unit test fixture update.

Sequence Diagram

sequenceDiagram
    participant Caller
    participant RedisStreamRepository
    participant RedisPipeline
    participant Redis

    Caller->>RedisStreamRepository: send_data(topic, data)
    RedisStreamRepository->>RedisStreamRepository: check REDIS_STREAM_TTL_SECONDS

    alt TTL > 0
        RedisStreamRepository->>RedisPipeline: pipeline(transaction=False)
        RedisPipeline->>RedisPipeline: queue XADD(topic, data, maxlen)
        RedisPipeline->>RedisPipeline: queue EXPIRE(topic, ttl_seconds)
        RedisPipeline->>Redis: execute(raise_on_error=False)
        Redis-->>RedisPipeline: [message_id | Exception, True/False | Exception]
        RedisPipeline-->>RedisStreamRepository: results[]
        alt results[0] is Exception
            RedisStreamRepository->>Caller: raise Exception
        else results[1] is Exception
            RedisStreamRepository->>RedisStreamRepository: logger.warning(TTL refresh failed)
            RedisStreamRepository->>Caller: return message_id
        else
            RedisStreamRepository->>Caller: return message_id
        end
    else TTL == 0 (disabled)
        RedisStreamRepository->>Redis: xadd(topic, data, maxlen)
        Redis-->>RedisStreamRepository: message_id
        RedisStreamRepository->>Caller: return message_id
    end
Loading

Reviews (3): Last reviewed commit: "fix(streams): address greptile review fi..." | Re-trigger Greptile

@smoreinis smoreinis requested a review from a team as a code owner May 1, 2026 00:39
Comment thread agentex/src/adapters/streams/adapter_redis.py
@smoreinis smoreinis force-pushed the fix/redis-stream-ttl branch from e5d30ff to 11b9540 Compare May 1, 2026 00:49
- Use raise_on_error=False on pipeline.execute() so an EXPIRE failure
  doesn't propagate to the caller after XADD already succeeded; log a
  warning instead. Prevents callers from retrying and duplicating
  messages on a benign TTL-refresh failure.
- Loosen TTL test lower bound from 3590 to 3480 (120s window) to
  tolerate CI scheduling jitter without losing regression coverage.
@smoreinis smoreinis merged commit 332092b into main May 4, 2026
48 of 51 checks passed
@smoreinis smoreinis deleted the fix/redis-stream-ttl branch May 4, 2026 18:42
x added a commit to scaleapi/scale-agentex-python that referenced this pull request May 5, 2026
Mirror scaleapi/scale-agentex#215 (server-side adapter): pipeline XADD
with EXPIRE so each task:* stream key gets a sliding TTL. Orphaned
streams (no writes for the TTL window) self-delete in Redis without
needing an explicit cleanup_stream call from the caller.

This is the right shape of fix for the SDK's leak: an explicit DEL on
terminal task transitions (an earlier draft of this PR) introduced a
race where the server's task_updated event published to the same
topic could be deleted before a connected frontend SSE consumer read
it. EXPIRE sidesteps that — TTL only fires after inactivity, so an
actively-streaming agent or actively-reading consumer keeps the key
alive, and the key only ages out once everyone is done with it.

Defaults match the server: REDIS_STREAM_TTL_SECONDS=3600 (1h),
overridable via env var. Setting it to 0 short-circuits to plain XADD
(no TTL refresh), matching the server's escape hatch.

Implementation notes:
- transaction=False on the pipeline: connection-level batching, no
  MULTI/EXEC overhead for what's already a fast op.
- raise_on_error=False: an EXPIRE failure after a successful XADD
  must not surface to the caller. The message has been published;
  retrying would duplicate it. We log and move on. Next successful
  XADD will reset the TTL anyway.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants