# Python 异步编程完整教程

## 课程目标
本课程将全面讲解 Python 的异步编程知识体系，包括：
- 异步编程的基本概念和原理
- 协程（Coroutines）的使用
- async/await 语法
- asyncio 模块详解
- 事件循环机制
- 并发编程模式对比
- 实际应用场景

## 1. 异步编程基础概念

### 1.1 什么是异步编程？

**同步编程（Synchronous）**：代码按顺序执行，一个任务完成后才能执行下一个任务。

**异步编程（Asynchronous）**：允许程序在等待某个操作完成时，继续执行其他任务。

### 1.2 为什么需要异步编程？

- **I/O 密集型任务**：网络请求、文件读写、数据库操作等需要等待的操作
- **提高效率**：在等待期间可以执行其他任务，而不是阻塞等待
- **资源利用**：单线程内实现并发，减少线程切换开销

### 1.3 同步 vs 异步示例对比

In [None]:
import time

# 同步方式：串行执行
def sync_task(name, duration):
    print(f"任务 {name} 开始")
    time.sleep(duration)  # 模拟耗时操作
    print(f"任务 {name} 完成")
    return f"{name} 结果"

print("=== 同步执行 ===")
start = time.time()
sync_task("A", 2)
sync_task("B", 2)
sync_task("C", 2)
print(f"总耗时: {time.time() - start:.2f} 秒\n")

In [None]:
import asyncio

# 异步方式：并发执行
async def async_task(name, duration):
    print(f"任务 {name} 开始")
    await asyncio.sleep(duration)  # 异步等待
    print(f"任务 {name} 完成")
    return f"{name} 结果"

async def main():
    print("=== 异步执行 ===")
    start = time.time()
    
    # 并发执行三个任务
    results = await asyncio.gather(
        async_task("A", 2),
        async_task("B", 2),
        async_task("C", 2)
    )
    
    print(f"总耗时: {time.time() - start:.2f} 秒")
    print(f"结果: {results}")

await main()  # 在 Jupyter 中可以直接使用 await

**关键观察**：
- 同步执行：总耗时约 6 秒（2+2+2）
- 异步执行：总耗时约 2 秒（并发执行）

## 2. 协程（Coroutines）

### 2.1 什么是协程？

协程是一种特殊的函数，可以在执行过程中暂停，并在稍后恢复执行。

**特点**：
- 使用 `async def` 定义
- 调用时不会立即执行，而是返回一个协程对象
- 需要通过 `await` 或事件循环来执行

In [None]:
# 定义一个协程
async def my_coroutine():
    print("协程开始执行")
    await asyncio.sleep(1)
    print("协程执行完成")
    return "协程返回值"

# 调用协程，返回协程对象
coro = my_coroutine()
print(f"协程对象: {coro}")
print(f"类型: {type(coro)}")

# 执行协程
result = await coro
print(f"返回值: {result}")

### 2.2 协程的执行方式

有三种主要方式执行协程：

In [None]:
async def simple_coro(x):
    await asyncio.sleep(0.5)
    return x * 2

# 方式1: 使用 await（在异步函数或 Jupyter 中）
result1 = await simple_coro(5)
print(f"方式1 结果: {result1}")

In [None]:
# 方式2: 使用 asyncio.run()（在普通脚本中）
# 注意：在 Jupyter 中已经有运行中的事件循环，不能使用 asyncio.run()
# 这里仅作演示

# result2 = asyncio.run(simple_coro(10))  # 在普通 Python 脚本中使用
print("方式2: asyncio.run() 用于普通脚本，不在 Jupyter 中使用")

In [None]:
# 方式3: 创建 Task 对象
task = asyncio.create_task(simple_coro(15))
print(f"Task 对象: {task}")
result3 = await task
print(f"方式3 结果: {result3}")

## 3. async 和 await 关键字

### 3.1 async 关键字

- 用于定义协程函数
- `async def` 定义的函数总是返回一个协程对象
- 只能在异步函数内使用 `await`

In [None]:
# async 函数示例
async def greet(name):
    return f"Hello, {name}!"

async def process_greeting(name):
    greeting = await greet(name)  # 等待协程完成
    print(greeting)
    return greeting

result = await process_greeting("Alice")

### 3.2 await 关键字

- 用于等待协程、Task 或 Future 对象完成
- 只能在 async 函数内使用
- 遇到 await 时，当前协程会暂停，让出控制权
- 等待的操作完成后，协程从暂停处继续执行

