diff --git a/docs/howto/patterns.rst b/docs/howto/patterns.rst index e97755e5..ef4078dc 100644 --- a/docs/howto/patterns.rst +++ b/docs/howto/patterns.rst @@ -87,6 +87,32 @@ terminates, after canceling the other task:: for task in pending: task.cancel() +In-process queue +---------------- + +For simple applications that need to be able to consume and act on multiple +messages at one, an in-process queue can be leveraged:: + + MAX_WORKERS = os.cpu_count() + + queue = asyncio.Queue(MAX_WORKERS) + + async def consumer_handler(websocket): + async for message in websocket: + await queue.put(message) + + async def worker(websocket): + while True: + message = await queue.get() + result = await long_running_task(message) + await websocket.send(result) + + async def handler(websocket): + async with asyncio.TaskGroup() as tg: + tg.create_task(consumer_handler(websocket)) + for _ in range(MAX_WORKERS): + tg.create_task(worker(websocket)) + Registration ------------