# TASK 1 : Install Dependencies & Run Spark Session

In [None]:
#install pyspark
! pip install pyspark


Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 37 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 15.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=25a084850c33a633ca4ac758086a48b39fefd716803b04327059928837a3733f
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [25]:
#create a sparksession
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark").getOrCreate()


# TASK 2: Clone & Explore dataset

In [None]:
#clone the diabetes dataset from the github repository
! git clone https://github.com/education454/diabetes_dataset

fatal: destination path 'diabetes_dataset' already exists and is not an empty directory.


In [None]:
#check if the dataset exists
! ls diabetes_dataset/

diabetes.csv  new_test.csv


In [26]:
#create spark dataframe
df = spark.read.csv("/content/diabetes_dataset/diabetes.csv", header=True, inferSchema=True)

In [None]:
#display the dataframe
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|      0|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|            0|            0|      0|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|      0|25.6|                   0.294| 28|      0|


In [None]:
#print the schema
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [None]:
#count the total no. of diabetic and non-diabetic class
print((df.count(), len(df.columns)))
df.groupby("Outcome").count().show()

(2000, 9)
+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  684|
|      0| 1316|
+-------+-----+



In [None]:
#get the summary statistics
df.describe().show()

+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|    SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|             2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|          121.1825|           69.1455|           20.935|           80.254|32.192999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|32.068635649902916|19.188314815604098|16.103242909926

# TASK 3: Data Cleaning & Preparation

In [None]:
#check for null values
from pyspark.sql.functions import when, col, isnan, count, avg
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|          0|      0|            0|            0|      0|  0|                       0|  0|      0|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+



In [None]:
#look for the unnecessary values present
df.select([count(when(col(c) == 0, c)).alias(c) for c in df.columns]).show()

+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|        301|     13|           90|          573|    956| 28|                       0|  0|   1316|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+



In [27]:
#calculate and replace the unnecessary values by the mean value
exclude = ["Pregnancies", "DiabetesPedigreeFunction", "Age", "Outcome"]
dfMean = df.agg(*(avg(c).alias(c) for c in df.columns if c not in exclude))
for column in dfMean.columns:
  df = df.withColumn(column, when(col(column) == 0, dfMean.select(column).head()[0]).otherwise(col(column).alias(column)))

In [None]:
#display the dataframe 
df.show()

+-----------+-------+-------------+-------------+-------+------------------+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|               BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+------------------+------------------------+---+-------+
|        2.0|  138.0|         62.0|         35.0| 80.254|              33.6|                   0.127| 47|      1|
|     3.7035|   84.0|         82.0|         31.0|  125.0|              38.2|                   0.233| 23|      0|
|     3.7035|  145.0|      69.1455|       20.935| 80.254|              44.2|                    0.63| 31|      1|
|     3.7035|  135.0|         68.0|         42.0|  250.0|              42.3|                   0.365| 24|      1|
|        1.0|  139.0|         62.0|         41.0|  480.0|              40.7|                   0.536| 21|      0|
|     3.7035|  173.0|         78.0|         32.0|  265.0|              46.5|            

# TASK 4: Correlation Analysis & Feature Selection

In [28]:
#find the correlation among the set of input & output variables
for column in df.columns:
  print("Correlation to outcome for {} is {}".format(column, df.stat.corr("Outcome", column)))

Correlation to outcome for Pregnancies is 0.22443699263363961
Correlation to outcome for Glucose is 0.4879768034291114
Correlation to outcome for BloodPressure is 0.17184366228641254
Correlation to outcome for SkinThickness is 0.17085584969479944
Correlation to outcome for Insulin is 0.1713393780603629
Correlation to outcome for BMI is 0.28261473844622204
Correlation to outcome for DiabetesPedigreeFunction is 0.1554590791569403
Correlation to outcome for Age is 0.23650924717620253
Correlation to outcome for Outcome is 1.0


In [29]:
#feature selection
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["Pregnancies", "Glucose", "BloodPressure", "SkinThickness", "Insulin", "BMI", "DiabetesPedigreeFunction", "Age" ], outputCol="features")
outputData = assembler.transform(df)

