In [2]:
import numpy as np
import pandas as pd
import statsmodels.api as sm
from scipy import stats

# 设置随机种子
np.random.seed(42)


In [3]:
# 实验参数
n = 10000  # 总样本量
tau = 0.1  # 真实处理效应
strata_weights = np.array([0.5, 0.3, 0.2])  # 层权重
n_strata = len(strata_weights)

# 生成分层变量
strata = np.random.choice([0, 1, 2], size=n, p=strata_weights)

# 生成协变量X（实验前指标）
X = np.zeros(n)
for s in range(n_strata):
    mask = (strata == s)
    size = mask.sum()
    if s == 0:
        X[mask] = np.random.normal(10, 2, size)
    elif s == 1:
        X[mask] = np.random.normal(20, 4, size)
    else:
        X[mask] = np.random.normal(30, 6, size)

# 生成处理组分配（整体随机化）
treatment = np.random.choice([0, 1], size=n, p=[0.5, 0.5])

# 生成结果变量Y（实验后指标）
epsilon = np.random.normal(0, 3, n)  # 噪声项
Y = X + tau * treatment + epsilon

# 创建数据框
df = pd.DataFrame({
    'strata': strata,
    'X': X,
    'treatment': treatment,
    'Y': Y
})

# 事前分层采样数据（分层随机化）
df_stratified = df.copy()
df_stratified['treatment_stratified'] = 0
for s in range(n_strata):
    mask = (df_stratified['strata'] == s)
    size = mask.sum()
    df_stratified.loc[mask, 'treatment_stratified'] = np.random.choice(
        [0, 1], size=size, p=[0.5, 0.5]
    )

In [4]:
def simple_random_sampling(df):
    control = df[df['treatment'] == 0]['Y']
    treated = df[df['treatment'] == 1]['Y']
    
    ate = treated.mean() - control.mean()
    var_ate = control.var()/len(control) + treated.var()/len(treated)
    se = np.sqrt(var_ate)
    ci_low = ate - 1.96 * se
    ci_high = ate + 1.96 * se
    t_stat = ate / se
    p_value = 2 * (1 - stats.norm.cdf(abs(t_stat)))
    
    return {
        'method': 'Simple Random Sampling',
        'ate': ate,
        'se': se,
        'ci_low': ci_low,
        'ci_high': ci_high,
        'p_value': p_value
    }

In [5]:
simple_random_sampling(df)

{'method': 'Simple Random Sampling',
 'ate': np.float64(0.12195268728070019),
 'se': np.float64(0.18406137158835204),
 'ci_low': np.float64(-0.23880760103246979),
 'ci_high': np.float64(0.48271297559387016),
 'p_value': np.float64(0.5076089674342916)}

In [27]:
def stratified_sampling(df, strata_weights: np.ndarray=np.array([0.5, 0.3, 0.2])):
    # 存储每个分层的均值
    strata_means_control = []
    strata_means_treated = []
    # 存储每个分层的 方差
    strata_vars_control = []
    strata_vars_treated = []

    #存储每个分层的样本量
    strata_sizes = []


    for s in range(n_strata):
        df_strata = df[df['strata'] == s]
        control = df_strata[df_strata['treatment'] == 0]['Y']
        treated = df_strata[df_strata['treatment'] == 1]['Y']
        
        strata_means_control.append(control.mean())
        strata_means_treated.append(treated.mean())

        strata_vars_control.append(control.var())
        strata_vars_treated.append(treated.var())
        
        strata_sizes.append(len(df_strata))
    # 计算ATE
    ate = np.dot(strata_weights, np.array(strata_means_treated)) - np.dot(strata_weights, np.array(strata_means_control))

    n_control = len(df[df['treatment'] == 0])
    n_treated = len(df[df['treatment'] == 1])
    #计算ATE的方差
    var_ate = 1 / n_control * np.dot(strata_weights, np.array(strata_vars_control)) + 1 / n_treated * np.dot(strata_weights, np.array(strata_vars_treated)) +\
                1/n_control**2 *np.dot(1-strata_weights, np.array(strata_vars_control)) + 1/n_treated**2 *np.dot(1-strata_weights, np.array(strata_vars_treated))
    se = np.sqrt(var_ate)
    ci_low = ate - 1.96 * se
    ci_high = ate + 1.96 * se
    t_stat = ate / se
    p_value = 2 * (1 - stats.norm.cdf(abs(t_stat)))
    
    return {
        'method': 'Post-Stratification',
        'ate': ate,
        'se': se,
        'ci_low': ci_low,
        'ci_high': ci_high,
        'p_value': p_value
    }

In [28]:
stratified_sampling(df_stratified, strata_weights)

{'method': 'Post-Stratification',
 'ate': np.float64(0.028270895578820898),
 'se': np.float64(0.09722197409053267),
 'ci_low': np.float64(-0.16228417363862313),
 'ci_high': np.float64(0.21882596479626493),
 'p_value': np.float64(0.7712141544885893)}

