## Асинхронные очереди

In [1]:
import logging

logging.basicConfig(
    level=logging.INFO,
    datefmt="%M:%S",
    format="[%(asctime)s.%(msecs)03d] %(lineno)3d %(funcName)15s %(levelname)-8s - %(message)s",
)
log = logging.getLogger(__name__)

In [2]:
log.info("Hello there!")

[07:35.639]   1        <module> INFO     - Hello there!


In [3]:
import asyncio


async def producer(n_tasks: int):
    log.info("start producing")
    tasks = []
    for i in range(1, n_tasks + 1):
        await asyncio.sleep(0.1)
        task = {"id": i, "name": f"task_{i:03}"}
        log.info("Prepared task %s", task)
        tasks.append(task)

    log.info("done producing")
    return tasks


async def process_task(task: dict):
    log.info("process task %s", task)
    await asyncio.sleep(0.1)
    log.info("done task %s", task)


async def consumer(tasks: list[dict]):
    log.info("start consuming")
    for task in tasks:
        await process_task(task)
    
    log.info("done consuming")

In [4]:
async def main():
    tasks = await producer(10)
    await consumer(tasks)


await main()

[13:58.722]   5        producer INFO     - start producing
[13:58.831]  10        producer INFO     - Prepared task {'id': 1, 'name': 'task_001'}
[13:58.933]  10        producer INFO     - Prepared task {'id': 2, 'name': 'task_002'}
[13:59.036]  10        producer INFO     - Prepared task {'id': 3, 'name': 'task_003'}
[13:59.138]  10        producer INFO     - Prepared task {'id': 4, 'name': 'task_004'}
[13:59.249]  10        producer INFO     - Prepared task {'id': 5, 'name': 'task_005'}
[13:59.351]  10        producer INFO     - Prepared task {'id': 6, 'name': 'task_006'}
[13:59.453]  10        producer INFO     - Prepared task {'id': 7, 'name': 'task_007'}
[13:59.556]  10        producer INFO     - Prepared task {'id': 8, 'name': 'task_008'}
[13:59.660]  10        producer INFO     - Prepared task {'id': 9, 'name': 'task_009'}
[13:59.762]  10        producer INFO     - Prepared task {'id': 10, 'name': 'task_010'}
[13:59.763]  13        producer INFO     - done producing
[13:59.763] 

In [5]:

async def producer(queue: asyncio.Queue, n_tasks: int):
    log.info("start producing")
    for i in range(1, n_tasks + 1):
        await asyncio.sleep(0.1)
        task = {"id": i, "name": f"task_{i:03}"}
        log.info("Prepared task %s", task)
        await queue.put(task)

    log.info("done producing")


async def consumer(queue: asyncio.Queue):
    log.info("start consuming")
    while not queue.empty():
        task = await queue.get()
        await process_task(task)
    
    log.info("done consuming")

In [6]:
async def main():
    queue = asyncio.Queue()
    await producer(queue, n_tasks=10)
    await consumer(queue)

await main()

[19:12.440]   2        producer INFO     - start producing
[19:12.553]   6        producer INFO     - Prepared task {'id': 1, 'name': 'task_001'}
[19:12.656]   6        producer INFO     - Prepared task {'id': 2, 'name': 'task_002'}
[19:12.758]   6        producer INFO     - Prepared task {'id': 3, 'name': 'task_003'}
[19:12.863]   6        producer INFO     - Prepared task {'id': 4, 'name': 'task_004'}
[19:12.967]   6        producer INFO     - Prepared task {'id': 5, 'name': 'task_005'}
[19:13.071]   6        producer INFO     - Prepared task {'id': 6, 'name': 'task_006'}
[19:13.175]   6        producer INFO     - Prepared task {'id': 7, 'name': 'task_007'}
[19:13.282]   6        producer INFO     - Prepared task {'id': 8, 'name': 'task_008'}
[19:13.384]   6        producer INFO     - Prepared task {'id': 9, 'name': 'task_009'}
[19:13.489]   6        producer INFO     - Prepared task {'id': 10, 'name': 'task_010'}
[19:13.491]   9        producer INFO     - done producing
[19:13.491] 

