# PySpark Clustering Models

Notebook này triển khai các mô hình clustering sử dụng PySpark theo yêu cầu đề bài:
- KMeans
- Gaussian Mixture Model (GMM)
- Bisecting K-Means




In [17]:
import pandas as pd
import numpy as np
import re

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans, BisectingKMeans, GaussianMixture
from pyspark.ml.evaluation import ClusteringEvaluator

# Khởi tạo Spark session
spark = SparkSession.builder \
    .appName("MotorbikeClustering") \
    .master("local[*]") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

from pathlib import Path
import os

# Sử dụng đường dẫn tương đối từ notebook
NOTEBOOK_DIR = Path.cwd()
if "notebooks" in str(NOTEBOOK_DIR):
    PROJECT_ROOT = NOTEBOOK_DIR.parent
else:
    PROJECT_ROOT = NOTEBOOK_DIR

# Thử các tên file có thể có
possible_filenames = [
    "data_motobikes.xlsx - Sheet1.csv",
    "data_motobikes.csv",
    "data_motorbikes.xlsx - Sheet1.csv",
    "data_motorbikes.csv",
]

DATA_PATH = None

# Tìm file trong thư mục project
for filename in possible_filenames:
    test_path = PROJECT_ROOT / filename
    if test_path.exists():
        DATA_PATH = test_path
        break

# Nếu không tìm thấy, thử đường dẫn tuyệt đối
if DATA_PATH is None or not DATA_PATH.exists():
    abs_path = Path("/Users/doananh/Documents/Documents - Doan's MacBook Pro/đồ án DS/project2")
    for filename in possible_filenames:
        test_path = abs_path / filename
        if test_path.exists():
            DATA_PATH = test_path
            break

# Nếu vẫn không tìm thấy, liệt kê các file CSV có sẵn
if DATA_PATH is None or not DATA_PATH.exists():
    csv_files = list(PROJECT_ROOT.glob("*.csv"))
    if csv_files:
        print(f"⚠️ Không tìm thấy file với tên chuẩn. Các file CSV có sẵn trong thư mục:")
        for f in csv_files:
            print(f"  - {f.name}")
        DATA_PATH = csv_files[0]  # Sử dụng file CSV đầu tiên tìm thấy
        print(f"✅ Sử dụng file: {DATA_PATH.name}")
    else:
        raise FileNotFoundError(
            f"Không tìm thấy file dữ liệu. Đã tìm trong:\n"
            f"  - {PROJECT_ROOT}\n"
            f"  - {abs_path if 'abs_path' in locals() else 'N/A'}\n"
            f"Vui lòng kiểm tra đường dẫn file dữ liệu!"
        )

RANDOM_SEED = 42
print(f"✅ Đường dẫn dữ liệu: {DATA_PATH}")
print(f"✅ File tồn tại: {DATA_PATH.exists()}")


✅ Đường dẫn dữ liệu: /Users/doananh/Documents/Documents - Doan’s MacBook Pro/đồ án DS/project2/data_motobikes.xlsx - Sheet1.csv
✅ File tồn tại: True


In [18]:
# Load và preprocess dữ liệu (sử dụng lại code từ notebook 09)
def parse_price(value):
    if pd.isna(value):
        return None
    value = str(value).lower()
    if value in {"đang cập nhật", "liên hệ", "thỏa thuận"}:
        return None
    value = value.replace("triệu", "tr").replace("tỷ", "ty")
    value = value.replace("đ", "").replace("vnđ", "").replace("vnd", "")
    value = value.replace(" ", "")
    match = re.search(r"([0-9]+(?:[\.,][0-9]+)?)", value)
    if not match:
        return None
    number = match.group(1).replace(",", ".")
    try:
        number = float(number)
    except ValueError:
        return None
    if "ty" in value or value.endswith("ty"):
        return number * 1000.0
    return number if value.endswith("tr") else number / 1_000_000

def parse_km(value):
    if pd.isna(value):
        return None
    value = str(value).lower().replace("km", "")
    value = value.replace(",", "").replace(" ", "").replace(".", "")
    match = re.search(r"(\d+)", value)
    return float(match.group(1)) if match else None

df = pd.read_csv(str(DATA_PATH))

base_cols = [
    "Giá", "Khoảng giá min", "Khoảng giá max", "Năm đăng ký", "Số Km đã đi",
    "Thương hiệu", "Dòng xe", "Loại xe", "Dung tích xe", "Tình trạng"
]

df_model = df[base_cols + ["id"]].copy()

for col in ["Giá", "Khoảng giá min", "Khoảng giá max"]:
    df_model[col] = df_model[col].apply(parse_price)

df_model["Năm đăng ký"] = pd.to_numeric(df_model["Năm đăng ký"], errors="coerce")
df_model["Số Km đã đi"] = df_model["Số Km đã đi"].apply(parse_km)

df_model = df_model.dropna(subset=base_cols).reset_index(drop=True)

# Convert to Spark DataFrame
spark_df = spark.createDataFrame(df_model)
numeric_cols = ["Giá", "Khoảng giá min", "Khoảng giá max", "Năm đăng ký", "Số Km đã đi"]

