In [1]:
from typing import List, Dict, Any, Tuple, Union, Optional

import pandas as pd

import matplotlib.pyplot as plt

import warnings
# NOTE: this silences every warning category for the whole session, so
# deprecation / future warnings raised by the cells below are hidden as well.
warnings.filterwarnings("ignore")

import logging
# Install the default root handler, then create the notebook's named logger
# and let it emit INFO records. The logger is used by
# EpsilonGreedySum.show_intersection to report the intersection tables.
logging.basicConfig()
logger = logging.getLogger("back_test")
logger.setLevel(logging.INFO)

## TASK 1

In [2]:
from enum import Enum
from typing import List


class BackTestAlgo(str, Enum):
    """Identifiers of the back-test algorithms that can be requested."""

    epsilon_greedy_sum = "epsilon_greedy_sum"

    @classmethod
    def to_list(cls) -> List[str]:
        """Return the string value of every member, in declaration order."""
        return [member.value for member in cls]


class BackTestLevel(str, Enum):
    """Column names that can be used as the aggregation level of a back test."""

    group_1 = "group_1"
    item_id = "item_id"
    sku_id = "sku_id"

    @classmethod
    def to_list(cls) -> List[str]:
        """Return the string value of every member, in declaration order."""
        return [member.value for member in cls]


class BackTestMetric(str, Enum):
    """Metric columns that a back test can evaluate."""

    # Name of the orders-count column.
    # NOTE(review): the previous comment on this member read "revenue", which
    # does not match the `orders_num` column name - confirm the intended metric.
    orders_num = "orders_num"

    @classmethod
    def to_list(cls) -> List[str]:
        """Return the string value of every member, in declaration order."""
        return [member.value for member in cls]


In [3]:
# Load the A/B test data, cast the date key to string and derive the margin
# (markup * revenue) in a single chain instead of mutating the frame in place
ab_df = (
    pd.read_parquet("./hm/simulated_data.parquet")
    .assign(ds=lambda frame: frame["ds"].astype(str))
    .assign(margin=lambda frame: frame["markup"] * frame["revenue"])
)

# Preview the first rows
ab_df.head()

Unnamed: 0,group_1,sku_id,ab_test_id,markup,revenue,traffic,orders_num,ds,margin
0,group_1000,sku_100000,ab_100000,0.01,2539.41,0.05,1.0,20231008,25.3941
1,group_1001,sku_100001,ab_100001,0.02,6057.44,0.05,3.0,20231008,121.1488
2,group_1002,sku_100002,ab_100002,0.01,541.35,0.05,8.0,20231008,5.4135
3,group_1002,sku_100003,ab_100003,0.06,697.7,0.05,0.0,20231008,41.862
4,group_1001,sku_100004,ab_100004,0.01,1413.99,0.05,0.0,20231008,14.1399


In [4]:
# Load the markups proposed by each of the five algorithms
algo_1, algo_2, algo_3, algo_4, algo_5 = map(
    pd.read_parquet,
    (
        "./hm/algo/algo_1.parquet",
        "./hm/algo/algo_2.parquet",
        "./hm/algo/algo_3.parquet",
        "./hm/algo/algo_4.parquet",
        "./hm/algo/algo_5.parquet",
    ),
)

# Re-read the raw A/B test data (this resets `ab_df` to the untransformed file
# contents; the dtype cast and the margin are re-derived in the next cell)
ab_df = pd.read_parquet("./hm/simulated_data.parquet")

In [5]:
# A/B data: string date key plus the derived margin (markup * revenue)
ab_df = ab_df.assign(
    ds=lambda frame: frame["ds"].astype(str),
    margin=lambda frame: frame["markup"] * frame["revenue"],
)

# Algorithm data: string date key, and an algorithm-specific name for the
# markup column so the five frames can later be merged side by side
algo_1, algo_2, algo_3, algo_4, algo_5 = (
    algo_frame
    .assign(ds=algo_frame["ds"].astype(str))
    .rename(columns={"markup": markup_name})
    for algo_frame, markup_name in zip(
        (algo_1, algo_2, algo_3, algo_4, algo_5),
        (
            "algo_1_markup",
            "algo_2_markup",
            "algo_3_markup",
            "algo_4_markup",
            "algo_5_markup",
        ),
    )
)

In [6]:
# Distinct dates and distinct groups (categories) present in the A/B data
ds_df = pd.DataFrame({"ds": ab_df["ds"].drop_duplicates().tolist()})
group_df = ab_df.loc[:, ["group_1"]].drop_duplicates()

# Cartesian product: one row for every (date, group) combination
algo_sample = ds_df.merge(group_df, how="cross")

