In [2]:
from pyspark.sql import SparkSession

# Khởi tạo Spark Session
spark = SparkSession.builder \
    .appName("Sleep Heathy and Life Style") \
    .getOrCreate()

# Đọc dữ liệu từ HDFS
hdfs_path = "hdfs://localhost:9000/netprtony/14-Sleep_health_and_lifestyle_dataset.csv"
df = spark.read.csv(hdfs_path, header=True, inferSchema=True)

# Hiển thị dữ liệu
df.show()



+---------+------+---+--------------+--------------+----------------+-----------------------+------------+-------------+--------------+----------+-----------+--------------+
|Person ID|Gender|Age|    Occupation|Sleep Duration|Quality of Sleep|Physical Activity Level|Stress Level| BMI Category|Blood Pressure|Heart Rate|Daily Steps|Sleep Disorder|
+---------+------+---+--------------+--------------+----------------+-----------------------+------------+-------------+--------------+----------+-----------+--------------+
|        1|  Male| 27|      Software|           6.1|               6|                     42|           6|   Overweight|        126/83|        77|       4200|          None|
|        2|  Male| 28|      Engineer|           6.2|               6|                     60|           8|       Normal|        125/80|        75|      10000|          None|
|        3|  Male| 28|        Doctor|           6.2|               6|                     60|           8|       Normal|        12

In [3]:
# Kiểm tra số lượng giá trị null trong mỗi cột
df.select([df[col].isNull().cast("int").alias(col) for col in df.columns]).show()

+---------+------+---+----------+--------------+----------------+-----------------------+------------+------------+--------------+----------+-----------+--------------+
|Person ID|Gender|Age|Occupation|Sleep Duration|Quality of Sleep|Physical Activity Level|Stress Level|BMI Category|Blood Pressure|Heart Rate|Daily Steps|Sleep Disorder|
+---------+------+---+----------+--------------+----------------+-----------------------+------------+------------+--------------+----------+-----------+--------------+
|        0|     0|  0|         0|             0|               0|                      0|           0|           0|             0|         0|          0|             0|
|        0|     0|  0|         0|             0|               0|                      0|           0|           0|             0|         0|          0|             0|
|        0|     0|  0|         0|             0|               0|                      0|           0|           0|             0|         0|          0|  

In [4]:
# Loại bỏ các hàng có giá trị null (nếu có)
df_cleaned = df.dropna()

In [5]:
# Kiểm tra kiểu dữ liệu
df.printSchema()

root
 |-- Person ID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Sleep Duration: double (nullable = true)
 |-- Quality of Sleep: integer (nullable = true)
 |-- Physical Activity Level: integer (nullable = true)
 |-- Stress Level: integer (nullable = true)
 |-- BMI Category: string (nullable = true)
 |-- Blood Pressure: string (nullable = true)
 |-- Heart Rate: integer (nullable = true)
 |-- Daily Steps: integer (nullable = true)
 |-- Sleep Disorder: string (nullable = true)



In [7]:
from pyspark.sql.functions import trim, lower

# Chuẩn hóa cột 'Occupation'
df_cleaned = df_cleaned.withColumn("Occupation", trim(lower(df_cleaned["Occupation"])))

In [8]:
# Loại bỏ các hàng có tuổi âm (nếu có)
df_cleaned = df_cleaned.filter(df_cleaned.Age >= 0)

In [9]:
# Hiển thị dữ liệu đã được làm sạch
df_cleaned.show(truncate=False)

# Dừng SparkSession nếu không còn sử dụng
spark.stop()

+---------+------+---+--------------+--------------+----------------+-----------------------+------------+-------------+--------------+----------+-----------+--------------+
|Person ID|Gender|Age|Occupation    |Sleep Duration|Quality of Sleep|Physical Activity Level|Stress Level|BMI Category |Blood Pressure|Heart Rate|Daily Steps|Sleep Disorder|
+---------+------+---+--------------+--------------+----------------+-----------------------+------------+-------------+--------------+----------+-----------+--------------+
|1        |Male  |27 |software      |6.1           |6               |42                     |6           |Overweight   |126/83        |77        |4200       |None          |
|2        |Male  |28 |engineer      |6.2           |6               |60                     |8           |Normal       |125/80        |75        |10000      |None          |
|3        |Male  |28 |doctor        |6.2           |6               |60                     |8           |Normal       |125/80    

In [13]:
from pyspark.sql import SparkSession

# Tạo SparkSession
spark = SparkSession.builder \
    .appName("DataUploadToMongoDB") \
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017/database.Sleep-Heathy") \
    .getOrCreate()

# Giả sử df_cleaned là DataFrame đã được xử lý
df_cleaned.write \
    .format("mongo") \
    .mode("append") \
    .save()

# Dừng SparkSession
spark.stop()
