In [4]:
import math
import os
import sys
import time

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import random as rnd
import seaborn as sns
import scipy.stats as stats

from tqdm import tqdm
import statsmodels
from statsmodels.stats.proportion import proportions_ztest
from statsmodels.stats.power import TTestIndPower
from datetime import datetime, timedelta, date
from collections import namedtuple


import warnings
warnings.simplefilter(action='ignore', category=RuntimeWarning)
warnings.filterwarnings('ignore')

import pandas as pd
import numpy as np

import warnings

from typing import Optional, List
from matplotlib.ticker import FuncFormatter
from scipy.stats import t, ttest_ind_from_stats
from scipy import stats
from statsmodels.stats.power import tt_ind_solve_power
from statsmodels.stats.multitest import fdrcorrection

In [5]:
from libs.connections import sogu, anl # для подключения к продовому clickhouse и аналитическому postgresql
"""
#def sogu_query_executor(query):
    # PUT YOUR FUNCTION HERE
    
#def anl_query_executor(query):
    # PUT YOUR FUNCTION HERE

sogu = sogu_query_executor
anl = anl_query_executor
""";

# Технические функции

In [6]:


def binary_search(low, high, need_more_func):
    tolerance = 1e-4
    while high - low > tolerance:
        answer = (low + high) / 2
        if need_more_func(answer):
            low = answer
        else:
            high = answer
    return answer

from libs.sql import literal

def power_proportion(n0, k0, mde):
    p0 = k0 / n0
    p1 = p0 * (1 + mde/100)
    A = stats.binom(n0, p0)
    B = stats.binom(n0, p1)
    A_perc95 = A.ppf(0.95)
    return 1 - B.cdf(A_perc95)

def mde_proportion(n0, k0, alpha=0.05, beta=0.20):
    mde=50
    delta_mde = 25
    for _ in range(18): # 18 шагов достаточно
        pow = power_proportion(n0, k0, mde)
        dlt = pow - (1-beta)
        mde += delta_mde * (-1 if dlt > 0 else +1)
        delta_mde *= 2/3
    return mde

mde_proportion(146544, 16719)

1.8052720360899692

# Банк состояний и банк метрик

In [25]:
class WHERE:
    """ 
        Cрез пользователей эксперимента
    """
    all = {
        'default_bot_filter': ''' device_id <> '' and ua_is_bot = 0 ''',
        'web': ''' platform in ('web', 'backend') ''',
        'web desktop': ''' platform in ('web', 'backend') and ua_is_mobile = 0 ''',
        'web mobile': ''' platform in ('web', 'backend') and ua_is_mobile = 1 ''',
        'app': ''' platform in ('ios', 'android', 'backend') '''
    }

    def __init__(self, name: str, database='clickhouse prod', table='sogu.events'):
        assert database=='clickhouse prod', f'Неподдерживаемая база данных {database}'
        assert table=='sogu.events', f'Неподдерживаемая таблица {table}'
        
        assert name in WHERE.all, f"Фильтр {name} не предусмотрен"
        
        self.where = WHERE.all[name]


    def __repr__(self):
        return f"WHERE({self.name})"


class STATE:
    """
        Состояние. Числитель или знаменатель метрики 
    """
    def set_database(self, database:str):
        if database == '':
            self.database = 'clickhouse prod'
        elif database in ('clickhouse prod', 'postgresql analytics'):
            self.database = database
        else:
            raise ValueError(f"Неподдерживаемая база данных {database}")
        

    def set_table(self, table:str):
        if self.database == 'clickhouse prod':
            if table == '':
                self.table = 'sogu.events'
            elif table == 'sogu.events':
                self.table = table
            else:
                raise ValueError(f"Неподдерживаемая таблица {table} для базы данных {self.database}")
        elif self.database == 'postgresql analytics':
            if table == '':
                self.table = 'analytics.mart_orders'
            elif table == 'analytics.mart_orders':
                self.table = table
            else:
                raise ValueError(f"Неподдерживаемая таблица {table} для базы данных {self.database}")
        else:
            raise ValueError(f"self.database == '{self.database}' выставлена неверно")


    def set_state(self, state_type:str, **kwargs:dict):
        if state_type == 'bare_sogu_event':
            event_name = kwargs['event_name']

            self.required_events = set([event_name])
            self.event_name = event_name
            self.where = f''' event_name = '{event_name}' '''

        elif state_type == 'parametered_sogu_event':
            required_events = kwargs['required_events']
            where = kwargs['where']

            self.required_events = set(required_events)
            self.where = where

        elif state_type == 'calculus_mart_orders':
            where = kwargs['where']
            
            self.required_events = set()
            self.column = where
            self.where = where 
            self.alias = self.name

        elif state_type == 'string_counter':
            self.required_events = set()
            self.column = " 1=1 "
            self.alias = self.name
            
        elif state_type == 'column_is_not_null':
            column = kwargs['column']
            self.column = f'{column} is not null'
            self.alias = self.name 
        
        elif state_type == 'column_value':
            column = kwargs['column']
            self.column = column

        elif state_type == 'experiments_hit':
            experiment_name = kwargs['experiment_name']
            experiment_variants = kwargs['experiment_variants'] # ['orignial', 'top_header', 'floating_bottom_button', ]
            assert self.name.startswith("experiment_"), f"Назовите состояние {self.name} с префиксом 'experiment_', если тип состояния 'experiments_hit'"

            self.required_events = set(['experiments.hit',])

        elif state_type == 'foreign_key':
            column = kwargs['column']
            where = kwargs['where']
            alias = kwargs['alias']
            required_events = kwargs['required_events']

            self.column = column
            self.where = where
            self.alias = alias
            self.required_events = set(required_events)

        elif state_type == 'many_states_or':
            states_list = kwargs['states_list']
            pass

        else:
            raise ValueError(f"Неподдерживаемый state_type {state_type}")
        self.state_type = state_type


    def __init__(self, name:str,
                state_type:str,
                **kwargs:dict):
        database = kwargs['database'] if 'database' in kwargs else ""
        table = kwargs['table'] if 'table' in kwargs else ""

        self.name = name
        self.required_events = set()
        self.set_database(database)
        self.set_table(table)
        self.set_state(state_type, **kwargs)


    def get_hit_where(self):
        if self.state_type == 'experiments_hit':
            return ''' event_name = 'experiments.hit' and JSONExtractString(params, 'experiment') = '{experiment_name}')'''
        if self.state_type == 'parametered_sogu_event':
            return self.where
        
        raise ValueError(f"При state_type='{self.state_type}' стейт не может быть хитом")


    def __repr__(self):
        return f"STATE({self.name})"

