In [None]:
import pandas as pd
import numpy as np
import pandera as pa
from abc import ABC
from pathlib import Path
from enum import Enum, auto
from typing import Callable, List


import logging
# Fetch the root logger (getLogger() without a name) and lower its threshold to DEBUG.
# NOTE(review): no handler is attached here; whether INFO/DEBUG records are actually
# displayed depends on the handlers configured by the host environment — confirm.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

#To be imported if needed
from catboost import CatBoostRegressor

### Generic components

In [None]:
class Feature:
    """Plain description of one column used by the model.

    Attributes:
        name: column name of the feature in the sales DataFrame.
        is_categorical: True when the column should be treated as categorical
            (read by CatBoostDemandPredictor to build its ``cat_features``).
        aggregation_f: aggregation handed to ``groupby(...).agg`` for this
            column by Aggregator.
        pa_check: optional pandera check; only stored on the instance here.
        transformations: optional list of callables; only stored on the
            instance here.
    """

    def __init__(self, 
                 name: str, 
                 is_categorical: bool,
                 aggregation_f: Callable,
                 pa_check: pa.Check = None,
                 transformations: List[Callable] = None):
        # Identity and typing of the column.
        self.name, self.is_categorical = name, is_categorical
        # How the column is collapsed when rows are grouped.
        self.aggregation_f = aggregation_f
        # Optional extras, kept exactly as passed (None when omitted).
        self.pa_check, self.transformations = pa_check, transformations


class DFTransformer(ABC):
    """Common base for preprocessing steps that turn one DataFrame into another."""

    def transform(self, 
                  df: pd.DataFrame,
                  store_interim_dfs=False) -> pd.DataFrame:
        """Placeholder to be overridden by concrete transformers.

        Args:
            df: the DataFrame to transform.
            store_interim_dfs: accepted but unused by this base implementation.

        Returns:
            None in this base implementation.
        """
        return None


# Maps a time granularity name to the group-by key columns for that granularity:
# always store and SKU, plus the time column itself.
AggregationLevel = {
    time_column: ['store_id', 'sku_id', time_column]
    for time_column in ("transaction_date", "year_week", "year")
}


