In [43]:
#Install Java Development kit for Spark
!apt-get install openjdk-8-jdk

Reading package lists... Done
Building dependency tree       
Reading state information... Done
openjdk-8-jdk is already the newest version (8u372-ga~us1-0ubuntu1~20.04).
0 upgraded, 0 newly installed, 0 to remove and 34 not upgraded.


In [44]:
import os

In [45]:
#Set the JAVA_HOME env variable
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64"

In [46]:
#Current working directory
!pwd

/content


In [47]:
!echo $JAVA_HOME

/usr/lib/jvm/java-8-openjdk-amd64


In [48]:
#Install PySpark with latest version
!pip install pyspark==3.0.0

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [49]:
!pip install findspark
import findspark
findspark.init()

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [50]:
from pyspark.sql import SparkSession, SQLContext
spark = SparkSession.builder.master("local").appName("Test Spark").config("spark.some.config.option", "some-value").getOrCreate()

In [51]:
sc = spark.sparkContext

In [52]:
spark

In [53]:
#mount your drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [54]:
#Read the csv file
df=spark.read.csv('/content/drive/MyDrive/Colab Notebooks/diabetes_prediction_dataset.csv',inferSchema=True,header=True)

In [55]:
#Check dimension's
print((df.count(),len(df.columns)))

(100000, 9)


In [56]:
#Check for the schema
df.printSchema()

root
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- smoking_history: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- HbA1c_level: double (nullable = true)
 |-- blood_glucose_level: integer (nullable = true)
 |-- diabetes: integer (nullable = true)



In [57]:
#Top 5 records
df.show(5)

+------+----+------------+-------------+---------------+-----+-----------+-------------------+--------+
|gender| age|hypertension|heart_disease|smoking_history|  bmi|HbA1c_level|blood_glucose_level|diabetes|
+------+----+------------+-------------+---------------+-----+-----------+-------------------+--------+
|Female|80.0|           0|            1|          never|25.19|        6.6|                140|       0|
|Female|54.0|           0|            0|        No Info|27.32|        6.6|                 80|       0|
|  Male|28.0|           0|            0|          never|27.32|        5.7|                158|       0|
|Female|36.0|           0|            0|        current|23.45|        5.0|                155|       0|
|  Male|76.0|           1|            1|        current|20.14|        4.8|                155|       0|
+------+----+------------+-------------+---------------+-----+-----------+-------------------+--------+
only showing top 5 rows



In [58]:

#Datatypes of the columns
df.dtypes

[('gender', 'string'),
 ('age', 'double'),
 ('hypertension', 'int'),
 ('heart_disease', 'int'),
 ('smoking_history', 'string'),
 ('bmi', 'double'),
 ('HbA1c_level', 'double'),
 ('blood_glucose_level', 'int'),
 ('diabetes', 'int')]

In [59]:

#Drop unwanted columns
#my_data = df.drop(*['contact', 'day', 'month','default'])
my_data = df.alias('my_data')
#my_data = df.copy()

In [60]:
# get the dimensions of the data
(my_data.count() , len(my_data.columns))

(100000, 9)

In [61]:
my_data.describe().show()

+-------+------+-----------------+------------------+------------------+---------------+-----------------+------------------+-------------------+-------------------+
|summary|gender|              age|      hypertension|     heart_disease|smoking_history|              bmi|       HbA1c_level|blood_glucose_level|           diabetes|
+-------+------+-----------------+------------------+------------------+---------------+-----------------+------------------+-------------------+-------------------+
|  count|100000|           100000|            100000|            100000|         100000|           100000|            100000|             100000|             100000|
|   mean|  null|41.88585600000013|           0.07485|           0.03942|           null|27.32076709999422|5.5275069999983275|          138.05806|              0.085|
| stddev|  null|22.51683987161704|0.2631504702289171|0.1945930169980986|           null|6.636783416648357|1.0706720918835468|  40.70813604870383|0.27888308976661896|
|   