In [None]:
async def fetch_data(url, delay):
    print(f"开始获取 {url}")
    await asyncio.sleep(delay)  # 模拟网络延迟
    print(f"完成获取 {url}")
    return f"来自 {url} 的数据"

async def main():
    # 顺序执行（串行）
    print("--- 顺序执行 ---")
    start = time.time()
    data1 = await fetch_data("url1", 1)
    data2 = await fetch_data("url2", 1)
    print(f"顺序执行耗时: {time.time() - start:.2f} 秒\n")
    
    # 并发执行
    print("--- 并发执行 ---")
    start = time.time()
    data3, data4 = await asyncio.gather(
        fetch_data("url3", 1),
        fetch_data("url4", 1)
    )
    print(f"并发执行耗时: {time.time() - start:.2f} 秒")

await main()

### 3.3 await 可以等待的对象

await 可以等待三种类型的对象：
1. 协程（Coroutine）
2. Task
3. Future

In [None]:
async def demo_awaitable():
    # 1. 等待协程
    async def coro():
        return "协程结果"
    result1 = await coro()
    print(f"协程: {result1}")
    
    # 2. 等待 Task
    async def task_func():
        return "Task 结果"
    task = asyncio.create_task(task_func())
    result2 = await task
    print(f"Task: {result2}")
    
    # 3. 等待 Future
    future = asyncio.Future()
    future.set_result("Future 结果")
    result3 = await future
    print(f"Future: {result3}")

await demo_awaitable()

## 4. asyncio 模块详解

### 4.1 asyncio 核心组件

asyncio 是 Python 标准库中的异步 I/O 框架，主要包括：
- 事件循环（Event Loop）
- 协程和任务（Coroutines and Tasks）
- Futures
- 同步原语（Locks, Semaphores, Events）

### 4.2 常用 asyncio 函数

In [None]:
# asyncio.sleep() - 异步睡眠
async def sleep_demo():
    print("开始睡眠")
    await asyncio.sleep(1)
    print("睡眠结束")

await sleep_demo()

In [None]:
# asyncio.gather() - 并发运行多个协程
async def worker(name, delay):
    await asyncio.sleep(delay)
    return f"Worker {name} 完成"

async def gather_demo():
    results = await asyncio.gather(
        worker("A", 1),
        worker("B", 2),
        worker("C", 1.5)
    )
    print("所有任务完成:", results)

await gather_demo()

In [None]:
# asyncio.create_task() - 创建任务并调度到事件循环（任务会在事件循环获得控制权时开始执行）
async def create_task_demo():
    # 创建任务（此时任务被调度但尚未开始执行）
    task1 = asyncio.create_task(worker("Task1", 2))
    task2 = asyncio.create_task(worker("Task2", 1))
    
    print("任务已创建并调度，开始执行其他操作...")
    await asyncio.sleep(0.5)  # 让出控制权，任务开始并发执行
    print("等待任务完成...")
    
    # 等待任务完成
    result1 = await task1
    result2 = await task2
    
    print(f"结果: {result1}, {result2}")

await create_task_demo()

In [None]:
# asyncio.wait() - 等待多个协程，更灵活的控制
async def wait_demo():
    tasks = [
        asyncio.create_task(worker(f"W{i}", i)) 
        for i in range(1, 4)
    ]
    
    # 等待所有任务完成
    done, pending = await asyncio.wait(tasks)
    
    print(f"已完成: {len(done)}, 待完成: {len(pending)}")
    for task in done:
        print(f"  - {task.result()}")

await wait_demo()

In [None]:
# asyncio.wait_for() - 设置超时
async def slow_operation():
    await asyncio.sleep(3)
    return "完成"

async def timeout_demo():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=1.0)
        print(f"结果: {result}")
    except asyncio.TimeoutError:
        print("操作超时！")

await timeout_demo()

### 4.3 asyncio.gather() vs asyncio.wait()

两者都用于并发运行多个协程，但有区别：

| 特性 | gather() | wait() |
|------|----------|--------|
| 返回值 | 结果列表（按输入顺序） | (done, pending) 集合 |
| 异常处理 | 默认抛出第一个异常 | 将异常包装在 Task 中 |
| 灵活性 | 简单易用 | 更灵活（可控制返回时机） |
| 取消任务 | 全部取消 | 可部分取消 |

## 5. 事件循环（Event Loop）

