## **Split System - Practice**

### Hash Function

Хеш-функция преобразует хешируемый объект в некоторый набор символов. Этот набор символов называется хешем.

**Свойства хеш-функции:**
- незначительное изменение входной информации сильно изменяет хеш
- хеш-функция необратима и не позволяет восстанавливать исходный массив информации из символьной строки
- быстро вычисляется
- хеш-функция может приводить любой объем данных к числу заданной длины


In [2]:
import os
from datetime import datetime
import numpy as np
import pandas as pd
from hashlib import md5

In [4]:
# Хешируем строку
md5('Hello'.encode())

<md5 _hashlib.HASH object @ 0x000001F0E3B74650>

In [5]:
# Приводим к 16-ричному числу
md5('Hello'.encode()).hexdigest()

'8b1a9953c4611296a827abf8c47804d7'

In [6]:
# Приводим к 10-тичному числу
int(md5('Hello'.encode()).hexdigest(), 16)

184900800977808474752697256094572479703

С помощью хэш-функции можно «случайно» распределять объекты по группам, если у них есть уникальные идентификаторы.

In [7]:
def get_bucket(value: str, n: int, salt: str=''):
    """Определяет бакет по id.

    value - уникальный идентификатор объекта.
    n - количество бакетов.
    salt - соль для перемешивания.
    """
    hash_value = int(md5((value + salt).encode()).hexdigest(), 16)
    return hash_value % n

In [8]:
n = 100
[get_bucket(str(x), n, 'salt_one') for x in range(10)]

[91, 95, 2, 50, 70, 10, 52, 11, 27, 62]

In [9]:
# если вычислим бакеты для тех же объектов с той же солью, то получим те же значения
[get_bucket(str(x), n, 'salt_one') for x in range(10)]

[91, 95, 2, 50, 70, 10, 52, 11, 27, 62]

In [10]:
# если поменяем солью, то получим другие значения
[get_bucket(str(x), n, 'salt_two') for x in range(10)]

[9, 85, 72, 2, 28, 2, 45, 31, 7, 47]

### Pizzeria's Data

На нашей платформе А/Б тестирования реализована очень простая схема формирования групп для эксперимента.
- для эксперимента генерируется случайная соль
- выбирается количество бакетов
- пользователи разбиваются по бакетам с солью эксперимента
- первый бакет — контрольная группа, второй бакет — экспериментальная группа

Посмотрим, как это работает на примере данных первого эксперимента.

- даты — с 2022-03-23 по 2022-03-30
- количество бакетов — 3
- salt — uSuuwtPc

In [11]:
URL_BASE = 'https://raw.githubusercontent.com/ab-courses/simulator-ab-datasets/main/2022-04-01/'

def read_database(file_name):
    return pd.read_csv(os.path.join(URL_BASE, file_name))

df_sales = read_database('2022-04-01T12_df_sales.csv')
df_sales['date'] = pd.to_datetime(df_sales['date'])
df_users = read_database('experiment_users.csv')

df_sales - информация о покупках, одна строка - один заказ. Атрибуты:
- sale_id - идентификатор покупки
- date - дата покупки
- count_pizza - количество пицц в заказе
- count_drink - количество напитков в заказе
- price - стоимость заказа
- user_id - идентификатор пользователя

df_users - список пользователей, попавших в первый эксперимент. 
Флаг в столбце pilot указывает на группу, 1 - экспериментальная, 0 - контрольная.

In [12]:
df_users.sample(5)

Unnamed: 0,user_id,pilot
14914,7d428e,1
12354,f24bbf,1
15121,effa3f,1
13185,8dcf45,1
11043,fa8a38,0


Посчитаем бакеты сами:

In [13]:
salt = 'uSuuwtPc'
n = 3

df_users['bucket'] = [get_bucket(user_id, n, salt) for user_id in df_users['user_id'].values]

In [14]:
df_users.sample(5)

Unnamed: 0,user_id,pilot,bucket
22671,26d443,1,1
16565,3ae9d3,1,1
22657,9e9968,1,1
4632,ab60c7,0,0
16610,af8f5c,1,1


