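# Business background:<br>
> Problem statement: Cookie Cats is a classic "connect three" puzzle game. As players progress through the levels, they occasionally hit gates that force them to wait a significant amount of time or make an in-app purchase before they can continue.<br>
The first gate was originally placed at level 30. In this notebook, I analyze an A/B test in which the gate in Cookie Cats was moved from level 30 to level 40, focusing on how the move affects player retention.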

In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import statsmodels.stats.proportion as ssp

In [3]:
df = pd.read_csv(r'cookie_cats.txt')

## Step 1: Check the dataset<br>
>1. No missing values<br>2. No duplicates, which ensures that no user appears in both the control group and the test group

In [4]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 90189 entries, 0 to 90188
Data columns (total 5 columns):
 #   Column          Non-Null Count  Dtype 
---  ------          --------------  ----- 
 0   userid          90189 non-null  int64 
 1   version         90189 non-null  object
 2   sum_gamerounds  90189 non-null  int64 
 3   retention_1     90189 non-null  bool  
 4   retention_7     90189 non-null  bool  
dtypes: bool(2), int64(2), object(1)
memory usage: 2.2+ MB


In [5]:
num_duplicates = df.duplicated().sum()
user_duplicates = df.duplicated(subset = 'userid').sum()
print(f"Duplicate rows: {num_duplicates}\nDuplicate users: {user_duplicates}")

Duplicate rows: 0
Duplicate users: 0


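## Handling outliers
> The core metric sum_gamerounds contains one abnormally high value (49854, versus 2961 for the next highest). Left in, it would distort the means and variances of the control/test groups, so it is removed.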

In [6]:
df[['userid','version','sum_gamerounds']].sort_values(by='sum_gamerounds', ascending=False).head(10)

Unnamed: 0,userid,version,sum_gamerounds
57702,6390605,gate_30,49854
7912,871500,gate_30,2961
29417,3271615,gate_40,2640
43671,4832608,gate_30,2438
48188,5346171,gate_40,2294
46344,5133952,gate_30,2251
87007,9640085,gate_30,2156
36933,4090246,gate_40,2124
88328,9791599,gate_40,2063
6536,725080,gate_40,2015


In [7]:
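df_clean = df[df['sum_gamerounds'] != 49854] # drop the outlier row
# NOTE: the cells below still read from df, not df_clean, so their printed outputs include this row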
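To make the motivation for removing the outlier concrete, here is a minimal sketch of how a single extreme value inflates the sample variance and, through it, the required sample size. The toy data and the helper name `required_n` are hypothetical and only illustrate the effect; they are not the Cookie Cats data.

```python
import math
import statistics
from statistics import NormalDist

def required_n(values, delta, alpha=0.05, beta=0.2):
    """One-sided sample size: variance * (z_{1-alpha} + z_{1-beta})^2 / delta^2."""
    z = NormalDist().inv_cdf(1 - alpha) + NormalDist().inv_cdf(1 - beta)
    return math.ceil(statistics.variance(values) * z ** 2 / delta ** 2)

# Hypothetical toy data: 999 ordinary players plus one extreme value
rounds = [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0] * 111
rounds_with_outlier = rounds + [49854.0]

n_clean = required_n(rounds, delta=5)
n_outlier = required_n(rounds_with_outlier, delta=5)
print(f"without outlier: {n_clean}, with outlier: {n_outlier}")
```

Because the variance enters the formula linearly, one row out of a thousand is enough to raise the requirement by orders of magnitude in this toy case.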

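# Question 1<br>Did the average number of game rounds increase by 5?<br>
> Type: one-sided test<br>
## H0: the average number of game rounds increased by <= 5<br>
## H1: the average number of game rounds increased by > 5<br>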

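Sample size for mean-type metrics:<br>
Using the formulas<br>
$$N_1=kN_2$$
$$N_2=(1+\frac{1}{k})(\sigma\frac{Z_{1-\alpha/2}+Z_{1-\beta}}{\delta})^2$$

$N_1,N_2$: sample size of each group<br>
$k$: allocation ratio between the groups ($N_1/N_2$)<br>
$\sigma$: standard deviation of the data<br>
$\delta$: minimum difference in means to detect, i.e. the MDE ($\mu_{\text{test}} - \mu_{\text{control}}$)<br>
$Z_{1-\alpha/2}$ and $Z_{1-\beta}$: standard normal quantiles for the significance level and the statistical power (with $\alpha$=0.05 and $\beta$=0.2 they are 1.96 and 0.84). For a one-sided test, $Z_{1-\alpha/2}$ is replaced by $Z_{1-\alpha}$ (1.645 at $\alpha$=0.05)<br>
<br>
>Because data collection is limited, this example assumes:<br>
>1. The variance of the control group ('gate_30') can be treated as equal to that of the historical data at the 5% significance level<br>
>2. Traffic is split evenly, and the other guardrail metrics (control variables) stay equal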

In [8]:
import scipy.stats as stats
import math

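def get_sample_size(x, alpha, beta, delta, oneside = True): 
    # x: sample whose variance is used (test group or control group)
    # alpha: significance level
    # beta: type II error probability
    # delta: minimum detectable effect
    # NOTE: the (1 + 1/k) factor from the formula above is not applied here
    if oneside:
        n = (x.var() * (stats.norm.ppf(1-alpha) + stats.norm.ppf(1-beta)) ** 2) / delta ** 2
    else:
        n = (x.var() * (stats.norm.ppf(1-alpha/2) + stats.norm.ppf(1-beta)) ** 2) / delta ** 2
    return n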

control = df[df['version'] == 'gate_30']['sum_gamerounds']  # control group: gate at level 30
test = df[df['version'] == 'gate_40']['sum_gamerounds'] # test group: gate at level 40
n_control = math.ceil(get_sample_size(x = control, alpha = 0.05, beta = 0.2, delta = 5, oneside = True))
n_test = math.ceil(get_sample_size(x = test, alpha = 0.05, beta = 0.2, delta = 5, oneside = True))

print(n_control)
print(n_test)
print(f'Required sample size: {max(n_control, n_test)}')

# Compare the per-group requirement with the smaller of the two groups
if max(n_control, n_test) <= min(len(control), len(test)):
    print('Sample size is sufficient')
else:
    print('Sample size is insufficient')

16299
2639
Required sample size: 16299
Sample size is sufficient


The cell above checks whether the available sample size is sufficient.
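As a cross-check on the sample-size formula for mean-type metrics, the following is a minimal sketch that keeps the $(1+\frac{1}{k})$ factor and uses only the Python standard library. The function name `sample_size_mean` and the values `sigma=100`, `delta=5` are illustrative assumptions, not part of the notebook.

```python
import math
from statistics import NormalDist

def sample_size_mean(sigma, delta, alpha=0.05, beta=0.2, k=1.0, one_sided=False):
    """Return (N1, N2) with N1 = k * N2, following the formula for mean-type metrics."""
    z_alpha = NormalDist().inv_cdf(1 - alpha if one_sided else 1 - alpha / 2)
    z_beta = NormalDist().inv_cdf(1 - beta)
    n2 = (1 + 1 / k) * (sigma * (z_alpha + z_beta) / delta) ** 2
    return math.ceil(k * n2), math.ceil(n2)

# The quantiles quoted in the text
print(round(NormalDist().inv_cdf(0.975), 2))  # 1.96  (two-sided, alpha = 0.05)
print(round(NormalDist().inv_cdf(0.8), 2))    # 0.84  (beta = 0.2)
print(round(NormalDist().inv_cdf(0.95), 3))   # 1.645 (one-sided, alpha = 0.05)

# Hypothetical inputs: sigma = 100, delta = 5, equal allocation (k = 1)
n1_two_sided, n2_two_sided = sample_size_mean(sigma=100, delta=5)
n1_one_sided, n2_one_sided = sample_size_mean(sigma=100, delta=5, one_sided=True)
print(n2_two_sided, n2_one_sided)
```

With `k = 1` the factor equals 2, so this sketch returns roughly twice the per-group size that a version without the factor would return for the same inputs.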

## A/B test for mean-type metrics

In [9]:
def abtest_mean_type(df, col_name, control_tag, test_tag, target_col, delta, alpha = 0.05): # A/B test for a mean-type metric

    # Extract the data
    control_data = df[df[col_name] == control_tag][target_col].dropna()
    test_data = df[df[col_name] == test_tag][target_col].dropna()
    
    # Compute the required sample size
    n_calc_c = math.ceil(get_sample_size(control_data, alpha, beta=0.2, delta=delta, oneside = True))
    n_calc_t = math.ceil(get_sample_size(test_data, alpha, beta=0.2, delta=delta, oneside = True))
    
    # Take the larger value as the theoretically required sample size
    needed_n = max(n_calc_c, n_calc_t)
    
    # Cap the draw at the amount of data actually available
    actual_n_c = min(len(control_data), needed_n)
    actual_n_t = min(len(test_data), needed_n)
    
    print(f"Theoretical required sample size (delta={delta}): {needed_n}")
    print(f"Actual sample drawn: Control={actual_n_c}, Test={actual_n_t}")
    
    if actual_n_c < needed_n or actual_n_t < needed_n:
        print(f"Available data is insufficient to detect an effect of delta={delta}")

    # Random sampling without replacement
    control_sample = control_data.sample(n=actual_n_c, replace=False, random_state=123)
    test_sample = test_data.sample(n=actual_n_t, replace=False, random_state=123)

    # Homogeneity-of-variance test (Levene's test)
    # H0: the variances are equal
    levene_stat, levene_p = stats.levene(control_sample, test_sample, center='median')
    
    # Choose the test according to the Levene result
    if levene_p > alpha:
        # P > alpha -> cannot reject H0 -> treat variances as equal -> Student's t-test
        equal_var_flag = True
        test_method_name = "Student's t-test"
        variance_msg = f"equal variances (Levene P={levene_p:.4f} > {alpha})"
    else:
        # P <= alpha -> reject H0 -> variances differ -> Welch's t-test
        equal_var_flag = False
        test_method_name = "Welch's t-test"
        variance_msg = f"unequal variances (Levene P={levene_p:.4f} <= {alpha})"

    # One-sided t-test
    # equal_var controls whether equal variances are assumed
    t_stat, p_value = stats.ttest_ind(test_sample, 
                                      control_sample + delta, # shift control by delta to match H1
                                      equal_var = equal_var_flag,
                                      alternative = 'greater') # 'greater': mean of test_sample exceeds mean of the second sample; use 'less' for the opposite
    is_significant = p_value < alpha
    
    mean_c = control_sample.mean()
    mean_t = test_sample.mean()
    real_diff = mean_t - mean_c
    
    print(f"Step 1 [variance]: {variance_msg}")
    print(f"Step 2 [means]: Control={mean_c:.2f}, Test={mean_t:.2f}, difference={real_diff:.2f}")
    print(f"Step 3 [test]: t-stat={t_stat:.4f}, p-value={p_value:.4f} (one-sided, greater)")
    
    if is_significant:
        result = f"Significant lift (Reject H0). The Test mean exceeds the Control mean by more than delta={delta}."
    else:
        result = "No significant lift (Fail to Reject H0)."
        
    print(f"Conclusion: {result}")

    return {
        't_stat': t_stat,
        'p_value': p_value,
        'mean_diff': real_diff,
        'result': result
    }

In [10]:
abtest_mean_type(df, 'version', 'gate_30', 'gate_40', 'sum_gamerounds', alpha = 0.05, delta = 5)

Theoretical required sample size (delta=5): 16299
Actual sample drawn: Control=16299, Test=16299
Step 1 [variance]: equal variances (Levene P=0.4839 > 0.05)
Step 2 [means]: Control=50.59, Test=51.12, difference=0.53
Step 3 [test]: t-stat=-3.9801, p-value=1.0000 (one-sided, greater)
Conclusion: No significant lift (Fail to Reject H0).


{'t_stat': np.float64(-3.9800972757503703),
 'p_value': np.float64(0.9999654819183008),
 'mean_diff': np.float64(0.530339284618691),
 'result': 'No significant lift (Fail to Reject H0).'}

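In practice, when the sample is large (16,299 per group in this example), the t distribution converges to the standard normal (Z) distribution.<br>
The mean-type A/B test is therefore repeated below with a Z-test.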

In [11]:
from statsmodels.stats.weightstats import ztest

def abtest_large_sample(df, col_name, control_tag, test_tag, target_col, delta, alpha = 0.05, beta = 0.2):
    # 1. Extract the data
    control_data = df[df[col_name] == control_tag][target_col].dropna()
    test_data = df[df[col_name] == test_tag][target_col].dropna()
    
    # Compute the required sample size
    n_calc_c = math.ceil(get_sample_size(control_data, alpha = alpha, beta=beta, delta=delta, oneside = True))
    n_calc_t = math.ceil(get_sample_size(test_data, alpha = alpha, beta=beta, delta=delta, oneside = True))
    
    # Take the larger value as the theoretically required sample size
    needed_n = max(n_calc_c, n_calc_t)
    
    # Cap the draw at the amount of data actually available
    actual_n_c = min(len(control_data), needed_n)
    actual_n_t = min(len(test_data), needed_n)
    
    print(f"Theoretical required sample size (delta={delta}): {needed_n}")
    print(f"Actual sample drawn: Control={actual_n_c}, Test={actual_n_t}")
    
    if actual_n_c < needed_n or actual_n_t < needed_n:
        print(f"Available data is insufficient to detect an effect of delta={delta}")

    # Random sampling without replacement
    control_sample = control_data.sample(n=actual_n_c, replace=False, random_state=123)
    test_sample = test_data.sample(n=actual_n_t, replace=False, random_state=123)

    # 2. Homogeneity-of-variance test (Levene)
    levene_stat, levene_p = stats.levene(test_sample, control_sample, center='median')
    
    # 3. Run the Z-test
    print(f"--- Sample size: Control={len(control_sample)}, Test={len(test_sample)} (using Z-test) ---")
    
    if levene_p > alpha:
        # Equal variances: use the pooled variance
        # usevar='pooled' corresponds to equal_var=True
        z_stat, p_value = ztest(test_sample - delta, control_sample, alternative='larger', usevar='pooled') # alternative='larger' tests test > control + delta
        method = "Z-test (Pooled Variance)"
    else:
        # Unequal variances: use separate variances
        # usevar='unequal' corresponds to equal_var=False
        z_stat, p_value = ztest(test_sample - delta, control_sample, alternative='larger', usevar='unequal')
        method = "Z-test (Unequal Variance)"

    print(f"Variance test P-value: {levene_p:.4f}")
    print(f"Method used: {method}")
    print(f"Z statistic: {z_stat:.4f}, P-value: {p_value:.4f}")
    
    return p_value

abtest_large_sample(df, 'version', 'gate_30', 'gate_40', 'sum_gamerounds', delta = 5)

Theoretical required sample size (delta=5): 16299
Actual sample drawn: Control=16299, Test=16299
--- Sample size: Control=16299, Test=16299 (using Z-test) ---
Variance test P-value: 0.4839
Method used: Z-test (Pooled Variance)
Z statistic: -3.9801, P-value: 1.0000


np.float64(0.9999655564630385)
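Both tests above shift one sample by `delta` before comparing means, which is the same as subtracting `delta` from the observed difference. A minimal standard-library sketch of that statistic follows; the function name `z_test_with_margin` and the toy samples are hypothetical, and the unpooled standard error is used (it coincides with the pooled one when both groups have the same size).

```python
import math
from statistics import NormalDist, mean, variance

def z_test_with_margin(test, control, delta):
    """One-sided z-test of H1: mean(test) - mean(control) > delta."""
    diff = mean(test) - mean(control)
    se = math.sqrt(variance(test) / len(test) + variance(control) / len(control))
    z = (diff - delta) / se
    p_value = 1 - NormalDist().cdf(z)  # upper-tail probability
    return z, p_value

# Hypothetical toy samples (not the Cookie Cats data)
control = [48.0, 50.0, 52.0, 49.0, 51.0] * 20
test_big_lift = [v + 6.0 for v in control]    # lift of 6 exceeds delta = 5
test_small_lift = [v + 0.5 for v in control]  # lift of 0.5 falls short of delta = 5

z_big, p_big = z_test_with_margin(test_big_lift, control, delta=5)
z_small, p_small = z_test_with_margin(test_small_lift, control, delta=5)
print(f"big lift:   z={z_big:.2f}, p={p_big:.4f}")
print(f"small lift: z={z_small:.2f}, p={p_small:.4f}")
```

A positive lift that falls short of `delta` gives a negative statistic and a p-value close to 1, which is the pattern seen in the notebook output (difference 0.53 against `delta = 5`).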

# Question 2<br>Did 1-day player retention increase by 2% in relative terms?

In [49]:
df.head()

Unnamed: 0,userid,version,sum_gamerounds,retention_1,retention_7
0,116,gate_30,3,False,0
1,337,gate_30,38,True,0
2,377,gate_40,165,True,0
3,483,gate_40,1,False,0
4,488,gate_40,179,True,1


## Sample size calculation for proportion-type metrics

In [21]:
total_sample_control_1day = df[df['version'] == 'gate_30']['retention_1'] # control group sample
total_sample_test_1day = df[df['version'] == 'gate_40']['retention_1'] # test group sample
print(f'Control group sample size: {len(total_sample_control_1day)}')
print(f'Test group sample size: {len(total_sample_test_1day)}')
print('=' * 30) 
# Number of retained users in each group
control_retained = (total_sample_control_1day == 1).sum()
test_retained = (total_sample_test_1day == 1).sum()
print(f"Control group retained users: {control_retained}")
print(f"Test group retained users: {test_retained}")
print('=' * 30) 
# Retention rate
control_retained_rate = control_retained / len(total_sample_control_1day)
test_retained_rate = test_retained / len(total_sample_test_1day)
print(f"Control group retention rate: {control_retained_rate}")
print(f"Test group retention rate: {test_retained_rate}")

Control group sample size: 44700
Test group sample size: 45489
Control group retained users: 20034
Test group retained users: 20119
Control group retention rate: 0.4481879194630872
Test group retention rate: 0.44228274967574577


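## Computing the sample size for proportion-type metrics with formulas:<br>
Method 1<br>
$$n=\frac{(Z_{1-\alpha/2}+Z_{1-\beta})^2\cdot(p(1-p))}{\delta^2}$$
Method 2<br>
$$n=\frac{(Z_{1-\alpha/2}\sqrt{2\frac{p_1+p_2}{2}(1-\frac{p_1+p_2}{2})}+Z_{1-\beta}\sqrt{p_1(1-p_1)+p_2(1-p_2)})^2}{\delta^2}$$

> This example assumes the variance does not change, so Method 1 is used<br>
> $p$: the rate in the baseline (control group); $p_1$, $p_2$: the rates in the two groups; $\delta$: the minimum detectable absolute difference between the rates. For a one-sided test, $Z_{1-\alpha/2}$ is replaced by $Z_{1-\alpha}$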

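To see how the two formulas relate, here is a minimal standard-library sketch that evaluates both for the 1-day baseline. The function names `n_method_1` and `n_method_2` are hypothetical, the one-sided quantile is used to match the notebook's calculation, and `p = 0.4482` with a 2% relative lift are the values used in the notebook.

```python
import math
from statistics import NormalDist

def _quantiles(alpha, beta, one_sided):
    z_a = NormalDist().inv_cdf(1 - alpha if one_sided else 1 - alpha / 2)
    z_b = NormalDist().inv_cdf(1 - beta)
    return z_a, z_b

def n_method_1(p, delta, alpha=0.05, beta=0.2, one_sided=True):
    """Method 1: (z_a + z_b)^2 * p(1 - p) / delta^2."""
    z_a, z_b = _quantiles(alpha, beta, one_sided)
    return (z_a + z_b) ** 2 * p * (1 - p) / delta ** 2

def n_method_2(p1, p2, alpha=0.05, beta=0.2, one_sided=True):
    """Method 2: pooled rate under H0, separate rates under H1."""
    z_a, z_b = _quantiles(alpha, beta, one_sided)
    p_bar = (p1 + p2) / 2
    numerator = (z_a * math.sqrt(2 * p_bar * (1 - p_bar))
                 + z_b * math.sqrt(p1 * (1 - p1) + p2 * (1 - p2))) ** 2
    return numerator / (p1 - p2) ** 2

p = 0.4482          # 1-day control retention rate (rounded, as in the notebook)
delta = p * 0.02    # 2% relative lift expressed as an absolute difference

n1 = n_method_1(p, delta)
n2 = n_method_2(p, p + delta)
print(math.ceil(n1), math.ceil(n2), round(n2 / n1, 3))
```

When $p_1$ and $p_2$ are close, both square roots in Method 2 approach $\sqrt{2p(1-p)}$, so Method 2 returns about twice the Method 1 value. The choice between them therefore matters for how many users each group needs.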
In [None]:
from statsmodels.stats.proportion import proportions_ztest

# Compute the MDE (2% relative lift expressed as an absolute difference)
delta_p = control_retained_rate * 0.02

# Compute the per-group sample size
def get_sample_size_p(p, delta, alpha = 0.05, beta = 0.2, oneside = True):
    if oneside:
        n = math.ceil(((stats.norm.ppf(1-alpha) + stats.norm.ppf(1-beta))**2 * p * (1-p)) / delta ** 2)
    else:
        n =  math.ceil(((stats.norm.ppf(1-alpha/2) + stats.norm.ppf(1-beta))**2 * p * (1-p)) / delta ** 2)
    return n

# p: 1-day control retention rate, rounded
sample_size_prop = get_sample_size_p(p = 0.4482, delta = delta_p, alpha = 0.05, beta = 0.2, oneside = True)
print(f'Per-group sample size: {sample_size_prop}')



def abtest_prop_type(df, col_name, control_tag, test_tag, target_col, alpha=0.05):
    # Cast the bool column to 0/1 integers (note: this modifies df in place)
    df[target_col] = df[target_col].astype(int)
    # Extract the data
    control_data = df[df[col_name] == control_tag][target_col].dropna()
    test_data = df[df[col_name] == test_tag][target_col].dropna()

    real_n_c = len(control_data)
    real_n_t = len(test_data)
    
    # Uses the global sample_size_prop computed above
    if real_n_c < sample_size_prop or real_n_t < sample_size_prop:
        print(f"Available data does not reach the computed sample size {sample_size_prop}. Using all available data instead.")
        control_sample = control_data
        test_sample = test_data
    else:
        # Random sampling (simulates running the experiment at the planned sample size)
        control_sample = control_data.sample(n=sample_size_prop, replace=False, random_state=123)
        test_sample = test_data.sample(n=sample_size_prop, replace=False, random_state=123)
    
    # Number of successes (count) and number of observations (nobs)
    count_c = control_sample.sum() # retained users in the control group
    n_c = len(control_sample)      # total users in the control group
    
    count_t = test_sample.sum()    # retained users in the test group
    n_t = len(test_sample)         # total users in the test group
    
    counts = np.array([count_t, count_c])   # [test successes, control successes]
    nobs = np.array([n_t, n_c])             # [test total, control total]

    # Z-test
    # alternative='larger' tests H1: test proportion > control proportion
    # (null difference of 0, not the 2% MDE)
    z_stat, p_value = proportions_ztest(count=counts, 
                                        nobs=nobs, 
                                        alternative='larger')
    
    # Observed retention rates
    mean_c = count_c / n_c
    mean_t = count_t / n_t
    real_diff_abs = mean_t - mean_c
    real_diff_rel = (mean_t - mean_c) / mean_c if mean_c != 0 else 0

    # Decide the result
    is_significant = p_value < alpha
    result_str = "Significant lift (Reject H0)" if is_significant else "No significant lift (Fail to Reject H0)"

    # Print the report
    print("-" * 30)
    print(f"Sample size (per group): {n_t}")
    print(f"Step 1 [rates]: Control={mean_c:.4%}, Test={mean_t:.4%}")
    print(f"Step 2 [difference]: absolute difference={real_diff_abs:.4%}, relative lift={real_diff_rel:.2%}")
    print(f"Step 3 [test]: Z-score={z_stat:.4f}, P-value={p_value:.4f}")
    print(f"Conclusion: {result_str}")
    print("-" * 30)

    return {
        'z_stat': z_stat,
        'p_value': p_value,
        'mean_control': mean_c,
        'mean_test': mean_t,
        'diff_abs': real_diff_abs,
        'result': result_str
    }
abtest_prop_type(df, 'version', 'gate_30', 'gate_40', 'retention_1')

Per-group sample size: 19030
******************************
------------------------------
Sample size (per group): 19030
Step 1 [rates]: Control=44.9553%, Test=44.2039%
Step 2 [difference]: absolute difference=-0.7514%, relative lift=-1.67%
Step 3 [test]: Z-score=-1.4747, P-value=0.9299
Conclusion: No significant lift (Fail to Reject H0)
------------------------------


{'z_stat': np.float64(-1.4746831678040735),
 'p_value': np.float64(0.9298511282609606),
 'mean_control': np.float64(0.44955333683657384),
 'mean_test': np.float64(0.4420388859695218),
 'diff_abs': np.float64(-0.007514450867052047),
 'result': 'No significant lift (Fail to Reject H0)'}
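The question asks about a 2% relative lift, while the test above compares the two rates against a null difference of 0. As a minimal sketch of how a margin could be built into the statistic, the following standard-library function tests $p_{\text{test}} - p_{\text{control}}$ against a non-zero margin with an unpooled standard error. The function name `two_prop_ztest_margin` is hypothetical, and the counts are the full-group 1-day figures printed earlier, so the numbers are not comparable to the subsampled output above. statsmodels' `proportions_ztest` also takes a `value` argument for the null difference, which may serve the same purpose.

```python
import math
from statistics import NormalDist

def two_prop_ztest_margin(count_t, n_t, count_c, n_c, margin=0.0):
    """One-sided z-test of H1: p_t - p_c > margin (unpooled standard error)."""
    p_t, p_c = count_t / n_t, count_c / n_c
    se = math.sqrt(p_t * (1 - p_t) / n_t + p_c * (1 - p_c) / n_c)
    z = (p_t - p_c - margin) / se
    p_value = 1 - NormalDist().cdf(z)  # upper-tail probability
    return z, p_value

# Full-group 1-day counts printed earlier in the notebook
count_c, n_c = 20034, 44700   # control (gate_30)
count_t, n_t = 20119, 45489   # test (gate_40)

margin = (count_c / n_c) * 0.02   # 2% relative lift as an absolute difference
z, p_value = two_prop_ztest_margin(count_t, n_t, count_c, n_c, margin)
print(f"z={z:.4f}, p-value={p_value:.4f}")
```

Setting `margin=0.0` reduces this to a plain one-sided comparison of the two rates.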

# Question 3<br>Did 7-day player retention increase by 5% (in relative terms)?

In [53]:
total_sample_control_7day = df[df['version'] == 'gate_30']['retention_7'] # control group sample
total_sample_test_7day = df[df['version'] == 'gate_40']['retention_7'] # test group sample

# Number of retained users in each group
control_retained_7day = (total_sample_control_7day == 1).sum()
test_retained_7day = (total_sample_test_7day == 1).sum()
print(f"Control group retained users: {control_retained_7day}")
print(f"Test group retained users: {test_retained_7day}")
print('=' * 30) 
# Retention rate
control_retained_rate_7day = control_retained_7day / len(total_sample_control_7day)
test_retained_rate_7day = test_retained_7day / len(total_sample_test_7day)
print(f"Control group retention rate: {control_retained_rate_7day}")
print(f"Test group retention rate: {test_retained_rate_7day}")

Control group retained users: 8502
Test group retained users: 8279
Control group retention rate: 0.19020134228187918
Test group retention rate: 0.18200004396667327


In [55]:
from statsmodels.stats.proportion import proportions_ztest

# Compute the MDE (5% relative lift expressed as an absolute difference)
delta_p_7day = control_retained_rate_7day * 0.05

# Compute the per-group sample size
def get_sample_size_p(p, delta, alpha = 0.05, beta = 0.2, oneside = True):
    if oneside:
        n = math.ceil(((stats.norm.ppf(1-alpha) + stats.norm.ppf(1-beta))**2 * p * (1-p)) / delta ** 2)
    else:
        n =  math.ceil(((stats.norm.ppf(1-alpha/2) + stats.norm.ppf(1-beta))**2 * p * (1-p)) / delta ** 2)
    return n

# NOTE: p = 0.4482 is the 1-day baseline carried over from Question 2.
# The 7-day control rate computed above is about 0.1902; passing it instead
# would give a different sample size from the one printed below.
sample_size_prop_7day = get_sample_size_p(p = 0.4482, delta = delta_p_7day, alpha = 0.05, beta = 0.2, oneside = True)
print(f'Per-group sample size: {sample_size_prop_7day}')


def abtest_prop_type(df, col_name, control_tag, test_tag, target_col, alpha=0.05):
    # Cast the bool column to 0/1 integers (note: this modifies df in place)
    df[target_col] = df[target_col].astype(int)
    # Extract the data
    control_data_7day = df[df[col_name] == control_tag][target_col].dropna()
    test_data_7day = df[df[col_name] == test_tag][target_col].dropna()

    real_n_c = len(control_data_7day)
    real_n_t = len(test_data_7day)
    
    # Uses the global sample_size_prop_7day computed above
    if real_n_c < sample_size_prop_7day or real_n_t < sample_size_prop_7day:
        print(f"Available data does not reach the computed sample size {sample_size_prop_7day}. Using all available data instead.")
        control_sample_7day = control_data_7day
        test_sample_7day = test_data_7day
    else:
        # Random sampling (simulates running the experiment at the planned sample size)
        control_sample_7day = control_data_7day.sample(n=sample_size_prop_7day, replace=False, random_state=123)
        test_sample_7day = test_data_7day.sample(n=sample_size_prop_7day, replace=False, random_state=123)
    
    # Number of successes (count) and number of observations (nobs)
    count_c_7day = control_sample_7day.sum() # retained users in the control group
    n_c_7day = len(control_sample_7day)      # total users in the control group
    
    count_t_7day = test_sample_7day.sum()    # retained users in the test group
    n_t_7day = len(test_sample_7day)         # total users in the test group
    
    counts_7day = np.array([count_t_7day, count_c_7day])   # [test successes, control successes]
    nobs_7day = np.array([n_t_7day, n_c_7day])             # [test total, control total]

    # Z-test
    # alternative='larger' tests H1: test proportion > control proportion
    # (null difference of 0, not the 5% MDE)
    z_stat, p_value = proportions_ztest(count=counts_7day, 
                                        nobs=nobs_7day, 
                                        alternative='larger')
    
    # Observed retention rates
    mean_c_7day = count_c_7day / n_c_7day
    mean_t_7day = count_t_7day / n_t_7day
    real_diff_abs_7day = mean_t_7day - mean_c_7day
    real_diff_rel_7day = (mean_t_7day - mean_c_7day) / mean_c_7day if mean_c_7day != 0 else 0

    # Decide the result
    is_significant_7day = p_value < alpha
    result_str_7day = "Significant lift (Reject H0)" if is_significant_7day else "No significant lift (Fail to Reject H0)"

    # Print the report
    print("-" * 30)
    print(f"Sample size (per group): {n_t_7day}")
    print(f"Step 1 [rates]: Control={mean_c_7day:.4%}, Test={mean_t_7day:.4%}")
    print(f"Step 2 [difference]: absolute difference={real_diff_abs_7day:.4%}, relative lift={real_diff_rel_7day:.2%}")
    print(f"Step 3 [test]: Z-score={z_stat:.4f}, P-value={p_value:.4f}")
    print(f"Conclusion: {result_str_7day}")
    print("-" * 30)

    return {
        'z_stat': z_stat,
        'p_value': p_value,
        'mean_control': mean_c_7day,
        'mean_test': mean_t_7day,
        'diff_abs': real_diff_abs_7day,
        'result': result_str_7day
    }
abtest_prop_type(df, 'version', 'gate_30', 'gate_40', 'retention_7')

Per-group sample size: 16907
------------------------------
Sample size (per group): 16907
Step 1 [rates]: Control=18.6609%, Test=18.6787%
Step 2 [difference]: absolute difference=0.0177%, relative lift=0.10%
Step 3 [test]: Z-score=0.0419, P-value=0.4833
Conclusion: No significant lift (Fail to Reject H0)
------------------------------


{'z_stat': np.float64(0.041867534916404166),
 'p_value': np.float64(0.4833021485389254),
 'mean_control': np.float64(0.18660909682380078),
 'mean_test': np.float64(0.1867865381203052),
 'diff_abs': np.float64(0.00017744129650443252),
 'result': 'No significant lift (Fail to Reject H0)'}
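A confidence interval for the difference in rates can complement the pass/fail conclusion by showing the size and direction of the effect. The sketch below is a minimal standard-library version using the normal approximation. The function name `diff_confidence_interval` is hypothetical, and the inputs are the full-group 7-day counts printed earlier rather than the 16907-row subsample, so the result is not directly comparable to the output above.

```python
import math
from statistics import NormalDist

def diff_confidence_interval(count_t, n_t, count_c, n_c, level=0.95):
    """Normal-approximation interval for p_t - p_c (unpooled standard error)."""
    p_t, p_c = count_t / n_t, count_c / n_c
    se = math.sqrt(p_t * (1 - p_t) / n_t + p_c * (1 - p_c) / n_c)
    z = NormalDist().inv_cdf(1 - (1 - level) / 2)
    diff = p_t - p_c
    return diff - z * se, diff + z * se

# Full-group 7-day counts and group sizes printed earlier in the notebook
count_c, n_c = 8502, 44700   # control (gate_30)
count_t, n_t = 8279, 45489   # test (gate_40)

low, high = diff_confidence_interval(count_t, n_t, count_c, n_c)
print(f"95% CI for the difference in 7-day retention: [{low:.4%}, {high:.4%}]")
```

If the whole interval lies below the target lift, the data does not support the hoped-for increase. If it lies entirely below 0, the full-group data points to a decrease.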