In [62]:
# import sql function pyspark
import pyspark.sql.functions as f

# null values in each column
data_agg = my_data.agg(*[f.count(f.when(f.isnull(c), c)).alias(c) for c in my_data.columns])
data_agg.show()
     

+------+---+------------+-------------+---------------+---+-----------+-------------------+--------+
|gender|age|hypertension|heart_disease|smoking_history|bmi|HbA1c_level|blood_glucose_level|diabetes|
+------+---+------------+-------------+---------------+---+-----------+-------------------+--------+
|     0|  0|           0|            0|              0|  0|          0|                  0|       0|
+------+---+------------+-------------+---------------+---+-----------+-------------------+--------+



In [63]:

# value counts of columns
my_data.groupBy('gender').count().show()
print()
my_data.groupBy('smoking_history').count().show()
print()

+------+-----+
|gender|count|
+------+-----+
|Female|58552|
| Other|   18|
|  Male|41430|
+------+-----+


+---------------+-----+
|smoking_history|count|
+---------------+-----+
|    not current| 6447|
|         former| 9352|
|        No Info|35816|
|        current| 9286|
|          never|35095|
|           ever| 4004|
+---------------+-----+




In [64]:
my_data.dtypes

[('gender', 'string'),
 ('age', 'double'),
 ('hypertension', 'int'),
 ('heart_disease', 'int'),
 ('smoking_history', 'string'),
 ('bmi', 'double'),
 ('HbA1c_level', 'double'),
 ('blood_glucose_level', 'int'),
 ('diabetes', 'int')]

In [65]:
#Preprocessing steps
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# create object of StringIndexer class and specify input and output column
SI_gender = StringIndexer(inputCol='gender',outputCol='gender_Index')
SI_smoking_history = StringIndexer(inputCol='smoking_history',outputCol='smoking_history_Index')

# transform the data
my_data = SI_gender.fit(my_data).transform(my_data)
my_data = SI_smoking_history.fit(my_data).transform(my_data)

In [66]:
# view the transformed data
my_data.select('gender', 'gender_Index', 'smoking_history', 'smoking_history_Index').show(10)

+------+------------+---------------+---------------------+
|gender|gender_Index|smoking_history|smoking_history_Index|
+------+------------+---------------+---------------------+
|Female|         0.0|          never|                  1.0|
|Female|         0.0|        No Info|                  0.0|
|  Male|         1.0|          never|                  1.0|
|Female|         0.0|        current|                  3.0|
|  Male|         1.0|        current|                  3.0|
|Female|         0.0|          never|                  1.0|
|Female|         0.0|          never|                  1.0|
|Female|         0.0|        No Info|                  0.0|
|  Male|         1.0|          never|                  1.0|
|Female|         0.0|          never|                  1.0|
+------+------------+---------------+---------------------+
only showing top 10 rows



In [67]:
# create object and specify input and output column
OHE = OneHotEncoder(inputCols=['gender_Index', 'smoking_history_Index'],outputCols=['gender_OHE', 'smoking_history_OHE'])

# transform the data
my_data = OHE.fit(my_data).transform(my_data)

# view and transform the data
my_data.select('gender', 'gender_Index', 'gender_OHE','smoking_history','smoking_history_Index','smoking_history_OHE').show(10)

+------+------------+-------------+---------------+---------------------+-------------------+
|gender|gender_Index|   gender_OHE|smoking_history|smoking_history_Index|smoking_history_OHE|
+------+------------+-------------+---------------+---------------------+-------------------+
|Female|         0.0|(2,[0],[1.0])|          never|                  1.0|      (5,[1],[1.0])|
|Female|         0.0|(2,[0],[1.0])|        No Info|                  0.0|      (5,[0],[1.0])|
|  Male|         1.0|(2,[1],[1.0])|          never|                  1.0|      (5,[1],[1.0])|
|Female|         0.0|(2,[0],[1.0])|        current|                  3.0|      (5,[3],[1.0])|
|  Male|         1.0|(2,[1],[1.0])|        current|                  3.0|      (5,[3],[1.0])|
|Female|         0.0|(2,[0],[1.0])|          never|                  1.0|      (5,[1],[1.0])|
|Female|         0.0|(2,[0],[1.0])|          never|                  1.0|      (5,[1],[1.0])|
|Female|         0.0|(2,[0],[1.0])|        No Info|         

