# Python 装饰器全面教程

本教程深入讲解 Python 装饰器的核心概念、使用方法和实战应用。

## 目录
1. [装饰器基础概念](#装饰器基础概念)
2. [函数装饰器](#函数装饰器)
3. [带参数的装饰器](#带参数的装饰器)
4. [类装饰器](#类装饰器)
5. [装饰器进阶技巧](#装饰器进阶技巧)
6. [内置装饰器](#内置装饰器)
7. [实战案例](#实战案例)
8. [最佳实践](#最佳实践)

## 1. 装饰器基础概念 {#装饰器基础概念}

### 1.1 什么是装饰器？

装饰器是一种设计模式，用于在不修改原函数代码的情况下，为函数添加新的功能。

**核心概念：**
- 装饰器本质上是一个函数，接受一个函数作为参数
- 返回一个新的函数，通常是对原函数的包装
- 使用 `@decorator_name` 语法糖应用装饰器

In [None]:
# 最简单的装饰器示例
def my_decorator(func):
    """一个简单的装饰器"""
    def wrapper():
        print("在函数执行之前")
        func()
        print("在函数执行之后")
    return wrapper

# 使用装饰器的两种方式

# 方式1: 使用 @ 语法糖（推荐）
@my_decorator
def say_hello():
    print("Hello, World!")

# 方式2: 手动调用装饰器（等价于方式1）
def say_goodbye():
    print("Goodbye, World!")
say_goodbye = my_decorator(say_goodbye)

# 测试
print("=== 使用 @ 语法 ===")
say_hello()

print("\n=== 手动装饰 ===")
say_goodbye()

### 1.2 装饰器的工作原理

理解装饰器的执行流程。

In [None]:
def trace_decorator(func):
    """跟踪装饰器执行流程"""
    print(f"1. 装饰器被调用，接收函数: {func.__name__}")
    
    def wrapper(*args, **kwargs):
        print(f"3. wrapper被调用，参数: args={args}, kwargs={kwargs}")
        result = func(*args, **kwargs)
        print(f"5. 原函数执行完毕，返回值: {result}")
        return result
    
    print(f"2. 装饰器返回 wrapper 函数")
    return wrapper

@trace_decorator
def add(a, b):
    print(f"4. 原函数 add 正在执行: {a} + {b}")
    return a + b

print("\n=== 调用被装饰的函数 ===")
result = add(3, 5)
print(f"6. 最终结果: {result}")

## 2. 函数装饰器 {#函数装饰器}

### 2.1 处理带参数的函数

装饰器需要处理各种参数形式的函数。

In [None]:
def universal_decorator(func):
    """通用装饰器，可以处理任意参数"""
    def wrapper(*args, **kwargs):
        print(f"调用函数: {func.__name__}")
        print(f"位置参数: {args}")
        print(f"关键字参数: {kwargs}")
        result = func(*args, **kwargs)
        print(f"返回值: {result}")
        return result
    return wrapper

@universal_decorator
def greet(name, greeting="Hello"):
    return f"{greeting}, {name}!"

@universal_decorator
def calculate(x, y, operation="add"):
    if operation == "add":
        return x + y
    elif operation == "multiply":
        return x * y
    return None

# 测试
print("=== 测试1 ===")
greet("Alice")

print("\n=== 测试2 ===")
greet("Bob", greeting="Hi")

print("\n=== 测试3 ===")
calculate(5, 3, operation="multiply")

### 2.2 常用功能装饰器

实现一些常见的装饰器功能。

In [None]:
import time
import functools

# 1. 计时装饰器
def timer(func):
    """计算函数执行时间"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"{func.__name__} 执行耗时: {end - start:.4f}秒")
        return result
    return wrapper

# 2. 重试装饰器
def retry(max_attempts=3):
    """失败时自动重试"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    print(f"第 {attempt + 1} 次尝试失败: {e}")
                    if attempt == max_attempts - 1:
                        raise
                    time.sleep(0.5)
        return wrapper
    return decorator

# 3. 日志装饰器
def log(func):
    """记录函数调用"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print(f"[LOG] 调用 {func.__name__} - 参数: args={args}, kwargs={kwargs}")
        result = func(*args, **kwargs)
        print(f"[LOG] {func.__name__} 返回: {result}")
        return result
    return wrapper

# 测试计时器
@timer
def slow_function():
    """模拟耗时操作"""
    time.sleep(1)
    return "完成"

print("=== 计时器测试 ===")
slow_function()

# 测试日志
@log
def multiply(a, b):
    return a * b

print("\n=== 日志测试 ===")
multiply(3, 4)

### 2.3 保留函数元数据

使用 `functools.wraps` 保留原函数的元数据。

In [None]:
import functools

# 不使用 functools.wraps
def bad_decorator(func):
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

# 使用 functools.wraps
def good_decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@bad_decorator
def bad_example():
    """这是原始函数的文档"""
    pass

@good_decorator
def good_example():
    """这是原始函数的文档"""
    pass

# 比较
print("=== 不使用 functools.wraps ===")
print(f"函数名: {bad_example.__name__}")
print(f"文档: {bad_example.__doc__}")

print("\n=== 使用 functools.wraps ===")
print(f"函数名: {good_example.__name__}")
print(f"文档: {good_example.__doc__}")

## 3. 带参数的装饰器 {#带参数的装饰器}

### 3.1 三层嵌套结构

带参数的装饰器需要额外一层函数包装。

In [None]:
def repeat(times):
    """重复执行函数指定次数"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            results = []
            for i in range(times):
                print(f"第 {i + 1} 次执行:")
                result = func(*args, **kwargs)
                results.append(result)
            return results
        return wrapper
    return decorator

@repeat(times=3)
def greet(name):
    print(f"Hello, {name}!")
    return f"Greeted {name}"

# 测试
results = greet("Alice")
print(f"\n返回结果: {results}")

### 3.2 可选参数的装饰器

实现既可以带参数也可以不带参数的装饰器。

In [None]:
import functools

def optional_decorator(func=None, *, prefix="[INFO]"):
    """可选参数的装饰器"""
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            print(f"{prefix} 调用 {f.__name__}")
            return f(*args, **kwargs)
        return wrapper
    
    if func is None:
        # 带参数调用: @optional_decorator(prefix="...")
        return decorator
    else:
        # 不带参数调用: @optional_decorator
        return decorator(func)

# 不带参数使用
@optional_decorator
def func1():
    print("执行 func1")

# 带参数使用
@optional_decorator(prefix="[DEBUG]")
def func2():
    print("执行 func2")

# 测试
print("=== 不带参数 ===")
func1()

print("\n=== 带参数 ===")
func2()

### 3.3 实用的参数化装饰器

实现一些常用的参数化装饰器。

In [None]:
import time
import functools

# 1. 限流装饰器
def rate_limit(max_calls, time_window):
    """限制函数在指定时间窗口内的调用次数"""
    calls = []
    
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            now = time.time()
            # 清理过期的调用记录
            calls[:] = [t for t in calls if now - t < time_window]
            
            if len(calls) >= max_calls:
                raise Exception(f"超过限流阈值: {max_calls}次/{time_window}秒")
            
            calls.append(now)
            return func(*args, **kwargs)
        return wrapper
    return decorator

# 2. 缓存装饰器（简化版）
def cache(ttl=60):
    """简单的缓存装饰器"""
    def decorator(func):
        cached = {}
        
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = str(args) + str(kwargs)
            now = time.time()
            
            if key in cached:
                value, timestamp = cached[key]
                if now - timestamp < ttl:
                    print(f"[缓存命中] {func.__name__}")
                    return value
            
            print(f"[执行函数] {func.__name__}")
            result = func(*args, **kwargs)
            cached[key] = (result, now)
            return result
        return wrapper
    return decorator

# 3. 类型检查装饰器
def type_check(**expected_types):
    """检查函数参数类型"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # 检查关键字参数类型
            for name, expected_type in expected_types.items():
                if name in kwargs:
                    value = kwargs[name]
                    if not isinstance(value, expected_type):
                        raise TypeError(
                            f"参数 '{name}' 应为 {expected_type.__name__}, "
                            f"但收到 {type(value).__name__}"
                        )
            return func(*args, **kwargs)
        return wrapper
    return decorator

# 测试缓存装饰器
@cache(ttl=5)
def expensive_calc(x, y):
    time.sleep(0.5)
    return x * y

print("=== 缓存装饰器测试 ===")
print(f"结果1: {expensive_calc(3, 4)}")
print(f"结果2: {expensive_calc(3, 4)}")

# 测试类型检查
@type_check(name=str, age=int)
def create_user(name, age):
    return f"用户: {name}, 年龄: {age}"

print("\n=== 类型检查测试 ===")
print(create_user(name="Alice", age=25))

try:
    print(create_user(name="Bob", age="30"))  # 类型错误
except TypeError as e:
    print(f"捕获错误: {e}")

## 4. 类装饰器 {#类装饰器}

### 4.1 使用类实现装饰器

装饰器也可以用类来实现，通过 `__call__` 方法。

In [None]:
import functools
import time

class CountCalls:
    """统计函数调用次数的装饰器类"""
    
    def __init__(self, func):
        functools.update_wrapper(self, func)
        self.func = func
        self.count = 0
    
    def __call__(self, *args, **kwargs):
        self.count += 1
        print(f"调用 {self.func.__name__} 第 {self.count} 次")
        return self.func(*args, **kwargs)
    
    def reset(self):
        """重置计数器"""
        self.count = 0

@CountCalls
def say_hello():
    print("Hello!")

# 测试
say_hello()
say_hello()
say_hello()
print(f"\n总调用次数: {say_hello.count}")

# 重置
say_hello.reset()
print(f"重置后计数: {say_hello.count}")

### 4.2 带参数的类装饰器

实现可配置的类装饰器。

In [None]:
import functools
import time

class Timer:
    """计时装饰器类"""
    
    def __init__(self, unit="seconds", precision=4):
        self.unit = unit
        self.precision = precision
    
    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            result = func(*args, **kwargs)
            elapsed = time.time() - start
            
            # 根据单位转换时间
            if self.unit == "milliseconds":
                elapsed *= 1000
                unit_str = "ms"
            else:
                unit_str = "s"
            
            print(f"{func.__name__} 执行耗时: {elapsed:.{self.precision}f}{unit_str}")
            return result
        return wrapper

# 使用秒为单位
@Timer(unit="seconds", precision=2)
def task1():
    time.sleep(0.5)
    return "完成"

# 使用毫秒为单位
@Timer(unit="milliseconds", precision=1)
def task2():
    time.sleep(0.1)
    return "完成"

# 测试
print("=== 计时器测试 ===")
task1()
task2()

### 4.3 装饰类本身

装饰器不仅可以装饰函数，还可以装饰类。

In [None]:
def singleton(cls):
    """单例模式装饰器"""
    instances = {}
    
    @functools.wraps(cls)
    def get_instance(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
            print(f"创建 {cls.__name__} 实例")
        else:
            print(f"返回已存在的 {cls.__name__} 实例")
        return instances[cls]
    
    return get_instance

def add_methods(cls):
    """为类添加方法"""
    def to_dict(self):
        return {k: v for k, v in self.__dict__.items() if not k.startswith('_')}
    
    cls.to_dict = to_dict
    return cls

@singleton
class Database:
    """数据库连接类（单例模式）"""
    def __init__(self, host, port):
        self.host = host
        self.port = port
        print(f"连接数据库: {host}:{port}")

@add_methods
class User:
    """用户类（添加了 to_dict 方法）"""
    def __init__(self, name, age):
        self.name = name
        self.age = age

# 测试单例模式
print("=== 单例模式测试 ===")
db1 = Database("localhost", 3306)
db2 = Database("localhost", 3306)
print(f"db1 和 db2 是同一个对象: {db1 is db2}")

# 测试添加方法
print("\n=== 添加方法测试 ===")
user = User("Alice", 25)
print(f"用户字典: {user.to_dict()}")

## 5. 装饰器进阶技巧 {#装饰器进阶技巧}

### 5.1 多个装饰器叠加

理解装饰器的执行顺序。

In [None]:
def decorator_a(func):
    print(f"装饰器A 包装 {func.__name__}")
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print("[A] 执行前")
        result = func(*args, **kwargs)
        print("[A] 执行后")
        return result
    return wrapper

def decorator_b(func):
    print(f"装饰器B 包装 {func.__name__}")
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print("[B] 执行前")
        result = func(*args, **kwargs)
        print("[B] 执行后")
        return result
    return wrapper

def decorator_c(func):
    print(f"装饰器C 包装 {func.__name__}")
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print("[C] 执行前")
        result = func(*args, **kwargs)
        print("[C] 执行后")
        return result
    return wrapper

print("=== 定义时的装饰顺序 ===")
@decorator_a
@decorator_b
@decorator_c
def my_function():
    print("[原函数] 执行")

print("\n=== 调用时的执行顺序 ===")
my_function()

print("\n说明: 装饰器从下到上定义，从上到下执行")

### 5.2 组合装饰器

实用的装饰器组合示例。

In [None]:
import time
import functools

# 定义常用装饰器
def log_call(func):
    """记录函数调用"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print(f"[LOG] 调用 {func.__name__}")
        return func(*args, **kwargs)
    return wrapper

def measure_time(func):
    """测量执行时间"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        elapsed = time.time() - start
        print(f"[TIME] {func.__name__} 耗时 {elapsed:.3f}s")
        return result
    return wrapper

def cache_result(func):
    """缓存结果"""
    cached = {}
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        key = str(args) + str(kwargs)
        if key in cached:
            print(f"[CACHE] 命中 {func.__name__}")
            return cached[key]
        result = func(*args, **kwargs)
        cached[key] = result
        return result
    return wrapper

# 组合使用多个装饰器
@log_call
@measure_time
@cache_result
def fibonacci(n):
    """计算斐波那契数列（递归）"""
    if n < 2:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

# 测试
print("=== 第一次调用 ===")
result1 = fibonacci(10)
print(f"结果: {result1}\n")

print("=== 第二次调用（缓存命中）===")
result2 = fibonacci(10)
print(f"结果: {result2}")

### 5.3 装饰器工厂

创建可配置的装饰器生成器。

In [None]:
def create_validator(**validators):
    """创建参数验证装饰器"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # 验证参数
            for param_name, validator_func in validators.items():
                if param_name in kwargs:
                    value = kwargs[param_name]
                    if not validator_func(value):
                        raise ValueError(
                            f"参数 '{param_name}' 验证失败: {value}"
                        )
            return func(*args, **kwargs)
        return wrapper
    return decorator

# 创建不同的验证装饰器
validate_positive = create_validator(
    age=lambda x: x > 0,
    score=lambda x: 0 <= x <= 100
)

validate_string = create_validator(
    name=lambda x: isinstance(x, str) and len(x) > 0,
    email=lambda x: '@' in x
)

# 使用验证装饰器
@validate_positive
def set_score(age, score):
    return f"年龄: {age}, 分数: {score}"

@validate_string
def create_account(name, email):
    return f"账户: {name}, 邮箱: {email}"

# 测试
print("=== 验证成功 ===")
print(set_score(age=25, score=90))
print(create_account(name="Alice", email="alice@example.com"))

print("\n=== 验证失败 ===")
try:
    print(set_score(age=-5, score=90))
except ValueError as e:
    print(f"错误: {e}")

try:
    print(create_account(name="Bob", email="invalid-email"))
except ValueError as e:
    print(f"错误: {e}")

## 6. 内置装饰器 {#内置装饰器}

### 6.1 @property

将方法转换为属性。

In [None]:
class Circle:
    """圆形类"""
    
    def __init__(self, radius):
        self._radius = radius
    
    @property
    def radius(self):
        """获取半径"""
        print("获取半径")
        return self._radius
    
    @radius.setter
    def radius(self, value):
        """设置半径"""
        if value <= 0:
            raise ValueError("半径必须为正数")
        print(f"设置半径为 {value}")
        self._radius = value
    
    @property
    def area(self):
        """计算面积（只读属性）"""
        return 3.14159 * self._radius ** 2
    
    @property
    def circumference(self):
        """计算周长（只读属性）"""
        return 2 * 3.14159 * self._radius

# 使用
circle = Circle(5)
print(f"半径: {circle.radius}")
print(f"面积: {circle.area:.2f}")
print(f"周长: {circle.circumference:.2f}")

# 修改半径
circle.radius = 10
print(f"\n新半径: {circle.radius}")
print(f"新面积: {circle.area:.2f}")

### 6.2 @staticmethod 和 @classmethod

类的静态方法和类方法。

In [None]:
class DateUtil:
    """日期工具类"""
    
    format_string = "%Y-%m-%d"
    
    def __init__(self, year, month, day):
        self.year = year
        self.month = month
        self.day = day
    
    @staticmethod
    def is_leap_year(year):
        """静态方法：判断是否为闰年"""
        return (year % 4 == 0 and year % 100 != 0) or (year % 400 == 0)
    
    @classmethod
    def from_string(cls, date_string):
        """类方法：从字符串创建实例"""
        year, month, day = map(int, date_string.split('-'))
        return cls(year, month, day)
    
    @classmethod
    def today(cls):
        """类方法：创建今天的日期"""
        import datetime
        today = datetime.date.today()
        return cls(today.year, today.month, today.day)
    
    def __str__(self):
        return f"{self.year}-{self.month:02d}-{self.day:02d}"

# 测试静态方法
print("=== 静态方法测试 ===")
print(f"2024 是闰年吗? {DateUtil.is_leap_year(2024)}")
print(f"2023 是闰年吗? {DateUtil.is_leap_year(2023)}")

# 测试类方法
print("\n=== 类方法测试 ===")
date1 = DateUtil.from_string("2024-12-25")
print(f"从字符串创建: {date1}")

date2 = DateUtil.today()
print(f"今天: {date2}")

### 6.3 @functools.lru_cache

Python 内置的 LRU 缓存装饰器。

In [None]:
import functools
import time

# 使用 lru_cache
@functools.lru_cache(maxsize=128)
def fibonacci_cached(n):
    """带缓存的斐波那契数列"""
    if n < 2:
        return n
    return fibonacci_cached(n-1) + fibonacci_cached(n-2)

# 不使用缓存
def fibonacci_no_cache(n):
    """不带缓存的斐波那契数列"""
    if n < 2:
        return n
    return fibonacci_no_cache(n-1) + fibonacci_no_cache(n-2)

# 性能对比
print("=== 性能对比 ===")

# 测试不带缓存
start = time.time()
result1 = fibonacci_no_cache(30)
time1 = time.time() - start
print(f"不带缓存: fibonacci(30) = {result1}, 耗时 {time1:.4f}s")

# 测试带缓存
start = time.time()
result2 = fibonacci_cached(30)
time2 = time.time() - start
print(f"带缓存: fibonacci(30) = {result2}, 耗时 {time2:.4f}s")

print(f"\n性能提升: {time1/time2:.0f}x")

# 查看缓存信息
print(f"\n缓存信息: {fibonacci_cached.cache_info()}")

## 7. 实战案例 {#实战案例}

### 7.1 企业级缓存装饰器（详解）

分析 day_01.py 中的 DistributedCacheDecorator。

In [None]:
import threading
import time
from functools import wraps
from typing import Any, Callable, Dict

class DistributedCacheDecorator:
    """
    分布式缓存装饰器，支持TTL（生存时间）、线程安全与一致性控制
    
    核心特性:
    1. TTL（Time To Live）: 缓存过期时间控制
    2. 线程安全: 使用 RLock 确保并发安全
    3. 分布式同步: 支持同步到 Redis 等分布式缓存
    4. 缓存键生成: 基于函数名和参数自动生成唯一键
    """

    def __init__(self, ttl: int = 300, cache_backend: str = 'redis'):
        """
        初始化缓存装饰器
        
        Args:
            ttl: 缓存存活时间（秒），默认300秒（5分钟）
            cache_backend: 缓存后端类型，默认'redis'
        """
        self.ttl = ttl
        self.cache_backend = cache_backend
        self._cache: Dict[str, Any] = {}  # 本地缓存
        self._lock = threading.RLock()  # 可重入锁

    def __call__(self, func: Callable) -> Callable:
        """
        装饰器主逻辑
        
        工作流程:
        1. 生成缓存键
        2. 检查缓存是否存在且未过期
        3. 如果缓存命中，直接返回缓存值
        4. 如果缓存未命中，执行函数并缓存结果
        5. 同步到分布式缓存后端
        """

        @wraps(func)  # 保留原函数元数据
        def wrapper(*args, **kwargs) -> Any:
            # 生成缓存键：函数名 + 参数哈希
            cache_key = f"{func.__name__}:{hash(str(args) + str(kwargs))}"

            with self._lock:  # 线程安全操作
                # 检查缓存命中
                if cache_key in self._cache:
                    cached_item = self._cache[cache_key]
                    # 检查是否过期
                    if time.time() - cached_item['timestamp'] < self.ttl:
                        print(f"✓ 缓存命中: {cache_key}")
                        return cached_item['value']
                    else:
                        print(f"✗ 缓存过期: {cache_key}")

                # 缓存未命中，执行原函数
                print(f"→ 执行函数: {func.__name__}")
                result = func(*args, **kwargs)

                # 写入缓存
                self._cache[cache_key] = {
                    'value': result,
                    'timestamp': time.time()
                }

                # 同步到分布式缓存
                if self.cache_backend == 'redis':
                    self._sync_to_redis(cache_key, result)

                return result

        return wrapper

    def _sync_to_redis(self, key: str, value: Any):
        """同步到 Redis（模拟）"""
        print(f"↑ 同步到Redis: {key}")

# 使用示例
print("=== DistributedCacheDecorator 演示 ===")

@DistributedCacheDecorator(ttl=5, cache_backend='redis')
def fetch_user_data(user_id: int) -> dict:
    """模拟获取用户数据"""
    print(f"  [模拟] 从数据库查询用户 {user_id}...")
    time.sleep(0.5)  # 模拟数据库查询延迟
    return {"id": user_id, "name": f"User{user_id}", "email": f"user{user_id}@example.com"}

# 第一次调用 - 缓存未命中
print("\n1. 第一次调用:")
start = time.time()
data1 = fetch_user_data(1001)
print(f"结果: {data1}")
print(f"耗时: {time.time() - start:.3f}s")

# 第二次调用 - 缓存命中
print("\n2. 第二次调用（缓存命中）:")
start = time.time()
data2 = fetch_user_data(1001)
print(f"结果: {data2}")
print(f"耗时: {time.time() - start:.3f}s")

# 等待缓存过期
print("\n3. 等待6秒，缓存过期...")
time.sleep(6)

# 第三次调用 - 缓存过期
print("\n4. 缓存过期后调用:")
start = time.time()
data3 = fetch_user_data(1001)
print(f"结果: {data3}")
print(f"耗时: {time.time() - start:.3f}s")

### 7.2 API 限流装饰器

实现基于令牌桶算法的限流装饰器。

In [None]:
import time
import functools
from collections import deque

class RateLimiter:
    """
    基于令牌桶的限流装饰器
    
    特性:
    - 令牌桶算法：支持突发流量
    - 时间窗口：限制指定时间内的请求数
    - 自动清理：清理过期的请求记录
    """
    
    def __init__(self, max_calls: int, time_window: float):
        """
        初始化限流器
        
        Args:
            max_calls: 时间窗口内的最大调用次数
            time_window: 时间窗口大小（秒）
        """
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = deque()
    
    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            now = time.time()
            
            # 清理过期的调用记录
            while self.calls and now - self.calls[0] > self.time_window:
                self.calls.popleft()
            
            # 检查是否超过限流
            if len(self.calls) >= self.max_calls:
                oldest_call = self.calls[0]
                wait_time = self.time_window - (now - oldest_call)
                raise Exception(
                    f"限流: 超过 {self.max_calls} 次/{self.time_window}秒, "
                    f"请等待 {wait_time:.2f} 秒"
                )
            
            # 记录调用时间
            self.calls.append(now)
            print(f"[限流器] 当前请求数: {len(self.calls)}/{self.max_calls}")
            
            return func(*args, **kwargs)
        return wrapper

# 使用示例
@RateLimiter(max_calls=3, time_window=5.0)
def api_call(request_id):
    """模拟API调用"""
    print(f"  → 处理请求 {request_id}")
    return f"Response-{request_id}"

print("=== 限流装饰器演示（最多3次/5秒）===")

# 快速发送4个请求
for i in range(4):
    try:
        print(f"\n请求 {i+1}:")
        result = api_call(i+1)
        print(f"  ✓ 成功: {result}")
    except Exception as e:
        print(f"  ✗ 失败: {e}")
    time.sleep(0.5)

### 7.3 性能监控装饰器

实现完整的性能监控系统。

In [None]:
import time
import functools
from collections import defaultdict
from typing import Dict, List

class PerformanceMonitor:
    """
    性能监控装饰器
    
    功能:
    - 记录函数执行时间
    - 统计调用次数
    - 计算平均、最小、最大执行时间
    - 生成性能报告
    """
    
    def __init__(self):
        self.stats: Dict[str, List[float]] = defaultdict(list)
    
    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            result = func(*args, **kwargs)
            elapsed = time.time() - start
            
            # 记录执行时间
            self.stats[func.__name__].append(elapsed)
            
            return result
        return wrapper
    
    def report(self) -> str:
        """生成性能报告"""
        lines = ["\n=== 性能监控报告 ==="]
        lines.append(f"{'函数名':<20} {'调用次数':<10} {'平均(s)':<12} {'最小(s)':<12} {'最大(s)':<12}")
        lines.append("-" * 66)
        
        for func_name, times in self.stats.items():
            count = len(times)
            avg = sum(times) / count
            min_time = min(times)
            max_time = max(times)
            
            lines.append(
                f"{func_name:<20} {count:<10} {avg:<12.6f} {min_time:<12.6f} {max_time:<12.6f}"
            )
        
        return "\n".join(lines)
    
    def reset(self):
        """重置统计数据"""
        self.stats.clear()

# 创建全局监控器
monitor = PerformanceMonitor()

@monitor
def fast_operation(x):
    """快速操作"""
    time.sleep(0.01)
    return x * 2

@monitor
def slow_operation(x):
    """慢速操作"""
    time.sleep(0.1)
    return x ** 2

@monitor
def variable_operation(x):
    """耗时可变的操作"""
    import random
    time.sleep(random.uniform(0.01, 0.05))
    return x + 1

# 执行测试
print("=== 执行性能测试 ===")
for i in range(5):
    fast_operation(i)
    slow_operation(i)
    variable_operation(i)

# 生成报告
print(monitor.report())

### 7.4 异步装饰器

为异步函数编写装饰器。

In [None]:
import asyncio
import functools
import time

def async_timer(func):
    """异步函数计时装饰器"""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.time()
        result = await func(*args, **kwargs)
        elapsed = time.time() - start
        print(f"[异步计时] {func.__name__} 耗时: {elapsed:.3f}s")
        return result
    return wrapper

def async_retry(max_attempts=3, delay=1.0):
    """异步重试装饰器"""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    print(f"[重试] 第 {attempt + 1} 次尝试失败: {e}")
                    if attempt < max_attempts - 1:
                        await asyncio.sleep(delay)
                    else:
                        raise
        return wrapper
    return decorator

class AsyncRateLimiter:
    """异步限流装饰器"""
    
    def __init__(self, max_concurrent=3):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.max_concurrent = max_concurrent
    
    def __call__(self, func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            async with self.semaphore:
                print(f"[并发控制] 执行 {func.__name__} (最大并发: {self.max_concurrent})")
                return await func(*args, **kwargs)
        return wrapper

# 使用示例
@async_timer
@async_retry(max_attempts=3, delay=0.5)
async def fetch_data(url, fail_count=0):
    """模拟异步获取数据"""
    print(f"  → 请求: {url}")
    await asyncio.sleep(0.5)
    
    # 模拟随机失败
    import random
    if random.random() < fail_count * 0.3:
        raise Exception("网络错误")
    
    return f"数据来自 {url}"

limiter = AsyncRateLimiter(max_concurrent=2)

@limiter
async def process_item(item_id):
    """模拟处理任务"""
    print(f"  → 处理任务 {item_id}")
    await asyncio.sleep(1)
    print(f"  ✓ 完成任务 {item_id}")
    return f"结果-{item_id}"

# 测试异步装饰器
async def test_async_decorators():
    print("=== 异步装饰器演示 ===")
    
    # 测试计时和重试
    print("\n1. 异步请求（带重试）:")
    try:
        result = await fetch_data("https://api.example.com/data", fail_count=1)
        print(f"成功: {result}")
    except Exception as e:
        print(f"失败: {e}")
    
    # 测试并发控制
    print("\n2. 并发控制（最多2个并发）:")
    tasks = [process_item(i) for i in range(5)]
    results = await asyncio.gather(*tasks)
    print(f"\n所有任务完成: {results}")

# 运行测试
await test_async_decorators()

## 8. 最佳实践 {#最佳实践}

### 8.1 装饰器设计原则

编写高质量装饰器的指导原则。

### 装饰器设计的黄金法则

1. **保留元数据**
   - 始终使用 `@functools.wraps(func)`
   - 保持函数的 `__name__`, `__doc__`, `__module__` 等属性

2. **处理任意参数**
   - 使用 `*args, **kwargs` 接受任意参数
   - 确保装饰器通用性

3. **单一职责**
   - 每个装饰器只做一件事
   - 通过组合实现复杂功能

4. **可配置性**
   - 提供合理的默认参数
   - 支持自定义配置

5. **文档完善**
   - 清晰的文档字符串
   - 说明参数、返回值和副作用

6. **错误处理**
   - 优雅地处理异常
   - 提供有意义的错误信息

7. **性能考虑**
   - 避免不必要的开销
   - 适当使用缓存

8. **线程安全**
   - 在并发场景中使用锁
   - 注意共享状态的访问

### 8.2 常见陷阱和解决方案

In [None]:
import functools

# 陷阱1: 忘记使用 @wraps
print("=== 陷阱1: 忘记使用 @wraps ===")

def bad_decorator(func):
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

def good_decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@bad_decorator
def func1():
    """这是func1的文档"""
    pass

@good_decorator
def func2():
    """这是func2的文档"""
    pass

print(f"不使用 @wraps: 函数名={func1.__name__}, 文档={func1.__doc__}")
print(f"使用 @wraps: 函数名={func2.__name__}, 文档={func2.__doc__}")

# 陷阱2: 装饰器参数和函数参数混淆
print("\n=== 陷阱2: 参数混淆 ===")

def repeat_wrong(func, times=3):  # 错误：func不应该是参数
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        for _ in range(times):
            func(*args, **kwargs)
    return wrapper

def repeat_correct(times=3):  # 正确：times是装饰器参数
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for _ in range(times):
                result = func(*args, **kwargs)
            return result
        return wrapper
    return decorator

@repeat_correct(times=2)
def say(message):
    print(message)

say("Hello")

# 陷阱3: 修改可变默认参数
print("\n=== 陷阱3: 可变默认参数 ===")

def bad_cache(func, cache=[]):  # 危险：可变默认参数
    @functools.wraps(func)
    def wrapper(*args):
        if args not in cache:
            cache.append(args)
        return func(*args)
    return wrapper

def good_cache(func):
    cache = []  # 正确：在函数内部创建
    @functools.wraps(func)
    def wrapper(*args):
        if args not in cache:
            cache.append(args)
        return func(*args)
    return wrapper

print("正确的缓存实现避免了可变默认参数的陷阱")

### 8.3 装饰器性能优化

In [None]:
import functools
import time

# 优化1: 使用 __slots__ 减少内存
class OptimizedDecorator:
    """使用 __slots__ 优化内存"""
    __slots__ = ['func', 'count']
    
    def __init__(self, func):
        self.func = func
        self.count = 0
    
    def __call__(self, *args, **kwargs):
        self.count += 1
        return self.func(*args, **kwargs)

# 优化2: 延迟计算
def lazy_property(func):
    """延迟计算属性"""
    attr_name = '_lazy_' + func.__name__
    
    @property
    @functools.wraps(func)
    def wrapper(self):
        if not hasattr(self, attr_name):
            setattr(self, attr_name, func(self))
        return getattr(self, attr_name)
    
    return wrapper

class DataProcessor:
    def __init__(self, data):
        self.data = data
    
    @lazy_property
    def expensive_computation(self):
        """耗时的计算，只执行一次"""
        print("执行昂贵的计算...")
        time.sleep(0.5)
        return sum(self.data)

# 测试
print("=== 延迟计算优化 ===")
processor = DataProcessor([1, 2, 3, 4, 5])
print("创建对象，尚未计算")
print(f"第一次访问: {processor.expensive_computation}")
print(f"第二次访问（缓存）: {processor.expensive_computation}")

# 优化3: 使用生成器减少内存
def batch_process(batch_size=100):
    """批量处理装饰器"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(items):
            for i in range(0, len(items), batch_size):
                batch = items[i:i + batch_size]
                yield from func(batch)
        return wrapper
    return decorator

@batch_process(batch_size=3)
def process_items(batch):
    """处理一批数据"""
    print(f"处理批次，大小: {len(batch)}")
    return [x * 2 for x in batch]

print("\n=== 批量处理优化 ===")
data = list(range(10))
results = list(process_items(data))
print(f"处理结果: {results}")

## 总结

### 装饰器核心概念回顾

1. **基础概念**
   - 装饰器是接受函数并返回函数的可调用对象
   - 使用 `@decorator` 语法糖应用装饰器
   - 本质是闭包和高阶函数的应用

2. **装饰器类型**
   - 函数装饰器：最常用，使用函数定义
   - 类装饰器：使用类定义，通过 `__call__` 实现
   - 带参数装饰器：需要额外一层函数嵌套

3. **常见应用场景**
   - 缓存：避免重复计算
   - 日志：记录函数调用
   - 计时：性能监控
   - 重试：提高可靠性
   - 限流：控制访问频率
   - 权限检查：访问控制

4. **最佳实践**
   - 使用 `@functools.wraps` 保留元数据
   - 处理任意参数 `*args, **kwargs`
   - 保持单一职责
   - 提供良好的文档
   - 注意线程安全
   - 考虑性能影响

### 学习资源

- [PEP 318 - Decorators for Functions and Methods](https://www.python.org/dev/peps/pep-0318/)
- [Python Decorator Library](https://wiki.python.org/moin/PythonDecoratorLibrary)
- [functools 模块文档](https://docs.python.org/3/library/functools.html)