In [1]:
import dask.dataframe as dask_df
import numpy as np
import pyspark.sql.types as T
import plotly.express as px
import warnings
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import lag
from pyspark.sql.functions import mean
from pyspark.sql.functions import max
from pyspark.sql.functions import col
from pyspark.sql.functions import round
from pyspark.sql.functions import udf
from pyspark.sql.functions import split
from pyspark.sql import Window

# suppress warning messages
warnings.filterwarnings('ignore')

In [2]:
# this function returns a Spark session

def init_spark():
    spark = SparkSession \
        .builder \
        .appName('M5 Forecasting') \
        .config('spark.some.config.option', 'some-value') \
        .getOrCreate()
    return spark

In [3]:
# read melted sales data for all stores from the disk into a Spark dataframe

spark = init_spark()

sales_melted = spark.read.csv('./m5-forecasting-accuracy/sales_by_store/*', header = True). \
                     drop('_c0')
sales_melted = sales_melted.withColumn('units_sold', sales_melted['units_sold'].cast('int'))

In [4]:
# create mean features for various categorical features

mean_by_item = sales_melted.groupBy('item_id').agg(round(mean('units_sold'), 4).alias('item_mean'))
mean_by_dept = sales_melted.groupBy('dept_id').agg(round(mean('units_sold'), 4).alias('dept_mean'))
mean_by_cat = sales_melted.groupBy('cat_id').agg(round(mean('units_sold'), 4).alias('cat_mean'))
mean_by_store = sales_melted.groupBy('store_id').agg(round(mean('units_sold'), 4).alias('store_mean'))

In [5]:
# since fitting a model to the entire data at once is a very costly operation that will not be feasible on a personal computer,
# we will fit a model to an individual store
# we can fit different models for different stores just by changing the selection below

# select a store to apply feature engineering and machine learning model fitting on

current_store = 'CA_1'

In [6]:
# read melted sales data for the specified store, calendar data, and prices data into Spark dataframes
# merge the three dataframes
# obtaining day numbers from the 'd' column, e.g. taking out 41 from d_41
# changing columns' datatypes appropriately
# replacing null events' values with 'NoEvent'

spark = init_spark()

sales_melted = spark.read.csv('./m5-forecasting-accuracy/sales_by_store/' + current_store + '/*', header = True). \
                     drop('_c0')
sales_melted = sales_melted.where(sales_melted['d'].isin(['d_'+str(i) for i in np.arange(676, 1942)]))

calendar = spark.read.csv('./m5-forecasting-accuracy/calendar.csv', header = True)
prices = spark.read.csv('./m5-forecasting-accuracy/sell_prices.csv', header = True)

sales_details = sales_melted.join(calendar, 'd', 'left_outer'). \
                             join(prices, ['store_id', 'item_id', 'wm_yr_wk'], 'left_outer')

sales_details = sales_details.select(split(sales_details.d, '_')[1].alias('day'), \
                                     'item_id', 'dept_id', 'store_id', 'cat_id', 'date', 'wday', 'month', \
                                     'year', 'event_name_1', 'event_type_1', 'event_name_2', 'event_type_2', 'snap_CA', 
                                     'snap_TX', 'snap_WI', 'sell_price', 'units_sold')

sales_details = sales_details.withColumn('day', sales_details['day'].cast('int')). \
                              withColumn('wday', sales_details['wday'].cast('int')). \
                              withColumn('month', sales_details['month'].cast('int')). \
                              withColumn('year', sales_details['year'].cast('int')). \
                              withColumn('snap_CA', sales_details['snap_CA'].cast('int')). \
                              withColumn('snap_TX', sales_details['snap_TX'].cast('int')). \
                              withColumn('snap_WI', sales_details['snap_WI'].cast('int')). \
                              withColumn('units_sold', sales_details['units_sold'].cast('int')). \
                              withColumn('sell_price', sales_details['sell_price'].cast('float'))

sales_details = sales_details.fillna('NoEvent', subset = ['event_name_1', 'event_type_1', 'event_name_2', 'event_type_2'])

In [7]:
# introducing lags to create lag features
# since we have introduced lags of 7, 14, and 21 days too, we will need to make forecasts recursively over periods of 7 days
# that is, in order to make forecast for days 8 to 14, we will first need a forecast for days 1 to 7
# using these forecasted units sold of days 1 to 7, we will recalculate features for days 8 to 14
# then we can make a forecast for days 8 to 14

lags = [7, 14, 21, 28, 35, 42, 49]
window = Window.partitionBy(['item_id']).orderBy('day')

for lag_duration in lags:
    sales_details = sales_details.withColumn('sold_lag_' + str(lag_duration), \
                                             lag(col = sales_details['units_sold'], offset = lag_duration).over(window))

