# Python Async/Await - Complete Beginner's Guide üêç

This notebook will take you from zero to confident in understanding and using:
1. **Async/Await** - Writing asynchronous, concurrent code
2. **Event Loops** - Understanding how async works under the hood
3. **Concurrency Patterns** - Tasks, futures, and async context managers
4. **FastAPI Integration** - How async powers FastAPI's performance

Async/await is essential for modern Python web development and FastAPI!

## ‚ö†Ô∏è Important Note for Jupyter Notebooks

**In Jupyter notebooks**, you can use `await` directly in cells - no need for `asyncio.run()`!

- ‚úÖ **In Jupyter**: Use `await my_function()` directly
- ‚úÖ **In Python scripts**: Use `asyncio.run(my_function())`

Jupyter notebooks already have a running event loop, so `asyncio.run()` will cause an error. All examples in this notebook use `await` directly for Jupyter compatibility.

Let's start learning! üöÄ

## Part 1: What are Async/Await?

### Simple Definition
**Async/await** is Python's way of writing asynchronous code that can handle many things at once without blocking. It allows your program to wait for slow operations (like network requests, file I/O, or database queries) while doing other work.

### Key Concepts
- ‚úÖ **Async Function**: A function defined with `async def` that can be paused and resumed
- ‚úÖ **Await**: A keyword that pauses execution until an async operation completes
- ‚úÖ **Event Loop**: Manages and executes async code
- ‚úÖ **Non-blocking**: Doesn't freeze the entire program while waiting

### Why Async/Await?
- **Performance**: Handle thousands of concurrent operations efficiently
- **Scalability**: Serve many clients simultaneously (perfect for web servers!)
- **Resource Efficiency**: Use one thread instead of many
- **FastAPI**: FastAPI is built on async/await for high performance

### Real-World Analogy üè™
Think of a restaurant:
- **Synchronous**: One waiter serves one table at a time (slow!)
- **Asynchronous**: One waiter takes orders, goes to kitchen, checks other tables while food cooks (fast!)

## Part 2: Synchronous vs Asynchronous

Let's understand the difference with a simple example!

In [1]:
# Synchronous (Blocking) Code - Traditional Python

import time

def fetch_data():
    """Simulate a slow operation (like API call)"""
    print("Fetching data...")
    time.sleep(2)  # Blocks for 2 seconds
    return "Data fetched"

def process_data():
    """Simulate processing"""
    print("Processing data...")
    time.sleep(1)  # Blocks for 1 second
    return "Data processed"

# Synchronous execution - one after another
start = time.time()
result1 = fetch_data()  # Takes 2 seconds
result2 = process_data()  # Takes 1 second
end = time.time()

print(f"\nResults: {result1}, {result2}")
print(f"Total time (synchronous): {end - start:.2f} seconds")
# Total: 3 seconds (2 + 1)

Fetching data...
Processing data...

Results: Data fetched, Data processed
Total time (synchronous): 3.00 seconds


In [3]:
# Asynchronous (Non-blocking) Code

import asyncio
import time

async def fetch_data_async():
    """Simulate a slow async operation"""
    print("Fetching data...")
    await asyncio.sleep(2)  # Non-blocking sleep - allows other code to run
    return "Data fetched"

async def process_data_async():
    """Simulate async processing"""
    print("Processing data...")
    await asyncio.sleep(1)  # Non-blocking
    return "Data processed"

# Asynchronous execution - can run concurrently
async def main():
    start = time.time()
    
    # Run both concurrently!
    result1, result2 = await asyncio.gather(
        fetch_data_async(),
        process_data_async()
    )
    
    end = time.time()
    
    print(f"\nResults: {result1}, {result2}")
    print(f"Total time (asynchronous): {end - start:.2f} seconds")
    # Total: ~2 seconds (max of 2 and 1, not sum!)
    return result1, result2

# In Jupyter notebooks, use await directly (no asyncio.run needed!)
await main()

Fetching data...
Processing data...



Results: Data fetched, Data processed
Total time (asynchronous): 2.02 seconds


('Data fetched', 'Data processed')

## Part 3: Understanding Async Functions

Let's learn how to define and use async functions!

In [None]:
# Defining Async Functions

# Regular function
def regular_function():
    return "Hello"

# Async function - use 'async def'
async def async_function():
    return "Hello"

# Calling them is different!
print("Regular function:", regular_function())  # Returns "Hello"

# Async functions don't run immediately - they return a coroutine
# Note: This will show a warning because we're not awaiting it - that's intentional!
# We're just showing what a coroutine object looks like
coro_object = async_function()  # Creates coroutine, doesn't run it
print("Async function (not awaited):", coro_object)  # Returns a coroutine object!
print(f"Type: {type(coro_object)}")  # <class 'coroutine'>

