In [1]:
import os
import sys
import pyspark

# Set PYSPARK_PYTHON to the current Python executable
os.environ["PYSPARK_PYTHON"] = sys.executable
print("PYSPARK_PYTHON:", os.environ["PYSPARK_PYTHON"])
print("PySpark version:", pyspark.__version__)


from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import when, col, lit, isnan, sum
from pyspark.ml.recommendation import ALS
import itertools
import math
import pandas as pd

spark = (
    SparkSession.builder
    .appName("AmazonReviewsALS")
    # Try using fewer cores to reduce thread contention
    .master("local[4]")  # instead of local[*]
    # Increase memory limits
    .config("spark.driver.memory", "16g")      # 16 GB for driver
    .config("spark.executor.memory", "8g")     # 8 GB for executors
    .config("spark.driver.maxResultSize", "4g")  # prevent collect() overflows
    # Optional tweaks for large ALS or shuffle-heavy operations
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.default.parallelism", "200")
    .getOrCreate()
)


PYSPARK_PYTHON: c:\utecode\bdp\aws-review23\.venv\Scripts\python.exe
PySpark version: 3.5.6


In [2]:

user_index = spark.read.parquet("mappings/user_index.parquet")
item_index = spark.read.parquet("mappings/item_index.parquet")
item_means = spark.read.parquet("preprocessed/item_means.parquet")

train_df_norm = spark.read.parquet("preprocessed/train_norm.parquet")
valid_df_norm = spark.read.parquet("preprocessed/valid_norm.parquet")
test_df_norm  = spark.read.parquet("preprocessed/test_norm.parquet")

user_index.show(5)
test_df_norm.show()

print(f"Số dòng train + valid + test: {train_df_norm.count()} + {valid_df_norm.count()} + {test_df_norm.count()} = {train_df_norm.count() + valid_df_norm.count() + test_df_norm.count()}")

global_mean = train_df_norm.select(F.avg("rating")).first()[0]
print(f"Global mean: {global_mean}")



+--------------------+---------+
|             user_id|userIndex|
+--------------------+---------+
|AE22236AFRRSMQIKG...|        0|
|AE222H3FGXWLHRFUM...|        1|
|AE224QIIILW6WVFAE...|        2|
|AE224XBMLKDOWJRHA...|        3|
|AE2255XXPI47TT6JO...|        4|
+--------------------+---------+
only showing top 5 rows

+---------+-----------+---------+--------------------+------+--------------------+------------------+
|itemIndex|parent_asin|userIndex|             user_id|rating|         rating_norm|         item_mean|
+---------+-----------+---------+--------------------+------+--------------------+------------------+
|    89574| B0C7CXW1JB|   193523|AHXOCRWNKC552ESFF...|   4.0| -0.7849872773536894|4.7849872773536894|
|    21220| B00N9RIBIO|    41477|AEV2AM5IEUVKSKLEJ...|   5.0|  0.5571428571428569| 4.442857142857143|
|    82529| B0B833LBX2|     9114|AE7YYSYFEWAYIUCAK...|   5.0|  0.4991011434207744| 4.500898856579226|
|    87656| B0BWKDNFBB|    97066|AFZ7R6552CQZPXG42...|   3.0|     

In [3]:
from pyspark.ml.evaluation import RegressionEvaluator

rmse_evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="pred_rating"
)
mae_evaluator = RegressionEvaluator(
    metricName="mae",
    labelCol="rating",
    predictionCol="pred_rating"
)

In [4]:
def get_restore_rating(model, df_norm):
    pred_test = model.transform(df_norm)
    predictions = pred_test.withColumn(
        "pred_rating", 
        when(isnan(col("prediction")), lit(0.0)).otherwise(col("prediction")) + col("item_mean")
    )
    return predictions


In [5]:
import os
import math
import itertools
import pandas as pd
from pyspark.ml.recommendation import ALS

os.makedirs("results", exist_ok=True)

