In [1]:
# Reload edited modules (e.g. the local Transformers / Estimators packages)
# automatically before every cell executes.
%load_ext autoreload
%autoreload 2

In [12]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

from DataManipulation import DataManipulation
from Estimators.XGBoost import XGBoost
from Logging import Logging
from Transformers.FilterDepartment import FilterDepartment
from Transformers.ImputePrice import ImputePrice
from Transformers.LagFeature import LagFeature
from Transformers.LogTransformation import LogTransformation
from Transformers.MonthlyAggregate import MonthlyAggregate
from Transformers.NegativeSales import NegativeSales
from Estimators.RandomForest import RandomForest
from Estimators.ProphetEstimator import ProphetEstimator
from pyspark.ml.feature import MinMaxScaler
from pyspark.sql.functions import mean, stddev
from Transformers.Scaling import Scaling
from Evaluator.MAPE import MAPE
import pyspark.sql.functions as F
import pandas as pd
import findspark
import sys
import warnings

if not sys.warnoptions:
    warnings.simplefilter("ignore")

In [3]:
def initialize_session(name, master="local[*]", bind_address="localhost", ui_port="4050"):
    """Build (or reuse) the SparkSession used throughout the notebook.

    The defaults reproduce the original hard-coded setup: a local master with
    one worker thread per logical core, the driver bound to localhost and the
    Spark web UI on port 4050.

    Args:
        name: Application name passed to ``appName``.
        master: Spark master URL.
        bind_address: Value for ``spark.driver.bindAddress``.
        ui_port: Port for the Spark web UI (``spark.ui.port``); int or str.

    Returns:
        SparkSession: the session from ``getOrCreate()``, so an already-active
        session is reused instead of a second one being started.
    """
    builder = (
        SparkSession.builder
        .master(master)
        .appName(name)
        .config("spark.driver.bindAddress", bind_address)
        # Spark config values are strings; str() lets callers pass an int port.
        .config("spark.ui.port", str(ui_port))
    )
    return builder.getOrCreate()

In [4]:
# Start Spark via initialize_session and load the raw dataset.
# NOTE(review): findspark.init() is meant to run *before* pyspark is imported;
# the pyspark imports in the import cell have already succeeded by now, so this
# call presumably only sets SPARK_HOME / sys.path entries — confirm it is needed.
findspark.init()
spark = initialize_session("Assignment")
# Enable Arrow for Spark -> pandas conversion (toPandas() is used further down).
# NOTE(review): Spark 3.x renamed this key to
# "spark.sql.execution.arrow.pyspark.enabled" and deprecated the old one.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
log = Logging.getLogger()
# NOTE(review): this is logged after the session already exists.
log.info("Initializing session")

# Project data helper; `df` is the Spark DataFrame fed to the ML Pipeline below.
# `data` is also used later for train_test_split, so don't reuse the name.
data = DataManipulation()
df = data.get_data()

In [5]:
# Department filter stage: presumably keeps only rows whose dept_id is FOODS_1.
# (Restricting to a single store with data.filter_store(df, "WI_1") was tried
# earlier and is disabled.)
filterDepartment = FilterDepartment(filterCol="dept_id", inputCol="FOODS_1")

In [6]:
# Feature-engineering stages; they are chained, in this order, by the Pipeline cell.
imputePrice = ImputePrice()
negativeSales = NegativeSales(column="sales")

# Roll rows up per store / department / year / month with one aggregate per column.
aggregate = MonthlyAggregate(
    columns=["store_id", "dept_id", "year", "month"],
    expressions={
        "sales": "sum",
        "sell_price": "avg",
        "event_name_1": "count",
        "event_name_2": "count",
        "snap_WI": "sum",
    },
)
# The target is log-transformed here, so every later label/prediction is in log space.
logTransformation = LogTransformation(inputCols=["sales"])
# Lags 1..12 of the target (columns lag_1 ... lag_12), presumably computed per
# store/department series ordered by year and month.
lagFeatures = LagFeature(
    partitionBy=["store_id", "dept_id"],
    orderBy=["year", "month"],
    lags=list(range(1, 13)),
    target="sales",
)

# Encode the string keys as numeric indices for the tree models.
# NOTE(review): with a time-based split, the held-out year's index presumably
# never occurs in the training rows.
storeIndexer = StringIndexer(inputCol="store_id", outputCol="store_id_index")
yearIndexer = StringIndexer(inputCol="year", outputCol="year_index")
# Scaling stages for sell_price / event_name_1 / event_name_2 (Scaling(inputCols=[...]))
# are currently disabled.

