In [1]:
from pyspark.sql import SparkSession, functions as f
from pyspark.sql.types import *
from pyspark.mllib.linalg import VectorUDT, DenseVector
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
from pyspark.sql.window import Window
from pyspark.ml.feature import StopWordsRemover, Tokenizer, IDF, HashingTF, Normalizer
from pyspark.ml import Pipeline
import pandas as pd
import json

In [2]:
# Create (or reuse) the Spark session used by every later cell.
spark = (
    SparkSession.builder
    .appName("movieRecommendation")
    .getOrCreate()
)

22/01/18 09:23:40 WARN Utils: Your hostname, HK-Nitro-AN515-51 resolves to a loopback address: 127.0.1.1; using 192.168.0.106 instead (on interface wlp2s0)
22/01/18 09:23:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/18 09:23:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [11]:
# Explicit schema for tmdb_5000_movies.csv, built field by field.
# The JSON-like columns (genres, keywords, ...) are read as plain strings here.
tmdbMoviesSchema = (
    StructType()
    .add("budget", LongType(), True)
    .add("genres", StringType(), True)
    .add("homepage", StringType(), True)
    .add("id", StringType(), False)
    .add("keywords", StringType(), True)
    .add("original_language", StringType(), True)
    .add("original_title", StringType(), True)
    .add("overview", StringType(), True)
    .add("popularity", FloatType(), True)
    .add("production_companies", StringType(), True)
    .add("production_countries", StringType(), True)
    .add("release_date", DateType(), True)
    .add("revenue", LongType(), True)
    .add("runtime", IntegerType(), True)
    .add("spoken_languages", StringType(), True)
    .add("status", StringType(), True)
    .add("tagline", StringType(), True)
    .add("title", StringType(), True)
    .add("vote_average", FloatType(), True)
    .add("vote_count", IntegerType(), True)
)

In [12]:
# Load the TMDB movies CSV with the explicit schema. Quote and escape are both
# the double-quote character so that quoted fields containing doubled quotes parse.
tmdbMovies = (
    spark.read
    .schema(tmdbMoviesSchema)
    .option("header", True)
    .option("sep", ",")
    .option("quote", "\"")
    .option("escape", "\"")
    .csv("../../movies-recommendations-notebook-data/input/tmdb 5000 movie/tmdb_5000_movies.csv")
)

tmdbMovies.printSchema()

