In [8]:
from pyspark.sql import SparkSession

from pyspark.sql import SQLContext

from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.recommendation import ALS

from pyspark.sql.functions import udf, col, when

import numpy as np

from IPython.display import Image

from IPython.display import display

In [9]:
directory = "ml-latest-small"


In [10]:
!ls


data					  movies.csv
DemoCK.ipynb				  ratings.csv
DemoGenres.ipynb			  README.txt
Demo.ipynb				  spark
elasticsearch-7.12.0			  spark-2.4.5-bin-hadoop2.7.tgz
elasticsearch-7.12.0-linux-x86_64.tar.gz  tags.csv
elasticsearch-spark-30_2.12-7.12.0.jar	  Untitled1.ipynb
elasticsearch-spark-recommender		  Untitled2.ipynb
links.csv				  Untitled3.ipynb
ml-latest-small				  Untitled.ipynb


In [11]:
!ls ml-latest-small


links.csv  movies.csv  ratings.csv  README.txt	tags.csv


In [12]:
# Tạo một phiên Spark (SparkSession) với tên ứng dụng là "Mrecommend_demo"
spark = SparkSession.builder.appName("Mrecommend_demo").getOrCreate()


In [13]:
# Tạo một đối tượng SparkContext (sc) để quản lý các tác vụ trên Spark.
sc = spark.sparkContext

# Tạo một đối tượng SQLContext (sqlContext) để thực hiện các truy vấn SQL trên Spark DataFrame.
sqlContext = SQLContext(sc)

In [14]:
ratings_df = spark.read.csv(directory+'/ratings.csv', inferSchema=True, header=True) 
ratings_df.printSchema()

[Stage 1:>                                                          (0 + 1) / 1]

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



                                                                                

In [15]:
ratings_df.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [16]:
# Đọc dữ liệu từ tệp tin CSV chứa thông tin về các bộ phim vào DataFrame trong môi trường Apache Spark.
# Tham số 'directory+'/movies.csv'' là đường dẫn đến tệp tin CSV cần đọc.
# Tham số inferSchema=True được sử dụng để tự động xác định kiểu dữ liệu của các cột.
# Tham số header=True cho biết rằng dòng đầu tiên của tệp tin chứa tên của các cột.
movies_df = spark.read.csv(directory+'/movies.csv', inferSchema=True, header=True) 

# In ra cấu trúc của DataFrame để kiểm tra kiểu dữ liệu của các cột.
movies_df.printSchema()


root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [17]:
movies_df.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [18]:
# Đọc dữ liệu từ tệp CSV 'links.csv' trong thư mục được chỉ định
# DataFrame 'links_df' sẽ chứa dữ liệu từ tệp CSV
links_df = spark.read.csv(directory+'/links.csv', inferSchema=True, header=True)

# In ra cấu trúc của DataFrame, hiển thị tên cột và kiểu dữ liệu tương ứng
links_df.printSchema()


root
 |-- movieId: integer (nullable = true)
 |-- imdbId: integer (nullable = true)
 |-- tmdbId: integer (nullable = true)



In [19]:
links_df.show()

+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      1|114709|   862|
|      2|113497|  8844|
|      3|113228| 15602|
|      4|114885| 31357|
|      5|113041| 11862|
|      6|113277|   949|
|      7|114319| 11860|
|      8|112302| 45325|
|      9|114576|  9091|
|     10|113189|   710|
|     11|112346|  9087|
|     12|112896| 12110|
|     13|112453| 21032|
|     14|113987| 10858|
|     15|112760|  1408|
|     16|112641|   524|
|     17|114388|  4584|
|     18|113101|     5|
|     19|112281|  9273|
|     20|113845| 11517|
+-------+------+------+
only showing top 20 rows



In [20]:
# Chia ngẫu nhiên dữ liệu trong DataFrame 'ratings_df' thành hai phần: training và validation
# Tỷ lệ chia là 80% cho training và 20% cho validation
training_df, validation_df = ratings_df.randomSplit([.8, .2])


In [21]:
# Số vòng lặp (iterations) cho thuật toán huấn luyện mô hình
iterations = 10

# Tham số regularization (chống quá mức) được sử dụng để kiểm soát độ phức tạp của mô hình và tránh overfitting
regularization_parameter = 0.1

