# 第 10 章：非同步程式設計

本章節詳細說明 Python 的非同步程式設計，包含 async/await 語法、asyncio 模組，以及與 JavaScript 非同步模型的差異。

**注意**：由於 Jupyter Notebook 已經在事件迴圈中執行，部分範例使用 `await` 而非 `asyncio.run()`。在一般 Python 腳本中，請使用 `asyncio.run(main())`。

---

## 10.1 非同步基礎概念

### 同步 vs 非同步

In [None]:
import time

# 同步程式碼：依序執行，一個完成才執行下一個
def sync_task(name, seconds):
    print(f"{name} 開始")
    time.sleep(seconds)  # 阻塞整個程式
    print(f"{name} 完成")

print("=== 同步執行（每個任務 0.5 秒）===")
start = time.time()
sync_task("任務1", 0.5)
sync_task("任務2", 0.5)
sync_task("任務3", 0.5)
print(f"同步總時間：{time.time() - start:.2f} 秒")

In [None]:
import asyncio
import time

# 非同步程式碼：可以在等待時執行其他任務
async def async_task(name, seconds):
    print(f"{name} 開始")
    await asyncio.sleep(seconds)  # 非阻塞等待
    print(f"{name} 完成")

async def run_async_demo():
    print("=== 非同步執行（並行，每個任務 0.5 秒）===")
    start = time.time()
    # 並行執行三個任務
    await asyncio.gather(
        async_task("任務1", 0.5),
        async_task("任務2", 0.5),
        async_task("任務3", 0.5),
    )
    print(f"非同步總時間：{time.time() - start:.2f} 秒")

# 在 Jupyter 中直接 await
await run_async_demo()

### JavaScript vs Python 非同步模型

| 特性 | JavaScript | Python |
|------|------------|--------|
| 執行環境 | 單執行緒 + 事件迴圈（內建） | 需要明確建立事件迴圈 |
| 啟動方式 | 自動執行 | `asyncio.run()` |
| Promise | 內建 `Promise` | `asyncio.Future` |
| async 函式 | 回傳 `Promise` | 回傳 `Coroutine` |
| 頂層 await | 支援（ES2022+） | 僅在 `asyncio.run()` 內 |

---

## 10.2 async/await 語法

### 定義協程函式（Coroutine）

In [None]:
import asyncio

# 使用 async def 定義協程函式
async def fetch_data(url):
    print(f"開始取得 {url}")
    await asyncio.sleep(1)  # 模擬網路請求
    print(f"完成取得 {url}")
    return {"url": url, "data": "some data"}

# 呼叫協程函式會回傳協程物件，不會立即執行
coro = fetch_data("https://example.com")
print(f"協程物件類型：{type(coro)}")

# 必須使用 await 來執行
result = await coro
print(f"結果：{result}")

### JavaScript 對照

```javascript
// JavaScript
async function fetchData(url) {
    console.log(`開始取得 ${url}`);
    await new Promise(resolve => setTimeout(resolve, 1000));
    console.log(`完成取得 ${url}`);
    return { url, data: "some data" };
}

// 可以直接呼叫，回傳 Promise
fetchData("https://example.com").then(console.log);

// 或在 async 函式中使用 await
async function main() {
    const result = await fetchData("https://example.com");
    console.log(result);
}
main();
```

```python
# Python - 必須透過 asyncio.run() 執行
import asyncio

async def fetch_data(url):
    print(f"開始取得 {url}")
    await asyncio.sleep(1)
    print(f"完成取得 {url}")
    return {"url": url, "data": "some data"}

async def main():
    result = await fetch_data("https://example.com")
    print(result)

asyncio.run(main())  # Python 需要明確啟動
```

### await 關鍵字

In [None]:
import asyncio

async def get_user(user_id):
    await asyncio.sleep(0.3)
    return {"id": user_id, "name": f"User {user_id}"}

async def get_posts(user_id):
    await asyncio.sleep(0.3)
    return [{"id": 1, "title": "Post 1"}, {"id": 2, "title": "Post 2"}]

async def demo_await():
    # await 會暫停執行直到協程完成
    user = await get_user(1)
    print(f"使用者：{user}")

    posts = await get_posts(1)
    print(f"文章：{posts}")

await demo_await()

### 可等待物件（Awaitables）