# To actually run an async function, you need to await it:
import asyncio

# In Jupyter notebooks, use await directly (no asyncio.run needed!)
result = await async_function()  # Actually runs the function
print("Async function (awaited):", result)  # Returns "Hello"

Regular function: Hello
Async function (not awaited): <coroutine object async_function at 0x000001D4387DB320>
Async function (awaited): Hello


  result = async_function()


'Hello'

In [11]:
# What is a Coroutine?

import asyncio

async def greet(name: str):
    """An async function creates a coroutine"""
    return f"Hello, {name}!"

# When you call an async function, you get a coroutine object
# Note: This creates the coroutine but doesn't run it yet
coro = greet("Alice")
print(f"Type: {type(coro)}")  # <class 'coroutine'>
print(f"Is coroutine? {asyncio.iscoroutine(coro)}")  # True

# Coroutines must be awaited or run in an event loop
# This WON'T work: coro()  # TypeError: coroutine was never awaited

# In Jupyter notebooks, use await directly to actually run the coroutine:
result = await coro
print(result)

# You can also call and await in one line:
result2 = await greet("Bob")
print(result2)

Type: <class 'coroutine'>
Is coroutine? True
Hello, Alice!
Hello, Bob!


## Part 4: The `await` Keyword

The `await` keyword is how we pause execution and wait for async operations!

In [12]:
# Understanding await

import asyncio
import time

async def slow_operation():
    """Simulates a slow operation"""
    print("Starting slow operation...")
    await asyncio.sleep(2)  # Wait 2 seconds (non-blocking)
    print("Slow operation completed!")
    return "Done"

async def main():
    print("Before await")
    result = await slow_operation()  # Pause here, but allow other code to run
    print("After await")
    print(f"Result: {result}")

await main()

# Key points:
# 1. `await` can only be used inside `async def` functions
# 2. `await` pauses the current function, but not the entire program
# 3. While waiting, Python can run other async code

Before await
Starting slow operation...
Slow operation completed!
After await
Result: Done


In [13]:
# Sequential await - one after another

import asyncio
import time

async def task1():
    await asyncio.sleep(1)
    return "Task 1 done"

async def task2():
    await asyncio.sleep(1)
    return "Task 2 done"

async def main():
    start = time.time()
    
    # Sequential - wait for each to complete
    result1 = await task1()  # Takes 1 second
    result2 = await task2()  # Takes 1 second
    
    end = time.time()
    
    print(f"{result1}, {result2}")
    print(f"Sequential time: {end - start:.2f} seconds")  # ~2 seconds

await main()

Task 1 done, Task 2 done
Sequential time: 2.01 seconds


In [14]:
# Concurrent await - run multiple at once

async def main():
    start = time.time()
    
    # Concurrent - run both at the same time!
    result1, result2 = await asyncio.gather(
        task1(),  # Starts running
        task2()   # Also starts running immediately
    )
    
    end = time.time()
    
    print(f"{result1}, {result2}")
    print(f"Concurrent time: {end - start:.2f} seconds")  # ~1 second (not 2!)

await main()

# asyncio.gather() runs multiple coroutines concurrently
# Total time = max of all operations, not sum!

Task 1 done, Task 2 done
Concurrent time: 1.01 seconds


## Part 5: Event Loop

The event loop is the engine that runs async code. Understanding it helps!

In [15]:
# Event Loop - The Engine Behind Async

import asyncio

async def say_hello():
    await asyncio.sleep(1)
    print("Hello!")
    return "Done"

# In Jupyter notebooks, use await directly (no asyncio.run needed!)
# Jupyter already has an event loop running
result = await say_hello()
print(f"Result: {result}")

# Note: In regular Python scripts, use asyncio.run()
# In Jupyter notebooks, just use await directly!

Hello!
Result: Done


## Part 6: Running Multiple Async Operations

Let's explore different ways to run multiple async operations!

In [None]:
# asyncio.gather() - Run multiple coroutines concurrently

import asyncio
import time

async def fetch_url(url: str, delay: float):
    """Simulate fetching a URL"""
    print(f"Fetching {url}...")
    await asyncio.sleep(delay)
    return f"Data from {url}"

async def main():
    start = time.time()
    
    # Run 3 operations concurrently
    results = await asyncio.gather(
        fetch_url("url1.com", 1.0),
        fetch_url("url2.com", 1.5),
        fetch_url("url3.com", 0.5)
    )
    
    end = time.time()
    
    for result in results:
        print(result)
    
    print(f"\nTotal time: {end - start:.2f} seconds")  # ~1.5 seconds (max of all)
    print("If sequential: ~3.0 seconds (1.0 + 1.5 + 0.5)")

