In [1]:
import numpy as np
import pandas as pd
import uuid
from scipy import stats

In [2]:
# пример датафрейма:
males = pd.DataFrame({'strata':['male' for i in range(20000)], 'metric':np.random.normal(10, 20, 20000)}) # примеры страт
females = pd.DataFrame({'strata':['female' for i in range(100000)], 'metric':np.random.normal(15, 30, 100000)}) # примеры страт
data = pd.concat([males, females])
data['user_id'] = [str(uuid.uuid4()) for i in range(len(data))]
data = data[['user_id', 'strata', 'metric']]
data.sample(4)

Unnamed: 0,user_id,strata,metric
76154,42e9d942-cd49-4ad6-be99-8b228344d106,female,17.172661
41708,2a95210d-0539-4d39-b1ae-c0f27af66cc2,female,-5.444979
99838,3e2b113d-7f95-4f2b-8981-9a42b80650ec,female,41.227906
99341,2c2ed902-c23e-47d8-8032-f893e96718cc,female,-27.040493


In [3]:
# веса (доли в ген. совокупности):
weights = data['strata'].value_counts(normalize=True).to_dict()
weights

{'female': 0.8333333333333334, 'male': 0.16666666666666666}

## Стратифицированное семплирование:

**Стратифицированное семплирование** - процесс составления выборки, при котором доли
страт выборки равны долям страт в генеральной совокупности.

Например, у нас есть общая база клиентов (ген. совокупность) с 6000 пользователями. 
Из них 3000 клиентов мужского пола (cтрата 1), 2000 клиентов женского пола (страта 2) и 1000 без информации о поле (страта 3). 

То есть страта 1 имеет долю ***0.5***, страта 2 имеет долю ***0.33***, cтрата 3 имеет долю ***0.167***

Мы хотим  запустить тест на 300 пользователях. Чтобы сформировать выборку (контроль или тест), надо случайно выбрать:

300×0.5 = 150 клиентов из страты 1

300×0.33 = 100 клиентов из страты 2

300×0.167 = 50 клиентов из страты 3

И так составить 2 выборки: контроль и тест

Стратификация помогает снизить дисперсию и повысить чувствительность тестов, их мощность.

Это происходит из-за снижения межгрупповой дисперсии по формуле:

$$
\sigma^2 = \sum_{k} w_k \sigma_k^2 + \sum_{k} w_k (\mu_k - \mu)^2
$$


In [4]:
sample_size = 1000 # размер выборки (теста и контроля)

dfs_control = []
dfs_test = []

for strata, weight in weights.items():
    strata_data = data[data['strata']==strata] # данные страты
    users_control, users_test = np.random.choice(strata_data['user_id'], (2, int(weight*sample_size)), False)
    df_control, df_test = data[data['user_id'].isin(users_control)], data[data['user_id'].isin(users_test)]
    dfs_control.append(df_control)
    dfs_test.append(df_test)
    
data_control = pd.concat(dfs_control)
data_test = pd.concat(dfs_test)

In [5]:
# видим что в контроле и тесте сохраняются доли страт как в ген. совокупности:

print('КОНТРОЛЬ:\n')
print(data_control['strata'].value_counts(normalize=True))
print('\n\n')
print('ТЕСТ:\n')
print(data_test['strata'].value_counts(normalize=True))

КОНТРОЛЬ:

strata
female    0.833834
male      0.166166
Name: proportion, dtype: float64



ТЕСТ:

strata
female    0.833834
male      0.166166
Name: proportion, dtype: float64


## Проведение стратифицированного теста Стюдента:

In [6]:
# ф-ция расчета стратифицированного среднего:

def stratified_mean(data: pd.DataFrame, weights: dict) -> float:
    start_means = data.groupby(['strata']).agg({"metric":'mean'}).rename(columns={'metric':'avg_metric'})
    weights = pd.Series(weights, name='weight')
    df_means_weights = start_means.join(weights) # страта vs. avg_metric vs. weight

    # если в данных df не было каких-то страт, то часть weights затрётся и сумма весов будет меньше 1:
    df_means_weights['weight'] = df_means_weights['weight'] / df_means_weights['weight'].sum()

    startified_mean = (df_means_weights['avg_metric'] * df_means_weights['weight']).sum() # считаем страт. среднее
    return startified_mean

In [7]:
# ф-ция расчета стратифицированной дисперсии:

def stratified_varience(data: pd.DataFrame, weights: dict) -> float:
    strat_vars = data.groupby('strata')['metric'].var().to_frame().rename(columns={'metric':'varience'})
    weights = pd.Series(weights, name='weight')
    df_vars_weights = strat_vars.join(weights) # страта vs. varience vs. weight

    # если в данных df не было каких-то страт, то часть weights затрётся и сумма весов будет меньше 1:
    df_vars_weights['weight'] = df_vars_weights['weight'] / df_vars_weights['weight'].sum()

    var_strat = (df_vars_weights['weight'] * df_vars_weights['varience']).sum() # считаем страт. дисперсию
    return var_strat

In [10]:
# ф-ция получения p-value стратифицированного теста Стюдента:

def stratified_ttest(df_control: pd.DataFrame, df_test: pd.DataFrame, weights: dict) -> float:
    mean_strat_control = stratified_mean(df_control, weights)
    mean_strat_test    = stratified_mean(df_test, weights)
    var_strat_control = stratified_varience(df_control, weights)
    var_strat_test   = stratified_varience(df_test, weights)
    delta_mean_strat = mean_strat_test - mean_strat_control
    std_mean_strat = (var_strat_test / len(df_test) + var_strat_control / len(df_control)) ** 0.5
    t = delta_mean_strat / std_mean_strat
    pvalue = (1 - stats.norm.cdf(np.abs(t))) * 2
    return pvalue

In [11]:
# пример:
stratified_ttest(data_control, data_test, weights)

0.4391057539760479

## Пост-стратификация:

Пост-стратификация - это метод анализа результатов АБ-теста, который был сплитован обычным случайным сплитованием.

Его результат аналогичен применению стратификации.

Его применяют если нет возомжности создать сплитовалку трафика по стратам.

In [12]:
# пример выборок (контроль и тест), полученные случайным сплитованием:

sample_size = 100
users_control, users_test = np.random.choice(data['user_id'], (2, sample_size), False) # рандомное сплитование
data_control = data[data['user_id'].isin(users_control)]
data_test = data[data['user_id'].isin(users_test)]

In [13]:
# делаем пост-стратификацию:

stratified_ttest(data_control, data_test, weights) # просто применяем ту же ф-цию с весами

0.22451515618302675

Возникает резонный вопрос - а зачем тогда проводить стартификацию если пост-стратификация дает такой же результат?

Стратификация помогает снизить sample_size в формуле MDE за счет уменьшения дисперсии.