# ML Pipeline

This Jupyter Notebook demonstrates the implementation of a machine learning pipeline in PySpark for predicting customer churn. The pipeline includes data preprocessing, feature engineering, and model training. We also use cross-validation to tune each model, select the best one, and save it for future use. The data is a cleaned version of the Sparkify customer churn dataset.
## Data Loading

The first step is to load the cleaned customer churn dataset into a PySpark DataFrame. Because CSV columns are read as strings, the feature and label columns are then cast to integers.

## Feature Engineering

After preprocessing, we prepare the features for modelling. In this notebook, PySpark's `VectorAssembler` combines the twelve feature columns into a single `features` vector column, which is the input format PySpark's classifiers expect.
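
Conceptually, `VectorAssembler` takes the listed input columns of each row, in the listed order, and concatenates them into one numeric vector; columns that are not listed (such as `userId`) are ignored. The plain-Python sketch below illustrates that idea on dict-shaped rows. The `assemble` helper and the sample rows are hypothetical, and the refusal of missing values only loosely mirrors Spark's default behaviour (`handleInvalid='error'`).

```python
from typing import Dict, List, Optional, Sequence


def assemble(rows: Sequence[Dict[str, Optional[float]]],
             input_cols: Sequence[str]) -> List[List[float]]:
    """Hypothetical helper: combine the named columns of each row into one vector."""
    vectors = []
    for i, row in enumerate(rows):
        values = []
        for c in input_cols:
            v = row[c]
            if v is None:
                # Refuse missing values instead of silently imputing them
                raise ValueError(f"row {i}: column {c!r} is null")
            values.append(float(v))
        vectors.append(values)
    return vectors


# Hypothetical sample rows using a few of the notebook's column names
rows = [
    {'userId': 'a', 'n_pages': 120, 'thumbs_down': 3, 'home': 10},
    {'userId': 'b', 'n_pages': 45, 'thumbs_down': 0, 'home': 4},
]
print(assemble(rows, ['n_pages', 'thumbs_down', 'home']))
# [[120.0, 3.0, 10.0], [45.0, 0.0, 4.0]]
```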

## Model Training

With the data prepared, the next step is to train the models. We use PySpark's `Pipeline` class to chain the feature-assembly step with a classifier, building one pipeline for each of three classifiers: logistic regression, random forest, and naive Bayes. For each pipeline, 3-fold cross-validation over a small hyperparameter grid selects the best parameter combination.
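
A `Pipeline` runs its stages in order: transformer stages (such as `VectorAssembler`) only transform the data, while estimator stages (such as a classifier) are fitted on the output of the preceding stages and replaced by the fitted model. The stdlib-only sketch below mimics that fit/transform chaining. The `MiniPipeline`, `AddSum`, and `MeanThreshold` classes are hypothetical stand-ins, not PySpark classes, and unlike Spark the sketch also transforms the data after the last stage during fitting.

```python
from typing import Dict, List

Row = Dict[str, float]


class AddSum:
    """Transformer stand-in: adds a 'features' value computed from columns 'a' and 'b'."""

    def transform(self, rows: List[Row]) -> List[Row]:
        return [{**r, 'features': r['a'] + r['b']} for r in rows]


class MeanThresholdModel:
    """Fitted model stand-in: predicts 1.0 when 'features' is above the learned threshold."""

    def __init__(self, threshold: float):
        self.threshold = threshold

    def transform(self, rows: List[Row]) -> List[Row]:
        return [{**r, 'prediction': 1.0 if r['features'] > self.threshold else 0.0}
                for r in rows]


class MeanThreshold:
    """Estimator stand-in: fit() learns the mean of 'features' as the threshold."""

    def fit(self, rows: List[Row]) -> MeanThresholdModel:
        return MeanThresholdModel(sum(r['features'] for r in rows) / len(rows))


class MiniPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows: List[Row]) -> List[Row]:
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class MiniPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows: List[Row]) -> MiniPipelineModel:
        fitted = []
        for stage in self.stages:
            if hasattr(stage, 'fit'):
                # Estimator: fit on the output of the previous stages, keep the fitted model
                stage = stage.fit(rows)
            # Pass the transformed data on to the next stage
            rows = stage.transform(rows)
            fitted.append(stage)
        return MiniPipelineModel(fitted)


toy_rows = [{'a': 1, 'b': 1}, {'a': 3, 'b': 3}, {'a': 5, 'b': 5}]
model = MiniPipeline([AddSum(), MeanThreshold()]).fit(toy_rows)
print(model.transform([{'a': 0, 'b': 1}, {'a': 4, 'b': 4}]))
```

The fitted pipeline holds the transformer unchanged and the estimator replaced by its fitted model, which is why `bestModel.stages[1]` later in the notebook is the trained classifier.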

## Model Evaluation

After training, we evaluate the models. During cross-validation, parameter combinations are compared by the area under the ROC curve (AUC). On the held-out test set, we compute the confusion matrix for each model and derive precision, recall, and F1-score from it.
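
The AUC equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counting as one half. The stdlib sketch below computes it that way by comparing every positive/negative pair. The `roc_auc` helper and the toy scores are hypothetical; Spark's `BinaryClassificationEvaluator` instead integrates the ROC curve (and may bin scores on large data), so its values can differ slightly.

```python
from typing import Sequence


def roc_auc(scores: Sequence[float], labels: Sequence[int]) -> float:
    """AUC as P(score of a random positive > score of a random negative), ties count 1/2."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


scores = [0.9, 0.8, 0.7, 0.4, 0.3, 0.2]
labels = [1, 1, 0, 1, 0, 0]
print(roc_auc(scores, labels))  # 8 of 9 pairs ranked correctly
```

A value of 0.5 corresponds to random ranking, and a value below 0.5 (such as the 0.21 obtained later for multinomial naive Bayes) means negatives are ranked above positives more often than the reverse.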

## Model Selection and Saving

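Based on the evaluation results, we select the random forest classifier (`numTrees = 10`, `maxDepth = 10`), which has the highest cross-validated AUC as well as the highest precision, recall, and F1-score on the test set. We save this fitted pipeline model for future use.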

## Conclusion

In this Jupyter Notebook, we have built a PySpark pipeline for customer churn prediction, covering data preprocessing, feature engineering, and model training with PySpark's built-in libraries. Cross-validation was used to tune logistic regression, random forest, and naive Bayes models, and the random forest classifier performed best (cross-validated AUC of about 0.937, test-set F1-score of about 0.705). The fitted model was saved for future use.

In [1]:
# Starter code
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, udf
from pyspark.sql.functions import max as fmax, min as fmin
from pyspark.sql.types import IntegerType, FloatType

In [2]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, NaiveBayes
from pyspark.mllib.evaluation import MulticlassMetrics

from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

from pyspark.ml.pipeline import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [3]:
from CurveMetrics import CurveMetrics

In [4]:
# Create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

In [5]:
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [6]:
# Read in the cleaned Sparkify dataset
# Raw full dataset:
# event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
# Raw mini dataset:
# event_data = "mini_sparkify_event_data.json"
event_data = "clean_data/part-00000-e576dd1f-cbc4-437f-abcd-f83e8d8519fa-c000.csv"
# Without inferSchema, every CSV column is read as a string
df = spark.read.csv(event_data, header=True)

In [7]:
features = ['n_pages', 
            'thumbs_down', 
            'home', 
            'downgrade', 
            'roll_advert', 
            'about', 
            'add_playlist', 
            'nextsong', 
            'thumbs_up', 
            'error', 
            'submit_upgrade', 
            'total_length']

In [8]:
# CSV columns are read as strings, so cast the features and the label to integers
for feature in features:
    df = df.withColumn(feature, df[feature].cast(IntegerType()))

df = df.withColumn('label', df['label'].cast(IntegerType()))

In [9]:
df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- n_pages: integer (nullable = true)
 |-- thumbs_down: integer (nullable = true)
 |-- home: integer (nullable = true)
 |-- downgrade: integer (nullable = true)
 |-- roll_advert: integer (nullable = true)
 |-- about: integer (nullable = true)
 |-- add_playlist: integer (nullable = true)
 |-- nextsong: integer (nullable = true)
 |-- thumbs_up: integer (nullable = true)
 |-- error: integer (nullable = true)
 |-- submit_upgrade: integer (nullable = true)
 |-- total_length: integer (nullable = true)
 |-- label: integer (nullable = true)



In [10]:
# Split data into training and test data
train, test = df.randomSplit([0.9, 0.1], seed=42)
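
`randomSplit` does not cut the data at an exact 90/10 boundary: each row is assigned to a split by a seeded random draw, so split sizes are only approximately proportional to the weights. The stdlib sketch below illustrates that per-row sampling idea. `random_split` is a hypothetical helper; Spark samples within each partition, so it will produce different assignments for the same seed.

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar('T')


def random_split(rows: Sequence[T], weights: Sequence[float], seed: int) -> List[List[T]]:
    """Hypothetical helper: assign each row to a split using one uniform draw."""
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.9, 1.0] for weights [0.9, 0.1]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits: List[List[T]] = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            if u < b:
                splits[i].append(row)
                break
        else:
            # Guard against floating-point round-off in the last bound
            splits[-1].append(row)
    return splits


train_rows, test_rows = random_split(list(range(10_000)), [0.9, 0.1], seed=42)
print(len(train_rows), len(test_rows))  # close to, but usually not exactly, 9000 and 1000
```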

In [11]:
# Pipeline
assembler = VectorAssembler(inputCols=features, outputCol='features')
lr = LogisticRegression()
rf = RandomForestClassifier()
nb = NaiveBayes()


lrPipeline = Pipeline(stages=[assembler, lr])
rfPipeline = Pipeline(stages=[assembler, rf])
nbPipeline = Pipeline(stages=[assembler, nb])

In [12]:
# Model tuning
lrParamGrid = ParamGridBuilder()\
            .addGrid(lr.regParam, [0.0, 0.1, 1.0])\
            .addGrid(lr.maxIter, [100])\
            .build()

rfParamGrid = ParamGridBuilder()\
            .addGrid(rf.numTrees, [3, 5, 10])\
            .addGrid(rf.maxDepth, [2, 5, 10, 20])\
            .build()

nbParamGrid = ParamGridBuilder()\
            .addGrid(nb.modelType, ['multinomial', 'gaussian'])\
            .build()

lrCrossval = CrossValidator(estimator = lrPipeline,
                         estimatorParamMaps = lrParamGrid,
                         evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC'),
                         numFolds = 3)

rfCrossval = CrossValidator(estimator = rfPipeline,
                         estimatorParamMaps = rfParamGrid,
                         evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC'),
                         numFolds = 3)

nbCrossval = CrossValidator(estimator = nbPipeline,
                         estimatorParamMaps = nbParamGrid,
                         evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC'),
                         numFolds = 3)
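
For each parameter combination in the grid, `CrossValidator` trains on k-1 folds, scores the held-out fold with the evaluator, and averages the k scores; these averages are what `avgMetrics` reports, and the combination with the best average is refitted on the full training set as `bestModel`. The stdlib sketch below reproduces that loop for a hypothetical one-parameter threshold classifier scored by accuracy. `fit_threshold`, `accuracy`, `cross_validate`, and the toy data are illustrations, not PySpark APIs, and PySpark assigns rows to folds randomly rather than round-robin.

```python
from statistics import mean
from typing import Dict, List, Sequence, Tuple

Example = Tuple[float, int]  # (feature value, label)


def fit_threshold(training: Sequence[Example], margin: float) -> float:
    """Hypothetical model: the threshold is the mean feature value plus `margin`."""
    return mean(x for x, _ in training) + margin


def accuracy(threshold: float, examples: Sequence[Example]) -> float:
    """Fraction of examples where (x > threshold) matches the label."""
    return mean(1.0 if (x > threshold) == bool(y) else 0.0 for x, y in examples)


def cross_validate(data: Sequence[Example], param_grid: List[Dict[str, float]], k: int = 3):
    # Round-robin fold assignment keeps the sketch deterministic
    folds = [list(data[i::k]) for i in range(k)]
    avg_metrics = []
    for params in param_grid:
        fold_scores = []
        for i in range(k):
            training = [ex for j, fold in enumerate(folds) if j != i for ex in fold]
            model = fit_threshold(training, **params)
            fold_scores.append(accuracy(model, folds[i]))  # score the held-out fold
        avg_metrics.append(mean(fold_scores))
    # Larger is better, as with areaUnderROC
    best = max(range(len(param_grid)), key=lambda i: avg_metrics[i])
    # Refit the winning parameters on all of the training data
    best_model = fit_threshold(data, **param_grid[best])
    return avg_metrics, param_grid[best], best_model


data = [(float(x), int(x >= 6)) for x in range(1, 10)]
grid = [{'margin': -3.0}, {'margin': 0.0}, {'margin': 3.0}]
avg_metrics, best_params, best_model = cross_validate(data, grid)
print(avg_metrics, best_params, best_model)
```

The cost grows with the grid: the random forest grid in this notebook has 3 × 4 = 12 combinations, so with 3 folds it trains 36 models plus the final refit.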

In [13]:
lrCVModel = lrCrossval.fit(train)

                                                                                

In [14]:
rfCVModel = rfCrossval.fit(train)


In [15]:
nbCVModel = nbCrossval.fit(train)

In [25]:
# avgMetrics holds the mean cross-validated AUC of each parameter combination, in grid order.
# stages[1] is the fitted classifier in each best pipeline; its hyperparameters are read
# from the underlying Java object.
lr_dict = {'avgMetrics': lrCVModel.avgMetrics,
           'maxIter': lrCVModel.bestModel.stages[1]._java_obj.getMaxIter(),
           'regParam': lrCVModel.bestModel.stages[1]._java_obj.getRegParam()}

rf_dict = {'avgMetrics': rfCVModel.avgMetrics,
           'numTrees': rfCVModel.bestModel.stages[1]._java_obj.getNumTrees(),
           'maxDepth': rfCVModel.bestModel.stages[1]._java_obj.getMaxDepth()}

nb_dict = {'avgMetrics': nbCVModel.avgMetrics,
           'modelType': nbCVModel.bestModel.stages[1]._java_obj.getModelType()}


print('Logistic Regression Results:')
print('-----------------------------------------------------')
print(f'AUC = {lr_dict["avgMetrics"]}')
print('\nBest model parameters:')
print(f'maxIter = {lr_dict["maxIter"]}')
print(f'regParam = {lr_dict["regParam"]}')

print('\n\nRandom Forest Results:')
print('-----------------------------------------------------')
print(f'AUC = {rf_dict["avgMetrics"]}')
print('\nBest model parameters:')
print(f'numTrees = {rf_dict["numTrees"]}')
print(f'maxDepth = {rf_dict["maxDepth"]}')

print('\n\nNaive Bayes Results:')
print('-----------------------------------------------------')
print(f'AUC = {nb_dict["avgMetrics"]}')
print('\nBest model parameters:')
print(f'modelType = {nb_dict["modelType"]}')

Logistic Regression Results:
-----------------------------------------------------
AUC = [0.8802065948551241, 0.8683154764361208, 0.8388433362530979]

Best model parameters:
maxIter = 100
regParam = 0.0


Random Forest Results:
-----------------------------------------------------
AUC = [0.8296629598306268, 0.9082539268220223, 0.9279110497721451, 0.8891194375043273, 0.8284132315305666, 0.9241678996471556, 0.9334208356537799, 0.9059189986523511, 0.874269895030889, 0.9221956445575673, 0.9370659010059413, 0.923195084657712]

Best model parameters:
numTrees = 10
maxDepth = 10


Naive Bayes Results:
-----------------------------------------------------
AUC = [0.21472747290453742, 0.6384957955855403]

Best model parameters:
modelType = gaussian
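
`avgMetrics` follows the order of the parameter maps returned by `ParamGridBuilder.build()`. Assuming PySpark's behaviour of taking the Cartesian product of the grids in the order they were added (so the last-added parameter varies fastest), the stdlib sketch below pairs each random forest AUC printed above with its `(numTrees, maxDepth)` combination and checks that the maximum corresponds to the reported best model.

```python
from itertools import product

# Values copied from the random forest cross-validation output above
rf_auc = [0.8296629598306268, 0.9082539268220223, 0.9279110497721451, 0.8891194375043273,
          0.8284132315305666, 0.9241678996471556, 0.9334208356537799, 0.9059189986523511,
          0.874269895030889, 0.9221956445575673, 0.9370659010059413, 0.923195084657712]

# Grid order: numTrees was added first (outer loop), maxDepth second (inner loop)
grid = list(product([3, 5, 10], [2, 5, 10, 20]))

for (num_trees, max_depth), auc in zip(grid, rf_auc):
    print(f'numTrees={num_trees:>2}  maxDepth={max_depth:>2}  AUC={auc:.4f}')

best_auc, best_params = max(zip(rf_auc, grid))
print('best:', best_params, round(best_auc, 4))  # best: (10, 10) 0.9371
```

In this run, `maxDepth = 10` gives the highest AUC for every `numTrees` value, and the deeper `maxDepth = 20` trees score lower.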


In [17]:
# Logistic regression
lrModel = lrCVModel.bestModel

# Random Forest
rfModel = rfCVModel.bestModel

# Naive Bayes
nbModel = nbCVModel.bestModel

In [18]:
# Prediction
pred_lr = lrModel.transform(test)
pred_rf = rfModel.transform(test)
pred_nb = nbModel.transform(test)

In [19]:
# Confusion Matrix
def get_confusion_matrix(pred):
    """Return a MulticlassMetrics object for the prediction and label columns of `pred`."""
    # MulticlassMetrics expects (prediction, label) pairs of floats,
    # so cast the integer label column to a floating-point type first
    preds_and_labels = pred.select(['prediction', 'label'])\
                        .withColumn('label', col('label').cast(FloatType()))

    metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

    return metrics

def get_metrics(metrics):
    """Return precision, recall and F1-score for the positive class (label 1)."""
    # Rows are actual labels, columns are predictions: [[TN, FP], [FN, TP]]
    tn, fp, fn, tp = metrics.confusionMatrix().toArray().ravel().astype('int')
    p = tp / (tp + fp)
    r = tp / (tp + fn)
    f1 = 2 / (1 / p + 1 / r)  # harmonic mean of precision and recall
    return p, r, f1

In [20]:
metrics_lr = get_confusion_matrix(pred_lr)
metrics_rf = get_confusion_matrix(pred_rf)
metrics_nb = get_confusion_matrix(pred_nb)

                                                                                

In [21]:
print('Logistic Regression')
print(metrics_lr.confusionMatrix().toArray().astype('int'))

print('\n Random Forest')
print(metrics_rf.confusionMatrix().toArray().astype('int'))

print('\n Naive Bayes')
print(metrics_nb.confusionMatrix().toArray().astype('int'))

Logistic Regression
[[1648   48]
 [ 251  274]]

 Random Forest
[[1681   15]
 [ 231  294]]

 Naive Bayes
[[1532  164]
 [ 314  211]]
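
As a check on `get_metrics`, the random forest matrix above can be worked through by hand. With labels 0/1, `MulticlassMetrics.confusionMatrix()` puts actual labels in rows and predicted labels in columns, both in ascending order, so the matrix reads `[[TN, FP], [FN, TP]]`. The plain-Python sketch below recomputes the random forest row of the results table (accuracy is added only for comparison); `MulticlassMetrics` also offers `precision(label)`, `recall(label)`, and `fMeasure(label)` for the per-class values.

```python
# Random forest confusion matrix from the output above: rows = actual, columns = predicted
(tn, fp), (fn, tp) = [[1681, 15], [231, 294]]

precision = tp / (tp + fp)                          # 294 / 309
recall = tp / (tp + fn)                             # 294 / 525
f1 = 2 * precision * recall / (precision + recall)  # harmonic mean of precision and recall
accuracy = (tp + tn) / (tn + fp + fn + tp)          # 1975 / 2221

print(f'precision={precision:.6f} recall={recall:.6f} f1={f1:.6f} accuracy={accuracy:.4f}')
# precision=0.951456 recall=0.560000 f1=0.705036 accuracy=0.8892
```

Because 1696 of the 2221 test rows are non-churners, always predicting "no churn" would already reach about 0.76 accuracy, which is why precision, recall, and F1-score for the churn class are more informative here.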


In [22]:
results = pd.DataFrame(np.array([get_metrics(metrics_lr), 
                                 get_metrics(metrics_rf),
                                 get_metrics(metrics_nb)]),
                       columns=['precision', 'recall', 'f1-score'])

In [23]:
results['model'] = ['logistic regression', 'random forest', 'naive bayes']

In [24]:
results

   precision    recall  f1-score                model
0   0.850932  0.521905  0.646989  logistic regression
1   0.951456  0.560000  0.705036        random forest
2   0.562667  0.401905  0.468889          naive bayes


In [27]:
# Save the best (random forest) pipeline model; overwrite() lets the cell be re-run
rfModel.write().overwrite().save('model/')
