In [22]:
from typing import List, Tuple, Dict, Any, Optional, Callable

import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')

import numpy as np
import pandas as pd
import pyspark
import pyspark.sql.functions as F
import os
import random

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, FloatType, DoubleType
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, LogisticRegression
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
from pyspark.ml.feature import VectorAssembler, StandardScaler, OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

In [2]:
spark = SparkSession.builder\
.config("spark.driver.memory", "6g")\
.appName("diabetes_indicators").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/12 10:35:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df = spark.read.csv("./data/diabetes/final_diabetes_dataset.csv", header=True, inferSchema=True, sep=",")
df.show(5)

                                                                                

+--------+--------+--------+-------+------+--------+--------+------+--------+-------+-------+--------+-------+-------+--------+--------+---+--------+-------+-----+-------+
|DIABETE3|CHCKIDNY|_RFHYPE5|TOLDHI2| _BMI5|SMOKE100|CVDSTRK3|_MICHD|_TOTINDA|_FRTLT1|_VEGLT1|_RFDRHV5|MEDCOST|GENHLTH|PHYSHLTH|MENTHLTH|SEX|_AGEG5YR|_MRACE1|EDUCA|INCOME2|
+--------+--------+--------+-------+------+--------+--------+------+--------+-------+-------+--------+-------+-------+--------+--------+---+--------+-------+-----+-------+
|       0|       0|       1|      1|2522.0|       1|       0|     0|       1|      1|      1|       0|      0|    2.0|     2.0|    88.0|  0|    10.0|    1.0|  6.0|    8.0|
|       0|       0|       0|      0|2407.0|       0|       0|     0|       0|      0|      1|       0|      0|    2.0|    88.0|    88.0|  0|     8.0|    1.0|  4.0|    3.0|
|       2|       0|       0|      0|2468.0|       1|       0|     0|       1|      1|      1|       0|      0|    3.0|    88.0|    88.0|  0|

In [4]:
df = df.limit(100_000)

# Feature Scaling and Transformation

In [5]:
def transform_data(df: pyspark.sql.DataFrame, map_dict: dict, colName: str) -> pyspark.sql.DataFrame:
    """ Function to transform predictor variable based on map_dict """
    map_col = F.create_map([F.lit(x) for i in map_dict.items() for x in i])
    new_df = df.withColumn(colName, map_col[F.col(colName)])
    
    return new_df

In [6]:
df = df.withColumn("PHYSHLTH", F.when(df["PHYSHLTH"] == 88, 0).otherwise(df["PHYSHLTH"]))
df = df.withColumn("MENTHLTH", F.when(df["MENTHLTH"] == 88, 0).otherwise(df["MENTHLTH"]))

df = transform_data(df, {v:idx for idx, v in enumerate(range(1, 14))}, "_AGEG5YR")
df = transform_data(df, {v:idx for idx, v in enumerate(range(1, 8))}, "_MRACE1")
df = transform_data(df, {v:idx for idx, v in enumerate(range(1, 7))}, "EDUCA")
df = transform_data(df, {v:idx for idx, v in enumerate(range(1, 9))}, "INCOME2")
df = transform_data(df, {v:idx for idx, v in enumerate(range(1, 6))}, "GENHLTH")

In [7]:
def scale_data(df: pyspark.sql.DataFrame, feats: List) -> pyspark.sql.DataFrame: 
    """ Function to scale the data """
    def scale_feat(df: pyspark.sql.DataFrame, feat: str) -> pyspark.sql.DataFrame:
        """ Function to scale numeric columns of dataframe """
        unlist = F.udf(lambda x: round(float(list(x)[0]), 3), DoubleType())

        assembler = VectorAssembler(inputCols=[feat], outputCol=feat+"_vec")
        scaler = StandardScaler(inputCol=feat+"_vec", outputCol=feat+"_scaled")
        pipeline = Pipeline(stages=[
            assembler,
            scaler
        ])

        if not os.path.exists(f"./scalers/{feat}_pipeline"):
            pipeline_model = pipeline.fit(df)
            pipeline_model.save(f"./scalers/{feat}_pipeline")

        else:
            print(f"Loading pipeline from : ./scalers/{feat}_pipeline")
            pipeline_model = PipelineModel.load(f"./scalers/{feat}_pipeline")

        df = pipeline_model.transform(df).withColumn(feat+"_scaled", unlist(feat+"_scaled")).drop(feat+"_vec")

        return df, feat+"_scaled"
    
    new_feats = list()
    for feat in feats:
        df, _name = scale_feat(df, feat)
        new_feats.append(_name)
        
    return df, new_feats

