## Split data for online and offline

In [2]:
import pandas as pd
from sklearn.model_selection import train_test_split

data_path = "./raw_data/diabetes_binary_health_indicators_BRFSS2015.csv"
df = pd.read_csv(data_path)
label_column = 'Diabetes_binary'

# Split the data into features and target
X = df  # Replace 'label' with your actual target column name
y = df[label_column]  # Replace 'label' with your actual target column name

# Stratified split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)
X_test.to_csv("split_data/offline.csv", index=False)
X_test.to_csv("split_data/online.csv", index=False)

In [3]:
import warnings
warnings.filterwarnings('ignore')

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, count, when, col

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


spark = SparkSession.builder.master("local[*]").appName("Test").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/22 01:26:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Read offline data

In [4]:
df_data = spark.read.csv(
    "split_data/offline.csv",
    sep=",", inferSchema=True, header=True
)

In [5]:
# Check for nulls in each column
null_counts = df_data.select([count(when(col(c).isNull(), c)).alias(c) for c in df_data.columns])

null_counts.show()

+---------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+---+---+---------+------+
|Diabetes_binary|HighBP|HighChol|CholCheck|BMI|Smoker|Stroke|HeartDiseaseorAttack|PhysActivity|Fruits|Veggies|HvyAlcoholConsump|AnyHealthcare|NoDocbcCost|GenHlth|MentHlth|PhysHlth|DiffWalk|Sex|Age|Education|Income|
+---------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+---+---+---------+------+
|              0|     0|       0|        0|  0|     0|     0|                   0|           0|     0|      0|                0|            0|          0|      0|       0|       0|       0|  0|  0|        0|     0|
+---------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+------------

In [6]:
for column in df_data.columns:
    unique_count = df_data.select(column).distinct().count()
    print(f"Unique count in {column}: {unique_count}")

Unique count in Diabetes_binary: 2
Unique count in HighBP: 2
Unique count in HighChol: 2
Unique count in CholCheck: 2
Unique count in BMI: 77
Unique count in Smoker: 2
Unique count in Stroke: 2
Unique count in HeartDiseaseorAttack: 2
Unique count in PhysActivity: 2
Unique count in Fruits: 2
Unique count in Veggies: 2
Unique count in HvyAlcoholConsump: 2
Unique count in AnyHealthcare: 2
Unique count in NoDocbcCost: 2
Unique count in GenHlth: 5
Unique count in MentHlth: 31
Unique count in PhysHlth: 31
Unique count in DiffWalk: 2
Unique count in Sex: 2
Unique count in Age: 13
Unique count in Education: 6
Unique count in Income: 8


In [7]:
# Count rows before removing duplicates
print("Number of rows before removing duplicates:", df_data.count())

# Remove completely identical rows
df_no_duplicates = df_data.distinct()

# Count rows after removing duplicates
print("Number of rows after removing duplicates:", df_no_duplicates.count())

Number of rows before removing duplicates: 50736
Number of rows after removing duplicates: 48713


## feature selection

In [8]:
df_features = df_no_duplicates.drop(*["Fruits" , "Veggies" , "Sex" , "CholCheck" , "AnyHealthcare"])

In [9]:
feature_columns = df_features.columns
feature_columns.remove(label_column)

In [10]:
feature_columns

['HighBP',
 'HighChol',
 'BMI',
 'Smoker',
 'Stroke',
 'HeartDiseaseorAttack',
 'PhysActivity',
 'HvyAlcoholConsump',
 'NoDocbcCost',
 'GenHlth',
 'MentHlth',
 'PhysHlth',
 'DiffWalk',
 'Age',
 'Education',
 'Income']

In [61]:
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(df_features).select(col("features"), col(label_column).alias("label"))
(train_data, test_data) = data.randomSplit([0.8, 0.2], seed=42)

## Models Training

In [62]:
# Define models
rf = RandomForestClassifier()
lr = LogisticRegression()
gbt = GBTClassifier()

# Define parameter grids for each model
paramGrid_rf = ParamGridBuilder().addGrid(rf.maxDepth, [5, 10]).build()
paramGrid_lr = ParamGridBuilder().addGrid(lr.maxIter, [10, 100]).build()
paramGrid_gbt = ParamGridBuilder().addGrid(gbt.maxIter, [10, 20]).build()

# Define evaluator
evaluator = MulticlassClassificationEvaluator(metricName="f1")

# Cross-validation for each model
crossval_rf = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid_rf, evaluator=evaluator, numFolds=3)
crossval_lr = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid_lr, evaluator=evaluator, numFolds=3)
crossval_gbt = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid_gbt, evaluator=evaluator, numFolds=3)

# Fit models
cvModel_rf = crossval_rf.fit(train_data)
cvModel_lr = crossval_lr.fit(train_data)
cvModel_gbt = crossval_gbt.fit(train_data)

24/01/22 00:21:58 WARN DAGScheduler: Broadcasting large task binary with size 1306.1 KiB
24/01/22 00:21:59 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB
24/01/22 00:21:59 WARN DAGScheduler: Broadcasting large task binary with size 1232.6 KiB
24/01/22 00:22:02 WARN DAGScheduler: Broadcasting large task binary with size 1310.1 KiB
24/01/22 00:22:02 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB
24/01/22 00:22:02 WARN DAGScheduler: Broadcasting large task binary with size 1228.6 KiB
24/01/22 00:22:05 WARN DAGScheduler: Broadcasting large task binary with size 1287.3 KiB
24/01/22 00:22:05 WARN DAGScheduler: Broadcasting large task binary with size 2025.2 KiB
24/01/22 00:22:05 WARN DAGScheduler: Broadcasting large task binary with size 1198.6 KiB
24/01/22 00:22:08 WARN DAGScheduler: Broadcasting large task binary with size 1351.0 KiB
24/01/22 00:22:08 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB


## Evaluation of Models

In [65]:
# Generate predictions on test data
predictions_rf = cvModel_rf.transform(test_data)
predictions_lr = cvModel_lr.transform(test_data)
predictions_gbt = cvModel_gbt.transform(test_data)

# Evaluate models
f1_score_rf = evaluator.evaluate(predictions_rf)
f1_score_lr = evaluator.evaluate(predictions_lr)
f1_score_gbt = evaluator.evaluate(predictions_gbt)

print("Random Forest F1 Score: ", f1_score_rf)
print("Logistic Regression F1 Score: ", f1_score_lr)
print("Gradient-Boosted Trees F1 Score: ", f1_score_gbt)

24/01/22 00:23:09 WARN DAGScheduler: Broadcasting large task binary with size 1272.7 KiB


Random Forest F1 Score:  0.8031765264076465
Logistic Regression F1 Score:  0.8112380324329342
Gradient-Boosted Trees F1 Score:  0.8129871105315047


Clearly the Logistic Regression seems to be better here, so saving that as the model

In [66]:
lr_model.write().overwrite().save("model/")