In [10]:
import asyncio
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager

from utils.go_fast import crunch as cy_crunch
from utils.pure import crunch as py_crunch
from server import Server, RecordKeeper

# One shared `Server` instance; every demo coroutine below awaits its methods
server = Server()

In [3]:
@asynccontextmanager
async def timeit():
    """
    Time the body of an ``async with`` block using the running event loop's clock.

    Prints the elapsed time as ``"<seconds> seconds"`` (two decimal places) when
    the block exits. This includes exits caused by an exception, so a failing demo
    still reports how long it ran; the exception itself is not suppressed.

    Must be entered from inside a running event loop (it calls
    ``asyncio.get_running_loop``).
    """
    loop = asyncio.get_running_loop()
    start = loop.time()
    try:
        yield
    finally:
        # `finally` so the timing is printed even when the timed body raises
        print(f"{(loop.time() - start):.2f}", "seconds")

In [11]:
async def single():
    """
    What most people who start writing asyncio do
    """
    async with timeit():
        data = await server.get()


await single()

0.50 seconds


In [None]:
async def sequential():
    """
    How most people continue writing asyncio
    """
    async with timeit():
        data = await server.get()
        data2 = await server.get()


await sequential()

In [None]:
async def use_gather():
    """
    Gathering. The first asyncio optimisation you've probably used
    """
    async with timeit():
        data, data2 = await asyncio.gather(server.get(), server.get())


await use_gather()

In [None]:
async def py_processing():
    """
    How you might write some data processing with asyncio
    """
    async with timeit():
        data1 = await server.get()
        py_crunch(data1)
        data2 = await server.get()
        py_crunch(data2)


await py_processing()

In [None]:
async def cy_processing():
    """
    When you want CPU-bound speed, you write in Cython (or similar)
    """
    async with timeit():
        data1 = await server.get()
        cy_crunch(data1)
        data2 = await server.get()
        cy_crunch(data2)


await cy_processing()

In [None]:
async def gather_and_py():
    """
    You've optimised your data pull, but it's not as fast as it could be
    """
    async with timeit():
        data, data2 = await asyncio.gather(server.get(), server.get())
        py_crunch(data2)
        py_crunch(data)


await gather_and_py()

In [None]:
async def gather_and_cy():
    """
    You've optimised your data pull, and the data processing. Can't get much faster, right?
    """
    async with timeit():
        data, data2 = await asyncio.gather(server.get(), server.get())
        cy_crunch(data2)
        cy_crunch(data)


await gather_and_cy()

In [None]:
async def gather_and_cy_to_thread():
    """
    Any extension that releases the GIL will allow us to utilise threading to improve CPU-bound
    code
    """
    async with timeit():
        data, data2 = await asyncio.gather(server.get(), server.get())
        await asyncio.gather(
            asyncio.to_thread(cy_crunch, data), asyncio.to_thread(cy_crunch, data2)
        )


await gather_and_cy_to_thread()

In [None]:
async def restrict_executor():
    """
    Demonstrates differently using the loop executor
    """
    async with timeit():
        loop = asyncio.get_running_loop()
        executor = ThreadPoolExecutor(max_workers=1)
        data, data2 = await asyncio.gather(server.get(), server.get())
        await asyncio.gather(
            loop.run_in_executor(executor, cy_crunch, data),
            loop.run_in_executor(executor, cy_crunch, data2),
        )
    input("Waiting")
    async with timeit():
        # Note we can also assign the executor to the whole loop
        loop.set_default_executor(executor)
        data, data2 = await asyncio.gather(server.get(), server.get())
        await asyncio.gather(
            asyncio.to_thread(cy_crunch, data), asyncio.to_thread(cy_crunch, data2)
        )
    input("Waiting")
    # there's virtually no speed difference here because we're only using 2 items. Let's increase that
    loop.set_default_executor(None)  # clear the default executor
    async with timeit():
        data_list = await asyncio.gather(*[server.get() for _ in range(1_000)])
        await asyncio.gather(*[asyncio.to_thread(cy_crunch, d) for d in data_list])
    # that's pretty fast. But we don't want to utilise the breadth of our threads
    input("Waiting")
    async with timeit():
        loop.set_default_executor(executor)
        data_list = await asyncio.gather(*[server.get() for _ in range(1_000)])
        await asyncio.gather(*[asyncio.to_thread(cy_crunch, d) for d in data_list])


