In [1]:
import pandas as pd
import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn import base
from sklearn import model_selection
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import RobustScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import Ridge
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import datetime
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()
matplotlib.rcParams['figure.dpi'] = 144

## Please note that given the data size, I have chosen simplicity over more complicated approaches to solve this problem. This implementation needs modifications as the data size grows.
After parallelizing the data through Spark, data I decided to use Pandas to process the data further. Therefore Apache Arrow was used to increase the performance with columnar data transfer. More info at http://arrow.apache.org/blog/

In [2]:
conf = SparkConf().set("spark.sql.execution.arrow.enabled", "true")
sc= SparkContext(conf=conf)
sqlContext = SQLContext(sc)

To do further analyses, I found Pandas the most comfortable option.

In [3]:
sale_df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('file:///home/vagrant/datacourse/InterviewPractice/SalesbyHour.csv')
sale_df.cache()

DataFrame[Store_ID: int, Fiscal_Qtr: int, DateStringYYYYMMDD: int, Fiscal_dayofWk: int, Daypart: string, HourlyWeather: string, Hour: int, AvgHourlyTemp: double, SalesRevenue: double]

To better understand the data, I charted the statistics of SalesRevenue data.

In [4]:
sale_df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Store_ID,125792,17.39299001526329,8.563273107108882,2,38
Fiscal_Qtr,125792,2.3677419867718137,1.0812984094686178,1,4
DateStringYYYYMMDD,125792,2.0157441416353982E7,10356.963815778025,20131230,20170714
Fiscal_dayofWk,125792,3.8709774866446196,1.9426759659612687,1,7
Daypart,125792,,,Afternoon,Lunch
HourlyWeather,125792,,,clear-day,wind
Hour,125792,13.685218455863648,4.026186626380106,0,23
AvgHourlyTemp,125792,68.54871319320772,15.33677803893824,5.85,100.62
SalesRevenue,125792,118.19987550877798,98.57650967976018,-822.62,3818.51


In [5]:
data=sale_df.toPandas()

In [6]:
data.head()

Unnamed: 0,Store_ID,Fiscal_Qtr,DateStringYYYYMMDD,Fiscal_dayofWk,Daypart,HourlyWeather,Hour,AvgHourlyTemp,SalesRevenue
0,2,3,20170714,5,Afternoon,rain,16,92.43,193.44
1,2,3,20170714,5,Afternoon,rain,14,89.56,323.84
2,2,3,20170714,5,Afternoon,rain,15,90.9,126.09
3,2,3,20170714,5,Breakfast,fog,8,77.35,154.54
4,2,3,20170714,5,Breakfast,partly-cloudy-day,9,79.06,89.6


Charting the data types indicates that we have two categorical features indicated as string objects in our data set. Then it is necessary to convert them into numerical data for further analysis. However, before that to use the same data values in predicting new values, we need a dictionary to translate the categorical values to their new numerical values. This step was performed through CategoricalDict function.

In [7]:
data.dtypes

Store_ID                int64
Fiscal_Qtr              int64
DateStringYYYYMMDD      int64
Fiscal_dayofWk          int64
Daypart                object
HourlyWeather          object
Hour                    int64
AvgHourlyTemp         float64
SalesRevenue          float64
dtype: object

In [8]:
# Obtaining our Categorical translator before conversion.
def CategoricalDict(X,col_names):
    keys=[]
    values=[]
    for col in col_names:
        values.append(X[col].astype('category').cat.codes.unique())
        keys.append(X[col].unique())
    return dict(zip([str(val) for sublist in keys for val in sublist], [val for sublist in values for val in sublist]))
catDict=CategoricalDict(data,['HourlyWeather', 'Daypart'])

I then converted the categorical string values to numerical values. (get_dummies technique could also be used but I preferred the following method for simplicity in visualization and to obtain the prediction for new variables easier.)
Please note that this step could also be implemented at our Gridsearch pipeline but since our corr function needs our data in a numerical format, I performed this step separately. OneHotEncoder is recommend to be performed over all of our categorical data.

In [9]:
# A function to convert cateorical values to numerical.
def CategoricalTransformer(X,col_names):
    for col in col_names:
        X[col]= X[col].astype('category').cat.codes

In [10]:
CategoricalTransformer(data,['HourlyWeather', 'Daypart'])

Let’s find the correlation between independent variables and the target variable.
The result indicates that Sales Revenue has the strongest correlation with Daypart and Average Temperature among the others. Please note that although HourlyWeather had the least correlation, this does not mean we should not include it in our model unless we have to which is not the case here. 

