# Programming Assignment - 4 A Solution

Use the MLLib API of Spark to construct a decision tree for the Breast Cancer Diagnostic data (we call it dataset1), available from the UC-Irvine ML repository. Select appropriate parameters to generate only a 3-level deep decision tree. Submit the following:

a.	Your program code.

b.	The choice of parameters and attribute selection metric (Gini index, info gain, etc.) used.
    
    Impurity Measur: Gini Index
    
    Max depth of the tree: 3
    
    Max Bins of the continuous data: [10, 20, 32, 40, 50]

c.	Any assumptions made.

    The one which has more Area under the ROC (AUROC) curve is the best model for our application.
    
    While performing K-fold Cross-validation the value of k is taken as 3.

d.	Validation and Train/Test Strategy used.
    
    Firstly, the entire data is split into train and test. Then, K-fold Cross-validation is performed on train to get model that will not overfit and more generalized. Finally, the model is tested against test data, and the performance metrics are also reported.

e.	Decision tree Obtained.

f.	Performance shown by the confusion matrix.


In [1]:
# Creating a SparkSession

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('DT_BC').getOrCreate()

In [2]:
# Reading the csv data to a dataframe 

df = spark.read.csv('wdbc.csv')

# Top 1 column of the dataframe

df.limit(1).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,...,_c22,_c23,_c24,_c25,_c26,_c27,_c28,_c29,_c30,_c31
0,842302,M,17.99,10.38,122.8,1001,0.1184,0.2776,0.3001,0.1471,...,25.38,17.33,184.6,2019,0.1622,0.6656,0.7119,0.2654,0.4601,0.1189


In [3]:
# print schema of the datafame

df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: string (nullable = true)
 |-- _c21: string (nullable = true)
 |-- _c22: string (nullable = true)
 |-- _c23: string (nullable = true)
 |-- _c24: string (nullable = true)
 |-- _c25: string (nullable = true)
 |-- _c26: string (nullable = true)
 |-- _c27: string (nullable = tru

In [4]:
# The number of rows

df.count()

569

In [5]:
# We need to change the column type of the features to Double as all are read as string  
# Then we drop all the string type columns 

from pyspark.sql.types import DoubleType

for i in range(2,len(df.columns)):
    df = df.withColumn("_cc"+str(i),df["_c"+str(i)].cast(DoubleType()))
    df = df.drop('_c'+str(i))
        
df.limit(1).toPandas()

Unnamed: 0,_c0,_c1,_cc2,_cc3,_cc4,_cc5,_cc6,_cc7,_cc8,_cc9,...,_cc22,_cc23,_cc24,_cc25,_cc26,_cc27,_cc28,_cc29,_cc30,_cc31
0,842302,M,17.99,10.38,122.8,1001.0,0.1184,0.2776,0.3001,0.1471,...,25.38,17.33,184.6,2019.0,0.1622,0.6656,0.7119,0.2654,0.4601,0.1189


In [6]:
row1 = df.agg({"_cc31": "min"}).collect()[0]

print(row1)

Row(min(_cc31)=0.05504)


In [7]:
# Print the schema of the dataframe

df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _cc2: double (nullable = true)
 |-- _cc3: double (nullable = true)
 |-- _cc4: double (nullable = true)
 |-- _cc5: double (nullable = true)
 |-- _cc6: double (nullable = true)
 |-- _cc7: double (nullable = true)
 |-- _cc8: double (nullable = true)
 |-- _cc9: double (nullable = true)
 |-- _cc10: double (nullable = true)
 |-- _cc11: double (nullable = true)
 |-- _cc12: double (nullable = true)
 |-- _cc13: double (nullable = true)
 |-- _cc14: double (nullable = true)
 |-- _cc15: double (nullable = true)
 |-- _cc16: double (nullable = true)
 |-- _cc17: double (nullable = true)
 |-- _cc18: double (nullable = true)
 |-- _cc19: double (nullable = true)
 |-- _cc20: double (nullable = true)
 |-- _cc21: double (nullable = true)
 |-- _cc22: double (nullable = true)
 |-- _cc23: double (nullable = true)
 |-- _cc24: double (nullable = true)
 |-- _cc25: double (nullable = true)
 |-- _cc26: double (nullable = true)
 |-- _cc

In [8]:
# Drop the first column as well since it is the id of each row which is not a feature

df = df.drop('_c0')

df.limit(1).toPandas()

Unnamed: 0,_c1,_cc2,_cc3,_cc4,_cc5,_cc6,_cc7,_cc8,_cc9,_cc10,...,_cc22,_cc23,_cc24,_cc25,_cc26,_cc27,_cc28,_cc29,_cc30,_cc31
0,M,17.99,10.38,122.8,1001.0,0.1184,0.2776,0.3001,0.1471,0.2419,...,25.38,17.33,184.6,2019.0,0.1622,0.6656,0.7119,0.2654,0.4601,0.1189


In [9]:
# We will then use StringIndexer to encode the label as Malignant(M):1 and Benign(B):0 
# We will also use VectorAssembler to create a vector of features which will be given to the DecisionTreeClassifier
# The above two stages are combined to form a Pipeline

from pyspark.ml.feature import StringIndexer, VectorAssembler

label_stringIdx = StringIndexer(inputCol = '_c1', outputCol = 'label')

assemblerInputs = ["_cc"+str(i) for i in range(2,len(df.columns))]

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

stages=[label_stringIdx, assembler]

In [10]:
# Create a Pipeline and Pass the dataframe into the pipeline to transform as required to the DecisionTreeClassifier

cols = df.columns

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=stages)

