In [1]:
import asyncio
import functools
import time

import aiohttp

In [2]:
def timeit(fn):
    async def inner(*args, **kwargs):
        t1 = time.time()
        res = await fn(*args, **kwargs)

        print(f"{fn=}, {res=}")
        t2 = time.time()
        print("time", t2 - t1)

        return res
    return inner

In [9]:
URL = "https://docs.python.org/3/whatsnew/3.12.html"
N_URLS = 50


async def fetch_url(url, session=None):
    """Download ``url`` and return the response body as text.

    aiohttp recommends sharing one ``ClientSession`` across many requests (the
    session owns the connection pool) rather than creating one per request.
    Pass ``session`` to reuse an existing one; if omitted, a throw-away session
    is opened and closed just for this call (the previous behaviour).
    """
    if session is None:
        async with aiohttp.ClientSession() as own_session:
            return await fetch_url(url, own_session)

    async with session.get(url) as resp:
        return await resp.text()


@timeit
async def fetch_batch_urls(urls):
    print("start fetch_batch_urls")
    tasks = []
    for url in urls:
        # tasks.append(asyncio.create_task(fetch_url(url)))
        tasks.append(fetch_url(url))

    await asyncio.gather(*tasks)

    return len(urls)


await fetch_batch_urls([URL] * N_URLS)

start fetch_batch_urls
fn=<function fetch_batch_urls at 0x106ce5260>, res=50
time 0.6526269912719727


50

# Semaphore

In [14]:
URL = "https://docs.python.org/3/whatsnew/3.12.html"
N_URLS = 50
N_SIM = 5


async def fetch_url(url, sem, session=None):
    """Download ``url`` as text while holding one slot of the semaphore ``sem``.

    aiohttp recommends sharing one ``ClientSession`` across many requests rather
    than creating one per request. Pass ``session`` to reuse an existing one; if
    omitted, a throw-away session is opened (inside the semaphore) just for this
    call, which is the previous behaviour.
    """
    async def _read(active_session):
        # The whole request, including reading the body, happens while the
        # semaphore slot is held.
        async with active_session.get(url) as resp:
            return await resp.text()

    async with sem:
        if session is not None:
            return await _read(session)
        async with aiohttp.ClientSession() as own_session:
            return await _read(own_session)


@timeit
async def fetch_batch_urls(urls, n_sim):
    print("start fetch_batch_urls")

    sem = asyncio.Semaphore(n_sim)
    tasks = []
    for url in urls:
        tasks.append(fetch_url(url, sem))

    await asyncio.gather(*tasks)

    return len(urls)


await fetch_batch_urls([URL] * N_URLS, N_SIM)

start fetch_batch_urls
fn=<function fetch_batch_urls at 0x10701a020>, res=50
time 2.8871710300445557


50

# Generator

In [16]:
async def countdown(n):
    """Async generator producing 0, 1, ..., n-1 (despite its name, it counts up).

    Code after each ``yield`` only runs once the consumer asks for the next
    item, so each "yielded ..." line is printed after the consumer has handled
    that value. It is followed by a 0.1 s non-blocking pause.
    """
    numbers = range(n)
    for i in numbers:
        yield i
        print(f"yielded {i=}")
        await asyncio.sleep(0.1)


# Consume the async generator. Each "yielded i=..." line appears *after* the
# corresponding "x=..." line: the generator only resumes past its `yield`
# when this loop requests the next item.
async for x in countdown(5):
    print(f"{x=}")

x=0
yielded i=0
x=1
yielded i=1
x=2
yielded i=2
x=3
yielded i=3
x=4
yielded i=4


In [17]:
async def countdown(n):
    """Yield ``n`` times; each item is the result of a 0.1 s ``asyncio.sleep``.

    ``asyncio.sleep`` resolves to ``None`` by default, so every yielded item is
    ``None``. The demo shows that ``yield await ...`` first awaits, then yields.
    """
    for _ in range(n):
        slept = await asyncio.sleep(0.1)
        yield slept


# Each item is whatever `await asyncio.sleep(0.1)` returned, i.e. None.
async for x in countdown(5):
    print(f"{x=}")

x=None
x=None
x=None
x=None
x=None


In [20]:
URL = "https://docs.python.org/3/whatsnew/3.12.html"
N_URLS = 5


