In [1]:
#set environment
import os
import sys
 
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/local/anaconda/bin/python" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/local/anaconda/bin/python"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [3]:
#import Sparksession driver
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Classification of Fetal Health Dataset") \
    .getOrCreate()

In [4]:
#Loading the data
df = spark.read.csv('data/fetal_health.csv',header=True,inferSchema=True)
df.show()

+--------------+-------------+--------------+--------------------+-------------------+--------------------+------------------------+-------------------------------+------------------------------------+------------------------------------------------------+-----------------------------------+---------------+-------------+-------------+-------------------------+--------------------------+--------------+--------------+----------------+------------------+------------------+------------+
|baseline value|accelerations|fetal_movement|uterine_contractions|light_decelerations|severe_decelerations|prolongued_decelerations|abnormal_short_term_variability|mean_value_of_short_term_variability|percentage_of_time_with_abnormal_long_term_variability|mean_value_of_long_term_variability|histogram_width|histogram_min|histogram_max|histogram_number_of_peaks|histogram_number_of_zeroes|histogram_mode|histogram_mean|histogram_median|histogram_variance|histogram_tendency|fetal_health|
+--------------+--------

In [5]:
df.show(5) #showing the first 5 rows

+--------------+-------------+--------------+--------------------+-------------------+--------------------+------------------------+-------------------------------+------------------------------------+------------------------------------------------------+-----------------------------------+---------------+-------------+-------------+-------------------------+--------------------------+--------------+--------------+----------------+------------------+------------------+------------+
|baseline value|accelerations|fetal_movement|uterine_contractions|light_decelerations|severe_decelerations|prolongued_decelerations|abnormal_short_term_variability|mean_value_of_short_term_variability|percentage_of_time_with_abnormal_long_term_variability|mean_value_of_long_term_variability|histogram_width|histogram_min|histogram_max|histogram_number_of_peaks|histogram_number_of_zeroes|histogram_mode|histogram_mean|histogram_median|histogram_variance|histogram_tendency|fetal_health|
+--------------+--------

In [6]:
#Check for missing values
for col in df.columns:
    print("no. of cells in column", col, "with null values:", df.filter(df[col].isNull()).count())

no. of cells in column baseline value with null values: 0
no. of cells in column accelerations with null values: 0
no. of cells in column fetal_movement with null values: 0
no. of cells in column uterine_contractions with null values: 0
no. of cells in column light_decelerations with null values: 0
no. of cells in column severe_decelerations with null values: 0
no. of cells in column prolongued_decelerations with null values: 0
no. of cells in column abnormal_short_term_variability with null values: 0
no. of cells in column mean_value_of_short_term_variability with null values: 0
no. of cells in column percentage_of_time_with_abnormal_long_term_variability with null values: 0
no. of cells in column mean_value_of_long_term_variability with null values: 0
no. of cells in column histogram_width with null values: 0
no. of cells in column histogram_min with null values: 0
no. of cells in column histogram_max with null values: 0
no. of cells in column histogram_number_of_peaks with null valu

In [8]:
#all the independent variables need to be packed into one column of vector type
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["baseline value","accelerations","fetal_movement","uterine_contractions","light_decelerations","severe_decelerations","prolongued_decelerations","abnormal_short_term_variability","mean_value_of_short_term_variability","percentage_of_time_with_abnormal_long_term_variability","mean_value_of_long_term_variability","histogram_width","histogram_min","histogram_max","histogram_number_of_peaks","histogram_number_of_zeroes","histogram_mode","histogram_mean","histogram_median","histogram_variance","histogram_tendency"], 
                            outputCol="features")
feature_vec=assembler.transform(df).select('features','fetal_health')
feature_vec.show(5)

