Add support for Python Dramatiq background task / job library#127
Add support for Python Dramatiq background task / job library#127carlosantoniodasilva merged 11 commits intomainfrom
Conversation
Dramatiq uses two separate Redis keys to handle this: one contains the message ids, the other contains the actual messages we need to fetch.
It can be done with a separate django-dramatiq integration so it can be a little more involving, thus a sample app to help test and show that setup.
This is not really useful/relevant, especially because most of the time people will have set up judoscale_dramatiq before adding any actors/tasks, so there'll be an empty list of queues anyway. That's the behavior I'm seeing, but it works on collection afterwards just fine.
The setup for FastAPI / Flask is basically the same usual one for doing it standalone, so it doesn't seem to warrant an additional section, but for Django there's some different way of doing things, if you're using django-dramatiq (which presumably most are), so add explicit docs.
| if message_id := self.redis.lindex(queue_key, -1): | ||
| if isinstance(message_id, bytes): | ||
| message_id = message_id.decode() | ||
| if payload := self.redis.hget(msgs_key, message_id): |
There was a problem hiding this comment.
I think this 2-step / Redis calls to read the actual message payload might result in a possible race condition if a worker happens to grab the same job/message we fetched, so when we get to read the payload it's not there anymore...
In that case we might end up reporting a 0 latency... so I'm wondering if we should perhaps try again (or 2-3 times) if we get no payload, so it tries on the next message_id and so on?
Perhaps it's such an edge case that won't be a problem in real usage, since it's very unlikely and likely self-corrects on the next report.
There was a problem hiding this comment.
I think since it's self-corrects, we don't need to worry about it.
| if message_id := self.redis.lindex(queue_key, -1): | ||
| if isinstance(message_id, bytes): | ||
| message_id = message_id.decode() | ||
| if payload := self.redis.hget(msgs_key, message_id): |
There was a problem hiding this comment.
I think since it's self-corrects, we don't need to worry about it.
## [1.13.0](v1.12.0...v1.13.0) (2026-02-16) ### Features * Add support for Python Dramatiq background task / job library ([#127](#127)) ([4d36e72](4d36e72)) Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
https://dramatiq.io
request catcher samples: FastAPI + Dramatiq
request catcher samples: Django + Dramatiq