<h2>Thông tin chung về dữ liệu</h2>

In [1]:
#Import các thư viện cần thiết
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import mean as _mean, col, when, count, desc, lit
import pyspark.sql.functions as F

In [2]:
# Tạo Spark session
spark = SparkSession.builder.appName("KMeansClustering").getOrCreate()

# Đọc dữ liệu
data = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("Car_Rates.csv")

24/06/02 22:19:34 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

In [3]:
#Đếm số dòng
rows = data.count()
print(f"Number of rows: {rows}")

#Đếm số cột
cols = len(data.columns)
print(f"Number of columns: {cols}")

Number of rows: 4243
Number of columns: 12


In [4]:
#Hiển thị schema dữ liệu
data.printSchema()

root
 |-- Car_name: string (nullable = true)
 |-- Num_of_reviews: double (nullable = true)
 |-- General_rate: double (nullable = true)
 |-- Comfort: double (nullable = true)
 |-- Interior design: double (nullable = true)
 |-- Performance: double (nullable = true)
 |-- Value for the money: double (nullable = true)
 |-- Exterior styling: double (nullable = true)
 |-- Reliability: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Model: string (nullable = true)



In [5]:
# Tạo view tạm thời
data.createOrReplaceTempView("car_rates_view")

# Truy vấn dữ liệu
sqlDF = spark.sql("""
SELECT Car_name, Brand, Model, General_rate, Year
FROM car_rates_view
""")

# Hiển thị kết quả truy
sqlDF.show()

+--------------------+-----+-----------------+------------+----+
|            Car_name|Brand|            Model|General_rate|Year|
+--------------------+-----+-----------------+------------+----+
|2023     Acura   ...|Acura|         Integra.|         4.6|2023|
|2023     Acura   ...|Acura|             MDX.|        NULL|2023|
|2023     Acura   ...|Acura|             TLX.|        NULL|2023|
|2023     Acura   ...|Acura|             RDX.|        NULL|2023|
|2022     Acura   ...|Acura|             TLX.|         4.8|2022|
|2022     Acura   ...|Acura|             MDX.|         4.7|2022|
|2022     Acura   ...|Acura|             RDX.|         4.8|2022|
|2022     Acura   ...|Acura|             ILX.|        NULL|2022|
|2022     Acura   ...|Acura|             NSX.|        NULL|2022|
|2021     Acura   ...|Acura|             ILX.|         4.9|2021|
|2021     Acura   ...|Acura|             TLX.|         4.6|2021|
|2021     Acura   ...|Acura|             RDX.|         4.8|2021|
|2021     Acura   ...|Acu

<h2>Tính toán thống kê</h2>

In [6]:
# Tính giá trị cao nhất, thấp nhất, trung bình cho các cột số
numerical_columns = ["Num_of_reviews", "General_rate", "Comfort", "Interior design", "Performance", "Value for the money", "Exterior styling", "Reliability"]
categorical_columns = ["Year", "Brand", "Model"]

stats = data.select(
    *[F.max(c).alias(f"max_{c}") for c in numerical_columns],
    *[F.min(c).alias(f"min_{c}") for c in numerical_columns],
    *[F.avg(c).alias(f"avg_{c}") for c in numerical_columns]
)

In [7]:
# Tính giá trị trung vị cho các cột số
def median(col_name):
    return F.expr(f'percentile_approx(`{col_name}`, 0.5)')

medians = data.select(
    *[median(c).alias(f"median_{c}") for c in numerical_columns]
)

In [8]:
# Thu thập kết quả từ Spark DataFrame
stats_dict = stats.collect()[0].asDict()
medians_dict = medians.collect()[0].asDict()

# Kết hợp kết quả
combined_stats = {**stats_dict, **medians_dict}

# Định dạng kết quả dưới dạng bảng
header = ["Metric", "Value"]
rows = [[k, v] for k, v in combined_stats.items()]

# Tính độ dài của mỗi cột để căn chỉnh đầu ra
max_lengths = [max(len(str(item)) for item in col) for col in zip(*([header] + rows))]

