## Import Libraries

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import plotly.express as px
import plotly.subplots as sp


Spark Session

In [2]:
# spark session created
spark = SparkSession.builder.appName("example").getOrCreate()

24/02/14 14:17:17 WARN Utils: Your hostname, NX00597 resolves to a loopback address: 127.0.1.1; using 10.28.81.139 instead (on interface enp3s0f0)
24/02/14 14:17:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/14 14:17:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Load Data

In [3]:
spark_df = spark.read.csv("./BankChurners.csv", header=True)

# show first 5 rows
spark_df.show(5)

                                                                                

+---------+-----------------+------------+------+---------------+---------------+--------------+---------------+-------------+--------------+------------------------+----------------------+---------------------+------------+-------------------+---------------+--------------------+---------------+--------------+-------------------+---------------------+----------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------+
|CLIENTNUM|   Attrition_Flag|Customer_Age|Gender|Dependent_count|Education_Level|Marital_Status|Income_Category|Card_Category|Months_on_book|Total_Relationship_Count|Months_Inactive_12_mon|Contacts_Count_12_mon|Credit_Limit|Total_Revolving_Bal|Avg_Open_To_Buy|Total_Amt_Chng_Q4_Q1|Total_Trans_Amt|Total_Trans_Ct|Total_Ct_Chng_Q4_Q1|Avg_Utilization_Ratio|Naive_Bayes_Classifier_Attrit

## Data Preprocessing

In [4]:
# show schema of spark dataframe
spark_df.printSchema()

