<h1 style="text-align:center">Advanced Decision Tree Optimization in Spark ML for <font color="red">Breast Cancer Analysis</font></h1>  
<h2 style="text-align:center">Sai Sanwariya Narayan</h2>  

# Load and set up the Python files for this Lab
1. The decision_tree_plot directory
- decision_tree_parser.py
- decision_tree_plot.py
- tree_template.jinja2

In [1]:
import pyspark
import pandas as pd
import csv

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType, FloatType
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

## The following two lines import the relevant functions from the two Python files you uploaded into the decision_tree_plot subdirectory.

In [3]:
from decision_tree_plot.decision_tree_parser import decision_tree_parse
from decision_tree_plot.decision_tree_plot import plot_trees

## The first part of this project runs Spark in the local mode (with a small hyperparameter tuning)
## The second part of this project will require a large hyperparameter tuning in cluster mode
## Notice we are creating a SparkSession, not a SparkContext, when we use ML pipeline.
## The "getOrCreate()" method means we can re-evaluate this without a need to "stop the current SparkSession" first (unlike SparkContext).

In [4]:
# One local-mode SparkSession for the whole notebook; getOrCreate() hands back
# the already-running session if this cell is evaluated again.
ss = (
    SparkSession.builder
    .master("local")
    .appName("lab 8 DT")
    .getOrCreate()
)

## Replace `/storage/home` in the file paths below with the path to your home directory.  

In [5]:
# Schema for the breast-cancer file: a sample id, nine integer cytology scores
# and the class label. The label is kept as a string ('2' / '4') so that
# StringIndexer can map it to a label index later on.
# Every column is declared nullable. bare_nuclei really does contain missing
# values (they are filtered out further down). Spark also reports file-source
# columns as nullable no matter what is declared (see the printSchema output),
# so `False` here was misleading rather than enforced.
bc_feature_names = ["clump_thickness", "unif_cell_size", "unif_cell_shape", "marg_adhesion",
                    "single_epith_cell_size", "bare_nuclei", "bland_chrom", "norm_nucleoli",
                    "mitoses"]
bc_schema = StructType(
    [StructField("id", IntegerType(), True)]
    + [StructField(name, IntegerType(), True) for name in bc_feature_names]
    + [StructField("class", StringType(), True)]
)

In [6]:
# Load the raw file with the explicit schema above (no type inference).
# The file's first line is treated as a header row. Missing bare_nuclei values
# show up as null (see the filter cells below).
bc_data_path = "/storage/home/breast-cancer-wisconsin.data.txt"
data = (
    ss.read
    .schema(bc_schema)
    .option("header", True)
    .option("inferSchema", False)
    .csv(bc_data_path)
)

# Feature Transformation Using DataFrame

In [7]:
# Show the column names/types loaded via bc_schema.
data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- clump_thickness: integer (nullable = true)
 |-- unif_cell_size: integer (nullable = true)
 |-- unif_cell_shape: integer (nullable = true)
 |-- marg_adhesion: integer (nullable = true)
 |-- single_epith_cell_size: integer (nullable = true)
 |-- bare_nuclei: integer (nullable = true)
 |-- bland_chrom: integer (nullable = true)
 |-- norm_nucleoli: integer (nullable = true)
 |-- mitoses: integer (nullable = true)
 |-- class: string (nullable = true)



In [8]:
# Peek at the first five rows of the raw data.
data.show(5)

+-------+---------------+--------------+---------------+-------------+----------------------+-----------+-----------+-------------+-------+-----+
|     id|clump_thickness|unif_cell_size|unif_cell_shape|marg_adhesion|single_epith_cell_size|bare_nuclei|bland_chrom|norm_nucleoli|mitoses|class|
+-------+---------------+--------------+---------------+-------------+----------------------+-----------+-----------+-------------+-------+-----+
|1000025|              5|             1|              1|            1|                     2|          1|          3|            1|      1|    2|
|1002945|              5|             4|              4|            5|                     7|         10|          3|            2|      1|    2|
|1015425|              3|             1|              1|            1|                     2|          2|          3|            1|      1|    2|
|1016277|              6|             8|              8|            1|                     3|          4|          3|       