# Tạo chuỗi định dạng cho các cột
format_str = ' | '.join([f'{{:<{length}}}' for length in max_lengths])

# In tiêu đề
print(format_str.format(*header))
print('-' * (sum(max_lengths) + len(max_lengths) * 3 - 1))

# In các hàng dữ liệu
for row in rows:
    print(format_str.format(*row))

24/06/02 22:19:47 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Metric                     | Value            
------------------------------------------------
max_Num_of_reviews         | 1024.0           
max_General_rate           | 5.0              
max_Comfort                | 5.0              
max_Interior design        | 5.0              
max_Performance            | 5.0              
max_Value for the money    | 5.0              
max_Exterior styling       | 5.0              
max_Reliability            | 5.0              
min_Num_of_reviews         | 1.0              
min_General_rate           | 1.0              
min_Comfort                | 1.0              
min_Interior design        | 1.0              
min_Performance            | 1.0              
min_Value for the money    | 1.0              
min_Exterior styling       | 1.0              
min_Reliability            | 1.0              
avg_Num_of_reviews         | 55.57977711334602
avg_General_rate           | 4.626963848871978
avg_Comfort                | 4.654879043218254
avg_Interio

In [9]:
# Tìm giá trị phổ biến và hiếm nhất cho các cột chữ
categorical_columns = ["Brand", "Model", "Year"]

most_frequent = {}
least_frequent = {}

for col_name in categorical_columns:
    freq_df = data.groupBy(col_name).agg(count(col_name).alias("count"))
    
    # Tìm giá trị phổ biến nhất
    most_freq_row = freq_df.orderBy(col("count").desc()).first()
    most_frequent[col_name] = (most_freq_row[col_name], most_freq_row["count"])
    
    # Tìm giá trị hiếm nhất
    least_freq_row = freq_df.orderBy(col("count")).first()
    least_frequent[col_name] = (least_freq_row[col_name], least_freq_row["count"])

In [10]:
# Định dạng kết quả dưới dạng bảng
header = ["Column", "Most Frequent Value", "Count", "Least Frequent Value", "Count"]
rows = [
    [col_name, most_frequent[col_name][0], most_frequent[col_name][1], least_frequent[col_name][0], least_frequent[col_name][1]]
    for col_name in categorical_columns
]

# Tính độ dài của mỗi cột để căn chỉnh đầu ra
max_lengths = [max(len(str(item)) for item in col) for col in zip(*([header] + rows))]

# Tạo chuỗi định dạng cho các cột
format_str = ' | '.join([f'{{:<{length}}}' for length in max_lengths])

# In tiêu đề
print(format_str.format(*header))
print('-' * (sum(max_lengths) + len(max_lengths) * 3 - 1))

# In các hàng dữ liệu
for row in rows:
    print(format_str.format(*row))

Column | Most Frequent Value | Count | Least Frequent Value | Count
---------------------------------------------------------------------
Brand  | BMW                 | 436   | Tesla                | 1    
Model  | Escalade ESV.       | 12    | Rio5.                | 1    
Year   | 2022                | 390   | 2012                 | 278  


<h2>Tiền xử lý dữ liệu</h2>

In [11]:
# Bỏ cột Car_name
data = data.drop("Car_name")

In [12]:
# Xử lý missing values: điền giá trị trung bình cho các cột số và giá trị phổ biến cho các cột phân loại
for col_name in numerical_columns:
    mean_value = data.select(_mean(col_name)).collect()[0][0]
    data = data.na.fill({col_name: mean_value})

for col_name in categorical_columns:
    mode_value = data.groupBy(col_name).count().orderBy('count', ascending=False).first()[0]
    data = data.na.fill({col_name: mode_value})

In [13]:
# Tạo view tạm thời
data.createOrReplaceTempView("car_rates_view")

# Truy vấn dữ liệu
sqlDF = spark.sql("""
SELECT Brand, Model, General_rate, Year
FROM car_rates_view
""")

# Hiển thị kết quả truy
sqlDF.show()

