# Diabetes Prediction using PySpark MLlib

In this project, we build a logistic regression model to classify patients as diabetic or non-diabetic. After training the model, we assess its performance using relevant metrics. The model is then saved so that it can be reloaded later and used to make predictions on new data.

This project has four parts: 

- Part 1 - Perform ETL Activity
  - Load a csv dataset
  - Check for null values in each column
  - Replace zero values with mean of the column
  - Store the cleaned data in parquet format
- Part 2 - Build a Logistic Regression Classifier
  - Correlation analysis among the input and the output variables
  - Selection of the input features
  - Split the data into training and test sets
  - Build and train the Logistic Regression Model
- Part 3 - Evaluate the Model
  - Evaluate the model using relevant metrics
- Part 4 - Persist the Model
  - Save the model for future production use
  - Load and verify the stored model

### Preliminaries: Installing libraries and downloading data

Install the required libraries

In [None]:
! pip install pyspark
! pip install findspark

Clone the required dataset from GitHub

In [None]:
! git clone https://github.com/pregismond/diabetes_dataset

Check if dataset exists

In [None]:
! ls diabetes_dataset

### Importing Libraries

Importing the required libraries

In [None]:
import os
import findspark
import warnings

def warn(*args, **kwargs):
    pass

# Suppress generated warnings
warnings.warn = warn
warnings.filterwarnings("ignore")

findspark.init()

# import functions/Classes for sparkml
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col, mean, when
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LogisticRegressionModel

# import functions/Classes for metrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator

### Create a spark session

You can ignore any warnings printed while the SparkSession is being created.

In [None]:
spark = SparkSession \
    .builder \
    .appName("Diabetes Prediction") \
    .getOrCreate()

## Tasks

### Part 1 - Perform ETL Activity

Our initial step involves reading the CSV file named `diabetes.csv` into a Spark DataFrame called `diabetes_df`.

Load a csv dataset

* Using the `spark.read.csv` function we load the data into a dataframe
* The `header=True` indicates that there is a header row in our csv file
* The `inferSchema=True` tells spark to automatically determine the data types of the columns

In [None]:
diabetes_df = spark.read.csv("./diabetes_dataset/diabetes.csv", header=True, inferSchema=True)

We then display the structure of the `diabetes_df` DataFrame, including details about all columns and their associated data types. 

In [None]:
diabetes_df.printSchema()

Show top 5 rows from the dataset

In [None]:
diabetes_df.show(5)

Show the dimensions of the dataframe (rows, columns)

In [None]:
print((diabetes_df.count(), len(diabetes_df.columns)))

Print the value counts for the column `Outcome`

In [None]:
diabetes_df.groupBy("Outcome") \
    .count().withColumnRenamed("count", "Count") \
    .sort("Count", ascending=False) \
    .show()

The `Outcome` column consists of two classes, each indicating whether a patient has diabetes or not:

* **0**: the patient does not have diabetes
* **1**: the patient has diabetes

We can generate descriptive statistics to view some basic statistical details like count, mean, standard deviation, etc.

In [None]:
diabetes_df.describe().show()

As we can see above, the minimum values for the `Glucose`, `BloodPressure`, `SkinThickness`, `Insulin`, and `BMI` columns are 0, which is an invalid reading. We will replace the zero values in these five columns with their respective mean values. However, before doing so, let’s check for any null or missing values in the dataframe.

Check for null values in each column

In [None]:
for column in diabetes_df.columns:
    null_count = diabetes_df[diabetes_df[column].isNull()].count()
    print(f"{column}: {null_count}")

As you can see, we do not have any missing values for any of the columns present in our dataframe.

Replace zero values with mean of the column

* Replace zero values for the 5 columns from Glucose to BMI with their respective mean values

In [None]:
columns_list = ["Glucose", "BloodPressure", "SkinThickness", "Insulin", "BMI"]

# Replace zero values with mean of the column
for column in columns_list:
    # Count zero values in the column
    zero_count = diabetes_df.filter(col(column) == 0).count()
    
    # Calculate mean value of the column and convert to integer
    mean_value = int(diabetes_df.select(mean(col(column))).collect()[0][0])
    
    # Replace zero values with mean value
    print(f"Zero values in {column}: {zero_count}, Mean value: {mean_value}")
    diabetes_df = diabetes_df.withColumn(column, when(col(column) == 0, mean_value).otherwise(col(column)))
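
The loop above takes the mean over every row, including the zeros it is about to replace, and truncates the result with `int()`. One alternative, sketched here in plain Python on made-up readings (the values and variable names are illustrative assumptions, not taken from the dataset), is to average only the valid non-zero readings:

```python
# Hypothetical glucose readings; 0 marks a missing measurement
readings = [0, 120, 0, 140, 100]

# Mean over all rows, zeros included (what a plain column mean computes)
mean_with_zeros = sum(readings) / len(readings)

# Mean over valid (non-zero) readings only
valid = [r for r in readings if r != 0]
mean_valid = sum(valid) / len(valid)

# Replace each zero with the mean of the valid readings
imputed = [r if r != 0 else mean_valid for r in readings]

print(mean_with_zeros, mean_valid)
print(imputed)
```

In this toy case, including the zeros pulls the mean from 120 down to 72. Whether the difference matters for the real columns depends on how many zeros each one contains.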

Display the dataframe contents

In [None]:
diabetes_df.show()

Store the cleaned data in parquet format

* Save the dataframe as `diabetes_cleaned.parquet`

In [None]:
diabetes_df.write.mode("overwrite").parquet("diabetes_cleaned.parquet")

Verify that the parquet file(s) are created

In [None]:
! ls -l diabetes_cleaned.parquet

### Part 2 - Build a Logistic Regression Classifier

First, load data from "diabetes_cleaned.parquet" into a dataframe

In [None]:
diabetes_df = spark.read.parquet("diabetes_cleaned.parquet")

Show total number of rows in the dataset

In [None]:
print(diabetes_df.count())

Determine the correlation among the set of input and output variables

* Correlation measures the strength of the linear relationship between two variables: how closely a change in one is associated with a change in the other. It does not by itself imply that one causes the other.
    * input variables are the columns from `Pregnancies` to `Age`
    * output variable is the `Outcome` column

In [None]:
for column in diabetes_df.columns:
    print(f"Correlation to Outcome for {column} is {diabetes_df.stat.corr('Outcome', column)}")

As observed above, among the input columns `Glucose` has the highest correlation with `Outcome` at 0.48, while all other input columns are below 0.4. No single input is strongly correlated with the outcome on its own, so we retain all the input columns as features for the model.
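
`DataFrame.stat.corr` computes the Pearson correlation coefficient by default. As a minimal sketch of what that number means, the following plain-Python function (the name `pearson` and the sample values are illustrative assumptions) computes the same quantity for two short lists:

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of products of deviations from the means
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Made-up values: one feature that rises with the label, one that falls
outcome = [0, 0, 1, 1]
rising = [1.0, 2.0, 3.0, 4.0]
falling = [4.0, 3.0, 2.0, 1.0]

print(pearson(rising, outcome))
print(pearson(falling, outcome))
```

Values close to 1 or -1 indicate a strong linear relationship, and values close to 0 indicate a weak one. The sign shows the direction.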

Define `features` selection using VectorAssembler

* Assemble the input columns into a single vector column `features`
* Use all the columns except `Outcome` as input features

In [None]:
assembler = VectorAssembler(
    inputCols=[
        "Pregnancies",
        "Glucose",
        "BloodPressure",
        "SkinThickness",
        "Insulin",
        "BMI",
        "DiabetesPedigreeFunction",
        "Age"
    ],
    outputCol="features"
)

diabetes_transformed_df = assembler.transform(diabetes_df)

Create a new DataFrame `diabetes_final_df` using the existing `diabetes_transformed_df` DataFrame.
* Select only the `features` and `Outcome` columns to isolate the relevant data needed for analysis.

In [None]:
diabetes_final_df = diabetes_transformed_df.select("features","Outcome")

Display the structure of the `diabetes_final_df` DataFrame

In [None]:
diabetes_final_df.printSchema()

Display the dataframe contents

In [None]:
diabetes_final_df.show()

Split the data into training and test sets

* We split the data set in the ratio of 70:30. 70% training data, 30% testing data.
* The `seed` parameter fixes the random sampling used to assign rows to each split. Pass the same integer to get a reproducible split across multiple calls.

In [None]:
(trainingData, testingData) = diabetes_final_df.randomSplit([0.7, 0.3], seed=42)
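
Because rows are assigned at random, the 70:30 ratio holds only approximately. The sketch below illustrates the idea in plain Python; it is a conceptual model under the assumption of one uniform draw per row, not Spark's actual implementation, and `random_split` is a hypothetical helper:

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split by drawing one uniform number per row."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative boundaries, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split catches any draw left over by rounding
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(1000))
train_a, test_a = random_split(rows, [0.7, 0.3], seed=42)
train_b, test_b = random_split(rows, [0.7, 0.3], seed=42)

print(len(train_a), len(test_a))  # close to 700 and 300, but not exact
print(train_a == train_b)         # the same seed gives the same split
```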

Create a logistic regression model

* Logistic regression is a simple, interpretable, and widely used baseline for binary classification.

In [None]:
lr = LogisticRegression(labelCol="Outcome")
model = lr.fit(trainingData)

Display a summary of the trained model, including descriptive statistics of the model's predictions

In [None]:
summary = model.summary
summary.predictions.describe().show()

### Part 3 - Evaluate the Model

After training the model, we will assess its accuracy and effectiveness using suitable metrics.

Make predictions on testing data

In [None]:
predictions = model.evaluate(testingData)

Show the predictions

In [None]:
predictions.predictions.show()

As you can see, `LogisticRegression` has added three additional columns as predictions:

- **rawPrediction**: This is the raw prediction for each possible label and represents the raw output of the logistic regression classifier.
- **probability**: This is the result of applying the logistic (sigmoid) function to the raw prediction, giving a probability for each class.
- **prediction**: This is the corresponding class label that the model has predicted.
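
The relationship between these three columns can be sketched in plain Python. This is an illustration only: it assumes the binary case, where the raw prediction is the pair (-margin, margin) and the default decision threshold is 0.5. The function names and the margin value are hypothetical.

```python
import math

def sigmoid(z):
    """Logistic function: maps any real number into the interval (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

def predict(margin, threshold=0.5):
    """Return (rawPrediction, probability, prediction) for one row."""
    raw = [-margin, margin]
    p1 = sigmoid(margin)          # probability of class 1
    probability = [1.0 - p1, p1]  # probabilities for class 0 and class 1
    prediction = 1.0 if p1 > threshold else 0.0
    return raw, probability, prediction

# Hypothetical margin (intercept plus weighted sum of features) for one patient
raw, probability, prediction = predict(margin=1.2)
print(raw, probability, prediction)
```

A positive margin yields a class 1 probability above 0.5 and therefore a prediction of 1; a negative margin yields a prediction of 0.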


Use the `BinaryClassificationEvaluator` to evaluate the overall performance of the model

In [None]:
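evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="Outcome", metricName="areaUnderROC")
# The evaluator's default metric is area under the ROC curve (AUC), not accuracy
auc = evaluator.evaluate(model.transform(testingData))
print(f"Area under ROC: {auc:.4f}")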
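
`BinaryClassificationEvaluator` reports the area under the ROC curve by default, which is a different quantity from accuracy. The plain-Python sketch below, using made-up labels and scores, shows how the two can differ. The helper names are hypothetical, and the pairwise AUC computation is a textbook formulation rather than Spark's implementation.

```python
def accuracy(labels, scores, threshold=0.5):
    """Fraction of rows whose thresholded score matches the label."""
    preds = [1 if s > threshold else 0 for s in scores]
    correct = sum(1 for y, p in zip(labels, preds) if y == p)
    return correct / len(labels)

def area_under_roc(labels, scores):
    """Probability that a random positive is scored above a random negative."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a win
    return wins / (len(pos) * len(neg))

# Made-up labels and predicted probabilities of class 1
labels = [0, 0, 1, 1, 1]
scores = [0.2, 0.6, 0.4, 0.7, 0.9]

print(f"Accuracy: {accuracy(labels, scores):.4f}")
print(f"Area under ROC: {area_under_roc(labels, scores):.4f}")
```

Accuracy depends on the chosen threshold, while area under the ROC curve depends only on how the scores rank positives against negatives. In PySpark, accuracy itself is available through `MulticlassClassificationEvaluator` with `metricName="accuracy"`.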

### Part 4 - Persist the Model

Save the model for future use, ensuring that it can be stored and retrieved later. This allows us to deploy the trained model in real-world applications and make predictions on new data.

* Save the model as "diabetes_model"

In [None]:
# Create folder to save model
! mkdir -p diabetes_model

# Persist the model to the path "./diabetes_model/"
model.write().overwrite().save("./diabetes_model/")

Load the model from the folder "diabetes_model"

In [None]:
loaded_model = LogisticRegressionModel.load("./diabetes_model/")

Read the csv file named `new_test.csv` into a Spark DataFrame called `new_test_df`

In [None]:
new_test_df = spark.read.csv("./diabetes_dataset/new_test.csv", header=True, inferSchema=True)

Display the structure of the `new_test_df` DataFrame, including details about all columns and their associated data types. 

In [None]:
new_test_df.printSchema()

Here we can see that we have similar input features as before. However, one thing to notice is that we don't have the output column `Outcome` because this dataset is unlabelled. We'll use the loaded model to predict diabetes on this data.

Assemble the input columns into a single vector column `features`

In [None]:
new_test_transformed_df = assembler.transform(new_test_df)

Display the structure of the `new_test_transformed_df` DataFrame

In [None]:
new_test_transformed_df.printSchema()

Here we have an additional `features` column as a vector.

Use `loaded_model` to make predictions on test data

In [None]:
predictions = loaded_model.transform(new_test_transformed_df)
predictions.printSchema()

Here we get three additional columns: `rawPrediction`, `probability`, and `prediction`. The `prediction` column contains the predicted class label, either 0 or 1.

Show the predictions

* Display only the `features` column and `prediction`

In [None]:
predictions.select("features","prediction").show()

Each row's `features` vector holds the eight input features, and the model has produced a prediction for each row. A prediction of 1 indicates that the patient is diabetic, while a prediction of 0 indicates that the patient is non-diabetic.


### Stop Spark Session

In [None]:
spark.stop()

## Change Log


|  Date (YYYY-MM-DD) |  Version | Changed By  |  Change Description |
|---|---|---|---|
| 2024-08-03  | 0.1  | Pravin Regismond | Initial Version |

Copyright © 2024 Pravin Regismond. All rights reserved.