# 匹配方法 - 构建可比的对照组

**学习目标**：
1. 理解匹配方法的原理和直觉
2. 掌握精确匹配、PSM、马氏距离匹配等算法
3. 学习匹配质量评估和效应估计

**业务场景**：某电商平台想评估会员权益对用户消费的影响。会员用户和非会员用户在年龄、购物频次、历史消费等方面存在显著差异。如何构建可比的对照组？

---

In [None]:
# 环境准备
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
from scipy import stats
from scipy.spatial.distance import mahalanobis
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import NearestNeighbors
import warnings
warnings.filterwarnings('ignore')

# 设置随机种子
np.random.seed(42)

print("✓ 环境准备完成")

---

## Part 1: 匹配的直觉

### 1.1 为什么需要匹配？

在观察性数据中，处理组和对照组往往在**协变量**（covariates）上存在系统性差异：

- **选择偏差**：会员用户可能本身就是高消费人群
- **混淆因素**：年龄、收入、购物习惯都会影响消费
- **不可比性**：直接比较两组均值会得到有偏估计

**匹配的目标**：通过找到协变量相似的个体，模拟随机化实验的条件。

### 1.2 匹配与随机化

| 特征 | 随机化实验 (RCT) | 匹配方法 |
|------|-----------------|----------|
| 组间可比性 | 期望相同 | 匹配后相同 |
| 混淆控制 | 自动平衡 | 需手动匹配 |
| 样本利用 | 全部使用 | 可能丢弃 |
| 适用场景 | 实验数据 | 观察数据 |

**核心假设**：条件独立性假设 (Conditional Independence Assumption, CIA)

$$
(Y_0, Y_1) \perp T \mid X
$$

给定协变量 $X$，潜在结果与处理分配独立。

In [None]:
# 生成模拟数据：会员权益评估
def generate_membership_data(n=1000):
    """
    生成电商会员数据
    - 协变量：年龄、历史消费、购物频次
    - 处理：是否为会员 (选择偏差：高消费者更可能成为会员)
    - 结果：月消费金额
    """
    # 协变量
    age = np.random.normal(35, 10, n).clip(18, 65)
    hist_spending = np.random.gamma(5, 100, n)  # 历史消费
    freq = np.random.poisson(3, n) + 1  # 月购物频次
    
    # 处理分配（选择偏差）
    propensity = 1 / (1 + np.exp(-(-3 + 0.05*age + 0.002*hist_spending + 0.3*freq)))
    treatment = (np.random.uniform(0, 1, n) < propensity).astype(int)
    
    # 潜在结果
    # Y(0): 非会员消费 = f(协变量) + 噪声
    y0 = 200 + 5*age + 0.3*hist_spending + 50*freq + np.random.normal(0, 100, n)
    
    # Y(1): 会员消费 = Y(0) + 真实处理效应 (300元)
    true_effect = 300
    y1 = y0 + true_effect
    
    # 观测结果
    y_obs = treatment * y1 + (1 - treatment) * y0
    
    df = pd.DataFrame({
        'user_id': range(n),
        'age': age,
        'hist_spending': hist_spending,
        'freq': freq,
        'treatment': treatment,
        'spending': y_obs,
        'propensity': propensity,
        'y0': y0,
        'y1': y1
    })
    
    return df, true_effect

df, true_ate = generate_membership_data(1000)
print(f"样本量: {len(df)}")
print(f"会员用户: {df['treatment'].sum()}")
print(f"非会员用户: {(1-df['treatment']).sum()}")
print(f"\n真实 ATE: {true_ate}")

In [None]:
# 可视化：匹配前的不平衡性
def plot_covariate_balance(df, matched_df=None, title_suffix=""):
    """
    绘制协变量平衡性对比
    """
    covariates = ['age', 'hist_spending', 'freq']
    labels = ['年龄', '历史消费', '购物频次']
    
    fig = make_subplots(
        rows=1, cols=3,
        subplot_titles=labels,
        specs=[[{'secondary_y': False}]*3]
    )
    
    for i, (cov, label) in enumerate(zip(covariates, labels)):
        # 原始数据
        treated = df[df['treatment']==1][cov]
        control = df[df['treatment']==0][cov]
        
        fig.add_trace(
            go.Histogram(x=control, name='对照组', opacity=0.5, 
                        marker_color='#EB5757', legendgroup='control',
                        showlegend=(i==0), nbinsx=30),
            row=1, col=i+1
        )
        fig.add_trace(
            go.Histogram(x=treated, name='处理组', opacity=0.5,
                        marker_color='#2D9CDB', legendgroup='treated',
                        showlegend=(i==0), nbinsx=30),
            row=1, col=i+1
        )
        
        # 如果提供了匹配后数据
        if matched_df is not None:
            matched_treated = matched_df[matched_df['treatment']==1][cov]
            matched_control = matched_df[matched_df['treatment']==0][cov]
            
            fig.add_trace(
                go.Histogram(x=matched_control, name='对照组(匹配后)', 
                            opacity=0.7, marker_color='#F2994A',
                            legendgroup='matched_control',
                            showlegend=(i==0), nbinsx=30),
                row=1, col=i+1
            )
            fig.add_trace(
                go.Histogram(x=matched_treated, name='处理组(匹配后)',
                            opacity=0.7, marker_color='#27AE60',
                            legendgroup='matched_treated',
                            showlegend=(i==0), nbinsx=30),
                row=1, col=i+1
            )
    
    fig.update_layout(
        title_text=f"协变量分布对比{title_suffix}",
        barmode='overlay',
        height=400,
        template='plotly_white'
    )
    fig.update_xaxes(title_text="值")
    fig.update_yaxes(title_text="频数")
    
    return fig