### 5.1 什么是事件循环？

事件循环是 asyncio 的核心，负责：
- 调度和执行异步任务
- 处理 I/O 事件
- 管理回调函数

In [None]:
# 获取当前事件循环
loop = asyncio.get_event_loop()
print(f"事件循环: {loop}")
print(f"是否运行中: {loop.is_running()}")

### 5.2 事件循环的工作原理

```
┌───────────────────────────┐
│  事件循环开始              │
└───────────┬───────────────┘
            │
            ▼
┌───────────────────────────┐
│  检查待执行任务            │
└───────────┬───────────────┘
            │
            ▼
┌───────────────────────────┐
│  执行可运行的协程          │
│  (遇到 await 时暂停)       │
└───────────┬───────────────┘
            │
            ▼
┌───────────────────────────┐
│  处理 I/O 事件             │
└───────────┬───────────────┘
            │
            ▼
┌───────────────────────────┐
│  恢复等待完成的协程        │
└───────────┬───────────────┘
            │
            └──────► 循环继续，直到所有任务完成
```

### 5.3 事件循环的使用方式

In [None]:
# 在 Jupyter 中，事件循环已经在运行
# 可以直接使用 await

async def example():
    return "直接在 Jupyter 中执行"

result = await example()
print(result)

In [None]:
# 在普通 Python 脚本中的使用方式（仅作演示）
code_example = '''
import asyncio

async def main():
    print("主函数")
    await asyncio.sleep(1)
    return "完成"

# 方式1: 使用 asyncio.run()（推荐，Python 3.7+）
result = asyncio.run(main())

# 方式2: 手动管理事件循环（旧方式）
loop = asyncio.get_event_loop()
result = loop.run_until_complete(main())
loop.close()
'''

print("在普通脚本中的使用方式:")
print(code_example)

## 6. Task 和 Future

### 6.1 Task 对象

Task 是对协程的封装，用于在事件循环中调度执行。

In [None]:
async def my_task(n):
    print(f"任务 {n} 开始")
    await asyncio.sleep(1)
    print(f"任务 {n} 完成")
    return n * 2

# 创建 Task
task = asyncio.create_task(my_task(5))

print(f"Task 对象: {task}")
print(f"Task 状态: {task.done()}")

# 等待 Task 完成
result = await task
print(f"结果: {result}")
print(f"Task 状态: {task.done()}")

In [None]:
# Task 的常用方法
async def task_methods_demo():
    task = asyncio.create_task(asyncio.sleep(2))
    
    print(f"done(): {task.done()}")           # 是否完成
    print(f"cancelled(): {task.cancelled()}") # 是否被取消
    
    # 取消任务
    task.cancel()
    
    try:
        await task
    except asyncio.CancelledError:
        print("任务被取消")
    
    print(f"cancelled(): {task.cancelled()}")

await task_methods_demo()

In [None]:
# 获取当前正在运行的任务
async def get_current_task():
    current = asyncio.current_task()
    print(f"当前任务: {current}")
    
    all_tasks = asyncio.all_tasks()
    print(f"所有任务数量: {len(all_tasks)}")

await get_current_task()

### 6.2 Future 对象

Future 是一个低级别的对象，表示一个异步操作的最终结果。Task 是 Future 的子类。

In [None]:
async def future_demo():
    # 创建 Future
    future = asyncio.Future()
    
    print(f"Future 完成状态: {future.done()}")
    
    # 设置结果
    future.set_result("Future 的结果")
    
    print(f"Future 完成状态: {future.done()}")
    
    # 获取结果
    result = await future
    print(f"结果: {result}")

await future_demo()

In [None]:
# Future 回调函数
async def future_callback_demo():
    future = asyncio.Future()
    
    # 添加完成时的回调
    def callback(fut):
        print(f"回调被触发，结果: {fut.result()}")
    
    future.add_done_callback(callback)
    
    # 设置结果会触发回调
    future.set_result("完成")
    
    await asyncio.sleep(0)  # 让事件循环处理回调

await future_callback_demo()

## 7. 异步迭代器和异步上下文管理器

### 7.1 异步迭代器（Async Iterators）

使用 `async for` 遍历异步序列。

In [None]:
# 定义异步迭代器
class AsyncCounter:
    def __init__(self, start, end):
        self.current = start
        self.end = end
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.current >= self.end:
            raise StopAsyncIteration
        
        await asyncio.sleep(0.5)  # 模拟异步操作
        self.current += 1
        return self.current - 1