root
 |-- CLIENTNUM: string (nullable = true)
 |-- Attrition_Flag: string (nullable = true)
 |-- Customer_Age: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Dependent_count: string (nullable = true)
 |-- Education_Level: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Income_Category: string (nullable = true)
 |-- Card_Category: string (nullable = true)
 |-- Months_on_book: string (nullable = true)
 |-- Total_Relationship_Count: string (nullable = true)
 |-- Months_Inactive_12_mon: string (nullable = true)
 |-- Contacts_Count_12_mon: string (nullable = true)
 |-- Credit_Limit: string (nullable = true)
 |-- Total_Revolving_Bal: string (nullable = true)
 |-- Avg_Open_To_Buy: string (nullable = true)
 |-- Total_Amt_Chng_Q4_Q1: string (nullable = true)
 |-- Total_Trans_Amt: string (nullable = true)
 |-- Total_Trans_Ct: string (nullable = true)
 |-- Total_Ct_Chng_Q4_Q1: string (nullable = true)
 |-- Avg_Utilization_Ratio: string (nullable = 

Data Cleaning

In [5]:
# Toral rows in spark dataframe
print(f"Total Records Before Cleaning: {spark_df.count()}")

# Remove Null Values from any columns
spark_df = spark_df.na.drop('any')
print(f"Total Records After Cleaning: {spark_df.count()}")

Total Records Before Cleaning: 10127
Total Records After Cleaning: 10127


### Transformation

Remove Uncessary columns

In [6]:
# drop columns
drop_cols = ['Naive_Bayes_Classifier_Attrition_Flag_Card_Category_Contacts_Count_12_mon_Dependent_count_Education_Level_Months_Inactive_12_mon_1',
               'Naive_Bayes_Classifier_Attrition_Flag_Card_Category_Contacts_Count_12_mon_Dependent_count_Education_Level_Months_Inactive_12_mon_2']
spark_df = spark_df.drop(*drop_cols)

Type Casting

In [7]:
spark_df =  spark_df.withColumn('Customer_Age', spark_df.Customer_Age.cast('int'))
spark_df = spark_df.withColumn("Dependent_count", spark_df["Dependent_count"].cast("int"))
spark_df = spark_df.withColumn("Months_on_book", spark_df["Months_on_book"].cast("int"))
spark_df = spark_df.withColumn("Total_Relationship_Count", spark_df["Total_Relationship_Count"].cast("int"))
spark_df = spark_df.withColumn("Months_Inactive_12_mon", spark_df["Months_Inactive_12_mon"].cast("int"))
spark_df = spark_df.withColumn("Contacts_Count_12_mon", spark_df["Contacts_Count_12_mon"].cast("int"))
spark_df = spark_df.withColumn("Credit_Limit", spark_df["Credit_Limit"].cast("double"))
spark_df = spark_df.withColumn("Total_Revolving_Bal", spark_df["Total_Revolving_Bal"].cast("int"))
spark_df = spark_df.withColumn("Avg_Open_To_Buy", spark_df["Avg_Open_To_Buy"].cast("double"))
spark_df = spark_df.withColumn("Total_Amt_Chng_Q4_Q1", spark_df["Total_Amt_Chng_Q4_Q1"].cast("double"))
spark_df = spark_df.withColumn("Total_Trans_Amt", spark_df["Total_Trans_Amt"].cast("double"))
spark_df = spark_df.withColumn("Total_Trans_Ct", spark_df["Total_Trans_Ct"].cast("int"))
spark_df = spark_df.withColumn("Total_Ct_Chng_Q4_Q1", spark_df["Total_Ct_Chng_Q4_Q1"].cast("double"))
spark_df = spark_df.withColumn("Avg_Utilization_Ratio", spark_df["Avg_Utilization_Ratio"].cast("double"))


In [8]:
# show schema of spark dataframe
spark_df.printSchema()

root
 |-- CLIENTNUM: string (nullable = true)
 |-- Attrition_Flag: string (nullable = true)
 |-- Customer_Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Dependent_count: integer (nullable = true)
 |-- Education_Level: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Income_Category: string (nullable = true)
 |-- Card_Category: string (nullable = true)
 |-- Months_on_book: integer (nullable = true)
 |-- Total_Relationship_Count: integer (nullable = true)
 |-- Months_Inactive_12_mon: integer (nullable = true)
 |-- Contacts_Count_12_mon: integer (nullable = true)
 |-- Credit_Limit: double (nullable = true)
 |-- Total_Revolving_Bal: integer (nullable = true)
 |-- Avg_Open_To_Buy: double (nullable = true)
 |-- Total_Amt_Chng_Q4_Q1: double (nullable = true)
 |-- Total_Trans_Amt: double (nullable = true)
 |-- Total_Trans_Ct: integer (nullable = true)
 |-- Total_Ct_Chng_Q4_Q1: double (nullable = true)
 |-- Avg_Utilization_Ratio: double (nul

Churn vs Non Churn Customer Count

In [18]:
churn_count = spark_df.groupby("Attrition_Flag").agg(count("Attrition_Flag"))

# Rename the columns
churn_df = churn_count.withColumnRenamed("count(Attrition_Flag)", "Count")

# Convert the Spark DataFrame to a Pandas DataFrame for plotting with Plotly Express
churn_pd_df = churn_df.toPandas()

# Create a bar chart using Plotly Express
fig = px.bar(churn_pd_df, x='Attrition_Flag', y='Count', title='Churn vs Non-Churn Customer Count')

# Set the size of the chart
fig.update_layout(width=600, height=400)

# Show the plot
fig.show()

Gender Distribution

In [19]:
# groupby gender
gender_df = spark_df.groupby("Gender").agg(count("Attrition_Flag"))

# Rename the columns
gender_df = gender_df.withColumnRenamed("count(Attrition_Flag)", "Count")

# Convert the Spark DataFrame to a Pandas DataFrame for plotting with Plotly Express
gender_pd_df = gender_df.toPandas()

# Create a bar chart using Plotly Express
fig = px.bar(gender_pd_df, x='Gender', y='Count', title='Attrition Flag Count by Gender')

# Set the size of the chart
fig.update_layout(width=600, height=400)

# Show the plot
fig.show()


Education Level Distribution for Churn and Non-Churn Customers

In [22]:
# print("Existing Customer Education Level")
# # Education Level Distribution for 'Existing Customer'
# spark_df.filter(col("Attrition_Flag")=="Existing Customer").groupby("Education_Level").agg(count("Education_Level")).show()

# print("Attrited Customer Education Level")
# # Education Level Distribution for 'Attrited Customer'
# spark_df.filter(col("Attrition_Flag")=="Attrited Customer").groupby("Education_Level").agg(count("Education_Level")).show()

# Filter data for Existing Customer and Attrited Customer
existing_customer_df = spark_df.filter(col("Attrition_Flag") == "Existing Customer").groupBy("Education_Level").agg(count("Education_Level").alias("Count"))
attrited_customer_df = spark_df.filter(col("Attrition_Flag") == "Attrited Customer").groupBy("Education_Level").agg(count("Education_Level").alias("Count"))

# Convert Spark DataFrames to Pandas DataFrames for plotting with Plotly Express
existing_customer_pd_df = existing_customer_df.toPandas()
attrited_customer_pd_df = attrited_customer_df.toPandas()

# Create two bar charts side by side
fig = sp.make_subplots(rows=1, cols=2, subplot_titles=["Existing Customer", "Attrited Customer"])

fig.add_trace(px.bar(existing_customer_pd_df, x='Education_Level', y='Count').data[0], row=1, col=1)
fig.add_trace(px.bar(attrited_customer_pd_df, x='Education_Level', y='Count').data[0], row=1, col=2)

# Update layout for better display
fig.update_layout(title_text="Education Level Distribution for Existing and Attrited Customers", showlegend=False)

# Show the plot
fig.show()

Age Ranges Distribution

In [76]:
spark_df.select(min("Customer_Age"), max("Customer_Age")).show()


+-----------------+-----------------+
|min(Customer_Age)|max(Customer_Age)|
+-----------------+-----------------+
|               26|               73|
+-----------------+-----------------+



In [28]:
# define udf (user define function to get age range)
age_range = udf(lambda age:'< 20' if age<20 else
                '20-25' if (age>=20 and age<25) else
                '25-30' if (age>=25 and age<30) else
                '30-35' if (age>=30 and age<35) else
                '35-40' if (age>=35 and age<40) else
                '40-45' if (age>=40 and age<45) else
                '45-50' if (age>=45 and age<50) else
                '50-55' if (age>=50 and age<55) else
                '55-60' if (age>=55 and age<60) else
                '60-65' if (age>=60 and age<65) else
                '65-70' if (age>=65 and age<70) else
                '70+'  if (age >= 70) else ''
                )

# spark_df.groupby("Customer_Age").agg(count("Customer_Age")).show()
age_df = spark_df.withColumn('Age_Range', age_range(spark_df.Customer_Age))

In [33]:
# Age Range Distribution for all customers
total_age_dist_df = age_df.groupby('Age_Range').agg(count('Customer_Age').alias('Total_Count')).sort('Age_Range')
total_age_dist_pd_df = total_age_dist_df.toPandas()

# Age Range Distribution for 'Existing Customer'
existing_customer_age_df = age_df.filter(col("Attrition_Flag")=="Existing Customer").groupby("Age_Range").agg(count("Age_Range").alias("Existing_Customer_Count")).sort('Age_Range')
existing_customer_age_pd_df = existing_customer_age_df.toPandas()

# Age Range Distribution for 'Attrited Customer'
attrited_customer_age_df = age_df.filter(col("Attrition_Flag")=="Attrited Customer").groupby("Age_Range").agg(count("Age_Range").alias("Attrited_Customer_Count")).sort('Age_Range')
attrited_customer_age_pd_df = attrited_customer_age_df.toPandas()

total_fig = px.bar(total_age_dist_pd_df, x='Age_Range', y='Total_Count')
total_fig.update_layout(title_text="Total")
total_fig.show()

# Create side-by-side bar charts
fig = sp.make_subplots(rows=1, cols=2, subplot_titles=["Existing Customer", "Attrited Customer"])

fig.add_trace(px.bar(existing_customer_age_pd_df, x='Age_Range', y='Existing_Customer_Count').data[0], row=1, col=1)
fig.add_trace(px.bar(attrited_customer_age_pd_df, x='Age_Range', y='Attrited_Customer_Count').data[0], row=1, col=2)

# Update layout for better display
fig.update_layout(title_text="Age Range Distribution for Existing, and Attrited Customers", showlegend=False)

# Show the plot
fig.show()


In [51]:
non_chrun_count =spark_df.filter(col("Attrition_Flag")=="Existing Customer").agg(count("Customer_Age").alias('Non_Churn_Count')).collect()[0]['Non_Churn_Count']
chrun_count = spark_df.filter(col("Attrition_Flag")=="Attrited Customer").agg(count("Customer_Age").alias('Non_Churn_Count')).collect()[0]['Non_Churn_Count']
non_chrun_count, chrun_count

existing_customer_age_df.withColumn('Percent', (col('Existing_Customer_Count')/non_chrun_count)*100).show()
attrited_customer_age_df.withColumn('Percent', (col('Attrited_Customer_Count')/chrun_count)*100).show()


+---------+-----------------------+--------------------+
|Age_Range|Existing_Customer_Count|             Percent|
+---------+-----------------------+--------------------+
|    25-30|                    178|   2.094117647058823|
|    30-35|                    456|   5.364705882352941|
|    35-40|                   1124|  13.223529411764707|
|    40-45|                   1768|                20.8|
|    45-50|                   2021|  23.776470588235295|
|    50-55|                   1605|   18.88235294117647|
|    55-60|                    887|  10.435294117647059|
|    60-65|                    361|   4.247058823529412|
|    65-70|                     98|  1.1529411764705881|
|      70+|                      2|0.023529411764705882|
+---------+-----------------------+--------------------+

+---------+-----------------------+------------------+
|Age_Range|Attrited_Customer_Count|           Percent|
+---------+-----------------------+------------------+
|    25-30|                     17|1