states_sogu = {
    'device_id': STATE('device_id', 'string_counter'),
    'main_pageview': STATE('main_pageview', 'bare_sogu_event', event_name='main.pageview'),
    'listing_pageview': STATE('listing_pageview', 'bare_sogu_event', event_name='listing.pageview'),
    'experience_pageview': STATE('experience_pageview', 'bare_sogu_event', event_name='experience.pageview'),
    'booking_pageview': STATE('booking_pageview', 'bare_sogu_event', event_name='booking.pageview'),

    'listing_filter_open__auto_listing_open_popup': STATE('listing_filter_open__auto_listing_open_popup', 'parametered_sogu_event', 
                                                          required_events=['listing:filter.open'], 
                                                          where=''' event_name = 'listing:filter.open' and JSONExtractString(params, 'filter_window_type') = 'auto_listing_open_popup' ''', 
                                                          ),
    'listing_filter_apply__auto_listing_open_popup': STATE('listing_filter_apply__auto_listing_open_popup', 'parametered_sogu_event', 
                                                          required_events=['listing:filter.apply'], 
                                                          where=''' event_name = 'listing:filter.apply' and JSONExtractString(params, 'filter_window_type') = 'auto_listing_open_popup' ''', 
                                                          ),
    'listing_filter_apply_changed__auto_listing_open_popup': STATE('listing_filter_apply_changed__auto_listing_open_popup', 'parametered_sogu_event', 
                                                          required_events=['listing:filter.apply'], 
                                                          where=''' event_name = 'listing:filter.apply' and JSONExtractString(params, 'filter_window_type') = 'auto_listing_open_popup' 
\t\tand (JSONExtractInt(params, 'persons_count') <> 1 or JSONExtractString(params, 'duration_interval') <> '[]' or JSONExtractString(params, 'start_time_interval') <> '[]') ''', 
                                                          ),

    'sogu_order_created': STATE('sogu_order_created', 'bare_sogu_event', event_name='booking:order-create.success',),
}

# states для связок sogu с другими таблицами. В основном, для связи созданных заказов с analitics.mart_orders
# как второй пример: experience_id для связи с витриной Э
states_foreign = {
    'foreign_key__order_id__order_created': STATE('foreign_key__order_id__order_created', 'foreign_key', column=''' JSONExtractString(params, 'order_id') ''', where=''' event_name='booking:order-create.success' ''', alias='order_id', required_events=['booking:order-create.success'],),
    'foreign_key__experience_id__experience_view': STATE('foreign_key__experience_id__experience_view', 'foreign_key', column=''' JSONExtractString(params, 'experience_id') ''', where=''' event_name='experience.pageview' ''', alias = 'experience_id', required_events=['experience.pageview'],),
}

# states для связок sogu с другими таблицами
states_postgresql_anal = {
    'anl_order': STATE('anl_order', 'string_counter', database='postgresql analytics',),
    'order_w_paid': STATE('order_w_paid', 'column_is_not_null', column='payment_date', database='postgresql analytics',),
    'order_w_paid_no_refund': STATE('order_w_paid_no_refund', 'calculus_mart_orders', where='payment_date is not null and money_refund_date is null', database='postgresql analytics',),
    'delta_hours_order_to_date_exact': STATE('delta_hours_order_to_date_exact', 'column_value', column='delta_hours_order_to_date_exact',),

    'experiment_ranking_rpv_new': STATE('experiment_ranking_rpv_new', 'experiments_hit', experiment_name='ranking_rpv_new', experiment_variants=['original','ranking_rpv', 'ranking_mab_new']),
}

