In [1]:
import pyspark
from pyspark.sql import SparkSession
# May take awhile locally
spark = SparkSession.builder.appName("Classification").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


In [2]:
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler, OneHotEncoder, VectorIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator


## Preprocess Data

In [3]:
#Define field names and types
schema = StructType([
    StructField("age", FloatType(), True),
    StructField("sex", FloatType(), True),
    StructField("chest_pain_type", FloatType(), True),
    StructField("resting_bps", FloatType(), True),
    StructField("chol", FloatType(), True),
    StructField("fast_blood_sugar", FloatType(), True),
    StructField("rest_ecg_type", FloatType(), True),
    StructField("max_hr", FloatType(), True),
    StructField("exercise_angina", FloatType(), True),
    StructField("oldpeak", FloatType(), True),
    StructField("slope_type", FloatType(), True),
    StructField("colored_arteries", FloatType(), True),
    StructField("thal_type", FloatType(), True),
    StructField("heart_disease", IntegerType(), True),
])

df = spark.read.csv("data/processed.cleveland.data", schema = schema, header = False, nullValue="?")
df.limit(6).toPandas()

Unnamed: 0,age,sex,chest_pain_type,resting_bps,chol,fast_blood_sugar,rest_ecg_type,max_hr,exercise_angina,oldpeak,slope_type,colored_arteries,thal_type,heart_disease
0,63.0,1.0,1.0,145.0,233.0,1.0,2.0,150.0,0.0,2.3,3.0,0.0,6.0,0
1,67.0,1.0,4.0,160.0,286.0,0.0,2.0,108.0,1.0,1.5,2.0,3.0,3.0,2
2,67.0,1.0,4.0,120.0,229.0,0.0,2.0,129.0,1.0,2.6,2.0,2.0,7.0,1
3,37.0,1.0,3.0,130.0,250.0,0.0,0.0,187.0,0.0,3.5,3.0,0.0,3.0,0
4,41.0,0.0,2.0,130.0,204.0,0.0,2.0,172.0,0.0,1.4,1.0,0.0,3.0,0
5,56.0,1.0,2.0,120.0,236.0,0.0,0.0,178.0,0.0,0.8,1.0,0.0,3.0,0


In [4]:
print(df.describe().toPandas())

  summary                age                  sex     chest_pain_type  \
0   count                303                  303                 303   
1    mean  54.43894389438944   0.6798679867986799  3.1584158415841586   
2  stddev   9.03866244244675  0.46729882777012977  0.9601256119600138   
3     min               29.0                  0.0                 1.0   
4     max               77.0                  1.0                 4.0   

          resting_bps                chol    fast_blood_sugar  \
0                 303                 303                 303   
1  131.68976897689768  246.69306930693068  0.1485148514851485   
2   17.59974772958769  51.776917542637065  0.3561978749279763   
3                94.0               126.0                 0.0   
4               200.0               564.0                 1.0   

        rest_ecg_type              max_hr      exercise_angina  \
0                 303                 303                  303   
1  0.9900990099009901   149.6072607260

In [5]:
df.groupBy("heart_disease").count().show()

+-------------+-----+
|heart_disease|count|
+-------------+-----+
|            1|   55|
|            3|   35|
|            4|   13|
|            2|   36|
|            0|  164|
+-------------+-----+



In [6]:
#check how much missing data there is
dataAgg = df.agg(*[count(when(isnull(c), c)).alias(c) for c in df.columns])
print(dataAgg.limit(8).toPandas())

dfClean = df.na.drop()
features = df.columns[:-1]
label = "heart_disease"

   age  sex  chest_pain_type  resting_bps  chol  fast_blood_sugar  \
0    0    0                0            0     0                 0   

   rest_ecg_type  max_hr  exercise_angina  oldpeak  slope_type  \
0              0       0                0        0           0   

   colored_arteries  thal_type  heart_disease  
0                 4          2              0  


In [7]:
# change from multiclass to binary prediction
dfBinary = dfClean.withColumn("label", when(dfClean.heart_disease == 0, 0).otherwise(1))
print(dfBinary.groupBy("label").count().show())

#categorical cols will be indexed later
categoricalCols = [col for col in dfBinary.columns if "type" in col]
print(categoricalCols)

continuousCols = [f for f in features if "type" not in f]
continuousCols.remove("sex")

+-----+-----+
|label|count|
+-----+-----+
|    1|  137|
|    0|  160|
+-----+-----+

None
['chest_pain_type', 'rest_ecg_type', 'slope_type', 'thal_type']


In [8]:
#create dense feature vector and scale
featuresList = continuousCols + categoricalCols

