In [1]:
def grep(pattern):
   print("start grep for", pattern)
   while True:
       s = yield
       if pattern in s:
           print("found!", s)
       else:
           print("no %s in %s" % (pattern, s))

In [36]:
gr = grep("python")

In [3]:
gr

<generator object grep at 0x122c2a2f0>

In [4]:
next(gr)

start grep for python


In [5]:
gr.send("")

no python in 


In [6]:
gr.send("python")

found! python


In [7]:
next(gr)

TypeError: argument of type 'NoneType' is not a container or iterable

In [8]:
gr.send("python")

StopIteration: 

In [37]:
gr = grep("python")

In [38]:
gr.send(None)

start grep for python


In [39]:
gr.send("wdwd")

no python in wdwd


In [40]:
gr.throw(ValueError("wrong"))

ValueError: wrong

In [10]:
lst = [1, 2, 3, 4, 5]

In [11]:
it1 = iter(lst)

In [12]:
it1

<list_iterator at 0x122e15030>

In [13]:
next(it1)

1

In [14]:
next(it1)

2

In [15]:
it2 = iter(it1)

In [16]:
it2

<list_iterator at 0x122e15030>

In [17]:
it1 is it2

True

In [18]:
next(it2)

3

In [19]:
it3 = iter(lst)

In [20]:
type(it1)

list_iterator

In [21]:
it3 is it1

False

In [22]:
next(it3), next(it3)

(1, 2)

In [23]:
next(it1)

4

In [24]:
class SpecialIterator:
    def __init__(self, limit):
        self.limit = limit
        self.counter = 0

    def __iter__(self):
        print("iter")
        return self
        
    def __next__(self):
        print("next")
        if self.counter < self.limit:
            self.counter += 1
            return self.counter
        else:
            raise StopIteration

In [28]:
it1 = SpecialIterator(5)

In [29]:
next(it1), next(it1)

next
next


(1, 2)

In [30]:
it2 = iter(it1)

iter


In [31]:
it2 is it1, next(it2)

next


(True, 3)

In [33]:
next(it2)

next


4

In [34]:
next(it2)

next


5

In [35]:
next(it2)

next


StopIteration: 

In [32]:
for i in SpecialIterator(3):
    print(f"{i=}")

iter
next
i=1
next
i=2
next
i=3
next


In [41]:
it = iter(range(10))

result = list(
    zip(
        it,
        map(lambda x: x ** 2, it)
    )
)
print(result)

[(0, 1), (2, 9), (4, 25), (6, 49), (8, 81)]


In [43]:
# start..stop step, step - update

def counter(start, stop, step):
    while start < stop:
        step_candidate = yield start
        if step_candidate is not None and isinstance(step_candidate, int):
            step = step_candidate

        start += step


cnt = counter(-5, 10, 3)
assert next(cnt) == -5
assert next(cnt) == -2
assert cnt.send(5) == 3
assert next(cnt) == 8
assert cnt.send(1) == 9
print("ok")

ok


In [1]:
import asyncio
import time
import aiohttp

In [47]:
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    print(f"started at {time.strftime('%X')}")
    await say_after(1, "hello")
    await say_after(2, "world")
    print(f"finished at {time.strftime('%X')}")


await main()

started at 20:25:16
hello
world
finished at 20:25:19


In [48]:
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    print(f"started at {time.strftime('%X')}")
    say1 = say_after(1, "hello")
    await say_after(2, "world")
    await say1

    print(f"finished at {time.strftime('%X')}")


await main()

started at 20:26:45
world
hello
finished at 20:26:48


In [49]:
coro = main()

In [50]:
coro

<coroutine object main at 0x123201460>

In [52]:
await coro

started at 20:27:44
world
hello
finished at 20:27:47


In [53]:
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    print(f"started at {time.strftime('%X')}")
    
    await asyncio.create_task(say_after(1, "hello"))
    await asyncio.create_task(say_after(2, "world"))
    
    print(f"finished at {time.strftime('%X')}")


await main()

started at 20:29:06
hello
world
finished at 20:29:09


In [3]:
def async_timeit(coro):
    async def inner(*args, **kwargs):
        t1 = time.time()
        try:
            return await coro(*args, **kwargs)
        finally:
            t2 = time.time()
            print(f"Total time `{coro.__name__}`", t2 - t1)

    return inner