In [7]:
# Model inputs: encoded keys, calendar/event/price features, then the twelve
# sales lags produced by the LagFeature stage.
inputColumns = (
    ["store_id_index", "month", "year_index", "event_name_1", "event_name_2", "sell_price"]
    + ["lag_{}".format(lag) for lag in range(1, 13)]
)
vector = VectorAssembler(inputCols=inputColumns, outputCol="features")

In [8]:
log.info("Initiating pipeline")
# Fit every stage on the full dataset, then transform it in one pass.
# NOTE(review): the StringIndexers are fit before the train/test split, so test
# rows take part in building the index mapping.
pipelineModel = Pipeline(stages=[
    filterDepartment, imputePrice, negativeSales, aggregate,
    logTransformation, lagFeatures, storeIndexer, yearIndexer, vector,
]).fit(df)
transformed = pipelineModel.transform(df)

In [9]:
# Hold out a test set. The prediction tables below show only 2016 months in the
# test rows, i.e. presumably a time-based split — confirm in DataManipulation.
train, test = data.train_test_split(transformed)

# XGBoost

In [28]:
# Fit the project's XGBoost estimator on the individual feature columns (unlike
# RandomForest below, which takes the assembled "features" vector). The printed
# "score:" lines presumably come from its internal tuning/validation.
xgbModel = XGBoost(inputCols=inputColumns, labelCol="sales").fit(train)

Training XGBoost
score: 0.038140776496895734
score:                                                                                                                 
0.03040679583651568                                                                                                    
score:                                                                                                                 
0.0301911522897184                                                                                                     
score:                                                                                                                 
0.030295133603962483                                                                                                   
score:                                                                                                                 
0.02991132022344387                                                                                                

In [60]:
# Score the held-out months with the XGBoost model and preview the result.
# DataFrame.show() prints the table itself and returns None; wrapping it in
# print() only appended a stray "None" to the output.
pred = xgbModel.transform(test)
pred.show()

+-----+----+-----+----------+------------------+
|store|year|month|prediction|            actual|
+-----+----+-----+----------+------------------+
|  0.0| 5.0|    1| 4.0389442|3.7662640906519957|
|  0.0| 5.0|    2| 3.7691476| 3.991447598003803|
|  0.0| 5.0|    3| 3.9158623|3.8987251815894934|
|  0.0| 5.0|    4| 3.9507616| 4.073571728304925|
|  0.0| 5.0|    5| 3.8980036| 3.770557474850995|
|  1.0| 5.0|    1|  3.945459| 3.922154325231059|
|  1.0| 5.0|    2| 3.7430043|   3.9703933720796|
|  1.0| 5.0|    3| 3.8384516| 4.006380458549693|
|  1.0| 5.0|    4| 3.8938484|  4.14674801363064|
|  1.0| 5.0|    5| 3.8458862|4.1285285037974395|
|  2.0| 5.0|    1| 3.6775389|3.8274338954007794|
|  2.0| 5.0|    2|  3.805361|3.8232785569516707|
|  2.0| 5.0|    3| 3.8210578|3.9811387826406603|
|  2.0| 5.0|    4|  3.794316| 3.773932647467645|
|  2.0| 5.0|    5|  3.546763| 4.146686055647526|
|  3.0| 5.0|    1| 3.8259609| 3.850829959848531|
|  3.0| 5.0|    2| 3.8181422| 3.932879457823799|
|  3.0| 5.0|    3| 3

In [70]:
# Export the XGBoost predictions to one local CSV via pandas. The disabled Spark
# writer below would create a directory of part files, and its "RFresult" path
# was copied from the Random Forest section.
# NOTE(review): `pred` is a generic global name; make sure it still holds the
# XGBoost predictions if cells are re-run out of order.
#pred.repartition(1).write.csv("RFresult", header="true")
pred.toPandas().to_csv("XGBresult.csv", header=True, index=False)

In [30]:
# MAPE of the XGBoost predictions. "actual" holds the same log-transformed sales
# values as the "sales" column (compare the RF table), so this is a MAPE in log
# space, not on raw sales — NOTE(review): report raw-scale error too.
mape = MAPE(predictionCol="prediction", labelCol="actual")
score = mape.evaluate(pred)
print("Score: ", score)

Score:  0.03440495505899868


In [31]:
# Persist the fitted XGBoost model.
# NOTE(review): the Random Forest and Prophet models in this notebook are not saved.
xgbModel.save("XGBoostBestModel")

# Random Forest

In [32]:
# Fit the project's RandomForest estimator on the assembled "features" vector
# with log-transformed sales as the label; the printed "score:" lines presumably
# come from its internal tuning/validation.
rfModel = RandomForest(featuresCol="features", labelCol="sales").fit(train)

