In [3]:
import nest_asyncio
import asyncio
import socket
from keyword import kwlist

# Apply the patch to allow asyncio.run() in a running event loop
# (this cell calls asyncio.run(main()) at the bottom).
nest_asyncio.apply()

# main() only probes Python keywords with at most this many characters.
MAX_KEYWORD_LEN = 4

async def probe(domain: str) -> tuple[str, bool]:
    """Look up `domain` with the running loop's getaddrinfo().

    Returns ``(domain, True)`` when the lookup succeeds and
    ``(domain, False)`` when it raises socket.gaierror.
    """
    resolver = asyncio.get_running_loop()
    found = True
    try:
        await resolver.getaddrinfo(domain, None)
    except socket.gaierror:
        found = False
    return (domain, found)

async def main() -> None:
    """Probe ``<keyword>.dev`` for every short Python keyword.

    Keywords longer than MAX_KEYWORD_LEN are skipped. Each result is printed
    as soon as its probe completes: the line starts with '+' when the name
    resolved and with a blank otherwise.
    """
    pending = [
        probe(f'{kw}.dev'.lower())
        for kw in kwlist
        if len(kw) <= MAX_KEYWORD_LEN
    ]
    # as_completed hands the probes back in completion order, not list order.
    for finished in asyncio.as_completed(pending):
        domain, found = await finished
        print(f"{'+' if found else ' '} {domain}")

# Run the demo when the cell is executed; nest_asyncio.apply() above is what
# allows asyncio.run() here while an event loop is already running.
if __name__ == '__main__':
    asyncio.run(main())

+ not.dev
+ from.dev
+ del.dev
  none.dev
  for.dev
  or.dev
+ try.dev
+ in.dev
+ and.dev
  else.dev
  with.dev
+ as.dev
  elif.dev
  if.dev
  pass.dev
+ true.dev
+ def.dev
  is.dev


In [6]:
def download_many(cc_list: list[str]) -> int:
    """Run supervisor() for `cc_list` with asyncio.run() and return its result."""
    downloaded = asyncio.run(supervisor(cc_list))
    return downloaded

async def supervisor(cc_list: list[str]) -> int:
    """Download every code in `cc_list` concurrently over one AsyncClient.

    One download_one() coroutine is created per code, in sorted order, and
    all of them are awaited together with asyncio.gather().
    Returns the number of results gathered.
    """
    async with AsyncClient() as client:
        downloads = (download_one(client, cc) for cc in sorted(cc_list))
        results = await asyncio.gather(*downloads)
    return len(results)

# NOTE(review): the recorded output of this cell is
# "TypeError: main() takes 0 positional arguments but 1 was given", i.e. the
# `main` in scope when it ran did not accept a download function.
# TODO confirm which `main` this call is meant to use.
if __name__ == '__main__':
    main(download_many)

TypeError: main() takes 0 positional arguments but 1 was given

In [5]:
from httpx import AsyncClient

async def download_one(client: AsyncClient, cc: str):
    """Fetch the flag for `cc`, save it as ``<cc>.gif`` and echo the code.

    Returns `cc` after the image bytes have been passed to save_flag().
    """
    flag_bytes = await get_flag(client, cc)
    save_flag(flag_bytes, f'{cc}.gif')
    # Progress output: codes are printed on one line, separated by spaces.
    print(cc, end=' ', flush=True)
    return cc

async def get_flag(client: AsyncClient, cc: str) -> bytes:
    """Request the GIF for `cc` under BASE_URL and return ``resp.read()``.

    The whole URL is lower-cased before the request, which is sent with
    ``timeout=6.1`` and ``follow_redirects=True``. The response status is
    not checked here.
    """
    flag_url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
    response = await client.get(flag_url, timeout=6.1, follow_redirects=True)
    return response.read()

In [31]:
import asyncio
import sys
from collections import Counter
from http import HTTPStatus
from pathlib import Path
from enum import Enum
from pathlib import Path

import argparse
import httpx
import tqdm