fig = plot_covariate_balance(df, title_suffix=" - 匹配前")
fig.show()

# 计算标准化均值差
def compute_smd(df, covariates):
    """
    计算标准化均值差 (Standardized Mean Difference)
    SMD = (mean_treated - mean_control) / sqrt((var_treated + var_control) / 2)
    一般认为 |SMD| < 0.1 表示平衡良好
    """
    treated = df[df['treatment']==1]
    control = df[df['treatment']==0]
    
    smds = {}
    for cov in covariates:
        mean_diff = treated[cov].mean() - control[cov].mean()
        pooled_std = np.sqrt((treated[cov].var() + control[cov].var()) / 2)
        smds[cov] = mean_diff / pooled_std
    
    return smds

covariates = ['age', 'hist_spending', 'freq']
smd_before = compute_smd(df, covariates)
print("\n匹配前的 SMD:")
for cov, smd in smd_before.items():
    print(f"  {cov}: {smd:.3f}")

**观察**：
- 会员用户在所有协变量上都显著高于非会员
- SMD 绝对值都远大于 0.1，说明两组严重不平衡
- 直接比较均值会严重高估会员权益的效果

---

## Part 2: 精确匹配 (Exact Matching)

### 2.1 原理

**精确匹配**：为每个处理组个体找到协变量完全相同的对照组个体。

**优点**：
- 概念简单
- 匹配质量高

**缺点**：
- 维度灾难：协变量越多，找到精确匹配的概率越低
- 连续变量难以精确匹配
- 样本浪费严重

### 2.2 实现

In [None]:
# 精确匹配（需要将连续变量离散化）
def exact_matching(df, covariates, bins_dict):
    """
    精确匹配
    
    参数:
        df: 数据框
        covariates: 协变量列表
        bins_dict: 每个协变量的分箱边界，例如 {'age': [0, 30, 40, 50, 100]}
    
    返回:
        matched_df: 匹配后的数据框
    """
    df_copy = df.copy()
    
    # 离散化连续变量
    for cov, bins in bins_dict.items():
        df_copy[f'{cov}_bin'] = pd.cut(df_copy[cov], bins=bins, labels=False)
    
    bin_cols = [f'{cov}_bin' for cov in bins_dict.keys()]
    
    # 按照离散化后的协变量分组
    matched_pairs = []
    
    treated = df_copy[df_copy['treatment']==1]
    control = df_copy[df_copy['treatment']==0]
    
    for _, t_row in treated.iterrows():
        # 找到所有协变量匹配的对照组
        mask = (control[bin_cols] == t_row[bin_cols]).all(axis=1)
        matched_controls = control[mask]
        
        if len(matched_controls) > 0:
            # 随机选择一个（1:1 匹配）
            c_row = matched_controls.sample(1).iloc[0]
            matched_pairs.append(t_row)
            matched_pairs.append(c_row)
    
    matched_df = pd.DataFrame(matched_pairs)
    return matched_df

# 定义分箱策略
bins_dict = {
    'age': [0, 30, 40, 50, 100],
    'hist_spending': [0, 300, 600, 1000, 10000],
    'freq': [0, 2, 4, 6, 100]
}

matched_exact = exact_matching(df, covariates, bins_dict)
print(f"匹配前样本量: {len(df)}")
print(f"匹配后样本量: {len(matched_exact)} ({len(matched_exact)/len(df)*100:.1f}%)")
print(f"处理组保留率: {matched_exact['treatment'].sum() / df['treatment'].sum() * 100:.1f}%")

In [None]:
# 评估匹配质量
smd_exact = compute_smd(matched_exact, covariates)
print("\n精确匹配后的 SMD:")
for cov, smd in smd_exact.items():
    print(f"  {cov}: {smd:.3f} (改善: {abs(smd_before[cov]) - abs(smd):.3f})")

# 可视化
fig = plot_covariate_balance(df, matched_exact, title_suffix=" - 精确匹配前后")
fig.show()

### 2.3 粗化精确匹配 (Coarsened Exact Matching, CEM)

**改进思路**：不要求完全精确匹配，而是将协变量粗化后再匹配。

**优点**：
- 平衡样本保留和匹配质量
- 对分箱策略不敏感
- 满足单调不平衡约束 (Monotonic Imbalance Bounding)

---

## Part 3: 倾向得分匹配 (Propensity Score Matching, PSM)

### 3.1 倾向得分的魔力

**倾向得分** (Propensity Score):

$$
e(X) = P(T=1 \mid X)
$$

即给定协变量 $X$，个体接受处理的概率。

**Rosenbaum & Rubin (1983) 定理**：如果 $(Y_0, Y_1) \perp T \mid X$，则 $(Y_0, Y_1) \perp T \mid e(X)$

**含义**：
- 可以用一维的倾向得分代替高维的协变量
- 倾向得分相同的个体在协变量分布上是平衡的
- 降维！解决维度灾难

### 3.2 PSM 步骤

1. **估计倾向得分**：用逻辑回归 $P(T=1|X)$
2. **检查共同支撑** (Common Support)：删除倾向得分不重叠的样本
3. **匹配**：为每个处理组个体找到倾向得分相近的对照组
4. **评估平衡性**：检查匹配后协变量是否平衡
5. **估计效应**：计算匹配样本的均值差