# states для событий experiments.hit и постанализов sogu с другими таблицами
states_experiment = {
    # для каждого постанализа необходимо добавлять событсвенные state
}

states_all = states_sogu | states_foreign | states_postgresql_anal | states_experiment

class METRIC:
    def __init__(self, denominator, numerator, metric_type):
        assert denominator in states_all, "Знаменателя метрики нет в стейджах"
        assert numerator in states_all, "Числителя метрики нет в стейджах"
        assert metric_type in ('ratio', 'mean', 'user_based', 'binary'), "Укажите верный тип метрики"
        self.denominator = states_all[denominator]
        self.numerator = states_all[numerator]
        self.metric_type = metric_type


    def __repr__(self):
        return f"METRIC{(self.denominator.name, self.numerator.name, self.metric_type)}"
    

    def get_stages_set(self):
        return set([self.denominator, self.numerator])

metrics_all = {
    'Конверсия из девайса в просмотр главной': METRIC('device_id', 'main_pageview', 'binary'),
    'Конверсия из девайса в просмотр листинга': METRIC('device_id', 'listing_pageview', 'binary'),
    'Конверсия из девайса в открытую экскурсию': METRIC('device_id', 'experience_pageview', 'binary'),
    'Среднее количесто листингов на пользователя': METRIC('device_id', 'listing_pageview', 'mean'),

    'Конверсия из девайса в показ автоматического попапа с фильтрами на листинге': METRIC('device_id', 'listing_filter_open__auto_listing_open_popup', 'binary'),
    'Конверсия из девайса в применение фильтров на автоматическом попапе на листингах': METRIC('device_id', 'listing_filter_apply__auto_listing_open_popup', 'binary'),
    'Конверсия из девайса в применение не дефолтных фильтров на автоматическом попапе на листингах': METRIC('device_id', 'listing_filter_apply_changed__auto_listing_open_popup', 'binary'),

    'Конверсия из девайса в страницу букинга': METRIC('device_id', 'booking_pageview', 'binary'),
    'Конверсия из девайса в создание заказа': METRIC('device_id', 'sogu_order_created', 'binary'),

    'Конверсия из девайса в оплаченный заказ без возврата': METRIC('device_id', 'order_w_paid_no_refund', 'binary'),
    'Конверсия в оплаченный заказ без возврата из девайса с заходом на главную': METRIC('main_pageview', 'order_w_paid_no_refund', 'binary'),
    'Конверсия из заказа в оплаченный заказ без возврата': METRIC('anl_order', 'order_w_paid_no_refund', 'binary'),

    'Конверсия из заказа в оплату': METRIC('anl_order', 'order_w_paid', 'binary'), 
    'Часов между заказом и исполнением': METRIC('anl_order', 'delta_hours_order_to_date_exact', 'mean'),
    'Конверсия из листинга в оплаченный заказ': METRIC('listing_pageview', 'order_w_paid', 'binary'),
}

In [26]:
config = {
    # если техническое название эксперимента есть, то experiment_name > ""
    # иначе смотрим н
    'experiment_name': 'download_app_motivators',
    'experiment_group_stages': [
        'ranking_rpv_new_1', 
        'stage_b_name',
    ],
    'variants': {'top_banner':'A', 'bottom_banner':'B'},
    'start_datetime': '2025-09-29',
    'finish_datetime': '2025-10-03',
    'n_groups': 2,
    'split_size': (0.5, 0.5),
    'alpha': 0.05,
    'beta': 0.20,

    'hit_state': 'listing_filter_open__auto_listing_open_popup',
    
    'where_base_part': [
        'default_bot_filter',
        'web mobile',
    ],

    'goal_metric': 'Конверсия из листинга в оплаченный заказ',

    'metrics': [
        'Конверсия из девайса в просмотр главной',
        'Конверсия из девайса в просмотр листинга',
        'Конверсия из девайса в открытую экскурсию',
        'Конверсия из девайса в страницу букинга',
        'Конверсия из девайса в показ автоматического попапа с фильтрами на листинге',
        'Конверсия из девайса в применение фильтров на автоматическом попапе на листингах',
        'Конверсия из девайса в применение не дефолтных фильтров на автоматическом попапе на листингах',

        'Среднее количесто листингов на пользователя',
        'Конверсия из девайса в создание заказа',
        'Конверсия из заказа в оплату',
        'Конверсия из девайса в оплаченный заказ без возврата',
        'Часов между заказом и исполнением',
    ],
}

class SOGU_SCRIPT:
    def set_time(self, config):
        self.start_datetime = config['start_datetime']
        self.finish_datetime = config['finish_datetime']


    def set_metrics_and_states(self, config):
        self.states = set()
        self.metrics = set()
        for metric_name in config['metrics']:
            denominator = states_all[metrics_all[metric_name].denominator.name]
            numerator = states_all[metrics_all[metric_name].numerator.name]
            if denominator.database == 'clickhouse prod' and denominator.table == 'sogu.events':
                self.states.add(denominator)
            if numerator.database == 'clickhouse prod' and numerator.table == 'sogu.events':
                self.states.add(numerator)
            if denominator.database == 'clickhouse prod' and denominator.table == 'sogu.events' and numerator.database == 'clickhouse prod' and numerator.table == 'sogu.events':
                self.metrics.add(metrics_all[metric_name])
        #print("self.states", self.states)
        #print("self.metics", self.metrics)


    def set_where_base_part(self, config):
        where_base_part = config['where_base_part']
        
        WHERE_DT = f'''where toDate(dt) between toDate('{self.start_datetime}') and toDate('{self.finish_datetime}')''' + \
