## **Nhóm thực hiện:**

- **Trần Gia Bảo - 17133002**
- **Hoàng Thị Cẩm Tú - 17133052**
- **Uông Thị Thanh Thủy - 17133064**
- **Nguyễn Phước Sang - 17133055**

# **Sử dụng Spark để xây dựng hệ thống đề xuất phim**

Hệ thống đề xuất là công cụ giúp dự đoán những gì mà người dùng có thể thích hoặc không thích trong danh sách những mục đã cho. Đây này là cách để thay thế cho việc tìm kiếm nội dung, nó sẽ giúp người dùng khám phá sản phẩm hoặc nội dung mà họ có thể chưa bắt gặp. Ví dụ: Facebook gợi ý bạn bè, các page cho người dùng, Youtube gợi ý video mà người dùng có thể quan tâm, Tiki, Lazada gợi ý các sản phẩm, mặt hàng mà người dùng có thể cần... Công cụ này sẽ thu hút người dùng đến các dịch vụ từ đó có thể tối ưu hóa doanh thu cho các nhà cung cấp và duy trì sự quan tâm đến dịch vụ.
Trong project này, chúng em sẽ tập trung vào việc sử dụng Spark để xây dựng một hệ thống đề xuất đơn giản và sử dụng một thuật toán để dự đoán những mục mà người dùng có thể thích được gọi là ALS (Alternating least squares).

## **1. Data**
Tập dữ liệu là tập dữ liệu MovieLens vào năm 2018. Có bốn tệp csv trong tập dữ liệu, bao gồm movies.csv, ratings.csv, tags.csv, links.csv. Trong project này, chúng em chỉ sử dụng movies.csv và ratings.csv vì chúng cung cấp thông tin cần thiết trong việc xây dựng hệ thống đề xuất, chẳng hạn như id phim, id người dùng, xếp hạng, v.v. Vì tất cả các cột trong tập dữ liệu đều ở kiểu dữ liệu chuỗi, cần chuyển một số trong số chúng sang kiểu dữ liệu số trong bước tiền xử lý.

Để lựa chọn hoặc thiết kế một thuật toán phù hợp cho bài toán, trước tiên chúng ta tìm hiểu các đặc điểm của dữ liệu. Trước tiên, ta cần import một số thư viện cần thiết. Đồng thời, do Notebook này được thực thi trên Google Colab nên ta cần cài đặt một số các gói và thiết lập biến môi trường.

- Cài đặt các thư viện cần thiết:

In [2]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
%matplotlib inline

- Tải tệp dữ liệu về và unzip:

In [None]:
! wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
! unzip ml-latest-small.zip
!ls

In [None]:
!ls ml-latest-small/

- Cài đặt java 8, apache spark, findspark:

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
!tar xf spark-3.0.1-bin-hadoop3.2.tgz
!pip install -q findspark

- Cài đặt biến môi trường:

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
import os
os.environ["PYSPARK_PYTHON"] = "python3"

### **1.1.  ETL và Exploration về tập dữ liệu:**

