###Import all the Libraries

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import col, when, max

from pyspark.ml import Pipeline

from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression, RandomForestClassifier, LinearSVC, GBTClassifier, FMClassifier

from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

###Create a Spark-Submit Session

In [0]:
PYSPARK_CLI = True # conditional statement to run only at shell
if PYSPARK_CLI:
    sc = SparkContext.getOrCreate()
    spark = SparkSession(sc)

In [0]:
# Limit the log
spark.sparkContext.setLogLevel("WARN")

###Load the sample dataset from DBFS

In [0]:
# Oracle BDCE
#csv = spark.read.csv('/user/agupta25/project/benefits1.csv', inferSchema=True, header=True)
# File location and type
file_location = "/FileStore/tables/benefits1.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)
  
display(df)

BusinessYear,StateCode,IssuerId,SourceName,ImportDate,StandardComponentId,PlanId,BenefitName,CopayInnTier1,CopayInnTier2,CopayOutofNet,CoinsInnTier1,CoinsInnTier2,CoinsOutofNet,IsEHB,IsCovered,QuantLimitOnSvc,LimitQty,LimitUnit,Exclusions,Explanation,EHBVarReason,IsExclFromInnMOOP,IsExclFromOonMOOP
2021,IL,36096,SERFF,2020-12-23 20:15:49,36096IL0990144,36096IL0990144-06,Hearing Aids,Not Applicable,Not Applicable,Not Applicable,30.00% Coinsurance after deductible,Not Applicable,50.00% Coinsurance after deductible,Yes,Covered,Yes,2.0,Item(s) per 2 Years,,,Substantially Equal,No,No
2021,TN,23552,HIOS,2020-09-18 03:20:24,23552TN0020016,23552TN0020016-05,Inpatient Physician and Surgical Services,$75.00,Not Applicable,Not Applicable,Not Applicable,Not Applicable,100.00%,Yes,Covered,,,,,,,No,No
2017,FL,36194,HIOS,2016-09-28 03:28:44,36194FL0030034,36194FL0030034-00,"Inpatient Hospital Services (e.g., Hospital Stay)",Not Applicable,,Not Applicable,50.00% Coinsurance after deductible,,100.00%,Yes,Covered,No,,,,,,No,Yes
2020,GA,60224,HIOS,2019-08-14 03:20:21,60224GA0010003,60224GA0010003-03,Preventive Care/Screening/Immunization,$0.00,,Not Applicable,0.00%,,100.00%,Yes,Covered,,,,,"The recommendations by the USPSTF for breast cancer screenings, mammography and preventions issued prior to November 2009 will be considered current. Immunizations covered are those recommended by the Advisory Committee on Immunization Practices of the Centers for Disease Control and Prevention (CDC).",,No,No
2020,MI,77739,SERFF,2019-08-21 20:17:17,77739MI0070009,77739MI0070009-03,Infertility Treatment,Not Applicable,Not Applicable,Not Applicable,40.00% Coinsurance after deductible,Not Applicable,100.00%,Yes,Covered,,,,,Underlying causes only.,,No,No
2018,MI,60829,SERFF,2017-08-16 20:15:58,60829MI0190010,60829MI0190010-01,Urgent Care Centers or Facilities,$60.00,,$60.00,Not Applicable,,Not Applicable,Yes,Covered,,,,,Urgent care visits are always covered at network benefit level.,,No,No
2021,UT,68781,SERFF,2020-12-29 20:15:44,68781UT0200008,68781UT0200008-03,Treatment for Temporomandibular Joint Disorders,,,,,,,,Not Covered,,,,,,,,
2021,OK,40463,HIOS,2020-10-22 03:20:10,40463OK0010023,40463OK0010023-01,Prenatal and Postnatal Care,No Charge,,Not Applicable,Not Applicable,,50.00% Coinsurance after deductible,Yes,Covered,,,,,,,No,Yes
2020,IL,20129,SERFF,2019-09-16 20:17:28,20129IL0340045,20129IL0340045-01,Basic Dental Care - Child,Not Applicable,,Not Applicable,50.00% Coinsurance after deductible,,100.00%,Yes,Covered,,,,,,,No,No
2018,PA,16322,SERFF,2018-03-15 11:38:40,16322PA0060096,16322PA0060096-00,Basic Dental Care - Adult,,,,,,,,Not Covered,,,,,,,,