Python 中有三種可等待物件：
1. **協程（Coroutine）** - async def 定義的函式呼叫結果
2. **Task** - 包裝協程的任務物件
3. **Future** - 低階的可等待物件

In [None]:
import asyncio

# 1. 協程（Coroutine）
async def my_coroutine():
    await asyncio.sleep(0.1)
    return "coroutine result"

# 2. Task - 包裝協程
async def demo_task():
    # 建立 Task，立即開始執行
    task = asyncio.create_task(my_coroutine())
    print(f"Task 類型：{type(task)}")
    result = await task
    print(f"Task 結果：{result}")

await demo_task()

---

## 10.3 asyncio 模組

### 執行協程的方式

```python
import asyncio

async def main():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# 方式 1：asyncio.run()（推薦，Python 3.7+）
asyncio.run(main())

# 方式 2：在 Jupyter Notebook 中
await main()

# 方式 3：手動管理事件迴圈（較舊的方式）
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()
```

### asyncio.gather() - 並行執行多個協程

In [None]:
import asyncio
import time

async def fetch_url(url, delay):
    print(f"開始：{url}")
    await asyncio.sleep(delay)
    print(f"完成：{url}")
    return f"{url} 的資料"

async def demo_gather():
    start = time.time()
    
    # gather 會並行執行所有協程
    results = await asyncio.gather(
        fetch_url("url1", 2),
        fetch_url("url2", 1),
        fetch_url("url3", 3),
    )
    
    print(f"\n所有結果：{results}")
    print(f"總時間：{time.time() - start:.2f} 秒（最長的那個）")

await demo_gather()

**JavaScript 對照**：
```javascript
const results = await Promise.all([
    fetchUrl("url1", 2),
    fetchUrl("url2", 1),
    fetchUrl("url3", 3),
]);
```

### gather 的錯誤處理

In [None]:
import asyncio

async def may_fail(name, should_fail=False):
    await asyncio.sleep(0.5)
    if should_fail:
        raise ValueError(f"{name} 失敗了")
    return f"{name} 成功"

async def demo_gather_error():
    # 預設：一個失敗會拋出例外
    print("=== 預設行為（例外會拋出）===")
    try:
        results = await asyncio.gather(
            may_fail("任務1"),
            may_fail("任務2", should_fail=True),
            may_fail("任務3"),
        )
    except ValueError as e:
        print(f"捕捉到錯誤：{e}")

    # return_exceptions=True：錯誤會作為結果回傳
    print("\n=== return_exceptions=True ===")
    results = await asyncio.gather(
        may_fail("任務1"),
        may_fail("任務2", should_fail=True),
        may_fail("任務3"),
        return_exceptions=True
    )
    for i, result in enumerate(results, 1):
        if isinstance(result, Exception):
            print(f"任務{i} 錯誤：{result}")
        else:
            print(f"任務{i} 成功：{result}")

await demo_gather_error()

### asyncio.create_task() - 建立任務

In [None]:
import asyncio

async def background_task(name, seconds):
    print(f"{name} 開始")
    await asyncio.sleep(seconds)
    print(f"{name} 完成")
    return f"{name} 的結果"

async def demo_create_task():
    # create_task 會立即開始執行任務（不會阻塞）
    task1 = asyncio.create_task(background_task("任務1", 1))
    task2 = asyncio.create_task(background_task("任務2", 0.5))

    print("任務已建立，繼續做其他事...")
    print("（任務在背景執行中）")

    # 等待任務完成
    result1 = await task1
    result2 = await task2

    print(f"\n結果：{result1}, {result2}")

await demo_create_task()

#### create_task vs await 的差異

```python
# await coro() - 立即等待，完成後才繼續
result = await my_coroutine()  # 阻塞直到完成

# create_task(coro()) - 排程執行，可以稍後等待
task = asyncio.create_task(my_coroutine())  # 立即返回，任務在背景執行
# ... 做其他事 ...
result = await task  # 需要結果時再等待
```

### asyncio.wait() - 更細緻的控制

In [None]:
import asyncio

async def task(name, seconds):
    await asyncio.sleep(seconds)
    return f"{name} 完成"

