### **CHURN PREDICTION WITH PYSPARK**

##### **Business Problem:** The objective is to build a machine learning model that predicts which customers will leave the company (churn).

##### **Dataset Story:**
* Comprises 10,000 observations and 14 columns.
* Independent variables contain information about the customers.
* The dependent variable (Exited) indicates churn, that is, whether the customer left the company (a bank) or not.

##### **Attributes:**
* RowNumber: Row index of the record in the dataframe
* CustomerId: ID numbers of customers
* Surname: Surname of the customer
* CreditScore: Credit score of the customer
* Geography: Location (country) of the customer
* Gender: Gender of the customer 
* Age: Age of the customer
* Tenure: The number of years that the customer has been a client
* Balance: The balance on the customer's account
* NumOfProducts: The number of products that a customer has purchased through the bank
* HasCrCard: Represents whether a customer has a credit card or not
* IsActiveMember: Represents whether a customer is an active member or not 
* EstimatedSalary: Estimated salary of the customer
* Exited: Represents whether customer left the company or not.

<font color = 'black'>
Content: 

1. [Exploratory Data Analysis](#1)
1. [Feature Engineering](#2)
1. [Data Pre-Processing](#3)
    * 3.1 [Label Encoding](#4)
    * 3.2 [One Hot Encoding](#5)
    * 3.3 [Define Variables](#6)
    * 3.4 [Standard Scaler](#7)
    * 3.5 [Train-Test Split](#8)
1. [Modeling](#9)
    * 4.1 [Gradient Boosted Tree Classifier](#10)
    * 4.2 [Model Tuning](#11)
1. [References](#12)

In [1]:
!pip install pyspark



In [2]:
!pip install findspark



In [3]:
!pip install PyArrow



In [4]:
import warnings
import findspark
import pandas as pd
import numpy as np
import seaborn as sns
from pyspark.ml.classification import GBTClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder, StandardScaler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.feature import Bucketizer

warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

pd.set_option('display.max_columns', None)
pd.set_option('display.float_format', lambda x: '%.2f' % x)

In [5]:
# Raw string so that the backslash is not treated as an escape character
findspark.init(r"C:\spark")

In [6]:
spark = SparkSession.builder \
    .master("local") \
    .appName("pyspark_giris") \
    .getOrCreate()

In [7]:
sc = spark.sparkContext

In [8]:
# Load data
# Raw string so that the backslashes in the Windows path are kept literally
spark_df = spark.read.csv(r"C:\Career\VBO DSMLBootcamp-6\Ders_notlari\HAFTA_10\datasets\churn2.csv",
                          header=True, inferSchema=True)
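
Windows paths such as the ones passed to `findspark.init` and `spark.read.csv` contain backslashes, which Python treats as escape characters in ordinary string literals. A minimal sketch of two safer spellings follows; the `C:\data` directory is a hypothetical placeholder, not the location used in this notebook.

```python
from pathlib import PureWindowsPath

# Raw string literal: every backslash is kept as a literal character.
spark_home = r"C:\spark"

# Hypothetical data directory, used only for illustration.
data_dir = PureWindowsPath(r"C:\data")
csv_path = data_dir / "datasets" / "churn2.csv"

# Forward-slash spelling of the same path.
csv_path_posix = csv_path.as_posix()

print(spark_home)       # C:\spark
print(csv_path)         # C:\data\datasets\churn2.csv
print(csv_path_posix)   # C:/data/datasets/churn2.csv
```

Either spelling avoids the invalid-escape warnings that ordinary string literals with backslashes can trigger.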

<a id = "1"></a><br>
#### 1. EXPLORATORY DATA ANALYSIS

In [9]:
spark_df

DataFrame[RowNumber: int, CustomerId: int, Surname: string, CreditScore: int, Geography: string, Gender: string, Age: int, Tenure: int, Balance: double, NumOfProducts: int, HasCrCard: int, IsActiveMember: int, EstimatedSalary: double, Exited: int]

In [10]:
type(spark_df)

pyspark.sql.dataframe.DataFrame

In [11]:
spark_df.head(3)

[Row(RowNumber=1, CustomerId=15634602, Surname='Hargrave', CreditScore=619, Geography='France', Gender='Female', Age=42, Tenure=2, Balance=0.0, NumOfProducts=1, HasCrCard=1, IsActiveMember=1, EstimatedSalary=101348.88, Exited=1),
 Row(RowNumber=2, CustomerId=15647311, Surname='Hill', CreditScore=608, Geography='Spain', Gender='Female', Age=41, Tenure=1, Balance=83807.86, NumOfProducts=1, HasCrCard=0, IsActiveMember=1, EstimatedSalary=112542.58, Exited=0),
 Row(RowNumber=3, CustomerId=15619304, Surname='Onio', CreditScore=502, Geography='France', Gender='Female', Age=42, Tenure=8, Balance=159660.8, NumOfProducts=3, HasCrCard=1, IsActiveMember=0, EstimatedSalary=113931.57, Exited=1)]

In [12]:
spark_df.tail(3)

[Row(RowNumber=9998, CustomerId=15584532, Surname='Liu', CreditScore=709, Geography='France', Gender='Female', Age=36, Tenure=7, Balance=0.0, NumOfProducts=1, HasCrCard=0, IsActiveMember=1, EstimatedSalary=42085.58, Exited=1),
 Row(RowNumber=9999, CustomerId=15682355, Surname='Sabbatini', CreditScore=772, Geography='Germany', Gender='Male', Age=42, Tenure=3, Balance=75075.31, NumOfProducts=2, HasCrCard=1, IsActiveMember=0, EstimatedSalary=92888.52, Exited=1),
 Row(RowNumber=10000, CustomerId=15628319, Surname='Walker', CreditScore=792, Geography='France', Gender='Female', Age=28, Tenure=4, Balance=130142.79, NumOfProducts=1, HasCrCard=1, IsActiveMember=0, EstimatedSalary=38190.78, Exited=0)]

In [13]:
# Number of observations and features
print("Shape: ", (spark_df.count(), len(spark_df.columns)))

Shape:  (10000, 14)


In [14]:
# Data types
spark_df.printSchema()

root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



In [15]:
# The same information as a list of (column name, type) tuples
spark_df.dtypes

[('RowNumber', 'int'),
 ('CustomerId', 'int'),
 ('Surname', 'string'),
 ('CreditScore', 'int'),
 ('Geography', 'string'),
 ('Gender', 'string'),
 ('Age', 'int'),
 ('Tenure', 'int'),
 ('Balance', 'double'),
 ('NumOfProducts', 'int'),
 ('HasCrCard', 'int'),
 ('IsActiveMember', 'int'),
 ('EstimatedSalary', 'double'),
 ('Exited', 'int')]

In [16]:
# Inspect a single column
spark_df.select(spark_df.Age).show(10)

+---+
|Age|
+---+
| 42|
| 41|
| 42|
| 39|
| 43|
| 44|
| 50|
| 29|
| 44|
| 27|
+---+
only showing top 10 rows



In [17]:
# Another way to view the first rows: show() prints a formatted table
spark_df.show(3, truncate=True)

+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId| Surname|CreditScore|Geography|Gender|Age|Tenure| Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|        1|  15634602|Hargrave|        619|   France|Female| 42|     2|     0.0|            1|        1|             1|      101348.88|     1|
|        2|  15647311|    Hill|        608|    Spain|Female| 41|     1|83807.86|            1|        0|             1|      112542.58|     0|
|        3|  15619304|    Onio|        502|   France|Female| 42|     8|159660.8|            3|        1|             0|      113931.57|     1|
+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+

In [18]:
# take() also returns the first rows, as a list of Row objects
spark_df.take(5)

[Row(RowNumber=1, CustomerId=15634602, Surname='Hargrave', CreditScore=619, Geography='France', Gender='Female', Age=42, Tenure=2, Balance=0.0, NumOfProducts=1, HasCrCard=1, IsActiveMember=1, EstimatedSalary=101348.88, Exited=1),
 Row(RowNumber=2, CustomerId=15647311, Surname='Hill', CreditScore=608, Geography='Spain', Gender='Female', Age=41, Tenure=1, Balance=83807.86, NumOfProducts=1, HasCrCard=0, IsActiveMember=1, EstimatedSalary=112542.58, Exited=0),
 Row(RowNumber=3, CustomerId=15619304, Surname='Onio', CreditScore=502, Geography='France', Gender='Female', Age=42, Tenure=8, Balance=159660.8, NumOfProducts=3, HasCrCard=1, IsActiveMember=0, EstimatedSalary=113931.57, Exited=1),
 Row(RowNumber=4, CustomerId=15701354, Surname='Boni', CreditScore=699, Geography='France', Gender='Female', Age=39, Tenure=1, Balance=0.0, NumOfProducts=2, HasCrCard=0, IsActiveMember=0, EstimatedSalary=93826.63, Exited=0),
 Row(RowNumber=5, CustomerId=15737888, Surname='Mitchell', CreditScore=850, Geograph

In [19]:
# Convert all column names to lowercase
spark_df = spark_df.toDF(*[c.lower() for c in spark_df.columns])
spark_df.show(5)

+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|rownumber|customerid| surname|creditscore|geography|gender|age|tenure|  balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|        1|  15634602|Hargrave|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|
|        2|  15647311|    Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|
|        3|  15619304|    Onio|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|        4|  15701354|    Boni|        699|   France|Female| 39|     1|      0.0|            2|        0|             0|       93826.63|

In [20]:
# Descriptive statistics summary
spark_df.describe().show()

+-------+------------------+-----------------+-------+-----------------+---------+------+------------------+------------------+-----------------+------------------+-------------------+-------------------+-----------------+-------------------+
|summary|         rownumber|       customerid|surname|      creditscore|geography|gender|               age|            tenure|          balance|     numofproducts|          hascrcard|     isactivemember|  estimatedsalary|             exited|
+-------+------------------+-----------------+-------+-----------------+---------+------+------------------+------------------+-----------------+------------------+-------------------+-------------------+-----------------+-------------------+
|  count|             10000|            10000|  10000|            10000|    10000| 10000|             10000|             10000|            10000|             10000|              10000|              10000|            10000|              10000|
|   mean|            5000.5|

In [21]:
# Descriptive statistics for age and churn
spark_df.describe(["age", "exited"]).show()

+-------+------------------+-------------------+
|summary|               age|             exited|
+-------+------------------+-------------------+
|  count|             10000|              10000|
|   mean|           38.9218|             0.2037|
| stddev|10.487806451704587|0.40276858399486065|
|    min|                18|                  0|
|    max|                92|                  1|
+-------+------------------+-------------------+



In [22]:
# Observing the number of unique values of each feature
spark_df.toPandas().nunique()

rownumber          10000
customerid         10000
surname             2932
creditscore          460
geography              3
gender                 2
age                   70
tenure                11
balance             6382
numofproducts          4
hascrcard              2
isactivemember         2
estimatedsalary     9999
exited                 2
dtype: int64
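
`toPandas()` collects the whole DataFrame to the driver, which is fine for 10,000 rows but may not scale to larger data. A hedged sketch of the same count computed inside Spark is given here; `distinct_counts` is a hypothetical helper name, and the import sits inside the function only so that defining the helper does not require Spark to be importable.

```python
def distinct_counts(df):
    """Return a one-row Spark DataFrame with the distinct count of every column."""
    # Imported here so that defining the helper does not itself need pyspark.
    from pyspark.sql import functions as F

    # One countDistinct aggregate per column, aliased back to the column name.
    return df.agg(*[F.countDistinct(F.col(c)).alias(c) for c in df.columns])
```

It would be called as `distinct_counts(spark_df).show()`. Like pandas' `nunique()` with its default settings, `countDistinct` does not count nulls.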

In [23]:
# Selecting all numerical columns and observing their descriptive statistics
num_cols = [col[0] for col in spark_df.dtypes if col[1] != 'string']
spark_df.select(num_cols).describe().toPandas().transpose()

| summary | count | mean | stddev | min | max |
|---|---|---|---|---|---|
| rownumber | 10000 | 5000.5 | 2886.8956799071675 | 1 | 10000 |
| customerid | 10000 | 1.56909405694E7 | 71936.18612274907 | 15565701 | 15815690 |
| creditscore | 10000 | 650.5288 | 96.65329873613035 | 350 | 850 |
| age | 10000 | 38.9218 | 10.487806451704587 | 18 | 92 |
| tenure | 10000 | 5.0128 | 2.8921743770496837 | 0 | 10 |
| balance | 10000 | 76485.88928799961 | 62397.40520238599 | 0.0 | 250898.09 |
| numofproducts | 10000 | 1.5302 | 0.5816543579989917 | 1 | 4 |
| hascrcard | 10000 | 0.7055 | 0.45584046447513327 | 0 | 1 |
| isactivemember | 10000 | 0.5151 | 0.49979692845891815 | 0 | 1 |


In [24]:
# Selecting all categorical columns and observing them
cat_cols = [col[0] for col in spark_df.dtypes if col[1] == 'string']

# The loop variable is named c so that it cannot shadow pyspark's col() function
for c in cat_cols:
    spark_df.select(c).distinct().show(10)

+-----------+
|    surname|
+-----------+
|      Tyler|
|    Palermo|
|     Piccio|
|   Lazareva|
| Kambinachi|
|      Virgo|
|Baryshnikov|
|    Wofford|
|     Lavrov|
|  Bezrukova|
+-----------+
only showing top 10 rows

+---------+
|geography|
+---------+
|  Germany|
|   France|
|    Spain|
+---------+

+------+
|gender|
+------+
|Female|
|  Male|
+------+



In [25]:
# Mean of each numerical column, grouped by churn status ("exited")
for c in num_cols:
    spark_df.groupby("exited").agg({c: "mean"}).show()

+------+-----------------+
|exited|   avg(rownumber)|
+------+-----------------+
|     1|4905.917525773196|
|     0|5024.694964209469|
+------+-----------------+

+------+--------------------+
|exited|     avg(customerid)|
+------+--------------------+
|     1|1.5690051964653904E7|
|     0|1.5691167881702876E7|
+------+--------------------+

+------+-----------------+
|exited| avg(creditscore)|
+------+-----------------+
|     1|645.3514972999509|
|     0|651.8531960316463|
+------+-----------------+

+------+-----------------+
|exited|         avg(age)|
+------+-----------------+
|     1| 44.8379970544919|
|     0|37.40838879819164|
+------+-----------------+

+------+-----------------+
|exited|      avg(tenure)|
+------+-----------------+
|     1|4.932744231713304|
|     0|5.033278914981791|
+------+-----------------+

+------+-----------------+
|exited|     avg(balance)|
+------+-----------------+
|     1|91108.53933726063|
|     0|72745.29677885193|
+------+-----------------+

+---

In [26]:
# Observe the target feature
spark_df.groupby("exited").count().show()

+------+-----+
|exited|count|
+------+-----+
|     1| 2037|
|     0| 7963|
+------+-----+
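
The counts above show an imbalanced target. As a small worked example, the arithmetic below derives the churn rate and the accuracy of a trivial model that always predicts the majority class; that baseline is one reason a ranking metric such as the area under the ROC curve (the default of the imported `BinaryClassificationEvaluator`) is often preferred over plain accuracy.

```python
# Class counts copied from the groupby output above.
counts = {1: 2037, 0: 7963}

total = sum(counts.values())
churn_rate = counts[1] / total
# Accuracy of a trivial model that always predicts the majority class.
majority_baseline = max(counts.values()) / total

print(f"churn rate: {churn_rate:.4f}")                # churn rate: 0.2037
print(f"majority baseline: {majority_baseline:.4f}")  # majority baseline: 0.7963
```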



<a id = "2"></a><br>
#### 2. FEATURE ENGINEERING

In [27]:
# Missing values
from pyspark.sql.functions import when, count, col
spark_df.select([count(when(col(c).isNull(), c)).alias(c) for c in spark_df.columns]).toPandas().T

| | 0 |
|---|---|
| rownumber | 0 |
| customerid | 0 |
| surname | 0 |
| creditscore | 0 |
| geography | 0 |
| gender | 0 |
| age | 0 |
| tenure | 0 |
| balance | 0 |
| numofproducts | 0 |
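
The missing-value expression in the cell above relies on two behaviours: `when` without `otherwise` yields null for rows that fail the condition, and `count` skips nulls, so only the rows where the column is null are counted. A plain-Python analogue of that logic is sketched here on made-up rows; `null_counts` is a hypothetical helper, not part of the notebook.

```python
def null_counts(rows, columns):
    """Count missing (None) values per column in a list of dict rows."""
    return {c: sum(1 for row in rows if row.get(c) is None) for c in columns}

# Made-up rows, for illustration only.
rows = [
    {"age": 42, "balance": 0.0},
    {"age": None, "balance": 83807.86},
    {"age": 39, "balance": None},
    {"age": None, "balance": 125510.82},
]

print(null_counts(rows, ["age", "balance"]))  # {'age': 2, 'balance': 1}
```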


In [28]:
# Derived ratio and interaction features

spark_df = spark_df.withColumn('new_balance_salary_ratio', spark_df.balance / spark_df.estimatedsalary)
spark_df = spark_df.withColumn('new_tenure_by_age', spark_df.tenure / spark_df.age)
spark_df = spark_df.withColumn('new_creditscore_age', spark_df.creditscore / spark_df.age)
spark_df = spark_df.withColumn('new_age_salary', spark_df.age * spark_df.estimatedsalary)
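
To make the four derived features concrete, the sketch below recomputes them in plain Python for the first customer in the dataset (Hargrave). The dictionary layout is only an illustration of the arithmetic, not the way Spark evaluates the columns.

```python
# Values of the first customer, copied from the head of the dataset.
row = {"creditscore": 619, "age": 42, "tenure": 2,
       "balance": 0.0, "estimatedsalary": 101348.88}

features = {
    "new_balance_salary_ratio": row["balance"] / row["estimatedsalary"],
    "new_tenure_by_age": row["tenure"] / row["age"],
    "new_creditscore_age": row["creditscore"] / row["age"],
    "new_age_salary": row["age"] * row["estimatedsalary"],
}

for name, value in features.items():
    print(name, round(value, 4))
```

Three of the features are ratios, so a zero denominator would matter: depending on the Spark version and its ANSI setting, such a division yields null or raises an error, which is presumably why missing values are re-checked after this step.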

In [29]:
spark_df.show(3)

+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+
|rownumber|customerid| surname|creditscore|geography|gender|age|tenure| balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|new_balance_salary_ratio|   new_tenure_by_age|new_creditscore_age|new_age_salary|
+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+
|        1|  15634602|Hargrave|        619|   France|Female| 42|     2|     0.0|            1|        1|             1|      101348.88|     1|                     0.0|0.047619047619047616| 14.738095238095237|    4256652.96|
|        2|  15647311|    Hill|        608|    Spain|Female| 41|     1|83807.86|            1|        0|

In [30]:
# Re-check missing values
spark_df.select([count(when(col(c).isNull(), c)).alias(c) for c in spark_df.columns]).toPandas().T

| | 0 |
|---|---|
| rownumber | 0 |
| customerid | 0 |
| surname | 0 |
| creditscore | 0 |
| geography | 0 |
| gender | 0 |
| age | 0 |
| tenure | 0 |
| balance | 0 |
| numofproducts | 0 |


#### Bucketization (Binning)

In [31]:
spark_df.select('age').describe().toPandas().transpose()

| summary | count | mean | stddev | min | max |
|---|---|---|---|---|---|
| age | 10000 | 38.9218 | 10.487806451704587 | 18 | 92 |


In [32]:
spark_df.select("age").summary("count", "min", "25%", "50%","75%", "max").show()

+-------+-----+
|summary|  age|
+-------+-----+
|  count|10000|
|    min|   18|
|    25%|   32|
|    50%|   37|
|    75%|   44|
|    max|   92|
+-------+-----+



In [33]:
bucketizer = Bucketizer(splits=[0, 30, 40, 45, 65, 92], inputCol="age", outputCol="new_age_cat")

spark_df = bucketizer.setHandleInvalid("keep").transform(spark_df)

In [34]:
# Bucket indices start at 0, so add 1 to make the categories run from 1 to 5
spark_df = spark_df.withColumn('new_age_cat', spark_df.new_age_cat + 1)
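
`Bucketizer` assigns each value the zero-based index of the interval it falls into; intervals are closed on the left and open on the right, except the last one, which also includes its upper bound. The sketch below mirrors that rule in plain Python and folds in the `+ 1` shift from the cell above. `age_category` is a hypothetical helper, and out-of-range ages simply raise an error here, which is a simplification.

```python
from bisect import bisect_right

SPLITS = [0, 30, 40, 45, 65, 92]

def age_category(age):
    """Return the 1-based age category implied by SPLITS."""
    if not SPLITS[0] <= age <= SPLITS[-1]:
        raise ValueError(f"age {age} is outside the splits")
    if age == SPLITS[-1]:
        # The last bucket also contains its upper bound.
        return len(SPLITS) - 1
    # bisect_right counts the splits <= age, which equals the
    # zero-based bucket index plus one.
    return bisect_right(SPLITS, age)

for age in (18, 30, 42, 45, 65, 92):
    print(age, age_category(age))
```

Under these splits the categories correspond to ages 0-29, 30-39, 40-44, 45-64 and 65-92.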

In [36]:
spark_df.show(3)

+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+
|rownumber|customerid| surname|creditscore|geography|gender|age|tenure| balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|new_balance_salary_ratio|   new_tenure_by_age|new_creditscore_age|new_age_salary|new_age_cat|
+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+
|        1|  15634602|Hargrave|        619|   France|Female| 42|     2|     0.0|            1|        1|             1|      101348.88|     1|                     0.0|0.047619047619047616| 14.738095238095237|    4256652.96|        3.0|
|        2|  15647311|    Hill|        608|    Spain|Fem

In [37]:
# Observe new_age_cat 
spark_df.groupby("new_age_cat").count().show()

+-----------+-----+
|new_age_cat|count|
+-----------+-----+
|        1.0| 1641|
|        4.0| 2058|
|        3.0| 1673|
|        2.0| 4346|
|        5.0|  282|
+-----------+-----+



In [38]:
# Observe the churn status of new_age_cat
spark_df = spark_df.withColumn("new_age_cat", spark_df["new_age_cat"].cast("integer"))
spark_df.groupby("new_age_cat").agg({'exited': "mean"}).show()

+-----------+-------------------+
|new_age_cat|        avg(exited)|
+-----------+-------------------+
|          1|0.07556368068251067|
|          3| 0.2367005379557681|
|          5| 0.1524822695035461|
|          4|0.48639455782312924|
|          2|0.10883571099861943|
+-----------+-------------------+



##### As seen, the churn rate is highest in age category 4 (ages 45-64, about 49%), followed by category 3 (ages 40-44, about 24%).

In [39]:
# Observe churn status per geography
spark_df.groupby("geography").agg({'exited': "mean"}).show()

+---------+-------------------+
|geography|        avg(exited)|
+---------+-------------------+
|  Germany|0.32443204463929853|
|   France|0.16154766653370561|
|    Spain| 0.1667339523617279|
+---------+-------------------+



In [42]:
# Conditional features built with when()
# Tenure segment: fewer than 5 years is segment_b, otherwise segment_a
spark_df = spark_df.withColumn('segment', 
                               when(spark_df['tenure'] < 5, "segment_b").
                               otherwise("segment_a"))

In [43]:
spark_df.groupby("segment").count().show()

+---------+-----+
|  segment|count|
+---------+-----+
|segment_a| 5506|
|segment_b| 4494|
+---------+-----+



In [59]:
spark_df.groupby("segment").agg({'exited': "mean"}).show()

+---------+-------------------+
|  segment|        avg(exited)|
+---------+-------------------+
|segment_a|0.19851071558300037|
|segment_b|  0.210057854917668|
+---------+-------------------+



In [44]:
# age categories
spark_df = spark_df.withColumn('new_age_cat_2',
                    when(spark_df['age'] < 46, "young").
                    when((45 < spark_df['age']) & (spark_df['age'] < 65), "mature").
                    otherwise("senior"))
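
Chained `when` clauses are evaluated in order and the first matching condition wins, so the lower bound `45 <` in the second clause is already implied by the first clause failing. A plain-Python equivalent of the expression above, using a hypothetical `age_group` helper, makes the boundaries explicit.

```python
def age_group(age):
    """Plain-Python equivalent of the chained when() expression."""
    if age < 46:
        return "young"
    if age < 65:  # reached only when age >= 46
        return "mature"
    return "senior"

for age in (45, 46, 64, 65):
    print(age, age_group(age))
```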

In [45]:
spark_df.groupby("new_age_cat_2").count().show()

+-------------+-----+
|new_age_cat_2|count|
+-------------+-----+
|       mature| 1829|
|       senior|  282|
|        young| 7889|
+-------------+-----+



In [60]:
spark_df.groupby("new_age_cat_2").agg({'exited': "mean"}).show()

+-------------+------------------+
|new_age_cat_2|       avg(exited)|
+-------------+------------------+
|       mature|0.4997266265718972|
|       senior|0.1524822695035461|
|        young|  0.13689948028901|
+-------------+------------------+



In [46]:
# Employment status
# Assumption: an estimated salary below 1000 (used here as a stand-in for the
# minimum wage) may indicate that the customer is unemployed.

spark_df = spark_df.withColumn('employment_status', 
                               when(spark_df['estimatedsalary'] < 1000, "unemployed").
                               otherwise("employed"))

In [47]:
spark_df.groupby("employment_status").count().show()

+-----------------+-----+
|employment_status|count|
+-----------------+-----+
|       unemployed|   59|
|         employed| 9941|
+-----------------+-----+



In [61]:
spark_df.groupby("employment_status").agg({'exited': "mean"}).show()

+-----------------+-------------------+
|employment_status|        avg(exited)|
+-----------------+-------------------+
|       unemployed|0.22033898305084745|
|         employed| 0.2036012473594206|
+-----------------+-------------------+
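
With only 59 customers in the "unemployed" group, the difference in churn rates above should be read with caution. The rough check below, a sketch that assumes a simple binomial model, compares the gap between the two rates with the standard error of the smaller group's rate.

```python
from math import sqrt

n_unemployed = 59
p_unemployed = 0.22033898305084745  # avg(exited) for "unemployed"
p_employed = 0.2036012473594206     # avg(exited) for "employed"

# Number of churned customers in the small group.
churners = round(p_unemployed * n_unemployed)
# Standard error of a proportion: sqrt(p * (1 - p) / n)
std_error = sqrt(p_unemployed * (1 - p_unemployed) / n_unemployed)
gap = p_unemployed - p_employed

print(churners)             # 13
print(round(std_error, 3))  # 0.054
print(round(gap, 3))        # 0.017
```

The gap is well below one standard error, so this feature on its own carries little evidence of a real difference.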



In [48]:
spark_df.select("balance").summary("count", "min", "25%", "50%","75%", "max").show()

+-------+---------+
|summary|  balance|
+-------+---------+
|  count|    10000|
|    min|      0.0|
|    25%|      0.0|
|    50%| 97157.96|
|    75%|127638.35|
|    max|250898.09|
+-------+---------+



In [49]:
# balance status
spark_df = spark_df.withColumn('balance_status', 
                               when(spark_df['balance'] < 100000, "low_balance").
                               otherwise("high_balance"))

In [50]:
spark_df.groupby("balance_status").count().show()

+--------------+-----+
|balance_status|count|
+--------------+-----+
|  high_balance| 4799|
|   low_balance| 5201|
+--------------+-----+



In [62]:
spark_df.groupby("balance_status").agg({'exited': "mean"}).show()

+--------------+-------------------+
|balance_status|        avg(exited)|
+--------------+-------------------+
|  high_balance|0.25234423838299647|
|   low_balance|0.15881561238223418|
+--------------+-------------------+



In [51]:
# Final version of the dataframe
spark_df.show(3)

+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+---------+-------------+-----------------+--------------+
|rownumber|customerid| surname|creditscore|geography|gender|age|tenure| balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|new_balance_salary_ratio|   new_tenure_by_age|new_creditscore_age|new_age_salary|new_age_cat|  segment|new_age_cat_2|employment_status|balance_status|
+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+---------+-------------+-----------------+--------------+
|        1|  15634602|Hargrave|        619|   France|Female| 42|     2|     0.0|            1|        1|             1|  

<a id = "3"></a><br>
#### 3. DATA PRE-PROCESSING 

In [52]:
# Drop rownumber, customerid and surname: they are identifiers and are not used in modeling
spark_df = spark_df.drop('rownumber', "customerid", "surname")

In [53]:
spark_df.show(3)

+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+---------+-------------+-----------------+--------------+
|creditscore|geography|gender|age|tenure| balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|new_balance_salary_ratio|   new_tenure_by_age|new_creditscore_age|new_age_salary|new_age_cat|  segment|new_age_cat_2|employment_status|balance_status|
+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+---------+-------------+-----------------+--------------+
|        619|   France|Female| 42|     2|     0.0|            1|        1|             1|      101348.88|     1|                     0.0|0.047619047619047616| 14.738095238095237|    4256652.96|          3|segmen

<a id = "4"></a><br>
#### 3.1. Label Encoding

String columns must first be converted to numeric indices with StringIndexer. OneHotEncoder operates on those indices and cannot be applied to string columns directly.
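
By default, `StringIndexer` orders labels by descending frequency, so the most common label receives index 0. The plain-Python sketch below imitates that default on a toy column; `fit_string_index` is a hypothetical helper, the alphabetical tie-breaking is an assumption of the sketch, and the toy values do not reflect the real distribution of the dataset.

```python
from collections import Counter

def fit_string_index(values):
    """Map each label to an index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending frequency; ties are broken alphabetically here.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}

# Toy column, not the real distribution of the dataset.
geography = ["France", "Spain", "France", "Germany", "France", "Germany"]
mapping = fit_string_index(geography)

print(mapping)                          # {'France': 0, 'Germany': 1, 'Spain': 2}
print([mapping[g] for g in geography])  # [0, 2, 0, 1, 0, 1]
```

The resulting integers are arbitrary codes rather than magnitudes, which is why a one-hot step usually follows for nominal columns with more than two levels.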

In [82]:
# Let's observe the final spark dataframe before encoding
spark_df.show(3)

+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+---------+-------------+-----------------+--------------+
|creditscore|geography|gender|age|tenure| balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|new_balance_salary_ratio|   new_tenure_by_age|new_creditscore_age|new_age_salary|new_age_cat|  segment|new_age_cat_2|employment_status|balance_status|
+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+---------+-------------+-----------------+--------------+
|        619|   France|Female| 42|     2|     0.0|            1|        1|             1|      101348.88|     1|                     0.0|0.047619047619047616| 14.738095238095237|    4256652.96|          3|segmen

In [57]:
# Let's observe the categorical columns after feature engineering
cat_cols2 = [col[0] for col in spark_df.dtypes if col[1] == 'string']

In [73]:
cat_cols2

['geography',
 'gender',
 'segment',
 'new_age_cat_2',
 'employment_status',
 'balance_status']
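
The six columns listed above all go through the same three steps: index, cast to integer, drop the original. A compact alternative to handling each column in its own cell is sketched here; `index_string_columns` is a hypothetical helper, and the import sits inside the function only so that defining it does not require Spark to be importable.

```python
def index_string_columns(df, columns):
    """Replace each string column with an integer '<name>_label' column."""
    # Imported here so that defining the helper does not itself need pyspark.
    from pyspark.ml.feature import StringIndexer

    for name in columns:
        label = f"{name}_label"
        indexer = StringIndexer(inputCol=name, outputCol=label)
        df = indexer.fit(df).transform(df)
        # StringIndexer emits doubles; cast to integer and drop the source column.
        df = df.withColumn(label, df[label].cast("integer")).drop(name)
    return df
```

It would be called as `spark_df = index_string_columns(spark_df, cat_cols2)`. Looping keeps the naming consistent and avoids copy-paste slips in the per-column comments.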

In [83]:
# Label encoding for geography
indexer = StringIndexer(inputCol="geography", outputCol="geography_label")
temp_sdf = indexer.fit(spark_df).transform(spark_df)
spark_df = temp_sdf.withColumn("geography_label", temp_sdf["geography_label"].cast("integer"))
spark_df.show(2)

+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+---------+-------------+-----------------+--------------+---------------+
|creditscore|geography|gender|age|tenure| balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|new_balance_salary_ratio|   new_tenure_by_age|new_creditscore_age|new_age_salary|new_age_cat|  segment|new_age_cat_2|employment_status|balance_status|geography_label|
+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+---------+-------------+-----------------+--------------+---------------+
|        619|   France|Female| 42|     2|     0.0|            1|        1|             1|      101348.88|     1|                     0.0|0.047619047619047616| 14.7

In [84]:
# As we have geography_label now, we can drop geography column
spark_df = spark_df.drop('geography')

In [85]:
# Label encoding for gender
indexer = StringIndexer(inputCol="gender", outputCol="gender_label")
temp_sdf = indexer.fit(spark_df).transform(spark_df)
spark_df = temp_sdf.withColumn("gender_label", temp_sdf["gender_label"].cast("integer"))
spark_df.show(2)

+-----------+------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+---------+-------------+-----------------+--------------+---------------+------------+
|creditscore|gender|age|tenure| balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|new_balance_salary_ratio|   new_tenure_by_age|new_creditscore_age|new_age_salary|new_age_cat|  segment|new_age_cat_2|employment_status|balance_status|geography_label|gender_label|
+-----------+------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+---------+-------------+-----------------+--------------+---------------+------------+
|        619|Female| 42|     2|     0.0|            1|        1|             1|      101348.88|     1|                     0.0|0.047619047619047616| 14.73

In [86]:
# As we have gender_label now, we can drop gender column
spark_df = spark_df.drop('gender')

In [87]:
# Label encoding for segment
indexer = StringIndexer(inputCol="segment", outputCol="segment_label")
temp_sdf = indexer.fit(spark_df).transform(spark_df)
spark_df = temp_sdf.withColumn("segment_label", temp_sdf["segment_label"].cast("integer"))
spark_df = spark_df.drop('segment')
spark_df.show(2)

+-----------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+-------------+-----------------+--------------+---------------+------------+-------------+
|creditscore|age|tenure| balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|new_balance_salary_ratio|   new_tenure_by_age|new_creditscore_age|new_age_salary|new_age_cat|new_age_cat_2|employment_status|balance_status|geography_label|gender_label|segment_label|
+-----------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+-------------+-----------------+--------------+---------------+------------+-------------+
|        619| 42|     2|     0.0|            1|        1|             1|      101348.88|     1|                     0.0|0.047619047619047616| 14.738095238095237|  

In [88]:
# Label encoding for new_age_cat_2
indexer = StringIndexer(inputCol="new_age_cat_2", outputCol="new_age_cat_2_label")
temp_sdf = indexer.fit(spark_df).transform(spark_df)
spark_df = temp_sdf.withColumn("new_age_cat_2_label", temp_sdf["new_age_cat_2_label"].cast("integer"))
spark_df = spark_df.drop('new_age_cat_2')
spark_df.show(2)

+-----------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+-----------------+--------------+---------------+------------+-------------+-------------------+
|creditscore|age|tenure| balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|new_balance_salary_ratio|   new_tenure_by_age|new_creditscore_age|new_age_salary|new_age_cat|employment_status|balance_status|geography_label|gender_label|segment_label|new_age_cat_2_label|
+-----------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+-----------------+--------------+---------------+------------+-------------+-------------------+
|        619| 42|     2|     0.0|            1|        1|             1|      101348.88|     1|                     0.0|0.047619047619047616| 14.

In [89]:
# Label encoding for employment_status
indexer = StringIndexer(inputCol="employment_status", outputCol="employment_status_label")
temp_sdf = indexer.fit(spark_df).transform(spark_df)
spark_df = temp_sdf.withColumn("employment_status_label", temp_sdf["employment_status_label"].cast("integer"))
spark_df = spark_df.drop('employment_status')
spark_df.show(2)

+-----------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+--------------+---------------+------------+-------------+-------------------+-----------------------+
|creditscore|age|tenure| balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|new_balance_salary_ratio|   new_tenure_by_age|new_creditscore_age|new_age_salary|new_age_cat|balance_status|geography_label|gender_label|segment_label|new_age_cat_2_label|employment_status_label|
+-----------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+--------------+---------------+------------+-------------+-------------------+-----------------------+
|        619| 42|     2|     0.0|            1|        1|             1|      101348.88|     1|                     0.0|0.04761

In [90]:
# Label encoding for balance_status
indexer = StringIndexer(inputCol="balance_status", outputCol="balance_status_label")
temp_sdf = indexer.fit(spark_df).transform(spark_df)
spark_df = temp_sdf.withColumn("balance_status_label", temp_sdf["balance_status_label"].cast("integer"))
spark_df = spark_df.drop('balance_status')
spark_df.show(2)

+-----------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+---------------+------------+-------------+-------------------+-----------------------+--------------------+
|creditscore|age|tenure| balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|new_balance_salary_ratio|   new_tenure_by_age|new_creditscore_age|new_age_salary|new_age_cat|geography_label|gender_label|segment_label|new_age_cat_2_label|employment_status_label|balance_status_label|
+-----------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+---------------+------------+-------------+-------------------+-----------------------+--------------------+
|        619| 42|     2|     0.0|            1|        1|             1|      101348.88|     1|              
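
The cells above repeat the same `StringIndexer` pattern for each categorical column. By default `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`), so the most common category receives index 0. The sketch below mimics that mapping in plain Python to make the ordering concrete. The function name `frequency_index` and the sample values are hypothetical, and the alphabetical tie-break is an assumption based on Spark 3.x behaviour.

```python
from collections import Counter


def frequency_index(values):
    """Map each distinct string to an integer, most frequent first.

    Ties are broken alphabetically (assumed to mirror Spark 3.x).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: idx for idx, value in enumerate(ordered)}


# Hypothetical category values, not taken from the dataset
statuses = ["employed", "retired", "employed", "student", "employed", "retired"]
mapping = frequency_index(statuses)
encoded = [mapping[s] for s in statuses]
print(mapping)
print(encoded)
```

Because the indices follow frequency rather than any natural order, they should be read as arbitrary category IDs, not as ordinal values.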

In [93]:
spark_df.describe().toPandas().transpose()

| summary | count | mean | stddev | min | max |
|---|---|---|---|---|---|
| creditscore | 10000 | 650.5288 | 96.65329873613035 | 350 | 850 |
| age | 10000 | 38.9218 | 10.487806451704587 | 18 | 92 |
| tenure | 10000 | 5.0128 | 2.8921743770496837 | 0 | 10 |
| balance | 10000 | 76485.88928799961 | 62397.40520238599 | 0.0 | 250898.09 |
| numofproducts | 10000 | 1.5302 | 0.5816543579989917 | 1 | 4 |
| hascrcard | 10000 | 0.7055 | 0.45584046447513327 | 0 | 1 |
| isactivemember | 10000 | 0.5151 | 0.49979692845891815 | 0 | 1 |
| estimatedsalary | 10000 | 100090.2398809998 | 57510.49281769821 | 11.58 | 199992.48 |
| exited | 10000 | 0.2037 | 0.40276858399486065 | 0 | 1 |


<a id = "5"></a><br>
#### 3.2. One Hot Encoding

In [95]:
# Apply one-hot encoding to all label-encoded categorical variables
encoder = OneHotEncoder(inputCols=["geography_label", "gender_label", "segment_label", 
                                   "new_age_cat_2_label", "employment_status_label", "balance_status_label"], 
                        outputCols=["geography_ohe", "gender_ohe", "segment_ohe", 
                                    "new_age_cat_2_ohe", "employment_status_ohe", "balance_status_ohe"])
spark_df = encoder.fit(spark_df).transform(spark_df)
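
Spark's `OneHotEncoder` uses `dropLast=True` by default, so a column with `n` categories becomes a vector of length `n - 1` and the last category index is represented by all zeros. The plain-Python sketch below illustrates that encoding; the function name `one_hot_drop_last` is hypothetical and the code is an illustration, not Spark's implementation.

```python
def one_hot_drop_last(index, num_categories):
    """Encode a category index as a vector of length num_categories - 1.

    The last category index maps to the all-zeros vector.
    """
    if not 0 <= index < num_categories:
        raise ValueError("index out of range")
    vector = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vector[index] = 1.0
    return vector


# Three categories produce two-element vectors
first = one_hot_drop_last(0, 3)
second = one_hot_drop_last(1, 3)
last = one_hot_drop_last(2, 3)
print(first, second, last)
```

Dropping the last category avoids a redundant column whose value is fully determined by the others.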

In [96]:
spark_df.show(2)

+-----------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+---------------+------------+-------------+-------------------+-----------------------+--------------------+-------------+----------+-----------+-----------------+---------------------+------------------+
|creditscore|age|tenure| balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|new_balance_salary_ratio|   new_tenure_by_age|new_creditscore_age|new_age_salary|new_age_cat|geography_label|gender_label|segment_label|new_age_cat_2_label|employment_status_label|balance_status_label|geography_ohe|gender_ohe|segment_ohe|new_age_cat_2_ohe|employment_status_ohe|balance_status_ohe|
+-----------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+---------------+----------

<a id = "6"></a><br>
#### 3.3. Define Variables

In [97]:
# Define the target variable as "label", the default label column name for Spark ML estimators
stringIndexer = StringIndexer(inputCol='exited', outputCol='label')
temp_sdf = stringIndexer.fit(spark_df).transform(spark_df)
spark_df = temp_sdf.withColumn("label", temp_sdf["label"].cast("integer"))
spark_df.show(2)

+-----------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+---------------+------------+-------------+-------------------+-----------------------+--------------------+-------------+----------+-----------+-----------------+---------------------+------------------+-----+
|creditscore|age|tenure| balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|new_balance_salary_ratio|   new_tenure_by_age|new_creditscore_age|new_age_salary|new_age_cat|geography_label|gender_label|segment_label|new_age_cat_2_label|employment_status_label|balance_status_label|geography_ohe|gender_ohe|segment_ohe|new_age_cat_2_ohe|employment_status_ohe|balance_status_ohe|label|
+-----------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+--------------

In [98]:
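# Define the feature columns
# Note: both the integer-encoded (*_label) and one-hot (*_ohe) versions of each categorical are included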
cols = ['creditscore', 'age', 'tenure', 'balance','numofproducts', 'hascrcard',
        'isactivemember', 'estimatedsalary', 'new_balance_salary_ratio', 'new_tenure_by_age',
        'new_creditscore_age', 'new_age_salary', 'new_age_cat',
        'geography_label', 'gender_label', 'segment_label', 'new_age_cat_2_label', 'employment_status_label',
        'balance_status_label', 'geography_ohe', 'gender_ohe', 'segment_ohe', 'new_age_cat_2_ohe',
        'employment_status_ohe', 'balance_status_ohe']

In [99]:
# Vectorize independent variables
va = VectorAssembler(inputCols=cols, outputCol="features")
va_df = va.transform(spark_df)
va_df.show(2)

+-----------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------+--------------+-----------+---------------+------------+-------------+-------------------+-----------------------+--------------------+-------------+----------+-----------+-----------------+---------------------+------------------+-----+--------------------+
|creditscore|age|tenure| balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|new_balance_salary_ratio|   new_tenure_by_age|new_creditscore_age|new_age_salary|new_age_cat|geography_label|gender_label|segment_label|new_age_cat_2_label|employment_status_label|balance_status_label|geography_ohe|gender_ohe|segment_ohe|new_age_cat_2_ohe|employment_status_ohe|balance_status_ohe|label|            features|
+-----------+---+------+--------+-------------+---------+--------------+---------------+------+------------------------+--------------------+-------------------

In [101]:
# Final Spark DataFrame: keep only the feature vector and the label
final_df = va_df.select("features", "label")
final_df.show(4)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[619.0,42.0,2.0,0...|    1|
|[608.0,41.0,1.0,8...|    0|
|[502.0,42.0,8.0,1...|    1|
|(27,[0,1,2,4,7,9,...|    0|
+--------------------+-----+
only showing top 4 rows
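
The fourth row above is printed as `(27,[0,1,2,4,7,9,...` rather than as a bracketed list. This is Spark's sparse vector notation, `(size, [indices], [values])`: only the non-zero positions are stored, and `VectorAssembler` chooses whichever of the dense or sparse form is more compact for each row. The sketch below expands such a triple into a dense list; the function name `to_dense` and the sample numbers are hypothetical.

```python
def to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse triple into a dense list."""
    if len(indices) != len(values):
        raise ValueError("indices and values must have the same length")
    dense = [0.0] * size
    for position, value in zip(indices, values):
        dense[position] = value
    return dense


# Hypothetical 6-element vector with three non-zero entries
dense = to_dense(6, [0, 2, 5], [619.0, 2.0, 1.0])
print(dense)
```

Both notations describe the same vector, so rows in either form can be mixed in one `features` column.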



<a id = "7"></a><br>
#### 3.4. Standard Scaler

In [102]:
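# Standardize features (Spark defaults: withStd=True, withMean=False,
# i.e. scale to unit standard deviation without centering)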
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
final_df = scaler.fit(final_df).transform(final_df)
final_df.show(4)

+--------------------+-----+--------------------+
|            features|label|     scaled_features|
+--------------------+-----+--------------------+
|[619.0,42.0,2.0,0...|    1|[6.40433392438999...|
|[608.0,41.0,1.0,8...|    0|[6.29052508243798...|
|[502.0,42.0,8.0,1...|    1|[5.19382169635505...|
|(27,[0,1,2,4,7,9,...|    0|(27,[0,1,2,4,7,9,...|
+--------------------+-----+--------------------+
only showing top 4 rows
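
With Spark's defaults (`withStd=True`, `withMean=False`), `StandardScaler` divides each feature by its sample standard deviation and does not subtract the mean. The first scaled value above can be checked by hand: the `creditscore` standard deviation reported by `describe()` is 96.65329873613035, and 619 divided by that value gives approximately 6.4043. The sketch below reproduces this in plain Python; the function name `scale_to_unit_std` is hypothetical.

```python
import statistics


def scale_to_unit_std(values):
    """Divide each value by the sample standard deviation (no centering)."""
    std = statistics.stdev(values)  # sample standard deviation (n - 1)
    return [v / std for v in values]


# Standard deviation of creditscore as reported by describe() in this notebook
credit_std = 96.65329873613035
first_row_scaled = 619 / credit_std
print(first_row_scaled)

# Small hypothetical example: the scaled values have unit standard deviation
scaled = scale_to_unit_std([2.0, 4.0, 6.0])
print(scaled)
```

The scaler here is fit on `final_df` before the train-test split, so test rows contribute to the statistic. Tree ensembles such as GBT are generally insensitive to per-feature rescaling, so the effect on this model is likely small, but fitting the scaler on the training set only is the more common practice.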



<a id = "8"></a><br>
#### 3.5. Train-Test Split

In [103]:
# Split the dataset into train (70%) and test (30%) sets.
train_df, test_df = final_df.randomSplit([0.7, 0.3], seed=17)
train_df.show(5)

+--------------------+-----+--------------------+
|            features|label|     scaled_features|
+--------------------+-----+--------------------+
|(27,[0,1,2,4,5,7,...|    0|(27,[0,1,2,4,5,7,...|
|(27,[0,1,2,4,5,7,...|    0|(27,[0,1,2,4,5,7,...|
|(27,[0,1,2,4,5,7,...|    0|(27,[0,1,2,4,5,7,...|
|(27,[0,1,2,4,5,7,...|    1|(27,[0,1,2,4,5,7,...|
|(27,[0,1,2,4,5,7,...|    0|(27,[0,1,2,4,5,7,...|
+--------------------+-----+--------------------+
only showing top 5 rows



In [104]:
test_df.show(5)

+--------------------+-----+--------------------+
|            features|label|     scaled_features|
+--------------------+-----+--------------------+
|(27,[0,1,2,4,5,7,...|    1|(27,[0,1,2,4,5,7,...|
|(27,[0,1,2,4,5,7,...|    0|(27,[0,1,2,4,5,7,...|
|(27,[0,1,2,4,5,7,...|    0|(27,[0,1,2,4,5,7,...|
|(27,[0,1,2,4,5,7,...|    0|(27,[0,1,2,4,5,7,...|
|(27,[0,1,2,4,5,7,...|    0|(27,[0,1,2,4,5,7,...|
+--------------------+-----+--------------------+
only showing top 5 rows



In [106]:
print("Training Dataset Count: " + str(train_df.count()))
print("Test Dataset Count: " + str(test_df.count()))

Training Dataset Count: 6949
Test Dataset Count: 3051
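
The counts are 6949 and 3051 rather than exactly 7000 and 3000 because `randomSplit` assigns each row to a split independently at random, so the weights are target proportions, not guarantees. The plain-Python sketch below illustrates that per-row sampling idea; the function name `random_split` is hypothetical, and it will not reproduce Spark's exact counts because the random number generators differ.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to a split according to the weights."""
    total = sum(weights)
    bounds = []
    cumulative = 0.0
    for weight in weights:
        cumulative += weight / total
        bounds.append(cumulative)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split catches any floating-point remainder
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(10000), [0.7, 0.3], seed=17)
print(len(train), len(test))
```

Every row lands in exactly one split, and the proportions approach 70/30 as the number of rows grows.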


<a id = "9"></a><br>
#### 4. MODELING

<a id = "10"></a><br>
#### 4.1. Gradient Boosted Tree Classifier

In [107]:
gbm = GBTClassifier(maxIter=100, featuresCol="scaled_features", labelCol="label")
gbm_model = gbm.fit(train_df)
y_pred = gbm_model.transform(test_df)
y_pred.show(5)

+--------------------+-----+--------------------+--------------------+--------------------+----------+
|            features|label|     scaled_features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+----------+
|(27,[0,1,2,4,5,7,...|    1|(27,[0,1,2,4,5,7,...|[-0.5811049889641...|[0.23826595259754...|       1.0|
|(27,[0,1,2,4,5,7,...|    0|(27,[0,1,2,4,5,7,...|[1.50801762489525...|[0.95329331121448...|       0.0|
|(27,[0,1,2,4,5,7,...|    0|(27,[0,1,2,4,5,7,...|[1.61208325805517...|[0.96173364612858...|       0.0|
|(27,[0,1,2,4,5,7,...|    0|(27,[0,1,2,4,5,7,...|[1.76966850700981...|[0.97178654032256...|       0.0|
|(27,[0,1,2,4,5,7,...|    0|(27,[0,1,2,4,5,7,...|[1.46147979781727...|[0.94896981137209...|       0.0|
+--------------------+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [108]:
# Accuracy: share of test rows where the prediction matches the label
y_pred.filter(y_pred.label == y_pred.prediction).count() / y_pred.count()

0.8502130449033104
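
Accuracy needs some context here: the mean of `exited` is 0.2037, so a model that always predicts 0 would already score about 0.80 on the full dataset. Precision and recall for the churn class show how well the model finds the customers who actually leave. The sketch below computes them from plain lists; the function names and the sample labels are hypothetical, not taken from the notebook's predictions.

```python
def confusion_counts(labels, predictions):
    """Return (tp, fp, tn, fn) for binary labels where 1 is the positive class."""
    tp = fp = tn = fn = 0
    for actual, predicted in zip(labels, predictions):
        if predicted == 1 and actual == 1:
            tp += 1
        elif predicted == 1 and actual == 0:
            fp += 1
        elif predicted == 0 and actual == 0:
            tn += 1
        else:
            fn += 1
    return tp, fp, tn, fn


def precision_recall(labels, predictions):
    """Precision and recall for the positive class (0.0 when undefined)."""
    tp, fp, _, fn = confusion_counts(labels, predictions)
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    return precision, recall


# Hypothetical labels and predictions
labels = [1, 0, 0, 0, 1, 0, 1, 0, 0, 0]
predictions = [1, 0, 0, 0, 0, 0, 0, 0, 1, 0]
tp, fp, tn, fn = confusion_counts(labels, predictions)
accuracy = (tp + tn) / len(labels)
precision, recall = precision_recall(labels, predictions)
print(accuracy, precision, recall)
```

In this toy example accuracy is 0.7 while recall is only one in three, which is the kind of gap that accuracy alone hides on imbalanced data.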

<a id = "11"></a><br>
#### 4.2. Model Tuning

In [110]:
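# Assumes ParamGridBuilder and CrossValidator were imported from pyspark.ml.tuning
# BinaryClassificationEvaluator defaults to metricName="areaUnderROC"
evaluator = BinaryClassificationEvaluator()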

gbm_params = (ParamGridBuilder()
              .addGrid(gbm.maxDepth, [2, 4, 6])
              .addGrid(gbm.maxBins, [20, 30])
              .addGrid(gbm.maxIter, [10, 20])
              .build())
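
The grid above is the Cartesian product of the three value lists, so it contains 3 x 2 x 2 = 12 parameter combinations. With 5-fold cross-validation that means 60 model fits during the search, after which `CrossValidator` refits the best combination on the full training set. The sketch below enumerates the same grid with `itertools.product` to make the cost explicit; the variable names are hypothetical.

```python
from itertools import product

# Same values as the ParamGridBuilder grid in this section
grid = {
    "maxDepth": [2, 4, 6],
    "maxBins": [20, 30],
    "maxIter": [10, 20],
}

# One dict per parameter combination
combinations = [dict(zip(grid, values)) for values in product(*grid.values())]

num_folds = 5
fits_during_cv = len(combinations) * num_folds

print(len(combinations))
print(fits_during_cv)
```

Adding one more value to any list multiplies the number of fits, so the grid size is worth checking before launching a search on a large dataset.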

In [111]:
cv = CrossValidator(estimator=gbm,
                    estimatorParamMaps=gbm_params,
                    evaluator=evaluator,
                    numFolds=5)

In [112]:
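# Fit every grid combination with 5-fold cross-validation and keep the best model
cv_model = cv.fit(train_df)
# Predict on the held-out test set with the best model
y_pred = cv_model.transform(test_df)
# Accuracy on the test set
ac = y_pred.select("label", "prediction")
ac.filter(ac.label == ac.prediction).count() / ac.count()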

0.850868567682727
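
Note that `CrossValidator` selects the parameter combination that maximises the evaluator's metric, and `BinaryClassificationEvaluator` defaults to area under the ROC curve, whereas the figure reported above is accuracy. The sketch below computes ROC AUC from its pairwise definition: the probability that a randomly chosen positive receives a higher score than a randomly chosen negative. The function name `roc_auc` and the sample data are hypothetical, and this quadratic-time version is an illustration only, not how Spark computes the metric.

```python
def roc_auc(labels, scores):
    """ROC AUC as the share of positive/negative pairs ranked correctly.

    Ties between a positive and a negative score count as half a win.
    """
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative label")

    wins = 0.0
    for pos_score in positives:
        for neg_score in negatives:
            if pos_score > neg_score:
                wins += 1.0
            elif pos_score == neg_score:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Hypothetical churn scores for four customers
labels = [1, 0, 1, 0]
scores = [0.9, 0.8, 0.3, 0.1]
auc = roc_auc(labels, scores)
print(auc)
```

Because AUC depends on the ranking of scores rather than on a fixed threshold, a model chosen by AUC need not be the one with the highest accuracy.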

<a id = "12"></a><br>
#### **5. REFERENCES**

* Data Science and Machine Learning Bootcamp, 2021, Veri Bilimi Okulu, https://www.veribilimiokulu.com/
* https://www.kaggle.com/kemalgunay/pyspark-ml-churn-analysis
* https://www.kaggle.com/kmalit/bank-customer-churn-prediction