In [21]:
!pip install pyspark



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49m/usr/local/opt/python@3.10/bin/python3.10 -m pip install --upgrade pip[0m


In [1]:
# Diagnostic: print the full path of the first `python3.10` executable found on PATH.
!which python3.10


/usr/local/bin/python3.10


In [2]:
import os
import sys

# PySpark launches Python worker processes using PYSPARK_PYTHON, and the workers must
# run the same Python minor version as the driver. In a notebook the driver *is* this
# kernel, so point both variables at the kernel's own interpreter instead of a
# hardcoded, machine-specific path.
# Must run before the SparkSession is created.
PYSPARK_PYTHON_BIN = sys.executable

os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON_BIN
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON_BIN


In [3]:
from pyspark.sql import SparkSession

# Start (or reuse, via getOrCreate) the Spark session for this analysis.
# Properties prefixed with `spark.hadoop.` are passed through to the Hadoop
# configuration, so scheme-less paths resolve against the HDFS at localhost:9000.
spark = (
    SparkSession.builder
    .appName("IoT Health Data EDA")
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/05 18:10:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Processed health extract on HDFS. Despite the .csv extension, fields are pipe-delimited.
HEALTH_DATA_PATH = "hdfs://localhost:9000/iot/health_dataset/processed/health_data_20250429.csv"

df_full = (
    spark.read
    .option("header", True)       # first line carries the column names
    .option("sep", "|")           # '|' separates fields
    .option("inferSchema", True)  # NOTE: inference costs one extra full pass over the file
    .csv(HEALTH_DATA_PATH)
)

# Sanity-check the load: a few sample rows, then the inferred column types.
df_full.show(5)
df_full.printSchema()

                                                                                

+-----------+---+------+----------+-------------+---------+----+----------+-----+-------+-----------+----------+--------------+-----------------+-------------------+-----------+------------+-----------+-----+--------------+--------------+-----------------+----+------+
|    user_id|age|gender| datestamp|height_inches|weight_kg| bmi|heart_rate| spo2|ecg_avg|body_temp_f|step_count|calories_burnt|sleep_duration_hr|sleep_quality_score|systolic_bp|diastolic_bp|day_of_week|month|pulse_pressure|activity_level|health_risk_score|year|season|
+-----------+---+------+----------+-------------+---------+----+----------+-----+-------+-----------+----------+--------------+-----------------+-------------------+-----------+------------+-----------+-----+--------------+--------------+-----------------+----+------+
|user_000000| 24|Female|2022-01-01|         65.1|     97.9|35.8|      74.0| 99.2|   59.8|       97.8|      4544|          1866|              7.4|                8.3|        133|          83|   

In [3]:
# Triggers a full scan of the source to size the dataset.
row_count = df_full.count()
print("Total number of rows: " + str(row_count))



Total number of rows: 141384000


                                                                                

In [5]:
from pyspark.sql import functions as F

# Health metrics to average per activity level; each becomes an `avg_<metric>` column
# (same names and order as the original hand-written aggregation list).
activity_metrics = [
    "health_risk_score",
    "bmi",
    "systolic_bp",
    "diastolic_bp",
    "heart_rate",
    "sleep_quality_score",
    "spo2",
    "pulse_pressure",
]

# Mean of each metric per activity level. groupBy() does not guarantee the order of
# its output rows, so sort by the grouping key to make the displayed table reproducible.
activity_level_agg = (
    df_full.groupBy("activity_level")
    .agg(*[F.avg(metric).alias(f"avg_{metric}") for metric in activity_metrics])
    .orderBy("activity_level")
)

activity_level_agg.show()




+--------------+---------------------+------------------+------------------+-----------------+-----------------+-----------------------+-----------------+------------------+
|activity_level|avg_health_risk_score|           avg_bmi|   avg_systolic_bp| avg_diastolic_bp|   avg_heart_rate|avg_sleep_quality_score|         avg_spo2|avg_pulse_pressure|
+--------------+---------------------+------------------+------------------+-----------------+-----------------+-----------------------+-----------------+------------------+
|        Active|    2.414945849713073|29.219232426711745| 118.6337381996837|78.21431300402692|68.25350774709905|      7.482182295770849|97.48747027527995| 40.41942519565679|
|      Moderate|   2.5376023984743314| 29.57135934767827|120.77193467958027|79.60334915825256|68.22236110218715|      7.415539800367009|  97.480127209262| 41.16858552132772|
|     Sedentary|    2.691499705299768|28.967660423784736| 126.4562962345867|83.35842479935671|68.44270207427527|      7.0841965977

                                                                                

In [7]:
from itertools import combinations

from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Numeric columns whose pairwise Pearson correlations we want.
numeric_cols = ["age", "bmi", "calories_burnt", "step_count", "sleep_quality_score", "health_risk_score"]

# Project down to just those columns before aggregating.
numeric_df = df_full.select(numeric_cols)

# Every unordered pair of distinct columns, in the same (i < j) order a nested loop gives.
col_pairs = list(combinations(numeric_cols, 2))

# DataFrame.stat.corr() is an action: each call runs its own Spark job, i.e. one full scan
# of the (uncached) source per pair -- 15 scans for 6 columns. F.corr is the Pearson
# correlation aggregate, so computing every pair inside a single agg() needs only one pass.
# The resulting Row is a tuple whose values follow the order of col_pairs.
corr_row = numeric_df.agg(*[F.corr(a, b) for a, b in col_pairs]).first()

# (column1, column2) -> correlation coefficient
corr_dict = dict(zip(col_pairs, corr_row))

# Tabulate the pairs as a small DataFrame for easier viewing.
corr_df = spark.createDataFrame(
    [(a, b, value) for (a, b), value in corr_dict.items()],
    ["Column1", "Column2", "Correlation"],
)

corr_df.show(truncate=False)


                                                                                

+-------------------+-------------------+---------------------+
|Column1            |Column2            |Correlation          |
+-------------------+-------------------+---------------------+
|age                |bmi                |-0.052618195675783005|
|age                |calories_burnt     |-0.5940629038761709  |
|age                |step_count         |-0.49686875804236663 |
|age                |sleep_quality_score|-0.18557614416997026 |
|age                |health_risk_score  |0.18360075287533886  |
|bmi                |calories_burnt     |0.05497048003302466  |
|bmi                |step_count         |0.04982858364780363  |
|bmi                |sleep_quality_score|0.02902933055049414  |
|bmi                |health_risk_score  |0.6059262035973417   |
|calories_burnt     |step_count         |0.4975369726243971   |
|calories_burnt     |sleep_quality_score|0.11938752075761844  |
|calories_burnt     |health_risk_score  |-0.09975065457431655 |
|step_count         |sleep_quality_score