<a href="https://colab.research.google.com/github/tungdang24/big-data-recommendation-project/blob/main/Hybrid_ALS_LightGCN_RS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install tensorly
!pip install torch-scatter -f https://data.pyg.org/whl/torch-1.10.0+cu113.html
!pip install torch-sparse -f https://data.pyg.org/whl/torch-1.10.0+cu113.html
!pip install torch-cluster -f https://data.pyg.org/whl/torch-1.10.0+cu113.html
!pip install torch-spline-conv -f https://data.pyg.org/whl/torch-1.10.0+cu113.html
!pip install torch-geometric

**Hybrid ALS & LightGCN**

In [70]:
import os
import numpy as np
import pandas as pd
import torch
from torch_geometric.data import Data
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col, lit, row_number, sum as sql_sum, count
from pyspark.sql.window import Window
from pyspark.sql.types import FloatType
import matplotlib.pyplot as plt
from matplotlib import style
import zipfile
import urllib.request
from typing import Dict, Tuple, List
import uuid

# Khởi tạo Spark session với cấu hình tối ưu
spark = SparkSession.builder \
    .appName("HybridRecommendationSystem") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.default.parallelism", 100) \
    .config("spark.sql.shuffle.partitions", 200) \
    .config("spark.memory.offHeap.enabled", True) \
    .config("spark.memory.offHeap.size", "4g") \
    .master("local[*]") \
    .getOrCreate()

# Cấu hình cho hệ thống
config_dict = {
    "num_samples_per_user": 500,
    "num_users": 200,
    "embedding_size": 64,
    "num_layers": 5,
    "K": 10,
    "mf_rank": 8,
    "model_name": "model.pth"
}

# Đường dẫn dữ liệu MovieLens 1M
DATA_URL = "https://files.grouplens.org/datasets/movielens/ml-1m.zip"

# Tải và giải nén dữ liệu MovieLens
def download_and_extract() -> str:
    print("Đang tải và giải nén dữ liệu MovieLens...")
    raw_dir = "/content/raw"
    os.makedirs(raw_dir, exist_ok=True)
    zip_path = os.path.join(raw_dir, "ml-1m.zip")
    urllib.request.urlretrieve(DATA_URL, zip_path)
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(raw_dir)
    return raw_dir

# Tải dữ liệu MovieLens vào Spark DataFrame
def load_movielens_data(raw_dir: str) -> tuple:
    print("Đang tải dữ liệu MovieLens vào Spark...")
    # Tạo DataFrame từ ratings.dat
    ratings = pd.read_table(
        os.path.join(raw_dir, 'ml-1m/ratings.dat'),
        sep='::', header=None, names=['user_id', 'movie_id', 'rating', 'timestamp'],
        engine='python', encoding='latin-1'
    )
    # Giới hạn số lượng người dùng
    if config_dict["num_users"] != -1:
        selected_users = ratings['user_id'].unique()[:config_dict["num_users"]]
        ratings = ratings[ratings['user_id'].isin(selected_users)]
    ratings_df = spark.createDataFrame(ratings)
    ratings_df = ratings_df.repartition(200, "user_id").cache()

    # Tạo DataFrame từ movies.dat
    movies = pd.read_table(
        os.path.join(raw_dir, 'ml-1m/movies.dat'),
        sep='::', header=None, names=['movie_id', 'title', 'genres'],
        engine='python', encoding='latin-1'
    )
    # Chỉ giữ các phim có trong ratings
    movie_ids = ratings['movie_id'].unique()
    movies = movies[movies['movie_id'].isin(movie_ids)]
    movies_df = spark.createDataFrame(movies).cache()

    # Tạo DataFrame từ users.dat
    users = pd.read_table(
        os.path.join(raw_dir, 'ml-1m/users.dat'),
        sep='::', header=None, names=['user_id', 'gender', 'age', 'occupation', 'zip_code'],
        engine='python', encoding='latin-1'
    )
    users_df = spark.createDataFrame(users).cache()

    return ratings_df, movies_df, users_df