await main()

In [None]:
# asyncio.create_task() - Create tasks explicitly

import asyncio
import time

async def fetch_data(id: int, delay: float):
    print(f"Task {id} started")
    await asyncio.sleep(delay)
    print(f"Task {id} completed")
    return f"Result from task {id}"

async def main():
    start = time.time()
    
    # Create tasks (they start running immediately!)
    task1 = asyncio.create_task(fetch_data(1, 1.0))
    task2 = asyncio.create_task(fetch_data(2, 1.0))
    task3 = asyncio.create_task(fetch_data(3, 1.0))
    
    print("All tasks created and running...")
    
    # Wait for all tasks to complete
    results = await asyncio.gather(task1, task2, task3)
    
    end = time.time()
    
    print(f"\nResults: {results}")
    print(f"Total time: {end - start:.2f} seconds")  # ~1 second, not 3!

await main()

# Key difference:
# - gather(): Returns results as a list
# - create_task(): Returns Task objects, can cancel individual tasks

In [None]:
# Running tasks in a loop

import asyncio
import time

async def process_item(item: str, delay: float):
    """Process a single item"""
    await asyncio.sleep(delay)
    return f"Processed {item}"

async def main():
    items = ["A", "B", "C", "D", "E"]
    
    start = time.time()
    
    # Method 1: Using gather (all at once)
    tasks = [process_item(item, 0.5) for item in items]
    results = await asyncio.gather(*tasks)  # Unpack list with *
    
    end = time.time()
    
    print("Results:", results)
    print(f"Time with gather: {end - start:.2f} seconds")  # ~0.5 seconds
    
    # Method 2: Sequential (for comparison)
    start2 = time.time()
    results2 = []
    for item in items:
        result = await process_item(item, 0.5)
        results2.append(result)
    end2 = time.time()
    
    print(f"Time sequential: {end2 - start2:.2f} seconds")  # ~2.5 seconds

await main()

## Part 7: Async Context Managers

Like regular context managers, but for async resources!

In [16]:
# Async Context Managers - For async resource management

import asyncio

# Regular context manager (synchronous)
class Database:
    def __enter__(self):
        print("Connecting to database...")
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        print("Closing database connection...")

# Async context manager
class AsyncDatabase:
    async def __aenter__(self):
        print("Connecting to database (async)...")
        await asyncio.sleep(0.1)  # Simulate async connection
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing database connection (async)...")
        await asyncio.sleep(0.1)  # Simulate async cleanup

# Usage
async def main():
    # Use async with statement
    async with AsyncDatabase() as db:
        print("Using database...")
        await asyncio.sleep(1)
        print("Done using database")
    # Connection automatically closed here

await main()

Connecting to database (async)...
Using database...
Done using database
Closing database connection (async)...


In [17]:
# Using contextlib.asynccontextmanager

from contextlib import asynccontextmanager
import asyncio

@asynccontextmanager
async def get_connection():
    """Async context manager using decorator"""
    print("Establishing connection...")
    connection = "CONNECTED"
    
    try:
        yield connection  # Provide the resource
    finally:
        print("Closing connection...")
        connection = None

async def main():
    async with get_connection() as conn:
        print(f"Using connection: {conn}")
        await asyncio.sleep(1)
    # Connection automatically closed

await main()

# This is very common in FastAPI for database sessions!

Establishing connection...
Using connection: CONNECTED
Closing connection...


## Part 8: Async Iterators and Generators

Iterating over data asynchronously!

In [18]:
# Async Iterators

import asyncio

class AsyncCounter:
    """Async iterator that counts asynchronously"""
    def __init__(self, max_count: int):
        self.max_count = max_count
        self.current = 0
    
    def __aiter__(self):
        """Return self as async iterator"""
        return self
    
    async def __anext__(self):
        """Return next value"""
        if self.current >= self.max_count:
            raise StopAsyncIteration
        
        await asyncio.sleep(0.5)  # Simulate async operation
        self.current += 1
        return self.current

# Usage
async def main():
    async for count in AsyncCounter(5):
        print(f"Count: {count}")

await main()

Count: 1
Count: 2
Count: 3
Count: 4
Count: 5


In [19]:
# Async Generators

import asyncio

async def async_range(start: int, end: int):
    """Async generator (simpler than async iterator)"""
    for i in range(start, end):
        await asyncio.sleep(0.3)  # Simulate async operation
        yield i

async def main():
    # Use async for with async generator
    async for number in async_range(1, 6):
        print(f"Number: {number}")

await main()

