# Multiple Linear Regression on Customer Data

## Agenda

* Business Understanding
* Data Understanding
* Data Preparation
* Exploratory Data Analysis
* Building a Linear Model
* Evaluation

### Business Understanding

#### Problem Statement

A large children's educational toy company, which sells edutainment tablets and gaming systems
both online and in retail stores, wants to analyze its customer data. The company has been operating
for the last few years and maintains all of its transactional data. The given data,
‘CustomerData.csv’, is a sample of customer-level data extracted and processed for this
analysis from various sets of transactional files.

The objective of today’s activity is:
* Build a regression model to predict customer revenue from the other attributes, and understand the influence of those attributes on revenue

### Identify the Right Error Metrics

##### Error Metrics for Regression

* Mean Absolute Error (MAE):

$$MAE = \dfrac{1}{n}\times\sum_{i = 1}^{n}|y_{i} - \hat{y_{i}}|$$


* Mean Squared Error (MSE):

$$MSE = \dfrac{1}{n}\times\sum_{i = 1}^{n}(y_{i} - \hat{y_{i}})^2$$


* Root Mean Squared Error (RMSE):

$$RMSE = \sqrt{\dfrac{1}{n}\times\sum_{i = 1}^{n}(y_{i} - \hat{y_{i}})^2}$$


* Mean Absolute Percentage Error (MAPE):

$$MAPE = \dfrac{100}{n}\times\sum_{i = 1}^{n}\left|\dfrac{y_{i} - \hat{y_{i}}}{y_{i}}\right|$$
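The four metrics can be sketched in plain Python to make the order of operations explicit: each residual is transformed (absolute value, square, or relative absolute value) before the results are summed. The function name `regression_errors` and the sample numbers are illustrative assumptions, not part of the notebook.

```python
import math


def regression_errors(actual, predicted):
    """Return MAE, MSE, RMSE and MAPE for two equal-length sequences."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("actual and predicted must be non-empty and the same length")
    n = len(actual)
    residuals = [y - y_hat for y, y_hat in zip(actual, predicted)]
    mae = sum(abs(r) for r in residuals) / n
    mse = sum(r * r for r in residuals) / n
    rmse = math.sqrt(mse)
    # MAPE is undefined when an actual value is zero; this sketch assumes none are.
    mape = 100.0 / n * sum(abs(r / y) for r, y in zip(residuals, actual))
    return {"mae": mae, "mse": mse, "rmse": rmse, "mape": mape}


# Made-up values, chosen only to keep the arithmetic easy to follow.
errors = regression_errors([100.0, 200.0, 400.0], [110.0, 190.0, 380.0])
```

Because MAE and RMSE are in the units of the target, they are easier to compare against typical revenue values than MSE; MAPE is unit-free but breaks down when actual values are at or near zero.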


### Create SPARK_HOME and PYLIB env vars and update PATH env var

### Initializing Spark

Build __SparkConf__ object 

    Contains information about your application.  


Create __SparkContext__ object 
    
    Tells Spark how to access a cluster. 
    

Create __SparkSession__ object

    The entry point to programming Spark with the Dataset and DataFrame API.

    Used to create DataFrame, register DataFrame as tables and execute SQL over tables etc.
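As a rough sketch of how the three objects relate, the function below builds them in the order described. The function name `create_spark_session` and the `local[*]` master are assumptions for illustration; the notebook itself goes straight to `SparkSession.builder`, which creates the context implicitly.

```python
def create_spark_session(app_name="SparkML_1", master="local[*]"):
    """Build SparkConf, SparkContext and SparkSession in that order."""
    # Imported lazily so the sketch can be read without a Spark installation.
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SparkSession

    # SparkConf: information about the application (name, master URL, settings).
    conf = SparkConf().setAppName(app_name).setMaster(master)
    # SparkContext: tells Spark how to access the cluster described by conf.
    sc = SparkContext.getOrCreate(conf)
    # SparkSession: entry point for the DataFrame and SQL APIs; reuses the context above.
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    return conf, sc, spark
```

Calling `create_spark_session()` in an environment where Spark is configured should return all three objects; `getOrCreate` reuses an existing context or session rather than starting a second one.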

In [1]:
import os
import sys
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz

In [3]:
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [4]:
import os 
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

In [5]:
import findspark
findspark.init()

In [6]:
from pyspark.sql.types import *
from pyspark.sql.functions import * 

In [7]:
! pip install chart_studio

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting chart_studio
  Downloading chart_studio-1.1.0-py3-none-any.whl (64 kB)
     |████████████████████████████████| 64 kB 2.0 MB/s
Collecting retrying>=1.3.3
  Downloading retrying-1.3.4-py3-none-any.whl (11 kB)
Installing collected packages: retrying, chart-studio
Successfully installed chart-studio-1.1.0 retrying-1.3.4


In [8]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [9]:
import pyspark
from pyspark.sql import SparkSession

In [10]:
import numpy as np
import pandas as pd
from io import StringIO

In [11]:
spark = SparkSession.builder.appName('SparkML_1').getOrCreate()

In [12]:
spark

In [13]:
#define schema
customerDataSchema = StructType([
                 StructField("CustomerID",StringType(),True),
                 StructField("City",StringType(),True),
                 StructField("NoOfChildren",DoubleType(),True),
                 StructField("MinAgeOfChild",DoubleType(),True),
                 StructField("MaxAgeOfChild",DoubleType(),True),
                 StructField("Tenure",DoubleType(),True),
                 StructField("FrquncyOfPurchase",DoubleType(),True),
                 StructField("NoOfUnitsPurchased",DoubleType(),True),
                 StructField("FrequencyOFPlay",DoubleType(),True),
                 StructField("NoOfGamesPlayed",DoubleType(),True),
                 StructField("NoOfGamesBought",DoubleType(),True),
                 StructField("FavoriteChannelOfTransaction",StringType(),True),
                 StructField("FavoriteGame",StringType(),True),
                 StructField("TotalRevenueGenerated",DoubleType(),True)
                 ])

#### Loading the data

In [15]:
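## Read data and create a dataframe
# No header option is set, so the file's header line is loaded as a data row
# (its numeric fields parse to null, as the first row of show() reveals).
customerDF = spark.read.format('csv')\
.load("/content/drive/My Drive/CustomerData (1).csv", schema=customerDataSchema)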

### Data Understanding

In [16]:
# Print Schema
customerDF.printSchema()

root
 |-- CustomerID: string (nullable = true)
 |-- City: string (nullable = true)
 |-- NoOfChildren: double (nullable = true)
 |-- MinAgeOfChild: double (nullable = true)
 |-- MaxAgeOfChild: double (nullable = true)
 |-- Tenure: double (nullable = true)
 |-- FrquncyOfPurchase: double (nullable = true)
 |-- NoOfUnitsPurchased: double (nullable = true)
 |-- FrequencyOFPlay: double (nullable = true)
 |-- NoOfGamesPlayed: double (nullable = true)
 |-- NoOfGamesBought: double (nullable = true)
 |-- FavoriteChannelOfTransaction: string (nullable = true)
 |-- FavoriteGame: string (nullable = true)
 |-- TotalRevenueGenerated: double (nullable = true)



Total number of Columns and Records

In [17]:
len(customerDF.columns)

14

In [18]:
customerDF.count()

3210

See the top rows of the data

In [19]:
customerDF.show(5)

+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+---------------+----------------------------+------------+---------------------+
|CustomerID|City|NoOfChildren|MinAgeOfChild|MaxAgeOfChild|Tenure|FrquncyOfPurchase|NoOfUnitsPurchased|FrequencyOFPlay|NoOfGamesPlayed|NoOfGamesBought|FavoriteChannelOfTransaction|FavoriteGame|TotalRevenueGenerated|
+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+---------------+----------------------------+------------+---------------------+
|CustomerID|City|        null|         null|         null|  null|             null|              null|           null|           null|           null|        FavoriteChannelOf...|FavoriteGame|                 null|
|      1001|   1|         2.0|          3.0|          8.0| 210.0|             11.0|              11.0|         2344.0|          108.0|      

Show a quick statistical summary of the data using describe()

In [20]:
customerDF.describe().show()

+-------+----------------+------------------+------------------+------------------+-----------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+----------------------------+------------+---------------------+
|summary|      CustomerID|              City|      NoOfChildren|     MinAgeOfChild|    MaxAgeOfChild|           Tenure|FrquncyOfPurchase|NoOfUnitsPurchased|   FrequencyOFPlay|  NoOfGamesPlayed|   NoOfGamesBought|FavoriteChannelOfTransaction|FavoriteGame|TotalRevenueGenerated|
+-------+----------------+------------------+------------------+------------------+-----------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+----------------------------+------------+---------------------+
|  count|            3210|              3210|              3209|              3209|             3209|             3209|             3209|              3209|             

In [21]:
customerDF.describe().select("NoOfChildren","MinAgeOfChild","MaxAgeOfChild","Tenure","FrquncyOfPurchase","NoOfUnitsPurchased","FrequencyOFPlay","NoOfGamesPlayed","TotalRevenueGenerated").show()

+------------------+------------------+-----------------+-----------------+-----------------+------------------+------------------+-----------------+---------------------+
|      NoOfChildren|     MinAgeOfChild|    MaxAgeOfChild|           Tenure|FrquncyOfPurchase|NoOfUnitsPurchased|   FrequencyOFPlay|  NoOfGamesPlayed|TotalRevenueGenerated|
+------------------+------------------+-----------------+-----------------+-----------------+------------------+------------------+-----------------+---------------------+
|              3209|              3209|             3209|             3209|             3209|              3209|              3209|             3209|                 3209|
| 2.128388906201309|4.9607354315986285|7.990651293237769|347.5204113430975|  16.269554378311|14.684013711436585|1568.2078529136802| 93.6279214708632|    168.4771829230288|
|1.0350924535316555|3.7141911525693776|8.784084301356664|90.52011823796829|8.441670101163071| 7.182029169207206| 1810.630463829322|88.936371

Display the data type of each variable

In [22]:
customerDF.dtypes

[('CustomerID', 'string'),
 ('City', 'string'),
 ('NoOfChildren', 'double'),
 ('MinAgeOfChild', 'double'),
 ('MaxAgeOfChild', 'double'),
 ('Tenure', 'double'),
 ('FrquncyOfPurchase', 'double'),
 ('NoOfUnitsPurchased', 'double'),
 ('FrequencyOFPlay', 'double'),
 ('NoOfGamesPlayed', 'double'),
 ('NoOfGamesBought', 'double'),
 ('FavoriteChannelOfTransaction', 'string'),
 ('FavoriteGame', 'string'),
 ('TotalRevenueGenerated', 'double')]

### Data Preparation

#### Observations:
    1. City holds numeric codes but is actually categorical, so it is read as a string; FavoriteGame and FavoriteChannelOfTransaction are read as strings as well.
    2. The maximum age of children is 113, which must be a wrong entry.
    3. Summary statistics for CustomerID are not meaningful.

So we now handle these appropriately, i.e., treat City, FavoriteGame and FavoriteChannelOfTransaction as categorical, exclude CustomerID from the data for analysis, and treat the records with wrong entries.

##### Check and delete CustomerID attribute

#### Data type conversion 
    Spark DataFrames have no categorical data type, so 'City', 'FavoriteChannelOfTransaction' and 'FavoriteGame' stay as strings here and are converted to numeric form later with StringIndexer and OneHotEncoder.

In [25]:
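# Creating a list of categorical and numerical features
# Caution: CustomerID is kept as a feature here although the notes above say to exclude it;
# one-hot encoding an identifier adds roughly one column per customer and invites overfitting.
cat_Var_names = ["CustomerID","City","FavoriteChannelOfTransaction","FavoriteGame"]
num_var_names = ["NoOfChildren","MinAgeOfChild","MaxAgeOfChild","Tenure","FrquncyOfPurchase","NoOfUnitsPurchased","FrequencyOFPlay","NoOfGamesPlayed"]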

In [26]:
customerDF.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in customerDF.columns]).show()

+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+---------------+----------------------------+------------+---------------------+
|CustomerID|City|NoOfChildren|MinAgeOfChild|MaxAgeOfChild|Tenure|FrquncyOfPurchase|NoOfUnitsPurchased|FrequencyOFPlay|NoOfGamesPlayed|NoOfGamesBought|FavoriteChannelOfTransaction|FavoriteGame|TotalRevenueGenerated|
+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+---------------+----------------------------+------------+---------------------+
|         0|   0|           1|            1|            1|     1|                1|                 1|              1|              1|              1|                           0|           0|                    1|
+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+------

#### Observe how many records have values 113 for age of children

In [27]:
customerDF.select('MaxAgeOfChild').where(customerDF.MaxAgeOfChild >=113).count()

20


#### Removing outliers

In [28]:
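# Comparisons with null are not true, so rows with null ages (the header line read as data) are dropped too
customerDF = customerDF.where((customerDF["MinAgeOfChild"] != 113) & (customerDF["MaxAgeOfChild"] != 113))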

In [29]:
customerDF.count()

3189

#### Missing Data

Spark represents missing data as null (and as NaN in floating-point columns). 

Check for missing values

    isNull() and isnan() return a boolean per value, i.e. true if the value is missing and false otherwise. 

    count(when(...)) counts the rows where the condition is true, and thus gives the total number of missing values per column

In [30]:
# Checking for null values at each column
df2 = customerDF.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in customerDF.columns])
df2.show()

+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+---------------+----------------------------+------------+---------------------+
|CustomerID|City|NoOfChildren|MinAgeOfChild|MaxAgeOfChild|Tenure|FrquncyOfPurchase|NoOfUnitsPurchased|FrequencyOFPlay|NoOfGamesPlayed|NoOfGamesBought|FavoriteChannelOfTransaction|FavoriteGame|TotalRevenueGenerated|
+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+---------------+----------------------------+------------+---------------------+
|         0|   0|           0|            0|            0|     0|                0|                 0|              0|              0|              0|                           0|           0|                    0|
+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+------

In this case there are no missing values. However, if we find any missing values in the data, as a rule of thumb:


    If a particular row/column has a large number of missing values, then drop that row/column 
    
        e.g. in pandas, to drop any rows that have missing data use data.dropna(axis=0, inplace=True) 
        
    Otherwise, impute/fill missing data based on domain knowledge or using imputation techniques
        
        e.g. in pandas, to fill missing values with the mean use data.fillna(data.mean(), inplace=True)      
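In PySpark, the same rule of thumb could be sketched roughly as follows. The function name `drop_or_impute`, the `max_missing` threshold and the `_imputed` column suffix are hypothetical choices for illustration; `DataFrame.dropna` and `pyspark.ml.feature.Imputer` are the library calls assumed to do the work.

```python
def drop_or_impute(df, numeric_cols, max_missing=3):
    """Drop rows with too many gaps, then mean-impute the listed numeric columns."""
    # Imported lazily so the sketch can be read without a Spark installation.
    from pyspark.ml.feature import Imputer

    # dropna(thresh=k) keeps only rows with at least k non-null values,
    # so this removes rows that have more than max_missing missing values.
    kept = df.dropna(thresh=len(df.columns) - max_missing)

    # Imputer replaces null/NaN in each input column with that column's mean
    # and writes the result to a new column with the suffix "_imputed".
    imputer = Imputer(
        strategy="mean",
        inputCols=numeric_cols,
        outputCols=[name + "_imputed" for name in numeric_cols],
    )
    return imputer.fit(kept).transform(kept)
```

A call such as `drop_or_impute(customerDF, num_var_names)` would be one way to apply it here, although this dataset has no missing values left to treat after the earlier filter.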

In [31]:
# NA values are read in as strings; to make them null, we convert the NA values to null values


In [32]:
customerDF.columns

['CustomerID',
 'City',
 'NoOfChildren',
 'MinAgeOfChild',
 'MaxAgeOfChild',
 'Tenure',
 'FrquncyOfPurchase',
 'NoOfUnitsPurchased',
 'FrequencyOFPlay',
 'NoOfGamesPlayed',
 'NoOfGamesBought',
 'FavoriteChannelOfTransaction',
 'FavoriteGame',
 'TotalRevenueGenerated']

In [33]:
customerDF.dtypes

[('CustomerID', 'string'),
 ('City', 'string'),
 ('NoOfChildren', 'double'),
 ('MinAgeOfChild', 'double'),
 ('MaxAgeOfChild', 'double'),
 ('Tenure', 'double'),
 ('FrquncyOfPurchase', 'double'),
 ('NoOfUnitsPurchased', 'double'),
 ('FrequencyOFPlay', 'double'),
 ('NoOfGamesPlayed', 'double'),
 ('NoOfGamesBought', 'double'),
 ('FavoriteChannelOfTransaction', 'string'),
 ('FavoriteGame', 'string'),
 ('TotalRevenueGenerated', 'double')]

### Train-Test Split

In [34]:
# Split the data into training and test sets (30% held out for testing)
(trainingData,testingData) = customerDF.randomSplit([0.7,0.3], seed = 1)

In [35]:
trainingData.show(5)

+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+---------------+----------------------------+------------+---------------------+
|CustomerID|City|NoOfChildren|MinAgeOfChild|MaxAgeOfChild|Tenure|FrquncyOfPurchase|NoOfUnitsPurchased|FrequencyOFPlay|NoOfGamesPlayed|NoOfGamesBought|FavoriteChannelOfTransaction|FavoriteGame|TotalRevenueGenerated|
+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+---------------+----------------------------+------------+---------------------+
|      1001|   1|         2.0|          3.0|          8.0| 210.0|             11.0|              11.0|         2344.0|          108.0|           10.0|                     Uniform|     Uniform|               107.51|
|      1002|   1|         2.0|          3.0|          6.0| 442.0|             20.0|              20.0|          245.0|           22.0|      

In [36]:
testingData.show(5)

+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+---------------+----------------------------+------------+---------------------+
|CustomerID|City|NoOfChildren|MinAgeOfChild|MaxAgeOfChild|Tenure|FrquncyOfPurchase|NoOfUnitsPurchased|FrequencyOFPlay|NoOfGamesPlayed|NoOfGamesBought|FavoriteChannelOfTransaction|FavoriteGame|TotalRevenueGenerated|
+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+---------------+----------------------------+------------+---------------------+
|      1005|   1|         3.0|          6.0|          9.0| 422.0|             44.0|              31.0|         1066.0|          102.0|           44.0|                     Uniform|     Uniform|               335.05|
|      1006|   1|         2.0|          3.0|          4.0| 378.0|             16.0|              16.0|          228.0|           12.0|      

### Use VectorAssembler to combine a given list of numeric columns into a single vector column.

In [37]:
from pyspark.ml.feature import VectorAssembler
vector_assembler_num_cols = VectorAssembler(inputCols=num_var_names , outputCol="num_features")

### Scaling numeric attributes using MinMaxScaler method

1. Scale all the numeric attributes using MinMaxScaler
2. MinMaxScaler transforms a dataset of Vector rows, rescaling each feature to a specific range (often [0, 1]). 
3. MinMaxScaler computes summary statistics on a data set and produces a MinMaxScalerModel.
4. The model can then transform each feature individually such that it is in the given range.
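The rescaling itself is simple enough to sketch in plain Python. The function `min_max_scale` is a hypothetical stand-in for what the fitted model does to a single feature, and the four Tenure values are taken from the rows displayed earlier, so the minimum and maximum here are not those of the full dataset.

```python
def min_max_scale(column, lower=0.0, upper=1.0):
    """Rescale one feature linearly so its minimum maps to lower and its maximum to upper."""
    lo, hi = min(column), max(column)
    if hi == lo:
        # A constant feature has no spread; Spark's MinMaxScaler documents
        # mapping it to the midpoint of the target range.
        return [0.5 * (lower + upper)] * len(column)
    return [(x - lo) / (hi - lo) * (upper - lower) + lower for x in column]


# Tenure values from the displayed sample rows only.
tenure = [210.0, 442.0, 422.0, 378.0]
scaled = min_max_scale(tenure)
```

Because the minimum and maximum are learned when the scaler is fitted, values in the test set that fall outside the training range will land outside [0, 1].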

In [38]:
from pyspark.ml.feature import MinMaxScaler
min_max_Scalar_Num_cols = MinMaxScaler(inputCol="num_features",outputCol="scaled_num_features")

### Convert categorical to numeric: OneHotEncoder, StringIndexer, VectorAssembler,  VectorIndexer

In [39]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
indexers_Cat = [StringIndexer(inputCol=cat_var_name,outputCol="{0}_index".format(cat_var_name),handleInvalid = "keep")
                for cat_var_name in cat_Var_names]
encoders_Cat = [OneHotEncoder(inputCol=indexer.getOutputCol(),outputCol="{0}_vec".format(indexer.getInputCol())) for indexer in indexers_Cat]

assembler_Cat = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders_Cat],outputCol="cat_features")
assembler = VectorAssembler(inputCols=["scaled_num_features","cat_features"],outputCol="features")
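To illustrate what the indexer and encoder pair produces, here is a plain-Python sketch under the settings used above: labels are indexed by descending frequency, `handleInvalid="keep"` reserves one extra index for unseen labels, and the encoder's default of dropping the last category turns that extra index into an all-zero vector. The helper names and the label "Other" are hypothetical; only "Uniform" appears in the displayed rows.

```python
from collections import Counter


def fit_string_index(values):
    """Map each label to an index, most frequent label first (ties sorted alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


def encode(value, mapping):
    """One-hot encode a label; unseen labels become the all-zero vector."""
    # handleInvalid="keep": unseen labels share one extra index after the known ones.
    index = mapping.get(value, len(mapping))
    # Dropping the last category leaves one slot per known label.
    vector = [0.0] * len(mapping)
    if index < len(mapping):
        vector[index] = 1.0
    return vector


mapping = fit_string_index(["Uniform", "Other", "Uniform"])
uniform_vec = encode("Uniform", mapping)
unseen_vec = encode("NeverSeen", mapping)
```

Spark stores these vectors in sparse form, which is why the assembled `features` column is printed as a size followed by lists of active indices and values.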

In [40]:
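# Caution: StringIndexer replaces each distinct revenue value with a frequency-rank index, so "label" is no longer revenue in its original units
indexer_Label = StringIndexer(inputCol="TotalRevenueGenerated",outputCol="label",handleInvalid="keep")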

### Defining the pipeline

In [41]:
preprocessingStages = [vector_assembler_num_cols] + [min_max_Scalar_Num_cols] + indexers_Cat + encoders_Cat + [assembler_Cat] + [assembler] + [indexer_Label]

In [42]:
preprocessingStages

[VectorAssembler_8b71ca89d848,
 MinMaxScaler_5c4e363232b8,
 StringIndexer_132f7ce0195a,
 StringIndexer_79b83ec7d5af,
 StringIndexer_e37eb2b787a3,
 StringIndexer_6a496c34127e,
 OneHotEncoder_8e323ce72a70,
 OneHotEncoder_266ce1cd2398,
 OneHotEncoder_c0ef9a241c2c,
 OneHotEncoder_867251581d55,
 VectorAssembler_75c02ca9733a,
 VectorAssembler_678d4382abd2,
 StringIndexer_c71decb54b50]

### Model Building, Tuning and Evaluation

### Linear Regression

In [43]:
trainingData

DataFrame[CustomerID: string, City: string, NoOfChildren: double, MinAgeOfChild: double, MaxAgeOfChild: double, Tenure: double, FrquncyOfPurchase: double, NoOfUnitsPurchased: double, FrequencyOFPlay: double, NoOfGamesPlayed: double, NoOfGamesBought: double, FavoriteChannelOfTransaction: string, FavoriteGame: string, TotalRevenueGenerated: double]

In [44]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
linear =  LinearRegression(maxIter=10,labelCol="label",featuresCol="features")
linear_pipeline = Pipeline(stages=preprocessingStages+[linear])
linear_pipeline_model = linear_pipeline.fit(trainingData)

In [45]:
linear_pipeline_model.stages[-1].coefficients

DenseVector([16.2914, -27.6826, 11.2369, 50.9675, 568.2731, 962.3547, 256.8509, 124.6854, -211.2476, 780.8438, -310.5964, -224.8955, -412.7585, -303.9911, -296.0214, -139.0526, 638.4619, -205.8136, -281.8331, 411.0944, 548.5627, -228.0027, 552.5193, -368.7713, 809.6958, -320.4168, -37.1369, -287.2063, 410.5, 270.6446, -190.371, 362.066, -299.9536, 284.0196, -220.6092, -405.2558, -34.3255, -287.4461, -365.0101, 639.3687, 494.0267, -321.29, -176.6228, -43.0395, 414.8511, -295.6123, 461.1964, 219.2705, -78.046, 480.7967, 538.919, 781.559, 323.1658, -335.6091, 185.1366, -50.6309, -54.982, -119.3579, 17.8877, 94.4341, 181.1529, -113.0042, 72.5918, 349.9534, -156.6683, 179.1358, -61.5142, 702.3917, 220.8339, 520.9916, -121.5378, -74.284, 17.2199, -208.7054, -92.0024, 150.086, 7.418, -210.2086, 427.9219, 322.4041, 155.1596, -157.171, 93.6241, 422.1097, -111.1847, 409.3975, 92.1631, 417.3683, -122.6456, 81.7141, -272.1993, -334.5995, 50.0526, 617.0819, 277.1438, 420.5553, 208.7728, -194.708, 1

In [46]:
linear_pipeline_model.stages[-1].intercept

185.92850481737273

#### Predicting on train and test data

In [47]:
train_predictions_linear = linear_pipeline_model.transform(trainingData)

In [48]:
train_predictions_linear.select('features').show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                                                    |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(2258,[0,1,2,3,4,5,6,7,8,2252,2255,2256],[0.1,0.1875,0.25,0.2956989247311828,0.0847457627118644,0.09009009009009009,0.0842286823098207,0.14173228346456693,1.0,1.0,1.0,1.0])                                |
|(2258,[0,1,2,3,4,5,6,7,9,2252,2254,2256],[0.1,0.1875,0.15000000000000002,0.9193548387096775,0.16101694915254236,0.17117117117117117,0.008803765855761975,0.0288713910761154

In [49]:
train_predictions_linear.show()

+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+---------------+----------------------------+------------+---------------------+--------------------+--------------------+----------------+----------+----------------------------------+------------------+-----------------+-------------+--------------------------------+----------------+--------------------+--------------------+------+--------------------+
|CustomerID|City|NoOfChildren|MinAgeOfChild|MaxAgeOfChild|Tenure|FrquncyOfPurchase|NoOfUnitsPurchased|FrequencyOFPlay|NoOfGamesPlayed|NoOfGamesBought|FavoriteChannelOfTransaction|FavoriteGame|TotalRevenueGenerated|        num_features| scaled_num_features|CustomerID_index|City_index|FavoriteChannelOfTransaction_index|FavoriteGame_index|   CustomerID_vec|     City_vec|FavoriteChannelOfTransaction_vec|FavoriteGame_vec|        cat_features|            features| label|          prediction|
+----------+----+-

In [50]:
test_predictions_linear = linear_pipeline_model.transform(testingData)

In [51]:
# Find the error metric - RMSE
from pyspark.ml.evaluation import RegressionEvaluator
predictionAndLabels_train_linear = train_predictions_linear.select("prediction","label")
evaluator = RegressionEvaluator(metricName="rmse")
train_rmse_linear = evaluator.evaluate(predictionAndLabels_train_linear)
print("Training Set RMSE  ",train_rmse_linear)

Training Set RMSE   0.0024697392740015665


In [52]:
from pyspark.ml.evaluation import RegressionEvaluator
predictionAndLabels_test_linear = test_predictions_linear.select("prediction","label")
evaluator = RegressionEvaluator(metricName="rmse")
test_rmse_linear = evaluator.evaluate(predictionAndLabels_test_linear)
print("Testing Set RMSE  ",test_rmse_linear)

Testing Set RMSE   621.6000753702514

### Decision Tree Regressor


In [53]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline
DT =  DecisionTreeRegressor(maxDepth=6)
DT_pipeline = Pipeline(stages=preprocessingStages+[DT])
DT_pipeline_model = DT_pipeline.fit(trainingData)

In [54]:
train_predictions_DT = DT_pipeline_model.transform(trainingData)

In [55]:
train_predictions_DT.select('features').show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                                                    |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(2258,[0,1,2,3,4,5,6,7,8,2252,2255,2256],[0.1,0.1875,0.25,0.2956989247311828,0.0847457627118644,0.09009009009009009,0.0842286823098207,0.14173228346456693,1.0,1.0,1.0,1.0])                                |
|(2258,[0,1,2,3,4,5,6,7,9,2252,2254,2256],[0.1,0.1875,0.15000000000000002,0.9193548387096775,0.16101694915254236,0.17117117117117117,0.008803765855761975,0.0288713910761154

In [56]:
train_predictions_DT.show()

+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+---------------+----------------------------+------------+---------------------+--------------------+--------------------+----------------+----------+----------------------------------+------------------+-----------------+-------------+--------------------------------+----------------+--------------------+--------------------+------+------------------+
|CustomerID|City|NoOfChildren|MinAgeOfChild|MaxAgeOfChild|Tenure|FrquncyOfPurchase|NoOfUnitsPurchased|FrequencyOFPlay|NoOfGamesPlayed|NoOfGamesBought|FavoriteChannelOfTransaction|FavoriteGame|TotalRevenueGenerated|        num_features| scaled_num_features|CustomerID_index|City_index|FavoriteChannelOfTransaction_index|FavoriteGame_index|   CustomerID_vec|     City_vec|FavoriteChannelOfTransaction_vec|FavoriteGame_vec|        cat_features|            features| label|        prediction|
+----------+----+-----

In [57]:
test_predictions_DT = DT_pipeline_model.transform(testingData)

In [58]:
test_predictions_DT.show()

+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+---------------+----------------------------+------------+---------------------+--------------------+--------------------+----------------+----------+----------------------------------+------------------+--------------+-------------+--------------------------------+----------------+--------------------+--------------------+------+------------------+
|CustomerID|City|NoOfChildren|MinAgeOfChild|MaxAgeOfChild|Tenure|FrquncyOfPurchase|NoOfUnitsPurchased|FrequencyOFPlay|NoOfGamesPlayed|NoOfGamesBought|FavoriteChannelOfTransaction|FavoriteGame|TotalRevenueGenerated|        num_features| scaled_num_features|CustomerID_index|City_index|FavoriteChannelOfTransaction_index|FavoriteGame_index|CustomerID_vec|     City_vec|FavoriteChannelOfTransaction_vec|FavoriteGame_vec|        cat_features|            features| label|        prediction|
+----------+----+-----------

In [59]:
from pyspark.ml.evaluation import RegressionEvaluator
predictionAndLabels_train_DT = train_predictions_DT.select("prediction","label")
evaluator = RegressionEvaluator(metricName="rmse")
train_rmse_DT = evaluator.evaluate(predictionAndLabels_train_DT)
print("Training Set RMSE  ",train_rmse_DT)

Training Set RMSE   318.5604353933582


In [60]:
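from pyspark.ml.evaluation import RegressionEvaluator
predictionAndLabels_test_DT = test_predictions_DT.select("prediction","label")
evaluator = RegressionEvaluator(metricName="rmse")
# Evaluate the decision tree's own predictions. The recorded run passed
# predictionAndLabels_test_linear here, so the value it printed (621.6000753702514)
# was the linear model's test RMSE rather than the decision tree's.
test_rmse_DT = evaluator.evaluate(predictionAndLabels_test_DT)
print("Testing Set RMSE  ",test_rmse_DT)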


### Tuning LR Model

In [61]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel

In [62]:
# Defining the grid parameters and Cross validator
grid = ParamGridBuilder().addGrid(linear.maxIter, [1,2,3,4,5]).build()

In [63]:
# Run cross-validation, and choose the best set of parameters.
cv = CrossValidator(estimator=linear, estimatorParamMaps=grid, evaluator=evaluator,parallelism=2)
cvModel = cv.fit(train_predictions_linear.drop("prediction"))
cvModel.getNumFolds()

3

In [64]:
cvModel.avgMetrics[0]

402.8045804707764
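The value above is the first entry of `avgMetrics`, i.e. the RMSE for the first candidate in the grid averaged over the folds. A minimal plain-Python sketch of that idea follows; the helper names are hypothetical, and dealing rows to folds round-robin is a simplification, since Spark assigns rows to folds at random.

```python
def k_fold_splits(n_rows, num_folds=3):
    """Yield (training_rows, validation_rows) index lists, one pair per fold."""
    # Simplification: rows are dealt round-robin instead of being assigned at random.
    folds = [list(range(start, n_rows, num_folds)) for start in range(num_folds)]
    for held_out, validation in enumerate(folds):
        training = sorted(
            i for f, fold in enumerate(folds) if f != held_out for i in fold
        )
        yield training, validation


def average_metric(metric_per_fold):
    """Average one candidate's metric over its folds."""
    return sum(metric_per_fold) / len(metric_per_fold)


# Seven rows and three folds: every row is held out for validation exactly once.
splits = list(k_fold_splits(n_rows=7, num_folds=3))
```

Each parameter setting in the grid is trained and scored once per fold, and the setting with the best average is then refit on the full training data.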

In [65]:
# Predicting on train and test data using cross validation model
cvModel_preds = cvModel.transform(train_predictions_linear.drop("prediction"))

In [66]:
cvModel_preds = cvModel.transform(test_predictions_linear.drop("prediction"))

In [67]:
# Evaluating the model
evaluator.evaluate(cvModel.transform(test_predictions_linear.drop("prediction")))

617.4804366754123

Note: we did not find a KNN implementation in PySpark. The classification and regression algorithms that MLlib supports are listed at https://spark.apache.org/docs/3.0.1/mllib-classification-regression.html