# Default for the -m/--max_req option (handed to main() as default_concur_req).
DEFAULT_CONCUR_REQ = 5
# Upper bound main() applies to the number of concurrent requests.
MAX_CONCUR_REQ = 1000
# Outcome of a single download; supervisor() tallies these in a Counter.
DownloadStatus = Enum('DownloadStatus', 'OK NOT_FOUND ERROR')
# Directory save_flag() writes into; main() creates it if it is missing.
DEST_DIR = Path('downloaded')
# Server label used when -s/--server is not given.
DEFAULT_SERVER = 'LOCAL'

# Server label -> base URL under which the flag files are requested.
SERVERS = {
    'REMOTE': 'https://www.fluentpython.com/data/flags',
    'LOCAL':  'http://localhost:8000/flags',
    'DELAY':  'http://localhost:8001/flags',
    'ERROR':  'http://localhost:8002/flags',
}

def _usage_error(parser, message):
    """Print `message` and the parser's usage line, then exit with status 2."""
    print(message)
    parser.print_usage()
    # "standard" exit status codes:
    # https://stackoverflow.com/questions/1101957/are-there-any-standard-exit-status-codes-in-linux/40484670#40484670
    sys.exit(2)  # command line usage error


def process_args(default_concur_req, argv=None):
    """Parse and validate the command line of the flag downloader.

    Args:
        default_concur_req: default value for the -m/--max_req option.
        argv: optional list of argument strings to parse instead of the
            process's own command line. ``None`` (the default) keeps the
            original behaviour of reading ``sys.argv[1:]``; passing a list
            makes the function usable where sys.argv belongs to someone
            else (for example inside a notebook kernel).

    Returns:
        ``(args, cc_list)``: the argparse namespace (with ``args.server``
        upper-cased) and the list of country codes to download. When
        expand_cc_args() yields nothing, the first ``args.limit`` codes of
        ``sorted(POP20_CC)`` are used.

    Exits with status 2, after printing a message and the usage line, when
    --max_req or --limit is below 1, when --server is not a key of SERVERS,
    or when expand_cc_args() raises ValueError.
    """
    server_options = ', '.join(sorted(SERVERS))
    parser = argparse.ArgumentParser(
        description='Download flags for country codes. '
                    'Default: top 20 countries by population.')
    parser.add_argument(
        'cc', metavar='CC', nargs='*',
        help='country code or 1st letter (eg. B for BA...BZ)')
    parser.add_argument(
        '-a', '--all', action='store_true',
        help='get all available flags (AD to ZW)')
    parser.add_argument(
        '-e', '--every', action='store_true',
        help='get flags for every possible code (AA...ZZ)')
    parser.add_argument(
        '-l', '--limit', metavar='N', type=int, help='limit to N first codes',
        default=sys.maxsize)
    parser.add_argument(
        '-m', '--max_req', metavar='CONCURRENT', type=int,
        default=default_concur_req,
        help=f'maximum concurrent requests (default={default_concur_req})')
    parser.add_argument(
        '-s', '--server', metavar='LABEL', default=DEFAULT_SERVER,
        help=f'Server to hit; one of {server_options} '
             f'(default={DEFAULT_SERVER})')
    parser.add_argument(
        '-v', '--verbose', action='store_true',
        help='output detailed progress info')
    args = parser.parse_args(argv)

    # Validation order matters: the first failing check decides the message.
    if args.max_req < 1:
        _usage_error(
            parser, '*** Usage error: --max_req CONCURRENT must be >= 1')
    if args.limit < 1:
        _usage_error(parser, '*** Usage error: --limit N must be >= 1')
    args.server = args.server.upper()  # server labels are case-insensitive
    if args.server not in SERVERS:
        _usage_error(
            parser,
            f'*** Usage error: --server LABEL '
            f'must be one of {server_options}')
    try:
        cc_list = expand_cc_args(args.every, args.all, args.cc, args.limit)
    except ValueError as exc:
        _usage_error(parser, exc.args[0])

    if not cc_list:
        cc_list = sorted(POP20_CC)[:args.limit]
    return args, cc_list

