# **DIABETES PREDICTION(PNDM) USING PYSPARK AND MLLIB**

#Installing Dependencies & Initiating a New Spark Session



In [None]:
#install pyspark
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=b99b306ecb92aa53b68b9d154deb5e228cb23719dc651488329d23ef56554909
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
#creating a sparksession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("diabetes_spark").getOrCreate()

#Load Dataset

In [5]:
#create a spark dataframe
df = spark.read.csv("PNDB.csv",header=True, inferSchema=True)

In [6]:
#displaying the dataframe
df.show()

+---+------------------+------------+--------------+------------------+-------------------+------------------+----+
|Age|             HbA1c|Genetic Info|Family History|      Birth Weight|Developmental Delay|     Insulin Level|PNDM|
+---+------------------+------------+--------------+------------------+-------------------+------------------+----+
|  3|4.8409274670203315|    Mutation|           Yes| 3.128267604571405|                 No| 5.585608291414472|   0|
|  3| 5.694742026537993|    Mutation|            No|2.0593417801955525|                 No|3.1413594690450974|   1|
|  7| 6.843595441641113| No mutation|            No| 2.718666663895555|                Yes|  4.63931318811706|   0|
|  2| 6.480186154058938| No mutation|            No|3.0870165527365243|                 No|6.2171780403504755|   0|
|  4| 7.052861345896749|    Mutation|            No| 3.481472357384762|                 No|3.3688923443736773|   0|
| 11| 7.978628423869647| No mutation|            No|2.5342488302789747| 

In [7]:
print("Shape:", (df.count(), len(df.columns)))

Shape: (100000, 8)


In [8]:
#printing the schema
df.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- HbA1c: double (nullable = true)
 |-- Genetic Info: string (nullable = true)
 |-- Family History: string (nullable = true)
 |-- Birth Weight: double (nullable = true)
 |-- Developmental Delay: string (nullable = true)
 |-- Insulin Level: double (nullable = true)
 |-- PNDM: integer (nullable = true)



In [9]:
#count the total no. of diabetic and non-diabetic class.
print((df.count(),len(df.columns)))
df.groupBy('PNDM').count().show()

(100000, 8)
+----+-----+
|PNDM|count|
+----+-----+
|   1| 4822|
|   0|95178|
+----+-----+



#Data Preparation

In [10]:
#checking for null values
for col in df.columns:
  print(col+":",df[df[col].isNull()].count())

Age: 0
HbA1c: 0
Genetic Info: 0
Family History: 0
Birth Weight: 0
Developmental Delay: 0
Insulin Level: 0
PNDM: 0


In [11]:
from pyspark.sql.functions import when
df = df.withColumn("Genetic Info", when(df["Genetic Info"] == "No mutation", 0).otherwise(1))
df = df.withColumn("Family History", when(df["Family History"] == "Yes", 1).otherwise(0))
df = df.withColumn("Developmental Delay", when(df["Developmental Delay"] == "Yes", 0).otherwise(1))

In [None]:
df.show()

+---+------------------+------------+--------------+------------------+-------------------+------------------+----+
|Age|             HbA1c|Genetic Info|Family History|      Birth Weight|Developmental Delay|     Insulin Level|PNDM|
+---+------------------+------------+--------------+------------------+-------------------+------------------+----+
|  3|4.8409274670203315|           1|             1| 3.128267604571405|                  1| 5.585608291414472|   0|
|  3| 5.694742026537993|           1|             0|2.0593417801955525|                  1|3.1413594690450974|   1|
|  7| 6.843595441641113|           0|             0| 2.718666663895555|                  0|  4.63931318811706|   0|
|  2| 6.480186154058938|           0|             0|3.0870165527365243|                  1|6.2171780403504755|   0|
|  4| 7.052861345896749|           1|             0| 3.481472357384762|                  1|3.3688923443736773|   0|
| 11| 7.978628423869647|           0|             0|2.5342488302789747| 

#Performing Feature Selection

