-
-
Notifications
You must be signed in to change notification settings - Fork 72
/
Copy pathutils.py
32 lines (24 loc) · 975 Bytes
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio
from typing import AsyncGenerator
from taskiq import AsyncBroker, BrokerMessage
from taskiq.acks import AckableMessage
class AsyncQueueBroker(AsyncBroker):
"""
Broker for testing.
It simply puts all tasks in asyncio.Queue
and returns them in listen method.
"""
def __init__(self) -> None:
self.queue: "asyncio.Queue[bytes]" = asyncio.Queue()
super().__init__(None, None)
async def kick(self, message: BrokerMessage) -> None:
"""Send a message to the queue."""
await self.queue.put(message.message)
async def wait_tasks(self) -> None:
"""Small method to wait for all tasks to be processed."""
await self.queue.join()
async def listen(self) -> AsyncGenerator[AckableMessage, None]:
"""This method returns all tasks from queue."""
while True:
task = await self.queue.get()
yield AckableMessage(data=task, ack=self.queue.task_done)