In [1]:
import pandas as pd
import numpy as np
import scipy.stats as stats
from scipy.stats.mstats import kruskalwallis

# 正态性检验
def check_normality(data, group_name):
    # Anderson-Darling test 检验   (5 ≤ N ≤ 25)
    statistic1, critical_values1, significance_level1 = stats.anderson(data, dist='norm')
    name1 = 'Anderson-Darling'
    degree1 = data.shape[0]
    p_value1 = critical_values1[2]
    if p_value1 > 0.05:
        result1 = '正态分布'
    else:
        result1 = '非正态分布'
    range1 = '5 ≤ 样本量 ≤ 25'

    # Shapiro-Wilk算法检验正态分布性（官方文档的说明是大于5000的样本时，p值可能不准） (7 ≤ N ≤ 2000)
    if len(data) < 3:
        result2 = '-'
        degree2 = '-'
        statistic2 = '-'
        p_value2 = '-'
    else:
        statistic = stats.shapiro(data)
        degree2 = data.shape[0]
        statistic2 = statistic[0]
        p_value2 = statistic[1]
        if p_value2 > 0.05:
            result2 = '正态分布'
        else:
            result2 = '非正态分布'
    name2 = 'Shapiro-Wilk'
    range2 = '7 ≤ 样本量 ≤ 2000'

    # （Kolmogorov-Smirnov test ，样本量大于2000时适用）（N ＞ 2000）
    statistic = stats.kstest(data, 'norm')
    name3 = 'Kolmogorov-Smirnov'
    degree3 = data.shape[0]
    statistic3 = statistic[0]
    p_value3 = statistic[1]
    if p_value3 > 0.05:
        result3 = '正态分布'
    else:
        result3 = '非正态分布'
    range3 = '样本量 ＞ 2000'

    df_result = pd.DataFrame([[group_name, name1, degree1, statistic1, p_value1, result1, range1],
                              [group_name, name2, degree2, statistic2, p_value2, result2, range2],
                              [group_name, name3, degree3, statistic3, p_value3, result3, range3]],
                             columns=['变量', '检验方法', '自由度', '统计量', 'P值', '结果', '建议使用范围'])
    
    return df_result
    
    
# 连续资料的检验
def return_pvalue(flag, args):
    # 根据正态性(flag)和方差齐性(pp2) 采用不同的检验方法
    try:
        _, pp2 = stats.levene(*args)
    except:
        pp2 = 1

    if len(args) < 2:
        tt = '-'
    elif len(args) == 2: ## 两组数据
        if flag == 1 and pp2 >= 0.05:   ## 正态、方差齐：T-test
            try:
                tt = stats.ttest_ind(*args)[1]
            except:
                tt = '-'
            tt_name = 'T-test'
        elif flag == 1 and pp2 < 0.05:  ## 正态、方差不齐：Welch’s t-test
            try:
                tt = stats.ttest_ind(*args, equal_var = False)[1]
            except:
                tt = '-'
            tt_name = 'Welch’s t-test'
        else:                            ## 非正态：Mann-Whitney U
            try:
                tt = stats.mannwhitneyu(*args)[1]
            except:
                tt = '-'  
            tt_name = 'Mann-Whitney U'
    else:  ## 多于两组数据
        if flag == 1 and pp2 >= 0.05:   ## 正态、方差齐：ANOVA
            try:
                tt = stats.f_oneway(*args)[1]
            except:
                tt = '-'
            tt_name = 'ANOVA' 
        else:                            ## 非正态：Kruskal-Wallis
            try:
                tt = kruskalwallis(*args)[1]
            except:
                tt = '-'
            tt_name = 'Kruskal-Wallis'

    if tt != '-':
        if tt >= 0.001:
            tt = round(tt, 3)
        else:
            tt = '<0.001'
            
    return tt, tt_name

    