# Hyperparameter grid
ranks = [10]
regParams = [0.01, 0.1, 1.0]
maxIters = [5, 10, 15]

best_score = math.inf
best_params = None
results = []

print("Starting grid search on validation set...")

for rank, regParam, maxIter in itertools.product(ranks, regParams, maxIters):
    print(f"Testing rank={rank}, regParam={regParam}, maxIter={maxIter}...", end=" ")

    als = ALS(
        rank=rank,
        regParam=regParam,
        maxIter=maxIter,
        userCol="userIndex",
        itemCol="itemIndex",
        ratingCol="rating_norm",
        seed=42,
        coldStartStrategy="drop"
    )

    try:
        model = als.fit(train_df_norm)

        # ---- Compute on TRAIN ----
        pred_train = get_restore_rating(model, train_df_norm)
        rmse_train = rmse_evaluator.evaluate(pred_train)
        mae_train = mae_evaluator.evaluate(pred_train)

        # ---- Compute on VALID ----
        pred_valid = get_restore_rating(model, valid_df_norm)
        rmse_valid = rmse_evaluator.evaluate(pred_valid)
        mae_valid = mae_evaluator.evaluate(pred_valid)

    except Exception as e:
        print(f"Failed -> {e}")
        continue

    print(f"RMSE_valid={rmse_valid:.4f}, MAE_valid={mae_valid:.4f}, RMSE_train={rmse_train:.4f}")

    # Store results
    results.append({
        "rank": rank,
        "regParam": regParam,
        "maxIter": maxIter,
        "RMSE_train": rmse_train,
        "MAE_train": mae_train,
        "RMSE_valid": rmse_valid,
        "MAE_valid": mae_valid
    })

    # Update best
    if rmse_valid + mae_valid < best_score:
        best_score = rmse_valid + mae_valid
        best_params = {"rank": rank, "regParam": regParam, "maxIter": maxIter}
        best_rmse = rmse_valid
        best_mae = mae_valid

print("Grid search finished.")
print(f"Best validation RMSE: {best_rmse:.4f}, MAE: {best_mae:.4f} with params: {best_params}")

# Save results as CSV
df_results = pd.DataFrame(results)
df_results.to_csv("results/als_grid_results.csv", index=False)
print("✅ Saved results to results/als_grid_results.csv")


Starting grid search on validation set...
Testing rank=10, regParam=0.01, maxIter=5... RMSE_valid=1.4950, MAE_valid=0.9857, RMSE_train=0.1684
Testing rank=10, regParam=0.01, maxIter=10... RMSE_valid=1.4519, MAE_valid=0.9677, RMSE_train=0.1251
Testing rank=10, regParam=0.01, maxIter=15... RMSE_valid=1.4389, MAE_valid=0.9616, RMSE_train=0.1073
Testing rank=10, regParam=0.1, maxIter=5... RMSE_valid=1.3461, MAE_valid=0.8895, RMSE_train=0.3303
Testing rank=10, regParam=0.1, maxIter=10... RMSE_valid=1.3413, MAE_valid=0.8858, RMSE_train=0.3067
Testing rank=10, regParam=0.1, maxIter=15... RMSE_valid=1.3335, MAE_valid=0.8787, RMSE_train=0.2975
Testing rank=10, regParam=1.0, maxIter=5... RMSE_valid=1.2829, MAE_valid=0.8518, RMSE_train=0.9710
Testing rank=10, regParam=1.0, maxIter=10... RMSE_valid=1.2829, MAE_valid=0.8518, RMSE_train=0.9713
Testing rank=10, regParam=1.0, maxIter=15... RMSE_valid=1.2829, MAE_valid=0.8518, RMSE_train=0.9713
Grid search finished.
Best validation RMSE: 1.2829, MAE: 0

In [6]:
print("Retraining final model on train using best params...")

final_als = ALS(
    rank=best_params["rank"],
    regParam=best_params["regParam"],
    maxIter=best_params["maxIter"],
    userCol="userIndex",
    itemCol="itemIndex",
    ratingCol="rating_norm",
    seed=42
)

