In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook (getOrCreate reuses one if it already exists).
spark = (
    SparkSession.builder
    .appName("recommendation_ALS")
    .getOrCreate()
)

In [None]:
# Load the raw ratings file: tab-separated, first line holds the column names.
ratings = spark.read.csv(
    path='/kaggle/input/ratings/Products_ThoiTrangNam_rating_raw.csv',
    sep='\t',
    header=True,
)

In [None]:
# Preview the first five rows of the raw ratings.
ratings.show(5)

+----------+-------+------------------+------+
|product_id|user_id|              user|rating|
+----------+-------+------------------+------+
|       190|      1|      karmakyun2nd|     5|
|       190|      2|  tranquangvinh_vv|     5|
|       190|      3|nguyenquoctoan2005|     5|
|       190|      4|    nguyenthuyhavi|     5|
|       190|      5|      luonganh5595|     5|
+----------+-------+------------------+------+
only showing top 5 rows



In [None]:
# Inspect the column types Spark assigned when reading the CSV (no schema was supplied to the reader).
ratings.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user: string (nullable = true)
 |-- rating: string (nullable = true)



In [None]:
from pyspark.sql.types import IntegerType

# Cast `rating` to integer FIRST, then drop incomplete rows.
# A rating string that cannot be parsed becomes null after the cast; running
# dropna() before the cast (as was done previously) would let those nulls
# slip through into the training data.
ratings = (
    ratings
    .withColumn("rating", ratings["rating"].cast(IntegerType()))
    .dropna()
)
ratings.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user: string (nullable = true)
 |-- rating: integer (nullable = true)



In [None]:
# Remove rows that are exact duplicates across every column, then report how many rows remain.
df = ratings.distinct()
df.count()

999815

## Thu nhỏ bộ data

In [None]:
from pyspark.sql import functions as F

In [None]:
# Filtering thresholds: minimum number of ratings a user / a product must have to be kept
min_user_ratings = 3
min_item_ratings = 5

# Number of ratings per user, and the users that reach the threshold
user_rating_counts = df.groupBy("user_id").count()
filtered_users = user_rating_counts.filter(F.col("count") >= min_user_ratings)

# Number of ratings per product, and the products that reach the threshold
item_rating_counts = df.groupBy("product_id").count()
filtered_items = item_rating_counts.filter(F.col("count") >= min_item_ratings)

# Inner-join the original data against the surviving keys only, so the helper
# "count" columns never enter the result and nothing has to be dropped afterwards.
kept_user_ids = filtered_users.select("user_id")
kept_product_ids = filtered_items.select("product_id")
filtered_df = (
    df.join(kept_user_ids, on="user_id", how="inner")
      .join(kept_product_ids, on="product_id", how="inner")
)

# Report (rows, columns) after filtering
n_rows = filtered_df.count()
n_cols = len(filtered_df.columns)
print("Kích thước sau lọc:", (n_rows, n_cols))

Kích thước sau lọc: (298151, 4)


In [None]:
# Re-check how the ratings are distributed after filtering
rating_distribution = filtered_df.groupBy("rating").count()
rating_distribution.show()

+------+------+
|rating| count|
+------+------+
|     1| 12734|
|     3| 16311|
|     5|230371|
|     4| 31217|
|     2|  7518|
+------+------+



In [None]:
# Split the data into 5-star ratings and all other ratings
rating_5 = filtered_df.filter(F.col("rating")==5)
other_ratings = filtered_df.filter(~F.col("rating").isin([5]))

# Downsample the 5-star ratings to roughly this many rows
# (sample() is probabilistic, so the resulting count is approximate).
target_rating_5_rows = 100000
rating_5_total = rating_5.count()

# sample() without replacement requires a fraction in [0, 1]. When there are
# no more 5-star rows than the target, keep them all; this also avoids a
# division by zero on an empty subset.
if rating_5_total > target_rating_5_rows:
    rating_5_fraction = target_rating_5_rows / rating_5_total
else:
    rating_5_fraction = 1.0
rating_5_downsampled = rating_5.sample(fraction=rating_5_fraction, seed=42)

