# <span style="color:#4040a1; font-family: Trebuchet MS; font-size: 60px; font-weight: bold;">SPARK MLlib</span>

In [74]:
# Import necessary libraries
!pip install pyspark
# Import necessary libraries
from pyspark.sql import SparkSession

# Import specific data types from Spark SQL types
from pyspark.sql.types import IntegerType, DoubleType, StringType

# Import LogisticRegression and RandomForest classifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier

# Import evaluators for binary and multiclass classification
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Import StringIndexer, OneHotEncoder, and VectorAssembler for feature engineering
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Import Pipeline for building a machine learning pipeline
from pyspark.ml import Pipeline



In [4]:
# Initialize Spark session
spark = SparkSession.builder.appName("ChurnPrediction").getOrCreate()

# <span style="color:#4040a1; font-family: Trebuchet MS; font-size: 50px; font-weight: bold;">Data Understanding</span>

In [5]:
# Load the dataset into a DataFrame
data_path = '/content/churn.csv'
df = spark.read.csv(data_path, header = True, inferSchema = True)

In [13]:
# Show the first few rows of the DataFrame
df.show()

+---------+----------+---------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|  Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+---------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|        1|  15634602| Hargrave|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|
|        2|  15647311|     Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|
|        3|  15619304|     Onio|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|        4|  15701354|     Boni|        699|   France|Female| 39|     1|      0.0|            2|        0|             0|       93

In [14]:
# Print the schema of the DataFrame
df.printSchema()

root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



In [17]:
#Showing the distinct values of the label
df.select('Exited').distinct().show()

+------+
|Exited|
+------+
|     1|
|     0|
+------+



In [18]:
# Showing the value counts for the label
# We have imbalance problem
df.groupby('Exited').count().show()

+------+-----+
|Exited|count|
+------+-----+
|     1| 2037|
|     0| 7963|
+------+-----+



# <span style="color:#4040a1; font-family: Trebuchet MS; font-size: 50px; font-weight: bold;">Data Preparation</span>

In [19]:
# Drop unnecesarry columns
df = df.drop('RowNumber')
df = df.drop('CustomerID')
df = df.drop('Surname')

In [20]:
df.printSchema()

root
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



In [21]:
# Get the schema fields of the DataFrame
df.schema.fields

[StructField('CreditScore', IntegerType(), True),
 StructField('Geography', StringType(), True),
 StructField('Gender', StringType(), True),
 StructField('Age', IntegerType(), True),
 StructField('Tenure', IntegerType(), True),
 StructField('Balance', DoubleType(), True),
 StructField('NumOfProducts', IntegerType(), True),
 StructField('HasCrCard', IntegerType(), True),
 StructField('IsActiveMember', IntegerType(), True),
 StructField('EstimatedSalary', DoubleType(), True),
 StructField('Exited', IntegerType(), True)]

In [24]:
# Define numerical columns
numcols = [field.name for field in df.schema.fields if (isinstance(field.dataType, IntegerType)
or isinstance(field.dataType, DoubleType)) and field.name!='Exited']

numcols

['CreditScore',
 'Age',
 'Tenure',
 'Balance',
 'NumOfProducts',
 'HasCrCard',
 'IsActiveMember',
 'EstimatedSalary']

In [25]:
# Define categorical columns
catcols = [field.name for field in df.schema.fields if isinstance(field.dataType, StringType)]

catcols

['Geography', 'Gender']

In [33]:
# Create a list of StringIndexer (transformation that converts categorical values into numerical) stages for categorical cols
stringindexer_stages = [StringIndexer(inputCol = c, outputCol = 'strindexed_' + c) for c in catcols]

In [34]:
# Add a StringIndexer stage for the target column 'Exited'
stringindexer_stages = stringindexer_stages + [StringIndexer(inputCol = 'Exited', outputCol = 'label')]