final_model = final_als.fit(train_df_norm)
pred_train = get_restore_rating(final_model, train_df_norm)
pred_valid = get_restore_rating(final_model, valid_df_norm)
pred_test = get_restore_rating(final_model, test_df_norm)
pred_test.show()

Retraining final model on train using best params...
+---------+-----------+---------+--------------------+------+--------------------+------------------+-------------+-----------------+
|itemIndex|parent_asin|userIndex|             user_id|rating|         rating_norm|         item_mean|   prediction|      pred_rating|
+---------+-----------+---------+--------------------+------+--------------------+------------------+-------------+-----------------+
|    77422| B09PFQ3NJW|      970|AE2OBGULEEQ6SQPV6...|   5.0|  0.4991011434207744| 4.500898856579226|          NaN|4.500898856579226|
|    85321| B0BL98LTHX|     1591|AE33OVGN5CPQU63AF...|   5.0|  0.4991011434207744| 4.500898856579226|          NaN|4.500898856579226|
|    87656| B0BWKDNFBB|     1903|AE3BPACGVE4BOJN3A...|   5.0|                0.75|              4.25|          NaN|             4.25|
|    89476| B0C6KS2P7Z|     4900|AE5AEJHZMHQYKXBE6...|   5.0|  0.4991011434207744| 4.500898856579226|          NaN|4.500898856579226|
|    8033

In [7]:
os.makedirs("embeddings", exist_ok=True)

# User embeddings
final_model.userFactors.write.mode("overwrite").parquet("embeddings/user_factors.parquet")

# Item embeddings
final_model.itemFactors.write.mode("overwrite").parquet("embeddings/item_factors.parquet")

print("✅ Saved user/item embeddings to 'embeddings/' folder")


✅ Saved user/item embeddings to 'embeddings/' folder


In [8]:

# Kiểm tra số NULL trong từng cột
pred_test.select([sum(isnan(col(c)).cast("int")).alias(c) for c in pred_test.columns]).show()


+---------+-----------+---------+-------+------+-----------+---------+----------+-----------+
|itemIndex|parent_asin|userIndex|user_id|rating|rating_norm|item_mean|prediction|pred_rating|
+---------+-----------+---------+-------+------+-----------+---------+----------+-----------+
|        0|          0|        0|      0|     0|          0|        0|    136838|          0|
+---------+-----------+---------+-------+------+-----------+---------+----------+-----------+



In [9]:
# Tính RMSE/MAE cho train
train_rmse = rmse_evaluator.evaluate(pred_train)
train_mae  = mae_evaluator.evaluate(pred_train)

# Tính RMSE/MAE cho valid
valid_rmse = rmse_evaluator.evaluate(pred_valid)
valid_mae  = mae_evaluator.evaluate(pred_valid)

# Tính RMSE/MAE cho test
test_rmse = rmse_evaluator.evaluate(pred_test)
test_mae  = mae_evaluator.evaluate(pred_test)

# In kết quả
print(f"Train RMSE: {train_rmse:.4f}, MAE: {train_mae:.4f}")
print(f"Valid RMSE: {valid_rmse:.4f}, MAE: {valid_mae:.4f}")
print(f"Test  RMSE: {test_rmse:.4f}, MAE: {test_mae:.4f}")


Train RMSE: 0.9713, MAE: 0.6615
Valid RMSE: 1.2499, MAE: 0.8396
Test  RMSE: 1.1078, MAE: 0.7635


In [10]:

items_per_user = (
    pred_test.groupBy("userIndex")
    .agg(F.countDistinct("itemIndex").alias("num_items"))
    .orderBy(F.asc("userIndex"))
)

items_per_user.show(10)  # show top 10 users with most test items


+---------+---------+
|userIndex|num_items|
+---------+---------+
|        2|       10|
|        4|        1|
|        5|        7|
|        6|        1|
|        7|        3|
|        8|        7|
|       11|        1|
|       17|        1|
|       19|        7|
|       24|        2|
+---------+---------+
only showing top 10 rows