In [9]:
# `col` is also used by the missing-value cells that follow, so keep importing it here.
from pyspark.sql.functions import col

# Class balance of the raw data, before rows with missing values are removed.
class_count = data.groupBy("class").count()
class_count.show()

+-----+-----+
|class|count|
+-----+-----+
|    4|  241|
|    2|  458|
+-----+-----+



# Detecting and Filtering Rows with missing values

In [10]:
# Display the rows whose bare_nuclei value is missing.
rows_missing_bare_nuclei = data.where(col("bare_nuclei").isNull())
rows_missing_bare_nuclei.show()

+-------+---------------+--------------+---------------+-------------+----------------------+-----------+-----------+-------------+-------+-----+
|     id|clump_thickness|unif_cell_size|unif_cell_shape|marg_adhesion|single_epith_cell_size|bare_nuclei|bland_chrom|norm_nucleoli|mitoses|class|
+-------+---------------+--------------+---------------+-------------+----------------------+-----------+-----------+-------------+-------+-----+
|1057013|              8|             4|              5|            1|                     2|       null|          7|            3|      1|    4|
|1096800|              6|             6|              6|            9|                     6|       null|          7|            8|      1|    2|
|1183246|              1|             1|              1|            1|                     1|       null|          2|            1|      1|    2|
|1184840|              1|             1|              3|            1|                     2|       null|          2|       

In [11]:
# Drop every row with a missing bare_nuclei value; all later modelling uses data2.
data2 = data.na.drop(subset=["bare_nuclei"])

In [12]:
# Number of rows left after dropping missing bare_nuclei values.
data2.count()

683

In [13]:
# Number of rows in the raw data, for comparison with the count above.
data.count()

699

In [14]:
# Class balance after removing rows with a missing bare_nuclei value.
# A plain column name is enough for groupBy, so the duplicate mid-notebook
# re-import of `col` (already imported in an earlier cell) is not needed.
class_count2 = data2.groupBy("class").count()
class_count2.show()

+-----+-----+
|class|count|
+-----+-----+
|    4|  239|
|    2|  444|
+-----+-----+



# Feature Transformation

## StringIndex
- Transforms a column of string to a new column of index (type double).
- The feature transformation involves three steps:
-- Step 1: Create a "transformer" 
-- Step 2: Use the data (which contains all possible values of the string column) to create a mapping (of these strings into an integer/index)
-- Step 3: Use the mapping to generate the new column's value (i.e., the transformed index) for each row.

In [15]:
# Step 1: create the StringIndexer estimator for the "class" column.
label_indexer_estimator = StringIndexer(inputCol="class", outputCol="indexedLabel")
# Step 2: fit it on data2 to learn the string -> index mapping.
labelIndexer = label_indexer_estimator.fit(data2)

In [16]:
# Display the fitted StringIndexerModel.
labelIndexer

StringIndexerModel: uid=StringIndexer_4a9b224e0113, handleInvalid=error

In [17]:
# Step 3: apply the learned mapping, adding an "indexedLabel" column to data2.
transformed_data = labelIndexer.transform(data2)

In [18]:
# Check the new indexedLabel column on a few rows.
transformed_data.show(4)

+-------+---------------+--------------+---------------+-------------+----------------------+-----------+-----------+-------------+-------+-----+------------+
|     id|clump_thickness|unif_cell_size|unif_cell_shape|marg_adhesion|single_epith_cell_size|bare_nuclei|bland_chrom|norm_nucleoli|mitoses|class|indexedLabel|
+-------+---------------+--------------+---------------+-------------+----------------------+-----------+-----------+-------------+-------+-----+------------+
|1000025|              5|             1|              1|            1|                     2|          1|          3|            1|      1|    2|         0.0|
|1002945|              5|             4|              4|            5|                     7|         10|          3|            2|      1|    2|         0.0|
|1015425|              3|             1|              1|            1|                     2|          2|          3|            1|      1|    2|         0.0|
|1016277|              6|             8|      

