<a href="https://colab.research.google.com/github/josh-hull/misc_projects/blob/main/Spark_ML.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install PySpark Dependencies

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!rm spark-3.2.3-bin-hadoop3.2.tgz
!wget --no-cookies --no-check-certificate https://dlcdn.apache.org/spark/spark-3.2.3/spark-3.2.3-bin-hadoop3.2.tgz
!tar zxvf spark-3.2.3-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark==3.2.3

# Load Data

In [61]:
schema = StructType([
    StructField("uid", IntegerType()),
    StructField("age", IntegerType()),
    StructField("age_group", StringType()),
    StructField("profession", StringType()),
    StructField("marital_status", StringType()),
    StructField("education", StringType()),
    StructField("default", StringType()),
    StructField("housing", StringType()),
    StructField("loan", StringType()),
    StructField("gender", StringType()),
    StructField("balance", StringType()),
    StructField("membership", StringType()),
    StructField("charges", IntegerType()),
    StructField("customer_contacts", IntegerType()),
    StructField("attrition", IntegerType())
    ])

In [None]:
!wget https://raw.githubusercontent.com/zaratsian/Datasets/master/banking_attrition.csv

# Import Python / Spark Libraries

In [3]:
import os
os.environ["JAVA_HOME"]  = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.3-bin-hadoop3.2"

import datetime, time
import re, random, sys

# Note - Not all of these will be used, but I've added them for your reference as a "getting started"
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType, StringType, FloatType, LongType, DateType
from pyspark.sql.functions import struct, array, lit, monotonically_increasing_id, col, expr, when, concat, udf, split, size, lag, count, isnull
from pyspark.sql import Window
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GBTRegressor, LinearRegression, GeneralizedLinearRegression, RandomForestRegressor
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier
from pyspark.ml.feature import VectorIndexer, VectorAssembler, StringIndexer, IndexToString
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator

# Create Spark Session

In [43]:
spark = SparkSession.builder.appName("Spark ML Assignment").master("local[*]").getOrCreate()

# Load CSV Data into Spark Dataframe

In [62]:
attrition_data = spark.read.load("/content/banking_attrition.csv", format="csv", header=True, schema=schema)

# Data Exploration
Perform at least one data exploration of your choice (This could be a basic show(), an aggregation/groupby, correlation, summarizer, etc.)

In [63]:
attrition_data.show()

+-------+---+---------+--------------+--------------+-----------+-------+-------+----+------+-------------+----------+-------+-----------------+---------+
|    uid|age|age_group|    profession|marital_status|  education|default|housing|loan|gender|      balance|membership|charges|customer_contacts|attrition|
+-------+---+---------+--------------+--------------+-----------+-------+-------+----+------+-------------+----------+-------+-----------------+---------+
|1000001| 69|      60s|       retired|       married|high school|     no|     no|  no|female| $50k - $100k|      gold|     74|                5|        0|
|1000002| 46|      40s|    management|       married|high school|    yes|     no|  no|  male|  $10k - $50k|    silver|    149|                1|        0|
|1000003| 45|      40s|    management|       married|high school|     no|     no|  no|female|$100k - $250k|  platinum|     58|                5|        1|
|1000004| 54|      50s|administration|      divorced|   graduate|     

In [64]:
attrition_data.printSchema()

root
 |-- uid: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- age_group: string (nullable = true)
 |-- profession: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- balance: string (nullable = true)
 |-- membership: string (nullable = true)
 |-- charges: integer (nullable = true)
 |-- customer_contacts: integer (nullable = true)
 |-- attrition: integer (nullable = true)



In [66]:
#columns_to_keep =   ["uid", "age", "age_group", "profession", "marital_status",
                     "education", "default", "housing", "loan", "gender", "balance", 
                     "membership", "charges", "customer_contacts", "attrition" 
                    ]
#attrition_data = attrition_data.select(columns_to_keep)

#attrition_data = attrition_data.filter(attrition_data != 'NA')


In [67]:
#numeric_columns     = [c[0] for c in attrition_data.dtypes if c[1] not in ['string','timestamp']]
#categorical_columns = [c[0] for c in attrition_data.dtypes if c[1] in ['string']]
#datetime_columns    = [c[0] for c in attrition_data.dtypes if c[1] in ['timestamp']]

In [None]:
#attrition_data.summary().show()

# Split the Spark Dataframe into Train and Test
You could use a randomsplit here, a Cross Validator, or another approach of your choice.

In [68]:
train_ratio = 0.7
test_ratio = 1 - train_ratio

train_attrition, test_attrition = attrition_data.randomSplit([train_ratio, test_ratio], seed = 1214)

# Feature Engineering
During this step, I'd like to see you convert at least one STRING variable (such as gender, membership, education or another variable of your choice) into a numeric representation so that you can use it as one of the model inputs. You can convert the string to a numeric by using one-hot encoding, a stringindexer, etc

You will also want to define a ML model object. An example of this would be a random forest, gradient boosting, or some other approach listed here.