In [8]:
if not os.path.exists("./scalers/"):
    os.mkdir("./scalers")


num_cols = ["_BMI5", "PHYSHLTH", "MENTHLTH"]
cat_cols = [x for x in df.columns if x not in ["DIABETE3"]+num_cols]

df_train, df_test = df.randomSplit([.8, .2], seed=42)

In [9]:
df_train_scaled, train_new_feats = scale_data(df_train, num_cols)
df_test_scaled, test_new_feats = scale_data(df_test, num_cols)

df_train_scaled = df_train_scaled.withColumnRenamed("DIABETE3", "label")
df_test_scaled = df_test_scaled.withColumnRenamed("DIABETE3", "label")

Loading pipeline from : ./scalers/_BMI5_pipeline


                                                                                

Loading pipeline from : ./scalers/PHYSHLTH_pipeline
Loading pipeline from : ./scalers/MENTHLTH_pipeline
Loading pipeline from : ./scalers/_BMI5_pipeline
Loading pipeline from : ./scalers/PHYSHLTH_pipeline
Loading pipeline from : ./scalers/MENTHLTH_pipeline


In [10]:
df_train_scaled = df_train_scaled.repartition(20)

In [25]:
dtree = DecisionTreeClassifier(featuresCol="features", labelCol="label", seed=32)
rf = RandomForestClassifier(featuresCol="features", labelCol="label", seed=32)
log_reg = LogisticRegression(featuresCol="features", labelCol="label", standardization=True)

In [26]:
paramGrid_dtree = ParamGridBuilder() \
.addGrid(dtree.maxDepth, [5, 10, 15, 20])\
.build()

paramGrid_rf = ParamGridBuilder()\
.addGrid(rf.maxDepth, [5, 10, 15, 20])\
.build()

paramGrid_logreg = ParamGridBuilder()\
.addGrid(log_reg.maxIter, [20, 50, 100])\
.addGrid(log_reg.tol, [1e-2, 1e-4, 1e-6])\
.addGrid(log_reg.threshold, [.5, .7])\
.build()

In [27]:
pipeline_dtree = Pipeline(
    stages=[
        VectorAssembler(inputCols=cat_cols + train_new_feats, outputCol="features"),
        dtree
    ]
)

pipeline_rf = Pipeline(
    stages=[
        VectorAssembler(inputCols=cat_cols + train_new_feats, outputCol="features"),
        rf
    ]
)

pipeline_logreg = Pipeline(
    stages=[
        VectorAssembler(inputCols=cat_cols + train_new_feats, outputCol="features"),
        log_reg
    ]
)

# Model Cross-Validation

## - Decision Tree

In [14]:
crossval_dtree = CrossValidator(estimator=pipeline_dtree, 
                               estimatorParamMaps=paramGrid_dtree, 
                               evaluator=MulticlassClassificationEvaluator(),
                               numFolds=3)
cvmodel_dtree = crossval_dtree.fit(df_train_scaled)

23/10/12 10:36:53 WARN DAGScheduler: Broadcasting large task binary with size 1027.3 KiB
23/10/12 10:37:03 WARN DAGScheduler: Broadcasting large task binary with size 1027.3 KiB
23/10/12 10:37:04 WARN DAGScheduler: Broadcasting large task binary with size 1274.2 KiB
23/10/12 10:37:06 WARN DAGScheduler: Broadcasting large task binary with size 1519.3 KiB
23/10/12 10:37:08 WARN DAGScheduler: Broadcasting large task binary with size 1760.7 KiB
23/10/12 10:37:12 WARN DAGScheduler: Broadcasting large task binary with size 1995.2 KiB
23/10/12 10:37:17 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
23/10/12 10:37:22 WARN DAGScheduler: Broadcasting large task binary with size 1412.0 KiB
23/10/12 10:38:41 WARN DAGScheduler: Broadcasting large task binary with size 1017.8 KiB
23/10/12 10:39:09 WARN DAGScheduler: Broadcasting large task binary with size 1017.8 KiB
23/10/12 10:39:12 WARN DAGScheduler: Broadcasting large task binary with size 1260.5 KiB
23/10/12 10:39:15 WARN D

