feat: Add NatsTaskProvider for NATS JetStream task queue support#181
feat: Add NatsTaskProvider for NATS JetStream task queue support#181
Conversation
Add NatsTaskProvider implementing the TaskProvider interface using NATS JetStream for reliable task queue processing with acknowledgment, retries, timeouts, and dead-letter queues. Enable JetStream in docker-compose for NATS service. https://claude.ai/code/session_01K5qptQLf7N1CiGemKUwKkG
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #181 +/- ##
==========================================
Coverage 100.00% 100.00%
==========================================
Files 9 10 +1
Lines 1034 1266 +232
Branches 200 231 +31
==========================================
+ Hits 1034 1266 +232 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Code Review
This pull request introduces a NATS JetStream-based task provider for the qified library, enabling reliable task processing with support for retries, dead-letter queues, and timeouts. While the implementation provides a solid foundation, the review identifies several critical areas for improvement to ensure scalability and reliability in distributed environments. Key feedback includes leveraging native JetStream features for retry tracking and task extensions instead of in-memory state, adding error handling to the consumer loop to prevent crashes from malformed messages, and optimizing the retrieval of dead-letter tasks and consumer concurrency settings.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ebdaa95414
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
- Use msg.nak() instead of msg.term() on shutdown to preserve tasks - Use msg.working() in extend() for JetStream-native deadline extension - Wrap JSON.parse in try-catch to prevent consumer crash on malformed messages - Hex-encode special chars in stream/consumer names to prevent collisions - Scope clearQueue attempt cleanup to the specific queue only - Add test for queue names with special characters - Achieve 100% line coverage https://claude.ai/code/session_01K5qptQLf7N1CiGemKUwKkG
…memory attempt counts Replace the in-memory _attemptCounts map with msg.info.deliveryCount from JetStream. This makes retry tracking correct across multiple provider instances in distributed deployments. Also switches from republishing for retries to msg.nak() which lets JetStream handle redelivery natively. https://claude.ai/code/session_01K5qptQLf7N1CiGemKUwKkG
What kind of change does this PR introduce?
Feature - Adds a complete task queue provider implementation using NATS JetStream.
Description
This PR introduces
NatsTaskProvider, a new task queue provider that leverages NATS JetStream for reliable, distributed task processing. The implementation includes:Key Features
context.extend()Hookifiedfor error event emission and lifecycle hooksImplementation Details
Testing
Added 1,100+ lines of comprehensive test coverage including:
All tests pass with 100% code coverage.
Dependencies
@nats-io/jetstreamfor JetStream APIhookifiedfor event emission capabilities-jsflag)Checklist
https://claude.ai/code/session_01K5qptQLf7N1CiGemKUwKkG