# 连续资料的三线表格式输出
def feican_p(df, col_list, col, del_nan=False):
    u_list = list(set(df[col].dropna()))
    u_list.sort()
    u_name = [col + '_' + str(i) for i in u_list]

    df_describe_output = []

    ## 先进行正态检验，用flag标志，默认为正态分布，检验结果为非正态的话再修改flag值
    flag = 1
    for i in range(len(col_list)):
        df_c = df[[col, col_list[i]]].dropna()
        if df_c.shape[0] > 0:
            dd = check_normality(df_c[col_list[i]], '-')
            if df_c.shape[0] <= 2000:
                if dd.loc[1, '结果'] == '非正态分布':
                    flag = 0
                else:
                    pass
            else:
                if dd.loc[2, '结果'] == '非正态分布':
                    flag = 0
                else:
                    pass

    for i in range(len(col_list)):
        df_c = df[[col, col_list[i]]].dropna()
        if df_c.shape[0] > 0:
            df_describe = pd.DataFrame([], columns=['变量', 'Total'] + u_name + ['Methods', 'P值'])
            df_describe['变量'] = [col_list[i]]
            if flag == 1:
                df_describe['Total'] = str(round(df_c[col_list[i]].mean(), 2)) + "±%.2f" % df_c[col_list[i]].std()
            else:
                df_describe['Total'] = str(round(np.median(df_c[col_list[i]]), 2)) + "(%.2f-%.2f)" % (
                    np.percentile(df_c[col_list[i]], 25), np.percentile(df_c[col_list[i]], 75))
            args = []
            tol = []
            for j in range(len(u_list)):
                xx = df_c[df_c[col] == u_list[j]]
                if flag == 1:
                    if xx.shape[0] > 1:
                        df_describe[u_name[j]] = str(round(xx[col_list[i]].mean(), 2)) + "±%.2f" % xx[col_list[i]].std()
                    else:
                        df_describe[u_name[j]] = str(round(xx[col_list[i]].mean(), 2)) + "±0.00"
                else:
                    if xx.shape[0] > 1:
                        df_describe[u_name[j]] = str(round(np.median(xx[col_list[i]]), 2)) + "(%.2f-%.2f)" % (
                            np.percentile(xx[col_list[i]], 25), np.percentile(xx[col_list[i]], 75))
                    else:
                        df_describe[u_name[j]] = "(0.00-0.00)"
                tol.append(str(xx.shape[0]) + ('(%.2f' % (xx.shape[0] / df_c.shape[0] * 100)+'%)'))
                args.append(list(xx[col_list[i]]))
            # 检验
            tt, tt_name = return_pvalue(flag, args)
            df_describe['P值'] = [tt]
            df_describe['Methods'] = [tt_name]
            
            if del_nan == False:
                df_describe0 = pd.DataFrame([['n(%s)' % col_list[i], df_c.shape[0]] + tol + ['', '']],
                                            columns=['变量', 'Total'] + u_name + ['Methods', 'P值'])
                df_describe_output.append(df_describe0)
            df_describe_output.append(df_describe)
        else:
            if del_nan == False:
                df_describe0 = pd.DataFrame([['n(%s)' % col_list[i], 0] + ['-'] * (len(u_name) + 2)],
                                            columns=['变量', 'Total'] + u_name + ['P值'])
            else:
                df_describe0 = pd.DataFrame([[col_list[i], 0] + ['-'] * (len(u_name) + 2)],
                                            columns=['变量', 'Total'] + u_name + ['Methods', 'P值'])
            df_describe_output.append(df_describe0)

    df_describe_output = pd.concat(df_describe_output, axis=0)
    df_describe_output = df_describe_output.replace('nan(-)', '-')
    return df_describe_output


# 分类资料检验
def chi2_xxx(myarray):
    # 2 * 2 
    # 当n>=40且所有的理论频数>=5时，可直接用卡方检验
    # 当n>=40但有1<=理论频数<5时，可用校正的卡方检验或Fisher确切概率法
    # 当n<40,或者理论频数<1时，可用Fisher确切概率法
    # r*c(只需判断是否需要校正)
    if myarray.shape == (1, 2) or myarray.shape == (2, 1):
        chi2, tt = stats.power_divergence(list(myarray.reshape(-1)), lambda_='log-likelihood')
        tt_name = 'G-test'
    elif myarray.shape == (2, 2):
        sum_ = sum(sum(myarray))
        min_ = myarray.min()
        if sum_ >= 40 and min_ >= 5:
            chi2, tt, _, _ = stats.chi2_contingency(myarray, correction=False)
            tt_name = 'Chi-square'
        elif sum_ >= 40 and  1 <= min_ < 5:
            chi2, tt, _, _ = stats.chi2_contingency(myarray, correction=True)
            tt_name = 'Yates’ continuity correction'
        else:
            chi2, tt = stats.fisher_exact(myarray)
            tt_name = 'Fisher exact test'
    else:
        chi2, tt, dof, _ = stats.chi2_contingency(myarray, correction=False)
        tt_name = 'Chi-square'
        if dof == 1:
            chi2, tt, _, _ = stats.chi2_contingency(myarray, correction=True)
            tt_name = 'Yates’ continuity correction'
    return chi2, tt, tt_name