In [12]:
#feature selection
from pyspark.ml.feature import VectorAssembler
assembler= VectorAssembler(inputCols=['Age','HbA1c','Genetic Info','Family History','Birth Weight','Developmental Delay','Insulin Level'],outputCol='features')
output_data= assembler.transform(df)
output_data.show()

+---+------------------+------------+--------------+------------------+-------------------+------------------+----+--------------------+
|Age|             HbA1c|Genetic Info|Family History|      Birth Weight|Developmental Delay|     Insulin Level|PNDM|            features|
+---+------------------+------------+--------------+------------------+-------------------+------------------+----+--------------------+
|  3|4.8409274670203315|           1|             1| 3.128267604571405|                  1| 5.585608291414472|   0|[3.0,4.8409274670...|
|  3| 5.694742026537993|           1|             0|2.0593417801955525|                  1|3.1413594690450974|   1|[3.0,5.6947420265...|
|  7| 6.843595441641113|           0|             0| 2.718666663895555|                  0|  4.63931318811706|   0|[7.0,6.8435954416...|
|  2| 6.480186154058938|           0|             0|3.0870165527365243|                  1|6.2171780403504755|   0|[2.0,6.4801861540...|
|  4| 7.052861345896749|           1|    

In [None]:
#print the schema
output_data.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- HbA1c: double (nullable = true)
 |-- Genetic Info: integer (nullable = false)
 |-- Family History: integer (nullable = false)
 |-- Birth Weight: double (nullable = true)
 |-- Developmental Delay: integer (nullable = false)
 |-- Insulin Level: double (nullable = true)
 |-- PNDM: integer (nullable = true)
 |-- features: vector (nullable = true)



#Split Dataset & Build the Model

In [13]:
#create final data
final_data = output_data.select('features','PNDM')

In [14]:
#print schema of final data
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- PNDM: integer (nullable = true)



In [15]:
#split the dataset and build the model
train,test= df.randomSplit([0.7,0.3])

# **Without HyperParameter Tuning**

**DecisionTreeClassifier**

In [16]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [17]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create a VectorAssembler to assemble the feature columns into a single 'features' column

assembler= VectorAssembler(inputCols=['Age','HbA1c','Genetic Info','Family History','Birth Weight','Developmental Delay','Insulin Level'],outputCol='features')

# Create a Decision Tree Classifier
decision_tree = DecisionTreeClassifier(labelCol="PNDM", featuresCol="features", maxDepth=4)

# Define the evaluator
evaluator = BinaryClassificationEvaluator(labelCol="PNDM", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

# Create a pipeline with the stages
dt_pipeline = Pipeline(stages=[assembler, decision_tree])


# Fit the pipeline on the training data
dt_model = dt_pipeline.fit(train)

# Make predictions on the test data
dt_predict = dt_model.transform(test)

# Show the first 5 rows of predictions
dt_predict.select("PNDM", "prediction").show(5)


+----+----------+
|PNDM|prediction|
+----+----------+
|   0|       0.0|
|   0|       0.0|
|   0|       0.0|
|   0|       0.0|
|   1|       1.0|
+----+----------+
only showing top 5 rows



In [18]:
# Define the evaluator
evaluator = BinaryClassificationEvaluator(labelCol="PNDM", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

# Calculate the accuracy
dt_auc = evaluator.evaluate(dt_predict)
print("Decision Tree AUC =", dt_auc)

Decision Tree AUC = 0.9982503448550992


**RandomForestClassifier**

In [19]:
from pyspark.ml.classification import RandomForestClassifier

In [20]:
from pyspark.ml import Pipeline
ran_f = RandomForestClassifier(featuresCol="features", labelCol="PNDM", numTrees=100, maxDepth=4, seed=142)
# Define the pipeline stages
stages = [assembler, ran_f]

# Create a pipeline
rf_pipeline = Pipeline(stages=stages)


# Fit the pipeline on the training data
rf_model = rf_pipeline.fit(train)

# Make predictions on the test data
rf_predictions = rf_model.transform(test)

# Show the first 5 rows of predictions
rf_predictions.select("PNDM", "prediction").show(5)

+----+----------+
|PNDM|prediction|
+----+----------+
|   0|       0.0|
|   0|       0.0|
|   0|       0.0|
|   0|       0.0|
|   1|       1.0|
+----+----------+
only showing top 5 rows



In [21]:
# Random Forest Evaluation
rf_auc = evaluator.evaluate(rf_predictions)
print("Random Forest AUC =", rf_auc)

Random Forest AUC = 0.9986556458073372


**LogisticRegression**

In [22]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator


# Create a Logistic Regression Classifier
logistic_reg = LogisticRegression(labelCol="PNDM", featuresCol="features")

# Define the evaluator
evaluator = BinaryClassificationEvaluator(labelCol="PNDM", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

# Create a pipeline with the stages
lg_pipeline = Pipeline(stages=[assembler, logistic_reg])

# Fit the pipeline to your data
lg_model = lg_pipeline.fit(train)

In [23]:
# Make predictions
lg_predictions = lg_model.transform(test)

# Evaluate the model
log_auc = evaluator.evaluate(lg_predictions)
print("Logistic Regression AUC = ", log_auc)

Logistic Regression AUC =  0.9941852848335063


**GBTClassifier**

In [24]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create a GBT Classifier
gbt_classifier = GBTClassifier(labelCol="PNDM", featuresCol="features")

# Define the evaluator
evaluator = BinaryClassificationEvaluator(labelCol="PNDM", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

# Create a pipeline with the stages
gbt_pipeline = Pipeline(stages=[assembler, gbt_classifier])

# Fit the pipeline to your data
gbt_model = gbt_pipeline.fit(train)


In [25]:
# Make predictions
gbt_predictions = gbt_model.transform(test)

# Evaluate the model
gbt_auc = evaluator.evaluate(gbt_predictions)
print("GBT Classifier AUC:", gbt_auc)

GBT Classifier AUC: 0.9999962077103883


**LinearSVC**

In [26]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create a LinearSVC classifier
svc_classifier = LinearSVC(labelCol="PNDM", featuresCol="features")

# Define the evaluator
evaluator = BinaryClassificationEvaluator(labelCol="PNDM", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

# Create a pipeline with the stages
svc_pipeline = Pipeline(stages=[assembler, svc_classifier])

# Fit the pipeline to your data
svc_model = svc_pipeline.fit(train)

In [27]:
# Make predictions
svc_predictions = svc_model.transform(test)

# Evaluate the model
svc_auc = evaluator.evaluate(svc_predictions)
print("LinearSVC AUC =", svc_auc)

LinearSVC AUC = 0.9941409998725846


# **With HyperParameter Tuning**

In [28]:
# Define feature and target columns
feature_column = "features"
target_column = "PNDM"

**DecisionTreeClassifier**

In [29]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

In [30]:
# Decision Tree Classifier
dt = DecisionTreeClassifier(labelCol=target_column, featuresCol=feature_column)

# Create a ParamGrid for hyperparameter tuning
paramGrid = (ParamGridBuilder()
    .addGrid(dt.maxDepth, [5, 10, 15])  # Maximum depth of the tree
    .addGrid(dt.impurity, ['gini', 'entropy'])  # Criterion for information gain
    .build())

# Train-validation split
tvs_dt = TrainValidationSplit(
    estimator=dt,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    trainRatio=0.8
)

In [38]:
# Fit the model with grid search
dt_model_tuned = tvs_dt.fit(train)
best_dt_model = dt_model_tuned.bestModel

# Make predictions and evaluate
best_dt_predictions = best_dt_model.transform(test)
best_dt_auc = evaluator.evaluate(best_dt_predictions)
print("Tuned Decision Tree AUC =", best_dt_auc)

Tuned Decision Tree AUC = 0.9997546613164778


**RandomForestClassifier**

In [33]:
from pyspark.ml.classification import RandomForestClassifier

In [34]:
# Random Forest Classifier
rf = RandomForestClassifier(labelCol=target_column, featuresCol=feature_column)

# Create a ParamGrid for hyperparameter tuning
paramGrid = (ParamGridBuilder()
    .addGrid(rf.numTrees, [10, 20, 30])
    .addGrid(rf.maxDepth, [5, 10, 15])
    .build())

# Train-validation split
tvs = TrainValidationSplit(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    trainRatio=0.8
)

In [40]:
from pyspark.ml.feature import VectorAssembler

# List all your feature columns (excluding the label/target column)
feature_columns = ['Age', 'HbA1c', 'Genetic Info', 'Family History',
                   'Birth Weight', 'Developmental Delay', 'Insulin Level']

# Drop the existing "features" column if it exists
if "features" in train.columns:
    train = train.drop("features")
if "features" in test.columns:
    test = test.drop("features")

# Create the VectorAssembler
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features"
)

# Transform your training and test data
train = assembler.transform(train)
test = assembler.transform(test)

**LogisticRegression**

In [41]:
from pyspark.ml.classification import LogisticRegression

In [42]:
# Logistic Regression
lr = LogisticRegression(labelCol=target_column, featuresCol=feature_column)

# Create a ParamGrid for hyperparameter tuning
paramGrid = (ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1, 1.0])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build())

# Train-validation split
tvs_lr = TrainValidationSplit(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    trainRatio=0.8
)

In [43]:
# Fit the model with grid search
lr_model_tuned = tvs_lr.fit(train)
best_lr_model = lr_model_tuned.bestModel

# Make predictions and evaluate
best_lr_predictions = best_lr_model.transform(test)
best_lr_auc = evaluator.evaluate(best_lr_predictions)
print("Tuned Logistic Regression AUC =", best_lr_auc)

Tuned Logistic Regression AUC = 0.9941830643481447


**GBTClassifier**

In [44]:
from pyspark.ml.classification import GBTClassifier

In [45]:
# Gradient-Boosted Trees (GBT) Classifier
gbt = GBTClassifier(labelCol=target_column, featuresCol=feature_column)

# Create a ParamGrid for hyperparameter tuning
paramGrid = (ParamGridBuilder()
    .addGrid(gbt.maxDepth, [5, 10, 15])
    .addGrid(gbt.maxIter, [10, 20, 30])
    .build())

# Train-validation split
tvs_gbt = TrainValidationSplit(
    estimator=gbt,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    trainRatio=0.8
)

In [46]:
# Fit the model with grid search
gbt_model_tuned = tvs_gbt.fit(train)
best_gbt_model = gbt_model_tuned.bestModel

# Make predictions and evaluate
best_gbt_predictions = best_gbt_model.transform(test)
best_gbt_auc = evaluator.evaluate(best_gbt_predictions)
print("Tuned GBT Classifier AUC =", best_gbt_auc)


Tuned GBT Classifier AUC = 0.9999968064929587


 **LinearSVC**

In [None]:
from pyspark.ml.classification import LinearSVC

In [47]:
# Linear Support Vector Machine (SVM)
svm = LinearSVC(labelCol=target_column, featuresCol=feature_column, maxIter=10)

# Create a ParamGrid for hyperparameter tuning
paramGrid = (ParamGridBuilder()
    .addGrid(svm.regParam, [0.01, 0.1, 1.0])
    .build())

# Train-validation split
tvs_svm = TrainValidationSplit(
    estimator=svm,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    trainRatio=0.8
)

In [48]:
# Fit the model with grid search
svm_model_tuned = tvs_svm.fit(train)
best_svm_model = svm_model_tuned.bestModel

# Make predictions and evaluate
best_svm_predictions = best_svm_model.transform(test)
best_svm_auc = evaluator.evaluate(best_svm_predictions)
print("Tuned SVM AUC =", best_svm_auc)

Tuned SVM AUC = 0.9940603139212432