async def demo_wait():
    tasks = [
        asyncio.create_task(task("A", 1)),
        asyncio.create_task(task("B", 2)),
        asyncio.create_task(task("C", 3)),
    ]

    # 等待第一個完成
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )
    print(f"第一個完成：{[t.result() for t in done]}")
    print(f"還在執行：{len(pending)} 個")

    # 等待剩餘的完成
    done, pending = await asyncio.wait(pending)
    print(f"全部完成：{[t.result() for t in done]}")

await demo_wait()

#### return_when 選項

| 選項 | 說明 |
|------|------|
| `FIRST_COMPLETED` | 第一個完成時回傳 |
| `FIRST_EXCEPTION` | 第一個例外時回傳 |
| `ALL_COMPLETED` | 全部完成時回傳（預設） |

### asyncio.wait_for() - 設定超時

In [None]:
import asyncio

async def slow_operation():
    print("慢速操作開始...")
    await asyncio.sleep(5)
    print("慢速操作完成")
    return "完成"

async def demo_timeout():
    try:
        # 設定 1 秒超時
        result = await asyncio.wait_for(
            slow_operation(),
            timeout=1.0
        )
        print(result)
    except asyncio.TimeoutError:
        print("操作超時！")

await demo_timeout()

**JavaScript 對照**：
```javascript
const result = await Promise.race([
    slowOperation(),
    new Promise((_, reject) =>
        setTimeout(() => reject(new Error('Timeout')), 1000)
    )
]);
```

### asyncio.sleep() vs time.sleep()

In [None]:
import asyncio
import time

# 示範：time.sleep() 會阻塞所有協程
async def blocking_task(name):
    print(f"{name} 開始")
    time.sleep(0.5)  # 阻塞！其他協程無法執行
    print(f"{name} 完成")

async def demo_blocking():
    print("=== 使用 time.sleep()（阻塞）===")
    start = time.time()
    await asyncio.gather(
        blocking_task("A"),
        blocking_task("B"),
        blocking_task("C"),
    )
    print(f"總時間：{time.time() - start:.2f} 秒（依序執行）")

await demo_blocking()

In [None]:
import asyncio
import time

# 示範：asyncio.sleep() 不會阻塞
async def non_blocking_task(name):
    print(f"{name} 開始")
    await asyncio.sleep(0.5)  # 非阻塞！其他協程可以執行
    print(f"{name} 完成")

async def demo_non_blocking():
    print("=== 使用 asyncio.sleep()（非阻塞）===")
    start = time.time()
    await asyncio.gather(
        non_blocking_task("A"),
        non_blocking_task("B"),
        non_blocking_task("C"),
    )
    print(f"總時間：{time.time() - start:.2f} 秒（並行執行）")

await demo_non_blocking()

---

## 10.4 TaskGroup（Python 3.11+）

Python 3.11 引入了更好的任務管理方式。

In [None]:
import asyncio
import sys

print(f"Python 版本：{sys.version_info.major}.{sys.version_info.minor}")

async def fetch(url):
    await asyncio.sleep(0.5)
    return f"{url} 的資料"

# TaskGroup 需要 Python 3.11+
if sys.version_info >= (3, 11):
    async def demo_taskgroup():
        # 使用 TaskGroup 管理多個任務
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(fetch("url1"))
            task2 = tg.create_task(fetch("url2"))
            task3 = tg.create_task(fetch("url3"))

        # 離開 context manager 時，所有任務都已完成
        print(f"task1: {task1.result()}")
        print(f"task2: {task2.result()}")
        print(f"task3: {task3.result()}")
    
    await demo_taskgroup()
else:
    print("TaskGroup 需要 Python 3.11+，使用 gather 代替：")
    results = await asyncio.gather(
        fetch("url1"),
        fetch("url2"),
        fetch("url3"),
    )
    for i, result in enumerate(results, 1):
        print(f"task{i}: {result}")

#### TaskGroup vs gather 的差異

| 特性 | `asyncio.gather()` | `asyncio.TaskGroup` |
|------|-------------------|--------------------|
| 語法 | 函式呼叫 | context manager |
| 錯誤處理 | `return_exceptions=True` | ExceptionGroup |
| 取消行為 | 需手動處理 | 一個失敗會取消其他 |
| Python 版本 | 3.7+ | 3.11+ |

---

## 10.5 非同步迭代器與生成器

### 非同步迭代器

In [None]:
import asyncio

