In [None]:
from aistudio_common.utils.notebook_utils import NotebookUtils
import numpy as np
from scipy import stats
import scipy as sp
import statsmodels.api as sm
from statsmodels.stats.proportion import proportions_ztest, confint_proportions_2indep
from statsmodels.formula.api import ols
from statsmodels.stats.multitest import multipletests
import pandas as pd

In [None]:
#### df format
### user_id, exp_version_name, metric1, metric2,...


### 比例类指标

#### Chi-squared test (>2组间比较)

In [None]:
contingency_table = np.array([df.groupby('exp_version_name').agg({metric1:np.sum}).values.ravel(), df.groupby('exp_version_name')['user_id'].agg('count').values-df.groupby('exp_version_name').agg({metric:np.sum}).values.ravel()])
chi2_stats,pval,_,_ = stats.chi2_contingency(contingency_table)

#### Z-test/Chi-squared test (2组间比较)

In [None]:
prop_ztest = pd.DataFrame(columns=['指标','比较','统计量','绝对差异','p值','95%置信区间下界','95%置信区间上界'])

for pair in group_pairs:
    cnt = df.loc[df['exp_version_name'].isin(pair)].groupby('exp_version_name').agg({metric: np.sum}).values.ravel()
    nobs = df.loc[df['exp_version_name'].isin(pair)].groupby('exp_version_name')['user_id'].agg('count').values
    stat, pval = proportions_ztest(cnt, nobs)
    lb, ub = confint_proportions_2indep(count1=cnt[0], nobs1=nobs[0], count2=cnt[1], nobs2=nobs[1], alpha=0.05)
    #print("Z-stats={0:.4f},p-value={1:.4f}, diff={2:.4f}, 95% CI lb={3:.4f}, ub={4:.4f}".format(stat,pval,cnt[0]/nobs[0]-cnt[1]/nobs[1],lb,ub))
    
    prop_ztest = pd.concat([prop_ztest, pd.DataFrame({"指标":name,"比较":"{0} vs {1}".format(pair[0],pair[1]), "统计量":stat,"绝对差异":cnt[0]/nobs[0]-cnt[1]/nobs[1], "p值":pval, "95%置信区间下界":lb, "95%置信区间上界":ub}, index=[0])],ignore_index=True) 

### 均值指标

#### ANOVA (>2组间比较) 

In [None]:
model = ols(metric+' ~ C(exp_version_name)', data=df).fit()
aov_table = sm.stats.anova_lm(model, typ=1)
display(aov_table)

#### T-test (2组间比较)

In [None]:
# requires scipy >= 1.6
# scipy.stats.ttest_ind_from_stats(mean1, std1, nobs1, mean2, std2, nobs2, equal_var=True, alternative='two-sided')

# scipy.stats.ttest_ind(a, b, axis=0, equal_var=True, nan_policy='propagate', permutations=None, random_state=None, alternative='two-sided', trim=0)

In [None]:
def welch_ttest(x1, x2, alpha=0.05, two_sided=True):
    
    n1, n2 = x1.size, x2.size
    m1, m2 = np.mean(x1), np.mean(x2)    
    v1, v2 = np.var(x1, ddof=1), np.var(x2, ddof=1)

    pooled_se = np.sqrt(v1 / n1 + v2 / n2)
    delta = m1-m2
    
    tstat = delta / pooled_se
    df = (v1 / n1 + v2 / n2)**2 / (v1**2 / (n1**2 * (n1 - 1)) + v2**2 / (n2**2 * (n2 - 1)))
    
    # t-test
    if two_sided:
        t = stats.t.ppf(1-alpha/2, df)
        pval = 2 * stats.t.cdf(-abs(tstat), df)
    else:
        t = stats.t.ppf(1-alpha, df)
        pval = stats.t.cdf(-abs(tstat), df)
        
    # upper and lower bounds
    lb = delta - t*pooled_se 
    ub = delta + t*pooled_se
  
    return pd.DataFrame(np.array([tstat,delta,pval,lb,ub]).reshape(1,-1),
                         columns=['统计量','绝对差异','p值','95%置信区间下界','95%置信区间上界'])

In [None]:
# Welch t test / t confidence interval
mean_ttest = pd.DataFrame(columns=['指标','比较','统计量','绝对差异','p值','95%置信区间下界','95%置信区间上界'])

for pair in group_pairs:
    ttest = welch_ttest(df.loc[df['exp_version_name']==pair[0],metric], df.loc[df['exp_version_name']==pair[1],metric])
    ttest['指标'], ttest['比较'] = name, "{0} vs {1}".format(pair[0],pair[1])
    mean_ttest = pd.concat([mean_ttest, ttest],ignore_index=True) 

### 非参数方法

#### Kruskal-Wallis test (>2组间比较)

In [None]:
KW_test = pd.DataFrame(columns=['指标','比较','统计量','p值'])

stat,pval = stats.kruskal(df.loc[df['exp_version_name']==groups[0],metric],df.loc[df['exp_version_name']==groups[1],metric],df.loc[df['exp_version_name']==groups[2],metric])
#print("KW test p-value for {0}:{1:3f}".format(name, pval))
KW_test = pd.concat([KW_test, pd.DataFrame({"指标":name,"比较":"{0} vs {1}".format(pair[0],pair[1]), "统计量":stat, "p值":pval}, index=[0])],ignore_index=True) 

#### Mann-Whitney U test (2组间比较)

In [None]:
MWU_test = pd.DataFrame(columns=['指标','比较','统计量','p值'])

