In [1]:
import pandas as pd
import numpy as np
import pyspark
from pyspark.sql.functions import *
from pyspark.sql import SparkSession, Row
import matplotlib.pyplot as plt

In [2]:
spark = SparkSession.builder.appName('second_case_study').getOrCreate()
spark

### Reading dataset

In [5]:
data = spark.read.csv('C:\\Users\\Haque\\Downloads\\Online Retail1.csv', header=True, inferSchema=True)

In [6]:
data2 = data.select('CustomerID','InvoiceDate','Quantity')

In [8]:
# extracting month year from date
data3 = data2.withColumn('InvoiceDate2',substring(col('InvoiceDate'),1,10))
data3 = data3.withColumn('Month',(split(col('InvoiceDate2'),'/')[0]))
data3 = data3.withColumn('Year',trim(split(col('InvoiceDate2'),'/')[2]))
data3 = data3.withColumn('Day',(split(col('InvoiceDate2'),'/')[1]))
data3 = data3.withColumn('monyy',concat(split(col('InvoiceDate2'),'/')[0],trim(split(col('InvoiceDate2'),'/')[2])))

In [9]:
data3.show()

+----------+--------------+--------+------------+-----+----+---+------+
|CustomerID|   InvoiceDate|Quantity|InvoiceDate2|Month|Year|Day| monyy|
+----------+--------------+--------+------------+-----+----+---+------+
|     17850|12/1/2010 8:26|       6|  12/1/2010 |   12|2010|  1|122010|
|     17850|12/1/2010 8:26|       6|  12/1/2010 |   12|2010|  1|122010|
|     17850|12/1/2010 8:26|       8|  12/1/2010 |   12|2010|  1|122010|
|     17850|12/1/2010 8:26|       6|  12/1/2010 |   12|2010|  1|122010|
|     17850|12/1/2010 8:26|       6|  12/1/2010 |   12|2010|  1|122010|
|     17850|12/1/2010 8:26|       2|  12/1/2010 |   12|2010|  1|122010|
|     17850|12/1/2010 8:26|       6|  12/1/2010 |   12|2010|  1|122010|
|     17850|12/1/2010 8:28|       6|  12/1/2010 |   12|2010|  1|122010|
|     17850|12/1/2010 8:28|       6|  12/1/2010 |   12|2010|  1|122010|
|     13047|12/1/2010 8:34|      32|  12/1/2010 |   12|2010|  1|122010|
|     13047|12/1/2010 8:34|       6|  12/1/2010 |   12|2010|  1|

In [11]:
#checking null CustomerID
data3.filter((col('CustomerID').isNull())).show()

#changing null customer ID to 0 by assuming null customers are the not registered customers hence their id is null
data3 = data3.withColumn('CustomerID',when(col('CustomerID').isNull(),0).otherwise(col('CustomerID')))

In [12]:
# making buckets for customer id
from pyspark.ml.feature import Bucketizer

bkt = Bucketizer(splits=[0, 12000, 13000, 14000, 15000, 16000, 17000, 18000, 19000], inputCol='CustomerID',outputCol='CustomerID_grp')
data4 = bkt.transform(data3)

In [13]:
data4.show()

+----------+--------------+--------+------------+-----+----+---+------+--------------+
|CustomerID|   InvoiceDate|Quantity|InvoiceDate2|Month|Year|Day| monyy|CustomerID_grp|
+----------+--------------+--------+------------+-----+----+---+------+--------------+
|     17850|12/1/2010 8:26|       6|  12/1/2010 |   12|2010|  1|122010|           6.0|
|     17850|12/1/2010 8:26|       6|  12/1/2010 |   12|2010|  1|122010|           6.0|
|     17850|12/1/2010 8:26|       8|  12/1/2010 |   12|2010|  1|122010|           6.0|
|     17850|12/1/2010 8:26|       6|  12/1/2010 |   12|2010|  1|122010|           6.0|
|     17850|12/1/2010 8:26|       6|  12/1/2010 |   12|2010|  1|122010|           6.0|
|     17850|12/1/2010 8:26|       2|  12/1/2010 |   12|2010|  1|122010|           6.0|
|     17850|12/1/2010 8:26|       6|  12/1/2010 |   12|2010|  1|122010|           6.0|
|     17850|12/1/2010 8:28|       6|  12/1/2010 |   12|2010|  1|122010|           6.0|
|     17850|12/1/2010 8:28|       6|  12/1/

In [14]:
data4.select([count(when(col(c).isNull(),c)).alias(c) for c in data4.columns]).head()

Row(CustomerID=0, InvoiceDate=0, Quantity=0, InvoiceDate2=0, Month=0, Year=0, Day=0, monyy=0, CustomerID_grp=0)

In [70]:
data4.describe().show()