class AsyncRange:
    """非同步的 range"""

    def __init__(self, stop):
        self.stop = stop
        self.current = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.stop:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)  # 模擬非同步操作
        value = self.current
        self.current += 1
        return value

async def demo_async_iter():
    print("使用 async for 迭代：")
    async for num in AsyncRange(5):
        print(f"  {num}")

await demo_async_iter()

### 非同步生成器

In [None]:
import asyncio

async def async_generator(n):
    """非同步生成器"""
    for i in range(n):
        await asyncio.sleep(0.1)
        yield i

async def demo_async_gen():
    # 使用 async for 迭代
    print("async for 迭代：")
    async for num in async_generator(5):
        print(f"  收到：{num}")

    # 使用非同步列表推導式（Python 3.6+）
    print("\n非同步列表推導式：")
    results = [num async for num in async_generator(5)]
    print(f"  結果：{results}")

await demo_async_gen()

### 非同步 Context Manager

In [None]:
import asyncio

class AsyncResource:
    """非同步資源管理"""

    def __init__(self, name):
        self.name = name

    async def __aenter__(self):
        print(f"[{self.name}] 取得資源...")
        await asyncio.sleep(0.2)
        print(f"[{self.name}] 資源就緒")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"[{self.name}] 釋放資源...")
        await asyncio.sleep(0.1)
        print(f"[{self.name}] 資源已釋放")

    async def do_something(self):
        print(f"[{self.name}] 使用資源中...")
        await asyncio.sleep(0.1)

async def demo_async_context():
    # 使用 async with
    async with AsyncResource("DB連線") as resource:
        await resource.do_something()

await demo_async_context()

---

## 10.6 實際應用範例

### 模擬 HTTP 請求

In [None]:
import asyncio
import time
import random

async def fetch_url(url):
    """模擬非同步 HTTP 請求"""
    delay = random.uniform(0.5, 2.0)
    print(f"開始取得 {url}...")
    await asyncio.sleep(delay)
    return {"url": url, "status": 200, "time": delay}

async def demo_http():
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/posts",
        "https://api.example.com/comments",
        "https://api.example.com/photos",
    ]

    start = time.time()
    
    # 並行請求所有 URL
    results = await asyncio.gather(*[fetch_url(url) for url in urls])
    
    print("\n=== 結果 ===")
    for result in results:
        print(f"{result['url']}: {result['time']:.2f}s")
    
    print(f"\n總時間：{time.time() - start:.2f} 秒")

await demo_http()

### 生產者-消費者模式

In [None]:
import asyncio
import random

async def producer(queue, name, count):
    """生產者：產生資料放入佇列"""
    for i in range(count):
        item = f"{name}-item-{i}"
        await queue.put(item)
        print(f"[生產] {item}")
        await asyncio.sleep(random.uniform(0.1, 0.3))

async def consumer(queue, name):
    """消費者：從佇列取出並處理資料"""
    while True:
        item = await queue.get()
        if item is None:  # 結束信號
            break
        print(f"[消費] {name} 處理：{item}")
        await asyncio.sleep(random.uniform(0.1, 0.2))
        queue.task_done()

async def demo_producer_consumer():
    queue = asyncio.Queue(maxsize=5)

    # 建立生產者和消費者
    producers = [
        asyncio.create_task(producer(queue, "P1", 3)),
        asyncio.create_task(producer(queue, "P2", 3)),
    ]
    consumers = [
        asyncio.create_task(consumer(queue, "C1")),
        asyncio.create_task(consumer(queue, "C2")),
    ]

    # 等待生產者完成
    await asyncio.gather(*producers)
    print("\n--- 生產者完成 ---")

    # 等待佇列清空
    await queue.join()

    # 發送結束信號
    for _ in consumers:
        await queue.put(None)

    # 等待消費者完成
    await asyncio.gather(*consumers)
    print("\n--- 全部完成 ---")

await demo_producer_consumer()

### 限制並發數量（Semaphore）

In [None]:
import asyncio
import time

async def fetch_with_semaphore(semaphore, url, id):
    """使用 Semaphore 限制並發"""
    async with semaphore:
        print(f"[{id}] 開始：{url}")
        await asyncio.sleep(1)  # 模擬請求
        print(f"[{id}] 完成：{url}")
        return f"{url} 的資料"