# Dự đoán LightGCN
def get_lightgcn_predictions(model, data, user_ids: List[int], item_ids: List[int], device: str = 'cpu') -> np.ndarray:
    print("Đang tạo dự đoán LightGCN...")
    model.eval()
    model.to(device)
    all_users_items = model(model.embedding_user_item.weight.clone(), data["edge_index"])

    num_users = len(data["users"])
    num_items = len(data["items"])
    print(f"Kích thước embedding: users={num_users}, items={num_items}, expected items={len(item_ids)}")

    if num_items != len(item_ids):
        print(f"Cảnh báo: Số lượng item không khớp (data: {num_items}, expected: {len(item_ids)}). Điều chỉnh ma trận điểm số...")

    all_users = all_users_items[:num_users]
    items_emb = all_users_items[num_users:num_users + num_items]

    # Kiểm tra chỉ số người dùng
    valid_user_indices = [i for i, uid in enumerate(data["users"]) if uid in user_ids]
    if not valid_user_indices:
        raise ValueError("Không tìm thấy user_ids hợp lệ trong dữ liệu LightGCN")

    user_indices = torch.tensor(valid_user_indices, dtype=torch.long).to(device)
    users_emb = all_users[user_indices]
    scores = torch.matmul(users_emb, items_emb.t())
    scores = model.f(scores).cpu().detach().numpy()

    # Tạo ma trận đầy đủ cho tất cả user_ids và item_ids
    full_scores = np.zeros((len(user_ids), len(item_ids)))
    item_id_to_index = {item_id: idx for idx, item_id in enumerate(data["items"])}

    for i, uid in enumerate(user_ids):
        if uid in data["users"]:
            user_idx = data["users"].index(uid)
            if user_idx < num_users:
                valid_user_idx = valid_user_indices.index(user_idx)
                for j, item_id in enumerate(item_ids):
                    if item_id in item_id_to_index:
                        item_idx = item_id_to_index[item_id]
                        if item_idx < num_items:
                            full_scores[i, j] = scores[valid_user_idx, item_idx]

    return full_scores

# Chuyển dự đoán LightGCN sang Spark DataFrame
def lightgcn_predictions_to_df(predictions: np.ndarray, user_ids: List[int], item_ids: List[int]):
    print("Đang chuyển dự đoán LightGCN sang Spark DataFrame...")
    data = []
    for i, user_id in enumerate(user_ids):
        for j, item_id in enumerate(item_ids):
            if i < predictions.shape[0] and j < predictions.shape[1]:
                data.append((int(user_id), int(item_id), float(predictions[i, j])))

    df = spark.createDataFrame(data, ["user_id", "movie_id", "lightgcn_score"])
    df = df.repartition(200, "user_id")
    return df

# Huấn luyện mô hình ALS
def train_als(ratings_df, rank: int = 10, reg_param: float = 0.1, max_iter: int = 10):
    print("Đang huấn luyện mô hình ALS...")
    als = ALS(
        rank=rank,
        maxIter=max_iter,
        regParam=reg_param,
        userCol="user_id",
        itemCol="movie_id",
        ratingCol="rating",
        coldStartStrategy="drop",
        nonnegative=True
    )
    model = als.fit(ratings_df)
    return model

# Tạo dự đoán ALS
def get_als_predictions(als_model, ratings_df):
    print("Đang tạo dự đoán ALS...")
    predictions = als_model.transform(ratings_df)
    predictions = predictions.select("user_id", "movie_id", col("prediction").alias("als_score"))
    return predictions

