## **Spark ML Assignment**
The goal of this assignment is to (1) use Spark to analyze and process data and (2) train a Spark ML model.

This notebook was run using Google Colab. 



## **Install PySpark Dependencies**
The instructor provided the code for this part, for loading the data, and for some of the library imports.

In [None]:
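# Install Spark dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!rm -f spark-3.2.3-bin-hadoop3.2.tgz  # -f: no error if there is no earlier download to remove
# dlcdn.apache.org only mirrors current releases; if this URL stops working,
# older releases are kept under https://archive.apache.org/dist/spark/
!wget -q --no-cookies --no-check-certificate https://dlcdn.apache.org/spark/spark-3.2.3/spark-3.2.3-bin-hadoop3.2.tgz
!tar xzf spark-3.2.3-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install -q pyspark==3.2.3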

## **Load Data**

In [None]:
!wget https://raw.githubusercontent.com/zaratsian/Datasets/master/banking_attrition.csv

## **Import Python / Spark Libraries**

In [None]:
import os
os.environ["JAVA_HOME"]  = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.3-bin-hadoop3.2"

import datetime, time
import re, random, sys

# Note - Not all of these will be used. 
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType, StringType, FloatType, LongType, DateType
from pyspark.sql.functions import struct, array, lit, monotonically_increasing_id, col, expr, when, concat, udf, split, size, lag, count, isnull
from pyspark.sql import Window
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GBTRegressor, LinearRegression, GeneralizedLinearRegression, RandomForestRegressor
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier, LogisticRegression
from pyspark.ml.feature import VectorIndexer, VectorAssembler, StringIndexer, IndexToString
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator, BinaryClassificationEvaluator

In [None]:
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.ml.tuning import TrainValidationSplitModel

## **Create Spark Session**

In [None]:
spark = SparkSession.builder.appName("Spark ML Assignment").master("local[*]").getOrCreate()

## **Load CSV Data into Spark Dataframe**

In [None]:
input_bucket = 'banking_attrition.csv'  # downloaded into the Colab working directory (/content) by wget above
df = spark.read.csv(input_bucket, header=True, inferSchema=True)
df.show(10)

+-------+---+---------+--------------+--------------+-----------+-------+-------+----+------+-------------+----------+-------+-----------------+---------+
|    uid|age|age_group|    profession|marital_status|  education|default|housing|loan|gender|      balance|membership|charges|customer_contacts|attrition|
+-------+---+---------+--------------+--------------+-----------+-------+-------+----+------+-------------+----------+-------+-----------------+---------+
|1000001| 69|      60s|       retired|       married|high school|     no|     no|  no|female| $50k - $100k|      gold|     74|                5|        0|
|1000002| 46|      40s|    management|       married|high school|    yes|     no|  no|  male|  $10k - $50k|    silver|    149|                1|        0|
|1000003| 45|      40s|    management|       married|high school|     no|     no|  no|female|$100k - $250k|  platinum|     58|                5|        1|
|1000004| 54|      50s|administration|      divorced|   graduate|     

## **Data Exploration**
Perform at least one data exploration of your choice (this could be a basic show(), an aggregation/[groupby](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy), [correlation](https://spark.apache.org/docs/latest/ml-statistics.html#correlation), [summarizer](https://spark.apache.org/docs/latest/ml-statistics.html#summarizer), etc.).

In [None]:
df.describe().show()

+-------+------------------+------------------+---------+--------------+--------------+---------+-------+-------+-----+------+-------------+----------+------------------+------------------+-------------------+
|summary|               uid|               age|age_group|    profession|marital_status|education|default|housing| loan|gender|      balance|membership|           charges| customer_contacts|          attrition|
+-------+------------------+------------------+---------+--------------+--------------+---------+-------+-------+-----+------+-------------+----------+------------------+------------------+-------------------+
|  count|             45211|             45211|    45211|         45211|         45211|    45211|  45211|  45211|45211| 45211|        45211|     45211|             45211|             45211|              45211|
|   mean|         1022606.0| 41.06354648205083|     null|          null|          null|     null|   null|   null| null|  null|         null|      null|123.77065

In [None]:
df.printSchema()  # check which column types inferSchema chose


root
 |-- uid: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- age_group: string (nullable = true)
 |-- profession: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- balance: string (nullable = true)
 |-- membership: string (nullable = true)
 |-- charges: integer (nullable = true)
 |-- customer_contacts: integer (nullable = true)
 |-- attrition: integer (nullable = true)



In [None]:
print('\r\nTotal Records: ' + str(df.count()) + '\r\n\r\n')
for i in df.dtypes: print(i)


Total Records: 45211


('uid', 'int')
('age', 'int')
('age_group', 'string')
('profession', 'string')
('marital_status', 'string')
('education', 'string')
('default', 'string')
('housing', 'string')
('loan', 'string')
('gender', 'string')
('balance', 'string')
('membership', 'string')
('charges', 'int')
('customer_contacts', 'int')
('attrition', 'int')


In [None]:
df.groupBy("gender", "profession").avg("charges").show(20)


+------+--------------+------------------+
|gender|    profession|      avg(charges)|
+------+--------------+------------------+
|  male|       retired| 90.06541129831517|
|female|       unknown|  95.6412213740458|
|  male|    consulting|105.07890222984562|
|female|       student|106.88148148148149|
|female|administration|196.62704545454545|
|  male|       student| 104.4538043478261|
|  male| self_employed| 99.64225352112676|
|female|    technician|  97.8839156800462|
|  male|  entrepreneur| 96.58604651162791|
|female|    consulting|110.82528486163864|
|female|    management| 99.05277973258269|
|female|  entrepreneur|104.86695906432749|
|  male|administration|100.57098214285715|
|female|     executive| 92.15077989601386|
|  male|       unknown|            91.625|
|female|       retired| 94.21102661596959|
|  male| manufacturing|108.40989570700945|
|  male|    unemployed| 94.95126353790614|
|female| manufacturing|115.82603578154426|
|female|    unemployed|251.80401878914404|
+------+---

In [None]:
df.where(col("education").contains("high school")).groupBy("marital_status").avg("age").show()


+--------------+------------------+
|marital_status|          avg(age)|
+--------------+------------------+
|      divorced| 38.91083980762116|
|       married|42.275862068965516|
|        single| 37.23961490461758|
+--------------+------------------+



## **Feature Engineering**
Create numeric index columns for the categorical (string) variables, since Spark ML models need numeric feature vectors.

In [None]:
inputs = ('age_group', 'profession', 'marital_status', 'education', 'default', 'housing', 'loan', 'gender', 'balance', 'membership')
# One StringIndexer per column, as in
# https://stackoverflow.com/questions/36942233/apply-stringindexer-to-several-columns-in-a-pyspark-dataframe
# (since Spark 3.0 a single StringIndexer can also take inputCols/outputCols).
# The indexers are left unfitted here; pipeline.fit() fits each one on df.
indexers = [StringIndexer(inputCol=column, outputCol=column + "_idx", stringOrderType="frequencyDesc") for column in inputs]
pipeline = Pipeline(stages=indexers)
indexed = pipeline.fit(df).transform(df)
indexed.show()

+-------+---+---------+--------------+--------------+-----------+-------+-------+----+------+-------------+----------+-------+-----------------+---------+-------------+--------------+------------------+-------------+-----------+-----------+--------+----------+-----------+--------------+
|    uid|age|age_group|    profession|marital_status|  education|default|housing|loan|gender|      balance|membership|charges|customer_contacts|attrition|age_group_idx|profession_idx|marital_status_idx|education_idx|default_idx|housing_idx|loan_idx|gender_idx|balance_idx|membership_idx|
+-------+---+---------+--------------+--------------+-----------+-------+-------+----+------+-------------+----------+-------+-----------------+---------+-------------+--------------+------------------+-------------+-----------+-----------+--------+----------+-----------+--------------+
|1000001| 69|      60s|       retired|       married|high school|     no|     no|  no|female| $50k - $100k|      gold|     74|          
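To make `stringOrderType="frequencyDesc"` concrete, here is a plain-Python sketch (not Spark code) of how the index values are assigned: the most frequent string gets 0.0, the next 1.0, and so on, with equal counts broken alphabetically as recent Spark docs describe. The toy `housing` values are made up for illustration. In Spark 3.x the same indexing can also be done with one estimator, `StringIndexer(inputCols=list(inputs), outputCols=[c + "_idx" for c in inputs], stringOrderType="frequencyDesc")`.

```python
from collections import Counter


def frequency_desc_index(values):
    """Map each distinct string to a float index, most frequent first.

    Equal counts are ordered alphabetically (assumed to match Spark's
    frequencyDesc tie-breaking).
    """
    counts = Counter(values)
    # Sort by descending count, then ascending string for ties.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Hypothetical toy column, not taken from banking_attrition.csv.
housing = ["yes", "no", "yes", "unknown", "yes", "no"]
print(frequency_desc_index(housing))  # {'yes': 0.0, 'no': 1.0, 'unknown': 2.0}
```

This is why, in the output above, `default = "no"` maps to 0.0 while `housing = "no"` maps to 1.0: the index depends on how common each value is within its own column.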

In [None]:
# Count the target variable in data
df.groupBy("attrition").count().show()

+---------+-----+
|attrition|count|
+---------+-----+
|        1|10200|
|        0|35011|
+---------+-----+
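The target is imbalanced, so it helps to compute the accuracy a trivial "always predict 0" model would get before judging any trained model. The sketch below uses the counts printed above. The "balanced" class weights use one common heuristic (total / (num_classes * class_count)); they are an assumption here, not something this notebook uses. In Spark they could be attached as a column with `when(col("attrition") == 1, w1).otherwise(w0)` and passed through LogisticRegression's `weightCol` parameter.

```python
# Counts copied from the groupBy("attrition").count() output above.
counts = {0: 35011, 1: 10200}
total = sum(counts.values())

# Accuracy of a model that always predicts the majority class (0).
majority_baseline = max(counts.values()) / total
attrition_rate = counts[1] / total

# "Balanced" weights: the rarer class gets proportionally more weight,
# and each class contributes total / 2 in weighted count.
class_weights = {label: total / (len(counts) * n) for label, n in counts.items()}

print(f"majority-class baseline accuracy: {majority_baseline:.4f}")  # 0.7744
print(f"attrition rate: {attrition_rate:.4f}")  # 0.2256
print(class_weights)
```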



### Drop the categorical columns in favor of the indexed columns
Create a reduced DataFrame.

In [None]:
df_reduced = indexed.drop(*inputs)  # drop the original categorical columns listed in inputs
df_reduced = df_reduced.drop("uid")  # alternatively, keep uid and leave it out of features_cols

In [None]:
df_reduced.show(10)
for i in df_reduced.dtypes: print(i)

+---+-------+-----------------+---------+-------------+--------------+------------------+-------------+-----------+-----------+--------+----------+-----------+--------------+
|age|charges|customer_contacts|attrition|age_group_idx|profession_idx|marital_status_idx|education_idx|default_idx|housing_idx|loan_idx|gender_idx|balance_idx|membership_idx|
+---+-------+-----------------+---------+-------------+--------------+------------------+-------------+-----------+-----------+--------+----------+-----------+--------------+
| 69|     74|                5|        0|          4.0|           6.0|               0.0|          1.0|        0.0|        1.0|     0.0|       0.0|        3.0|           2.0|
| 46|    149|                1|        0|          2.0|           0.0|               0.0|          1.0|        1.0|        1.0|     0.0|       1.0|        1.0|           1.0|
| 45|     58|                5|        1|          2.0|           0.0|               0.0|          1.0|        0.0|        1.

### Turn data frame columns into features vector and label


In [None]:
# withColumnRenamed returns a new DataFrame (it is not in-place), so renaming here
# without assigning the result does nothing; the label is renamed at fit time instead.
features_cols = [c for c in df_reduced.columns if c != "attrition"]
assembler = VectorAssembler(inputCols=features_cols, outputCol="features")
assembled = assembler.transform(df_reduced)  # each row's vector may be stored sparse or dense


In [None]:
assembled.show(5)
features_cols


+---+-------+-----------------+---------+-------------+--------------+------------------+-------------+-----------+-----------+--------+----------+-----------+--------------+--------------------+
|age|charges|customer_contacts|attrition|age_group_idx|profession_idx|marital_status_idx|education_idx|default_idx|housing_idx|loan_idx|gender_idx|balance_idx|membership_idx|            features|
+---+-------+-----------------+---------+-------------+--------------+------------------+-------------+-----------+-----------+--------+----------+-----------+--------------+--------------------+
| 69|     74|                5|        0|          4.0|           6.0|               0.0|          1.0|        0.0|        1.0|     0.0|       0.0|        3.0|           2.0|[69.0,74.0,5.0,4....|
| 46|    149|                1|        0|          2.0|           0.0|               0.0|          1.0|        1.0|        1.0|     0.0|       1.0|        1.0|           1.0|[46.0,149.0,1.0,2...|
| 45|     58|       

['age',
 'charges',
 'customer_contacts',
 'age_group_idx',
 'profession_idx',
 'marital_status_idx',
 'education_idx',
 'default_idx',
 'housing_idx',
 'loan_idx',
 'gender_idx',
 'balance_idx',
 'membership_idx']

In [None]:
assembled.select('features').show(5, truncate=False)  # rows print dense [...] or sparse (size, [indices], [values]); 13 = number of feature columns

+--------------------------------------------------------+
|features                                                |
+--------------------------------------------------------+
|[69.0,74.0,5.0,4.0,6.0,0.0,1.0,0.0,1.0,0.0,0.0,3.0,2.0] |
|[46.0,149.0,1.0,2.0,0.0,0.0,1.0,1.0,1.0,0.0,1.0,1.0,1.0]|
|(13,[0,1,2,3,6,8,11],[45.0,58.0,5.0,2.0,1.0,1.0,2.0])   |
|[54.0,317.0,4.0,1.0,3.0,2.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0]|
|[36.0,139.0,8.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,1.0,3.0,2.0]|
+--------------------------------------------------------+
only showing top 5 rows
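The third row above is printed in sparse notation `(size, [indices], [values])`: only non-zero entries are listed. The plain-Python sketch below expands it back to a dense list (in PySpark, `SparseVector.toArray()` does this). The `prefers_sparse` helper is my understanding of the storage heuristic in Spark's `Vector.compressed`, which VectorAssembler output goes through; treat it as an assumption. It is at least consistent with all five rows shown: only the row with 7 non-zeros out of 13 is printed sparse.

```python
def sparse_to_dense(size, indices, values):
    """Expand the printed sparse form (size, [indices], [values]) into a list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


def prefers_sparse(dense):
    """Assumed Spark heuristic: store sparse only when 1.5 * (nnz + 1) < size."""
    nnz = sum(1 for x in dense if x != 0.0)
    return 1.5 * (nnz + 1.0) < len(dense)


# Third and first rows from the output above.
row3 = sparse_to_dense(13, [0, 1, 2, 3, 6, 8, 11], [45.0, 58.0, 5.0, 2.0, 1.0, 1.0, 2.0])
row1 = [69.0, 74.0, 5.0, 4.0, 6.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 3.0, 2.0]
print(row3)
print(prefers_sparse(row3), prefers_sparse(row1))  # True False
```

Either representation holds the same values, so the mix of formats has no effect on training.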



## **Split the Spark Dataframe into Train and Test**
I used [randomSplit](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit) here; I may try other options later.
randomSplit returns a list of DataFrames, which can either be unpacked into two variables or kept as a list and indexed.

In [None]:
# training, test = assembled.randomSplit([0.75, 0.25], seed=19)  # unpack into two DataFrames
splits = assembled.randomSplit([0.75, 0.25], seed=19)  # or keep the list


In [None]:
# training.count()
splits[0].count()

33841
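randomSplit samples rows randomly, so the split sizes only approximate the requested 0.75 / 0.25 weights. A quick arithmetic check with the counts printed in this notebook:

```python
total_rows = 45211   # df.count() from earlier
train_rows = 33841   # splits[0].count() above
test_rows = total_rows - train_rows

print(test_rows)                          # 11370
print(f"{train_rows / total_rows:.4f}")   # 0.7485 (requested 0.75)
```

The 11370 test rows should equal the sum of the confusion-matrix counts in the evaluation section (8837 + 2533), which is a cheap sanity check that every test row was scored.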

## **Fit/Train ML Model**

In [None]:
lr = LogisticRegression(regParam=0.3, elasticNetParam=0.8)
lr.setMaxIter(10)  # setters are an alternative to constructor keyword arguments
model = lr.fit(splits[0].withColumnRenamed("attrition", "label"))  # defaults: featuresCol="features", labelCol="label"
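As I read the Spark ML docs, `regParam` ($\lambda$) and `elasticNetParam` ($\alpha$) enter the logistic regression objective as an elastic-net penalty; the sketch below writes that out, with labels $y_i \in \{0, 1\}$ mapped to $\tilde{y}_i \in \{-1, +1\}$ and the intercept $b$ left unpenalized:

$$
\min_{\mathbf{w},\, b}\;
\frac{1}{n}\sum_{i=1}^{n} \log\!\left(1 + e^{-\tilde{y}_i\,(\mathbf{w}^\top \mathbf{x}_i + b)}\right)
\;+\; \alpha\lambda\,\lVert \mathbf{w} \rVert_1
\;+\; \frac{(1-\alpha)\,\lambda}{2}\,\lVert \mathbf{w} \rVert_2^2,
\qquad \tilde{y}_i = 2y_i - 1
$$

With $\lambda = 0.3$ and $\alpha = 0.8$, the L1 weight is $\alpha\lambda = 0.24$ and the L2 weight is $(1-\alpha)\lambda/2 = 0.03$. Spark standardizes features before applying the penalty by default (`standardization=True`), so an L1 weight this large can push many coefficients to exactly zero; `model.coefficients` and `model.intercept` show what survived.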


## **Make Predictions**
Use your model to make predictions against the test (holdout) DataFrame.

In [None]:
predictions = model.transform(splits[1]) # (test)

In [None]:
predictions.show()

+---+-------+-----------------+---------+-------------+--------------+------------------+-------------+-----------+-----------+--------+----------+-----------+--------------+--------------------+--------------------+--------------------+----------+
|age|charges|customer_contacts|attrition|age_group_idx|profession_idx|marital_status_idx|education_idx|default_idx|housing_idx|loan_idx|gender_idx|balance_idx|membership_idx|            features|       rawPrediction|         probability|prediction|
+---+-------+-----------------+---------+-------------+--------------+------------------+-------------+-----------+-----------+--------+----------+-----------+--------------+--------------------+--------------------+--------------------+----------+
| 18|     85|                7|        0|          7.0|          10.0|               1.0|          3.0|        0.0|        1.0|     0.0|       0.0|        2.0|           0.0|[18.0,85.0,7.0,7....|[1.32707387410786...|[0.79035620733538...|       0.0|
| 19
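For binary logistic regression, `rawPrediction` holds the margins and `probability` applies the logistic (sigmoid) function to them; the first row above is consistent with that, since sigmoid(1.327...) is about 0.790. The plain-Python sketch below reproduces that row and applies the default 0.5 threshold (Spark's LogisticRegression also exposes a `threshold` parameter). Only `raw0` comes from the output; it is truncated there, so the match is approximate.

```python
import math


def sigmoid(z):
    """Logistic function: maps a margin to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))


# rawPrediction[0] of the first test row above (truncated in the printout).
raw0 = 1.32707387410786
p0 = sigmoid(raw0)   # probability of class 0 (no attrition)
p1 = 1.0 - p0        # probability of class 1 (attrition)

threshold = 0.5      # default decision threshold
prediction = 1.0 if p1 > threshold else 0.0
print(round(p0, 6), prediction)  # 0.790356 0.0
```

Lowering the threshold trades precision for recall on the attrition class without retraining the model.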

## **Evaluate Model against Test Dataframe**
Display model fit statistics.


In [None]:
bi_evaluator = BinaryClassificationEvaluator(labelCol="attrition", metricName='areaUnderROC')  # areaUnderROC | areaUnderPR
areaunderroc = bi_evaluator.evaluate(predictions)
print("Area Under ROC: " + str(areaunderroc))

Area Under ROC: 0.8873319171210698
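Area under the ROC curve does not depend on any decision threshold: it is the probability that a randomly chosen positive row is scored higher than a randomly chosen negative row. The plain-Python sketch below computes it that way on hypothetical toy scores. Spark's BinaryClassificationEvaluator builds the ROC curve from `rawPrediction` instead, but ranking by margin or by probability gives the same order. In the toy data every score is at most 0.5, so every prediction at the default threshold would be 0, yet the AUC is still well above 0.5.

```python
def auc_pairwise(scores, labels):
    """P(score of a random positive > score of a random negative); ties count half."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Hypothetical P(attrition) scores and true labels.
scores = [0.10, 0.20, 0.30, 0.35, 0.40]
labels = [0, 0, 1, 0, 1]
print(auc_pairwise(scores, labels))  # 0.8333333333333334 (= 5/6)
```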


In [None]:
# Confusion-matrix counts: actual label (attrition) vs. predicted class
predictions.groupBy('attrition','prediction').count().show()
# Multiclass Evaluator
mc_evaluator = MulticlassClassificationEvaluator(labelCol="attrition", predictionCol="prediction", metricName="accuracy") #f1|weightedPrecision|weightedRecall|accuracy
accuracy     = mc_evaluator.evaluate(predictions)
print("Accuracy:       " + str(accuracy))


+---------+----------+-----+
|attrition|prediction|count|
+---------+----------+-----+
|        1|       0.0| 2533|
|        0|       0.0| 8837|
+---------+----------+-----+

Accuracy:       0.7772207563764292
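The table above has no rows with `prediction = 1.0`, so the model predicted "no attrition" for every test customer. The plain-Python sketch below derives the usual metrics from those counts. It shows that the accuracy equals the rate of the majority class in the test set and that recall on the attrition class is zero.

```python
# Counts from the groupBy('attrition', 'prediction') table above.
# Keys are (actual, predicted); combinations absent from the table are zero.
confusion = {(1, 0): 2533, (0, 0): 8837}

tp = confusion.get((1, 1), 0)
fp = confusion.get((0, 1), 0)
fn = confusion.get((1, 0), 0)
tn = confusion.get((0, 0), 0)

total = tp + fp + fn + tn
accuracy = (tp + tn) / total
recall_attrition = tp / (tp + fn) if tp + fn else 0.0  # share of churners caught
baseline = (tn + fp) / total                            # always predicting 0

print(total, accuracy, recall_attrition, baseline)
```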


## Save the Model Object 

Write spark code that saves the model object. 




In [None]:
os.chdir('/content')  # /content is Colab's default working directory
wd = os.getcwd()
# save() takes a directory path, not a file name; overwrite() replaces an earlier save.
model.write().overwrite().save(wd + "/model_named")
os.listdir('.')
# The model is saved as folders (data/ and metadata/), not as a REST or gRPC endpoint.
# Reload it with pyspark.ml.classification.LogisticRegressionModel.load(wd + "/model_named").




['.config',
 'drive',
 'wd',
 'model',
 'model_named',
 'spark-3.2.3-bin-hadoop3.2.tgz',
 'banking_attrition.csv',
 'spark-3.2.3-bin-hadoop3.2',
 'sample_data']

In [None]:
os.chdir('model_named')
os.listdir('.')

['data', 'metadata']

## Copy the model to Google Drive.
Mounting Drive prompts you to grant Colab permission to access it.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Zip the model, since the saved model is a set of folders rather than a single file.

In [None]:
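import os
import zipfile

folder_path = '/content/model_named'
zip_path = '/content/model.zip'

# arcname stores entries relative to /content, so the archive unpacks to model_named/...
# (without it, each file is stored under its full path, content/model_named/...).
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
    for root, dirs, files in os.walk(folder_path):
        for file in files:
            full_path = os.path.join(root, file)
            zipf.write(full_path, arcname=os.path.relpath(full_path, os.path.dirname(folder_path)))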

In [None]:
os.listdir('..')

['.config',
 'drive',
 'wd',
 'model.zip',
 'model',
 'model_named',
 'spark-3.2.3-bin-hadoop3.2.tgz',
 'banking_attrition.csv',
 'spark-3.2.3-bin-hadoop3.2',
 'sample_data']
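The standard library's `shutil.make_archive` can replace the manual `os.walk` loop; it stores paths relative to `root_dir`, so the archive unpacks to `model_named/...`. The sketch wraps it in a hypothetical helper, `zip_model_dir`, and demonstrates it on a fake directory in a temporary folder that only mimics the `data/` and `metadata/` layout (the `part-00000` files are placeholders, not real Spark output).

```python
import os
import shutil
import tempfile
import zipfile


def zip_model_dir(model_dir, zip_base):
    """Zip a saved model directory; make_archive appends ".zip" to zip_base."""
    parent = os.path.dirname(os.path.abspath(model_dir))
    name = os.path.basename(os.path.normpath(model_dir))
    return shutil.make_archive(zip_base, "zip", root_dir=parent, base_dir=name)


# Self-contained demo with a placeholder directory tree.
with tempfile.TemporaryDirectory() as tmp:
    model_dir = os.path.join(tmp, "model_named")
    for sub in ("data", "metadata"):
        os.makedirs(os.path.join(model_dir, sub))
        with open(os.path.join(model_dir, sub, "part-00000"), "w") as f:
            f.write("placeholder")
    archive = zip_model_dir(model_dir, os.path.join(tmp, "model"))
    with zipfile.ZipFile(archive) as zf:
        names = sorted(zf.namelist())
    print(names)
```

In this notebook the call would be `zip_model_dir('/content/model_named', '/content/model')`, producing `/content/model.zip`.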

## Transfer the file to my Google Drive. 

In [None]:
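import os
import shutil

src = '/content/model.zip'
# drive.mount('/content/drive') exposes "My Drive" under /content/drive/MyDrive.
dst_dir = '/content/drive/MyDrive/cloudcomputing'
os.makedirs(dst_dir, exist_ok=True)  # create the folder if it does not exist yet
dst = os.path.join(dst_dir, 'model.zip')
shutil.copyfile(src, dst)
os.path.exists(dst)  # True if the copy succeeded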