async def demo_semaphore():
    # 限制同時最多 3 個並發
    semaphore = asyncio.Semaphore(3)

    urls = [f"url{i}" for i in range(8)]
    
    print(f"總共 {len(urls)} 個請求，最多 3 個並發\n")
    start = time.time()
    
    tasks = [
        fetch_with_semaphore(semaphore, url, i) 
        for i, url in enumerate(urls)
    ]
    results = await asyncio.gather(*tasks)
    
    print(f"\n全部完成：{len(results)} 個結果")
    print(f"總時間：{time.time() - start:.2f} 秒")

await demo_semaphore()

### 超時與取消

In [None]:
import asyncio

async def long_running_task():
    """長時間執行的任務"""
    try:
        print("任務開始")
        await asyncio.sleep(10)
        print("任務完成")
        return "結果"
    except asyncio.CancelledError:
        print("任務被取消！")
        raise  # 重新拋出，讓呼叫者知道任務被取消

async def demo_cancel():
    # 方式 1：使用 wait_for 設定超時
    print("=== 測試超時 ===")
    try:
        result = await asyncio.wait_for(
            long_running_task(),
            timeout=1.0
        )
    except asyncio.TimeoutError:
        print("超時了！\n")

    # 方式 2：手動取消任務
    print("=== 測試手動取消 ===")
    task = asyncio.create_task(long_running_task())

    await asyncio.sleep(0.5)
    print("呼叫 task.cancel()...")
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("確認任務已被取消")

await demo_cancel()

---

## 10.7 與同步程式碼整合

### 在 async 中執行同步程式碼

In [None]:
import asyncio
import time

def blocking_io():
    """模擬阻塞的 I/O 操作"""
    print("  阻塞操作開始...")
    time.sleep(1)
    print("  阻塞操作完成")
    return "IO 結果"

def cpu_intensive():
    """模擬 CPU 密集運算"""
    print("  CPU 運算開始...")
    total = sum(i * i for i in range(10**6))
    print("  CPU 運算完成")
    return total

async def demo_executor():
    loop = asyncio.get_event_loop()

    print("=== 在執行緒池中執行阻塞 I/O ===")
    # None 表示使用預設的執行緒池
    result = await loop.run_in_executor(None, blocking_io)
    print(f"結果：{result}\n")

    print("=== 在執行緒池中執行 CPU 密集運算 ===")
    result = await loop.run_in_executor(None, cpu_intensive)
    print(f"結果：{result}")

await demo_executor()

### 在同步程式碼中執行 async

```python
import asyncio

async def async_function():
    await asyncio.sleep(1)
    return "非同步結果"

# 方式 1：使用 asyncio.run()（推薦）
def sync_wrapper():
    result = asyncio.run(async_function())
    return result

# 呼叫
result = sync_wrapper()
print(result)
```

---

## 10.8 常見問題與陷阱

### 1. 忘記 await

In [None]:
import asyncio

async def get_data():
    await asyncio.sleep(0.1)
    return "資料"

async def demo_forgot_await():
    # 錯誤：忘記 await
    data_wrong = get_data()  # 這回傳協程物件，不是結果！
    print(f"忘記 await：{data_wrong}")
    print(f"類型：{type(data_wrong)}")

    # 正確
    data_correct = await get_data()
    print(f"\n正確 await：{data_correct}")
    print(f"類型：{type(data_correct)}")
    
    # 清理未等待的協程
    await data_wrong

await demo_forgot_await()

### 2. 在同步函式中使用 await

```python
# 錯誤：await 只能在 async 函式中使用
def sync_function():
    await asyncio.sleep(1)  # SyntaxError!

# 正確
async def async_function():
    await asyncio.sleep(1)
```

### 3. 使用阻塞操作

In [None]:
import asyncio
import time

# 展示問題
print("錯誤示範：time.sleep 會阻塞事件迴圈")
print("正確做法：使用 asyncio.sleep")
print("")
print("如果必須使用阻塞操作，放到執行緒池：")
print("")
print("async def better_example():")
print("    loop = asyncio.get_event_loop()")
print("    await loop.run_in_executor(None, time.sleep, 1)")

### 4. 沒有正確處理例外

In [None]:
import asyncio

async def risky_task():
    await asyncio.sleep(0.1)
    raise ValueError("出錯了")

async def demo_exception_handling():
    # 正確：總是 await 任務並處理例外
    task = asyncio.create_task(risky_task())
    
    try:
        await task
    except ValueError as e:
        print(f"正確捕捉到錯誤：{e}")

