**Header and Dependency Setup**

In [1]:
###############################################################################
# IAA 2020 Big Data Session 4 Assignment #2
# @author: Preston MacDonald
# @date: 3-29-
###############################################################################


#Install dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!rm spark-2.4.5-bin-hadoop2.7.tgz
!wget --no-cookies --no-check-certificate https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar zxvf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install pyspark

--2020-03-30 04:25:16--  https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
Resolving downloads.apache.org (downloads.apache.org)... 88.99.95.219, 2a01:4f8:10a:201a::2
Connecting to downloads.apache.org (downloads.apache.org)|88.99.95.219|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 232530699 (222M) [application/x-gzip]
Saving to: ‘spark-2.4.5-bin-hadoop2.7.tgz’


2020-03-30 04:25:26 (24.0 MB/s) - ‘spark-2.4.5-bin-hadoop2.7.tgz’ saved [232530699/232530699]

spark-2.4.5-bin-hadoop2.7/
spark-2.4.5-bin-hadoop2.7/licenses/
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-jtransforms.html
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-zstd.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-zstd-jni.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-xmlenc.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-vis.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-spire.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-sorttable.js.txt
spark-2.4.5-bin-hadoop2.7/licenses

**Load Data**

In [2]:
#Load dataset
!wget https://raw.githubusercontent.com/zaratsian/Datasets/master/banking_attrition.csv

--2020-03-30 04:25:35--  https://raw.githubusercontent.com/zaratsian/Datasets/master/banking_attrition.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4021593 (3.8M) [text/plain]
Saving to: ‘banking_attrition.csv.4’


2020-03-30 04:25:36 (15.1 MB/s) - ‘banking_attrition.csv.4’ saved [4021593/4021593]



**Import Python / Spark Libraries**


In [0]:
#Import Python / Spark Libraries
import os
os.environ["JAVA_HOME"]  = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

import datetime, time
import re, random, sys

# Note - Not all of these will be used, but I've added them for your reference as a "getting started"
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType, StringType, FloatType, LongType, DateType
from pyspark.sql.functions import struct, array, lit, monotonically_increasing_id, col, expr, when, concat, udf, split, size, lag, count, isnull
from pyspark.sql import Window
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GBTRegressor, LinearRegression, GeneralizedLinearRegression, RandomForestRegressor
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier
from pyspark.ml.feature import VectorIndexer, VectorAssembler, StringIndexer, IndexToString
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator

#My added imports
from pyspark.ml.stat import Correlation
from pyspark.ml.stat import Summarizer
from pyspark.sql import Row
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from sklearn.metrics import mean_squared_error
from pyspark.ml.classification import RandomForestClassifier


**Create Spark Session**

In [0]:
#Create spark session
spark = SparkSession.builder.appName("Spark ML Assignment").master("local[*]").getOrCreate()

**Load CSV Data into Spark Dataframe**

In [0]:
#Load CSV data into Spark Dataframe
rawData = spark.read.load('banking_attrition.csv',
                               format = 'csv',
                               header = True,
                               inferSchema = True)


**Data Exploration**

In [6]:
####### Using show and groupBys
#Show top 10 rows in dataset
print("Showing top 10 rows:")
rawData.show(n=10)

#group by profession
print("\nCount of obs by profession:")
rawData.groupBy("profession").count().show()

#Average age for balance
print("\nAverage age for each balance:")
rawData.groupBy("balance").avg("age").show()


Showing top 10 rows:
+-------+---+---------+--------------+--------------+-----------+-------+-------+----+------+-------------+----------+-------+-----------------+---------+
|    uid|age|age_group|    profession|marital_status|  education|default|housing|loan|gender|      balance|membership|charges|customer_contacts|attrition|
+-------+---+---------+--------------+--------------+-----------+-------+-------+----+------+-------------+----------+-------+-----------------+---------+
|1000001| 69|      60s|       retired|       married|high school|     no|     no|  no|female| $50k - $100k|      gold|     74|                5|        0|
|1000002| 46|      40s|    management|       married|high school|    yes|     no|  no|  male|  $10k - $50k|    silver|    149|                1|        0|
|1000003| 45|      40s|    management|       married|high school|     no|     no|  no|female|$100k - $250k|  platinum|     58|                5|        1|
|1000004| 54|      50s|administration|      divor

In [7]:
####### Using correlation and Summarizer

#Select features
features = ["age", "charges", "customer_contacts", "attrition"]

va = VectorAssembler(inputCols= features, outputCol = "features") #Create Vector Assember
featuresData = va.transform(rawData) #transform original dataset to include new col of vectors
featuresData.show(n=2)

#Calculate correlation and display
r1 = Correlation.corr(featuresData, "features", method = 'pearson').head()
print("Pearson correlation matrix:\n" + str(r1[0]))

#Calculate mean statistic for the list of features in order
summarizer = Summarizer.metrics("mean")
featuresData.select(summarizer.summary(featuresData.features)).show(truncate=False)