async def fetch_url(url, session=None):
    """Download ``url`` and return the response body as text.

    aiohttp recommends sharing one ``ClientSession`` across many requests rather
    than creating one per request. Pass ``session`` to reuse an existing one; if
    omitted, a throw-away session is opened and closed just for this call (the
    previous behaviour).
    """
    if session is None:
        async with aiohttp.ClientSession() as own_session:
            return await fetch_url(url, own_session)

    async with session.get(url) as resp:
        return await resp.text()


@timeit
async def fetch_batch_urls(urls):
    print("start fetch_batch_urls")
    tasks = []
    for url in urls:
        tasks.append(fetch_url(url))

    start = time.time()

    for compl_task in asyncio.as_completed(tasks):
        print(f"compl_task, {time.time() - start=}")
        res = await compl_task
        print(f"finished, {time.time() - start=}")

    return len(urls)


await fetch_batch_urls([URL] * N_URLS)

start fetch_batch_urls
compl_task, time.time() - start=0.00010824203491210938
finished, time.time() - start=0.2556571960449219
compl_task, time.time() - start=0.2558102607727051
finished, time.time() - start=0.2558321952819824
compl_task, time.time() - start=0.2558400630950928
finished, time.time() - start=0.264545202255249
compl_task, time.time() - start=0.26457715034484863
finished, time.time() - start=0.2675762176513672
compl_task, time.time() - start=0.26761412620544434
finished, time.time() - start=0.2698981761932373
fn=<function fetch_batch_urls at 0x106ce53a0>, res=5
time 0.27009105682373047


5

In [21]:
URL = "https://docs.python.org/3/whatsnew/3.12.html"
N_URLS = 5


async def fetch_url(url, name, session=None):
    """Download ``url``, print a completion line tagged with ``name`` and return ``name``.

    The body is read and discarded; returning ``name`` lets the caller tell
    which request finished. aiohttp recommends sharing one ``ClientSession``
    across requests, so ``session`` may be passed in for reuse. If omitted, a
    throw-away session is opened just for this call (the previous behaviour).
    """
    if session is None:
        async with aiohttp.ClientSession() as own_session:
            return await fetch_url(url, name, own_session)

    async with session.get(url) as resp:
        await resp.text()  # read (and discard) the body so the request completes
    print(f"coro stop {name=}")
    return name


@timeit
async def fetch_batch_urls(urls):
    print("start fetch_batch_urls")
    tasks = []
    for i, url in enumerate(urls, 1):
        tasks.append(fetch_url(url, f"fetch_{i}"))

    start = time.time()

    for compl_task in asyncio.as_completed(tasks):
        print(f"compl_task, {time.time() - start=}")
        res = await compl_task
        print(f"finished {res=}, {time.time() - start=}")

    return len(urls)


await fetch_batch_urls([URL] * N_URLS)

start fetch_batch_urls
compl_task, time.time() - start=5.793571472167969e-05
coro stop name='fetch_3'
coro stop name='fetch_2'
finished res='fetch_3', time.time() - start=0.31260108947753906
compl_task, time.time() - start=0.3126339912414551
finished res='fetch_2', time.time() - start=0.31277012825012207
compl_task, time.time() - start=0.3127858638763428
coro stop name='fetch_1'
finished res='fetch_1', time.time() - start=0.41811585426330566
compl_task, time.time() - start=0.4181380271911621
coro stop name='fetch_5'
coro stop name='fetch_4'
finished res='fetch_5', time.time() - start=0.42159008979797363
compl_task, time.time() - start=0.4216158390045166
finished res='fetch_4', time.time() - start=0.421673059463501
fn=<function fetch_batch_urls at 0x107267e20>, res=5
time 0.4218101501464844


5

In [25]:
URL = "https://docs.python.org/3/whatsnew/3.12.html"
N_URLS = 10


async def fetch_url(url, name, session=None):
    """Download ``url`` (body discarded) and return ``name`` to identify the request.

    aiohttp recommends sharing one ``ClientSession`` across requests, so
    ``session`` may be passed in for reuse. If omitted, a throw-away session is
    opened just for this call (the previous behaviour).
    """
    if session is None:
        async with aiohttp.ClientSession() as own_session:
            return await fetch_url(url, name, own_session)

    async with session.get(url) as resp:
        await resp.text()  # read (and discard) the body so the request completes
    return name