In [59]:
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


@async_timeit
async def main():
    print(f"started at {time.strftime('%X')}")
    
    task1 = asyncio.create_task(say_after(1, "hello"))
    await asyncio.create_task(say_after(2, "world"))
    await task1
    
    print(f"finished at {time.strftime('%X')}")


await main()

started at 20:35:37
hello
world
finished at 20:35:39
Total time `main` 2.0020670890808105


In [60]:
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


@async_timeit
async def main():
    print(f"started at {time.strftime('%X')}")
    
    task1 = asyncio.create_task(say_after(1, "hello"))
    task2 = asyncio.create_task(say_after(2, "world"))

    await asyncio.gather(task1, task2)
    
    print(f"finished at {time.strftime('%X')}")


await main()

started at 20:37:09
hello
world
finished at 20:37:11
Total time `main` 2.002526044845581


In [61]:
async def say_after(delay, what):
    time.sleep(delay)
    print(what)


@async_timeit
async def main():
    print(f"started at {time.strftime('%X')}")
    
    task1 = asyncio.create_task(say_after(1, "hello"))
    task2 = asyncio.create_task(say_after(2, "world"))

    await asyncio.gather(task1, task2)
    
    print(f"finished at {time.strftime('%X')}")


await main()

started at 20:37:52
hello
world
finished at 20:37:55
Total time `main` 3.009917974472046


In [62]:
async def say_after(delay, what):
    #time.sleep(delay)
    await asyncio.to_thread(time.sleep, delay)
    print(what)


@async_timeit
async def main():
    print(f"started at {time.strftime('%X')}")
    
    task1 = asyncio.create_task(say_after(1, "hello"))
    task2 = asyncio.create_task(say_after(2, "world"))

    await asyncio.gather(task1, task2)
    
    print(f"finished at {time.strftime('%X')}")


await main()

started at 20:42:15
hello
world
finished at 20:42:17
Total time `main` 2.006805181503296


In [67]:
async def say_after(delay, what):
    for i in range(10):
        print("inner", what)
        time.sleep(delay / 10)
        await asyncio.sleep(0)
    print(what)


@async_timeit
async def main():
    print(f"started at {time.strftime('%X')}")
    
    task1 = asyncio.create_task(say_after(1, "hello"))
    task2 = asyncio.create_task(say_after(2, "world"))

    await asyncio.gather(task1, task2)
    
    print(f"finished at {time.strftime('%X')}")


await main()

started at 20:46:18
inner hello
inner world
inner hello
inner world
inner hello
inner world
inner hello
inner world
inner hello
inner world
inner hello
inner world
inner hello
inner world
inner hello
inner world
inner hello
inner world
inner hello
inner world
hello
world
finished at 20:46:21
Total time `main` 3.0893819332122803


In [82]:
URL = "https://docs.python.org/3/whatsnew/3.14.html"
URLS = [URL] * 20