for pair in group_pairs:        
    stat,pval = stats.mannwhitneyu(df.loc[df['exp_version_name']==pair[0],metric], df.loc[df['exp_version_name']==pair[1],metric])   
    MWU_test = pd.concat([MWU_test, pd.DataFrame({"指标":name,"比较":"{0} vs {1}".format(pair[0],pair[1]), "统计量":stat, "p值":pval}, index=[0])],ignore_index=True) 

#### Bootstrap confidence interval

In [None]:
# requires scipy >= 1.7
# stats.bootstrap

In [None]:
n_resample = 1000
boot_vals = pd.DataFrame(dict(values = np.zeros(n_resample*len(groups)), exp_version_name=np.repeat(groups,n_resample)))
resample_metric_vals = np.zeros(n_resample)
boot_ci = pd.DataFrame(columns=['指标','比较','95%置信区间下界','95%置信区间上界'])

for g in groups:
    sample = df.loc[df['exp_version_name']==g, metric].values
    for i in range(n_resample):
        x = np.random.choice(sample, size=len(sample), replace=True)
        resample_metric_vals[i] = np.mean(x)    # take mean value as an example
    boot_samples.loc[boot_pcts['exp_version_name']==g, 'values']= resample_metric_vals

    for pair in group_pairs:        
        lb, ub = np.percentile(boot_vals.loc[boot_vals['exp_version_name']==pair[0], 'values'].values - boot_vals.loc[boot_vals['exp_version_name']==pair[1], 'values'].values, [2.5,97.5])
        boot_ci = pd.concat([boot_ci, pd.DataFrame({"指标":name,"比较":"{0} vs {1}".format(pair[0],pair[1]), "95%置信区间下界":lb, "95%置信区间上界":ub}, index=[0])],ignore_index=True)

#### Permuation test

In [None]:
# requires scipy >= 1.8
# stats.permutation_test

In [None]:
# **** take mean value as an example **********
# def mean_diff(x, y, axis=0):
#     return np.mean(x, axis=axis) - np.mean(y, axis=axis)

In [None]:
# n_resample = 1000
# permutation_test = pd.DataFrame(columns=['指标','比较','统计量','p值'])

# for pair in group_pairs:        
#     x, y = df.loc[df['exp_version_name']==pair[0], metric].values, df.loc[df['exp_version_name']==pair[1], metric].values
#     perm_test = stats.permutation_test((x,y), statistic=mean_diff, permutation_type='independent', vectorized=False, n_resamples=n_resample, alternative='two-sided', axis=0, random_state=1234)
#     permutation_test = pd.concat([permutation_test, pd.DataFrame({"指标":name,"比较":"{0} vs {1}".format(pair[0],pair[1]), "统计量":perm_test.statistic, "p值":perm_test.pvalue}, index=[0])],ignore_index=True) 

### Delta Method

### Ratio 比率类指标

In [None]:
def delta_method_prop_ratio(p1, p2, n1, n2, alpha = 0.05, two_sided = True):
    rel_diff = (p2-p1)/p1
    var1 = p1*(1-p1)/n1
    var2 = p2*(1-p2)/n2
    se = np.sqrt((p2/p1)**2*(var1/(p1**2) + var2/(p2**2)))
    
    zstat = rel_diff / se
    if two_sided: 
        z = stats.norm.ppf(1-alpha/2)
        pval = 2 * stats.norm.cdf(-abs(zstat))
    else:
        z = stats.norm.ppf(1-alpha)
        pval = stats.norm.cdf(-abs(zstat))
      
    lb = rel_diff - z*se
    ub = rel_diff + z*se

    return pd.DataFrame(np.array([zstat,rel_diff,pval,lb,ub]).reshape(1,-1),
                         columns=['统计量','相对差异','p值','95%置信区间下界','95%置信区间上界'])

In [None]:
def delta_method_mean_ratio(x1, x2, alpha = 0.05, two_sided = True):
    m1, m2 = np.mean(x1),np.mean(x2)
    n1, n2 = len(x1), len(x2)
    var1, var2 = np.var(x1), np.var(x2)
    
    rel_diff = (x2-x1)/x1  # point esitmate
    
    if len(x1)==len(x2):
        cov12 = np.cov(x1, x2, bias = True)[0][1]
        var = (var2/n2)/(m1**2) - 2*m2*cov12/(m1**3) + (m2**2) * (var1/n1)/(m1**4) # variance estimate
    else:
        var = (var2/n2)/(m1**2) + (m2**2) * (var1/n1)/(m1**4) # assuming x1 and x2 are independent

    se = np.sqrt(var)
    
    zstat = rel_diff / se
    if two_sided: 
        z = stats.norm.ppf(1-alpha/2)
        pval = 2 * stats.norm.cdf(-abs(zstat))
    else:
        z = stats.norm.ppf(1-alpha)
        pval = stats.norm.cdf(-abs(zstat))
      
    lb = rel_diff - z*se
    ub = rel_diff + z*se
    
    return pd.DataFrame(np.array([zstat,rel_diff,pval,lb,ub]).reshape(1,-1),
                         columns=['统计量','相对差异','p值','95%置信区间下界','95%置信区间上界'])

### 多重检验修正

In [None]:
# bonferroni corrrection
_,pvals_corrected, _, _ = multipletests(pvals, alpha=0.05, method='bonferroni')
# Benjamini-Hochberg corrrection
_,pvals_corrected, _, _ = multipletests(pvals, alpha=0.05, method='fdr_bh')