def save_flag(img: bytes, filename: str) -> None:
    """Write the bytes in `img` to the file named `filename` inside DEST_DIR."""
    target = DEST_DIR / filename
    target.write_bytes(img)

def main(download_many, default_concur_req, max_concur_req):
    """Parse the command line, run `download_many` and report the outcome.

    Args:
        download_many: callable invoked as
            ``download_many(cc_list, base_url, verbose, concur_req)``; its
            return value is passed on to final_report().
        default_concur_req: default for the --max_req option.
        max_concur_req: hard cap on the number of concurrent requests.

    The concurrency actually used is the smallest of the requested value,
    `max_concur_req` and the number of country codes. DEST_DIR is created
    if it does not exist yet.
    """
    # Imported here because the cell's import block does not bring in `time`;
    # without it the perf_counter() call below raised NameError.
    import time

    args, cc_list = process_args(default_concur_req)
    actual_req = min(args.max_req, max_concur_req, len(cc_list))
    initial_report(cc_list, actual_req, args.server)
    base_url = SERVERS[args.server]
    DEST_DIR.mkdir(exist_ok=True)
    t0 = time.perf_counter()  # start time handed to final_report()
    counter = download_many(cc_list, base_url, args.verbose, actual_req)
    final_report(cc_list, counter, t0)

async def get_flag(client: httpx.AsyncClient, base_url: str, cc: str) -> bytes:
    """Download the GIF for `cc` from `base_url` and return its bytes.

    The URL is lower-cased as a whole. The request is sent with
    ``timeout=3.1`` and ``follow_redirects=True``; raise_for_status() is
    called on the response before its content is returned.
    """
    flag_url = f'{base_url}/{cc}/{cc}.gif'.lower()
    response = await client.get(flag_url, timeout=3.1, follow_redirects=True)
    response.raise_for_status()
    return response.content

async def download_one(client: httpx.AsyncClient, cc: str, base_url: str, semaphore: asyncio.Semaphore, verbose: bool) -> DownloadStatus:
    """Download the flag for `cc` and save it as ``<cc>.gif``.

    The HTTP request is made while holding `semaphore`; the file is written
    afterwards through ``asyncio.to_thread(save_flag, ...)``.

    Returns:
        DownloadStatus.OK when the image was fetched and saved, or
        DownloadStatus.NOT_FOUND when the server answered 404.

    Raises:
        httpx.HTTPStatusError: for any error status other than 404.
        Other exceptions raised by get_flag() propagate unchanged.

    When `verbose` is true, `cc` and a short outcome message are printed.
    """
    try:
        async with semaphore:
            image = await get_flag(client, base_url, cc)
    except httpx.HTTPStatusError as exc:
        res = exc.response
        if res.status_code == HTTPStatus.NOT_FOUND:
            status = DownloadStatus.NOT_FOUND
            msg = f'not found: {res.url}'
        else:
            raise
    else:
        await asyncio.to_thread(save_flag, image, f'{cc}.gif')
        status = DownloadStatus.OK
        # `msg` must be bound on this path too: the check below reads it
        # whenever `verbose` is true (it used to raise UnboundLocalError).
        msg = 'OK'
    if verbose and msg:
        print(cc, msg)
    return status

In [32]:
"""
All network I/O is done with coroutines in asyncio, but not file I/O.
The asyncio.to_thread coroutine makes it easy to delegate file I/O
to a thread pool provided by asyncio.
"""

'\nAll network I/O is done with coroutines in asyncio, but not file I/O. \nthe asyncio.to_thread coroutine makes it easy to delegate file I/O\nto a thread pool provided by asyncio\n'

In [33]:
"""
There are three Semaphore classes in Python’s standard library:
threading, multiprocessing, asyncio

Semaphore:
controls access to a shared resource by limiting the number of coroutines that can access it concurrently.
use cases:
Database Connections
Web Scraping
API Requests

Semaphore has an internal counter that is decremented whenever we await on .acquire()
and incremented when we call .release()

Awaiting on .acquire() causes no delay when the counter is greater than zero, but if
the counter is zero, .acquire() suspends the awaiting coroutine until some other
coroutine calls .release() on the same Semaphore
"""