# 使用异步迭代器
async def async_iterator_demo():
    async for num in AsyncCounter(1, 5):
        print(f"数字: {num}")

await async_iterator_demo()

In [None]:
# 异步生成器（更简洁的写法）
async def async_range(start, end):
    for i in range(start, end):
        await asyncio.sleep(0.3)
        yield i

async def async_generator_demo():
    async for num in async_range(10, 15):
        print(f"生成的数字: {num}")

await async_generator_demo()

### 7.2 异步上下文管理器（Async Context Managers）

使用 `async with` 管理异步资源。

In [None]:
# 定义异步上下文管理器
class AsyncResource:
    async def __aenter__(self):
        print("获取资源...")
        await asyncio.sleep(0.5)
        print("资源已获取")
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("释放资源...")
        await asyncio.sleep(0.5)
        print("资源已释放")
        return False
    
    async def do_something(self):
        print("使用资源执行操作")
        await asyncio.sleep(0.5)

# 使用异步上下文管理器
async def context_manager_demo():
    async with AsyncResource() as resource:
        await resource.do_something()
    print("上下文已退出")

await context_manager_demo()

## 8. 同步原语（Synchronization Primitives）

asyncio 提供了多种同步原语，用于协调多个协程之间的执行。

### 8.1 Lock（锁）

In [None]:
# 使用 Lock 保护共享资源
async def lock_demo():
    lock = asyncio.Lock()
    shared_resource = 0
    
    async def increment(name):
        nonlocal shared_resource
        
        async with lock:  # 获取锁
            print(f"{name} 获得锁")
            temp = shared_resource
            await asyncio.sleep(0.1)  # 模拟处理时间
            shared_resource = temp + 1
            print(f"{name} 释放锁，资源值: {shared_resource}")
    
    # 并发执行多个任务
    await asyncio.gather(
        increment("任务1"),
        increment("任务2"),
        increment("任务3")
    )
    
    print(f"最终资源值: {shared_resource}")

await lock_demo()

### 8.2 Semaphore（信号量）

In [None]:
# 使用 Semaphore 限制并发数量
async def semaphore_demo():
    # 最多允许 2 个并发
    semaphore = asyncio.Semaphore(2)
    
    async def worker(name, duration):
        async with semaphore:
            print(f"{name} 开始工作")
            await asyncio.sleep(duration)
            print(f"{name} 完成工作")
    
    # 创建 5 个任务，但最多同时运行 2 个
    await asyncio.gather(
        worker("工人1", 1),
        worker("工人2", 1),
        worker("工人3", 1),
        worker("工人4", 1),
        worker("工人5", 1)
    )

await semaphore_demo()

### 8.3 Event（事件）

In [None]:
# 使用 Event 进行协程间通信
async def event_demo():
    event = asyncio.Event()
    
    async def waiter(name):
        print(f"{name} 等待事件...")
        await event.wait()  # 等待事件被设置
        print(f"{name} 收到事件信号！")
    
    async def setter():
        await asyncio.sleep(2)
        print("设置事件...")
        event.set()  # 设置事件
    
    # 并发运行等待者和设置者
    await asyncio.gather(
        waiter("等待者1"),
        waiter("等待者2"),
        setter()
    )

await event_demo()

### 8.4 Queue（队列）

In [None]:
# 使用 Queue 实现生产者-消费者模式
async def queue_demo():
    queue = asyncio.Queue(maxsize=3)
    
    async def producer(name, count):
        for i in range(count):
            item = f"{name}-项目{i}"
            await queue.put(item)
            print(f"生产者 {name} 生产: {item}, 队列大小: {queue.qsize()}")
            await asyncio.sleep(0.5)
    
    async def consumer(name):
        while True:
            item = await queue.get()
            print(f"消费者 {name} 消费: {item}")
            await asyncio.sleep(1)
            queue.task_done()
    
    # 创建生产者和消费者任务
    producers = [
        asyncio.create_task(producer("P1", 3)),
        asyncio.create_task(producer("P2", 3))
    ]
    consumers = [
        asyncio.create_task(consumer("C1")),
        asyncio.create_task(consumer("C2"))
    ]
    
    # 等待所有生产者完成
    await asyncio.gather(*producers)
    
    # 等待队列清空
    await queue.join()
    
    # 取消消费者任务
    for c in consumers:
        c.cancel()
    
    print("所有任务完成")

