In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# WARNING: this will allocate 12 gigabytes of RAM to the spark driver, it was necessary as out of memory exceptions would
#  occur during cross validation model training with parallel processes
# StackOverflow post: https://stackoverflow.com/questions/32336915/pyspark-java-lang-outofmemoryerror-java-heap-space/60267878#60267878
spark = SparkSession.builder.master("local[*]").config("spark.driver.memory", "12g").getOrCreate()

In [2]:
# !pip install numpy

### Reading and analyzing the data

#### The 'Kaggle Telco Customer Churn' data is used: [link](https://www.kaggle.com/blastchar/telco-customer-churn)

In [3]:
from pyspark.sql.functions import col
from pyspark.sql.types import *

churn_schema = StructType([
    StructField("customerID", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("SeniorCitizen", IntegerType(), True),
    StructField("Partner", StringType(), True),
    StructField("Dependents", StringType(), True),
    StructField("tenure", IntegerType(), True),
    StructField("PhoneService", StringType(), True),
    StructField("MultipleLines", StringType(), True),
    StructField("InternetService", StringType(), True),
    StructField("OnlineSecurity", StringType(), True),
    StructField("OnlineBackup", StringType(), True),
    StructField("DeviceProtection", StringType(), True),
    StructField("TechSupport", StringType(), True),
    StructField("StreamingTV", StringType(), True),
    StructField("StreamingMovies", StringType(), True),
    StructField("Contract", StringType(), True),
    StructField("PaperlessBilling", StringType(), True),
    StructField("PaymentMethod", StringType(), True),
    StructField("MonthlyCharges", FloatType(), True),
    StructField("TotalCharges", FloatType(), True),
    StructField("Churn", StringType(), True)
])

In [4]:
churn_df = spark.read.csv("WA_Fn-UseC_-Telco-Customer-Churn.xls", header=True, schema=churn_schema)

In [5]:
churn_df.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: float (nullable = true)
 |-- TotalCharges: float (nullable = true)
 |-- Churn: string (nullable = true)



In [6]:
churn_df.show()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|  

In [7]:
# Counting number of missing (nan) values in each column, used this blogpost as a reference:
#  https://www.datasciencemadesimple.com/count-of-missing-nanna-and-null-values-in-pyspark/

churn_df.select([F.count(F.when(F.isnan(col), col)).alias(col) for col in churn_df.columns]).show()

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|         0|     0|            0|      0|         0|     0|           0|            0|              0|             0|           0|               0|          0|          0|              0|       0|               0| 

In [8]:
# Counting number of null values in each column, used this blogpost as a reference:
#  https://www.datasciencemadesimple.com/count-of-missing-nanna-and-null-values-in-pyspark/

churn_df.select([F.count(F.when(F.isnull(col), col)).alias(col) for col in churn_df.columns]).show()

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|         0|     0|            0|      0|         0|     0|           0|            0|              0|             0|           0|               0|          0|          0|              0|       0|               0| 

In [9]:
# Checking number of duplicate rows
#  Used this stackoverflow answer: https://stackoverflow.com/a/48554666
churn_df.groupBy(churn_df.columns) \
    .count() \
    .where(F.col('count') > 1) \
    .select(F.sum('count')) \
    .show()

+----------+
|sum(count)|
+----------+
|      null|
+----------+



### Imputing null values with mean value 

In [10]:
churn_df = churn_df.na.fill(
    {
        'MonthlyCharges': churn_df.agg({"MonthlyCharges": "avg"}).collect()[0][0],
        'TotalCharges': churn_df.agg({"TotalCharges": "avg"}).collect()[0][0]
    }
)

### Data preparation

In [11]:
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder

In [12]:
# Change the label column to label
churn_df = churn_df.withColumnRenamed('Churn','label')

In [13]:
# List of columns need to be indexed and featurized
col_list = [
    "gender",
    "SeniorCitizen", 
    "Partner",
    "Dependents",
    "PhoneService", 
    "MultipleLines",
    "InternetService", 
    "OnlineSecurity",
    "OnlineBackup",
    "DeviceProtection",
    "TechSupport", 
    "StreamingTV",
    "StreamingMovies",
    "Contract",
    "PaperlessBilling", 
    "PaymentMethod"
]

numerical_cols = ["tenure", "MonthlyCharges", "TotalCharges"]

featurized_col_list = col_list + numerical_cols

In [14]:
# List of features and label indexers 
indexers = [
    StringIndexer(inputCol=c, outputCol=f'{c}_indexed')
    for c in col_list
]

label_indexer = StringIndexer(inputCol="label", outputCol="labelIndex")
indexers.append(label_indexer)

In [15]:
# One hot encode the categorical columns
encoder = OneHotEncoder(
    inputCols = [f'{c}_indexed' for c in col_list], 
    outputCols=[f'{c}_vector' for c in col_list],
    dropLast=True
)

In [16]:
# Vectorizing the features
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

vector_assembler = VectorAssembler(inputCols=numerical_cols + [f'{c}_vector' for c in col_list], outputCol="features")

In [17]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=indexers + [encoder, vector_assembler])
pipeline_model = pipeline.fit(churn_df)