In [30]:
#print the schema
outputData.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: double (nullable = true)
 |-- BloodPressure: double (nullable = true)
 |-- SkinThickness: double (nullable = true)
 |-- Insulin: double (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In [31]:
#display dataframe
outputData.show()

+-----------+-------+-------------+-------------+-------+------------------+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|               BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+------------------+------------------------+---+-------+--------------------+
|          2|  138.0|         62.0|         35.0| 80.254|              33.6|                   0.127| 47|      1|[2.0,138.0,62.0,3...|
|          0|   84.0|         82.0|         31.0|  125.0|              38.2|                   0.233| 23|      0|[0.0,84.0,82.0,31...|
|          0|  145.0|      69.1455|       20.935| 80.254|              44.2|                    0.63| 31|      1|[0.0,145.0,69.145...|
|          0|  135.0|         68.0|         42.0|  250.0|              42.3|                   0.365| 24|      1|[0.0,135.0,68.0,4...|
|          1|  139.0|         62.0|         41.0|  480.

# TASK 5: Split Dataset & Build the Model

In [33]:
#create final data
from pyspark.ml.classification import LogisticRegression
finalDf = outputData.select("features", "Outcome")

In [34]:
#print schema of final data
finalDf.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Outcome: integer (nullable = true)



In [36]:
#split the dataset ; build the model
train, test = finalDf.randomSplit([0.7, 0.3])
logistic = LogisticRegression(labelCol="Outcome")
model = logistic.fit(train)

In [41]:
#summary of the model
summary = model.summary
summary.predictions.describe().show()



+-------+-------------------+-------------------+
|summary|            Outcome|         prediction|
+-------+-------------------+-------------------+
|  count|               1381|               1381|
|   mean|0.33671252715423605|0.25850832729905865|
| stddev| 0.4727568505725187|0.43797336888319555|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



# TASK 6: Evaluate and Save the Model

In [42]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictions = model.evaluate(test)

In [45]:
predictions.predictions.show(20)



+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,57.0,60.0,20...|      0|[4.03620118447480...|[0.98264216744712...|       0.0|
|[0.0,67.0,76.0,20...|      0|[2.59427958397179...|[0.93049251477994...|       0.0|
|[0.0,74.0,52.0,10...|      0|[3.83651321238616...|[0.97888670993177...|       0.0|
|[0.0,78.0,88.0,29...|      0|[3.01382290214163...|[0.95319470720638...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.66811302779378...|[0.93511863940645...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.66811302779378...|[0.93511863940645...|       0.0|
|[0.0,84.0,82.0,31...|      0|[2.90005671475461...|[0.94784924046310...|       0.0|
|[0.0,86.0,68.0,32...|      0|[2.81341446879027...|[0.94339642677237...|       0.0|
|[0.0,91.0,80.0,20...|      0|[2.66824398432144...|[0.93512658431801...|    

In [46]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="Outcome")
evaluator.evaluate(model.transform(test))

0.8167351598173512

In [47]:
# save model
model.save("model")

In [51]:
# load saved model back to the environment
from pyspark.ml.classification import LogisticRegressionModel
model = LogisticRegressionModel.load("model")

# TASK 7: Prediction on New Data with the saved model

In [52]:
#create a new spark dataframe
newData = spark.read.csv("/content/diabetes_dataset/new_test.csv", header=True, inferSchema=True)

In [53]:
#print the schema
newData.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)



In [56]:
#create an additional feature merged column 
testeData = assembler.transform(newData)

In [57]:
#print the schema
testeData.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- features: vector (nullable = true)



In [58]:
#use model to make predictions
results = model.transform(testeData)

In [60]:
#display the predictions
results.select("features", "prediction").show()

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|[1.0,190.0,78.0,3...|       1.0|
|[0.0,80.0,84.0,36...|       0.0|
|[2.0,138.0,82.0,4...|       1.0|
|[1.0,110.0,63.0,4...|       1.0|
+--------------------+----------+