# Kết hợp dự đoán LightGCN và ALS
def combine_predictions(lightgcn_df, als_df, lightgcn_weight: float = 0.7, als_weight: float = 0.3):
    print("Đang kết hợp dự đoán LightGCN và ALS...")
    combined_df = lightgcn_df.join(als_df, ["user_id", "movie_id"], "inner")

    lightgcn_max = combined_df.agg({"lightgcn_score": "max"}).collect()[0][0] or 1.0
    als_max = combined_df.agg({"als_score": "max"}).collect()[0][0] or 1.0

    combined_df = combined_df.withColumn("lightgcn_score_norm", col("lightgcn_score") / lightgcn_max)
    combined_df = combined_df.withColumn("als_score_norm", col("als_score") / als_max)

    combined_df = combined_df.withColumn(
        "hybrid_score",
        lightgcn_weight * col("lightgcn_score_norm") + als_weight * col("als_score_norm")
    )

    return combined_df

# Đánh giá top-K
def evaluate_topk(df, ratings_df, k: int = 10, score_col: str = "hybrid_score") -> Tuple[float, float]:
    print(f"Đang đánh giá top-{k} khuyến nghị...")
    window = Window.partitionBy("user_id").orderBy(col(score_col).desc())
    topk_df = df.withColumn("rank", row_number().over(window)) \
                .filter(col("rank") <= k) \
                .select("user_id", "movie_id")

    ground_truth = ratings_df.filter(col("rating") >= 3.0) \
                            .select("user_id", "movie_id") \
                            .withColumn("relevant", lit(1))

    eval_df = topk_df.join(ground_truth, ["user_id", "movie_id"], "left") \
                     .fillna({"relevant": 0})

    eval_df = eval_df.groupBy("user_id").agg(
        sql_sum("relevant").alias("true_positives"),
        count("*").alias("recommended")
    )

    eval_df = eval_df.withColumn("precision", col("true_positives") / col("recommended"))
    eval_df = eval_df.withColumn("recall", col("true_positives") / config_dict["num_samples_per_user"])

    avg_precision = eval_df.agg({"precision": "avg"}).collect()[0][0] or 0.0
    avg_recall = eval_df.agg({"recall": "avg"}).collect()[0][0] or 0.0

    return avg_precision, avg_recall

# Vẽ biểu đồ đánh giá
def plot_evaluation_metrics(metrics: Dict[str, Tuple[float, float]], k: int):
    print("Đang vẽ biểu đồ đánh giá...")
    style.use('ggplot')
    labels = list(metrics.keys())
    precisions = [m[0] for m in metrics.values()]
    recalls = [m[1] for m in metrics.values()]

    x = np.arange(len(labels))
    width = 0.35

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.bar(x - width/2, precisions, width, label=f'Precision@{k}', color='skyblue')
    ax.bar(x + width/2, recalls, width, label=f'Recall@{k}', color='lightcoral')

    ax.set_ylabel('Score')
    ax.set_title('Model Performance Comparison')
    ax.set_xticks(x)
    ax.set_xticklabels(labels, rotation=45)
    ax.legend()

    plt.tight_layout()
    plt.savefig('evaluation_metrics.png')
    plt.close()

# Hiển thị kết quả gợi ý
def display_recommendations(combined_df, movies_df, k: int = 10, num_users: int = 5):
    print(f"Đang hiển thị top-{k} gợi ý cho {num_users} người dùng...")
    window = Window.partitionBy("user_id").orderBy(col("hybrid_score").desc())
    topk_df = combined_df.withColumn("rank", row_number().over(window)) \
                        .filter(col("rank") <= k) \
                        .select("user_id", "movie_id", "hybrid_score")

    # Kết hợp với thông tin phim
    recommendations = topk_df.join(movies_df, "movie_id", "inner") \
                            .select("user_id", "movie_id", "title", "genres", "hybrid_score") \
                            .orderBy("user_id", col("hybrid_score").desc())

    # Lấy gợi ý cho num_users người dùng đầu tiên
    user_ids = recommendations.select("user_id").distinct().limit(num_users).rdd.flatMap(lambda x: x).collect()
    for user_id in user_ids:
        print(f"\nGợi ý cho người dùng {user_id}:")
        user_recs = recommendations.filter(col("user_id") == user_id).collect()
        for rec in user_recs:
            print(f"- {rec['title']} (Genres: {rec['genres']}, Score: {rec['hybrid_score']:.4f})")