# Async generators are simpler and more Pythonic than async iterators!

Number: 1
Number: 2
Number: 3
Number: 4
Number: 5


## Part 9: Error Handling in Async Code

Handling exceptions in async functions requires special attention!

In [None]:
# Error Handling in Async Functions

import asyncio

async def might_fail(success: bool):
    """Async function that might raise an error"""
    await asyncio.sleep(0.5)
    if not success:
        raise ValueError("Something went wrong!")
    return "Success!"

async def main():
    # Method 1: Try/except works normally
    try:
        result = await might_fail(True)
        print(f"Result: {result}")
    except ValueError as e:
        print(f"Error: {e}")
    
    # Method 2: Handle errors in concurrent operations
    try:
        results = await asyncio.gather(
            might_fail(True),
            might_fail(False),  # This will raise an error
            might_fail(True)
        )
    except ValueError as e:
        print(f"Error in gather: {e}")  # First error stops everything

await main()

In [None]:
# Handling Errors in gather() - return_exceptions

async def task_with_id(id: int, should_fail: bool):
    """Task that might fail"""
    await asyncio.sleep(0.5)
    if should_fail:
        raise ValueError(f"Task {id} failed!")
    return f"Task {id} succeeded"

async def main():
    # return_exceptions=True: Returns exceptions as results instead of raising
    results = await asyncio.gather(
        task_with_id(1, False),
        task_with_id(2, True),   # This will fail
        task_with_id(3, False),
        return_exceptions=True  # Don't stop on first error
    )
    
    for i, result in enumerate(results, 1):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

await main()

## Part 10: Timeouts and Cancellation

Sometimes you need to cancel operations that take too long!

In [None]:
# asyncio.wait_for() - Add timeout to async operations

import asyncio

async def slow_operation(duration: float):
    """Operation that takes some time"""
    print(f"Starting operation (will take {duration}s)...")
    await asyncio.sleep(duration)
    return f"Completed in {duration}s"

async def main():
    # Without timeout
    try:
        result = await asyncio.wait_for(slow_operation(2.0), timeout=1.0)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out!")
    
    # With sufficient timeout
    try:
        result = await asyncio.wait_for(slow_operation(0.5), timeout=1.0)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out!")

await main()

In [None]:
# Cancelling Tasks

import asyncio

async def long_running_task(id: int):
    """A task that runs for a long time"""
    try:
        print(f"Task {id} started")
        await asyncio.sleep(10)  # Simulate long operation
        print(f"Task {id} completed")
        return f"Task {id} result"
    except asyncio.CancelledError:
        print(f"Task {id} was cancelled!")
        raise  # Re-raise to properly cancel

async def main():
    # Create tasks
    task1 = asyncio.create_task(long_running_task(1))
    task2 = asyncio.create_task(long_running_task(2))
    
    # Wait a bit
    await asyncio.sleep(0.5)
    
    # Cancel task1
    task1.cancel()
    
    # Wait for both (task1 will be cancelled, task2 will run)
    try:
        results = await asyncio.gather(task1, task2, return_exceptions=True)
        print(f"Results: {results}")
    except Exception as e:
        print(f"Error: {e}")

await main()

## Part 11: Mixing Async and Sync Code

Sometimes you need to call regular functions from async code, or vice versa!

In [None]:
# Calling Sync Functions from Async Code

import asyncio
import time

def sync_function():
    """Regular synchronous function"""
    time.sleep(0.5)  # Blocking sleep
    return "Sync result"

async def async_function():
    """Async function"""
    await asyncio.sleep(0.5)  # Non-blocking sleep
    return "Async result"

async def main():
    # You can call sync functions from async code
    # BUT: They will block the event loop!
    result1 = sync_function()  # This blocks everything!
    print(f"Sync result: {result1}")
    
    # Better: Use async version
    result2 = await async_function()  # This doesn't block
    print(f"Async result: {result2}")

await main()

# Warning: Calling blocking sync functions in async code
# will block the entire event loop and defeat the purpose of async!

In [None]:
# Running Sync Code in Thread Pool (to avoid blocking)

import asyncio
import time

def blocking_operation(duration: float):
    """A blocking operation (like file I/O or CPU-bound work)"""
    time.sleep(duration)  # Blocks
    return f"Completed after {duration}s"

async def main():
    # Run blocking function in thread pool (doesn't block event loop)
    result = await asyncio.to_thread(blocking_operation, 1.0)
    print(result)
    
    # You can run multiple blocking operations concurrently
    results = await asyncio.gather(
        asyncio.to_thread(blocking_operation, 0.5),
        asyncio.to_thread(blocking_operation, 0.8),
        asyncio.to_thread(blocking_operation, 0.3)
    )
    print(f"Results: {results}")