# Rank là một tham số quan trọng liên quan đến kích thước ẩn của mô hình, thường được sử dụng trong các mô hình độ phức tạp cao như mô hình hệ thống gợi ý (recommendation systems)
rank = 4

# Một danh sách (errors) được sử dụng để lưu trữ các giá trị lỗi trong quá trình huấn luyện mô hình
errors = []

# Biến err được sử dụng để lưu trữ giá trị lỗi tạm thời trong mỗi vòng lặp
err = 0


In [22]:
# Xây dựng mô hình ALS (Alternating Least Squares) với các tham số đã được đặt trước
als = ALS(maxIter=iterations, regParam=regularization_parameter, rank=4, userCol="userId", itemCol="movieId", ratingCol="rating")

# Huấn luyện mô hình trên tập dữ liệu huấn luyện (training_df)
model = als.fit(training_df)

# Tạo dự đoán trên tập dữ liệu validation_df sử dụng mô hình đã huấn luyện
predictions = model.transform(validation_df)

# Loại bỏ các dự đoán có giá trị NaN (không phù hợp)
new_predictions = predictions.filter(col('prediction') != np.nan)

# Sử dụng đánh giá viên (evaluator) để đánh giá chất lượng dự đoán bằng cách tính Root Mean Square Error (RMSE)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(new_predictions)

# In ra giá trị RMSE, một độ đo đánh giá chất lượng của mô hình hệ thống gợi ý
print("Root-mean-square error = " + str(rmse))


[Stage 117:>                                                        (0 + 1) / 1]

Root-mean-square error = 0.8799175985875036


                                                                                

In [23]:
predictions.limit(10).toPandas()

                                                                                

Unnamed: 0,userId,movieId,rating,timestamp,prediction
0,1,596,5.0,964982838,3.909336
1,1,223,3.0,964980985,4.4962
2,1,157,5.0,964984100,3.539331
3,1,47,5.0,964983815,4.607905
4,1,235,4.0,964980908,3.986697
5,1,1029,5.0,964982855,4.071447
6,1,231,5.0,964981179,4.00458
7,1,552,4.0,964982653,3.807122
8,1,50,5.0,964982931,4.961934
9,1,1023,5.0,964982681,3.965741


In [24]:
# Kết hợp DataFrame 'predictions' với DataFrame 'movies_df' dựa trên cột "movieId"
# Chọn các cột quan trọng bao gồm "userId", "title", "genres", và "prediction"
# Hiển thị 5 dòng đầu tiên của kết quả dưới dạng Pandas DataFrame để dễ quan sát
# Đối tượng 'movies_df' chứa thông tin về tiêu đề và thể loại của các sản phẩm (movies)
# Đối tượng 'predictions' chứa các dự đoán xếp hạng cho từng người dùng và sản phẩm
predictions.join(movies_df, "movieId").select("userId", "title", "genres", "prediction").limit(5).toPandas()

                                                                                

Unnamed: 0,userId,title,genres,prediction
0,580,Galaxy Quest (1999),Adventure|Comedy|Sci-Fi,3.644958
1,597,"Hudsucker Proxy, The (1994)",Comedy,4.451787
2,155,Galaxy Quest (1999),Adventure|Comedy|Sci-Fi,3.50157
3,34,Men in Black (a.k.a. MIB) (1997),Action|Comedy|Sci-Fi,3.348808
4,115,The Devil's Advocate (1997),Drama|Mystery|Thriller,2.793694


In [25]:
# Lọc các dự đoán trong DataFrame 'predictions' chỉ cho người dùng có id là 599
# Kết hợp với DataFrame 'movies_df' để lấy thông tin về tiêu đề và thể loại của các sản phẩm
# Kết hợp với DataFrame 'links_df' để lấy thông tin về tmdbId của các sản phẩm
# Chọn các cột quan trọng bao gồm "userId", "title", "genres", "tmdbId", và "prediction"
# Hiển thị 5 dòng đầu tiên của kết quả
# Đối tượng 'movies_df' chứa thông tin về tiêu đề và thể loại của các sản phẩm (movies)
# Đối tượng 'links_df' chứa thông tin về tmdbId của các sản phẩm
# Đối tượng 'predictions' chứa các dự đoán xếp hạng cho từng người dùng và sản phẩm
for_one_user = predictions.filter(col("userId") == 599).join(movies_df, "movieId").join(links_df, "movieId").select("userId", "title", "genres", "tmdbId", "prediction")
for_one_user.show(5)


                                                                                