+-----+-----------------+-----------------+----+
|Brand|            Model|     General_rate|Year|
+-----+-----------------+-----------------+----+
|Acura|         Integra.|              4.6|2023|
|Acura|             MDX.|4.626963848871978|2023|
|Acura|             TLX.|4.626963848871978|2023|
|Acura|             RDX.|4.626963848871978|2023|
|Acura|             TLX.|              4.8|2022|
|Acura|             MDX.|              4.7|2022|
|Acura|             RDX.|              4.8|2022|
|Acura|             ILX.|4.626963848871978|2022|
|Acura|             NSX.|4.626963848871978|2022|
|Acura|             ILX.|              4.9|2021|
|Acura|             TLX.|              4.6|2021|
|Acura|             RDX.|              4.8|2021|
|Acura|             NSX.|4.626963848871978|2021|
|Acura|             ILX.|              4.6|2020|
|Acura|RLX Sport Hybrid.|              5.0|2020|
|Acura|             MDX.|              4.8|2020|
|Acura|             TLX.|              4.9|2020|
|Acura|MDX Sport Hyb

In [14]:
# Mã hóa các cột phân loại thành số nguyên
for col_name in categorical_columns:
    categories = data.select(col_name).distinct().rdd.flatMap(lambda x: x).collect()
    mapping_expr = when(col(col_name) == lit(categories[0]), 0)
    for idx, category in enumerate(categories[1:], 1):
        mapping_expr = mapping_expr.when(col(col_name) == lit(category), idx)
    data = data.withColumn(col_name + "_encoded", mapping_expr.otherwise(col(col_name)))

                                                                                

In [15]:
# Chọn tất cả các cột số và các cột đã được mã hóa
feature_columns = numerical_columns + [col + "_encoded" for col in categorical_columns]

# Chuẩn hóa các cột số và cột đã mã hóa
summary = data.select(*feature_columns).summary().filter("summary = 'mean' or summary = 'stddev'").collect()
means = {row['summary']: {col: row[col] for col in feature_columns} for row in summary}

def standardize(col, mean, stddev):
    return (col - mean) / stddev

for col_name in feature_columns:
    mean = float(means['mean'][col_name])
    stddev = float(means['stddev'][col_name])
    data = data.withColumn(col_name, standardize(col(col_name), mean, stddev))

                                                                                

In [16]:
# Tạo view tạm thời
data.createOrReplaceTempView("car_rates_view")

# Truy vấn dữ liệu
sqlDF = spark.sql("""
SELECT Brand, Model, General_rate, Year
FROM car_rates_view
""")

# Hiển thị kết quả truy
sqlDF.show()

+-----+-----------------+--------------------+----+
|Brand|            Model|        General_rate|Year|
+-----+-----------------+--------------------+----+
|Acura|         Integra.|-0.08643390285902246|2023|
|Acura|             MDX.|1.252723235112115...|2023|
|Acura|             TLX.|1.252723235112115...|2023|
|Acura|             RDX.|1.252723235112115...|2023|
|Acura|             TLX.|  0.5546756306465788|2022|
|Acura|             MDX.| 0.23412086389377956|2022|
|Acura|             RDX.|  0.5546756306465788|2022|
|Acura|             ILX.|1.252723235112115...|2022|
|Acura|             NSX.|1.252723235112115...|2022|
|Acura|             ILX.|  0.8752303973993808|2021|
|Acura|             TLX.|-0.08643390285902246|2021|
|Acura|             RDX.|  0.5546756306465788|2021|
|Acura|             NSX.|1.252723235112115...|2021|
|Acura|             ILX.|-0.08643390285902246|2020|
|Acura|RLX Sport Hybrid.|    1.19578516415218|2020|
|Acura|             MDX.|  0.5546756306465788|2020|
|Acura|     

In [17]:
# Lưu DataFrame vào file CSV mới
data.write.mode("overwrite").csv("data_processed.csv", header=True)

                                                                                

In [18]:
# Dừng Spark session
spark.stop()