### The top five predictors are:
* Daypart
* AvgHourlyTemp
* Date
* Hour
* Fiscal_Qtr

In [11]:
data.corr()

Unnamed: 0,Store_ID,Fiscal_Qtr,DateStringYYYYMMDD,Fiscal_dayofWk,Daypart,HourlyWeather,Hour,AvgHourlyTemp,SalesRevenue
Store_ID,1.0,-0.034316,0.381757,0.001978,-0.016085,0.045069,-0.045565,-0.061617,0.031698
Fiscal_Qtr,-0.034316,1.0,-0.23569,-0.000698,-0.001142,-0.03321,-0.00415,0.25229,-0.047402
DateStringYYYYMMDD,0.381757,-0.23569,1.0,-0.006231,-0.011564,0.116066,-0.030798,0.038984,0.173249
Fiscal_dayofWk,0.001978,-0.000698,-0.006231,1.0,0.024729,-0.021084,0.058623,0.012291,0.020733
Daypart,-0.016085,-0.001142,-0.011564,0.024729,1.0,-0.034912,0.025646,0.015433,0.26969
HourlyWeather,0.045069,-0.03321,0.116066,-0.021084,-0.034912,1.0,0.078927,0.112158,0.002888
Hour,-0.045565,-0.00415,-0.030798,0.058623,0.025646,0.078927,1.0,0.255642,-0.076258
AvgHourlyTemp,-0.061617,0.25229,0.038984,0.012291,0.015433,0.112158,0.255642,1.0,0.221708
SalesRevenue,0.031698,-0.047402,0.173249,0.020733,0.26969,0.002888,-0.076258,0.221708,1.0


The labels were extracted.

In [12]:
y=data['SalesRevenue']
X=data.drop(['SalesRevenue'] , axis=1)

I decided to plot the features to better understand the correlation of them with label variable. 
As plot illustrates, the hourly temperature is correlated with the revenue. 
It is also observed that revenue varies with other features. So I decided to keep these features while training the model. I also realized there are outliers in the data so scaling features using statistics that are robust to outliers will benefit our prediction. The RobustScaler performs this task at our preprocessing step.

In [13]:
data_dict=dict(zip(list(X.columns), ['Store', 'Quarter', 'Date', 'WeekDay' 'Day Part','Weather','Time','Temperature'])) 

In [14]:
# Plotting Values correlation with Sales using iPython Widgets.
from ipywidgets import widgets

def sales_plot(column):
    plt.plot(X[column], y, '.')
    plt.xlabel(data_dict[column])
    plt.ylabel('Sales')
    
widgets.interact(sales_plot, column=list(X.columns));

# Process
With a better sense of the data, we’re now able to go through and do some analysis on the data.

