In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!rm spark-2.4.5-bin-hadoop2.7.tgz
!wget --no-cookies --no-check-certificate https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar zxvf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install pyspark

In [0]:
!wget https://raw.githubusercontent.com/zaratsian/Datasets/master/banking_attrition.csv

--2020-03-30 22:14:23--  https://raw.githubusercontent.com/zaratsian/Datasets/master/banking_attrition.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4021593 (3.8M) [text/plain]
Saving to: ‘banking_attrition.csv.1’


2020-03-30 22:14:24 (12.4 MB/s) - ‘banking_attrition.csv.1’ saved [4021593/4021593]



In [0]:
import os
os.environ["JAVA_HOME"]  = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

import datetime, time
import re, random, sys

# Note - Not all of these will be used, but I've added them for your reference as a "getting started"
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType, StringType, FloatType, LongType, DateType
from pyspark.sql.functions import struct, array, lit, monotonically_increasing_id, col, expr, when, concat, udf, split, size, lag, count, isnull
from pyspark.sql import Window
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GBTRegressor, LinearRegression, GeneralizedLinearRegression, RandomForestRegressor
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier
from pyspark.ml.feature import VectorIndexer, VectorAssembler, StringIndexer, IndexToString
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator, BinaryClassificationEvaluator

In [0]:
#creat spark session
spark = SparkSession.builder.appName("Spark ML Assignment").master("local[*]").getOrCreate()

In [0]:
#load in the data
training = spark.read.load('banking_attrition.csv', format="csv", header=True, inferSchema=True)

In [0]:
training.show(10,False)

+-------+---+---------+--------------+--------------+-----------+-------+-------+----+------+-------------+----------+-------+-----------------+---------+
|uid    |age|age_group|profession    |marital_status|education  |default|housing|loan|gender|balance      |membership|charges|customer_contacts|attrition|
+-------+---+---------+--------------+--------------+-----------+-------+-------+----+------+-------------+----------+-------+-----------------+---------+
|1000001|69 |60s      |retired       |married       |high school|no     |no     |no  |female|$50k - $100k |gold      |74     |5                |0        |
|1000002|46 |40s      |management    |married       |high school|yes    |no     |no  |male  |$10k - $50k  |silver    |149    |1                |0        |
|1000003|45 |40s      |management    |married       |high school|no     |no     |no  |female|$100k - $250k|platinum  |58     |5                |1        |
|1000004|54 |50s      |administration|divorced      |graduate   |no   

In [0]:
#drop unique identifiers
columns_todrop=['uid', 'age']
training = training.drop(*columns_todrop)

In [0]:
training.dtypes

[('age_group', 'string'),
 ('profession', 'string'),
 ('marital_status', 'string'),
 ('education', 'string'),
 ('default', 'string'),
 ('housing', 'string'),
 ('loan', 'string'),
 ('gender', 'string'),
 ('balance', 'string'),
 ('membership', 'string'),
 ('charges', 'int'),
 ('customer_contacts', 'int'),
 ('attrition', 'int')]

In [0]:
#remove null values
training = training.where(col("attrition").isNotNull())

#summary statistics
training.describe().show()

training.groupby("attrition").avg("charges").show()
training.groupby("attrition").avg("customer_contacts").show()

+-------+---------+--------------+--------------+---------+-------+-------+-----+------+-------------+----------+------------------+------------------+-------------------+
|summary|age_group|    profession|marital_status|education|default|housing| loan|gender|      balance|membership|           charges| customer_contacts|          attrition|
+-------+---------+--------------+--------------+---------+-------+-------+-----+------+-------------+----------+------------------+------------------+-------------------+
|  count|    45211|         45211|         45211|    45211|  45211|  45211|45211| 45211|        45211|     45211|             45211|             45211|              45211|
|   mean|     null|          null|          null|     null|   null|   null| null|  null|         null|      null|123.77065315962929| 1.944593129990489|0.22560881201477517|
| stddev|     null|          null|          null|     null|   null|   null| null|  null|         null|      null| 90.36019089570658|2.104808