await queue_demo()

## 9. 并发编程模式对比

### 9.1 三种并发方式

Python 提供三种主要的并发编程方式：

| 方式 | 适用场景 | 优点 | 缺点 |
|------|----------|------|------|
| **多线程** (threading) | I/O 密集型任务 | 简单易用 | GIL 限制，不适合 CPU 密集 |
| **多进程** (multiprocessing) | CPU 密集型任务 | 真正并行，绕过 GIL | 内存开销大，通信复杂 |
| **异步** (asyncio) | I/O 密集型任务 | 高效，低开销 | 需要异步库支持 |

### 9.2 性能对比示例

In [None]:
import threading
import multiprocessing

def io_task(name):
    """模拟 I/O 密集型任务"""
    time.sleep(1)
    return f"{name} 完成"

# 1. 同步方式
def sync_approach():
    start = time.time()
    results = [io_task(f"任务{i}") for i in range(5)]
    print(f"同步执行耗时: {time.time() - start:.2f} 秒")

sync_approach()

In [None]:
# 2. 多线程方式
def threading_approach():
    start = time.time()
    threads = []
    
    for i in range(5):
        thread = threading.Thread(target=io_task, args=(f"任务{i}",))
        threads.append(thread)
        thread.start()
    
    for thread in threads:
        thread.join()
    
    print(f"多线程执行耗时: {time.time() - start:.2f} 秒")

threading_approach()

In [None]:
# 3. 异步方式
async def async_io_task(name):
    await asyncio.sleep(1)
    return f"{name} 完成"

async def async_approach():
    start = time.time()
    results = await asyncio.gather(
        *[async_io_task(f"任务{i}") for i in range(5)]
    )
    print(f"异步执行耗时: {time.time() - start:.2f} 秒")

await async_approach()

### 9.3 何时使用异步？

**适合使用异步的场景**：
- 网络 I/O：HTTP 请求、WebSocket 连接
- 文件 I/O：大量文件读写
- 数据库操作：异步数据库驱动
- 高并发服务：Web 服务器、聊天服务器

**不适合使用异步的场景**：
- CPU 密集型任务：科学计算、图像处理（使用多进程）
- 简单脚本：没有并发需求
- 阻塞库：第三方库不支持异步

## 10. 实际应用案例

### 10.1 异步 HTTP 请求

In [None]:
# 模拟异步 HTTP 请求
async def fetch_url(url, delay):
    """模拟 HTTP 请求"""
    print(f"开始请求: {url}")
    await asyncio.sleep(delay)  # 模拟网络延迟
    print(f"完成请求: {url}")
    return f"来自 {url} 的数据"

async def fetch_multiple_urls():
    urls = [
        ("https://api.example.com/users", 1),
        ("https://api.example.com/posts", 1.5),
        ("https://api.example.com/comments", 0.8)
    ]
    
    start = time.time()
    
    # 并发请求所有 URL
    results = await asyncio.gather(
        *[fetch_url(url, delay) for url, delay in urls]
    )
    
    print(f"\n总耗时: {time.time() - start:.2f} 秒")
    print(f"获取了 {len(results)} 个结果")
    
    return results

results = await fetch_multiple_urls()

### 10.2 异步文件操作

In [None]:
# 模拟异步文件读写
async def read_file(filename):
    """模拟异步读取文件"""
    print(f"读取文件: {filename}")
    await asyncio.sleep(0.5)  # 模拟 I/O 延迟
    return f"{filename} 的内容"

async def write_file(filename, content):
    """模拟异步写入文件"""
    print(f"写入文件: {filename}")
    await asyncio.sleep(0.5)  # 模拟 I/O 延迟
    print(f"完成写入: {filename}")

async def file_operations():
    # 并发读取多个文件
    files = ["file1.txt", "file2.txt", "file3.txt"]
    contents = await asyncio.gather(
        *[read_file(f) for f in files]
    )
    
    # 处理内容
    processed = [c.upper() for c in contents]
    
    # 并发写入结果
    output_files = ["output1.txt", "output2.txt", "output3.txt"]
    await asyncio.gather(
        *[write_file(f, c) for f, c in zip(output_files, processed)]
    )
    
    print("所有文件操作完成")

await file_operations()

### 10.3 异步数据库操作（模拟）

