# Predicting Bank Customer Churn using pyspark

📌  In this section, we will predict bank customer churn using pyspark machine learning algorithms. 

# Business Problem

📌 In order for the bank to serve more, we need to predict whether the customer will leave the bank, and we also need to make sure that the customers will not leave the bank.

# Dataset Story

📌 This dataset contains details of a banks customers and the target variable is a binary variable reflecting the fact whether the customer left the bank (closed his account) or he continues to be a customer.

The features in the given dataset are:

    rownumber: Row Numbers from 1 to 10000.

    customerid: A unique ID that identifies each customer.

    surname: The customer’s surname.

    creditscore: A credit score is a number between 300–850 that depicts a consumer's creditworthiness.

    geography: The country from which the customer belongs to.

    Gender: The customer’s gender: Male, Female

    Age: The customer’s current age, in years, at the time of being customer.

    tenure: The number of years for which the customer has been with the bank.

    balance: Bank balance of the customer.

    numofproducts: the number of bank products the customer is utilising.

    h0ascrcard: The number of credit cards given to the customer by the bank.

    isactivemember: Binary Flag for indicating if the client is active or not with the bank before the moment where the client exits the company (recorded in the variable "exited")

    exited: Binary flag 1 if the customer closed account with bank and 0 if the customer is retained.



# Create Session in Spark

In [4]:
!pip install findspark
import findspark
findspark.init("C:\spark")
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark import SparkContext
spark = SparkSession.builder \
    .master("local") \
    .appName("Churn_Modeling") \
    .config("spark.sql.shuffle.partitions", "2") \
    .getOrCreate()
sc = spark.sparkContext



In [5]:
sc

# Import Necesaary Libraries

In [79]:
import pandas as pd
pd.set_option("display.max_columns",None)
pd.set_option("display.max_rows", None)
pd.set_option("display.width", 500)
pd.set_option("display.float_format", lambda x: '%.4f' % x)
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Import Dataset

In [11]:
df = (spark.read.format("csv")
      .option("header", True)
      .option("inferSchema", True)
      .load("Churn_Modelling.csv")
)
df.persist()

DataFrame[RowNumber: int, CustomerId: int, Surname: string, CreditScore: int, Geography: string, Gender: string, Age: int, Tenure: int, Balance: double, NumOfProducts: int, HasCrCard: int, IsActiveMember: int, EstimatedSalary: double, Exited: int]

In [12]:
df.limit(5).toPandas()

Unnamed: 0,RowNumber,CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited
0,1,15634602,Hargrave,619,France,Female,42,2,0.0,1,1,1,101348.88,1
1,2,15647311,Hill,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0
2,3,15619304,Onio,502,France,Female,42,8,159660.8,3,1,0,113931.57,1
3,4,15701354,Boni,699,France,Female,39,1,0.0,2,0,0,93826.63,0
4,5,15737888,Mitchell,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0


In [15]:
df.printSchema()

root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



In [16]:
df.show(5)

+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId| Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|        1|  15634602|Hargrave|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|
|        2|  15647311|    Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|
|        3|  15619304|    Onio|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|        4|  15701354|    Boni|        699|   France|Female| 39|     1|      0.0|            2|        0|             0|       93826.63|

In [21]:
print((df.count(), len(df.columns)))

(10000, 14)


# Missing Value Analysis

In [23]:
df.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in df.columns]).toPandas()

Unnamed: 0,RowNumber,CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [24]:
def null_count(dataframe, col_name):
    nc = dataframe.select(col_name).filter(
        (F.col(col_name) == "NA")|
        (F.col(col_name) == "") |
        (F.col(col_name).isNull())
    ).count()
    return nc

In [25]:
null_count(df, "Geography")

0

In [35]:
def show_all_null(dataframe):
    for col_name in dataframe.dtypes:
        nc = null_count(dataframe, col_name[0])
        if nc > 0:
            print("{} ===> {} , Ratio: {:.2f}".format(col_name[0], nc, (nc/dataframe.count())*100))
    if nc == 0:
        print("There is no null value")    

In [36]:
show_all_null(df)

There is no null value


# Analysis of Categorical and Numerical Variables

In [41]:
categorical_cols = []
numerical_cols = []
label_col = ["Exited"]
discarted_cols = ["RowNumber", "CustomerId", "Surname"]

In [42]:
def grab_cat_num_cols(dataframe):
    for col_name in dataframe.dtypes:
        if (col_name[0] not in label_col+discarted_cols):
            if col_name[1] == "string":
                categorical_cols.append(col_name[0])
            else:
                numerical_cols.append(col_name[0])
    return categorical_cols, numerical_cols

In [43]:
categorical_cols, numerical_cols = grab_cat_num_cols(df)

#Print Categorical and Numerical Variables
print(f"Observations: {df.count()}")
print(f"Variables: {len(df.columns)}")
print(f"Cat_cols: {len(categorical_cols)}")
print(f"Num_cols: {len(numerical_cols)}")

Observations: 10000
Variables: 14
Cat_cols: 2
Num_cols: 8


In [45]:
# column check
if (len(df.columns) == (len(label_col) + len(discarted_cols) + len(categorical_cols) + len(numerical_cols))):
    print("column check is True")
else:
    print("There is a problem for column check")

column check is True


In [46]:
# trim categoric cols
def trim_cols(dataframe, cat_cols):
    for col_name in cat_cols:
        dataframe = dataframe.withColumn(col_name, F.trim(col_name))

In [47]:
trim_cols(df, categorical_cols)

In [48]:
# Show Exited 
df.select(label_col[0]).groupBy(label_col[0]).count().show()

+------+-----+
|Exited|count|
+------+-----+
|     1| 2037|
|     0| 7963|
+------+-----+



