# Kaggle M5 Feature Generation

In [1]:
from datetime import datetime
import numpy as np
import pandas as pd
import dask.dataframe as dd
from catboost import CatBoostRegressor, Pool
import os
import pickle
import seaborn as sns
import matplotlib.pylab as plt
from pandas.plotting import autocorrelation_plot
import shap
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score
from tqdm import tqdm
import itertools
import multiprocessing
import glob

## Feature generation

Pre-training split:
- product not yet released (leading zeros)
- product no longer sold (trailing zeros)
- lags and rolling lags (mean, std)
- day, week, month, year, week of month, day of week, weekend
- events
- price change (week, month, year)
- price min, max, std, mean, nunique
- snap purchases allowed
- release?
- snap state?
- out of stock? sales volatility x recent sales
- features that are hard to learn if scale changes across products e.g. implied changes over time

Post-training split:
- id, cat, dept mean and std

In [2]:
# Let notebook output show up to 999 columns of the wide feature frames.
pd.set_option("display.max_columns", 999)

In [3]:
def generate_inputs():
    """Build the long-format frame from the raw CSV files under ``data/``.

    Reads ``data/calendar.csv``, ``data/sales_train_validation.csv`` and
    ``data/sell_prices.csv``. The wide sales table gets 56 extra all-NaN day
    columns (``d_1914`` .. ``d_1969``), is melted to one row per series and
    day, and is then joined to the calendar (on ``d``) and to the prices (on
    ``store_id``, ``item_id``, ``wm_yr_wk``). Both joins use pandas' default
    inner merge, so rows without a calendar or price match are dropped.

    Returns:
        DataFrame with the id columns, ``sales``, the remaining calendar
        columns, ``sell_price`` and the derived ``total_sales``, ``day``,
        ``dayofyear`` and ``dayofmonth`` columns. ``d``, ``wm_yr_wk`` and
        ``weekday`` are removed.
    """
    calendar = pd.read_csv("data/calendar.csv", low_memory=False)
    sales_wide = pd.read_csv("data/sales_train_validation.csv", low_memory=False)
    prices = pd.read_csv("data/sell_prices.csv", low_memory=False)

    # Placeholder columns for the days that have no recorded sales yet.
    horizon = {f"d_{day}": np.nan for day in range(1914, 1914 + 56)}
    sales_long = sales_wide.assign(**horizon).melt(
        id_vars=["id", "item_id", "dept_id", "cat_id", "store_id", "state_id"],
        var_name="d",
        value_name="sales",
    )

    frame = sales_long.merge(calendar, on=["d"])
    frame = frame.merge(prices, on=["store_id", "item_id", "wm_yr_wk"])

    # Revenue per row, then the numeric day index parsed from "d_<n>".
    frame["total_sales"] = frame["sales"] * frame["sell_price"]
    frame["day"] = frame["d"].str.slice(start=2).astype(int)

    # Calendar parts are derived from the parsed timestamp.
    frame["date"] = pd.to_datetime(frame["date"])
    frame["dayofyear"] = frame["date"].dt.dayofyear
    frame["dayofmonth"] = frame["date"].dt.day

    return frame.drop(columns=["d", "wm_yr_wk", "weekday"])

def downcast_variables(df):
    """Return a memory-leaner copy of ``df`` sorted by ``id`` and ``date``.

    Identifier and event columns become ``category``; count, calendar and
    snap columns go through ``pd.to_numeric(..., downcast="unsigned")``;
    ``sell_price`` and ``total_sales`` through ``downcast="float"``.
    The input frame is not modified.
    """
    category_columns = (
        "id", "item_id", "dept_id", "cat_id", "store_id", "state_id",
        "event_name_1", "event_type_1", "event_name_2", "event_type_2",
    )
    unsigned_columns = (
        "sales", "wday", "month", "year",
        "snap_CA", "snap_TX", "snap_WI",
        "day", "dayofyear", "dayofmonth",
    )
    float_columns = ("sell_price", "total_sales")

    # astype returns a new frame, so the caller's data stays untouched.
    compact = df.astype({name: "category" for name in category_columns})
    for name in unsigned_columns:
        compact[name] = pd.to_numeric(compact[name], downcast="unsigned")
    for name in float_columns:
        compact[name] = pd.to_numeric(compact[name], downcast="float")

    return compact.sort_values(by=["id", "date"], ascending=True)