In [None]:
# 模拟异步数据库操作
class AsyncDatabase:
    def __init__(self):
        self.data = {}
    
    async def connect(self):
        print("连接数据库...")
        await asyncio.sleep(0.5)
        print("数据库已连接")
    
    async def query(self, sql):
        print(f"执行查询: {sql}")
        await asyncio.sleep(1)  # 模拟查询延迟
        return [{"id": i, "name": f"用户{i}"} for i in range(3)]
    
    async def insert(self, table, data):
        print(f"插入数据到 {table}: {data}")
        await asyncio.sleep(0.5)
        return True
    
    async def close(self):
        print("关闭数据库连接")
        await asyncio.sleep(0.2)

async def database_demo():
    db = AsyncDatabase()
    
    await db.connect()
    
    # 并发执行多个查询
    results = await asyncio.gather(
        db.query("SELECT * FROM users"),
        db.query("SELECT * FROM posts"),
        db.query("SELECT * FROM comments")
    )
    
    print(f"\n获取到 {len(results)} 个查询结果")
    
    # 批量插入
    await asyncio.gather(
        db.insert("users", {"name": "Alice"}),
        db.insert("users", {"name": "Bob"})
    )
    
    await db.close()

await database_demo()

### 10.4 异步 Web 服务器（概念示例）

In [None]:
# 模拟异步 Web 服务器处理请求
async def handle_request(request_id, path):
    """处理单个 HTTP 请求"""
    print(f"请求 {request_id}: {path} - 开始处理")
    
    # 模拟各种异步操作
    if path == "/api/users":
        await asyncio.sleep(0.5)  # 数据库查询
        response = {"users": ["Alice", "Bob"]}
    elif path == "/api/posts":
        await asyncio.sleep(1)  # 复杂查询
        response = {"posts": ["Post 1", "Post 2"]}
    else:
        await asyncio.sleep(0.2)
        response = {"message": "Not Found"}
    
    print(f"请求 {request_id}: {path} - 处理完成")
    return response

async def web_server_simulation():
    """模拟处理多个并发请求"""
    requests = [
        (1, "/api/users"),
        (2, "/api/posts"),
        (3, "/api/users"),
        (4, "/api/comments"),
        (5, "/api/posts")
    ]
    
    start = time.time()
    
    # 并发处理所有请求
    responses = await asyncio.gather(
        *[handle_request(rid, path) for rid, path in requests]
    )
    
    print(f"\n处理 {len(requests)} 个请求，总耗时: {time.time() - start:.2f} 秒")
    print(f"平均响应时间: {(time.time() - start) / len(requests):.2f} 秒")

await web_server_simulation()

## 11. 异步编程最佳实践

### 11.1 常见陷阱和解决方案

In [None]:
# 陷阱1: 在异步函数中使用阻塞操作
import requests  # 同步库

# ❌ 错误示例
async def bad_example():
    # time.sleep() 会阻塞整个事件循环！
    # time.sleep(1)  # 不要这样做
    
    # requests 是同步库，会阻塞事件循环！
    # response = requests.get("https://api.example.com")  # 不要这样做
    pass

# ✅ 正确示例
async def good_example():
    # 使用异步睡眠
    await asyncio.sleep(1)
    
    # 使用异步 HTTP 库（如 aiohttp）
    # async with aiohttp.ClientSession() as session:
    #     async with session.get("https://api.example.com") as response:
    #         data = await response.json()
    pass

print("示例：避免在异步函数中使用阻塞操作")

In [None]:
# 陷阱2: 忘记 await
async def fetch_data():
    await asyncio.sleep(0.5)
    return "数据"

# ❌ 错误：忘记 await
async def bad_usage():
    result = fetch_data()  # 这返回的是协程对象，不是结果！
    print(f"结果类型: {type(result)}")

# ✅ 正确：使用 await
async def good_usage():
    result = await fetch_data()  # 正确获取结果
    print(f"结果: {result}")

await bad_usage()
await good_usage()

In [None]:
# 陷阱3: 不必要的串行执行
async def task1():
    await asyncio.sleep(1)
    return "任务1"

async def task2():
    await asyncio.sleep(1)
    return "任务2"

# ❌ 低效：串行执行（耗时2秒）
async def sequential():
    start = time.time()
    result1 = await task1()
    result2 = await task2()
    print(f"串行耗时: {time.time() - start:.2f} 秒")

# ✅ 高效：并发执行（耗时1秒）
async def concurrent():
    start = time.time()
    result1, result2 = await asyncio.gather(task1(), task2())
    print(f"并发耗时: {time.time() - start:.2f} 秒")