algo_sample

Unnamed: 0,ds,group_1
0,20231008,group_1000
1,20231008,group_1001
2,20231008,group_1002
3,20231008,group_1003
4,20231008,group_1004
...,...,...
541,20231021,group_1034
542,20231021,group_1035
543,20231021,group_1036
544,20231021,group_1037


In [7]:
# Attach every algorithm's markup to the (date, group) grid, one left join per
# algorithm, so that all proposals end up side by side in a single frame
algo_joint = algo_sample
for algo_frame in (algo_1, algo_2, algo_3, algo_4, algo_5):
    algo_joint = algo_joint.merge(
        algo_frame,
        how="left",
        on=["ds", "group_1"],
    )

algo_joint

Unnamed: 0,ds,group_1,algo_1_markup,algo_2_markup,algo_3_markup,algo_4_markup,algo_5_markup
0,20231008,group_1000,0.06,0.02,0.01,0.02,0.05
1,20231008,group_1001,0.04,0.06,0.04,0.02,0.00
2,20231008,group_1002,0.06,0.01,0.05,0.01,0.05
3,20231008,group_1003,0.05,0.00,0.06,0.06,0.00
4,20231008,group_1004,0.06,0.01,0.02,0.03,0.05
...,...,...,...,...,...,...,...
541,20231021,group_1034,0.03,0.00,0.01,0.04,0.05
542,20231021,group_1035,0.05,0.04,0.06,0.04,0.03
543,20231021,group_1036,0.06,0.02,0.01,0.05,0.01
544,20231021,group_1037,0.03,0.05,0.01,0.06,0.04


In [8]:
# Enrich every A/B test row with the markups the algorithms proposed for the
# same date and group
ab_df_with_algo = pd.merge(
    ab_df,
    algo_joint,
    how="left",
    on=["ds", "group_1"],
)

ab_df_with_algo

Unnamed: 0,group_1,sku_id,ab_test_id,markup,revenue,traffic,orders_num,ds,margin,algo_1_markup,algo_2_markup,algo_3_markup,algo_4_markup,algo_5_markup
0,group_1000,sku_100000,ab_100000,0.01,2539.41,0.05,1.0,20231008,25.3941,0.06,0.02,0.01,0.02,0.05
1,group_1001,sku_100001,ab_100001,0.02,6057.44,0.05,3.0,20231008,121.1488,0.04,0.06,0.04,0.02,0.00
2,group_1002,sku_100002,ab_100002,0.01,541.35,0.05,8.0,20231008,5.4135,0.06,0.01,0.05,0.01,0.05
3,group_1002,sku_100003,ab_100003,0.06,697.70,0.05,0.0,20231008,41.8620,0.06,0.01,0.05,0.01,0.05
4,group_1001,sku_100004,ab_100004,0.01,1413.99,0.05,0.0,20231008,14.1399,0.04,0.06,0.04,0.02,0.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
511808,group_1002,sku_101093,ab_100019,0.03,178.71,0.05,2.0,20231021,5.3613,0.05,0.06,0.00,0.03,0.03
511809,group_1012,sku_101412,ab_100013,0.03,88.71,0.05,2.0,20231021,2.6613,0.06,0.00,0.04,0.00,0.01
511810,group_1024,sku_103568,ab_100017,0.03,55.29,0.05,1.0,20231021,1.6587,0.05,0.06,0.04,0.02,0.05
511811,group_1001,sku_100697,ab_100004,0.02,8885.78,0.05,1.0,20231021,177.7156,0.03,0.06,0.03,0.00,0.05


In [9]:
# Step 1: collapse the rows to one row per ("ds", "group_1", "ab_test_id").
#   - traffic, markup and the five algo_N_markup columns: mean over the rows
#   - orders_num: sum over the rows, then divided by the mean traffic value
#     (presumably the traffic share of the test cell, so that orders are scaled
#     to full traffic - TODO confirm)
#   - every numeric column is rounded to 2 decimals afterwards
ab_df_joint = ab_df_with_algo \
    .groupby(["ds", "group_1", "ab_test_id"], as_index=False) \
    .agg(traffic = ("traffic", "mean"),
         orders_num = ("orders_num", "sum"),
         markup = ("markup", "mean"),
         algo_1_markup = ("algo_1_markup", "mean"),
         algo_2_markup = ("algo_2_markup", "mean"),
         algo_3_markup = ("algo_3_markup", "mean"),
         algo_4_markup = ("algo_4_markup", "mean"),
         algo_5_markup = ("algo_5_markup", "mean")) \
    .assign(orders_num = lambda x: x["orders_num"] / x["traffic"]) \
    .round(2)