pipelineModel = pipeline.fit(df)

df = pipelineModel.transform(df)

n_cols = ['label', 'features']+cols

df = df.select(n_cols)

df.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _cc2: double (nullable = true)
 |-- _cc3: double (nullable = true)
 |-- _cc4: double (nullable = true)
 |-- _cc5: double (nullable = true)
 |-- _cc6: double (nullable = true)
 |-- _cc7: double (nullable = true)
 |-- _cc8: double (nullable = true)
 |-- _cc9: double (nullable = true)
 |-- _cc10: double (nullable = true)
 |-- _cc11: double (nullable = true)
 |-- _cc12: double (nullable = true)
 |-- _cc13: double (nullable = true)
 |-- _cc14: double (nullable = true)
 |-- _cc15: double (nullable = true)
 |-- _cc16: double (nullable = true)
 |-- _cc17: double (nullable = true)
 |-- _cc18: double (nullable = true)
 |-- _cc19: double (nullable = true)
 |-- _cc20: double (nullable = true)
 |-- _cc21: double (nullable = true)
 |-- _cc22: double (nullable = true)
 |-- _cc23: double (nullable = true)
 |-- _cc24: double (nullable = true)
 |-- _cc25: double (nullable = true)
 |

In [11]:
# Top row of the dataframe

df.head(1)

[Row(label=1.0, features=DenseVector([17.99, 10.38, 122.8, 1001.0, 0.1184, 0.2776, 0.3001, 0.1471, 0.2419, 0.0787, 1.095, 0.9053, 8.589, 153.4, 0.0064, 0.049, 0.0537, 0.0159, 0.03, 0.0062, 25.38, 17.33, 184.6, 2019.0, 0.1622, 0.6656, 0.7119, 0.2654, 0.4601]), _c1='M', _cc2=17.99, _cc3=10.38, _cc4=122.8, _cc5=1001.0, _cc6=0.1184, _cc7=0.2776, _cc8=0.3001, _cc9=0.1471, _cc10=0.2419, _cc11=0.07871, _cc12=1.095, _cc13=0.9053, _cc14=8.589, _cc15=153.4, _cc16=0.006399, _cc17=0.04904, _cc18=0.05373, _cc19=0.01587, _cc20=0.03003, _cc21=0.006193, _cc22=25.38, _cc23=17.33, _cc24=184.6, _cc25=2019.0, _cc26=0.1622, _cc27=0.6656, _cc28=0.7119, _cc29=0.2654, _cc30=0.4601, _cc31=0.1189)]

In [12]:
# Spliting the data into train and split

train, test = df.randomSplit([0.7, 0.3], seed = 2018)

# Print the top of the train and test dataframe

train.limit(1).toPandas()

