In [1]:
import findspark

findspark.init()

In [2]:
import pyspark

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Import, analyse and prepare data

## Store Data

In [None]:
stores = spark.read.format("csv").option("header", "true").load("data/stores.csv")

In [None]:
stores.show()

As I want to make predictions per store, the store data itself is not relevant for my model

## Feature Data

In [None]:
features = spark.read.format("csv").option("header", "true").load("data/features.csv")

In [None]:
features.show()

In [None]:
features.printSchema()

The data has to be transformed into the correct datatypes.

As MarkDown data is not available for more than half of the records, I decided to neglect this information for the model.

In [None]:
from pyspark.sql.types import DoubleType, IntegerType, DateType, BooleanType
from pyspark.sql.functions import to_date


features = features\
    .withColumn("Store", features.Store.cast(IntegerType()))\
    .withColumn("Date", to_date(features.Date, "dd/MM/yyyy"))\
    .withColumn("Temperature", features.Temperature.cast(DoubleType()))\
    .withColumn("Fuel_Price", features.Fuel_Price.cast(DoubleType()))\
    .withColumn("CPI", features.CPI.cast(DoubleType()))\
    .withColumn("Unemployment", features.Unemployment.cast(DoubleType()))\
    .withColumn("IsHoliday", features.IsHoliday.cast(BooleanType()))

features = features.select("Store",\
                           "Date",\
                           "Temperature",\
                           "Fuel_Price",\
                           "CPI",\
                           "Unemployment",\
                           "IsHoliday")

In [None]:
features.printSchema()

In [None]:
features.show()

## Sales Data

In [None]:
sales = spark.read.format("csv").option("header", "true").load("data/sales.csv")

In [None]:
sales.show()

In [None]:
sales.printSchema()

The data has to be transformed into the correct datatypes.


In [None]:
sales = sales\
    .withColumn("Store", sales.Store.cast(IntegerType()))\
    .withColumn("Dept", sales.Dept.cast(IntegerType()))\
    .withColumn("Date", to_date(sales.Date, "dd/MM/yyyy"))\
    .withColumn("Weekly_Sales", sales.Weekly_Sales.cast(DoubleType()))\
    .withColumn("IsHoliday", sales.IsHoliday.cast(BooleanType()))

In [None]:
sales.show()

In [None]:
sales.printSchema()

# Data Statistics

In [None]:
sales.describe().show()

In [None]:
features.describe().show()

# Join Data

In [None]:
df = sales.join(features, ["Store", "Date", "IsHoliday"])
df.show()

In [None]:
df.orderBy('Date').tail(1)

Note: The data ranges from 2010-02-05 (week 5) until 2012-10-26 (week 43 )

# Define Features/Categories

In [None]:
from pyspark.sql.functions import weekofyear, year


# extract year and week from date for further analysis
df = df.withColumn('Week', weekofyear(df.Date))
df = df.withColumn('Year', year(df.Date))
df = df.withColumn('IsHoliday', df.IsHoliday.cast(IntegerType()))
df.show()

In [None]:
df.count()

In [None]:
# check for null entries by comparing count with null rows removed
df.na.drop().count()

In [None]:
df_pd = df.toPandas()

In [None]:
df_pd

In [None]:
import matplotlib.pyplot as plt
from pandas.plotting import register_matplotlib_converters


register_matplotlib_converters()

# group by date and sum weekly sales over all stores and departements
df_grouped =df_pd.groupby(by=['Date'], as_index=False)['Weekly_Sales'].sum()
plt.figure(figsize=(12,4))
plt.xlabel('Date')
plt.ylabel('Weekly_Sales')
plt.plot(df_grouped['Date'], df_grouped['Weekly_Sales'])

There are obvious peaks/trends in the weekly sales during the holidays at the end of 2010 and 2011.

In [None]:
df_group_all =df_pd.groupby(by=['Date'], as_index=False)['Weekly_Sales',\
                                                         'Temperature',\
                                                         'Fuel_Price',\
                                                         'CPI',\
                                                         'Unemployment'].sum()

In [None]:
# visualize correlation of sales and unemployment
df_group_all['Unemployment'] = df_group_all['Unemployment']*2000

plt.figure(figsize=(12,4))
plt.plot(df_group_all['Date'], df_group_all['Weekly_Sales'])
plt.plot(df_group_all['Date'], df_group_all['Unemployment'])
plt.xlabel('Date')
plt.legend(('Weekly_Sales', 'Unemployment'))
plt.savefig('out/unemployment.png')

The graph shows that there is no correlation between weekly sales and unemployment.