### Prepare the Data

In [0]:
df = df.select('BusinessYear', 'StateCode', 'IssuerId', 'SourceName', 'IsEHB', 'QuantLimitOnSvc', 'Exclusions', 'EHBVarReason',col("IsCovered").alias("label"))

df.show()

+------------+---------+--------+----------+-----+---------------+----------+-------------------+-----------+
|BusinessYear|StateCode|IssuerId|SourceName|IsEHB|QuantLimitOnSvc|Exclusions|       EHBVarReason|      label|
+------------+---------+--------+----------+-----+---------------+----------+-------------------+-----------+
|        2021|       IL|   36096|     SERFF|  Yes|            Yes|      null|Substantially Equal|    Covered|
|        2021|       TN|   23552|      HIOS|  Yes|           null|      null|               null|    Covered|
|        2017|       FL|   36194|      HIOS|  Yes|             No|      null|               null|    Covered|
|        2020|       GA|   60224|      HIOS|  Yes|           null|      null|               null|    Covered|
|        2020|       MI|   77739|     SERFF|  Yes|           null|      null|               null|    Covered|
|        2018|       MI|   60829|     SERFF|  Yes|           null|      null|               null|    Covered|
|        2

In [0]:
df.printSchema()

root
 |-- BusinessYear: integer (nullable = true)
 |-- StateCode: string (nullable = true)
 |-- IssuerId: integer (nullable = true)
 |-- SourceName: string (nullable = true)
 |-- IsEHB: string (nullable = true)
 |-- QuantLimitOnSvc: string (nullable = true)
 |-- Exclusions: string (nullable = true)
 |-- EHBVarReason: string (nullable = true)
 |-- label: string (nullable = true)



###count the null values from prediction col

In [0]:
from pyspark.sql.functions import col, sum

# assuming that `df` is a Spark DataFrame and `label` is a column in `df`
null_count = df.select(sum(col("label").isNull().cast("integer"))).collect()[0][0]
print(null_count)


2001


###Replace null or whitespace values with None. Later drop the values.

In [0]:
from pyspark.sql.functions import when, col

# Replace empty strings or whitespace with null values
df = df.withColumn('label', when(col('label').isin('', ' '), None).otherwise(col('label')))

# Drop null values from label column
df = df.dropna(subset=['label'])
df.show()



+------------+---------+--------+----------+-----+---------------+----------+-------------------+-----------+
|BusinessYear|StateCode|IssuerId|SourceName|IsEHB|QuantLimitOnSvc|Exclusions|       EHBVarReason|      label|
+------------+---------+--------+----------+-----+---------------+----------+-------------------+-----------+
|        2021|       IL|   36096|     SERFF|  Yes|            Yes|      null|Substantially Equal|    Covered|
|        2021|       TN|   23552|      HIOS|  Yes|           null|      null|               null|    Covered|
|        2017|       FL|   36194|      HIOS|  Yes|             No|      null|               null|    Covered|
|        2020|       GA|   60224|      HIOS|  Yes|           null|      null|               null|    Covered|
|        2020|       MI|   77739|     SERFF|  Yes|           null|      null|               null|    Covered|
|        2018|       MI|   60829|     SERFF|  Yes|           null|      null|               null|    Covered|
|        2

###Take Max of all the other columns in dataset having null values

In [0]:
df.agg({'IsEHB': 'max','QuantLimitOnSvc':'max','Exclusions':'max','EHBVarReason':'max'}).collect()


Out[13]: [Row(max(Exclusions)='in vitro fertilization and artificial insemination.', max(QuantLimitOnSvc)='Yes', max(IsEHB)='Yes', max(EHBVarReason)='Using Alternate Benchmark')]

###Populating the aggregated values of other columns inplace of null values

In [0]:
df = df.fillna({
    "BusinessYear": 0,
    "StateCode": "",
    "IssuerId": 0,
    "SourceName": "",
    "IsEHB": "Yes",
    "QuantLimitOnSvc": "Yes",
    "Exclusions": "in vitro fertilization and artificial insemination",
    "EHBVarReason": "Using Alternate Benchmark",
    "label": ""
})

df.show()