+------+--------------------+--------------------+------+----------+
|userId|               title|              genres|tmdbId|prediction|
+------+--------------------+--------------------+------+----------+
|   599|High School High ...|              Comedy|  9308| 1.5976142|
|   599|Dirty Dancing (1987)|Drama|Musical|Rom...|    88| 2.9098053|
|   599|     Candyman (1992)|     Horror|Thriller|  9529| 1.8324723|
|   599|        Spawn (1997)|Action|Adventure|...| 10336| 2.2674296|
|   599|Out of Africa (1985)|       Drama|Romance|   606| 2.9199157|
+------+--------------------+--------------------+------+----------+
only showing top 5 rows



In [26]:
# Sử dụng thư viện webbrowser để mở trình duyệt và hiển thị thông tin chi tiết về các sản phẩm trên trang web The Movie Database (TMDb)
# Với mỗi sản phẩm trong hai sản phẩm đầu tiên của DataFrame 'for_one_user'
# Xây dựng URL cho sản phẩm bằng cách thêm tmdbId vào đường link cơ bản "https://www.themoviedb.org/movie/"
# In ra tiêu đề của sản phẩm
# Mở trình duyệt và mở URL tương ứng với sản phẩm
import webbrowser

link = "https://www.themoviedb.org/movie/"

for movie in for_one_user.take(2):
    movieURL = link + str(movie.tmdbId)
    print(movie.title)
    webbrowser.open(movieURL)


High School High (1996)
Dirty Dancing (1987)


In [27]:
# Tạo ra 5 gợi ý xếp hạng cho tất cả người dùng dựa trên mô hình đã huấn luyện
userRecomments = model.recommendForAllUsers(5)

# Tạo ra 5 gợi ý xếp hạng cho tất cả sản phẩm dựa trên mô hình đã huấn luyện
movieRecomments = model.recommendForAllItems(5)


In [28]:
userRecomments.printSchema()

root
 |-- userId: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- movieId: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [29]:
# Chọn các cột "userId" và "recommendations.movieId" từ DataFrame 'userRecomments'
# Hiển thị 10 dòng đầu tiên của kết quả, trong đó "recommendations.movieId" là danh sách các sản phẩm được gợi ý cho từng người dùng
userRecomments.select("userId", "recommendations.movieId").show(10, False)




+------+--------------------------------------+
|userId|movieId                               |
+------+--------------------------------------+
|1     |[104875, 170355, 68945, 3379, 60943]  |
|2     |[5075, 68073, 5607, 3436, 183897]     |
|3     |[3837, 26865, 33834, 86377, 2772]     |
|4     |[158783, 89118, 48322, 5666, 130634]  |
|5     |[8477, 40491, 3266, 6818, 213]        |
|6     |[68073, 42730, 93008, 77846, 25906]   |
|7     |[170355, 68945, 3379, 26171, 104875]  |
|8     |[170355, 68945, 3379, 104875, 33649]  |
|9     |[141718, 25947, 104875, 26326, 132333]|
|10    |[7842, 3086, 120635, 998, 4476]       |
+------+--------------------------------------+
only showing top 10 rows



                                                                                

In [30]:
# Chọn các cột "movieId" và "recommendations.userId" từ DataFrame 'movieRecomments'
# Hiển thị 10 dòng đầu tiên của kết quả, trong đó "recommendations.userId" là danh sách các người dùng được gợi ý cho từng sản phẩm
movieRecomments.select("movieId", "recommendations.userId").show(10, False)




+-------+------------------------+
|movieId|userId                  |
+-------+------------------------+
|1      |[53, 452, 276, 12, 43]  |
|12     |[243, 12, 584, 406, 154]|
|13     |[267, 43, 413, 327, 544]|
|22     |[53, 12, 171, 154, 548] |
|26     |[53, 171, 12, 548, 224] |
|27     |[53, 12, 43, 243, 584]  |
|28     |[53, 171, 250, 224, 122]|
|31     |[43, 53, 594, 224, 12]  |
|34     |[53, 224, 250, 171, 43] |
|44     |[43, 267, 53, 413, 276] |
+-------+------------------------+
only showing top 10 rows



                                                                                

