In [1]:
import os
import findspark

# Local toolchain locations -- adjust these two paths for your machine.
# Both variables are exported before findspark.init() is called.
os.environ.update(
    {
        "JAVA_HOME": r"C:\Program Files\Eclipse Adoptium\jdk-11.0.27.6-hotspot",
        "SPARK_HOME": r"C:\spark-3.5.6-bin-hadoop3",
    }
)

findspark.init()


In [2]:
from pyspark.sql import SparkSession

In [3]:
# Start (or reuse) the Spark session that every later cell relies on.
spark = (
    SparkSession.builder
    .appName("Healthanalysis")
    .getOrCreate()
)

In [4]:
# Absolute path to the input CSV (specific to the author's machine).
data_path = "C:/Users/samin/Desktop/bigdata/data.csv"

# The first row holds the column names; column types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(data_path)
)
df.show(5)

+------------+------+--------+---------+----+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+------+----+---------+------+
|Diabetes_012|HighBP|HighChol|CholCheck| BMI|Smoker|Stroke|HeartDiseaseorAttack|PhysActivity|Fruits|Veggies|HvyAlcoholConsump|AnyHealthcare|NoDocbcCost|GenHlth|MentHlth|PhysHlth|DiffWalk|   Sex| Age|Education|Income|
+------------+------+--------+---------+----+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+------+----+---------+------+
|         0.0|   1.0|     1.0|      1.0|40.0|   1.0|   0.0|                 0.0|         0.0|   0.0|    1.0|              0.0|          1.0|        0.0|    5.0|    18.0|    15.0|     1.0|Female| 9.0|      4.0|   3.0|
|         0.0|   0.0|     0.0|      0.0|25.0|   1.0|   0.0|                 0.0|         1.0|   0.0|    0.0|              0.0|      

In [5]:
# Copy of df without any row that contains a NULL in any column.
df_clean = df.na.drop()

In [6]:
# Bare expression: shows the DataFrame repr (column names and types) as the cell output.
df

DataFrame[Diabetes_012: double, HighBP: double, HighChol: double, CholCheck: double, BMI: double, Smoker: double, Stroke: double, HeartDiseaseorAttack: double, PhysActivity: double, Fruits: double, Veggies: double, HvyAlcoholConsump: double, AnyHealthcare: double, NoDocbcCost: double, GenHlth: double, MentHlth: double, PhysHlth: double, DiffWalk: double, Sex: string, Age: double, Education: double, Income: double]

In [7]:
# List the column names of the loaded DataFrame.
df.columns

['Diabetes_012',
 'HighBP',
 'HighChol',
 'CholCheck',
 'BMI',
 'Smoker',
 'Stroke',
 'HeartDiseaseorAttack',
 'PhysActivity',
 'Fruits',
 'Veggies',
 'HvyAlcoholConsump',
 'AnyHealthcare',
 'NoDocbcCost',
 'GenHlth',
 'MentHlth',
 'PhysHlth',
 'DiffWalk',
 'Sex',
 'Age',
 'Education',
 'Income']

In [8]:
# Print a summary-statistics table covering every column of df.
df.describe().show()

+-------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+-------------------+--------------------+------------------+------------------+-------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+-------------------+------+-----------------+------------------+------------------+
|summary|       Diabetes_012|             HighBP|           HighChol|          CholCheck|               BMI|             Smoker|             Stroke|HeartDiseaseorAttack|      PhysActivity|            Fruits|            Veggies|  HvyAlcoholConsump|     AnyHealthcare|        NoDocbcCost|           GenHlth|          MentHlth|          PhysHlth|           DiffWalk|   Sex|              Age|         Education|            Income|
+-------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+------------------

In [9]:
# Print each column's name, inferred type and nullability.
df.printSchema()

