In [90]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.sql import functions as F
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

In [74]:
spark = SparkSession.builder.appName('MovieRecommender').getOrCreate()

#### Loading Data

Since there was some difficulty getting MongoDB Pyspark to work, we wrote most of our code pulling data directly from the JSON files.

In [75]:
with open('../ratings/movies_with_ratings.json', 'r') as f:
    data = json.load(f)

for movie in data:
    if isinstance(movie.get('popularity'), int):
        movie['popularity'] = float(movie['popularity'])
    if isinstance(movie.get('vote_average'), int):
        movie['vote_average'] = float(movie['vote_average'])

schema = StructType([
    StructField('_id', StructType([StructField('$oid', StringType(), True)]), True),
    StructField('title', StringType(), True),
    StructField('release_date', StringType(), True),
    StructField('popularity', FloatType(), True),
    StructField('vote_average', FloatType(), True),
    StructField('vote_count', IntegerType(), True),
    StructField('overview', StringType(), True),
    StructField('tmdb_id', IntegerType(), True),
    StructField('ratings', ArrayType(
        StructType([
            StructField('rated_at', StringType(), True),
            StructField('rating', IntegerType(), True),
            StructField('user', StringType(), True)
        ])
    ), True)
])

data = spark.createDataFrame(data, schema=schema)
data.show(10)

25/03/07 19:24:11 WARN TaskSetManager: Stage 659 contains a task of very large size (6426 KiB). The maximum recommended task size is 1000 KiB.