'\nThere are three Semaphore classes in Python’s standard library:\nthreading, multiprocessing, asyncio\n\nSemaphore:\ncontrols access to a shared resource by limiting the number of coroutines that can access it concurrently.\nuse cases:\nDatabase Connections\nWeb Scraping\nAPI Requests\n\nSemaphore has an internal counter that is decremented whenever we await on .acquire()\nand incremented when we call .release()\n\nAwaiting on .acquire() causes no delay when the counter is greater than zero, but if\nthe counter is zero, .acquire() suspends the awaiting coroutine until some other\ncoroutine calls .release() on the same Semaphore\n'

In [34]:
async def supervisor(cc_list: list[str], base_url: str, verbose: bool, concur_req: int) -> Counter[DownloadStatus]:
    """Download all flags in `cc_list` concurrently and tally the outcomes.

    One download_one() coroutine is created per country code (in sorted
    order); they share a single httpx.AsyncClient and a semaphore created
    with `concur_req`. Results are consumed in completion order; unless
    `verbose` is true the iteration is wrapped in a tqdm progress bar.

    Returns:
        A Counter keyed by DownloadStatus. A download that raised
        httpx.HTTPStatusError or httpx.RequestError is counted as ERROR
        (and reported on stdout when `verbose` is true).

    A KeyboardInterrupt while awaiting a download stops the loop and the
    counts gathered so far are returned.
    """
    counter: Counter[DownloadStatus] = Counter()
    semaphore = asyncio.Semaphore(concur_req)
    async with httpx.AsyncClient() as client:
        to_do = [download_one(client, cc, base_url, semaphore, verbose) for cc in sorted(cc_list)]
        to_do_iter = asyncio.as_completed(to_do)
        if not verbose:
            to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))
        for coro in to_do_iter:
            # Reset for every download: previously `error` was set once
            # before the loop, so after the first failure every later
            # successful download was also counted as ERROR.
            error: httpx.HTTPError | None = None
            error_msg = ''
            try:
                status = await coro
            except httpx.HTTPStatusError as exc:
                error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
                error_msg = error_msg.format(resp=exc.response)
                error = exc
            except httpx.RequestError as exc:
                error_msg = f'{exc} {type(exc)}'.strip()
                error = exc
            except KeyboardInterrupt:
                break
            if error:
                status = DownloadStatus.ERROR
                if verbose:
                    # Recover the country code from the file name in the URL.
                    url = str(error.request.url)
                    cc = Path(url).stem.upper()
                    print(f'{cc} error: {error_msg}')
            counter[status] += 1
    return counter

def download_many(cc_list: list[str], base_url: str, verbose: bool, concur_req: int) -> Counter[DownloadStatus]:
    """Synchronous entry point: run supervisor() with asyncio.run().

    All arguments are forwarded unchanged; the Counter produced by
    supervisor() is returned.
    """
    return asyncio.run(supervisor(cc_list, base_url, verbose, concur_req))

# NOTE(review): process_args() parses the process's own command line; the
# recorded run inside the notebook kernel ended with
# "unrecognized arguments: -f" and SystemExit: 2.
if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

usage: ipykernel_launcher.py [-h] [-a] [-e] [-l N] [-m CONCURRENT] [-s LABEL]
                             [-v]
                             [CC ...]
ipykernel_launcher.py: error: unrecognized arguments: -f


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [35]:
async def get_country(client: httpx.AsyncClient, base_url: str, cc: str) -> str:
    """Fetch ``metadata.json`` for `cc` and return its ``'country'`` entry.

    The URL is lower-cased as a whole; the request uses ``timeout=3.1`` and
    ``follow_redirects=True``, and raise_for_status() is called before the
    body is parsed as JSON.
    """
    metadata_url = f'{base_url}/{cc}/metadata.json'.lower()
    response = await client.get(metadata_url, timeout=3.1, follow_redirects=True)
    response.raise_for_status()
    return response.json()['country']

