# Importing Libraries

In [1]:
# Start (or attach to) the notebook's Spark session and keep a handle on its
# SparkContext as `sc` for any RDD-level work.
import numpy as np
from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .getOrCreate())
sc = spark.sparkContext

import pyspark
from pyspark.ml import feature, regression, Pipeline, evaluation

from pyspark.sql import functions as fn, Row
from pyspark.sql.functions import isnan, when, count
from pyspark import sql
from pyspark.sql.window import Window
import pyspark.sql.functions as func

import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import seaborn as sns
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

from pyspark.sql.types import IntegerType
import pylab

# Importing CSV Files

In [2]:
# Load the raw CSV files using their header rows. No schema inference is done,
# so every column arrives as a string; numeric columns are cast explicitly in
# the preprocessing cells. Only the first 120,000 test rows are kept.
test_df = spark.read.option("header", True).csv("test.csv").limit(120000)
train_df = spark.read.option("header", True).csv("train.csv")
items_df = spark.read.option("header", True).csv("items.csv")
stores_df = spark.read.option("header", True).csv("stores.csv")
holidays_df = spark.read.option("header", True).csv("holidays_events.csv")
transactions_df = spark.read.option("header", True).csv("transactions.csv")

KeyboardInterrupt: 

# Preprocessing

In [None]:
# Give the two generic "type" columns distinct names so they do not collide
# after the joins, and drop columns that are not used as features.
stores_df = stores_df.withColumnRenamed("type", "store_type")
holidays_df = (holidays_df
               .withColumnRenamed("type", "holiday_type")
               .drop('description', 'transferred'))
train_df = train_df.drop('id')
test_df = test_df.drop('id')

In [None]:
# Converting the string columns read from CSV to numeric types.
# unit_sales is cast to double rather than int. An integer cast drops the
# fractional part of values such as "0.5" (and with ANSI SQL mode enabled can
# raise on them instead). The later `unit_sales > 0` filter would then silently
# discard rows whose sales were below 1.
# NOTE(review): presumably unit_sales can be fractional (e.g. weight-based
# items) — confirm against the data; double represents whole numbers exactly
# either way.
for column in ["store_nbr", "item_nbr"]:
    train_df = train_df.withColumn(column, train_df[column].cast(IntegerType()))
    test_df = test_df.withColumn(column, test_df[column].cast(IntegerType()))
train_df = train_df.withColumn("unit_sales", train_df["unit_sales"].cast("double"))
for column in ["item_nbr", "class", "perishable"]:
    items_df = items_df.withColumn(column, items_df[column].cast(IntegerType()))
for column in ["store_nbr", "cluster"]:
    stores_df = stores_df.withColumn(column, stores_df[column].cast(IntegerType()))
for column in ["store_nbr", "transactions"]:
    transactions_df = transactions_df.withColumn(column, transactions_df[column].cast(IntegerType()))

# Sampling data

In [None]:
# Keep a 0.1% random sample of the training rows (no replacement, fixed seed).
train_df = train_df.sample(withReplacement=False, fraction=0.001, seed=0)

In [None]:
# train_df.count()

In [None]:
# Keep a 10% random sample of the test rows (no replacement, fixed seed).
test_df = test_df.sample(withReplacement=False, fraction=0.1, seed=0)

In [None]:
# test_df.count()

# Joining CSV Files

In [None]:
# Enrich each training row with store, item, holiday and daily-transaction
# attributes. All joins are left joins, so no sales row is dropped.
# NOTE(review): if holidays_events holds several rows for the same date, the
# join on `date` alone duplicates the matching sales rows — verify/deduplicate.
train_df = (train_df
            .join(stores_df, on='store_nbr', how='left')
            .join(items_df, on='item_nbr', how='left')
            .join(holidays_df, on='date', how='left')
            .join(transactions_df, on=['store_nbr', 'date'], how='left'))

In [None]:
# Enrich each test row with the same store, item, holiday and transaction
# attributes as the training rows (left joins keep every test row).
# NOTE(review): duplicate holiday dates would duplicate test rows here too.
test_df = (test_df
           .join(stores_df, on='store_nbr', how='left')
           .join(items_df, on='item_nbr', how='left')
           .join(holidays_df, on='date', how='left')
           .join(transactions_df, on=['store_nbr', 'date'], how='left'))