def write_dist_inputs(df, groups, path, prefix):
    """Write one parquet file per group of ``df``.

    Each file is named ``path + prefix + "_".join(keys) + ".parquet"``,
    where ``keys`` are the group's key values in ``groups`` order.

    Args:
        df: Frame to partition.
        groups: Column name or list of column names passed to ``groupby``.
        path: String prepended verbatim to every file name (include the
            trailing separator when it denotes a directory).
        prefix: File-name prefix placed before the joined keys.

    Returns:
        None.
    """
    for group, group_df in tqdm(df.groupby(groups)):
        # Grouping by a single column yields a scalar key; joining a bare
        # string would interleave "_" between its characters.
        keys = group if isinstance(group, tuple) else (group,)
        # str() lets non-string keys (ints, timestamps) take part in the name.
        target = path + prefix + "_".join(str(key) for key in keys) + ".parquet"
        parent = os.path.dirname(target)
        if parent:
            # Create the output directory on first use instead of failing.
            os.makedirs(parent, exist_ok=True)
        group_df.to_parquet(target)
        
def sales_rolling_transform(df, lags=range(1, 56 + 1), windows=(7, 30, 121, 365)):
    """Add per-``id`` rolling means of the lagged sales columns.

    For every ``lag`` and ``win`` a float32 column ``sales_lag{lag}_win{win}``
    is added, holding the rolling mean (``min_periods=1``) of
    ``sales_lag{lag}`` within each ``id``, taken in the frame's row order.

    Args:
        df: Frame with an ``id`` column and one ``sales_lag{lag}`` column per
            requested lag.
        lags: Lags to process. Defaults to 1..56.
        windows: Rolling window lengths in rows. Defaults to 7, 30, 121, 365.

    Returns:
        A new frame with the rolling columns appended; ``df`` is unchanged.
    """
    lags = list(lags)
    # groupby().rolling() returns rows grouped by id, not in df's row order.
    # Running it on a positionally indexed copy lets every result be put
    # back into the original order, whatever df's sort order or index is
    # (assigning the raw values is only correct when df is sorted by id).
    positional = df[["id"] + [f"sales_lag{lag}" for lag in lags]].reset_index(drop=True)
    grouped = positional.groupby("id")

    features = {}
    for lag in lags:
        for win in windows:
            rolled = grouped.rolling(win, min_periods=1)[f"sales_lag{lag}"].mean()
            features[f"sales_lag{lag}_win{win}"] = (
                rolled
                .droplevel(0)                # drop the id level, keep row position
                .reindex(positional.index)   # restore df's row order
                .astype("float32")
                .to_numpy()
            )
    return df.assign(**features)

def price_rolling_transform(df, lags=(0, 7, 14, 28), windows=(7, 30, 121, 365)):
    """Add per-``id`` rolling means of the lagged price columns.

    For every ``lag`` and ``win`` a float32 column
    ``sell_price_lag{lag}_win{win}`` is added, holding the rolling mean
    (``min_periods=1``) of ``sell_price_lag{lag}`` within each ``id``, taken
    in the frame's row order.

    Args:
        df: Frame with an ``id`` column and one ``sell_price_lag{lag}``
            column per requested lag.
        lags: Lags to process. Defaults to 0, 7, 14, 28.
        windows: Rolling window lengths in rows. Defaults to 7, 30, 121, 365.

    Returns:
        A new frame with the rolling columns appended; ``df`` is unchanged.
    """
    lags = list(lags)
    # groupby().rolling() returns rows grouped by id, not in df's row order.
    # Running it on a positionally indexed copy lets every result be put
    # back into the original order, whatever df's sort order or index is
    # (assigning the raw values is only correct when df is sorted by id).
    positional = df[["id"] + [f"sell_price_lag{lag}" for lag in lags]].reset_index(drop=True)
    grouped = positional.groupby("id")

    features = {}
    for lag in lags:
        for win in windows:
            rolled = grouped.rolling(win, min_periods=1)[f"sell_price_lag{lag}"].mean()
            features[f"sell_price_lag{lag}_win{win}"] = (
                rolled
                .droplevel(0)                # drop the id level, keep row position
                .reindex(positional.index)   # restore df's row order
                .astype("float32")
                .to_numpy()
            )
    return df.assign(**features)

