## Библиотеки

In [None]:
import numpy as np
import pandas as pd

import scipy
from scipy import stats
import statsmodels.formula.api as smf

import matplotlib.pyplot as plt
import seaborn as sns
import plotly
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px

import copy
from copy import deepcopy

import tqdm
import tqdm.notebook as tqdm

import sklearn
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.neural_network import MLPRegressor
from sklearn.exceptions import ConvergenceWarning

from IPython.display import Image

plt.style.use('ggplot')

import time
import random
import warnings
pd.options.display.float_format = '{:.10f}'.format

## Данные

Вычисляем размер групп под условия теста:

In [None]:
def get_sample_size(alpha, beta, std_a, std_b, effect):
    '''
    Вычисляет количество наблюдений в каждой группе при заданных:
    alpha, beta - ошибках 1 и 2 рода, 
    std_a, std_b - стандартных отколениях в контрольной и экспериментальной группах,
    effect - минимальном ожидаемом эффекте
    '''
    norm_rv = stats.norm(loc=0, scale=1)
    t_alpha = norm_rv.ppf(1 - alpha / 2)
    t_beta = norm_rv.ppf(1 - beta)
    var = (std_a**2) + (std_b)**2
    sample_size = int((t_alpha + t_beta) ** 2 * var / (effect ** 2))
    return sample_size

alpha = 0.05
beta = 0.2
std_a = 800
std_b = 800
effect = 100

sample_size = get_sample_size(alpha, beta, std_a, std_b, effect)
sample_size

In [None]:
N = 10000                           # общее количество пользователей в популяции
w_one, w_two = 0.5, 0.5             # доли страт в популяции
popul_size_one = int(N * w_one)              # количество пользователей первой страты
popul_size_two = int(N * w_two)              # количество пользователей второй страты
mu_one, mu_two = 2000, 3000         # средние значения метрики в стратах
std_one, std_two = 625, 625         # стандартное отклонение метрики в стратах

# ! Будем считать, что при объединении страт в контрольной и экспериментальной группах стандартное отклоение составит 800
# (именно для такого стандартного отклонения мы вначале рассчитывали количество наблюдений в группах)
    
sample_size = get_sample_size(alpha, beta, std_a, std_b, effect) + 100 # возьмем количество наблюдений в группах с запасом 
                                                                       # на 100 больше, чем нужно
sample_size_one = int(sample_size * w_one)
sample_size_two = int(sample_size * w_two)

strat_to_param = {1: (popul_size_one, sample_size_one, mu_one, std_one), 2: (popul_size_two, sample_size_two, mu_two, std_two)}

Функции для случайного и стратифицированного семплирования данных в базовый датасет:

In [None]:
def get_random_data(strat_to_param):
    """
    Генерирует данные случайным семплированием:
    Возвращает датафрейм со значениями метрики и номерами страт пользователей
    в контрольной и экспериментальной группах:
    strats - cписок с распределением страт в популяции,
    sample_size - размеры групп,
    strat_to_param - словарь с параметрами страт,
    effect - размер эффекта
    """
    
    strats = [1 for _ in range(strat_to_param[1][0])] + \
             [2 for _ in range(strat_to_param[2][0])]
    
    control_strats, pilot_strats = np.random.choice(strats, (2, strat_to_param[2][1] + \
                                                                strat_to_param[2][1]), False)
    
    control, pilot = [], []
    
    for strat, (_, _, mu, std) in strat_to_param.items():
        n_control = np.sum(control_strats == strat)
        n_pilot = np.sum(pilot_strats == strat)
        control += [(x, strat) for x in stats.norm(mu, std).rvs(n_control)]
        pilot += [(x, strat) for x in stats.norm(mu, std).rvs(n_pilot)]
        
    control_df = pd.DataFrame(control, columns = ['metric', 'strat'])
    control_df['is_treatment'] = 0
    pilot_df = pd.DataFrame(pilot, columns = ['metric', 'strat'])
    pilot_df['is_treatment'] = 1
    
    return pd.concat([control_df, pilot_df], axis = 0)

def get_stratified_data(strat_to_param):
    """
    Генерирует данные стратифицированным семплированием:
    Возвращает датафрейм со значениями метрики и номерами страт пользователей
    в контрольной и экспериментальной группах:
    strat_to_param - словарь с параметрами страт
    effect - размер эффекта
    """
    control, pilot = [], []
    
    for strat, (_, n, mu, std) in strat_to_param.items():
        control += [(x, strat) for x in stats.norm(mu, std).rvs(n)]
        pilot += [(x, strat) for x in stats.norm(mu, std).rvs(n)]

    control_df = pd.DataFrame(control, columns = ['metric', 'strat'])
    control_df['is_treatment'] = 0
    pilot_df = pd.DataFrame(pilot, columns = ['metric', 'strat'])
    pilot_df['is_treatment'] = 1
    
    return pd.concat([control_df, pilot_df], axis = 0)