await main()

# asyncio.to_thread() runs sync code in a separate thread
# This prevents blocking the event loop!

In [None]:
# Calling Async Functions from Sync Code

import asyncio

async def async_function():
    await asyncio.sleep(1)
    return "Async result"

# Method 1: Using asyncio.run() (creates new event loop)
def sync_caller1():
    result = asyncio.run(async_function())
    return result

# Method 2: If already in async context, use await
async def async_caller():
    result = await async_function()
    return result

# Usage in Jupyter - use await directly
# In regular Python scripts, you'd use asyncio.run()
result = await async_function()
print("From async code:", result)

# Note: In Jupyter notebooks, you can use await directly in cells
# In regular Python scripts, use asyncio.run() or await in async functions

## Part 12: Common Async Patterns

Real-world patterns you'll use frequently!

In [None]:
# Pattern 1: Rate Limiting - Control how fast operations run

import asyncio
import time

async def rate_limited_operation(id: int, semaphore: asyncio.Semaphore):
    """Operation that respects rate limiting"""
    async with semaphore:  # Acquire semaphore (only N can run at once)
        print(f"Operation {id} started")
        await asyncio.sleep(1)
        print(f"Operation {id} completed")
        return f"Result {id}"

async def main():
    # Semaphore with limit of 2 (only 2 operations at a time)
    semaphore = asyncio.Semaphore(2)
    
    # Create 5 operations
    tasks = [rate_limited_operation(i, semaphore) for i in range(5)]
    
    start = time.time()
    results = await asyncio.gather(*tasks)
    end = time.time()
    
    print(f"\nResults: {results}")
    print(f"Time: {end - start:.2f}s")  # ~3 seconds (2 at a time)

await main()

In [None]:
# Pattern 2: Retry Logic with Exponential Backoff

import asyncio
import random

async def unreliable_operation(attempt: int):
    """Operation that might fail"""
    await asyncio.sleep(0.1)
    # 70% chance of failure
    if random.random() < 0.7:
        raise ValueError(f"Failed on attempt {attempt}")
    return f"Success on attempt {attempt}"

async def retry_with_backoff(max_retries: int = 3):
    """Retry operation with exponential backoff"""
    for attempt in range(1, max_retries + 1):
        try:
            result = await unreliable_operation(attempt)
            return result
        except ValueError as e:
            if attempt == max_retries:
                raise
            wait_time = 2 ** attempt  # Exponential: 2, 4, 8 seconds
            print(f"Attempt {attempt} failed. Waiting {wait_time}s...")
            await asyncio.sleep(wait_time)
    raise ValueError("All retries exhausted")

async def main():
    try:
        result = await retry_with_backoff()
        print(f"Success: {result}")
    except ValueError as e:
        print(f"Failed: {e}")

await main()

In [None]:
# Pattern 3: Producer-Consumer with Async Queue

import asyncio
import random

async def producer(queue: asyncio.Queue, count: int):
    """Produce items and put them in queue"""
    for i in range(count):
        await asyncio.sleep(random.uniform(0.1, 0.5))
        item = f"Item {i}"
        await queue.put(item)
        print(f"Produced: {item}")
    await queue.put(None)  # Signal completion

async def consumer(queue: asyncio.Queue, consumer_id: int):
    """Consume items from queue"""
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"Consumer {consumer_id} consumed: {item}")
        await asyncio.sleep(random.uniform(0.2, 0.6))
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)  # Max 5 items
    
    # Start producer and consumers
    await asyncio.gather(
        producer(queue, 10),
        consumer(queue, 1),
        consumer(queue, 2)
    )

await main()

## Part 13: Async/Await in FastAPI

This is why we're learning async/await! FastAPI uses it extensively!

In [None]:
# FastAPI Example 1: Async Route Handlers

# Simulated FastAPI structure
from typing import Dict, Any
import asyncio

class FastAPIApp:
    """Simplified FastAPI-like structure"""
    routes = []
    
    def get(self, path: str):
        def decorator(func):
            self.routes.append(("GET", path, func))
            return func
        return decorator

app = FastAPIApp()

# Regular (synchronous) route handler
@app.get("/sync-endpoint")
def sync_endpoint() -> Dict[str, str]:
    """Synchronous endpoint - blocks while processing"""
    import time
    time.sleep(1)  # Blocks for 1 second
    return {"message": "Sync response"}

# Async route handler - much better!
@app.get("/async-endpoint")
async def async_endpoint() -> Dict[str, str]:
    """Async endpoint - doesn't block, can handle many requests"""
    await asyncio.sleep(1)  # Non-blocking wait
    return {"message": "Async response"}