root
 |-- budget: long (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- keywords: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: float (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: long (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: float (nullable = true)
 |-- vote_count: integer (nullable = true)



### Điều chỉnh cột genres và keywords

In [13]:
# genres and keywords are both parsed from JSON text into arrays of (id, name) structs,
# so a single array schema is shared by the two columns.
id_name_array = ArrayType(StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
]))

# Columns that are dropped because no later cell reads them.
unused_columns = [
    "homepage", "production_companies", "production_countries",
    "revenue", "spoken_languages", "status", "budget",
]

tmdbMovies = (
    tmdbMovies
    .withColumn("genres", f.from_json("genres", id_name_array))
    .withColumn("keywords", f.from_json("keywords", id_name_array))
    .drop(*unused_columns)
)

tmdbMovies.printSchema()

root
 |-- genres: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- id: string (nullable = true)
 |-- keywords: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: float (nullable = true)
 |-- release_date: date (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: float (nullable = true)
 |-- vote_count: integer (nullable = true)



### Điều chỉnh giá trị thành rỗng cho hàng có giá trị null ở cột "overview", "tagline"

In [14]:
# Replace null overview/tagline values with the empty string, then verify
# that neither column has any null/NaN entries left.
text_columns = ["overview", "tagline"]
tmdbMovies = tmdbMovies.na.fill("", subset=text_columns)

missing_counts = [
    f.count(f.when(f.isnan(name) | f.col(name).isNull(), name)).alias(name)
    for name in text_columns
]
tmdbMovies.select(missing_counts).show()

+--------+-------+
|overview|tagline|
+--------+-------+
|       0|      0|
+--------+-------+





### Tạo ra các bảng Genre và Movie-Genre

In [15]:
# Link table: one row per (movie, genre id) pair.
movies_genres = tmdbMovies.select(
    f.col("id").alias("movie_id"),
    f.explode("genres.id").alias("genre_id"),
)

# Lookup table: the distinct (id, name) genre pairs across all movies.
exploded_genres = tmdbMovies.withColumn("genres", f.explode("genres"))
genres = exploded_genres.select(
    f.col("genres.id").alias("id"),
    f.col("genres.name").alias("name"),
).distinct()

print("Genres")
genres.show()

print("Movie-Genre")
movies_genres.show()

Genres
+-----+---------------+
|   id|           name|
+-----+---------------+
|  878|Science Fiction|
|   28|         Action|
|   35|         Comedy|
| 9648|        Mystery|
|10769|        Foreign|
|   36|        History|
|   27|         Horror|
|10751|         Family|
|   16|      Animation|
|   18|          Drama|
|10749|        Romance|
|   14|        Fantasy|
|10770|       TV Movie|
|   37|        Western|
|10752|            War|
|   53|       Thriller|
|   99|    Documentary|
|   12|      Adventure|
|   80|          Crime|
|10402|          Music|
+-----+---------------+

Movie-Genre
+--------+--------+
|movie_id|genre_id|
+--------+--------+
|   19995|      28|
|   19995|      12|
|   19995|      14|
|   19995|     878|
|     285|      12|
|     285|      14|
|     285|      28|
|  206647|      28|
|  206647|      12|
|  206647|      80|
|   49026|      28|
|   49026|      80|
|   49026|      18|
|   49026|      53|
|   49529|      28|
|   49529|      12|
|   49529|     878|
|   

### Lưu bảng genre và movie-genre vào thư mục lưu trữ

In [17]:
# Export the genre lookup table as-is (no index column).
genres_pdf = genres.toPandas()
genres_pdf.to_csv("./data/genres.csv", index=False)

# Export the movie-genre link table; the pandas row index becomes the "id" column.
movies_genres_pdf = movies_genres.toPandas()
movies_genres_pdf.to_csv("./data/movies_genres.csv", index=True, index_label="id")

### Tương tự cho keyword và movie-keyword

In [18]:
# Link table: one row per (movie, keyword id) pair.
movies_keywords = tmdbMovies.select(
    f.col("id").alias("movie_id"),
    f.explode("keywords.id").alias("keyword_id"),
)

# Lookup table: the distinct (id, key) keyword pairs across all movies.
exploded_keywords = tmdbMovies.withColumn("keywords", f.explode("keywords"))
keywords = exploded_keywords.select(
    f.col("keywords.id").alias("id"),
    f.col("keywords.name").alias("key"),
).distinct()

# Length of the longest keyword text.
longest_key = f.max(f.length(f.col("key"))).alias("max length")
maxLengthKeyword = keywords.agg(longest_key).first()[0]

print("Movie-Keyword")
movies_keywords.show()

print(f"Keyword (Max length {maxLengthKeyword})")
keywords.show()

Movie-Keyword
+--------+----------+
|movie_id|keyword_id|
+--------+----------+
|   19995|      1463|
|   19995|      2964|
|   19995|      3386|
|   19995|      3388|
|   19995|      3679|
|   19995|      3801|
|   19995|      9685|
|   19995|      9840|
|   19995|      9882|
|   19995|      9951|
|   19995|     10148|
|   19995|     10158|
|   19995|     10987|
|   19995|     11399|
|   19995|     13065|
|   19995|     14643|
|   19995|     14720|
|   19995|    165431|
|   19995|    193554|
|   19995|    206690|
+--------+----------+
only showing top 20 rows

Keyword (Max length 48)
+------+--------------------+
|    id|                 key|
+------+--------------------+
|  1585|               snake|
| 13084|                king|
|163014|      injured animal|
| 15167|    police detective|
|161868|child's point of ...|
|  3298|       hallucination|
|  1334|        wedding vows|
|159548|            clouseau|
|  2564|          wheelchair|
|  2406|              picnic|
|  3487|          

In [19]:
# Export the keyword lookup table as-is (no index column).
keywords_pdf = keywords.toPandas()
keywords_pdf.to_csv("./data/keywords.csv", index=False)

# Export the movie-keyword link table; the pandas row index becomes the "id" column.
movies_keywords_pdf = movies_keywords.toPandas()
movies_keywords_pdf.to_csv("./data/movies_keywords.csv", index=True, index_label="id")

## Demographic Filtering 

In [20]:
# Preview the vote columns (vote_count, vote_average) that the weighted rating is computed from.
tmdbMovies.select("id","title","vote_count","vote_average").show(truncate=False)

+------+-------------------------------------------+----------+------------+
|id    |title                                      |vote_count|vote_average|
+------+-------------------------------------------+----------+------------+
|19995 |Avatar                                     |11800     |7.2         |
|285   |Pirates of the Caribbean: At World's End   |4500      |6.9         |
|206647|Spectre                                    |4466      |6.3         |
|49026 |The Dark Knight Rises                      |9106      |7.6         |
|49529 |John Carter                                |2124      |6.1         |
|559   |Spider-Man 3                               |3576      |5.9         |
|38757 |Tangled                                    |3330      |7.4         |
|99861 |Avengers: Age of Ultron                    |6767      |7.3         |
|767   |Harry Potter and the Half-Blood Prince     |5293      |7.4         |
|209112|Batman v Superman: Dawn of Justice         |7004      |5.7         |

#### Vote trung bình

In [21]:
# C: the mean of vote_average over all movies, used as the prior in the weighted rating.
C = tmdbMovies.agg(f.avg("vote_average")).first()[0]
C

6.092171562405494

#### Tính quantile của cột vote_count với giá trị 0.6

In [23]:
# m: the 0.6 quantile of vote_count. A relative error of 0 requests the exact quantile.
(m,) = tmdbMovies.approxQuantile(col="vote_count", probabilities=[0.6], relativeError=0)
m

370.0

#### Hàm dùng để tính toán giá trị rating

In [24]:
def weighted_rating(v, R, m = m, C = C):
    """Build the weighted-rating column ``v/(v+m) * R + m/(m+v) * C``.

    Implemented with native Spark column arithmetic instead of a Python UDF:
    the expression is evaluated inside the JVM (no per-row Python round trip),
    it yields a numeric column rather than the string column an untyped
    ``@f.udf`` produces, and a null in either input gives a null rating
    instead of raising a ``TypeError`` inside the worker.

    Args:
        v: vote-count column, given either as a column name or a ``Column``.
        R: average-vote column, given either as a column name or a ``Column``.
        m: vote-count weight; defaults to the notebook-level ``m`` at definition time.
        C: prior rating; defaults to the notebook-level ``C`` at definition time.

    Returns:
        A ``Column`` holding the weighted rating (callers may still ``.cast("float")``).
    """
    votes = f.col(v) if isinstance(v, str) else v
    average = f.col(R) if isinstance(R, str) else R
    weight = f.lit(m)
    total = votes + weight
    # Same operand order as the original formula so results match numerically.
    return (votes / total * average) + (weight / total * f.lit(C))

### Thực hiện tính toán cho từng movie và hiển thị kết quả

In [25]:
# Score every movie with the weighted rating and list the best-rated first.
rating_column = weighted_rating("vote_count", "vote_average").cast("float")
movies = (
    tmdbMovies
    .withColumn("rating", rating_column)
    .orderBy(f.col("rating").desc())
)

movies.select("id", "title", "rating").show()

+------+--------------------+---------+
|    id|               title|   rating|
+------+--------------------+---------+
|   278|The Shawshank Red...| 8.396106|
|   238|       The Godfather|8.2636595|
|   550|          Fight Club| 8.216498|
|   680|        Pulp Fiction|8.2071495|
|   155|     The Dark Knight| 8.136963|
|   424|    Schindler's List| 8.126156|
|244786|            Whiplash| 8.123336|
|    13|        Forrest Gump| 8.106003|
|   129|       Spirited Away| 8.105963|
|   240|The Godfather: Pa...| 8.079694|
|  1891|The Empire Strike...| 8.075196|
| 27205|           Inception| 8.047395|
|157336|        Interstellar| 8.033889|
|   497|      The Green Mile| 8.023473|
|   122|The Lord of the R...| 8.011917|
|    11|           Star Wars| 7.993781|
|   807|               Se7en| 7.978909|
|   769|          GoodFellas| 7.977045|
|    73|  American History X|7.9696703|
|   510|One Flew Over the...|7.9628773|
+------+--------------------+---------+
only showing top 20 rows



[Stage 30:>                                                         (0 + 2) / 2]                                                                                

### Thêm cột rating cho tmdbMovies DF

In [28]:
# Attach the "rating" column computed in `movies` to tmdbMovies by joining on "id".
# NOTE(review): not idempotent — tmdbMovies is overwritten with the joined frame, so running
# this cell a second time would join in another "rating" column. Restart the kernel before re-running.
# NOTE(review): the execution counter (In [28]) is later than the two cells below that replace `movies`
# with a version whose rating is rounded to 2 decimals, so whether the joined rating is rounded
# depends on run order — confirm which one is intended.
tmdbMovies = tmdbMovies.join(movies.select("id","rating"), on="id")

### Lưu bảng movies vào thư mục

In [26]:
# Keep only the columns that are exported, with the rating rounded to two decimals.
rounded_rating = f.round(f.col("rating"), 2)
export_columns = [
    "id", "title", "original_language", "overview", "popularity",
    "release_date", "runtime", "tagline", "vote_count", "rating",
]
movies = movies.withColumn("rating", rounded_rating).select(*export_columns)

In [27]:
# Collect `movies` to the driver as a pandas DataFrame and write it to CSV without an index column.
movies.toPandas().to_csv("./data/movies.csv", index = False)

In [32]:
# Build one text per movie: the overview followed by all keyword names, space-separated.
movie_text = tmdbMovies.select(
    "id",
    f.concat_ws(
        " ",
        f.col("overview"),
        f.concat_ws(" ", f.col("keywords.name")),
    ).alias("text"),
)

# NOTE: despite its name, `tokenizer` holds the transformed DataFrame, not the Tokenizer stage.
text_tokenizer = Tokenizer(inputCol="text", outputCol="words1")
tokenizer = text_tokenizer.transform(movie_text)

tokenizer.select("text", "words1").show(truncate=50)

+--------------------------------------------------+--------------------------------------------------+
|                                              text|                                            words1|
+--------------------------------------------------+--------------------------------------------------+
|In the 22nd century, a paraplegic Marine is dis...|[in, the, 22nd, century,, a, paraplegic, marine...|
|Captain Barbossa, long believed to be dead, has...|[captain, barbossa,, long, believed, to, be, de...|
|A cryptic message from Bond’s past sends him on...|[a, cryptic, message, from, bond’s, past, sends...|
|Following the death of District Attorney Harvey...|[following, the, death, of, district, attorney,...|
|John Carter is a war-weary, former military cap...|[john, carter, is, a, war-weary,, former, milit...|
|The seemingly invincible Spider-Man goes up aga...|[the, seemingly, invincible, spider-man, goes, ...|
|When the kingdom's most wanted-and most charmin...|[when, the, 

In [34]:
# Drop English stop words from the token lists (words1 -> words2).
# NOTE: `stopwords_remover` holds the transformed DataFrame, not the remover stage.
english_stopwords = StopWordsRemover.loadDefaultStopWords("english")
remover = StopWordsRemover(
    inputCol="words1",
    outputCol="words2",
    stopWords=english_stopwords,
)
stopwords_remover = remover.transform(tokenizer)

stopwords_remover.select("words2").show(truncate=100)

+----------------------------------------------------------------------------------------------------+
|                                                                                              words2|
+----------------------------------------------------------------------------------------------------+
|[22nd, century,, paraplegic, marine, dispatched, moon, pandora, unique, mission,, becomes, torn, ...|
|[captain, barbossa,, long, believed, dead,, come, back, life, headed, edge, earth, turner, elizab...|
|[cryptic, message, bond’s, past, sends, trail, uncover, sinister, organization., m, battles, poli...|
|[following, death, district, attorney, harvey, dent,, batman, assumes, responsibility, dent's, cr...|
|[john, carter, war-weary,, former, military, captain, inexplicably, transported, mysterious, exot...|
|[seemingly, invincible, spider-man, goes, all-new, crop, villain, –, including, shape-shifting, s...|
|[kingdom's, wanted-and, charming-bandit, flynn, rider, hides, mysterious

In [35]:
# Hash the filtered tokens into term-frequency vectors (words2 -> vectorizer).
hashing_tf = HashingTF(inputCol='words2', outputCol='vectorizer')
tf = hashing_tf.transform(stopwords_remover)
tf.select("vectorizer").show(truncate=100)

+----------------------------------------------------------------------------------------------------+
|                                                                                          vectorizer|
+----------------------------------------------------------------------------------------------------+
|(262144,[8005,19637,19684,23090,25337,45252,53989,78896,87325,89896,97376,116443,117217,140871,14...|
|(262144,[6769,12336,29562,29955,41095,41198,45006,60825,62084,62606,63704,70152,74853,85281,10302...|
|(262144,[6099,15377,15675,23783,23998,32890,40872,43756,52774,68538,85673,99696,99749,113919,1322...|
|(262144,[3185,4738,6512,13956,16319,19684,33092,33260,33532,33803,47171,55411,63316,66053,68723,6...|
|(262144,[2613,15885,18858,20287,22124,25337,44238,45006,45252,51284,52787,57234,58639,60080,60501...|
|(262144,[6104,6696,25964,48102,48174,57146,58044,66663,68217,73531,74520,78955,81916,85939,86917,...|
|(262144,[1232,2934,3329,4277,9129,11862,15668,17576,20606,29423,32913,33

In [37]:
# Fit inverse-document-frequency weights on the term-frequency vectors, then
# apply them and keep only the movie id and the resulting TF-IDF features.
idf_estimator = IDF(inputCol="vectorizer", outputCol="features")
idf = idf_estimator.fit(tf)
tfidf = idf.transform(tf).select("id", "features")
tfidf.select("features").show(truncate=100)



+----------------------------------------------------------------------------------------------------+
|                                                                                            features|
+----------------------------------------------------------------------------------------------------+
|(262144,[8005,19637,19684,23090,25337,45252,53989,78896,87325,89896,97376,116443,117217,140871,14...|
|(262144,[6769,12336,29562,29955,41095,41198,45006,60825,62084,62606,63704,70152,74853,85281,10302...|
|(262144,[6099,15377,15675,23783,23998,32890,40872,43756,52774,68538,85673,99696,99749,113919,1322...|
|(262144,[3185,4738,6512,13956,16319,19684,33092,33260,33532,33803,47171,55411,63316,66053,68723,6...|
|(262144,[2613,15885,18858,20287,22124,25337,44238,45006,45252,51284,52787,57234,58639,60080,60501...|
|(262144,[6104,6696,25964,48102,48174,57146,58044,66663,68217,73531,74520,78955,81916,85939,86917,...|
|(262144,[1232,2934,3329,4277,9129,11862,15668,17576,20606,29423,32913,33

22/01/18 10:24:57 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


In [38]:
# Normalize each TF-IDF vector into the "norm" column (the Normalizer's default p is used).
normalizer = Normalizer().setInputCol("features").setOutputCol("norm")
data = normalizer.transform(tfidf)

In [39]:
# Show the first rows of the id / features / norm frame.
data.show()

+------+--------------------+--------------------+
|    id|            features|                norm|
+------+--------------------+--------------------+
| 19995|(262144,[8005,196...|(262144,[8005,196...|
|   285|(262144,[6769,123...|(262144,[6769,123...|
|206647|(262144,[6099,153...|(262144,[6099,153...|
| 49026|(262144,[3185,473...|(262144,[3185,473...|
| 49529|(262144,[2613,158...|(262144,[2613,158...|
|   559|(262144,[6104,669...|(262144,[6104,669...|
| 38757|(262144,[1232,293...|(262144,[1232,293...|
| 99861|(262144,[8402,107...|(262144,[8402,107...|
|   767|(262144,[2820,332...|(262144,[2820,332...|
|209112|(262144,[1753,230...|(262144,[1753,230...|
|  1452|(262144,[8538,449...|(262144,[8538,449...|
| 10764|(262144,[7484,132...|(262144,[7484,132...|
|    58|(262144,[6978,123...|(262144,[6978,123...|
| 57201|(262144,[1232,712...|(262144,[1232,712...|
| 49521|(262144,[21570,33...|(262144,[21570,33...|
|  2454|(262144,[5935,594...|(262144,[5935,594...|
| 24428|(262144,[8402,139...|(2

22/01/18 10:26:04 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


In [40]:
@f.udf(returnType=DoubleType())
def sim_cos(v1,v2):
    """Cosine similarity of two vectors, as a double column.

    The return type is declared so the UDF yields a numeric column instead of
    the string column an untyped ``@f.udf`` produces. The cases the original
    bare ``except`` was hiding are handled explicitly, so any other failure
    (e.g. mismatched vector sizes) surfaces instead of being reported as 0.

    Args:
        v1: first vector; must expose ``dot`` and ``norm``.
        v2: second vector; must expose ``dot`` and ``norm``.

    Returns:
        ``v1 . v2 / (|v1| * |v2|)`` using the L2 norm, or ``0.0`` when either
        vector is missing or has zero length.
    """
    if v1 is None or v2 is None:
        return 0.0
    p = 2
    denominator = float(v1.norm(p) * v2.norm(p))
    if denominator == 0.0:
        # An all-zero vector has no direction; treat it as "not similar".
        return 0.0
    return float(v1.dot(v2)) / denominator

In [41]:
# Two renamed views of the same (id, norm) frame so it can be joined against itself.
movie_vectors = data.select("id", "norm")
df1 = movie_vectors.select(
    f.col("id").alias("from_movie_id"),
    f.col("norm").alias("norm1"),
)
df2 = movie_vectors.select(
    f.col("id").alias("to_movie_id"),
    f.col("norm").alias("norm2"),
)

# Pair every movie with every *other* movie and score each ordered pair.
is_other_movie = f.col("from_movie_id") != f.col("to_movie_id")
pair_similarity = sim_cos(f.col("norm1"), f.col("norm2")).cast("float")
sim_cos_values = (
    df1.join(df2, on=is_other_movie)
    .withColumn("sim_cos", pair_similarity)
    .select("from_movie_id", "to_movie_id", "sim_cos")
)

In [42]:
# Confirm the column names and types of the pairwise similarity frame.
sim_cos_values.printSchema()

root
 |-- from_movie_id: string (nullable = true)
 |-- to_movie_id: string (nullable = true)
 |-- sim_cos: float (nullable = true)



In [31]:
# Number of most-similar movies kept for each source movie.
TOP_K_SIMILAR = 8

# Order each movie's candidates by descending similarity. to_movie_id is a
# tie-breaker so the selection is deterministic when similarities are equal.
window = Window.partitionBy("from_movie_id").orderBy(
    f.col("sim_cos").desc(),
    f.col("to_movie_id"),
)

# row_number() rather than rank(): rank() gives tied rows the same position, so a
# movie whose similarities are all equal (e.g. all 0) would keep every other movie
# instead of at most TOP_K_SIMILAR.
movieContentSimilar = (
    sim_cos_values
    .withColumn("rank", f.row_number().over(window))
    .filter(f.col("rank") <= TOP_K_SIMILAR)
    .select("from_movie_id", "to_movie_id", "sim_cos")
)

In [32]:
# Collect the top-similar pairs to the driver as a pandas DataFrame and write them to CSV;
# the pandas row index is exported as the "id" column.
# NOTE(review): the destination is an absolute path on one developer's machine — consider a
# configurable output directory so the notebook runs elsewhere.
movieContentSimilar.toPandas()\
.to_csv("/home/hoangkhang/Projects/movies-recommendation/backend/data/similar_movies.csv", index = True, index_label="id")

22/01/15 22:49:28 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
22/01/15 22:49:29 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/01/15 23:20:58 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/01/15 23:21:11 WARN TaskMemoryManager: Failed to allocate a page (134217728 bytes), try again.
                                                                                

In [34]:
# Read the raw user ratings file, taking column names from the header row.
# NOTE(review): no schema is passed, so presumably every column is read as a string — verify.
ratingDF = spark.read.csv(path="./input/movies-data/ratings.csv", header=True)

In [35]:
# Preview the first rows of the raw ratings.
ratingDF.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    110|   1.0|1425941529|
|     1|    147|   4.5|1425942435|
|     1|    858|   5.0|1425941523|
|     1|   1221|   5.0|1425941546|
|     1|   1246|   5.0|1425941556|
|     1|   1968|   4.0|1425942148|
|     1|   2762|   4.5|1425941300|
|     1|   2918|   5.0|1425941593|
|     1|   2959|   4.0|1425941601|
|     1|   4226|   4.0|1425942228|
|     1|   4878|   5.0|1425941434|
|     1|   5577|   5.0|1425941397|
|     1|  33794|   4.0|1425942005|
|     1|  54503|   3.5|1425941313|
|     1|  58559|   4.0|1425942007|
|     1|  59315|   5.0|1425941502|
|     1|  68358|   5.0|1425941464|
|     1|  69844|   5.0|1425942139|
|     1|  73017|   5.0|1425942699|
|     1|  81834|   5.0|1425942133|
+------+-------+------+----------+
only showing top 20 rows



In [36]:
# Keep only ratings whose movieId matches an id present in tmdbMovies, then
# persist the reduced (userId, movieId, rating) table as CSV.
# NOTE(review): this assumes ratings.csv's movieId uses the same id space as the TMDB "id"
# column — confirm; if the ratings come from a different catalogue, an id mapping is needed.
tmdb_ids = tmdbMovies.select("id")
ratingDF = (
    tmdb_ids
    .join(ratingDF, on=f.col("id") == f.col("movieId"))
    .select("userId", "movieId", "rating")
)
ratingDF.toPandas().to_csv("./input/tmdb 5000 movie/ratings.csv", index=False)

                                                                                

In [37]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [38]:
# Typed schema for the filtered ratings file written by the previous step.
schema = (
    StructType()
    .add("userId", IntegerType(), nullable=False)
    .add("movieId", IntegerType(), nullable=False)
    .add("rating", FloatType(), nullable=False)
)
movie_ratings = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv("./input/tmdb 5000 movie/ratings.csv")
)

In [39]:
# Inspect the typed ratings: schema, first rows, and (as the cell's output value) the row count.
movie_ratings.printSchema()
movie_ratings.show()
movie_ratings.count()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|   1246|   5.0|
|     1|   2959|   4.0|
|     2|      5|   3.0|
|     2|     25|   3.0|
|     2|     58|   3.0|
|     2|     79|   4.0|
|     2|    141|   3.0|
|     2|    377|   4.0|
|     2|    605|   4.0|
|     2|    628|   4.0|
|     2|    762|   3.0|
|     2|    786|   1.0|
|     2|    788|   1.0|
|     3|    480|   3.0|
|     3|    500|   2.0|
|     3|   4474|   3.0|
|     4|    223|   4.0|
|     4|    415|   4.0|
|     4|   1422|   4.0|
|     4|   1597|   3.0|
+------+-------+------+
only showing top 20 rows



4655281

In [40]:
def get_mat_sparsity(ratings):
    """Print how sparse the user x movie rating matrix is, as a percentage.

    Sparsity is ``(1 - ratings / (distinct users * distinct movies)) * 100``.

    Args:
        ratings: frame exposing ``select``, ``distinct`` and ``count`` with
            "userId", "movieId" and "rating" columns.

    Returns:
        None. The result is reported with ``print``.
    """
    count_nonzero = ratings.select("rating").count()
    user_count = ratings.select("userId").distinct().count()
    movie_count = ratings.select("movieId").distinct().count()
    total_elements = user_count * movie_count
    if total_elements == 0:
        # No users or no movies: the matrix has no cells, so the ratio is
        # undefined (the division below would raise ZeroDivisionError).
        print("The ratings dataframe is empty; sparsity is undefined.")
        return
    sparsity = (1.0 - (count_nonzero * 1.0) / total_elements) * 100
    print(f"The ratings dataframe is {round(sparsity, 2)}% sparse.")
    
# Report the sparsity of the ratings frame that is split into training/testing sets below.
get_mat_sparsity(ratings=movie_ratings)



The ratings dataframe is 98.61% sparse.


                                                                                

In [41]:
# Fixed seed so the 80/20 split (and the ALS fit that reuses `seed`) is reproducible.
seed = 2022
split_weights = [0.8, 0.2]
training, testing = movie_ratings.randomSplit(split_weights, seed=seed)

In [43]:
# Base ALS estimator; only the rank is varied in the search below.
# coldStartStrategy="drop" removes predictions for users/movies unseen in training.
als = ALS(userCol="userId",
          itemCol="movieId",
          ratingCol="rating",
          coldStartStrategy="drop",
          nonnegative=True,
          maxIter=5,
          seed=seed,
          regParam=0.1,
          implicitPrefs=False)

reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")

# Candidate ranks; errors[i] and models[i] correspond to ranks[i].
ranks = [3, 5, 8]
errors = []
models = []
min_error = float('inf')
best_rank = -1  # index into `ranks` of the best model, not the rank value itself
for err, rank in enumerate(ranks):
    als.setRank(rank)
    model = als.fit(training)
    # Predict on the held-out `testing` split.
    predict_df = model.transform(testing)

    # Remove NaN predictions (SPARK-14489) before scoring.
    predicted_ratings_df = predict_df.filter(~f.isnan(f.col("prediction")))

    # Score the *filtered* predictions; the original evaluated the unfiltered
    # frame and left predicted_ratings_df unused.
    error = reg_eval.evaluate(predicted_ratings_df)
    errors.append(error)
    models.append(model)
    print(f'For rank {rank} the RMSE is {error}')
    if error < min_error:
        min_error = error
        best_rank = err

if best_rank < 0:
    # No rank produced a comparable RMSE (e.g. every value was NaN); indexing
    # with -1 would silently pick the last rank, so fail loudly instead.
    raise ValueError("No ALS model produced a finite RMSE; cannot select a best rank.")

als.setRank(ranks[best_rank])
print(f'The best model was trained with rank {ranks[best_rank]}')
my_model = models[best_rank]

                                                                                

For rank 3 the RMSE is 0.8916174158450135


                                                                                

For rank 5 the RMSE is 0.8864718333088998


                                                                                

For rank 8 the RMSE is 0.8953218422811495
The best model was trained with rank 5


[Stage 1216:>                                                       (0 + 4) / 4]                                                                                

In [47]:
# Read the training hyper-parameters back from the estimator behind the chosen model.
# NOTE: this goes through the private `_java_obj` handle of the fitted model.
parent_estimator = my_model._java_obj.parent()
print(f"MaxIter: {parent_estimator.getMaxIter()}")
print(f"RegParm: {parent_estimator.getRegParam()}")

MaxIter: 5
RegParm: 0.1


In [54]:
# Top 3 recommended movies for every user known to the model.
all_user_recommendations = my_model.recommendForAllUsers(numItems=3)
all_user_recommendations.show(truncate=100)



+------+-------------------------------------------------------------+
|userId|                                              recommendations|
+------+-------------------------------------------------------------+
|     1|   [{130150, 6.00465}, {86829, 5.304157}, {60309, 5.0171995}]|
|     6|  [{130150, 3.631001}, {86829, 3.5607057}, {60309, 3.048162}]|
|    12|[{110415, 4.9515705}, {88005, 4.8444214}, {158852, 4.842569}]|
|    13|[{62206, 6.0224953}, {110415, 5.769824}, {153158, 5.2702246}]|
|    16|   [{130150, 6.0109015}, {2959, 5.0322866}, {5971, 4.957446}]|
|    22|    [{62206, 6.548557}, {86829, 5.5066853}, {1724, 4.829375}]|
|    27| [{153158, 5.094948}, {96724, 5.0937023}, {88005, 4.8653736}]|
|    28|   [{2959, 3.9354534}, {130150, 3.9255168}, {296, 3.7827373}]|
|    31|   [{62206, 6.593297}, {110415, 6.12729}, {153158, 5.920595}]|
|    34|     [{130150, 5.28414}, {2959, 4.8616714}, {296, 4.6789007}]|
|    44|  [{110415, 6.3511386}, {153158, 5.8823776}, {787, 5.725555}]|
|    4

                                                                                

In [55]:
# Flatten the per-user recommendation arrays into one (userId, movieId, rating) row each.
exploded_recommendations = all_user_recommendations.select(
    "userId",
    "recommendations",
    f.explode("recommendations").alias("rec_exp"),
)
all_user_recommendations = exploded_recommendations.select(
    "userId",
    "rec_exp.movieId",
    "rec_exp.rating",
)
all_user_recommendations.show()



+------+-------+---------+
|userId|movieId|   rating|
+------+-------+---------+
|     1| 130150|  6.00465|
|     1|  86829| 5.304157|
|     1|  60309|5.0171995|
|     6| 130150| 3.631001|
|     6|  86829|3.5607057|
|     6|  60309| 3.048162|
|    12| 110415|4.9515705|
|    12|  88005|4.8444214|
|    12| 158852| 4.842569|
|    13|  62206|6.0224953|
|    13| 110415| 5.769824|
|    13| 153158|5.2702246|
|    16| 130150|6.0109015|
|    16|   2959|5.0322866|
|    16|   5971| 4.957446|
|    22|  62206| 6.548557|
|    22|  86829|5.5066853|
|    22|   1724| 4.829375|
|    27| 153158| 5.094948|
|    27|  96724|5.0937023|
+------+-------+---------+
only showing top 20 rows



                                                                                

In [59]:
# Pull user 1's recommended movie ids back to the driver as a plain list.
user_1_rows = all_user_recommendations.filter(f"userId == {1}").collect()
user_1_recommendations = [rec.movieId for rec in user_1_rows]
print(user_1_recommendations)



[130150, 86829, 60309]




In [51]:
# Enrich each recommendation with the movie title and a "|"-joined genre list.
# Only the columns needed from tmdbMovies are projected before the join: tmdbMovies
# carries its own "rating" column once the weighted rating has been joined in, and
# joining the full frame would make the "rating" reference below ambiguous.
movie_details = tmdbMovies.select(
    f.col("id").alias("movieId"),
    "title",
    f.concat_ws("|", "genres.name").alias("genres"),
)

all_user_recommendations = (
    all_user_recommendations
    .join(movie_details, on="movieId")
    .select("userId", "movieId", "rating", "title", "genres")
)

all_user_recommendations.show(truncate=100)

                                                                                

+------+-------+---------+---------------------+---------------------------------------+
|userId|movieId|   rating|                title|                                 genres|
+------+-------+---------+---------------------+---------------------------------------+
|     1| 130150|  6.00465|            Labor Day|                                  Drama|
|     1|  86829| 5.304157|  Inside Llewyn Davis|                            Drama|Music|
|     1|  60309|5.0171995|      The Conspirator|                    Crime|Drama|History|
|     1|  96724| 4.907068|        Anna Karenina|                          Drama|Romance|
|     1|     73| 4.869559|   American History X|                                  Drama|
|     1|    593|4.8687935|              Solaris|Drama|Science Fiction|Adventure|Mystery|
|     1|    116|4.8533626|          Match Point|           Drama|Thriller|Crime|Romance|
|     1|    590| 4.850157|            The Hours|                                  Drama|
|     1|     62| 4.84

In [53]:
# Persist the selected ALS model, replacing any model already saved at this location.
# NOTE(review): the destination is an absolute path on one developer's machine — consider a
# configurable output directory so the notebook runs elsewhere.
my_model.write().overwrite().save("/home/hoangkhang/Projects/movies-recommendation/backend/mlmodel")

                                                                                

In [105]:
# Movies whose weighted rating is above 7, and every user rating given to them.
highly_rated = tmdbMovies.filter(f.col("rating") > 7)
top_movies = highly_rated.select(f.col("id").alias("movieId"))
top_movie_ratings = top_movies.join(movie_ratings, on="movieId")

# Export both tables without an index column.
top_movies_pdf = top_movies.toPandas()
top_movies_pdf.to_csv("/home/hoangkhang/Projects/movies-recommendation/backend/data/top_movie/movie.csv", index=False)

top_movie_ratings_pdf = top_movie_ratings.toPandas()
top_movie_ratings_pdf.to_csv("/home/hoangkhang/Projects/movies-recommendation/backend/data/top_movie/ratings.csv", index=False)