f'''\n\t\tand dt between '{self.start_datetime}' and '{self.finish_datetime}' '''
        list_of_where = [WHERE_DT] + [WHERE(new_filter).where for new_filter in where_base_part]
        self.where_base_part = """\n\t\tand""".join(list_of_where)  

    def set_where_hit_part(self, config):
        if self.calculation_mode == 'mde':
            hit_state = states_all[config['hit_state']]
            self.where_hit_part = hit_state.get_hit_where()
        else:
            assert 1==0, 'Предусмотри calculation_mode "experiment_name"'


    def set_requier_events(self):
        self.requier_events = set()
        for state in self.states:
            #print(state, state.state_type, state.required_events)
            self.requier_events |= state.required_events


    def set_states_select(self):
        select_list = []
        for metric in self.metrics:
            if metric.metric_type == 'binary':
                st = f'anyIf(1, {metric.numerator.where}) as "has_{metric.numerator.name}"'
                select_list.append(st)
            elif metric.metric_type == 'mean':
                st = f'countIf(1, {metric.numerator.where}) as "avg_{metric.numerator.name}"'
                select_list.append(st)
            else:
                print(metric, 'без состояния в sogu')
        self.states_select = ',' + '\n\t,'.join(select_list)
    

    def __init__(self, config):
        #self.calculation_mode = 'post_ab' if 'experiment_name' in config else 'mde'
        self.calculation_mode = 'mde'

        self.set_time(config)
        self.set_metrics_and_states(config)
        self.set_requier_events()
        self.set_states_select()

        self.set_where_base_part(config)
        self.set_where_hit_part(config)
        
    
    def get_base_device_id_query(self):
        return f""" 
    with hit_time as (
        select 
            device_id as hit_time_device_id
            ,min(dt) as min_dt
        from sogu.events
        {self.where_base_part}
        and {self.where_hit_part}

        group by device_id
    )
    select 
        device_id
        {self.states_select}
    from sogu.events
    
    inner join hit_time 
    on sogu.events.device_id = hit_time.hit_time_device_id

    {self.where_base_part}
    and event_name in ({literal(self.requier_events)})

    group by device_id
"""
    

    def get_foreign_key_query(self, foreign_key_state_name):
        foreign_key_state = states_all[foreign_key_state_name]
        query = f''' 
with hit_time as (
        select 
            device_id as hit_time_device_id
            ,min(dt) as min_dt
        from sogu.events
        {self.where_base_part}
        and {self.where_hit_part}

        group by device_id
    )

select distinct
    device_id
    ,{foreign_key_state.column}as {foreign_key_state.alias}
from sogu.events

inner join hit_time 
on sogu.events.device_id = hit_time.hit_time_device_id

{self.where_base_part}
\tand event_name in ({literal(foreign_key_state.required_events)})
'''
        return query
    

    def run_base_device_id_query(self, show=True):
        query = self.get_base_device_id_query()
        print("==="*20, 'Запущен sogu скрипт', "==="*20)
        for row in query.split('\n'):
            print(row)
        return sogu(query)
    

    def run_foreign_key_query(self, foreign_key_state_name, show=True):
        query = self.get_foreign_key_query(foreign_key_state_name)
        print("==="*20, 'Запущен sogu скрипт', "==="*20)
        for row in query.split('\n'):
            print(row)
        return sogu(query)


sogu_script = SOGU_SCRIPT(config)