In [7]:
async def main():
    queue = asyncio.Queue()
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(queue, n_tasks=10))
        tg.create_task(consumer(queue))

await main()

[20:50.922]   2        producer INFO     - start producing
[20:50.924]  13        consumer INFO     - start consuming
[20:50.924]  18        consumer INFO     - done consuming
[20:51.025]   6        producer INFO     - Prepared task {'id': 1, 'name': 'task_001'}
[20:51.131]   6        producer INFO     - Prepared task {'id': 2, 'name': 'task_002'}
[20:51.234]   6        producer INFO     - Prepared task {'id': 3, 'name': 'task_003'}
[20:51.336]   6        producer INFO     - Prepared task {'id': 4, 'name': 'task_004'}
[20:51.449]   6        producer INFO     - Prepared task {'id': 5, 'name': 'task_005'}
[20:51.551]   6        producer INFO     - Prepared task {'id': 6, 'name': 'task_006'}
[20:51.653]   6        producer INFO     - Prepared task {'id': 7, 'name': 'task_007'}
[20:51.755]   6        producer INFO     - Prepared task {'id': 8, 'name': 'task_008'}
[20:51.858]   6        producer INFO     - Prepared task {'id': 9, 'name': 'task_009'}
[20:51.960]   6        producer INFO     

In [8]:
async def consumer(queue: asyncio.Queue):
    log.info("init consumer")
    await asyncio.sleep(0.1)
    log.info("start consuming")
    while not queue.empty():
        task = await queue.get()
        await process_task(task)
    
    log.info("done consuming")

In [9]:
async def main():
    queue = asyncio.Queue()
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(queue, n_tasks=10))
        tg.create_task(consumer(queue))

await main()

[22:35.010]   2        producer INFO     - start producing
[22:35.019]   2        consumer INFO     - init consumer
[22:35.120]   6        producer INFO     - Prepared task {'id': 1, 'name': 'task_001'}
[22:35.121]   4        consumer INFO     - start consuming
[22:35.121]  18    process_task INFO     - process task {'id': 1, 'name': 'task_001'}
[22:35.222]   6        producer INFO     - Prepared task {'id': 2, 'name': 'task_002'}
[22:35.223]  20    process_task INFO     - done task {'id': 1, 'name': 'task_001'}
[22:35.224]  18    process_task INFO     - process task {'id': 2, 'name': 'task_002'}
[22:35.323]   6        producer INFO     - Prepared task {'id': 3, 'name': 'task_003'}
[22:35.327]  20    process_task INFO     - done task {'id': 2, 'name': 'task_002'}
[22:35.328]  18    process_task INFO     - process task {'id': 3, 'name': 'task_003'}
[22:35.428]   6        producer INFO     - Prepared task {'id': 4, 'name': 'task_004'}
[22:35.433]  20    process_task INFO     - done task 

In [10]:

async def process_task(task: dict):
    log.info("process task %s", task)
    await asyncio.sleep(0.09)
    log.info("done task %s", task)


In [11]:
async def main():
    queue = asyncio.Queue()
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(queue, n_tasks=10))
        tg.create_task(consumer(queue))

await main()

[23:54.267]   2        producer INFO     - start producing
[23:54.271]   2        consumer INFO     - init consumer
[23:54.372]   6        producer INFO     - Prepared task {'id': 1, 'name': 'task_001'}
[23:54.379]   4        consumer INFO     - start consuming
[23:54.386]   2    process_task INFO     - process task {'id': 1, 'name': 'task_001'}
[23:54.478]   6        producer INFO     - Prepared task {'id': 2, 'name': 'task_002'}
[23:54.480]   4    process_task INFO     - done task {'id': 1, 'name': 'task_001'}
[23:54.485]   2    process_task INFO     - process task {'id': 2, 'name': 'task_002'}
[23:54.581]   6        producer INFO     - Prepared task {'id': 3, 'name': 'task_003'}
[23:54.590]   4    process_task INFO     - done task {'id': 2, 'name': 'task_002'}
[23:54.592]   2    process_task INFO     - process task {'id': 3, 'name': 'task_003'}
[23:54.689]   4    process_task INFO     - done task {'id': 3, 'name': 'task_003'}
[23:54.690]   9        consumer INFO     - done consuming