await sequential()
await concurrent()

### 11.2 性能优化技巧

In [None]:
# 技巧1: 使用 create_task 提前调度任务
async def optimization_demo():
    # 创建并调度任务（不立即等待，让任务在后台运行）
    task1 = asyncio.create_task(asyncio.sleep(1))
    task2 = asyncio.create_task(asyncio.sleep(1))
    
    # 可以在等待之前做其他事情
    # 当我们执行下面的代码时，task1 和 task2 已经在后台并发运行
    print("任务已调度，执行其他操作...")
    
    # 稍后等待结果
    await task1
    await task2
    print("任务完成")

await optimization_demo()

In [None]:
# 技巧2: 使用 Semaphore 控制并发量
async def limited_concurrency_demo():
    semaphore = asyncio.Semaphore(3)  # 最多3个并发
    
    async def limited_task(n):
        async with semaphore:
            print(f"任务 {n} 开始")
            await asyncio.sleep(1)
            print(f"任务 {n} 完成")
    
    # 创建10个任务，但最多同时运行3个
    await asyncio.gather(*[limited_task(i) for i in range(10)])

await limited_concurrency_demo()

### 11.3 异常处理

In [None]:
# 异步异常处理
async def risky_operation(n):
    await asyncio.sleep(0.5)
    if n == 2:
        raise ValueError(f"任务 {n} 失败")
    return f"任务 {n} 成功"

async def exception_handling_demo():
    # 演示1: gather 默认会传播第一个异常
    print("=== 演示1: 默认异常处理 ===")
    try:
        results = await asyncio.gather(
            *[risky_operation(i) for i in range(4)]
        )
    except ValueError as e:
        print(f"捕获异常: {e}\n")

    # 演示2: 使用 return_exceptions=True 继续执行所有任务
    print("=== 演示2: return_exceptions=True ===")
    results = await asyncio.gather(
        *[risky_operation(i) for i in range(4)],
        return_exceptions=True
    )

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"任务 {i} 失败: {result}")
        else:
            print(f"任务 {i}: {result}")

await exception_handling_demo()

## 12. 总结

### 核心要点

1. **异步编程基础**
   - 异步编程通过非阻塞方式提高 I/O 密集型任务的效率
   - 使用 `async def` 定义协程，使用 `await` 等待异步操作

2. **关键概念**
   - **协程**：可暂停和恢复的函数
   - **事件循环**：调度和执行异步任务的核心
   - **Task**：协程的封装，可在事件循环中调度
   - **Future**：表示异步操作的最终结果

3. **常用工具**
   - `asyncio.gather()`: 并发运行多个协程
   - `asyncio.create_task()`: 创建任务
   - `asyncio.wait()`: 灵活的等待控制
   - `asyncio.Queue`: 异步队列
   - 同步原语：Lock, Semaphore, Event

4. **最佳实践**
   - 避免在异步函数中使用阻塞操作
   - 合理使用并发执行，避免不必要的串行
   - 使用 Semaphore 控制并发量
   - 正确处理异常

5. **应用场景**
   - 网络请求（HTTP, WebSocket）
   - 数据库操作
   - 文件 I/O
   - Web 服务器
   - 实时通信

### 延伸学习

- **异步库**：aiohttp（HTTP）, aiofiles（文件）, asyncpg（PostgreSQL）
- **异步框架**：FastAPI, aiohttp, Sanic, Tornado
- **深入主题**：异步生成器、异步推导式、uvloop

### 参考资源

- [Python asyncio 官方文档](https://docs.python.org/3/library/asyncio.html)
- [Real Python - Async IO in Python](https://realpython.com/async-io-python/)
- [PEP 492 - Coroutines with async and await syntax](https://www.python.org/dev/peps/pep-0492/)

## 练习题

### 练习 1: 并发下载
编写一个异步函数，模拟下载 10 个文件（使用 `asyncio.sleep` 模拟下载时间），并计算总耗时。

### 练习 2: 生产者-消费者
实现一个完整的异步生产者-消费者模式，包括多个生产者和消费者。

### 练习 3: 超时处理
编写一个函数，同时发起多个异步请求，如果某个请求超时（超过指定时间），则跳过它继续执行其他请求。

### 练习 4: 限流
实现一个异步 API 客户端，使用 Semaphore 限制同时进行的请求数量不超过 5 个。

In [None]:
# 在此处完成练习