# Filling Missing Values

In [None]:
# Fill the gaps left by the left joins: rows without a holiday get explicit
# "no holiday" categories, a missing promotion flag becomes 'False', and a
# missing transaction count becomes the constant 1884.
# NOTE(review): 1884 is a magic number — presumably an average daily
# transaction count; confirm and move to a named constant.
train_df = train_df.fillna(dict(onpromotion='False',
                                holiday_type='No Holiday',
                                locale='Not Applicable',
                                locale_name='Not Applicable',
                                transactions=1884))

In [None]:
# Same null-filling rules as for the training frame.
test_df = test_df.fillna(dict(onpromotion='False',
                              holiday_type='No Holiday',
                              locale='Not Applicable',
                              locale_name='Not Applicable',
                              transactions=1884))

# Removing Non-Positive Sales

In [None]:
# Keep only rows with strictly positive unit_sales (drops returns and zeros).
train_df = train_df.where(fn.col('unit_sales') > 0)

In [None]:
# Apply the positive-sales filter to the test set itself. The predicate must be
# built from `test_df`; filtering `train_df` here would replace the test set
# with training rows. The filter is skipped when the test data has no
# `unit_sales` column (nothing to filter on in that case).
# NOTE(review): the test set is later encoded with StringIndexers fitted on the
# sampled training data; categories absent from that sample may make that
# encoding step fail.
if 'unit_sales' in test_df.columns:
    test_df = test_df.filter(test_df.unit_sales > 0)

# Split date

In [None]:
# Break the training date string (expected format YYYY-MM-DD) into Year, Month
# and Day columns (still strings at this point) and drop the raw date column.
split_date = fn.split(train_df['date'], '-')
for position, part_name in enumerate(['Year', 'Month', 'Day']):
    train_df = train_df.withColumn(part_name, split_date.getItem(position))
train_df = train_df.drop('date')

In [None]:
# Same date decomposition for the test frame: Year / Month / Day string
# columns from the '-'-separated date, then drop the raw date column.
split_date = fn.split(test_df['date'], '-')
for position, part_name in enumerate(['Year', 'Month', 'Day']):
    test_df = test_df.withColumn(part_name, split_date.getItem(position))
test_df = test_df.drop('date')

In [None]:
# `split` yields the date parts as strings; cast them to integers in both frames.
for part_name in ["Day", "Month", "Year"]:
    test_df = test_df.withColumn(part_name, test_df[part_name].cast(IntegerType()))
    train_df = train_df.withColumn(part_name, train_df[part_name].cast(IntegerType()))

# One-Hot Encoded Dummy Variables (the PCA exploration below is commented out)

In [None]:
# Alias (not a copy) of the cleaned training frame, used as the input of the
# feature pipeline below. Spark DataFrames are immutable, so sharing is safe.
train_df1 = train_df

In [None]:
# Feature pipeline fitted on the training data:
#   1. string-index each categorical column (alphabetical label order),
#   2. one-hot encode each index, keeping every category (dropLast=False),
#   3. assemble the one-hot vectors plus the numeric columns into "features".
inputcolumns = ["onpromotion", "state", "store_type", "family", "city",
                "holiday_type", "locale", "locale_name"]
indexer = [
    StringIndexer(stringOrderType='alphabetAsc', inputCol=name,
                  outputCol="{0}_index".format(name))
    for name in inputcolumns
]
encoder = [
    OneHotEncoder(inputCol=idx.getOutputCol(),
                  outputCol="{0}_feat".format(idx.getOutputCol()),
                  dropLast=False)
    for idx in indexer
]
assembler = VectorAssembler(
    inputCols=["{0}_index_feat".format(name) for name in inputcolumns]
    + ['store_nbr', 'item_nbr', 'cluster', 'class', 'perishable',
       'Month', 'Day', 'transactions'],
    outputCol="features")
pipeline = Pipeline(stages=indexer + encoder + [assembler])
model = pipeline.fit(train_df1)
transformed = model.transform(train_df1)