+--------------------+------------+
|            features|fetal_health|
+--------------------+------------+
|[120.0,0.0,0.0,0....|         2.0|
|[132.0,0.006,0.0,...|         1.0|
|[133.0,0.003,0.0,...|         1.0|
|[134.0,0.003,0.0,...|         1.0|
|[132.0,0.007,0.0,...|         1.0|
+--------------------+------------+
only showing top 5 rows



In [9]:
#Count of target classes
feature_vec.groupBy('fetal_health').count().show()
#there is data imbalance

+------------+-----+
|fetal_health|count|
+------------+-----+
|         1.0| 1655|
|         3.0|  176|
|         2.0|  295|
+------------+-----+



In [10]:
# Split the data into train and test sets
train_data, test_data = feature_vec.randomSplit([.75,.25],seed=0)

# Logistic Regression

In [11]:
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="fetal_health", featuresCol="features",  
                        maxIter=100, regParam=0.0001, family="multinomial",  
                        elasticNetParam=0.0)

# Train model with Training Data
lrModel = lr.fit(train_data)
predictions = lrModel.transform(test_data)
predictions.printSchema()

root
 |-- features: vector (nullable = true)
 |-- fetal_health: double (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)



In [12]:
predictions.select('fetal_health','prediction').show()

+------------+----------+
|fetal_health|prediction|
+------------+----------+
|         3.0|       3.0|
|         3.0|       3.0|
|         1.0|       1.0|
|         3.0|       3.0|
|         3.0|       3.0|
|         3.0|       3.0|
|         3.0|       3.0|
|         3.0|       3.0|
|         1.0|       2.0|
|         2.0|       2.0|
|         2.0|       2.0|
|         1.0|       1.0|
|         3.0|       3.0|
|         1.0|       1.0|
|         1.0|       1.0|
|         3.0|       3.0|
|         3.0|       3.0|
|         1.0|       1.0|
|         1.0|       1.0|
|         1.0|       1.0|
+------------+----------+
only showing top 20 rows



In [14]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator #provides accuracy & f1 metrics

evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='fetal_health', metricName='accuracy')
evaluator.evaluate(predictions)

0.8936170212765957

In [15]:
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='fetal_health', metricName='f1')
evaluator.evaluate(predictions)

0.8927650363820576

In [16]:
predictions.groupBy('fetal_health').count().show()

+------------+-----+
|fetal_health|count|
+------------+-----+
|         1.0|  398|
|         3.0|   54|
|         2.0|   65|
+------------+-----+



# Hyper-parameter tuning

In [17]:
#Grid Search
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()\
             .addGrid(lr.regParam,[0.001,0.01,0.1,1])\
             .addGrid(lr.elasticNetParam,[0.0,0.5,1.0])\
             .build())

# Create 4-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=4)

cvModel = cv.fit(train_data)

In [18]:
list(zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()))

[(0.8873501942709796,
  {Param(parent='LogisticRegression_46a1b6f2bdd4e61f5e57', name='regParam', doc='regularization parameter (>= 0).'): 0.001,
   Param(parent='LogisticRegression_46a1b6f2bdd4e61f5e57', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0}),
 (0.892313510194199,
  {Param(parent='LogisticRegression_46a1b6f2bdd4e61f5e57', name='regParam', doc='regularization parameter (>= 0).'): 0.001,
   Param(parent='LogisticRegression_46a1b6f2bdd4e61f5e57', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.5}),
 (0.8873675753031562,
  {Param(parent='LogisticRegression_46a1b6f2bdd4e61f5e57', name='regParam', doc='regularization parameter (>= 0).'): 0.001,
   Param(parent='LogisticRegression_46a1b6f2bdd4e61f5e57', name='elasticNetParam', doc='the ElasticNet mi

In [19]:
#Best Model Params
score_params_list = list(zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()))
max(score_params_list,key=lambda item:item[0])

(0.892313510194199,
 {Param(parent='LogisticRegression_46a1b6f2bdd4e61f5e57', name='regParam', doc='regularization parameter (>= 0).'): 0.001,
  Param(parent='LogisticRegression_46a1b6f2bdd4e61f5e57', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.5})

In [20]:
predictions = cvModel.bestModel.transform(test_data)

In [21]:
evaluator.evaluate(predictions)

0.904374444509741

# Inference