# <center> **M3.2 ELT and Spark SQL**</center>

### <p style="color:purple;"> Submission by Team Supreme: Raghuveer Karrotu, Vinaya Rajaram Nayak, Arivarasan Ramasamy, Gayathri Shanmuga Sundaram 
 
### <p style="color:purple;"> Assignment Description : 
### <p style="color:purple;">Load the data into a Spark DataFrame, show the schema and make any necessary changes to it, conduct any transformations, store the data in a persistent table, and create a temp view of the data.

# <center> <p style="color:green;"> **Holiday Package Analysis**</p>  </center>  
#### <p style="color:cyan;">Aim: To predict which customers are more likely to purchase the newly introduced travel package, so that the company can make its marketing expenditure more efficient.  </p> 

#### <p style="color:cyan;">Dataset Description: The dataset used in this analysis comes from Kaggle and was originally obtained from the “Travel.com” website. It includes the following customer demographics and company features:
##### CustomerID : Unique customer Id
##### ProdTaken : This is our target variable, indicating whether the customer purchased the pitched product.
##### Age : Age of the customer 
##### TypeofContact : How customer was contacted (Company Invited or Self Inquiry) 
##### CityTier : City tier depends on the development of a city, population, facilities, and living standards. 
##### DurationOfPitch : Duration of the pitch by a salesperson to the customer
##### Occupation : Occupation of the customer
##### Gender : Gender of the customer
##### NumberOfPersonVisiting : Total number of persons planning to take the trip with the customer
##### NumberOfFollowups : Total number of follow-ups done by the salesperson after the sales pitch
##### ProductPitched : Product pitched by the salesperson
##### PreferredPropertyStar : Preferred hotel property rating by customer
##### MaritalStatus : Marital status of customer
##### NumberOfTrips : Average number of trips in a year by customer
##### Passport : The customer has a passport or not (0: No, 1: Yes)
##### PitchSatisfactionScore : Sales pitch satisfaction score
##### OwnCar : Whether the customer owns a car or not (0: No, 1: Yes)
##### NumberOfChildrenVisiting : Total number of children under the age of 5 planning to take the trip with the customer
##### Designation : Designation of the customer in the current organization
##### MonthlyIncome : Gross monthly income of the customer
</p> 


In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[4]").appName("ISM6562 Spark Assignment App").getOrCreate()

# Let's get the SparkContext object. It's the entry point to the Spark API. It's created when you create a sparksession
sc = spark.sparkContext  

# note: If you have multiple spark sessions running (like from a previous notebook you've run),
# this spark session webUI will be on a different port than the default (4040). One way to
# identify this port is with the following line. If there was only one spark session running,
# this will be 4040. If it's higher, it means other spark sessions are still running.
spark_session_port = spark.sparkContext.uiWebUrl.split(":")[-1]
print("Spark Session WebUI Port: " + spark_session_port)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/03 01:40:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Session WebUI Port: 4040


In [2]:
# this will set the log level to ERROR. This will hide the INFO or WARNING messages that are printed out by default. If you want to see them, set this to INFO or WARN.
sc.setLogLevel("ERROR") 

In [3]:
spark

## Loading our data into a Spark DataFrame

In [4]:
# Load CSV file
df_spark = spark.read.csv("Travel.csv", header=True, inferSchema=True)
df_spark.show()

                                                                                

+----------+---------+----+---------------+--------+---------------+--------------+------+----------------------+-----------------+--------------+---------------------+-------------+-------------+--------+----------------------+------+------------------------+--------------+-------------+
|CustomerID|ProdTaken| Age|  TypeofContact|CityTier|DurationOfPitch|    Occupation|Gender|NumberOfPersonVisiting|NumberOfFollowups|ProductPitched|PreferredPropertyStar|MaritalStatus|NumberOfTrips|Passport|PitchSatisfactionScore|OwnCar|NumberOfChildrenVisiting|   Designation|MonthlyIncome|
+----------+---------+----+---------------+--------+---------------+--------------+------+----------------------+-----------------+--------------+---------------------+-------------+-------------+--------+----------------------+------+------------------------+--------------+-------------+
|    200000|        1|  41|   Self Enquiry|       3|              6|      Salaried|Female|                     3|                3

# Data Exploration and transformations 

In [5]:
df_spark.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- ProdTaken: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- TypeofContact: string (nullable = true)
 |-- CityTier: integer (nullable = true)
 |-- DurationOfPitch: integer (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- NumberOfPersonVisiting: integer (nullable = true)
 |-- NumberOfFollowups: integer (nullable = true)
 |-- ProductPitched: string (nullable = true)
 |-- PreferredPropertyStar: integer (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- NumberOfTrips: integer (nullable = true)
 |-- Passport: integer (nullable = true)
 |-- PitchSatisfactionScore: integer (nullable = true)
 |-- OwnCar: integer (nullable = true)
 |-- NumberOfChildrenVisiting: integer (nullable = true)
 |-- Designation: string (nullable = true)
 |-- MonthlyIncome: integer (nullable = true)



# Visualizing data

import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

# Convert the Spark DataFrame to a pandas DataFrame for plotting (this collects all rows to the driver)
df_pandas = df_spark.toPandas()

# Check the distribution of DurationOfPitch
sns.countplot(x='DurationOfPitch', data=df_pandas)
plt.show()

# Finding missing values

In [6]:
from pyspark.sql.functions import col

# Find columns with missing values
columns_with_missing_values = [column for column in df_spark.columns if df_spark.filter(col(column).isNull()).count() > 0]

# Print columns with missing values
print("Columns with missing values:")
for column in columns_with_missing_values:
    print(column)


Columns with missing values:
Age
TypeofContact
DurationOfPitch
NumberOfFollowups
PreferredPropertyStar
NumberOfTrips
NumberOfChildrenVisiting
MonthlyIncome


In [7]:
from pyspark.sql.functions import col

# Group by the column and apply the count() function
count_df = df_spark.groupBy("TypeofContact").count()

# Show the resulting counts
count_df.show()

+---------------+-----+
|  TypeofContact|count|
+---------------+-----+
|           null|   25|
|   Self Enquiry| 3444|
|Company Invited| 1419|
+---------------+-----+



# Imputing missing values

Since TypeofContact is a string column, we impute its missing values with the most frequent value, "Self Enquiry" (3,444 of the 4,888 rows).

In [8]:
df_spark = df_spark.fillna("Self Enquiry", subset=["TypeofContact"])

For the numeric columns, we impute missing values with the column median.
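The following plain-Python sketch shows median imputation of a single column. It illustrates the idea only and is not the Spark code. The helper name `impute_median` is hypothetical. It uses `statistics.median_low`, which returns a value that actually occurs in the data instead of averaging the two middle values. Spark's `percentile_approx` behaves roughly the same way, and its result is also only approximate on large data.

```python
from statistics import median_low

def impute_median(values):
    """Replace None entries with the (lower) median of the observed values, rounded to an int."""
    observed = [v for v in values if v is not None]
    # Fall back to 0 when the whole column is missing, as the notebook's loop does
    fill = int(round(median_low(observed))) if observed else 0
    return [fill if v is None else v for v in values]

print(impute_median([30, None, 41, 36, None, 50]))  # [30, 36, 41, 36, 36, 50]
```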

In [9]:
from pyspark.sql.functions import col
from pyspark.sql.functions import percentile_approx
from pyspark.sql.functions import when

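# Only numeric columns are imputed with the median; TypeofContact (string) was handled above
numeric_types = {"int", "bigint", "double", "float"}
column_types = dict(df_spark.dtypes)

# Iterate over the numeric columns with missing values
for column in columns_with_missing_values:
    if column_types[column] not in numeric_types:
        continue
    # Calculate the (approximate) median of the column
    median_value = df_spark.select(column).agg(percentile_approx(column, 0.5)).collect()[0][0]
    # Round the median to the nearest integer; fall back to 0 if the column is entirely null
    median_value_rounded = int(round(median_value)) if median_value is not None else 0

    # Impute missing values with the median value
    df_spark = df_spark.withColumn(column, when(col(column).isNull(), median_value_rounded).otherwise(col(column)))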


In [10]:
# verifying if all missing values were imputed
columns_with_missing_values = [column for column in df_spark.columns if df_spark.filter(col(column).isNull()).count() > 0]

# Print columns with missing values
print("Columns with missing values:")
for column in columns_with_missing_values:
    print(column)

Columns with missing values:


In [11]:
# Export DataFrame to CSV

import shutil
import os

output_folder = "processedfolder"

# check if the output folder already exists
if os.path.isdir(output_folder):
    # remove the folder if it already exists
    shutil.rmtree(output_folder)

df_spark.write.csv(output_folder, header=True)  # header=True writes the column names as the first line

In [12]:
df_spark.show(3)

+----------+---------+---+---------------+--------+---------------+-----------+------+----------------------+-----------------+--------------+---------------------+-------------+-------------+--------+----------------------+------+------------------------+-----------+-------------+
|CustomerID|ProdTaken|Age|  TypeofContact|CityTier|DurationOfPitch| Occupation|Gender|NumberOfPersonVisiting|NumberOfFollowups|ProductPitched|PreferredPropertyStar|MaritalStatus|NumberOfTrips|Passport|PitchSatisfactionScore|OwnCar|NumberOfChildrenVisiting|Designation|MonthlyIncome|
+----------+---------+---+---------------+--------+---------------+-----------+------+----------------------+-----------------+--------------+---------------------+-------------+-------------+--------+----------------------+------+------------------------+-----------+-------------+
|    200000|        1| 41|   Self Enquiry|       3|              6|   Salaried|Female|                     3|                3|        Deluxe|         

# Store the data into a persistent table and create a temp view of the data


In [13]:
# Remove any leftover warehouse directory for the Travel database from a previous run,
# so that re-creating the database and saving the table do not fail on existing files
output_folder = "spark-warehouse/travel.db"

if os.path.isdir(output_folder):
    shutil.rmtree(output_folder)


# Create a database
spark.sql("CREATE DATABASE IF NOT EXISTS Travel")

# Use the database
spark.sql("USE Travel")



DataFrame[]

In [14]:
# Store the data into a persistent table in the Travel db
#df_spark.write.mode("ignore").saveAsTable("travel_information")

spark.sql("DROP TABLE IF EXISTS travel_information")

df_spark.write.saveAsTable("travel_information", mode='overwrite')


                                                                                

In [15]:
# Create a temporary view of the data
df_spark.createOrReplaceTempView("travel_information_view")

In [16]:
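# Verify that the view is registered. SHOW TABLES lists the tables of the current database (Travel)
# together with any temporary views, which are flagged by the isTemporary column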
check = spark.sql("SHOW TABLES")
if check.filter(check.tableName == "travel_information_view").count() > 0:
    print("Table exists in the created database.")

Table exists in the created database.


# Now let us find some insights using aggregations
#### <p style="color:orange;"> 1. The average age of customers </p>
<p style="color:orange;">First we are interested to know the average age of customers </p>


In [17]:
avg_age_result = spark.sql("SELECT AVG(Age) AS avg_age FROM travel_information_view")
avg_age_result.show()

+-----------------+
|          avg_age|
+-----------------+
|37.54725859247136|
+-----------------+



#### <p style="color:orange;"> 2. Occupation wise display </p>
<p style="color:orange;">Next, let us calculate, for each occupation, the total number of customers and the number of customers who bought the product.</p>


In [18]:
total_result = spark.sql("""
    SELECT Occupation, COUNT(*) AS total_customers, SUM(ProdTaken) AS customers_taken_product
    FROM travel_information_view
    GROUP BY Occupation
""")
total_result.show()

+--------------+---------------+-----------------------+
|    Occupation|total_customers|customers_taken_product|
+--------------+---------------+-----------------------+
|      Salaried|           2368|                    414|
|Large Business|            434|                    120|
|   Free Lancer|              2|                      2|
|Small Business|           2084|                    384|
+--------------+---------------+-----------------------+



#### <p style="color:orange;"> 3. Occupation wise average age and monthly income  </p>
<p style="color:orange;">Now let us calculate the average age and average monthly income for each occupation, grouping both by Occupation.</p> 


In [19]:
occupation_results = spark.sql("""WITH cte AS (
    SELECT Occupation, AVG(Age) AS avg_age, AVG(`MonthlyIncome`) AS avg_monthly_income
    FROM travel_information_view
    GROUP BY Occupation
)
SELECT Occupation, avg_age, avg_monthly_income
FROM cte""")
occupation_results.show()

+--------------+------------------+------------------+
|    Occupation|           avg_age|avg_monthly_income|
+--------------+------------------+------------------+
|      Salaried|37.569679054054056| 23591.93918918919|
|Large Business| 36.60829493087557|22859.873271889403|
|   Free Lancer|              37.5|           18929.0|
|Small Business| 37.71737044145873|23672.031669865642|
+--------------+------------------+------------------+



#### <p style="color:orange;"> 4. Percentage of customers who bought the package pitched. </p>
<p style="color:orange;">Finally, let us calculate the percentage of customers who took the product, broken down by the product pitched. For each ProductPitched value we count the customers, sum ProdTaken, and divide the two to get the purchase percentage.</p>  


In [20]:
purchased_percent_result = spark.sql("""
    SELECT ProductPitched, COUNT(*) AS total_customers, SUM(ProdTaken) AS customers_taken_product,
           (SUM(ProdTaken) / COUNT(*) * 100) AS percentage_taken_product
    FROM travel_information_view
    GROUP BY ProductPitched
    ORDER BY percentage_taken_product DESC
""")
purchased_percent_result.show()

+--------------+---------------+-----------------------+------------------------+
|ProductPitched|total_customers|customers_taken_product|percentage_taken_product|
+--------------+---------------+-----------------------+------------------------+
|         Basic|           1842|                    552|      29.967426710097723|
|      Standard|            742|                    124|      16.711590296495956|
|        Deluxe|           1732|                    204|      11.778290993071593|
|          King|            230|                     20|       8.695652173913043|
|  Super Deluxe|            342|                     20|       5.847953216374268|
+--------------+---------------+-----------------------+------------------------+
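As a quick sanity check on this query, the percentages can be recomputed from the two count columns. The column totals add up to the 920 buyers and 4,888 customers shown in the class-count table in the next section, which suggests these aggregations ran on the data before oversampling. The numbers below are copied from the output table above.

```python
# (total_customers, customers_taken_product) per ProductPitched, copied from the table above
counts = {
    "Basic": (1842, 552),
    "Standard": (742, 124),
    "Deluxe": (1732, 204),
    "King": (230, 20),
    "Super Deluxe": (342, 20),
}

# Same formula as the SQL: SUM(ProdTaken) / COUNT(*) * 100
percentages = {p: taken / total * 100 for p, (total, taken) in counts.items()}

total_customers = sum(total for total, _ in counts.values())
total_taken = sum(taken for _, taken in counts.values())

for product, pct in sorted(percentages.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{product:<13} {pct:6.2f}%")
print(f"Overall: {total_taken}/{total_customers} = {total_taken / total_customers * 100:.2f}%")
```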



## Check for data imbalance

In [21]:
# Equivalent Spark SQL query:
# spark.sql("""
#     SELECT ProdTaken, COUNT(*) AS total_customers
#     FROM travel_information_view
#     GROUP BY ProdTaken
# """).show()

prodTakenCounts = df_spark.groupBy("ProdTaken").count()
prodTakenCounts.show()

+---------+-----+
|ProdTaken|count|
+---------+-----+
|        1|  920|
|        0| 3968|
+---------+-----+



Only 920 customers took the product, compared with 3,968 who did not (about 19% versus 81%), so the dataset is imbalanced. A model trained on it would tend to be biased towards the majority class. To address this, we oversample the minority class.
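Random oversampling draws extra minority-class rows with replacement until the two classes are the same size. The following plain-Python sketch shows the idea on a list of dictionaries. The function name and the toy data are hypothetical.

```python
import random

def oversample_minority(rows, label_key, seed=0):
    """Randomly duplicate minority-class rows (with replacement) until both classes are equally frequent."""
    rng = random.Random(seed)
    positives = [r for r in rows if r[label_key] == 1]
    negatives = [r for r in rows if r[label_key] == 0]
    minority, majority = sorted([positives, negatives], key=len)
    # Draw exactly the number of extra rows needed, like the sample + limit step in the notebook
    extra = rng.choices(minority, k=len(majority) - len(minority))
    return rows + extra

# Hypothetical toy data: 2 buyers and 8 non-buyers
rows = [{"ProdTaken": 1}] * 2 + [{"ProdTaken": 0}] * 8
balanced = oversample_minority(rows, "ProdTaken")
print(sum(r["ProdTaken"] for r in balanced), len(balanced))  # 8 16
```

One design point to weigh: the extra rows are exact copies. Oversampling only the training split, after splitting, keeps copies of the same customer out of the test set.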

## Over sampling 

In [22]:
# Import necessary libraries
from pyspark.sql.functions import col
from pyspark.sql.functions import rand

# Class counts: the smallest is the minority class (ProdTaken = 1), the largest the majority (ProdTaken = 0)
ProdTakenCount = prodTakenCounts.orderBy(col("count").asc()).first()[1]
ProdnotTakenCount = prodTakenCounts.orderBy(col("count").desc()).first()[1]

# Number of extra minority rows needed to balance the classes
difference_count = ProdnotTakenCount - ProdTakenCount

# sample() treats the fraction as an expected value, not an exact count, so add a 10% margin
# to make it very likely that at least difference_count rows are drawn
oversampleRatio = 1.1 * float(difference_count) / float(ProdTakenCount)

Prodtaken_df = df_spark.filter(df_spark.ProdTaken == 1)

# Draw random rows (with replacement) from Prodtaken_df, then keep exactly difference_count of them
sample_df = Prodtaken_df.sample(withReplacement=True, fraction=oversampleRatio)
sample_df = sample_df.orderBy(rand()).limit(difference_count)

df_spark = df_spark.union(sample_df)


In [23]:
# verifying if the data is balanced 

prodTakenCounts = df_spark.groupBy("ProdTaken").count()

prodTakenCounts.show()

+---------+-----+
|ProdTaken|count|
+---------+-----+
|        1| 3968|
|        0| 3968|
+---------+-----+



# Metric of Evaluation - Recall

This is a binary classification problem with the target column "ProdTaken" (0/1).

False Negatives: the model predicts that the customer does not buy the product, but in reality the customer purchases it.

False Positives: the model predicts that the customer purchases the product, but in reality they do not.

In this problem, the revenue brought in by a customer purchasing the product is much greater than the cost of marketing. So we cannot afford to lose a potential customer, and we are willing to trade off some marketing cost spent on customers who will not buy.  
In other words, False Negatives are more costly than False Positives.


Since recall is sensitive to False Negatives, recall is the evaluation metric for this problem.
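With TP, FP, FN and TN read from the confusion matrix, recall and the related metrics computed later in this notebook are defined as:

$$
\text{Precision} = \frac{TP}{TP + FP}, \qquad \text{Recall} = \frac{TP}{TP + FN}, \qquad F_1 = \frac{2 \cdot \text{Precision} \cdot \text{Recall}}{\text{Precision} + \text{Recall}}
$$

The denominator of recall contains only TP and FN. Each missed buyer therefore lowers recall, while false positives do not affect it.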




# Data Preparation for Modeling

In [24]:
data=df_spark.select([
    'Age',
    'TypeofContact',
    'CityTier',
    'ProdTaken',
    'DurationOfPitch',
    'Occupation',
    'Gender',
    'ProductPitched',
    'NumberOfFollowups',
    'MaritalStatus',
    'PitchSatisfactionScore',
    'OwnCar',
    'Designation',
    'NumberOfChildrenVisiting',
    'MonthlyIncome'
    ]
)
data=data.dropna()

# picking relevant columns for analysis

In [25]:
train_data,test_data=data.randomSplit([0.7,0.3])
#splitting train and test data

In [26]:
data.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- TypeofContact: string (nullable = false)
 |-- CityTier: integer (nullable = true)
 |-- ProdTaken: integer (nullable = true)
 |-- DurationOfPitch: integer (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- ProductPitched: string (nullable = true)
 |-- NumberOfFollowups: integer (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- PitchSatisfactionScore: integer (nullable = true)
 |-- OwnCar: integer (nullable = true)
 |-- Designation: string (nullable = true)
 |-- NumberOfChildrenVisiting: integer (nullable = true)
 |-- MonthlyIncome: integer (nullable = true)



In [27]:
from pyspark.ml.feature import VectorAssembler,StringIndexer,StandardScaler ,OneHotEncoder
from pyspark.ml import Pipeline

In [28]:
TypeofContact_indexer = StringIndexer(inputCol='TypeofContact',outputCol='TypeofContact_index',handleInvalid='keep')
Occupation_indexer = StringIndexer(inputCol='Occupation',outputCol='Occupation_index',handleInvalid='keep')
Gender_indexer = StringIndexer(inputCol='Gender',outputCol='Gender_index',handleInvalid='keep')
ProductPitched_indexer = StringIndexer(inputCol='ProductPitched',outputCol='ProductPitched_index',handleInvalid='keep')
MaritalStatus_indexer = StringIndexer(inputCol='MaritalStatus',outputCol='MaritalStatus_index',handleInvalid='keep')
Designation_indexer = StringIndexer(inputCol='Designation',outputCol='Designation_index',handleInvalid='keep')

In [29]:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(
    inputCols=[
        'TypeofContact_index',
        'Occupation_index',
        'Gender_index',
        'ProductPitched_index',
        'MaritalStatus_index',
        'Designation_index',
    ], 
    outputCols= [
        'TypeofContact_vec',
        'Occupation_vec',
        'Gender_vec',
        'ProductPitched_vec',
        'MaritalStatus_vec',
        'Designation_vec'],
    handleInvalid='keep'
)
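The following rough plain-Python sketch shows what the indexing and encoding stages compute for a single column under Spark's defaults. `StringIndexer` orders labels by descending frequency, with ties broken alphabetically. `OneHotEncoder` uses `dropLast=True`, so the last category becomes the all-zeros vector. The sketch ignores `handleInvalid='keep'`, which in Spark reserves an extra index for labels not seen during fitting. The function names are hypothetical.

```python
from collections import Counter

def string_index(labels):
    """Map each label to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: i for i, lab in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """One-hot encode an index; with drop_last the final category maps to the all-zeros vector."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if i == index else 0.0 for i in range(size)]

contacts = ["Self Enquiry", "Company Invited", "Self Enquiry", "Self Enquiry", "Company Invited"]
mapping = string_index(contacts)
print(mapping)  # {'Self Enquiry': 0, 'Company Invited': 1}
print([one_hot(mapping[c], len(mapping)) for c in contacts])
```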

In [30]:
assembler = VectorAssembler(
    inputCols=[
        'Age',
        'TypeofContact_vec',
        'CityTier',
        'Occupation_vec',
        'Gender_vec',
        'ProductPitched_vec',
        'NumberOfFollowups',
        'MaritalStatus_vec',
        'Designation_vec',
        'DurationOfPitch',
        'PitchSatisfactionScore',
        'OwnCar',
        'NumberOfChildrenVisiting',
        'MonthlyIncome'
    ],
    outputCol="features"
)


## Model 1 - Logistic Regression

In [31]:
from pyspark.ml.classification import LogisticRegression
lreg_model = LogisticRegression(labelCol='ProdTaken')

In [32]:
from pyspark.ml import Pipeline

# A Pipeline chains the indexers, encoder, assembler and model so that they run in sequence. It also
# guarantees that the test data is pre-processed in exactly the same way as the training data
# https://spark.apache.org/docs/latest/ml-pipeline.html
 
pipe = Pipeline(stages=[
    TypeofContact_indexer,
    Occupation_indexer,
    Gender_indexer,
    ProductPitched_indexer,
    MaritalStatus_indexer,
    Designation_indexer,
    encoder,
    assembler,
    lreg_model,
    ]
)

In [33]:
# run the pipeline
fit_model=pipe.fit(train_data)

# Store the results in a dataframe
result = fit_model.transform(test_data)

                                                                                

In [34]:
result.select(['ProdTaken','prediction']).show()

+---------+----------+
|ProdTaken|prediction|
+---------+----------+
|        1|       1.0|
|        1|       1.0|
|        1|       1.0|
|        0|       1.0|
|        0|       1.0|
|        0|       1.0|
|        0|       1.0|
|        1|       1.0|
|        1|       1.0|
|        1|       1.0|
|        0|       1.0|
|        0|       1.0|
|        0|       1.0|
|        0|       1.0|
|        1|       1.0|
|        1|       1.0|
|        0|       1.0|
|        1|       1.0|
|        1|       1.0|
|        0|       1.0|
+---------+----------+
only showing top 20 rows



#### Model evaluation- Logistic regression

##### Area under the ROC

In [35]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

AUC_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='ProdTaken',metricName='areaUnderROC')

AUC = AUC_evaluator.evaluate(result)

In [36]:
print("The area under the curve is {}".format(AUC))

The area under the curve is 0.6867103109656301
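Here `rawPredictionCol` is set to the hard 0/1 `prediction` column instead of the default `rawPrediction` scores. The ROC curve therefore has a single interior point, (FPR, TPR), and its area reduces to (TPR + TNR) / 2, i.e. balanced accuracy. The sketch below checks this in plain arithmetic, not a Spark API call, against the logistic regression confusion matrix printed further down. The same applies to the other `BinaryClassificationEvaluator` calls in this notebook, including the one used inside `CrossValidator`.

```python
def single_threshold_auc(tn, fp, fn, tp):
    """Trapezoidal area under the ROC curve through (0, 0), (FPR, TPR) and (1, 1)."""
    tpr = tp / (tp + fn)
    fpr = fp / (fp + tn)
    # Two trapezoids: from FPR = 0 to fpr, then from fpr to 1
    return fpr * tpr / 2 + (1 - fpr) * (tpr + 1) / 2

# Logistic regression confusion matrix from this notebook: [[838 384] [367 808]]
auc = single_threshold_auc(tn=838, fp=384, fn=367, tp=808)
balanced_accuracy = (808 / (808 + 367) + 838 / (838 + 384)) / 2
print(round(auc, 6), round(balanced_accuracy, 6))
```

To get the usual threshold-free AUC, the evaluator can be left on its default `rawPredictionCol='rawPrediction'`. The reported numbers would then generally differ.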


##### Area under the PR

In [37]:
PR_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='ProdTaken',metricName='areaUnderPR')
PR = PR_evaluator.evaluate(result)

In [38]:

print("The area under the PR curve is {}".format(PR))

The area under the PR curve is 0.6485460292927681


##### Accuracy

In [39]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

ACC_evaluator = MulticlassClassificationEvaluator(  #  Multiclass or Binary, the accuracy is calculated in the same way.
    labelCol="ProdTaken", predictionCol="prediction", metricName="accuracy")

accuracy = ACC_evaluator.evaluate(result)

In [40]:
print("The accuracy of the model is {}".format(accuracy))

The accuracy of the model is 0.686691697955778


##### Confusion Matrix

In [41]:
from sklearn.metrics import confusion_matrix

In [42]:
y_true = result.select("ProdTaken")
y_true = y_true.toPandas()
 
y_pred = result.select("prediction")
y_pred = y_pred.toPandas()
 
cnf_matrix = confusion_matrix(y_true, y_pred)

print("Below is the confusion matrix \n {}".format(cnf_matrix))


Below is the confusion matrix 
 [[838 384]
 [367 808]]



In [44]:
tn = cnf_matrix[0][0]
fp = cnf_matrix[0][1]
fn = cnf_matrix[1][0]
tp = cnf_matrix[1][1]

accuracy = (tp+tn)/(tp+tn+fp+fn)
precision = tp/(tp+fp)
recall = tp/(tp+fn)
f1_score = 2*(precision*recall)/(precision+recall)



In [45]:
print(f"Accuracy: {accuracy:.2f}")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"F1 Score: {f1_score:.2f}")


Accuracy: 0.69
Precision: 0.68
Recall: 0.69
F1 Score: 0.68


## Model-2 Decision Trees

In [46]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler,StringIndexer
from pyspark.ml import Pipeline

In [47]:
dt_model = DecisionTreeClassifier(labelCol='ProdTaken',maxBins=5000)

In [48]:
pipe_dec = Pipeline(stages=[
    TypeofContact_indexer,
    Occupation_indexer,
    Gender_indexer,
    ProductPitched_indexer,
    MaritalStatus_indexer,
    Designation_indexer,
    encoder,
    assembler,
    dt_model
    ]
)

In [49]:
fit_model_dec=pipe_dec.fit(train_data)

In [50]:
results_dec = fit_model_dec.transform(test_data)
results_dec.show(3)

+---+---------------+--------+---------+---------------+--------------+------+--------------+-----------------+-------------+----------------------+------+-----------+------------------------+-------------+-------------------+----------------+------------+--------------------+-------------------+-----------------+-----------------+--------------+-------------+------------------+-----------------+---------------+--------------------+-------------+--------------------+----------+
|Age|  TypeofContact|CityTier|ProdTaken|DurationOfPitch|    Occupation|Gender|ProductPitched|NumberOfFollowups|MaritalStatus|PitchSatisfactionScore|OwnCar|Designation|NumberOfChildrenVisiting|MonthlyIncome|TypeofContact_index|Occupation_index|Gender_index|ProductPitched_index|MaritalStatus_index|Designation_index|TypeofContact_vec|Occupation_vec|   Gender_vec|ProductPitched_vec|MaritalStatus_vec|Designation_vec|            features|rawPrediction|         probability|prediction|
+---+---------------+--------+----

In [51]:
results_dec.select(['ProdTaken','prediction']).show(10)

+---------+----------+
|ProdTaken|prediction|
+---------+----------+
|        1|       1.0|
|        1|       1.0|
|        1|       1.0|
|        0|       1.0|
|        0|       1.0|
|        0|       1.0|
|        0|       1.0|
|        1|       1.0|
|        1|       1.0|
|        1|       1.0|
+---------+----------+
only showing top 10 rows



In [52]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [53]:
ACC_evaluator = MulticlassClassificationEvaluator(
    labelCol="ProdTaken", predictionCol="prediction", metricName="accuracy")

accuracy = ACC_evaluator.evaluate(results_dec)

print(f"The accuracy of the decision tree classifier is {accuracy}")

The accuracy of the decision tree classifier is 0.7125573633708803


In [54]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

AUC_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='ProdTaken',metricName='areaUnderROC')

AUC = AUC_evaluator.evaluate(results_dec)
print("The area under the curve is {}".format(AUC))

The area under the curve is 0.7113093289689034


In [55]:
PR_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='ProdTaken',metricName='areaUnderPR')
PR = PR_evaluator.evaluate(results_dec)
print("The area under the PR curve is {}".format(PR))

The area under the PR curve is 0.6915070479025424


In [56]:
y_true_dec = results_dec.select("ProdTaken")
y_true_dec = y_true_dec.toPandas()
 
y_pred_dec = results_dec.select("prediction")
y_pred_dec = y_pred_dec.toPandas()
 
cnf_matrix_dec = confusion_matrix(y_true_dec, y_pred_dec)

print("Below is the confusion matrix \n {}".format(cnf_matrix_dec))


Below is the confusion matrix 
 [[947 275]
 [414 761]]


In [57]:
tn = cnf_matrix_dec[0][0]
fp = cnf_matrix_dec[0][1]
fn = cnf_matrix_dec[1][0]
tp = cnf_matrix_dec[1][1]

accuracy = (tp+tn)/(tp+tn+fp+fn)
precision = tp/(tp+fp)
recall = tp/(tp+fn)
f1_score = 2*(precision*recall)/(precision+recall)

print(f"Accuracy: {accuracy:.2f}")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"F1 Score: {f1_score:.2f}")

Accuracy: 0.71
Precision: 0.73
Recall: 0.65
F1 Score: 0.69


## Model-3 Support Vector Machines

In [58]:
from pyspark.ml.classification import LinearSVC

In [59]:
# Scaling to normalize the data input to svm

scaler = StandardScaler(inputCol="features",outputCol="scaled_features")
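Spark's `StandardScaler` defaults to `withStd=True` and `withMean=False`. It divides each feature by its corrected sample standard deviation without centering it. Below is a plain-Python sketch of that default behaviour for one feature column. The helper name and the values are hypothetical.

```python
from statistics import mean, stdev

def scale_unit_std(column):
    """Divide each value by the column's sample standard deviation (n - 1 denominator), without centering."""
    s = stdev(column)
    # Spark outputs 0.0 for a feature whose standard deviation is zero
    return [x / s for x in column] if s > 0 else [0.0 for _ in column]

# Hypothetical DurationOfPitch values
durations = [6, 14, 8, 9, 30]
scaled = scale_unit_std(durations)
print([round(x, 3) for x in scaled])
print(round(stdev(scaled), 6), round(mean(scaled), 3))  # std is 1; the mean is not shifted to 0
```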

In [60]:
svm_model = LinearSVC(labelCol='ProdTaken',featuresCol="scaled_features")

In [61]:
pipe_svm = Pipeline(stages=[
    TypeofContact_indexer,
    Occupation_indexer,
    Gender_indexer,
    ProductPitched_indexer,
    MaritalStatus_indexer,
    Designation_indexer,
    encoder,
    assembler,
    scaler,
    svm_model,
    ]
)

In [62]:
fit_model_svm = pipe_svm.fit(train_data)  # separate name so the fitted logistic regression pipeline is not overwritten

In [63]:
results_svm = fit_model_svm.transform(test_data)
display(results_svm)

DataFrame[Age: int, TypeofContact: string, CityTier: int, ProdTaken: int, DurationOfPitch: int, Occupation: string, Gender: string, ProductPitched: string, NumberOfFollowups: int, MaritalStatus: string, PitchSatisfactionScore: int, OwnCar: int, Designation: string, NumberOfChildrenVisiting: int, MonthlyIncome: int, TypeofContact_index: double, Occupation_index: double, Gender_index: double, ProductPitched_index: double, MaritalStatus_index: double, Designation_index: double, TypeofContact_vec: vector, Occupation_vec: vector, Gender_vec: vector, ProductPitched_vec: vector, MaritalStatus_vec: vector, Designation_vec: vector, features: vector, scaled_features: vector, rawPrediction: vector, prediction: double]

In [64]:
results_svm.select(['ProdTaken','prediction']).show(5)

+---------+----------+
|ProdTaken|prediction|
+---------+----------+
|        1|       1.0|
|        1|       1.0|
|        1|       1.0|
|        0|       1.0|
|        0|       1.0|
+---------+----------+
only showing top 5 rows



In [65]:
ACC_evaluator = MulticlassClassificationEvaluator(
    labelCol="ProdTaken", predictionCol="prediction", metricName="accuracy")

accuracy = ACC_evaluator.evaluate(results_svm)

print(f"The accuracy of the SVM classifier is {accuracy}")

The accuracy of the SVM classifier is 0.688360450563204


In [66]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

AUC_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='ProdTaken',metricName='areaUnderROC')

AUC = AUC_evaluator.evaluate(results_svm)
print("The area under the curve is {}".format(AUC))

The area under the curve is 0.6888870703764322


In [67]:
PR_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='ProdTaken',metricName='areaUnderPR')
PR = PR_evaluator.evaluate(results_svm)
print("The area under the PR curve is {}".format(PR))

The area under the PR curve is 0.645005858599166


In [68]:
y_true_svm = results_svm.select("ProdTaken")
y_true_svm = y_true_svm.toPandas()
 
y_pred_svm = results_svm.select("prediction")
y_pred_svm = y_pred_svm.toPandas()
 
cnf_matrix_svm = confusion_matrix(y_true_svm, y_pred_svm)

print("Below is the confusion matrix \n {}".format(cnf_matrix_svm))


Below is the confusion matrix 
 [[809 413]
 [334 841]]


In [80]:
tn = cnf_matrix_svm[0][0]
fp = cnf_matrix_svm[0][1]
fn = cnf_matrix_svm[1][0]
tp = cnf_matrix_svm[1][1]

accuracy = (tp+tn)/(tp+tn+fp+fn)
precision = tp/(tp+fp)
recall = tp/(tp+fn)
f1_score = 2*(precision*recall)/(precision+recall)

print(f"Accuracy: {accuracy:.2f}")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"F1 Score: {f1_score:.2f}")

Accuracy: 0.68
Precision: 0.67
Recall: 0.69
F1 Score: 0.68


## Model-4 Random Forest

In [88]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from pyspark.ml.classification import RandomForestClassifier

In [89]:
# Define Random Forest Classifier and its hyperparameters for tuning
rf = RandomForestClassifier(
    labelCol="ProdTaken",
    featuresCol="features",
    seed=42
)

In [90]:
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 30]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()

# Define evaluator for binary classification and multiclass classification
evaluator_auc = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="ProdTaken", metricName="areaUnderROC")
evaluator_pr = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="ProdTaken", metricName="areaUnderPR")
evaluator_acc = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="ProdTaken", metricName="accuracy")

# Define cross-validator for hyperparameter tuning
cv = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator_auc,
    numFolds=5,
    seed=42
)
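This cross-validation does a fair amount of work. `ParamGridBuilder` builds the Cartesian product of the listed values. `CrossValidator` fits one model per parameter combination and fold, then refits the best combination on the full training data. The plain-Python arithmetic below uses the grid defined above.

```python
from itertools import product

num_trees = [10, 20, 30]
max_depth = [5, 10, 15]
num_folds = 5

# ParamGridBuilder produces the Cartesian product of the listed values
param_maps = [{"numTrees": n, "maxDepth": d} for n, d in product(num_trees, max_depth)]

# One fit per (param map, fold), plus a final refit of the best map on all training data
models_trained = len(param_maps) * num_folds + 1
print(len(param_maps), models_trained)  # 9 46
```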

In [91]:
pipe_rf= Pipeline(stages=[
    TypeofContact_indexer,
    Occupation_indexer,
    Gender_indexer,
    ProductPitched_indexer,
    MaritalStatus_indexer,
    Designation_indexer,
    encoder,
    assembler,
    cv
    ]
)

In [98]:
# Fit pipeline to training data
model_rf = pipe_rf.fit(train_data)

# Predict on test data
predictions_rf = model_rf.transform(test_data)



In [104]:
accuracy = evaluator_acc.evaluate(predictions_rf)

print(f"The accuracy of the Random Forest classifier is {accuracy}")

The accuracy of the Random Forest classifier is 0.950354609929078


In [100]:
# Print evaluation metrics of the model
auc = evaluator_auc.evaluate(predictions_rf)


print("The area under the curve is {}".format(auc))



The area under the curve is 0.950949263502455


In [101]:
pr = evaluator_pr.evaluate(predictions_rf)

print("The area under the PR curve is {}".format(pr))

The area under the PR curve is 0.9183538356278682


In [102]:
y_true_rf = predictions_rf.select("ProdTaken")
y_true_rf = y_true_rf.toPandas()
 
y_pred_rf = predictions_rf.select("prediction")
y_pred_rf = y_pred_rf.toPandas()
 
cnf_matrix_rf = confusion_matrix(y_true_rf, y_pred_rf)

print("Below is the confusion matrix \n {}".format(cnf_matrix_rf))


Below is the confusion matrix 
 [[1125   97]
 [  22 1153]]


In [103]:
tn = cnf_matrix_rf[0][0]
fp = cnf_matrix_rf[0][1]
fn = cnf_matrix_rf[1][0]
tp = cnf_matrix_rf[1][1]

accuracy = (tp+tn)/(tp+tn+fp+fn)
precision = tp/(tp+fp)
recall = tp/(tp+fn)
f1_score = 2*(precision*recall)/(precision+recall)

print(f"Accuracy: {accuracy:.2f}")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"F1 Score: {f1_score:.2f}")

Accuracy: 0.95
Precision: 0.92
Recall: 0.98
F1 Score: 0.95
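To compare the four models side by side, the metrics can be recomputed from the confusion matrices printed above. This gives the SVM a recall of about 0.72 and an accuracy of about 0.69, not the 0.69 and 0.68 printed by cell In [80]. That cell's output appears to come from a different run than the confusion matrix shown, so the SVM numbers are worth re-checking. The ranking by recall is unchanged, with Random Forest first.

```python
# Confusion matrices [[TN, FP], [FN, TP]] as printed in this notebook
matrices = {
    "Logistic Regression": [[838, 384], [367, 808]],
    "Decision Tree": [[947, 275], [414, 761]],
    "SVM": [[809, 413], [334, 841]],
    "Random Forest": [[1125, 97], [22, 1153]],
}

def metrics(cm):
    """Accuracy, precision, recall and F1 from a 2x2 confusion matrix."""
    (tn, fp), (fn, tp) = cm
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return {
        "accuracy": (tp + tn) / (tp + tn + fp + fn),
        "precision": precision,
        "recall": recall,
        "f1": 2 * precision * recall / (precision + recall),
    }

summary = {name: metrics(cm) for name, cm in matrices.items()}

# Print the models sorted by recall, the chosen evaluation metric
for name, m in sorted(summary.items(), key=lambda kv: kv[1]["recall"], reverse=True):
    print(f"{name:<20} " + "  ".join(f"{k}={v:.2f}" for k, v in m.items()))
```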


## Conclusion:


The following four models were trained for the prediction task:

1. Logistic Regression
2. Decision Tree
3. Support Vector Machines
4. Random Forest

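Of these, the ensemble model, Random Forest with hyperparameter tuning, performed best, with a recall of 98% (precision 92%, accuracy 95%). These scores should be read with some caution. The oversampling was applied before the train/test split, so copies of the same purchasing customers can appear in both sets. This likely inflates the test metrics, especially for deep tree ensembles that can memorize individual rows.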

This model could therefore be used by the company to predict potential purchasers of the new package from the available customer data. The company could then tailor its marketing campaign to reduce marketing costs and/or improve customer retention. 