await demo_exception_handling()

### 5. 依序 await 而非並行執行

In [None]:
import asyncio
import time

async def slow_operation(name):
    await asyncio.sleep(1)
    return f"{name} 完成"

async def demo_sequential_vs_parallel():
    # 錯誤：依序 await（失去並行的好處）
    print("=== 依序 await（慢）===")
    start = time.time()
    r1 = await slow_operation("A")
    r2 = await slow_operation("B")
    r3 = await slow_operation("C")
    print(f"結果：{r1}, {r2}, {r3}")
    print(f"時間：{time.time() - start:.2f} 秒\n")

    # 正確：使用 gather 並行執行
    print("=== 使用 gather（快）===")
    start = time.time()
    r1, r2, r3 = await asyncio.gather(
        slow_operation("A"),
        slow_operation("B"),
        slow_operation("C"),
    )
    print(f"結果：{r1}, {r2}, {r3}")
    print(f"時間：{time.time() - start:.2f} 秒")

await demo_sequential_vs_parallel()

---

## 練習題

### 練習 1：並行下載

模擬並行下載多個檔案：

In [None]:
# 練習 1：你的程式碼
import asyncio

async def download_file(filename, size):
    """
    模擬下載檔案

    Args:
        filename: 檔案名稱
        size: 檔案大小（MB），影響下載時間（每 MB 0.1 秒）

    Returns:
        dict: {"filename": filename, "size": size, "status": "completed"}
    """
    # 你的程式碼
    pass

async def main():
    files = [
        ("file1.zip", 10),
        ("file2.zip", 5),
        ("file3.zip", 15),
        ("file4.zip", 3),
    ]

    # 並行下載所有檔案
    # results = ???
    pass

# await main()

In [None]:
# 練習 1：參考解答
import asyncio
import time

async def download_file(filename, size):
    """模擬下載檔案"""
    print(f"開始下載 {filename} ({size} MB)...")
    # 每 MB 0.1 秒
    await asyncio.sleep(size * 0.1)
    print(f"完成下載 {filename}")
    return {"filename": filename, "size": size, "status": "completed"}

async def main():
    files = [
        ("file1.zip", 10),
        ("file2.zip", 5),
        ("file3.zip", 15),
        ("file4.zip", 3),
    ]

    start = time.time()
    
    # 並行下載所有檔案
    results = await asyncio.gather(
        *[download_file(name, size) for name, size in files]
    )

    print("\n=== 下載結果 ===")
    for result in results:
        print(f"  {result}")
    
    total_size = sum(f[1] for f in files)
    print(f"\n總大小：{total_size} MB")
    print(f"總時間：{time.time() - start:.2f} 秒")
    print(f"（如果依序下載需要 {total_size * 0.1:.1f} 秒）")

await main()

### 練習 2：限流器

實作一個限制每秒請求數的裝飾器：

In [None]:
# 練習 2：你的程式碼
import asyncio
from functools import wraps

def rate_limit(calls_per_second):
    """
    限制每秒呼叫次數的裝飾器

    @rate_limit(5)  # 每秒最多 5 次
    async def api_call():
        ...
    """
    # 你的程式碼
    pass

In [None]:
# 練習 2：參考解答
import asyncio
import time
from functools import wraps

def rate_limit(calls_per_second):
    """限制每秒呼叫次數的裝飾器"""
    min_interval = 1.0 / calls_per_second
    last_called = [0.0]  # 使用 list 以便在閉包中修改
    
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # 計算需要等待的時間
            elapsed = time.time() - last_called[0]
            wait_time = min_interval - elapsed
            
            if wait_time > 0:
                await asyncio.sleep(wait_time)
            
            last_called[0] = time.time()
            return await func(*args, **kwargs)
        return wrapper
    return decorator

# 測試
@rate_limit(3)  # 每秒最多 3 次
async def api_call(id):
    print(f"[{time.time():.2f}] API 呼叫 {id}")
    return f"結果 {id}"

async def test_rate_limit():
    print("測試限流器（每秒最多 3 次）：")
    start = time.time()
    
    # 快速呼叫 6 次
    for i in range(6):
        await api_call(i)
    
    print(f"\n總時間：{time.time() - start:.2f} 秒")

