In [7]:
import pandas as pd
import numpy as np
import datetime
import scipy.stats as stats
from scipy.stats import norm


class SequentialTester:
    def __init__(
        self, metric_name, time_column_name,
        alpha, beta, pdf_one, pdf_two
    ):
        """Создаём класс для проверки гипотезы о равенстве средних тестом Вальда.

        Предполагается, что среднее значение метрики у распределения альтернативной
        гипотезы с плотность pdf_two больше.

        :param metric_name: str, название стобца со значениями измерений.
        :param time_column_name: str, названия столбца с датой и временем измерения.
        :param alpha: float, допустимая ошибка первого рода.
        :param beta: float, допустимая ошибка второго рода.
        :param pdf_one: function, функция плотности распределения метрики при H0.
        :param pdf_two: function, функция плотности распределения метрики при H1.
        """
        self.metric_name = metric_name
        self.time_column_name = time_column_name
        self.alpha = alpha
        self.beta = beta
        self.pdf_one = pdf_one
        self.pdf_two = pdf_two
        
        # YOUR_CODE_HERE
        self.data_control = pd.DataFrame()
        self.data_pilot   = pd.DataFrame()

    def run_test(self, data_control, data_pilot):
        """Запускаем новый тест, проверяет гипотезу о равенстве средних.
        
        :param data_control: pd.DataFrame, данные контрольной группы.
        :param data_pilot: pd.DataFrame, данные пилотной группы.
        
        :return (result, length):
            result: float,
                0 - отклоняем H1,
                1 - отклоняем H0,
                0.5 - недостаточно данных для принятия решения
            length: int, сколько потребовалось данных для принятия решения. Если данных 
                недостаточно, то возвращает текущее кол-во данных. Кол-во данных - это
                кол-во элементов в одном из наборов data_control или data_pilot.
                Гарантируется, что они равны.
        """
        # YOUR_CODE_HERE
        
        # Сохраняем данные.Будут как reference
        if 0 == self.data_control.shape[0]:
            data_control[self.time_column_name] = pd.to_datetime(data_control[self.time_column_name])
            data_pilot[self.time_column_name]   = pd.to_datetime(data_pilot[self.time_column_name])
            self.data_control = pd.concat([self.data_control, data_control], axis=0)
            self.data_pilot   = pd.concat([self.data_pilot, data_pilot], axis=0)
        else:
            print(f"run_test: WARNING: Using pre-saved data: shape={self.data_control.shape}")
        
        return self._test_sequential_wald(
            data_one = self.data_control[self.metric_name].to_numpy(), 
            data_two = self.data_pilot[self.metric_name].to_numpy(),
            pdf_one = self.pdf_one,
            pdf_two = self.pdf_two,
            alpha = self.alpha,
            beta = self.beta)

    
    def add_data(self, data_control, data_pilot):
        """Добавляет новые данные, проверяет гипотезу о равенстве средних.
        
        Гарантируется, что данные новые и не дублируют ранее добавленные.
        
        :param data_control: pd.DataFrame, новые данные контрольной группы.
        :param data_pilot: pd.DataFrame, новые данные пилотной группы.
        
        :return (result, length):
            result: float,
                0 - отклоняем H1,
                1 - отклоняем H0,
                0.5 - недостаточно данных для принятия решения
            length: int, сколько потребовалось данных для принятия решения. Если данных 
                недостаточно, то возвращает текущее кол-во данных. Кол-во данных - это
                кол-во элементов в одном из наборов data_control или data_pilot.
                Гарантируется, что они равны.
        """
        # YOUR_CODE_HERE
        #print(f"Before: {st.data_control.shape=}, {st.data_pilot.shape}")
        self.data_control = self._add_data_2_df1(self.data_control, data_control, self.time_column_name)
        self.data_pilot   = self._add_data_2_df1(self.data_pilot, data_pilot, self.time_column_name)
        #print(f"After: {st.data_control.shape=}, {st.data_pilot.shape}")
 
        return self.run_test(self.data_control, self.data_pilot)

        
    # helpers
    def _add_data_2_df1(self, df1, df2, time_column_name) -> pd.DataFrame:
        """
        Добавляет в df1 новые данные из df2. "новые" - это те, у которых ts > df1_max_ts
        """
        df1_max_ts = df1[time_column_name].max()
        df2[time_column_name] = pd.to_datetime(df2[time_column_name])
        mask = (df2[time_column_name] > df1_max_ts)
        df1 = pd.concat([df1, df2[mask]],ignore_index=True).drop_duplicates()
        #print(df1)
        return df1

    def _test_sequential_wald(self, data_one, data_two, pdf_one, pdf_two, alpha, beta):
        """Последовательно проверяет отличие по мере поступления данных.
    
        pdf_one, pdf_two - функции плотности распределения при нулевой и альтернативной гипотезах
    
        Возвращает 1, если были найдены значимые отличия, иначе - 0. И кол-во объектов при принятии решения.
        """
        lower_bound = np.log(beta / (1 - alpha))
        upper_bound = np.log((1 - beta) / alpha)
    
        min_len = min([len(data_one), len(data_two)])
        data_one = data_one[:min_len]
        data_two = data_two[:min_len]
        delta_data = data_two - data_one
    
        pdf_one_values = pdf_one(delta_data)
        pdf_two_values = pdf_two(delta_data)
    
        z = np.cumsum(np.log(pdf_two_values / pdf_one_values))
    
        indexes_lower = np.arange(min_len)[z < lower_bound]
        indexes_upper = np.arange(min_len)[z > upper_bound]
        first_index_lower = indexes_lower[0] if len(indexes_lower) > 0 else min_len + 1
        first_index_upper = indexes_upper[0] if len(indexes_upper) > 0 else min_len + 1
    
        if first_index_lower < first_index_upper:
            return 0, first_index_lower + 1
        elif first_index_lower > first_index_upper:
            return 1, first_index_upper + 1
        else:
            return 0.5, min_len