In [49]:
def examine_categories(dataframe, cat_cols):
    for cat_col in cat_cols:
        print(cat_col)
        dataframe.groupBy(cat_col).count().orderBy(F.desc("count")).show()

In [50]:
examine_categories(df, categorical_cols)

Geography
+---------+-----+
|Geography|count|
+---------+-----+
|   France| 5014|
|  Germany| 2509|
|    Spain| 2477|
+---------+-----+

Gender
+------+-----+
|Gender|count|
+------+-----+
|  Male| 5457|
|Female| 4543|
+------+-----+



# Encoding Scaling

In [57]:
def find_binary_cols(dataframe, cat_cols):
    binary_cols = dataframe.select([col for col in cat_cols if dataframe.select(col).dtypes[0][1] == "string" and dataframe.select(col).distinct().count() == 2])
    return binary_cols

In [58]:
binary_cols = find_binary_cols(df, categorical_cols)
print(binary_cols.columns)

['Gender']


In [59]:
my_dict = {}
string_indexer_objs = []
string_indexer_output_names = []
ohe_input_names = []
ohe_output_names = []

for col_name in categorical_cols:
    my_dict[col_name+"_index_obj"] = StringIndexer() \
    .setHandleInvalid("skip") \
    .setInputCol(col_name) \
    .setOutputCol(col_name+"_indexed")
    
    string_indexer_objs.append(my_dict.get(col_name+"_index_obj"))
    string_indexer_output_names.append(col_name+"_indexed")
    
    if col_name not in binary_cols.columns:
        ohe_input_names.append(col_name+"_indexed")
        ohe_output_names.append(col_name+"_ohe")

In [60]:
not_to_hot_coded = list(set(string_indexer_output_names).difference(set(ohe_input_names)))
print(not_to_hot_coded)

['Gender_indexed']


In [61]:
encoder = OneHotEncoder().setInputCols(ohe_input_names).setOutputCols(ohe_output_names)
assembler = VectorAssembler().setHandleInvalid("skip").setInputCols(numerical_cols + not_to_hot_coded + ohe_output_names).setOutputCol("unscaled_features")
scaler = StandardScaler().setInputCol("unscaled_features").setOutputCol("features")

# Create Model

In [62]:
# split dataset
train_df, test_df = df.randomSplit([0.8, 0.2], seed=123)
print(train_df.count() , test_df.count())

7968 2032


In [66]:
# create Estimator
estimator = GBTClassifier(labelCol=label_col[0])

In [67]:
# create pipeline object
pipeline_obj = Pipeline().setStages(string_indexer_objs + [encoder, assembler, scaler, estimator])

In [68]:
# train model
pipeline_model = pipeline_obj.fit(train_df)

In [69]:
# prediction
transform_df = pipeline_model.transform(test_df)
transform_df.select("Exited", "prediction").show(5)

+------+----------+
|Exited|prediction|
+------+----------+
|     1|       1.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       1.0|
+------+----------+
only showing top 5 rows



In [77]:
acc = transform_df.select("Exited", "prediction")
print("Accuracy: " , acc.filter(acc["Exited"] == acc["prediction"]).count() / acc.count())

Accuracy:  0.8676181102362205


In [105]:
evaluatorMulti = MulticlassClassificationEvaluator(labelCol="Exited", predictionCol="prediction")
acc = evaluatorMulti.evaluate(transform_df, {evaluatorMulti.metricName: "accuracy"})
precision = evaluatorMulti.evaluate(transform_df, {evaluatorMulti.metricName: "precisionByLabel"})
recall = evaluatorMulti.evaluate(transform_df, {evaluatorMulti.metricName: "recallByLabel"})
f1 = evaluatorMulti.evaluate(transform_df, {evaluatorMulti.metricName: "f1"})
roc_auc = evaluatorMulti.evaluate(transform_df)
print("accuracy: %f, precision: %f, recall: %f, f1: %f, roc_auc: %f" % (acc, precision, recall, f1, roc_auc))

accuracy: 0.867618, precision: 0.887704, recall: 0.958182, f1: 0.856447, roc_auc: 0.856447


# Model Tuning

In [106]:
paramGrid = (ParamGridBuilder()
             .addGrid(estimator.maxDepth, [2, 4, 6])
             .addGrid(estimator.maxBins, [20, 30])
             .addGrid(estimator.maxIter, [10, 20])
             .build())
cv = CrossValidator(estimator=pipeline_obj,
                    estimatorParamMaps=paramGrid,
                    evaluator=BinaryClassificationEvaluator(labelCol=label_col[0]),
                    numFolds=10)

In [107]:
cv_model = cv.fit(train_df)

In [108]:
# prediction
y_pred = cv_model.transform(test_df)
y_pred.select("Exited", "prediction").show(5)

+------+----------+
|Exited|prediction|
+------+----------+
|     1|       1.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       1.0|
+------+----------+
only showing top 5 rows



In [109]:
acc = y_pred.select("Exited", "prediction")
print("Accuracy: " , acc.filter(acc["Exited"] == acc["prediction"]).count() / acc.count())

Accuracy:  0.8695866141732284


In [110]:
evaluatorMulti = MulticlassClassificationEvaluator(labelCol="Exited", predictionCol="prediction")
acc = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: "accuracy"})
precision = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: "precisionByLabel"})
recall = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: "recallByLabel"})
f1 = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: "f1"})
roc_auc = evaluatorMulti.evaluate(y_pred)
print("accuracy: %f, precision: %f, recall: %f, f1: %f, roc_auc: %f" % (acc, precision, recall, f1, roc_auc))

accuracy: 0.869587, precision: 0.887521, recall: 0.961212, f1: 0.857938, roc_auc: 0.857938