+--------------------+--------------------+------------+----------+------------+----------+--------------------+-------+--------------------+
|                 _id|               title|release_date|popularity|vote_average|vote_count|            overview|tmdb_id|             ratings|
+--------------------+--------------------+------------+----------+------------+----------+--------------------+-------+--------------------+
|{67b7d9d43806d131...|Sonic the Hedgehog 3|  2024-12-19|  3084.368|       7.783|      1555|Sonic, Knuckles, ...| 939243|[{2025-01-26T09:2...|
|{67b7d9d43806d131...|   Kraven the Hunter|  2024-12-11|   1785.56|         6.6|      1066|Kraven Kravinoff'...| 539972|[{2024-12-22T03:3...|
|{67b7d9d43806d131...|             Moana 2|  2024-11-21|  1427.587|       7.208|      1311|After receiving a...|1241982|[{2025-01-29T17:1...|
|{67b7d9d43806d131...|      Back in Action|  2025-01-15|  1312.816|       6.633|       793|Fifteen years aft...| 993710|[{2025-01-21T20:1...|
|{67b7

                                                                                

#### DataFrame Creation

- Select relevant movie data to create movies DataFrame.
- Expand ratings data to be one per row.

In [76]:
movies = data.select(
    col('tmdb_id').alias('movie_id'),
    'title',
    'overview',
    'release_date',
    'popularity',
    'vote_average',
    'vote_count'
)
movies.show(5)

25/03/07 19:24:11 WARN TaskSetManager: Stage 660 contains a task of very large size (6426 KiB). The maximum recommended task size is 1000 KiB.


+--------+--------------------+--------------------+------------+----------+------------+----------+
|movie_id|               title|            overview|release_date|popularity|vote_average|vote_count|
+--------+--------------------+--------------------+------------+----------+------------+----------+
|  939243|Sonic the Hedgehog 3|Sonic, Knuckles, ...|  2024-12-19|  3084.368|       7.783|      1555|
|  539972|   Kraven the Hunter|Kraven Kravinoff'...|  2024-12-11|   1785.56|         6.6|      1066|
| 1241982|             Moana 2|After receiving a...|  2024-11-21|  1427.587|       7.208|      1311|
|  993710|      Back in Action|Fifteen years aft...|  2025-01-15|  1312.816|       6.633|       793|
| 1249289|              Alarum|Two married spies...|  2025-01-16|  1149.463|         5.9|       124|
+--------+--------------------+--------------------+------------+----------+------------+----------+
only showing top 5 rows



In [77]:
ratings = data.select(
    col('tmdb_id').alias('movie_id'),
    explode('ratings').alias('rating_info')
).select(
    'movie_id',
    col('rating_info.user').alias('user'),
    col('rating_info.rating').alias('rating'),
    col('rating_info.rated_at').alias('rated_at')
)
ratings.show(5)

+--------+------------+------+--------------------+
|movie_id|        user|rating|            rated_at|
+--------+------------+------+--------------------+
|  939243|  abesempire|     4|2025-01-26T09:23:...|
|  939243|arcanjoalain|     9|2025-01-21T19:09:...|
|  939243|    alx_zero|     6|2025-01-03T21:10:...|
|  939243|     anizero|     5|2025-01-22T21:20:...|
|  939243|    acaranta|     7|2025-01-22T07:43:...|
+--------+------------+------+--------------------+
only showing top 5 rows



25/03/07 19:24:11 WARN TaskSetManager: Stage 661 contains a task of very large size (6426 KiB). The maximum recommended task size is 1000 KiB.


The user_ids are strings, but the model only takes numeric user_ids, so we have to assign a numeric user_id to each user.

In [78]:
# assign a unique idx to each user
user_idx = StringIndexer(inputCol='user', outputCol='user_id_num')
user_fit = user_idx.fit(ratings)
ratings = user_fit.transform(ratings)
ratings.show(5)

25/03/07 19:24:11 WARN TaskSetManager: Stage 662 contains a task of very large size (6426 KiB). The maximum recommended task size is 1000 KiB.
25/03/07 19:24:12 WARN TaskSetManager: Stage 665 contains a task of very large size (6426 KiB). The maximum recommended task size is 1000 KiB.


+--------+------------+------+--------------------+-----------+
|movie_id|        user|rating|            rated_at|user_id_num|
+--------+------------+------+--------------------+-----------+
|  939243|  abesempire|     4|2025-01-26T09:23:...|       37.0|
|  939243|arcanjoalain|     9|2025-01-21T19:09:...|     1033.0|
|  939243|    alx_zero|     6|2025-01-03T21:10:...|      280.0|
|  939243|     anizero|     5|2025-01-22T21:20:...|       16.0|
|  939243|    acaranta|     7|2025-01-22T07:43:...|        7.0|
+--------+------------+------+--------------------+-----------+
only showing top 5 rows



In [79]:
# cast movie_id column to be an int/numeric value as well
ratings = ratings.withColumn(
    'movie_id', col('movie_id').cast(IntegerType())
).withColumn('rating', col('rating').cast('float'))
ratings.show(5)

+--------+------------+------+--------------------+-----------+
|movie_id|        user|rating|            rated_at|user_id_num|
+--------+------------+------+--------------------+-----------+
|  939243|  abesempire|   4.0|2025-01-26T09:23:...|       37.0|
|  939243|arcanjoalain|   9.0|2025-01-21T19:09:...|     1033.0|
|  939243|    alx_zero|   6.0|2025-01-03T21:10:...|      280.0|
|  939243|     anizero|   5.0|2025-01-22T21:20:...|       16.0|
|  939243|    acaranta|   7.0|2025-01-22T07:43:...|        7.0|
+--------+------------+------+--------------------+-----------+
only showing top 5 rows



25/03/07 19:24:13 WARN TaskSetManager: Stage 666 contains a task of very large size (6426 KiB). The maximum recommended task size is 1000 KiB.


#### Simple Recommender System

Implement a simple movie recommender system. ALS is a matrix factorization technique from pyspark that decomposes user-item interaction matrix into two matrices: one for users and one for items (the movies).

In [80]:
# split data into test and training
training, test = ratings.randomSplit([0.8, 0.2], seed=42)
training.show(5)

25/03/07 19:24:22 WARN TaskSetManager: Stage 667 contains a task of very large size (6426 KiB). The maximum recommended task size is 1000 KiB.


+--------+-------------+------+--------------------+-----------+
|movie_id|         user|rating|            rated_at|user_id_num|
+--------+-------------+------+--------------------+-----------+
|      12|  a-bristow23|   9.0|2013-11-19T20:06:...|       43.0|
|      12|     a-vargas|   6.0|2015-02-09T03:53:...|      577.0|
|      12|   a_karenina|   8.0|2014-01-13T07:52:...|      205.0|
|      12|aabdulgalimov|   7.0|2014-09-13T20:58:...|      325.0|
|      12|       aak112|   9.0|2012-10-25T23:54:...|      128.0|
+--------+-------------+------+--------------------+-----------+
only showing top 5 rows



                                                                                

In [82]:
# fit model; most of these are defaults
als = ALS(
    maxIter=10,
    regParam=0.1,
    userCol='user_id_num',
    itemCol='movie_id',
    ratingCol='rating',
    coldStartStrategy='drop',
    implicitPrefs=False
)

model = als.fit(training)

25/03/07 19:24:59 WARN TaskSetManager: Stage 668 contains a task of very large size (6426 KiB). The maximum recommended task size is 1000 KiB.
25/03/07 19:25:00 WARN TaskSetManager: Stage 669 contains a task of very large size (6426 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [87]:
user_ids = ratings.select('user', 'user_id_num').distinct()
user_recs = model.recommendForAllUsers(5)

user_recs = user_recs.join(
    user_ids, 
    user_recs.user_id_num == user_recs.user_id_num, 
    'inner'
).select(
    'user',
    F.explode(col('recommendations')).alias('rec')
).select(
    'user',
    col('rec.movie_id').alias('rec_movie_id'),
    col('rec.rating').alias('pred_rating')
)

movie_id_map = ratings.select('movie_id').distinct()

user_recs = user_recs.join(
    movie_id_map,
    user_recs.rec_movie_id == movie_id_map.movie_id,
    'inner'
).select(
    'user',
    'movie_id',
    'pred_rating'
)

user_recs.show(10)

25/03/07 19:29:31 WARN Column: Constructing trivially true equals predicate, 'user_id_num#5147 = user_id_num#5147'. Perhaps you need to use aliases.
25/03/07 19:29:37 WARN TaskSetManager: Stage 856 contains a task of very large size (6426 KiB). The maximum recommended task size is 1000 KiB.
25/03/07 19:29:38 WARN TaskSetManager: Stage 857 contains a task of very large size (6426 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+---------+--------+-----------+
|     user|movie_id|pred_rating|
+---------+--------+-----------+
|allengero| 1397314|   8.851983|
|allengero| 1404382|   8.527263|
|allengero|   68555|   8.454443|
|allengero| 1000837|   8.149622|
|allengero|     278|   8.024152|
|allengero| 1397314|  10.485161|
|allengero| 1404382|  9.9202385|
|allengero|   68555|   9.603919|
|allengero| 1000837|   9.426096|
|allengero|  545742|   9.392661|
+---------+--------+-----------+
only showing top 10 rows



In [88]:
preds = model.transform(test)

evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='rating',
    predictionCol='prediction'
)

rmse = evaluator.evaluate(preds)
rmse

25/03/07 19:29:53 WARN TaskSetManager: Stage 887 contains a task of very large size (6426 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

1.3902207038205663

Error for predicted rating is on average about 1.39 off.

In [89]:
final_df = preds.select(
        'user', 
        'movie_id', 
        col('rating').alias('true_rating'), 
        col('prediction').alias('pred_rating')
    ).orderBy(F.rand()).limit(100).withColumn(
    'diff', 
    F.abs(col('true_rating') - col('pred_rating'))
).join(
    movies.select('movie_id', 'title'),
    'movie_id',
    'left'
)

final_df.show(10)

25/03/07 19:31:20 WARN TaskSetManager: Stage 942 contains a task of very large size (6426 KiB). The maximum recommended task size is 1000 KiB.
25/03/07 19:31:22 WARN TaskSetManager: Stage 968 contains a task of very large size (6426 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+--------+---------+-----------+-----------+----------+--------------------+
|movie_id|     user|true_rating|pred_rating|      diff|               title|
+--------+---------+-----------+-----------+----------+--------------------+
|      13| antifriz|       10.0|   8.606488| 1.3935118|        Forrest Gump|
|      13|ar-aburas|       10.0|   9.666296|  0.333704|        Forrest Gump|
|      18|   adam22|        9.0|   8.948988|0.05101204|   The Fifth Element|
|      58|   adam22|        8.0|   8.295311|0.29531097|Pirates of the Ca...|
|      85|ar-aburas|        9.0|   8.572687|0.42731285|Raiders of the Lo...|
|     105|  agomist|        7.0|   7.805289| 0.8052888|  Back to the Future|
|     105|  agomist|        7.0|   7.805289| 0.8052888|  Back to the Future|
|     122|   adam22|       10.0|   9.530949|0.46905136|The Lord of the R...|
|     129|   adam22|        9.0|   9.261142|0.26114178|       Spirited Away|
|     170|  agomist|        4.0|  7.2854257| 3.2854257|       28 Days Later|

In [91]:
spark.stop()