In [None]:
# Step 1: 估计倾向得分
def estimate_propensity_score(df, covariates):
    """
    用逻辑回归估计倾向得分
    """
    X = df[covariates]
    y = df['treatment']
    
    model = LogisticRegression(max_iter=1000)
    model.fit(X, y)
    
    ps = model.predict_proba(X)[:, 1]
    return ps

df['ps_estimated'] = estimate_propensity_score(df, covariates)

# 可视化倾向得分分布
fig = go.Figure()
fig.add_trace(go.Histogram(
    x=df[df['treatment']==0]['ps_estimated'],
    name='对照组',
    opacity=0.6,
    marker_color='#EB5757',
    nbinsx=50
))
fig.add_trace(go.Histogram(
    x=df[df['treatment']==1]['ps_estimated'],
    name='处理组',
    opacity=0.6,
    marker_color='#2D9CDB',
    nbinsx=50
))
fig.update_layout(
    title='倾向得分分布',
    xaxis_title='倾向得分',
    yaxis_title='频数',
    barmode='overlay',
    template='plotly_white',
    height=400
)
fig.show()

# 共同支撑区域
ps_treated = df[df['treatment']==1]['ps_estimated']
ps_control = df[df['treatment']==0]['ps_estimated']
common_support_min = max(ps_treated.min(), ps_control.min())
common_support_max = min(ps_treated.max(), ps_control.max())

print(f"\n共同支撑区域: [{common_support_min:.3f}, {common_support_max:.3f}]")
print(f"处理组在支撑区域内: {((ps_treated >= common_support_min) & (ps_treated <= common_support_max)).sum()} / {len(ps_treated)}")
print(f"对照组在支撑区域内: {((ps_control >= common_support_min) & (ps_control <= common_support_max)).sum()} / {len(ps_control)}")

### 3.3 匹配算法

#### (1) 最近邻匹配 (Nearest Neighbor Matching)

为每个处理组个体 $i$ 找到倾向得分最接近的对照组个体 $j$：

$$
j^* = \arg\min_{j \in \{T=0\}} |e(X_i) - e(X_j)|
$$

- **1:1 匹配**：每个处理组个体匹配 1 个对照组
- **1:N 匹配**：每个处理组个体匹配 N 个对照组（降低方差，但可能引入偏差）

#### (2) 卡尺匹配 (Caliper Matching)

只匹配倾向得分在一定范围内的个体：

$$
|e(X_i) - e(X_j)| < \text{caliper}
$$

常用卡尺：$0.2 \times \text{std}(e(X))$

#### (3) 有放回 vs 无放回

- **无放回**：每个对照组个体只能匹配一次（匹配质量可能下降）
- **有放回**：对照组个体可以重复使用（提高匹配质量，但方差估计需调整）

In [None]:
# TODO 1: 实现 1:1 最近邻匹配（无放回，带卡尺）
def psm_matching(df, caliper=0.2, ratio=1, with_replacement=False):
    """
    倾向得分匹配
    
    参数:
        df: 数据框（需包含 'ps_estimated' 列）
        caliper: 卡尺（标准差的倍数）
        ratio: 匹配比例 (1:ratio)
        with_replacement: 是否有放回
    
    返回:
        matched_df: 匹配后的数据框
    """
    # 提示: 
    # 1. 计算卡尺阈值 = caliper * std(ps)
    # 2. 为每个处理组个体找到最近的对照组个体
    # 3. 检查距离是否在卡尺范围内
    # 4. 如果无放回，需要记录已匹配的对照组个体
    
    caliper_threshold = caliper * df['ps_estimated'].std()
    
    treated = df[df['treatment']==1].copy()
    control = df[df['treatment']==0].copy()
    
    matched_pairs = []
    used_controls = set()
    
    for _, t_row in treated.iterrows():
        # 过滤已使用的对照组（如果无放回）
        if not with_replacement:
            available_controls = control[~control['user_id'].isin(used_controls)]
        else:
            available_controls = control
        
        if len(available_controls) == 0:
            continue
        
        # 计算倾向得分距离
        distances = np.abs(available_controls['ps_estimated'] - t_row['ps_estimated'])
        
        # 找到最近的 ratio 个
        n_matches = min(ratio, len(available_controls))
        closest_indices = distances.nsmallest(n_matches).index
        
        # 检查卡尺
        for idx in closest_indices:
            if distances[idx] <= caliper_threshold:
                matched_pairs.append(t_row)
                matched_pairs.append(available_controls.loc[idx])
                used_controls.add(available_controls.loc[idx, 'user_id'])
            else:
                break  # 如果第一个都超出卡尺，后面的也不用看了
    
    matched_df = pd.DataFrame(matched_pairs)
    return matched_df

# 测试不同参数
matched_psm_11 = psm_matching(df, caliper=0.2, ratio=1, with_replacement=False)
print(f"1:1 匹配（无放回）: {len(matched_psm_11)} 样本")

matched_psm_13 = psm_matching(df, caliper=0.2, ratio=3, with_replacement=True)
print(f"1:3 匹配（有放回）: {len(matched_psm_13)} 样本")

In [None]:
# 评估 1:1 PSM 匹配质量
smd_psm = compute_smd(matched_psm_11, covariates)
print("\nPSM (1:1) 匹配后的 SMD:")
for cov, smd in smd_psm.items():
    print(f"  {cov}: {smd:.3f} (改善: {abs(smd_before[cov]) - abs(smd):.3f})")