In [37]:
async def download_one(client: httpx.AsyncClient, cc: str, base_url: str, semaphore: asyncio.Semaphore, verbose: bool) -> DownloadStatus:
    """Download the flag and the country name for `cc`, then save the flag.

    The image is saved under the country name (spaces replaced by
    underscores) with a ``.gif`` suffix, via asyncio.to_thread().

    Returns DownloadStatus.OK on success and DownloadStatus.NOT_FOUND when
    either request is answered with 404; any other httpx.HTTPStatusError is
    re-raised. When `verbose` is true, `cc` and an outcome message are
    printed.
    """
    try:
        # The two requests run one after the other, each holding the
        # semaphore only for its own duration. They could run in parallel
        # with gather, but that is pointless if one of them raises.
        async with semaphore:
            flag_bytes = await get_flag(client, base_url, cc)
        async with semaphore:
            country_name = await get_country(client, base_url, cc)
    except httpx.HTTPStatusError as exc:
        response = exc.response
        if response.status_code != HTTPStatus.NOT_FOUND:
            raise
        status = DownloadStatus.NOT_FOUND
        msg = f'not found: {response.url}'
    else:
        stem = country_name.replace(' ', '_')
        await asyncio.to_thread(save_flag, flag_bytes, f'{stem}.gif')
        status = DownloadStatus.OK
        msg = 'OK'
    if verbose and msg:
        print(cc, msg)
    return status

In [38]:
"""
To save an image to disk Python >= 3.9:
await asyncio.to_thread(save_flag, image, f'{cc}.gif')

Below 3.9:
loop = asyncio.get_running_loop()
loop.run_in_executor(None, save_flag, image, f'{cc}.gif')
"""

"\nTo save an image to disk Python >= 3.9:\nawait asyncio.to_thread(save_falg, image, f'{cc}.gif')\n\nBelow 3.9:\nloop = asyncio.get_running_loop()\nloop.run_in_executor(None, save_flag, image, f'{cc}.gif')\n"

In [39]:
"""
A thread pool was more performant in the particular use case 
of a database driver—despite the myth that asynchronous approaches
are always faster than threads for network I/O.
"""

'\nA thread pool was more performant in the particular use case \nof a database driver—despite the myth that asynchronous approaches\nare always faster than threads for network I/O.\n'

In [40]:
"""
An InvertedIndex.search method breaks a query into words, and returns the
intersection of the entries for each word. For instance an emoji search,
for “face” finds 171 results, “cat” finds 14, but “cat face” only 10.
"""

'\nAn InvertedIndex.search method breaks a query into words, and returns the\nintersection of the entries for each word. For instance an emoji search,\nfor “face” finds 171 results, “cat” finds 14, but “cat face” only 10.\n'

In [6]:
# Using fastAPI
import asyncio
import os
from pathlib import Path
from unicodedata import name

from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from asyncio.trsock import TransportSocket

from charindex import InvertedIndex, format_results

# Path(__file__) will not work as we are in jupyter notebook,
# so the static directory is resolved against the current working directory.
STATIC_PATH = Path(os.getcwd()) / 'static'

# Application object; the route decorators below register handlers on it.
app = FastAPI(
    title='Emoji Finder',
    description='search for an emoji by name',
)

# Item type of the /search response: one character plus its Unicode name.
class CharName(BaseModel): # enforces type hints at runtime, for data validation.
    char: str  # the character itself
    name: str  # filled with unicodedata.name(char) by the /search handler

# This is just a quick example. FastAPI has project generation scripts that
# prepare static assets for a CDN and put a proxy/load balancer (an
# "edge router") in front of the server.
def init(app):
    """Store the shared resources on ``app.state``.

    Sets ``app.state.index`` to a new InvertedIndex and ``app.state.form``
    to the text of ``form.html`` read from STATIC_PATH.
    """
    state = app.state
    state.index = InvertedIndex()
    form_path = STATIC_PATH / 'form.html'
    state.form = form_path.read_text()

init(app)