+------------+---------+--------+----------+-----+---------------+--------------------+--------------------+-----------+
|BusinessYear|StateCode|IssuerId|SourceName|IsEHB|QuantLimitOnSvc|          Exclusions|        EHBVarReason|      label|
+------------+---------+--------+----------+-----+---------------+--------------------+--------------------+-----------+
|        2021|       IL|   36096|     SERFF|  Yes|            Yes|in vitro fertiliz...| Substantially Equal|    Covered|
|        2021|       TN|   23552|      HIOS|  Yes|            Yes|in vitro fertiliz...|Using Alternate B...|    Covered|
|        2017|       FL|   36194|      HIOS|  Yes|             No|in vitro fertiliz...|Using Alternate B...|    Covered|
|        2020|       GA|   60224|      HIOS|  Yes|            Yes|in vitro fertiliz...|Using Alternate B...|    Covered|
|        2020|       MI|   77739|     SERFF|  Yes|            Yes|in vitro fertiliz...|Using Alternate B...|    Covered|
|        2018|       MI|   60829

###Convert the label into 0 and 1 for classification modelling and prediction.

In [0]:
df = df.withColumn("label", when(df["label"] == "Covered", 1).otherwise(0))
df.show()


+------------+---------+--------+----------+-----+---------------+--------------------+--------------------+-----+
|BusinessYear|StateCode|IssuerId|SourceName|IsEHB|QuantLimitOnSvc|          Exclusions|        EHBVarReason|label|
+------------+---------+--------+----------+-----+---------------+--------------------+--------------------+-----+
|        2021|       IL|   36096|     SERFF|  Yes|            Yes|in vitro fertiliz...| Substantially Equal|    1|
|        2021|       TN|   23552|      HIOS|  Yes|            Yes|in vitro fertiliz...|Using Alternate B...|    1|
|        2017|       FL|   36194|      HIOS|  Yes|             No|in vitro fertiliz...|Using Alternate B...|    1|
|        2020|       GA|   60224|      HIOS|  Yes|            Yes|in vitro fertiliz...|Using Alternate B...|    1|
|        2020|       MI|   77739|     SERFF|  Yes|            Yes|in vitro fertiliz...|Using Alternate B...|    1|
|        2018|       MI|   60829|     SERFF|  Yes|            Yes|in vitro ferti

###Shows the summary of dataset

In [0]:
df.describe().show()

+-------+------------------+---------+------------------+----------+-----+---------------+--------------------+--------------------+-------------------+
|summary|      BusinessYear|StateCode|          IssuerId|SourceName|IsEHB|QuantLimitOnSvc|          Exclusions|        EHBVarReason|              label|
+-------+------------------+---------+------------------+----------+-----+---------------+--------------------+--------------------+-------------------+
|  count|             47999|    47999|             47999|     47999|47999|          47999|               47999|               47999|              47999|
|   mean|2019.0697097856205|     null|50793.196337423695|      null| null|           null|                null|                null| 0.8110168961853372|
| stddev|1.4978542860833675|     null|26507.148720621153|      null| null|           null|                null|                null|0.39149927651764344|
|    min|              2017|       AK|             10046|      HIOS|  Yes|        

###Shows the existing null values in dataset

In [0]:
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

+------------+---------+--------+----------+-----+---------------+----------+------------+-----+
|BusinessYear|StateCode|IssuerId|SourceName|IsEHB|QuantLimitOnSvc|Exclusions|EHBVarReason|label|
+------------+---------+--------+----------+-----+---------------+----------+------------+-----+
|           0|        0|       0|         0|    0|              0|         0|           0|    0|
+------------+---------+--------+----------+-----+---------------+----------+------------+-----+



### Split the Data for training & testing

In [0]:
splits = df.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)


Training Rows: 33583  Testing Rows: 14416


### Define the Pipeline
A predictive model often requires multiple stages of feature preparation. For example, it is common when using some algorithms to distingish between continuous features (which have a calculable numeric value) and categorical features (which are numeric representations of discrete categories). It is also common to *normalize* continuous numeric features to use a common scale (for example, by scaling all numbers to a proportinal decimal value between 0 and 1).