# Combine the downsampled 5-star rows with the untouched remainder
balanced_df = rating_5_downsampled.union(other_ratings)

In [None]:
# Confirm the rating distribution after downsampling, ordered by rating value
balanced_counts = balanced_df.groupBy('rating').count()
balanced_counts.orderBy('rating').show()

+------+-----+
|rating|count|
+------+-----+
|     1|12734|
|     2| 7518|
|     3|16311|
|     4|31217|
|     5|99840|
+------+-----+



In [None]:
# Inputs for the sparsity calculation:
# number of observed ratings, number of distinct products, number of distinct users.
numerator = balanced_df.count()
products = balanced_df.select("product_id").distinct().count()
users = balanced_df.select("user_id").distinct().count()

In [None]:
# Show, in order: number of ratings, number of distinct users, number of distinct products.
display(numerator, users, products)

167620

27965

20380

In [None]:
# Number of ratings the user x product matrix would hold if it had no empty cells
denominator = products * users
denominator

569926700

In [None]:
# Sparsity = share of user/product cells that carry no rating
fill_ratio = float(numerator) / denominator
sparsity = 1 - fill_ratio
print("Sparsity: ", sparsity)

Sparsity:  0.999705892003305


In [None]:
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.ml import Pipeline
from pyspark.sql.functions import *

In [None]:
# Index the string user/product ids into numeric columns; the ALS stage below
# reads those indexed columns as its user and item inputs.
indexer = StringIndexer(
    handleInvalid="keep",
    inputCols=["user_id", "product_id"],
    outputCols=["user_idx", "product_idx"],
)

# ALS on explicit ratings (implicitPrefs=False) with a fixed seed for repeatable fits
als = ALS(
    # columns
    userCol="user_idx",
    itemCol="product_idx",
    ratingCol="rating",
    # hyperparameters
    maxIter=20,
    regParam=0.2,
    nonnegative=True,
    implicitPrefs=False,
    # rows whose prediction cannot be computed are dropped from transform() output
    coldStartStrategy="drop",
    seed=42,
)

In [None]:
# Chain indexing and ALS so they are fitted together on the balanced data
pipeline_stages = [indexer, als]
pipeline = Pipeline(stages=pipeline_stages)
model = pipeline.fit(balanced_df)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the same DataFrame the pipeline was fitted on.
# NOTE(review): because `balanced_df` is also the training data, any metric
# computed from `predictions` is a training error, not a held-out estimate.
# Consider a randomSplit() into train/test before fitting.
predictions = model.transform(balanced_df)

In [None]:
# Peek at a few predicted ratings next to the actual ratings.
predictions.select("prediction", "rating").show(5)

+----------+------+
|prediction|rating|
+----------+------+
|  4.472162|     5|
|  4.651831|     5|
| 3.3850188|     5|
|  4.412107|     5|
| 4.6885996|     5|
+----------+------+
only showing top 5 rows



In [None]:
# Root-mean-squared error between the `rating` and `prediction` columns
evaluator_rmse = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)

# Mean absolute error between the same two columns
evaluator_mae = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="mae",
)

In [None]:
# Compute both error metrics over the scored DataFrame
mae = evaluator_mae.evaluate(predictions)
rmse = evaluator_rmse.evaluate(predictions)

In [None]:
# Report the two error metrics, one per line
print(
    f"rmse of model: {rmse}",
    f"mae of model: {mae}",
    sep="\n",
)

rmse of model: 0.7150024122101717
mae of model: 0.5242901531804534


In [None]:
# The pipeline was built as [indexer, als], so the last fitted stage is the ALS model itself.
als_model = model.stages[-1]

In [None]:
# Save the whole fitted pipeline (indexer + ALS).
# Go through write().overwrite() so that re-running this cell replaces the
# previous export instead of failing because the path already exists.
model.write().overwrite().save('/kaggle/working/ALS_model')

In [None]:
# Save the fitted ALS stage on its own; overwrite so the cell can be re-run
# without failing on an already existing output path.
als_model.write().overwrite().save('/kaggle/working/ALS_model_m')

### Đưa ra đề xuất cho user

