## Import libraries

In [1]:
import findspark #to make pyspark importable as a regular library
findspark.init('/home/sony/spark-3.3.0-bin-hadoop2') #my spark folder location

In [2]:
from pyspark.sql import SparkSession #to start a pyspark dataframe and SQL functionality
from pyspark.sql.types import StructField, StringType, IntegerType, StructType #common dtypes
from pyspark.sql.functions import (count, countDistinct, isnull, corr, when, col, avg, stddev, mean, min, max, 
                                   date_format, format_number, hour, dayofmonth, dayofyear, weekofyear, 
                                   month, year) #common functions
from pyspark.ml import Pipeline #to create pipeline for deployment
from pyspark.ml.classification import (DecisionTreeClassifier, RandomForestClassifier, 
                                       GBTClassifier, LogisticRegression) #classification algorithms                                    
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import (VectorAssembler, StringIndexer, OneHotEncoder, 
                                Imputer, StandardScaler) #for data preprocessing
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator #for model evaluation
from pyspark.mllib.evaluation import MulticlassMetrics #for model evaluation 

## Spark session

In [3]:
spark = SparkSession.builder.appName('Churn').getOrCreate() #this is the entry point to spark to work with RDD, DataFrame, or Dataset

22/08/18 18:12:59 WARN Utils: Your hostname, sony-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
22/08/18 18:12:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/18 18:13:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Load dataset

In [4]:
raw = spark.read.csv('/home/sony/Churn_Modelling.csv', 
                     inferSchema=True, header=True) #load the dataset

## Data understanding

In [5]:
raw.printSchema() #this is similar to .info() in pandas but without non-null values

root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



In [6]:
raw.show() #show all data

+---------+----------+---------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|  Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+---------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|        1|  15634602| Hargrave|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|
|        2|  15647311|     Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|
|        3|  15619304|     Onio|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|        4|  15701354|     Boni|        699|   France|Female| 39|     1|      0.0|            2|        0|             0|       93

In [7]:
raw.show(n=1, truncate=False, vertical=True) #show 1 row example from the dataset

-RECORD 0--------------------
 RowNumber       | 1         
 CustomerId      | 15634602  
 Surname         | Hargrave  
 CreditScore     | 619       
 Geography       | France    
 Gender          | Female    
 Age             | 42        
 Tenure          | 2         
 Balance         | 0.0       
 NumOfProducts   | 1         
 HasCrCard       | 1         
 IsActiveMember  | 1         
 EstimatedSalary | 101348.88 
 Exited          | 1         
only showing top 1 row



In [8]:
raw.describe().show(truncate=False, vertical=True) #statistics of each columns

[Stage 4:>                                                          (0 + 1) / 1]

-RECORD 0------------------------------
 summary         | count               
 RowNumber       | 10000               
 CustomerId      | 10000               
 Surname         | 10000               
 CreditScore     | 10000               
 Geography       | 10000               
 Gender          | 10000               
 Age             | 10000               
 Tenure          | 10000               
 Balance         | 10000               
 NumOfProducts   | 10000               
 HasCrCard       | 10000               
 IsActiveMember  | 10000               
 EstimatedSalary | 10000               
 Exited          | 10000               
-RECORD 1------------------------------
 summary         | mean                
 RowNumber       | 5000.5              
 CustomerId      | 1.56909405694E7     
 Surname         | null                
 CreditScore     | 650.5288            
 Geography       | null                
 Gender          | null                
 Age             | 38.9218             


                                                                                

## Missing values

In [9]:
df = raw.alias('df') #create a copy the initial data

In [10]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show() #check missing values

+---------+----------+-------+-----------+---------+------+---+------+-------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|Surname|CreditScore|Geography|Gender|Age|Tenure|Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+-------+-----------+---------+------+---+------+-------+-------------+---------+--------------+---------------+------+
|        0|         0|      0|          0|        0|     0|  0|     0|      0|            0|        0|             0|              0|     0|
+---------+----------+-------+-----------+---------+------+---+------+-------+-------------+---------+--------------+---------------+------+



## String indexer

In [11]:
df.printSchema() #check which column is still in string dtype

root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



In [12]:
df.groupBy('Geography').count().show()

+---------+-----+
|Geography|count|
+---------+-----+
|  Germany| 2509|
|   France| 5014|
|    Spain| 2477|
+---------+-----+



In [13]:
df.groupBy('Gender').count().show()

+------+-----+
|Gender|count|
+------+-----+
|Female| 4543|
|  Male| 5457|
+------+-----+