A pipeline consists of a a series of *transformer* and *estimator* stages that typically prepare a DataFrame for
modeling and then train a predictive model. In this case, you will create a pipeline with seven stages:
- A **StringIndexer** estimator that converts string values to indexes for categorical features
- A **VectorAssembler** that combines categorical features into a single vector
- A **VectorIndexer** that creates indexes for a vector of categorical features
- A **VectorAssembler** that creates a vector of continuous numeric features
- A **MinMaxScaler** that normalizes continuous numeric features
- A **VectorAssembler** that creates a vector of categorical and continuous features
- A **DecisionTreeClassifier** that trains a classification model.

In [0]:
strIdx_SC = StringIndexer(inputCol = "StateCode", outputCol = "SC",handleInvalid='keep')
strIdx_SN = StringIndexer(inputCol = "SourceName", outputCol = "SN",handleInvalid='keep')
strIdx_EHB = StringIndexer(inputCol = "IsEHB", outputCol = "EHB",handleInvalid='keep')
strIdx_QL = StringIndexer(inputCol = "QuantLimitOnSvc", outputCol = "QL",handleInvalid='keep')
strIdx_EX = StringIndexer(inputCol = "Exclusions", outputCol = "EX",handleInvalid='keep')
strIdx_EHBVR = StringIndexer(inputCol = "EHBVarReason", outputCol = "EHBVR",handleInvalid='keep')


# the following columns are categorical number such as ID so that it should be Category features
catVect = VectorAssembler(inputCols = ["SC", "BusinessYear", "IssuerId", "SN", "EHB","QL","EHBVR"], outputCol="catFeatures")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures", handleInvalid="skip")


###Shows the feature extraction count of cat features

In [0]:
# Fit the string indexers on the input data
strIdx_SC_model = strIdx_SC.fit(df)
strIdx_SN_model = strIdx_SN.fit(df)
strIdx_EHB_model = strIdx_EHB.fit(df)
strIdx_QL_model = strIdx_QL.fit(df)
strIdx_EX_model = strIdx_EX.fit(df)
strIdx_EHBVR_model = strIdx_EHBVR.fit(df)

# Transform the input data using the fitted string indexers
data_transformed = df
data_transformed = strIdx_SC_model.transform(data_transformed)
data_transformed = strIdx_SN_model.transform(data_transformed)
data_transformed = strIdx_EHB_model.transform(data_transformed)
data_transformed = strIdx_QL_model.transform(data_transformed)
data_transformed = strIdx_EX_model.transform(data_transformed)
data_transformed = strIdx_EHBVR_model.transform(data_transformed)

# Count the number of distinct values in each output column
distinct_counts = {
    "StateCode": data_transformed.select(countDistinct("SC")).collect()[0][0],
    "SourceName": data_transformed.select(countDistinct("SN")).collect()[0][0],
    "IsEHB": data_transformed.select(countDistinct("EHB")).collect()[0][0],
    "QuantLimitOnSvc": data_transformed.select(countDistinct("QL")).collect()[0][0],
    "Exclusions": data_transformed.select(countDistinct("EX")).collect()[0][0],
    "EHBVarReason": data_transformed.select(countDistinct("EHBVR")).collect()[0][0]
}

print(distinct_counts)

{'StateCode': 39, 'SourceName': 3, 'IsEHB': 1, 'QuantLimitOnSvc': 2, 'Exclusions': 576, 'EHBVarReason': 19}


In [0]:
# cat feature vector is normalized

minMax = MinMaxScaler(inputCol = catIdx.getOutputCol(), outputCol="normFeatures")

featVect = VectorAssembler(inputCols=["normFeatures"], outputCol="features")

classification_models=["Logistic Regression (LR)","Decision Tree (DT)","Random Forest (RT)","Factorization Machine (FM)","Gradiest Boost (GBT)","Support Vector Machine (SVM)"]

#creating diff clasf algos for testing accuracy,computing time, precision, recall, ROC, PR
cls_mod=[]

cls_mod.insert(0,LogisticRegression(labelCol="label",featuresCol="features",maxIter=10,regParam=0.3,threshold=0.35)) 
cls_mod.insert(1,DecisionTreeClassifier(labelCol="label", featuresCol="features",seed=42)) 
cls_mod.insert(2,RandomForestClassifier(labelCol='label', featuresCol='features',seed=42)) 
cls_mod.insert(3,FMClassifier(labelCol='label', featuresCol='features', seed=42)) 
cls_mod.insert(4,GBTClassifier(labelCol='label', featuresCol='features', seed=42)) 
cls_mod.insert(5,LinearSVC(labelCol='label', featuresCol='features')) 

