Project Title: Diabetes Prediction Pipeline using PySpark

Objective:
This project demonstrates how to clean, prepare, and model a diabetes dataset with PySpark. The pipeline imputes missing values, assembles and scales the features, and trains a Logistic Regression model to predict the diabetes outcome (the Outcome column) from health metrics such as glucose, blood pressure, BMI, and age.

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DiabetesPipeline") \
    .getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/20 16:13:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/20 16:13:41 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Step 1: Data Exploration

In [6]:
df = spark.read.csv("diabetes.csv", header=True, inferSchema=True)
df.printSchema()
df.show()


root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
(output truncated)

Step 2: Handling Missing Values (Imputation)

In [8]:
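from pyspark.sql.functions import col, when

# In these columns a 0 stands for a missing measurement, so replace it with null
columns_with_zeros = ['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI']
for col_name in columns_with_zeros:
    df = df.withColumn(col_name, when(col(col_name) == 0, None).otherwise(col(col_name)))

# Show the result once, after all columns have been converted
df.show()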


+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|   NULL|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|   NULL|26.6|                   0.351| 31|      0|
|          8|    183|           64|         NULL|   NULL|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|         NULL|   NULL|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|
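In these five columns a recorded 0 is not a plausible measurement (nobody has a glucose level or a BMI of zero), so the loop above turns it into a null. Pregnancies is left out, which makes sense because zero pregnancies is a valid value. The plain-Python sketch below mimics that replacement on three rows copied from the Step 1 output and then counts the resulting missing values per column. The helpers `zeros_to_none` and `count_missing` are hypothetical names for illustration, not PySpark functions.

```python
from typing import Dict, List, Optional

Row = Dict[str, Optional[float]]

COLUMNS_WITH_ZEROS = ["Glucose", "BloodPressure", "SkinThickness", "Insulin", "BMI"]


def zeros_to_none(rows: List[Row], columns: List[str]) -> List[Row]:
    """Return copies of the rows where a 0 in any listed column becomes None (missing)."""
    cleaned = []
    for row in rows:
        new_row = dict(row)
        for c in columns:
            if new_row.get(c) == 0:
                new_row[c] = None
        cleaned.append(new_row)
    return cleaned


def count_missing(rows: List[Row], columns: List[str]) -> Dict[str, int]:
    """Count None values per column, like a null count over a DataFrame."""
    return {c: sum(1 for r in rows if r.get(c) is None) for c in columns}


# First three rows of the df.show() output in Step 1
sample = [
    {"Pregnancies": 6, "Glucose": 148, "BloodPressure": 72, "SkinThickness": 35, "Insulin": 0, "BMI": 33.6},
    {"Pregnancies": 1, "Glucose": 85, "BloodPressure": 66, "SkinThickness": 29, "Insulin": 0, "BMI": 26.6},
    {"Pregnancies": 8, "Glucose": 183, "BloodPressure": 64, "SkinThickness": 0, "Insulin": 0, "BMI": 23.3},
]

cleaned = zeros_to_none(sample, COLUMNS_WITH_ZEROS)
print(count_missing(cleaned, COLUMNS_WITH_ZEROS))
# {'Glucose': 0, 'BloodPressure': 0, 'SkinThickness': 1, 'Insulin': 3, 'BMI': 0}
```

In PySpark itself, a comparable per-column null count can be written as `df.select([count(when(col(c).isNull(), c)).alias(c) for c in columns_with_zeros]).show()`, with `count` imported from `pyspark.sql.functions` alongside `col` and `when`.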


In [24]:
from pyspark.ml.feature import Imputer

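# Fill each null with its column's mean (Imputer's default strategy), writing the results to new *_imputed columns
imputer = Imputer(inputCols=columns_with_zeros, outputCols=[c + "_imputed" for c in columns_with_zeros])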
df = imputer.fit(df).transform(df)
df.show(5)
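
(This output was captured after the Step 3 cells had been run, which is why the features and scaled_features columns already appear.)
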
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+---------------+---------------------+---------------------+---------------+-----------+--------------------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|Glucose_imputed|BloodPressure_imputed|SkinThickness_imputed|Insulin_imputed|BMI_imputed|            features|     scaled_features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+---------------+---------------------+---------------------+---------------+-----------+--------------------+--------------------+
|          6|    148|           72|           35|   NULL|33.6|                   0.627| 50|      1|            148|                   72|                   35|            155|       33.6|[6.0,148.0,72.0,3...|[1.78063837321943...|
|          1|     85|           66|           29|   NULL|26.6|                  
(output truncated)
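Imputer works as an estimator: `fit` computes one surrogate value per input column (the mean of the non-missing values under the default `strategy="mean"`), and `transform` writes the output columns named in `outputCols`, here `<column>_imputed`, with nulls replaced by that surrogate while keeping the original columns. The plain-Python sketch below reproduces this fit/transform split on the SkinThickness and Insulin values of the seven rows shown in Step 2. Because it uses only those rows, its means differ from the full-dataset Insulin value of 155 in the output above. The helper names and the `as_int` flag are hypothetical; the flag is an assumption modeled on that output, where Insulin_imputed appears as the integer 155, suggesting the mean was cast back to the column's integer type.

```python
from statistics import mean
from typing import Dict, List, Optional

Row = Dict[str, Optional[float]]


def fit_means(rows: List[Row], columns: List[str]) -> Dict[str, float]:
    """'Fit' step: the mean of the non-missing values in each column."""
    return {c: mean([r[c] for r in rows if r[c] is not None]) for c in columns}


def transform_impute(rows: List[Row], surrogates: Dict[str, float],
                     as_int: bool = False) -> List[Row]:
    """'Transform' step: add <col>_imputed columns; the original columns are kept."""
    out = []
    for r in rows:
        new_row = dict(r)
        for c, m in surrogates.items():
            fill = int(m) if as_int else m  # int() truncates, e.g. 155.5 -> 155
            new_row[c + "_imputed"] = fill if r[c] is None else r[c]
        out.append(new_row)
    return out


# SkinThickness and Insulin from the seven rows shown after Step 2 (None = NULL)
rows = [
    {"SkinThickness": 35, "Insulin": None},
    {"SkinThickness": 29, "Insulin": None},
    {"SkinThickness": None, "Insulin": None},
    {"SkinThickness": 23, "Insulin": 94},
    {"SkinThickness": 35, "Insulin": 168},
    {"SkinThickness": None, "Insulin": None},
    {"SkinThickness": 32, "Insulin": 88},
]

surrogates = fit_means(rows, ["SkinThickness", "Insulin"])
print(surrogates)  # SkinThickness ~30.8, Insulin ~116.67
filled = transform_impute(rows, surrogates, as_int=True)
print([r["Insulin_imputed"] for r in filled])  # [116, 116, 116, 94, 168, 116, 88]
```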

In [12]:
# Optionally, drop the original columns and keep only the imputed (filled) ones:
# df = df.drop(*columns_with_zeros)


Step 3: Feature Engineering

In [15]:
# Spark ML estimators read all features from a single vector column, so combine the feature columns into one:

from pyspark.ml.feature import VectorAssembler

# Feature columns: the imputed versions of the five cleaned columns plus the three untouched originals
feature_cols = [
    'Pregnancies', 'Glucose_imputed', 'BloodPressure_imputed', 'SkinThickness_imputed', 
    'Insulin_imputed', 'BMI_imputed', 'DiabetesPedigreeFunction', 'Age'
]

# Assembler: combines features into a single column
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

df.show(5)  # To check the new "features" column



+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+---------------+---------------------+---------------------+---------------+-----------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|Glucose_imputed|BloodPressure_imputed|SkinThickness_imputed|Insulin_imputed|BMI_imputed|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+---------------+---------------------+---------------------+---------------+-----------+--------------------+
|          6|    148|           72|           35|   NULL|33.6|                   0.627| 50|      1|            148|                   72|                   35|            155|       33.6|[6.0,148.0,72.0,3...|
|          1|     85|           66|           29|   NULL|26.6|                   0.351| 31|      0|             85|                   66|                   29|     
(output truncated)
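VectorAssembler does no arithmetic: for each row it reads the listed columns in order and packs their values into one vector. With its default `handleInvalid="error"`, a null in any input column raises an error, which is one reason to assemble the `_imputed` columns rather than the originals. The sketch below is a plain-Python stand-in that returns a list where Spark would produce a dense or sparse vector; `assemble` is a hypothetical helper.

```python
from typing import Dict, List, Optional

FEATURE_COLS = [
    'Pregnancies', 'Glucose_imputed', 'BloodPressure_imputed', 'SkinThickness_imputed',
    'Insulin_imputed', 'BMI_imputed', 'DiabetesPedigreeFunction', 'Age',
]


def assemble(row: Dict[str, Optional[float]], input_cols: List[str]) -> List[float]:
    """Concatenate the input columns, in order, into one list of floats."""
    values = []
    for c in input_cols:
        v = row[c]
        if v is None:
            # Mirrors the default handleInvalid="error": refuse rows with nulls
            raise ValueError(f"null value in input column {c!r}")
        values.append(float(v))
    return values


# First row of the Step 3 output (original, non-imputed columns omitted)
first_row = {
    'Pregnancies': 6, 'Glucose_imputed': 148, 'BloodPressure_imputed': 72,
    'SkinThickness_imputed': 35, 'Insulin_imputed': 155, 'BMI_imputed': 33.6,
    'DiabetesPedigreeFunction': 0.627, 'Age': 50,
}
print(assemble(first_row, FEATURE_COLS))
# [6.0, 148.0, 72.0, 35.0, 155.0, 33.6, 0.627, 50.0]
```

The result matches the start of the features column shown above, [6.0,148.0,72.0,3...].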

In [18]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
df = scaler.fit(df).transform(df)

df.show(5)  # Now the DataFrame will have a "scaled_features" column


+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+---------------+---------------------+---------------------+---------------+-----------+--------------------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|Glucose_imputed|BloodPressure_imputed|SkinThickness_imputed|Insulin_imputed|BMI_imputed|            features|     scaled_features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+---------------+---------------------+---------------------+---------------+-----------+--------------------+--------------------+
|          6|    148|           72|           35|   NULL|33.6|                   0.627| 50|      1|            148|                   72|                   35|            155|       33.6|[6.0,148.0,72.0,3...|[1.78063837321943...|
|          1|     85|           66|           29|   NULL|26.6|                  
(output truncated)
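With its defaults (`withStd=True`, `withMean=False`), StandardScaler does not center the data. It divides each feature by that feature's standard deviation, computed over the DataFrame passed to `fit`. This is consistent with the output above, where 6 pregnancies becomes about 1.78, implying a standard deviation of roughly 3.37 for Pregnancies. The plain-Python sketch below, with hypothetical helpers `fit_std` and `scale`, uses `statistics.stdev` (the n-1 sample standard deviation) and maps a zero-variance feature to 0.0; both choices are intended to mirror Spark's behavior but are assumptions worth checking against the StandardScaler documentation.

```python
import math
from statistics import stdev
from typing import List


def fit_std(vectors: List[List[float]]) -> List[float]:
    """'Fit' step: sample standard deviation (n - 1 denominator) of each feature."""
    n_features = len(vectors[0])
    return [stdev([v[j] for v in vectors]) for j in range(n_features)]


def scale(vector: List[float], stds: List[float]) -> List[float]:
    """'Transform' step with withMean=False: divide by the std, no centering."""
    # A constant feature (std == 0) is mapped to 0.0 instead of dividing by zero
    return [x / s if s != 0 else 0.0 for x, s in zip(vector, stds)]


# Pregnancies and Glucose from the first three rows in Step 1
vectors = [[6.0, 148.0], [1.0, 85.0], [8.0, 183.0]]
stds = fit_std(vectors)
print(math.isclose(stds[0], math.sqrt(13)))  # True
print(scale(vectors[0], stds))  # first entry is 6 / sqrt(13), about 1.664
```

On the full dataset the same division yields the 1.7806... shown for the first row; the sketch's smaller sample simply gives a different standard deviation.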

Step 4: Model Training

In [19]:
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)


In [21]:
from pyspark.ml.classification import LogisticRegression

# Initialize the Logistic Regression model
lr = LogisticRegression(featuresCol="scaled_features", labelCol="Outcome")

# Train the model
lr_model = lr.fit(train_data)

# Make predictions
predictions = lr_model.transform(test_data)

# Show results
predictions.select("Outcome", "prediction", "probability").show(10)


+-------+----------+--------------------+
|Outcome|prediction|         probability|
+-------+----------+--------------------+
|      0|       0.0|[0.98801884361001...|
|      0|       0.0|[0.92681538906873...|
|      0|       0.0|[0.87619988556419...|
|      0|       0.0|[0.93400414148460...|
|      0|       0.0|[0.95866237880879...|
|      0|       0.0|[0.96402248619869...|
|      0|       0.0|[0.89199040663298...|
|      0|       0.0|[0.81823775295386...|
|      0|       0.0|[0.90555106687083...|
|      1|       0.0|[0.87349445835079...|
+-------+----------+--------------------+
only showing top 10 rows
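For binary labels, a fitted LogisticRegressionModel computes a margin (the intercept plus the dot product of its coefficients with `scaled_features`), turns it into P(Outcome = 1) with the sigmoid function, and reports `probability` as [P(0), P(1)]. `prediction` is 1.0 only when P(1) exceeds the model's threshold, 0.5 by default. That explains the last row above: P(0) of about 0.873 gives P(1) of about 0.127, so the model predicts 0.0 even though the true Outcome is 1. The sketch below uses made-up coefficients, and `predict_one` is a hypothetical helper, not a Spark method.

```python
import math
from typing import List, Tuple


def sigmoid(z: float) -> float:
    """Numerically stable logistic function 1 / (1 + e^-z)."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


def predict_one(features: List[float], coefficients: List[float], intercept: float,
                threshold: float = 0.5) -> Tuple[List[float], float]:
    """Return ([P(0), P(1)], prediction) for one feature vector."""
    margin = intercept + sum(w * x for w, x in zip(coefficients, features))
    p1 = sigmoid(margin)
    prediction = 1.0 if p1 > threshold else 0.0
    return [1.0 - p1, p1], prediction


# Hypothetical two-feature model, for illustration only
coefs, b = [1.2, -0.4], -0.8
print(predict_one([0.5, 1.0], coefs, b))  # margin -0.6: P(1) about 0.354, prediction 0.0
print(predict_one([2.0, 0.5], coefs, b))  # margin 1.4: P(1) about 0.802, prediction 1.0
```

Lowering `threshold` trades more false positives for fewer missed diabetic cases like the last row above.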



Step 5: Model Evaluation

In [22]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol="Outcome", metricName="areaUnderROC")
roc_auc = evaluator.evaluate(predictions)
print(f"ROC AUC: {roc_auc}")


ROC AUC: 0.8630813953488372
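ROC AUC summarizes how well the model ranks patients: it equals the probability that a randomly chosen positive example (Outcome = 1) receives a higher score than a randomly chosen negative one, with ties counting one half. A value of 0.863 therefore means that in about 86% of (diabetic, non-diabetic) pairs in the test split the model scores the diabetic patient higher. BinaryClassificationEvaluator derives the area from the ROC curve built on the `rawPrediction` column rather than by comparing pairs, but the trapezoidal area under an exact ROC curve equals this pairwise definition. The O(n²) plain-Python sketch below, with the hypothetical helper `roc_auc_pairwise`, only makes the definition concrete.

```python
from typing import Sequence


def roc_auc_pairwise(labels: Sequence[int], scores: Sequence[float]) -> float:
    """AUC as P(score of a random positive > score of a random negative); ties count 0.5."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Toy labels and P(1) scores (not taken from the model above)
print(roc_auc_pairwise([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))  # 0.75
```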


Step 6: Model Saving & Data Export

In [23]:
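# Save the cleaned data (overwriting any previous run)
df.write.mode("overwrite").parquet("diabetes_cleaned_data/")

# Save the model; overwrite() avoids an error if the directory already exists
lr_model.write().overwrite().save("diabetes_lr_model/")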



Conclusion
This project walks through a complete PySpark pipeline for preprocessing, modeling, and evaluating a diabetes dataset: data exploration, imputation of zero-coded missing values, feature assembly and scaling, Logistic Regression training, and evaluation, which gives a ROC AUC of about 0.86 on the held-out 20% test split. The cleaned data and the trained model are saved to disk so they can be reloaded for further analysis or deployment.