In [19]:
#Import thu vien
from pyspark.sql.functions import avg, count, round, split, col, count, when, udf, regexp_replace
from pyspark.sql.types import FloatType
import re

In [20]:
# Khởi tạo Spark session
from pyspark.sql import SparkSession


from pyspark.sql import SparkSession

# Tạo Spark session
spark = SparkSession.builder \
    .appName("PySpark ETL Main") \
    .getOrCreate()

In [21]:
df_csv = spark.read.csv("C:\\Users\\PC\\Downloads\\lmq0411_2024-10-12.csv", header=True, inferSchema=True)
df_csv.dtypes

[('Type', 'string'),
 ('Price', 'string'),
 ('Area', 'string'),
 ('Place', 'string'),
 ('Link', 'string'),
 ('Updated Day', 'date')]

In [22]:
# Làm phẳng dữ liệu 
df = df_csv.select(
    col("Type").alias("Type"),
    col("Price").alias("Price_million_VND"),
    col("Area").alias("Area_m2"),
    col("Place").alias("Place"),
    col("Link").alias("Link"),
    col("Updated Day").alias("Updated_Day"),
)


In [23]:
df.printSchema()

root
 |-- Type: string (nullable = true)
 |-- Price_million_VND: string (nullable = true)
 |-- Area_m2: string (nullable = true)
 |-- Place: string (nullable = true)
 |-- Link: string (nullable = true)
 |-- Updated_Day: date (nullable = true)



In [24]:
# Kiểm tra số lượng giá trị null
null_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()
# Loại bỏ hàng có giá trị null
df = df.dropna()

+----+-----------------+-------+-----+----+-----------+
|Type|Price_million_VND|Area_m2|Place|Link|Updated_Day|
+----+-----------------+-------+-----+----+-----------+
|   0|                0|      0|    0|   0|          0|
+----+-----------------+-------+-----+----+-----------+



In [25]:
# Loại bỏ các hàng trùng lặp
df = df.dropDuplicates()

In [26]:
# Lọc ra các dòng mà cột 'Price' không phải là 'Giá thỏa thuận' và "1.038,02 tỷ"
df = df.filter(col('Price_million_VND') != 'Giá thỏa thuận')
df = df.filter(col('Price_million_VND') != "1.038,02 tỷ")

In [27]:
# Điều kiện lọc: Đếm số lượng dấu '.' và ',' trong cột 'Area'
condition = (col('Area').rlike(r'\.\d+') & col('Area').rlike(r',\d+'))
# Lọc bỏ những dòng thỏa mãn điều kiện
df_filtered = df.filter(~condition)

In [28]:
# Hàm chuyển đổi giá sang VND
def convert_price(price):
    if price is None:
        return None  # Tương đương với np.nan trong PySpark
    if not isinstance(price, str):
        return float(price)

    # Xử lý chuỗi giá
    price = price.replace(' nghìn', '')
    price = price.replace('/m²', '')
    price = price.replace('/tháng', '')
    price = price.replace(',', '.')  # Thay đổi dấu ',' thành '.'

    if ' tỷ' in price:
        price = price.replace(' tỷ', '')
        price = float(price) * 1000  # Chuyển tỷ thành triệu VND
    elif ' triệu' in price:
        price = price.replace(' triệu', '')
        price = float(price)  # Chuyển thành triệu VND
    return float(price)

# Định nghĩa UDF cho hàm chuyển đổi
convert_price_udf = udf(convert_price, FloatType())

# Áp dụng UDF cho cột 'Price' và tạo cột mới 'Price_million_VND'
df = df.withColumn('Price_million_VND', convert_price_udf(col('Price_million_VND')))


In [29]:
# Xử lý cột 'Area': thay thế ' m²' và ',' -> '.' rồi chuyển thành kiểu float
df = df.withColumn('Area_m2', regexp_replace(col('Area_m2'), ' m²', ''))
df = df.withColumn('Area_m2', regexp_replace(col('Area_m2'), ',', '.').cast('float'))

In [30]:
# Tách cột 'Place' thành 'District' và 'City'
df = df.withColumn('District', split(col('Place'), ',').getItem(0))
df = df.withColumn('City', split(col('Place'), ',').getItem(1))

# Xóa cột 'Place'
df = df.drop('Place')

In [31]:
df.show()

+--------------------+-----------------+-------+--------------------+-----------+------------+----------------+
|                Type|Price_million_VND|Area_m2|                Link|Updated_Day|    District|            City|
+--------------------+-----------------+-------+--------------------+-----------+------------+----------------+
|     Căn hộ chung cư|           1580.0|   65.0|https://batdongsa...| 2024-10-11|    Thuận An|      Bình Dương|
|     Căn hộ chung cư|           6200.0|  120.0|https://batdongsa...| 2024-10-11|   Long Biên|          Hà Nội|
|     Căn hộ chung cư|           2200.0|   43.0|https://batdongsa...| 2024-10-11|     Gia Lâm|          Hà Nội|
|     Căn hộ chung cư|           3800.0|   92.0|https://batdongsa...| 2024-10-11|     Hà Đông|          Hà Nội|
|           Nhà riêng|           2200.0|   30.0|https://batdongsa...| 2024-10-11|     Hà Đông|          Hà Nội|
|     Căn hộ chung cư|           1900.0|   38.0|https://batdongsa...| 2024-10-11|  Bình Thạnh|     Hồ Ch

In [32]:
# Tính toán cột 'Price_per_m2' và làm tròn tới 2 chữ số thập phân
df = df.withColumn('Price_per_m2', round(col('Price_million_VND') / col('Area_m2'), 2))

In [33]:
df.show()

+--------------------+-----------------+-------+--------------------+-----------+------------+----------------+------------+
|                Type|Price_million_VND|Area_m2|                Link|Updated_Day|    District|            City|Price_per_m2|
+--------------------+-----------------+-------+--------------------+-----------+------------+----------------+------------+
|     Căn hộ chung cư|           1580.0|   65.0|https://batdongsa...| 2024-10-11|    Thuận An|      Bình Dương|       24.31|
|     Căn hộ chung cư|           6200.0|  120.0|https://batdongsa...| 2024-10-11|   Long Biên|          Hà Nội|       51.67|
|     Căn hộ chung cư|           2200.0|   43.0|https://batdongsa...| 2024-10-11|     Gia Lâm|          Hà Nội|       51.16|
|     Căn hộ chung cư|           3800.0|   92.0|https://batdongsa...| 2024-10-11|     Hà Đông|          Hà Nội|        41.3|
|           Nhà riêng|           2200.0|   30.0|https://batdongsa...| 2024-10-11|     Hà Đông|          Hà Nội|       73.33|