Базовый датасет:

In [None]:
def generate_base_df(strat_to_param,
                     rho_lst = [0.9, 0.8, 0.7],
                     treatment_effect = 0, 
                     sampling_way = 'random'):
    '''
    Генерирует базовый датасет:
    strat_to_param - словарь с параметрами страт
    rho_lst - список с коэффициентами корреляции для метрики и 3 ковариатов
    sampling_way - способ семплирования (случайное / стратифицированное)
    '''
    
    # sampling
    
    if sampling_way == 'random':
        data = get_random_data(strat_to_param)
    else:
        data = get_stratified_data(strat_to_param)
        
    # covariates creation    
    
    data['metric_pre'] = rho_lst[0] * data['metric'] + \
        np.sqrt(1 - rho_lst[0]**2) * stats.norm(0, data['metric'].std(ddof = 1)).rvs(len(data))  
    data['X1_pre'] = rho_lst[1] * data['metric'] + \
        np.sqrt(1 - rho_lst[1]**2) * stats.norm(0, data['metric'].std(ddof = 1)).rvs(len(data))  
    data['X2_pre'] = rho_lst[2] * data['metric'] + \
        np.sqrt(1 - rho_lst[2]**2) * stats.norm(0, data['metric'].std(ddof = 1)).rvs(len(data))  
    
    data['metric_pre_pre'] = rho_lst[0] * data['metric_pre'] + \
        np.sqrt(1 - rho_lst[0]**2) * stats.norm(0, data['metric_pre'].std(ddof = 1)).rvs(len(data))  
    data['X1_pre_pre'] = rho_lst[1] * data['metric_pre'] + \
        np.sqrt(1 - rho_lst[1]**2) * stats.norm(0, data['metric_pre'].std(ddof = 1)).rvs(len(data))  
    data['X2_pre_pre'] = rho_lst[2] * data['metric_pre'] + \
        np.sqrt(1 - rho_lst[2]**2) * stats.norm(0, data['metric_pre'].std(ddof = 1)).rvs(len(data))  
    
    # treatment
        
    data.loc[data.is_treatment == 1, 'metric'] += treatment_effect
    
    # CUPED
    
    model = smf.ols('metric ~ metric_pre', data = data).fit()
    data['metric_cuped'] = model.resid + data['metric'].mean()
    
    # CUMPED

    model = smf.ols('metric ~ metric_pre + X1_pre + X2_pre', data = data).fit()
    data['metric_cumped'] = model.resid + data['metric'].mean()
    
    # CUPAC
    
    scaler = StandardScaler()
    X_train = deepcopy(data[['metric_pre_pre', 'X1_pre_pre', 'X2_pre_pre']])
    X_test = deepcopy(data[['metric_pre', 'X1_pre', 'X2_pre']])
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)
    
    warnings.filterwarnings("ignore", category = FutureWarning)
    warnings.filterwarnings("ignore", category = ConvergenceWarning)
    
    model = RandomForestRegressor(n_estimators = 50,
                                  max_depth = 7)

    model.fit(X = X_train, y = data['metric_pre'])
    data['metric_cupac'] = data['metric'] - model.predict(X_test) + data['metric'].mean()
        
    return data

In [None]:
data = generate_base_df(strat_to_param,
                        rho_lst = [0.5, 0.45, 0.4],
                        treatment_effect = 0, 
                        sampling_way = 'random')
data.head()

Визуализация исходной и преобоазованных метрик:

In [None]:
def visual_metrics(n_iter):
    '''
    Визуализирует плотности распределения исходной и трансформированных метрик:
    n_iter - число базовых датасетов, на основе которых строится график
    '''
    alpha = 0.3

    data = generate_base_df(strat_to_param,
                            rho_lst = [0.9, 0.8, 0.7],
                            treatment_effect = 0, 
                            sampling_way = 'stratified')
    
    for _ in tqdm.tqdm(range(n_iter)):
        data = pd.concat([data, 
                          generate_base_df(strat_to_param,
                                           rho_lst = [0.9, 0.8, 0.7],
                                           treatment_effect = 0, 
                                           sampling_way = 'random')],
                        axis = 0)
    metrics = ['metric_cuped', 'metric_cumped', 'metric_cupac']

    fig, axs = plt.subplots(1, 2, figsize=(16, 6))

    ax_1 = axs[0]
    ax_2 = axs[1]
    
    sns.kdeplot(x = 'metric', 
                data = data[data.is_treatment == 0], label = 'Initial', 
                fill = True, alpha = alpha, ax = ax_1)
    for metric in metrics:
        sns.kdeplot(x = metric, 
                    data = data[data.is_treatment == 0], label =  metric.replace('metric_', '').upper(), 
                    fill = True, alpha = alpha, ax = ax_1)
    
    ax_1.set_title('Control', fontsize = 15)
    ax_1.legend(fontsize = 13)
    ax_1.set_xlabel('Metric', fontsize = 15)
    ax_1.set_ylabel('Density', fontsize = 15)

    sns.kdeplot(x = 'metric',
                data = data[data.is_treatment == 1], label = 'Initial', 
                fill = True, alpha = alpha, ax = ax_2)
    for metric in metrics:
        sns.kdeplot(x = metric, 
                    data = data[data.is_treatment == 1], label =  metric.replace('metric_', '').upper(), 
                    fill = True, alpha = alpha, ax = ax_2)
        
    ax_2.set_title('Pilot', fontsize = 15)
    ax_2.set_ylabel('')
    ax_2.set_yticklabels([])
    ax_2.legend(fontsize = 13)
    ax_2.set_xlabel('Metric', fontsize = 15)
    ax_2.set_ylabel('Density', fontsize = 15)
    
    fig.suptitle('Initial and transformed metrics', fontsize = 17)

    fig, axs = plt.subplots(1, 1, figsize=(16, 6))
    sns.kdeplot(x = 'metric', 
                data = data, label = 'Initial', 
                fill = True, alpha = alpha, ax = axs)
    for metric in metrics:
        sns.kdeplot(x = metric, 
                    data = data, label = metric.replace('metric_', '').upper(), 
                    fill = True, alpha = alpha, ax = axs)
        
    axs.set_title('All users', fontsize = 15)
    axs.legend(fontsize = 15)
    axs.set_xlabel('Metric', fontsize = 15)
    axs.set_ylabel('Density', fontsize = 15)
    
    plt.show();
    
visual_metrics(500)

## Тесты

### Снижение дисперсии

Корректируем тест Стьдента:

In [None]:
def calc_ttest(base_df, metric_name = 'metric', mean_calc_way = 'simple', return_deltas = False):
    '''
    Проверяет гипотезу о равенстве средних через t-test Стьюдента:
    base_df -  базовый датасет для тестов
    metric_name - метрка, на основе которой будет проводиться тест
    mean_calc_way - способ подсчета среднего (средне арифметическое / среднее стратифицированное)
    return_deltas - возвращать ли разницы средних (дельты)
    '''
    
    control_df, test_df = base_df[base_df.is_treatment == 0], base_df[base_df.is_treatment == 1]
    
    if mean_calc_way == 'stratified':

        control_mean = (control_df.groupby('strat')[metric_name].mean() * 0.5).sum()
        test_mean = (test_df.groupby('strat')[metric_name].mean() * 0.5).sum()
        control_var = (control_df.groupby('strat')[metric_name].var(ddof = 1) * 0.5).sum()
        test_var = (test_df.groupby('strat')[metric_name].var(ddof = 1) * 0.5).sum()

        delta = test_mean - control_mean
        std = (control_var / len(control_df) + test_var / len(test_df)) ** 0.5
        t = delta / std
    
        pvalue = 2 * (1 - stats.t(len(control_df) + len(test_df)).cdf(np.abs(t)))
    
        if return_deltas == True:
            return (pvalue, delta)
        else:
            return pvalue
    else:
        
        delta = test_df[metric_name].mean() - control_df[metric_name].mean()
        _, pvalue = stats.ttest_ind(control_df[metric_name], test_df[metric_name])
        
        if return_deltas == True:
            return (pvalue, delta)
        else:
            return pvalue

"Оси", в которых будет производиться сравнение:

In [None]:
# Обычный тест, стратификация, постстратификация
methods = {
    
    'random_sampling_simple_mean': {'sampling_way': 'random',
                                    'mean_calc_way': 'simple'},
    'random_sampling_stratified_mean': {'sampling_way': 'random',
                                        'mean_calc_way': 'stratified'},
    'stratified_sampling_stratified_mean': {'sampling_way': 'stratified',
                                            'mean_calc_way': 'stratified'} 
}

# Исходная метрика, CU(M)PED / CUPAC - трансформированные метрики
techniques = {'initial': 'metric', 'cuped': 'metric_cuped', 'cumped': 'metric_cumped', 'cupac': 'metric_cupac'}

Автоматизируем подсчет ошибки I рода:

In [None]:
def calc_first_type_errors(methods, techniques, rho_lst, n_tests):
    '''
    Считает ошибку I рода:
    methods - словарь с методами. Содержит способ семплирования и способ подсчета среднего
    techniques - словарь с техниками. Содержит короткое название и метрики (техники), 
    для которых необходимо подсчитать ошибку I рода
    rho_lst - массив с корреляциями метрики и ковариатов для генерации базового датасета
    на каждой итерации
    n_tests - количество итераций, на основе которых будут посчитаны ошибки I рода
    '''
    
    fte_result = pd.DataFrame()
    
    for method in methods:
        sampling_way = methods[method]['sampling_way']
        mean_calc_way = methods[method]['mean_calc_way']
        
        first_type_errors_dct = {technique: [] for technique in techniques}
        
        for _ in tqdm.tqdm(range(n_tests), leave = None):
            
            base_df = generate_base_df(strat_to_param,
                      rho_lst = rho_lst, # [0.5, 0.45, 0.4], # [0.9, 0.8, 0.7],
                      treatment_effect = 0, 
                      sampling_way = sampling_way)
            
            for technique in techniques:
                pvalue = calc_ttest(base_df, 
                                    metric_name = techniques[technique],  
                                    mean_calc_way = mean_calc_way,
                                    return_deltas = False)
                first_type_errors_dct[technique].append(pvalue < alpha)
            
        data = {f'{method}': {technique: np.array(first_type_errors_dct[technique]).mean() for technique in techniques}}
        data = pd.DataFrame.from_dict(data, orient = 'columns')
        
        if len(fte_result) == 0:
            fte_result = deepcopy(data)
        else:
            fte_result = fte_result.join(data, how = 'left')
    
    return fte_result


rho_lst = [0.7, 0.65, 0.6]
n_tests = 10
fte_result = calc_first_type_errors(methods, techniques, rho_lst, n_tests)
fte_result.to_csv('./csv_saves/fte_result.csv')
fte_result # pd.read_csv('./csv_saves/fte_result.csv', index_col = 0)

Автоматизируем подсчет ошибки II рода и разниц средних (дельт):

In [None]:
def calc_second_type_errors(methods, techniques, rho_lst, n_tests):
    '''
    Считает ошибку II рода, а также разницы средних и их дисперсию:
    methods - словарь с методами. Содержит способ семплирования и способ подсчета среднего
    techniques - словарь с техниками. Содержит короткое название и метрики (техники), 
    для которых необходимо подсчитать ошибку II рода
    rho_lst - массив с корреляциями метрики и ковариатов для генерации базового датасета
    на каждой итерации
    n_tests - количество итераций, на основе которых будут посчитаны ошибки II рода
    '''
    
    ste_result = pd.DataFrame()
    deltas_result = pd.DataFrame()
    deltas_vis = dict()
    metrics_vis = dict()
    
    for method in methods:
        sampling_way = methods[method]['sampling_way']
        mean_calc_way = methods[method]['mean_calc_way']
        
        second_type_errors_dct = {technique: [] for technique in techniques}
        deltas_dct = {technique: [] for technique in techniques}
        metrics_dct = {technique: [] for technique in techniques}
        
        for _ in tqdm.tqdm(range(n_tests), leave = None):

            base_df = generate_base_df(strat_to_param,
                      rho_lst = rho_lst, # [0.5, 0.45, 0.4], # [0.9, 0.8, 0.7],
                      treatment_effect = 100, 
                      sampling_way = sampling_way)
            
            for technique in techniques:
                pvalue, delta = calc_ttest(base_df, 
                                           metric_name = techniques[technique],  
                                           mean_calc_way = mean_calc_way,
                                           return_deltas = True)
                second_type_errors_dct[technique].append(pvalue >= alpha)
                deltas_dct[technique].append(delta)
                metrics_dct[technique] += base_df[techniques[technique]].values.tolist()
            
        ste_data = {f'{method}': {technique: np.array(second_type_errors_dct[technique]).mean() for technique in techniques}}
        ste_data = pd.DataFrame.from_dict(ste_data, orient = 'columns')
        
        deltas_data = {f'{method}': {technique: np.array(deltas_dct[technique]).var(ddof = 1) for technique in techniques}}
        deltas_data = pd.DataFrame.from_dict(deltas_data, orient = 'columns')
        deltas_vis[f'{method}'] = {technique: np.array(deltas_dct[technique]) for technique in techniques}
        
        metrics_dct = {technique: np.array(metrics_dct[technique]) for technique in techniques}
        metrics_vis[f'{method}'] = metrics_dct
        
        if len(ste_result) == 0:
            ste_result = deepcopy(ste_data)
            deltas_result = deepcopy(deltas_data)
        else:
            ste_result = ste_result.join(ste_data, how = 'left')
            deltas_result = deltas_result.join(deltas_data, how = 'left')
    
    return ste_result, deltas_result, deltas_vis, metrics_vis