In [15]:
cvmodel_dtree.avgMetrics

[0.7912102002347752, 0.7933607056277938, 0.7797563972636286, 0.76369564240532]

## - Random Forest

In [16]:
crossval_rf = CrossValidator(estimator=pipeline_rf, 
                               estimatorParamMaps=paramGrid_rf, 
                               evaluator=MulticlassClassificationEvaluator(),
                               numFolds=3)
cvmodel_rf = crossval_rf.fit(df_train_scaled)

23/10/12 10:41:10 WARN DAGScheduler: Broadcasting large task binary with size 1488.2 KiB
23/10/12 10:41:12 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
23/10/12 10:41:16 WARN DAGScheduler: Broadcasting large task binary with size 1573.8 KiB
23/10/12 10:41:31 WARN DAGScheduler: Broadcasting large task binary with size 1488.2 KiB
23/10/12 10:41:38 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
23/10/12 10:41:49 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
23/10/12 10:42:04 WARN DAGScheduler: Broadcasting large task binary with size 6.1 MiB
23/10/12 10:42:18 WARN DAGScheduler: Broadcasting large task binary with size 1016.4 KiB
23/10/12 10:42:25 WARN DAGScheduler: Broadcasting large task binary with size 8.7 MiB
23/10/12 10:42:44 WARN DAGScheduler: Broadcasting large task binary with size 1237.0 KiB
23/10/12 10:42:51 WARN DAGScheduler: Broadcasting large task binary with size 11.9 MiB
23/10/12 10:43:13 WARN DAGScheduler: B

23/10/12 11:00:27 WARN DAGScheduler: Broadcasting large task binary with size 1262.5 KiB
23/10/12 11:00:29 WARN DAGScheduler: Broadcasting large task binary with size 12.0 MiB
23/10/12 11:00:36 WARN DAGScheduler: Broadcasting large task binary with size 1446.0 KiB
23/10/12 11:00:39 WARN DAGScheduler: Broadcasting large task binary with size 15.6 MiB
23/10/12 11:00:47 WARN DAGScheduler: Broadcasting large task binary with size 1584.9 KiB
23/10/12 11:00:51 WARN DAGScheduler: Broadcasting large task binary with size 19.5 MiB
23/10/12 11:01:00 WARN DAGScheduler: Broadcasting large task binary with size 1616.1 KiB
23/10/12 11:01:04 WARN DAGScheduler: Broadcasting large task binary with size 23.3 MiB
23/10/12 11:01:15 WARN DAGScheduler: Broadcasting large task binary with size 1560.7 KiB
23/10/12 11:01:20 WARN DAGScheduler: Broadcasting large task binary with size 26.9 MiB
23/10/12 11:01:58 WARN DAGScheduler: Broadcasting large task binary with size 1435.9 KiB
23/10/12 11:02:10 WARN DAGSched

In [17]:
cvmodel_rf.avgMetrics

[0.7585544594555061,
 0.7870592152906328,
 0.7943679202221721,
 0.7967513844578309]

In [28]:
crossval_log_reg = CrossValidator(estimator=pipeline_logreg, 
                               estimatorParamMaps=paramGrid_logreg, 
                               evaluator=MulticlassClassificationEvaluator(),
                               numFolds=3)
cvmodel_log_reg = crossval_log_reg.fit(df_train_scaled)

23/10/12 11:20:28 WARN CacheManager: Asked to cache already cached data.
23/10/12 11:20:28 WARN CacheManager: Asked to cache already cached data.
23/10/12 11:20:30 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/10/12 11:20:30 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
[Stage 7374:>                                                       (0 + 1) / 1]

In [30]:
cvmodel_log_reg.avgMetrics

[0.7943157207658739,
 0.7943157207658739,
 0.7936749154071325,
 0.7936749154071325,
 0.7936749154071325,
 0.7936749154071325,
 0.7943157207658739,
 0.7943157207658739,
 0.7936801529260931,
 0.7936801529260931,
 0.7936887999328142,
 0.7936887999328142,
 0.7943157207658739,
 0.7943157207658739,
 0.7936801529260931,
 0.7936801529260931,
 0.7936887999328142,
 0.7936887999328142]