class Aggregator(DFTransformer):
    """Groups the sales DataFrame by a set of key columns and aggregates the features.

    Args:
        aggregation_level: list of column names to group by, e.g. one of the
            values of the ``AggregationLevel`` mapping.
        feature_list: features whose ``aggregation_f`` defines how each
            non-key column is aggregated.
        logger: optional logger; defaults to ``logging.getLogger(__name__)``.
    """

    def __init__(self, 
                 aggregation_level: List[str],
                 feature_list: List[Feature],
                 logger: logging.Logger = None):
        self._aggregation_level = aggregation_level
        self._feature_list = feature_list
        self._logger = logger or logging.getLogger(__name__)

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Aggregate ``df`` to one row per combination of the group-by keys.

        Features that are themselves group-by keys are skipped: they come back
        as columns through ``reset_index`` rather than through an aggregation.

        Args:
            df: DataFrame containing the key columns and the feature columns.

        Returns:
            The aggregated DataFrame with the keys restored as regular columns.
        """
        original_n_rows = df.shape[0]
        feature_aggregation_f_dict = {
            f.name: f.aggregation_f
            for f in self._feature_list
            if f.name not in self._aggregation_level
        }
        df = df.groupby(self._aggregation_level).agg(feature_aggregation_f_dict).reset_index()
        self._logger.info(f"Aggregator aggregated {original_n_rows} rows into {df.shape[0]}")
        return df


class Scoper(DFTransformer):
    """Row filter that narrows the sales DataFrame to a date window and id lists.

    Every criterion is optional; a criterion left as None (or otherwise falsy)
    is not applied.

    Args:
        start_date: inclusive lower bound compared against ``transaction_date``.
        end_date: exclusive upper bound compared against ``transaction_date``.
        store_id_list: allowed values of ``store_id``.
        cat_id_list: allowed values of ``cat_id``.
        sku_id_list: allowed values of ``sku_id``.
        dept_id_list: allowed values of ``dept_id``.
        logger: optional logger; defaults to ``logging.getLogger(__name__)``.
    """

    def __init__(self, 
                 start_date: str = None, 
                 end_date: str = None, 
                 store_id_list: List[str] = None,
                 cat_id_list: List[str] = None ,
                 sku_id_list: List[str] = None,
                 dept_id_list: List[str] = None,
                 logger: logging.Logger = None
                ):
        # Date window.
        self._start_date, self._end_date = start_date, end_date
        # Membership filters.
        self._store_id_list = store_id_list
        self._cat_id_list = cat_id_list
        self._sku_id_list = sku_id_list
        self._dept_id_list = dept_id_list
        self._logger = logger or logging.getLogger(__name__)

    def transform(self, 
                 df: pd.DataFrame, 
                ) -> pd.DataFrame:
        """Return the rows of ``df`` that satisfy every configured criterion."""
        rows_before = df.shape[0]
        scoped = df

        # Date bounds first: start is inclusive, end is exclusive.
        if self._start_date:
            scoped = scoped[scoped["transaction_date"] >= self._start_date]
        if self._end_date:
            scoped = scoped[scoped["transaction_date"] < self._end_date]

        # Then the id filters, in a fixed order; a column is only touched when
        # its filter is set.
        membership_filters = (
            ("store_id", self._store_id_list),
            ("cat_id", self._cat_id_list),
            ("sku_id", self._sku_id_list),
            ("dept_id", self._dept_id_list),
        )
        for column, allowed_values in membership_filters:
            if allowed_values:
                scoped = scoped[scoped[column].isin(allowed_values)]

        rows_after = scoped.shape[0]
        self._logger.info(f"Scoper removed {rows_before - rows_after} Rows. Remaining number of rows: {rows_after}")
        return scoped
        

class FeatureSelector(DFTransformer):
    """Keeps only the columns named by the configured features.

    Args:
        feature_list: features whose ``name`` values are the columns to keep.
        logger: optional logger; defaults to ``logging.getLogger(__name__)``.
    """

    def __init__(self, 
                 feature_list: List[Feature],
                 logger: logging.Logger = None
                ):
        self._feature_list = feature_list
        self._logger = logger or logging.getLogger(__name__)

    def transform(self, 
                  df: pd.DataFrame, 
                 ) -> pd.DataFrame:
        """Return ``df`` restricted to the feature columns, in feature-list order."""
        n_columns_before = df.shape[1]
        selected_columns = [feature.name for feature in self._feature_list]
        selected_df = df[selected_columns]
        n_columns_after = selected_df.shape[1]
        self._logger.info(f"Feature selector removed {n_columns_before - n_columns_after} features, keeping {n_columns_after}")
        return selected_df




class DemandPredictor(ABC):
    """Base interface for demand predictors used by ``Model``.

    ``Model.fit`` calls ``fit(train_df, target, feature_list)``, so the base
    signature accepts those three arguments. They default to None so that the
    previous zero-argument call keeps working.
    """

    def fit(self,
            train_df: pd.DataFrame = None,
            target: pd.Series = None,
            feature_list: "List[Feature]" = None):
        """Train the predictor; the base implementation does nothing.

        Args:
            train_df: training features.
            target: values to predict, one per row of ``train_df``.
            feature_list: feature descriptions for the training columns.
        """
        pass
    
    def predict(self, data):
        """Predict demand for ``data``; the base implementation returns None."""
        pass


class CatBoostDemandPredictor(DemandPredictor):
    """Demand predictor backed by a ``CatBoostRegressor``."""

    def fit(self, 
            train_df: pd.DataFrame,
            target: pd.Series,
            feature_list: List[Feature]
           ):
        """Fit a new CatBoostRegressor on ``train_df`` / ``target``.

        Args:
            train_df: training features (without the target column).
            target: values to predict, one per row of ``train_df``.
            feature_list: feature descriptions; those flagged
                ``is_categorical`` are declared to CatBoost as categorical.
        """
        # feature_list may describe columns that are not in train_df (the
        # caller passes the full list, including the target it dropped), so
        # only declare categorical columns that are actually present.
        train_columns = set(train_df.columns)
        cat_features = [
            f.name
            for f in feature_list
            if f.is_categorical and f.name in train_columns
        ]
        # Logged rather than printed, like the other components in this file.
        logging.getLogger(__name__).info("cat features: %s", cat_features)
        self._model = CatBoostRegressor(cat_features=cat_features)
        self._model.fit(train_df, target)

    def predict(self, prediction_df: pd.DataFrame):
        """Return the fitted model's predictions for ``prediction_df``.

        Raises:
            AttributeError: if called before ``fit`` (``_model`` is only set there).
        """
        return self._model.predict(prediction_df)


class Model:
    """Chains preprocessing transformers with a demand predictor.

    Args:
        feature_list: feature descriptions forwarded to the predictor on fit.
        demand_predictor: object providing ``fit`` and ``predict``.
        transformer_list: transformers applied, in order, by
            ``preprocess_sales_df``.
        logger: optional logger; defaults to ``logging.getLogger(__name__)``.
    """

    def __init__(self,
                 feature_list: List[Feature],
                 demand_predictor: DemandPredictor,
                 transformer_list: List[DFTransformer],
                 logger: logging.Logger = None
                ):
        self._feature_list = feature_list
        self._demand_predictor = demand_predictor
        self._transformer_list = transformer_list
        self._logger = logger or logging.getLogger(__name__)

    def preprocess_sales_df(self, sales_df: pd.DataFrame):
        """Run ``sales_df`` through every transformer in order and return the result."""
        self._logger.info(f"Preprocessing the data. Data shape: {sales_df.shape}")
        for transformer in self._transformer_list:
            sales_df = transformer.transform(sales_df)
        self._logger.info(f"Data shape after preprocessing: {sales_df.shape}")
        return sales_df

    def fit(self, preprocessed_sales_df: pd.DataFrame):
        """Fit the predictor, using the ``sales_qty`` column as the target.

        Args:
            preprocessed_sales_df: output of ``preprocess_sales_df``; must
                contain a ``sales_qty`` column.
        """
        # Preview goes through the logger (DEBUG) instead of print, so it can
        # be silenced and matches the other messages of this class.
        self._logger.debug("Training data preview:\n%s", preprocessed_sales_df.head())
        self._demand_predictor.fit(preprocessed_sales_df.drop("sales_qty", axis=1), 
                                   preprocessed_sales_df["sales_qty"], 
                                   self._feature_list)
        
    def predict(self, pred_df: pd.DataFrame):
        """Delegate to the predictor; ``pred_df`` is passed through as given
        (no preprocessing is applied here)."""
        return self._demand_predictor.predict(pred_df)


class ModelBuilder:
    """Fluent builder for ``Model``.

    Each ``with_*`` method stores one component and returns the builder so the
    calls can be chained; ``build`` creates the ``Model``.
    """

    def __init__(self):
        # Start with every component unset so build() can tell what is missing.
        self._demand_predictor = None
        self._feature_list = None
        self._transformer_list = None

    def with_demand_predictor(self, demand_predictor: DemandPredictor):
        """Set the demand predictor and return the builder."""
        self._demand_predictor = demand_predictor
        return self

    def with_feature_list(self, feature_list: List[Feature]):
        """Set the feature list and return the builder."""
        self._feature_list = feature_list
        return self

    def with_transformer_list(self, transformer_list: List[DFTransformer]):
        """Set the ordered list of preprocessing transformers and return the builder."""
        self._transformer_list = transformer_list
        return self

    def build(self):
        """Create the ``Model`` from the configured components.

        Raises:
            ValueError: if any of the three components has not been set.
        """
        required = {
            "with_feature_list": self._feature_list,
            "with_demand_predictor": self._demand_predictor,
            "with_transformer_list": self._transformer_list,
        }
        missing = [setter for setter, value in required.items() if value is None]
        if missing:
            raise ValueError(
                "ModelBuilder.build() called before: " + ", ".join(missing)
            )
        return Model(
            self._feature_list,
            self._demand_predictor,
            self._transformer_list
        )


class Evaluator:
    """Computes error metrics for a fitted model on a preprocessed DataFrame.

    Args:
        model: object providing ``predict(df)``.
    """

    def __init__(self, model: "Model"):
        self._model = model

    def mae(self, 
            demand_predictions: pd.Series,
            demand_actuals: pd.Series):
        """Mean absolute error between predictions and actuals, compared by position.

        Both inputs are converted to numpy arrays first. Subtracting two
        pandas Series aligns them on index labels, which produces NaN when
        the labels differ even though the values are in the same order.
        """
        predicted = np.asarray(demand_predictions, dtype=float)
        actual = np.asarray(demand_actuals, dtype=float)
        return np.mean(np.abs(predicted - actual))

    def __call__(self, preprocessed_prediction_df: pd.DataFrame):
        """Predict on every column except ``sales_qty`` and score against it.

        Returns:
            A dict with a single key, ``"mae"``.
        """
        demand_predictions = self._model.predict(preprocessed_prediction_df.drop('sales_qty', axis=1))
        demand_actuals = preprocessed_prediction_df['sales_qty']
        return {
                 "mae": self.mae(demand_predictions, demand_actuals) 
               }

### Dataset-specific components

In [None]:
class DataLoader(ABC):
    """Base class for dataset-specific loaders."""

    def _load(self):
        """Loading hook for subclasses; the base implementation does nothing.

        Declared with ``self`` so it can be called on an instance like any
        other method.
        """
        pass


class M5DataLoader(DataLoader):
    """Builds a long-format sales DataFrame from the CSV files under ``root_dir``.

    Args:
        root_dir: directory containing the sales file, ``calendar.csv`` and
            ``sell_prices.csv``. A plain string is accepted and converted.
    """

    def __init__(self, root_dir: Path):
        # Coerce so that `root_dir / filename` also works when a str is passed.
        self.root_dir = Path(root_dir)

    def get_sales_df(self, sales_file: str) -> pd.DataFrame:
        """Load ``sales_file``, enrich it with calendar and prices, and conform column names.

        Args:
            sales_file: file name of the wide sales CSV, relative to ``root_dir``.

        Returns:
            The resulting DataFrame (also kept on ``self.sales_df``).
        """
        (
            self._load_raw_sales(sales_file)
                ._add_calendar()
                ._add_prices()
                ._conform()
        )
        return self.sales_df

    def _load_raw_sales(self, sales_file: str):
        """Read the sales CSV and melt it to one row per key combination and ``d`` column."""
        key_columns = ['id', 'item_id', 'dept_id', 'cat_id', 'store_id', 'state_id']
        self.sales_df = pd.read_csv(self.root_dir / sales_file)
        self.sales_df = self.sales_df.melt(id_vars=key_columns, var_name='d',value_name='sales_qty')    
        return self

    def _add_calendar(self):
        """Join every column of ``calendar.csv`` onto the sales rows via ``d``."""
        self.calendar_df = pd.read_csv(self.root_dir / "calendar.csv")
        # The full calendar is merged, not a column subset.
        # NOTE(review): presumably the `year` / `month` columns required by
        # model_input_schema come from this file — confirm before narrowing.
        self.sales_df = pd.merge(self.sales_df, self.calendar_df, on='d')
        return self

    def _add_prices(self):
        """Join ``sell_prices.csv`` on store, item and week (pandas' default inner join)."""
        self.prices_df = pd.read_csv(self.root_dir / "sell_prices.csv")
        self.sales_df = pd.merge(self.sales_df, self.prices_df, on=['store_id', 'item_id', 'wm_yr_wk'])
        return self

    def _conform(self):
        """Rename columns to the names used by the model components and fix dtypes."""
        self.sales_df = self.sales_df.rename(columns={
                                                "item_id": "sku_id", 
                                                "wm_yr_wk": "year_week",
                                                "date": "transaction_date",
                                                "sell_price": "effective_price"
                                              }
                                            )
        # Column-wise casts with explicit item assignment (no attribute-style
        # column setting, no full-frame copy).
        self.sales_df["sales_qty"] = self.sales_df["sales_qty"].astype(float)
        self.sales_df["year_week"] = self.sales_df["year_week"].astype(str)
        # Return self like the other pipeline steps so the chain stays extendable.
        return self

In [None]:
#TODO ITEMS: 

### Adding arbitrary features (features must come with pandera check + aggregation function + is_cat + transformations(?))
### Define a non-strict pandera contract
### Validate input
### Define aggregation-level
### train
### Add metrics
# use groupby columns to merge back to the original set of features
# Define standard featurizers
# Define/Apply general scope
# Scope and Feature to be extendable
# GM: All preprocess steps to be feature engineering steps. User to choose order
# GM: Define custom preprocessor (possibly using different data sources)
# LP: Add offered articles as contract (infer from sales, or another data source)
# GM: Elasticity 

### Non-strict (extendable) input contract

In [None]:
# Input contract for the sales DataFrame. Only the listed columns are checked;
# strict=False (pandera's default, spelled out here) lets extra columns pass.
model_input_schema = pa.DataFrameSchema(
    {
        "effective_price": pa.Column(float, checks=pa.Check.ge(0.0)),
        "sales_qty": pa.Column(float, checks=pa.Check.ge(0.0)),
        "sku_id": pa.Column(str),
        "store_id": pa.Column(str),
        "dept_id": pa.Column(str),
        "cat_id": pa.Column(str),
        "transaction_date": pa.Column(str),
        "year": pa.Column(int, checks=pa.Check.gt(1900)),
        # Several checks on one column are passed as a list.
        "month": pa.Column(int, checks=[pa.Check.ge(1), pa.Check.le(12)]),
        "year_week": pa.Column(str),
    },
    strict=False,
)

## Usage example
#### First step: compose model

In [None]:
### Create feature list
# Columns the model works with: (name, is_categorical, aggregation).
feature_list = [
    Feature(name='effective_price', is_categorical=False, aggregation_f='mean'),
    Feature(name='sales_qty', is_categorical=False, aggregation_f='sum'),
    Feature(name='store_id', is_categorical=True, aggregation_f='first'),
    Feature(name='year_week', is_categorical=False, aggregation_f='first'),
    Feature(name='sku_id', is_categorical=True, aggregation_f='first'),
]

demand_predictor = CatBoostDemandPredictor()

# Restrict to the HOBBIES category within [2012-01-01, 2014-01-01).
scoper = Scoper(start_date='2012-01-01', end_date='2014-01-01', cat_id_list=["HOBBIES"])

feature_selector = FeatureSelector(feature_list=feature_list)

aggregator = Aggregator(
    aggregation_level=AggregationLevel["year_week"],
    feature_list=feature_list,
)

# Preprocessing order: scope rows, aggregate, then keep only feature columns.
model = (
    ModelBuilder()
    .with_demand_predictor(demand_predictor)
    .with_feature_list(feature_list)
    .with_transformer_list([scoper, aggregator, feature_selector])
    .build()
)

#### Second step: fit and validate

In [None]:
# Single place to change the dataset location.
# NOTE(review): absolute, machine-specific path — point this at your local copy of the data.
M5_DATA_DIR = Path("/Users/mehditowhidi/Downloads/m5-forecasting-accuracy")

df = M5DataLoader(root_dir=M5_DATA_DIR).get_sales_df("sales_train_evaluation.csv")
df = model_input_schema.validate(df)
preprocessed_train_df = model.preprocess_sales_df(df)

model.fit(preprocessed_train_df)

# NOTE(review): the validation frame goes through the same preprocessing (same
# Scoper date window and category) as the training frame. If the two files
# overlap inside that window, the score below is measured on rows the model
# was trained on — confirm and, if so, scope validation to a held-out period.
val_df = M5DataLoader(root_dir=M5_DATA_DIR).get_sales_df("sales_train_validation.csv")
val_df = model_input_schema.validate(val_df)
preprocessed_val_df = model.preprocess_sales_df(val_df)

Evaluator(model)(preprocessed_val_df)