In [21]:
def stratified_sampling(df):
    strata_means = []
    strata_vars = []
    
    for s in range(n_strata):
        df_strata = df[df['strata'] == s]
        control = df_strata[df_strata['treatment_stratified'] == 0]['Y']
        treated = df_strata[df_strata['treatment_stratified'] == 1]['Y']
        
        strata_means.append(treated.mean() - control.mean())
        strata_vars.append(control.var()/len(control) + treated.var()/len(treated))
        
    # 使用实验前的分层比例作为权重
    ate = np.dot(strata_weights, strata_means)
    var_ate = np.dot(strata_weights**2, strata_vars)
    se = np.sqrt(var_ate)
    ci_low = ate - 1.96 * se
    ci_high = ate + 1.96 * se
    t_stat = ate / se
    p_value = 2 * (1 - stats.norm.cdf(abs(t_stat)))
    
    return {
        'method': 'Stratified Sampling',
        'ate': ate,
        'se': se,
        'ci_low': ci_low,
        'ci_high': ci_high,
        'p_value': p_value
    }

In [None]:
def post_stratification(df):
    strata_means = []
    strata_vars = []
    strata_sizes = []


    for s in range(n_strata):
        df_strata = df[df['strata'] == s]
        control = df_strata[df_strata['treatment'] == 0]['Y']
        treated = df_strata[df_strata['treatment'] == 1]['Y']
        
        strata_means.append(treated.mean() - control.mean())
        strata_vars.append(control.var()/len(control) + treated.var()/len(treated))
        strata_sizes.append(len(df_strata))
    
    strata_weights = np.array(strata_sizes) / sum(strata_sizes)
    ate = np.dot(strata_weights, strata_means)
    var_ate = np.dot(strata_weights**2, strata_vars)
    se = np.sqrt(var_ate)
    ci_low = ate - 1.96 * se
    ci_high = ate + 1.96 * se
    t_stat = ate / se
    p_value = 2 * (1 - stats.norm.cdf(abs(t_stat)))
    
    return {
        'method': 'Post-Stratification',
        'ate': ate,
        'se': se,
        'ci_low': ci_low,
        'ci_high': ci_high,
        'p_value': p_value
    }

In [24]:
post_stratification(df)

{'method': 'Post-Stratification',
 'ate': np.float64(0.02728485429708627),
 'se': np.float64(0.09683683676329485),
 'ci_low': np.float64(-0.16251534575897164),
 'ci_high': np.float64(0.21708505435314418),
 'p_value': np.float64(0.7781266954297545)}

In [None]:
def post_stratification(df):
    # 存储每个分层的均值
    strata_means_control = []
    strata_means_treated = []
    # 存储每个分层的 方差
    strata_vars_control = []
    strata_vars_treated = []

    #存储每个分层的样本量
    strata_sizes = []


    for s in range(n_strata):
        df_strata = df[df['strata'] == s]
        control = df_strata[df_strata['treatment'] == 0]['Y']
        treated = df_strata[df_strata['treatment'] == 1]['Y']
        
        strata_means_control.append(control.mean())
        strata_means_treated.append(treated.mean())

        strata_vars_control.append(control.var())
        strata_vars_treated.append(treated.var())
        
        strata_sizes.append(len(df_strata))
    # 计算权重
    strata_weights = np.array(strata_sizes) / sum(strata_sizes)
    # 计算ATE
    ate = np.dot(strata_weights, np.array(strata_means_treated)) - np.dot(strata_weights, np.array(strata_means_control))

    n_control = len(df[df['treatment'] == 0])
    n_treated = len(df[df['treatment'] == 1])
    #计算ATE的方差
    var_ate = 1 / n_control * np.dot(strata_weights, np.array(strata_vars_control)) + 1 / n_treated * np.dot(strata_weights, np.array(strata_vars_treated))
    se = np.sqrt(var_ate)
    ci_low = ate - 1.96 * se
    ci_high = ate + 1.96 * se
    t_stat = ate / se
    p_value = 2 * (1 - stats.norm.cdf(abs(t_stat)))
    
    return {
        'method': 'Post-Stratification',
        'ate': ate,
        'se': se,
        'ci_low': ci_low,
        'ci_high': ci_high,
        'p_value': p_value
    }

In [25]:
post_stratification(df)

{'method': 'Post-Stratification',
 'ate': np.float64(0.02728485429708627),
 'se': np.float64(0.09683683676329485),
 'ci_low': np.float64(-0.16251534575897164),
 'ci_high': np.float64(0.21708505435314418),
 'p_value': np.float64(0.7781266954297545)}

In [29]:
def cuped(df):
    # 使用对照组估计θ
    control_df = df[df['treatment'] == 0]
    X_mean = df['X'].mean()
    model = sm.OLS(control_df['Y'], sm.add_constant(control_df['X'])).fit()
    theta = model.params[1]
    
    # 计算调整后的Y
    df['Y_adj'] = df['Y'] - theta * (df['X'] - X_mean)
    
    control = df[df['treatment'] == 0]['Y_adj']
    treated = df[df['treatment'] == 1]['Y_adj']
    
    ate = treated.mean() - control.mean()
    var_ate = control.var()/len(control) + treated.var()/len(treated)
    se = np.sqrt(var_ate)
    ci_low = ate - 1.96 * se
    ci_high = ate + 1.96 * se
    t_stat = ate / se
    p_value = 2 * (1 - stats.norm.cdf(abs(t_stat)))
    
    return {
        'method': 'CUPED',
        'ate': ate,
        'se': se,
        'ci_low': ci_low,
        'ci_high': ci_high,
        'p_value': p_value
    }

In [30]:
cuped(df)

  theta = model.params[1]


{'method': 'CUPED',
 'ate': np.float64(0.03631764277105276),
 'se': np.float64(0.060784556227788326),
 'ci_low': np.float64(-0.08282008743541236),
 'ci_high': np.float64(0.15545537297751788),
 'p_value': np.float64(0.5501860104127316)}