In [69]:
train_attrition.printSchema()

root
 |-- uid: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- age_group: string (nullable = true)
 |-- profession: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- balance: string (nullable = true)
 |-- membership: string (nullable = true)
 |-- charges: integer (nullable = true)
 |-- customer_contacts: integer (nullable = true)
 |-- attrition: integer (nullable = true)



In [70]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

In [90]:
# Index and encode categorical features
categorical_columns = ["age_group", "profession", "marital_status", "education", "default", "housing", "loan", "gender", "balance", "membership"]
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid="skip") for column in categorical_columns]
encoders = [OneHotEncoder(inputCol=column+"_index", outputCol=column+"_encoded") for column in categorical_columns]

In [91]:
# Assemble all the features into a single features column
assembler = VectorAssembler(inputCols=[column+"_encoded" for column in categorical_columns] + ["charges", "customer_contacts"], outputCol="features")

In [92]:
# Create a Random Forest classifier object
rf = RandomForestClassifier(labelCol="attrition", featuresCol="features", numTrees=100)

In [93]:
# Create a pipeline with all the indexers, encoders, assembler, and the classifier
pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])

# Fit/Train ML Model

In [94]:
model = pipeline.fit(train_attrition)

# Make Predictions
Use your model to make predications against the Test (holdout) Dataframe

In [95]:
predictions = model.transform(test_attrition)
predictions.show()

+-------+---+---------+--------------+--------------+-----------+-------+-------+----+------+-------------+----------+-------+-----------------+---------+---------------+----------------+--------------------+---------------+-------------+-------------+----------+------------+-------------+----------------+-----------------+------------------+----------------------+-----------------+---------------+---------------+-------------+--------------+---------------+------------------+--------------------+--------------------+--------------------+----------+
|    uid|age|age_group|    profession|marital_status|  education|default|housing|loan|gender|      balance|membership|charges|customer_contacts|attrition|age_group_index|profession_index|marital_status_index|education_index|default_index|housing_index|loan_index|gender_index|balance_index|membership_index|age_group_encoded|profession_encoded|marital_status_encoded|education_encoded|default_encoded|housing_encoded| loan_encoded|gender_encode

In [96]:
predictions.select(["attrition", "prediction"]).show()

+---------+----------+
|attrition|prediction|
+---------+----------+
|        0|       0.0|
|        1|       0.0|
|        1|       1.0|
|        0|       0.0|
|        0|       0.0|
|        1|       1.0|
|        1|       1.0|
|        1|       1.0|
|        0|       0.0|
|        0|       0.0|
|        1|       1.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        1|       1.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
+---------+----------+
only showing top 20 rows



# Evaluate Model against Test Dataframe
Display model fit statistics, such as RMSE or MSE

In [None]:
predictions.printSchema()

In [99]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create evaluators for AUC-ROC and AUC-PR
auc_roc_evaluator = BinaryClassificationEvaluator(labelCol="attrition", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc_pr_evaluator = BinaryClassificationEvaluator(labelCol="attrition", rawPredictionCol="rawPrediction", metricName="areaUnderPR")

# Calculate the metrics using the evaluators
auc_roc = auc_roc_evaluator.evaluate(predictions)
auc_pr = auc_pr_evaluator.evaluate(predictions)

# Print the metrics
print("AUC-ROC: {:.4f}".format(auc_roc))
print("AUC-PR: {:.4f}".format(auc_pr))

AUC-ROC: 0.9032
AUC-PR: 0.8574


In [97]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create evaluators for accuracy, precision, recall, and F1 score
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="attrition", predictionCol="prediction", metricName="accuracy")
precision_evaluator = MulticlassClassificationEvaluator(labelCol="attrition", predictionCol="prediction", metricName="weightedPrecision")
recall_evaluator = MulticlassClassificationEvaluator(labelCol="attrition", predictionCol="prediction", metricName="weightedRecall")
f1_evaluator = MulticlassClassificationEvaluator(labelCol="attrition", predictionCol="prediction", metricName="f1")

# Calculate the metrics using the evaluators
accuracy = accuracy_evaluator.evaluate(predictions)
precision = precision_evaluator.evaluate(predictions)
recall = recall_evaluator.evaluate(predictions)
f1_score = f1_evaluator.evaluate(predictions)

# Print the metrics
print("Accuracy: {:.4f}".format(accuracy))
print("Precision: {:.4f}".format(precision))
print("Recall: {:.4f}".format(recall))
print("F1 Score: {:.4f}".format(f1_score))

Accuracy: 0.9446
Precision: 0.9472
Recall: 0.9446
F1 Score: 0.9420


# Save the Model Object (Optional)
Write spark code that saves your model object.

Context: For production purposes, it's often requierd to save the model object so that it can be deployed as a stand-alone and compressed binary object. The model object is typically wrapped in a container and served as a REST or gRPC endpoint.

In [None]:
# Save the trained model
model_path = "path/to/save/your/model"
model.save(model_path)