From cb635b6c9e04cc5b3af62010eff1679e9d3aeff5 Mon Sep 17 00:00:00 2001 From: Florian Lonqueu-Brochard Date: Tue, 7 Oct 2025 11:57:51 +0200 Subject: [PATCH] fix: ack to the right stream in case of dynamic queue name --- taskiq_redis/redis_broker.py | 10 +++++----- taskiq_redis/redis_cluster_broker.py | 8 ++++---- taskiq_redis/redis_sentinel_broker.py | 8 ++++---- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/taskiq_redis/redis_broker.py b/taskiq_redis/redis_broker.py index 3965443..663c8e4 100644 --- a/taskiq_redis/redis_broker.py +++ b/taskiq_redis/redis_broker.py @@ -260,11 +260,11 @@ async def kick(self, message: BrokerMessage) -> None: approximate=self.approximate, ) - def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]: + def _ack_generator(self, id: str, queue_name: str) -> Callable[[], Awaitable[None]]: async def _ack() -> None: async with Redis(connection_pool=self.connection_pool) as redis_conn: await redis_conn.xack( - self.queue_name, + queue_name, self.consumer_group_name, id, ) @@ -287,12 +287,12 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]: noack=False, count=self.count, ) - for _, msg_list in fetched: + for stream, msg_list in fetched: for msg_id, msg in msg_list: logger.debug("Received message: %s", msg) yield AckableMessage( data=msg[b"data"], - ack=self._ack_generator(msg_id), + ack=self._ack_generator(id=msg_id, queue_name=stream), ) logger.debug("Starting fetching unacknowledged messages") for stream in [self.queue_name, *self.additional_streams.keys()]: @@ -318,5 +318,5 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]: logger.debug("Received message: %s", msg) yield AckableMessage( data=msg[b"data"], - ack=self._ack_generator(msg_id), + ack=self._ack_generator(id=msg_id, queue_name=stream), ) diff --git a/taskiq_redis/redis_cluster_broker.py b/taskiq_redis/redis_cluster_broker.py index da5d56e..4efdd0c 100644 --- a/taskiq_redis/redis_cluster_broker.py +++ b/taskiq_redis/redis_cluster_broker.py @@ -171,10 +171,10 @@ async def kick(self, message: BrokerMessage) -> None: approximate=self.approximate, ) - def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]: + def _ack_generator(self, id: str, queue_name: str) -> Callable[[], Awaitable[None]]: async def _ack() -> None: await self.redis.xack( - self.queue_name, + queue_name, self.consumer_group_name, id, ) @@ -194,10 +194,10 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]: block=self.block, noack=False, ) - for _, msg_list in fetched: + for stream, msg_list in fetched: for msg_id, msg in msg_list: logger.debug("Received message: %s", msg) yield AckableMessage( data=msg[b"data"], - ack=self._ack_generator(msg_id), + ack=self._ack_generator(id=msg_id, queue_name=stream), ) diff --git a/taskiq_redis/redis_sentinel_broker.py b/taskiq_redis/redis_sentinel_broker.py index 3727628..f16092b 100644 --- a/taskiq_redis/redis_sentinel_broker.py +++ b/taskiq_redis/redis_sentinel_broker.py @@ -239,11 +239,11 @@ async def kick(self, message: BrokerMessage) -> None: approximate=self.approximate, ) - def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]: + def _ack_generator(self, id: str, queue_name: str) -> Callable[[], Awaitable[None]]: async def _ack() -> None: async with self._acquire_master_conn() as redis_conn: await redis_conn.xack( - self.queue_name, + queue_name, self.consumer_group_name, id, ) @@ -264,10 +264,10 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]: block=self.block, noack=False, ) - for _, msg_list in fetched: + for stream, msg_list in fetched: for msg_id, msg in msg_list: logger.debug("Received message: %s", msg) yield AckableMessage( data=msg[b"data"], - ack=self._ack_generator(msg_id), + ack=self._ack_generator(id=msg_id, queue_name=stream), )