From fe2bfdf6b78feaf1b3a7c34c567a60bb894bafaf Mon Sep 17 00:00:00 2001 From: Florian Lonqueu-Brochard Date: Tue, 30 Sep 2025 13:23:23 +0200 Subject: [PATCH] feat: support custom queue_name per message for all kick methods --- README.md | 14 ++++++++++++++ taskiq_redis/redis_broker.py | 3 ++- taskiq_redis/redis_sentinel_broker.py | 3 ++- 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 0e1e9ed..00148af 100644 --- a/README.md +++ b/README.md @@ -189,3 +189,17 @@ scheduler = TaskiqScheduler(broker, [array_source]) ``` During startup the scheduler will try to migrate schedules from an old source to a new one. Please be sure to specify different prefixe just to avoid any kind of collision between these two. + + +## Dynamic queue names + + +Brokers supports dynamic queue names, allowing you to specify different queues when kicking tasks. This is useful for routing tasks to specific queues based on runtime conditions, such as priority levels, tenant isolation, or environment-specific processing. + +Simply pass the desired queue name as message's label when kicking a task to override the broker's default queue configuration. + +```python +@broker.task(queue_name="low_priority") +async def low_priority_task() -> None: + print("I don't mind waiting a little longer") +``` diff --git a/taskiq_redis/redis_broker.py b/taskiq_redis/redis_broker.py index 7a18fed..3965443 100644 --- a/taskiq_redis/redis_broker.py +++ b/taskiq_redis/redis_broker.py @@ -251,9 +251,10 @@ async def kick(self, message: BrokerMessage) -> None: :param message: message to append. """ + queue_name = message.labels.get("queue_name") or self.queue_name async with Redis(connection_pool=self.connection_pool) as redis_conn: await redis_conn.xadd( - self.queue_name, + queue_name, {b"data": message.message}, maxlen=self.maxlen, approximate=self.approximate, diff --git a/taskiq_redis/redis_sentinel_broker.py b/taskiq_redis/redis_sentinel_broker.py index 45a788e..3727628 100644 --- a/taskiq_redis/redis_sentinel_broker.py +++ b/taskiq_redis/redis_sentinel_broker.py @@ -230,9 +230,10 @@ async def kick(self, message: BrokerMessage) -> None: :param message: message to append. """ + queue_name = message.labels.get("queue_name") or self.queue_name async with self._acquire_master_conn() as redis_conn: await redis_conn.xadd( - self.queue_name, + queue_name, {b"data": message.message}, maxlen=self.maxlen, approximate=self.approximate,