# Install Dependencies and Download Data


In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!rm spark-2.4.5-bin-hadoop2.7.tgz
!wget --no-cookies --no-check-certificate https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar zxvf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install pyspark

In [0]:
!wget https://raw.githubusercontent.com/zaratsian/Datasets/master/banking_attrition.csv

In [0]:
import os
# Point the Spark launcher at the JDK and the Spark distribution installed in the setup cell.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.5-bin-hadoop2.7",
})

import datetime, time
import re, random, sys

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType, StringType, FloatType, LongType, DateType
from pyspark.sql.functions import struct, array, lit, monotonically_increasing_id, col, expr, when, concat, udf, split, size, lag, count, isnull
from pyspark.sql import Window
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import RandomForestClassifier, MultilayerPerceptronClassifier
from pyspark.ml.feature import VectorIndexer, VectorAssembler, StringIndexer, IndexToString, OneHotEncoderEstimator
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from functools import reduce


In [0]:
# Local session using all available cores.
spark = (
    SparkSession.builder
    .appName("Spark ML Assignment")
    .master("local[*]")
    .getOrCreate()
)

# Load the CSV into a Spark DataFrame

In [0]:
# Read the CSV with a header row and let Spark infer column types.
data = spark.read.csv('banking_attrition.csv', header=True, inferSchema=True)

In [0]:
data.show(n=10, truncate=False)

+-------+---+---------+--------------+--------------+-----------+-------+-------+----+------+-------------+----------+-------+-----------------+---------+
|uid    |age|age_group|profession    |marital_status|education  |default|housing|loan|gender|balance      |membership|charges|customer_contacts|attrition|
+-------+---+---------+--------------+--------------+-----------+-------+-------+----+------+-------------+----------+-------+-----------------+---------+
|1000001|69 |60s      |retired       |married       |high school|no     |no     |no  |female|$50k - $100k |gold      |74     |5                |0        |
|1000002|46 |40s      |management    |married       |high school|yes    |no     |no  |male  |$10k - $50k  |silver    |149    |1                |0        |
|1000003|45 |40s      |management    |married       |high school|no     |no     |no  |female|$100k - $250k|platinum  |58     |5                |1        |
|1000004|54 |50s      |administration|divorced      |graduate   |no   

In [0]:
# 'uid' is a row identifier, and 'age' is also present in bucketed form as 'age_group'.
columns_to_drop = ['age', 'uid']
data = data.drop(*columns_to_drop)

# Data Exploration

In [0]:
# Keep only rows with a known label, then check the class balance.
data = data.filter(col("attrition").isNotNull())
data.groupBy('attrition').agg({'attrition': 'count'}).collect()

[Row(attrition=1, count(attrition)=10200),
 Row(attrition=0, count(attrition)=35011)]

In [0]:
data.groupBy('attrition').mean('charges').collect()

[Row(attrition=1, avg(charges)=239.645),
 Row(attrition=0, avg(charges)=90.01216760446717)]

In [0]:
data.groupBy('attrition').mean('customer_contacts').collect()

[Row(attrition=1, avg(customer_contacts)=2.981764705882353),
 Row(attrition=0, avg(customer_contacts)=1.6424266659050013)]

In [0]:
data.groupBy('attrition', 'education').count().collect()

[Row(attrition=1, education='unknown', count=281),
 Row(attrition=0, education='high school', count=9873),
 Row(attrition=1, education='graduate', count=5949),
 Row(attrition=1, education='college', count=1413),
 Row(attrition=0, education='unknown', count=1061),
 Row(attrition=0, education='graduate', count=20559),
 Row(attrition=1, education='high school', count=2557),
 Row(attrition=0, education='college', count=3518)]

# Preprocessing and Feature Engineering

In [0]:
# Encode each two-valued column as 0/1 in a temporary "<name>Binary" column.
# The listed value maps to 1; anything else (including NULL) maps to 0.
binary_positive_value = {'default': 'yes', 'housing': 'yes', 'loan': 'yes', 'gender': 'female'}
for source_col, positive_value in binary_positive_value.items():
    data = data.withColumn(source_col + 'Binary', when(col(source_col) == positive_value, 1).otherwise(0))

# Drop the original string columns.
columns_to_drop = list(binary_positive_value)
data = data.drop(*columns_to_drop)