async def fetch_url(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()


async def fetch_batch_urls(urls):
    for url in urls:
        await fetch_url(url)


@async_timeit
async def run():
    await fetch_batch_urls(URLS)


await run()

Total time `run` 7.781434774398804


In [83]:
URL = "https://docs.python.org/3/whatsnew/3.14.html"
URLS = [URL] * 20


async def fetch_url(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()


async def fetch_batch_urls(urls):
    tasks = [
        asyncio.create_task(fetch_url(url))
        for url in urls
    ]
    await asyncio.gather(*tasks)


@async_timeit
async def run():
    await fetch_batch_urls(URLS)


await run()

Total time `run` 7.566035270690918


In [84]:
URL = "https://docs.python.org/3/whatsnew/3.14.html"
URLS = [URL] * 20


async def fetch_url(url):
    await asyncio.sleep(1)


async def fetch_batch_urls(urls):
    for url in urls:
        await fetch_url(url)


@async_timeit
async def run():
    await fetch_batch_urls(URLS)


await run()

Total time `run` 20.021975994110107


In [85]:
URL = "https://docs.python.org/3/whatsnew/3.14.html"
URLS = [URL] * 20


async def fetch_url(url):
    await asyncio.sleep(1)


async def fetch_batch_urls(urls):
    tasks = [
        asyncio.create_task(fetch_url(url))
        for url in urls
    ]
    await asyncio.gather(*tasks)


@async_timeit
async def run():
    await fetch_batch_urls(URLS)


await run()

Total time `run` 1.0017168521881104


In [4]:
URL = "https://docs.python.org/3/whatsnew/3.14.html"
URLS = [URL] * 20
WORKERS = 4


async def fetch_url(url):
    await asyncio.sleep(1)


async def fetch_worker(que, name):
    print("fetch_worker started", name)
    cnt = 0
    while True:
        url = await que.get()
        if url is None:
            await que.put(url)
            break

        try:
            result = await fetch_url(url)
        except Exception:
            pass
        cnt += 1
    
    print("fetch_worker finished", name, cnt)


@async_timeit
async def run():
    que = asyncio.Queue()
    for url in URLS:
        await que.put(url)
    await que.put(None)
        
    tasks = [
        asyncio.create_task(fetch_worker(que, f"fetcher_{i}"))
        for i in range(WORKERS)
    ]
    await asyncio.gather(*tasks)


await run()

fetch_worker started fetcher_0
fetch_worker started fetcher_1
fetch_worker started fetcher_2
fetch_worker started fetcher_3
fetch_worker finished fetcher_0 5
fetch_worker finished fetcher_1 5
fetch_worker finished fetcher_2 5
fetch_worker finished fetcher_3 5
Total time `run` 5.005600929260254


In [10]:
URL = "https://docs.python.org/3/whatsnew/3.14.html"
URLS = [URL] * 50
WORKERS = 5


async def fetch_url(url, session):
    async with session.get(url) as resp:
        return await resp.text()


async def fetch_worker(que, session, name):
    print("fetch_worker started", name)
    cnt = 0
    while True:
        url = await que.get()
        if url is None:
            await que.put(url)
            break

        try:
            result = await fetch_url(url, session)
        except Exception:
            pass
        cnt += 1
    
    print("fetch_worker finished", name, cnt)


@async_timeit
async def run():
    que = asyncio.Queue()
    for url in URLS:
        await que.put(url)
    await que.put(None)

    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.create_task(fetch_worker(que, session, f"fetcher_{i}"))
            for i in range(WORKERS)
        ]
        await asyncio.gather(*tasks)


await run()

fetch_worker started fetcher_0
fetch_worker started fetcher_1
fetch_worker started fetcher_2
fetch_worker started fetcher_3
fetch_worker started fetcher_4
fetch_worker finished fetcher_0 9
fetch_worker finished fetcher_1 10
fetch_worker finished fetcher_2 12
fetch_worker finished fetcher_4 11
fetch_worker finished fetcher_3 8
Total time `run` 1.5966730117797852


# #lesson-09

In [11]:
URL = "https://docs.python.org/3/whatsnew/3.14.html"
URLS = [URL] * 50
WORKERS = 5


async def fetch_url(url, session):
    async with session.get(url) as resp:
        return await resp.text()


async def fetch_worker(que, session, name):
    print("fetch_worker started", name)
    cnt = 0
    while True:
        url = await que.get()
        if url is None:
            await que.put(url)
            break

        try:
            result = await fetch_url(url, session)
        except Exception:
            pass
        cnt += 1
    
    print("fetch_worker finished", name, cnt)


@async_timeit
async def run():
    que = asyncio.Queue()
    for url in URLS:
        await que.put(url)
    await que.put(None)

    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.create_task(fetch_worker(que, session, f"fetcher_{i}"))
            for i in range(WORKERS)
        ]
        await asyncio.gather(*tasks)


await run()

fetch_worker started fetcher_0
fetch_worker started fetcher_1
fetch_worker started fetcher_2
fetch_worker started fetcher_3
fetch_worker started fetcher_4
fetch_worker finished fetcher_0 10
fetch_worker finished fetcher_3 10
fetch_worker finished fetcher_4 10
fetch_worker finished fetcher_1 11
fetch_worker finished fetcher_2 9
Total time `run` 1.397209882736206


In [16]:
URL = "https://docs.python.org/3/whatsnew/3.14.html"
URLS = [URL] * 50
WORKERS = 5


async def fetch_url(url, session):
    async with session.get(url) as resp:
        return await resp.text()