In [0]:
# define list of models made from Train Validation Split or Cross Validation
model = []
pipeline = []

In [0]:
# Pipeline process the series of transformation above, which is another transformation
for i in range(0,6):
    pipeline.insert(i,Pipeline(stages=[strIdx_SC,strIdx_SN,strIdx_EHB,strIdx_QL,strIdx_EHBVR, catVect, catIdx,minMax, featVect, cls_mod[i]]))

### Tune hyperparameters using ParamGrid

In [0]:
paramGrid=[]

paramGrid.insert(0,(ParamGridBuilder() \
             .addGrid(cls_mod[0].regParam, [0.01, 0.3]) \
             .addGrid(cls_mod[0].elasticNetParam, [0.0, 0.5]) \
             .addGrid(cls_mod[0].maxIter, [10,20]) \
             .build()))
             
             
paramGrid.insert(1,ParamGridBuilder() \
             .addGrid(cls_mod[1].maxBins, [64,128,256]) \
             .addGrid(cls_mod[1].maxDepth, [2, 5, 10]) \
             .addGrid(cls_mod[1].impurity, ["gini", "entropy"]) \
             .addGrid(cls_mod[1].minInstancesPerNode, [1, 5, 10]) \
             .build())
             

paramGrid.insert(2,ParamGridBuilder() \
              .addGrid(cls_mod[2].numTrees, [50, 100, 150]) \
              .addGrid(cls_mod[2].maxBins, [64,128,256])
              .addGrid(cls_mod[2].maxDepth, [2, 5, 10]) \
              .build())


paramGrid.insert(3,ParamGridBuilder()\
.addGrid(cls_mod[3].regParam, [0.01, 0.1]) \
.addGrid(cls_mod[3].stepSize, [0.1,1])\
.addGrid(cls_mod[3].factorSize, [2,4])\
.build())


paramGrid.insert(4,ParamGridBuilder()\
.addGrid(cls_mod[4].maxDepth, [2, 5])\
.addGrid(cls_mod[4].maxIter, [10, 20])\
.addGrid(cls_mod[4].minInfoGain, [0.0])\
.build())

    
paramGrid.insert(5,ParamGridBuilder() \
             .addGrid(cls_mod[5].regParam, [0.01, 0.5]) \
             .addGrid(cls_mod[5].maxIter, [1, 5]) \
             .addGrid(cls_mod[5].tol, [1e-4, 1e-3]) \
             .addGrid(cls_mod[5].fitIntercept, [True, False]) \
             .addGrid(cls_mod[5].standardization, [True, False]) \
             .build())






### Used CrossValidator for modelling

In [0]:
cv=[]
K=3 
for i in range(0,6):
    cv.insert(i, CrossValidator(estimator=pipeline[i], 
                            evaluator=BinaryClassificationEvaluator(), 
                            estimatorParamMaps=paramGrid[i], 
                            numFolds=K))


#cv1 = CrossValidator(estimator=pipeline1, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid1, numFolds=K)
#cv2= CrossValidator(estimator=pipeline2, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid2, numFolds=K)
#cv3 = CrossValidator(estimator=pipeline3, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid3, numFolds=K)
#cv4 = CrossValidator(estimator=pipeline4, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid4, numFolds=K)
#cv5 = CrossValidator(estimator=pipeline5, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid5, numFolds=K)

#cv = TrainValidationSplit(estimator=pipeline, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid, trainRatio=0.8)


###Calculating the computing time required to build a model

In [0]:
import time

start_time = []
end_time = []
computation_time = []

for i in range(0, 6):
    start_time.insert(i, time.time())
    model.insert(i, cv[i].fit(train))
    # model1 = cv1.fit(train)
    # model2 = cv2.fit(train)
    # model3 = cv3.fit(train)
    # model4 = cv4.fit(train)
    # model5 = cv5.fit(train)
    end_time.insert(i, time.time())
    computation_time.insert(i, (end_time[i] - start_time[i]) / 60.0)
    print("Computation time:",i," ",computation_time[i], "minutes")


