
## **Spark ML Assignment**
The goal of this assignment is to train a Spark ML Model. 

You may approach this however you like, but I've provided this template as a framework you may find helpful.

Assignment:

1) Load the CSV into a Spark DataFrame. I've added code to automatically download "banking_attrition.csv"; however, if you have another dataset you'd like to analyze, feel free to use it instead.

2) Preprocess the data, applying any data exploration or cleaning techniques. 

3) Split the data into train and test sets.

4) Train the model (if you use the "banking_attrition.csv" file, please predict the probability of "attrition").

5) Apply the model to your test DataFrame.

6) Display the model evaluation.



## **Install PySpark Dependencies**

In [0]:
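!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# -f: no error if the archive has not been downloaded yet (e.g. on a fresh runtime)
!rm -f spark-2.4.5-bin-hadoop2.7.tgz
# Superseded Spark releases are served from archive.apache.org rather than downloads.apache.org
!wget -q --no-cookies --no-check-certificate https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xzf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install pyspark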

## **Load Data**

In [3]:
!wget https://raw.githubusercontent.com/zaratsian/Datasets/master/banking_attrition.csv

--2020-03-30 18:00:21--  https://raw.githubusercontent.com/zaratsian/Datasets/master/banking_attrition.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4021593 (3.8M) [text/plain]
Saving to: ‘banking_attrition.csv’


2020-03-30 18:00:21 (20.5 MB/s) - ‘banking_attrition.csv’ saved [4021593/4021593]



## **Import Python / Spark Libraries**

In [0]:
import os
os.environ["JAVA_HOME"]  = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

import datetime, time
import re, random, sys

# Note: not all of these imports are used; they are listed for reference as a starting point.
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType, StringType, FloatType, LongType, DateType
from pyspark.sql.functions import struct, array, lit, monotonically_increasing_id, col, expr, when, concat, udf, split, size, lag, count, isnull
from pyspark.sql import Window
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GBTRegressor, LinearRegression, GeneralizedLinearRegression, RandomForestRegressor
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier
from pyspark.ml.feature import VectorIndexer, VectorAssembler, StringIndexer, IndexToString
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.stat import Summarizer
from pyspark.sql import Row


## **Create Spark Session**

In [0]:
spark = SparkSession.builder.appName("Spark ML Assignment").master("local[*]").getOrCreate()

## **Load CSV Data into Spark Dataframe**

In [0]:
schema = StructType([
    StructField("uid", IntegerType()),
    StructField("age", IntegerType()),
    StructField("age_group", StringType()),
    StructField("profession", StringType()),
    StructField("marital_status", StringType()),
    StructField("education", StringType()),
    StructField("default", StringType()),
    StructField("housing", StringType()),
    StructField("loan", StringType()),
    StructField("gender", StringType()),
    StructField("balance", StringType()),
    StructField("membership", StringType()),
    StructField("charges", IntegerType()),
    StructField("customer_contacts", IntegerType()),
    StructField("attrition", IntegerType())
])

rawdata = spark.read.load('banking_attrition.csv', format="csv", header=True, schema=schema)


## **Data Exploration**
Perform at least one data exploration of your choice (this could be a basic show(), an aggregation/[groupBy](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy), a [correlation](https://spark.apache.org/docs/latest/ml-statistics.html#correlation), a [summarizer](https://spark.apache.org/docs/latest/ml-statistics.html#summarizer), etc.).

In [26]:
rawdata.describe().show()

+-------+------------------+------------------+---------+--------------+--------------+---------+-------+-------+-----+------+-------------+----------+------------------+------------------+-------------------+
|summary|               uid|               age|age_group|    profession|marital_status|education|default|housing| loan|gender|      balance|membership|           charges| customer_contacts|          attrition|
+-------+------------------+------------------+---------+--------------+--------------+---------+-------+-------+-----+------+-------------+----------+------------------+------------------+-------------------+
|  count|             45211|             45211|    45211|         45211|         45211|    45211|  45211|  45211|45211| 45211|        45211|     45211|             45211|             45211|              45211|
|   mean|         1022606.0| 41.06354648205083|     null|          null|          null|     null|   null|   null| null|  null|         null|      null|123.77065

**Notes:**
*   No null values to worry about: every column has a count of 45211.
*   Only one of `age` and `age_group` is needed, since both encode age.
*   Drop `uid` from the features vector.

*Find the number of levels for each categorical variable: binary variables will be converted with StringIndexer and multi-level categoricals with a dummy (one-hot) encoder.*

In [29]:
numeric_columns     = [c[0] for c in rawdata.dtypes if c[1] not in ['string']]
categorical_columns = [c[0] for c in rawdata.dtypes if c[1] in ['string']]
print(numeric_columns)
print(categorical_columns)

['uid', 'age', 'charges', 'customer_contacts', 'attrition']
['age_group', 'profession', 'marital_status', 'education', 'default', 'housing', 'loan', 'gender', 'balance', 'membership']


In [33]:
for i in categorical_columns:
  rawdata.select(i).distinct().show()

+---------+
|age_group|
+---------+
|      40s|
|      80s|
|      70s|
|      90s|
|      20s|
|      50s|
|      60s|
|      U20|
|      30s|
+---------+

+--------------+
|    profession|
+--------------+
|    management|
|       retired|
|       unknown|
|administration|
|     executive|
| manufacturing|
|       student|
|  entrepreneur|
|    consulting|
|    technician|
| self_employed|
|    unemployed|
+--------------+

+--------------+
|marital_status|
+--------------+
|      divorced|
|       married|
|        single|
+--------------+

+-----------+
|  education|
+-----------+
|   graduate|
|    college|
|    unknown|
|high school|
+-----------+

+-------+
|default|
+-------+
|     no|
|    yes|
+-------+

+-------+
|housing|
+-------+
|     no|
|    yes|
+-------+

+----+
|loan|
+----+
|  no|
| yes|
+----+

+------+
|gender|
+------+
|female|
|  male|
+------+

+-------------+
|      balance|
+-------------+
|  $10k - $50k|
| $50k - $100k|
|   Over $250k|
|$100k - $250k|
|   U

Binary vars: default, housing, loan, gender

In [0]:
binary_columns = ['default', 'housing', 'loan', 'gender']
categorical_columns = ['age_group', 'profession', 'marital_status', 'education', 'balance', 'membership']

## **Split the Spark Dataframe into Train and Test**
You could use a [randomSplit](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit) here, a [CrossValidator](https://spark.apache.org/docs/latest/ml-tuning.html#cross-validation), or another approach of your choice.

In [0]:
train, test   = rawdata.randomSplit([0.8, 0.2], seed=12345)

## **Feature Engineering**
During this step, I'd like to see you convert at least one STRING variable (such as gender, membership, education, or another variable of your choice) into a numeric representation so that you can use it as one of the model inputs. You can convert the string to a numeric value by using [one-hot encoding](https://spark.apache.org/docs/latest/ml-features.html#onehotencoderestimator), a [StringIndexer](https://spark.apache.org/docs/latest/ml-features.html#stringindexer), etc.

You will also want to define a ML model object. An example of this would be a random forest, gradient boosting, or some other approach listed [here](https://spark.apache.org/docs/latest/ml-classification-regression.html). 

In [0]:
# Prepare string variables so that they can be used by the tree-based GBT model.
# StringIndexer encodes a string column of labels as a column of label indices
# (by default the most frequent level gets index 0).
si1 = StringIndexer(inputCol="default", outputCol="default_index")
si2 = StringIndexer(inputCol="housing", outputCol="housing_index")
si3 = StringIndexer(inputCol="loan", outputCol="loan_index")
si4 = StringIndexer(inputCol="gender", outputCol="gender_index")
# age_group is not indexed because it duplicates the information in age
#si5 = StringIndexer(inputCol="age_group", outputCol="age_group_index")
si6 = StringIndexer(inputCol="profession", outputCol="profession_index")
si7 = StringIndexer(inputCol="marital_status", outputCol="marital_status_index")
si8 = StringIndexer(inputCol="education", outputCol="education_index")
si9 = StringIndexer(inputCol="balance", outputCol="balance_index")
si10 = StringIndexer(inputCol="membership", outputCol="membership_index")

target   = 'attrition'
features = ['default_index','housing_index','loan_index','gender_index','profession_index','marital_status_index','education_index','balance_index','membership_index','age', 'charges', 'customer_contacts']

# Encode the target column as an indexed label (label indexer)
fi = StringIndexer(inputCol=target, outputCol='label').fit(train)

# The Pipelines API requires the input variables to be combined into a single vector column
va  = VectorAssembler(inputCols=features, outputCol="features")

# Gradient-boosted tree classifier with default hyperparameters. It is trained on the
# indexed "label" column so that labelConverter can map its predictions back via fi.labels.
gbm = GBTClassifier(featuresCol="features", labelCol="label", predictionCol="prediction", seed=12345)

# Convert indexed predictions back to the original attrition labels (label converter)
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=fi.labels)

## **Fit/Train ML Model**

In [0]:
# Build the machine learning pipeline
pipeline_run  = Pipeline(stages=[si1, si2, si3, si4, si6, si7, si8, si9, si10, fi, va, gbm, labelConverter])

# Build model. 
# The fitted model from a Pipeline is a PipelineModel, which consists of fitted models and transformers, corresponding to the pipeline stages.
model_run = pipeline_run.fit(train)

## **Make Predictions**
Use your model to make predictions against the test (holdout) DataFrame.

In [47]:
# Make predictions.
predictions = model_run.transform(test)
predictions.show(10)

+-------+---+---------+--------------+--------------+-----------+-------+-------+----+------+-------------+----------+-------+-----------------+---------+-------------+-------------+----------+------------+----------------+--------------------+---------------+-------------+----------------+-----+--------------------+--------------------+--------------------+----------+--------------+
|    uid|age|age_group|    profession|marital_status|  education|default|housing|loan|gender|      balance|membership|charges|customer_contacts|attrition|default_index|housing_index|loan_index|gender_index|profession_index|marital_status_index|education_index|balance_index|membership_index|label|            features|       rawPrediction|         probability|prediction|predictedLabel|
+-------+---+---------+--------------+--------------+-----------+-------+-------+----+------+-------------+----------+-------+-----------------+---------+-------------+-------------+----------+------------+----------------+---

## **Evaluate Model against Test Dataframe**
Display model fit statistics, such as RMSE or MSE for a regression model, or AUC for a binary classifier such as this one.
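For a 0/1 target like `attrition`, AUC has a simple interpretation: it is the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counting half. The pure-Python sketch below illustrates that definition on made-up scores. It compares every positive/negative pair, so its cost is quadratic and it is meant for intuition only. Its value can differ slightly from Spark's `BinaryClassificationEvaluator`, which integrates a curve built from score thresholds (and may bin them).

```python
def roc_auc(labels, scores):
    """AUC as P(score of a random positive > score of a random negative); ties count 1/2."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Made-up labels and P(attrition = 1) scores
labels = [0, 0, 1, 1, 0, 1]
scores = [0.1, 0.4, 0.35, 0.8, 0.2, 0.9]
print(round(roc_auc(labels, scores), 4))  # 0.8889 (8 of 9 positive/negative pairs ordered correctly)
```

An AUC of 0.5 corresponds to random ordering and 1.0 to a perfect ranking, independent of any classification threshold.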

In [54]:
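# areaUnderROC compares the indexed "label" column with the model's rawPrediction scores
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
roc = evaluator.evaluate(predictions)
print('AUC = ' + str(roc))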

AUC = 0.932458998190936
