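# FastAPI in Depth: A Modern Python Web API Framework

## Introduction: Why Learn FastAPI?

In the previous lesson we studied the Flask framework. Flask is great, but as of 2024 a more modern framework is rising fast: FastAPI.

Let me start with a few questions:
- Would you like the framework to check data types for you when you write an API?
- Would you like API documentation generated automatically, so that frontend colleagues can see everything at a glance?
- Would you like your code to run faster and handle more requests?
- Would you like to write code with the latest Python features?

If the answer is "yes", FastAPI is made for you!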

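## 1. What Is FastAPI?

### 1.1 Definition

FastAPI is a modern, fast (high-performance) web framework for building APIs. It is based on the type hints available in Python 3.7+.

A quick comparison:
- **Flask**: like a classic Swiss Army knife, simple and practical
- **FastAPI**: like a smart, modern toolbox, practical and clever as well

### 1.2 Features

1. **Very high performance**
   - Performance comparable to NodeJS and Go
   - Built on Starlette and Pydantic

2. **Fast to develop**
   - Roughly 40% fewer developer-induced bugs
   - Feature development roughly 200-300% faster
   - Both figures are estimates quoted from the FastAPI documentation, not independent measurements

3. **Editor support**
   - Autocompletion everywhere
   - Less time spent debugging

4. **Type safety**
   - Based on Python type hints
   - Automatic data validation

5. **Automatic documentation**
   - Interactive API docs generated automatically
   - Supports Swagger UI and ReDoc

6. **Standards-based**
   - Based on the OpenAPI standard
   - Compatible with JSON Schema

### 1.3 A Simple Comparison

Suppose we want to create an API that receives user information: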

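**The Flask way:**
```python
from flask import Flask, request

app = Flask(__name__)

@app.route('/user', methods=['POST'])
def create_user():
    # silent=True returns None for a missing or malformed JSON body
    data = request.get_json(silent=True)
    # Validate the data by hand
    if not data:
        return {'error': 'no data'}, 400
    if 'name' not in data:
        return {'error': 'missing field: name'}, 400
    if 'age' not in data:
        return {'error': 'missing field: age'}, 400
    if not isinstance(data['age'], int):
        return {'error': 'age must be an integer'}, 400
    # ... more validation
    return {'message': 'created'}
```

**The FastAPI way:**
```python
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class User(BaseModel):
    name: str
    age: int

@app.post('/user')
def create_user(user: User):
    # The data has already been validated at this point
    return {'message': 'created'}
```

See the difference? FastAPI validates the data for us: a request with a missing `name` or a non-integer `age` is rejected with a 422 response before `create_user` runs.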

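## 2. Setting Up the Development Environment

### 2.1 Python Version Requirements

FastAPI needs Python 3.7 or later (3.8+ recommended). Recent FastAPI releases have dropped 3.7 support, so check the requirement of the release you install:

```bash
# Check the Python version
python --version
# Should print Python 3.7.x or later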
```

### 2.2 Creating the Project Environment

```bash
# Create the project directory
mkdir fastapi_project
cd fastapi_project

# Create a virtual environment
python -m venv venv

# Activate the virtual environment
# Windows:
venv\Scripts\activate
# Mac/Linux:
source venv/bin/activate
```

### 2.3 Installing FastAPI

```bash
# Install FastAPI and uvicorn (an ASGI server)
pip install fastapi
pip install "uvicorn[standard]"

# Or install all optional dependencies in one go
pip install "fastapi[all]"
```

### 2.4 Verifying the Installation

```python
# test_install.py
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def read_root():
    return {"Hello": "World"}
```

Run it:
```bash
uvicorn test_install:app --reload
```

Open http://localhost:8000/ in a browser. If you see `{"Hello": "World"}`, the installation works!

## 3. Your First FastAPI Application

### 3.1 Hello FastAPI

Create `main.py`:

```python
from typing import Optional

from fastapi import FastAPI

# Create the FastAPI instance
app = FastAPI()

# Define the root path
@app.get("/")
def read_root():
    return {"message": "Hello FastAPI!"}

# Define another path
@app.get("/items/{item_id}")
def read_item(item_id: int, q: Optional[str] = None):
    return {"item_id": item_id, "q": q}
```

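### 3.2 Running the Application

```bash
# Start the server
uvicorn main:app --reload

# What the arguments mean:
# main: the Python file name (main.py)
# app: the name of the FastAPI instance
# --reload: restart automatically on code changes (development only)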
```

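### 3.3 Viewing the Automatic Documentation

This is one of FastAPI's coolest features! Open a browser and visit:

1. **Swagger UI**: http://localhost:8000/docs
   - Test the API directly on the page
   - Inspect the request and response models
   - Try different parameters

2. **ReDoc**: http://localhost:8000/redoc
   - An alternative documentation style
   - Better suited to reading and sharing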

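### 3.4 Code Walkthrough

Let's go through this code in detail:

```python
from typing import Optional

from fastapi import FastAPI

# 1. Create the application instance
app = FastAPI()

# 2. Path operation decorator
@app.get("/")  # HTTP GET method, path "/"
def read_root():  # path operation function
    return {"message": "Hello FastAPI!"}  # a dict is converted to JSON automatically

# 3. A path with parameters
@app.get("/items/{item_id}")  # {item_id} is a path parameter
def read_item(
    item_id: int,  # path parameter, converted to and validated as int
    q: Optional[str] = None  # query parameter, optional, defaults to None
):
    return {"item_id": item_id, "q": q}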
```

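### 3.5 Trying Different Requests

1. Root path: http://localhost:8000/
2. With a path parameter: http://localhost:8000/items/42
3. With a query parameter: http://localhost:8000/items/42?q=somequery

Note: http://localhost:8000/items/foo returns a 422 validation error, because "foo" is not an integer!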

## 4. Path Parameters and Query Parameters

### 4.1 Path Parameters

Path parameters are part of the URL path:

```python
from fastapi import FastAPI

app = FastAPI()

# 1. Basic path parameter
@app.get("/users/{user_id}")
def read_user(user_id: int):
    return {"user_id": user_id}

# 2. Multiple path parameters
@app.get("/users/{user_id}/items/{item_id}")
def read_user_item(user_id: int, item_id: str):
    return {
        "user_id": user_id,
        "item_id": item_id
    }

# 3. Predefined values (enum)
from enum import Enum

class ModelName(str, Enum):
    alexnet = "alexnet"
    resnet = "resnet"
    lenet = "lenet"

@app.get("/models/{model_name}")
def get_model(model_name: ModelName):
    if model_name == ModelName.alexnet:
        return {"model_name": model_name, "message": "Deep Learning FTW!"}
    if model_name == ModelName.lenet:
        return {"model_name": model_name, "message": "LeCNN all the images"}
    return {"model_name": model_name, "message": "Have some residuals"}

# 4. A path parameter that itself contains a file path
@app.get("/files/{file_path:path}")
def read_file(file_path: str):
    return {"file_path": file_path}
```

### 4.2 Query Parameters

Query parameters are the key-value pairs after the `?` in the URL:

```python
from typing import Optional

# 1. Basic query parameters
@app.get("/items/")
def read_items(skip: int = 0, limit: int = 10):
    # Try: /items/?skip=20&limit=50
    return {"skip": skip, "limit": limit}

# 2. Optional query parameters
@app.get("/search/")
def search_items(
    q: Optional[str] = None,
    category: Optional[str] = None,
    price_min: float = 0,
    price_max: Optional[float] = None
):
    results = {"items": []}
    if q:
        results.update({"q": q})
    if category:
        results.update({"category": category})
    return results

# 3. Boolean query parameters
@app.get("/products/")
def read_products(
    in_stock: bool = True,
    on_sale: bool = False
):
    # Try: /products/?in_stock=false&on_sale=true
    # or:  /products/?in_stock=0&on_sale=1
    return {
        "in_stock": in_stock,
        "on_sale": on_sale
    }

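# 4. Query parameters with multiple values
from typing import List, Optional

from fastapi import Query

@app.get("/items/search/")
def search_multiple(
    # A list type must be declared with Query(); otherwise FastAPI
    # would read the parameter from the request body
    q: Optional[List[str]] = Query(None)
):
    # Try: /items/search/?q=foo&q=bar
    return {"q": q}  # returns {"q": ["foo", "bar"]}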
```

### 4.3 Parameter Validation

FastAPI provides powerful parameter validation:

```python
from typing import List

from fastapi import Query, Path

# 1. Query parameter validation
@app.get("/items/")
def read_items_validated(
    q: str = Query(
        None,
        min_length=3,
        max_length=50,
        regex="^fixedquery$",  # named `pattern` in newer FastAPI releases
        description="Query string",
        title="Query parameter"
    )
):
    return {"q": q}

# 2. Path parameter validation
@app.get("/items/{item_id}")
def read_item_validated(
    item_id: int = Path(
        ...,  # ... marks the parameter as required
        title="Item ID",
        description="ID of the item to fetch",
        ge=1,  # greater than or equal to 1
        le=1000  # less than or equal to 1000
    )
):
    return {"item_id": item_id}

# 3. Numeric validation
@app.get("/products/")
def read_products_validated(
    price: float = Query(
        ...,
        gt=0,  # greater than 0
        le=100000,  # less than or equal to 100000
        description="Product price"
    ),
    quantity: int = Query(
        1,
        ge=1,  # greater than or equal to 1
        le=100,  # less than or equal to 100
        description="Purchase quantity"
    )
):
    return {
        "price": price,
        "quantity": quantity,
        "total": price * quantity
    }

# 4. List parameter validation
@app.get("/items/bulk/")
def read_items_bulk(
    item_ids: List[int] = Query(
        ...,
        description="List of item IDs",
        # Pydantic v1 names; with Pydantic v2 use min_length / max_length
        min_items=1,
        max_items=10
    )
):
    # Try: /items/bulk/?item_ids=1&item_ids=2&item_ids=3
    return {"item_ids": item_ids}
```

## 5. Request Bodies and Data Models

### 5.1 Pydantic Models

Pydantic is a core component of FastAPI and handles data validation:

```python
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime

# 1. Basic model
class Item(BaseModel):
    name: str
    price: float
    is_offer: Optional[bool] = None

@app.post("/items/")
def create_item(item: Item):
    return item

# 2. Model with validation
class Product(BaseModel):
    name: str = Field(..., min_length=1, max_length=100)
    price: float = Field(..., gt=0, description="Product price")
    tax: Optional[float] = Field(None, ge=0)
    tags: List[str] = []
    
    # Pydantic v1 style; in Pydantic v2 the key is named json_schema_extra
    class Config:
        schema_extra = {
            "example": {
                "name": "iPhone 13",
                "price": 6999.0,
                "tax": 0.13,
                "tags": ["electronics", "phone"]
            }
        }

# 3. Nested models
class Image(BaseModel):
    url: str
    name: str

class ProductWithImages(BaseModel):
    name: str
    price: float
    images: List[Image] = []

@app.post("/products/")
def create_product(product: ProductWithImages):
    return product

# 4. Model with timestamps
class BlogPost(BaseModel):
    title: str
    content: str
    published: bool = False
    created_at: datetime = Field(default_factory=datetime.now)
    updated_at: Optional[datetime] = None
```

### 5.2 Request Body + Path Parameters + Query Parameters

```python
class Item(BaseModel):
    name: str
    price: float
    tax: Optional[float] = None

@app.put("/items/{item_id}")
def update_item(
    item_id: int = Path(..., title="Item ID", ge=0),
    item: Optional[Item] = None,  # the request body is optional here
    q: Optional[str] = None
):
    result = {"item_id": item_id}
    if q:
        result.update({"q": q})
    if item:
        result.update({"item": item})
    return result
```

### 5.3 Multiple Request Bodies

```python
from fastapi import Body

class Item(BaseModel):
    name: str
    price: float

class User(BaseModel):
    username: str
    full_name: Optional[str] = None

@app.put("/items/{item_id}")
def update_item_with_user(
    item_id: int,
    item: Item,
    user: User,
    importance: int = Body(...)
):
    return {
        "item_id": item_id,
        "item": item,
        "user": user,
        "importance": importance
    }
```

### 5.4 A Single Value in the Request Body

```python
from fastapi import Body

@app.post("/items/")
def create_item_with_tax(
    item: Item,
    tax: float = Body(..., gt=0)
):
    return {"item": item, "tax": tax}
```

## 6. Response Models

### 6.1 Basic Response Model

```python
class UserIn(BaseModel):
    username: str
    password: str
    email: str
    full_name: Optional[str] = None

class UserOut(BaseModel):
    username: str
    email: str
    full_name: Optional[str] = None

@app.post("/users/", response_model=UserOut)
def create_user(user: UserIn):
    # response_model=UserOut filters the output, so the password is not returned
    return user
```

### 6.2 Response Model Options

```python
class Item(BaseModel):
    name: str
    price: float
    tax: float = 0.1
    tags: List[str] = []

@app.post(
    "/items/",
    response_model=Item,
    response_model_exclude_unset=True,  # leave out values that were never set
    response_model_exclude_defaults=True,  # leave out values equal to their default
    response_model_exclude={"tax"}  # leave out specific fields
)
def create_item(item: Item):
    return item

# Or use include to return only specific fields
@app.get(
    "/items/{item_id}/public",
    response_model=Item,
    response_model_include={"name", "price"}
)
def read_item_public(item_id: int):
    # get_item is a placeholder for your own lookup function (not defined here)
    return get_item(item_id)
```

### 6.3 Multiple Response Models

```python
from typing import Union

class BaseItem(BaseModel):
    name: str
    price: float

class CarItem(BaseItem):
    brand: str
    model: str

class BookItem(BaseItem):
    author: str
    isbn: str

@app.get("/items/{item_id}", response_model=Union[CarItem, BookItem])
def read_specific_item(item_id: int):
    # Return a different type depending on item_id
    if item_id % 2 == 0:
        return CarItem(name="Tesla", price=50000, brand="Tesla", model="Model 3")
    else:
        return BookItem(name="Python Programming", price=59, author="Zhang San", isbn="123456")
```

## 7. Forms and Files

### 7.1 Form Data

```python
from fastapi import Form

@app.post("/login/")
def login(
    username: str = Form(...),
    password: str = Form(...)
):
    return {"username": username}

# A more complex form
@app.post("/register/")
def register(
    username: str = Form(..., min_length=3, max_length=20),
    password: str = Form(..., min_length=6),
    email: str = Form(..., regex=r'^[\w\.-]+@[\w\.-]+\.\w+$'),
    age: int = Form(..., ge=18, le=100),
    agree_terms: bool = Form(...)
):
    return {
        "username": username,
        "email": email,
        "age": age,
        "terms_agreed": agree_terms
    }
```

### 7.2 File Uploads

```python
from fastapi import File, UploadFile
from typing import List

# 1. Simple file upload (the whole file is read into memory as bytes)
@app.post("/uploadfile/")
def create_upload_file(file: bytes = File(...)):
    return {"file_size": len(file)}

# 2. Using UploadFile (recommended)
@app.post("/upload/")
async def upload_file(file: UploadFile):
    contents = await file.read()
    return {
        "filename": file.filename,
        "content_type": file.content_type,
        "size": len(contents)
    }

# 3. Uploading multiple files
@app.post("/uploadfiles/")
async def upload_multiple_files(
    files: List[UploadFile] = File(...)
):
    file_infos = []
    for file in files:
        contents = await file.read()
        file_infos.append({
            "filename": file.filename,
            "content_type": file.content_type,
            "size": len(contents)
        })
    return {"files": file_infos}

# 4. Files together with form data
@app.post("/create-profile/")
async def create_profile(
    username: str = Form(...),
    bio: str = Form(...),
    avatar: UploadFile = File(...)
):
    return {
        "username": username,
        "bio": bio,
        "avatar_filename": avatar.filename
    }
```

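### 7.3 Saving Uploaded Files

```python
import shutil
# Note: pathlib.Path shadows fastapi.Path if both are imported in one module
from pathlib import Path

from fastapi import File, HTTPException, UploadFile

UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

@app.post("/save-file/")
async def save_file(file: UploadFile = File(...)):
    # Keep only the final path component so that a client-supplied name
    # such as "../../etc/passwd" cannot escape UPLOAD_DIR
    safe_name = Path(file.filename or "").name
    if safe_name in {"", ".", ".."}:
        raise HTTPException(status_code=400, detail="Invalid file name")
    file_path = UPLOAD_DIR / safe_name
    
    # Save the file
    with file_path.open("wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    
    return {
        "filename": safe_name,
        "saved_path": str(file_path)
    }

# With file type validation
ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif"}

@app.post("/upload-image/")
async def upload_image(file: UploadFile = File(...)):
    # Check the file extension
    file_extension = Path(file.filename or "").suffix.lower()
    if file_extension not in ALLOWED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file type. Allowed types: {sorted(ALLOWED_EXTENSIONS)}"
        )
    
    # Check the file size (5 MB limit); note that this reads the whole file into memory
    contents = await file.read()
    if len(contents) > 5 * 1024 * 1024:
        raise HTTPException(status_code=400, detail="File too large, maximum is 5 MB")
    
    # Save the file...
    return {"message": "Image uploaded successfully"}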
```
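
Client-supplied file names deserve some suspicion. One common approach, sketched here with the standard library only, is to ignore the client's name apart from its extension and store the file under a random name. `safe_upload_path` is a hypothetical helper, not part of FastAPI.

```python
import uuid
from pathlib import Path, PureWindowsPath

ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif"}

def safe_upload_path(upload_dir: Path, client_filename: str) -> Path:
    """Build a storage path that cannot leave upload_dir (hypothetical helper)."""
    # PureWindowsPath splits on both "/" and "\", so directory parts
    # written in either style are dropped
    base_name = PureWindowsPath(client_filename).name
    suffix = Path(base_name).suffix.lower()
    if suffix not in ALLOWED_EXTENSIONS:
        raise ValueError(f"unsupported file type: {suffix or '(none)'}")
    # A random stem avoids collisions and never reuses the client's name
    return upload_dir / f"{uuid.uuid4().hex}{suffix}"

path = safe_upload_path(Path("uploads"), "../../etc/passwd.PNG")
print(path.parent, path.suffix)
```

Inside an endpoint, a `ValueError` from this helper would typically be turned into an `HTTPException` with status 400.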

## 8. Error Handling

### 8.1 HTTPException

```python
from fastapi import HTTPException

@app.get("/items/{item_id}")
def read_item(item_id: int):
    if item_id < 1:
        raise HTTPException(
            status_code=400,
            detail="Item ID must be positive"
        )
    
    if item_id > 100:
        raise HTTPException(
            status_code=404,
            detail="Item not found",
            headers={"X-Error": "There goes my error"}
        )
    
    return {"item_id": item_id}
```

### 8.2 Custom Exception Handlers

```python
from fastapi import Request
from fastapi.responses import JSONResponse

class UnicornException(Exception):
    def __init__(self, name: str):
        self.name = name

@app.exception_handler(UnicornException)
async def unicorn_exception_handler(request: Request, exc: UnicornException):
    return JSONResponse(
        status_code=418,
        content={"message": f"Oops! {exc.name} did something wrong"}
    )

@app.get("/unicorns/{name}")
def read_unicorn(name: str):
    if name == "yolo":
        raise UnicornException(name=name)
    return {"unicorn_name": name}
```

### 8.3 Overriding the Default Exception Handlers

```python
from fastapi.exceptions import RequestValidationError
from fastapi.responses import PlainTextResponse

@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc):
    return PlainTextResponse(str(exc), status_code=400)

# Custom 404 handler (a handler can be registered for a status code)
from fastapi.responses import JSONResponse

@app.exception_handler(404)
async def custom_404_handler(request, exc):
    return JSONResponse(
        status_code=404,
        content={"message": "Sorry, the page does not exist"}
    )
```

## 9. Dependency Injection

### 9.1 Basic Dependencies

```python
from typing import Optional

from fastapi import Depends

# 1. A simple dependency function
def common_parameters(q: Optional[str] = None, skip: int = 0, limit: int = 10):
    return {"q": q, "skip": skip, "limit": limit}

@app.get("/items/")
def read_items(commons: dict = Depends(common_parameters)):
    return commons

@app.get("/users/")
def read_users(commons: dict = Depends(common_parameters)):
    return commons

# 2. A class as a dependency
class CommonQueryParams:
    def __init__(self, q: Optional[str] = None, skip: int = 0, limit: int = 10):
        self.q = q
        self.skip = skip
        self.limit = limit

@app.get("/products/")
def read_products(commons: CommonQueryParams = Depends()):
    return {
        "q": commons.q,
        "skip": commons.skip,
        "limit": commons.limit
    }
```

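### 9.2 Nested Dependencies

```python
from fastapi import Depends, Header, HTTPException

# User authentication example
def get_current_user(token: str = Header(...)):
    # Simulate token validation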
    if token != "secret-token":
        raise HTTPException(status_code=401, detail="Invalid token")
    return {"username": "john", "id": 1}

def get_current_active_user(current_user: dict = Depends(get_current_user)):
    if not current_user.get("is_active", True):
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

@app.get("/users/me")
def read_users_me(current_user: dict = Depends(get_current_active_user)):
    return current_user
```

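### 9.3 Dependency Caching

```python
# Within one request, a dependency that is declared in several places
# is called only once and its result is reused
def get_db():
    print("Creating DB connection")  # printed once per request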
    return {"db": "connection"}

@app.get("/users/{user_id}")
def read_user(
    user_id: int,
    db = Depends(get_db),
    current_user = Depends(get_current_user)
):
    print("Using DB")
    return {"user_id": user_id}
```

## 10. Database Integration

### 10.1 SQLAlchemy Configuration

```python
# database.py
from sqlalchemy import create_engine
# SQLAlchemy 1.4+; older releases import declarative_base from sqlalchemy.ext.declarative
from sqlalchemy.orm import declarative_base, sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

# Dependency: one session per request, always closed afterwards
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
```

### 10.2 Creating the Model

```python
# models.py
from sqlalchemy import Column, Integer, String, Float, Boolean, DateTime
from sqlalchemy.sql import func
from database import Base

class Product(Base):
    __tablename__ = "products"
    
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    price = Column(Float)
    is_available = Column(Boolean, default=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
```

### 10.3 CRUD Operations

```python
# crud.py
from sqlalchemy.orm import Session
from models import Product
from schemas import ProductCreate, ProductUpdate

def get_product(db: Session, product_id: int):
    return db.query(Product).filter(Product.id == product_id).first()

def get_products(db: Session, skip: int = 0, limit: int = 10):
    return db.query(Product).offset(skip).limit(limit).all()

def create_product(db: Session, product: ProductCreate):
    # .dict() is the Pydantic v1 name; Pydantic v2 calls it .model_dump()
    db_product = Product(**product.dict())
    db.add(db_product)
    db.commit()
    db.refresh(db_product)
    return db_product

def update_product(db: Session, product_id: int, product: ProductUpdate):
    db_product = get_product(db, product_id)
    if db_product:
        update_data = product.dict(exclude_unset=True)
        for field, value in update_data.items():
            setattr(db_product, field, value)
        db.commit()
        db.refresh(db_product)
    return db_product

def delete_product(db: Session, product_id: int):
    db_product = get_product(db, product_id)
    if db_product:
        db.delete(db_product)
        db.commit()
    return db_product
```

### 10.4 API Endpoints

```python
# schemas.py
from pydantic import BaseModel
from typing import Optional
from datetime import datetime

class ProductBase(BaseModel):
    name: str
    price: float
    is_available: bool = True

class ProductCreate(ProductBase):
    pass

class ProductUpdate(BaseModel):
    name: Optional[str] = None
    price: Optional[float] = None
    is_available: Optional[bool] = None

class Product(ProductBase):
    id: int
    created_at: datetime
    updated_at: Optional[datetime] = None
    
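    # Pydantic v1 style; in Pydantic v2 use
    # model_config = ConfigDict(from_attributes=True)
    class Config:
        orm_mode = True

# main.py
from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

import crud
from database import Base, engine, get_db
from schemas import Product, ProductCreate, ProductUpdate

# Create the tables at startup (fine for a demo; use migrations in production)
Base.metadata.create_all(bind=engine)

app = FastAPI()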

@app.post("/products/", response_model=Product)
def create_product(
    product: ProductCreate,
    db: Session = Depends(get_db)
):
    return crud.create_product(db=db, product=product)

@app.get("/products/", response_model=List[Product])
def read_products(
    skip: int = 0,
    limit: int = 10,
    db: Session = Depends(get_db)
):
    products = crud.get_products(db, skip=skip, limit=limit)
    return products

@app.get("/products/{product_id}", response_model=Product)
def read_product(
    product_id: int,
    db: Session = Depends(get_db)
):
    db_product = crud.get_product(db, product_id=product_id)
    if db_product is None:
        raise HTTPException(status_code=404, detail="Product not found")
    return db_product

@app.put("/products/{product_id}", response_model=Product)
def update_product(
    product_id: int,
    product: ProductUpdate,
    db: Session = Depends(get_db)
):
    db_product = crud.update_product(db, product_id, product)
    if db_product is None:
        raise HTTPException(status_code=404, detail="Product not found")
    return db_product

@app.delete("/products/{product_id}")
def delete_product(
    product_id: int,
    db: Session = Depends(get_db)
):
    db_product = crud.delete_product(db, product_id)
    if db_product is None:
        raise HTTPException(status_code=404, detail="Product not found")
    return {"message": "Product deleted successfully"}
```

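## 11. Security and Authentication

### 11.1 Basic Authentication

```python
import secrets

from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi import Depends, HTTPException, status

security = HTTPBasic()

def get_current_username(credentials: HTTPBasicCredentials = Depends(security)):
    # Demo credentials only; load real ones from configuration
    correct_username = b"admin"
    correct_password = b"secret"
    
    # compare_digest runs in constant time, which guards against timing attacks
    username_ok = secrets.compare_digest(credentials.username.encode("utf8"), correct_username)
    password_ok = secrets.compare_digest(credentials.password.encode("utf8"), correct_password)
    if not (username_ok and password_ok):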
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Basic"},
        )
    return credentials.username

@app.get("/secure/")
def secure_endpoint(username: str = Depends(get_current_username)):
    return {"message": f"Hello {username}, this is secured!"}
```

### 11.2 JWT Authentication

```python
from datetime import datetime, timedelta, timezone
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

# Configuration (load the real key from the environment, never from source code)
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Password handling
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password):
    return pwd_context.hash(password)

# Token handling
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    
    # A real application would load the user from the database here
    user = {"username": username}
    return user

# Login endpoint
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    # A real application would check the user against the database here
    if form_data.username != "testuser" or form_data.password != "testpass":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password"
        )
    
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": form_data.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

# Protected endpoint
@app.get("/users/me")
async def read_users_me(current_user: dict = Depends(get_current_user)):
    return current_user
```
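
It can help to see what `jwt.encode` and `jwt.decode` do for `HS256`. The sketch below reproduces the idea with the standard library only: two base64url-encoded JSON parts plus an HMAC-SHA256 signature over them. It is for illustration and is not a replacement for a JWT library; `sign_hs256` and `verify_hs256` are hypothetical helper names, and only the `alg` header and the `exp` claim are checked.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional

def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWT requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")

def b64url_decode(text: str) -> bytes:
    """Decode base64url text, restoring the stripped padding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))

def _signature(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"),
                    hashlib.sha256).digest()

def sign_hs256(payload: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    parts = [b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
             for part in (header, payload)]
    signing_input = ".".join(parts)
    return f"{signing_input}.{b64url(_signature(signing_input, secret))}"

def verify_hs256(token: str, secret: str, now: Optional[float] = None) -> dict:
    header_b64, payload_b64, signature_b64 = token.split(".")  # ValueError if malformed
    if json.loads(b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    expected = _signature(f"{header_b64}.{payload_b64}", secret)
    # Constant-time comparison of the recomputed and the presented signature
    if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    payload = json.loads(b64url_decode(payload_b64))
    current = time.time() if now is None else now
    if "exp" in payload and current >= payload["exp"]:
        raise ValueError("token expired")
    return payload

token = sign_hs256({"sub": "testuser", "exp": 2_000_000_000}, "your-secret-key")
print(verify_hs256(token, "your-secret-key", now=1_999_999_999))
```

Because the payload is only encoded, not encrypted, anyone holding the token can read it; the signature only proves that it was issued by someone who knows the secret.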

## 12. Middleware and CORS

### 12.1 Middleware

```python
import time

from fastapi import Request
from fastapi.middleware.cors import CORSMiddleware

# 1. Custom middleware
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    return response

# 2. CORS middleware
origins = [
    "http://localhost:3000",
    "http://localhost:8080",
    "https://example.com"
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 3. Other commonly used middleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.gzip import GZipMiddleware

# Restrict the trusted hosts
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["example.com", "*.example.com"]
)

# Enable Gzip compression
app.add_middleware(GZipMiddleware, minimum_size=1000)
```

### 12.2 Rate Limiting

```python
from fastapi import Request
from fastapi.responses import JSONResponse
import time
from collections import defaultdict

# A simple rate limiter (in memory and per process: each worker counts separately)
request_counts = defaultdict(lambda: {"count": 0, "window_start": time.time()})
RATE_LIMIT = 10  # 10 requests per window
WINDOW_SIZE = 60  # 60-second window

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    client_ip = request.client.host
    current_time = time.time()
    
    # Reset the window once it has expired
    if current_time - request_counts[client_ip]["window_start"] > WINDOW_SIZE:
        request_counts[client_ip] = {"count": 0, "window_start": current_time}
    
    # Enforce the limit
    if request_counts[client_ip]["count"] >= RATE_LIMIT:
        return JSONResponse(
            status_code=429,
            content={"detail": "Rate limit exceeded"}
        )
    
    request_counts[client_ip]["count"] += 1
    response = await call_next(request)
    return response
```
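
The counting logic is easier to test when it is separated from the middleware. The sketch below is one possible fixed-window limiter with an injectable clock, using only the standard library. `FixedWindowLimiter` is a hypothetical class; like the middleware above, it keeps its state in process memory.

```python
import time
from typing import Callable, Dict, Tuple

class FixedWindowLimiter:
    """Allow at most `limit` calls per key in each window (hypothetical helper)."""

    def __init__(self, limit: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.limit = limit
        self.window_seconds = window_seconds
        self.clock = clock
        self._windows: Dict[str, Tuple[float, int]] = {}  # key -> (window start, count)

    def allow(self, key: str) -> bool:
        now = self.clock()
        window_start, count = self._windows.get(key, (now, 0))
        if now - window_start >= self.window_seconds:
            window_start, count = now, 0  # the old window expired, start a new one
        if count >= self.limit:
            self._windows[key] = (window_start, count)
            return False
        self._windows[key] = (window_start, count + 1)
        return True

# Drive the limiter with a fake clock so that no real waiting is needed
fake_now = [0.0]
limiter = FixedWindowLimiter(limit=2, window_seconds=60, clock=lambda: fake_now[0])
decisions = [limiter.allow("10.0.0.1") for _ in range(3)]
fake_now[0] = 61.0
after_window = limiter.allow("10.0.0.1")
print(decisions, after_window)
```

In a middleware, `limiter.allow(client_ip)` would replace the dictionary bookkeeping, returning a 429 response whenever it yields `False`.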

## 13. Background Tasks

### 13.1 Simple Background Tasks

```python
from fastapi import BackgroundTasks
import time

def write_log(message: str):
    time.sleep(5)  # simulate a slow operation
    with open("log.txt", "a") as log_file:
        log_file.write(f"{message}\n")

@app.post("/send-notification/{email}")
async def send_notification(
    email: str,
    background_tasks: BackgroundTasks
):
    # Queue the background task; it runs after the response has been sent
    background_tasks.add_task(write_log, f"Notification sent to {email}")
    return {"message": "Notification sent"}

# Multiple background tasks
def send_email(email: str, message: str):
    print(f"Sending email to {email}: {message}")
    time.sleep(2)

def update_statistics(item_id: int):
    print(f"Updating statistics for item {item_id}")
    time.sleep(1)

@app.post("/process-order/{item_id}")
async def process_order(
    item_id: int,
    email: str,
    background_tasks: BackgroundTasks
):
    background_tasks.add_task(send_email, email, "Your order is being processed")
    background_tasks.add_task(update_statistics, item_id)
    return {"message": "Order is being processed"}
```

### 13.2 Background Tasks via a Dependency

```python
def get_background_tasks(background_tasks: BackgroundTasks):
    return background_tasks

@app.post("/items/")
async def create_item(
    item: Item,
    background_tasks: BackgroundTasks = Depends(get_background_tasks)
):
    # Handle item creation, then log it in the background
    background_tasks.add_task(write_log, f"Created item: {item.name}")
    return item
```

## 14. WebSocket Support

### 14.1 Basic WebSocket

```python
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message text was: {data}")
    except WebSocketDisconnect:
        print("Client disconnected")
```

### 14.2 WebSocket Chat Room

```python
from typing import List

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []
    
    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)
    
    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)
    
    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)
    
    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)
    await manager.broadcast(f"Client #{client_id} joined the chat")
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(f"Client #{client_id} says: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client #{client_id} left the chat")
```
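
One weak spot of a broadcast loop is a connection that has died without being removed: a failing send would abort delivery to everyone after it. The sketch below shows one way to make the loop tolerant, using fake sockets so that it runs without a server. `FakeSocket` and `SafeConnectionManager` are hypothetical names, and catching `Exception` broadly is an assumption, since the exact exception raised by a closed WebSocket depends on the Starlette version.

```python
import asyncio
from typing import List

class FakeSocket:
    """Stand-in for a WebSocket: records messages, or fails if marked broken."""

    def __init__(self, broken: bool = False):
        self.broken = broken
        self.received: List[str] = []

    async def send_text(self, message: str) -> None:
        if self.broken:
            raise RuntimeError("connection already closed")
        self.received.append(message)

class SafeConnectionManager:
    def __init__(self):
        self.active_connections: list = []

    def connect(self, connection) -> None:
        self.active_connections.append(connection)

    async def broadcast(self, message: str) -> None:
        # Iterate over a copy so that removals do not disturb the loop
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                # Drop the dead connection and keep delivering to the rest
                self.active_connections.remove(connection)

async def demo():
    manager = SafeConnectionManager()
    alice, dead, bob = FakeSocket(), FakeSocket(broken=True), FakeSocket()
    for sock in (alice, dead, bob):
        manager.connect(sock)
    await manager.broadcast("hello")
    return alice.received, bob.received, len(manager.active_connections)

result = asyncio.run(demo())
print(result)
```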

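## 15. Testing

### 15.1 Testing with pytest

```python
# test_main.py
# These tests assume main.py serves "/", POST /items/, the 404-raising
# /items/{item_id} route from section 8.1, and /secure/ from section 11.1
from fastapi.testclient import TestClient
from main import app

client = TestClient(app)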

def test_read_root():
    response = client.get("/")
    assert response.status_code == 200
    assert response.json() == {"message": "Hello FastAPI!"}

def test_create_item():
    response = client.post(
        "/items/",
        json={"name": "Test Item", "price": 10.5}
    )
    assert response.status_code == 200
    assert response.json()["name"] == "Test Item"

def test_read_item_not_found():
    response = client.get("/items/999")
    assert response.status_code == 404

# Testing authentication
def test_secure_endpoint_no_auth():
    response = client.get("/secure/")
    assert response.status_code == 401

def test_secure_endpoint_with_auth():
    response = client.get(
        "/secure/",
        auth=("admin", "secret")
    )
    assert response.status_code == 200
```

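### 15.2 Async Tests

```python
import pytest
from httpx import ASGITransport, AsyncClient

from main import app

# Requires the pytest-asyncio plugin
@pytest.mark.asyncio
async def test_async_endpoint():
    # Recent httpx releases take the app through ASGITransport;
    # the older AsyncClient(app=app) shortcut has been removed
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        response = await ac.get("/")
    assert response.status_code == 200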
```

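## 16. Deployment

### 16.1 Production Configuration

```python
# config.py
# Written for Pydantic v2, where BaseSettings lives in the separate
# pydantic-settings package (pip install pydantic-settings).
# With Pydantic v1, use `from pydantic import BaseSettings` and an inner
# `class Config: env_file = ".env"` instead.
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env")

    app_name: str = "My FastAPI App"
    debug: bool = False
    database_url: str  # required: read from the environment or .env
    secret_key: str  # required: read from the environment or .env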

settings = Settings()

# main.py
from config import settings

app = FastAPI(
    title=settings.app_name,
    debug=settings.debug
)
```

### 16.2 Deploying with Uvicorn

```bash
# Run in production
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

# Or run Uvicorn workers under Gunicorn, configured from a file
cat > gunicorn_conf.py <<'EOF'
bind = "0.0.0.0:8000"
workers = 4
worker_class = "uvicorn.workers.UvicornWorker"
EOF

# Run
gunicorn main:app -c gunicorn_conf.py
```

### 16.3 Deploying with Docker

```dockerfile
# Dockerfile
FROM python:3.9

WORKDIR /code

COPY ./requirements.txt /code/requirements.txt
RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt

COPY ./app /code/app

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "80"]
```

## 17. Performance Optimization

### 17.1 Asynchronous Operations

```python
import asyncio
import aiohttp

# Asynchronous HTTP request
async def fetch_data(session, url):
    async with session.get(url) as response:
        return await response.json()

@app.get("/aggregate-data/")
async def aggregate_data():
    urls = [
        "https://api.example.com/data1",
        "https://api.example.com/data2",
        "https://api.example.com/data3"
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_data(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    
    return {"results": results}
```
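
The gain comes from waiting for all requests at the same time. The sketch below replaces the network calls with `asyncio.sleep` so that it runs anywhere; `fake_fetch` and the 0.2-second delay are stand-ins chosen for illustration.

```python
import asyncio
import time

async def fake_fetch(name: str, delay: float) -> dict:
    # Simulates network latency without any real I/O
    await asyncio.sleep(delay)
    return {"source": name, "delay": delay}

async def aggregate() -> list:
    # gather() runs the three coroutines concurrently and
    # returns their results in the order they were passed in
    return await asyncio.gather(
        fake_fetch("data1", 0.2),
        fake_fetch("data2", 0.2),
        fake_fetch("data3", 0.2),
    )

start = time.perf_counter()
results = asyncio.run(aggregate())
elapsed = time.perf_counter() - start

print([r["source"] for r in results], f"{elapsed:.2f}s")
```

Run one after another, the three calls would take about 0.6 seconds; run concurrently, the total stays close to the slowest single call.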

### 17.2 Caching

```python
import time
from functools import lru_cache

import redis

# In-memory cache (per process; entries never expire)
@lru_cache(maxsize=100)
def get_expensive_computation(param: str):
    # Simulate an expensive computation
    time.sleep(2)
    return f"Result for {param}"

@app.get("/compute/{param}")
def compute(param: str):
    result = get_expensive_computation(param)
    return {"result": result}

# Redis cache
redis_client = redis.Redis(host='localhost', port=6379, db=0)

# Declared with plain def: this Redis client is synchronous, so FastAPI runs
# the endpoint in a worker thread instead of blocking the event loop
@app.get("/cached-data/{key}")
def get_cached_data(key: str):
    # Try the cache first
    cached = redis_client.get(key)
    if cached is not None:
        return {"data": cached.decode(), "from_cache": True}
    
    # Compute the result and cache it
    result = f"Computed result for {key}"
    redis_client.setex(key, 300, result)  # expires after 5 minutes
    return {"data": result, "from_cache": False}
```
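
The Redis endpoint above follows the cache-aside pattern: look up the key, return on a hit, otherwise compute the value and store it with an expiry. The sketch below reproduces that logic with an in-process dictionary so it can be run without a Redis server. `TTLCache` and its injectable `clock` are hypothetical helpers written for this illustration; they are not part of `redis` or `functools`.

```python
import time


class TTLCache:
    """Hypothetical in-process cache that mimics Redis GET and SETEX."""

    def __init__(self, clock=time.monotonic):
        self._store = {}      # key -> (value, expiry timestamp)
        self._clock = clock   # injectable so tests can control time

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self._clock() >= expires_at:
            # Expired entries are dropped lazily on access
            del self._store[key]
            return None
        return value

    def setex(self, key, ttl_seconds, value):
        self._store[key] = (value, self._clock() + ttl_seconds)


def get_cached_data(cache, key):
    # Cache-aside: read the cache, fall back to computing, then store
    cached = cache.get(key)
    if cached is not None:
        return {"data": cached, "from_cache": True}
    result = f"Computed result for {key}"
    cache.setex(key, 300, result)
    return {"data": result, "from_cache": False}


# Drive the cache with a fake clock instead of waiting five minutes
now = [0.0]
cache = TTLCache(clock=lambda: now[0])
first = get_cached_data(cache, "a")    # miss: computed and stored
second = get_cached_data(cache, "a")   # hit: served from the cache
now[0] = 301.0                         # move past the 300-second expiry
third = get_cached_data(cache, "a")    # miss again: the entry expired
```

Unlike `lru_cache`, an entry with a TTL eventually refreshes, which matters when the underlying data can change.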

## 18. Hands-on project: a RESTful blog API

### 18.1 Project structure

```
blog_api/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── config.py
│   ├── database.py
│   ├── models/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   └── post.py
│   ├── schemas/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   └── post.py
│   ├── crud/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   └── post.py
│   ├── api/
│   │   ├── __init__.py
│   │   ├── deps.py
│   │   └── v1/
│   │       ├── __init__.py
│   │       ├── users.py
│   │       └── posts.py
│   └── core/
│       ├── __init__.py
│       └── security.py
├── tests/
├── requirements.txt
└── .env
```

### 18.2 Implementation

```python
# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.v1 import users, posts
from app.database import engine, Base

# Create the database tables at startup
Base.metadata.create_all(bind=engine)

app = FastAPI(
    title="Blog API",
    description="A simple blog API built with FastAPI",
    version="1.0.0"
)

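# CORS configuration
# Wildcard origins cannot be combined with credentials. If the frontend sends
# cookies, list its origins explicitly and set allow_credentials=True.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)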

# Register the routers
app.include_router(users.router, prefix="/api/v1/users", tags=["users"])
app.include_router(posts.router, prefix="/api/v1/posts", tags=["posts"])

@app.get("/")
def read_root():
    return {
        "message": "Welcome to Blog API",
        "docs": "/docs",
        "redoc": "/redoc"
    }

# app/models/user.py
from sqlalchemy import Column, Integer, String, Boolean
from app.database import Base

class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    username = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)
    is_superuser = Column(Boolean, default=False)

# app/schemas/user.py
from pydantic import BaseModel, EmailStr
from typing import Optional

class UserBase(BaseModel):
    email: EmailStr
    username: str
    is_active: bool = True
    is_superuser: bool = False

class UserCreate(UserBase):
    password: str

class UserUpdate(BaseModel):
    email: Optional[EmailStr] = None
    username: Optional[str] = None
    password: Optional[str] = None
    is_active: Optional[bool] = None

class UserInDB(UserBase):
    id: int
    
    class Config:
        orm_mode = True

class User(UserInDB):
    pass

# app/api/v1/users.py
from typing import List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app import crud, schemas
from app.api import deps

router = APIRouter()

@router.post("/", response_model=schemas.User)
def create_user(
    user_in: schemas.UserCreate,
    db: Session = Depends(deps.get_db)
):
    user = crud.user.get_by_email(db, email=user_in.email)
    if user:
        raise HTTPException(
            status_code=400,
            detail="Email already registered"
        )
    user = crud.user.create(db, obj_in=user_in)
    return user

@router.get("/", response_model=List[schemas.User])
def read_users(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(deps.get_db),
    current_user: schemas.User = Depends(deps.get_current_active_superuser)
):
    users = crud.user.get_multi(db, skip=skip, limit=limit)
    return users

@router.get("/me", response_model=schemas.User)
def read_user_me(
    current_user: schemas.User = Depends(deps.get_current_active_user)
):
    return current_user
```
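
The routes above depend on `deps.get_db`, which is not shown in this section. A common shape for such a dependency is a generator: the framework runs the code before `yield` to open the session, hands the yielded value to the route, and runs the `finally` block once the request has been handled. The sketch below assumes that shape and uses a hypothetical `FakeSession` in place of a real SQLAlchemy session so the lifecycle can be observed without a database. The generator is driven by hand to mimic what the framework does.

```python
class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy session."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def get_db():
    db = FakeSession()  # in the real app this would be a session factory call
    try:
        yield db        # the route function receives this object
    finally:
        db.close()      # always runs, even if the route raised an error


# Drive the generator manually, the way a framework would
gen = get_db()
session = next(gen)                       # setup: open the session
was_open_during_request = not session.closed
gen.close()                               # teardown: triggers the finally block
```

Keeping the cleanup in `finally` means a failed request cannot leak a connection.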

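## 19. FastAPI vs Flask

### 19.1 Performance and feature comparison

| Aspect | Flask | FastAPI |
|------|-------|----------|
| Performance | Moderate | High (async support) |
| Concurrency | Synchronous (async needs extra setup) | Native async |
| Type checking | None built in | Automatic validation from type hints |
| API documentation | Requires an extension | Generated automatically |
| Development speed | Fast | Faster |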
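### 19.2 Code comparison

**User registration:**

Flask version:
```python
from flask import Flask, jsonify, request

app = Flask(__name__)

@app.route('/register', methods=['POST'])
def register():
    # silent=True returns None for a missing or invalid JSON body
    # instead of aborting, so the manual checks below are reached
    data = request.get_json(silent=True)

    # Manual validation
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    if 'email' not in data:
        return jsonify({'error': 'Missing email'}), 400
    if 'password' not in data:
        return jsonify({'error': 'Missing password'}), 400

    # More validation...

    return jsonify({'message': 'Registration successful'})
```

FastAPI version:
```python
@app.post('/register')
async def register(user: UserCreate):
    # Validation is already done: a body that does not match
    # UserCreate is rejected with a 422 response before this runs
    return {'message': 'Registration successful'}
```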

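## 20. Summary and next steps

### 20.1 What we covered

1. **FastAPI basics**
   - Creating an application
   - Path operations
   - Requests and responses

2. **Data validation**
   - Pydantic models
   - Automatic validation
   - Type hints

3. **Advanced features**
   - Dependency injection
   - Database integration
   - Authentication and authorization
   - WebSocket
   - Background tasks

4. **Production deployment**
   - Configuration management
   - Testing
   - Performance optimization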

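### 20.2 Strengths of FastAPI

1. **High development efficiency**
   - Automatic data validation
   - Automatic API documentation
   - Editor autocompletion driven by type hints

2. **Strong performance**
   - Async support
   - High-concurrency request handling

3. **Modern**
   - Built on recent Python features
   - Conforms to the OpenAPI standard

4. **Easy to maintain**
   - Type safety
   - Clear code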

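### 20.3 When to use it

FastAPI is a particularly good fit for:
- RESTful API development
- Microservice architectures
- Real-time data processing
- Applications with high performance requirements
- Projects that need automatically generated documentation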

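### 20.4 Where to go next

1. **Deeper topics**
   - GraphQL integration
   - gRPC support
   - Message queue integration
   - Distributed tasks

2. **Related technologies**
   - Containerization with Docker
   - Deployment on Kubernetes
   - CI/CD pipelines
   - Monitoring and logging

3. **Practice projects**
   - A complete e-commerce API
   - A real-time chat application
   - An IoT data platform
   - A microservice architecture in practice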

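### 20.5 Learning resources

1. **Official resources**
   - FastAPI documentation: https://fastapi.tiangolo.com/
   - GitHub: https://github.com/tiangolo/fastapi

2. **Recommended libraries**
   - SQLModel: a modern ORM for SQL databases
   - Pydantic: the data validation library
   - Starlette: the ASGI framework FastAPI builds on
   - Uvicorn: an ASGI server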

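### 20.6 Closing thoughts

FastAPI reflects where Python web development is heading: it combines modern Python practices with a good developer experience and strong runtime performance.

Keep these points in mind:
- **Type hints are your friend**: make full use of Python's type system
- **Use async only when you need it**: not every operation has to be asynchronous
- **Automatic documentation is a gift**: make good use of the generated API docs
- **Testing matters**: FastAPI makes testing straightforward

Keep practicing and exploring, and you will find that FastAPI helps you build solid modern web applications!

**Happy Coding with FastAPI! 🚀**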