In [15]:
(df_users['pilot'] == df_users['bucket']).mean()

1.0

Совпадение 100%

Проверим, что для пользователей, которые не попали в эксперимент, номер бакета не 0 и не 1.

In [18]:
all_users = (
    df_sales
    [(df_sales['date'] >= datetime(2022, 3, 23)) & (df_sales['date'] < datetime(2022, 3, 30))]
    ['user_id'].unique()
)
third_bucket_users = list(set(all_users) - set(df_users['user_id'].unique()))
for user_id in third_bucket_users[:10]:
    print(user_id, get_bucket(user_id, n, salt))

8cc27d 2
342ea1 2
2bcdf7 2
da222d 2
b88116 2
578575 2
b4c369 2
b5de40 2
6e60f7 2
704285 2


Всё верно, бакет не 0 и не 1.

### Double Hashing

Посмотрим, как можно реализовать двойное хеширование.

Сначала нужно разбить пользователей по бакетам:

In [21]:
n_buckets = 100
bucket_salt = 'abc123'

# сделаем датафрейм с пользователями
df = df_sales[['user_id']].drop_duplicates()
# распределим пользователей по бакетам
df['bucket'] = [get_bucket(user_id, n_buckets, bucket_salt) for user_id in df['user_id'].values]
df.head()

Unnamed: 0,user_id,bucket
0,1c1543,99
1,a9a6e8,83
2,23420a,23
3,3e8ed5,14
4,cbc468,41


Допустим, мы оценили, что для проведения эксперимента нам нужно 5% пользователей. 
При условии, что у нас 100 бакетов, для эксперимента нужно 5 бакетов. 
Выберем случайные 5 бакетов и распределим попавших в них пользователей на контрольную и экспериментальную группу:

In [24]:
experiment_buckets = np.random.choice(df['bucket'].unique(), 5, False)
print(f'experiment_buckets: {experiment_buckets}')

df_experiment = df[df['bucket'].isin(experiment_buckets)].copy()
experiment_salt = 'qwe456'
df_experiment['pilot'] = [get_bucket(user_id, 2, experiment_salt) for user_id in df_experiment['user_id'].values]

experiment_buckets: [58 36 46 82 19]


Кол-во групп = 2 → pilot = 0 (контрольная), pilot = 1 (тестовая)

Соль 'qwe456' гарантирует равномерное и независимое распределение пользователей в группы

In [25]:
df_experiment.head()

Unnamed: 0,user_id,bucket,pilot
64,c2d425,82,0
76,574a38,82,0
101,638d39,36,0
184,907f56,36,0
200,07ed46,82,1


In [26]:
experiment_salt = 'qwe45sadjfb'
df_experiment['pilot2'] = [get_bucket(user_id, 2, experiment_salt) for user_id in df_experiment['user_id'].values]

Теперь эксперимент делится по-другому (другая соль → другое распределение A/B)

Используется, если хочешь запустить другой эксперимент на том же подмножестве пользователей, но так, чтобы разбиение было ортогональным предыдущему

In [27]:
df_experiment.head(20)

Unnamed: 0,user_id,bucket,pilot,pilot2
64,c2d425,82,0,0
76,574a38,82,0,1
101,638d39,36,0,0
184,907f56,36,0,1
200,07ed46,82,1,1
202,d5cb45,58,0,1
217,8abcab,82,1,0
229,73c0cc,82,1,1
240,cadd30,82,1,1
253,bf1b9c,58,1,0


### Метод для распределения экспериментов по бакетам add_experiment класса SplittingService

In [None]:
from pydantic import BaseModel


class Experiment(BaseModel):
    """
    id - идентификатор эксперимента.
    buckets_count - необходимое количество бакетов.
    conflicts - список идентификаторов экспериментов, которые нельзя проводить
        одновременно на одних и тех же пользователях.
    """
    id: int
    buckets_count: int
    conflicts: list[int] = []