root
 |-- Diabetes_012: double (nullable = true)
 |-- HighBP: double (nullable = true)
 |-- HighChol: double (nullable = true)
 |-- CholCheck: double (nullable = true)
 |-- BMI: double (nullable = true)
 |-- Smoker: double (nullable = true)
 |-- Stroke: double (nullable = true)
 |-- HeartDiseaseorAttack: double (nullable = true)
 |-- PhysActivity: double (nullable = true)
 |-- Fruits: double (nullable = true)
 |-- Veggies: double (nullable = true)
 |-- HvyAlcoholConsump: double (nullable = true)
 |-- AnyHealthcare: double (nullable = true)
 |-- NoDocbcCost: double (nullable = true)
 |-- GenHlth: double (nullable = true)
 |-- MentHlth: double (nullable = true)
 |-- PhysHlth: double (nullable = true)
 |-- DiffWalk: double (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Education: double (nullable = true)
 |-- Income: double (nullable = true)



In [10]:
# Register df as the temp view "health" so the SQL cells can query it by name.
df.createOrReplaceTempView("health")

In [11]:
# Average BMI and average income per education level, highest average BMI first.
education_summary_sql = """
    SELECT 
        Education, 
        ROUND(AVG(BMI), 2) AS AvgBMI, 
        ROUND(AVG(Income), 2) AS AvgIncome
    FROM health
    GROUP BY Education
    ORDER BY AvgBMI DESC
"""
avg_bmi_income_by_education = spark.sql(education_summary_sql)
avg_bmi_income_by_education.show()


+---------+------+---------+
|Education|AvgBMI|AvgIncome|
+---------+------+---------+
|      1.0| 29.76|     3.77|
|      3.0| 29.64|     3.77|
|      2.0| 29.45|     3.29|
|      4.0| 29.04|     5.19|
|      5.0| 28.88|     5.88|
|      6.0| 27.52|     6.98|
+---------+------+---------+



In [12]:
# Number of rows with Stroke = 1 in each Age bucket, ordered by Age.
stroke_by_age_sql = """
    SELECT 
        Age, 
        COUNT(*) AS Count
    FROM health
    WHERE Stroke = 1
    GROUP BY Age
    ORDER BY Age
"""
stroke_by_age = spark.sql(stroke_by_age_sql)

stroke_by_age.show()

+----+-----+
| Age|Count|
+----+-----+
| 1.0|   21|
| 2.0|   29|
| 3.0|   83|
| 4.0|  137|
| 5.0|  229|
| 6.0|  368|
| 7.0|  722|
| 8.0| 1085|
| 9.0| 1414|
|10.0| 1620|
|11.0| 1557|
|12.0| 1291|
|13.0| 1736|
+----+-----+



In [13]:
from pyspark.sql.functions import col, isnan, when, count

In [14]:
# Count NULL entries per column: when() yields a value only for NULL rows,
# and count() skips the NULLs produced for every other row.
null_counts = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(null_counts).show()

+------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+---+---+---------+------+
|Diabetes_012|HighBP|HighChol|CholCheck|BMI|Smoker|Stroke|HeartDiseaseorAttack|PhysActivity|Fruits|Veggies|HvyAlcoholConsump|AnyHealthcare|NoDocbcCost|GenHlth|MentHlth|PhysHlth|DiffWalk|Sex|Age|Education|Income|
+------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+---+---+---------+------+
|           0|     0|       0|        0|  0|     0|     0|                   0|           0|     0|      0|                0|            0|          0|      0|       0|       0|       0|  0|  0|        0|     0|
+------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-

In [15]:
# Binary target column: 1.0 where Diabetes_012 equals 2.0, 0.0 for every other value.
is_class_two = df["Diabetes_012"] == 2.0
df_with_label = df.withColumn("label", when(is_class_two, 1.0).otherwise(0.0))

In [16]:
# Re-register the "health" view on the labelled frame so SQL can reference `label`;
# this replaces the view registered earlier from df.
df_with_label.createOrReplaceTempView("health")

In [17]:
# Correlation coefficient between the two columns
# (df.corr is the DataFrame-level form of df.stat.corr).
correlation = df.corr("CholCheck", "HvyAlcoholConsump")
print(
    "Correlation between CholCheck and heavyAlcohol consumption :",
    correlation,
)

Correlation between CholCheck and heavyAlcohol consumption : -0.02373009065481386


In [18]:
# Mean of the binary label and group size for each HighBP x Smoker
# combination, highest mean first.
high_risk_sql = """
    SELECT 
        HighBP,
        Smoker,
        ROUND(AVG(label), 3) AS AvgRisk,
        COUNT(*) AS People
    FROM health
    GROUP BY HighBP, Smoker
    ORDER BY AvgRisk DESC
"""
high_risk_groups = spark.sql(high_risk_sql)
high_risk_groups.show()

+------+------+-------+------+
|HighBP|Smoker|AvgRisk|People|
+------+------+-------+------+
|   1.0|   1.0|  0.257| 54279|
|   1.0|   0.0|  0.232| 54550|
|   0.0|   1.0|  0.075| 58144|
|   0.0|   0.0|   0.05| 86707|
+------+------+-------+------+



In [19]:
# Replace NULLs in Fruits and DiffWalk with 0. The null-count cell earlier in
# the notebook reported zero NULLs, so this acts as a safeguard.
fill_values = {'Fruits': 0, 'DiffWalk': 0}
df = df.fillna(fill_values)

In [20]:
# Preview the first five rows after the fill step.
df.show(5)

+------------+------+--------+---------+----+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+------+----+---------+------+
|Diabetes_012|HighBP|HighChol|CholCheck| BMI|Smoker|Stroke|HeartDiseaseorAttack|PhysActivity|Fruits|Veggies|HvyAlcoholConsump|AnyHealthcare|NoDocbcCost|GenHlth|MentHlth|PhysHlth|DiffWalk|   Sex| Age|Education|Income|
+------------+------+--------+---------+----+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+------+----+---------+------+
|         0.0|   1.0|     1.0|      1.0|40.0|   1.0|   0.0|                 0.0|         0.0|   0.0|    1.0|              0.0|          1.0|        0.0|    5.0|    18.0|    15.0|     1.0|Female| 9.0|      4.0|   3.0|
|         0.0|   0.0|     0.0|      0.0|25.0|   1.0|   0.0|                 0.0|         1.0|   0.0|    0.0|              0.0|      

In [21]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [22]:
# Remove any Sex_indexed column left over from a previous run so the cell
# can be executed again.
df = df.drop("Sex_indexed")

# Map each distinct Sex string to a numeric index.
indexer = StringIndexer(inputCol="Sex", outputCol="Sex_indexed")
indexer_model = indexer.fit(df)
df = indexer_model.transform(df)

df.select("Sex", "Sex_indexed").show(5)

+------+-----------+
|   Sex|Sex_indexed|
+------+-----------+
|Female|        0.0|
|Female|        0.0|
|Female|        0.0|
|Female|        0.0|
|Female|        0.0|
+------+-----------+
only showing top 5 rows



In [23]:
# Remove any Sex_encoded column left over from a previous run so this cell
# can be executed again (the same guard the indexing cell applies to
# Sex_indexed). Dropping a column that does not exist is a no-op.
df = df.drop("Sex_encoded")

# One-hot encode the numeric Sex index into a vector column.
encoder = OneHotEncoder(inputCols=["Sex_indexed"], outputCols=["Sex_encoded"])
df = encoder.fit(df).transform(df)
df.select("Sex", "Sex_indexed", "Sex_encoded").show(5)

+------+-----------+-------------+
|   Sex|Sex_indexed|  Sex_encoded|
+------+-----------+-------------+
|Female|        0.0|(1,[0],[1.0])|
|Female|        0.0|(1,[0],[1.0])|
|Female|        0.0|(1,[0],[1.0])|
|Female|        0.0|(1,[0],[1.0])|
|Female|        0.0|(1,[0],[1.0])|
+------+-----------+-------------+
only showing top 5 rows



In [24]:
# Model inputs, in the order they will appear in the assembled vector.
# Sex is represented by its one-hot encoded column.
feature_columns = [
    "HighBP",
    "HighChol",
    "CholCheck",
    "BMI",
    "Smoker",
    "Stroke",
    "HeartDiseaseorAttack",
    "PhysActivity",
    "Fruits",
    "Veggies",
    "HvyAlcoholConsump",
    "AnyHealthcare",
    "NoDocbcCost",
    "GenHlth",
    "MentHlth",
    "PhysHlth",
    "DiffWalk",
    "Sex_encoded",
    "Age",
    "Education",
    "Income",
]

# Drop rows with a NULL in any feature column and report how many rows survive.
df_clean = df.dropna(subset=feature_columns)
count_before, count_after = df.count(), df_clean.count()
print(f"Rows before drop: {count_before}, after drop: {count_after}")

Rows before drop: 253680, after drop: 253680


In [25]:
# Register the null-dropped frame as a temp view for SQL access.
# NOTE(review): the view name is spelled "diabtes" -- probably a typo for
# "diabetes"; confirm nothing queries it under this name before renaming.
df_clean.createOrReplaceTempView("diabtes")

In [26]:
# Pack the feature columns into a single vector column named "features".
# Assemble from df_clean (the frame with NULL feature rows already removed)
# rather than df: VectorAssembler rejects NULL inputs by default, and
# df_clean never carries a "features" column, so this cell can be re-run.
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df = assembler.transform(df_clean)

In [27]:
# Rename the raw target column to "label", the column name the estimators are configured with.
# The values are left untouched, so the target still has three classes (0.0, 1.0, 2.0).
df = df.withColumnRenamed("Diabetes_012", "label")

In [28]:
# 80/20 train/test split with a fixed seed.
split_weights = [0.8, 0.2]
train_data, test_data = df.randomSplit(split_weights, seed=42)

In [29]:
# Row count per label value, to show the class balance.
df.groupBy("label").count().show()


+-----+------+
|label| count|
+-----+------+
|  0.0|213703|
|  1.0|  4631|
|  2.0| 35346|
+-----+------+



In [30]:
# Random forest over the assembled feature vector.
# The seed is pinned so the forest's random sampling -- and therefore the
# metrics computed from it -- is reproducible across kernel restarts,
# matching the seeded train/test split.
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees=100,
    seed=42,
)
rf_model = rf.fit(train_data)