# 分类资料的三线表格式输出
def kafang_p(df, col_list, col, del_nan=False):
    u_list = list(set(df[col].dropna()))
    u_list.sort()
    u_name = [col + '_' + str(i) for i in u_list]
    df_describe_output = []
    ## 分类型（卡方检验）
    for i in range(len(col_list)):
        df_c = df[[col, col_list[i]]].dropna()
        if df_c.shape[0] > 0:
            name = list(set(df_c[col_list[i]]))
            name.sort()
            nn = []
            df_describe1 = []
            for j in range(len(name)):
                df_describe = pd.DataFrame([], columns=['变量', 'Total'] + u_name + ['Methods', 'P值'])
                df_describe['变量'] = [str(col_list[i]) + '_' + str(name[j])]
                sum_ = 0
                tol = []
                for k in range(len(u_list)):
                    xx = df_c[df_c[col] == u_list[k]]
                    xx_c = xx[xx[col_list[i]] == name[j]][col_list[i]].count()
                    df_describe[u_name[k]] = [str(xx_c) + '(%.2f'%(xx_c / xx.shape[0] * 100) + '%)']
                    sum_ = sum_ + xx_c
                    tol.append(str(xx.shape[0]) + ('(%.2f' % (xx.shape[0] / df_c.shape[0] * 100) + '%)'))
                    nn.append(xx_c)
                df_describe['Total'] = [str(sum_) + '(%.2f' % ((sum_) / df_c.shape[0] * 100) + '%)']
                df_describe['Methods'] = ''
                df_describe['P值'] = ''
                df_describe1.append(df_describe)
            df_describe1 = pd.concat(df_describe1, axis=0)
            myarray = np.array(nn).reshape((len(name), -1))
            dfmyarray = pd.DataFrame(myarray)
            myarray = np.array(dfmyarray.loc[:, ~(dfmyarray == 0).all()])
            ## 下面进行卡方检验
            chi2, tt, tt_name = chi2_xxx(myarray)
            if tt >= 0.001:
                tt = round(tt, 3)
            else:
                tt = '<0.001'

            if del_nan == False:
                df_describe2 = pd.DataFrame([['n(%s)' % col_list[i], df_c.shape[0]] + tol + [tt_name, tt]],
                                            columns=['变量', 'Total'] + u_name + ['Methods', 'P值'])
            else:
                df_describe2 = pd.DataFrame([[col_list[i]] + [''] * (len(u_name) + 1) + [tt_name, tt]],
                                            columns=['变量', 'Total'] + u_name + ['Methods', 'P值'])
            df_describe2 = pd.concat([df_describe2, df_describe1], axis=0)
            df_describe_output.append(df_describe2)
        else:
            if del_nan == False:
                df_describe2 = pd.DataFrame([['n(%s' % col_list[i] + '%)', 0] + ['-'] * (len(u_name) + 2)],
                                            columns=['变量', 'Total'] + u_name + ['Methods', 'P值'])
            else:
                df_describe2 = pd.DataFrame([[col_list[i], 0] + ['-'] * (len(u_name) + 2)],
                                            columns=['变量', 'Total'] + u_name + ['Methods', 'P值'])
            df_describe_output.append(df_describe2)
    df_describe_output = pd.concat(df_describe_output, axis=0)
    df_describe_output = df_describe_output.replace('0(nan)', '-')
    return df_describe_output


def main(df_replace, cons_subsets, dis_subsets, group, del_nan=True):
    df_threeline_merge = []
    if del_nan == True:
        df_cc = df_replace[cons_subsets + dis_subsets + [group]].dropna().reset_index(drop=True)
        u_list = list(set(df_cc[group].dropna()))
        u_name = [group + '_' + str(i) for i in u_list]
        ll = []
        for i in range(len(u_list)):
            lx = df_cc[df_cc[group] == u_list[i]].shape[0]
            ll.append(str(lx) + '(%.2f' % (lx / df_cc.shape[0] * 100)+'%)')
        df_threeline0 = pd.DataFrame([['n', df_cc.shape[0]] + ll + ['', '']], 
                                     columns=['变量', 'Total'] + u_name + ['Methods', 'P值'])
        df_threeline_merge.append(df_threeline0)
    else:
        df_cc = df_replace.copy()

    if dis_subsets != []:
        df_threeline = kafang_p(df_cc, dis_subsets, group, del_nan)
        df_threeline_merge.append(df_threeline)

    if cons_subsets != []:
        df_threeline = feican_p(df_cc, cons_subsets, group, del_nan)
        df_threeline_merge.append(df_threeline)

    df_threeline_merge = pd.concat(df_threeline_merge, axis=0)
    return df_threeline_merge


## 读取数据

1) csv 数据读取格式：

pd.read_csv("xxx.csv")

pd.read_csv("xxx.csv", encoding='gb18030')

pd.read_csv("xxx.csv", encoding='utf-8')

如有需要，添加参数 engine='python'

2) xlsx读取格式：

pd.read_excel("xxx.xlsx")

In [2]:
df = pd.read_excel("电信客户流失.xlsx")

In [3]:
df.head()

