From b5e55c3cf5846c3702cfba7b4b695ebaa074ee6e Mon Sep 17 00:00:00 2001 From: Wyatt Childers Date: Fri, 5 Sep 2025 09:53:02 -0400 Subject: [PATCH] Provide an example of using an in-process queue --- docs/howto/patterns.rst | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/docs/howto/patterns.rst b/docs/howto/patterns.rst index e97755e5..ef4078dc 100644 --- a/docs/howto/patterns.rst +++ b/docs/howto/patterns.rst @@ -87,6 +87,32 @@ terminates, after canceling the other task:: for task in pending: task.cancel() +In-process queue +---------------- + +For simple applications that need to be able to consume and act on multiple +messages at one, an in-process queue can be leveraged:: + + MAX_WORKERS = os.cpu_count() + + queue = asyncio.Queue(MAX_WORKERS) + + async def consumer_handler(websocket): + async for message in websocket: + await queue.put(message) + + async def worker(websocket): + while True: + message = await queue.get() + result = await long_running_task(message) + await websocket.send(result) + + async def handler(websocket): + async with asyncio.TaskGroup() as tg: + tg.create_task(consumer_handler(websocket)) + for _ in range(MAX_WORKERS): + tg.create_task(worker(websocket)) + Registration ------------