In [None]:
import warnings
warnings.filterwarnings('ignore')

## 1. Data Ingestion

### 1.1 Load Cluster Dataset

In [5]:
%%spark
import pandas as pd
df = spark.read.format('csv').option('header','true').load("/mnt/resource/o9_spark_temp/jhub/5300/martinfabbri_5300/Sales.csv")
sales = df.toPandas()
sales['Date'] = pd.to_datetime(sales['Date'])
sales['StoreId'] = sales['StoreId'].astype(int)
sales['WeeklySales'] = sales['WeeklySales'].astype(float)
sales['Department'] = sales['Department'].astype(int)
sales['IsHoliday'] = sales['IsHoliday'].astype(bool)

sales.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

   StoreId  Department       Date  WeeklySales  IsHoliday
0        1           1 2010-05-02     24924.50       True
1        1           1 2010-12-02     46039.49       True
2        1           1 2010-02-19     41595.55       True
3        1           1 2010-02-26     19403.54       True
4        1           1 2010-05-03     21827.90       True

In [None]:
%%spark
df.summary().show()

### 1.2 Load EKG Dataset

In [6]:
%%spark
from o9_ibpl_magics import spark_ibpl

df = spark_ibpl('select ([Store].[Store_ID] * [Store].[Type] * [Store].[Size]) on column;',spark)

stores_df = df.withColumnRenamed("StoreStoreID","StoreId")
stores = stores_df.toPandas()
stores['StoreId']  = stores['StoreId'].astype(int)
stores.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

   StoreId StoreType StoreSize
0        1         A    151315
1       10         B    126512
2       11         A    207499
3       12         B    112238
4       13         A    219622

### 1.3 Load External Dataset - Azure Blob Storage

In [7]:
%%spark
csv_path = "https://o9demostorage.blob.core.windows.net/o9demodata/Features.csv"
features = pd.read_csv(csv_path, encoding='utf8')
features['Date'] = pd.to_datetime(features['Date'])
features['StoreId']  = features['StoreId'].astype(int)
features.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

   StoreId       Date  Temperature  ...         CPI  Unemployment  IsHoliday
0        1 2010-05-02        42.31  ...  211.096358         8.106      False
1        1 2010-12-02        38.51  ...  211.242170         8.106       True
2        1 2010-02-19        39.93  ...  211.289143         8.106      False
3        1 2010-02-26        46.63  ...  211.319643         8.106      False
4        1 2010-05-03        46.50  ...  211.350143         8.106      False

[5 rows x 12 columns]

## 2. Data Processing

### 2.1 Merging Dataframes

In [8]:
%%spark
df=pd.merge(sales,features, on=['StoreId','Date', 'IsHoliday'], how='left')
df=pd.merge(df,stores, on=['StoreId'], how='left')

df=df.fillna(0)
df['Temperature'] = (df['Temperature']- 32) * 5./9.

types_encoded, types = df['StoreType'].factorize()
df['Type'] = types_encoded

df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

   StoreId  Department       Date  ...  StoreType  StoreSize  Type
0        1           1 2010-05-02  ...          A     151315     0
1        1           1 2010-12-02  ...          A     151315     0
2        1           1 2010-02-19  ...          A     151315     0
3        1           1 2010-02-26  ...          A     151315     0
4        1           1 2010-05-03  ...          A     151315     0

[5 rows x 17 columns]

### 2.2 Remove Duplicates 

In [9]:
%%spark
print('training_data duplicated:{}'.format(df.duplicated().sum()))
df.drop_duplicates(inplace=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

training_data duplicated:0

### 2.3 Feature Engineering

In [10]:
%%spark
tab_info = pd.DataFrame(df.dtypes).T.rename(index={0:'column Type'}) 
tab_info = tab_info.append(pd.DataFrame(df.isnull().sum()).T.rename(index={0:'null values (nb)'}))
tab_info = tab_info.append(pd.DataFrame(df.isnull().sum()/df.shape[0]*100).T.
                                       rename(index={0: 'null values (%)'}))
tab_info

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                 StoreId Department            Date  ... StoreType StoreSize   Type
column Type        int64      int64  datetime64[ns]  ...    object    object  int64
null values (nb)       0          0               0  ...         0         0      0
null values (%)        0          0               0  ...         0         0      0

[3 rows x 17 columns]

In [11]:
%%spark
df_average_sales_week = df.groupby(by=['Date'], as_index=False)['WeeklySales'].sum()
df_average_sales = df_average_sales_week.sort_values('WeeklySales', ascending=False)
ts = df_average_sales_week.set_index('Date')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 3. Model Training

In [12]:
%%spark
from sklearn.linear_model import LinearRegression

def fit_ar_model(ts, orders):  
    X=np.array([ ts.values[(i-orders)].squeeze() if i >= np.max(orders) else np.array(len(orders) * [np.nan]) for i in range(len(ts))])
    mask = ~np.isnan(X[:,:1]).squeeze()
    Y= ts.values
    lin_reg=LinearRegression()
    lin_reg.fit(X[mask],Y[mask])
    print(lin_reg.coef_, lin_reg.intercept_)
    print('Score factor: %.2f' % lin_reg.score(X[mask],Y[mask]))
    return lin_reg.coef_, lin_reg.intercept_
    
def predict_ar_model(ts, orders, coef, intercept):
    return np.array([np.sum(np.dot(coef, ts.values[(i-orders)].squeeze())) + intercept  if i >= np.max(orders) else np.nan for i in range(len(ts))])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
%%spark
import numpy as np
orders = np.array([1,6,52])
coef, intercept = fit_ar_model(ts,orders)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[[ 0.13488444 -0.06693086  0.53027452]] [19022705.56418591]
Score factor: 0.41

## 4. Inference

In [14]:
%%spark
pred = pd.DataFrame(index=ts.index, data=predict_ar_model(ts, orders, coef, intercept))
pred.tail()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                               0
Date                            
2012-10-08   [44987134.91007443]
2012-10-19   [47516831.19430323]
2012-10-26  [45679766.043321975]
2012-11-05  [46493833.400539674]
2012-12-10   [46741235.19014284]