+-------+------------------+---------------+------------------+------------+-----------------+------------------+------------------+-----------------+------------------+
|summary|        CustomerID|    InvoiceDate|          Quantity|InvoiceDate2|            Month|              Year|               Day|            monyy|    CustomerID_grp|
+-------+------------------+---------------+------------------+------------+-----------------+------------------+------------------+-----------------+------------------+
|  count|            531285|         531285|            531285|      531285|           531285|            531285|            531285|           531285|            531285|
|   mean|11455.198377518658|           null|10.655262241546438|        null|7.560243560424254|2010.9050269655702|15.024694843633831|82242.28260646672| 2.841011886275728|
| stddev|  6795.30407033276|           null|156.83032303670862|        null|3.508716805778585|0.2931780227158318| 8.662156653949872|35089.11735707545|

In [54]:
data4.select([count(when(col('Quantity')<0,0))]).head()

Row(count(CASE WHEN (Quantity < 0) THEN 0 END)=10624)

In [55]:
data4.count()

541909

In [57]:
data4 = data4.filter(col('Quantity')>0)

In [58]:
data4.groupBy('CustomerID_grp').count().show()

+--------------+------+
|CustomerID_grp| count|
+--------------+------+
|           0.0|133361|
|           7.0| 15285|
|           1.0| 44956|
|           4.0| 64416|
|           3.0| 82888|
|           2.0| 57127|
|           6.0| 70594|
|           5.0| 62658|
+--------------+------+



## Changing the date into numerical variable for regression problem

In [59]:
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer

sIndex = StringIndexer(inputCols=['monyy'],outputCols=['monyy_indexed'])
sIndex_transformed = sIndex.fit(data4).transform(data4)

vAssemble = VectorAssembler(inputCols=['CustomerID_grp', 'monyy_indexed'],outputCol='combined_features')
vAssemble_transformed = vAssemble.transform(sIndex_transformed)
data5 = vAssemble_transformed.select('Quantity','combined_features')

In [60]:
# Test Train split
test, train = data5.randomSplit([0.3,0.7],seed=123)

## Using Linear Regression Model

In [61]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = 'combined_features', labelCol='Quantity')
lr_model = lr.fit(train)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [1.0277433982624584,0.06413056173581233]
Intercept: 7.174540042175528


### RMSE and R2 for train dataset

In [62]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 127.776771
r2: 0.000325


### Making Predictions

In [63]:
predictions = lr_model.transform(test)

# lr_predictions.select("prediction","MV","features").show(5)

In [64]:
predictions.show()

+--------+-----------------+-----------------+
|Quantity|combined_features|       prediction|
+--------+-----------------+-----------------+
|       1|        [0.0,2.0]|7.302801165647153|
|       1|        [0.0,2.0]|7.302801165647153|
|       1|        [0.0,2.0]|7.302801165647153|
|       1|        [0.0,2.0]|7.302801165647153|
|       1|        [0.0,2.0]|7.302801165647153|
|       1|        [0.0,2.0]|7.302801165647153|
|       1|        [0.0,2.0]|7.302801165647153|
|       1|        [0.0,2.0]|7.302801165647153|
|       1|        [0.0,2.0]|7.302801165647153|
|       1|        [0.0,2.0]|7.302801165647153|
|       1|        [0.0,2.0]|7.302801165647153|
|       1|        [0.0,2.0]|7.302801165647153|
|       1|        [0.0,2.0]|7.302801165647153|
|       1|        [0.0,2.0]|7.302801165647153|
|       1|        [0.0,2.0]|7.302801165647153|
|       1|        [0.0,2.0]|7.302801165647153|
|       1|        [0.0,2.0]|7.302801165647153|
|       1|        [0.0,2.0]|7.302801165647153|
|       1|   

### Evaluating Predictions

In [65]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="Quantity", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE:", rmse)

evaluator = RegressionEvaluator(labelCol="Quantity", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R-squared:", r2)



RMSE: 209.33628822855258
R-squared: 0.0001632829653058776


### Tuning model with help of hyperparameters

In [71]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create a Linear Regression instance
lr = LinearRegression(featuresCol='combined_features', labelCol='Quantity')

# Define the hyperparameter grid
param_grid = ParamGridBuilder() \
    .addGrid(lr.maxIter, [1, 2, 3]) \
    .addGrid(lr.regParam, [0.1, 0.2, 0.3]) \
    .build()

# Perform grid search cross-validation
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=param_grid,
                          evaluator=RegressionEvaluator(labelCol='Quantity', metricName='rmse'),
                          numFolds=3)

# Fit the model with the best hyperparameters
cv_model = crossval.fit(train)

# Get the best model
best_model = cv_model.bestModel

# Evaluate the model on the test data
predictions = best_model.transform(test)


## Checking params for best model

In [72]:
best_max_iter = best_model.getMaxIter()
best_reg_param = best_model.getRegParam()

print("Best maxIter: ", best_max_iter)
print("Best regParam: ", best_reg_param)


Best maxIter:  1
Best regParam:  0.1


In [73]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="Quantity", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE:", rmse)

evaluator = RegressionEvaluator(labelCol="Quantity", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R-squared:", r2)



RMSE: 209.336292089746
R-squared: 0.00016324608146689457