transformed_df = pipeline_model.transform(churn_df)

In [18]:
cols_drop = [f'{c}_indexed' for c in col_list] + [f'{c}_vector' for c in col_list] + [f'{c}' for c in featurized_col_list] + ['customerID']
transformed_df = transformed_df.drop(*cols_drop)

In [19]:
transformed_df.show(truncate=False)

+-----+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|labelIndex|features                                                                                                                                                    |
+-----+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------+
|No   |0.0       |(30,[0,1,2,4,6,11,12,15,16,18,20,22,24,26,27],[1.0,29.850000381469727,29.850000381469727,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])                  |
|No   |0.0       |(30,[0,1,2,3,4,5,6,7,8,11,13,14,17,18,20,22,28],[34.0,56.95000076293945,1889.5,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])                    |
|Yes  |1.0       |[2.0,53.849998474121094,108.1500015258789,1.0,1.0,1.0,1.0,1.0,1.0,0.0,0.0,1.0,0.0,1.0,0.0,1.0,1.0,0.0,

### Data scaling and dimensionality reduction

In [20]:
from pyspark.ml.feature import MinMaxScaler, PCA

In [21]:
# Features Scaling
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(transformed_df)

scaled_df = scalerModel.transform(transformed_df)

In [22]:
# Principal Component Analysis Dimensionality reduction
#  training using removed new pca feature columns were not tested

pca = PCA(k=7, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(scaled_df)
reduced_df = model.transform(scaled_df)

scaler = MinMaxScaler(inputCol="pcaFeatures", outputCol="scaledPcaFeatures")
scalerModel = scaler.fit(reduced_df)

scaled_pca_df = scalerModel.transform(reduced_df)


pca = PCA(k=7, inputCol="scaledFeatures", outputCol="pcaScaledFeatures")
model = pca.fit(scaled_pca_df)
pca_scaled_df = model.transform(scaled_pca_df)

In [23]:
pca_scaled_df.printSchema()

root
 |-- label: string (nullable = true)
 |-- labelIndex: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)
 |-- pcaFeatures: vector (nullable = true)
 |-- scaledPcaFeatures: vector (nullable = true)
 |-- pcaScaledFeatures: vector (nullable = true)



In [24]:
pca_scaled_df.show(truncate=False)

+-----+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------+
|label|labelIndex|features                                                                                                                                                    |scaledFeatures                                 

### Splitting data to train/test splits

In [25]:
(train_data, test_data) = pca_scaled_df.randomSplit(weights=[0.7, 0.3], seed=420)

In [26]:
train_data.describe().show()

+-------+-----+-------------------+
|summary|label|         labelIndex|
+-------+-----+-------------------+
|  count| 4840|               4840|
|   mean| null|0.26776859504132233|
| stddev| null| 0.4428420632218157|
|    min|   No|                0.0|
|    max|  Yes|                1.0|
+-------+-----+-------------------+



In [27]:
test_data.describe().show()

+-------+-----+-------------------+
|summary|label|         labelIndex|
+-------+-----+-------------------+
|  count| 2203|               2203|
|   mean| null|0.26009986382206085|
| stddev| null|0.43878847015331973|
|    min|   No|                0.0|
|    max|  Yes|                1.0|
+-------+-----+-------------------+



### Classifier model training (Grid Search with Cross Validation)

### Model Evaluation 

#### Link to MulticlassClassificationEvaluator list of metric names:
[link](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.MulticlassClassificationEvaluator.metricName)

In [28]:
# Test set evaluation code

from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

def evaluate_model(predictions):
    # 1- Confusion Matrix calculation
    #  taken from this stackoverflow post: https://stackoverflow.com/a/58405759/10086080
    # select only prediction and label columns
    preds_and_labels = predictions.select(['prediction', 'labelIndex'])
    metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
    print("Confusion Matrix:\n{}".format(metrics.confusionMatrix().toArray()))
    
    # 2- Accuracy metric calculation
    evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    print("Accuracy = {} ".format(accuracy))
    
    # 3- Weighted Precision metric calculation
    evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="weightedPrecision")
    precision = evaluator.evaluate(predictions)
    print("Weighted Precision = {}".format(precision))
    
    
    # 4- Weighted Recall metric calculation
    evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="weightedRecall")
    recall = evaluator.evaluate(predictions)
    print("Weighted Recall = {}".format(recall))


    # 5- F1-score metric calculation
    evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="f1")
    f1 = evaluator.evaluate(predictions)
    print("F1-score = {}".format(f1))


    # 6- Receiver Operating Characteristic Area Under Curve metric calculation
    evaluator = BinaryClassificationEvaluator(labelCol="labelIndex", rawPredictionCol="prediction", metricName="areaUnderROC")
    roc_auc = evaluator.evaluate(predictions)
    print("ROC AUC: {}".format(roc_auc))