Training Random Forest
score: 0.011476324896382584
score:                                                                                                                 
0.02997232685130526                                                                                                    
score:                                                                                                                 
0.030305298698946905                                                                                                   
score:                                                                                                                 
0.030254975808959937                                                                                                   
score:                                                                                                                 
0.03016691127838474                                                                                          

In [53]:
# Score the held-out months with the Random Forest and preview key columns.
predRf = rfModel.transform(test)
(predRf
    .select("store_id", "year", "month", "sales", "prediction")
    .show())

+--------+----+-----+------------------+------------------+
|store_id|year|month|             sales|        prediction|
+--------+----+-----+------------------+------------------+
|    CA_1|2016|    1| 3.959089114367392| 3.973188818744754|
|    CA_1|2016|    2|3.9631264410819047|   3.9842871115052|
|    CA_1|2016|    3| 4.006380458549693|3.9704498068074487|
|    CA_1|2016|    4|  3.95525468282018|3.9904649691232867|
|    CA_1|2016|    5|  3.88058495606498|3.9977359691860155|
|    CA_2|2016|    1|  4.14674801363064| 4.081151340642005|
|    CA_2|2016|    2| 4.104282207094438| 4.078755324558147|
|    CA_2|2016|    3| 4.121428518367963| 4.059489182785702|
|    CA_2|2016|    4| 4.146686055647526|4.0629658390873855|
|    CA_2|2016|    5| 4.073571728304925| 4.067109754296584|
|    CA_3|2016|    1| 4.172369376763842| 4.059356119997409|
|    CA_3|2016|    2|4.1285285037974395| 4.074892297993666|
|    CA_3|2016|    3|4.1170059966359664|  4.07102261219478|
|    CA_3|2016|    4| 4.066102196766773|

In [59]:
# Keep the identifying columns plus actual (log) sales and the RF prediction,
# and collect them to the driver as pandas for the CSV export in the next cell.
RFresult = predRf.select("store_id", "year", "month", "sales", "prediction")
#RFresult.repartition(1).write.csv("RFresult", header="true")
result = RFresult.toPandas()

In [71]:
# Write the Random Forest predictions (pandas frame from the previous cell).
result.to_csv("RFresult.csv", header=True, index=False)

In [35]:
# MAPE of the Random Forest predictions against log-transformed sales
# (NOTE(review): log-space error, not error on raw sales).
mape = MAPE(predictionCol="prediction", labelCol="sales")
score = mape.evaluate(predRf)
print("Score: ", score)

Score:  0.010606751116485118


# Prophet

In [9]:
# Prophet works on a date column named "ds"; build the first day of each month
# from the year and month columns (the string "2016-1" casts to 2016-01-01).
# Use the real lower-case column name "year": "Year" only resolved because
# Spark SQL is case-insensitive by default (spark.sql.caseSensitive=false).
transformedProphet = transformed.withColumn(
    "ds", F.to_date(F.concat_ws("-", "year", "month"))
)
trainProphet, testProphet = data.train_test_split(transformedProphet)

In [10]:
def getStores(data):
    """Split a DataFrame into one DataFrame per distinct ``store_id``.

    Args:
        data: Spark DataFrame containing a ``store_id`` column.

    Returns:
        dict mapping each distinct ``store_id`` value to the rows of ``data``
        that belong to that store.
    """
    store_rows = data.select("store_id").distinct().collect()
    # Filter with a column of `data` itself. The notebook-global
    # df["store_id"] belongs to a different DataFrame; it presumably only
    # resolved because trainProphet/testProphet derive from df, and it breaks
    # as soon as df is reassigned.
    return {
        row.store_id: data.filter(data["store_id"] == row.store_id)
        for row in store_rows
    }

In [11]:
# Per-store train/test DataFrames, each a dict keyed by store_id.
trainStores, testStores = getStores(trainProphet), getStores(testProphet)

In [13]:
# Disabled single-store smoke test: fit and score Prophet on WI_2 only.
# Kept for debugging; the next cell trains a model for every store.
#prophetModel = ProphetEstimator(labelCol="sales").fit(trainStores["WI_2"])
#pred = prophetModel.transform(testStores["WI_2"])
#pred.show()

In [14]:
# Fit one Prophet model per store, keyed by store_id.
models = {}
for store_id, store_train in trainStores.items():
    models[store_id] = ProphetEstimator(labelCol="sales").fit(store_train)

Training Prophet


INFO:numexpr.utils:NumExpr defaulting to 8 threads.
INFO:fbprophet:Disabling weekly seasonality. Run prophet with weekly_seasonality=True to override this.
INFO:fbprophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.