# Step 2: average the scaled orders over all A/B test cells that share the same
# ("ds", "group_1", "markup"), keep only the columns listed in `.filter` (in
# that order) and round to 2 decimals again.
# NOTE(review): "markup" is both a grouping key and an aggregated column here;
# how pandas resolves that name clash with as_index=False may depend on the
# installed pandas version - verify before upgrading pandas.
ab_df_joint = ab_df_joint \
    .groupby(["ds", "group_1", "markup"], as_index=False) \
    .agg(orders_num = ("orders_num", "mean"),
         markup = ("markup", "mean"),
         algo_1_markup = ("algo_1_markup", "mean"),
         algo_2_markup = ("algo_2_markup", "mean"),
         algo_3_markup = ("algo_3_markup", "mean"),
         algo_4_markup = ("algo_4_markup", "mean"),
         algo_5_markup = ("algo_5_markup", "mean")) \
    .filter(["group_1", "markup",
             "algo_1_markup", "algo_2_markup", "algo_3_markup", "algo_4_markup", "algo_5_markup",
             "orders_num", "ds"]) \
    .round(2)

In [10]:
# Persist the aggregated A/B + algorithm table (Task 1 result) without the index
ab_df_joint.to_csv(path_or_buf="./data/homework_2_1.csv", index=False)

## TASK 2

