In [1]:
import os

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

In [2]:
# Step 1: Create Spark session.
# getOrCreate() launches the JVM, and that JVM inherits this process's
# environment *at launch time*; changing os.environ afterwards is never seen
# by it. Any HADOOP_HOME setting therefore has to happen before the session
# exists. NOTE(review): the drive-letter path suggests this is the Windows
# winutils location - confirm. setdefault keeps an already-configured value.
if os.name == "nt":
    os.environ.setdefault("HADOOP_HOME", "F:/hadoop")

spark = (
    SparkSession.builder
    .appName("Diabetes Health Indicators Analysis")
    .getOrCreate()
)
print("Session is created")

Session is created


In [3]:
# Step 2: Load the BRFSS 2015 diabetes health-indicators CSV.
# The first row supplies the column names, and Spark samples the data to
# infer each column's type instead of reading everything as strings.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("diabetes_binary_health_indicators_BRFSS2015.csv")
)

In [4]:
# Step 3: Explore the dataset - print every column's name, the type Spark
# inferred for it, and whether it is nullable.
df.printSchema()

root
 |-- Diabetes_binary: double (nullable = true)
 |-- HighBP: double (nullable = true)
 |-- HighChol: double (nullable = true)
 |-- CholCheck: double (nullable = true)
 |-- BMI: double (nullable = true)
 |-- Smoker: double (nullable = true)
 |-- Stroke: double (nullable = true)
 |-- HeartDiseaseorAttack: double (nullable = true)
 |-- PhysActivity: double (nullable = true)
 |-- Fruits: double (nullable = true)
 |-- Veggies: double (nullable = true)
 |-- HvyAlcoholConsump: double (nullable = true)
 |-- AnyHealthcare: double (nullable = true)
 |-- NoDocbcCost: double (nullable = true)
 |-- GenHlth: double (nullable = true)
 |-- MentHlth: double (nullable = true)
 |-- PhysHlth: double (nullable = true)
 |-- DiffWalk: double (nullable = true)
 |-- Sex: double (nullable = true)
 |-- Age: double (nullable = true)
 |-- Education: double (nullable = true)
 |-- Income: double (nullable = true)



In [5]:
# Peek at the first five rows to get a feel for the raw values.
df.show(n=5)

+---------------+------+--------+---------+----+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+---+----+---------+------+
|Diabetes_binary|HighBP|HighChol|CholCheck| BMI|Smoker|Stroke|HeartDiseaseorAttack|PhysActivity|Fruits|Veggies|HvyAlcoholConsump|AnyHealthcare|NoDocbcCost|GenHlth|MentHlth|PhysHlth|DiffWalk|Sex| Age|Education|Income|
+---------------+------+--------+---------+----+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+---+----+---------+------+
|            0.0|   1.0|     1.0|      1.0|40.0|   1.0|   0.0|                 0.0|         0.0|   0.0|    1.0|              0.0|          1.0|        0.0|    5.0|    18.0|    15.0|     1.0|0.0| 9.0|      4.0|   3.0|
|            0.0|   0.0|     0.0|      0.0|25.0|   1.0|   0.0|                 0.0|         1.0|   0.0|    0.0|              0.0|   

In [6]:
# Count every row in the dataset (this runs a Spark job over all the data).
total_rows = df.count()
print("Total Rows:", total_rows)

Total Rows: 253680


In [7]:
# Step 4: Summary statistics (count, mean, stddev, min, max) for every column.
# describe() aggregates over the whole dataset. Build it once and cache the
# (tiny) result so that show() and the CSV export reuse the same statistics
# instead of calling describe() twice and scanning the data twice.
summary_stats = df.describe().cache()
summary_stats.show()

summary_stats.write.csv("Task_1_Outputs/Summary_Statistics.csv", header=True, mode="overwrite")

+-------+-------------------+-------------------+-------------------+------------------+------------------+-------------------+-------------------+--------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+-----------------+------------------+------------------+
|summary|    Diabetes_binary|             HighBP|           HighChol|         CholCheck|               BMI|             Smoker|             Stroke|HeartDiseaseorAttack|      PhysActivity|            Fruits|           Veggies|  HvyAlcoholConsump|      AnyHealthcare|        NoDocbcCost|           GenHlth|          MentHlth|          PhysHlth|           DiffWalk|               Sex|              Age|         Education|            Income|
+-------+-------------------+-------------------+-------------------+------------------+------------------+-----------------

In [8]:
# Step 5: Class balance - number of rows for each Diabetes_binary value.
# groupBy() does not guarantee output row order, so sort by the key to keep
# the displayed table identical across re-runs.
df.groupBy("Diabetes_binary").count().orderBy("Diabetes_binary").show()

+---------------+------+
|Diabetes_binary| count|
+---------------+------+
|            0.0|218334|
|            1.0| 35346|
+---------------+------+



In [9]:
# Step 6: Average health indicators by diabetes status.
# For a column coded 0/1, its average is the proportion of rows with value 1;
# for BMI it is simply the mean BMI of each group.
cols_to_avg = ['BMI', 'HighBP', 'HighChol', 'Smoker', 'PhysActivity',
               'Fruits', 'Veggies', 'HvyAlcoholConsump']
agg_exprs = [F.avg(col_name).alias(f"avg_{col_name}") for col_name in cols_to_avg]

# Build the aggregation once (previously it was defined twice, once for
# display and once for export), sort it so the row order is reproducible,
# and cache it so show() and the CSV write don't each rescan the dataset.
summary_by_diabetes = (
    df.groupBy("Diabetes_binary")
    .agg(*agg_exprs)
    .orderBy("Diabetes_binary")
    .cache()
)
summary_by_diabetes.show()

summary_by_diabetes.write.csv("Task_1_Outputs/Grouped_Averages_By_Diabetes.csv", header=True, mode="overwrite")

+---------------+-----------------+-------------------+------------------+------------------+------------------+------------------+------------------+---------------------+
|Diabetes_binary|          avg_BMI|         avg_HighBP|      avg_HighChol|        avg_Smoker|  avg_PhysActivity|        avg_Fruits|       avg_Veggies|avg_HvyAlcoholConsump|
+---------------+-----------------+-------------------+------------------+------------------+------------------+------------------+------------------+---------------------+
|            0.0|27.80577005871738|0.37660190350563816|0.3842965364991252| 0.431018531241126|0.7769426658239211|0.6421583445546731|0.8203257394633909|  0.06148378172891075|
|            1.0|31.94401063769592| 0.7526735698523171|0.6701182594918803|0.5182198834380128|0.6305381089797997|0.5854410682962712|0.7564080801222204|  0.02353873139817801|
+---------------+-----------------+-------------------+------------------+------------------+------------------+------------------+----

In [10]:
# Step 7: Export the dataset for ML or dashboard use.
# No cleaning or transformation is applied in this notebook, so this writes
# the data exactly as loaded in Step 2.
# NOTE: HADOOP_HOME used to be assigned here, but the SparkSession's JVM was
# already running and only sees the environment it was launched with - the
# variable must be set before the session is created.
# Write into the same "Task_1_Outputs" folder as the other exports; the old
# "Task_1_outputs" spelling creates a separate directory on case-sensitive
# filesystems.
df.write.csv("Task_1_Outputs/Processed_diabetes_data.csv", header=True, mode="overwrite")