# 可视化
fig = plot_covariate_balance(df, matched_psm_11, title_suffix=" - PSM 匹配前后")
fig.show()

# 倾向得分分布对比
fig = go.Figure()
fig.add_trace(go.Histogram(
    x=df[df['treatment']==0]['ps_estimated'],
    name='对照组（原始）',
    opacity=0.4,
    marker_color='#EB5757',
    nbinsx=50
))
fig.add_trace(go.Histogram(
    x=df[df['treatment']==1]['ps_estimated'],
    name='处理组（原始）',
    opacity=0.4,
    marker_color='#2D9CDB',
    nbinsx=50
))
fig.add_trace(go.Histogram(
    x=matched_psm_11[matched_psm_11['treatment']==0]['ps_estimated'],
    name='对照组（匹配后）',
    opacity=0.7,
    marker_color='#F2994A',
    nbinsx=50
))
fig.add_trace(go.Histogram(
    x=matched_psm_11[matched_psm_11['treatment']==1]['ps_estimated'],
    name='处理组（匹配后）',
    opacity=0.7,
    marker_color='#27AE60',
    nbinsx=50
))
fig.update_layout(
    title='PSM 匹配前后的倾向得分分布',
    xaxis_title='倾向得分',
    yaxis_title='频数',
    barmode='overlay',
    template='plotly_white',
    height=400
)
fig.show()

---

## Part 4: 马氏距离匹配 (Mahalanobis Distance Matching)

### 4.1 为什么需要马氏距离？

PSM 只考虑倾向得分这一个维度，可能忽略协变量之间的相关性。

**马氏距离** (Mahalanobis Distance):

$$
d_M(i, j) = \sqrt{(X_i - X_j)^T \Sigma^{-1} (X_i - X_j)}
$$

其中 $\Sigma$ 是协变量的协方差矩阵。

**特点**：
- 考虑变量的尺度差异和相关性
- 对多维数据更鲁棒
- 计算成本高

### 4.2 实现

In [None]:
# TODO 2: 实现马氏距离匹配
def mahalanobis_matching(df, covariates, caliper=None, ratio=1):
    """
    马氏距离匹配
    
    参数:
        df: 数据框
        covariates: 协变量列表
        caliper: 卡尺（可选）
        ratio: 匹配比例
    
    返回:
        matched_df: 匹配后的数据框
    """
    # 提示:
    # 1. 计算协方差矩阵的逆
    # 2. 为每个处理组个体计算与所有对照组的马氏距离
    # 3. 选择距离最近的 ratio 个对照组
    
    treated = df[df['treatment']==1].copy()
    control = df[df['treatment']==0].copy()
    
    # 计算协方差矩阵及其逆
    X = df[covariates]
    cov_matrix = np.cov(X.T)
    cov_inv = np.linalg.inv(cov_matrix)
    
    matched_pairs = []
    
    for _, t_row in treated.iterrows():
        t_cov = t_row[covariates].values
        
        # 计算与所有对照组的马氏距离
        distances = []
        for _, c_row in control.iterrows():
            c_cov = c_row[covariates].values
            dist = mahalanobis(t_cov, c_cov, cov_inv)
            distances.append((c_row, dist))
        
        # 按距离排序
        distances.sort(key=lambda x: x[1])
        
        # 选择最近的 ratio 个
        for c_row, dist in distances[:ratio]:
            if caliper is None or dist <= caliper:
                matched_pairs.append(t_row)
                matched_pairs.append(c_row)
    
    matched_df = pd.DataFrame(matched_pairs)
    return matched_df

matched_maha = mahalanobis_matching(df, covariates, ratio=1)
print(f"马氏距离匹配: {len(matched_maha)} 样本")

# 评估匹配质量
smd_maha = compute_smd(matched_maha, covariates)
print("\n马氏距离匹配后的 SMD:")
for cov, smd in smd_maha.items():
    print(f"  {cov}: {smd:.3f} (改善: {abs(smd_before[cov]) - abs(smd):.3f})")

### 4.3 PSM vs 马氏距离

| 方法 | 优点 | 缺点 | 适用场景 |
|------|------|------|----------|
| PSM | 降维；理论基础强 | 可能丢失信息 | 协变量多；处理分配机制清晰 |
| 马氏距离 | 保留多维信息 | 计算复杂；样本需求大 | 协变量相关性强 |

**实践建议**：可以结合两者，先用 PSM 粗筛，再用马氏距离精匹配。

---

## Part 5: 匹配质量评估

### 5.1 平衡性检验

#### (1) 标准化均值差 (SMD)

$$
\text{SMD} = \frac{\bar{X}_{\text{treated}} - \bar{X}_{\text{control}}}{\sqrt{(s^2_{\text{treated}} + s^2_{\text{control}}) / 2}}
$$

**判断标准**：
- $|\text{SMD}| < 0.1$：平衡良好
- $0.1 \leq |\text{SMD}| < 0.25$：可接受
- $|\text{SMD}| \geq 0.25$：不平衡

#### (2) 方差比 (Variance Ratio)

$$
\text{VR} = \frac{s^2_{\text{treated}}}{s^2_{\text{control}}}
$$

**判断标准**：$0.5 < \text{VR} < 2$

#### (3) 假设检验

- t 检验：检验均值差异
- KS 检验：检验分布差异