# Tinh chỉnh siêu tham số
def tune_hybrid_system(ratings_df, lightgcn_model, data, user_ids: List[int], item_ids: List[int]) -> Tuple[Dict, Dict]:
    print("Đang tinh chỉnh hệ thống kết hợp...")
    best_precision = 0
    best_params = {}
    all_metrics = {}

    lightgcn_weights = [0.5, 0.7, 0.9]
    als_ranks = [10, 20]
    reg_params = [0.1, 0.01]

    # Đánh giá LightGCN riêng lẻ
    lightgcn_predictions = get_lightgcn_predictions(lightgcn_model, data, user_ids, item_ids)
    lightgcn_df = lightgcn_predictions_to_df(lightgcn_predictions, user_ids, item_ids)
    lightgcn_precision, lightgcn_recall = evaluate_topk(lightgcn_df, ratings_df, k=config_dict["K"], score_col="lightgcn_score")
    all_metrics["LightGCN"] = (lightgcn_precision, lightgcn_recall)

    # Đánh giá ALS riêng lẻ
    als_model = train_als(ratings_df, rank=10, reg_param=0.1)
    als_predictions = get_als_predictions(als_model, ratings_df)
    als_precision, als_recall = evaluate_topk(als_predictions, ratings_df, k=config_dict["K"], score_col="als_score")
    all_metrics["ALS"] = (als_precision, als_recall)

    # Tinh chỉnh hệ thống kết hợp
    for lw in lightgcn_weights:
        for rank in als_ranks:
            for reg in reg_params:
                print(f"Thử LightGCN weight={lw}, ALS rank={rank}, reg_param={reg}")
                als_model = train_als(ratings_df, rank=rank, reg_param=reg)
                als_predictions = get_als_predictions(als_model, ratings_df)

                lightgcn_predictions = get_lightgcn_predictions(lightgcn_model, data, user_ids, item_ids)
                lightgcn_df = lightgcn_predictions_to_df(lightgcn_predictions, user_ids, item_ids)

                combined_df = combine_predictions(lightgcn_df, als_predictions, lightgcn_weight=lw, als_weight=1-lw)
                precision, recall = evaluate_topk(combined_df, ratings_df, k=config_dict["K"])

                print(f"Precision: {precision:.4f}, Recall: {recall:.4f}")
                all_metrics[f"Hybrid_lw{lw}_rank{rank}_reg{reg}"] = (precision, recall)

                if precision > best_precision:
                    best_precision = precision
                    best_params = {"lightgcn_weight": lw, "als_rank": rank, "reg_param": reg}

    return best_params, all_metrics