In [11]:
# Epsilon Greedy Algorithm
class EpsilonGreedySum:
    """
    Epsilon-based back test over several markup algorithms.

    For every algorithm prefix (e.g. ``algo_1``) only those historical rows are
    kept whose real ``markup`` lies within ``+/- epsilon`` of the markup the
    algorithm proposed (``<prefix>_markup``). The metrics of the kept rows are
    averaged per ``(ds, lvl, <prefix>_markup)`` and finally summed per day, which
    gives one predicted metric value per algorithm and day.
    """

    # Column prefixes of the algorithms evaluated when none are passed explicitly.
    # Each prefix requires a `<prefix>_markup` column in the input frame.
    DEFAULT_ALGO_PREFIXES: Tuple[str, ...] = (
        "algo_1",
        "algo_2",
        "algo_3",
        "algo_4",
        "algo_5",
    )

    def __init__(
        self,
        epsilon: float = 0.005,
        do_show_intersection: bool = True,
        algo_prefixes: Optional[List[str]] = None,
    ):
        """
        epsilon - markup difference that is treated as insignificant
        do_show_intersection - whether to log the intersection of historical and
        predicted markups:
        - a high share means the back test can be trusted
        - a low share means there is not enough data to run the back test
        algo_prefixes - column prefixes of the algorithms to evaluate; defaults
        to DEFAULT_ALGO_PREFIXES

        Raises ValueError when `algo_prefixes` is given but empty.
        """
        if algo_prefixes is None:
            algo_prefixes = list(self.DEFAULT_ALGO_PREFIXES)
        if len(algo_prefixes) == 0:
            raise ValueError("`algo_prefixes` must contain at least one prefix")

        self.epsilon = epsilon
        self.do_show_intersection = do_show_intersection
        self.algo_prefixes = list(algo_prefixes)

    @staticmethod
    def _as_name(label: Any) -> str:
        """
        Return the plain string behind `label`.

        Enum members are replaced by their `.value`; anything else goes through
        `str`. Formatting a `(str, Enum)` member inside an f-string yields
        "ClassName.member" on Python 3.12+, which would leak into the generated
        column names, so every label is normalized before it is formatted.
        """
        return str(getattr(label, "value", label))

    @staticmethod
    def _metric_names(metrics: List[str]) -> List[str]:
        """Return the plain column name of every requested metric."""
        return [EpsilonGreedySum._as_name(metric) for metric in metrics]

    def calculate_group_metrics(
        self,
        df: pd.DataFrame,
        lvl: str,
        prefix: str,
        metrics: List[str],
    ) -> pd.DataFrame:
        """
        Calculate the predicted metrics of one algorithm.

        Keeps the rows whose real `markup` lies in
        [`<prefix>_markup` - epsilon, `<prefix>_markup` + epsilon] (both bounds
        inclusive), averages every metric per (`ds`, `lvl`, `<prefix>_markup`)
        and renames each metric column to `<prefix>_<metric>`.

        df - frame with `ds`, `lvl`, `markup`, `<prefix>_markup` and metric columns
        lvl - name of the aggregation-level column (str or BackTestLevel member)
        prefix - algorithm prefix, e.g. "algo_1"
        metrics - metric column names (str or BackTestMetric members)

        Returns the aggregated frame rounded to 2 decimals.
        """
        lvl = self._as_name(lvl)
        metric_names = self._metric_names(metrics)
        markup_col = f"{prefix}_markup"

        # Keep only the A/B rows whose real markup is close enough to the
        # algorithm's markup.
        # NOTE: the bounds are computed in floating point, so a markup lying
        # exactly `epsilon` away may fall on either side of the boundary.
        is_close = df["markup"].between(
            df[markup_col] - self.epsilon,
            df[markup_col] + self.epsilon,
        )
        df_filtered = df[is_close]

        # Mean of every metric per date, level and algorithm markup
        agg_functions = {name: "mean" for name in metric_names}
        stats_df = (
            df_filtered.groupby(["ds", lvl, markup_col])
            .agg(agg_functions)
            .reset_index()
        )

        # Prefix the metric columns so frames of different algorithms can be merged
        stats_df = stats_df.rename(
            columns={name: f"{prefix}_{name}" for name in metric_names}
        )

        return stats_df.round(2)

    @staticmethod
    def show_intersection(
        lvl: str,
        control_stats_df: pd.DataFrame,
        test_stats_df: pd.DataFrame,
        stats_df: pd.DataFrame,
    ) -> None:
        """
        Log the intersection of historical and predicted markups:
        - a high share means the back test can be trusted
        - a low share means there is not enough data to run the back test

        For every day the number of distinct `lvl` values is counted in the
        control frame, in the test frame and in the joint frame `stats_df`; the
        ratios `inter / test` and `inter / control` are logged per day, followed
        by their mean over all days.

        The tables are rendered with `DataFrame.to_markdown`, which needs the
        optional `tabulate` package. Output goes to the module-level `logger`.
        """
        lvl = EpsilonGreedySum._as_name(lvl)

        control_distinct_df = (
            control_stats_df.groupby("ds")[lvl].nunique().reset_index(name="control")
        )
        test_distinct_df = (
            test_stats_df.groupby("ds")[lvl].nunique().reset_index(name="test")
        )
        inter_distinct_df = (
            stats_df.groupby("ds")[lvl].nunique().reset_index(name="inter")
        )

        # The test frame drives the join: days missing in the other frames get NaN
        distinct_df = pd.merge(
            test_distinct_df, control_distinct_df, how="left", on=["ds"]
        )
        distinct_df = pd.merge(distinct_df, inter_distinct_df, how="left", on=["ds"])

        distinct_df["inter / test"] = distinct_df["inter"] / distinct_df["test"]
        distinct_df["inter / control"] = distinct_df["inter"] / distinct_df["control"]

        logger.info(f"Unique {lvl} intersection:\n{distinct_df.to_markdown()}")
        mean_distinct_df = distinct_df.drop(columns=["ds"]).mean().round(2)
        logger.info(f"Mean unique {lvl} intersection:\n{mean_distinct_df.to_markdown()}")

    def calculate_groups_metrics(
        self,
        df: pd.DataFrame,
        lvl: str,
        metrics: List[str],
    ) -> pd.DataFrame:
        """
        1) Calculate the epsilon-based metric predictions of every algorithm
        2) Optionally log the intersection of historical and predicted markups

        Returns one frame with the columns `ds`, `lvl` and, per algorithm,
        `<prefix>_markup` plus `<prefix>_<metric>`. Only (`ds`, `lvl`) pairs for
        which every algorithm has a prediction are kept (inner joins).
        """
        lvl = self._as_name(lvl)

        # One prediction frame per algorithm, built from the real A/B rows whose
        # markup is within +/- epsilon of that algorithm's markup
        per_algo = [
            self.calculate_group_metrics(
                df=df,
                lvl=lvl,
                prefix=prefix,
                metrics=metrics,
            )
            for prefix in self.algo_prefixes
        ]

        # All remaining columns are algorithm-prefixed, so date and level are
        # the only columns the frames share
        join_keys = ["ds", lvl]

        stats_df = per_algo[0]
        for algo_stats in per_algo[1:]:
            stats_df = stats_df.merge(algo_stats, how="inner", on=join_keys)

        # Coverage report: compares how many distinct `lvl` values per day exist
        # for the first algorithm ("control"), for the second one ("test") and
        # in the joint frame. It needs two algorithms, so it is skipped otherwise.
        if self.do_show_intersection and len(per_algo) >= 2:
            self.show_intersection(
                lvl=lvl,
                control_stats_df=per_algo[0],
                test_stats_df=per_algo[1],
                stats_df=stats_df,
            )

        return stats_df

    @staticmethod
    def calculate_statistics(
        df: pd.DataFrame,
        metrics: List[str],
        groups: Optional[List[str]] = None,
    ) -> pd.DataFrame:
        """
        Calculate the daily metric values of every algorithm.

        df - frame with `ds` and one `<group>_<metric>` column per group and metric
        metrics - metric column names (str or BackTestMetric members)
        groups - algorithm prefixes to report; defaults to DEFAULT_ALGO_PREFIXES

        Returns a long frame with the columns `algo`, `ds` and one column per
        metric, holding the per-day sum of each `<group>_<metric>` column. Rows
        are ordered by algorithm, then by day.
        """
        metric_names = EpsilonGreedySum._metric_names(metrics)
        if groups is None:
            groups = list(EpsilonGreedySum.DEFAULT_ALGO_PREFIXES)

        # Sum every algorithm's metric columns per day
        agg_functions = {
            f"{group}_{name}": "sum" for group in groups for name in metric_names
        }
        stats_df = df.groupby("ds").agg(agg_functions).reset_index()
        days = stats_df["ds"].tolist()

        # Reshape from wide (one column per algorithm) to long (one row per
        # algorithm and day)
        result: Dict[str, Any] = {"algo": [], "ds": []}
        result.update({name: [] for name in metric_names})

        for group in groups:
            result["algo"].extend([group] * len(days))
            result["ds"].extend(days)
            for name in metric_names:
                result[name].extend(stats_df[f"{group}_{name}"].tolist())

        return pd.DataFrame(data=result)

    def run(
        self,
        df: pd.DataFrame,
        lvl: str,
        metrics: List[str],
    ) -> pd.DataFrame:
        """
        Entry point of the epsilon-based back test.

        Returns the long frame produced by `calculate_statistics` for the
        algorithms configured on this instance.
        """
        # Mean metric per date, level and algorithm markup, using only the real
        # A/B rows whose markup is within +/- epsilon of the algorithm's markup
        stats_df = self.calculate_groups_metrics(df=df, lvl=lvl, metrics=metrics)

        # Daily totals per algorithm
        result_df = self.calculate_statistics(
            df=stats_df,
            metrics=metrics,
            groups=self.algo_prefixes,
        )

        return result_df


