In [49]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import col, dayofmonth, month, year,  to_date, to_timestamp, weekofyear, dayofweek
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Start (or reuse) the Spark session used by every cell in this notebook.
my_spark = (
    SparkSession.builder
    .appName("SalesForecast")
    .getOrCreate()
)

# Load the retail CSV and parse the "d/M/yyyy H:mm" InvoiceDate strings into a
# DateType column (to_date drops the time-of-day part).
# NOTE(review): later cells group by Year/Month/Week/Day/DayOfWeek, which are not
# derived here; they appear in the head() row below, so they presumably come from
# the CSV itself — confirm if the input file changes.
sales_data = (
    my_spark.read.csv("Online Retail.csv", header=True, inferSchema=True, sep=",")
    .withColumn(
        "InvoiceDate",
        to_date(to_timestamp(col("InvoiceDate"), "d/M/yyyy H:mm")),
    )
)

sales_data.head()

Row(InvoiceNo=536365, StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, UnitPrice=2.55, CustomerID=17850, Country='United Kingdom', InvoiceDate=datetime.date(2010, 1, 12), Year=2010, Month=1, Week=2, Day=12, DayOfWeek=1)

In [50]:
# One row per (country, product, calendar day): total units sold that day and the
# mean unit price. Only the quantity aggregate is renamed; the price column keeps
# Spark's generated "avg(UnitPrice)" name.
daily_sales_data = (
    sales_data
    .groupBy("Country", "StockCode", "InvoiceDate", "Year", "Month", "Day", "Week", "DayOfWeek")
    .agg({"Quantity": "sum", "UnitPrice": "avg"})
    .withColumnRenamed("sum(Quantity)", "Quantity")
)

# Time-based split: days up to and including the cutoff train the model, later
# days are held out for testing, so the test set is strictly in the "future".
SPLIT_DATE = "2011-09-25"
train_data = daily_sales_data.filter(col("InvoiceDate") <= SPLIT_DATE)
test_data = daily_sales_data.filter(col("InvoiceDate") > SPLIT_DATE)



In [51]:
# Materialize the daily training set as a pandas DataFrame on the driver.
# NOTE(review): toPandas() pulls every training row into driver memory, and no later
# cell shown here reads pd_daily_train_data — confirm it is still needed.
pd_daily_train_data = train_data.toPandas()

                                                                                

In [52]:
# --- Regression model ---------------------------------------------------------
# Fixed seed: the random forest samples rows and feature subsets randomly, so
# without it the fitted model, the MAE and the week-39 forecast change on every
# Restart & Run All.
SEED = 42

# Encode the string categoricals as numeric indices. handleInvalid="keep" maps
# labels not seen while fitting (e.g. a StockCode that first appears in the test
# period) to an extra index instead of failing at transform time.
country_index = StringIndexer(inputCol="Country", outputCol="country_index").setHandleInvalid("keep")
stock_index = StringIndexer(inputCol="StockCode", outputCol="Stock_index").setHandleInvalid("keep")

# Combine the indexed categoricals and the calendar columns into one feature vector.
feature_cols = ["country_index", "Stock_index", "Month", "Year", "DayOfWeek", "Day", "Week"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Random forest predicting the daily summed Quantity. maxBins must be at least the
# number of categories of any categorical feature; the indexed StockCode column is
# presumably treated as categorical via StringIndexer metadata, so 4000 is sized to
# cover its distinct values — TODO confirm against the data.
rf = RandomForestRegressor(featuresCol="features", labelCol="Quantity", maxBins=4000, seed=SEED)

# index -> assemble -> regress, fitted on the training period only.
sales_piped = Pipeline(stages=[country_index, stock_index, assembler, rf])
model = sales_piped.fit(train_data)


                                                                                

24/04/15 17:37:04 WARN DAGScheduler: Broadcasting large task binary with size 1522.4 KiB


                                                                                

24/04/15 17:37:05 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB


                                                                                

In [53]:
# --- Evaluation -----------------------------------------------------------------
# Score the held-out period with the fitted pipeline.
test_prediction = model.transform(test_data)
test_prediction = test_prediction.withColumn("prediction", col("prediction").cast("double"))

# Mean absolute error of the daily (country, stock code) quantity predictions.
mae_evaluator = RegressionEvaluator(metricName="mae", labelCol="Quantity", predictionCol="prediction")
mae = mae_evaluator.evaluate(test_prediction)

# Total predicted quantity per (Year, Week), summed over all countries and products.
weekly_test_predictions = test_prediction.groupBy("Year", "Week").agg({"prediction": "sum"})

# Keep only week 39 (the promotion week).
promotion_week = weekly_test_predictions.filter(col("Week") == 39)

# Store the week-39 forecast as quantity_sold_w39. Fail with a clear message instead
# of a bare IndexError (no week 39 in the test period) or silently taking an
# arbitrary row (week 39 present in more than one Year).
promotion_rows = promotion_week.select("sum(prediction)").collect()
if len(promotion_rows) != 1:
    raise ValueError(
        "Expected exactly one (Year, Week=39) group in the test predictions, "
        f"found {len(promotion_rows)}"
    )
quantity_sold_w39 = int(promotion_rows[0][0])

# Stop the Spark session; no further Spark work is possible in this kernel afterwards.
my_spark.stop()

                                                                                