# Chapter 16: Async/Await Programming

Master asynchronous programming with async/await, asyncio, and concurrent execution



### What is Async Programming? (Slide 47)


<p><strong>Asynchronous Programming</strong> - Execute code without blocking, enabling concurrent operations</p>
<p><strong>Key Concepts:</strong></p>
<ul>
<li><strong>Synchronous</strong> - One task completes before next starts (blocking)</li>
<li><strong>Asynchronous</strong> - Tasks can run concurrently (non-blocking)</li>
<li><strong>Coroutine</strong> - Function that can pause and resume</li>
<li><strong>Event Loop</strong> - Manages and executes async tasks</li>
</ul>
<p><strong>When to Use:</strong></p>
<ul>
<li>I/O-bound operations (network, files, databases)</li>
<li>Web scraping with multiple requests</li>
<li>Real-time applications (chat, streaming)</li>
<li>Concurrent API calls</li>
</ul>
<p><strong>Benefits:</strong> Better resource utilization, faster I/O operations, improved responsiveness</p>


### Sync vs Async - The Problem (Slide 48)


In [None]:
import time

# SYNCHRONOUS - Blocking
def download_file(name, seconds):
    print(f"Starting {name}")
    time.sleep(seconds)  # Simulate download
    print(f"Finished {name}")
    return f"Data from {name}"

# Downloads happen one at a time
start = time.time()
download_file("file1", 2)
download_file("file2", 2)
download_file("file3", 2)
end = time.time()

print(f"Total time: {end-start:.2f}s")  # ~6 seconds
# Output:
# Starting file1
# Finished file1  (2s later)
# Starting file2
# Finished file2  (2s later)
# Starting file3
# Finished file3  (2s later)
# Total time: 6.00s

# Problem: Wasted time waiting!
# Each download blocks the next


> **Note:** Synchronous code waits for each task to complete


### Async Solution - Concurrent Execution (Slide 49)


In [None]:
import nest_asyncio
nest_asyncio.apply()
import asyncio
import time

# ASYNCHRONOUS - Non-blocking
async def download_file(name, seconds):
    print(f"Starting {name}")
    await asyncio.sleep(seconds)  # Non-blocking wait
    print(f"Finished {name}")
    return f"Data from {name}"

# Run concurrently
async def main():
    start = time.time()

    # Create tasks
    task1 = download_file("file1", 2)
    task2 = download_file("file2", 2)
    task3 = download_file("file3", 2)

    # Run concurrently
    results = await asyncio.gather(task1, task2, task3)

    end = time.time()
    print(f"Total time: {end-start:.2f}s")  # ~2 seconds!
    return results

await asyncio.create_task(main())
# Output:
# Starting file1
# Starting file2
# Starting file3
# Finished file1  (all finish ~2s later)
# Finished file2
# Finished file3
# Total time: 2.00s


> **Note:** Async runs tasks concurrently, not sequentially


### async/await Basics (Slide 50)


In [None]:
import nest_asyncio
nest_asyncio.apply()
import asyncio
import time

# Define coroutine with 'async def'
async def greet(name, delay):
    print(f"Hello {name}!")
    await asyncio.sleep(delay)  # Pause execution
    print(f"Goodbye {name}!")
    return f"Greeted {name}"

# Cannot call async function directly!
# greet("Alice", 1)  # Returns coroutine object, doesn't run

# Must use await ) or await
async def main():
    result = await greet("Alice", 1)
    print(result)

# Run from synchronous code
await asyncio.create_task(main())

# Rules:
# 1. 'async def' creates coroutine
# 2. 'await' pauses until coroutine completes
# 3. Can only 'await' inside 'async def'
# 4. Use await ) to start from sync code


> **Note:** async def creates coroutines, await pauses execution


### Running Multiple Coroutines (Slide 51)


In [None]:
import nest_asyncio
nest_asyncio.apply()
import asyncio
import time

async def fetch_data(source, delay):
    print(f"Fetching from {source}...")
    await asyncio.sleep(delay)
    return f"Data from {source}"

# Method 1: Sequential (slow)
async def sequential():
    result1 = await fetch_data("API-1", 2)
    result2 = await fetch_data("API-2", 2)
    result3 = await fetch_data("API-3", 2)
    return [result1, result2, result3]
# Takes 6 seconds total

# Method 2: Concurrent with gather (fast!)
async def concurrent():
    results = await asyncio.gather(
        fetch_data("API-1", 2),
        fetch_data("API-2", 2),
        fetch_data("API-3", 2)
    )
    return results
# Takes only 2 seconds total!

# Method 3: Create tasks explicitly
async def with_tasks():
    task1 = asyncio.create_task(fetch_data("API-1", 2))
    task2 = asyncio.create_task(fetch_data("API-2", 2))
    task3 = asyncio.create_task(fetch_data("API-3", 2))

    results = await asyncio.gather(task1, task2, task3)
    return results


> **Note:** asyncio.gather() runs coroutines concurrently


### Error Handling in Async (Slide 52)