print("FastAPI routes registered!")
print("Async endpoints can handle thousands of concurrent requests!")

# In real FastAPI:
# - Use async def for route handlers that do I/O (database, APIs, files)
# - FastAPI automatically runs them in the event loop
# - Much better performance and scalability!

In [None]:
# FastAPI Example 2: Async Database Operations

# Simulated async database
class AsyncDatabase:
    async def fetch_user(self, user_id: int):
        """Simulate async database query"""
        await asyncio.sleep(0.1)  # Simulate network delay
        return {"id": user_id, "name": f"User {user_id}"}
    
    async def fetch_users(self):
        """Simulate fetching multiple users"""
        await asyncio.sleep(0.2)
        return [
            {"id": 1, "name": "Alice"},
            {"id": 2, "name": "Bob"}
        ]

db = AsyncDatabase()

@app.get("/users/{user_id}")
async def get_user(user_id: int) -> Dict[str, Any]:
    """Get single user - async database call"""
    user = await db.fetch_user(user_id)
    return user

@app.get("/users")
async def get_users() -> Dict[str, Any]:
    """Get all users - async database call"""
    users = await db.fetch_users()
    return {"users": users}

print("Database endpoints registered!")
print("Async database calls don't block the server!")

In [None]:
# FastAPI Example 3: Multiple Async Operations

@app.get("/user-profile/{user_id}")
async def get_user_profile(user_id: int) -> Dict[str, Any]:
    """Fetch user profile with multiple async operations"""
    
    # Run multiple async operations concurrently!
    user, posts, followers = await asyncio.gather(
        db.fetch_user(user_id),
        fetch_user_posts(user_id),      # Simulated
        fetch_user_followers(user_id)   # Simulated
    )
    
    return {
        "user": user,
        "posts": posts,
        "followers": followers
    }

async def fetch_user_posts(user_id: int):
    """Simulate fetching posts"""
    await asyncio.sleep(0.1)
    return [{"id": 1, "title": "Post 1"}]

async def fetch_user_followers(user_id: int):
    """Simulate fetching followers"""
    await asyncio.sleep(0.1)
    return [{"id": 2, "name": "Follower"}]

print("Concurrent operations endpoint registered!")
print("All operations run in parallel - much faster!")

In [None]:
# FastAPI Example 4: Async Dependencies

from contextlib import asynccontextmanager

@asynccontextmanager
async def get_db_session():
    """Async dependency for database session"""
    print("Creating database session...")
    session = "DB_SESSION"
    try:
        yield session
    finally:
        print("Closing database session...")

@app.get("/data")
async def get_data(db: str = None):
    """Endpoint using async dependency"""
    async with get_db_session() as session:
        # Use session here
        await asyncio.sleep(0.1)
        return {"data": "result", "session": session}

print("Async dependency endpoint registered!")
print("FastAPI automatically manages async dependencies!")

## Part 14: Best Practices

Let's learn the do's and don'ts of async/await!

In [None]:
# Best Practice 1: Use async for I/O operations

# Good ‚úÖ - Use async for network, database, file I/O
async def fetch_data():
    await asyncio.sleep(1)  # Simulates network call
    return "Data"

# Avoid ‚ùå - Don't use async for CPU-bound work
async def calculate_fibonacci(n: int):
    # This is CPU-bound, not I/O-bound
    # Async won't help here - use threading or multiprocessing instead
    if n <= 1:
        return n
    return await calculate_fibonacci(n-1) + await calculate_fibonacci(n-2)

# Best Practice 2: Don't block the event loop

# Good ‚úÖ - Non-blocking
async def good_example():
    await asyncio.sleep(1)  # Non-blocking

# Bad ‚ùå - Blocks event loop
async def bad_example():
    import time
    time.sleep(1)  # Blocks everything!

# Best Practice 3: Use gather() for concurrent operations

# Good ‚úÖ - Concurrent
async def fetch_multiple():
    results = await asyncio.gather(
        fetch_data(),
        fetch_data(),
        fetch_data()
    )
    return results

# Less efficient ‚ùå - Sequential
async def fetch_sequential():
    results = []
    for _ in range(3):
        result = await fetch_data()  # One at a time
        results.append(result)
    return results

In [None]:
# Best Practice 4: Handle errors properly

# Good ‚úÖ - Proper error handling
async def robust_operation():
    try:
        result = await might_fail()
        return result
    except ValueError as e:
        print(f"Error: {e}")
        return None

# Good ‚úÖ - Use return_exceptions in gather
async def handle_multiple():
    results = await asyncio.gather(
        operation1(),
        operation2(),
        operation3(),
        return_exceptions=True  # Don't stop on first error
    )
    return results