In [None]:
# Encode the test set with the pipeline model fitted on the training data, so
# every category maps to the same index / one-hot slot as it did in training.
# The pipeline is deliberately NOT re-fitted on the test data: that would be an
# extra Spark job and would give the test rows incompatible encodings.
# NOTE(review): the fitted StringIndexers use the default handleInvalid='error',
# so a category never seen in the sampled training data can make this fail
# when the result is first evaluated.
testtransformed = model.transform(test_df)

In [None]:
# investigate the results
# transformed.show()

In [None]:
# Remove the raw categorical columns and their intermediate "_index" columns;
# the one-hot "_index_feat" vectors, numeric columns and "features" remain.
transformed = transformed.drop(*[
    'onpromotion', 'onpromotion_index',
    'state', 'state_index',
    'city', 'city_index',
    'store_type', 'store_type_index',
    'family', 'family_index',
    'holiday_type', 'holiday_type_index',
    'locale', 'locale_index',
    'locale_name', 'locale_name_index',
])

In [None]:
# Same clean-up for the encoded test frame: drop raw categoricals and their
# intermediate "_index" columns.
testtransformed = testtransformed.drop(*[
    'onpromotion', 'onpromotion_index',
    'state', 'state_index',
    'city', 'city_index',
    'store_type', 'store_type_index',
    'family', 'family_index',
    'holiday_type', 'holiday_type_index',
    'locale', 'locale_index',
    'locale_name', 'locale_name_index',
])

In [None]:
# train_df1 = transformed.drop('features')

In [None]:
# training_df, validation_df = transformed.randomSplit([0.7, 0.3], seed=0)

In [None]:
# training_df.dtypes

In [None]:
# pcacols

In [None]:
# # assembler2 = feature.VectorAssembler(inputCols = ['store_nbr','item_nbr','cluster','class','perishable'\
# #                                                  ,'transactions','Month','Day','features'],outputCol='pcafeatures')
# std_scaled2 = feature.StandardScaler(inputCol='features', outputCol='standardizedFeatures')
# pca3 = feature.PCA(k=17, inputCol='standardizedFeatures', outputCol='pc')
# dpipe_pca = Pipeline(stages = [std_scaled2, pca3]).fit(training_df)
# dtrain = dpipe_pca.transform(training_df)

In [None]:
# pca_model = dpipe_pca.stages[-1]
# pc1 = np.absolute(pca_model.pc.toArray()[:, 0]).tolist()
# pc2 =np.absolute(pca_model.pc.toArray()[:, 1]).tolist()

In [None]:
# explainedVariance = dpipe_pca.stages[-1].explainedVariance
# explainedVariance

In [None]:
# import numpy as np
# y = explainedVariance.toArray().tolist()
# y = np.sort(np.cumsum(y))/np.sum(y)
# plt.plot(y)
# plt.xlabel('number of components')
# plt.ylabel('Explained variance');

# Linear Regression

In [None]:
# 70/30 train/validation split of the encoded data with a fixed seed.
lineartrainingdf, linearvalidationdf = transformed.randomSplit(weights=[0.7, 0.3], seed=0)

In [None]:
# Candidate feature columns: every column except the label (unit_sales) and
# Year. Selecting by name instead of popping fixed positions keeps the cell
# correct if the column order changes, and makes it safe to re-run.
pcacols = [name for name in lineartrainingdf.columns
           if name not in ('unit_sales', 'Year')]

In [None]:
# NOTE: `lrcols` is a second name for the same list object as `pcacols`, not a
# copy — any in-place mutation of one is visible through the other.
lrcols = pcacols

In [None]:
# train_df.select(fn.max("unit_sales")).show()

In [None]:
# train_df.select(fn.min("unit_sales")).show()

In [None]:
# Linear regression of unit_sales on the pre-assembled "features" vector,
# wrapped in a one-stage Pipeline and fitted on the training split.
lrmodel1 = Pipeline(
    stages=[regression.LinearRegression(featuresCol='features',
                                        labelCol='unit_sales')]
).fit(lineartrainingdf)