+-------+---+---------+----------+--------------+-----------+-------+-------+----+------+------------+----------+-------+-----------------+---------+--------------------+
|    uid|age|age_group|profession|marital_status|  education|default|housing|loan|gender|     balance|membership|charges|customer_contacts|attrition|            features|
+-------+---+---------+----------+--------------+-----------+-------+-------+----+------+------------+----------+-------+-----------------+---------+--------------------+
|1000001| 69|      60s|   retired|       married|high school|     no|     no|  no|female|$50k - $100k|      gold|     74|                5|        0| [69.0,74.0,5.0,0.0]|
|1000002| 46|      40s|management|       married|high school|    yes|     no|  no|  male| $10k - $50k|    silver|    149|                1|        0|[46.0,149.0,1.0,0.0]|
+-------+---+---------+----------+--------------+-----------+-------+-------+----+------+------------+----------+-------+-----------------+------

**Split the Spark Dataframe into Train and Test**

In [8]:
#Splitting dataframe with randomsplit
splits = rawData.randomSplit(weights= [.7, .3], seed= 12345)

print("training obs count: ", splits[0].count())
print("test obs count: ", splits[1].count())

train = splits[0]
test = splits[1]

training obs count:  31454
test obs count:  13757


**Feature Engineering & Define Model**


In [0]:
#################################################
#Feature Engineering
#################################################
catFeatures = ["age_group", "profession", "marital_status", "education", 
               "default", "housing", "loan", "gender", "balance", "membership"]
target = "attrition"

#Create copies of train and text to create index versions 
train_indexed = train
test_indexed = test

# Loop through categorical features from list above and create indexed versions
# in both training and test
for feature in catFeatures:
  indexer = StringIndexer(inputCol = feature, outputCol = (feature + "Index"))
  train_indexed = indexer.fit(train_indexed).transform(train_indexed)
  test_indexed = indexer.fit(test_indexed).transform(test_indexed)


#prep target for pipeline
fi = StringIndexer(inputCol= target, outputCol = 'label').fit(train_indexed)

#################################################
#Model Prep & Definition
#################################################

#Create list of all inputs
modelFeatures = ["age", "age_groupIndex", "professionIndex", 
                 "marital_statusIndex", "educationIndex", "defaultIndex", 
                 "housingIndex", "loanIndex", "genderIndex", "balanceIndex", 
                 "membershipIndex", "charges", "customer_contacts"]

#Inputs as vector
va = VectorAssembler(inputCols= modelFeatures, outputCol = "features")


#Define Model
lr = LogisticRegression()
rf = RandomForestClassifier()

#Build Label Converter
labelConverter = IndexToString(inputCol="prediction", 
                               outputCol= "predictionLabel",
                               labels = fi.labels)

**Fit/Train ML Model**

In [11]:
pipeline = Pipeline(stages=[fi, va, rf])

model = pipeline.fit(train_indexed)

type(model)

pyspark.ml.pipeline.PipelineModel

**Make Predictions on Test Set**

In [12]:
pred = model.transform(test_indexed)
pred.show(10,False)

+-------+---+---------+-------------+--------------+-----------+-------+-------+----+------+-------------+----------+-------+-----------------+---------+--------------+---------------+-------------------+--------------+------------+------------+---------+-----------+------------+---------------+-----+--------------------------------------------------------+----------------------------------------+-----------------------------------------+----------+
|uid    |age|age_group|profession   |marital_status|education  |default|housing|loan|gender|balance      |membership|charges|customer_contacts|attrition|age_groupIndex|professionIndex|marital_statusIndex|educationIndex|defaultIndex|housingIndex|loanIndex|genderIndex|balanceIndex|membershipIndex|label|features                                                |rawPrediction                           |probability                              |prediction|
+-------+---+---------+-------------+--------------+-----------+-------+-------+----+------+

**Evaluate Model Against Test Dataframe**

In [13]:
############################################################
#Confusion Matrix
############################################################
tp = pred[(pred.label == 1) & (pred.prediction == 1)].count()
tn = pred[(pred.label == 0) & (pred.prediction == 0)].count()
fp = pred[(pred.label == 0) & (pred.prediction == 1)].count()
fn = pred[(pred.label == 1) & (pred.prediction == 0)].count()
print ("True Positives:", tp)
print ("True Negatives:", tn)
print ("False Positives:", fp)
print ("False Negatives:", fn)
print ("Total", pred.count())
print("\n")

############################################################
#Precision and recall
############################################################
print("recall: ", float(tp)/(tp + fn))
print("precision: ", float(tp) / (tp + fp))
print("accuracy: ", (float(tp) + tn) / (pred.count()))
print("\n")
############################################################
#MSE
############################################################
true_target = [int(row.label) for row in pred.select('label').collect()]
pred_target = [int(row.prediction) for row in pred.select('prediction').collect()]

print("MSE: ", mean_squared_error(true_target, pred_target))
print("RMSE: ", mean_squared_error(true_target, pred_target, squared = False))


True Positives: 2438
True Negatives: 10609
False Positives: 16
False Negatives: 694
Total 13757


recall:  0.7784163473818646
precision:  0.993480032599837
accuracy:  0.9483899105909719


MSE:  0.05161008940902813
RMSE:  0.22717854081983213