In [35]:
# Create a list of OneHotEncoder stages for the string indexed categorical columns
onehotencoder_stages = [OneHotEncoder(inputCol = 'strindexed_' + c, outputCol = 'onehot_' + c) for c in catcols]

In [36]:
# Define feature columns that include numerical and one-hot encoded categorical columns
feature_columns = numcols + ['onehot_' + c for c in catcols]

In [37]:
# Create a VectorAssembler stage to combine features into a single vector column
vectorassembler_stage = VectorAssembler(inputCols = feature_columns, outputCol = 'features')

In [38]:
# Combine all preprocessing stages into a single list
all_stages = stringindexer_stages + onehotencoder_stages + [vectorassembler_stage]

# <span style="color:#4040a1; font-family: Trebuchet MS; font-size: 40px; font-weight: bold;">Create a Pipeline</span>

In [39]:
# Create the pipeline using all preprocessing stages
pipeline = Pipeline(stages = all_stages)

In [40]:
# Fit the entire pipeline to the dataset
pipeline_model = pipeline.fit(df)

In [42]:
# Define final columns for the DataFrame
final_columns = feature_columns + ['features', 'label']

In [43]:
# Apply the trained pipeline to the DataFrame and select final columns
churn_df = pipeline_model.transform(df).select(final_columns)

In [44]:
# Show the contents of the churn_df DataFrame
churn_df.show()