# GET /search?q=...: look the query up in the shared index and return the
# matching characters, sorted, each paired with its Unicode name.
@app.get('/search', response_model=list[CharName])
async def search(q: str):
    matches = sorted(app.state.index.search(q))
    # A generator is returned; each item has the two keys CharName declares.
    return ({'char': char, 'name': name(char)} for char in matches)

# GET /: serve the HTML form text that init() stored on app.state.
# The route is left out of the generated schema (include_in_schema=False).
@app.get('/', response_class=HTMLResponse, include_in_schema=False)
def form():
    return app.state.form

In [8]:
import functools
from typing import cast

async def supervisor(index: InvertedIndex, host: str, port: int) -> None:
    """Start the TCP server on `host`:`port` and serve until cancelled.

    Each connection is handled by finder() with `index` bound as its first
    argument. The address of the first listening socket is printed before
    serve_forever() is awaited.
    """
    handler = functools.partial(finder, index)
    server = await asyncio.start_server(handler, host, port)
    # cast() only informs the type checker about the shape of server.sockets.
    sockets = cast(tuple[TransportSocket, ...], server.sockets)
    addr = sockets[0].getsockname()
    print(f'Serving on {addr}. Hit CTRL-C to stop.')
    await server.serve_forever()

def main(host: str = '127.0.0.1', port_arg: str = '8081'):
    """Build the index and run the TCP server until interrupted.

    `port_arg` is a string and is converted with int() before anything else
    happens. A KeyboardInterrupt raised while the server is running is
    caught and reported with a shutdown message.
    """
    port_number = int(port_arg)
    print('Building index.')
    char_index = InvertedIndex()
    try:
        asyncio.run(supervisor(char_index, host, port_number))
    except KeyboardInterrupt:
        print('\nServer shut down.')

if __name__ == '__main__':
    main()  # just use the defaults
    # main(*sys.argv[1:])  # variant meant to be run from the command line

In [58]:
# Line terminator appended to every line sent to the client.
CRLF = b'\r\n'
# Prompt written to the client before each query is read.
PROMPT = b'?> '

