In [1]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys
# Explicit JAVA_HOME / SPARK_HOME overrides pointing at the JDK and the Spark tarball
# unpacked by the setup commands. They are currently disabled, so findspark.init()
# below is called without an explicitly configured Spark location.
# NOTE(review): with these disabled, Spark is presumably resolved from the pip-installed
# pyspark package rather than the downloaded tarball — confirm which one is intended.
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


# Let findspark locate a Spark installation so that pyspark can be imported below.
import findspark
findspark.init()
# The result of find() is neither assigned nor the cell's last expression, so it is
# discarded; the call only serves as a check that a Spark location can be resolved.
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

# Create the notebook-wide SparkSession (or reuse one that already exists)
# and show it as the cell's output.
spark = (
    SparkSession.builder
    .appName("Our First Spark Example")
    .getOrCreate()
)

spark

[33m0% [Working][0m            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
[33m0% [Waiting for headers] [Connected to cloud.r-project.org (3.171.85.123)] [Con[0m                                                                               Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
                                                                               Get:3 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
                                                                               Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:5 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:7 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease [18.1 kB]
Hit:9 https://

In [2]:
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
# Build a configuration that asks for a "local" master and the app name "My app".
conf = SparkConf().setMaster("local").setAppName("My app")

# getOrCreate returns the context bound to this configuration only if it has to create one.
# NOTE(review): the recorded output of this cell is 'local[*]', not 'local', so the
# settings above were not in effect — presumably because the SparkSession created
# earlier had already started a context. Confirm whether `conf` is needed at all.
sc = SparkContext.getOrCreate(conf)

# Display the master URL actually in use.
sc.master

'local[*]'

## Reading the Data

In [3]:
# Load the modelling dataset from a Parquet file into a Spark DataFrame.
# The path is hard-coded; the file must exist at this location before the cell runs.
# NOTE(review): the file name suggests the data was cleaned/encoded upstream — confirm
# which notebook or job produces it.
df_filtered = spark.read.parquet("/content/diabetescleaned.parquet")

ML Models

In [None]:
# Gradient Boosting - Baseline Model
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import col
from pyspark.ml.classification import GBTClassifier

# Target column; every other column of df_filtered is treated as a predictor.
label_col = "diabetes"
feature_cols = [name for name in df_filtered.columns if name != label_col]

# Pack the predictor columns into a single "features" vector column.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
data = assembler.transform(df_filtered)

# Seeded 70/30 train/test split.
train_df, test_df = data.randomSplit([0.7, 0.3], seed=42)

# Baseline gradient-boosted trees classifier: only the columns and the seed are set.
gbt = GBTClassifier(featuresCol="features", labelCol=label_col, seed=42)
gbt_model = gbt.fit(train_df)

# Feature importances of the fitted GBT model. Positions in the importance vector
# follow the order of the assembler's input columns, so they are paired by index.
importances = gbt_model.featureImportances
feature_names = feature_cols
feature_importances = [
    (name, importances[idx]) for idx, name in enumerate(feature_names)
]
sorted_features = sorted(feature_importances, key=lambda pair: pair[1], reverse=True)

# Names of the highest-ranked features (at most ten).
top_n_features = [name for name, _ in sorted_features[:10]]

# Report the ranked features with their importance values.
print("Selected Features and their Importance Values:")
for feature, importance in sorted_features[:10]:
    print(f"Feature: {feature}, Importance: {importance}")

# Evaluators: ROC AUC from the raw scores, the remaining metrics from the predicted label.
binary_evaluator = BinaryClassificationEvaluator(
    labelCol=label_col,
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
multiclass_evaluator = MulticlassClassificationEvaluator(
    labelCol=label_col,
    predictionCol="prediction",
)

# Score the held-out test split.
predictions = gbt_model.transform(test_df)

# Compute the metrics; precision and recall are the label-weighted variants.
auc = binary_evaluator.evaluate(predictions)
accuracy, f1_score, precision, recall = [
    multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName: metric})
    for metric in ("accuracy", "f1", "weightedPrecision", "weightedRecall")
]

print(f"Area Under ROC: {auc:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"F1 Score: {f1_score:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

Selected Features and their Importance Values:
Feature: HbA1c_level, Importance: 0.5689643560119504
Feature: blood_glucose_level, Importance: 0.3037765049049565
Feature: bmi, Importance: 0.06128631072512461
Feature: age, Importance: 0.028410412205284032
Feature: hypertension, Importance: 0.022058409359722125
Feature: heart_disease, Importance: 0.015504006792961713
Feature: gender_encoded, Importance: 6.05750082280264e-16
Feature: smoking_history_encoded, Importance: 1.6505352967050194e-16
Area Under ROC: 0.9678
Accuracy: 0.9650
F1 Score: 0.9618
Precision: 0.9663
Recall: 0.9650


In [4]:
#Gradient Boosting with Hyperparameter tuning
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline

# Target column; every other column of df_filtered is treated as a predictor.
label_col = "diabetes"
feature_cols = [name for name in df_filtered.columns if name != label_col]

# Pack the predictor columns into a single "features" vector column.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
data = assembler.transform(df_filtered)

# Seeded 70/30 train/test split.
train_df, test_df = data.randomSplit([0.7, 0.3], seed=42)

# Gradient-boosted trees estimator whose hyper-parameters are tuned below.
gbt = GBTClassifier(featuresCol="features", labelCol=label_col, seed=42)

# Search grid: 3 x 3 x 4 = 36 combinations of maxIter, maxDepth and stepSize.
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbt.maxIter, [30, 50, 70])
    .addGrid(gbt.maxDepth, [3, 5, 7])
    .addGrid(gbt.stepSize, [0.01, 0.1, 0.2, 0.3])
    .build()
)

# Evaluators: ROC AUC drives model selection; the multiclass evaluator is used for reporting.
binary_evaluator = BinaryClassificationEvaluator(
    labelCol=label_col,
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
multiclass_evaluator = MulticlassClassificationEvaluator(
    labelCol=label_col,
    predictionCol="prediction",
)

# Seeded 5-fold cross-validation over the grid, scored by ROC AUC.
crossval = CrossValidator(
    estimator=gbt,
    estimatorParamMaps=paramGrid,
    evaluator=binary_evaluator,
    numFolds=5,
    seed=42,
)

# Run the search on the training split only; the test split stays untouched until scoring.
cvModel = crossval.fit(train_df)

# Winning model and the hyper-parameter values it was fitted with.
best_model = cvModel.bestModel
print("Best Parameters:")
print(f"Best maxIter: {best_model.getMaxIter()}")
print(f"Best maxDepth: {best_model.getMaxDepth()}")
print(f"Best learning rate: {best_model.getStepSize()}")

# Feature importances of the best GBT model. Positions in the importance vector
# follow the order of the assembler's input columns, so they are paired by index.
importances = best_model.featureImportances
feature_names = feature_cols
feature_importances = [
    (name, importances[idx]) for idx, name in enumerate(feature_names)
]
sorted_features = sorted(feature_importances, key=lambda pair: pair[1], reverse=True)

# Names of the highest-ranked features (at most ten).
top_n_features = [name for name, _ in sorted_features[:10]]

# Report the ranked features with their importance values.
print("Selected Features and their Importance Values:")
for feature, importance in sorted_features[:10]:
    print(f"Feature: {feature}, Importance: {importance}")

# Score the held-out test split with the best model.
predictions = best_model.transform(test_df)

# Compute the metrics; precision and recall are the label-weighted variants.
auc = binary_evaluator.evaluate(predictions)
accuracy, f1_score, precision, recall = [
    multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName: metric})
    for metric in ("accuracy", "f1", "weightedPrecision", "weightedRecall")
]

print(f"Area Under ROC: {auc:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"F1 Score: {f1_score:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

Best Parameters:
Best maxIter: 70
Best maxDepth: 7
Best learning rate: 0.3
Selected Features and their Importance Values:
Feature: HbA1c_level, Importance: 0.5322395388093473
Feature: blood_glucose_level, Importance: 0.33138658924588954
Feature: age, Importance: 0.05736832380563514
Feature: bmi, Importance: 0.04394618054898692
Feature: heart_disease, Importance: 0.012208890964276084
Feature: hypertension, Importance: 0.011461601039997911
Feature: gender_encoded, Importance: 0.007277689504077309
Feature: smoking_history_encoded, Importance: 0.004111186081789861
Area Under ROC: 0.9725
Accuracy: 0.9653
F1 Score: 0.9625
Precision: 0.9657
Recall: 0.9653


In [None]:
#XGBoost - Baseline Model
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from xgboost.spark import SparkXGBClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Target column; every other column of df_filtered is treated as a predictor.
label_col = "diabetes"
feature_cols = [name for name in df_filtered.columns if name != label_col]

# Pack the predictor columns into a single "features" vector column.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
data = assembler.transform(df_filtered)

# Seeded 70/30 train/test split.
train_df, test_df = data.randomSplit([0.7, 0.3], seed=42)

# Baseline XGBoost classifier: only the columns and the seed are set.
xgb = SparkXGBClassifier(features_col="features", label_col=label_col, seed=42)
xgb_model = xgb.fit(train_df)

# Gain-based importance from the underlying booster. Each key is mapped back to a
# column name by dropping its first character and using the remainder as an index
# into feature_cols (keys presumably look like "f0", "f1", ...).
feature_importance = xgb_model.get_booster().get_score(importance_type="gain")
feature_importance_dict = {
    feature_cols[int(key[1:])]: gain for key, gain in feature_importance.items()
}

# Rank the features from highest to lowest gain.
sorted_features = sorted(
    feature_importance_dict.items(), key=lambda pair: pair[1], reverse=True
)

print("\nFeature Importance (sorted):")
for feature, importance in sorted_features:
    print(f"{feature}: {importance}")

# Evaluators: ROC AUC from the raw scores, the remaining metrics from the predicted label.
binary_evaluator = BinaryClassificationEvaluator(
    labelCol=label_col,
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
multiclass_evaluator = MulticlassClassificationEvaluator(
    labelCol=label_col,
    predictionCol="prediction",
)

# Score the held-out test split.
predictions = xgb_model.transform(test_df)

# Compute the metrics; precision and recall are the label-weighted variants.
auc = binary_evaluator.evaluate(predictions)
accuracy, f1_score, precision, recall = [
    multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName: metric})
    for metric in ("accuracy", "f1", "weightedPrecision", "weightedRecall")
]

print(f"Area Under ROC: {auc:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"F1 Score: {f1_score:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

INFO:XGBoost-PySpark:Running xgboost-2.1.2 on 1 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'seed': 42, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!



Feature Importance (sorted):
HbA1c_level: 76.6221923828125
blood_glucose_level: 38.836700439453125
heart_disease: 7.278660774230957
hypertension: 6.8252153396606445
age: 4.770997047424316
bmi: 3.080050468444824
gender_encoded: 2.9646198749542236
smoking_history_encoded: 2.1700263023376465
Area Under ROC: 0.9693
Accuracy: 0.9643
F1 Score: 0.9617
Precision: 0.9641
Recall: 0.9643


In [None]:
#XGBoosting with Hyperparameter tuning (5)
from pyspark.ml.feature import VectorAssembler
from xgboost.spark import SparkXGBClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Target column; every other column of df_filtered is treated as a predictor.
label_col = "diabetes"
feature_cols = [name for name in df_filtered.columns if name != label_col]

# Pack the predictor columns into a single "features" vector column.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
data = assembler.transform(df_filtered)

# Seeded 70/30 train/test split.
train_df, test_df = data.randomSplit([0.7, 0.3], seed=42)

# XGBoost estimator whose hyper-parameters are tuned below.
xgb = SparkXGBClassifier(features_col="features", label_col=label_col, seed=42)

# Search grid: 3 x 3 x 3 x 3 = 81 combinations of n_estimators, max_depth,
# min_child_weight and learning_rate.
paramGrid = (
    ParamGridBuilder()
    .addGrid(xgb.n_estimators, [50, 100, 150])
    .addGrid(xgb.max_depth, [3, 5, 7])
    .addGrid(xgb.min_child_weight, [1, 3, 5])
    .addGrid(xgb.learning_rate, [0.1, 0.3, 0.5])
    .build()
)

# Evaluators: ROC AUC drives model selection; the multiclass evaluator is used for reporting.
binary_evaluator = BinaryClassificationEvaluator(
    labelCol=label_col,
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
multiclass_evaluator = MulticlassClassificationEvaluator(
    labelCol=label_col,
    predictionCol="prediction",
)

# Seeded 5-fold cross-validation over the grid, scored by ROC AUC.
crossval = CrossValidator(
    estimator=xgb,
    estimatorParamMaps=paramGrid,
    evaluator=binary_evaluator,
    numFolds=5,
    seed=42,
)

# Run the search on the training split only; the test split stays untouched until scoring.
cvModel = crossval.fit(train_df)

# Winning model and the hyper-parameter values it was fitted with.
best_model = cvModel.bestModel
print("Best Parameters:")
print(f" - n_estimators: {best_model.getOrDefault('n_estimators')}")
print(f" - max_depth: {best_model.getOrDefault('max_depth')}")
print(f" - min_child_weight: {best_model.getOrDefault('min_child_weight')}")
print(f" - learning rate: {best_model.getOrDefault('learning_rate')}")

# Gain-based importance from the best model's booster. Each key is mapped back to a
# column name by dropping its first character and using the remainder as an index
# into feature_cols (keys presumably look like "f0", "f1", ...).
feature_importance = best_model.get_booster().get_score(importance_type="gain")
feature_importance_dict = {
    feature_cols[int(key[1:])]: gain for key, gain in feature_importance.items()
}

# Rank the features from highest to lowest gain.
sorted_features = sorted(
    feature_importance_dict.items(), key=lambda pair: pair[1], reverse=True
)

print("\nFeature Importance (sorted):")
for feature, importance in sorted_features:
    print(f"{feature}: {importance}")

# Score the held-out test split with the best model.
predictions = best_model.transform(test_df)

# Compute the metrics; precision and recall are the label-weighted variants.
auc = binary_evaluator.evaluate(predictions)
accuracy, f1_score, precision, recall = [
    multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName: metric})
    for metric in ("accuracy", "f1", "weightedPrecision", "weightedRecall")
]

print(f"Area Under ROC: {auc:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"F1 Score: {f1_score:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

INFO:XGBoost-PySpark:Running xgboost-2.1.2 on 1 workers with
	booster params: {'device': 'cpu', 'learning_rate': 0.1, 'max_depth': 3, 'min_child_weight': 1, 'objective': 'binary:logistic', 'seed': 42, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 50}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!
INFO:XGBoost-PySpark:Running xgboost-2.1.2 on 1 workers with
	booster params: {'device': 'cpu', 'learning_rate': 0.3, 'max_depth': 3, 'min_child_weight': 1, 'objective': 'binary:logistic', 'seed': 42, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 50}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!
INFO:XGBoost-PySpark:Running xgboost-2.1.2 on 1 workers with
	booster params: {'device': 'cpu', 'learning_rate': 0.5, 'max_depth': 3, 'min_child_weight': 1, 'objective': 'binary:logistic', 'seed': 42, 'nthread': 1}
	train_call_kwarg

Best Parameters:
 - n_estimators: 150
 - max_depth: 3
 - min_child_weight: 5
 - learning rate: 0.1

Feature Importance (sorted):
HbA1c_level: 567.2586669921875
blood_glucose_level: 270.26434326171875
hypertension: 63.85551452636719
age: 59.98054122924805
heart_disease: 39.951568603515625
bmi: 31.082653045654297
gender_encoded: 15.680378913879395
smoking_history_encoded: 6.48671817779541
Area Under ROC: 0.9727
Accuracy: 0.9653
F1 Score: 0.9623
Precision: 0.9661
Recall: 0.9653
