# Diabetes Project

The purpose of this project is to demonstrate basic skills in handling large datasets with PySpark.

The first section uses the 'rel.csv' dataset and covers data processing, writing the processed data to a new file, and simple analysis.

The second section uses the 'diabetes.csv' dataset to train and test a random forest classifier for diabetes prediction.

In [11]:
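import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#keep a handle to the SparkContext (setLogLevel returns None, so it cannot be assigned directly)
sc = spark.sparkContext
#suppress log output below ERROR level
sc.setLogLevel('ERROR')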

In [12]:
#import data
rel_data = spark.read.csv('./rel.csv', sep = ',', header = False, inferSchema = True)


In [13]:
#show data
rel_data.show(5)

+--------+--------+
|     _c0|     _c1|
+--------+--------+
|C0005790|C0005778|
|C1255279|C3537249|
|C1255446|C0002520|
|C1255552|C0596019|
|C1254417|C0004611|
+--------+--------+
only showing top 5 rows



# Data Processing
Order each pair of strings alphabetically. Given a pair (c1, c2):

- if c1 <= c2, the ordered pair is still (c1, c2);
- if c2 < c1, the ordered pair is (c2, c1).
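The rule above amounts to putting the smaller string first. A minimal plain-Python sketch of it (the helper `order_pair` is hypothetical, not part of PySpark), applied to the first rows of `rel.csv` printed earlier:

```python
from typing import List, Tuple


def order_pair(c1: str, c2: str) -> Tuple[str, str]:
    """Return the pair with the alphabetically smaller string first."""
    # ties (c1 == c2) keep the original order, matching the c1 <= c2 rule
    return (c1, c2) if c1 <= c2 else (c2, c1)


# first three rows of rel.csv as printed by rel_data.show(5)
rows: List[Tuple[str, str]] = [
    ("C0005790", "C0005778"),
    ("C1255279", "C3537249"),
    ("C1255446", "C0002520"),
]
ordered = [order_pair(a, b) for a, b in rows]
print(ordered)
# [('C0005778', 'C0005790'), ('C1255279', 'C3537249'), ('C0002520', 'C1255446')]
```

In PySpark the same ordering could also be expressed with `pyspark.sql.functions.least` and `greatest`, which return the smallest and largest value across the given columns.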

In [14]:
#import necessary modules
from pyspark.sql.functions import col,when

In [15]:
rel_data_alpha = rel_data.select(
    #the alphabetically smaller value of each pair goes into the new column c0
    when(col("_c0") <= col("_c1"), col("_c0")).otherwise(col("_c1")).alias("c0"),
    #the larger value goes into the new column c1
    when(col("_c0") <= col("_c1"), col("_c1")).otherwise(col("_c0")).alias("c1")
)

rel_data_alpha.show(5)

+--------+--------+
|      c0|      c1|
+--------+--------+
|C0005778|C0005790|
|C1255279|C3537249|
|C0002520|C1255446|
|C0596019|C1255552|
|C0004611|C1254417|
+--------+--------+
only showing top 5 rows



Count the number of instances of each ordered pair and save the result
in a plain text file named “pair-count.txt”.

In [16]:
#group the ordered dataset by each unique (c0, c1) pair and count the rows in each group
rel_data_counts = rel_data_alpha.groupby('c0','c1').count()

rel_data_counts.show(5)


+--------+--------+-----+
|      c0|      c1|count|
+--------+--------+-----+
|C0002210|C1255543|    2|
|C0031507|C1254766|    2|
|C0043375|C1255373|    2|
|C0002520|C0523760|    2|
|C0039350|C1278175|   10|
+--------+--------+-----+
only showing top 5 rows




In [18]:
#save output as a txt file
#convert dataframe into single string format rdd
rel_data_rdd = rel_data_counts.rdd.map(lambda row: f'"{row.c0}","{row.c1}" {row["count"]}')

#convert RDD to single partition and save as txt file
rel_data_rdd.coalesce(1).saveAsTextFile('pair-count.txt')

#saveAsTextFile writes a directory named pair-count.txt; the data ends up in its part-00000 file
#(the call fails if that directory already exists)



Count how many unique ordered pairs are obtained.

In [19]:
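#groupby already yields one row per unique ordered pair, so counting its rows is enough (no distinct() needed)
rel_data_counts.count()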


12946540
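The counting, output formatting, and unique-pair count can be sketched in plain Python on a tiny hypothetical sample (the helpers `count_pairs` and `format_line` are illustrative, not Spark APIs). Because each key appears only once after counting, the number of keys is the number of unique ordered pairs:

```python
from collections import Counter
from typing import Iterable, List, Tuple


def count_pairs(pairs: Iterable[Tuple[str, str]]) -> Counter:
    """Order each pair alphabetically, then count identical ordered pairs."""
    return Counter((a, b) if a <= b else (b, a) for a, b in pairs)


def format_line(pair: Tuple[str, str], n: int) -> str:
    """Mirror the line format written to pair-count.txt: "c0","c1" count."""
    return f'"{pair[0]}","{pair[1]}" {n}'


# hypothetical sample data: (X, Y) and (Y, X) collapse into one ordered pair
sample = [("C2", "C1"), ("C1", "C2"), ("C3", "C4"), ("C1", "C2")]
counts = count_pairs(sample)
lines: List[str] = [format_line(p, n) for p, n in sorted(counts.items())]

print(lines)        # ['"C1","C2" 3', '"C3","C4" 1']
print(len(counts))  # number of unique ordered pairs: 2
```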

# Diabetes Prediction Model

In this section, we use a dataset of clinical measurements to build a random forest model that classifies diabetes (presence vs. absence), and we evaluate the model with the area under the ROC curve (AUROC).

In [20]:
#import the data
dm_data = spark.read.csv('./diabetes.csv', sep=',', header=True, inferSchema=True)

In [21]:
#preview data
dm_data.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 5 rows



Remove the rows where any of the columns “BloodPressure”, “BMI”, or “Glucose” is zero.
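The SQL `WHERE` clause used here keeps a row only when all three columns are positive, so a row is dropped as soon as any one of them is zero. A plain-Python sketch of that predicate (the names `keep_row` and `REQUIRED_NONZERO` are hypothetical), using two rows from the preview above plus one made-up row:

```python
from typing import Dict, List

RowDict = Dict[str, float]

# columns that must be strictly positive, mirroring the SQL "> 0" conditions
REQUIRED_NONZERO = ("BloodPressure", "BMI", "Glucose")


def keep_row(row: RowDict) -> bool:
    """Keep a row only if every required column is greater than zero."""
    return all(row[c] > 0 for c in REQUIRED_NONZERO)


# first and third preview rows of diabetes.csv (subset of columns),
# plus a hypothetical row with BMI = 0 that should be dropped
rows: List[RowDict] = [
    {"Glucose": 148, "BloodPressure": 72, "BMI": 33.6},
    {"Glucose": 183, "BloodPressure": 64, "BMI": 23.3},
    {"Glucose": 120, "BloodPressure": 70, "BMI": 0.0},
]
kept = [r for r in rows if keep_row(r)]
print(len(kept))  # 2
```

With the DataFrame API, the same filter should be expressible as `dm_data.filter((col('BloodPressure') > 0) & (col('BMI') > 0) & (col('Glucose') > 0))`, without registering a temp view.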

In [22]:
#create temp view of data for sql query
dm_data.createOrReplaceTempView("dm_data")

#filter for data of interest (values >0) via sql query
small_dm = spark.sql("SELECT * FROM dm_data WHERE `BloodPressure` > 0 AND `BMI` > 0 AND `Glucose` > 0")

small_dm.show(10)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|


In [23]:
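#double check that there are no zero values
#import the functions module under an alias so Spark's min does not shadow Python's built-in min
from pyspark.sql import functions as F

small_dm.select(F.min('Glucose')).show()
small_dm.select(F.min('BloodPressure')).show()
small_dm.select(F.min('BMI')).show()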

+------------+
|min(Glucose)|
+------------+
|          44|
+------------+

+------------------+
|min(BloodPressure)|
+------------------+
|                24|
+------------------+

+--------+
|min(BMI)|
+--------+
|    18.2|
+--------+



Convert the categorical column “Pregnancies” into one-hot encoding.
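`StringIndexer` casts the numeric column to strings and assigns indices by frequency, and `OneHotEncoder` then maps each index to a sparse vector. With the default `dropLast=True`, the last category becomes the all-zero vector, which is why the `Pregnancies_VEC` vectors have size 16 (implying 17 distinct Pregnancies values). A plain-Python sketch of both steps; the helpers are hypothetical, and the alphabetical tie-break is assumed to mirror `StringIndexer`'s default `frequencyDesc` ordering:

```python
from collections import Counter
from typing import Dict, List, Sequence, Tuple

SparseVec = Tuple[int, List[int], List[float]]


def string_index(values: Sequence[str]) -> Dict[str, float]:
    """Map each distinct value to an index, most frequent first.

    Ties are broken alphabetically on the string form (assumed to mirror
    StringIndexer's default frequencyDesc ordering).
    """
    freq = Counter(values)
    ordered = sorted(freq, key=lambda v: (-freq[v], v))
    return {v: float(i) for i, v in enumerate(ordered)}


def one_hot(index: float, num_categories: int, drop_last: bool = True) -> SparseVec:
    """Encode an index as a sparse (size, indices, values) triple.

    With drop_last=True the last category becomes the all-zero vector,
    so the vector size is num_categories - 1.
    """
    size = num_categories - 1 if drop_last else num_categories
    i = int(index)
    if i < size:
        return (size, [i], [1.0])
    return (size, [], [])


# hypothetical Pregnancies values, as strings (StringIndexer indexes string forms)
preg = ["1", "1", "0", "6", "1", "0", "8"]
index = string_index(preg)
print(index)                            # {'1': 0.0, '0': 1.0, '6': 2.0, '8': 3.0}
print(one_hot(index["6"], len(index)))  # (3, [2], [1.0])
print(one_hot(index["8"], len(index)))  # (3, [], [])
```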

In [24]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

#create an index column for Pregnancies; by default StringIndexer orders values by frequency, so the most frequent value gets index 0.0
indexer = StringIndexer(inputCol = 'Pregnancies', outputCol = 'Pregnancies_INDEX')
small_dm = indexer.fit(small_dm).transform(small_dm)

small_dm.show(5)


+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|Pregnancies_INDEX|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|              6.0|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|              0.0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|              8.0|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|              0.0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|              1.0|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------------+
only showing top 5 rows

In [25]:
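#apply OneHotEncoder (dropLast=True by default, so each vector has one slot fewer than the number of distinct categories)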
encoder = OneHotEncoder(inputCol='Pregnancies_INDEX', outputCol = 'Pregnancies_VEC')
small_dm = encoder.fit(small_dm).transform(small_dm)

small_dm.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------------+---------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|Pregnancies_INDEX|Pregnancies_VEC|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------------+---------------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|              6.0| (16,[6],[1.0])|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|              0.0| (16,[0],[1.0])|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|              8.0| (16,[8],[1.0])|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|              0.0| (16,[0],[1.0])|
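|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|              1.0| (16,[1],[1.0])|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----------------+---------------+
only showing top 5 rows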

Create a single column with all the features collated together using VectorAssembler.

In [26]:
small_dm.columns

['Pregnancies',
 'Glucose',
 'BloodPressure',
 'SkinThickness',
 'Insulin',
 'BMI',
 'DiabetesPedigreeFunction',
 'Age',
 'Outcome',
 'Pregnancies_INDEX',
 'Pregnancies_VEC']

In [27]:
from pyspark.ml.feature import VectorAssembler

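# Define the feature columns (every column except the Outcome label).
# Note: this keeps the raw 'Pregnancies' and 'Pregnancies_INDEX' columns alongside the
# one-hot 'Pregnancies_VEC', so each assembled vector has 8 + 1 + 16 = 25 entries
feature_cols = [c for c in small_dm.columns if c != "Outcome"]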

# Apply VectorAssembler
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
small_dm = assembler.transform(small_dm)

# check data
small_dm.select("features", "Outcome").show(truncate=False)



+-------------------------------------------------------------------------------+-------+
|features                                                                       |Outcome|
+-------------------------------------------------------------------------------+-------+
|(25,[0,1,2,3,5,6,7,8,15],[6.0,148.0,72.0,35.0,33.6,0.627,50.0,6.0,1.0])        |1      |
|(25,[0,1,2,3,5,6,7,9],[1.0,85.0,66.0,29.0,26.6,0.351,31.0,1.0])                |0      |
|(25,[0,1,2,5,6,7,8,17],[8.0,183.0,64.0,23.3,0.672,32.0,8.0,1.0])               |1      |
|(25,[0,1,2,3,4,5,6,7,9],[1.0,89.0,66.0,23.0,94.0,28.1,0.167,21.0,1.0])         |0      |
|(25,[1,2,3,4,5,6,7,8,10],[137.0,40.0,35.0,168.0,43.1,2.288,33.0,1.0,1.0])      |1      |
|(25,[0,1,2,5,6,7,8,14],[5.0,116.0,74.0,25.6,0.201,30.0,5.0,1.0])               |0      |
|(25,[0,1,2,3,4,5,6,7,8,12],[3.0,78.0,50.0,32.0,88.0,31.0,0.248,26.0,3.0,1.0])  |1      |
|(25,[0,1,2,3,4,5,6,7,8,11],[2.0,197.0,70.0,45.0,543.0,30.5,0.158,53.0,2.0,1.0])|1      |
|(25,[0,1,
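
Each `features` vector has size 25: the 8 raw columns, 1 slot for `Pregnancies_INDEX`, and 16 for `Pregnancies_VEC`, in the order of `feature_cols`. A plain-Python sketch of that concatenation (the `assemble` helper is hypothetical; `VectorAssembler` itself may store a row densely or sparsely depending on how many zeros it holds) reproduces the first row shown above:

```python
from typing import List, Sequence, Tuple

SparseVec = Tuple[int, List[int], List[float]]


def assemble(scalars: Sequence[float], vectors: Sequence[SparseVec]) -> SparseVec:
    """Concatenate scalar columns and sparse vectors into one sparse vector.

    Zero-valued entries are omitted from (indices, values), as in the
    sparse output printed above.
    """
    indices: List[int] = []
    values: List[float] = []
    offset = 0
    for x in scalars:
        if x != 0:
            indices.append(offset)
            values.append(float(x))
        offset += 1
    for size, idx, vals in vectors:
        # shift each vector's indices past everything assembled so far
        for i, v in zip(idx, vals):
            if v != 0:
                indices.append(offset + i)
                values.append(v)
        offset += size
    return (offset, indices, values)


# first diabetes row: 8 raw columns, then Pregnancies_INDEX, then Pregnancies_VEC
raw = [6, 148, 72, 35, 0, 33.6, 0.627, 50]
vec = assemble(raw + [6.0], [(16, [6], [1.0])])
print(vec)
# (25, [0, 1, 2, 3, 5, 6, 7, 8, 15], [6.0, 148.0, 72.0, 35.0, 33.6, 0.627, 50.0, 6.0, 1.0])
```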

Randomly split the collated data into training (70%) and testing (30%) datasets, using 2017 as the seed.

In [28]:
small_dm_train, small_dm_test = small_dm.randomSplit([0.7,0.3], seed = 2017)
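
`randomSplit` does not cut the data at exactly 70/30. As we understand it, each row is assigned independently at random with probabilities given by the normalized weights, so the split sizes only approximate the requested proportions, and the seed makes the assignment reproducible for the same data and partitioning. A plain-Python sketch of that idea (the `random_split` helper is hypothetical and uses Python's `random` module, not Spark's sampler):

```python
import random
from typing import List, Sequence, Tuple


def random_split(rows: Sequence[int], weights: Sequence[float], seed: int) -> Tuple[List[int], ...]:
    """Assign each row independently to a split with probability proportional to its weight."""
    total = sum(weights)
    # cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds: List[float] = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits: Tuple[List[int], ...] = tuple([] for _ in weights)
    for row in rows:
        u = rng.random()
        # first split whose upper bound exceeds u (fall back to the last one for rounding)
        k = next((i for i, b in enumerate(bounds) if u < b), len(weights) - 1)
        splits[k].append(row)
    return splits


train_ids, test_ids = random_split(range(1000), [0.7, 0.3], seed=2017)
print(len(train_ids), len(test_ids))  # close to 700 / 300, but not exactly
```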

Implement a random forest classifier with the number of decision trees set to 20.

In [29]:
from pyspark.ml.classification import RandomForestClassifier

#create a random forest classifier with 20 trees

RF = RandomForestClassifier(labelCol = 'Outcome',featuresCol='features', numTrees=20)

#train model
RF_model = RF.fit(small_dm_train)

#test model
test = RF_model.transform(small_dm_test)

#show results:
test.select('features', 'Outcome','prediction').show(10)

+--------------------+-------+----------+
|            features|Outcome|prediction|
+--------------------+-------+----------+
|(25,[1,2,5,6,7,8,...|      0|       0.0|
|(25,[1,2,5,6,7,8,...|      0|       0.0|
|(25,[1,2,3,4,5,6,...|      0|       0.0|
|(25,[1,2,3,4,5,6,...|      0|       0.0|
|(25,[1,2,3,4,5,6,...|      0|       0.0|
|(25,[1,2,3,4,5,6,...|      0|       0.0|
|(25,[1,2,5,6,7,8,...|      0|       0.0|
|(25,[1,2,5,6,7,8,...|      0|       0.0|
|(25,[1,2,3,4,5,6,...|      0|       0.0|
|(25,[1,2,3,5,6,7,...|      0|       0.0|
+--------------------+-------+----------+
only showing top 10 rows
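
As we understand Spark's implementation, each of the 20 trees returns a class-probability vector for a row; the forest's `rawPrediction` column is the sum of those vectors, `probability` is that sum normalized, and `prediction` is the class with the highest probability. A plain-Python sketch with four made-up trees (the `forest_predict` helper is hypothetical):

```python
from typing import List, Sequence, Tuple


def forest_predict(tree_probs: Sequence[Sequence[float]]) -> Tuple[List[float], List[float], float]:
    """Combine per-tree class-probability vectors into forest outputs.

    Returns (raw_prediction, probability, prediction): the raw prediction sums
    the trees' vectors, probability normalizes that sum, and the prediction is
    the index of the largest probability.
    """
    n_classes = len(tree_probs[0])
    raw_pred = [sum(p[k] for p in tree_probs) for k in range(n_classes)]
    total = sum(raw_pred)
    prob = [r / total for r in raw_pred]
    prediction = float(max(range(n_classes), key=lambda k: prob[k]))
    return raw_pred, prob, prediction


# hypothetical leaf outputs of 4 trees for one patient: [P(Outcome=0), P(Outcome=1)]
trees = [[0.75, 0.25], [0.5, 0.5], [1.0, 0.0], [0.25, 0.75]]
raw_pred, prob, pred = forest_predict(trees)
print(raw_pred, prob, pred)  # [2.5, 1.5] [0.625, 0.375] 0.0
```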



Evaluate the performance of the random forest classifier using the area under the ROC curve (AUROC).

In [30]:
import pyspark.ml.evaluation as ev

evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='Outcome'
)

print('AuROC', evaluator.evaluate(test, {evaluator.metricName: 'areaUnderROC'}))

AuROC 0.8507462686567163
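
When given the `probability` column, the evaluator scores each row by its probability of `Outcome = 1`. The area under the ROC curve equals the fraction of (diabetic, non-diabetic) pairs in which the diabetic patient receives the higher score, with ties counted as one half, so an AUROC of about 0.85 means the model ranks a randomly chosen positive above a randomly chosen negative about 85% of the time. A brute-force plain-Python sketch of that pairwise statistic (the `auroc` helper is hypothetical and quadratic in the number of rows, so only suitable for small inputs):

```python
from typing import Sequence


def auroc(scores: Sequence[float], labels: Sequence[int]) -> float:
    """Fraction of (positive, negative) pairs where the positive scores higher; ties count 0.5."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# hypothetical P(Outcome=1) scores and true labels
print(auroc([0.1, 0.4, 0.35, 0.8], [0, 0, 1, 1]))  # 0.75
```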