In [0]:
#creating binary indicator for binary variables
training = training.withColumn('defaultBinary', when(col('default') == 'yes', 1).otherwise(0))
training = training.withColumn('housingBinary', when(col('housing') == 'yes', 1).otherwise(0))
training = training.withColumn('loanBinary', when(col('loan') == 'yes', 1).otherwise(0))
training = training.withColumn('genderBinary', when(col('gender') == 'female', 1).otherwise(0))

columns_to_drop = ['default', 'housing', 'loan', 'gender']
training = training.drop(*columns_to_drop)

In [0]:
#test train split
training_attrition, testing_attrition = training.randomSplit([0.8, 0.2], seed=12345)

In [0]:
#creating string indexers for string columns
categorical_columns = [item[0] for item in training_attrition.dtypes if item[1].startswith('string') ]

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(training_attrition) for column in categorical_columns ]


In [0]:
#build model pipeline
target='attrition'

features = ['charges', 'customer_contacts','defaultBinary', 'housingBinary',
                   'loanBinary','genderBinary','age_group_index',
                   'profession_index', 'marital_status_index',
                   'education_index','balance_index',
                   'membership_index']

#encode the Label column: feature indexer
fi=StringIndexer(inputCol='attrition', outputCol='label').fit(training_attrition)
# Pipelines API requires that input variables are passed in  a vector
va = VectorAssembler(inputCols=features, outputCol="features")


In [0]:
# run the algorithm and build model
rfr = RandomForestRegressor(featuresCol="features", labelCol=target, predictionCol="prediction", maxDepth=5, maxBins=350, seed=12345)

# Convert indexed labels back to original labels, label converter
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=fi.labels)

In [0]:
#Training the model
# Build the machine learning pipeline
pipeline_train = Pipeline(stages=[*indexers, va, rfr, labelConverter])

In [0]:
rf_model = pipeline_train.fit(training_attrition)

In [0]:
# Make predictions.
predictions = rf_model.transform(testing_attrition)

# show the results
predictions.show(3)

+---------+--------------+--------------+---------+-----------+----------+-------+-----------------+---------+-------------+-------------+----------+------------+---------------+----------------+--------------------+---------------+-------------+----------------+--------------------+------------------+--------------+
|age_group|    profession|marital_status|education|    balance|membership|charges|customer_contacts|attrition|defaultBinary|housingBinary|loanBinary|genderBinary|age_group_index|profession_index|marital_status_index|education_index|balance_index|membership_index|            features|        prediction|predictedLabel|
+---------+--------------+--------------+---------+-----------+----------+-------+-----------------+---------+-------------+-------------+----------+------------+---------------+----------------+--------------------+---------------+-------------+----------------+--------------------+------------------+--------------+
|      20s|administration|      divorced|  

In [0]:
#generate results
predictions=predictions.select(predictions["attrition"],predictions["predictedLabel"],predictions["prediction"])
type(predictions)

pyspark.sql.dataframe.DataFrame

In [0]:
predictions.show(5)

+---------+--------------+------------------+
|attrition|predictedLabel|        prediction|
+---------+--------------+------------------+
|        1|             0|0.9937803263039455|
|        1|             0|0.9965679371741059|
|        1|             0|0.9965679371741059|
|        1|             0|0.9965679371741059|
|        1|             0|0.9971278690455131|
+---------+--------------+------------------+
only showing top 5 rows



In [0]:
# Model Evaluation
#Evaluate Results
evaluator = RegressionEvaluator(metricName="rmse", labelCol=target)  # rmse (default)|mse|r2|mae
RMSE = evaluator.evaluate(predictions)
print('RMSE: ' + str(RMSE))

evaluator = RegressionEvaluator(metricName="mae", labelCol=target)  # rmse (default)|mse|r2|mae
MAE = evaluator.evaluate(predictions) # Mean Absolute Error
print('MSE: ' + str(MAE))

RMSE: 0.21897943846019907
MSE: 0.09610222832883476