In [12]:
stop = object()

async def producer(queue: asyncio.Queue, n_tasks: int):
    log.info("start producing")
    for i in range(1, n_tasks + 1):
        await asyncio.sleep(0.1)
        task = {"id": i, "name": f"task_{i:03}"}
        log.info("Prepared task %s", task)
        await queue.put(task)

    await queue.put(stop)  
    log.info("done producing")


async def process_task(task: dict):
    log.info("process task %s", task)
    await asyncio.sleep(0.1)
    log.info("done task %s", task)


async def consumer(queue: asyncio.Queue):
    log.info("start consuming")
    while (task := await queue.get()) is not stop:
        await process_task(task)
    
    log.info("done consuming")

In [13]:
async def main():
    queue = asyncio.Queue()
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(queue, n_tasks=10))
        tg.create_task(consumer(queue))

await main()

[29:14.888]   4        producer INFO     - start producing
[29:14.891]  22        consumer INFO     - start consuming
[29:14.992]   8        producer INFO     - Prepared task {'id': 1, 'name': 'task_001'}
[29:14.993]  16    process_task INFO     - process task {'id': 1, 'name': 'task_001'}
[29:15.094]   8        producer INFO     - Prepared task {'id': 2, 'name': 'task_002'}
[29:15.097]  18    process_task INFO     - done task {'id': 1, 'name': 'task_001'}
[29:15.098]  16    process_task INFO     - process task {'id': 2, 'name': 'task_002'}
[29:15.197]   8        producer INFO     - Prepared task {'id': 3, 'name': 'task_003'}
[29:15.213]  18    process_task INFO     - done task {'id': 2, 'name': 'task_002'}
[29:15.217]  16    process_task INFO     - process task {'id': 3, 'name': 'task_003'}
[29:15.299]   8        producer INFO     - Prepared task {'id': 4, 'name': 'task_004'}
[29:15.324]  18    process_task INFO     - done task {'id': 3, 'name': 'task_003'}
[29:15.325]  16    process_

In [14]:

async def process_task(task: dict):
    log.info("process task %s", task)
    await asyncio.sleep(0.15)
    log.info("done task %s", task)



In [15]:
async def main():
    queue = asyncio.Queue()
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(queue, n_tasks=10))
        tg.create_task(consumer(queue))

await main()

[31:01.803]   4        producer INFO     - start producing
[31:01.804]  22        consumer INFO     - start consuming
[31:01.908]   8        producer INFO     - Prepared task {'id': 1, 'name': 'task_001'}
[31:01.910]   2    process_task INFO     - process task {'id': 1, 'name': 'task_001'}
[31:02.011]   8        producer INFO     - Prepared task {'id': 2, 'name': 'task_002'}
[31:02.069]   4    process_task INFO     - done task {'id': 1, 'name': 'task_001'}
[31:02.070]   2    process_task INFO     - process task {'id': 2, 'name': 'task_002'}
[31:02.116]   8        producer INFO     - Prepared task {'id': 3, 'name': 'task_003'}
[31:02.222]   8        producer INFO     - Prepared task {'id': 4, 'name': 'task_004'}
[31:02.223]   4    process_task INFO     - done task {'id': 2, 'name': 'task_002'}
[31:02.226]   2    process_task INFO     - process task {'id': 3, 'name': 'task_003'}
[31:02.324]   8        producer INFO     - Prepared task {'id': 5, 'name': 'task_005'}
[31:02.387]   4    proc