In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
from pyspark.sql.types import DoubleType
from pyspark.sql.types import LongType
from pyspark.sql.types import FloatType
from pyspark.sql.functions import sum
from pyspark.sql.functions import array
from pyspark.sql.functions import udf
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import VectorIndexer, VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline, PipelineModel


In [2]:
#load data as dataframe
data = sqlContext.read.load('/FileStore/tables/Breast_Cancer_Wisconsin/data.csv', format='csv', header='true', inferSchema='true')


In [3]:

#keep only the features with average values. don't keep id either
data = data.select(['diagnosis', 'radius_mean', 'texture_mean', 'perimeter_mean', 'area_mean', 'smoothness_mean', 
                    'compactness_mean', 'concavity_mean', 'concave points_mean', 'symmetry_mean', 
                    'fractal_dimension_mean'])



In [4]:
#display the structure
data.printSchema()

In [5]:
#randomly split the data into a test/train set and validation set (80%/20%)
splits = data.randomSplit([0.8, 0.2])
data_train = splits[0]
data_val = splits[1]


In [6]:
display(data_train)

In [7]:
#take a look at some data
data.limit(5).toPandas()

In [8]:
#convert diagnosis --> diagnosis_numeric. no one-hot encoding required because this can only have 
#two values (benign or malignant)
stringIndexerDiagnosis = StringIndexer(inputCol='diagnosis', outputCol='label')

In [9]:
all_feats = ['radius_mean', 'texture_mean', 'perimeter_mean', 'area_mean', 'smoothness_mean', 'compactness_mean', 
             'concavity_mean', 'concave points_mean', 'symmetry_mean', 'fractal_dimension_mean']
assemblerAllFeatures = VectorAssembler(inputCols=all_feats, outputCol='features')

In [10]:

#define the pipeline
pipeline = Pipeline(stages=[stringIndexerDiagnosis, assemblerAllFeatures])


In [11]:

#fit the pipeline on the training data. what it means to "fit" the pipeline is to basically go through each of the 
#stages of the piepline and figure out the appropriate parameters. for example, in the stringIndexerDiagnosis stage, 
#it assigns benign=0 and malignant=1 (or vice versa)
pipelineModel = pipeline.fit(data_train)



In [12]:
#actually transform the data now that the pipeline is fit. note that the resultant features vector may be sparse
output = pipelineModel.transform(data_train)


In [13]:
#take a look at the output
output.limit(3).toPandas()

In [14]:
#It's generally a good idea to take a look at the correlations between input features. This is a good first step at feature selection, in which we can #remove any highly correlated (and therefore redundant) features.
#To do correlation analysis, Let's select all the features and conver to a Pandas DataFrame.
#note: if the data set is large, we should limit the number of samples before converting to pandas
cont_feats = data_train.select(all_feats).toPandas()


In [15]:
#compute the correlation matrix
corr = cont_feats.corr()
print 'correlation matrix:'
corr
#fractal_dimension_mean  | smoothness_mean  | 

In [16]:
#plot the correlation matrix as a heatmap 
f, ax = plt.subplots(figsize=(11, 9))
cmap = sns.diverging_palette(220, 10, as_cmap=True)
sns.heatmap(corr, cmap=cmap, ax=ax)
plt.show()

In [17]:
#exclude the highly correlated features
all_feats = ['texture_mean', 'smoothness_mean', 'compactness_mean',  'symmetry_mean', 'fractal_dimension_mean']
cont_feats = data_train.select(all_feats).toPandas()

#compute the correlation matrix
corr = cont_feats.corr()
print 'correlation matrix:'
corr

In [18]:
import matplotlib.pyplot as plt

#the following lines allow the notebook to have multiple outputs for a single cell
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'

#plot the correlation matrix as a heatmap 
f, ax = plt.subplots(figsize=(11, 9))
cmap = sns.diverging_palette(220, 10, as_cmap=True)
sns.heatmap(corr, cmap=cmap, ax=ax)
plt.show()

In [19]:
#Now, let's put everything together. Define a new feature assembler that only assembles the non-correlated features. Define a random forest classifier with default parameters, and put everything together in a pipeline.
#define a new assembler on only the non-correlated features
assemblerAllFeatures = VectorAssembler(inputCols=all_feats, outputCol='features')

model = RandomForestClassifier()

#chain everything together into a pipeline
pipeline = Pipeline(stages=[stringIndexerDiagnosis, assemblerAllFeatures, model])



In [20]:
#Note that we didn't define any of the parameters for the random forest classifier. As it turns out, there are two parameters that need to be optimized: the max depth of the trees, and the number of trees in the random forest. To identify the optimal values for these parameters,we will use the CrossValidator.

#we will set up the max depth to range from 1 to 10, and the number of trees to range from 5 to 70 in increments of 5. The metric used to determine the best performing model will be the area under the ROC, and the number of folds of cross validation are set to 3.

#define a ParamGridBuilder to determine optimal values of elasticNetParam (range [0,1]) and regParam (typically 
#ranges between 0 and 1). we'll sweep through 0 to 1 in increments of 0.1 for both parameters

paramGrid = ParamGridBuilder().addGrid(model.maxDepth, range(1,11)) \
                              .addGrid(model.numTrees, range(5,75,5)).build()
paramGrid

In [21]:
#define the RMSE to be the evaluation metric for model performance
#root-mean-square deviation
evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')


In [22]:
#set up 3-fold cross validation to determine the optimal depth parameter (this can be set higher for potentially 
#better results)
crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)



In [23]:
#do the actual cross validation on the training data
CV_model = crossval.fit(data_train)


In [24]:
#get the best model
best_model = CV_model.bestModel

#for the best model, print out each tree in the random forest and list its depth and number of nodes
print ('number of trees: %i') % (len(best_model.stages[-1].trees))
best_model.stages[-1].trees

In [25]:
#From the above output, we can see that the optimal max depth was identified to be 6 and the number of trees used was 35.