rho_lst = [0.35, 0.3, 0.25]
n_tests = 10
ste_result, deltas_result, deltas_vis, metrics_vis = calc_second_type_errors(methods, techniques, rho_lst, n_tests)
ste_result.to_csv('./csv_saves/ste_result.csv')
deltas_result.to_csv('./csv_saves/deltas_result.csv')
ste_result

Визуализация скоращения дисперсии разницы средних:

In [None]:
def visual_deltas_var(deltas_result, rho_lst):
    '''
    Визуализирует скоращение дисперсии разницы средних:
    deltas_result - pd.DataFrame с дисперсиями разницы средних по различным методам
    rho_lst - массив с корреляциями метрики и ковариатов, на основе которого был сгенерирован
    базовый датасет и посчитаны дисперсии разницы средних
    '''
    data = deepcopy(deltas_result)

    columns = [column.replace('_', ' ').replace('ing ', 'ing / ') for column in data.columns]

    rows = list(data.index)

    data = data.to_numpy()

    fig = plt.figure(figsize=(10, 6))
    ax = fig.add_subplot(111, projection='3d')

    xpos, ypos = np.meshgrid(range(data.shape[1]), range(data.shape[0]))

    xpos = xpos.flatten()
    ypos = ypos.flatten()
    zpos = np.zeros(len(xpos))
    dx = dy = 0.8
    dz = data.flatten()

    colors = 1 - dz / dz.max()  
    colors = plt.cm.RdYlBu(colors)  

    ax.bar3d(xpos, ypos, zpos, dx, dy, dz,  color=colors)
    
    initial_metric_index = list(rows).index('initial')
    percent_decrease = (data[initial_metric_index][0] - data) / data[initial_metric_index][0] * 100
    for i, j, z, decrease in zip(xpos, ypos, dz, percent_decrease.flatten()):
        if decrease != 0:
            ax.text(i + dx/2, j + dy/2, z + dz.max() * -0.03, f'{decrease:.2f}%', ha='center', color='black')

    ax.set_xticks([i + 0.5 for i in range(data.shape[1])], minor = True)
    ax.set_yticks([i + 0.5 for i in range(data.shape[0])], minor = True)
    ax.set_xticks(np.arange(-0.5, data.shape[1] - 0.5, 1), minor = False)
    ax.set_yticks(range(1, data.shape[0] + 1), minor = False, labels = rows)
    ax.set_xticklabels(columns, rotation = 15, fontweight='bold')
    ax.set_yticklabels(rows, fontweight='bold', minor = False)
    ax.tick_params(axis='both', pad=10)

    ax.set_title(f'Deltas variance reduction \n Corr(Y, X$_i$) = {rho_lst}', fontsize = 14)
    
    plt.savefig('./plot_saves/deltas_variance.png')
    plt.show();
    
visual_deltas_var(deltas_result, rho_lst)
# Image(filename='./plot_saves/deltas_variance.png')

То же самое, только теперь несколько наборов корреляций на одном графике:

In [None]:
def visual_several_deltas_var(deltas_results, rho_lsts):
    '''
    Визуализирует скоращение дисперсии разницы средних по списку массивов корреляций:
    deltas_result - список с pd.DataFrame, которые содеражт дисперсиии разницы средних по различным методам
    rho_lst - список массивов с корреляциями метрики и ковариатов, на основе которых были сгенерированы
    базовые датасеты и посчитаны дисперсии разницы средних
    '''
    
    fig = plt.figure(figsize=(17, 15))

    for idx in range(1, len(rho_lsts)+1):

        ax = fig.add_subplot(2, 2, idx, projection='3d')


        data = deepcopy(deltas_results[idx-1])

        columns = [column.replace('_', ' ').replace('ing ', 'ing / ') for column in data.columns]

        rows = list(data.index)

        data = data.to_numpy()

        xpos, ypos = np.meshgrid(range(data.shape[1]), range(data.shape[0]))

        xpos = xpos.flatten()
        ypos = ypos.flatten()
        zpos = np.zeros(len(xpos))
        dx = dy = 0.8
        dz = data.flatten()

        colors = 1 - dz / dz.max()  
        colors = plt.cm.RdYlBu(colors)  

        ax.bar3d(xpos, ypos, zpos, dx, dy, dz,  color=colors)

        initial_metric_index = list(rows).index('initial')
        percent_decrease = (data[initial_metric_index][0] - data) / data[initial_metric_index][0] * 100
        for i, j, z, decrease in zip(xpos, ypos, dz, percent_decrease.flatten()):
            if decrease != 0:
                ax.text(i + dx/2, j + dy/2, z + dz.max() * -0.03, f'{decrease:.2f}%', ha='center', color='black')

        ax.set_xticks([i + 0.5 for i in range(data.shape[1])], minor = True)
        ax.set_yticks([i + 0.5 for i in range(data.shape[0])], minor = True)
        ax.set_xticks(np.arange(-0.5, data.shape[1] - 0.5, 1), minor = False)
        ax.set_yticks(range(1, data.shape[0] + 1), minor = False, labels = rows)
        ax.set_xticklabels(columns, rotation = 15, fontweight='bold', fontsize = 14)
        ax.set_yticklabels(rows, fontweight='bold', minor = False, fontsize = 14)
        ax.tick_params(axis='y', pad=10)
        ax.tick_params(axis='x', pad=15)
        
        ax.set_title(f'Corr(Y, X$_i$) = {rho_lsts[idx-1]}', fontsize = 16)
            
    fig.suptitle('Deltas variance reduction', fontsize=20)
    plt.subplots_adjust(top=0.93)
    plt.show();
    
    
