# Power and Sample Size
statsmodels has a number of methods for power calculation

see e.g.: https://machinelearningmastery.com/statistical-power-and-power-analysis-in-python/

## 1.1 最初的实现：手写+个numba修饰

In [46]:
from numba import jit, cuda
import pandas as pd
import numpy as np
from tqdm import tqdm

p0 = np.zeros(100)
p0[:5] = 1
p1 = np.zeros(100)
p1[:6] = 1
effect_size = abs(p0.mean() - p1.mean())

@jit(nopython=False)
def perm_fun(x, nA, nB, double_site_test=True):
    """置换检验"""
    n = nA + nB
    idx_B = set(np.random.choice(n, nB , replace=False))
    idx_A = set(range(n)) - idx_B
    if double_site_test:
        return abs(x[list(idx_B)].mean() - x[list(idx_A)].mean())
    else:
        return x[list(idx_B)].mean() - x[list(idx_A)].mean()

@jit(nopython=False)
def boostrapN(p0=p0, p1=p1, startn=1000, endn=2000, stride=200, alpha=0.05):
    """搜索Power大于80%的样本量：2n"""
    power = []
    for sample_size in range(startn, endn , stride):
        beta = []
        for power_cal_times in tqdm(range(100)):
            # 在p0、p1两个总体上boostrap抽取n+n的样本
            l = np.concatenate((np.random.choice(p0, sample_size, replace=True), np.random.choice(p1, sample_size, replace=True)),axis=0)
            # l = pd.Series(l)
            # 在n的合并样本上进行1000次置换计算差异
            perm_diffs = [perm_fun(l,sample_size,sample_size) for _ in range(1000)]
            # 计算差异大于效应量的频率
            beta.append(np.mean([diff > effect_size for diff in perm_diffs]))
        power_i = np.mean([betai < alpha for betai in beta])
        power.append((sample_size*2, power_i))
    return power
        

def main():
    power = boostrapN()
    for sample_size, power_i in power:
        if power_i > 0.8:
                print(f'当样本量为{sample_size}时，置换检验的p值为显著性的概率POWER为{power_i}')
        else:
            print(f'当样本量为{sample_size}时，POWER={power_i}')

if __name__ == '__main__':
    main()