In [31]:
# Score the held-out split with the random forest and preview the result.
predictions = rf_model.transform(test_data)
preview_columns = ["prediction", "label", "probability"]
predictions.select(*preview_columns).show(5)

+----------+-----+--------------------+
|prediction|label|         probability|
+----------+-----+--------------------+
|       0.0|  0.0|[0.90098556989969...|
|       0.0|  0.0|[0.87592948297578...|
|       0.0|  0.0|[0.90266879696500...|
|       0.0|  0.0|[0.89541140263714...|
|       0.0|  0.0|[0.87441903353843...|
+----------+-----+--------------------+
only showing top 5 rows



In [32]:
# Overall accuracy of `predictions` against the label column.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

Accuracy: 0.843863672307449


In [33]:
# F1 score of `predictions` (the evaluator's "f1" metric).
evaluator_f1 = MulticlassClassificationEvaluator(
    metricName="f1",
    labelCol="label",
    predictionCol="prediction",
)
f1_score = evaluator_f1.evaluate(predictions)
print(f"F1 Score: {f1_score}")

F1 Score: 0.7728724407618908


In [34]:
# Precision of `predictions`; the metric requested is the weighted variant.
evaluator_precision = MulticlassClassificationEvaluator(
    metricName="weightedPrecision",
    labelCol="label",
    predictionCol="prediction",
)
precision = evaluator_precision.evaluate(predictions)
print(f"Precision: {precision}")

Precision: 0.8212122326834389


In [35]:
# Recall of `predictions`; the metric requested is the weighted variant.
evaluator_recall = MulticlassClassificationEvaluator(
    metricName="weightedRecall",
    labelCol="label",
    predictionCol="prediction",
)
recall = evaluator_recall.evaluate(predictions)
print(f"Recall: {recall}")

Recall: 0.843863672307449


In [36]:
from pyspark.ml.regression import LinearRegression

In [37]:
# Fit a linear regression on the same features, treating the label as numeric.
lr = LinearRegression(labelCol="label", featuresCol="features")
lr_model = lr.fit(train_data)

In [40]:
# Keep only the rows whose label is 0.0 or 1.0.
# NOTE(review): train_data and test_data were split from df before this
# filter, so they are not affected by it; the models in the following cells
# are still fit and scored on data that includes label 2.0. Confirm whether
# a re-split (or a binarised label) was intended.
kept_labels = [0.0, 1.0]
df = df.filter(df["label"].isin(kept_labels))

In [43]:
from pyspark.ml.regression import GBTRegressor

In [44]:
# Gradient-boosted tree regressor fit on the training split.
# `predictions` is re-bound here, replacing the random-forest predictions
# held under the same name.
gbt = GBTRegressor(featuresCol="features", labelCol="label")
gbt_model = gbt.fit(train_data)
predictions = gbt_model.transform(test_data)

In [45]:
# Score the test split with the GBT model and preview a few rows.
gbt_pred = gbt_model.transform(test_data)
gbt_preview_columns = ["features", "label", "prediction"]
gbt_pred.select(*gbt_preview_columns).show(5)

+--------------------+-----+--------------------+
|            features|label|          prediction|
+--------------------+-----+--------------------+
|(21,[3,4,7,11,13,...|  0.0|0.013634030970582403|
|(21,[3,7,9,11,13,...|  0.0| 0.04746149797014129|
|(21,[3,7,8,9,11,1...|  0.0|-0.00566763978581...|
|(21,[3,4,7,8,9,12...|  0.0|0.021622534345575994|
|(21,[3,7,8,9,11,1...|  0.0| 0.04595596035564019|
+--------------------+-----+--------------------+
only showing top 5 rows



In [55]:
from pyspark.ml.evaluation import RegressionEvaluator

In [56]:
# Evaluator for root-mean-squared error between label and prediction.
evaluator_rmse = RegressionEvaluator(
    metricName="rmse",
    labelCol="label",
    predictionCol="prediction",
)


In [57]:
# Evaluator for mean absolute error between label and prediction.
evaluator_mae = RegressionEvaluator(
    metricName="mae",
    labelCol="label",
    predictionCol="prediction",
)


In [58]:
# Evaluator for the R² (coefficient of determination) metric.
evaluator_r2 = RegressionEvaluator(
    metricName="r2",
    labelCol="label",
    predictionCol="prediction",
)


In [59]:
# RMSE of the current `predictions` frame (re-bound to the GBT output above).
rmse = evaluator_rmse.evaluate(predictions)


In [60]:
# MAE of the current `predictions` frame.
mae = evaluator_mae.evaluate(predictions)


In [64]:
# R² of the current `predictions` frame.
r2 = evaluator_r2.evaluate(predictions)

In [65]:
# Report RMSE to four decimal places.
print(f"RMSE     : {rmse:.4f}")

RMSE     : 0.6244


In [66]:
# Report MAE to four decimal places.
print(f"MAE      : {mae:.4f}")

MAE      : 0.3987


In [67]:
# Report R² to four decimal places.
print(f"R² Score : {r2:.4f}")

R² Score : 0.1944


In [68]:
# Row count per label in df after the label filter.
# Note this inspects df itself, not the train_data/test_data splits.
df.groupBy("label").count().show()

+-----+------+
|label| count|
+-----+------+
|  0.0|213703|
|  1.0|  4631|
+-----+------+



In [69]:
# Preview the first five rows of the scored test split, all columns.
predictions.show(5)

+-----+------+--------+---------+----+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+------+----+---------+------+-----------+-------------+--------------------+--------------------+
|label|HighBP|HighChol|CholCheck| BMI|Smoker|Stroke|HeartDiseaseorAttack|PhysActivity|Fruits|Veggies|HvyAlcoholConsump|AnyHealthcare|NoDocbcCost|GenHlth|MentHlth|PhysHlth|DiffWalk|   Sex| Age|Education|Income|Sex_indexed|  Sex_encoded|            features|          prediction|
+-----+------+--------+---------+----+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+------+----+---------+------+-----------+-------------+--------------------+--------------------+
|  0.0|   0.0|     0.0|      0.0|16.0|   1.0|   0.0|                 0.0|         1.0|   0.0|    0.0|              0.0|          1.0|        0.0|    2.0|     5.0|    

In [72]:
# Export the label, the prediction and a few descriptive columns.
export_columns = ["label", "prediction", "MentHlth", "BMI", "Age", "Income", "Sex"]
output_df = predictions.select(*export_columns)

# Write with a header row, replacing any existing output at this path.
(
    output_df.write
    .mode("overwrite")
    .option("header", True)
    .csv("diabetes_prediction.csv")
)