In [27]:
devices = sogu_script.run_base_device_id_query()
print("len(devices)", len(devices))
devices.head(2)

 
    with hit_time as (
        select 
            device_id as hit_time_device_id
            ,min(dt) as min_dt
        from sogu.events
        where toDate(dt) between toDate('2025-09-29') and toDate('2025-10-03')
		and dt between '2025-09-29' and '2025-10-03' 
		and device_id <> '' and ua_is_bot = 0 
		and platform in ('web', 'backend') and ua_is_mobile = 1 
        and  event_name = 'listing:filter.open' and JSONExtractString(params, 'filter_window_type') = 'auto_listing_open_popup' 

        group by device_id
    )
    select 
        device_id
        ,countIf(1,  event_name = 'listing.pageview' ) as "avg_listing_pageview"
	,anyIf(1,  event_name = 'booking:order-create.success' ) as "has_sogu_order_created"
	,anyIf(1,  event_name = 'booking.pageview' ) as "has_booking_pageview"
	,anyIf(1,  event_name = 'experience.pageview' ) as "has_experience_pageview"
	,anyIf(1,  event_name = 'main.pageview' ) as "has_main_pageview"
	,anyIf(1,  event_name = 'listing:filter.apply' and JSON

Unnamed: 0,device_id,avg_listing_pageview,has_sogu_order_created,has_booking_pageview,has_experience_pageview,has_main_pageview,has_listing_filter_apply_changed__auto_listing_open_popup,has_listing_filter_apply__auto_listing_open_popup,has_listing_filter_open__auto_listing_open_popup,has_listing_pageview
0,122f5064-381b-4a17-9016-8a2253028f0a,8,0,0,1,0,0,0,1,1
1,d3422c4c-e245-49e1-8b3d-1e54451b4742,2,0,0,0,0,0,1,1,1


In [31]:
foreign_order_id_table = sogu_script.run_foreign_key_query("foreign_key__order_id__order_created")
print("len(foreign_order_id_table)", len(foreign_order_id_table))
foreign_order_id_table.head(2)

 
with hit_time as (
        select 
            device_id as hit_time_device_id
            ,min(dt) as min_dt
        from sogu.events
        where toDate(dt) between toDate('2025-09-29') and toDate('2025-10-03')
		and dt between '2025-09-29' and '2025-10-03' 
		and device_id <> '' and ua_is_bot = 0 
		and platform in ('web', 'backend') and ua_is_mobile = 1 
        and  event_name = 'listing:filter.open' and JSONExtractString(params, 'filter_window_type') = 'auto_listing_open_popup' 

        group by device_id
    )

select distinct
    device_id
    , JSONExtractString(params, 'order_id') as order_id
from sogu.events

inner join hit_time 
on sogu.events.device_id = hit_time.hit_time_device_id

where toDate(dt) between toDate('2025-09-29') and toDate('2025-10-03')
		and dt between '2025-09-29' and '2025-10-03' 
		and device_id <> '' and ua_is_bot = 0 
		and platform in ('web', 'backend') and ua_is_mobile = 1 
	and event_name in ('booking:order-create.success')

len(foreign_order_i

Unnamed: 0,device_id,order_id
0,8d2d79b5-b070-4b5e-9904-3617ab90176b,5688284
1,07db3abd-9015-42b3-b1d1-71251f81c894,5688287


# Достаём данные из PostgreSQL

In [43]:
class FOREIGN_TABLE:
    def set_keys_values(self, keys_values):
        self.keys_values = keys_values


    def set_metrics_and_states(self, config):
        self.states = set()
        self.metrics = set()
        for metric_name in config['metrics']:
            denominator = states_all[metrics_all[metric_name].denominator.name]
            numerator = states_all[metrics_all[metric_name].numerator.name]
            if denominator.database == self.database and denominator.table == self.table:
                self.states.add(denominator)
            if numerator.database == self.database and numerator.table == self.table:
                self.states.add(numerator)
            if denominator.database == self.database and denominator.table == self.table and numerator.database == self.database and numerator.table == self.table:
                self.metrics.add(metrics_all[metric_name])
        print("self.states", self.states)
        print("self.metics", self.metrics)

    
    def set_states_select(self):
        select_list = []
        for state in self.states:
            st = f'{state.column} as {state.alias}'
            select_list.append(st)
        self.states_select = ',' + '\n\t,'.join(select_list)


    def __init__(self, 
                 config,
                 key_values,
                 database:str='postgresql analytics', 
                 table:str='analytics.mart_orders',
                 key_column:str='order_id',
        ):
        self.database = database
        self.table = table
        self.key_column = key_column

        self.set_keys_values(key_values)
        self.set_metrics_and_states(config)
        self.set_states_select()


    def get_query(self):
        query = f'''
    select 
        {self.key_column}
        {self.states_select}
    from {self.table}
    where {self.key_column} in ({literal(self.keys_values)})
'''
        return query
    
    def run_query(self):
        if self.database == 'postgresql analytics':
            query = self.get_query()
            print("==="*20, 'Запущен скрипт к аналитическому постгресу', "==="*20)
            for row in (query[:400] + "...)").split('\n'):
                print(row)
            table = anl(query)
            table['order_id'] = table['order_id'].astype(str)
            return table
        raise f"База данных {self.database} пока не поддерживается"
    
mart_orders = FOREIGN_TABLE(config, foreign_order_id_table['order_id'])
mart_orders_table = mart_orders.run_query()

print("len(mart_orders_table)", len(mart_orders_table))
mart_orders_table.head(3)

self.states {STATE(order_w_paid_no_refund), STATE(order_w_paid), STATE(anl_order)}
self.metics {METRIC('anl_order', 'order_w_paid', 'binary')}

    select 
        order_id
        ,payment_date is not null and money_refund_date is null as order_w_paid_no_refund
	,payment_date is not null as order_w_paid
	, 1=1  as anl_order
    from analytics.mart_orders
    where order_id in ('5688284','5688287','5688288','5688289','5688293','5688295','5688296','5688297','5688302','5688304','5688305','5688307','5688311','5688316','5688320','5688322','5...)
len(mart_orders_table) 5129


Unnamed: 0,order_id,order_w_paid_no_refund,order_w_paid,anl_order
0,5691449,False,False,True
1,5703681,True,True,True
2,5699948,True,True,True


# Собираем финальную таблицу

In [45]:
main = devices \
        .merge(foreign_order_id_table, how='left', on='device_id') \
        .merge(mart_orders_table, how='left', on='order_id') \
        .fillna(0)
print('len(main)', len(main))
display(main.head(2))


columns_for_max = set(main.columns) - set(['device_id', 'order_id'])
columns_for_mean = set() # на будущее
gr_main = main.groupby('device_id') \
        .agg({col: 'max' for col in columns_for_max}
             | {col: 'mean' for col in columns_for_max}
             )
print("len(gr_main)", len(gr_main))
display(gr_main.head(2))

len(main) 116412


Unnamed: 0,device_id,avg_listing_pageview,has_sogu_order_created,has_booking_pageview,has_experience_pageview,has_main_pageview,has_listing_filter_apply_changed__auto_listing_open_popup,has_listing_filter_apply__auto_listing_open_popup,has_listing_filter_open__auto_listing_open_popup,has_listing_pageview,order_id,order_w_paid_no_refund,order_w_paid,anl_order
0,122f5064-381b-4a17-9016-8a2253028f0a,8,0,0,1,0,0,0,1,1,0,0,0,0
1,d3422c4c-e245-49e1-8b3d-1e54451b4742,2,0,0,0,0,0,1,1,1,0,0,0,0


len(gr_main) 115281


Unnamed: 0_level_0,order_w_paid_no_refund,has_booking_pageview,has_listing_pageview,has_listing_filter_apply__auto_listing_open_popup,has_main_pageview,has_sogu_order_created,avg_listing_pageview,has_listing_filter_apply_changed__auto_listing_open_popup,order_w_paid,anl_order,has_listing_filter_open__auto_listing_open_popup,has_experience_pageview
device_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
00008627-1bcd-4978-a2da-293107c093f1,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0
00011e3f-f51f-49d2-a1ff-9377a5117c1a,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0


# Блок для MDE

In [None]:
class MDE:
    METHOD_LIST = ('proportion_z_test', 'xi_2', '2 histograms', 'bootstrap', )
    def __init__(self, 
                 data: pd.DataFrame,
                 config: dict,
                 calculus_format: str = 'all_columns',
                 goal_metric: str = '',
                 metrics_columns: List[str] = [],
                 days_interval: int = np.nan, 
                 infinity: int = 1_000,
                 ):
        self.data = data 
        self.alpha = config['alpha']
        self.beta = config['beta']
        
        self.infinity = infinity
    
        if days_interval is np.nan:
            days_interval = (datetime.fromisoformat(config['finish_datetime']) - datetime.fromisoformat(config['start_datetime'])).days + 1
        self.days_interval = days_interval

        if not metrics_columns:
            for column in data.columns:
                if data[column].dtype in (int, float):
                    metrics_columns.append(column)
                    continue
                print('Столбец', column, "не участвует в расчёте mde")
        print('Для расчёта MDE используем столбцы')
        print(metrics_columns)
        self.metrics_columns = metrics_columns

        if not goal_metric:
            goal_metric = metrics_columns[0]
        print('Целевая метрика:', goal_metric)
        self.goal_metric = goal_metric
    

    def calculate_one_mde_conversion(self, n_a, n_b, p_a, test_type='two-sided', method:str='proportion_z_test') -> float:
        """
        Выдаёт одно значние: относительный mde для конверсии
        При данных в группе А n_a - размер группы А, p_a - конверсия в группе А.
        n_b - размер группы B
        """
        if method == 'proportion_z_test':
            z_critical = stats.norm.ppf(1 - self.alpha/(2 if test_type == 'two-sided' else 1))
            alternative = 'smaller' if test_type == 'one-sided' else 'two-sided'

            p_b = binary_search(
                p_a, 1,
                lambda p_b: stats.proportions_ztest([n_a*p_a, n_b*p_b], [n_a, n_b], alternative=alternative)[0] > z_critical
            )
            return p_b / p_a - 1
        
        elif method in MDE.METHOD_LIST:
            raise ValueError(f'Метод {method} пока не реализован')
        
        raise ValueError(f'Неподдерживаемй метод расчёта mde конверсии {method}. Выберите один из {self.method_list}')


    def calculate_list_mde_conversion(self, n_a_list:List[int], p_a, ratio:float=1.0, test_type='two-sided', method:str='proportion_z_test') -> List[float]:
        """
            ratio - во сколько раз тестовая группа больше контрольной
            n_a_list - массив количеств размеров выборок А
            p_a - конверсия в группе А
        """
        return [
            self.calculate_one_mde_conversion(n_a, int(n_a*ratio), p_a, test_type=test_type, method=method)
            for n_a in n_a_list]


    def calculate_one_mde_general(self, group_a:List[float], nobs1:int, ratio:float=1.0, test_type:str='two-sided', method:str='bootstrap' ) -> float: 
        """
            Выдаёт одно значние: относительный mde размера выбррки nobs1 для любой метрики
            group_a - исторические данные группы А для определения mean и std
            nobs1 - объём реальной группы А. (объём исторических данных обычно в два раза большое объёма настоящей выборки)
            ratio - во сколько раз тестовая группа больше контрольной

        """
        mean_a = np.mean(group_a)
        std_a = np.std(group_a)
        if std_a == 0:
            return 0

        if method == 'TTestIndPower':
            #print('TTestIndPower', mean_a, std_a, nobs1, )
            mde_std = TTestIndPower().solve_power(nobs1=nobs1, alpha=self.alpha, power=1-self.beta, ratio=ratio, alternative=test_type)
            return (mde_std * std_a) / mean_a
        
        elif method == 'bootstrap':
            def get_group_b_from_group_a(group_a:List[float], uplift:float) -> List[float]:
                # возможно, нужна более интеллектуальная трансформация
                group_b = np.array(group_a) * (1+uplift)
                return group_b
            
            def get_one_pvalue(group_a, group_b):
                """  
                    Расчитывает одно pvalue в симуляции
                """
                A = np.random.choice(group_a, size=nobs1)
                B = np.random.choice(group_b, size=int(nobs1*ratio))
                return stats.ttest_ind(A, B, equal_var=False, alternative=test_type)[1]

            def get_error_type2(uplift):
                """
                    Вероятность совершить ошибку II рода при данном uplift
                """
                group_b = get_group_b_from_group_a(group_a, uplift)
                pvalues = np.array([get_one_pvalue(group_a, group_b) for _ in range(self.infinity)])
                return (pvalues > self.alpha).mean()
            mde = binary_search(0, 0.8, need_more_func=lambda uplift: get_error_type2(uplift) > self.beta)

            def rnd_choice(M, size):
                return [np.mean(np.random.choice(M, size=size))
                        for _ in range(self.infinity)
                        ]

            plt.hist(rnd_choice(group_a, nobs1), color='green')
            plt.hist(rnd_choice(get_group_b_from_group_a(group_a, mde), int(nobs1*ratio)), color='red')
            return 


        assert 1==0, f'Введён неподдерживаемй метод расчёта mde конверсии {method}. Выберите один из {self.method_list}'
    
    def main_calculus(self, weeks_list:tuple=(1, 2, 3, 4, 5), method='TTestIndPower') -> tuple:
        metrics_list = self.data.columns
        result = pd.DataFrame({'metric': metrics_list, 
                               #'Наблюдений': [len(data[metric]) for metric in metrics_list],
                               
                               })
        nobs1_weeks = {}
        for week in weeks_list:
            nobs1 = int((7 * week / self.days_interval) * len(data))
            week_column_name = f'mde week{'s' if week > 1 else ''} {week}'
            nobs1_weeks[week_column_name] = f'{str(nobs1)} в одной группе'

            new_col = []
            for metric in metrics_list:
                #print('go calculate_one_mde_general', week, metric,)
                group_a = self.data[metric]
                mde_float = self.calculate_one_mde_general(group_a, nobs1, method=method)
                mde_string = f'{round(mde_float*100, 1)}%'
                #print(x)
                new_col.append(mde_string)
            
            result[week_column_name] = new_col

        print(nobs1_weeks)
        result['Бэйзлайн'] = [np.mean(data[metric]) for metric in metrics_list]
        return result.set_index('metric'), nobs1_weeks


half_size = len(gr_main) // 2
data = gr_main.sample(n=half_size)
mde_object = MDE(data, config)
res, nobs1_weeks = mde_object.main_calculus()
res

Столбец anl_order не участвует в расчёте mde
Столбец order_w_paid не участвует в расчёте mde
Столбец order_w_paid_no_refund не участвует в расчёте mde
Для расчёта MDE используем столбцы
['has_listing_filter_apply__auto_listing_open_popup', 'has_main_pageview', 'has_listing_pageview', 'has_sogu_order_created', 'has_listing_filter_open__auto_listing_open_popup', 'avg_listing_pageview', 'has_experience_pageview', 'has_booking_pageview', 'has_listing_filter_apply_changed__auto_listing_open_popup']
Целевая метрика: has_listing_filter_apply__auto_listing_open_popup
{'mde week 1': '80696 в одной группе', 'mde weeks 2': '161392 в одной группе', 'mde weeks 3': '242088 в одной группе', 'mde weeks 4': '322784 в одной группе', 'mde weeks 5': '403480 в одной группе'}


Unnamed: 0_level_0,mde week 1,mde weeks 2,mde weeks 3,mde weeks 4,mde weeks 5,Бэйзлайн
metric,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
has_listing_filter_apply__auto_listing_open_popup,1.7%,1.2%,1.0%,0.8%,0.7%,0.409247
has_main_pageview,3.6%,2.5%,2.1%,1.8%,1.6%,0.13069
anl_order,7.4%,5.2%,4.3%,3.7%,3.3%,0.034247
order_w_paid,9.0%,6.4%,5.2%,4.5%,4.0%,0.022459
has_listing_pageview,0.2%,0.2%,0.1%,0.1%,0.1%,0.970559
has_sogu_order_created,7.4%,5.2%,4.3%,3.7%,3.3%,0.034247
has_listing_filter_open__auto_listing_open_popup,0%,0%,0%,0%,0%,1.0
order_w_paid_no_refund,9.2%,6.5%,5.3%,4.6%,4.1%,0.021501
avg_listing_pageview,2.0%,1.4%,1.2%,1.0%,0.9%,2.890076
has_experience_pageview,1.8%,1.3%,1.0%,0.9%,0.8%,0.381645


In [None]:
# необзятальная часть с переименовыванием стейджей в метрики
# TODO: занести в объект
pd.set_option('display.max_colwidth', None)
print(nobs1_weeks)
q = res.loc[['has_listing_filter_apply__auto_listing_open_popup', 'has_listing_filter_apply_changed__auto_listing_open_popup', 'has_experience_pageview', 'has_booking_pageview', 'has_sogu_order_created', 'order_w_paid', 'order_w_paid_no_refund']]
def find_metric_name_from_numerator_name(numerator_name):
    numerator_name = numerator_name.replace('has_', '').replace('avg_', '')
    for metric_name in config['metrics']:
        metric = metrics_all[metric_name]
        #print(metric_name, numerator_name, metric.numerator.name, numerator_name == metric.numerator.name)
        if numerator_name == metric.numerator.name:
            #print('Присвоить', metric_name)
            return metric_name
    return numerator_name
q = q.reset_index()
q['Метрика'] = q['metric'].apply(find_metric_name_from_numerator_name)
del q['metric']
q[['Метрика', 'mde week 1', 'mde weeks 2', 'mde weeks 3', 'mde weeks 4', 'mde weeks 5', 'Бэйзлайн']].set_index('Метрика')

{'mde week 1': '80696 в одной группе', 'mde weeks 2': '161392 в одной группе', 'mde weeks 3': '242088 в одной группе', 'mde weeks 4': '322784 в одной группе', 'mde weeks 5': '403480 в одной группе'}


Unnamed: 0_level_0,mde week 1,mde weeks 2,mde weeks 3,mde weeks 4,mde weeks 5,Бэйзлайн
Метрика,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
Конверсия из девайса в применение фильтров на автоматическом попапе на листингах,1.7%,1.2%,1.0%,0.8%,0.7%,0.409247
Конверсия из девайса в применение не дефолтных фильтров на автоматическом попапе на листингах,4.4%,3.1%,2.5%,2.2%,2.0%,0.092713
Конверсия из девайса в открытую экскурсию,1.8%,1.3%,1.0%,0.9%,0.8%,0.381645
Конверсия из девайса в страницу букинга,3.8%,2.7%,2.2%,1.9%,1.7%,0.117158
Конверсия из девайса в создание заказа,7.4%,5.2%,4.3%,3.7%,3.3%,0.034247
Конверсия из заказа в оплату,9.0%,6.4%,5.2%,4.5%,4.0%,0.022459
Конверсия из девайса в оплаченный заказ без возврата,9.2%,6.5%,5.3%,4.6%,4.1%,0.021501


# Блок для постанализа

In [None]:
# переключение между mde режимом и режимом постанализа ещё в разработке
# симулируем разбиение на АБ группы на текущих данных. То есть запускаем постанализ на АА тест
gr_main['AB'] = rnd.choices(['A', 'B'], k=len(gr_main))

In [None]:
def calculus(df):
    """
        Расчёт всех метрик в датафрейме с pvalue
    """
    print("Соотношение uid A на B", len(df.query('AB == "A"')), len(df.query('AB == "B"')))

    metrics = list(set(df.columns) - set(['ab_variant_min', 'ab_variant_max', 'ab_variant', 'AB']))
    gr = df.groupby('AB').agg({m: list for m in metrics})

    gr.loc['pv'] = [round(stats.ttest_ind(gr[m]['A'], gr[m]['B']).pvalue, 3) for m in metrics]
    gr.loc['delta'] = round((gr.iloc[1].apply(lambda x: np.mean(x)) / gr.iloc[0].apply(lambda x: np.mean(x)) - 1)*100, 1)
    #gr.rename(columns={m: 'uid2'+m for m in metrics_uid2}
    #                 | {m: m+' per_uid' for m in metrics_per_uid}, inplace=True)

    T = gr.T
    T['delta'] = T['delta'].apply(lambda x: str(x)+'%' if x<0 else f'+{str(x)}%')
    T['A'] = T['A'].apply(lambda x: round(np.mean(x), 4))
    T['B'] = T['B'].apply(lambda x: round(np.mean(x), 4))
    return T[['A', 'B', 'delta', 'pv']]


res = calculus(gr_main)
res

Соотношение uid A на B 57636 57645


AB,A,B,delta,pv
has_listing_filter_apply__auto_listing_open_popup,0.41,0.4109,+0.2%,0.763
has_main_pageview,0.1328,0.1313,-1.1%,0.469
anl_order,0.035,0.0343,-1.9%,0.537
order_w_paid,0.0239,0.0226,-5.5%,0.129
has_listing_pageview,0.9709,0.9693,-0.2%,0.113
has_sogu_order_created,0.035,0.0343,-1.9%,0.537
has_listing_filter_open__auto_listing_open_popup,1.0,1.0,+0.0%,
order_w_paid_no_refund,0.0229,0.0217,-5.1%,0.17
avg_listing_pageview,2.8954,2.8721,-0.8%,0.348
has_experience_pageview,0.3802,0.3787,-0.4%,0.621


In [None]:
# ежедневный запуск постанализа с графиками в разработке