def sales_lags(df):
    """Append ``sales_lag1`` .. ``sales_lag56`` to a copy of ``df``.

    Each column is ``sales`` shifted by that many rows within its ``id``
    group, stored as float32. Rows with no earlier value in their group
    hold NaN.
    """
    sales_by_id = df.groupby("id")["sales"]
    lagged = {}
    for lag in range(1, 57):
        lagged[f"sales_lag{lag}"] = sales_by_id.shift(lag).astype("float32")
    return df.assign(**lagged)

def price_lags(df):
    """Append ``sell_price_lag{0,7,14,28}`` to a copy of ``df``.

    Each column is ``sell_price`` shifted by that many rows within its ``id``
    group, stored as float32; the lag-0 column is a float32 copy of the
    price itself.
    """
    price_by_id = df.groupby("id")["sell_price"]
    lagged = {}
    for lag in (0, 7, 14, 28):
        lagged[f"sell_price_lag{lag}"] = price_by_id.shift(lag).astype("float32")
    return df.assign(**lagged)

def change_in_sales(df):
    """Append differences between rolling sales means at two lags.

    For each lag 1..28, three columns are added to a copy of ``df``:

    - ``..._w_o_w``: 7-row mean at ``lag`` minus 7-row mean at ``lag + 7``
    - ``..._w_o_m``: 7-row mean at ``lag`` minus 7-row mean at ``lag + 28``
    - ``..._m_o_m``: 30-row mean at ``lag`` minus 30-row mean at ``lag + 28``
    """
    # (column suffix, rolling window, extra lag of the reference column)
    comparisons = (("w_o_w", 7, 7), ("w_o_m", 7, 28), ("m_o_m", 30, 28))

    deltas = {}
    for suffix, win, offset in comparisons:
        for lag in range(1, 29):
            recent = df[f"sales_lag{lag}_win{win}"]
            reference = df[f"sales_lag{lag + offset}_win{win}"]
            deltas[f"change_in_sales_lag{lag}_{suffix}"] = recent - reference
    return df.assign(**deltas)

def change_in_prices(df):
    """Append ratios between rolling price means at two lags.

    Three columns are added to a copy of ``df``: the current 7-row mean over
    the 7-row mean lagged by 7 rows (``w_o_w``) and by 28 rows (``w_o_m``),
    and the current 30-row mean over the 30-row mean lagged by 28 rows
    (``m_o_m``).
    """
    week_now = df["sell_price_lag0_win7"]
    month_now = df["sell_price_lag0_win30"]

    week_over_week = week_now / df["sell_price_lag7_win7"]
    week_over_month = week_now / df["sell_price_lag28_win7"]
    month_over_month = month_now / df["sell_price_lag28_win30"]

    return df.assign(
        change_in_price_w_o_w=week_over_week,
        change_in_price_w_o_m=week_over_month,
        change_in_price_m_o_m=month_over_month,
    )

def snap_eligible(df):
    """Add a ``snap`` column holding the snap flag of each row's own state.

    Rows with ``state_id`` "CA" take ``snap_CA``, rows with "TX" take
    ``snap_TX`` and every other row takes ``snap_WI``. ``df`` is modified in
    place and also returned.
    """
    state = df["state_id"]
    # Resolve the TX / everything-else choice first, then layer CA on top.
    tx_or_wi = np.where(state == "TX", df["snap_TX"], df["snap_WI"])
    df["snap"] = np.where(state == "CA", df["snap_CA"], tx_or_wi)
    return df