rho_lsts = [[0.35, 0.3, 0.25], [0.5, 0.45, 0.4], [0.65, 0.6, 0.55], [0.8, 0.75, 0.7]]
n_tests = 25000
deltas_results = []
for rho_lst in tqdm.tqdm(rho_lsts):
    _, deltas_result, _, _ = calc_second_type_errors(methods, techniques, rho_lst, n_tests)
    deltas_results.append(deltas_result)

visual_several_deltas_var(deltas_results, rho_lsts)

Визуализация плотостей распределения разностей средних для разных методов:

In [None]:
def visual_deltas(deltas_vis):
    '''
    Визуализирует плотности распределения разницы средних для различных методов сокращения дисперсии:
    deltas_vis - словарь, который содержит методы и массивы с разницами средних
    '''
    
    fig, axs = plt.subplots(len(deltas_vis), 1, figsize=(7, 17))    
    for n, method in enumerate(deltas_vis):
        for c, technique in enumerate(deltas_vis[method]):
            sns.kdeplot(deltas_vis[method][technique], 
                        label = f'{technique}',
                        color = sns.color_palette('bright')[c+1],
                        fill = True, 
                        alpha = 0.2, 
                        ax = axs[n])
            axs[n].set_title(f"{method.replace('_', ' ').replace('ing ', 'ing / ')}")
            axs[n].set_ylabel('')
            axs[n].legend()   
            
    plt.subplots_adjust(top=0.95)
    plt.subplots_adjust(hspace=0.3)
    plt.suptitle('Deltas density', fontsize=16)
    sns.set_style("white")
    sns.despine()
    plt.savefig('./plot_saves/deltas_density.png')
    plt.show();
    
visual_deltas(deltas_vis)

### Ошибки I и II рода

Тестируемые значения коэффицитов корреляции:

In [None]:
start_list = [0.05, 0, -0.05]
result_lists = [start_list]

while start_list[0] < 0.95:
    start_list = [x + 0.05 for x in start_list]
    result_lists.append(start_list)

result_lists = [lst for lst in result_lists if lst != [0.05, 0, -0.05]]

rho_lsts = result_lists
n_tests = 10

Визуализация ошибок I и II рода в зависимости от корреляции:

In [None]:
def visual_errors(methods, techniques, rho_lsts, n_tests):
    '''
    Визуализирует значения ошибок I и II в зависимости от корреляции:
    methods - словарь с методами. Содержит способ семплирования и способ подсчета среднего
    techniques - словарь с техниками. Содержит короткое название и метрики (техники), 
    для которых необходимо подсчитать ошибку II рода
    rho_lsts - список массивов с корреляциями метрики и ковариатов, для которых нужно вычислить ошибку I и II рода
    n_tests - количество итераций, на основе которых будут посчитаны ошибки I и II рода
    '''
    fte_data = pd.DataFrame()
    ste_data = pd.DataFrame()
    
    for rho_lst in tqdm.tqdm(rho_lsts, leave = None):
        
        fte_result = calc_first_type_errors(methods, techniques, rho_lst, n_tests)
        ste_result, _, _, _ = calc_second_type_errors(methods, techniques, rho_lst, n_tests)
        
        fte_data = pd.concat([fte_data, fte_result], axis=0)
        ste_data = pd.concat([ste_data, ste_result], axis=0)
        
    fte_dct = fte_data.groupby(fte_data.index).agg(list).to_dict(orient='dict')
    ste_dct = ste_data.groupby(ste_data.index).agg(list).to_dict(orient='dict')
    
    corr_lst = [x[0] for x in rho_lsts]

    titles = ['<b>' + key.replace('_', ' ').replace('ing ', 'ing / ') + '</b>'  for key in list(fte_dct.keys())]
    fig = make_subplots(rows=len(fte_dct.keys()), cols = 1, subplot_titles=titles,  vertical_spacing=0.1)

    tech = list(fte_dct[list(fte_dct.keys())[0]].keys())
    colors = {tech: px.colors.qualitative.Plotly[i] for i, tech in enumerate(tech)}

    for n, method in enumerate(fte_dct):
        for technique in fte_dct[method]:
            fig.add_trace(go.Scatter(
                x=corr_lst,
                y=fte_dct[method][technique],
                legendgroup=f"group{n}_fte",  
                legendgrouptitle_text="I type error",
                showlegend = True if n == 0 else False,
                name=technique,
                mode="lines",
                line=dict(color=colors[technique], dash="dot", width=1)
            ), row=n+1, col=1)

            fig.add_trace(go.Scatter(
                x=corr_lst,
                y=ste_dct[method][technique],
                legendgroup=f"group{n}_ste",  
                legendgrouptitle_text="II type error",
                showlegend = True if n == 0 else False,
                name=technique,
                mode="lines",
                line=dict(color=colors[technique], width=1)
            ), row=n+1, col=1)
            fig.update_xaxes(title_text="Corr(Y, X<sub>1</sub>)", row=n+1, col=1, showline=True, linewidth=1, linecolor='black')
            fig.update_yaxes(title_text="Error", row=n+1, col=1, showline=True, linewidth=1, linecolor='black')

    fig.update_layout(
        title="I / II type error dependency on correlation",
        title_x=0.5,
        plot_bgcolor="white",
        width=600, height=1000)

    fig.show();
    
    return fte_dct, ste_dct

fte_dct, ste_dct = visual_errors(methods, techniques, rho_lsts, n_tests)

### Эксперименты с CUPAC

Преобразуем немного функцию генерации базового датасета по части CUPAC:

In [None]:
def generate_base_df(strat_to_param,
                     rho_lst = [0.9, 0.8, 0.7],
                     treatment_effect = 0, 
                     sampling_way = 'random'):
    '''
    Генерирует базовый датасет с акцентом на модель, лежащую в основе CUPAC:
    strat_to_param - словарь с параметрами страт
    rho_lst - список с коэффициентами корреляции для метрики и 3 ковариатов
    sampling_way - способ семплирования (случайное / стратифицированное)
    '''
    
    # sampling
    
    if sampling_way == 'random':
        data = get_random_data(strat_to_param)
    else:
        data = get_stratified_data(strat_to_param)
        
    # covariates creation    
    
    data['metric_pre'] = rho_lst[0] * data['metric'] + \
        np.sqrt(1 - rho_lst[0]**2) * stats.norm(0, data['metric'].std(ddof = 1)).rvs(len(data))  
    data['X1_pre'] = rho_lst[1] * data['metric'] + \
        np.sqrt(1 - rho_lst[1]**2) * stats.norm(0, data['metric'].std(ddof = 1)).rvs(len(data))  
    data['X2_pre'] = rho_lst[2] * data['metric'] + \
        np.sqrt(1 - rho_lst[2]**2) * stats.norm(0, data['metric'].std(ddof = 1)).rvs(len(data))  
    
    data['metric_pre_pre'] = rho_lst[0] * data['metric_pre'] + \
        np.sqrt(1 - rho_lst[0]**2) * stats.norm(0, data['metric_pre'].std(ddof = 1)).rvs(len(data))  
    data['X1_pre_pre'] = rho_lst[1] * data['metric_pre'] + \
        np.sqrt(1 - rho_lst[1]**2) * stats.norm(0, data['metric_pre'].std(ddof = 1)).rvs(len(data))  
    data['X2_pre_pre'] = rho_lst[2] * data['metric_pre'] + \
        np.sqrt(1 - rho_lst[2]**2) * stats.norm(0, data['metric_pre'].std(ddof = 1)).rvs(len(data))  
    
    # treatment
        
    data.loc[data.is_treatment == 1, 'metric'] += treatment_effect
        
    # CUMPED

    model = smf.ols('metric ~ metric_pre + X1_pre + X2_pre', data = data).fit()
    data['metric_cumped'] = model.resid + data['metric'].mean()
        
    scaler = StandardScaler()
    X_train = deepcopy(data[['metric_pre_pre', 'X1_pre_pre', 'X2_pre_pre']])
    X_test = deepcopy(data[['metric_pre', 'X1_pre', 'X2_pre']])
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)
    
    # CUPAC_RF
    
    warnings.filterwarnings("ignore", category = FutureWarning)
    warnings.filterwarnings("ignore", category = ConvergenceWarning)
    
    model = RandomForestRegressor(n_estimators = 50, 
                                  max_depth = 7)
    
    model.fit(X = X_train, y = data['metric_pre'])
    data['metric_cupac_rf'] = data['metric'] - model.predict(X_test) + data['metric'].mean()
    
    # CUPAC_GB
    
    model = GradientBoostingRegressor(n_estimators = 50,
                                      criterion = 'squared_error',
                                      max_depth = 3,
                                      subsample = 0.6,
                                      learning_rate = 0.07)
    
    model.fit(X = X_train, y = data['metric_pre'])
    data['metric_cupac_gb'] = data['metric'] - model.predict(X_test) + data['metric'].mean()
    
    # CUPAC_MLP
    
    model =  MLPRegressor(hidden_layer_sizes = (30, 15),
                          solver = 'sgd', 
                          learning_rate_init = 0.000001,
                          learning_rate = 'invscaling',
                          power_t = 0.1,
                          max_iter = 300,
                          alpha = 0.1)
    
    model.fit(X = X_train, y = data['metric_pre'])
    data['metric_cupac_mlp'] = data['metric'] - model.predict(X_test) + data['metric'].mean()
        
    return data