In [None]:
import nest_asyncio
nest_asyncio.apply()
import asyncio
import time

async def risky_operation(name, should_fail=False):
    await asyncio.sleep(1)
    if should_fail:
        raise ValueError(f"{name} failed!")
    return f"{name} succeeded"

# Handle errors in individual coroutines
async def with_try_catch():
    try:
        result = await risky_operation("Task1", True)
    except ValueError as e:
        print(f"Caught: {e}")

# gather with return_exceptions
async def gather_with_errors():
    results = await asyncio.gather(
        risky_operation("Task1", False),
        risky_operation("Task2", True),  # Will fail
        risky_operation("Task3", False),
        return_exceptions=True  # Don't stop on error
    )

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i+1} failed: {result}")
        else:
            print(f"Task {i+1} succeeded: {result}")

await gather_with_errors()


> **Note:** return_exceptions=True continues despite errors


### Async Context Managers (Slide 53)


In [None]:
import nest_asyncio
nest_asyncio.apply()
import asyncio
import time

# Async context manager
class AsyncDatabase:
    async def __aenter__(self):
        print("Connecting to database...")
        await asyncio.sleep(1)  # Simulate connection
        self.connection = "DB Connection"
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing database...")
        await asyncio.sleep(0.5)  # Simulate cleanup
        self.connection = None

    async def query(self, sql):
        await asyncio.sleep(0.2)
        return f"Results for: {sql}"

# Use async context manager
async def main():
    async with AsyncDatabase() as db:
        result = await db.query("SELECT * FROM users")
        print(result)
    # Database automatically closed

await asyncio.create_task(main())

# Also works with contextlib
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_timer(name):
    start = time.time()
    yield
    end = time.time()
    print(f"{name}: {end-start:.2f}s")


> **Note:** Use async with for async context managers


### Async Iterators & Generators (Slide 54)


In [None]:
import nest_asyncio
nest_asyncio.apply()
import asyncio
import time

# Async iterator
class AsyncCounter:
    def __init__(self, max_count):
        self.max_count = max_count
        self.count = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.count >= self.max_count:
            raise StopAsyncIteration
        await asyncio.sleep(0.5)  # Async operation
        self.count += 1
        return self.count

# Async generator (simpler!)
async def async_range(n):
    for i in range(n):
        await asyncio.sleep(0.5)
        yield i

# Use async for loop
async def main():
    # With async iterator
    async for num in AsyncCounter(3):
        print(f"Count: {num}")

    # With async generator
    async for num in async_range(3):
        print(f"Range: {num}")

await asyncio.create_task(main())


> **Note:** async for iterates over async iterables


### asyncio.wait() - Advanced Control (Slide 55)


In [None]:
import nest_asyncio
nest_asyncio.apply()
import asyncio
import time

async def worker_task(name, duration):
    await asyncio.sleep(duration)
    return f"{name} completed"

async def main():
    # Create tasks
    tasks = [
        asyncio.create_task(worker_task("Fast", 1)),
        asyncio.create_task(worker_task("Medium", 2)),
        asyncio.create_task(worker_task("Slow", 3))
    ]

    # Wait for first to complete
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    print(f"First done: {done.pop().result()}")
    print(f"Still pending: {len(pending)}")

    # Cancel remaining
    for t in pending:
        t.cancel()

    # Wait with timeout
    try:
        result = await asyncio.wait_for(
            worker_task("Long", 10),
            timeout=2.0
        )
    except (asyncio.TimeoutError, RuntimeError):
        print("Task timed out!")

await asyncio.create_task(main())


> **Note:** wait() provides fine-grained control over tasks


### Real-World - Web Scraping (Slide 56)


In [None]:
import nest_asyncio
nest_asyncio.apply()
import asyncio
import time
import aiohttp  # pip install aiohttp

async def fetch_url(session, url):
    """Fetch single URL"""
    async with session.get(url) as response:
        html = await response.text()
        return len(html)