In [10]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("moive analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

- Load dữ liệu:

In [11]:
root = "ml-latest-small/"
movies_df = spark.read.load(root+"movies.csv", format='csv', header = True)
ratings_df = spark.read.load(root+ "ratings.csv", format='csv', header = True)
links_df = spark.read.load(root+"links.csv", format='csv', header = True)
tags_df = spark.read.load(root+"tags.csv", format='csv', header = True)

In [12]:
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [13]:
ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [1]:
links_df.show(5)

NameError: name 'links_df' is not defined

In [15]:
tags_df.show(5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



In [16]:
tmp1 = ratings_df.groupBy("userID").count().toPandas()['count'].min()
tmp2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

For the users that rated movies and the movies that were rated:
Minimum number of ratings per user is 20
Minimum number of ratings per movie is 1


In [17]:
tmp1 = sum(ratings_df.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

3446 out of 9724 movies are rated by only one user


### **1.2. Một số thống kê để hiểu rõ về dữ liệu**

In [18]:
movies_df.registerTempTable("movies")
ratings_df.registerTempTable("ratings")
links_df.registerTempTable("links")
tags_df.registerTempTable("tags")

- Đếm số lượng user:

In [19]:
user_count = spark.sql("select count(distinct userId) as user_count from ratings")
user_count.show()

+----------+
|user_count|
+----------+
|       610|
+----------+



- Đếm số lượng movie:

In [20]:
movie_count = spark.sql("select count(distinct movieId) as movie_count from movies")
movie_count.show()

+-----------+
|movie_count|
+-----------+
|       9742|
+-----------+



- Số lượng movie được rating:

In [21]:
from pyspark.sql.functions import col
# %sql 
# Show number of movies rated
movie_rated = spark.sql("select count(distinct movieId) from movies where movieId in (select distinct movieId from ratings)")
movie_rated.show()

+-----------------------+
|count(DISTINCT movieId)|
+-----------------------+
|                   9724|
+-----------------------+



- Danh sách các movie không được rated:

In [22]:
movie_not_rated = spark.sql("select distinct movieId, title from movies where movieId not in (select distinct movieId from ratings)")
movie_not_rated.show()

+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|   3338|For All Mankind (...|
|  34482|Browning Version,...|
|   2939|      Niagara (1953)|
|   3456|Color of Paradise...|
|  30892|In the Realms of ...|
|   7792|Parallax View, Th...|
|  32160|Twentieth Century...|
|  26085|Mutiny on the Bou...|
|   1076|Innocents, The (1...|
|   4194|I Know Where I'm ...|
|  32371|Call Northside 77...|
|  25855|Roaring Twenties,...|
|   5721|  Chosen, The (1981)|
|   8765|This Gun for Hire...|
|   6668|Road Home, The (W...|
|   6849|      Scrooge (1970)|
|   7020|        Proof (1991)|
|  85565|  Chalet Girl (2011)|
+-------+--------------------+



- Kiểm tra xem có dữ liệu null không:

In [23]:
# Với bảng rating
rating_missing_value = spark.sql("select * from ratings where rating = NULL or timestamp = NULL or userId=NULL or movieId=NULL")
rating_missing_value.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
+------+-------+------+---------+



In [24]:
# Với bảng movie
movie_missing_value= movies_df.where(col('title').isNull() | col('genres').isNull())
movie_missing_value.show()

+-------+-----+------+
|movieId|title|genres|
+-------+-----+------+
+-------+-----+------+



- Danh sách các thể loại của phim:

In [25]:
# directly list movie genres
movie_genres_df = spark.sql("select distinct genres as genres_count from movies")
movie_genres_df.show()

+--------------------+
|        genres_count|
+--------------------+
|Comedy|Horror|Thr...|
|Adventure|Sci-Fi|...|
|Action|Adventure|...|
| Action|Drama|Horror|
|Action|Animation|...|
|Animation|Childre...|
|Action|Adventure|...|
|    Adventure|Sci-Fi|
|Documentary|Music...|
|Adventure|Childre...|
| Adventure|Animation|
| Musical|Romance|War|
|Action|Adventure|...|
|Adventure|Childre...|
|Comedy|Crime|Horr...|
|Crime|Drama|Fanta...|
|Comedy|Mystery|Th...|
|   Adventure|Fantasy|
|Horror|Romance|Sc...|
|Drama|Film-Noir|R...|
+--------------------+
only showing top 20 rows



In [26]:
# splite movie genres and then count
from pyspark.sql.functions import col, explode, split
movie_genres = movies_df.withColumn("splited_genres", explode(split(col("genres"),"[|]")))
splited_genres_df = movie_genres.select(["splited_genres"]).distinct().orderBy("splited_genres",ascending=True)
splited_genres_df.show()

+------------------+
|    splited_genres|
+------------------+
|(no genres listed)|
|            Action|
|         Adventure|
|         Animation|
|          Children|
|            Comedy|
|             Crime|
|       Documentary|
|             Drama|
|           Fantasy|
|         Film-Noir|
|            Horror|
|              IMAX|
|           Musical|
|           Mystery|
|           Romance|
|            Sci-Fi|
|          Thriller|
|               War|
|           Western|
+------------------+



- Số lượng phim của mỗi thể loại

In [27]:
# Using SQL + Pyspark method
category_movie_count = spark.sql(" with movie_category as (select distinct (explode(split(genres, '[|]'))) as category, * from movies) \
                  select category, count(category) as category_count from movie_category group by category order by category_count desc")
category_movie_count.show()
print("Number of categories: ",len(category_movie_count.collect()))

+------------------+--------------+
|          category|category_count|
+------------------+--------------+
|             Drama|          4361|
|            Comedy|          3756|
|          Thriller|          1894|
|            Action|          1828|
|           Romance|          1596|
|         Adventure|          1263|
|             Crime|          1199|
|            Sci-Fi|           980|
|            Horror|           978|
|           Fantasy|           779|
|          Children|           664|
|         Animation|           611|
|           Mystery|           573|
|       Documentary|           440|
|               War|           382|
|           Musical|           334|
|           Western|           167|
|              IMAX|           158|
|         Film-Noir|            87|
|(no genres listed)|            34|
+------------------+--------------+

Number of categories:  20


## **2. Xây dựng mô hình đưa ra đề xuất**

### **2.1. Giới thiệu về hệ thống đề xuất**

Trong một ứng dụng hệ thống đề xuất, có hai loại thực thể là người dùng (users) và mặt hàng (items). Người dùng có những ưu tiên cho một số mặt hàng, và những ưu tiên này phải có từ dữ liệu. Bản thân dữ liệu được biểu biểu dưới dạng một ma trận ưu tiên (preference matrix) $A$, với mỗi cặp người dùng-mặt hàng, có một giá trị thể hiện mức độ ưu tiên của người dùng với mặt hàng đó. Ví dụ ma trận ưu tiên 5 người dùng và k mặt hàng:

Giá trị của hàng i, cột j biểu thị sự yêu thích của người dùng i đối với mặt hàng j, lấy ví dụ giá trị 1 (dislike), giá trị 5 (love). Những ô trống tức là không có thông tin về sở thích của người dùng với mặt hàng đó. Mục tiêu của hệ thống đề xuất là dự đoán những ô trống này. Có 2 cách tiếp cận:

- Xem xét sự giống nhau dựa trên các thuộc tính của mặt hàng: nhãn hiệu, giá cả, danh mục. Ví tụ IT2 khá giống IT3, U5 không thích IT2 nên có thể họ cũng không thích IT3.
- Quan sát xu hướng đánh giá của các người dùng cho các sản phẩm. Ví dụ những người dùng không thích IT2 có xu hướng không thích IT3. Do đó, có thể dự đoán U5 cũng không thích IT3.

### **2.2 Collaborative-Filtering và ví dụ của Matrix Factorization**

Collaborative-Filtering là một kỹ thuật được sử dụng trong hệ thống đề xuất. Nó tập trung vào mối quan hệ giữa người dùng và mặt hàng, trong bài toán này, các nghệ sĩ giữ vai trò là mặt hàng. Độ tương tự của các mặt hàng được xác định bởi sự giống nhau về xếp hạng của các mặt hàng đó bởi những người dùng đã xếp hạng cả hai mặt hàng.

Ta sẽ nghiên cứu một thành viên trong lớp các thuật toán được gọi là mô hình latent-factor.

Ban đầu ta có $n$ người dùng và $m$ mặt hàng (trong bài này là nghệ sĩ) và playCount tương ứng (xếp hạng ngầm - implicit rating). Ta sẽ xây dựng được ma trận $R$ có $n$ hàng $m$ cột. Mỗi ô $(u,i)$ trong ma trận có giá trị là xếp hạng (rating) của người dùng $u$ cho mặt hàng $i$.

Ma trận $R$ có nhiều mục bị thiếu. Chúng ta sẽ sử dụng mô hình matrix factorization để điền những mục này.

Ví dụ: cho ma trận 5x5 như bên dưới. Ta muốn xấp xỉ ma trận thành hai ma trận nhỏ hơn $X$ và $Y$.

$$ M = \begin{bmatrix}
5 &amp;2 &amp;4  &amp;4  &amp;3 \\ 
3 &amp;1  &amp;2  &amp;4  &amp;1 \\ 
2 &amp;  &amp;3  &amp;1  &amp;4 \\ 
2 &amp;5  &amp;4  &amp;3  &amp;5 \\ 
4 &amp;4  &amp;5  &amp;4  &amp; 
\end{bmatrix}
\approx M' = \begin{bmatrix}
x_{11} &amp;x_{12} \\ 
x_{21} &amp;x_{22} \\ 
x_{31} &amp;x_{32} \\ 
x_{41} &amp;x_{42} \\ 
x_{51} &amp;x_{52} 
\end{bmatrix}
\times
\begin{bmatrix}
y_{11} &amp;y_{12}  &amp;y_{13}  &amp;y_{14}  &amp;y_{15} \\ 
y_{21} &amp;y_{22}  &amp;y_{23}  &amp;y_{24}  &amp;y_{25} 
\end{bmatrix} $$

$M'$ là một xấp xỉ càng gần $M$ càng tốt. Sai số $M$ và $M'$ được tính bằng tổng bình phương sai số của các phần tử không trống trong $M$ và các phần tử tương ứng trong $M'$. Trong $M'$ không có phần tử trống, do đó để xem mức độ người dùng i ưa thích mặt hàng j ta chỉ cần lấy ra giá trị $M{'_{i,j}}$.

Vấn đề là làm sao tính được $X$ và $Y$. Ta không thể tính cùng lúc $X$ và $Y$ đều tốt nhất nhưng nếu biết $Y$ ta có thể tính $X$ tốt nhất và ngược lại. Do đó, ta khởi tạo những giá trị ban đầu cho $X$ và $Y$, ta tính $X$ tốt nhất theo $Y$ và sau đó tính $Y$ tốt nhất theo $X$ mới. Lặp lại quá trình này cho đến khi sai số $X \times Y$ tới M là hội tụ.

Với ví dụ trên, ta khởi tạo giá trị của $X$ và $Y$ như sau:

$$ M' = X \times Y = \begin{bmatrix}
1 &amp;1 \\ 
1 &amp;1 \\ 
1 &amp;1 \\ 
1 &amp;1 \\ 
1 &amp;1 
\end{bmatrix}
\times \begin{bmatrix}
1 &amp;1 &amp;1 &amp;1 &amp;1 \\
1 &amp;1 &amp;1 &amp;1 &amp;1
\end{bmatrix}
= \begin{bmatrix}
2 &amp;2 &amp;2 &amp;2 &amp;2 \\
2 &amp;2 &amp;2 &amp;2 &amp;2 \\
2 &amp;2 &amp;2 &amp;2 &amp;2 \\
2 &amp;2 &amp;2 &amp;2 &amp;2 \\
2 &amp;2 &amp;2 &amp;2 &amp;2
\end{bmatrix} $$

Trong lần lặp đầu tiên, tính trung bình bình phương sai số (RMSE - Root Mean Square Error) giữa $XY$ và $M$ (lưu ý: ô nào trống bỏ qua):

$$RMSE = \sqrt {\frac{{{{\left( {5 - 2} \right)}^2} + {{\left( {2 - 2} \right)}^2} + ... + {{\left( {5 - 2} \right)}^2} + {{\left( {4 - 2} \right)}^2}}}{{23}}}  = \sqrt {\frac{{75}}{{23}}}  = 1.806$$

23 là số ô không trống trong $M$.

Tiếp theo, với $Y$ đã cho, ta tính $X$ bằng cách tìm giá trị tốt nhất cho ${X_{11}}$.

$$ M' = X \times Y = \begin{bmatrix}
x &amp;1 \\ 
1 &amp;1 \\ 
1 &amp;1 \\ 
1 &amp;1 \\ 
1 &amp;1 
\end{bmatrix}
\times \begin{bmatrix}
1 &amp;1 &amp;1 &amp;1 &amp;1 \\
1 &amp;1 &amp;1 &amp;1 &amp;1
\end{bmatrix}
= \begin{bmatrix}
x+1 &amp;x+1 &amp;x+1 &amp;x+1 &amp;x+1 \\
2 &amp;2 &amp;2 &amp;2 &amp;2 \\
2 &amp;2 &amp;2 &amp;2 &amp;2 \\
2 &amp;2 &amp;2 &amp;2 &amp;2 \\
2 &amp;2 &amp;2 &amp;2 &amp;2
\end{bmatrix} $$

Bây giờ để giảm thiểu $RMSE$, ta sẽ giảm thiểu sai số ở hàng đầu tiên, bằng cách đạo hàm ta sẽ tìm được $x = 2.6$

$$ (5 - (x + 1))^{2} + (2 - (x + 1))^{2} + (4 - (x + 1))^{2} + (4 - (x + 1))^{2} + (3 - (x + 1))^{2} $$

Với giá trị mới của $X$, ta có thể tính giá trị tốt nhất cho $Y$

$$  M' = X \times Y = \begin{bmatrix}
2.6 &amp;1 \\ 
1 &amp;1 \\ 
1 &amp;1 \\ 
1 &amp;1 \\ 
1 &amp;1 
\end{bmatrix}
\times \begin{bmatrix}
y &amp;1 &amp;1 &amp;1 &amp;1 \\
1 &amp;1 &amp;1 &amp;1 &amp;1
\end{bmatrix}
= \begin{bmatrix}
3.6 &amp;3.6 &amp;3.6 &amp;3.6 &amp;3.6 \\
2 &amp;2 &amp;2 &amp;2 &amp;2 \\
2 &amp;2 &amp;2 &amp;2 &amp;2 \\
2 &amp;2 &amp;2 &amp;2 &amp;2 \\
2 &amp;2 &amp;2 &amp;2 &amp;2
\end{bmatrix} $$

Thực hiện tương tự, ta sẽ tìm được $y=1.617$. Sau đó, ta có thể kiểm tra nếu $RMSE$ không hội tụ, ta tiếp tục cập nhật $X$ và $Y$ và ngược lại. Trong ví dụ này, ta chỉ cập nhật một phần tử của mỗi ma trận trong mỗi lần lặp. Thực tế chúng ta có thể cập nhật một hàng đầy đủ hoặc một ma trận đầy đủ trong một lần.

### **2.3 Matrix Factorization và thuật toán ALS trên một máy**

Nếu chúng ta chọn $k$ latent features và mô tả mỗi người dùng $u$ với một vector $k$-chiều $x_{u}$, và mỗi mặt hàng $i$ với một vector $k$-chiều $y_{i}$.

Sau đó, để dự đoán xếp hạng của người dùng $u$ cho mặt hàng $i$, ta tính: ${r_{ui}} \approx x_u^T{y_i}$

Có $n$ người dùng, sẽ có $n$ vector ${x_1},...,{x_n}$. Có $m$ mặt hàng, sẽ có $m$ vector ${y_1},...,{y_m}$. Ta định nghĩa ma trận $X$ và $Y$ như sau:

Mục tiêu là phải ước tính ma trận $R \approx {X^T}Y$ hay bài toán đưa về tìm $X$ và $Y$ tốt nhất để tối ưu hàm mục tiêu dưới đây:

$$\mathop {\min }\limits_{X,Y} \sum\limits_{{r_{ui}}observed} {{{\left( {{r_{ui}} - x_u^T{y_i}} \right)}^2} + \lambda \left( {\sum\limits_u {{{\left\| {{x_u}} \right\|}^2} + \sum\limits_i {{{\left\| {{y_i}} \right\|}^2}} } } \right)}$$

Cách làm là ta sửa $Y$ và tối ưu $X$, sau đó sửa $X$ và tối ưu $Y$, lặp lại cho đến khi hội tụ. Đây được gọi là thuật toán ALS (Alternating Least Squares). Dưới đây là giả code của thuật toán:

Khởi tạo $X$, $Y$

while (không hội tụ) do

for $u = 1...n$ do

${x_u} = {\left( {\sum\nolimits_{{r_u}i \in {r_{u*}}} {{y_i}y_i^T + \lambda {I_k}} } \right)^{ - 1}}\sum\nolimits_{{r_u}i \in {r_{u*}}} {{r_{ui}}{y_i}} $

end for

for $u = 1...n$ do

${y_i} = {\left( {\sum\nolimits_{{r_u}i \in {r_{*i}}} {{x_u}x_u^T + \lambda {I_k}} } \right)^{ - 1}}\sum\nolimits_{{r_u}i \in {r_{*i}}} {{r_{ui}}{x_u}} $

end for

end while

### **2.4 Thuật toán ALS song song trên Spark**

Ta có thể tính toán thuật toán ALS song song trên Spark bằng cách sau:

Dữ liệu đầu vào xếp hạng (ratings) và tham số ($X$ và $Y$) được lưu trữ trong Spark RDD. Cụ thể, các xếp hạng được lưu trữ dưới dạng RDD của một bộ ba

Ratings: RDD$\left( {\left( {u,i,{r_{ui}}} \right),...} \right)$

Ma trận $X$ và $Y$ được lưu trữ dưới dạng RDD của vectors:

$X$: RDD$\left( {{x_1},...,{x_n}} \right)$

$Y$: RDD$\left( {{y_1},...,{y_m}} \right)$

Biểu thức tính $x_u$:

${x_u} = {\left( {\sum\nolimits_{{r_u}i \in {r_{u*}}} {{y_i}y_i^T + \lambda {I_k}} } \right)^{ - 1}}\sum\nolimits_{{r_u}i \in {r_{u*}}} {{r_{ui}}{y_i}}$

Gọi phần tổng đầu tiên là part A, phần tổng thứ hai là part B. Để tính toán các part như vậy song song, ta có thể thực hiện với giả code sau:

- join RDD Ratings với RDD ma trận $Y$ bằng key $i$ (items)
- map để tính toán ${y_i}y_i^T$ và phát ra bằng key $u$ (user)
- reduceByKey $u$ (user) để tính toán $\sum\nolimits_{{r_u}i \in {r_{u*}}} {{y_i}y_i^T}$
- Đảo ngược
- reduceByKey $u$ (user) để tính toán $\sum\nolimits_{{r_u}i \in {r_{u*}}} {{r_{ui}}{y_i}}$

Tương tự, ta cũng tính toán được $y_i$

## **3. Phân tích**

Tập dữ liệu thực sự chưa hoàn chỉnh, mặc dù không có giá trị Null, vì có 34 mục được gắn nhãn là (no genres listed). 

Tuy nhiên, vì chúng ta sẽ sử dụng phương pháp **Non-negative Matrix Factorization** cho hệ thống đề xuất phim, hệ thống này chỉ quan tâm đến userId và movieId, mà không xem xét các danh mục, **không cần loại bỏ các phim được gắn nhãn là (no genres listed)**

In [28]:
#Using PySpark method
from pyspark.sql.functions import col, explode, split
movie_genres = movies_df.withColumn("splited_genres", explode(split(col("genres"),"[|]")))
splited_genres_df = movie_genres.groupBy(["splited_genres"]).count().orderBy("count",ascending=False)
splited_genres_df = splited_genres_df.withColumnRenamed('count','category_count')
splited_genres_df.show()

+------------------+--------------+
|    splited_genres|category_count|
+------------------+--------------+
|             Drama|          4361|
|            Comedy|          3756|
|          Thriller|          1894|
|            Action|          1828|
|           Romance|          1596|
|         Adventure|          1263|
|             Crime|          1199|
|            Sci-Fi|           980|
|            Horror|           978|
|           Fantasy|           779|
|          Children|           664|
|         Animation|           611|
|           Mystery|           573|
|       Documentary|           440|
|               War|           382|
|           Musical|           334|
|           Western|           167|
|              IMAX|           158|
|         Film-Noir|            87|
|(no genres listed)|            34|
+------------------+--------------+



In [29]:
#show các movie với thể loại là (no genres listed)
movie_genres.where(col("splited_genres").isin(["(no genres listed)"])).join(movies_df,"movieId","left").show()

+-------+--------------------+------------------+------------------+--------------------+------------------+
|movieId|               title|            genres|    splited_genres|               title|            genres|
+-------+--------------------+------------------+------------------+--------------------+------------------+
| 114335|   La cravate (1957)|(no genres listed)|(no genres listed)|   La cravate (1957)|(no genres listed)|
| 122888|      Ben-hur (2016)|(no genres listed)|(no genres listed)|      Ben-hur (2016)|(no genres listed)|
| 122896|Pirates of the Ca...|(no genres listed)|(no genres listed)|Pirates of the Ca...|(no genres listed)|
| 129250|   Superfast! (2015)|(no genres listed)|(no genres listed)|   Superfast! (2015)|(no genres listed)|
| 132084| Let It Be Me (1995)|(no genres listed)|(no genres listed)| Let It Be Me (1995)|(no genres listed)|
| 134861|Trevor Noah: Afri...|(no genres listed)|(no genres listed)|Trevor Noah: Afri...|(no genres listed)|
| 141131|    Guardi

## **4.Tiếp cận dựa trên SPARK ALS cho training model**

Chúng ta sẽ sử dụng Spark ML để dự đoán xếp hạng, vì vậy hãy load lại "rating.csv" bằng cách sử dụng sc.textFile và sau đó chuyển đổi nó thành tuples (user, item, rating)

In [30]:
ratings_df.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



Trong bước tiền xử lý dữ liệu, chuyển đổi cột movieId, userId, rating từ kiểu string sang kiểu Int và kiểu float để có thể tạo thành các features để cung cấp cho mô hình học máy. Trong quá trình phân tích dữ liệu, chúng em thấy rằng có một số thể loại và tên phim bị thiếu. Tuy nhiên, vì chỉ quan tâm đến userId và movieId và điểm rating trong quá trình đề xuất, chúng em chọn giữ những bộ phim thiếu thể loại và tiêu đề ở đây.

In [31]:
#drop cột timestamp vì không cần thiết
movie_ratings=ratings_df.drop('timestamp')

In [33]:
movie_ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
|     1|    163|   5.0|
|     1|    216|   5.0|
|     1|    223|   3.0|
|     1|    231|   5.0|
|     1|    235|   4.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 20 rows



## **5. Lựa chọn và đánh giá mô hình ALS**

Với mô hình ALS ta có thể dùng grid search để tìm siêu tham số tối ưu nhất

### **5.1. Training mô hình:**

Đầu tiên phải import các package cần thiết

In [34]:
# import package
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

Ngoài ra, để đào tạo mô hình và mô hình thử nghiệm, chúng em tách toàn bộ tập dữ liệu thành training data 80% và test data 20% một cách ngẫu nhiên.

In [35]:
#chia dữ liệu thành hai tập train , test
(training,test)=movie_ratings.randomSplit([0.8,0.2])

Xây dựng model theo ALS đã được cung cấp, và xác định các giá trị của tham số của model.

In [36]:
als_model = ALS(rank=10, maxIter=10,coldStartStrategy='drop' )
als_model.setUserCol("userId")
als_model.setItemCol("movieId")
als_model.setRatingCol("rating")
als_model.setPredictionCol("Prediction")
als_model.getUserCol(),als_model.getItemCol(),als_model.getRatingCol(),als_model.getPredictionCol(),

('userId', 'movieId', 'rating', 'Prediction')

Trong đó: 
    
- rank là số feature, 
- maxIter là số lần lặp lại(mặc định là 10)
- coldStartStrategy  dùng để drop bất kỳ hàng nào có chứa giá trị Null

Sử dụng grid search để điều chỉnh mô hình, sử dụng ParamGridBuilder được cung cấp

In [37]:
#Tune model using ParamGridBuilder
grid = ParamGridBuilder()\
  .baseOn({als_model.predictionCol:"Prediction"})\
  .addGrid(als_model.regParam,[0.1,0.5,0.8])\
  .addGrid(als_model.rank,[5,10,15])\
  .build()

Trong đó: 
    
- regParam: tham số regularization

Xác định thuật toán để đánh giá là trung bình bình phương sai số (RMSE - Root Mean Square Error):

In [38]:
# Define evaluator as RMSE
evaluator= RegressionEvaluator(predictionCol="Prediction",labelCol="rating",metricName="rmse")

Xây dựng Cross Validation:

In [39]:
# Build Cross validation 
cv = CrossValidator(estimator=als_model, 
            estimatorParamMaps=grid,
            evaluator=evaluator,  
            numFolds=5,seed=2020,parallelism=2)

Áp dụng ALS model cho training data và lấy ra model với tham số tốt nhất:

In [40]:
#Fit ALS model to training data
cvModel = cv.fit(training)

In [None]:
#Fit ALS model to training data
cvModel = cv.fit(training)
#Extract best model from the tuning exercise using ParamGridBuilder
best_model = cvModel.bestModel
best_model

Ta thu được kết quả model tốt nhất:  ALSModel: uid=ALS_fe40882ea590, rank=5

Sử dụng model tốt nhất vừa tìm được để áp vào tập test data và tính RMSE cho kết quả mới tính ra được:

In [42]:
#Generate predictions and evaluate using RMSE
predictions=best_model.transform(test)
rmse = evaluator.evaluate(predictions)

In các chỉ số đánh giá và thông số mô hình:

In [43]:
#Print evaluation metrics and model parameters
print ("RMSE = "+str(rmse))
print ("**Best Model**")
print (" Rank:",best_model.rank), 
print (" MaxIter:", best_model._java_obj.parent().getMaxIter()), 
print (" RegParam:",best_model._java_obj.parent().getRegParam()),

RMSE = 0.8777575483805319
**Best Model**
 Rank: 5
 MaxIter: 10
 RegParam: 0.1


(None,)

In [44]:
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|Prediction|
+------+-------+------+----------+
|   597|    471|   2.0|  4.250283|
|   385|    471|   4.0|  3.174941|
|   436|    471|   3.0| 3.7113974|
|   599|    471|   2.5| 2.6988902|
|   500|    471|   1.0| 2.3297105|
|    57|    471|   3.0| 3.8028398|
|   555|    471|   3.0| 3.9314785|
|   176|    471|   5.0|  3.584816|
|   273|    471|   5.0|   3.97824|
|   287|    471|   4.5|  2.991026|
|   260|    471|   4.5| 2.9915707|
|   599|    833|   1.5| 0.8894125|
|   307|    833|   1.0| 0.9200773|
|   599|   1088|   2.5| 2.5443435|
|    20|   1088|   4.5| 3.0700521|
|   169|   1088|   4.5| 4.1964893|
|   479|   1088|   4.0| 2.5450184|
|    64|   1088|   4.0| 3.2659693|
|    84|   1088|   3.0| 3.1080565|
|   509|   1088|   3.0| 3.3037367|
+------+-------+------+----------+
only showing top 20 rows



Áp dụng model cho toàn bộ data move_ratings:

In [45]:
alldata=best_model.transform(movie_ratings)
rmse = evaluator.evaluate(alldata)
print ("RMSE = "+str(rmse))

RMSE = 0.6901489486663652


In [46]:
alldata.registerTempTable("alldata")

In [47]:
result = spark.sql("select * from alldata")
result.show()

+------+-------+------+----------+
|userId|movieId|rating|Prediction|
+------+-------+------+----------+
|   191|    148|   5.0| 4.9062953|
|   133|    471|   4.0| 3.1419559|
|   597|    471|   2.0|  4.250283|
|   385|    471|   4.0|  3.174941|
|   436|    471|   3.0| 3.7113974|
|   602|    471|   4.0| 3.2078743|
|    91|    471|   1.0|  2.337249|
|   409|    471|   3.0|  3.525944|
|   372|    471|   3.0| 3.2596598|
|   599|    471|   2.5| 2.6988902|
|   603|    471|   4.0| 3.3979774|
|   182|    471|   4.5| 3.7351933|
|   218|    471|   4.0| 2.9637034|
|   474|    471|   3.0| 3.7245278|
|   500|    471|   1.0| 2.3297105|
|    57|    471|   3.0| 3.8028398|
|   462|    471|   2.5|  3.414112|
|   387|    471|   3.0| 3.1626472|
|   610|    471|   4.0|  3.632616|
|   217|    471|   2.0| 2.7994628|
+------+-------+------+----------+
only showing top 20 rows



In [48]:
result = spark.sql("select * from movies join alldata on movies.movieId=alldata.movieId")
result.show()

+-------+--------------------+------+------+-------+------+----------+
|movieId|               title|genres|userId|movieId|rating|Prediction|
+-------+--------------------+------+------+-------+------+----------+
|    148|Awfully Big Adven...| Drama|   191|    148|   5.0| 4.9062953|
|    471|Hudsucker Proxy, ...|Comedy|   133|    471|   4.0| 3.1419559|
|    471|Hudsucker Proxy, ...|Comedy|   597|    471|   2.0|  4.250283|
|    471|Hudsucker Proxy, ...|Comedy|   385|    471|   4.0|  3.174941|
|    471|Hudsucker Proxy, ...|Comedy|   436|    471|   3.0| 3.7113974|
|    471|Hudsucker Proxy, ...|Comedy|   602|    471|   4.0| 3.2078743|
|    471|Hudsucker Proxy, ...|Comedy|    91|    471|   1.0|  2.337249|
|    471|Hudsucker Proxy, ...|Comedy|   409|    471|   3.0|  3.525944|
|    471|Hudsucker Proxy, ...|Comedy|   372|    471|   3.0| 3.2596598|
|    471|Hudsucker Proxy, ...|Comedy|   599|    471|   2.5| 2.6988902|
|    471|Hudsucker Proxy, ...|Comedy|   603|    471|   4.0| 3.3979774|
|    4

In [49]:
best_model.save("/content/model") 

Nhận xét: 

Trong bước đánh giá và thử nghiệm, chúng em sử dụng lỗi Root Mean Square Error (RMSE) để tính toán tỷ lệ lỗi của mô hình ALS, giữa điểm xếp hạng và điểm dự đoán. RMSE nhỏ hơn thì hệ thống có hiệu suất tốt hơn. Trong bước này, có thể thấy rằng điểm RMSE trên test data là 0,868 và 0,69 trên tất cả dữ liệu. Có vẻ như mô hình over-fitting với training data.

Hơn nữa, phương pháp Matrix factorization cho hệ thống đề xuất hoạt động tốt trên user, movies mà nó biết. Đối với các movies mới và user không có xếp hạng, thật khó để học cách dự đoán xếp hạng. Đây là một nhược điểm trong phương pháp ALS. 


### **5.2. Triển khai hệ thống đề xuất:**

**Hàm đề xuất:**

In [50]:
from pyspark.sql.types import *
def get_recommendation(id_type="userId",id =None, numItems = 5):
  """
  id_type: type of id to input:  userId, movieId
  id: movie/ user id to query, either string or integer type
  numItems: number of Items to recommend in each query
  """
  if id_type =="userId" :
    # User-Id based 
    recommendation = best_model.recommendForAllUsers(numItems)
    recommended_movies_df = recommendation.where(col("userId")==int(id)).toPandas()
  elif id_type =="movieId" :
    # Movie-Id based
    recommendation = best_model.recommendForAllItems(numItems)
    recommended_movies_df = recommendation.where(col("movieId")==int(id)).toPandas()  
  else:
    print("id_type should be either 'userId' or 'movieId'")
    print("id should integer")
    return None

  #make sure there are movies recommended
  if len(recommended_movies_df) >0:
    movie_recommended = recommended_movies_df.iloc[0].loc["recommendations"]
    
    schema = StructType([
          StructField('movieId', IntegerType(), False),
          StructField('Prediction', FloatType(), False)
      ])
    movies = spark.createDataFrame(movie_recommended,schema)
    print("Movies recommended to User:%d"%int(id))
    movies = movies.join(movies_df,'movieId','left').toPandas()
  else:
    print("No movies for "+id_type+ ": %d"%int(id))
    movies = None
  return movies

Kết quả:

In [51]:
#Query User Id: 575
get_recommendation(id_type="userId", id =575)

Movies recommended to User:575


Unnamed: 0,movieId,Prediction,title,genres
0,141718,5.715101,Deathgasm (2015),Comedy|Horror
1,123,5.424976,Chungking Express (Chung Hing sam lam) (1994),Drama|Mystery|Romance
2,5490,5.409215,The Big Bus (1976),Action|Comedy
3,26171,5.407846,Play Time (a.k.a. Playtime) (1967),Comedy
4,3494,5.363942,True Grit (1969),Adventure|Drama|Western


In [52]:
#Query User Id: 232
get_recommendation(id_type="movieId", id =594)

Movies recommended to User:594


Unnamed: 0,movieId,Prediction,title,genres
0,43,4.850459,Restoration (1995),Drama
1,53,4.667772,Lamerica (1994),Adventure|Drama
2,413,4.511996,Airheads (1994),Comedy
3,35,4.486012,,
4,99,4.469158,Heidi Fleiss: Hollywood Madam (1995),Documentary


In [62]:
#Query movie Id: 191
get_recommendation(id_type="movieId", id =191)

Movies similarity movies have id 191 is:


Unnamed: 0,movieId,Prediction,title,genres,cos_similarity
0,543,4.978553,So I Married an Axe Murderer (1993),Comedy|Romance|Thriller,0.191273
1,544,4.900724,Striking Distance (1993),Action|Crime,0.288675
2,147,4.797874,"Basketball Diaries, The (1995)",Drama,0.210819
3,337,4.54567,What's Eating Gilbert Grape (1993),Drama,0.139573
4,327,4.467473,Tank Girl (1995),Action|Comedy|Sci-Fi,0.280056


In [54]:
from ipywidgets import interact
interact(get_recommendation,id_type= ["userId","movieId"], id= '575',numItems= [1,5,10,15])

interactive(children=(Dropdown(description='id_type', options=('userId', 'movieId'), value='userId'), Text(val…

<function __main__.get_recommendation>

In [55]:
interact(get_recommendation,id_type= ["userId","movieId"], id= '471',numItems= [1,5,10,15])

interactive(children=(Dropdown(description='id_type', options=('userId', 'movieId'), value='userId'), Text(val…

<function __main__.get_recommendation>

**Hàm tính toán độ tương đồng giữa hai movies**

In [56]:
from pyspark.sql.functions import *

def cos_similarity(id1,id2,type="userId"):
  """Implementation of cosine similarity
  """
  similarity = 0.0
  if type =="userId":
    # If input ids are user Id, Compute similarity between users
    user1 = movie_ratings.where((col('userId')==id1)).select(["movieId"]).distinct()
    user2 = movie_ratings.where((col('userId')==id2)).select(["movieId"]).distinct()
    N1 = user1.count()
    N2 = user2.count()
    
    
    intersection = user1.intersect(user2).count()
    if N1!=0 and N2!= 0:
      similarity = intersection/np.sqrt(N1*N2)
    pass
  elif type == "movieId":
    # If input ids are movies Id, Compute similarity between movies
    movie1 = movie_ratings.where((col('movieId')==id1)).select(["userId"]).distinct()
    movie2 = movie_ratings.where((col('movieId')==id2)).select(["userId"]).distinct()
    N1 = movie1.count()
    N2 = movie2.count()
    
    
    intersection = movie1.intersect(movie2).count()
    # print(N1,N2, intersection)
    if N1!=0 and N2!= 0:
      similarity = intersection/np.sqrt(N1*N2)
      
    pass
  else:

    pass
  return similarity
cos_similarity(138, 575, type="movieId"), cos_similarity(575, 232, type="userId")

(0.0, 0.028383445970423818)

In [57]:
cos_similarity(138, 575, type="movieId")

0.0

**Hàm đề xuất cải tiến, áp dụng hàm tính độ tương đồng và cho phép người dùng thay đổi useId và movieId để tìm phim đề xuất:**

In [58]:
def get_recommendation(id_type="userId",id =None, numItems = 5):
  """
  id_type: type of id to input:  userId, movieId
  id: movie/ user id to query, either string or integer type
  numItems: number of Items to recommend in each query
  """
  if id_type =="userId" :
    # User-Id based 
    recommendation = best_model.recommendForAllUsers(numItems)
    recommended_movies_df = recommendation.where(col("userId")==int(id)).toPandas()
    
  elif id_type =="movieId" :
    # Movie-Id based
    recommendation = best_model.recommendForAllItems(numItems)
    recommended_movies_df = recommendation.where(col("movieId")==int(id)).toPandas()  
  else:
    print("id_type should be either 'userId' or 'movieId'")
    print("id should integer")
    return None

  #make sure there are movies recommended
  if len(recommended_movies_df) >0:
    movie_recommended = recommended_movies_df.iloc[0].loc["recommendations"]
    
    schema = StructType([
          StructField('movieId', IntegerType(), False),
          StructField('Prediction', FloatType(), False)
      ])
    movies = spark.createDataFrame(movie_recommended,schema)
    if id_type=="userId":
      print("Movies recommended to User:%d"%int(id))
    else:
      print("Movies similarity movies have id %d is:"%int(id))
    movies = movies.join(movies_df,'movieId','left').toPandas()

    if id_type =="movieId":
      id_list = movies["movieId"].tolist()
      cos_similarity_ls = []
      for movieId in id_list:
        cos_similarity_ls.append(cos_similarity(movieId, id, type="movieId"))
      movies["cos_similarity"]= cos_similarity_ls
  else:
    print("No movies for "+id_type+ ": %d"%int(id))
    movies = None
  return movies
interact(get_recommendation,id_type= ["userId","movieId"], id= '471',numItems= [1,5,10,15])

interactive(children=(Dropdown(description='id_type', options=('userId', 'movieId'), value='userId'), Text(val…

<function __main__.get_recommendation>

In [59]:
recommendation = best_model.recommendForAllUsers(5)
recommended_movies_df = recommendation.where(col("userId")==575).toPandas()
print(recommended_movies_df)

   userId                                    recommendations
0     575  [(141718, 5.7151007652282715), (123, 5.4249763...