In [2]:
#
# Создаем генератор (источник) данных
#

MEAN = 10
EFFECT = 1.03
STD = 1
ALPHA = 0.05
BETA = 0.2

START_TS = '2022-01-01 00:00'            # Timestamp начала эксперимента
metric_name = 'value'           # название столбца со значениями
time_column_name = 'ts'         # название столбца с датой и временем измерения

def pdf_one(x):
    """Функция плотности разницы средних при верности нулевой гипотезы."""
    return stats.norm.pdf(x, 0, np.sqrt(2) * STD)

def pdf_two(x):
    """Функция плотности разницы средних при верности альтернативной гипотезы."""
    return stats.norm.pdf(x, MEAN * (EFFECT-1), np.sqrt(2) * STD)

def create_df(data_ts:np.array, data:np.array, time_column_name:str=time_column_name, metric_name:str=metric_name) -> pd.DataFrame:
    df = pd.DataFrame()
    df[time_column_name]=data_ts
    df[metric_name]=data
    df[time_column_name] = df[time_column_name].dt.strftime("%Y-%m-%d %H:%M")
    return df

def generate_samples(start_ts = START_TS, 
                     sample_size=10, 
                     sample_interval=10, 
                     time_column_name:str=time_column_name, 
                     metric_name:str=metric_name) -> tuple:

    data_a = np.random.normal(MEAN, STD, sample_size)
    data_b = np.random.normal(MEAN * EFFECT, STD, sample_size)

    data_ts = []
    start_ts = datetime.datetime.strptime(start_ts, "%Y-%m-%d %H:%M")
    for i in range(sample_size):
        delta = datetime.timedelta(minutes = sample_interval*i)
        data_ts.append(start_ts + delta)

    data_control= create_df(data_ts, data_a)
    data_pilot  = create_df(data_ts, data_b) 
    
    return data_control, data_pilot

In [3]:
seq_tester = SequentialTester(metric_name = metric_name,
                                     time_column_name = time_column_name,
                                     alpha = ALPHA,
                                     beta = BETA,
                                     pdf_one = pdf_one,
                                     pdf_two = pdf_two)
                                     

In [4]:
data_control, data_pilot = generate_samples(sample_size=5)

seq_tester.run_test(data_control, data_pilot)

(0.5, 5)

In [6]:
data_control, data_pilot = generate_samples(sample_size=15)
seq_tester.add_data(data_control, data_pilot)



(0.5, 15)

In [114]:
def add_data_2_df1(df1, df2, time_column_name = time_column_name) -> pd.DataFrame:
    df1_max_ts = df1[time_column_name].max()
    df2[time_column_name] = pd.to_datetime(df2[time_column_name])
    mask = (df2[time_column_name] > df1_max_ts)
    df1 = pd.concat([df1, df2[mask]],ignore_index=True).drop_duplicates()
    #print(df1)
    return df1

print(f"Before: {st.data_control.shape=}, {st.data_pilot.shape}")

st.data_control = add_data_2_df1(st.data_control, data_control)
st.data_pilot   = add_data_2_df1(st.data_pilot, data_control)

print(f"After: {st.data_control.shape=}, {st.data_pilot.shape}")


st.run_test(st.data_control, st.data_pilot)

Before: st.data_control.shape=(30, 2), (30, 2)
After: st.data_control.shape=(100, 2), (100, 2)


(0, 70)

In [105]:
st.data_control, st.data_pilot

(                   ts      value
 0 2022-01-01 00:00:00  11.478448
 1 2022-01-01 00:10:00   9.856376
 2 2022-01-01 00:20:00  11.885028
 3 2022-01-01 00:30:00  10.018110
 4 2022-01-01 00:40:00  11.411409
 5 2022-01-01 00:50:00  10.759531
 6 2022-01-01 01:00:00  11.376440
 7 2022-01-01 01:10:00   7.681917
 8 2022-01-01 01:20:00  10.653524
 9 2022-01-01 01:30:00   9.100738,
                    ts      value
 0 2022-01-01 00:00:00  10.782323
 1 2022-01-01 00:10:00  11.341916
 2 2022-01-01 00:20:00  10.456278
 3 2022-01-01 00:30:00  11.168543
 4 2022-01-01 00:40:00  10.944976
 5 2022-01-01 00:50:00  10.759531
 6 2022-01-01 01:00:00  11.376440
 7 2022-01-01 01:10:00   7.681917
 8 2022-01-01 01:20:00  10.653524
 9 2022-01-01 01:30:00   9.100738)

In [60]:
# add_data


data_control_union= pd.concat([st.data_control, data_control],ignore_index=True).drop_duplicates()
data_pilot_union = pd.concat([st.data_pilot, data_pilot],ignore_index=True).drop_duplicates()

st.data_pilot.shape, data_pilot.shape, data_pilot_union.shape

((5, 2), (10, 2), (15, 2))

In [None]:
df_union= pd.concat([df1, df2],ignore_index=True).drop_duplicates()
df_union

In [32]:
st.run_test(data_control, data_control)

(0, 624)

In [11]:
# run_test
def run_test(data_control, data_pilot) -> tuple:
    
    return st.test_sequential_wald(data_one = data_control[metric_name].to_numpy(), 
                     data_two = data_pilot[metric_name].to_numpy(),
                     pdf_one = st.pdf_one,
                     pdf_two = st.pdf_two,
                     alpha = st.alpha,
                     beta = st.beta)

run_test(data_control, data_pilot)

(0.5, 2)