await test_rate_limit()

### 練習 3：非同步快取

實作一個非同步函式的快取裝飾器：

In [None]:
# 練習 3：你的程式碼
import asyncio
from functools import wraps
import time

def async_cache(ttl=60):
    """
    非同步函式的快取裝飾器

    @async_cache(ttl=30)
    async def fetch_data(key):
        ...
    """
    # 你的程式碼
    pass

In [None]:
# 練習 3：參考解答
import asyncio
from functools import wraps
import time

def async_cache(ttl=60):
    """非同步函式的快取裝飾器"""
    cache = {}  # {key: (value, timestamp)}
    
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # 建立快取鍵
            key = (args, tuple(sorted(kwargs.items())))
            
            # 檢查快取
            if key in cache:
                value, timestamp = cache[key]
                if time.time() - timestamp < ttl:
                    print(f"  [快取命中] {args}")
                    return value
                else:
                    print(f"  [快取過期] {args}")
            
            # 呼叫函式並快取結果
            print(f"  [呼叫函式] {args}")
            result = await func(*args, **kwargs)
            cache[key] = (result, time.time())
            return result
        
        # 提供清除快取的方法
        wrapper.clear_cache = lambda: cache.clear()
        return wrapper
    return decorator

# 測試
@async_cache(ttl=2)  # 2 秒過期
async def fetch_data(key):
    await asyncio.sleep(0.5)  # 模擬耗時操作
    return f"{key} 的資料"

async def test_cache():
    print("測試非同步快取（TTL=2秒）：\n")
    
    print("第一次呼叫：")
    result1 = await fetch_data("user_1")
    print(f"結果：{result1}\n")
    
    print("第二次呼叫（應該命中快取）：")
    result2 = await fetch_data("user_1")
    print(f"結果：{result2}\n")
    
    print("等待 2.5 秒讓快取過期...")
    await asyncio.sleep(2.5)
    
    print("\n第三次呼叫（快取已過期）：")
    result3 = await fetch_data("user_1")
    print(f"結果：{result3}")

await test_cache()

---

## 小結

### JavaScript vs Python 非同步對照表

| 功能 | JavaScript | Python |
|------|------------|--------|
| 定義 | `async function` | `async def` |
| 等待 | `await` | `await` |
| 執行 | 自動 / `.then()` | `asyncio.run()` |
| 並行執行 | `Promise.all()` | `asyncio.gather()` |
| 競速 | `Promise.race()` | `asyncio.wait(FIRST_COMPLETED)` |
| 任一完成 | `Promise.any()` | 需自行實作 |
| 超時 | 手動實作 | `asyncio.wait_for()` |
| 建立任務 | 自動 | `asyncio.create_task()` |
| 延遲 | `setTimeout` + Promise | `asyncio.sleep()` |
| 佇列 | 無內建 | `asyncio.Queue` |
| 信號量 | 無內建 | `asyncio.Semaphore` |

### asyncio 常用 API

| API | 用途 |
|-----|------|
| `asyncio.run(coro)` | 執行協程 |
| `asyncio.create_task(coro)` | 建立任務 |
| `asyncio.gather(*coros)` | 並行執行多個協程 |
| `asyncio.wait(tasks)` | 等待任務完成 |
| `asyncio.wait_for(coro, timeout)` | 帶超時的等待 |
| `asyncio.sleep(seconds)` | 非同步延遲 |
| `asyncio.Queue()` | 非同步佇列 |
| `asyncio.Semaphore(n)` | 信號量（限制並發） |
| `asyncio.Lock()` | 非同步鎖 |
| `asyncio.Event()` | 非同步事件 |

### 關鍵差異提醒

1. **Python 需要明確啟動事件迴圈**（`asyncio.run()`）
2. **await 只能在 async 函式內使用**
3. **不要使用 `time.sleep()`**，使用 `asyncio.sleep()`
4. **注意阻塞操作**，使用 `run_in_executor()` 處理
5. **記得處理任務的例外**
6. **使用 `asyncio.gather()` 並行執行**，不要依序 await

### 非同步關鍵字對照

| Python | 說明 |
|--------|------|
| `async def` | 定義協程函式 |
| `await` | 等待協程完成 |
| `async for` | 非同步迭代 |
| `async with` | 非同步 context manager |
| `yield` in async | 非同步生成器 |