Unnamed: 0,label,features,_c1,_cc2,_cc3,_cc4,_cc5,_cc6,_cc7,_cc8,...,_cc22,_cc23,_cc24,_cc25,_cc26,_cc27,_cc28,_cc29,_cc30,_cc31
0,0.0,"[6.981, 13.43, 43.79, 143.5, 0.117, 0.07568, 0...",B,6.981,13.43,43.79,143.5,0.117,0.07568,0.0,...,7.93,19.54,50.41,185.2,0.1584,0.1202,0.0,0.0,0.2932,0.09382


In [13]:
test.limit(1).toPandas()

Unnamed: 0,label,features,_c1,_cc2,_cc3,_cc4,_cc5,_cc6,_cc7,_cc8,...,_cc22,_cc23,_cc24,_cc25,_cc26,_cc27,_cc28,_cc29,_cc30,_cc31
0,0.0,"[7.76, 24.54, 47.92, 181.0, 0.05263, 0.04362, ...",B,7.76,24.54,47.92,181.0,0.05263,0.04362,0.0,...,9.456,30.37,59.16,268.6,0.08996,0.06444,0.0,0.0,0.2871,0.07039


In [14]:
# We will use CrossValidation to to tune the maxBins size to get a best model
# The evaluator to this will be the Area under the ROC curve(AUROC)

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator() # The evalutor is AUROC by default

# Impurity is Gini by default

dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3) 

# We will add the maxBin parameters based which the CrossValidation will go  

paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxBins, [10, 20, 32]) \
    .build()

cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)

# Fit the train data to get the model

cvModel = cv.fit(train)

In [15]:
# Pull out the best model

BestModel = cvModel.bestModel
print(BestModel.toDebugString)

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_44388b404477) of depth 3 with 13 nodes
  If (feature 22 <= 113.15)
   If (feature 27 <= 0.1572)
    Predict: 0.0
   Else (feature 27 > 0.1572)
    If (feature 21 <= 23.59)
     Predict: 0.0
    Else (feature 21 > 23.59)
     Predict: 1.0
  Else (feature 22 > 113.15)
   If (feature 23 <= 827.85)
    If (feature 1 <= 20.155)
     Predict: 0.0
    Else (feature 1 > 20.155)
     Predict: 1.0
   Else (feature 23 > 827.85)
    If (feature 26 <= 0.19190000000000002)
     Predict: 0.0
    Else (feature 26 > 0.19190000000000002)
     Predict: 1.0



In [16]:
# What are the Parameters of the best model

print(BestModel.explainParams())

cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext (default: 10)
featuresCol: features column name (default: features, current: features)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini)
labelCol: label column name (default: label, current: label)
maxBins: Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature. (default: 32, current: 32)
maxDepth: Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 

In [17]:
# Evaluation of the model on the test data and the following are computed:
# 1. Precision of 1
# 2. Precision of 0
# 3. Recall of 1
# 4. Recall of 0
# 5. F-1 Score
# 6. Confusion Matrix

from pyspark.mllib.evaluation import MulticlassMetrics

def getPredictionsLabels(model, test_data):
    predictions = cvModel.transform(test_data)
    predictions_n = predictions["label", "prediction"]
    predictionsAndLabels = predictions_n.rdd.map(tuple)
    return  predictionsAndLabels

def printMetrics(predictions_and_labels):
    metrics = MulticlassMetrics(predictions_and_labels)
    print ('Precision of 1:Malignant\t', metrics.precision(1))
    print ('Precision of 0:Benign\t\t', metrics.precision(0))
    print ('Recall of 1:Malignant\t\t', metrics.recall(1))
    print ('Recall of 0:Benign\t\t', metrics.recall(0))
    print ('F-1 Score\t\t\t', metrics.fMeasure())
    print ('Confusion Matrix\n', metrics.confusionMatrix().toArray())

predictions_and_labels = getPredictionsLabels(cvModel, test)

printMetrics(predictions_and_labels)

Precision of 1:Malignant	 0.8767123287671232
Precision of 0:Benign		 0.9603960396039604
Recall of 1:Malignant		 0.9411764705882353
Recall of 0:Benign		 0.9150943396226415
F-1 Score			 0.9252873563218391
Confusion Matrix
 [[97.  9.]
 [ 4. 64.]]


                                                                                                    Author:
                                                                                                        Varun Raj Rayabarapu