async def gen_fetch_as_completed(tasks):
    """Yield results of ``tasks`` in completion order, keeping only even-numbered ones.

    Each awaitable must resolve to a name shaped like ``"fetch_<n>"`` (e.g.
    ``"fetch_4"``). Results whose ``<n>`` is odd are dropped.

    If the consumer stops early (``break`` / ``aclose()``) or one awaitable
    raises, every still-pending awaitable is cancelled and awaited. This means
    no orphaned requests are left running in the background.
    """
    # Wrap up-front (coroutines -> Tasks) so we hold handles we can cancel;
    # as_completed would otherwise create the tasks internally and hide them.
    futures = [asyncio.ensure_future(t) for t in tasks]
    try:
        for compl_task in asyncio.as_completed(futures):
            res = await compl_task

            num = int(res.split("_")[1])
            if num % 2 == 0:
                yield res
    finally:
        for fut in futures:
            fut.cancel()  # no-op for futures that already finished
        # Let cancellations unwind; return_exceptions keeps CancelledError and
        # other task errors from masking whatever is already propagating.
        await asyncio.gather(*futures, return_exceptions=True)


@timeit
async def fetch_batch_urls(urls):
    print("start fetch_batch_urls")
    tasks = []
    for i, url in enumerate(urls, 1):
        tasks.append(fetch_url(url, f"fetch_{i}"))

    async for name in gen_fetch_as_completed(tasks):
        print("generator res", name)

    return len(urls)


await fetch_batch_urls([URL] * N_URLS)

start fetch_batch_urls
generator res fetch_2
generator res fetch_6
generator res fetch_10
generator res fetch_8
generator res fetch_4
fn=<function fetch_batch_urls at 0x10713a480>, res=10
time 0.4074258804321289


10

# Workers

In [31]:
URL = "https://docs.python.org/3/whatsnew/3.12.html"
N_URLS = 50
N_WORKERS = 2


async def fetch_url(url, session=None):
    """Download ``url`` and return the response body as text.

    aiohttp recommends sharing one ``ClientSession`` across many requests rather
    than creating one per request. Pass ``session`` to reuse an existing one; if
    omitted, a throw-away session is opened and closed just for this call (the
    previous behaviour).
    """
    if session is None:
        async with aiohttp.ClientSession() as own_session:
            return await fetch_url(url, own_session)

    async with session.get(url) as resp:
        return await resp.text()


async def worker(que, name):
    """Fetch URLs taken from ``que`` until the ``None`` sentinel is dequeued.

    The sentinel is put back before exiting so every other worker sharing the
    queue also sees it and stops. Fetch errors are printed, not raised.
    """
    print(f"{name} started")
    while (url := await que.get()) is not None:
        try:
            await fetch_url(url)
        except Exception as err:
            print("Error", err)
    await que.put(url)  # re-queue the sentinel for the next worker
    print(f"{name} finished")


@timeit
async def fetch_batch_urls(urls, n_workers):
    print("start fetch_batch_urls")

    que = asyncio.Queue()
    for url in urls:
        await que.put(url)
    await que.put(None)

    workers = [
        worker(que, f"worker_{i}")
        for i in range(n_workers)
    ]

    await asyncio.gather(*workers)

    return len(urls)


await fetch_batch_urls([URL] * N_URLS, N_WORKERS)

start fetch_batch_urls
worker_0 started
worker_1 started
worker_1 finished
worker_0 finished
fn=<function fetch_batch_urls at 0x107e07560>, res=50
time 7.1069159507751465


50

In [33]:
URL = "https://docs.python.org/3/whatsnew/3.12.html"
N_URLS = 50
N_WORKERS = 2


async def fetch_url(url, session=None):
    """Download ``url`` and return the response body as text.

    aiohttp recommends sharing one ``ClientSession`` across many requests rather
    than creating one per request. Pass ``session`` to reuse an existing one; if
    omitted, a throw-away session is opened and closed just for this call (the
    previous behaviour).
    """
    if session is None:
        async with aiohttp.ClientSession() as own_session:
            return await fetch_url(url, own_session)

    async with session.get(url) as resp:
        return await resp.text()


async def worker(que, name):
    """Consume URLs from ``que`` forever, fetching each one.

    Every dequeued item is marked done (even if the fetch fails), so
    ``que.join()`` in the caller can complete. The loop never exits on its own;
    the worker is expected to be stopped by cancellation. The "finished" line is
    printed from ``finally``, so it now actually appears when that happens.
    Previously it sat after ``while True`` and was unreachable.
    """
    print(f"{name} started")
    try:
        while True:
            url = await que.get()

            try:
                await fetch_url(url)
            except Exception as err:
                print("Error", err)
            finally:
                que.task_done()
    finally:
        print(f"{name} finished")