class SplittingService:

    def __init__(self, buckets_count):
        """Класс для распределения экспериментов и пользователей по бакетам.

        :param buckets_count (int): количество бакетов.
        """
        self.buckets_count = buckets_count
        self.buckets = [[] for _ in range(buckets_count)]

    def add_experiment(self, experiment):
        """Проверяет можно ли добавить эксперимент, добавляет если можно.

        :param experiment (Experiment): параметры эксперимента, который нужно запустить
        :return success, buckets:
            success (boolean) - можно ли добавить эксперимент, True - можно, иначе - False
            buckets (list[list[int]]]) - список бакетов, в каждом бакете перечислены идентификаторы эксперименты,
                которые в нём проводятся.
        """
        # список из элементов [bucket_id, количество совместных экспериментов]
        available_buckets_meta = []
        # Если в бакете есть хотя бы один конфликтующий эксперимент — пропускаем
        for bucket_id, bucket in enumerate(self.buckets):
            if set(experiment.conflicts) & set(bucket):
                continue
            # Иначе добавляем бакет как потенциально подходящий
            available_buckets_meta.append((bucket_id, len(bucket)))
        # Если не хватает подходящих бакетов — вернуть отказ
        if len(available_buckets_meta) < experiment.buckets_count:
            return False, self.buckets
        # Сортируем бакеты по количеству уже запущенных экспериментов — от большего к меньшему.
        # Это минимизирует общее число занятых бакетов (плотно "упаковываем" эксперименты).
        sorted_available_buckets_meta = sorted(available_buckets_meta, key=lambda x: -x[1])
        # Берём нужное количество бакетов и добавляем в них текущий эксперимент
        for bucket_id, _ in sorted_available_buckets_meta[:experiment.buckets_count]:
            self.buckets[bucket_id].append(experiment.id)
        return True, self.buckets

 

def check_correct_buckets(buckets, experiments):
    for experiment in experiments:
        buckets_with_exp = [b for b in buckets if experiment.id in b]
        assert experiment.buckets_count == len(buckets_with_exp), 'Неверное количество бакетов с экспериментом'
        parallel_experiments = set(sum(buckets_with_exp, []))
        err_msg = 'Несовместные эксперименты в одном бакете'
        assert len(set(experiment.conflicts) & parallel_experiments) == 0, err_msg


if __name__ == '__main__':
    experiments = [
        Experiment(id=1, buckets_count=4, conflicts=[4]),
        Experiment(id=2, buckets_count=2, conflicts=[3]),
        Experiment(id=3, buckets_count=2, conflicts=[2]),
        Experiment(id=4, buckets_count=1, conflicts=[1]),
        
        Experiment(id=1, buckets_count=2),
        Experiment(id=2, buckets_count=2),
        Experiment(id=3, buckets_count=2),
        Experiment(id=4, buckets_count=5),
        
        Experiment(id=1, buckets_count=2, conflicts=[2, 4]),
        Experiment(id=2, buckets_count=2, conflicts=[1]),
        Experiment(id=3, buckets_count=2),
        Experiment(id=4, buckets_count=4, conflicts=[1]),
        
        Experiment(id=1, buckets_count=2, conflicts=[3]),
        Experiment(id=2, buckets_count=2, conflicts=[3]),
        Experiment(id=3, buckets_count=2, conflicts=[1, 2]),
        
        Experiment(id=1, buckets_count=2, conflicts=[4, 5]),
        Experiment(id=2, buckets_count=2, conflicts=[4, 5]),
        Experiment(id=3, buckets_count=2, conflicts=[4, 5]),
        Experiment(id=4, buckets_count=2, conflicts=[1, 2, 3]),
        Experiment(id=5, buckets_count=2, conflicts=[1, 2, 3]),
        Experiment(id=6, buckets_count=4, conflicts=[7]),
        Experiment(id=7, buckets_count=1, conflicts=[6]),
        
    ]
    ideal_answers = [True, True, True, False]

    splitting_service = SplittingService(buckets_count=4)
    added_experiments = []
    for index, (experiment, ideal_answer) in enumerate(zip(experiments, ideal_answers)):
        success, buckets = splitting_service.add_experiment(experiment)
        assert success == ideal_answer, 'Сплит-система работает неоптимально или некорректно.'
        if success:
            added_experiments.append(experiment)
        check_correct_buckets(buckets, added_experiments)
    print('simple test passed')