Computation time: 0   5.152020577589671 minutes
Computation time: 1   29.827600558598835 minutes
Computation time: 2   36.10357682307561 minutes
Computation time: 3   13.262092169125875 minutes
Computation time: 4   6.539898025989532 minutes
Computation time: 5   15.529444551467895 minutes


### Test the Pipeline Model
The model produced by the pipeline is a transformer that will apply all of the stages in the pipeline to a specified DataFrame and apply the trained model to generate predictions. In this case, you will transform the **test** DataFrame using the pipeline to generate label predictions.

In [0]:
prediction =[]
predicted =[]
for i in range(0,6):
    prediction.insert(i,model[i].transform(test))
    prediction[i].show()
    predicted.insert(i,prediction[i].select("features", "prediction","trueLabel"))
    predicted[i].show()
    
    

#LR
#prediction = model.transform(test)
#prediction.show(5)
#predicted = prediction.select("features", "prediction", "probability", "trueLabel")

#predicted.show(10, truncate=False)

#DT
#prediction1 = model1.transform(test)
#predicted1 = prediction1.select("features", "prediction", "probability", "trueLabel")

#predicted1.show(10, truncate=False)

#RF
#prediction2 = model2.transform(test)
#predicted2 = prediction2.select("features", "prediction", "probability", "trueLabel")

#predicted2.show(10, truncate=False)

#SVM
#prediction.insert(5,model[5].transform(test))
#predicted.insert(5, prediction[5].select("features", "prediction", "trueLabel"))

#prediction[5].show(10,truncate=False)
#predicted[5].show(10, truncate=False)

#GBT
#prediction4 = model4.transform(test)
#prediction4.show(5)
#predicted4 = prediction4.select("features", "prediction", "probability", "trueLabel")

#predicted4.show(10, truncate=False)

#FM
#prediction5 = model5.transform(test)
#predicted5 = prediction5.select("features", "prediction", "probability", "trueLabel")

#predicted5.show(10, truncate=False)