await restrict_executor()

In [None]:
async def semaphore():
    """
    Use a semaphore to restrict your throughput
    """
    sema4 = asyncio.Semaphore(20)

    async def fetcher(sem: asyncio.Semaphore | None) -> int:
        if sem is None:
            return await server.get()
        else:
            async with sem:
                return await server.get()

    print("Without semaphore")
    async with timeit():
        async with asyncio.TaskGroup() as tg:
            tasks = set()
            for _ in range(100):
                tasks.add(tg.create_task(fetcher(None)))
    input("Waiting")
    print("With semaphore")
    async with timeit():
        async with asyncio.TaskGroup() as tg:
            tasks = set()
            for _ in range(100):
                tasks.add(tg.create_task(fetcher(sema4)))


await semaphore()

In [None]:
async def use_queue():
    """
    Using queues to manage the data pulling and processing
    """

    async def consumer(
        pro_q: asyncio.Queue[asyncio.Task[int]],
        con_q: asyncio.Queue[asyncio.Task[float]],
    ):
        """
        We use `None` in `pro_q` to indicate end (called a 'sentinel')
        """
        while True:
            data = await pro_q.get()
            if data is None:
                break
            task = loop.create_task(asyncio.to_thread(py_crunch, await data))
            pro_q.task_done()
            await con_q.put(task)

    async with timeit():
        loop = asyncio.get_running_loop()
        results = []
        producer_queue = asyncio.Queue()
        consumer_queue = asyncio.Queue()
        consumer_task = loop.create_task(consumer(producer_queue, consumer_queue))
        for _ in range(101):
            to_send = loop.create_task(server.get())
            await producer_queue.put(to_send)
        for idx in range(101):
            results.append(await (await consumer_queue.get()))
            if idx == 100:
                # to shut it down
                await producer_queue.put(None)
        await consumer_task
        print(f"Completed {len(results)} tasks: {results}")


await use_queue()

In [None]:
async def tasks_():
    """
    Using tasks instead of just raw coroutines.

    Tasks begin execution (usually) at the next iteration of the event loop
    """
    loop = asyncio.get_running_loop()
    async with timeit():
        query_tasks = [loop.create_task(server.get()) for _ in range(100)]
        queried = await asyncio.gather(*query_tasks)
        for task in queried:
            cy_crunch(task)
    async with timeit():
        query_tasks = [loop.create_task(server.get()) for _ in range(100)]
        queried = await asyncio.gather(*query_tasks)
        await asyncio.gather(*[asyncio.to_thread(cy_crunch, task) for task in queried])


await tasks_()

In [None]:
async def as_completed():
    """
    Using `as_completed` to perform an action on tasks, as they complete
    """
    loop = asyncio.get_running_loop()
    # by giving a set wait time, the tasks will all be received linearly (0 - 9)
    query_tasks = [
        loop.create_task(server.get(0.5), name=str(tsk)) for tsk in range(10)
    ]
    crunch_tasks = []
    task: asyncio.Task[int]
    print(len(query_tasks))
    # Note: this only works on Python 3.13+
    async for task in asyncio.as_completed(query_tasks):
        print("Processing", task.get_name())
        crunch_tasks.append(asyncio.to_thread(cy_crunch, await task))
    await asyncio.gather(*crunch_tasks)
    input("Waiting")
    # passing `None` to `self.server.get` gives us a random wait time (0-1)
    query_tasks = [
        loop.create_task(server.get(None), name=str(tsk)) for tsk in range(10)
    ]
    crunch_tasks = []
    task: asyncio.Task[int]
    print(len(query_tasks))
    async for task in asyncio.as_completed(query_tasks):
        # demonstrates the non-linear order of completion
        print("Processing", task.get_name())
        crunch_tasks.append(asyncio.to_thread(cy_crunch, await task))
    await asyncio.gather(*crunch_tasks)