simple test passed


### Метод для распределения пользователей по бакетам и группам. Метод process_user класса SplittingService.


In [32]:
from pydantic import BaseModel
from hashlib import md5


class Experiment(BaseModel):
    """
    id - идентификатор эксперимента.
    salt - соль эксперимента (для случайного распределения пользователей на контрольную/пилотную группы)
    """
    id: int
    salt: str


class SplittingService:

    def __init__(self, buckets_count, bucket_salt, buckets=None, id2experiment=None):
        """Класс для распределения экспериментов и пользователей по бакетам.

        :param buckets_count (int): количество бакетов.
        :param bucket_salt (str): соль для разбиения пользователей по бакетам.
            При одной соли каждый пользователь должен всегда попадать в один и тот же бакет.
            Если изменить соль, то распределение людей по бакетам должно измениться.
        :param buckets (list[list[int]]) - список бакетов, в каждом бакете перечислены идентификаторы
            эксперименты, которые в нём проводятся.
        :param id2experiment (dict[int, Experiment]) - словарь пар: идентификатор эксперимента - эксперимент.
        """
        self.buckets_count = buckets_count
        self.bucket_salt = bucket_salt
        if buckets:
            self.buckets = buckets
        else:
            self.buckets = [[] for _ in range(buckets_count)]
        if id2experiment:
            self.id2experiment = id2experiment
        else:
            self.id2experiment = {}

    def _get_hash_modulo(self, value: str, modulo: int, salt: str):
        """Вычисляем остаток от деления: (hash(value) + salt) % modulo."""
        hash_value = int(md5(str.encode(value + salt)).hexdigest(), 16)
        return hash_value % modulo

    def process_user(self, user_id):
        """Определяет в какие эксперименты попадает пользователь.

        Сначала нужно определить бакет пользователя.
        Затем для каждого эксперимента в этом бакете выбрать пилотную или контрольную группу.

        :param user_id (str): идентификатор пользователя
        :return bucket_id, experiment_groups:
            - bucket_id (int) - номер бакета (индекс элемента в self.buckets)
            - experiment_groups (list[tuple]) - список пар: id эксперимента, группа.
                Группы: 'A', 'B'.
            Пример: (8, [(194, 'A'), (73, 'B')])
        """
        bucket_id = self._get_hash_modulo(user_id, self.buckets_count, self.bucket_salt) # Определяем, в какой бакет попал пользователь
        # Получаем все эксперименты, которые находятся в этом бакете
        experiment_ids = self.buckets[bucket_id] 
        experiments = [
            self.id2experiment[experiment_id] for experiment_id in experiment_ids
        ]

        # Определяем для каждого эксперимента, в какую группу (A или B) попал пользователь
        experiment_groups = []
        for experiment in experiments:
            second_hash = self._get_hash_modulo(user_id, 2, experiment.salt)
            group = 'B' if second_hash == 1 else 'A'
            experiment_groups.append((experiment.id, group))
        return bucket_id, experiment_groups


if __name__ == '__main__':
    id2experiment = {
        0: Experiment(id=0, salt='0'),
        1: Experiment(id=1, salt='1')
    }
    buckets = [[0, 1], [1], [], []]
    buckets_count = len(buckets)
    bucket_salt = 'a2N4'

    splitting_service = SplittingService(buckets_count, bucket_salt, buckets, id2experiment)
    user_ids = [str(x) for x in range(1000)]
    for user_id in user_ids:
        bucket_id, experiment_groups = splitting_service.process_user(user_id)
        assert bucket_id in [0, 1, 2, 3], 'Неверный bucket_id'
        assert len(experiment_groups) == len(buckets[bucket_id]), 'Неверное количество экспериментов в бакете'
        for exp_id, group in experiment_groups:
            assert exp_id in id2experiment, 'Неверный experiment_id'
            assert group in ['A', 'B'], 'Неверная group'
    print('simple test passed')


simple test passed


_get_hash_modulo — основной механизм стабильно-рандомного назначения.

process_user — находит, в каком бакете находится пользователь и в какие A/B группы он попадает для каждого эксперимента в этом бакете.