In [11]:

# Define window partitioned by user, ordered by rating (ground truth)
window_true = Window.partitionBy("userIndex").orderBy(F.col("rating").desc())

# Define window partitioned by user, ordered by prediction (model output)
window_pred = Window.partitionBy("userIndex").orderBy(F.col("pred_rating").desc())

# Add both ranking columns
ranked_df = (
    pred_test
    .withColumn("true_rank", F.row_number().over(window_true))
    .withColumn("pred_rank", F.row_number().over(window_pred))
)

ranked_df.show(50)


+---------+-----------+---------+--------------------+------+-------------------+------------------+-------------+------------------+---------+---------+
|itemIndex|parent_asin|userIndex|             user_id|rating|        rating_norm|         item_mean|   prediction|       pred_rating|true_rank|pred_rank|
+---------+-----------+---------+--------------------+------+-------------------+------------------+-------------+------------------+---------+---------+
|    66562| B08MF1FFWK|       28|AE22PPDYXIJXG66VR...|   5.0|                0.0|               5.0|          0.0|               5.0|        1|        1|
|    83352| B0BBZQTYZL|       28|AE22PPDYXIJXG66VR...|   1.0|-3.4000000000000004|               4.4|-3.2575613E-8| 4.399999967424387|        2|        2|
|    83463| B0BCHB3683|       65|AE23CIZ4OTQEFKSRO...|   5.0| 0.3061224489795915|4.6938775510204085|-3.8997115E-8| 4.693877512023294|        1|        1|
|    76378| B09LCLJGML|       78|AE23K3IUSXN4BIL4A...|   4.0|-0.500898856579

In [12]:
ranked_df.write.mode("overwrite").parquet("results/ranked_predictions.parquet")


NDCG

In [13]:


k = 10  # top-K

# --- Compute DCG using predicted rank ---
dcg_df = (ranked_df
    .withColumn(
        "dcg_component", 
        F.col("rating") / F.log2(F.col("pred_rank") + 1)
    )
    .where(F.col("pred_rank") <= k)  # only top-K items
    .groupBy("userIndex")
    .agg(F.sum("dcg_component").alias("dcg"))
)
    
dcg_df.show(5)



+---------+------------------+
|userIndex|               dcg|
+---------+------------------+
|        2| 22.71779669044173|
|        4|               3.0|
|        5|15.380727460522657|
|        6|               1.0|
|        7|10.654648767857287|
+---------+------------------+
only showing top 5 rows



In [14]:
# --- Compute IDCG using ideal (true) rank ---
idcg_df = (
    ranked_df
    .withColumn(
        "idcg_component",
        F.col("rating") / F.log2(F.col("true_rank") + 1)
    )
    .where(F.col("true_rank") <= k)  # only top-K ideal items
    .groupBy("userIndex")
    .agg(F.sum("idcg_component").alias("idcg"))
)
idcg_df.show(5)


+---------+------------------+
|userIndex|              idcg|
+---------+------------------+
|        2| 22.71779669044173|
|        4|               3.0|
|        5| 16.14425048905436|
|        6|               1.0|
|        7|10.654648767857287|
+---------+------------------+
only showing top 5 rows



In [15]:

# --- Combine and compute NDCG per user ---
ndcg_df = (
    dcg_df.join(idcg_df, on="userIndex", how="inner")
    .withColumn("ndcg", F.col("dcg") / F.col("idcg"))
)
ndcg_df.show(5)


+---------+------------------+------------------+-----------------+
|userIndex|               dcg|              idcg|             ndcg|
+---------+------------------+------------------+-----------------+
|        2| 22.71779669044173| 22.71779669044173|              1.0|
|        4|               3.0|               3.0|              1.0|
|        5|15.380727460522657| 16.14425048905436|0.952706195369716|
|        6|               1.0|               1.0|              1.0|
|        7|10.654648767857287|10.654648767857287|              1.0|
+---------+------------------+------------------+-----------------+
only showing top 5 rows