assembler = VectorAssembler(inputCols = featuresList, outputCol = "features")
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=5)
scaler = MinMaxScaler(inputCol="indexed", outputCol="features_scaled")
pipeline = Pipeline(stages=[assembler, indexer, scaler])
scalerModel = pipeline.fit(dfBinary)
scaledData = scalerModel.transform(dfBinary).select("features_scaled", "label")
scaledData = scaledData.withColumnRenamed("features_scaled", "features")
scaledData.limit(10).toPandas()

Unnamed: 0,features,label
0,"[0.7083333333333333, 0.4811320754716981, 0.244...",0
1,"[0.7916666666666666, 0.6226415094339622, 0.365...",1
2,"[0.7916666666666666, 0.24528301886792453, 0.23...",1
3,"[0.16666666666666666, 0.33962264150943394, 0.2...",0
4,"[0.25, 0.33962264150943394, 0.1780821917808219...",0
5,"(0.5625, 0.24528301886792453, 0.25114155251141...",0
6,"[0.6875, 0.43396226415094336, 0.32420091324200...",1
7,"[0.5833333333333333, 0.24528301886792453, 0.52...",0
8,"[0.7083333333333333, 0.33962264150943394, 0.29...",1
9,"[0.5, 0.43396226415094336, 0.17579908675799086...",1


## Train and Compare Models

In [9]:
train, test = scaledData.randomSplit([0.7, 0.3])
print(f"train len: {train.count()}, test len: {test.count()}")

train len: 212, test len: 85


In [11]:
def selectParameters(Mtype):
    if Mtype == "LogisticRegression":
            paramGrid = (ParamGridBuilder() \
#                          .addGrid(classifier.regParam, [0.1, 0.01]) \
                         .addGrid(classifier.maxIter, [10, 15,20])
                         .build())
    if Mtype == "RandomForestClassifier":
            paramGrid = (ParamGridBuilder() \
#                          .addGrid(classifier.maxDepth, [2, 5, 10]) \
#                          .addGrid(classifier.maxBins, [5, 10, 20]) \
                         .addGrid(classifier.numTrees, [5, 20])
                         .build())

    if Mtype == "GBTClassifier":
        paramGrid = (ParamGridBuilder() \
#                      .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
#                      .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
#                      .addGrid(classifier.maxIter, [10, 15]) \
                     .addGrid(classifier.stepSize, [0.1, 0.2, 0.3]) #learning rate
                     .build())
    return paramGrid

                
classifiers = [LogisticRegression(), GBTClassifier(), RandomForestClassifier()]

for classifier in classifiers:
    
    Mtype = type(classifier).__name__
    print(Mtype)
    BCEvaluator = BinaryClassificationEvaluator() 
    paramGrid = selectParameters(Mtype)

    crossval = CrossValidator(
        estimator = classifier,
        estimatorParamMaps = paramGrid,
        evaluator=BCEvaluator,
        numFolds= 2)

    fitModel = crossval.fit(train)

    best_model = fitModel.bestModel
    predictions = fitModel.transform(test) #fitModel automatically uses best model
    areaUnderROC = BCEvaluator.evaluate(predictions)
    print(f"Area under ROC: {areaUnderROC}")

LogisticRegression
Area under ROC: 0.903153153153153
GBTClassifier
Area under ROC: 0.8006756756756754
RandomForestClassifier
Area under ROC: 0.902027027027027


## Model Summary
For models such as logistic regression you can also get model summary information.

In [14]:
#Classification diagnostics for logistic regression

lr = LogisticRegression()

crossval = CrossValidator(
        estimator = lr,
        estimatorParamMaps = paramGrid,
        evaluator=BCEvaluator,
        numFolds= 2)
    
lrModel = crossval.fit(train)
best_Model = lrModel.bestModel
    
trainingSummary = best_Model.summary

# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
trainingSummary.roc.limit(10).show()

print("Coefficients: \n" + str(best_Model.coefficientMatrix))
print("Intercept: " + str(best_Model.interceptVector))
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

# Set the model threshold to maximize F-Measure
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max("F-Measure").select("max(F-Measure)").head()
bestThreshold = fMeasure.where(fMeasure["F-Measure"] == maxFMeasure["max(F-Measure)"]) \
    .select("threshold").head()["threshold"]

lr.setThreshold(bestThreshold)

+---+----+
|FPR| TPR|
+---+----+
|0.0| 0.0|
|0.0|0.02|
|0.0|0.04|
|0.0|0.06|
|0.0|0.08|
|0.0| 0.1|
|0.0|0.12|
|0.0|0.14|
|0.0|0.16|
|0.0|0.18|
+---+----+

Coefficients: 
DenseMatrix([[-2.83419255,  2.42154978,  0.89220182, -0.1533292 , -4.57204062,
               0.98464082,  1.41579521,  4.02179428,  1.43571494,  0.34894122,
               0.78117619,  2.39696765]])
Intercept: [-0.7665040236657693]
areaUnderROC: 0.91625


LogisticRegression_8c72198e5432