+------------+---------+--------+----------+-----+---------------+--------------------+--------------------+---------+----+---+---+---+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|BusinessYear|StateCode|IssuerId|SourceName|IsEHB|QuantLimitOnSvc|          Exclusions|        EHBVarReason|trueLabel|  SC| SN|EHB| QL|EHBVR|         catFeatures|      idxCatFeatures|        normFeatures|            features|       rawPrediction|         probability|prediction|
+------------+---------+--------+----------+-----+---------------+--------------------+--------------------+---------+----+---+---+---+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|        2017|       AK|   73836|      HIOS|  Yes|            Yes|in vitro fertiliz...| Substantially Equal|        1|38.0|0.0|0.0|0.0|  1.0|[38.0,2017.0,7383...|[

The resulting DataFrame is produced by applying all of the transformations in the pipline to the test data. The **prediction** column contains the predicted value for the label, and the **trueLabel** column contains the actual known value from the testing data.

### Compute Confusion Matrix Metrics
Classifiers are typically evaluated by creating a *confusion matrix*, which indicates the number of:
- True Positives
- True Negatives
- False Positives
- False Negatives

From these core measures, other evaluation metrics such as *precision* and *recall* can be calculated.

In [0]:
precision=[]
recall=[]
metrics=[]

In [0]:
for i in range(0,6):
    tp = float(predicted[i].filter("prediction== 1.0 AND truelabel == 1").count())
    fp = float(predicted[i].filter("prediction== 1.0 AND truelabel == 0").count())
    tn = float(predicted[i].filter("prediction== 0.0 AND truelabel == 0").count())
    fn = float(predicted[i].filter("prediction==0.0 AND truelabel == 1").count())
    precision.insert(i,tp / (tp + fp))
    recall.insert(i,tp / (tp + fn))
    metrics.insert(i, spark.createDataFrame([
    ("TP", tp),
    ("FP", fp),
    ("TN", tn),
    ("FN", fn),
    ("Precision", tp / (tp + fp)),
    ("Recall", tp / (tp + fn))],["metric", "value"]))
    metrics[i].show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|           11699.0|
|       FP|            2713.0|
|       TN|               0.0|
|       FN|               3.0|
|Precision|0.8117540938107133|
|   Recall|0.9997436335669116|
+---------+------------------+

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|           11626.0|
|       FP|            1785.0|
|       TN|             928.0|
|       FN|              76.0|
|Precision|0.8669003057191857|
|   Recall|0.9935053836950949|
+---------+------------------+

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|           11684.0|
|       FP|            1918.0|
|       TN|             795.0|
|       FN|              18.0|
|Precision|0.8589913248051757|
|   Recall|0.9984618014014698|
+---------+------------------+

+---------+-----------------+
|   metric|            value|
+------

### View the Raw Prediction and Probability
The prediction is based on a raw prediction score that describes a labelled point in a logistic function. This raw prediction is then converted to a predicted label of 0 or 1 based on a probability vector that indicates the confidence for each possible label value (in this case, 0 and 1). The value with the highest confidence is selected as the prediction.

In [0]:
for i in range(0,6):
    prediction[i].select("rawPrediction", "prediction", "trueLabel").show(10, truncate=False)

+----------------------------------------+----------+---------+
|rawPrediction                           |prediction|trueLabel|
+----------------------------------------+----------+---------+
|[-1.5173015736635656,1.5173015736635656]|1.0       |1        |
|[-1.7703689247076957,1.7703689247076957]|1.0       |1        |
|[-3.539492035332441,3.539492035332441]  |1.0       |1        |
|[-3.539492035332441,3.539492035332441]  |1.0       |1        |
|[-3.511845007311633,3.511845007311633]  |1.0       |1        |
|[-3.511845007311633,3.511845007311633]  |1.0       |1        |
|[-1.7081538032560648,1.7081538032560648]|1.0       |0        |
|[-1.1140409594035199,1.1140409594035199]|1.0       |1        |
|[-1.6088617790573525,1.6088617790573525]|1.0       |1        |
|[-1.604948271741145,1.604948271741145]  |1.0       |1        |
+----------------------------------------+----------+---------+
only showing top 10 rows

+----------------+----------+---------+
|rawPrediction   |prediction|trueLabel

###Calculating metrics such as ROC, PR, Accuracy, F1_score, Precision, Recall

In [0]:
evaluator = [None] * 6
ROC = [None] * 6
PR = [None] * 6
ev1 = [None] * 6
accuracy = [None] * 6
f1_score = [None] * 6

for i in range(0, 6):
    evaluator[i] = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction")
    ROC[i] = evaluator[i].evaluate(prediction[i], {evaluator[i].metricName: "areaUnderROC"})
    # print("ROC = {0:.3f}".format(auc_roc))

    PR[i] = evaluator[i].evaluate(prediction[i], {evaluator[i].metricName: "areaUnderPR"})
    # print("PR = {0:.3f}".format(auc_pr))

    ev1[i] = MulticlassClassificationEvaluator(labelCol='trueLabel', predictionCol='prediction')
    # accuracy
    accuracy[i] = ev1[i].evaluate(prediction[i], {evaluator[i].metricName: "accuracy"})
    # print("Accuracy = {0:.3f}".format(accuracy))

    # f1 score
    f1_score[i] = ev1[i].evaluate(prediction[i], {evaluator[i].metricName: "f1"})
    # print("F1 = {0:.3f}".format(f1_score))


###Comparing all metrics at one place

In [0]:
import pandas as pd

results = {
    'Model': classification_models,
    'Computation Time (min)': computation_time,
    'ROC': ROC,
    'PR': PR,
    'Accuracy': accuracy,
    'F1 Score': f1_score,
    'Precision': precision,
    'Recall': recall
}

df_results = pd.DataFrame.from_dict(results)
df_results = df_results.set_index('Model').transpose()

print(df_results)


Model                   Logistic Regression (LR)  Decision Tree (DT)  \
Computation Time (min)                  5.152021           29.827601   
ROC                                     0.620981            0.648268   
PR                                      0.879197            0.847665   
Accuracy                                0.727362            0.845612   
F1 Score                                0.727362            0.845612   
Precision                               0.811754            0.866900   
Recall                                  0.999744            0.993505   

Model                   Random Forest (RT)  Factorization Machine (FM)  \
Computation Time (min)           36.103577                   13.262092   
ROC                               0.846004                    0.633083   
PR                                0.958525                    0.882931   
Accuracy                          0.834552                    0.727156   
F1 Score                          0.834552           