async def fetch_worker(que, session, name):
    print("fetch_worker started", name)
    cnt = 0
    while True:
        try:
            url = await que.get()
        except BaseException as err:
            print("ERROR:", err, type(err))
            raise

        try:
            result = await fetch_url(url, session)
        except Exception:
            pass
        finally:
            que.task_done()

        cnt += 1
    
    print("fetch_worker finished", name, cnt)


@async_timeit
async def run():
    que = asyncio.Queue()
    for url in URLS:
        await que.put(url)
    await que.put(None)

    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.create_task(fetch_worker(que, session, f"fetcher_{i}"))
            for i in range(WORKERS)
        ]

        await que.join()

        for task in tasks:
            task.cancel()


await run()

fetch_worker started fetcher_0
fetch_worker started fetcher_1
fetch_worker started fetcher_2
fetch_worker started fetcher_3
fetch_worker started fetcher_4
ERROR:  <class 'asyncio.exceptions.CancelledError'>
ERROR:  <class 'asyncio.exceptions.CancelledError'>
ERROR:  <class 'asyncio.exceptions.CancelledError'>
ERROR:  <class 'asyncio.exceptions.CancelledError'>
ERROR:  <class 'asyncio.exceptions.CancelledError'>
Total time `run` 1.686194896697998


In [18]:
URL = "https://docs.python.org/3/whatsnew/3.14.html"
URLS = [URL] * 20


async def fetch_url(url, session, sem):
    async with sem:
        async with session.get(url) as resp:
            return await resp.text()


async def fetch_batch_urls(urls, session, sem):
    tasks = [
        asyncio.create_task(fetch_url(url, session, sem))
        for url in urls
    ]
    await asyncio.gather(*tasks)


@async_timeit
async def run(urls):
    sem = asyncio.Semaphore(len(urls))
    
    async with aiohttp.ClientSession() as session:
        await fetch_batch_urls(urls, session, sem)


await run(URLS)

Total time `run` 0.8602087497711182


In [20]:
URL = "https://docs.python.org/3/whatsnew/3.14.html"
URLS = [URL] * 20


async def fetch_url(url, session, sem):
    async with sem:
        async with session.get(url) as resp:
            return await resp.text()


async def fetch_batch_urls(urls, session, sem):
    tasks = [
        asyncio.create_task(fetch_url(url, session, sem))
        for url in urls
    ]
    await asyncio.gather(*tasks)


@async_timeit
async def run(urls):
    sem = asyncio.Semaphore(5)
    
    async with aiohttp.ClientSession() as session:
        await fetch_batch_urls(urls, session, sem)


await run(URLS)

Total time `run` 0.6162121295928955


# Generator

In [21]:
async def countdown(n):
    for i in range(n):
        yield i
        print(f"yield {i}")
        await asyncio.sleep(0.1)

In [22]:
async for i in countdown(5):
    print("------")

------
yield 0
------
yield 1
------
yield 2
------
yield 3
------
yield 4


In [28]:
URL = "https://docs.python.org/3/whatsnew/3.14.html"
URLS = [URL] * 5


async def fetch_url(url, session):
    async with session.get(url) as resp:
        return await resp.text()


async def fetch_batch_urls(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.create_task(fetch_url(url, session))
            for url in urls
        ]
        start = time.time()
        for complete_task in asyncio.as_completed(tasks):
            print("got complete_task", time.time() - start)
            result = await complete_task
            print("got task result", time.time() - start)


@async_timeit
async def run(urls):
    await fetch_batch_urls(urls)


await run(URLS)

got complete_task 3.1948089599609375e-05
got task result 0.301527738571167
got complete_task 0.3016979694366455
got task result 0.30736589431762695
got complete_task 0.30739879608154297
got task result 0.33412671089172363
got complete_task 0.3341648578643799
got task result 0.3341989517211914
got complete_task 0.33420896530151367
got task result 0.34596872329711914
Total time `run` 0.347700834274292


In [29]:
URL = "https://docs.python.org/3/whatsnew/3.14.html"
URLS = [URL] * 5


async def fetch_url(url, session):
    async with session.get(url) as resp:
        return await resp.text()


async def fetch_batch_urls(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.create_task(fetch_url(url, session))
            for url in urls
        ]
        start = time.time()
        async for complete_task in asyncio.as_completed(tasks):
            print("got complete_task", time.time() - start)
            result = await complete_task
            print("got task result", time.time() - start)