async def scrape_websites(urls):
    """Scrape multiple websites concurrently"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# Usage
async def main():
    urls = [
        'https://example.com',
        'https://python.org',
        'https://github.com',
        'https://stackoverflow.com'
    ]

    import time
    start = time.time()
    sizes = await scrape_websites(urls)
    end = time.time()

    for url, size in zip(urls, sizes):
        print(f"{url}: {size} bytes")

    print(f"Scraped {len(urls)} sites in {end-start:.2f}s")

# await asyncio.create_task(main())
# Much faster than sequential requests!


> **Note:** Async shines for I/O-bound operations like HTTP


### Real-World - Async Database Queries (Slide 57)


In [None]:
import nest_asyncio
nest_asyncio.apply()
import asyncio
import time
# Simulating async database library

class AsyncDB:
    async def query(self, sql):
        await asyncio.sleep(0.5)  # Simulate query time
        return f"Results for: {sql}"

async def get_user(db, user_id):
    return await db.query(f"SELECT * FROM users WHERE id={user_id}")

async def get_orders(db, user_id):
    return await db.query(f"SELECT * FROM orders WHERE user_id={user_id}")

async def get_profile(db, user_id):
    return await db.query(f"SELECT * FROM profiles WHERE user_id={user_id}")

# SLOW: Sequential queries (1.5s)
async def fetch_user_data_slow(user_id):
    db = AsyncDB()
    user = await get_user(db, user_id)
    orders = await get_orders(db, user_id)
    profile = await get_profile(db, user_id)
    return {"user": user, "orders": orders, "profile": profile}

# FAST: Concurrent queries (0.5s)
async def fetch_user_data_fast(user_id):
    db = AsyncDB()
    user, orders, profile = await asyncio.gather(
        get_user(db, user_id),
        get_orders(db, user_id),
        get_profile(db, user_id)
    )
    return {"user": user, "orders": orders, "profile": profile}


> **Note:** Run independent DB queries concurrently


### Async Queue - Producer/Consumer (Slide 58)


In [None]:
import nest_asyncio
nest_asyncio.apply()
import asyncio
import time

async def producer(queue, n):
    """Produce items into queue"""
    for i in range(n):
        item = f"item-{i}"
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.5)

    # Signal completion
    await queue.put(None)

async def consumer(queue, name):
    """Consume items from queue"""
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break

        print(f"{name} consuming: {item}")
        await asyncio.sleep(1)  # Process item
        queue.task_done()

async def main():
    queue = asyncio.Queue()

    # Start producer and consumers
    await asyncio.gather(
        producer(queue, 5),
        consumer(queue, "Consumer-1"),
        consumer(queue, "Consumer-2")
    )

await asyncio.create_task(main())


> **Note:** Async queues for producer/consumer patterns


### Mixing Sync and Async Code (Slide 59)


In [None]:
import nest_asyncio
nest_asyncio.apply()
import asyncio
import time
import time
from concurrent.futures import ThreadPoolExecutor

# Blocking synchronous function
def blocking_io(seconds):
    print(f"Blocking for {seconds}s...")
    time.sleep(seconds)  # Blocks!
    return f"Done after {seconds}s"

# Run blocking code in thread pool
async def run_blocking_in_thread():
    loop = asyncio.get_running_loop()

    # Run in separate thread
    result = await loop.run_in_executor(
        None,  # Use default executor
        blocking_io,
        2
    )
    return result

# Mix async and sync
async def main():
    # These run concurrently!
    results = await asyncio.gather(
        run_blocking_in_thread(),
        run_blocking_in_thread(),
        asyncio.sleep(1)  # Async operation
    )
    print(results)

await asyncio.create_task(main())

# Custom executor with more threads
async def with_custom_executor():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=5) as executor:
        result = await loop.run_in_executor(
            executor,
            blocking_io,
            3
        )
    return result


> **Note:** Use run_in_executor for blocking I/O


### Event Loop Control (Slide 60)


In [None]:
import nest_asyncio
nest_asyncio.apply()
import asyncio
import time

# Get current event loop
async def get_loop_info():
    loop = asyncio.get_running_loop()
    print(f"Loop: {loop}")
    print(f"Running: {loop.is_running()}")

# Schedule callbacks
def callback(name):
    print(f"Callback: {name}")

async def schedule_callbacks():
    loop = asyncio.get_running_loop()

    # Call soon (next iteration)
    loop.call_soon(callback, "immediate")

    # Call later (after delay)
    loop.call_later(2, callback, "delayed")

    # Call at specific time
    loop.call_at(loop.time() + 3, callback, "scheduled")

    await asyncio.sleep(5)  # Wait for callbacks

# Long-running event loop
async def custom_event_loop():
    async def worker(name):
        for i in range(3):
            print(f"{name}: {i}")
            await asyncio.sleep(1)

    # Create and schedule tasks
    await asyncio.gather(
        worker("Worker-1"),
        worker("Worker-2")
    )

await schedule_callbacks()


> **Note:** Fine-grained control over event loop


### Async Best Practices (Slide 61)


<p><strong>When to Use Async:</strong></p>
<ul>
<li><strong>YES:</strong> I/O-bound (network, files, databases)</li>
<li><strong>NO:</strong> CPU-bound (heavy computation) - use multiprocessing instead</li>
</ul>
<p><strong>Do:</strong></p>
<ul>
<li>Use <code>asyncio.gather()</code> for concurrent execution</li>
<li>Handle exceptions with <code>return_exceptions=True</code></li>
<li>Use <code>async with</code> for resource management</li>
<li>Prefer async libraries (aiohttp, asyncpg, aiofiles)</li>
<li>Use <code>run_in_executor()</code> for blocking code</li>
</ul>
<p><strong>Don't:</strong></p>
<ul>
<li>Mix blocking calls in async code (use executor)</li>
<li>Use async for CPU-bound tasks</li>
<li>Forget to await coroutines</li>
<li>Create too many concurrent tasks (use semaphores)</li>
</ul>
<p><strong>Common Pitfall:</strong> Forgetting <code>await</code> - coroutine runs when awaited, not when created!</p>