In [0]:
# Rename every "<name>Binary" column back to "<name>".
# The new name is derived from the old one rather than paired by position with
# `columns_to_drop` from another cell. Positional pairing breaks if the schema
# order differs or if that variable is reassigned, e.g. by re-running an earlier cell.
binary_suffix = 'Binary'
oldColumns = [name for name in data.schema.names if name.endswith(binary_suffix)]
renameColumns = [name[:-len(binary_suffix)] for name in oldColumns]

for old_name, new_name in zip(oldColumns, renameColumns):
    data = data.withColumnRenamed(old_name, new_name)

In [0]:
# 80/20 train/test split with a fixed seed for reproducibility.
train, test = data.randomSplit(weights=[0.8, 0.2], seed=12345)

In [0]:
# Categorical string columns to index; each gets a "<name>_index" output column.
inputOneHot = ['age_group', 'profession', 'marital_status', 'education', 'balance', 'membership']
outputOneHot = ['cat_' + name for name in inputOneHot]

# handleInvalid="keep" maps categories unseen during fit, and NULLs, to an extra
# index instead of raising at transform time. Without it, a rare category that
# lands only in the test split would crash `model_run.transform(test)`.
indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_index", handleInvalid="keep")
    for name in inputOneHot
]
# Keep the individual per-column names available to later cells.
si1, si2, si3, si4, si5, si6 = indexers

inputs = [indexer.getOutputCol() for indexer in indexers]
# NOTE(review): this encoder is not one of the pipeline stages as written; the
# classifier consumes the raw index columns instead.
encoder = OneHotEncoderEstimator(inputCols=inputs, outputCols=outputOneHot)

In [0]:
# Model inputs: numeric columns, the 0/1 flags, then the categorical index columns.
indices = [name + "_index" for name in inputOneHot]
features = ['charges', 'customer_contacts', 'default', 'housing', 'loan', 'gender'] + indices

# Index the label into a numeric "label" column. By default StringIndexer orders
# labels by descending frequency, so the majority class gets index 0.0.
target = 'attrition'
tgt = StringIndexer(inputCol=target, outputCol='label').fit(train)

va = VectorAssembler(inputCols=features, outputCol="features")

In [0]:
# Train on the indexed label column produced by `tgt`, not on raw `attrition`.
# The evaluator scores against "label", and labelConverter decodes predictions
# through tgt.labels, so all three must use the same indexing. Training on raw
# `attrition` only agreed because 0 happens to be the most frequent class.
clf = RandomForestClassifier(featuresCol="features", labelCol=tgt.getOutputCol(), predictionCol="prediction", maxDepth=5, maxBins=350, seed=12345)
# clf = MultilayerPerceptronClassifier(featuresCol="features", labelCol=tgt.getOutputCol(), predictionCol="prediction", layers=[len(features), len(features)+1, len(features), 2], seed=12345)

# Map numeric predictions back to the original label strings.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=tgt.labels)

# Build and Train the Model Pipeline

In [0]:
# Index categoricals -> assemble features -> index label -> classify -> decode prediction.
pipeline_run = Pipeline(stages=indexers + [va, tgt, clf, labelConverter])
model_run = pipeline_run.fit(train)

In [0]:
# Score the held-out split and preview a few rows.
predictions = model_run.transform(test)
predictions.show(n=3)

+---------+--------------+--------------+---------+-----------+----------+-------+-----------------+---------+-------+-------+----+------+---------------+----------------+--------------------+---------------+-------------+----------------+--------------------+-----+--------------------+--------------------+----------+--------------+
|age_group|    profession|marital_status|education|    balance|membership|charges|customer_contacts|attrition|default|housing|loan|gender|age_group_index|profession_index|marital_status_index|education_index|balance_index|membership_index|            features|label|       rawPrediction|         probability|prediction|predictedLabel|
+---------+--------------+--------------+---------+-----------+----------+-------+-----------------+---------+-------+-------+----+------+---------------+----------------+--------------------+---------------+-------------+----------------+--------------------+-----+--------------------+--------------------+----------+-----------

# Model Evaluation

In [0]:
# Area under the ROC curve on the test split, using the indexed "label" column.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction',
    labelCol='label',
    metricName='areaUnderROC',
)
auc = evaluator.evaluate(predictions)
print(auc)
# Observed test AUC: random forest ~0.928, MLP ~0.879.
# The random forest does better with minimal tuning.

0.9279628661476019