Training Prophet


INFO:fbprophet:Disabling weekly seasonality. Run prophet with weekly_seasonality=True to override this.
INFO:fbprophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.


Training Prophet


INFO:fbprophet:Disabling weekly seasonality. Run prophet with weekly_seasonality=True to override this.
INFO:fbprophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.


Training Prophet


INFO:fbprophet:Disabling weekly seasonality. Run prophet with weekly_seasonality=True to override this.
INFO:fbprophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.


Training Prophet


INFO:fbprophet:Disabling weekly seasonality. Run prophet with weekly_seasonality=True to override this.
INFO:fbprophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.


Training Prophet


INFO:fbprophet:Disabling weekly seasonality. Run prophet with weekly_seasonality=True to override this.
INFO:fbprophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.


Training Prophet


INFO:fbprophet:Disabling weekly seasonality. Run prophet with weekly_seasonality=True to override this.
INFO:fbprophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.


Training Prophet


INFO:fbprophet:Disabling weekly seasonality. Run prophet with weekly_seasonality=True to override this.
INFO:fbprophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.


Training Prophet


INFO:fbprophet:Disabling weekly seasonality. Run prophet with weekly_seasonality=True to override this.
INFO:fbprophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.


Training Prophet


INFO:fbprophet:Disabling weekly seasonality. Run prophet with weekly_seasonality=True to override this.
INFO:fbprophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.


In [15]:
# Score each store's held-out months with that store's Prophet model.
predictions = {}
for store_id, store_model in models.items():
    # Deliberately avoid generic names such as `data` / `pred` here: those
    # globals hold the DataManipulation helper (needed by train_test_split) and
    # the XGBoost predictions, and overwriting them breaks re-running earlier cells.
    predictions[store_id] = store_model.transform(testStores[store_id])

In [22]:
from functools import reduce
from pyspark.sql import DataFrame

def union_all(*dfs, by_name=False):
    """Concatenate the rows of several Spark DataFrames into one.

    Args:
        *dfs: DataFrames to combine, in order; schemas must be compatible.
        by_name: If False (default, the original behavior), columns are matched
            by position via ``DataFrame.union``. If True, they are matched by
            name via ``DataFrame.unionByName``, which is safer when inputs may
            list their columns in different orders.

    Returns:
        The combined DataFrame. A single input is returned unchanged.

    Raises:
        TypeError: if no DataFrames are given. This is the same exception type
            the bare ``reduce`` raised, but with a clearer message.
    """
    if not dfs:
        raise TypeError("union_all() requires at least one DataFrame")
    if by_name:
        return reduce(lambda left, right: left.unionByName(right), dfs)
    return reduce(lambda left, right: left.union(right), dfs)

In [23]:
# Stack the per-store Prophet predictions into a single Spark DataFrame. With
# union_all's default positional union, every store's output must share the
# same column order.
predProphet = union_all(*predictions.values())

In [24]:
# Preview: store, year, month, sales (actual, log-transformed) and yhat (Prophet forecast).
predProphet.show()

+-----+----+-----+------------------+------------------+
|store|year|month|             sales|              yhat|
+-----+----+-----+------------------+------------------+
| WI_2|2016|    1| 4.025018972282759|3.7844230019530993|
| WI_2|2016|    2|   3.9703933720796| 4.066482393414515|
| WI_2|2016|    3| 4.064120905829622| 4.005805930697541|
| WI_2|2016|    4| 4.080806804334363| 4.028455399559174|
| WI_2|2016|    5| 4.027145665774341|3.9845950463125046|
| WI_3|2016|    1|3.9421569284674907| 3.681262273031782|
| WI_3|2016|    2|3.9200189160289147| 3.963220942588123|
| WI_3|2016|    3|3.8987251815894934|3.9311875477247513|
| WI_3|2016|    4|3.8765642139838454|3.9044075735421537|
| WI_3|2016|    5|   3.8750612633917| 3.923496247527877|
| TX_2|2016|    1|3.8605176774617465|3.6267798026311406|
| TX_2|2016|    2| 3.890532791927745|3.9494741822809907|
| TX_2|2016|    3|3.9088601730172763| 3.883800425638834|
| TX_2|2016|    4| 3.850829959848531|3.8043470537167083|
| TX_2|2016|    5|3.82327855695

In [25]:
# MAPE of Prophet's yhat against log-transformed sales
# (NOTE(review): log-space error, not error on raw sales).
mape = MAPE(predictionCol="yhat", labelCol="sales")
score = mape.evaluate(predProphet)
print("Score: ", score)

Score:  0.02683103959912822
