In [8]:
from pyspark.sql import SparkSession

# Khởi tạo Spark session
spark = SparkSession.builder \
    .appName("Sleep Health Analysis") \
    .config("spark.master", "local") \
    .getOrCreate()

# Đọc file CSV vào Spark DataFrame
df = spark.read.csv("E:\\Sleep_health_and_lifestyle_dataset.csv", header=True, inferSchema=True)

# Kiểm tra dữ liệu
df.show()
df.printSchema()

+---------+------+---+--------------------+--------------+----------------+-----------------------+------------+-------------+--------------+----------+-----------+--------------+
|Person ID|Gender|Age|          Occupation|Sleep Duration|Quality of Sleep|Physical Activity Level|Stress Level| BMI Category|Blood Pressure|Heart Rate|Daily Steps|Sleep Disorder|
+---------+------+---+--------------------+--------------+----------------+-----------------------+------------+-------------+--------------+----------+-----------+--------------+
|        1|  Male| 27|   Software Engineer|           6.1|               6|                     42|           6|   Overweight|        126/83|        77|       4200|          None|
|        2|  Male| 28|              Doctor|           6.2|               6|                     60|           8|       Normal|        125/80|        75|      10000|          None|
|        3|  Male| 28|              Doctor|           6.2|               6|                     60| 

In [9]:
# Kiểm tra các giá trị thiếu và các thống kê mô tả cho các trường
df.describe().show()

# Kiểm tra thống kê cho một số trường cụ thể
df.select("Sleep Duration", "Quality of Sleep", "Stress Level").describe().show()

+-------+------------------+------+-----------------+----------+------------------+------------------+-----------------------+------------------+------------+--------------+-----------------+------------------+--------------+
|summary|         Person ID|Gender|              Age|Occupation|    Sleep Duration|  Quality of Sleep|Physical Activity Level|      Stress Level|BMI Category|Blood Pressure|       Heart Rate|       Daily Steps|Sleep Disorder|
+-------+------------------+------+-----------------+----------+------------------+------------------+-----------------------+------------------+------------+--------------+-----------------+------------------+--------------+
|  count|               374|   374|              374|       374|               374|               374|                    374|               374|         374|           374|              374|               374|           374|
|   mean|             187.5|  NULL|42.18449197860963|      NULL| 7.132085561497317|  7.312834224

In [10]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Lựa chọn các đặc trưng để phân nhóm
features = ["Sleep Duration", "Quality of Sleep", "Physical Activity Level", "Stress Level"]
vec_assembler = VectorAssembler(inputCols=features, outputCol="features")
df_kmeans = vec_assembler.transform(df)

# Chạy K-means
kmeans = KMeans(k=3, seed=1)
model = kmeans.fit(df_kmeans)
predictions = model.transform(df_kmeans)

# Hiển thị các nhóm
predictions.select("Person ID", "prediction").show()

+---------+----------+
|Person ID|prediction|
+---------+----------+
|        1|         2|
|        2|         0|
|        3|         0|
|        4|         2|
|        5|         2|
|        6|         2|
|        7|         2|
|        8|         1|
|        9|         1|
|       10|         1|
|       11|         2|
|       12|         1|
|       13|         2|
|       14|         2|
|       15|         2|
|       16|         2|
|       17|         2|
|       18|         2|
|       19|         2|
|       20|         1|
+---------+----------+
only showing top 20 rows



In [13]:
from pyspark.sql.functions import col

# Nếu cột cần thiết, bạn có thể loại bỏ các giá trị không hợp lệ trước khi thực hiện các phép toán
df = df.filter(col("BMI Category").isNotNull())


In [14]:
# Hiển thị các giá trị duy nhất trong cột "BMI Category"
df.select("BMI Category").distinct().show()


+-------------+
| BMI Category|
+-------------+
|Normal Weight|
|   Overweight|
|        Obese|
|       Normal|
+-------------+



In [15]:
from pyspark.sql.functions import when, col

# Chuẩn hóa các giá trị trong cột "BMI Category"
df = df.withColumn(
    "BMI Category",
    when(col("BMI Category").like("Normal Weight"), "Normal")  # Chuyển "Normal Weight" thành "Normal"
    .otherwise(col("BMI Category"))  # Giữ nguyên các giá trị khác
)