In [14]:
stringIndexer = StringIndexer(inputCols=['Geography','Gender'], outputCols=['GeoIndex','GenderIndex'])
df = stringIndexer.fit(df).transform(df)

In [15]:
df.select(['Geography','GeoIndex','Gender','GenderIndex']).show() #data after being string indexed

+---------+--------+------+-----------+
|Geography|GeoIndex|Gender|GenderIndex|
+---------+--------+------+-----------+
|   France|     0.0|Female|        1.0|
|    Spain|     2.0|Female|        1.0|
|   France|     0.0|Female|        1.0|
|   France|     0.0|Female|        1.0|
|    Spain|     2.0|Female|        1.0|
|    Spain|     2.0|  Male|        0.0|
|   France|     0.0|  Male|        0.0|
|  Germany|     1.0|Female|        1.0|
|   France|     0.0|  Male|        0.0|
|   France|     0.0|  Male|        0.0|
|   France|     0.0|  Male|        0.0|
|    Spain|     2.0|  Male|        0.0|
|   France|     0.0|Female|        1.0|
|   France|     0.0|Female|        1.0|
|    Spain|     2.0|Female|        1.0|
|  Germany|     1.0|  Male|        0.0|
|  Germany|     1.0|  Male|        0.0|
|    Spain|     2.0|Female|        1.0|
|    Spain|     2.0|  Male|        0.0|
|   France|     0.0|Female|        1.0|
+---------+--------+------+-----------+
only showing top 20 rows



## OneHot encoder

In [16]:
ohe = OneHotEncoder(inputCol='GeoIndex',outputCol='GeoOneHot')

In [17]:
df = ohe.fit(df).transform(df)

In [18]:
df.select(['Geography','GeoIndex','GeoOneHot']).show() #data after one hot encoding

+---------+--------+-------------+
|Geography|GeoIndex|    GeoOneHot|
+---------+--------+-------------+
|   France|     0.0|(2,[0],[1.0])|
|    Spain|     2.0|    (2,[],[])|
|   France|     0.0|(2,[0],[1.0])|
|   France|     0.0|(2,[0],[1.0])|
|    Spain|     2.0|    (2,[],[])|
|    Spain|     2.0|    (2,[],[])|
|   France|     0.0|(2,[0],[1.0])|
|  Germany|     1.0|(2,[1],[1.0])|
|   France|     0.0|(2,[0],[1.0])|
|   France|     0.0|(2,[0],[1.0])|
|   France|     0.0|(2,[0],[1.0])|
|    Spain|     2.0|    (2,[],[])|
|   France|     0.0|(2,[0],[1.0])|
|   France|     0.0|(2,[0],[1.0])|
|    Spain|     2.0|    (2,[],[])|
|  Germany|     1.0|(2,[1],[1.0])|
|  Germany|     1.0|(2,[1],[1.0])|
|    Spain|     2.0|    (2,[],[])|
|    Spain|     2.0|    (2,[],[])|
|   France|     0.0|(2,[0],[1.0])|
+---------+--------+-------------+
only showing top 20 rows



## Assembling data

In [19]:
df.printSchema()

root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)
 |-- GeoIndex: double (nullable = false)
 |-- GenderIndex: double (nullable = false)
 |-- GeoOneHot: vector (nullable = true)



In [20]:
df.columns

['RowNumber',
 'CustomerId',
 'Surname',
 'CreditScore',
 'Geography',
 'Gender',
 'Age',
 'Tenure',
 'Balance',
 'NumOfProducts',
 'HasCrCard',
 'IsActiveMember',
 'EstimatedSalary',
 'Exited',
 'GeoIndex',
 'GenderIndex',
 'GeoOneHot']

In [21]:
df.show(n=1, truncate=False, vertical=True)

-RECORD 0------------------------
 RowNumber       | 1             
 CustomerId      | 15634602      
 Surname         | Hargrave      
 CreditScore     | 619           
 Geography       | France        
 Gender          | Female        
 Age             | 42            
 Tenure          | 2             
 Balance         | 0.0           
 NumOfProducts   | 1             
 HasCrCard       | 1             
 IsActiveMember  | 1             
 EstimatedSalary | 101348.88     
 Exited          | 1             
 GeoIndex        | 0.0           
 GenderIndex     | 1.0           
 GeoOneHot       | (2,[0],[1.0]) 
only showing top 1 row



In [22]:
assembler = VectorAssembler(inputCols=['CreditScore', 'Age', 'Tenure', 'Balance', 'NumOfProducts', 
                                       'HasCrCard', 'IsActiveMember', 'EstimatedSalary', 'GenderIndex', 
                                       'GeoOneHot'], 
                            outputCol='features')