In [None]:
# Score the held-out validation split with the fitted linear model.
a = lrmodel1.transform(dataset=linearvalidationdf)

In [None]:
# Root-mean-squared error between actual and predicted unit_sales, as an
# aggregate column expression that is evaluated later via .select().
rmse = fn.sqrt(fn.avg(fn.pow(fn.col('unit_sales') - fn.col('prediction'), 2))).alias('rmse')

In [None]:
# Validation RMSE of the linear model (reuses the scored validation frame `a`).
a.select(rmse).show()

In [None]:
# Fitted linear-regression coefficients (one per slot of the "features"
# vector) as a NumPy array; the bare name on the last line displays them.
coef = lrmodel1.stages[-1].coefficients.toArray()
coef

In [None]:
# Build one human-readable name per slot of the assembled "features" vector,
# in the order the fitted VectorAssembler (the last pipeline stage) laid them
# out, so that zip(sales_col_list, coef) pairs every coefficient with its own
# feature. Each one-hot column was encoded with dropLast=False, so it
# contributes one slot per StringIndexer label (named "<column>_<label>", with
# spaces replaced by underscores). Every other assembler input is a numeric
# column and contributes exactly one slot.
indexer_models = {stage.getOutputCol(): stage
                  for stage in model.stages
                  if isinstance(stage, feature.StringIndexerModel)}
sales_col_list = []
for assembled_col in model.stages[-1].getInputCols():
    if assembled_col.endswith('_feat'):
        # "<col>_index_feat" is the one-hot vector of the "<col>_index" indexer output.
        indexer_model = indexer_models[assembled_col[:-len('_feat')]]
        sales_col_list.extend(indexer_model.getInputCol() + '_' + label.replace(' ', '_')
                              for label in indexer_model.labels)
    else:
        sales_col_list.append(assembled_col)


In [None]:
# Pair each feature name with its linear-regression coefficient, largest first.
feature_importance = pd.DataFrame(
    [(name, weight) for name, weight in zip(sales_col_list, coef)],
    columns=['feature', 'Coefficient'],
).sort_values(by='Coefficient', ascending=False)

In [None]:
# Keep only features whose coefficient exceeds 0.05.
feature_importance = feature_importance.query('Coefficient > 0.05')

In [None]:
# Ten largest positive coefficients.
feature_importance.iloc[:10]

# Random Forest

In [None]:
# Alias (not a copy) of the cleaned training frame; it is not referenced again
# in the visible cells of this notebook.
train_df2 = train_df
# Show column names and types of the encoded frame used for the random forest.
transformed.dtypes

In [None]:
# 70/30 train/validation split for the random forest, same seed as before.
rftraining_df, rfvalidation_df = transformed.randomSplit(weights=[0.7, 0.3], seed=0)

In [None]:
# Random forest regressor on the full "features" vector, predicting unit_sales.
# A fixed seed makes the forest's random bootstrap/feature sampling
# reproducible, matching the seeded train/validation split.
RF_pipe = Pipeline(stages=[
    regression.RandomForestRegressor(featuresCol='features', labelCol='unit_sales', seed=0)
]).fit(rftraining_df)

In [None]:
# RMSE evaluator comparing "prediction" against unit_sales.
evaluator = evaluation.RegressionEvaluator(metricName='rmse', labelCol='unit_sales')

In [None]:
# Validation RMSE of the full-feature random forest.
rmseRF = evaluator.evaluate(dataset=RF_pipe.transform(rfvalidation_df))

In [None]:
# Validation RMSE of the random forest trained on the full feature vector.
print(rmseRF)

In [None]:
# Random-forest feature importances, one per slot of the assembled "features"
# vector. Names are rebuilt from the fitted feature pipeline (`model`) in the
# assembler's order: each one-hot column (dropLast=False) expands to one slot
# per StringIndexer label, each numeric column to a single slot. Zipping the
# DataFrame's column names instead would mislabel the importances, because a
# one-hot column occupies many slots of the vector.
imp = RF_pipe.stages[-1].featureImportances.toArray()
indexer_models = {stage.getOutputCol(): stage
                  for stage in model.stages
                  if isinstance(stage, feature.StringIndexerModel)}