In [16]:
# Loại bỏ các giá trị không hợp lệ (giả sử bạn muốn giữ lại chỉ những giá trị hợp lệ)
valid_categories = ["Normal", "Overweight", "Obese"]
df = df.filter(col("BMI Category").isin(valid_categories))

In [17]:
# Thống kê mô tả cho một số trường cụ thể
df.select("Sleep Duration", "Quality of Sleep", "Stress Level").describe().show()


+-------+------------------+------------------+------------------+
|summary|    Sleep Duration|  Quality of Sleep|      Stress Level|
+-------+------------------+------------------+------------------+
|  count|               374|               374|               374|
|   mean| 7.132085561497317|  7.31283422459893| 5.385026737967914|
| stddev|0.7956567308898186|1.1969559197336053|1.7745264441985202|
|    min|               5.8|                 4|                 3|
|    max|               8.5|                 9|                 8|
+-------+------------------+------------------+------------------+



In [18]:
# Ví dụ: Đếm số lượng người theo từng loại BMI
df.groupBy("BMI Category").count().show()

# Hoặc áp dụng các thuật toán phân cụm (clustering) nếu cần
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Chọn các trường để phân cụm
assembler = VectorAssembler(inputCols=["Sleep Duration", "Stress Level"], outputCol="features")
feature_data = assembler.transform(df)

# Áp dụng KMeans
kmeans = KMeans(k=3, seed=1)  # Chọn số cụm
model = kmeans.fit(feature_data)

# Dự đoán các cụm
predictions = model.transform(feature_data)
predictions.select("BMI Category", "features", "prediction").show()

+------------+-----+
|BMI Category|count|
+------------+-----+
|  Overweight|  148|
|       Obese|   10|
|      Normal|  216|
+------------+-----+

+------------+---------+----------+
|BMI Category| features|prediction|
+------------+---------+----------+
|  Overweight|[6.1,6.0]|         2|
|      Normal|[6.2,8.0]|         0|
|      Normal|[6.2,8.0]|         0|
|       Obese|[5.9,8.0]|         0|
|       Obese|[5.9,8.0]|         0|
|       Obese|[5.9,8.0]|         0|
|       Obese|[6.3,7.0]|         0|
|      Normal|[7.8,6.0]|         2|
|      Normal|[7.8,6.0]|         2|
|      Normal|[7.8,6.0]|         2|
|      Normal|[6.1,8.0]|         0|
|      Normal|[7.8,6.0]|         2|
|      Normal|[6.1,8.0]|         0|
|      Normal|[6.0,8.0]|         0|
|      Normal|[6.0,8.0]|         0|
|      Normal|[6.0,8.0]|         0|
|      Normal|[6.5,7.0]|         0|
|      Normal|[6.0,8.0]|         0|
|      Normal|[6.5,7.0]|         0|
|      Normal|[7.6,6.0]|         2|
+------------+---------+

In [23]:
from pyspark.sql.functions import col

# Chuyển đổi các cột sang kiểu số nếu cần
df = df.withColumn("Sleep Duration", col("Sleep Duration").cast("double"))
df = df.withColumn("Stress Level", col("Stress Level").cast("double"))


In [24]:
from pyspark.ml.feature import VectorAssembler

# Tạo VectorAssembler với các cột đầu vào
assembler = VectorAssembler(inputCols=["Sleep Duration", "Stress Level"], outputCol="features")

# Áp dụng VectorAssembler
df_regression = assembler.transform(df)

# Hiển thị kết quả
df_regression.select("features").show()

+---------+
| features|
+---------+
|[6.1,6.0]|
|[6.2,8.0]|
|[6.2,8.0]|
|[5.9,8.0]|
|[5.9,8.0]|
|[5.9,8.0]|
|[6.3,7.0]|
|[7.8,6.0]|
|[7.8,6.0]|
|[7.8,6.0]|
|[6.1,8.0]|
|[7.8,6.0]|
|[6.1,8.0]|
|[6.0,8.0]|
|[6.0,8.0]|
|[6.0,8.0]|
|[6.5,7.0]|
|[6.0,8.0]|
|[6.5,7.0]|
|[7.6,6.0]|
+---------+
only showing top 20 rows

