Skip to content

smithk86/asyncio-pool-ng

Repository files navigation

asyncio-pool-ng

PyPI version Python Versions License: MIT Code style: black

About

AsyncioPoolNG takes the ideas used in asyncio-pool and wraps them around an asyncio.TaskGroup.

AsyncioPool has three main functions spawn, map, and itermap.

  1. spawn: Schedule an async function on the pool and get a future back which will eventually have either the result or the exception from the function.
  2. map: Spawn an async function for each item in an iterable object, and return a set containing a future for each item.
  • asyncio.wait() can be used to wait for the set of futures to complete.
  • When the AsyncioPool closes, it will wait for all tasks to complete. All pending futures will be complete once it is closed.
  1. itermap: Works similarly to map but returns an Async Generator which yields each future as it completes.

Differences from asyncio-pool

  1. asyncio-pool-ng implements Python typing and passes validation checks with mypy's strict mode. This helps IDEs and static type checkers know what type of result to expect when getting data from a completed future.
  2. asyncio-pool uses callbacks to process data before returning it; asyncio-pool-ng only returns Future instances directly. The future will contain either a result or an exception which can then be handled as needed.
  3. While asyncio-pool schedules Coroutine instances directly, asyncio-pool-ng takes the callable and arguments, and creates the Coroutine instance at execution time.

Example

import asyncio
import logging
from random import random

from asyncio_pool import AsyncioPool


logging.basicConfig(level=logging.INFO)


async def worker(number: int) -> int:
    await asyncio.sleep(random() / 2)
    return number * 2


async def main() -> None:
    result: int = 0
    results: list[int] = []

    async with AsyncioPool(2) as pool:
        """spawn task and wait for the results"""
        result = await pool.spawn(worker, 5)
        assert result == 10
        logging.info(f"results for pool.spawn(worker, 5): {result}")

        """spawn task and get results later"""
        future: asyncio.Future[int] = pool.spawn(worker, 5)

        # do other stuff

        result = await future
        assert result == 10

        """map an async function to a set of values"""
        futures: set[asyncio.Future[int]] = pool.map(worker, range(10))
        await asyncio.wait(futures)
        results = [x.result() for x in futures]
        logging.info(f"results for pool.map(worker, range(10)): {results}")
        results.sort()
        assert results == [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

        """iterate futures as they complete"""
        logging.info("results for pool.itermap(worker, range(10)):")
        results = []
        async for future in pool.itermap(worker, range(10)):
            results.append(future.result())
            logging.info(f"> {future.result()}")

        results.sort()
        assert results == [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]


asyncio.run(main())