rf_feature_names = []
for assembled_col in model.stages[-1].getInputCols():
    if assembled_col.endswith('_feat'):
        # "<col>_index_feat" is the one-hot vector of the "<col>_index" indexer output.
        indexer_model = indexer_models[assembled_col[:-len('_feat')]]
        rf_feature_names.extend(indexer_model.getInputCol() + '_' + label.replace(' ', '_')
                                for label in indexer_model.labels)
    else:
        rf_feature_names.append(assembled_col)
feature_importance = pd.DataFrame(list(zip(rf_feature_names, imp)),
                                  columns=['feature', 'importance']).sort_values('importance', ascending=False)

In [None]:
# Ten most important random-forest features.
feature_importance.iloc[:10]

# Plot: Actual vs. Predicted Sales by Month

In [None]:
# Preview the linear-regression validation predictions (Spark shows 20 rows by default).
a.show()

In [None]:
# Monthly mean of actual vs. predicted unit sales on the validation split, as pandas.
x = (a.groupBy('Month')
      .agg(fn.avg('unit_sales').alias('Unit_Sales'),
           fn.avg('prediction').alias('Predicted_Sales'))
      .toPandas())

In [None]:
# Overlay actual and predicted average sales per month. Each line gets its own
# legend entry; without labels the two lines cannot be told apart.
sns.lineplot(x="Month", y="Unit_Sales", data=x, label='Actual Sales')
sns.lineplot(x="Month", y="Predicted_Sales", data=x, label='Predicted Sales')
plt.legend()
plt.ylabel("Average of Sales")
plt.title('Average Actual vs. Predicted Unit Sales per Month');

In [None]:
# Monthly totals of actual vs. predicted unit sales on the validation split, as pandas.
x1 = (a.groupBy('Month')
       .agg(fn.sum('unit_sales').alias('Unit_Sales'),
            fn.sum('prediction').alias('Predicted_Sales'))
       .toPandas())

In [None]:
# Overlay actual vs. predicted total sales per month. Legend labels are attached
# to each line via `label=`: plt.legend(labels=[...]) pairs labels with
# artists only by drawing order, which can mislabel the lines.
sns.lineplot(x="Month", y="Unit_Sales", data=x1, label='Actual Sales')
sns.lineplot(x="Month", y="Predicted_Sales", data=x1, label='Predicted Sales')
plt.legend()
plt.xlabel('Month')
plt.ylabel("Sum of Sales")
plt.title('Actual vs. Predicted Unit Sales per Month');

# Linear Regression with Important Features

In [None]:
# Column names and types of the encoded training frame, to choose the reduced feature set below.
transformed.dtypes

In [None]:
# Reduced ("important") feature set: total unit_sales per item / month / day
# and item attributes, then a seeded 70/30 train/validation split.
moderate = (transformed
            .select("item_nbr", "Day", "Month", "perishable", "class",
                    "family_index_feat", "unit_sales")
            .groupby("item_nbr", "Month", "Day", "perishable", "class",
                     "family_index_feat")
            .agg(fn.sum("unit_sales")))
lineartrainingdf2, linearvalidationdf2 = moderate.randomSplit([0.7, 0.3], seed=0)

In [None]:
# Schema of the aggregated training split (the label column is `sum(unit_sales)`).
lineartrainingdf2.dtypes

In [None]:
# Input columns of the reduced models; the "family" one-hot vector is
# assembled together with the numeric columns.
lrmodcols = ['item_nbr', 'Month', 'Day',
             'perishable', 'class', 'family_index_feat']

In [None]:
# Reduced linear model: assemble the selected columns into a feature vector
# and regress the aggregated sales total on it.
lrmodel2 = Pipeline(stages=[
    feature.VectorAssembler(outputCol='features', inputCols=lrmodcols),
    regression.LinearRegression(labelCol='sum(unit_sales)', featuresCol='features'),
]).fit(lineartrainingdf2)


In [None]:
# Transform is lazy: this only displays the scored DataFrame's schema; no rows
# are computed by this cell.
lrmodel2.transform(linearvalidationdf2)

