# Mastering Python asyncio (Basic → Advanced)
**Goal:** Go from *what is async?* to building robust, cancellable, high-throughput async systems.

**How to use this notebook:**
- Read the markdown, run each code cell, and tweak values.
- Do the **Exercises** (they're intentionally practical).
- In Jupyter, you can often use **top-level `await`** directly in a cell.
- In normal Python scripts, use `asyncio.run(main())`.

> The concepts here target Python 3.10+. Section 5 (`TaskGroup` and `except*`) requires 3.11+.

## 0) Quick setup and sanity checks

In [1]:
import sys, asyncio, platform
print("Python:", sys.version)
print("Platform:", platform.platform())
print("asyncio:", asyncio.__file__)

Python: 3.11.9 (tags/v3.11.9:de54cf5, Apr  2 2024, 10:12:12) [MSC v.1938 64 bit (AMD64)]
Platform: Windows-10-10.0.26200-SP0
asyncio: c:\Users\risha\AppData\Local\Programs\Python\Python311\Lib\asyncio\__init__.py


## 1) Mental model: what `async` / `await` really means
- `async def` defines a **coroutine function**.
- Calling it returns a **coroutine object** (it does *not* run yet).
- `await` **pauses** the coroutine until the awaited thing completes.
- The **event loop** schedules coroutines (and callbacks) so that while one waits for I/O, another can run.

**Key idea:** asyncio gives **concurrency**, not parallelism (unless you use threads/processes).
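
One way to see the key idea in action is to record which OS thread each coroutine runs on. The sketch below is illustrative only: the names `record_thread` and `seen` are made up for this example, and it is written as a script (in a notebook, replace `asyncio.run(main())` with `await main()`).

```python
import asyncio
import threading

async def record_thread(name: str, seen: dict) -> None:
    # Each coroutine notes which OS thread it is running on.
    await asyncio.sleep(0.01)
    seen[name] = threading.get_ident()

async def main() -> dict:
    seen: dict = {}
    # Three coroutines are in progress over the same period...
    await asyncio.gather(*(record_thread(f"coro-{i}", seen) for i in range(3)))
    return seen

seen = asyncio.run(main())  # in a notebook: seen = await main()
# ...but they all ran on the event loop's single thread.
print("distinct threads used:", len(set(seen.values())))
```

All three coroutines overlap in time, yet only one thread is involved, which is why CPU-heavy work gains nothing from `async` alone.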

### 1.1) Your first coroutine

In [4]:
import asyncio

async def hello():
    print("hello ...")
    await asyncio.sleep(5)  # pretend we're waiting for I/O
    print("... world")

# In a normal script:
# asyncio.run(hello())

# In Jupyter/IPython, you can usually do:
await hello()

hello ...
... world


### 1.2) Common beginner mistake: forgetting to `await`

In [5]:
async def compute():
    await asyncio.sleep(5)
    return 42

coro = compute()
print("This is a coroutine object, not a result:", coro)

result = await coro
print("After awaiting:", result)

This is a coroutine object, not a result: <coroutine object compute at 0x000001F93CCF90C0>
After awaiting: 42


**Concurrency** means multiple tasks are in progress over the same period, but not necessarily executing at the same instant.

- Usually this happens on one CPU core through rapid task switching. Threads are time-sliced by the OS; asyncio coroutines switch cooperatively, only at `await` points.

- Think of it like a single barista serving 3 customers:

  - Takes the order from A
  - Switches to B while A's coffee brews
  - Switches to C

All customers feel served "together", but there is still only one barista.

Examples:
- A web server handling many requests on one core
- Python asyncio

⚡ 2. Parallelism: "Doing many things at exactly the same time"

🧵 3. Multithreading: "Multiple threads inside one process"

🧩 4. Multiprocessing: "Multiple independent processes"


| Concept         | Runs at same time? | Uses multiple cores? | Memory shared? | Main purpose            |
| --------------- | ------------------ | -------------------- | -------------- | ----------------------- |
| Concurrency     | Not necessarily    | No                   | Depends        | Organize tasks          |
| Parallelism     | Yes                | Yes                  | Depends        | Speed                   |
| Multithreading  | Maybe              | Maybe                | ✅ Yes          | Lightweight concurrency |
| Multiprocessing | Yes                | Yes                  | ❌ No           | True CPU parallelism    |


## 2) Concurrency with Tasks
A **Task** is a scheduled coroutine. Tasks let multiple coroutines make progress concurrently.

- `asyncio.create_task(coro())` schedules immediately.
- `await task` waits for completion.
- `asyncio.gather(...)` awaits many concurrently.

### 2.1) Sequential vs concurrent

In [None]:
import time, asyncio

async def work(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done"

async def sequential():
    t0 = time.perf_counter()
    a = await work("A", 0.6)
    b = await work("B", 0.6)
    return a, b, time.perf_counter() - t0

async def concurrent():
    t0 = time.perf_counter()
    t1 = asyncio.create_task(work("A", 0.6))
    t2 = asyncio.create_task(work("B", 0.6))
    a, b = await asyncio.gather(t1, t2)
    return a, b, time.perf_counter() - t0

print("Sequential:", await sequential())
print("Concurrent:", await concurrent())

### 2.2) `gather` return order vs completion order

In [None]:
import random, asyncio

async def jittery(i):
    d = random.random() * 0.5
    await asyncio.sleep(d)
    return (i, round(d, 3))

results = await asyncio.gather(*(jittery(i) for i in range(5)))
print("Returned in input order:", results)

### 2.3) `as_completed` for streaming results as they finish

In [None]:
import asyncio, random

async def job(i):
    d = random.random() * 0.6
    await asyncio.sleep(d)
    return f"job {i} finished in {d:.2f}s"

tasks = [asyncio.create_task(job(i)) for i in range(6)]

for fut in asyncio.as_completed(tasks):
    print(await fut)

## 3) Error handling in concurrent code
Two core tools:
- `asyncio.gather(..., return_exceptions=True)` to collect exceptions as results.
- `try/except` around awaited tasks for fine control.

Also remember: **un-awaited Tasks can hide errors** (you'll see warnings like `Task exception was never retrieved`).

In [None]:
import asyncio

async def ok():
    await asyncio.sleep(0.1)
    return "OK"

async def boom():
    await asyncio.sleep(0.2)
    raise ValueError("kaboom")

# gather default: the first exception propagates to the caller; the other awaitables are not cancelled
try:
    await asyncio.gather(ok(), boom())
except Exception as e:
    print("gather raised:", repr(e))

# capture exceptions as results
results = await asyncio.gather(ok(), boom(), return_exceptions=True)
print("return_exceptions=True:", results)
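
If you hold Task objects yourself rather than passing coroutines to `gather`, you can also inspect each task's outcome explicitly. The following is a minimal sketch using `asyncio.wait` and `Task.exception()`; the `report` dictionary is just an illustrative way to collect results, and the script form uses `asyncio.run` (in a notebook, `await main()` instead).

```python
import asyncio

async def ok():
    await asyncio.sleep(0.01)
    return "OK"

async def boom():
    await asyncio.sleep(0.02)
    raise ValueError("kaboom")

async def main() -> dict:
    tasks = [
        asyncio.create_task(ok(), name="ok"),
        asyncio.create_task(boom(), name="boom"),
    ]
    # asyncio.wait does not raise; by default it returns once all tasks are done.
    done, pending = await asyncio.wait(tasks)
    report = {}
    for t in done:
        # Calling exception() marks the error as retrieved,
        # so no "never retrieved" warning is logged later.
        exc = t.exception()
        report[t.get_name()] = repr(exc) if exc else t.result()
    return report

report = asyncio.run(main())  # in a notebook: report = await main()
print(report)
```

Note that `Task.exception()` itself raises `CancelledError` for a cancelled task, so real code would check `t.cancelled()` first.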

## 4) Cancellation, timeouts, shielding
Cancellation is a feature, not an error.
- `task.cancel()` requests cancellation.
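- `asyncio.wait_for()` cancels the awaited operation on timeout and raises `asyncio.TimeoutError` (an alias of the built-in `TimeoutError` since 3.11).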
- Use `asyncio.shield()` to protect an operation from cancellation (sparingly).

### 4.1) Cooperative cancellation

In [None]:
import asyncio

async def long_running():
    try:
        for i in range(10):
            await asyncio.sleep(0.2)
            print("tick", i)
    except asyncio.CancelledError:
        print("cleanup before cancel")
        raise  # important: re-raise so cancellation propagates

task = asyncio.create_task(long_running())
await asyncio.sleep(0.55)
task.cancel()

try:
    await task
except asyncio.CancelledError:
    print("task cancelled (caller observed)")

### 4.2) Timeouts with `wait_for`

In [None]:
import asyncio

async def slow():
    await asyncio.sleep(2)
    return "done"

try:
    await asyncio.wait_for(slow(), timeout=0.3)
except asyncio.TimeoutError:
    print("Timed out!")
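
On Python 3.11+, the `asyncio.timeout()` context manager is an alternative that puts one deadline around a whole block of awaits. The sketch below is hedged accordingly: `two_steps` is a made-up stand-in for real I/O, and the `hasattr` fallback to `wait_for` is only there so the example also runs on older versions.

```python
import asyncio

async def two_steps() -> str:
    # Stand-in for two slow I/O operations in a row.
    await asyncio.sleep(1)
    await asyncio.sleep(1)
    return "done"

async def main() -> str:
    try:
        if hasattr(asyncio, "timeout"):           # Python 3.11+
            async with asyncio.timeout(0.1):      # one deadline for the whole block
                await two_steps()
        else:                                      # older versions: fall back to wait_for
            await asyncio.wait_for(two_steps(), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"
    return "finished"

outcome = asyncio.run(main())  # in a notebook: outcome = await main()
print(outcome)
```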

### 4.3) Shielding a sub-operation (use carefully)

In [None]:
import asyncio

async def must_finish():
    await asyncio.sleep(0.5)
    return "finished critical section"

async def parent():
    # If parent is cancelled, shielded child continues.
    return await asyncio.shield(must_finish())

task = asyncio.create_task(parent())
await asyncio.sleep(0.1)
task.cancel()

try:
    await task
except asyncio.CancelledError:
    print("parent cancelled, but child may still be running")

await asyncio.sleep(0.6)  # let the shielded part complete in background
print("Done waiting.")

## 5) Structured concurrency with `TaskGroup` (Python 3.11+)
TaskGroup gives safer lifecycle management: tasks are scoped and errors are aggregated.

- If one task fails, siblings are cancelled.
- Exceptions are raised as an `ExceptionGroup`.

If you're on Python < 3.11, skip this section or upgrade: the `except*` syntax in the cell below is a `SyntaxError` on older versions, so the cell will not parse there at all.

In [None]:
import sys, asyncio

async def tg_demo():
    async def worker(i):
        await asyncio.sleep(0.1 * i)
        if i == 3:
            raise RuntimeError("worker 3 failed")
        return i

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(worker(i)) for i in range(5)]
    # If we got here, all succeeded
    return [t.result() for t in tasks]

if sys.version_info >= (3, 11):
    try:
        await tg_demo()
    except* RuntimeError as eg:
        print("Caught RuntimeError(s) from TaskGroup:", eg)
else:
    print("TaskGroup needs Python 3.11+")

## 6) Synchronization primitives
Use these when coroutines share resources:
- `Lock`, `Semaphore`, `Event`, `Condition`
- `Queue` for producer/consumer pipelines

**Rule of thumb:** Prefer message passing (queues) over shared mutable state.
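
`Event` is listed above but not demonstrated in the cells that follow, so here is a small sketch of it: several coroutines park on `Event.wait()` and are all released by a single `set()`. The names `waiter`, `ready`, and `log` are illustrative only; in a notebook, use `await main()` rather than `asyncio.run(main())`.

```python
import asyncio

async def waiter(name: str, ready: asyncio.Event, log: list) -> None:
    await ready.wait()            # suspends until ready.set() is called
    log.append(f"{name} released")

async def main() -> list:
    ready = asyncio.Event()
    log: list = []
    waiters = [asyncio.create_task(waiter(f"w{i}", ready, log)) for i in range(3)]
    await asyncio.sleep(0.05)     # by now every waiter is parked on the event
    log.append("setting event")
    ready.set()                   # wakes all waiters
    await asyncio.gather(*waiters)
    return log

log = asyncio.run(main())  # in a notebook: log = await main()
print(log)
```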

### 6.1) Lock to protect shared state

In [None]:
import asyncio

counter = 0
lock = asyncio.Lock()

async def inc(n):
    global counter
    for _ in range(n):
        async with lock:
            counter += 1

await asyncio.gather(*(inc(1000) for _ in range(10)))
print("counter =", counter, "(expected 10000)")

### 6.2) Semaphore to limit concurrency

In [None]:
import asyncio, time

sem = asyncio.Semaphore(3)

async def limited(i):
    async with sem:
        await asyncio.sleep(0.3)
        return i

t0 = time.perf_counter()
results = await asyncio.gather(*(limited(i) for i in range(10)))
print("results:", results)
print("elapsed:", round(time.perf_counter() - t0, 2), "seconds (≈ ceil(10/3)*0.3)")

### 6.3) Queue for producer/consumer

In [None]:
import asyncio, random

async def producer(q: asyncio.Queue, n_consumers: int):
    for i in range(10):
        await asyncio.sleep(random.random() * 0.2)
        await q.put(i)
    # signal completion: one sentinel per consumer
    for _ in range(n_consumers):
        await q.put(None)

async def consumer(q: asyncio.Queue):
    while True:
        item = await q.get()
        if item is None:
            q.task_done()  # account for the sentinel so q.join() can finish
            break
        await asyncio.sleep(0.1)
        print("consumed", item)
        q.task_done()

q = asyncio.Queue()
await asyncio.gather(
    producer(q, n_consumers=2),
    consumer(q),
    consumer(q),
)
# Every put() now has a matching task_done(), so join() returns.
# (Re-queuing the sentinel from each consumer would leave one item
# unaccounted for and make join() hang.)
await q.join()
print("pipeline complete")

## 7) Async context managers and async iterators
When setup/cleanup needs awaiting (e.g., network connections), use:
- `async with ...`
- `async for ...`

In [None]:
import asyncio

class AsyncTimer:
    async def __aenter__(self):
        self.t0 = asyncio.get_running_loop().time()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.t1 = asyncio.get_running_loop().time()
        self.elapsed = self.t1 - self.t0

async with AsyncTimer() as t:
    await asyncio.sleep(0.2)

print("elapsed:", round(t.elapsed, 3), "seconds")
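
For simple cases, the same kind of timer can be sketched without a class by using `contextlib.asynccontextmanager`. This is an alternative formulation, not a replacement for the class above; the `timer` function and the dictionary it fills are assumptions made for this example.

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def timer(result: dict):
    loop = asyncio.get_running_loop()
    t0 = loop.time()
    try:
        yield result              # the body of the `async with` block runs here
    finally:
        # Runs on normal exit, on exceptions, and on cancellation.
        result["elapsed"] = loop.time() - t0

async def main() -> float:
    async with timer({}) as r:
        await asyncio.sleep(0.05)
    return r["elapsed"]

elapsed = asyncio.run(main())  # in a notebook: elapsed = await main()
print("elapsed:", round(elapsed, 3), "seconds")
```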

### 7.1) Async generator (`async for`)

In [None]:
import asyncio

async def ticker(n):
    for i in range(n):
        await asyncio.sleep(0.1)
        yield i

async for x in ticker(5):
    print("tick:", x)

## 8) Mixing blocking code with asyncio
Asyncio **does not** magically make blocking functions non-blocking.

If you must call blocking code:
- Prefer non-blocking libraries (async-native) when possible.
- Otherwise, offload with `asyncio.to_thread()` (threads) or process pools.

**Watch out:** CPU-heavy work in threads won't speed up due to the GIL; use processes for real parallelism.
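
To make the cost of a blocking call visible, the sketch below runs a "heartbeat" coroutine and measures the longest gap between beats, once with `time.sleep` called directly and once with the same call offloaded through `asyncio.to_thread`. The helper names (`heartbeat`, `max_gap`) and the timings are assumptions for illustration; in a notebook, use `await main()`.

```python
import asyncio
import time

async def heartbeat(gaps: list, beats: int = 5, interval: float = 0.02) -> None:
    loop = asyncio.get_running_loop()
    last = loop.time()
    for _ in range(beats):
        await asyncio.sleep(interval)
        now = loop.time()
        gaps.append(now - last)   # how long since the previous beat
        last = now

async def max_gap(blocking: bool) -> float:
    gaps: list = []
    hb = asyncio.create_task(heartbeat(gaps))
    await asyncio.sleep(0)        # let the heartbeat start
    if blocking:
        time.sleep(0.2)           # blocks the whole event loop
    else:
        await asyncio.to_thread(time.sleep, 0.2)  # loop stays responsive
    await hb
    return max(gaps)

async def main():
    return await max_gap(blocking=True), await max_gap(blocking=False)

blocked, offloaded = asyncio.run(main())  # in a notebook: await main()
print(f"worst gap when blocking: {blocked:.2f}s, when offloaded: {offloaded:.2f}s")
```

When the loop is blocked, the heartbeat cannot run until `time.sleep` returns, so its worst gap is roughly the full blocking time instead of the intended interval.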

### 8.1) Offload blocking I/O to a thread with `to_thread`

In [None]:
import asyncio, time

def blocking_io():
    time.sleep(0.5)
    return "blocking result"

async def main():
    t0 = time.perf_counter()
    res = await asyncio.to_thread(blocking_io)
    return res, time.perf_counter() - t0

print(await main())

### 8.2) CPU-bound work: use a process pool (advanced)

In [None]:
import asyncio, concurrent.futures, math, os

def cpu_bound(n: int) -> int:
    # deliberately heavy-ish
    total = 0
    for i in range(1, n):
        total += int(math.sqrt(i) * 1000) % 97
    return total

async def run_cpu_in_processes():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        tasks = [loop.run_in_executor(pool, cpu_bound, 300_000 + i*10_000) for i in range(4)]
        return await asyncio.gather(*tasks)

print("process results:", await run_cpu_in_processes())

## 9) Streams: TCP echo client/server
Asyncio includes stream APIs to build network services.

In many notebook environments, server sockets work fine locally. If your environment restricts networking, treat this as reference code.

In [None]:
import asyncio

async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info("peername")
    print(f"server got {message!r} from {addr}")

    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def tcp_demo():
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 0)  # 0 => choose free port
    host, port = server.sockets[0].getsockname()[:2]
    print("server listening on", host, port)

    async with server:
        # make a client request
        reader, writer = await asyncio.open_connection(host, port)
        writer.write(b"hello tcp")
        await writer.drain()
        data = await reader.read(100)
        print("client got:", data.decode())
        writer.close()
        await writer.wait_closed()

        # no explicit close needed: leaving `async with server` closes the
        # server and waits for it to finish closing

await tcp_demo()

## 10) Subprocesses
Run external commands without blocking the loop using `asyncio.create_subprocess_exec`.
This is handy for orchestration tools.

In [None]:
import asyncio, sys

async def run_cmd():
    # cross-platform: run python itself
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "print('hi from subprocess')",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out, err = await proc.communicate()
    return proc.returncode, out.decode().strip(), err.decode().strip()

print(await run_cmd())

## 11) Debugging and observability
Useful tools and techniques:
- `asyncio.get_running_loop().set_debug(True)`
- Environment variable `PYTHONASYNCIODEBUG=1`
- `asyncio.all_tasks()` to inspect current tasks
- Naming tasks (`create_task(..., name='...')` in 3.8+)
- `task.get_coro()` (3.8+) to see which coroutine a task wraps; in 3.11+, `ExceptionGroup`s surface every failure from a `TaskGroup`.

In [None]:
import asyncio

async def inspect_tasks():
    t = asyncio.create_task(asyncio.sleep(0.3), name="my-sleep-task")
    await asyncio.sleep(0)  # let the loop schedule the task
    tasks = asyncio.all_tasks()
    # show a few task names
    named = sorted([task.get_name() for task in tasks])
    await t
    return named

print(await inspect_tasks())

## 12) Advanced patterns you should master
### 12.1) Bounded concurrency (a common interview + real-world pattern)
You have N items and want to process them concurrently with a limit.

We'll implement a reusable helper `map_async_bounded`.

In [None]:
import asyncio
from typing import Iterable, Callable, Awaitable, TypeVar, List

T = TypeVar("T")
R = TypeVar("R")

async def map_async_bounded(
    items: Iterable[T],
    func: Callable[[T], Awaitable[R]],
    limit: int = 10,
) -> List[R]:
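    sem = asyncio.Semaphore(limit)

    async def run_one(x: T) -> R:
        async with sem:  # at most `limit` calls to func run at once
            return await func(x)

    # One task per item is created up front; the semaphore bounds how many are active.
    tasks = [asyncio.create_task(run_one(x)) for x in items]
    # gather returns results in input order
    return await asyncio.gather(*tasks)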

# demo
async def f(x):
    await asyncio.sleep(0.1)
    return x * x

print(await map_async_bounded(range(20), f, limit=4))

### 12.2) Timeouts + retries (without blocking)
You'll often wrap I/O with timeouts and retry policies.
Below is a simple exponential backoff retry helper.

In [None]:
import asyncio, random
from typing import Optional, Callable, Awaitable, TypeVar

R = TypeVar("R")

async def retry(
    op: Callable[[], Awaitable[R]],
    *,
    retries: int = 3,
    base_delay: float = 0.2,
    timeout: Optional[float] = None,
) -> R:
    last_exc = None
    for attempt in range(retries + 1):
        try:
            if timeout is None:
                return await op()
            return await asyncio.wait_for(op(), timeout=timeout)
        except Exception as e:
            last_exc = e
            if attempt == retries:
                raise
            delay = base_delay * (2 ** attempt) * (1 + random.random() * 0.2)
            await asyncio.sleep(delay)
    raise last_exc  # type: ignore

# demo operation that fails randomly
async def flaky():
    await asyncio.sleep(0.05)
    if random.random() < 0.7:
        raise RuntimeError("flaky failure")
    return "success"

try:
    print(await retry(flaky, retries=5, base_delay=0.05, timeout=0.2))
except Exception as e:
    print("still failed:", repr(e))

### 12.3) Cancellation-safe cleanup (resources)
When cancellation happens, ensure resources close properly.
Pattern: `try: ... finally: ...` and be careful to keep cleanup awaited.

In [None]:
import asyncio

class FakeConnection:
    def __init__(self):
        self.closed = False
    async def close(self):
        await asyncio.sleep(0.05)
        self.closed = True

async def use_conn():
    conn = FakeConnection()
    try:
        # pretend to do some work
        for _ in range(10):
            await asyncio.sleep(0.1)
    finally:
        await conn.close()
        print("closed =", conn.closed)

task = asyncio.create_task(use_conn())
await asyncio.sleep(0.25)
task.cancel()
try:
    await task
except asyncio.CancelledError:
    print("cancel observed by caller")

## 13) Realistic mini-project: async web fetcher (optional)
For real HTTP, you typically use `aiohttp`.

This section is **optional** because the library may not be installed.
If you want to run it, install:
```bash
pip install aiohttp
```
Then run the cells below.

In [None]:
# Uncomment if you have aiohttp installed
# import aiohttp, asyncio, time
# 
# async def fetch(session, url):
#     async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
#         text = await resp.text()
#         return url, resp.status, len(text)
# 
# async def fetch_all(urls, limit=10):
#     sem = asyncio.Semaphore(limit)
#     async with aiohttp.ClientSession() as session:
#         async def bounded(url):
#             async with sem:
#                 return await fetch(session, url)
#         return await asyncio.gather(*(bounded(u) for u in urls))
# 
# urls = ["https://example.com"] * 20
# t0 = time.perf_counter()
# results = await fetch_all(urls, limit=5)
# print("took", round(time.perf_counter()-t0, 2), "seconds")
# print(results[:3], "...")

## 14) Common pitfalls (read carefully)
1. **Blocking calls inside async code** (`time.sleep`, heavy CPU loops). Use `await asyncio.sleep`, `to_thread`, or process pools.
2. **Creating tasks and forgetting them** → lost exceptions / memory leaks.
3. **Over-parallelizing** → rate limits, file descriptor exhaustion, slowdowns. Use semaphores.
4. **Not handling cancellation** → stuck tasks, resource leaks.
5. **Assuming async == faster**. It's faster only when your workload is I/O-bound or highly concurrent.
6. **Mixing event loops** (common in notebooks). A notebook already has a running loop, so `asyncio.run()` raises `RuntimeError` there. Use top-level `await` in notebooks and `asyncio.run` in scripts.
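
For pitfall 2, one common pattern is to keep a strong reference to every fire-and-forget task and drop it when the task finishes, since the event loop itself only holds weak references to tasks. The sketch below assumes a module-level set and a hypothetical `spawn` helper; in a notebook, use `await main()` instead of `asyncio.run(main())`.

```python
import asyncio

background_tasks: set = set()

def spawn(coro) -> asyncio.Task:
    """Schedule coro and keep it referenced until it finishes."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)                         # strong reference
    task.add_done_callback(background_tasks.discard)   # drop it once done
    return task

async def job(i: int, out: list) -> None:
    await asyncio.sleep(0.01 * i)
    out.append(i)

async def main():
    out: list = []
    for i in range(3):
        spawn(job(i, out))
    # Before shutting down, wait for whatever is still in flight.
    await asyncio.gather(*background_tasks)
    await asyncio.sleep(0)  # give any remaining done callbacks a chance to run
    return out, len(background_tasks)

out, remaining = asyncio.run(main())  # in a notebook: await main()
print("finished jobs:", sorted(out), "| tasks still tracked:", remaining)
```

A done callback is also a natural place to log `task.exception()`, so failures in background tasks do not go unnoticed.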

## 15) Exercises (do these to *master* asyncio)
### Beginner
1) Write `async def countdown(n)` that prints `n..1` with 0.2s between.
2) Create two countdowns concurrently and measure time.

### Intermediate
3) Build a producer/consumer pipeline with 3 consumers and bounded queue size.
4) Add cancellation: cancel consumers after 2 seconds and ensure graceful shutdown.

### Advanced
5) Implement `bounded_gather(funcs, limit)` where `funcs` is an iterable of coroutine *factories* (callables that return a coroutine).
6) Implement retry with **jitter**, **timeout**, and an allowlist of exceptions.
7) Build a TCP server that supports multiple messages per connection (readline loop) and broadcasts to all connected clients.

## 16) Cheatsheet (printable)
- Run main: `asyncio.run(main())`
- Schedule: `task = asyncio.create_task(coro())`
- Wait many: `await asyncio.gather(*tasks)`
- Stream results: `for fut in asyncio.as_completed(tasks): ...`
- Timeout: `await asyncio.wait_for(coro(), timeout=...)`
- Cancel: `task.cancel()` and handle `asyncio.CancelledError`
- Limit concurrency: `asyncio.Semaphore(k)`
- Producer/consumer: `asyncio.Queue()`
- Blocking I/O: `await asyncio.to_thread(blocking_fn)`
- Structured concurrency (3.11+): `async with asyncio.TaskGroup() as tg: ...`