@async_timeit
async def run(urls):
    await fetch_batch_urls(urls)


await run(URLS)

got complete_task 0.2512941360473633
got task result 0.25141286849975586
got complete_task 0.25349903106689453
got task result 0.2535269260406494
got complete_task 0.27022719383239746
got task result 0.2702620029449463
got complete_task 0.286013126373291
got task result 0.2860450744628906
got complete_task 0.28608202934265137
got task result 0.2860910892486572
Total time `run` 0.2879478931427002


In [32]:
URL = "https://docs.python.org/3/whatsnew/3.14.html"
URLS = [URL] * 15


async def fetch_url(url, session, name):
    async with session.get(url) as resp:
        await resp.text()
        return name


async def fetch_batch_urls(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.create_task(fetch_url(url, session, f"fetch_{i}"))
            for i, url in enumerate(urls)
        ]

        async for complete_task in asyncio.as_completed(tasks):
            result = await complete_task  # fetch_3
            num = int(result[len("fetch_"):])  # result.replace("fetch_", "")
            if num % 2 == 0:
                yield result


@async_timeit
async def run(urls):
    async for res in fetch_batch_urls(urls):
        print("valid result:", res)


await run(URLS)

valid result: fetch_14
valid result: fetch_8
valid result: fetch_0
valid result: fetch_4
valid result: fetch_2
valid result: fetch_6
valid result: fetch_10
valid result: fetch_12
Total time `run` 0.8367111682891846


In [37]:
async def blocking_api(delay, what):
    print("start blocking_api")
    time.sleep(delay)
    # factorial(1000000)
    # urllib.urlopen(url)

    print("finish blocking_api:", what)


async def nonblocking_api(delay, what):
    print("start nonblocking_api")
    await asyncio.sleep(delay)
    print("finish nonblocking_api:", what)


@async_timeit
async def main():
    print(f"started at {time.strftime('%X')}")
    
    task1 = asyncio.create_task(blocking_api(2, "hello"))
    task2 = asyncio.create_task(nonblocking_api(2, "world"))

    await asyncio.gather(task1, task2)
    
    print(f"finished at {time.strftime('%X')}")


await main()

started at 20:07:53
start blocking_api
finish blocking_api: hello
start nonblocking_api
finish nonblocking_api: world
finished at 20:07:57
Total time `main` 4.002777814865112


In [38]:
async def blocking_api(delay, what):
    print("start blocking_api")
    time.sleep(delay)

    print("finish blocking_api:", what)


async def nonblocking_api(delay, what):
    print("start nonblocking_api")
    await asyncio.sleep(delay)
    print("finish nonblocking_api:", what)


@async_timeit
async def main():
    print(f"started at {time.strftime('%X')}")
    
    task1 = asyncio.create_task(nonblocking_api(2, "world"))
    task2 = asyncio.create_task(blocking_api(2, "hello"))
    
    await asyncio.gather(task1, task2)
    
    print(f"finished at {time.strftime('%X')}")


await main()

started at 20:08:39
start nonblocking_api
start blocking_api
finish blocking_api: hello
finish nonblocking_api: world
finished at 20:08:41
Total time `main` 2.001520872116089


In [35]:
async def blocking_api(delay, what):
    print("start blocking_api")
    #time.sleep(delay)
    # factorial(1000000)
    # urllib.urlopen(url)

    await asyncio.to_thread(time.sleep, delay)
    print("finish blocking_api:", what)


async def nonblocking_api(delay, what):
    print("start nonblocking_api")
    await asyncio.to_thread(time.sleep, delay)
    print("finish nonblocking_api:", what)


@async_timeit
async def main():
    print(f"started at {time.strftime('%X')}")
    
    task1 = asyncio.create_task(blocking_api(2, "hello"))
    task2 = asyncio.create_task(nonblocking_api(2, "world"))

    await asyncio.gather(task1, task2)
    
    print(f"finished at {time.strftime('%X')}")


await main()

started at 20:01:10
start blocking_api
start nonblocking_api
finish nonblocking_api: world
finish blocking_api: hello
finished at 20:01:12
Total time `main` 2.0060548782348633
func run


In [36]:
def func():
    print("func run")


ioloop = asyncio.get_running_loop()


await ioloop.run_in_executor(None, func)