In [31]:
# Lựa chọn cột "userId" từ DataFrame 'ratings_df' và chọn các giá trị độc nhất
# Giới hạn kết quả thành 3 dòng và hiển thị danh sách người dùng độc nhất
users = ratings_df.select("userId").distinct().limit(3)
users.show()


[Stage 453:>                                                        (0 + 1) / 1]

+------+
|userId|
+------+
|   148|
|   463|
|   471|
+------+



                                                                                

In [32]:
# Tạo ra 10 gợi ý xếp hạng sản phẩm cho tập hợp con người dùng đã được chỉ định (users) dựa trên mô hình đã huấn luyện
userSubsetRecs = model.recommendForUserSubset(users, 10)


In [33]:
userSubsetRecs.show()



+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[{8477, 5.197691}...|
|   463|[{170355, 5.28543...|
|   148|[{5075, 4.7106347...|
+------+--------------------+



                                                                                

In [34]:
# Chọn các cột "userId" và "recommendations.movieId" từ DataFrame 'userSubsetRecs'
# Hiển thị 10 dòng đầu tiên của kết quả, trong đó "recommendations.movieId" là danh sách các sản phẩm được gợi ý cho từng người dùng
userSubsetRecs.select("userId", "recommendations.movieId").show(10, False)




+------+-----------------------------------------------------------------------+
|userId|movieId                                                                |
+------+-----------------------------------------------------------------------+
|471   |[8477, 170355, 68945, 3379, 3266, 40491, 6201, 4495, 7815, 132333]     |
|463   |[170355, 68945, 3379, 33649, 7841, 60943, 59018, 104875, 102217, 92494]|
|148   |[5075, 7841, 33649, 60943, 59018, 84847, 136469, 2899, 93988, 7842]    |
+------+-----------------------------------------------------------------------+



                                                                                

In [35]:
# Lựa chọn cột "movieId" từ DataFrame 'ratings_df' và chọn các giá trị độc nhất
# Giới hạn kết quả thành 3 dòng và hiển thị danh sách sản phẩm độc nhất
movies = ratings_df.select("movieId").distinct().limit(3)
movies.show()


+-------+
|movieId|
+-------+
|   1580|
|   2366|
|   3175|
+-------+



                                                                                

In [36]:
# Tạo ra 10 gợi ý xếp hạng người dùng cho tập hợp con sản phẩm đã được chỉ định (movies) dựa trên mô hình đã huấn luyện
movieSubSetRecs = model.recommendForItemSubset(movies, 10)
# Chọn các cột "movieId" và "recommendations.userId" từ DataFrame 'movieSubSetRecs'
# Hiển thị 10 dòng đầu tiên của kết quả, trong đó "recommendations.userId" là danh sách các người dùng được gợi ý cho từng sản phẩm
movieSubSetRecs.select("movieId", "recommendations.userId").show(10, False)




+-------+------------------------------------------------+
|movieId|userId                                          |
+-------+------------------------------------------------+
|1580   |[53, 276, 544, 12, 452, 543, 93, 337, 413, 523] |
|3175   |[53, 452, 276, 12, 371, 93, 99, 122, 544, 171]  |
|2366   |[236, 59, 53, 251, 273, 302, 465, 296, 258, 122]|
+-------+------------------------------------------------+



                                                                                

In [37]:
# Tạo danh sách movie_ids và user_ids
movie_ids = [1580, 3175, 2366, 1590]
user_ids = [543, 543, 543, 543]

# Tạo DataFrame mới 'new_user_preds' từ danh sách movie_ids và user_ids
new_user_preds = sqlContext.createDataFrame(zip(movie_ids, user_ids), schema=["movieId", "userId"])


In [38]:
# Sử dụng mô hình để tạo ra các dự đoán xếp hạng cho DataFrame 'new_user_preds'
new_predictions = model.transform(new_user_preds)

# Hiển thị kết quả
new_predictions.show()


                                                                                

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|   1580|   543| 4.4607997|
|   3175|   543|  4.308376|
|   2366|   543|  2.549186|
|   1590|   543|   2.96852|
+-------+------+----------+