In [8]:
# adding the mean features we calculated beforeheand to our data

sales_details = sales_details.join(mean_by_item, 'item_id', 'left_outer'). \
                              join(mean_by_dept, 'dept_id', 'left_outer'). \
                              join(mean_by_cat, 'cat_id', 'left_outer'). \
                              join(mean_by_store, 'store_id', 'left_outer')

In [9]:
# introducing rolling mean features
# we calculate mean over windows of sizes 7, 14, 28, 56, and 112
# we have kept the lag to 28 days, so we won't need a recursive feature calculation for these features

window = Window.partitionBy(['item_id']).orderBy('day')

window_sizes = np.array([7, 14, 28, 56, 112])
stop = -28
start_values = stop - window_sizes + 1

for index, start in enumerate(start_values.tolist()):
    col_name = 'sold_rolling_mean_window_' + str(window_sizes[index]) + '_lag_' + str(abs(stop))
    sales_details = sales_details.withColumn(col_name, \
                                  mean(col = sales_details['units_sold']).over(window.rangeBetween(start, stop)).cast('float'))

In [10]:
# introducing rolling mean features
# we calculate mean over windows of size 7
# we have kept the lag to 7 and 365 days
# we will need recursive feature calculation for the feature with a lag of 7 days

window = Window.partitionBy(['item_id']).orderBy('day')

window_size = 7
stop_values = np.array([-7, -365])

for stop in stop_values.tolist():
    col_name = 'sold_rolling_mean_window_' + str(window_size) + '_lag_' + str(abs(stop))
    sales_details = sales_details.withColumn(col_name, \
                                             mean(col = sales_details['units_sold']). \
                                             over(window.rangeBetween(stop - window_size + 1, stop)).cast('float'))

In [11]:
# introducing expanding mean feature
# we have kept the lag to 28 days, so we won't need a recursive feature calculation for this feature

stop_values = np.array([-7, -365])

for stop in stop_values.tolist():
    window = Window.partitionBy(['item_id']).orderBy('day').rangeBetween(Window.unboundedPreceding, stop)

    sales_details = sales_details.withColumn('sold_expanding_mean_lag_' + str(abs(stop)), \
                                             mean(col = sales_details['units_sold']).over(window).cast('float'))

In [12]:
# introducing rolling mean features for price
# we calculate mean over a window of size 7, and max over a window of size 365
# we have kept the lag to 1
# we won't need recursive feature calculation since we assume that we will have price details available in advance

window = Window.partitionBy(['item_id']).orderBy('day')

window_size = 7
stop = -1
start = stop - window_size + 1

col_name = 'price_rolling_mean_window_' + str(window_size) + '_lag_' + str(abs(stop))
sales_details = sales_details.withColumn(col_name, \
                              mean(col = sales_details['sell_price']).over(window.rangeBetween(start, stop)).cast('float'))

window_size = 365
stop = -1
start = stop - window_size + 1

col_name = 'price_rolling_max_window_' + str(window_size) + '_lag_' + str(abs(stop))
sales_details = sales_details.withColumn(col_name, \
                              max(col = sales_details['sell_price']).over(window.rangeBetween(start, stop)).cast('float'))

In [13]:
# introducing revenue and trend features
# we won't need recursive feature calculation for revenue feature but we will need it for variance trend feature

window = Window.partitionBy(['item_id']).orderBy('day')

sales_details = sales_details.withColumn('sold_revenue_lag_28', \
                                         lag(col = (sales_details['units_sold'] * sales_details['sell_price']), \
                                         offset = 28).over(window))

sales_details = sales_details.withColumn('variance_trend_lag_7', \
                                         sales_details['sold_rolling_mean_window_7_lag_7'] - sales_details['item_mean'])

In [14]:
# to reduce the size of the dataset to be able to run further steps in vanilla python environment,
# we take the data from 2013 to 2016
# also more recent data should be more relevant for the predictions

sales_details = sales_details.where(sales_details['day'] >= 1041)

In [15]:
# splitting the dataset into train and test in such a way that test set consists of data for the last 28 days and train set
# consists of the entire remaining data
# saving this data in files to load it with ease in pandas, and also to avoid recalculation every time
 
train = sales_details[sales_details['day'] < 1914]
test = sales_details[sales_details['day'] >= 1914]

sales_details.write.csv('./m5-forecasting-accuracy/train_test_split/sales_details/', header = True)
train.write.csv('./m5-forecasting-accuracy/train_test_split/train/', header = True)
test.write.csv('./m5-forecasting-accuracy/train_test_split/test/', header = True)