# Maps an algorithm identifier to the class that implements it
register = dict(epsilon_greedy_sum=EpsilonGreedySum)


def run_algo(
    df: pd.DataFrame,
    lvl: BackTestLevel,
    algo: BackTestAlgo,
    metrics: List[BackTestMetric],
    algo_params: Optional[Dict[str, Any]] = None,
) -> pd.DataFrame:
    """
    Entry point of the back test.

    Looks up the implementation registered for `algo`, instantiates it with
    `algo_params` (no arguments when omitted or empty) and returns the frame
    produced by its `run` method.

    Raises ValueError when `algo` is not present in `register`.
    """
    algo_cls = register.get(algo)

    # Unknown identifier: fail early and list the supported ones
    if algo_cls is None:
        raise ValueError(
            f"You should provide `algo` from the list: {BackTestAlgo.to_list()}"
        )

    # Treat a missing / empty parameter mapping as "use the defaults"
    init_kwargs = algo_params if algo_params else {}

    # Build the algorithm and evaluate it on the provided data
    backtest = algo_cls(**init_kwargs)
    return backtest.run(df=df, lvl=lvl, metrics=metrics)

In [12]:
# What to evaluate: metric, aggregation level and back-test algorithm
METRICS = [BackTestMetric.orders_num]
LVL = BackTestLevel.group_1
ALGO = BackTestAlgo.epsilon_greedy_sum

# Constructor arguments of the algorithm
ALGO_PARAMS = dict(epsilon=0.01, do_show_intersection=False)

# Run the back test on a copy so the Task 1 frame stays untouched
result_df = run_algo(
    df=ab_df_joint.copy(),
    lvl=LVL,
    algo=ALGO,
    metrics=METRICS,
    algo_params=ALGO_PARAMS,
)

# Use plain column labels for the export
result_df = result_df.set_axis(["algo", "ds", "orders_num"], axis="columns")

# Persist the Task 2 result without the index
result_df.to_csv("./data/homework_2_2.csv", index=False)

## TASK 3

In [13]:
# Rank the algorithms by their total predicted orders over all days
(
    result_df
    .groupby("algo", as_index=False)["orders_num"]
    .sum()
    .sort_values(by="orders_num", ascending=False)
)

Unnamed: 0,algo,orders_num
4,algo_5,4282026.49
0,algo_1,4262385.09
2,algo_3,4227637.97
1,algo_2,4215161.79
3,algo_4,4194876.17