In [19]:
# The nine cytology scores used as model inputs (id and class are excluded).
input_features = [
    'clump_thickness',
    'unif_cell_size',
    'unif_cell_shape',
    'marg_adhesion',
    'single_epith_cell_size',
    'bare_nuclei',
    'bland_chrom',
    'norm_nucleoli',
    'mitoses',
]

In [20]:
# Combine the input feature columns into one vector column named "features".
assembler = VectorAssembler(
    inputCols=input_features,
    outputCol="features",
)

In [21]:
# Display the VectorAssembler transformer.
assembler

VectorAssembler_e858ceeeb680

In [22]:
# Add the "features" vector column to the label-indexed data.
vectorized_data = assembler.transform(transformed_data)

## We will use `vectorized_data` for splitting labelled data into training data and testing data.

In [23]:
# Keep just the columns a classifier needs: the feature vector and the label index.
model_columns = ["features", "indexedLabel"]
vectorized2_data = vectorized_data.select(model_columns)
vectorized2_data.show(5)

+--------------------+------------+
|            features|indexedLabel|
+--------------------+------------+
|[5.0,1.0,1.0,1.0,...|         0.0|
|[5.0,4.0,4.0,5.0,...|         0.0|
|[3.0,1.0,1.0,1.0,...|         0.0|
|[6.0,8.0,8.0,1.0,...|         0.0|
|[4.0,1.0,1.0,3.0,...|         0.0|
+--------------------+------------+
only showing top 5 rows



# Decision Tree Learning and Evaluation (1 hyperparameter setting)

## `randomSplit` is a DataFrame method that randomly splits the rows into subsets according to the given weights (here one subset for training and one for testing), using the given number as the seed for the random number generator.
## To generate a different split, use a different seed (any integer works; it does not need to be a prime number).

In [24]:
# 75% / 25% train/test split of the vectorized data with a fixed seed.
trainingData, testData = vectorized_data.randomSplit(weights=[0.75, 0.25], seed=1237)

In [25]:
# A single decision tree with fixed hyperparameters for this first run.
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="indexedLabel",
    maxDepth=6,
    minInstancesPerNode=2,
)

In [26]:
# Display the (untrained) DecisionTreeClassifier estimator.
dt

DecisionTreeClassifier_b959414ba3e0

In [27]:
# Train the tree on the training split.
dt_model = dt.fit(trainingData)

In [28]:
# Display the trained model summary (depth, number of nodes, classes, features).
dt_model

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_b959414ba3e0, depth=6, numNodes=31, numClasses=2, numFeatures=9

In [29]:
# Score the held-out test split with the trained tree.
test_prediction = dt_model.transform(testData)

In [30]:
# Show a few scored rows, including the rawPrediction/probability/prediction columns.
test_prediction.show(3)

+------+---------------+--------------+---------------+-------------+----------------------+-----------+-----------+-------------+-------+-----+------------+--------------------+-------------+--------------------+----------+
|    id|clump_thickness|unif_cell_size|unif_cell_shape|marg_adhesion|single_epith_cell_size|bare_nuclei|bland_chrom|norm_nucleoli|mitoses|class|indexedLabel|            features|rawPrediction|         probability|prediction|
+------+---------------+--------------+---------------+-------------+----------------------+-----------+-----------+-------------+-------+-----+------------+--------------------+-------------+--------------------+----------+
|142932|              7|             6|             10|            5|                     3|         10|          9|           10|      2|    4|         1.0|[7.0,6.0,10.0,5.0...|  [1.0,118.0]|[0.00840336134453...|       1.0|
|144888|              8|            10|             10|            8|                     5|        

# To compare the actual labels with the predicted labels more easily, we can select the following columns.
## The `probability` column records the probability for the row to be in the "zero/benign class" and the probability to be in the "one/malignant class".
## The `prediction` column records the predicted label for each row, which is the class with the higher probability.

In [31]:
# Side-by-side view of the true label, its index and the model's prediction.
comparison_columns = ["features", "class", "indexedLabel", "probability", "prediction"]
test_prediction.select(*comparison_columns).show(5)

+--------------------+-----+------------+--------------------+----------+
|            features|class|indexedLabel|         probability|prediction|
+--------------------+-----+------------+--------------------+----------+
|[7.0,6.0,10.0,5.0...|    4|         1.0|[0.00840336134453...|       1.0|
|[8.0,10.0,10.0,8....|    4|         1.0|[0.00840336134453...|       1.0|
|[1.0,2.0,2.0,1.0,...|    2|         0.0|           [1.0,0.0]|       0.0|
|[5.0,3.0,2.0,8.0,...|    4|         1.0|           [1.0,0.0]|       0.0|
|[10.0,4.0,4.0,10....|    4|         1.0|[0.04166666666666...|       1.0|
+--------------------+-----+------------+--------------------+----------+
only showing top 5 rows



In [32]:
# Class strings in index order: position 0 is index 0.0, position 1 is index 1.0.
labelIndexer.labels

['2', '4']

In [33]:
# Convert the numeric prediction back into the original class string.
labelConverter = IndexToString(
    inputCol="prediction",
    outputCol="predictedClass",
    labels=labelIndexer.labels,
)

In [34]:
# Add the "predictedClass" column to the scored test data.
test2_prediction = labelConverter.transform(test_prediction)

In [35]:
# True class vs. predicted class, both as the original strings.
converted_columns = ["features", "class", "indexedLabel", "prediction", "predictedClass"]
test2_prediction.select(*converted_columns).show(5)

+--------------------+-----+------------+----------+--------------+
|            features|class|indexedLabel|prediction|predictedClass|
+--------------------+-----+------------+----------+--------------+
|[7.0,6.0,10.0,5.0...|    4|         1.0|       1.0|             4|
|[8.0,10.0,10.0,8....|    4|         1.0|       1.0|             4|
|[1.0,2.0,2.0,1.0,...|    2|         0.0|       0.0|             2|
|[5.0,3.0,2.0,8.0,...|    4|         1.0|       0.0|             2|
|[10.0,4.0,4.0,10....|    4|         1.0|       1.0|             4|
+--------------------+-----+------------+----------+--------------+
only showing top 5 rows



In [36]:
# F1 score of the "prediction" column against the "indexedLabel" column.
evaluator = MulticlassClassificationEvaluator(
    metricName="f1",
    labelCol="indexedLabel",
    predictionCol="prediction",
)

In [37]:
# F1 of the single-setting tree on the held-out test split.
f1 = evaluator.evaluate(test_prediction)
print("f1 score:", f1)

f1 score: 0.9338310711681739


# DT Learning Using ML Pipeline

## Generate a decision tree (using a pipeline) and compute the F1 measure on the testing data.
- Record the F1 measure for the maxDepth used below  
- Recommended value for maxDepth: 2 to 10
- Recommended value for minInstancesPerNode: 1 to 7

In [38]:
# Fresh 80% / 20% split of the cleaned (not yet vectorized) data; the pipeline
# below performs the label indexing and feature assembling itself.
trainingData, testData = data2.randomSplit(weights=[0.8, 0.2], seed=1234)

In [39]:
# Pipeline: index label -> assemble features -> decision tree -> map the
# predicted index back to the original class string.
labelIndexer = StringIndexer(inputCol="class", outputCol="indexedLabel").fit(data2)
assembler = VectorAssembler(inputCols=input_features, outputCol="features")
dt = DecisionTreeClassifier(
    labelCol="indexedLabel",
    featuresCol="features",
    maxDepth=6,
    minInstancesPerNode=3,
)
predictionConverter = IndexToString(
    inputCol="prediction",
    outputCol="predictedClass",
    labels=labelIndexer.labels,
)
pipeline_stages = [labelIndexer, assembler, dt, predictionConverter]
pipeline = Pipeline(stages=pipeline_stages)
# Fit every stage on the training split, then score the test split.
model = pipeline.fit(trainingData)
predictions = model.transform(testData)

In [40]:
# Display the (unfitted) Pipeline.
pipeline

Pipeline_20574e130de4

In [41]:
# Display the fitted PipelineModel.
model

PipelineModel_2d850a5fcbe7

In [42]:
# True vs. predicted class (both index and original string) for ten test rows.
label_columns = ["class", "indexedLabel", "prediction", "predictedClass"]
predictions.select(*label_columns).show(10)

+-----+------------+----------+--------------+
|class|indexedLabel|prediction|predictedClass|
+-----+------------+----------+--------------+
|    4|         1.0|       1.0|             4|
|    4|         1.0|       1.0|             4|
|    2|         0.0|       0.0|             2|
|    2|         0.0|       0.0|             2|
|    4|         1.0|       1.0|             4|
|    2|         0.0|       0.0|             2|
|    2|         0.0|       0.0|             2|
|    4|         1.0|       0.0|             2|
|    2|         0.0|       0.0|             2|
|    4|         1.0|       1.0|             4|
+-----+------------+----------+--------------+
only showing top 10 rows



In [43]:
# Same F1 evaluator as before, applied to the pipeline's predictions.
evaluator = MulticlassClassificationEvaluator(
    metricName="f1",
    labelCol="indexedLabel",
    predictionCol="prediction",
)

In [44]:
# F1 of the pipeline model on its test split.
f1 = evaluator.evaluate(predictions)
print("f1 score:", f1)

f1 score: 0.9439967439967439


# Decision Tree Visualization

## stages[2] of the pipeline is `dt` (a DecisionTreeClassifier estimator).
## `model` is a PipelineModel (the fitted pipeline), not a DataFrame.
## model.stages[2] gives us the learned decision tree model (a DecisionTreeClassificationModel).

In [45]:
# The third pipeline stage is the trained DecisionTreeClassificationModel.
DTmodel = model.stages[2]
print(DTmodel)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_9ba308cc3902, depth=6, numNodes=29, numClasses=2, numFeatures=9


In [46]:
# Path handed to decision_tree_parse below (presumably where the helper saves the model; confirm in decision_tree_parser.py).
model_path="/storage/home/DTmodel_vis"

In [47]:
# Parse the trained tree with the helper module and render it as HTML.
tree = decision_tree_parse(DTmodel, ss, model_path)
# Feature index (as a string) -> feature name; presumably used by plot_trees to label splits.
column = {str(idx): name for idx, name in enumerate(input_features)}
plot_trees(tree, column=column, output_path='/storage/home/DTtree_local.html')

# Automated Hyperparameter Tuning for Decision Tree

In [48]:
# Same nine input features, redefined so the tuning section can run on its own.
input_features = [
    'clump_thickness',
    'unif_cell_size',
    'unif_cell_shape',
    'marg_adhesion',
    'single_epith_cell_size',
    'bare_nuclei',
    'bland_chrom',
    'norm_nucleoli',
    'mitoses',
]

In [49]:
# 80% / 20% split shared by every hyperparameter setting evaluated below.
trainingData, testingData = data2.randomSplit(weights=[0.8, 0.2], seed=1237)
model_path = "/storage/home/DT_HPT_local"

In [50]:
# Grid search over (maxDepth, minInstancesPerNode).
# For every combination, a DT pipeline is trained on `trainingData` and scored
# with F1 on both splits. The setting with the highest testing F1 is kept as
# the best model.
# NOTE(review): the winner is selected on the same split whose F1 is reported,
# so `highest_testing_f1` is optimistically biased. A separate validation split
# (or CrossValidator) would give an unbiased test score.
eval_columns = ['max_depth', 'minInstancesPerNode', 'training f1', 'testing f1', 'Best Model']
# One [max_depth, minInsPN, training f1, testing f1, best flag] row per setting;
# the DataFrame is built once after the loop instead of growing row by row.
eval_rows = []
# Highest testing F1 seen so far; any positive F1 replaces it.
highest_testing_f1 = 0
# Defined up front so later cells fail clearly (None) rather than with NameError
# if no setting ever improves on the initial value.
best_max_depth = best_minInsPN = best_index = None
best_parameters_training_f1 = best_DTmodel = None
# Set up the possible hyperparameter values to be evaluated
max_depth_list = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
minInstancesPerNode_list = [2, 3, 4, 5, 6]
dt_seed = 37  # previously assigned but never used; now actually passed to the classifier
labelIndexer = StringIndexer(inputCol="class", outputCol="indexedLabel").fit(data2)
assembler = VectorAssembler(inputCols=input_features, outputCol="features")
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedClass", labels=labelIndexer.labels)
# The evaluator does not depend on the hyperparameters, so build it once.
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="f1")
# Both splits are scanned for every setting; keep them cached for the whole grid.
trainingData.persist()
testingData.persist()
for max_depth in max_depth_list:
    for minInsPN in minInstancesPerNode_list:
        # Construct a DT model using a set of hyper-parameter values and training data
        dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="features",
                                    maxDepth=max_depth, minInstancesPerNode=minInsPN, seed=dt_seed)
        # Use this cell's labelConverter. The original relied on `predictionConverter`
        # leaked from an earlier cell.
        pipeline = Pipeline(stages=[labelIndexer, assembler, dt, labelConverter])
        model = pipeline.fit(trainingData)
        training_predictions = model.transform(trainingData)
        testing_predictions = model.transform(testingData)
        training_f1 = evaluator.evaluate(training_predictions)
        testing_f1 = evaluator.evaluate(testing_predictions)
        # 0 marks an ordinary row; the best row is flagged with 1000 in a later cell.
        eval_rows.append([max_depth, minInsPN, training_f1, testing_f1, 0])
        # Strict '>' keeps the first setting (in iteration order) on ties.
        # The best tree is parsed and plotted once, after the search, instead of being
        # re-written to disk every time a new best appears.
        if testing_f1 > highest_testing_f1:
            best_max_depth = max_depth
            best_minInsPN = minInsPN
            best_index = len(eval_rows) - 1
            best_parameters_training_f1 = training_f1
            best_DTmodel = model.stages[2]
            highest_testing_f1 = testing_f1
trainingData.unpersist()
testingData.unpersist()
hyperparams_eval_df = pd.DataFrame(eval_rows, columns=eval_columns)
print('The best max_depth is ', best_max_depth, ', best minInstancesPerNode = ',
      best_minInsPN, ', testing f1 = ', highest_testing_f1)
column = dict([(str(idx), i) for idx, i in enumerate(input_features)])

The best max_depth is  4 , best minInstancesPerNode =  3 , testing f1 =  0.9699248120300752


In [51]:
# Path handed to decision_tree_parse for the best tree of the grid search.
best_model_path="/storage/home/DT_HPT_local_best"

In [52]:
# Parse the best tree from the grid search and render it as HTML.
best_tree = decision_tree_parse(best_DTmodel, ss, best_model_path)
column = {str(idx): name for idx, name in enumerate(input_features)}
plot_trees(best_tree, column=column, output_path='/storage/home/DT_HPT_local_best.html')

In [53]:
# Flag the best setting's row in the results table (0 = ordinary, 1000 = best).
# The metric is F1, not RMS. The row already holds this setting's
# hyperparameters and F1 scores, so only the 'Best Model' cell needs updating.
if best_index is not None:
    hyperparams_eval_df.loc[best_index, 'Best Model'] = 1000

In [54]:
# Save the grid-search results table (one row per hyperparameter setting) as CSV.
output_path = "/storage/home/HPT_Local.csv"
hyperparams_eval_df.to_csv(output_path)  

# A Revised Hyper-parameter Tuning 

# The range of hyper-parameter tuning:
- max_depth: 2 to 11
- minInstancesPerNode: 2 to 10

In [55]:
# Revised grid search: same procedure as before, with minInstancesPerNode
# extended to 2..10.
# For every combination, a DT pipeline is trained on `trainingData` and scored
# with F1 on both splits. The setting with the highest testing F1 is kept as
# the best model.
# NOTE(review): the winner is selected on the same split whose F1 is reported,
# so `highest_testing_f1` is optimistically biased. A separate validation split
# (or CrossValidator) would give an unbiased test score.
eval_columns = ['max_depth', 'minInstancesPerNode', 'training f1', 'testing f1', 'Best Model']
# One [max_depth, minInsPN, training f1, testing f1, best flag] row per setting;
# the DataFrame is built once after the loop instead of growing row by row.
eval_rows = []
# Highest testing F1 seen so far; any positive F1 replaces it.
highest_testing_f1 = 0
# Reset so that values from the previous grid search cannot leak into this one.
best_max_depth = best_minInsPN = best_index = None
best_parameters_training_f1 = best_DTmodel = None
# Set up the possible hyperparameter values to be evaluated
max_depth_list = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
minInstancesPerNode_list = [2, 3, 4, 5, 6, 7, 8, 9, 10]
dt_seed = 37  # previously assigned but never used; now actually passed to the classifier
labelIndexer = StringIndexer(inputCol="class", outputCol="indexedLabel").fit(data2)
assembler = VectorAssembler(inputCols=input_features, outputCol="features")
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedClass", labels=labelIndexer.labels)
# The evaluator does not depend on the hyperparameters, so build it once.
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="f1")
# Both splits are scanned for every setting; keep them cached for the whole grid.
trainingData.persist()
testingData.persist()
for max_depth in max_depth_list:
    for minInsPN in minInstancesPerNode_list:
        # Construct a DT model using a set of hyper-parameter values and training data
        dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="features",
                                    maxDepth=max_depth, minInstancesPerNode=minInsPN, seed=dt_seed)
        # Use this cell's labelConverter. The original relied on `predictionConverter`
        # leaked from an earlier cell.
        pipeline = Pipeline(stages=[labelIndexer, assembler, dt, labelConverter])
        model = pipeline.fit(trainingData)
        training_predictions = model.transform(trainingData)
        testing_predictions = model.transform(testingData)
        training_f1 = evaluator.evaluate(training_predictions)
        testing_f1 = evaluator.evaluate(testing_predictions)
        # 0 marks an ordinary row; the best row is flagged with 1000 in a later cell.
        eval_rows.append([max_depth, minInsPN, training_f1, testing_f1, 0])
        # Strict '>' keeps the first setting (in iteration order) on ties.
        # The best tree is parsed and plotted once, after the search, instead of being
        # re-written to disk every time a new best appears.
        if testing_f1 > highest_testing_f1:
            best_max_depth = max_depth
            best_minInsPN = minInsPN
            best_index = len(eval_rows) - 1
            best_parameters_training_f1 = training_f1
            best_DTmodel = model.stages[2]
            highest_testing_f1 = testing_f1
trainingData.unpersist()
testingData.unpersist()
hyperparams_eval_df = pd.DataFrame(eval_rows, columns=eval_columns)
print('The best max_depth is ', best_max_depth, ', best minInstancesPerNode = ',
      best_minInsPN, ', testing f1 = ', highest_testing_f1)
column = dict([(str(idx), i) for idx, i in enumerate(input_features)])

The best max_depth is  4 , best minInstancesPerNode =  3 , testing f1 =  0.9699248120300752


In [56]:
# Path handed to decision_tree_parse for the revised search's best tree.
# NOTE(review): this is the same path the first grid search used, so that run's
# saved output is presumably overwritten. Use a distinct path if both should be kept.
best_model_path="/storage/home/DT_HPT_local_best"

In [57]:
# Parse the revised search's best tree and render it as HTML
# (same output file name as the first search's plot).
best_tree = decision_tree_parse(best_DTmodel, ss, best_model_path)
column = {str(idx): name for idx, name in enumerate(input_features)}
plot_trees(best_tree, column=column, output_path='/storage/home/DT_HPT_local_best.html')

In [58]:
# Flag the best setting's row in the revised results table (0 = ordinary, 1000 = best).
# The metric is F1, not RMS. The row already holds this setting's
# hyperparameters and F1 scores, so only the 'Best Model' cell needs updating.
if best_index is not None:
    hyperparams_eval_df.loc[best_index, 'Best Model'] = 1000

In [59]:
# Save the revised grid-search results table (one row per setting) as CSV.
output_path = "/storage/home/Lab8_HPT_Local.csv"
hyperparams_eval_df.to_csv(output_path)  

In [60]:
# Stop the SparkSession now that the notebook is finished.
ss.stop()