**注意**：匹配后样本不独立，p 值仅供参考！

In [None]:
# 完整的平衡性检验
def balance_assessment(df_before, df_after, covariates):
    """
    全面评估匹配质量
    """
    results = []
    
    for cov in covariates:
        # 匹配前
        before_t = df_before[df_before['treatment']==1][cov]
        before_c = df_before[df_before['treatment']==0][cov]
        
        mean_diff_before = before_t.mean() - before_c.mean()
        pooled_std_before = np.sqrt((before_t.var() + before_c.var()) / 2)
        smd_before = mean_diff_before / pooled_std_before
        vr_before = before_t.var() / before_c.var()
        
        # 匹配后
        after_t = df_after[df_after['treatment']==1][cov]
        after_c = df_after[df_after['treatment']==0][cov]
        
        mean_diff_after = after_t.mean() - after_c.mean()
        pooled_std_after = np.sqrt((after_t.var() + after_c.var()) / 2)
        smd_after = mean_diff_after / pooled_std_after
        vr_after = after_t.var() / after_c.var()
        
        # KS 检验
        ks_before = stats.ks_2samp(before_t, before_c)
        ks_after = stats.ks_2samp(after_t, after_c)
        
        results.append({
            '协变量': cov,
            'SMD (前)': f"{smd_before:.3f}",
            'SMD (后)': f"{smd_after:.3f}",
            'VR (前)': f"{vr_before:.3f}",
            'VR (后)': f"{vr_after:.3f}",
            'KS p值 (后)': f"{ks_after.pvalue:.3f}"
        })
    
    return pd.DataFrame(results)

balance_table = balance_assessment(df, matched_psm_11, covariates)
print("\n匹配质量评估 (PSM 1:1):")
print(balance_table.to_string(index=False))

In [None]:
# 可视化 SMD 改善
def plot_smd_comparison(df_before, df_after, covariates, labels=None):
    """
    Love Plot: 可视化匹配前后的 SMD
    """
    if labels is None:
        labels = covariates
    
    smd_before = compute_smd(df_before, covariates)
    smd_after = compute_smd(df_after, covariates)
    
    fig = go.Figure()
    
    # 匹配前
    fig.add_trace(go.Scatter(
        x=[abs(smd_before[cov]) for cov in covariates],
        y=labels,
        mode='markers',
        marker=dict(size=12, color='#EB5757', symbol='circle'),
        name='匹配前'
    ))
    
    # 匹配后
    fig.add_trace(go.Scatter(
        x=[abs(smd_after[cov]) for cov in covariates],
        y=labels,
        mode='markers',
        marker=dict(size=12, color='#27AE60', symbol='diamond'),
        name='匹配后'
    ))
    
    # 阈值线
    fig.add_vline(x=0.1, line_dash="dash", line_color="gray",
                  annotation_text="阈值 (0.1)", annotation_position="top right")
    
    fig.update_layout(
        title='Love Plot: 匹配前后的标准化均值差',
        xaxis_title='|SMD|',
        yaxis_title='协变量',
        template='plotly_white',
        height=400
    )
    
    return fig

fig = plot_smd_comparison(df, matched_psm_11, covariates, 
                          labels=['年龄', '历史消费', '购物频次'])
fig.show()

---

## Part 6: 效应估计

### 6.1 ATT vs ATE

匹配方法通常估计的是 **处理组平均处理效应 (ATT)**：

$$
\text{ATT} = E[Y_1 - Y_0 \mid T=1]
$$

而非总体平均处理效应 (ATE)：

$$
\text{ATE} = E[Y_1 - Y_0]
$$

**原因**：匹配是以处理组为基准，为其寻找对照组。

**估计**：

$$
\widehat{\text{ATT}} = \frac{1}{N_T} \sum_{i: T_i=1} \left( Y_i - \frac{1}{M_i} \sum_{j \in \mathcal{M}(i)} Y_j \right)
$$

其中 $\mathcal{M}(i)$ 是个体 $i$ 的匹配集，$M_i$ 是匹配个数。

### 6.2 方差估计

**挑战**：
- 匹配样本不独立（尤其是有放回匹配）
- 倾向得分是估计出来的（估计误差）

**解决方案**：
1. **Abadie-Imbens 方差估计**（考虑匹配不确定性）
2. **Bootstrap**（重抽样）
3. **稳健标准误**

In [None]:
# TODO 3: 实现 ATT 估计和 Bootstrap 方差估计
def estimate_att(matched_df):
    """
    估计 ATT
    """
    treated = matched_df[matched_df['treatment']==1]
    control = matched_df[matched_df['treatment']==0]
    
    att = treated['spending'].mean() - control['spending'].mean()
    return att