await as_completed()

In [15]:
async def queue_excels(num_fetchers: int | str = 100, send_threshold: int = 1_000):
    """
    Queues excel at situations like this where you need to constantly fetch data, but not
    all at the same time

    :param num_fetchers: Should be determined through your own benchmarking/profiling
    """
    num_fetchers = int(num_fetchers)
    loop = asyncio.get_running_loop()

    async def producer(
        prod_q: asyncio.Queue[int | None],
        con_q: asyncio.Queue[asyncio.Task[int] | None],
    ) -> None:
        while True:
            from_q = await prod_q.get()
            if from_q is None:
                return None
            r = await server.get()
            process_ = loop.create_task(asyncio.to_thread(cy_crunch, r))
            await con_q.put(process_)
            prod_q.task_done()

    async def consumer(
        con_q: asyncio.Queue[asyncio.Task[int] | None], send_threshold_: int = 1_000
    ):
        async def send(data_):
            await server.post(data=data_)
            print("Sent data", len(data_))

        to_send = []
        while True:
            try:
                recd = await asyncio.wait_for(con_q.get(), 0.5)
                if recd is None:
                    if to_send:
                        await send(to_send)
                    return None
                data = await recd
                to_send.append(data)
                con_q.task_done()
            except TimeoutError:
                if to_send:
                    await send(to_send)
                    to_send.clear()
                continue

            if len(to_send) >= send_threshold_:
                await send(to_send)
                to_send.clear()

    producer_queue = asyncio.Queue()
    consumer_queue = asyncio.Queue()

    producer_tasks = [
        loop.create_task(producer(producer_queue, consumer_queue))
        for _ in range(num_fetchers)
    ]
    consumer_task = loop.create_task(
        consumer(consumer_queue, send_threshold_=send_threshold)
    )

    async with timeit():
        for num in range(1_000):
            await producer_queue.put(num)
        for _ in range(num_fetchers):
            await producer_queue.put(None)
        for t in producer_tasks:
            await t
            print("Producer done")
        await consumer_queue.put(None)
        await consumer_task
        print("Consumer done")


await queue_excels(send_threshold=100)

Sent data 100
Sent data 100
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Producer done
Produc

In [9]:
async def async_for(page_size: int = 10, max_records: int = 100):
    """
    Having an interator that automatically runs a next async fetch can be highly useful
    """
    records: RecordKeeper = await server.records(page_size, max_records)
    async for record in records:
        print(record)


await async_for()

Fetching next page...
876097
195168
345991
722530
356581
527492
365784
925611
990451
544932
Fetching next page...
910809
164646
186372
160800
584825
517477
741369
889727
914007
367736
Fetching next page...
552838
541731
94793
913067
846606
795522
545115
351258
641094
77836
Fetching next page...
557204
971433
567675
851969
802926
683102
926350
190673
780529
460927
Fetching next page...
454323
302782
673426
795142
178156
268183
888575
982245
341467
404107
Fetching next page...
820620
42508
897625
566598
825280
640347
916423
807597
474367
407914
Fetching next page...
249448
316917
671102
623430
596657
228910
263987
527809
576226
610746
Fetching next page...
844125
465847
179673
918975
736963
664635
696515
416742
634017
97769
Fetching next page...
800770
174744
371137
470576
416278
268632
70732
688486
395676
543345
Fetching next page...
94861
544764
909949
306260
492974
389449
196084
576795
45776
301779
