<a href="https://colab.research.google.com/github/mohiteprathamesh1996/Predicting-Annual-Customer-Spending-on-E-Commerce-Websites/blob/main/PySparkMLib.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h2 align="center"> Predicting Annual Customer Spending on E-Commerce Websites  using PySpark MLib

#### Importing packages

In [1]:
import pandas as pd
import numpy as np
import pandas_datareader as pdr
import datetime
import matplotlib.pyplot as plt
import requests
import json
from tqdm import tqdm
import seaborn as sns
import numpy.random as npr
import scipy.stats as scs
from scipy.stats import norm
import warnings
warnings.filterwarnings("ignore")
plt.style.use('fivethirtyeight')

from sklearn.metrics import mean_squared_error, r2_score

#### Create Spark session

In [2]:
# Install pyspark library
!pip install pyspark

# Create new Spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Customers").getOrCreate()

# Print Spark session
spark



In [3]:
# from pyspark.ml.regression import LinearRegression

#### Load data

In [4]:
path_data_file = "https://raw.githubusercontent.com/krishnaik06/PysparkRegressions/master/Ecommerce_Customers.csv"

df_ecommerce = pd.read_csv(path_data_file)

df_ecommerce.head()

Unnamed: 0,Email,Address,Avg Session Length,Time on App,Time on Website,Length of Membership,Yearly Amount Spent
0,mstephenson@fernandez.com,"835 Frank TunnelWrightmouth, MI 82180-9605",34.497268,12.655651,39.577668,4.082621,587.951054
1,hduke@hotmail.com,"4547 Archer CommonDiazchester, CA 06566-8576",31.926272,11.109461,37.268959,2.664034,392.204933
2,pallen@yahoo.com,"24645 Valerie Unions Suite 582Cobbborough, DC ...",33.000915,11.330278,37.110597,4.104543,487.547505
3,riverarebecca@gmail.com,"1414 David ThroughwayPort Jason, OH 22070-1220",34.305557,13.717514,36.721283,3.120179,581.852344
4,mstephens@davidson-herman.com,"14023 Rodriguez PassagePort Jacobville, PR 372...",33.330673,12.795189,37.536653,4.446308,599.406092


#### Add pandas dataframe to Spark cluster

In [5]:
# Create Spark temporary dataframe
spark_df_ecommerce = spark.createDataFrame(data = df_ecommerce)

# Add spark_df_ecommerce to the catalog
spark_df_ecommerce.createOrReplaceTempView(name="spark_df_ecommerce")

# Show all tables in the Spark catalog
print("List of tables in the Spark catalog : \n",spark.catalog.listTables())

# Describe dataset
print("\n Table schema : \n")
spark_df_ecommerce.printSchema()

# Print top 5 rows of the dataset uploaded to Spark catalog
print("\n")
spark.table(tableName="spark_df_ecommerce").show(5)

List of tables in the Spark catalog : 
 [Table(name='spark_df_ecommerce', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

 Table schema : 

root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)



+--------------------+--------------------+------------------+-----------+---------------+--------------------+-------------------+
|               Email|             Address|Avg Session Length|Time on App|Time on Website|Length of Membership|Yearly Amount Spent|
+--------------------+--------------------+------------------+-----------+---------------+--------------------+-------------------+
|mstephenson@ferna...|835 Frank TunnelW...|       34.49726773|12.65565115|    39.57766802|         4.08262063

#### Assembling a vector of independant feartures

In [6]:
from pyspark.ml.feature import VectorAssembler

fearture_assembler = VectorAssembler(inputCols = ['Avg Session Length', 'Time on App',
                                                  'Time on Website', 'Length of Membership'], 
                                     outputCol = 'Independant_Features')

dataset = fearture_assembler.transform(spark_df_ecommerce)

dataset = dataset.select(["Independant_Features", "Yearly Amount Spent"])

#### Split training and validation datasets

In [7]:
train, val = dataset.randomSplit([0.70, 0.30])

#### Fitting regressor 

In [8]:
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor

models = [("LinearRegressionModel", LinearRegression),
          ("DecisionTreeRegressor", DecisionTreeRegressor),
          ("RandomForestRegressor", RandomForestRegressor),
          ("GBTRegressor", GBTRegressor)]
model = []
rmse_score = []
regr = []

for m in tqdm(models):
  regressor = m[1](featuresCol="Independant_Features", labelCol="Yearly Amount Spent").fit(dataset = train)

  results = regressor.transform(val).toPandas()

  rmse_score.append(np.sqrt(mean_squared_error(results["Yearly Amount Spent"], 
                                               results["prediction"])))
  
  model.append(m[0])
  regr.append(m[1])

100%|██████████| 4/4 [00:25<00:00,  6.38s/it]


#### Comparing model performance leaderboard

In [9]:
pd.DataFrame([model, rmse_score, regr], index=["Model Fitted", "Root Mean Squared Error", "To_Save"]).T

Unnamed: 0,Model Fitted,Root Mean Squared Error,To_Save
0,LinearRegressionModel,9.93691,<class 'pyspark.ml.regression.LinearRegression'>
1,DecisionTreeRegressor,34.4644,<class 'pyspark.ml.regression.DecisionTreeRegr...
2,RandomForestRegressor,30.5318,<class 'pyspark.ml.regression.RandomForestRegr...
3,GBTRegressor,27.1546,<class 'pyspark.ml.regression.GBTRegressor'>