def bootstrap_att(matched_df, n_bootstrap=1000):
    """
    Bootstrap 估计 ATT 的标准误和置信区间
    
    参数:
        matched_df: 匹配后的数据框
        n_bootstrap: Bootstrap 次数
    
    返回:
        att, se, ci_lower, ci_upper
    """
    # 提示:
    # 1. 对匹配后的数据进行有放回抽样（保持配对结构）
    # 2. 计算每次 Bootstrap 样本的 ATT
    # 3. 用 Bootstrap 分布估计标准误和置信区间
    
    att_point = estimate_att(matched_df)
    
    # 识别匹配对（假设按顺序排列：处理-对照-处理-对照...）
    treated_indices = matched_df[matched_df['treatment']==1].index
    control_indices = matched_df[matched_df['treatment']==0].index
    
    # 确保配对
    n_pairs = min(len(treated_indices), len(control_indices))
    
    att_bootstrap = []
    for _ in range(n_bootstrap):
        # 有放回抽样配对
        sampled_pair_indices = np.random.choice(n_pairs, n_pairs, replace=True)
        
        sampled_treated = matched_df.loc[treated_indices[sampled_pair_indices]]
        sampled_control = matched_df.loc[control_indices[sampled_pair_indices]]
        
        att_b = sampled_treated['spending'].mean() - sampled_control['spending'].mean()
        att_bootstrap.append(att_b)
    
    att_bootstrap = np.array(att_bootstrap)
    se = att_bootstrap.std()
    ci_lower = np.percentile(att_bootstrap, 2.5)
    ci_upper = np.percentile(att_bootstrap, 97.5)
    
    return att_point, se, ci_lower, ci_upper

# 估计不同匹配方法的 ATT
methods = {
    '精确匹配': matched_exact,
    'PSM (1:1)': matched_psm_11,
    'PSM (1:3)': matched_psm_13,
    '马氏距离': matched_maha
}

print(f"真实 ATE: {true_ate}\n")
print("ATT 估计结果:")
print("-" * 70)

for name, matched_df in methods.items():
    if len(matched_df) > 0:
        att, se, ci_lower, ci_upper = bootstrap_att(matched_df, n_bootstrap=500)
        bias = att - true_ate
        print(f"{name:15} ATT={att:7.2f}  SE={se:6.2f}  95% CI=[{ci_lower:7.2f}, {ci_upper:7.2f}]  Bias={bias:7.2f}")
    else:
        print(f"{name:15} 无法匹配")

In [None]:
# 可视化效应估计
def plot_treatment_effect_estimates(methods_dict, true_ate):
    """
    可视化不同方法的效应估计
    """
    results = []
    
    for name, matched_df in methods_dict.items():
        if len(matched_df) > 0:
            att, se, ci_lower, ci_upper = bootstrap_att(matched_df, n_bootstrap=500)
            results.append({
                'method': name,
                'att': att,
                'se': se,
                'ci_lower': ci_lower,
                'ci_upper': ci_upper
            })
    
    results_df = pd.DataFrame(results)
    
    fig = go.Figure()
    
    # 点估计
    fig.add_trace(go.Scatter(
        x=results_df['att'],
        y=results_df['method'],
        mode='markers',
        marker=dict(size=12, color='#2D9CDB'),
        error_x=dict(
            type='data',
            symmetric=False,
            array=results_df['ci_upper'] - results_df['att'],
            arrayminus=results_df['att'] - results_df['ci_lower']
        ),
        name='ATT 估计'
    ))
    
    # 真实值
    fig.add_vline(x=true_ate, line_dash="dash", line_color="#27AE60",
                  annotation_text=f"真实 ATE = {true_ate}",
                  annotation_position="top right")
    
    fig.update_layout(
        title='不同匹配方法的 ATT 估计及 95% 置信区间',
        xaxis_title='ATT',
        yaxis_title='匹配方法',
        template='plotly_white',
        height=400
    )
    
    return fig

fig = plot_treatment_effect_estimates(methods, true_ate)
fig.show()

### 6.3 敏感性分析

**问题**：如果存在未观测的混淆因素怎么办？

**Rosenbaum 边界 (Rosenbaum Bounds)**：评估未观测混淆对结果的影响。

**思路**：假设存在一个未观测变量 $U$，使得两个协变量相同的个体在处理概率上相差 $\Gamma$ 倍：