async def finder(index: InvertedIndex, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """Serve one client: prompt, read a query line, send the results.

    The session ends when readline() returns no data or when the query
    starts with a character whose code point is below 32. A line that
    cannot be decoded is treated as the query ``'\\x00'``, which also ends
    the session. Blank lines just produce a new prompt. The writer is
    closed before returning.
    """
    client = writer.get_extra_info('peername')
    while True:
        writer.write(PROMPT)
        await writer.drain()
        raw_line = await reader.readline()
        if not raw_line:
            break
        try:
            query = raw_line.decode().strip()
        except UnicodeDecodeError:
            query = '\x00'
        print(f' From {client}: {query!r}')
        if not query:
            continue  # nothing to search for; prompt again
        if ord(query[:1]) < 32:
            break  # control character: stop serving this client
        results = await search(query, index, writer)
        print(f'  To {client}: {results} results.')
    writer.close()
    await writer.wait_closed()
    print(f'Close {client}.')

In [9]:
async def search(query: str, index: InvertedIndex, writer: asyncio.StreamWriter) -> int:
    """Send the matches for `query` to `writer` and return how many there were.

    Every line produced by format_results() is encoded and terminated with
    CRLF. A final status line made of 66 dashes, the count and the word
    "found" follows. The writer is drained after the results and again
    after the status line.
    """
    found = index.search(query)
    writer.writelines(
        text.encode() + CRLF for text in format_results(found))
    await writer.drain()
    total = len(found)
    summary = f'{"-" * 66} {total} found'
    writer.write(summary.encode() + CRLF)
    await writer.drain()
    return total

In [10]:
"""
objects implementing
__aenter__, __aexit__
returns an awaitable

asynchronous iterables:
__aiter__

Asynchronous generators
async for ...

Since Python 3.8, you can run the interpreter with the -m asyncio command line
option to get an async REPL: a Python console that imports asyncio, provides a
running event loop, and accepts await, async for, and async with at the top-level
prompt—which otherwise are syntax errors when used outside of native coroutines.
"""

'\nobjects implementing\n__aenter__, __aexit__\nreturns an awaitable\n\nasynchronous iterables:\n__aiter__\n\nAsynchronous generators\nasync for ...\n\nSince Python 3.8, you can run the interpreter with the -m asyncio command line\noption to get an async REPL: a Python console that imports asyncio, provides a\nrunning event loop, and accepts await, async for, and async with at the top-level\nprompt—which otherwise are syntax errors when used outside of native coroutines.\n'

In [13]:
import asyncio
import socket
from collections.abc import Iterable, AsyncIterator
from typing import NamedTuple, Optional

# Outcome of probing a single domain name; returned by probe().
class Result(NamedTuple):
    domain: str  # the name that was looked up
    found: bool  # False when the lookup raised socket.gaierror

# Type alias for probe()'s optional `loop` argument.
OptionalLoop = Optional[asyncio.AbstractEventLoop]

async def probe(domain: str, loop: OptionalLoop = None) -> Result:
    """Look up `domain` and report whether the lookup succeeded.

    Uses `loop` when given, otherwise the running event loop. Returns a
    Result whose ``found`` is False when getaddrinfo() raised
    socket.gaierror and True otherwise.
    """
    resolver = asyncio.get_running_loop() if loop is None else loop
    try:
        await resolver.getaddrinfo(domain, None)
    except socket.gaierror:
        resolved = False
    else:
        resolved = True
    return Result(domain, resolved)

async def multi_probe(domains: Iterable[str]) -> AsyncIterator[Result]:
    """Asynchronous generator yielding a Result per domain as probes finish.

    All probes are created up front on the running loop; results are
    yielded in completion order, not in the order of `domains`.
    """
    running_loop = asyncio.get_running_loop()
    pending = [probe(domain, running_loop) for domain in domains]
    # Plain `for`, not `async for`: asyncio.as_completed is a classic generator.
    for finished in asyncio.as_completed(pending):
        result = await finished
        yield result

In [15]:
import sys
from keyword import kwlist

async def main(tld: str) -> None:
    """Probe ``<keyword>.<tld>`` for short Python keywords and print a table.

    Leading and trailing dots are stripped from `tld`. Only keywords of at
    most 4 characters are used. Names that resolve are printed in the first
    column; the others are indented by two tabs into the second column.
    """
    suffix = tld.strip('.')
    candidates = (
        f'{kw}.{suffix}'.lower() for kw in kwlist if len(kw) <= 4)
    print('FOUND\t\tNOT FOUND')
    print('======\t\t========')
    async for domain, found in multi_probe(candidates):
        print(domain if found else f'\t\t{domain}')

# Expects exactly one command-line argument, the TLD to probe; otherwise a
# usage hint is printed. The recorded notebook run took the else branch.
if __name__ == '__main__':
    if len(sys.argv) == 2:
        asyncio.run(main(sys.argv[1]))
    else:
        print('Please provide a TLD.', f'Example: {sys.argv[0]} COM.BR')

Please provide a TLD. Example: /Users/mathias/miniforge3/envs/myenv/lib/python3.10/site-packages/ipykernel_launcher.py COM.BR


In [17]:
from contextlib import asynccontextmanager

@asynccontextmanager
async def web_page(url):
    """Async context manager that yields the result of download_webpage(url).

    Both download_webpage and update_status are called through
    ``loop.run_in_executor(None, ...)`` on the running loop. update_status(url)
    runs after the ``async with`` body when execution resumes past the
    yield; the yield is not wrapped in try/finally.
    """
    run_in_executor = asyncio.get_running_loop().run_in_executor
    page = await run_in_executor(None, download_webpage, url)
    yield page
    await run_in_executor(None, update_status, url)

# Top-level `async with`, written directly in a notebook cell.
# NOTE(review): the recorded run failed with NameError because
# download_webpage is not defined in this notebook; `process` and
# update_status are not defined in the visible cells either.
async with web_page('news.ycombinator.com') as data:
    process(data)

NameError: name 'download_webpage' is not defined

In [18]:
"""
A native coroutine may return some value other than None. An asynchronous
generator can only use empty return statements.

Asynchronous generators are not awaitable.
"""

'\nA native coroutine may return some value other than None. An asynchronous\ngenerator can only use empty return statements.\n\nAsynchronous generators are not awaitable.\n'

In [19]:
# asynchronous generator expression
names = 'python.org rust-lang.org golang.org no-lang.invalid'.split()
gen_found = (name async for name, found in multi_probe(names) if found)
gen_found

<async_generator object <genexpr> at 0x10b000140>

In [20]:
async for name in gen_found:
    print(name)

python.org
golang.org
rust-lang.org


In [21]:
"""
Using await in a list comprehension is similar to using asyncio.gather.
Gather gives you more control over exception handling, because of its 
return_exceptions argument.
"""

'\nUsing await in a list comprehension is similar to using asyncio.gather.\nGather gives you more control over exception handling, because of its \nreturn_exceptions argument.\n'

In [23]:
names = 'python.org rust-lang.org golang.org no-lang.invalid'.split()
names = sorted(names)  # gather returns results in submission order, so the output below follows this sorted order
coros = [probe(name) for name in names]
await asyncio.gather(*coros)

[Result(domain='golang.org', found=True),
 Result(domain='no-lang.invalid', found=False),
 Result(domain='python.org', found=True),
 Result(domain='rust-lang.org', found=True)]

In [24]:
[await probe(name) for name in names]

[Result(domain='golang.org', found=True),
 Result(domain='no-lang.invalid', found=False),
 Result(domain='python.org', found=True),
 Result(domain='rust-lang.org', found=True)]

In [25]:
{name: found async for name, found in multi_probe(names)}

{'no-lang.invalid': False,
 'python.org': True,
 'golang.org': True,
 'rust-lang.org': True}

In [27]:
{name for name in names if (await probe(name)).found}

{'golang.org', 'python.org', 'rust-lang.org'}

In [29]:
# using Curio - improve the usability of the asyncio API
from curio import run, TaskGroup
import curio.socket as socket
from keyword import kwlist

MAX_KEYWORD_LEN = 4

async def probe(domain: str) -> tuple[str, bool]:
    """Look up `domain` with the cell's `socket` module (curio.socket).

    Returns ``(domain, True)`` when getaddrinfo() succeeds and
    ``(domain, False)`` when it raises socket.gaierror.
    """
    found = True
    try:
        await socket.getaddrinfo(domain, None)
    except socket.gaierror:
        found = False
    return (domain, found)

async def main() -> None:
    """Probe ``<keyword>.dev`` for short Python keywords inside a TaskGroup.

    One probe task is spawned per keyword of at most MAX_KEYWORD_LEN
    characters. The group is then iterated and each task's result printed,
    prefixed with '+' when the name resolved and with a blank otherwise.
    """
    short_keywords = [kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN]
    async with TaskGroup() as group:
        for kw in short_keywords:
            await group.spawn(probe, f'{kw}.dev'.lower())
        async for task in group:
            domain, found = task.result
            print(f"{'+' if found else ' '} {domain}")

# `run` here is curio.run (imported at the top of this cell), not asyncio.run.
if __name__ == '__main__':
    run(main())

+ true.dev
+ from.dev
+ not.dev
+ del.dev
  for.dev
  none.dev
  pass.dev
  or.dev
+ in.dev
+ def.dev
  else.dev
+ try.dev
+ and.dev
+ as.dev
  if.dev
  elif.dev
  with.dev
  is.dev


In [30]:
"""
Curio has features that allow it to run in a thread along with
asyncio in another thread, in the same process, communicating
via UniversalQueue and UniversalEvent.
"""

'\nCurio has features that allow it to run in a thread along with\nasyncio in another thread, in the same process, communicating\nvia UniversalQueue and UniversalEvent.\n'

In [31]:
# Watch later. Note: Python 3.6 added asynchronous comprehensions and asynchronous generator expressions.
# https://www.youtube.com/watch?v=E-1Y4kSsAFc 