14 changes: 14 additions & 0 deletions README.md
@@ -189,3 +189,17 @@ scheduler = TaskiqScheduler(broker, [array_source])
```
During startup the scheduler will try to migrate schedules from the old source to the new one. Please be sure to specify different prefixes for the two sources to avoid any collisions between them.
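For example, a minimal sketch of keeping both sources around with distinct prefixes during the migration; the class names and the `prefix` argument are assumptions here, so check them against your taskiq-redis version:

```python
from taskiq_redis import ListRedisScheduleSource, RedisScheduleSource

# The old and new sources must not share a prefix, otherwise their keys collide.
old_source = RedisScheduleSource("redis://localhost:6379", prefix="schedule")
array_source = ListRedisScheduleSource("redis://localhost:6379", prefix="schedule_list")
```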


## Dynamic queue names


Brokers support dynamic queue names, allowing you to specify a different queue when kicking a task. This is useful for routing tasks to specific queues based on runtime conditions, such as priority levels, tenant isolation, or environment-specific processing.

Simply pass the desired queue name as the message's `queue_name` label when kicking a task to override the broker's default queue configuration.

```python
@broker.task(queue_name="low_priority")
async def low_priority_task() -> None:
print("I don't mind waiting a little longer")
```
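
The label can also be overridden per call through the task's kicker. A minimal sketch, assuming taskiq's standard kicker API is available on the decorated task; the `high_priority` queue name is purely illustrative:

```python
# Send this one invocation to a different queue; kicks without an explicit
# "queue_name" label fall back to the queue configured on the broker.
await low_priority_task.kicker().with_labels(queue_name="high_priority").kiq()
```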
3 changes: 2 additions & 1 deletion taskiq_redis/redis_broker.py
@@ -251,9 +251,10 @@ async def kick(self, message: BrokerMessage) -> None:

:param message: message to append.
"""
queue_name = message.labels.get("queue_name") or self.queue_name
async with Redis(connection_pool=self.connection_pool) as redis_conn:
await redis_conn.xadd(
self.queue_name,
queue_name,
{b"data": message.message},
maxlen=self.maxlen,
approximate=self.approximate,
3 changes: 2 additions & 1 deletion taskiq_redis/redis_sentinel_broker.py
@@ -230,9 +230,10 @@ async def kick(self, message: BrokerMessage) -> None:
:param message: message to append.
"""
queue_name = message.labels.get("queue_name") or self.queue_name
async with self._acquire_master_conn() as redis_conn:
await redis_conn.xadd(
self.queue_name,
queue_name,
{b"data": message.message},
maxlen=self.maxlen,
approximate=self.approximate,