# Best Practice 5: Use timeouts for external calls

# Good ‚úÖ - Always use timeouts
async def safe_external_call():
    try:
        result = await asyncio.wait_for(
            external_api_call(),
            timeout=5.0  # Don't wait forever
        )
        return result
    except asyncio.TimeoutError:
        return {"error": "Request timed out"}

async def might_fail():
    await asyncio.sleep(0.5)
    return "Success"

async def operation1():
    await asyncio.sleep(0.1)
    return "Op1"

async def operation2():
    await asyncio.sleep(0.1)
    return "Op2"

async def operation3():
    await asyncio.sleep(0.1)
    return "Op3"

async def external_api_call():
    await asyncio.sleep(10)  # Simulates slow API
    return "API result"

## Part 15: Common Pitfalls and How to Avoid Them

Learn from common mistakes!

In [None]:
# Pitfall 1: Forgetting to await

async def fetch_data():
    await asyncio.sleep(1)
    return "Data"

# Wrong ‚ùå - Forgot await
async def wrong_way():
    result = fetch_data()  # Returns coroutine, not result!
    print(result)  # <coroutine object...>

# Correct ‚úÖ - Use await
async def correct_way():
    result = await fetch_data()  # Actually runs and gets result
    print(result)  # "Data"

# Pitfall 2: Blocking the event loop

# Wrong ‚ùå - Blocks event loop
async def blocking_way():
    import time
    time.sleep(5)  # Blocks everything for 5 seconds!

# Correct ‚úÖ - Use async sleep
async def non_blocking_way():
    await asyncio.sleep(5)  # Doesn't block, allows other code to run

# Pitfall 3: Not using gather() for concurrent operations

# Wrong ‚ùå - Sequential (slow)
async def sequential_way():
    result1 = await fetch_data()  # Wait 1s
    result2 = await fetch_data()  # Wait 1s more
    result3 = await fetch_data()  # Wait 1s more
    # Total: 3 seconds

# Correct ‚úÖ - Concurrent (fast)
async def concurrent_way():
    results = await asyncio.gather(
        fetch_data(),  # All run at once
        fetch_data(),
        fetch_data()
    )
    # Total: ~1 second

In [None]:
# Pitfall 4: Creating tasks but not awaiting them

# Wrong ‚ùå - Tasks created but not awaited
async def wrong_task_usage():
    task1 = asyncio.create_task(fetch_data())
    task2 = asyncio.create_task(fetch_data())
    # Tasks run, but we don't wait for them!
    return "Done"  # Returns immediately, tasks might not be done

# Correct ‚úÖ - Await the tasks
async def correct_task_usage():
    task1 = asyncio.create_task(fetch_data())
    task2 = asyncio.create_task(fetch_data())
    results = await asyncio.gather(task1, task2)  # Wait for both
    return results

# Pitfall 5: Using async when you don't need it

# Wrong ‚ùå - Unnecessary async
async def unnecessary_async():
    # No I/O operations, just computation
    return sum(range(1000))  # CPU-bound, async doesn't help

# Correct ‚úÖ - Use regular function for CPU-bound work
def regular_function():
    return sum(range(1000))  # Better for CPU-bound work

# Use async only for I/O-bound operations!

## Part 16: Practice Exercises üèãÔ∏è

Try these exercises to build your confidence!

### Exercise 1: Create an Async Function with Timeout

Create an async function that fetches data with a timeout:

In [None]:
# Exercise: Create fetch_with_timeout function
# It should fetch data but timeout after 2 seconds

# Solution:
async def fetch_data_slow():
    """Simulates slow data fetch"""
    await asyncio.sleep(3)  # Takes 3 seconds
    return "Data fetched"

async def fetch_with_timeout(timeout: float = 2.0):
    """Fetch data with timeout"""
    try:
        result = await asyncio.wait_for(fetch_data_slow(), timeout=timeout)
        return result
    except asyncio.TimeoutError:
        return "Request timed out"

# Test
result = await fetch_with_timeout()
print(result)  # "Request timed out" (because fetch takes 3s, timeout is 2s)

### Exercise 2: Batch Processing with Rate Limiting

Process a list of items with a rate limit (max 3 at a time):

In [None]:
# Exercise: Process items with rate limiting
# Process 10 items, but only 3 at a time

# Solution:
async def process_item(item: str, semaphore: asyncio.Semaphore):
    """Process a single item with rate limiting"""
    async with semaphore:
        print(f"Processing {item}...")
        await asyncio.sleep(0.5)  # Simulate work
        print(f"Completed {item}")
        return f"Processed {item}"