Unnamed: 0,地区,用时,年龄,婚姻,住址,收入,学历,工龄,退休,性别,...,三方通话,手机支付,长途日志,免费服务日志,设备日志,电话卡日志,无线日志,收入日志,类型,流失
0,2,13,44,1,9,64,4,5,0,0,...,0,0,1.308333,,,2.014903,,4.158883,1,1
1,3,11,33,1,7,136,5,5,0,0,...,1,0,1.481605,3.032546,,2.72458,3.575151,4.912655,4,1
2,3,68,52,1,24,116,1,29,0,1,...,1,0,2.898671,2.890372,,3.409496,,4.75359,3,0
3,2,33,33,0,12,33,2,0,0,1,...,0,0,2.246015,,,,,3.496508,1,1
4,2,23,30,1,9,30,1,2,0,0,...,1,0,1.84055,,,,,3.401197,3,0


## 直接调用mian函数
### 参数说明
**df_replace**: 需要统计的数据集 (pd.DataFrame)

**cons_subsets**: 连续型变量 (list)

**dis_subsets**: 分类型变量 (list)

**group**: 分组变量 (str)

**del_nan**: 是否删除缺失值，默认为True, 不删除写False (Bool)

示例1

In [4]:
df_replace = df.copy()
cons_subsets = ['用时', '年龄', '住址', '收入']
dis_subsets = ['地区', '三方通话', '工龄', '性别']
group = '流失'
del_nan = True

In [5]:
df_threeline = main(df_replace, cons_subsets, dis_subsets, group, del_nan=del_nan)
df_threeline

Unnamed: 0,变量,Total,流失_0,流失_1,Methods,P值
0,n,1000,726(72.60%),274(27.40%),,
0,地区,,,,Chi-square,0.816
0,地区_1,322(32.20%),232(31.96%),90(32.85%),,
0,地区_2,334(33.40%),240(33.06%),94(34.31%),,
0,地区_3,344(34.40%),254(34.99%),90(32.85%),,
...,...,...,...,...,...,...
0,性别_1,517(51.70%),374(51.52%),143(52.19%),,
0,用时,34.0(17.00-54.00),41.5(23.00-59.00),17.0(8.00-33.00),Mann-Whitney U,<0.001
0,年龄,40.0(32.00-51.00),43.0(34.00-53.00),35.0(28.25-44.00),Mann-Whitney U,<0.001
0,住址,9.0(3.00-18.00),10.0(4.00-20.00),5.0(2.00-11.00),Mann-Whitney U,<0.001


In [6]:
## 表格太长的话选择一些行
df_threeline.iloc[:30, :]

Unnamed: 0,变量,Total,流失_0,流失_1,Methods,P值
0,n,1000,726(72.60%),274(27.40%),,
0,地区,,,,Chi-square,0.816
0,地区_1,322(32.20%),232(31.96%),90(32.85%),,
0,地区_2,334(33.40%),240(33.06%),94(34.31%),,
0,地区_3,344(34.40%),254(34.99%),90(32.85%),,
0,三方通话,,,,Chi-square,0.039
0,三方通话_0,498(49.80%),347(47.80%),151(55.11%),,
0,三方通话_1,502(50.20%),379(52.20%),123(44.89%),,
0,工龄,,,,Chi-square,<0.001
0,工龄_0,106(10.60%),63(8.68%),43(15.69%),,


In [7]:
# 导出表格到本地
df_threeline.to_csv('表格1.csv', index=False, encoding='gb18030')

示例2

In [8]:
del_nan = False
cons_subsets = ['电话卡日志', '年龄', '免费服务日志', '无线日志']
dis_subsets = ['地区', '婚姻', '退休', '性别']
group = '类型'

In [9]:
df_threeline = main(df_replace, cons_subsets, dis_subsets, group, del_nan=del_nan)
df_threeline

Unnamed: 0,变量,Total,类型_1,类型_2,类型_3,类型_4,Methods,P值
0,n(地区),1000,266(26.60%),217(21.70%),281(28.10%),236(23.60%),Chi-square,0.69
0,地区_1,322(32.20%),75(28.20%),78(35.94%),95(33.81%),74(31.36%),,
0,地区_2,334(33.40%),92(34.59%),69(31.80%),92(32.74%),81(34.32%),,
0,地区_3,344(34.40%),99(37.22%),70(32.26%),94(33.45%),81(34.32%),,
0,n(婚姻),1000,266(26.60%),217(21.70%),281(28.10%),236(23.60%),Chi-square,0.015
0,婚姻_0,505(50.50%),155(58.27%),102(47.00%),142(50.53%),106(44.92%),,
0,婚姻_1,495(49.50%),111(41.73%),115(53.00%),139(49.47%),130(55.08%),,
0,n(退休),1000,266(26.60%),217(21.70%),281(28.10%),236(23.60%),Chi-square,0.03
0,退休_0,953(95.30%),255(95.86%),210(96.77%),259(92.17%),229(97.03%),,
0,退休_1,47(4.70%),11(4.14%),7(3.23%),22(7.83%),7(2.97%),,