df = assembler.transform(df)

In [23]:
dfML = df.select(['features','Exited']).withColumnRenamed('Exited','label')

In [24]:
df.show(n=12,vertical=True, truncate=False)

-RECORD 0-----------------------------------------------------------------------
 RowNumber       | 1                                                            
 CustomerId      | 15634602                                                     
 Surname         | Hargrave                                                     
 CreditScore     | 619                                                          
 Geography       | France                                                       
 Gender          | Female                                                       
 Age             | 42                                                           
 Tenure          | 2                                                            
 Balance         | 0.0                                                          
 NumOfProducts   | 1                                                            
 HasCrCard       | 1                                                            
 IsActiveMember  | 1        

In [25]:
dfML.head(12)

[Row(features=DenseVector([619.0, 42.0, 2.0, 0.0, 1.0, 1.0, 1.0, 101348.88, 1.0, 1.0, 0.0]), label=1),
 Row(features=DenseVector([608.0, 41.0, 1.0, 83807.86, 1.0, 0.0, 1.0, 112542.58, 1.0, 0.0, 0.0]), label=0),
 Row(features=DenseVector([502.0, 42.0, 8.0, 159660.8, 3.0, 1.0, 0.0, 113931.57, 1.0, 1.0, 0.0]), label=1),
 Row(features=DenseVector([699.0, 39.0, 1.0, 0.0, 2.0, 0.0, 0.0, 93826.63, 1.0, 1.0, 0.0]), label=0),
 Row(features=DenseVector([850.0, 43.0, 2.0, 125510.82, 1.0, 1.0, 1.0, 79084.1, 1.0, 0.0, 0.0]), label=0),
 Row(features=DenseVector([645.0, 44.0, 8.0, 113755.78, 2.0, 1.0, 0.0, 149756.71, 0.0, 0.0, 0.0]), label=1),
 Row(features=DenseVector([822.0, 50.0, 7.0, 0.0, 2.0, 1.0, 1.0, 10062.8, 0.0, 1.0, 0.0]), label=0),
 Row(features=DenseVector([376.0, 29.0, 4.0, 115046.74, 4.0, 1.0, 0.0, 119346.88, 1.0, 0.0, 1.0]), label=1),
 Row(features=DenseVector([501.0, 44.0, 4.0, 142051.07, 2.0, 0.0, 1.0, 74940.5, 0.0, 1.0, 0.0]), label=0),
 Row(features=DenseVector([684.0, 27.0, 2.0, 1

In [26]:
dfML.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[619.0,42.0,2.0,0...|    1|
|[608.0,41.0,1.0,8...|    0|
|[502.0,42.0,8.0,1...|    1|
|[699.0,39.0,1.0,0...|    0|
|[850.0,43.0,2.0,1...|    0|
|[645.0,44.0,8.0,1...|    1|
|[822.0,50.0,7.0,0...|    0|
|[376.0,29.0,4.0,1...|    1|
|[501.0,44.0,4.0,1...|    0|
|[684.0,27.0,2.0,1...|    0|
|[528.0,31.0,6.0,1...|    0|
|(11,[0,1,2,4,5,7]...|    0|
|[476.0,34.0,10.0,...|    0|
|[549.0,25.0,5.0,0...|    0|
|[635.0,35.0,7.0,0...|    0|
|[616.0,45.0,3.0,1...|    0|
|[653.0,58.0,1.0,1...|    1|
|[549.0,24.0,9.0,0...|    0|
|(11,[0,1,2,4,7],[...|    0|
|[726.0,24.0,6.0,0...|    0|
+--------------------+-----+
only showing top 20 rows



## Train test split

In [27]:
trainData, testData = dfML.randomSplit([0.8, 0.2], seed=42) #split data to prevent overfitting
trainData.count(), testData.count()

(8079, 1921)

## Standardizing

In [28]:
standardScaler = StandardScaler(inputCol='features', outputCol='featuresScaled')

trainData = standardScaler.fit(trainData).transform(trainData)
testData = standardScaler.fit(testData).transform(testData)

In [29]:
trainData.show()

+--------------------+-----+--------------------+
|            features|label|      featuresScaled|
+--------------------+-----+--------------------+
|(11,[0,1,2,3,4,7]...|    0|(11,[0,1,2,3,4,7]...|
|(11,[0,1,2,3,4,7]...|    0|(11,[0,1,2,3,4,7]...|
|(11,[0,1,2,3,4,7]...|    0|(11,[0,1,2,3,4,7]...|
|(11,[0,1,2,3,4,7]...|    0|(11,[0,1,2,3,4,7]...|
|(11,[0,1,2,3,4,7]...|    0|(11,[0,1,2,3,4,7]...|
|(11,[0,1,2,3,4,7]...|    0|(11,[0,1,2,3,4,7]...|
|(11,[0,1,2,3,4,7]...|    0|(11,[0,1,2,3,4,7]...|
|(11,[0,1,2,3,4,7]...|    0|(11,[0,1,2,3,4,7]...|
|(11,[0,1,2,3,4,7]...|    0|(11,[0,1,2,3,4,7]...|
|(11,[0,1,2,3,4,7]...|    0|(11,[0,1,2,3,4,7]...|
|(11,[0,1,2,3,4,7]...|    0|(11,[0,1,2,3,4,7]...|
|(11,[0,1,2,3,4,7]...|    1|(11,[0,1,2,3,4,7]...|
|(11,[0,1,2,3,4,7]...|    0|(11,[0,1,2,3,4,7]...|
|(11,[0,1,2,3,4,7]...|    1|(11,[0,1,2,3,4,7]...|
|(11,[0,1,2,3,4,7]...|    1|(11,[0,1,2,3,4,7]...|
|(11,[0,1,2,3,4,7]...|    1|(11,[0,1,2,3,4,7]...|
|(11,[0,1,2,3,4,7]...|    0|(11,[0,1,2,3,4,7]...|


## Model fitting

In [30]:
logReg = LogisticRegression(featuresCol='featuresScaled')
dtc = DecisionTreeClassifier(featuresCol='featuresScaled', seed=42)
rfc = RandomForestClassifier(featuresCol='featuresScaled', seed=42)
gbt = GBTClassifier(featuresCol='featuresScaled', seed=42)

In [31]:
logRegModel = logReg.fit(trainData)
dtcModel = dtc.fit(trainData)
rfcModel = rfc.fit(trainData)
gbtModel = gbt.fit(trainData)

In [32]:
logRegTestResult = logRegModel.transform(testData)
dtcTestResult = dtcModel.transform(testData)
rfcTestResult = rfcModel.transform(testData)
gbtTestResult = gbtModel.transform(testData)

## Evaluation

In [33]:
aocEval = BinaryClassificationEvaluator(labelCol='label')
accEval = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')

In [34]:
allModel = [logRegTestResult, dtcTestResult, rfcTestResult, gbtTestResult]
allModelName = ['Logistic Regression', 'Decision Tree', 'Random Forest', 'Gradient Boosted']

In [35]:
for i in range(0, len(allModel)):
    print(allModelName[i], 'accuracy :' ,round(aocEval.evaluate(allModel[i])*100,2), 'aoc :' ,round(accEval.evaluate(allModel[i])*100,2))

Logistic Regression accuracy : 76.42 aoc : 79.8
Decision Tree accuracy : 32.31 aoc : 85.48
Random Forest accuracy : 82.69 aoc : 84.17
Gradient Boosted accuracy : 85.56 aoc : 85.27


## Feature importance

In [36]:
features = ['CreditScore', 'Age', 'Tenure', 'Balance', 'NumOfProducts', 'HasCrCard', 'IsActiveMember', 
            'EstimatedSalary', 'GenderIndex', 'GeoOneHot']

In [37]:
for i in range(0,len(features)):
    print(features[i], ':', round(100*dtcModel.featureImportances[i],2))

CreditScore : 0.19
Age : 48.3
Tenure : 0.35
Balance : 1.57
NumOfProducts : 30.95
HasCrCard : 0.0
IsActiveMember : 15.62
EstimatedSalary : 0.66
GenderIndex : 0.0
GeoOneHot : 0.0


In [38]:
for i in range(0,len(features)):
    print(features[i], ':', round(100*rfcModel.featureImportances[i],2))

CreditScore : 0.38
Age : 48.82
Tenure : 0.48
Balance : 2.38
NumOfProducts : 29.91
HasCrCard : 0.03
IsActiveMember : 11.0
EstimatedSalary : 0.45
GenderIndex : 1.46
GeoOneHot : 0.41


In [39]:
for i in range(0,len(features)):
    print(features[i], ':', round(100*gbtModel.featureImportances[i],2))

CreditScore : 5.94
Age : 28.67
Tenure : 4.08
Balance : 18.03
NumOfProducts : 16.46
HasCrCard : 1.11
IsActiveMember : 7.02
EstimatedSalary : 5.85
GenderIndex : 6.27
GeoOneHot : 1.17