$$
\frac{1}{\Gamma} \leq \frac{P(T=1 \mid X, U)}{P(T=1 \mid X, U')} \leq \Gamma
$$

计算在不同 $\Gamma$ 下，结论是否会改变（p 值边界）。

**解读**：
- $\Gamma = 1$：无未观测混淆
- $\Gamma = 2$：未观测混淆使处理概率相差 2 倍
- 如果在 $\Gamma = 2$ 下结论仍显著，说明结果对未观测混淆较稳健

In [None]:
# 简化版 Rosenbaum 敏感性分析
def rosenbaum_bounds_simulation(matched_df, gamma_range=np.arange(1, 3.1, 0.2)):
    """
    模拟 Rosenbaum 边界
    
    思路：
    - 对于每个 gamma，模拟未观测混淆导致的处理分配偏差
    - 重新分配处理，计算效应估计
    - 看效应是否仍显著
    """
    treated = matched_df[matched_df['treatment']==1]
    control = matched_df[matched_df['treatment']==0]
    
    observed_effect = treated['spending'].mean() - control['spending'].mean()
    
    results = []
    
    for gamma in gamma_range:
        # 模拟 1000 次
        effects = []
        for _ in range(1000):
            # 为每对匹配个体，以概率 p 互换处理状态
            # p 的范围由 gamma 决定
            p = gamma / (1 + gamma)
            
            # 随机翻转一部分配对
            n_pairs = min(len(treated), len(control))
            flip = np.random.binomial(1, p, n_pairs)
            
            # 重新计算效应（这里简化处理）
            perturbed_effect = observed_effect * (1 - 0.1 * (gamma - 1) * np.random.uniform(0.5, 1.5))
            effects.append(perturbed_effect)
        
        effects = np.array(effects)
        p_value = (effects <= 0).mean()  # 单侧检验
        
        results.append({
            'gamma': gamma,
            'p_value': p_value,
            'effect_lower': np.percentile(effects, 5),
            'effect_upper': np.percentile(effects, 95)
        })
    
    return pd.DataFrame(results), observed_effect

sensitivity_df, obs_effect = rosenbaum_bounds_simulation(matched_psm_11)

# 可视化
fig = make_subplots(rows=1, cols=2, 
                    subplot_titles=['P-value vs Gamma', 'Effect Bounds vs Gamma'])

# P-value
fig.add_trace(
    go.Scatter(x=sensitivity_df['gamma'], y=sensitivity_df['p_value'],
               mode='lines+markers', name='P-value',
               line=dict(color='#2D9CDB')),
    row=1, col=1
)
fig.add_hline(y=0.05, line_dash="dash", line_color="red",
              annotation_text="α=0.05", row=1, col=1)

# Effect bounds
fig.add_trace(
    go.Scatter(x=sensitivity_df['gamma'], y=sensitivity_df['effect_upper'],
               mode='lines', name='Upper bound',
               line=dict(color='#27AE60', dash='dash')),
    row=1, col=2
)
fig.add_trace(
    go.Scatter(x=sensitivity_df['gamma'], y=sensitivity_df['effect_lower'],
               mode='lines', name='Lower bound',
               line=dict(color='#EB5757', dash='dash'),
               fill='tonexty', fillcolor='rgba(45, 156, 219, 0.2)'),
    row=1, col=2
)
fig.add_hline(y=0, line_dash="dash", line_color="gray", row=1, col=2)
fig.add_hline(y=obs_effect, line_dash="dot", line_color="#2D9CDB",
              annotation_text=f"Observed = {obs_effect:.1f}", row=1, col=2)

fig.update_xaxes(title_text="Γ (未观测混淆强度)", row=1, col=1)
fig.update_xaxes(title_text="Γ (未观测混淆强度)", row=1, col=2)
fig.update_yaxes(title_text="P-value", row=1, col=1)
fig.update_yaxes(title_text="ATT", row=1, col=2)

fig.update_layout(
    title_text='Rosenbaum 敏感性分析（模拟）',
    template='plotly_white',
    height=400,
    showlegend=True
)

fig.show()

print("\n敏感性分析结果:")
print(sensitivity_df.head(10).to_string(index=False))

---

## Part 7: 业务案例

### 案例 1: 评估会员权益效果

**背景**：电商平台推出付费会员服务，需要评估会员权益对用户消费的真实影响。

**挑战**：
- 会员用户本身就是高价值用户（选择偏差）
- 无法进行随机化实验（商业考虑）

**方案**：使用 PSM 匹配

**步骤**：
1. 收集协变量：年龄、历史消费、购物频次、注册时长等
2. 估计倾向得分
3. 1:3 匹配（提高统计效率）
4. 检查平衡性
5. 估计 ATT
6. 敏感性分析

In [None]:
# 完整的会员权益评估流程
print("=" * 70)
print("会员权益效果评估报告")
print("=" * 70)

print("\n1. 数据概览")
print(f"   总样本量: {len(df)}")
print(f"   会员用户: {df['treatment'].sum()} ({df['treatment'].mean()*100:.1f}%)")
print(f"   非会员用户: {(1-df['treatment']).sum()} ({(1-df['treatment']).mean()*100:.1f}%)")

print("\n2. 协变量平衡性（匹配前）")
for cov in covariates:
    t_mean = df[df['treatment']==1][cov].mean()
    c_mean = df[df['treatment']==0][cov].mean()
    print(f"   {cov:15} 处理组={t_mean:8.2f}  对照组={c_mean:8.2f}  差异={t_mean-c_mean:8.2f}")

print("\n3. PSM 1:3 匹配")
print(f"   匹配后样本量: {len(matched_psm_13)}")
print(f"   处理组保留率: {matched_psm_13['treatment'].sum() / df['treatment'].sum() * 100:.1f}%")

print("\n4. 协变量平衡性（匹配后）")
for cov in covariates:
    t_mean = matched_psm_13[matched_psm_13['treatment']==1][cov].mean()
    c_mean = matched_psm_13[matched_psm_13['treatment']==0][cov].mean()
    smd = abs(compute_smd(matched_psm_13, [cov])[cov])
    status = "✓" if smd < 0.1 else ("⚠" if smd < 0.25 else "✗")
    print(f"   {cov:15} 处理组={t_mean:8.2f}  对照组={c_mean:8.2f}  SMD={smd:.3f} {status}")

att, se, ci_lower, ci_upper = bootstrap_att(matched_psm_13, n_bootstrap=1000)
print("\n5. 效应估计")
print(f"   ATT: {att:.2f} 元/月")
print(f"   标准误: {se:.2f}")
print(f"   95% 置信区间: [{ci_lower:.2f}, {ci_upper:.2f}]")
print(f"   是否显著: {'是 (p<0.05)' if ci_lower > 0 else '否'}")

print("\n6. 业务解读")
print(f"   - 会员权益使用户月消费平均增加 {att:.0f} 元")
print(f"   - 如果会员年费为 {12*50} 元，ROI = {att*12 / (12*50):.2f}")
print(f"   - 推荐: {'值得推广' if att > 200 else '需要优化权益'}")

print("\n" + "=" * 70)

### 案例 2: 广告投放效果评估

**背景**：某 App 在部分城市投放了户外广告，想评估广告对新用户注册的影响。

**数据**：
- 处理组：投放广告的城市
- 对照组：未投放广告的城市
- 协变量：城市人口、GDP、互联网渗透率、竞品数量等

**挑战**：
- 样本量小（城市维度）
- 协变量维度高
- 地理相关性

**方案**：马氏距离匹配 + 地理加权

In [None]:
# TODO 4: 实现广告投放效果评估
def generate_advertising_data(n_cities=100):
    """
    生成广告投放数据（城市维度）
    """
    # 协变量
    population = np.random.gamma(5, 100, n_cities)  # 人口（万）
    gdp = np.random.gamma(10, 500, n_cities)  # GDP（亿）
    internet_penetration = np.random.beta(8, 2, n_cities)  # 互联网渗透率
    competitors = np.random.poisson(3, n_cities)  # 竞品数量
    
    # 处理分配（倾向于人口多、GDP高的城市）
    propensity = 1 / (1 + np.exp(-(-2 + 0.01*population + 0.001*gdp)))
    treatment = (np.random.uniform(0, 1, n_cities) < propensity).astype(int)
    
    # 潜在结果（新用户注册数）
    y0 = (10*population + 0.5*gdp + 100*internet_penetration - 20*competitors + 
          np.random.normal(0, 50, n_cities))
    true_effect = 100  # 广告带来 100 个新用户
    y1 = y0 + true_effect
    
    y_obs = treatment * y1 + (1 - treatment) * y0
    
    df = pd.DataFrame({
        'city_id': range(n_cities),
        'population': population,
        'gdp': gdp,
        'internet_penetration': internet_penetration,
        'competitors': competitors,
        'treatment': treatment,
        'new_users': y_obs,
        'y0': y0,
        'y1': y1
    })
    
    return df, true_effect

df_ad, true_effect_ad = generate_advertising_data(100)

# 使用马氏距离匹配
ad_covariates = ['population', 'gdp', 'internet_penetration', 'competitors']
matched_ad = mahalanobis_matching(df_ad, ad_covariates, ratio=2)

# 评估
print("广告投放效果评估")
print(f"真实效果: {true_effect_ad} 新用户")
print(f"\n匹配样本量: {len(matched_ad)} (城市对)")

# 平衡性
balance_ad = balance_assessment(df_ad, matched_ad, ad_covariates)
print("\n平衡性检验:")
print(balance_ad.to_string(index=False))

# 效应估计
att_ad, se_ad, ci_lower_ad, ci_upper_ad = bootstrap_att(matched_ad, n_bootstrap=500)
print(f"\nATT: {att_ad:.2f} 新用户")
print(f"95% CI: [{ci_lower_ad:.2f}, {ci_upper_ad:.2f}]")
print(f"偏差: {att_ad - true_effect_ad:.2f}")

---

## Part 8: 练习与思考题

### 练习 1: 完善卡尺选择

实现一个函数，自动选择最优卡尺（通过交叉验证平衡性和样本量）。

In [None]:
# TODO 5: 实现最优卡尺选择
def optimal_caliper_selection(df, caliper_range=np.arange(0.05, 0.5, 0.05)):
    """
    选择最优卡尺
    
    目标：平衡匹配质量（SMD）和样本保留率
    
    返回:
        optimal_caliper, results_df
    """
    # 提示:
    # 1. 对每个卡尺值进行匹配
    # 2. 计算平均 SMD 和样本保留率
    # 3. 定义一个综合指标（如：样本保留率 - λ * 平均SMD）
    # 4. 选择综合指标最大的卡尺
    
    pass  # 你的代码

# 测试
# optimal_caliper, results = optimal_caliper_selection(df)
# print(f"最优卡尺: {optimal_caliper}")

### 思考题

1. **匹配 vs 回归**：什么时候用匹配，什么时候用回归？能否结合两者？

2. **多值处理**：如果处理不是二元的（如：广告投放强度 0/50%/100%），如何匹配？

3. **时间维度**：如果需要评估处理的动态效应（如：会员第 1 个月 vs 第 6 个月的效果），匹配方法如何调整？

4. **网络效应**：如果个体之间存在网络效应（如：用户的朋友也是会员），匹配假设是否成立？

5. **外部有效性**：匹配方法估计的 ATT 能否推广到未匹配的总体？

---

## 总结

### 核心要点

1. **匹配的本质**：通过构建可比的对照组，模拟随机化

2. **关键假设**：条件独立性假设 (CIA) + 共同支撑

3. **方法选择**：
   - 精确匹配：协变量少且离散
   - PSM：协变量多，处理分配机制清晰
   - 马氏距离：协变量相关性强

4. **匹配质量**：必须检查平衡性（SMD < 0.1）

5. **敏感性分析**：评估未观测混淆的影响

### 最佳实践

1. **预分析**：先可视化协变量分布，了解不平衡程度
2. **多种方法**：尝试不同匹配算法，比较稳健性
3. **保留诊断**：报告匹配前后的平衡性统计
4. **透明报告**：说明样本损失、匹配参数选择
5. **结合方法**：匹配后可以再做回归调整（doubly robust）

### 延伸阅读

- Rosenbaum & Rubin (1983): "The Central Role of the Propensity Score"
- Stuart (2010): "Matching Methods for Causal Inference: A Review"
- Imbens & Rubin (2015): *Causal Inference for Statistics, Social, and Biomedical Sciences*

---

**下一讲预告**：工具变量法 - 当存在未观测混淆时如何识别因果效应？