### 1- Decision Tree Classifier Training

In [29]:
from pyspark.ml.classification import DecisionTreeClassifier

model = DecisionTreeClassifier(
    labelCol="labelIndex",
    featuresCol="features",
    maxDepth=5,
    maxBins=32
)

In [30]:
# Cross Validator takes a pipeline object containing the model not the model object itself
pipeline = Pipeline(stages=[model])

In [31]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

param_grid = ParamGridBuilder() \
    .baseOn({model.labelCol: 'labelIndex'}) \
    .baseOn([model.predictionCol, 'prediction']) \
    .addGrid(model.maxDepth, [i for i in range(2, 10)]) \
    .addGrid(model.maxBins, [i for i in range(2, 60, 10)]) \
    .build()

In [32]:
len(param_grid)

48

In [33]:
evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="accuracy")

In [34]:
import os

cross_validator = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=10,
    parallelism=os.cpu_count() # To parallelize over all available cpu cores
)

In [35]:
# Run cross validations
cv_model = cross_validator.fit(train_data)
print(cv_model)

CrossValidatorModel_a99d4f03ff2e


In [36]:
len(cv_model.avgMetrics)

48

In [37]:
cv_model.avgMetrics

[0.7631323578478708,
 0.7631323578478708,
 0.7631323578478708,
 0.7631323578478708,
 0.7631323578478708,
 0.7631323578478708,
 0.7890916886440869,
 0.790927449678486,
 0.7899860830608412,
 0.7904740425502396,
 0.7897135592090105,
 0.7874963536791563,
 0.788878894851424,
 0.790475786710366,
 0.7915873858485531,
 0.7912700745341136,
 0.7900719544313968,
 0.7880735672822865,
 0.7805327111195519,
 0.7911871340863054,
 0.7882388603526967,
 0.7912184278192747,
 0.7883122347787409,
 0.789116739391144,
 0.7894785559910105,
 0.7950869172899346,
 0.7894557157951028,
 0.7892809758226214,
 0.7884561122973837,
 0.7863986685792125,
 0.7861619973145512,
 0.7901557104389866,
 0.785867492243902,
 0.7809497914142868,
 0.7775332511248546,
 0.7834666823455486,
 0.7747396994767599,
 0.7742977205691088,
 0.778281053792023,
 0.7723838075031573,
 0.7719209841755614,
 0.7739901569223981,
 0.7617379047647231,
 0.7718613921641696,
 0.774168519417617,
 0.765606389302151,
 0.7596434675629811,
 0.7707353524744978]

In [38]:
# Use test set here so we can measure the accuracy of our model on new data
predictions = cv_model.transform(test_data)