def train_eval_test_assignment(df, splits=(0.8, 0.1, 0.1)):
    """Draw a random "train" / "eval" / "test" label for every row of ``df``.

    Labels are sampled independently with ``np.random.choice`` (NumPy's
    global random state), so results are only reproducible if the caller
    seeds that state.

    Args:
        df: Any sized object; only ``len(df)`` is used.
        splits: Probabilities for train, eval and test, in that order.

    Returns:
        NumPy array of ``len(df)`` label strings.

    Raises:
        ValueError: If ``splits`` does not hold exactly three values or the
            values do not sum to 1.
    """
    if len(splits) != 3:
        raise ValueError("Three split values must be provided")
    # Compare with a tolerance: valid splits such as (0.7, 0.2, 0.1) sum to
    # 0.9999999999999999 in floating point and would fail an exact check.
    if not np.isclose(sum(splits), 1.0, rtol=0.0, atol=1e-8):
        raise ValueError("The sum of the splits must equal 1")
    return np.random.choice(["train", "eval", "test"], size=len(df), p=splits)

def random_group_assignment(df):
    """Label every row with the data split it belongs to.

    Rows whose ``day`` is 1914 or later get "forecast"; all other rows get a
    random "train" / "eval" / "test" label drawn with 80/10/10 probabilities.
    The labels are written to ``df["group"]`` in place and ``df`` is returned.
    """
    is_forecast = df["day"] >= 1914
    # Labels are drawn for every row; forecast rows simply ignore theirs.
    random_labels = train_eval_test_assignment(df, splits=(0.8, 0.1, 0.1))
    df["group"] = np.where(is_forecast, "forecast", random_labels)
    return df

def encode_event_null(df):
    """Replace missing event values with the explicit category "NA".

    Handles ``event_name_1``, ``event_type_1``, ``event_name_2`` and
    ``event_type_2``. ``df`` is modified in place and also returned. Calling
    the function again on its own output is a no-op, and columns that are
    not yet categorical are converted first.
    """
    for col in ("event_name_1", "event_type_1", "event_name_2", "event_type_2"):
        values = df[col]
        if not isinstance(values.dtype, pd.CategoricalDtype):
            # The .cat accessor only exists on categorical columns.
            values = values.astype("category")
        if "NA" not in values.cat.categories:
            # add_categories raises if the category already exists, e.g.
            # when the frame has been through this step before.
            values = values.cat.add_categories("NA")
        df[col] = values.fillna("NA")
    return df

In [4]:
def data_pipeline(filename):
    """Run every feature step on one parquet file and overwrite it in place.

    The file is read, passed through the feature functions in a fixed order
    (the lag steps come before the rolling steps that read their columns,
    and the rolling steps before the change features), and written back to
    the same path.

    Returns:
        ``filename``, so a caller mapping over many files can tell which one
        finished.
    """
    stages = (
        encode_event_null,
        random_group_assignment,
        sales_lags,
        sales_rolling_transform,
        price_lags,
        price_rolling_transform,
        change_in_sales,
        change_in_prices,
        snap_eligible,
    )
    frame = pd.read_parquet(filename)
    for stage in stages:
        frame = stage(frame)
    frame.to_parquet(filename)
    return filename

In [7]:
# Build the long-format inputs, shrink their dtypes and write one parquet
# file per (cat_id, store_id) pair under features/.
write_dist_inputs(
    downcast_variables(generate_inputs()),
    groups=["cat_id", "store_id"],
    path="features/",
    prefix="inputs_",
)

100%|██████████| 30/30 [00:22<00:00,  1.32it/s]


In [8]:
# Run the feature pipeline over every per-group input file, using one
# worker process per CPU core; the bar advances as each file completes.
files = glob.glob("features/inputs_*_*.parquet")
with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
    finished = pool.imap_unordered(data_pipeline, files)
    for _ in tqdm(finished, total=len(files)):
        pass

100%|██████████| 30/30 [2:33:57<00:00, 307.92s/it]   


In [9]:
# Gather the per-group feature files into one dask-written parquet dataset.
(
    dd.read_parquet("features/inputs_*_*.parquet")
    .to_parquet("features/inputs.parquet")
)