In [None]:
data = generate_base_df(strat_to_param,
                        rho_lst = [0.5, 0.45, 0.4],
                        treatment_effect = 0, 
                        sampling_way = 'stratified')

data.head()

Новые "оси", в которых будет производиться сравнение:

In [None]:
# Обычный тест, стратификация, постстратификация
methods = {
    
    'random_sampling_simple_mean': {'sampling_way': 'random',
                                    'mean_calc_way': 'simple'},
    'random_sampling_stratified_mean': {'sampling_way': 'random',
                                        'mean_calc_way': 'stratified'},
    'stratified_sampling_stratified_mean': {'sampling_way': 'stratified',
                                            'mean_calc_way': 'stratified'} 
}

# Исходная метрика, CUMPED / CUPAC (RF, GBDT, MLP) - трансформированные метрики
techniques = {'initial': 'metric',
              'cumped': 'metric_cumped',
              'cupac_rf': 'metric_cupac_rf', 
              'cupac_gb': 'metric_cupac_gb', 
              'cupac_mlp': 'metric_cupac_mlp'}

#### Снижение дисперсии

Визуализация скоращения дисперсии разницы средних:

In [None]:
rho_lst = [0.65, 0.6, 0.55]
n_tests = 25000
_, deltas_result, deltas_vis, _ = calc_second_type_errors(methods, techniques, rho_lst, n_tests)

new_index = ['cumped', 'cupac_mlp', 'cupac_gb', 'cupac_rf', 'initial']
deltas_result = deltas_result.reindex(new_index)
visual_deltas_var(deltas_result, rho_lst)

Плотности распределения разностей средних для разных вариантов CUPAC:

In [None]:
visual_deltas(deltas_vis)

#### Ошибки I и II рода

Визуализация значений ошибок I и II рода в зависимости от корреляции и модели, лежащей в основе CUPAC:

In [None]:
start_list = [0.05, 0, -0.05]
result_lists = [start_list]

while start_list[0] < 0.95:
    start_list = [x + 0.05 for x in start_list]
    result_lists.append(start_list)

result_lists = [lst for lst in result_lists if lst != [0.05, 0, -0.05]]

rho_lsts = result_lists
n_tests = 1000

In [None]:
fte_dct, ste_dct = visual_errors(methods, techniques, rho_lsts, n_tests)

Материалы по теме исследования:

* https://www.microsoft.com/en-us/research/group/experimentation-platform-exp/articles/deep-dive-into-variance-reduction/

* https://towardsdatascience.com/how-to-double-a-b-testing-speed-with-cuped-f80460825a90
    
* https://research.facebook.com/blog/2020/10/increasing-the-sensitivity-of-a-b-tests-by-utilizing-the-variance-estimates-of-experimental-units/
    
* https://craft.faire.com/how-to-speed-up-your-a-b-test-outlier-capping-and-cuped-8c9df21c76b
    
* https://booking.ai/how-booking-com-increases-the-power-of-online-experiments-with-cuped-995d186fff1d
    
* https://habr.com/ru/companies/yandex/articles/497804/
    
* https://doordash.engineering/2020/06/08/improving-experimental-power-through-control-using-predictions-as-covariate-cupac/