
Add leader election for Reaper high availability #18

Merged
mcollina merged 19 commits into main from feat/reaper-leader-election
Feb 17, 2026

Conversation

@mcollina
Member

Summary

  • Implement Redis-based leader election so multiple Reaper instances can run for high availability, with only one active at a time
  • Uses SET NX PX pattern for atomic lock acquisition with TTL
  • Leader renews lock periodically; if it crashes, lock expires and a follower takes over
  • Graceful shutdown releases lock immediately for fast failover
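
The semantics described above can be sketched with a small in-memory model (hypothetical names, a stand-in for the Redis key state; the PR's actual RedisStorage implementation uses SET NX PX and Lua scripts against a real Redis):

```typescript
// Hypothetical in-memory model of the leader-lock semantics; illustrative only.
interface Lock {
  owner: string
  expiresAt: number
}

class LeaderLock {
  #lock: Lock | null = null

  // SET key owner NX PX ttl: succeeds only if no unexpired lock exists.
  acquire(owner: string, ttlMs: number, now: number): boolean {
    if (this.#lock !== null && this.#lock.expiresAt > now) return false
    this.#lock = { owner, expiresAt: now + ttlMs }
    return true
  }

  // Lua-script semantics: extend the TTL only if we still own the lock.
  renew(owner: string, ttlMs: number, now: number): boolean {
    if (this.#lock === null || this.#lock.expiresAt <= now || this.#lock.owner !== owner) {
      return false
    }
    this.#lock.expiresAt = now + ttlMs
    return true
  }

  // Lua-script semantics: delete only if we still own the lock, so a slow
  // old leader cannot release a lock that a new leader now holds.
  release(owner: string): boolean {
    if (this.#lock === null || this.#lock.owner !== owner) return false
    this.#lock = null
    return true
  }
}
```

The owner token in renew/release is what makes the Lua scripts necessary: the compare-and-expire / compare-and-delete must be atomic, which a plain GET followed by PEXPIRE or DEL would not be.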

Changes

  • Add optional acquireLeaderLock, renewLeaderLock, releaseLeaderLock methods to Storage interface
  • Implement lock methods in RedisStorage using Lua scripts for atomicity
  • Refactor Reaper with #becomeActive()/#becomeInactive() lifecycle methods
  • Add leadershipAcquired and leadershipLost events
  • Add comprehensive test suite for leader election scenarios
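
Since the lock methods are optional on the Storage interface, a Reaper with election enabled presumably has to detect support at runtime. A hedged sketch of the shape (signatures and names are assumptions, not the PR's exact API):

```typescript
// Hypothetical shape of the optional leader-lock methods on Storage.
interface Storage {
  // ...existing queue methods elided...
  acquireLeaderLock?(key: string, owner: string, ttlMs: number): Promise<boolean>
  renewLeaderLock?(key: string, owner: string, ttlMs: number): Promise<boolean>
  releaseLeaderLock?(key: string, owner: string): Promise<boolean>
}

// Duck-typed capability check, so enabling leaderElection against a backend
// without lock support can fail fast instead of failing mid-election.
function supportsLeaderElection(storage: Storage): boolean {
  return (
    typeof storage.acquireLeaderLock === "function" &&
    typeof storage.renewLeaderLock === "function" &&
    typeof storage.releaseLeaderLock === "function"
  )
}
```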

Usage

const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: {
    enabled: true,              // Default: false (backward compatible)
    lockTTL: 30000,             // Lock expiry (default: 30s)
    renewalInterval: 10000,     // Renew every 10s
    acquireRetryInterval: 5000  // Followers poll every 5s
  }
})

reaper.on('leadershipAcquired', () => console.log('Now leader'))
reaper.on('leadershipLost', () => console.log('Lost leadership'))
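
The three intervals above are interdependent: the leader must renew well before lockTTL elapses, or a follower can take the lock between renewals. A minimal validation sketch under that assumption (function and type names are illustrative, not the library's actual API):

```typescript
// Hypothetical config type mirroring the options shown above.
interface LeaderElectionOptions {
  enabled: boolean
  lockTTL: number              // ms before an unrenewed lock expires
  renewalInterval: number      // ms between renewals by the leader
  acquireRetryInterval: number // ms between acquisition attempts by followers
}

function validateLeaderElection(opts: LeaderElectionOptions): void {
  if (!opts.enabled) return
  if (opts.lockTTL <= 0 || opts.renewalInterval <= 0 || opts.acquireRetryInterval <= 0) {
    throw new RangeError("leader election intervals must be positive")
  }
  // Renewal must fire before the TTL runs out, with headroom for a slow
  // round-trip to Redis; requiring at least a 2x margin is one common choice.
  if (opts.renewalInterval * 2 > opts.lockTTL) {
    throw new RangeError("renewalInterval should be at most half of lockTTL")
  }
}
```

The defaults shown above (lockTTL 30s, renewal every 10s) satisfy this with a 3x margin.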

Test plan

  • Run existing Reaper tests to ensure backward compatibility
  • Test that a single reaper with election enabled acquires the lock
  • Test that with two reapers, only one becomes leader
  • Test that when the leader shuts down, a follower takes over
  • Test that lock renewal works across TTL periods
  • Test error handling when storage doesn't support leader election

🤖 Generated with Claude Code

mcollina and others added 19 commits February 16, 2026 16:37
- Add setBlockingConcurrency to Storage interface (for concurrent BLMOVE)
- Document JsonSerde and createJsonSerde exports
- Clarify that 'stalled' events are emitted by Reaper, not Queue
- Remove 'cancelled' event from Queue (not implemented)
- Update file structure to match actual layout:
  - Remove non-existent scripts/ directory
  - Remove non-existent redis-scripts.ts
  - Add types/fast-write-atomic.d.ts
  - Add test/helpers/ and test/integration/ directories
  - Update test file list to match actual files
- Fix error handling section:
  - Remove non-existent DuplicateError
  - Add JobFailedError, JobNotFoundError, JobCancelledError, StorageError
  - Document that duplicates return status, not throw

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Testing proved that iovalkey can multiplex multiple BLMOVE calls on
a single Redis connection. Redis queues blocking commands from the
same client internally, so multiple blocking clients are unnecessary.

Changes:
- Replace blocking client pool with single blocking client
- Remove setBlockingConcurrency from Storage interface
- Remove setBlockingConcurrency from all storage implementations
- Remove setBlockingConcurrency call from Consumer
- Update DESIGN.md to remove the method

This simplifies the codebase without any performance impact.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Extract parseState function that was duplicated in producer.ts and
reaper.ts into a shared utility module.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add toJSON() method to all error classes so that properties appear
when errors are serialized with JSON.stringify.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Scripts are Redis-specific and don't belong in src/. Move to project
root directory for clarity.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Main index.ts already imports directly from specific storage files.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Both methods had identical implementations in RedisStorage and
MemoryStorage. Simplify by having refreshWorker delegate to
registerWorker (FileStorage already did this).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
For consistency with other generic types like Job<TPayload>.

Also update tsconfig to ES2024 for Promise.withResolvers support.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add started, stopped, failing events to QueueEvents type
- Emit started event after storage connects in Queue.start()
- Emit stopped event after storage disconnects in Queue.stop()
- Emit failing event in Consumer when job will be retried
- Forward failing event from Consumer to Queue

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Emit 'enqueued' when Queue.enqueue() successfully adds a job
- Emit 'cancelled' when Queue.cancel() successfully cancels a job
- Add 'requeued' to ConsumerEvents and emit when job is returned to
  queue during graceful shutdown
- Forward 'requeued' event from Consumer to Queue

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add SerializedError interface with message, code, and stack fields
- Change MessageStatus.error from string to SerializedError
- Include error code in serialization (if available)
- Parse error as JSON in Producer.getStatus() with fallback

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
The jobsTTL option was defined but never implemented. Remove it and
clarify that resultTTL is used for both results and errors.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Update subscribeToJob and notifyJobComplete to support 'failing' status
- Add failing notification in retryJob for all storage backends
- Update retry.lua script to publish 'failing' notification
- Update DESIGN.md to reflect the API changes and remove unused jobsTTL

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Remove checkInterval config option and periodic scanning
- Rely on event subscriptions for detecting processing jobs
- Keep initial boot scan to catch jobs processing before Reaper started
- Per-job timers handle visibility timeout tracking efficiently

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Document all Queue events (started, stopped, enqueued, failing,
  requeued, cancelled)
- Update MessageStatus.error to show structured error format
- Update Reaper docs to remove checkInterval (now event-based)
- Add explanation of Reaper's event-based monitoring approach

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Implement Redis-based leader election so multiple Reaper instances can
run for high availability, with only one active at a time.

- Add optional acquireLeaderLock, renewLeaderLock, releaseLeaderLock
  methods to Storage interface
- Implement lock methods in RedisStorage using SET NX PX and Lua scripts
- Refactor Reaper with leader election state machine
- Add leadershipAcquired and leadershipLost events
- Add comprehensive tests for leader election scenarios

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
The reaper-leader-election test was running in the test:memory CI job
which has no Redis, causing connection failures and a 15-minute timeout.
Use shouldRunRedisTests() to skip when REDIS_URL is not set, matching
the pattern used by other Redis-dependent tests. Also remove unused
waitForEvents import that caused a lint error.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Resolve conflicts in README.md and src/reaper.ts by keeping both the
leader election features from feat/reaper-leader-election and the
changes from origin/main.

- README.md: Preserve leader election documentation section
- src/reaper.ts: Maintain leader election implementation with all
  necessary interfaces, constants, and methods
mcollina merged commit 9ea5bfd into main on Feb 17, 2026
10 checks passed