# Tạo feature vector
assembler = VectorAssembler(
    inputCols=numeric_cols,
    outputCol="features",
    handleInvalid="skip"
)

spark_df = assembler.transform(spark_df)

# Chuẩn hóa features
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withStd=True,
    withMean=True
)

scaler_model = scaler.fit(spark_df)
spark_df_scaled = scaler_model.transform(spark_df)

print(f"Số lượng bản ghi: {spark_df_scaled.count()}")
print("Schema:")
spark_df_scaled.printSchema()

# Kiểm tra có dữ liệu không
sample_count = spark_df_scaled.count()
if sample_count == 0:
    raise ValueError("Không có dữ liệu sau khi preprocessing!")

print(f"\n✅ Đã load thành công {sample_count} bản ghi")


Số lượng bản ghi: 6942
Schema:
root
 |-- Giá: double (nullable = true)
 |-- Khoảng giá min: double (nullable = true)
 |-- Khoảng giá max: double (nullable = true)
 |-- Năm đăng ký: double (nullable = true)
 |-- Số Km đã đi: double (nullable = true)
 |-- Thương hiệu: string (nullable = true)
 |-- Dòng xe: string (nullable = true)
 |-- Loại xe: string (nullable = true)
 |-- Dung tích xe: string (nullable = true)
 |-- Tình trạng: string (nullable = true)
 |-- id: long (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaled_features: vector (nullable = true)


✅ Đã load thành công 6942 bản ghi


In [19]:
# --- PySpark Clustering Models ---
# Kiểm tra spark_df_scaled đã được định nghĩa chưa
try:
    # Thử truy cập biến để kiểm tra
    test_df = spark_df_scaled
    print("✅ spark_df_scaled đã sẵn sàng")
except NameError:
    raise NameError(
        "❌ spark_df_scaled chưa được định nghĩa!\n"
        "⚠️ Hãy chạy Cell 2 trước (Load và preprocess dữ liệu)"
    )

K = 5
evaluator = ClusteringEvaluator(predictionCol="prediction", featuresCol="scaled_features", metricName="silhouette")

pyspark_results = []

# 1. PySpark KMeans
print("Training PySpark KMeans...")
kmeans_pyspark = KMeans(
    featuresCol="scaled_features",
    k=K,
    seed=RANDOM_SEED,
    maxIter=50
)
kmeans_model_pyspark = kmeans_pyspark.fit(spark_df_scaled)
predictions_kmeans = kmeans_model_pyspark.transform(spark_df_scaled)
sil_kmeans_pyspark = evaluator.evaluate(predictions_kmeans)

pyspark_results.append({
    "model": "PySpark KMeans",
    "silhouette": sil_kmeans_pyspark,
    "wssse": kmeans_model_pyspark.summary.trainingCost,
    "centers": kmeans_model_pyspark.clusterCenters(),
})

# 2. PySpark Gaussian Mixture Model (GMM)
print("Training PySpark GMM...")
gmm_pyspark = GaussianMixture(
    featuresCol="scaled_features",
    k=K,
    seed=RANDOM_SEED,
    maxIter=50
)
gmm_model_pyspark = gmm_pyspark.fit(spark_df_scaled)
predictions_gmm = gmm_model_pyspark.transform(spark_df_scaled)
sil_gmm_pyspark = evaluator.evaluate(predictions_gmm)

pyspark_results.append({
    "model": "PySpark GMM",
    "silhouette": sil_gmm_pyspark,
    "log_likelihood": gmm_model_pyspark.summary.logLikelihood,
    "weights": list(gmm_model_pyspark.weights) if isinstance(gmm_model_pyspark.weights, list) else (gmm_model_pyspark.weights.tolist() if isinstance(gmm_model_pyspark.weights, np.ndarray) else [float(w) for w in gmm_model_pyspark.weights]),
})

# 3. PySpark Bisecting K-Means
print("Training PySpark Bisecting K-Means...")
bisecting_kmeans_pyspark = BisectingKMeans(
    featuresCol="scaled_features",
    k=K,
    seed=RANDOM_SEED,
    maxIter=50
)
bisecting_model_pyspark = bisecting_kmeans_pyspark.fit(spark_df_scaled)
predictions_bisecting = bisecting_model_pyspark.transform(spark_df_scaled)
sil_bisecting_pyspark = evaluator.evaluate(predictions_bisecting)

pyspark_results.append({
    "model": "PySpark Bisecting K-Means",
    "silhouette": sil_bisecting_pyspark,
    "wssse": bisecting_model_pyspark.summary.trainingCost,
    "centers": bisecting_model_pyspark.clusterCenters(),
})

# Tổng hợp kết quả
pyspark_eval_df = pd.DataFrame([
    {
        "model": r["model"],
        "silhouette": r["silhouette"],
        "wssse": r.get("wssse", None),
        "log_likelihood": r.get("log_likelihood", None),
    }
    for r in pyspark_results
])

print("\n✅ Đã training thành công tất cả 3 mô hình PySpark!")
print(f"✅ Đã tạo pyspark_eval_df với {len(pyspark_results)} mô hình")
print("\n" + "="*60)
pyspark_eval_df


✅ spark_df_scaled đã sẵn sàng
Training PySpark KMeans...
Training PySpark GMM...
Training PySpark Bisecting K-Means...

✅ Đã training thành công tất cả 3 mô hình PySpark!
✅ Đã tạo pyspark_eval_df với 3 mô hình



Unnamed: 0,model,silhouette,wssse,log_likelihood
0,PySpark KMeans,0.542585,11211.198301,
1,PySpark GMM,0.392965,,38789.870328
2,PySpark Bisecting K-Means,0.165867,15288.640403,


In [20]:
# Cluster summaries cho PySpark models
def get_cluster_summary(df_pd):
    """Tạo summary cho mỗi cluster"""
    summary_list = []
    for cluster_id in sorted(df_pd["prediction"].unique()):
        cluster_data = df_pd[df_pd["prediction"] == cluster_id].copy()
        top_brands = cluster_data["Thương hiệu"].value_counts().head(3)
        top_brand_dict = top_brands.to_dict() if len(top_brands) > 0 else {}
        
        summary_list.append({
            "count": len(cluster_data),
            "avg_price": float(cluster_data["Giá"].mean()) if len(cluster_data) > 0 else 0.0,
            "median_year": float(cluster_data["Năm đăng ký"].median()) if len(cluster_data) > 0 else 0.0,
            "top_brand": top_brand_dict,
        })
    
    summary_df = pd.DataFrame(summary_list, index=sorted(df_pd["prediction"].unique()))
    summary_df.index.name = "cluster"
    return summary_df

pyspark_summaries = {}

try:
    # KMeans summary
    predictions_kmeans_pd = predictions_kmeans.select("id", "prediction", "Giá", "Năm đăng ký", "Thương hiệu").toPandas()
    pyspark_summaries["pyspark_kmeans"] = get_cluster_summary(predictions_kmeans_pd)
    
    # GMM summary
    predictions_gmm_pd = predictions_gmm.select("id", "prediction", "Giá", "Năm đăng ký", "Thương hiệu").toPandas()
    pyspark_summaries["pyspark_gmm"] = get_cluster_summary(predictions_gmm_pd)
    
    # Bisecting K-Means summary
    predictions_bisecting_pd = predictions_bisecting.select("id", "prediction", "Giá", "Năm đăng ký", "Thương hiệu").toPandas()
    pyspark_summaries["pyspark_bisecting_kmeans"] = get_cluster_summary(predictions_bisecting_pd)
except NameError as e:
    print(f"Lỗi: Các biến predictions chưa được định nghĩa. Hãy chạy cell trên trước: {e}")
except Exception as e:
    print(f"Lỗi khi tạo cluster summaries: {e}")

print("=== PySpark Clustering Evaluation ===")
try:
    if 'pyspark_eval_df' in globals():
        print(pyspark_eval_df)
    else:
        print("⚠️ Lỗi: pyspark_eval_df chưa được định nghĩa. Hãy chạy cell trên trước!")
except NameError:
    print("⚠️ Lỗi: pyspark_eval_df chưa được định nghĩa. Hãy chạy cell trên trước!")

if pyspark_summaries:
    print("\n=== PySpark Cluster Summaries ===")
    for model_name, summary in pyspark_summaries.items():
        print(f"\n{model_name}:")
        print(summary)

# Trả về kết quả nếu có
result = {}
if 'pyspark_eval_df' in globals():
    result["pyspark_evaluation"] = pyspark_eval_df
if pyspark_summaries:
    result["pyspark_summaries"] = pyspark_summaries

if result:
    result
else:
    print("\n⚠️ Chưa có kết quả. Hãy chạy các cell trên trước!")


=== PySpark Clustering Evaluation ===
                       model  silhouette         wssse  log_likelihood
0             PySpark KMeans    0.542585  11211.198301             NaN
1                PySpark GMM    0.392965           NaN    38789.870328
2  PySpark Bisecting K-Means    0.165867  15288.640403             NaN

=== PySpark Cluster Summaries ===

pyspark_kmeans:
         count  avg_price  median_year  \
cluster                                  
0         2667   0.000013       2009.0   
1           38   0.000352       2019.0   
2         3235   0.000025       2019.0   
3          726   0.000104       2019.0   
4          276   0.000015       2012.0   

                                                top_brand  
cluster                                                    
0              {'Honda': 1555, 'Yamaha': 558, 'SYM': 171}  
1        {'Harley Davidson': 7, 'Honda': 7, 'Triumph': 6}  
2          {'Honda': 1961, 'Yamaha': 732, 'Piaggio': 170}  
3             {'Honda': 599, 'P

25/11/15 21:34:37 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 931354 ms exceeds timeout 120000 ms
25/11/15 21:34:37 WARN SparkContext: Killing executors is not supported by current scheduler.
25/11/15 21:34:41 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$