Compilation is falling back to object mode WITH looplifting enabled because Function "boostrapN" failed type inference due to: [1mUntyped global name 'tqdm':[0m [1m[1mCannot determine Numba type of <class 'type'>[0m
[1m
File "F:\admin\TEMP\ipykernel_23368\109395061.py", line 29:[0m
[1mdef boostrapN(p0=p0, p1=p1, startn=1000, endn=2000, stride=200, alpha=0.05):
    <source elided>
        beta = []
[1m        for power_cal_times in tqdm(range(100)):
[0m        [1m^[0m[0m
[0m[0m
  @jit(nopython=False)
Compilation is falling back to object mode WITHOUT looplifting enabled because Function "boostrapN" failed type inference due to: [1m[1mCannot determine Numba type of <class 'numba.core.dispatcher.LiftedLoop'>[0m
[1m
File "F:\admin\TEMP\ipykernel_23368\109395061.py", line 27:[0m
[1mdef boostrapN(p0=p0, p1=p1, startn=1000, endn=2000, stride=200, alpha=0.05):
    <source elided>
    power = []
[1m    for sample_size in range(startn, endn , stride):
[0m    [1m^[0m[0m


KeyboardInterrupt: 

## 1.2 BingAI给出的多进程实现

In [45]:
from joblib import Parallel, delayed
from numba import jit, cuda
import pandas as pd
import numpy as np
from tqdm import tqdm

p0 = np.zeros(100)
p0[:5] = 1
p1 = np.zeros(100)
p1[:6] = 1
effect_size = abs(p0.mean() - p1.mean())

def perm_fun_parallel(x, nA, nB, double_site_test=True):
    """置换检验"""
    n = nA + nB
    idx_B = set(np.random.choice(n, nB , replace=False))
    idx_A = set(range(n)) - idx_B
    mean_B = x[list(idx_B)].sum() / nB
    mean_A = x[list(idx_A)].sum() / nA
    if double_site_test:
        return abs(mean_B - mean_A)
    else:
        return mean_B - mean_A

def boostrapN(p0=p0,p1=p1,startn=400,endn=1200,
                       stride=200,alpha=0.05):
    """搜索Power大于80%的样本量：2n"""
    power = []
    for sample_size in range(startn,endn,stride):
        beta = []
        for power_cal_times in tqdm(range(1000)):
            # 在p0、p1两个总体上boostrap抽取n+n的样本
            l = np.concatenate((np.random.choice(p0,sample_size,
                                                 replace=True),
                                np.random.choice(p1,sample_size,
                                                 replace=True)),
                               axis=0)
            # 在n的合并样本上进行10000次置换计算差异
            perm_diffs = Parallel(n_jobs=-1)(delayed(perm_fun_parallel)(l,sample_size,sample_size) for _ in range(1000))
            # 计算差异大于效应量的频率
            beta.append(np.mean([diff > effect_size for diff in perm_diffs]))
        power_i = np.mean([betai < alpha for betai in beta])
        power.append((sample_size*2,power_i))
    return power

def main():
    power = boostrapN()
    for sample_size, power_i in power:
        if power_i > 0.8:
                print(f'当样本量为{sample_size}时，置换检验的p值为显著性的概率POWER为{power_i}')
        else:
            print(f'当样本量为{sample_size}时，POWER={power_i}')

if __name__ == '__main__':
    main()

 53%|█████▎    | 526/1000 [02:12<01:59,  3.96it/s]


KeyboardInterrupt: 

## 1.3 根据BingAI提示的代码,自己去github看文档和issue写的代码

现在的问题是内存占用太大,可以试试把中间过程存到磁盘

In [43]:

# 导入必要的库
import numpy as np
from scipy.stats import permutation_test

# 定义两个数据集
p0 = np.zeros(100)
p0[:5] = 1
p1 = np.zeros(100)
p1[:8] = 1
effect_size = abs(p0.mean() - p1.mean())

# 定义一个目标功效（例如80%）
power = 0.8
alpha = 0.05
# 定义一个初始样本量（例如400）
sample_size = 400

# 定义一个函数，用来计算均值
def means(x,y,axis):
    return np.mean(x, axis=axis) - np.mean(y, axis=axis)

# 使用一个循环来逐渐增加样本量，直到达到目标功效
while True:
    x_sample = np.random.choice(p0, size=(sample_size,1000), replace=True)
    y_sample = np.random.choice(p1, size=(sample_size,1000), replace=True)

    # 对这两个数据集进行置换检验，并行采样1000次，并测试它们的均值是否相等
    p_value = permutation_test((x_sample,y_sample),statistic=means,n_resamples=1000,vectorized=True)

    current_power = (p_value.pvalue<=alpha).mean()

    if current_power >= power:
        break
    else:
        sample_size += 400

# 打印最终结果：达到目标功效所需的最小样本量
print(sample_size , current_power)

1200 0.843


In [10]:
l = np.concatenate([np.random.choice(p0, 300, replace=True), np.random.choice(p1, 300, replace=True)],axis=0)
l.mean()

0.24

In [1]:
import cProfile
import re
import pstats


cProfile.run('re.compile("power_boots")', filename='result.cprofile')
p = pstats.Stats('result.cprofile')
# 按照运行时间和函数名进行排序
# p.strip_dirs().sort_stats("cumulative", "name").print_stats(0.5)
p.strip_dirs().sort_stats("cumulative", "name").print_stats(30)
# 按照函数名排序，只打印前 3 行函数的信息, 参数还可为小数, 表示前百分之几的函数信息
# 如果想知道有哪些函数调用了 ccc
# p.print_callers(0.5, "ccc")
# 查看 ccc() 函数中调用了哪些函数
# p.print_callees("ccc")

Mon Feb 27 15:12:26 2023    result.cprofile

         179 function calls (178 primitive calls) in 0.000 seconds

   Ordered by: cumulative time, function name
   List reduced from 40 to 30 due to restriction <30>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    0.000    0.000 {built-in method builtins.exec}
        1    0.000    0.000    0.000    0.000 <string>:1(<module>)
        1    0.000    0.000    0.000    0.000 re.py:250(compile)
        1    0.000    0.000    0.000    0.000 re.py:289(_compile)
        1    0.000    0.000    0.000    0.000 sre_compile.py:759(compile)
        1    0.000    0.000    0.000    0.000 sre_parse.py:937(parse)
        1    0.000    0.000    0.000    0.000 sre_parse.py:435(_parse_sub)
        1    0.000    0.000    0.000    0.000 sre_parse.py:493(_parse)
        1    0.000    0.000    0.000    0.000 sre_compile.py:598(_code)
        1    0.000    0.000    0.000    0.000 sre_compile.py:536(_compile_in

<pstats.Stats at 0x211b6a0e190>

### 经验bootstrap方法是正确的；而百分位法没有依据
https://zhuanlan.zhihu.com/p/41099219

In [8]:
from sklearn.utils import resample
import pandas as pd
from tqdm import tqdm
L = pd.Series([30,37,36,43,42,48,43,46,41,42])
print(L.mean())
results = []
for nrepeat in tqdm(range(100000)):
    sample = resample(L,replace=True)
    results.append(sample.mean())
results = pd.Series(results)
confidence_interval = list(results.quantile([0.025, 0.975]))
# 百分位bootstrap。这个无依据
print(confidence_interval[0], confidence_interval[1])

# 经验bootsrap方法1。正法
# results_delta = results.copy()-L.mean()
# print(L.mean()-list(results_delta.quantile([0.025, 0.975]))[1], L.mean()-list(results_delta.quantile([0.025, 0.975]))[0])

# 等价的经验boostrap方法2
print(2*L.mean()-confidence_interval[1], 2*L.mean()-confidence_interval[0])

  1%|          | 1038/100000 [00:00<00:09, 10370.61it/s]

40.8


100%|██████████| 100000/100000 [00:09<00:00, 10732.54it/s]

37.6 43.7
37.89999999999999 43.99999999999999