In [39]:
evaluate_model(predictions)

Confusion Matrix:
[[1397.  233.]
 [ 251.  322.]]
Accuracy = 0.7802995914661824 
Weighted Precision = 0.778113821197282
Weighted Recall = 0.7802995914661826
F1-score = 0.7791498669690166
ROC AUC: 0.7095049197528881


### Extacting best decision tree model from cross validator object

In [40]:
best_model = cv_model.bestModel

In [41]:
best_model.stages

[DecisionTreeClassificationModel: uid=DecisionTreeClassifier_55cc51418a95, depth=6, numNodes=59, numClasses=2, numFeatures=30]

In [42]:
best_model = best_model.stages[0]

In [43]:
print(best_model.toDebugString)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_55cc51418a95, depth=6, numNodes=59, numClasses=2, numFeatures=30
  If (feature 24 in {0.0})
   If (feature 10 in {0.0})
    If (feature 12 in {0.0})
     Predict: 0.0
    Else (feature 12 not in {0.0})
     If (feature 25 in {1.0})
      If (feature 27 in {0.0})
       Predict: 0.0
      Else (feature 27 not in {0.0})
       If (feature 6 in {0.0})
        Predict: 0.0
       Else (feature 6 not in {0.0})
        Predict: 1.0
     Else (feature 25 not in {1.0})
      Predict: 0.0
   Else (feature 10 not in {0.0})
    If (feature 25 in {1.0})
     Predict: 0.0
    Else (feature 25 not in {1.0})
     If (feature 27 in {1.0})
      If (feature 1 <= 104.375)
       Predict: 0.0
      Else (feature 1 > 104.375)
       If (feature 6 in {0.0})
        Predict: 0.0
       Else (feature 6 not in {0.0})
        Predict: 1.0
     Else (feature 27 not in {1.0})
      Predict: 0.0
  Else (feature 24 not in {0.0})
   If (feature 10 in {0.0}

In [44]:
best_model.extractParamMap()

{Param(parent='DecisionTreeClassifier_55cc51418a95', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='DecisionTreeClassifier_55cc51418a95', name='labelCol', doc='label column name.'): 'labelIndex',
 Param(parent='DecisionTreeClassifier_55cc51418a95', name='predictionCol', doc='prediction column name.'): 'prediction',
 Param(parent='DecisionTreeClassifier_55cc51418a95', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities.'): 'probability',
 Param(parent='DecisionTreeClassifier_55cc51418a95', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name.'): 'rawPrediction',
 Param(parent='DecisionTreeClassifier_55cc51418a95', name='seed', doc='random seed.'): -8261018027168668045,
 Param(parent='DecisionTreeClassifier_55cc51418a95', name='cacheNodeIds', doc='If false

In [45]:
best_model.save("decision-tree-classifier-model")

### 2- Random Forest Classifier Training

In [46]:
from pyspark.ml.classification import RandomForestClassifier

model = RandomForestClassifier(
    labelCol="labelIndex",
    featuresCol="features",
    maxDepth=5,
    maxBins=32,
    impurity='gini',
    numTrees=20
)


In [47]:
# Cross Validator takes a pipeline object containing the model not the model object itself
pipeline = Pipeline(stages=[model])

In [48]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

param_grid = ParamGridBuilder() \
    .baseOn({model.labelCol: 'labelIndex'}) \
    .baseOn([model.predictionCol, 'prediction']) \
    .addGrid(model.maxDepth, [i for i in range(2, 10)]) \
    .addGrid(model.numTrees, [i for i in range(20, 1000, 100)]) \
    .build()

In [49]:
len(param_grid)

80

In [50]:
evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="accuracy")

In [51]:
import os

cross_validator = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=10,
    parallelism=os.cpu_count() # To parallelize over all available cpu cores
)

In [52]:
# Run cross validations
cv_model = cross_validator.fit(train_data)
print(cv_model)

CrossValidatorModel_b732c016b3d8


In [53]:
len(cv_model.avgMetrics)

80

In [54]:
cv_model.avgMetrics

[0.7393581105782508,
 0.7323980218317236,
 0.7326033606407585,
 0.7326033606407585,
 0.7323980218317236,
 0.7334111090883476,
 0.7334247158768982,
 0.7334247158768982,
 0.7336164478973825,
 0.7332193770678632,
 0.7868829917127993,
 0.7837931334137804,
 0.7852265648058037,
 0.7848100476775836,
 0.7858406804586149,
 0.784175034985595,
 0.7847596031178575,
 0.7843735216151526,
 0.7860012957905593,
 0.7845759095898795,
 0.7904795802375831,
 0.7869689470266863,
 0.7884496228330716,
 0.7878028788983541,
 0.7883933902876037,
 0.7870285416981261,
 0.7877897033665494,
 0.7886416711492491,
 0.7882392427029972,
 0.7877612867077027,
 0.7930302948240087,
 0.7917568994643117,
 0.791232199954169,
 0.7916503585228633,
 0.7914567114358203,
 0.7908767147293307,
 0.7920982820921977,
 0.7912379623514117,
 0.7924543312170587,
 0.790842688708978,
 0.7939031692435685,
 0.7977998971035519,
 0.797193502684513,
 0.7986287462107655,
 0.795091533901224,
 0.7957642625031853,
 0.7964410792987153,
 0.796509927878708

In [55]:
# Use test set here so we can measure the accuracy of our model on new data
predictions = cv_model.transform(test_data)

In [56]:
evaluate_model(predictions)

Confusion Matrix:
[[1482.  148.]
 [ 289.  284.]]
Accuracy = 0.8016341352700862 
Weighted Precision = 0.7901513748925381
Weighted Recall = 0.8016341352700862
F1-score = 0.7918308830400727
ROC AUC: 0.7024197261212646


### Extacting best random forest model from cross validator object

In [57]:
best_model = cv_model.bestModel

In [58]:
best_model.stages

[RandomForestClassificationModel: uid=RandomForestClassifier_158fd6691144, numTrees=920, numClasses=2, numFeatures=30]

In [59]:
best_model = best_model.stages[0]

In [60]:
best_model.extractParamMap()

{Param(parent='RandomForestClassifier_158fd6691144', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='RandomForestClassifier_158fd6691144', name='labelCol', doc='label column name.'): 'labelIndex',
 Param(parent='RandomForestClassifier_158fd6691144', name='predictionCol', doc='prediction column name.'): 'prediction',
 Param(parent='RandomForestClassifier_158fd6691144', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities.'): 'probability',
 Param(parent='RandomForestClassifier_158fd6691144', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name.'): 'rawPrediction',
 Param(parent='RandomForestClassifier_158fd6691144', name='seed', doc='random seed.'): -5301201987066789725,
 Param(parent='RandomForestClassifier_158fd6691144', name='bootstrap', doc='Whether boo

In [61]:
best_model.save("random-forest-classifier-model")

### 3- Gradient-Boosted Tree Classifier Training

In [62]:
from pyspark.ml.classification import GBTClassifier

model = GBTClassifier(
    labelCol="labelIndex",
    featuresCol="features",
    maxDepth=5,
    maxBins=32,
    maxIter=20
)

In [63]:
# Cross Validator takes a pipeline object containing the model not the model object itself
pipeline = Pipeline(stages=[model])

In [64]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

param_grid = ParamGridBuilder() \
    .baseOn({model.labelCol: 'labelIndex'}) \
    .baseOn([model.predictionCol, 'prediction']) \
    .addGrid(model.maxDepth, [i for i in range(2, 10)]) \
    .addGrid(model.maxBins, [i for i in range(2, 60, 10)]) \
    .addGrid(model.maxIter, [i for i in range(20, 50, 10)]) \
    .build()

In [65]:
len(param_grid)

144

In [66]:
evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="accuracy")

In [67]:
import os

cross_validator = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=10,
    parallelism=os.cpu_count() # To parallelize over all available cpu cores
)

In [68]:
# Run cross validations
cv_model = cross_validator.fit(train_data)
print(cv_model)

CrossValidatorModel_065adbba224a


In [69]:
len(cv_model.avgMetrics)

144

In [70]:
cv_model.avgMetrics

[0.7988420159141156,
 0.8005716109277525,
 0.8001880067470416,
 0.8000464981893077,
 0.8029523280617551,
 0.8050606315586472,
 0.8007941270083256,
 0.8051499184946485,
 0.8036430978507708,
 0.8014542013491762,
 0.8037571834671158,
 0.8038470676131837,
 0.8015720570487157,
 0.8031623743684657,
 0.801936097043514,
 0.7986630901556108,
 0.8009184278516945,
 0.8017582938020835,
 0.7976047878931172,
 0.7993776953024826,
 0.8026522378641404,
 0.8024921092357834,
 0.804794218076331,
 0.8063965419148367,
 0.8027648355787715,
 0.804211257695062,
 0.8058676269261092,
 0.802229056721442,
 0.8030435764795005,
 0.803091183431123,
 0.803148901862717,
 0.8060585728740209,
 0.806508313785862,
 0.8033924879539822,
 0.8037958634834178,
 0.8023473707224292,
 0.7990206814248202,
 0.8005458730351216,
 0.7989272577718053,
 0.8050371394854717,
 0.8046692127246012,
 0.8030345010949302,
 0.8028766161246249,
 0.8034857839571689,
 0.8023975857677861,
 0.8021279305636848,
 0.801440738985924,
 0.8021932042280976,


In [71]:
# Use test set here so we can measure the accuracy of our model on new data
predictions = cv_model.transform(test_data)

In [72]:
evaluate_model(predictions)

Confusion Matrix:
[[1481.  149.]
 [ 287.  286.]]
Accuracy = 0.802088061733999 
Weighted Precision = 0.7908000990436523
Weighted Recall = 0.8020880617339992
F1-score = 0.7925593305068748
ROC AUC: 0.7038581783530873


### Extacting best gradient-boosted tree model from cross validator object

In [73]:
best_model = cv_model.bestModel

In [74]:
best_model.stages

[GBTClassificationModel: uid = GBTClassifier_12584360e5bb, numTrees=40, numClasses=2, numFeatures=30]

In [75]:
best_model = best_model.stages[0]

In [76]:
print(best_model.toDebugString)

GBTClassificationModel: uid = GBTClassifier_12584360e5bb, numTrees=40, numClasses=2, numFeatures=30
  Tree 0 (weight 1.0):
    If (feature 24 in {0.0})
     If (feature 10 in {0.0})
      If (feature 12 in {0.0})
       Predict: -0.9600638977635783
      Else (feature 12 not in {0.0})
       Predict: -0.8074074074074075
     Else (feature 10 not in {0.0})
      If (feature 25 in {1.0})
       Predict: -0.8733333333333333
      Else (feature 25 not in {1.0})
       Predict: -0.5885714285714285
    Else (feature 24 not in {0.0})
     If (feature 10 in {0.0})
      If (feature 0 <= 3.5)
       Predict: -0.11594202898550725
      Else (feature 0 > 3.5)
       Predict: -0.6020942408376964
     Else (feature 10 not in {0.0})
      If (feature 0 <= 14.5)
       Predict: 0.41007194244604317
      Else (feature 0 > 14.5)
       Predict: -0.17232704402515722
  Tree 1 (weight 0.1):
    If (feature 12 in {0.0})
     If (feature 27 in {0.0})
      If (feature 2 <= 24.425000190734863)
       Predict

In [77]:
best_model.extractParamMap()

{Param(parent='GBTClassifier_12584360e5bb', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='GBTClassifier_12584360e5bb', name='labelCol', doc='label column name.'): 'labelIndex',
 Param(parent='GBTClassifier_12584360e5bb', name='predictionCol', doc='prediction column name.'): 'prediction',
 Param(parent='GBTClassifier_12584360e5bb', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities.'): 'probability',
 Param(parent='GBTClassifier_12584360e5bb', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name.'): 'rawPrediction',
 Param(parent='GBTClassifier_12584360e5bb', name='seed', doc='random seed.'): 2277498301948407352,
 Param(parent='GBTClassifier_12584360e5bb', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances 

In [78]:
best_model.save("gbt-classifier-model")

In [79]:
!zip -r decision-tree-classifier-model.zip decision-tree-classifier-model/

  adding: decision-tree-classifier-model/ (stored 0%)
  adding: decision-tree-classifier-model/data/ (stored 0%)
  adding: decision-tree-classifier-model/data/.part-00004-ee1facae-82e3-499c-a61f-e8af73df05eb-c000.snappy.parquet.crc (stored 0%)
  adding: decision-tree-classifier-model/data/.part-00006-ee1facae-82e3-499c-a61f-e8af73df05eb-c000.snappy.parquet.crc (stored 0%)
  adding: decision-tree-classifier-model/data/part-00010-ee1facae-82e3-499c-a61f-e8af73df05eb-c000.snappy.parquet (deflated 66%)
  adding: decision-tree-classifier-model/data/part-00012-ee1facae-82e3-499c-a61f-e8af73df05eb-c000.snappy.parquet (deflated 69%)
  adding: decision-tree-classifier-model/data/.part-00011-ee1facae-82e3-499c-a61f-e8af73df05eb-c000.snappy.parquet.crc (stored 0%)
  adding: decision-tree-classifier-model/data/part-00004-ee1facae-82e3-499c-a61f-e8af73df05eb-c000.snappy.parquet (deflated 65%)
  adding: decision-tree-classifier-model/data/_SUCCESS (stored 0%)
  adding: decision-tree-classifier-model

In [80]:
!zip -r gbt-classifier-model.zip gbt-classifier-model/

  adding: gbt-classifier-model/ (stored 0%)
  adding: gbt-classifier-model/data/ (stored 0%)
  adding: gbt-classifier-model/data/.part-00010-1e6cdfa4-6b0c-4dbd-a7bf-639b02de3ce3-c000.snappy.parquet.crc (stored 0%)
  adding: gbt-classifier-model/data/.part-00011-1e6cdfa4-6b0c-4dbd-a7bf-639b02de3ce3-c000.snappy.parquet.crc (stored 0%)
  adding: gbt-classifier-model/data/part-00006-1e6cdfa4-6b0c-4dbd-a7bf-639b02de3ce3-c000.snappy.parquet (deflated 49%)
  adding: gbt-classifier-model/data/.part-00015-1e6cdfa4-6b0c-4dbd-a7bf-639b02de3ce3-c000.snappy.parquet.crc (stored 0%)
  adding: gbt-classifier-model/data/part-00014-1e6cdfa4-6b0c-4dbd-a7bf-639b02de3ce3-c000.snappy.parquet (deflated 48%)
  adding: gbt-classifier-model/data/.part-00000-1e6cdfa4-6b0c-4dbd-a7bf-639b02de3ce3-c000.snappy.parquet.crc (stored 0%)
  adding: gbt-classifier-model/data/part-00011-1e6cdfa4-6b0c-4dbd-a7bf-639b02de3ce3-c000.snappy.parquet (deflated 43%)
  adding: gbt-classifier-model/data/_SUCCESS (stored 0%)
  adding:

In [81]:
!zip -r random-forest-classifier-model.zip random-forest-classifier-model/

  adding: random-forest-classifier-model/ (stored 0%)
  adding: random-forest-classifier-model/data/ (stored 0%)
  adding: random-forest-classifier-model/data/.part-00013-911cdd4a-3d12-4403-919e-8f3aa2c8fa7e-c000.snappy.parquet.crc (stored 0%)
  adding: random-forest-classifier-model/data/part-00014-911cdd4a-3d12-4403-919e-8f3aa2c8fa7e-c000.snappy.parquet (deflated 15%)
  adding: random-forest-classifier-model/data/.part-00000-911cdd4a-3d12-4403-919e-8f3aa2c8fa7e-c000.snappy.parquet.crc (stored 0%)
  adding: random-forest-classifier-model/data/.part-00009-911cdd4a-3d12-4403-919e-8f3aa2c8fa7e-c000.snappy.parquet.crc (stored 0%)
  adding: random-forest-classifier-model/data/.part-00004-911cdd4a-3d12-4403-919e-8f3aa2c8fa7e-c000.snappy.parquet.crc (stored 0%)
  adding: random-forest-classifier-model/data/part-00008-911cdd4a-3d12-4403-919e-8f3aa2c8fa7e-c000.snappy.parquet (deflated 15%)
  adding: random-forest-classifier-model/data/.part-00007-911cdd4a-3d12-4403-919e-8f3aa2c8fa7e-c000.snapp