+-----------+---+------+---------+-------------+---------+--------------+---------------+----------------+-------------+--------------------+-----+
|CreditScore|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|onehot_Geography|onehot_Gender|            features|label|
+-----------+---+------+---------+-------------+---------+--------------+---------------+----------------+-------------+--------------------+-----+
|        619| 42|     2|      0.0|            1|        1|             1|      101348.88|   (2,[0],[1.0])|    (1,[],[])|[619.0,42.0,2.0,0...|  1.0|
|        608| 41|     1| 83807.86|            1|        0|             1|      112542.58|       (2,[],[])|    (1,[],[])|[608.0,41.0,1.0,8...|  0.0|
|        502| 42|     8| 159660.8|            3|        1|             0|      113931.57|   (2,[0],[1.0])|    (1,[],[])|[502.0,42.0,8.0,1...|  1.0|
|        699| 39|     1|      0.0|            2|        0|             0|       93826.63|   (2,[0],[1.0])|    (1

# <span style="color:#4040a1; font-family: Trebuchet MS; font-size: 50px; font-weight: bold;">Train/Test split</span>

In [45]:
# Split the churn_df DataFrame into training and testing sets
train, test = churn_df.randomSplit([0.8,0.2], seed=123)

In [46]:
# Show the contents of the test DataFrame
test.show()

+-----------+---+------+---------+-------------+---------+--------------+---------------+----------------+-------------+--------------------+-----+
|CreditScore|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|onehot_Geography|onehot_Gender|            features|label|
+-----------+---+------+---------+-------------+---------+--------------+---------------+----------------+-------------+--------------------+-----+
|        350| 51|    10|      0.0|            1|        1|             1|      125823.79|   (2,[0],[1.0])|(1,[0],[1.0])|[350.0,51.0,10.0,...|  1.0|
|        358| 52|     8|143542.36|            3|        1|             0|      141959.11|       (2,[],[])|    (1,[],[])|[358.0,52.0,8.0,1...|  1.0|
|        376| 29|     4|115046.74|            4|        1|             0|      119346.88|   (2,[1],[1.0])|    (1,[],[])|[376.0,29.0,4.0,1...|  1.0|
|        376| 46|     6|      0.0|            1|        1|             0|      157333.69|   (2,[0],[1.0])|    (1

# <span style="color:#4040a1; font-family: Trebuchet MS; font-size: 50px; font-weight: bold;">Modeling (Logistic Regression)</span>

In [47]:
# Fit a Logistic Regression model to the training data
logisticmodel = LogisticRegression(featuresCol = 'features', labelCol = 'label').fit(train)

# Make predictions on the test data using the trained model
y_pred = logisticmodel.transform(test)

# Show the predictions
y_pred.show()

+-----------+---+------+---------+-------------+---------+--------------+---------------+----------------+-------------+--------------------+-----+--------------------+--------------------+----------+
|CreditScore|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|onehot_Geography|onehot_Gender|            features|label|       rawPrediction|         probability|prediction|
+-----------+---+------+---------+-------------+---------+--------------+---------------+----------------+-------------+--------------------+-----+--------------------+--------------------+----------+
|        350| 51|    10|      0.0|            1|        1|             1|      125823.79|   (2,[0],[1.0])|(1,[0],[1.0])|[350.0,51.0,10.0,...|  1.0|[1.85105108171071...|[0.86425046479635...|       0.0|
|        358| 52|     8|143542.36|            3|        1|             0|      141959.11|       (2,[],[])|    (1,[],[])|[358.0,52.0,8.0,1...|  1.0|[-0.1145041111844...|[0.47140520802828...|       

# <span style="color:#4040a1; font-family: Trebuchet MS; font-size: 50px; font-weight: bold;">Evaluation</span>

In [76]:
# Display the raw predictions, actual labels, and predicted labels without truncating values
# The symmetric raw predictions are a natural consequence of how the algorithm calculates class probabilities
y_pred.select('rawPrediction', 'probability', 'label', 'prediction').show(truncate = False)

+------------------------------------------+-----------------------------------------+-----+----------+
|rawPrediction                             |probability                              |label|prediction|
+------------------------------------------+-----------------------------------------+-----+----------+
|[1.851051081710714,-1.851051081710714]    |[0.8642504647963503,0.13574953520364974] |1.0  |0.0       |
|[-0.11450411118442849,0.11450411118442849]|[0.4714052080282856,0.5285947919717144]  |1.0  |1.0       |
|[0.8974742380474878,-0.8974742380474878]  |[0.7104301813449215,0.28956981865507847] |1.0  |0.0       |
|[0.5252646475400642,-0.5252646475400642]  |[0.6283779906478635,0.3716220093521365]  |1.0  |0.0       |
|[-0.10402353292296773,0.10402353292296773]|[0.47401754199991003,0.52598245800009]   |1.0  |1.0       |
|[2.75549506558964,-2.75549506558964]      |[0.9402229413236978,0.05977705867630223] |1.0  |0.0       |
|[3.2921737963710274,-3.2921737963710274]  |[0.9641593478815131,

In [53]:
# Calculate the accuracy of the model's predictions
correct_predictions = y_pred.filter(y_pred.label == y_pred.prediction).count()
total_predictions = y_pred.count()
accuracy = correct_predictions / total_predictions

# Print the accuracy results
print("Correct Predictions:", correct_predictions)
print("Total Predictions:", total_predictions)
print("Accuracy:", accuracy)

Correct Predictions: 1668
Total Predictions: 2032
Accuracy: 0.8208661417322834


# <span style="color:#4040a1; font-family: Trebuchet MS; font-size: 30px; font-weight: bold;">Overfitting check</span>

In [54]:
# Make predictions on the training data using the trained model
y_pred_train = logisticmodel.transform(train)

# Filter the training predictions to count correct predictions
correct_predictions_train = y_pred_train.filter(y_pred_train.label == y_pred_train.prediction).count()

# Count the total number of training predictions
total_predictions_train = y_pred_train.count()

# Calculate the accuracy on the training data
accuracy_train = correct_predictions_train / total_predictions_train

# Print the accuracy results for training data
print("Correct Predictions (Training):", correct_predictions_train)
print("Total Predictions (Training):", total_predictions_train)
print("Accuracy (Training):", accuracy_train)

Correct Predictions (Training): 6448
Total Predictions (Training): 7968
Accuracy (Training): 0.8092369477911646


In [66]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', metricName='areaUnderROC')

In [67]:
roc_auc = evaluator.evaluate(y_pred)

In [68]:
evaluatorMulti = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction')

In [69]:
acc = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: 'accuracy'})

In [70]:
precision = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: 'weightedPrecision'})

In [71]:
recall = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: 'weightedRecall'})

In [72]:
f1 = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: 'f1'})

In [73]:
print('ACCURACY:', acc, '\n' )
print('PRECISION: ', precision, '\n' )
print('RECALL: ', recall, '\n' )
print('F1: ', f1, '\n' )
print('ROC/AUC SCORE: ', roc_auc)

ACCURACY: 0.8208661417322834 

PRECISION:  0.803026712640557 

RECALL:  0.8208661417322834 

F1:  0.7814069496233127 

ROC/AUC SCORE:  0.7660563809014725


# <span style="color:#4040a1; font-family: Trebuchet MS; font-size: 50px; font-weight: bold;">Modeling (Random Forest)</span>

In [75]:
# Building the Random Forest model, training the model and making predictions
# Train a RandomForestClassifier model on the training data
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100).fit(train)

# Make predictions on the test data using the trained random forest model
y_pred_rf = rf.transform(test)

# Display the predictions
y_pred_rf.show()

+-----------+---+------+---------+-------------+---------+--------------+---------------+----------------+-------------+--------------------+-----+--------------------+--------------------+----------+
|CreditScore|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|onehot_Geography|onehot_Gender|            features|label|       rawPrediction|         probability|prediction|
+-----------+---+------+---------+-------------+---------+--------------+---------------+----------------+-------------+--------------------+-----+--------------------+--------------------+----------+
|        350| 51|    10|      0.0|            1|        1|             1|      125823.79|   (2,[0],[1.0])|(1,[0],[1.0])|[350.0,51.0,10.0,...|  1.0|[78.0932339833462...|[0.78093233983346...|       0.0|
|        358| 52|     8|143542.36|            3|        1|             0|      141959.11|       (2,[],[])|    (1,[],[])|[358.0,52.0,8.0,1...|  1.0|[12.2946181565655...|[0.12294618156565...|       

# <span style="color:#4040a1; font-family: Trebuchet MS; font-size: 50px; font-weight: bold;">Evaluation</span>

In [78]:
# Display the probablities, actual labels, and predicted labels without truncating values
y_pred_rf.select('probability', 'label', 'prediction').show(truncate = False)

+----------------------------------------+-----+----------+
|probability                             |label|prediction|
+----------------------------------------+-----+----------+
|[0.7809323398334629,0.21906766016653711]|1.0  |0.0       |
|[0.1229461815656557,0.8770538184343443] |1.0  |1.0       |
|[0.26149055329167514,0.7385094467083249]|1.0  |1.0       |
|[0.49239166462272466,0.5076083353772753]|1.0  |1.0       |
|[0.3432731787362391,0.6567268212637609] |1.0  |1.0       |
|[0.9011483084854496,0.0988516915145505] |1.0  |0.0       |
|[0.90841774758778,0.09158225241221991]  |0.0  |0.0       |
|[0.9044170856234095,0.09558291437659054]|0.0  |0.0       |
|[0.9031214249437838,0.09687857505621615]|0.0  |0.0       |
|[0.9027588816452973,0.09724111835470267]|0.0  |0.0       |
|[0.911798922455928,0.088201077544072]   |0.0  |0.0       |
|[0.8241036179166515,0.17589638208334854]|0.0  |0.0       |
|[0.2078383719352771,0.7921616280647229] |1.0  |1.0       |
|[0.8850061320429276,0.1149938679570724]

In [79]:
# Filter the Random Forest predictions to count correct predictions on the test data
correct_predictions_rf = y_pred_rf.filter(y_pred_rf.label == y_pred_rf.prediction).count()

# Count the total number of test predictions
total_predictions_rf = y_pred_rf.count()

# Calculate the accuracy on the test data
accuracy_rf = correct_predictions_rf / total_predictions_rf

# Print the accuracy results for the Random Forest model on the test data
print("Correct Predictions (Random Forest):", correct_predictions_rf)
print("Total Predictions (Random Forest):", total_predictions_rf)
print("Accuracy (Random Forest):", accuracy_rf)

Correct Predictions (Random Forest): 1740
Total Predictions (Random Forest): 2032
Accuracy (Random Forest): 0.8562992125984252


# <span style="color:#4040a1; font-family: Trebuchet MS; font-size: 30px; font-weight: bold;">Overfitting check</span>

In [80]:
# Make predictions on the training data using the trained Random Forest model
y_pred_train_rf = rf.transform(train)

# Filter the training predictions to count correct predictions
correct_predictions_train_rf = y_pred_train_rf.filter(y_pred_train_rf.label == y_pred_train_rf.prediction).count()

# Count the total number of training predictions
total_predictions_train_rf = y_pred_train_rf.count()

# Calculate the accuracy on the training data
accuracy_train_rf = correct_predictions_train_rf / total_predictions_train_rf

# Print the accuracy results for the Random Forest model on the training data
print("Correct Predictions (Random Forest - Training):", correct_predictions_train_rf)
print("Total Predictions (Random Forest - Training):", total_predictions_train_rf)
print("Accuracy (Random Forest - Training):", accuracy_train_rf)

Correct Predictions (Random Forest - Training): 6845
Total Predictions (Random Forest - Training): 7968
Accuracy (Random Forest - Training): 0.8590612449799196


In [81]:
acc_rf = evaluatorMulti.evaluate(y_pred_rf, {evaluatorMulti.metricName: 'accuracy'})

In [82]:
precision_rf = evaluatorMulti.evaluate(y_pred_rf, {evaluatorMulti.metricName: 'weightedPrecision'})

In [83]:
recall_rf = evaluatorMulti.evaluate(y_pred_rf, {evaluatorMulti.metricName: 'weightedRecall'})

In [84]:
f1_rf = evaluatorMulti.evaluate(y_pred_rf, {evaluatorMulti.metricName: 'f1'})

In [85]:
roc_auc_rf = evaluator.evaluate(y_pred_rf)

In [86]:
print('ACCURACY: ', acc_rf, '\n' )
print('PRECISION: ', precision_rf, '\n' )
print('RECALL: ', recall_rf, '\n' )
print('F1: ', f1_rf, '\n' )
print('ROC/AUC SCORE: ', roc_auc_rf)

ACCURACY:  0.8562992125984252 

PRECISION:  0.8544301033711963 

RECALL:  0.8562992125984251 

F1:  0.8331311941954137 

ROC/AUC SCORE:  0.8360142845475964


# <span style="color:#4040a1; font-family: Trebuchet MS; font-size: 50px; font-weight: bold;">Logistic Regression vs Random Forest</span>

In [87]:
from tabulate import tabulate

# Define the metrics for both models
models_metrics = [
    ['Model', 'Accuracy', 'Precision', 'Recall', 'F1 Score', 'ROC AUC'],
    ['Logistic Regression', acc, precision, recall, f1, roc_auc],
    ['Random Forest', acc_rf, precision_rf, recall_rf, f1_rf, roc_auc_rf]
]

# Print the comparison table
print(tabulate(models_metrics, headers='firstrow', tablefmt='pretty'))

+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|        Model        |      Accuracy      |     Precision      |       Recall       |      F1 Score      |      ROC AUC       |
+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| Logistic Regression | 0.8208661417322834 | 0.803026712640557  | 0.8208661417322834 | 0.7814069496233127 | 0.7660563809014725 |
|    Random Forest    | 0.8562992125984252 | 0.8544301033711963 | 0.8562992125984251 | 0.8331311941954137 | 0.8360142845475964 |
+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