In [None]:
# Generate the top-N recommended products for every user known to the ALS model
top_n = 10
user_recs = als_model.recommendForAllUsers(top_n)
user_recs.printSchema()

root
 |-- user_idx: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- product_idx: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [None]:
# Print the raw recommendation rows of the first three users
sample_rows = user_recs.head(3)
for rec_row in sample_rows:
    print(rec_row)

Row(user_idx=1, recommendations=[Row(product_idx=14860, rating=5.414793014526367), Row(product_idx=19262, rating=5.308726787567139), Row(product_idx=19181, rating=5.210608005523682), Row(product_idx=18369, rating=5.150238513946533), Row(product_idx=20032, rating=5.097650527954102), Row(product_idx=7756, rating=5.066824913024902), Row(product_idx=6333, rating=5.060684680938721), Row(product_idx=18797, rating=5.047205448150635), Row(product_idx=8470, rating=5.037919521331787), Row(product_idx=16507, rating=5.0319085121154785)])
Row(user_idx=12, recommendations=[Row(product_idx=19181, rating=5.477630138397217), Row(product_idx=14860, rating=5.4034600257873535), Row(product_idx=20087, rating=5.2751688957214355), Row(product_idx=18969, rating=5.174401760101318), Row(product_idx=8305, rating=5.157158374786377), Row(product_idx=16730, rating=5.153836727142334), Row(product_idx=4746, rating=5.15326452255249), Row(product_idx=20114, rating=5.152772903442383), Row(product_idx=8536, rating=5.1422

In [None]:
# Flatten: one output row per (user, recommended item) instead of one array per user.
# Use the explicit F.explode rather than relying on the wildcard
# `from pyspark.sql.functions import *` to provide a bare `explode` name.
result = user_recs.select(user_recs.user_idx, F.explode(user_recs.recommendations))

In [None]:
# Promote the fields of the exploded struct (column "col") to top-level columns.
# The struct carries the ALS index `product_idx`, not the original product id,
# so the new column is named `product_idx` to avoid mislabelling it as an id.
# Bracket access is used for the struct column so the lookup cannot be confused
# with a DataFrame attribute.
rec_struct = result["col"]
result = result.withColumn("product_idx", rec_struct.getField("product_idx"))\
               .withColumn("rating", rec_struct.getField("rating"))
result.show()

+--------+------------------+----------+---------+
|user_idx|               col|product_id|   rating|
+--------+------------------+----------+---------+
|       1| {14860, 5.414793}|     14860| 5.414793|
|       1| {19262, 5.308727}|     19262| 5.308727|
|       1| {19181, 5.210608}|     19181| 5.210608|
|       1|{18369, 5.1502385}|     18369|5.1502385|
|       1|{20032, 5.0976505}|     20032|5.0976505|
|       1|  {7756, 5.066825}|      7756| 5.066825|
|       1| {6333, 5.0606847}|      6333|5.0606847|
|       1|{18797, 5.0472054}|     18797|5.0472054|
|       1| {8470, 5.0379195}|      8470|5.0379195|
|       1|{16507, 5.0319085}|     16507|5.0319085|
|      12|  {19181, 5.47763}|     19181|  5.47763|
|      12|  {14860, 5.40346}|     14860|  5.40346|
|      12| {20087, 5.275169}|     20087| 5.275169|
|      12|{18969, 5.1744018}|     18969|5.1744018|
|      12| {8305, 5.1571584}|      8305|5.1571584|
|      12|{16730, 5.1538367}|     16730|5.1538367|
|      12| {4746, 5.1532645}|  

Make recommendations

In [None]:
# Lookup table from the indexed user column back to the original user_id
user_lookup_cols = ['user_idx', 'user_id']
df_user = predictions.select(*user_lookup_cols).distinct()
df_user.count()

27965

In [None]:
# Preview the user_idx -> user_id lookup table.
df_user.show(5)

+--------+-------+
|user_idx|user_id|
+--------+-------+
|  9920.0|   9536|
|   278.0|  14684|
| 20484.0| 142463|
|   768.0|  28015|
|  4022.0| 108188|
+--------+-------+
only showing top 5 rows



In [None]:
# Load the product catalogue (CSV with a header row).
# NOTE(review): `products` previously held the integer count of distinct products
# used in the sparsity calculation; rebinding it here means that earlier cell
# cannot be re-run correctly after this one. Consider a distinct name.
# NOTE(review): free-text columns such as `description` may contain quoted or
# multi-line values; confirm the default CSV options parse them correctly.
products = spark.read.csv('/kaggle/input/product/products_not_duplicates.csv', header=True)
products.show(5)

+----------+--------------------+--------------+------------+--------------------+--------------------+-------+------+--------------------+
|product_id|        product_name|      category|sub_category|                link|               image|  price|rating|         description|
+----------+--------------------+--------------+------------+--------------------+--------------------+-------+------+--------------------+
|       190|Áo ba lỗ thun gân...|Thời Trang Nam|    Áo Ba Lỗ|https://shopee.vn...|https://cf.shopee...|86250.0|   4.9|Danh Mục Shopee T...|
|       191|Áo Ba Lỗ Nam Trắn...|Thời Trang Nam|    Áo Ba Lỗ|https://shopee.vn...|https://cf.shopee...|26800.0|   4.9|Danh Mục Shopee T...|
|       192|Áo Ba Lỗ Nam Tyas...|Thời Trang Nam|    Áo Ba Lỗ|https://shopee.vn...|https://cf.shopee...|39500.0|   4.8|"Danh Mục Shopee ...|
|       193|ÁO BA LỖ HÀNG VIỆ...|Thời Trang Nam|    Áo Ba Lỗ|https://shopee.vn...|https://cf.shopee...|16500.0|   4.8|Danh Mục Shopee T...|
|       194|Áo Thun 

In [None]:
# Inspect the columns available on the scored DataFrame (original columns plus index and prediction columns).
predictions.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- user_idx: double (nullable = false)
 |-- product_idx: double (nullable = false)
 |-- prediction: float (nullable = false)



In [None]:
# Lookup table from the indexed product column back to the original product_id
product_lookup_cols = ['product_idx', 'product_id']
df_product = predictions.select(*product_lookup_cols).distinct()
df_product.count()

20380

In [None]:
# Attach the product name to every (product_idx, product_id) pair; the left join
# keeps products that have no catalogue entry (their name is then null).
product_names = products.select('product_id', 'product_name')
df_products = df_product.join(product_names, on='product_id', how='left')
df_products.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_idx: double (nullable = false)
 |-- product_name: string (nullable = true)



In [None]:
# Row count after the left join; a value larger than df_product's count would
# indicate duplicate product_id rows in the catalogue.
df_products.count()

20380

In [None]:
# Attach the original user_id to each user's recommendation list
new_user_recs = user_recs.join(
    df_user,
    on='user_idx',
    how='left',
)

In [None]:
# Preview the recommendation lists together with the original user_id.
new_user_recs.show(10)

+--------+--------------------+-------+
|user_idx|     recommendations|user_id|
+--------+--------------------+-------+
|      47|[{19181, 5.812966...|   3269|
|      44|[{19181, 5.416633...|   1787|
|       1|[{14860, 5.414793...|    159|
|      34|[{19181, 5.708971...|     49|
|      31|[{19181, 5.881964...|    549|
|      22|[{20162, 5.447828...|   1293|
|      28|[{18616, 5.781680...|    486|
|      27|[{18412, 5.407358...|    354|
|      13|[{16730, 5.462687...|     57|
|      26|[{17577, 5.491009...|    105|
+--------+--------------------+-------+
only showing top 10 rows



In [None]:
# Row count of the joined table (one row per user is expected if user_idx is unique in df_user — verify).
new_user_recs.count()

27965

In [None]:
# Recommendations for user_id = '831', resolved to product names
userID = '831'
find_user_rec = new_user_recs.filter(new_user_recs['user_id'] == userID)
user = find_user_rec.first()
if user is None:
    # first() returns None when no row matches; fail with a clear message
    # instead of an opaque TypeError on the subscript below.
    raise ValueError(f"No recommendations found for user_id={userID!r}")

# Resolve every recommended index in a single Spark job, rather than running
# one filter()/first() job per recommended product.
rec_idxs = [row['product_idx'] for row in user['recommendations']]
name_by_idx = {
    int(match['product_idx']): match['product_name']
    for match in df_products.filter(df_products.product_idx.isin(rec_idxs)).collect()
}

# Each entry: (product_idx, product_name, predicted rating).
# .get() yields None for an index missing from the lookup instead of crashing.
lst = [
    (row['product_idx'], name_by_idx.get(row['product_idx']), row['rating'])
    for row in user['recommendations']
]
dic_user_rec = {'user_id' : user.user_id, 'recommendations' :lst}

In [None]:
# Display the recommendations for the first example user: (product_idx, product_name, predicted rating).
dic_user_rec

{'user_id': '831',
 'recommendations': [(17577,
   'Quần jean nam đen zipper LAZY BOUTIQUE Quần jean đen nam cao cấp',
   5.443449974060059),
  (20162,
   'Trang phục hóa trang nhân vật hoạt hình anime Nhật Bản tokyo revengers độc đáo',
   5.401844024658203),
  (17235, 'Kính Mát Thời Trang Hàn Quốc Cho Nam', 5.386279106140137),
  (18412, 'Tất Vớ Trơn Cổ Thấp 3 Màu', 5.228381633758545),
  (12459,
   'Tất Nam HỘP 10 ĐÔI TẤT KHỬ MÙI HÔI CHÂN ĐỦ MÀU - XUẤT NHẬT (tất siêu chống thối )',
   5.224913120269775),
  (17811,
   'Quần NGỐ đùi thể thao nam thời trang phong cách trẻ trung năng động T20',
   5.140273094177246),
  (10623,
   'Tất cotton phụ kiện đá bóng nam loại ngắn sợi dệt, combo vớ thời trang thể thao chất lượng – 2EV',
   5.117858409881592),
  (4748,
   'Quần sịp đùi nam boxer logo thêu con cá, quần lót nam vải cotton mềm mịn đàn hồi tốt LAC01A',
   5.112788200378418),
  (14860,
   'Bộ Đi Chùa Nam Lãnh Tụ Chất Kate Loại 1 Mịn Đẹp Ko Nhăn Ko Xù,Thoáng Mát Thoải Mái. Bộ Đồ Lam Đi Ch

In [None]:
# Recommendations for user_id = '486', resolved to original product ids
userID2 = '486'
find_user_rec = new_user_recs.filter(new_user_recs['user_id'] == userID2)
user = find_user_rec.first()
if user is None:
    # first() returns None when no row matches; fail with a clear message
    # instead of an opaque TypeError on the subscript below.
    raise ValueError(f"No recommendations found for user_id={userID2!r}")

# Resolve every recommended index in a single Spark job, rather than running
# one filter()/first() job per recommended product.
rec_idxs = [row['product_idx'] for row in user['recommendations']]
product_id_by_idx = {
    int(match['product_idx']): match['product_id']
    for match in df_products.filter(df_products.product_idx.isin(rec_idxs)).collect()
}

# Each entry: (product_idx, product_id, predicted rating).
# .get() yields None for an index missing from the lookup instead of crashing.
lst = [
    (row['product_idx'], product_id_by_idx.get(row['product_idx']), row['rating'])
    for row in user['recommendations']
]
dic_user_rec2 = {'user_id' : user.user_id, 'recommendations' :lst}

In [None]:
# Display the recommendations for the second example user: (product_idx, product_id, predicted rating).
dic_user_rec2

{'user_id': '486',
 'recommendations': [(18616, '173620', 5.781680583953857),
  (19181, '211026', 5.699519634246826),
  (20162, '24253', 5.347224235534668),
  (16578, '24980', 5.3200883865356445),
  (16730, '26119', 5.317107200622559),
  (9159, '1686', 5.281040668487549),
  (18969, '1989', 5.234376430511475),
  (10664, '171958', 5.164101600646973),
  (9445, '20764', 5.156614303588867),
  (8670, '25468', 5.148423671722412)]}