# Pre-processing
In order to run our models on the data, I had to transform many of the variables. The following pre-processing steps were taken (Some were taken prior to Gridsearch pipeline:
* Transforming categorical data into numerical values. (already performed above)
* Extracting labels from the dataset and dropping features with the lowest correlation. (already performed above)
* Removing outliers and scaling data using RobustScaler
* Cross-validation using random permutation cross-validator which outperforms K-fold by shuffling the data. This helps us to minimize overfitting. Given the size of the data, splitting data into training and testing sets with a ration of 4 to 1 makes sense. 

This is worth mentioning that using OneHotEncoding is strongly recommended given many of our features are categorical. However, since our selected RandomForest model does not need much of feature engineering, I decided to leave this for future. To note, the recommended step after OneHotEncoding of features with a variety of values is to apply a dimension reduction technique similar to PCA!

In [15]:
cv=model_selection.ShuffleSplit(n_splits=5, test_size=0.2) 

# Models
I fit the data against multiple models with different parameters. The models I tested include:
    * Ridge Regression
    * Linear Regression
    * Random Forest
    * Residual Model
    
Pipeline for each of the above models are included at below, and the R^2 score of each is obtained to measure how well the model explains the X variables.

It is worth mentioning that in residual estimator, I used a linear model to fit the linear part of the data, and used a non-linear model to fit the residual that the linear model can't fit. This estimator takes Ridge and RandomForest estimators. It uses the first to fit the raw data and the second to fit the residuals of the first.

In [16]:
# Linear least squares with l2 regularization.
Ridge_pipe = Pipeline([
    ('scaler', RobustScaler()),
    ('LinearModel', Ridge())
    ])
Ridge_param_grid={'LinearModel__alpha': np.linspace(0,15,16)} #Regularization strength, Gridsearch will find the best 
                                                                #parameter and will use it to train the model.

Ridge_GridResult = model_selection.GridSearchCV(Ridge_pipe, 
                                          param_grid=Ridge_param_grid, 
                                          n_jobs=4, # run each hyperparameter in one of two parallel jobs
                                          cv=cv, #Random permutation cross-validator
                                         )

In [17]:
# Ordinary least squares Linear Regression.
LR_pipe = Pipeline([
    ('scaler', RobustScaler()),
    ('LinearModel', LinearRegression())
    ])
LR_param_grid={'LinearModel__normalize': [True]} #Regularization strength

LR_GridResult = model_selection.GridSearchCV(LR_pipe, 
                                          param_grid=LR_param_grid, 
                                          n_jobs=4, # run each hyperparameter in one of two parallel jobs
                                          cv=cv, #Random permutation cross-validator
                                         )

In [18]:
# A random forest regressor.
RF_pipe = Pipeline([
    ('scaler', RobustScaler()),
    ('LinearModel', RandomForestRegressor())
    ])
RF_param_grid={'LinearModel__min_samples_leaf':[1]} #I hardcoded this parameter as it maximized the accuracy
                                                    #the downside is it increases the coputation time.

RF_GridResult = model_selection.GridSearchCV(RF_pipe, 
                                          param_grid=RF_param_grid, 
                                          n_jobs=4, # run each hyperparameter in one of two parallel jobs
                                          cv=cv, #Random permutation cross-validator
                                         )

In [20]:
class ResidualEstimator(base.BaseEstimator, base.RegressorMixin):
    
    def __init__(self, linear, nonlinear):
        self.linear = linear
        self.nonlinear = nonlinear
        return None
    
    def fit(self, X, y=None):
        self.linear.fit(X, y)
        residuals = y - self.linear.predict(X)
        self.nonlinear.fit(X, residuals)
        return self #fit always returns self
    
    def predict(self, data):
        return self.nonlinear.predict(data) + self.linear.predict(data)

In [21]:
# Residual Estimator
non_linear=RandomForestRegressor()
linear=Ridge()
residEstim=ResidualEstimator(linear,non_linear)

RM_pipe = Pipeline([
    ('Model', ResidualEstimator(linear,non_linear))
    ])
RM_grid={'Model__nonlinear__min_samples_leaf':[1],
            'Model__linear__alpha': np.linspace(0,1,15)
           #"LinearModel__max_depth": range(1,11)
           }

RM_GridResult = model_selection.GridSearchCV(RM_pipe, 
                                          param_grid=RM_grid, 
                                          n_jobs=4, # run each hyperparameter in one of two parallel jobs
                                          cv=cv, #Random permutation cross-validator
                                         )

In [22]:
Ridge_est=Ridge_GridResult.fit(X, y)
LR_est=LR_GridResult.fit(X, y)
RF_est=RF_GridResult.fit(X, y)
RM_est=RM_GridResult.fit(X, y)

### Scores indicate that our RandomForest model outperforms the  compare to the other models. 

In [23]:
print "R^2 of Ridge estimator: ", Ridge_est.score(X, y) 
print "R^2 of Linear Regression estimator: ", LR_est.score(X, y)
print "R^2 of RandomForest estimator: ", RF_est.score(X, y)
print "R^2 of Residual estimator: ", RM_est.score(X, y)

R^2 of Ridge estimator:  0.174075898218
R^2 of Linear Regression estimator:  0.174075915739
R^2 of RandomForest estimator:  0.924128358436
R^2 of Residual estimator:  0.927526228409


Our RandomForest model was selected to predict the total sales revenue of all stores on 2017-07-15.
At first, we need to create new values in a Panda friendly format and then feed them into our model to predict the total revenue.

In [24]:
def final_prediction_data():
    listofdata=[]
    for store in range (1,15):
        listofdata.append([store, 3, 20170715, datetime.datetime(2017, 07, 15).weekday(), catDict['Lunch'], catDict['clear-day'], 12, 86])
    return (listofdata)

In [25]:
print "Sales revenue of each store on 2017/07/15:", RF_est.predict(np.array(final_prediction_data()))
print "Total sales revenue of all 14 stores on 2017/07/15:",sum(RF_est.predict(np.array(final_prediction_data())))

Sales revenue of each store on 2017/07/15: [ 375.605  375.605  375.605  375.605  375.605  375.605  189.679  189.679
  189.679  189.679  189.679  189.679  189.679  176.744]
Total sales revenue of all 14 stores on 2017/07/15: 3758.127


### Our model predicted the total sales revenue of all 14 stores on 2017/07/15: 3758.127