In [16]:

# --- Compute average NDCG across all users ---
avg_ndcg = ndcg_df.agg(F.mean("ndcg").alias("NDCG@10")).collect()[0]["NDCG@10"]

print(f"NDCG@10: {avg_ndcg:.4f}")

NDCG@10: 0.9821


Precision@K

In [17]:
rating_threshold = 4.0  # Define what counts as "relevant"

# 1️. Mark relevant items (based on ground truth)
ranked_df = ranked_df.withColumn(
    "is_relevant",
    F.when(F.col("rating") >= rating_threshold, 1).otherwise(0)
)
ranked_df.show(5)


+---------+-----------+---------+--------------------+------+-------------------+------------------+-------------+-----------------+---------+---------+-----------+
|itemIndex|parent_asin|userIndex|             user_id|rating|        rating_norm|         item_mean|   prediction|      pred_rating|true_rank|pred_rank|is_relevant|
+---------+-----------+---------+--------------------+------+-------------------+------------------+-------------+-----------------+---------+---------+-----------+
|    66562| B08MF1FFWK|       28|AE22PPDYXIJXG66VR...|   5.0|                0.0|               5.0|          0.0|              5.0|        1|        1|          1|
|    83352| B0BBZQTYZL|       28|AE22PPDYXIJXG66VR...|   1.0|-3.4000000000000004|               4.4|-3.2575613E-8|4.399999967424387|        2|        2|          0|
|    83463| B0BCHB3683|       65|AE23CIZ4OTQEFKSRO...|   5.0| 0.3061224489795915|4.6938775510204085|-3.8997115E-8|4.693877512023294|        1|        1|          1|
|    76378

In [18]:


# 2️. Keep only the top-K predicted items per user
K = 10
window_pred = Window.partitionBy("userIndex").orderBy(F.col("pred_rank"))
topK_df = ranked_df.filter(F.col("pred_rank") <= K)
topK_df.show(20)


+---------+-----------+---------+--------------------+------+-------------------+------------------+-------------+------------------+---------+---------+-----------+
|itemIndex|parent_asin|userIndex|             user_id|rating|        rating_norm|         item_mean|   prediction|       pred_rating|true_rank|pred_rank|is_relevant|
+---------+-----------+---------+--------------------+------+-------------------+------------------+-------------+------------------+---------+---------+-----------+
|    66562| B08MF1FFWK|       28|AE22PPDYXIJXG66VR...|   5.0|                0.0|               5.0|          0.0|               5.0|        1|        1|          1|
|    83352| B0BBZQTYZL|       28|AE22PPDYXIJXG66VR...|   1.0|-3.4000000000000004|               4.4|-3.2575613E-8| 4.399999967424387|        2|        2|          0|
|    83463| B0BCHB3683|       65|AE23CIZ4OTQEFKSRO...|   5.0| 0.3061224489795915|4.6938775510204085|-3.8997115E-8| 4.693877512023294|        1|        1|          1|
|   

In [19]:
# 3. Compute precision@K per user
precision_per_user = topK_df.groupBy("userIndex") \
    .agg(
        (F.sum("is_relevant") / F.count("itemIndex")).alias("precision_at_k")
    ).orderBy(F.asc("userIndex"))
precision_per_user.show(5)


+---------+------------------+
|userIndex|    precision_at_k|
+---------+------------------+
|        2|               1.0|
|        4|               0.0|
|        5|0.7142857142857143|
|        6|               0.0|
|        7|               1.0|
+---------+------------------+
only showing top 5 rows



In [20]:

# 4. Compute overall average precision@K across all users
overall_precision = precision_per_user.agg(F.mean("precision_at_k").alias("mean_precision_at_k"))

overall_precision.show()

+-------------------+
|mean_precision_at_k|
+-------------------+
| 0.7952298044563032|
+-------------------+