async def process_batch(items: list[str], max_concurrent: int = 3):
    """Process items in batches with rate limiting"""
    semaphore = asyncio.Semaphore(max_concurrent)
    tasks = [process_item(item, semaphore) for item in items]
    results = await asyncio.gather(*tasks)
    return results

# Test
items = [f"Item {i}" for i in range(10)]
results = await process_batch(items, max_concurrent=3)
print(f"\nProcessed {len(results)} items")

### Exercise 3: Retry with Exponential Backoff

Create a retry mechanism that retries failed operations:

In [None]:
# Exercise: Create retry mechanism
# Retry up to 3 times with exponential backoff (1s, 2s, 4s)

# Solution:
import random

async def unreliable_operation():
    """Operation that randomly fails"""
    await asyncio.sleep(0.1)
    if random.random() < 0.6:  # 60% chance of failure
        raise ValueError("Operation failed")
    return "Success!"

async def retry_operation(max_retries: int = 3):
    """Retry operation with exponential backoff"""
    for attempt in range(1, max_retries + 1):
        try:
            result = await unreliable_operation()
            print(f"Success on attempt {attempt}")
            return result
        except ValueError as e:
            if attempt == max_retries:
                print(f"Failed after {max_retries} attempts")
                raise
            wait_time = 2 ** (attempt - 1)  # 1, 2, 4 seconds
            print(f"Attempt {attempt} failed. Retrying in {wait_time}s...")
            await asyncio.sleep(wait_time)
    raise ValueError("All retries exhausted")

# Test
try:
    result = await retry_operation()
    print(f"Final result: {result}")
except ValueError as e:
    print(f"Final error: {e}")

## Part 17: Key Takeaways & Summary üìù

### Core Concepts
‚úÖ **Async Functions**: Defined with `async def`, return coroutines  
‚úÖ **Await**: Pauses execution until async operation completes  
‚úÖ **Event Loop**: Manages and executes async code  
‚úÖ **Coroutines**: Objects returned by async functions  
‚úÖ **Non-blocking**: Allows other code to run while waiting  

### Key Functions
‚úÖ **asyncio.run()**: Run async code (creates event loop)  
‚úÖ **asyncio.gather()**: Run multiple coroutines concurrently  
‚úÖ **asyncio.create_task()**: Create tasks explicitly  
‚úÖ **asyncio.wait_for()**: Add timeout to operations  
‚úÖ **asyncio.to_thread()**: Run blocking code in thread pool  

### Async Patterns
‚úÖ **Async Context Managers**: `async with` for resource management  
‚úÖ **Async Iterators**: `async for` for async iteration  
‚úÖ **Async Generators**: Simpler async iteration  
‚úÖ **Semaphores**: Rate limiting and concurrency control  
‚úÖ **Queues**: Producer-consumer patterns  

### FastAPI Integration
‚úÖ **Async Route Handlers**: Use `async def` for I/O operations  
‚úÖ **Concurrent Operations**: Use `gather()` for parallel requests  
‚úÖ **Async Dependencies**: Database sessions, connections  
‚úÖ **Performance**: Handle thousands of concurrent requests  

### Best Practices
1. ‚úÖ Use async for I/O-bound operations (network, database, files)
2. ‚úÖ Don't block the event loop (avoid `time.sleep()`)
3. ‚úÖ Use `gather()` for concurrent operations
4. ‚úÖ Always use timeouts for external calls
5. ‚úÖ Handle errors properly with try/except
6. ‚úÖ Use `asyncio.to_thread()` for CPU-bound work
7. ‚úÖ Don't use async for CPU-bound operations

### Common Pitfalls
‚ùå Forgetting to `await` coroutines  
‚ùå Blocking the event loop with sync code  
‚ùå Not using `gather()` for concurrent operations  
‚ùå Creating tasks but not awaiting them  
‚ùå Using async when you don't need it  

---

## üéâ Congratulations!

You now understand:
- ‚úÖ What async/await is and why it matters
- ‚úÖ How to write async functions and use await
- ‚úÖ Event loops, coroutines, and tasks
- ‚úÖ Concurrent execution with gather()
- ‚úÖ Async context managers and iterators
- ‚úÖ Error handling and timeouts
- ‚úÖ How FastAPI uses async/await
- ‚úÖ Best practices and common pitfalls

**You're ready to write high-performance async code and leverage FastAPI's async capabilities!** üöÄ

### Next Steps:
1. Practice writing async functions
2. Build FastAPI endpoints with async handlers
3. Experiment with concurrent operations
4. Learn async database libraries (asyncpg, aiomysql)
5. Build real projects using async/await

Keep practicing and building! üí™