# Hàm chính
def main():
    print("Khởi động hệ thống khuyến nghị kết hợp...")

    # Tải dữ liệu
    raw_dir = download_and_extract()
    ratings_df, movies_df, users_df = load_movielens_data(raw_dir)

    # Tải mô hình LightGCN
    try:
        lightgcn_model = torch.load(config_dict["model_name"], weights_only=False)
    except FileNotFoundError:
        print("Không tìm thấy model.pth. Vui lòng cung cấp mô hình LightGCN đã huấn luyện.")
        return

    # Chuẩn bị dữ liệu cho LightGCN
    users = sorted(ratings_df.select("user_id").distinct().rdd.flatMap(lambda x: x).collect())
    items = sorted(ratings_df.select("movie_id").distinct().rdd.flatMap(lambda x: x).collect())
    num_users = len(users)
    num_items = len(items)
    print(f"Số lượng người dùng: {num_users}, Số lượng phim: {num_items}")

    user_to_id = {u: i for i, u in enumerate(users)}
    item_to_id = {i: j for j, i in enumerate(items)}

    # Tạo ma trận định dạng cho LightGCN
    rat = torch.zeros(num_users, num_items)
    for row in ratings_df.collect():
        user, item, rating = row["user_id"], row["movie_id"], row["rating"]
        if user in user_to_id and item in item_to_id:
            rat[user_to_id[user], item_to_id[item]] = 1 if rating >= 3 else 0

    data = Data(edge_index=rat, raw_edge_index=rat.clone(), users=users, items=items)

    # Tinh chỉnh hệ thống
    best_params, all_metrics = tune_hybrid_system(ratings_df, lightgcn_model, data, users, items)

    # Xuất thông tin mô hình
    print("=== Thông tin mô hình kết hợp ===")
    print(f"Tham số tốt nhất: {best_params}")
    print("Hiệu suất các mô hình:")
    for model, (precision, recall) in all_metrics.items():
        print(f"{model}: Precision@{config_dict['K']} = {precision:.4f}, Recall@{config_dict['K']} = {recall:.4f}")

    # Vẽ biểu đồ
    plot_evaluation_metrics(all_metrics, k=config_dict["K"])
    print("Biểu đồ đánh giá đã được lưu tại 'evaluation_metrics.png'")

    # Huấn luyện mô hình cuối cùng với tham số tốt nhất
    als_model = train_als(ratings_df, rank=best_params["als_rank"], reg_param=best_params["reg_param"])
    als_predictions = get_als_predictions(als_model, ratings_df)
    lightgcn_predictions = get_lightgcn_predictions(lightgcn_model, data, users, items)
    lightgcn_df = lightgcn_predictions_to_df(lightgcn_predictions, users, items)

    combined_df = combine_predictions(
        lightgcn_df, als_predictions,
        lightgcn_weight=best_params["lightgcn_weight"],
        als_weight=1-best_params["lightgcn_weight"]
    )

    # Hiển thị kết quả gợi ý
    display_recommendations(combined_df, movies_df, k=config_dict["K"], num_users=5)

    # Lưu dự đoán
    combined_df.write.mode("overwrite").parquet("hybrid_recommendations.parquet")
    print("Dự đoán đã được lưu tại 'hybrid_recommendations.parquet'")

    # Dọn dẹp
    ratings_df.unpersist()
    movies_df.unpersist()
    users_df.unpersist()
    spark.stop()

if __name__ == "__main__":
    main()

Khởi động hệ thống khuyến nghị kết hợp...
Đang tải và giải nén dữ liệu MovieLens...
Đang tải dữ liệu MovieLens vào Spark...
Số lượng người dùng: 200, Số lượng phim: 2833
Đang tinh chỉnh hệ thống kết hợp...
Đang tạo dự đoán LightGCN...
Kích thước embedding: users=200, items=2833, expected items=2833
Đang chuyển dự đoán LightGCN sang Spark DataFrame...
Đang đánh giá top-10 khuyến nghị...
Đang huấn luyện mô hình ALS...
Đang tạo dự đoán ALS...
Đang đánh giá top-10 khuyến nghị...
Thử LightGCN weight=0.5, ALS rank=10, reg_param=0.1
Đang huấn luyện mô hình ALS...
Đang tạo dự đoán ALS...
Đang tạo dự đoán LightGCN...
Kích thước embedding: users=200, items=2833, expected items=2833
Đang chuyển dự đoán LightGCN sang Spark DataFrame...
Đang kết hợp dự đoán LightGCN và ALS...
Đang đánh giá top-10 khuyến nghị...
Precision: 0.9985, Recall: 0.0200
Thử LightGCN weight=0.5, ALS rank=10, reg_param=0.01
Đang huấn luyện mô hình ALS...
Đang tạo dự đoán ALS...
Đang tạo dự đoán LightGCN...
Kích thước embeddin