@timeit
async def fetch_batch_urls(urls, n_workers):
    print("start fetch_batch_urls")

    que = asyncio.Queue()
    for url in urls:
        await que.put(url)

    workers = [
        asyncio.create_task(worker(que, f"worker_{i}"))
        for i in range(n_workers)
    ]

    await que.join()

    for wr in workers:
        wr.cancel()

    return len(urls)


await fetch_batch_urls([URL] * N_URLS, N_WORKERS)

start fetch_batch_urls
worker_0 started
worker_1 started
fn=<function fetch_batch_urls at 0x108331120>, res=50
time 6.633097887039185


50

# Threads

In [35]:
async def bloking_api(n):
    """Deliberately *blocking* coroutine: ``time.sleep`` freezes the whole event loop.

    Stands in for any synchronous call made inside a coroutine, such as a
    blocking HTTP request or a CPU-heavy computation.
    """
    def log(stage):
        print(f"{stage} blocking api {n=}")

    log("start")
    time.sleep(n)  # not awaited: nothing else on the loop can run meanwhile
    log("finish")


async def non_blocking_api(n):
    """Cooperative stand-in for an I/O call: yields to the event loop for ``n`` seconds."""
    def log(stage):
        print(f"{stage} non-blocking api {n=}")

    log("start")
    await asyncio.sleep(n)
    log("finish")


@timeit
async def run(n):
    await asyncio.gather(bloking_api(n), non_blocking_api(n))


await run(5)

start blocking api n=5
finish blocking api n=5
start non-blocking api n=5
finish non-blocking api n=5
fn=<function run at 0x10709fce0>, res=None
time 10.006462097167969


In [38]:
def bloking_api(n):
    """Plain synchronous function that blocks its calling thread for ``n`` seconds.

    Stands in for any blocking library call, such as a synchronous HTTP request
    or a CPU-heavy computation.
    """
    def log(stage):
        print(f"{stage} blocking api {n=}")

    log("start")
    time.sleep(n)
    log("finish")


async def non_blocking_api(n):
    """Cooperative stand-in for an I/O call: yields to the event loop for ``n`` seconds."""
    def log(stage):
        print(f"{stage} non-blocking api {n=}")

    log("start")
    await asyncio.sleep(n)
    log("finish")


@timeit
async def run(n):
    await asyncio.gather(asyncio.to_thread(bloking_api, n), non_blocking_api(n))


await run(5)

start non-blocking api n=5start blocking api n=5

finish non-blocking api n=5
finish blocking api n=5
fn=<function run at 0x107e06f20>, res=None
time 5.007159233093262


In [39]:
async def bloking_api(n):
    """Coroutine wrapping a blocking ``time.sleep(n)`` so it does not stall the loop.

    The blocking call (a stand-in for e.g. a synchronous HTTP request or a
    CPU-heavy computation) is off-loaded with ``asyncio.to_thread``.
    """
    def log(stage):
        print(f"{stage} blocking api {n=}")

    log("start")
    await asyncio.to_thread(time.sleep, n)
    log("finish")


async def non_blocking_api(n):
    """Cooperative stand-in for an I/O call: yields to the event loop for ``n`` seconds."""
    def log(stage):
        print(f"{stage} non-blocking api {n=}")

    log("start")
    await asyncio.sleep(n)
    log("finish")


@timeit
async def run(n):
    await asyncio.gather(bloking_api(n), non_blocking_api(n))


await run(5)

start blocking api n=5
start non-blocking api n=5
finish non-blocking api n=5
finish blocking api n=5
fn=<function run at 0x107e06a20>, res=None
time 5.005425214767456


In [40]:
# IPython executes notebook cells inside an already-running asyncio event loop
# (this is what makes top-level `await` possible), so this call succeeds here.
# Outside a running loop, get_running_loop() raises RuntimeError.
loop = asyncio.get_running_loop()

In [41]:
# Display the loop object: its class and its running/closed/debug state.
loop

<_UnixSelectorEventLoop running=True closed=False debug=False>

In [46]:
def bloking_api(n):
    """Plain synchronous function that blocks its calling thread for ``n`` seconds.

    Stands in for any blocking library call, such as a synchronous HTTP request
    or a CPU-heavy computation.
    """
    def log(stage):
        print(f"{stage} blocking api {n=}")

    log("start")
    time.sleep(n)
    log("finish")


await loop.run_in_executor(None, bloking_api, 5)