In [None]:
# RMSE expression for the aggregated label `sum(unit_sales)`.
rmse2 = fn.sqrt(fn.avg(fn.pow(fn.col('sum(unit_sales)') - fn.col('prediction'), 2))).alias('rmse')

In [None]:
# Validation RMSE of the reduced linear model.
(lrmodel2
 .transform(dataset=linearvalidationdf2)
 .select(rmse2)
 .show())

# Random Forest with Important Features 

In [None]:
# Same reduced, aggregated feature set and seeded split, for the random forest.
moderatef = (transformed
             .select("item_nbr", "Day", "Month", "perishable", "class",
                     "family_index_feat", "unit_sales")
             .groupby("item_nbr", "Month", "Day", "perishable", "class",
                      "family_index_feat")
             .agg(fn.sum("unit_sales")))
randomtrainingdf2, randomvalidationdf2 = moderatef.randomSplit([0.7, 0.3], seed=0)

In [None]:
# Random forest on the reduced, aggregated feature set, predicting the total
# `sum(unit_sales)`. A fixed seed makes the fitted forest (and hence the
# predictions made with it) reproducible.
RF_modpipe = Pipeline(stages=[
    feature.VectorAssembler(inputCols=lrmodcols, outputCol='features'),
    regression.RandomForestRegressor(featuresCol='features', labelCol='sum(unit_sales)', seed=0),
]).fit(randomtrainingdf2)

In [None]:
# RMSE evaluator for the aggregated label.
evaluator = evaluation.RegressionEvaluator(metricName='rmse', labelCol='sum(unit_sales)')

In [None]:
# Validation RMSE of the reduced-feature random forest.
rmseRF = evaluator.evaluate(dataset=RF_modpipe.transform(randomvalidationdf2))

In [None]:
# Validation RMSE of the random forest trained on the reduced, aggregated feature set.
print(rmseRF)

# Prediction

In [None]:
# testmod = testtransformed.select("item_nbr","Day","Month","perishable","class","family_index_feat","unit_sales")
# testmod = testmod.groupby("item_nbr","Month","Day","perishable","class","family_index_feat").agg(fn.sum("unit_sales"))
# randomtrainingdf2, randomvalidationdf2 = moderatef.randomSplit([0.7, 0.3], seed=0)

In [None]:
# testmodel = Pipeline(stages=[
#     feature.VectorAssembler(inputCols = pcacols, outputCol = 'features'),
#     regression.RandomForestRegressor(featuresCol='features', labelCol='sum(unit_sales)')  
# ]).fit(testtransformed)
# The reduced-feature random forest pipeline is the model used for the predictions below.
final_model = RF_modpipe

In [None]:
# Reduced feature columns of the encoded test set (no label column).
testmoderatef = testtransformed.select(["item_nbr", "Day", "Month", "perishable",
                                        "class", "family_index_feat"])

In [None]:
def PredictSales(M, I):
    """Show and return the total predicted sales for one item in one month.

    Filters the reduced test features (`testmoderatef`) to rows with
    Month == M and item_nbr == I, scores them with `final_model`, and sums the
    per-row predictions.

    Parameters
    ----------
    M : int
        Month number to filter on.
    I : int
        Item number (item_nbr) to filter on.

    Returns
    -------
    float or None
        Sum of the predictions; None when no test row matches the filter.
    """
    subset = testmoderatef.filter((fn.col('Month') == M) & (fn.col('item_nbr') == I))
    scored = final_model.transform(subset)
    # The alias belongs on the aggregate column; aliasing the DataFrame leaves
    # the displayed header as "sum(prediction)".
    total = scored.select(fn.sum('prediction').alias("Predicted Sales"))
    total.show()
    return total.first()[0]

In [None]:
# Example: predicted total sales of item 302952 in month 2.
PredictSales(M=2, I=302952)

In [None]:
# Actual aggregated sales per month and item on the validation split, for
# comparison with PredictSales.
(final_model
 .transform(randomvalidationdf2)
 .groupby('Month', 'item_nbr')
 .agg(fn.sum('sum(unit_sales)'))
 .show())