In [68]:
my_data.columns

['gender',
 'age',
 'hypertension',
 'heart_disease',
 'smoking_history',
 'bmi',
 'HbA1c_level',
 'blood_glucose_level',
 'diabetes',
 'gender_Index',
 'smoking_history_Index',
 'gender_OHE',
 'smoking_history_OHE']

In [69]:
from pyspark.ml.feature import VectorAssembler

# specify the input and output columns of the vector assembler
assembler = VectorAssembler(inputCols=['age',
                                       'gender_Index',
                                       'smoking_history_Index',
                                       'hypertension',
                                       'heart_disease',
                                       'bmi',
                                       'HbA1c_level',
                                       'blood_glucose_level',
                                       'gender_OHE',
                                       'smoking_history_OHE'
                                       ],
                           outputCol='features')

# fill the null values
my_data = my_data.fillna(0)

# transform the data
final_data = assembler.transform(my_data)

In [70]:
# view the transformed vector
final_data.select('features','diabetes').show()

+--------------------+--------+
|            features|diabetes|
+--------------------+--------+
|(15,[0,2,4,5,6,7,...|       0|
|(15,[0,5,6,7,8,10...|       0|
|(15,[0,1,2,5,6,7,...|       0|
|(15,[0,2,5,6,7,8,...|       0|
|[76.0,1.0,3.0,1.0...|       0|
|(15,[0,2,5,6,7,8,...|       0|
|(15,[0,2,5,6,7,8,...|       1|
|(15,[0,5,6,7,8,10...|       0|
|(15,[0,1,2,5,6,7,...|       0|
|(15,[0,2,5,6,7,8,...|       0|
|(15,[0,2,5,6,7,8,...|       0|
|(15,[0,2,5,6,7,8,...|       0|
|(15,[0,2,5,6,7,8,...|       0|
|(15,[0,2,5,6,7,8,...|       0|
|(15,[0,5,6,7,8,10...|       0|
|(15,[0,1,5,6,7,9,...|       0|
|(15,[0,1,2,5,6,7,...|       0|
|(15,[0,2,5,6,7,8,...|       0|
|(15,[0,5,6,7,8,10...|       0|
|(15,[0,1,2,5,6,7,...|       0|
+--------------------+--------+
only showing top 20 rows



In [71]:
#Model_Dataframe
model_df = final_data.select(['features','diabetes'])
model_df = model_df.withColumnRenamed("diabetes","label")
model_df.printSchema()
     

root
 |-- features: vector (nullable = true)
 |-- label: integer (nullable = true)



In [72]:
#Split into training & testing Dataframe
training_df,test_df = model_df.randomSplit([0.75,0.25])

In [73]:

#Create a logistic regression model object
from pyspark.ml.classification import LogisticRegression
log_reg=LogisticRegression().fit(training_df)

In [74]:
lr_summary=log_reg.summary

In [75]:
#Overall accuracy of the classification model
lr_summary.accuracy

0.9609615615375385

In [76]:
#Precision of both classes
print(lr_summary.precisionByLabel)

[0.966974563024016, 0.8696309086984675]


In [77]:

#Get Preditions
predictions = log_reg.transform(test_df)

In [78]:
predictions.select('label','prediction').show(50)

+-----+----------+
|label|prediction|
+-----+----------+
|    0|       0.0|
|    1|       1.0|
|    1|       1.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       0.0|
|    1|       1.0|
|    0|       1.0|
|    0|       0.0|
|    1|       1.0|
|    1|       0.0|
|    1|       1.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    1|       0.0|
|    0|       0.0|
|    0|       1.0|
|    0|       0.0|
|    1|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|     