In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# TASK 1 : Install Dependencies & Run Spark Session

In [2]:
#install pyspark
!pip install -q pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [3]:
#create a sparksession
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.appName('spark').getOrCreate()

# TASK 2: Clone & Explore dataset

In [4]:
#clone the diabetes dataset from the github repository
! git clone https://github.com/education454/diabetes_dataset

Cloning into 'diabetes_dataset'...
remote: Enumerating objects: 6, done.[K
remote: Counting objects: 100% (6/6), done.[K
remote: Compressing objects: 100% (5/5), done.[K
remote: Total 6 (delta 0), reused 0 (delta 0), pack-reused 0[K
Receiving objects: 100% (6/6), 13.02 KiB | 833.00 KiB/s, done.


In [5]:
#create spark dataframe
diabetes_path = '/content/drive/MyDrive/code/coursera projects/Diabetes Prediction With Pyspark MLLIB/diabetes_dataset/diabetes.csv'
diabetes_df = spark.read.csv(diabetes_path, header = True, inferSchema = True)
diabetes_df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|      0|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|            0|            0|      0|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|      0|25.6|                   0.294| 28|      0|


In [6]:
#print the schema
diabetes_df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [7]:
#count the total no. of diabetic and non-diabetic class
class_counts = diabetes_df.groupBy('Outcome').count()
class_counts.show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  684|
|      0| 1316|
+-------+-----+



In [8]:
#get the summary statistics
diabetes_df.describe().show()

+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|    SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|             2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|          121.1825|           69.1455|           20.935|           80.254|32.192999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|32.068635649902916|19.188314815604098|16.103242909926

# TASK 3: Data Cleaning & Preparation

In [9]:
#check for null values
for col in diabetes_df.columns:
  print(f"{col}: ",diabetes_df.where(F.col(col).isNull()).count(), " | ", diabetes_df.where(F.col(col).isNull()).count()/diabetes_df.count())

Pregnancies:  0  |  0.0
Glucose:  0  |  0.0
BloodPressure:  0  |  0.0
SkinThickness:  0  |  0.0
Insulin:  0  |  0.0
BMI:  0  |  0.0
DiabetesPedigreeFunction:  0  |  0.0
Age:  0  |  0.0
Outcome:  0  |  0.0


In [10]:
#look for the unnecessary values present
print("Column | Count | Persentage")
for col in diabetes_df.columns:
  print(f"{col}: ", diabetes_df.where(F.col(col) == 0).count(), " | ", diabetes_df.where(F.col(col) == 0).count()/diabetes_df.count())

Column | Count | Persentage
Pregnancies:  301  |  0.1505
Glucose:  13  |  0.0065
BloodPressure:  90  |  0.045
SkinThickness:  573  |  0.2865
Insulin:  956  |  0.478
BMI:  28  |  0.014
DiabetesPedigreeFunction:  0  |  0.0
Age:  0  |  0.0
Outcome:  1316  |  0.658


SkinThickness and Insulin have too many missing values to replace with just the mean. I will first try just dropping them.

In [11]:
diabetes_df = diabetes_df.drop('SkinThickness')
diabetes_df = diabetes_df.drop('Insulin')

In [12]:
import pyspark.sql.functions as F

for col in diabetes_df.columns:
  mean_value = diabetes_df.select(F.mean(diabetes_df[col])).collect()[0][0]
  if col != 'DiabetesPedigreeFunction':
    mean_value = int(mean_value)  # Convert mean to integer for non-DiabetesPedigreeFunction columns
  diabetes_df = diabetes_df.withColumn(col, F.when(diabetes_df[col] == 0, mean_value).otherwise(diabetes_df[col]))

In [13]:
print("Column | Count | Persentage")
for col in diabetes_df.columns:
  print(f"{col}: ", diabetes_df.where(F.col(col) == 0).count(), " | ", diabetes_df.where(F.col(col) == 0).count()/diabetes_df.count())

Column | Count | Persentage
Pregnancies:  0  |  0.0
Glucose:  0  |  0.0
BloodPressure:  0  |  0.0
BMI:  0  |  0.0
DiabetesPedigreeFunction:  0  |  0.0
Age:  0  |  0.0
Outcome:  1316  |  0.658


In [14]:
#display the dataframe
diabetes_df.show()

+-----------+-------+-------------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+----+------------------------+---+-------+
|          2|    138|           62|33.6|                   0.127| 47|      1|
|          3|     84|           82|38.2|                   0.233| 23|      0|
|          3|    145|           69|44.2|                    0.63| 31|      1|
|          3|    135|           68|42.3|                   0.365| 24|      1|
|          1|    139|           62|40.7|                   0.536| 21|      0|
|          3|    173|           78|46.5|                   1.159| 58|      0|
|          4|     99|           72|25.6|                   0.294| 28|      0|
|          8|    194|           80|26.1|                   0.551| 67|      0|
|          2|     83|           65|36.8|                   0.629| 24|      0|
|          2|     89|           90|33.5|                   0.292

# TASK 4: Correlation Analysis & Feature Selection

In [None]:
from pyspark.ml.stat import Correlation



In [26]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols = diabetes_df.columns[:-1],outputCol = 'features')
data = assembler.transform(diabetes_df)
data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In [27]:
data.show()

+-----------+-------+-------------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+----+------------------------+---+-------+--------------------+
|          2|    138|           62|33.6|                   0.127| 47|      1|[2.0,138.0,62.0,3...|
|          3|     84|           82|38.2|                   0.233| 23|      0|[3.0,84.0,82.0,38...|
|          3|    145|           69|44.2|                    0.63| 31|      1|[3.0,145.0,69.0,4...|
|          3|    135|           68|42.3|                   0.365| 24|      1|[3.0,135.0,68.0,4...|
|          1|    139|           62|40.7|                   0.536| 21|      0|[1.0,139.0,62.0,4...|
|          3|    173|           78|46.5|                   1.159| 58|      0|[3.0,173.0,78.0,4...|
|          4|     99|           72|25.6|                   0.294| 28|      0|[4.0,99.0,72.0,25...|
|         

In [44]:
#find the correlation among the set of input & output variables
pearsonCorr = Correlation.corr(data, 'features', 'pearson').collect()[0]

rows = pearsonCorr[0].toArray().tolist()
df = spark.createDataFrame(rows,diabetes_df.columns[:-1] )
df.show()

+--------------------+-------------------+--------------------+--------------------+------------------------+--------------------+
|         Pregnancies|            Glucose|       BloodPressure|                 BMI|DiabetesPedigreeFunction|                 Age|
+--------------------+-------------------+--------------------+--------------------+------------------------+--------------------+
|                 1.0|0.14315010160093808| 0.23827297053253124| 0.06391250574900126|    -0.01567339386454522|    0.53375928127878|
| 0.14315010160093808|                1.0| 0.20055605544524802| 0.23347218412509715|     0.12418335544037215|  0.2598636909895083|
| 0.23827297053253124|0.20055605544524802|                 1.0| 0.25948450246041294|    0.015354937355195726|  0.3255620937940902|
| 0.06391250574900126|0.23347218412509715| 0.25948450246041294|                 1.0|     0.14201772636887044|0.018966709871156135|
|-0.01567339386454522|0.12418335544037215|0.015354937355195726| 0.14201772636887044

# TASK 5: Split Dataset & Build the Model

In [45]:
#create final data
final_data = data.select('features','Outcome')

In [46]:
#print schema of final data
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Outcome: integer (nullable = true)



In [51]:
#split the dataset ; build the model
train, test = final_data.randomSplit([0.7,0.3])

from pyspark.ml.classification import LogisticRegression
model = LogisticRegression(labelCol = 'Outcome')
model = model.fit(train)

In [52]:
#summary of the model
summary = model.summary
summary.predictions.describe().show()

+-------+------------------+-------------------+
|summary|           Outcome|         prediction|
+-------+------------------+-------------------+
|  count|              1399|               1399|
|   mean|0.3409578270192995| 0.2651894210150107|
| stddev| 0.474200717865217|0.44159187004259826|
|    min|               0.0|                0.0|
|    max|               1.0|                1.0|
+-------+------------------+-------------------+



# TASK 6: Evaluate and Save the Model

In [53]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictions = model.evaluate(test)

In [54]:
predictions.predictions.show()

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[1.0,71.0,48.0,20...|      0|[4.25006575050087...|[0.98593728461364...|       0.0|
|[1.0,71.0,62.0,21...|      0|[4.04715368003616...|[0.98282799493903...|       0.0|
|[1.0,73.0,50.0,23...|      0|[4.08064130343791...|[0.98338412590260...|       0.0|
|[1.0,77.0,56.0,33...|      0|[2.24695711275765...|[0.90438773852197...|       0.0|
|[1.0,79.0,80.0,25...|      0|[3.38243638230186...|[0.96715109686818...|       0.0|
|[1.0,80.0,55.0,19...|      0|[4.08807555026670...|[0.98350516438999...|       0.0|
|[1.0,80.0,74.0,30...|      0|[3.08562071655625...|[0.95629570155214...|       0.0|
|[1.0,80.0,74.0,30...|      0|[3.08562071655625...|[0.95629570155214...|       0.0|
|[1.0,81.0,72.0,26...|      0|[3.50667345138094...|[0.97087705385353...|    

In [60]:
predictions.predictions.select('probability').take(5)

[Row(probability=DenseVector([0.9859, 0.0141])),
 Row(probability=DenseVector([0.9828, 0.0172])),
 Row(probability=DenseVector([0.9834, 0.0166])),
 Row(probability=DenseVector([0.9044, 0.0956])),
 Row(probability=DenseVector([0.9672, 0.0328]))]

In [62]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol = "Outcome")
evaluator.evaluate(model.transform(test))

0.8366438608107111

In [63]:
# save model
model.save('/content/drive/MyDrive/code/coursera projects/Diabetes Prediction With Pyspark MLLIB/model')

In [65]:
# load saved model back to the environment
from pyspark.ml.classification import LogisticRegressionModel
loaded_model = LogisticRegressionModel.load('/content/drive/MyDrive/code/coursera projects/Diabetes Prediction With Pyspark MLLIB/model')

# TASK 7: Prediction on New Data with the saved model

In [66]:
#create a new spark dataframe
unseen = spark.read.csv('/content/drive/MyDrive/code/coursera projects/Diabetes Prediction With Pyspark MLLIB/diabetes_dataset/new_test.csv', header = True, inferSchema=True)

In [67]:
#print the schema
unseen.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)



In [68]:
# Drop columns
unseen = unseen.drop('SkinThickness')
unseen = unseen.drop('Insulin')

In [70]:
# count 0
print("Column | Count | Persentage")
for col in unseen.columns:
  print(f"{col}: ", unseen.where(F.col(col) == 0).count(), " | ", unseen.where(F.col(col) == 0).count()/unseen.count())

Column | Count | Persentage
Pregnancies:  1  |  0.25
Glucose:  0  |  0.0
BloodPressure:  0  |  0.0
BMI:  0  |  0.0
DiabetesPedigreeFunction:  0  |  0.0
Age:  0  |  0.0


In [71]:
#check for null values
for col in diabetes_df.columns:
  print(f"{col}: ",diabetes_df.where(F.col(col).isNull()).count(), " | ", diabetes_df.where(F.col(col).isNull()).count()/diabetes_df.count())

Pregnancies:  0  |  0.0
Glucose:  0  |  0.0
BloodPressure:  0  |  0.0
BMI:  0  |  0.0
DiabetesPedigreeFunction:  0  |  0.0
Age:  0  |  0.0
Outcome:  0  |  0.0


In [72]:
#create an additional feature merged column
test_data = assembler.transform(unseen)

In [73]:
#print the schema
test_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- features: vector (nullable = true)



In [74]:
#use model to make predictions
results = model.transform(test_data)
results.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [75]:
#display the predictions
results.show()

+-----------+-------+-------------+----+------------------------+---+--------------------+--------------------+--------------------+----------+
|Pregnancies|Glucose|BloodPressure| BMI|DiabetesPedigreeFunction|Age|            features|       rawPrediction|         probability|prediction|
+-----------+-------+-------------+----+------------------------+---+--------------------+--------------------+--------------------+----------+
|          1|    190|           78|45.1|                   0.153| 48|[1.0,190.0,78.0,4...|[-1.6557488861342...|[0.16033348413029...|       1.0|
|          0|     80|           84|50.2|                   0.211| 26|[0.0,80.0,84.0,50...|[2.15883888604958...|[0.89649185348177...|       0.0|
|          2|    138|           82|52.3|                   0.315| 30|[2.0,138.0,82.0,5...|[-0.5032384792432...|[0.37677991651963...|       1.0|
|          1|    110|           63|62.7|                   0.616| 32|[1.0,110.0,63.0,6...|[-0.3603280196679...|[0.41088016394933...|    