In [None]:
# visualize correlation of sales and temperature
df_group_all['Temperature'] = df_group_all['Temperature']*250

plt.figure(figsize=(12,4))
plt.plot(df_group_all['Date'], df_group_all['Weekly_Sales'])
plt.plot(df_group_all['Date'], df_group_all['Temperature'])
plt.xlabel('Date')
plt.legend(('Weekly_Sales', 'Temperature'))

The graph shows that there is no direct correlation between temperature and sales. Nevertheless, theres is some kind of repetitive pattern with the same frequency. This supports the assumption that there is some kind of seasonal trend, as temperatures change according to the season.

In [None]:
# visualize correlation of sales and fuel price
df_group_all['Fuel_Price'] = df_group_all['Fuel_Price']*2000

plt.figure(figsize=(12,4))
plt.plot(df_group_all['Date'], df_group_all['Weekly_Sales'])
plt.plot(df_group_all['Date'], df_group_all['Fuel_Price'])
plt.xlabel('Date')
plt.legend(('Weekly_Sales', 'Fuel_Price'))

The graph shows that there is no correlation between weekly sales and fuel price.

In [None]:
# visualize correlation of sales and CPI (consumer price index)
df_group_all['CPI'] = df_group_all['CPI']*100

plt.figure(figsize=(12,4))
plt.plot(df_group_all['Date'], df_group_all['Weekly_Sales'])
plt.plot(df_group_all['Date'], df_group_all['CPI'])
plt.xlabel('Date')
plt.legend(('Weekly_Sales', 'CPI'))

The graph shows that there is also no correlation between weekly sales and CPI.

In [None]:
# compare weekly sales during the year
df_2010 = df_pd[df_pd['Year']==2010]
df_2010_grouped =df_2010.groupby(by=['Week'], as_index=False)['Weekly_Sales'].sum()

df_2011 = df_pd[df_pd['Year']==2011]
df_2011_grouped =df_2011.groupby(by=['Week'], as_index=False)['Weekly_Sales'].sum()

df_2012 = df_pd[df_pd['Year']==2012]
df_2012_grouped =df_2012.groupby(by=['Week'], as_index=False)['Weekly_Sales'].sum()

plt.figure(figsize=(12,4))
plt.plot(df_2010_grouped['Week'], df_2010_grouped['Weekly_Sales'])
plt.plot(df_2011_grouped['Week'], df_2011_grouped['Weekly_Sales'])
plt.plot(df_2012_grouped['Week'], df_2012_grouped['Weekly_Sales'])

plt.xlabel('Date')
plt.ylabel('Weekly_Sales')
plt.legend(('2010', '2011', '2012'))
plt.savefig('out/years.png')

### Conclusion
It does not make any sense to consider temperature, unemployment, CPI or fuel price as features. As illustrated in the l ast graph, there only pattern visible on first glance is a seasonal trend. Therefore, I decided to proceed with a time-series model. I chose to use Holt-Winters exponential smoothing due to the seasonal trend in the data. 

# Create and train Model

In [None]:
df_group_sales = df_pd.groupby('Date', as_index=False)['Weekly_Sales'].sum()
df_group_sales = df_group_sales.set_index("Date")

df_group_sales

In [None]:
# split in training and test data
split_point = 100

# take first x entries as train data
train = df_group_sales.iloc[:split_point]
# test with remaining entries
test = df_group_sales.iloc[split_point:]

In [None]:
from statsmodels.tsa.holtwinters import ExponentialSmoothing


# fit exponential smoothing model
fit_model = ExponentialSmoothing(train,
                                  seasonal='mul',
                                  seasonal_periods=52,
                                  freq='W-FRI'
                                ).fit()
prediction = fit_model.forecast(len(df_group_sales)-split_point)
prediction


# plot predictions
test.plot(figsize=(12,6))
prediction.plot()
plt.legend(('Weekly_Sales','Prediction'))
plt.ylabel('Sum of Sales')
plt.savefig('out/prediction.png')


# Evaluate Predictions

In [None]:
import numpy as np


def mean_absolute_percentage_error(y_true, y_pred): 
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

print("Mean Absolute Percentage Error = {a}%".format(a=mean_absolute_percentage_error(test['Weekly_Sales'],prediction)))

# Final Conclusions

Exponential Smoothing seems to do a pretty good job. The current ratio is 100 entries for training and 43 entries for testing. 
The mean absolute percentage error will go down if the training set is extended, i.e. taking a ratio of 130 entries for training and the remaining 13 entries for testing will result in an error of 1.48% instead of 2.66% 