# Recommending Movies

In this track you will be introduced to the MovieLens dataset. You will walk through how to assess its suitability for ALS, build a full cross-validated ALS model on it, and learn how to evaluate its performance. This will be the foundation for all subsequent ALS models you build using PySpark.

## Preparing the environment

### Importing libraries

In [1]:
import time

from pprint import pprint
from environment import SEED
from pyspark.sql.types import (StructType, StructField,
                               DoubleType, IntegerType, StringType)
from pyspark.sql import SparkSession, functions as F
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

### Connect to Spark

In [2]:
spark = (SparkSession.builder
                     .master('local[*]')
                     .appName('spark_application')
                     .config("spark.sql.repl.eagerEval.enabled", True)  # eval DataFrame in notebooks
                     .config("spark.driver.memory", "10g")
                     .config("spark.driver.maxResultSize", "10G")
                     .getOrCreate())

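sc = spark.sparkContext
# Executor memory is read when the SparkContext starts, so setting it here does not affect
# this session (and in local mode the driver process does all the work anyway)
sc.setSystemProperty('spark.executor.memory', '10G')
# ALS checkpoints intermediate data here every `checkpointInterval` iterations,
# which keeps the lineage short when maxIter is large
sc.setCheckpointDir("ml-checkpoint/")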

print(f'Spark version: {spark.version}')

Spark version: 3.5.1


In [3]:
# Review current configuration
print(f"Current driver memory: {spark.conf.get('spark.driver.memory')}")
print(f"Current number of partitions: {spark.conf.get('spark.sql.shuffle.partitions')}")

Current driver memory: 10g
Current number of partitions: 200


## Loading data

### Links

In [4]:
# Reading the file
schema_links = StructType([
    StructField("movieId", IntegerType()),
    StructField("imdbId", StringType()),
    StructField("tmdbId", IntegerType())
])
links_data = spark.read.csv('data-sources/movies/links.csv', header=True, schema=schema_links)

# Reviewing the result
links_data.createOrReplaceTempView("links")
print(f'Dataframe shape: ({links_data.count()}, {len(links_data.columns)})')
links_data.printSchema()
links_data.limit(2)

Dataframe shape: (87585, 3)
root
 |-- movieId: integer (nullable = true)
 |-- imdbId: string (nullable = true)
 |-- tmdbId: integer (nullable = true)



movieId,imdbId,tmdbId
1,114709,862
2,113497,8844


### Movies

In [5]:
# Reading the file
schema_movies = StructType([
    StructField("movieId", IntegerType()),
    StructField("title", StringType()),
    StructField("genres", StringType())
])
movies_data = spark.read.csv('data-sources/movies/movies.csv', header=True, schema=schema_movies)

# Reviewing the result
movies_data.createOrReplaceTempView("movies")
print(f'Dataframe shape: ({movies_data.count()}, {len(movies_data.columns)})')
movies_data.printSchema()
movies_data.limit(2)

Dataframe shape: (87585, 3)
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



movieId,title,genres
1,Toy Story (1995),Adventure|Animati...
2,Jumanji (1995),Adventure|Childre...


### Ratings

In [21]:
# Reading the file
schema_ratings = StructType([
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("rating", DoubleType()),
    StructField("timestamp", IntegerType())
])
ratings_data = spark.read.csv('data-sources/movies/ratings.csv', header=True, schema=schema_ratings)

# # Cleaning and mutating some columns
# ratings_data = ratings_data.withColumn('timestamp', F.to_timestamp(F.from_unixtime('timestamp')))
# date_range = ratings_data.select('timestamp').agg(F.min('timestamp'), F.max('timestamp')).collect()[0]
# print(f"Date range: {date_range[0]} - {date_range[1]}")

# # Keeping only ratings from 2023 onward
# ratings_data = ratings_data.where('timestamp >= "2023-01-01"')

# Reviewing the result
ratings_data.createOrReplaceTempView("ratings")
print(f'Dataframe shape: ({ratings_data.count()}, {len(ratings_data.columns)})')
ratings_data.printSchema()
ratings_data.limit(2)

Dataframe shape: (32000204, 4)
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



userId,movieId,rating,timestamp
1,17,4.0,944249077
1,25,1.0,944250228


In [22]:
# Write a copy as a directory of CSV part files, each holding at most 1,000,000 rows
ratings_data.write.option("maxRecordsPerFile", 1000000).csv('spark-warehouse/ratings-new.csv', header=True, mode="overwrite")

### Tags

In [23]:
# Reading the file
schema_tags = StructType([
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("tag", StringType()),
    StructField("timestamp", IntegerType())
])
tags_data = spark.read.csv('data-sources/movies/tags.csv', header=True, schema=schema_tags)

# # Cleaning and mutating some columns
# tags_data = tags_data.withColumn('timestamp', F.to_timestamp(F.from_unixtime('timestamp')))

# Reviewing the result
tags_data.createOrReplaceTempView("tags")
print(f'Dataframe shape: ({tags_data.count()}, {len(tags_data.columns)})')
tags_data.printSchema()
tags_data.limit(2)

Dataframe shape: (2000072, 4)
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: integer (nullable = true)



userId,movieId,tag,timestamp
22,26479,Kevin Kline,1583038886
22,79592,misogyny,1581476297


In [24]:
# tags_data = tags_data.sample(False, .5, 42)
# Write a copy as a directory of CSV part files, each holding at most 1,000,000 rows
tags_data.write.option("maxRecordsPerFile", 1000000).csv('spark-warehouse/tags-new.csv', header=True, mode="overwrite")

### Tables catalogue

In [8]:
spark.catalog.listTables()

[Table(name='links', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='movies', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='ratings', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='tags', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

# Content

## Introduction to the MovieLens dataset

### MovieLens sparsity

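To measure how sparse the data is, compare the number of ratings with the number of possible user-movie pairs:
$$ \text{Sparsity} = 1 - \frac{\text{Number of ratings in matrix}}{\text{Number of users} \times \text{Number of movies}} $$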
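
As a sanity check on the formula, here is a minimal plain-Python sketch of the same computation on a toy list of `(userId, movieId, rating)` tuples. The function name `sparsity` and the toy data are illustrative assumptions; like the Spark cell below, it counts rows, so it assumes each user rated each movie at most once.

```python
def sparsity(ratings):
    """Return 1 - (#ratings / (#users * #movies)) for (user, movie, rating) triples."""
    ratings = list(ratings)
    users = {user for user, _, _ in ratings}
    movies = {movie for _, movie, _ in ratings}
    return 1 - len(ratings) / (len(users) * len(movies))


# Hypothetical toy data: 2 users x 2 movies = 4 cells, 3 of them filled
toy_ratings = [(1, 10, 4.0), (1, 20, 3.5), (2, 10, 5.0)]
print(sparsity(toy_ratings))  # 0.25
```

A value close to 1 means that almost every user-movie cell of the rating matrix is empty, which is the situation matrix factorization methods such as ALS are designed for.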

In [9]:
# Exploring the data
ratings_df = ratings_data.select('*')
ratings_df.show(3)

print(f'''
Unique users in:
ratings: {ratings_df.select('userId').distinct().count()}
   tags: {tags_data.select('userId').distinct().count()}
''')

# Finding columns with nulls
Dict_Null = {col: ratings_df.filter(ratings_df[f"`{col}`"].isNull()).count() for col in ratings_df.columns}
Dict_Null = {k: v for k, v in Dict_Null.items() if v != 0}
print('Columns with nulls:')
pprint(Dict_Null)

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|    28|   7458|   3.5|2023-09-22 12:42:52|
|    28| 285593|   3.0|2023-09-22 21:20:05|
|    28| 291485|   4.0|2023-10-03 21:46:29|
+------+-------+------+-------------------+
only showing top 3 rows


Unique users in:
ratings: 2583
   tags: 15848

Columns with nulls:
{}


In [10]:
# Calculating MovieLens sparsity
number_of_ratings = ratings_df.count()
number_of_users = ratings_df.select('userId').distinct().count()
number_of_movies = ratings_df.select('movieId').distinct().count()

sparsity = 1 - (number_of_ratings / (number_of_users * number_of_movies))
print('MovieLens Sparsity:', sparsity)

MovieLens Sparsity: 0.9975261125560048


In [11]:
# Number of ratings provided by each user (top 10)
ratings_df.groupby('userId').count().sort(F.col('count').desc()).show(10)

# Minimum, maximum and average number of ratings provided per user
ratings_df.groupby('userId').count().select(
    F.min('count'), F.max('count'), F.avg('count')
).show()

# Users with at least 20 ratings (this only lists them; nothing is removed from ratings_df)
ratings_df.groupby('userId').count().filter(F.col('count')>=20).show(5)

+------+-----+
|userId|count|
+------+-----+
|108029| 2106|
|  3135| 1908|
|127521| 1906|
|141567| 1565|
|118615| 1537|
|159298| 1399|
|  6587| 1333|
|193877| 1139|
|169558| 1103|
|183401| 1062|
+------+-----+
only showing top 10 rows

+----------+----------+------------------+
|min(count)|max(count)|        avg(count)|
+----------+----------+------------------+
|         1|      2106|35.708091366627954|
+----------+----------+------------------+

+------+-----+
|userId|count|
+------+-----+
|  6654|   48|
|  9914|  340|
|   784|  355|
|  8986|   41|
|  7122|   51|
+------+-----+
only showing top 5 rows
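
The cell above only lists the users that pass the threshold; it does not remove anyone from `ratings_df`. To actually drop the infrequent users, their counts have to be joined back to the ratings. A minimal plain-Python sketch of that two-step logic, with a hypothetical `keep_active_users` helper, a `min_ratings` parameter and toy data:

```python
from collections import Counter


def keep_active_users(ratings, min_ratings=20):
    """Keep only the (user, movie, rating) rows of users with >= min_ratings ratings."""
    counts = Counter(user for user, _, _ in ratings)                     # like groupBy('userId').count()
    active = {user for user, n in counts.items() if n >= min_ratings}   # like .filter(count >= 20)
    return [row for row in ratings if row[0] in active]                 # like an inner join on userId


toy_ratings = [(1, 10, 4.0), (1, 20, 3.5), (1, 30, 2.0), (2, 10, 5.0)]
print(keep_active_users(toy_ratings, min_ratings=2))  # [(1, 10, 4.0), (1, 20, 3.5), (1, 30, 2.0)]
```

In PySpark the last step would correspond to something like `ratings_df.join(active_users.select('userId'), 'userId', 'inner')`, where `active_users` (a hypothetical name) is the filtered count DataFrame.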



In [12]:
# A silly example to show multiple filters
ratings_df.filter((F.col('userId') < 100) & (F.col('userId') > 50)).show()

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|    79|  61132|   3.0|2023-09-12 11:05:17|
|    79| 111759|   5.0|2023-09-12 11:04:52|
|    79| 281864|   3.5|2023-09-12 11:05:03|
+------+-------+------+-------------------+



### Ex. 1 - MovieLens Summary Statistics

Let's take the `groupBy()` method a bit further.

Once you've applied the `.groupBy()` method to a dataframe, you can subsequently run aggregate functions such as `.sum()`, `.avg()`, `.min()` and have the results grouped. 

**Instructions:**

1. Group the data by `movieId` and use the `.count()` method to calculate how many ratings each movie has received. From there, call the `.select()` method to select the following metrics:
    - `min("count")` to get the smallest number of ratings that any movie in the dataset has received.
    - `avg("count")` to get the average number of ratings per movie
2. Do the same thing, but this time group by `userId` to get the min and avg number of ratings.

In [13]:
# Min num ratings for movies
print("Movie with the fewest ratings: ")
ratings_df.groupBy("movieId").count().select(F.min("count")).show()

# Avg num ratings per movie
print("Avg num ratings per movie: ")
ratings_df.groupBy("movieId").count().select(F.avg("count")).show()

# Min num ratings for user
print("User with the fewest ratings: ")
ratings_df.groupBy("userId").count().select(F.min("count")).show()

# Avg num ratings per users
print("Avg num ratings per user: ")
ratings_df.groupBy("userId").count().select(F.avg("count")).show()

Movie with the fewest ratings: 
+----------+
|min(count)|
+----------+
|         1|
+----------+

Avg num ratings per movie: 
+-----------------+
|       avg(count)|
+-----------------+
|6.390051267839823|
+-----------------+

User with the fewest ratings: 
+----------+
|min(count)|
+----------+
|         1|
+----------+

Avg num ratings per user: 
+------------------+
|        avg(count)|
+------------------+
|35.708091366627954|
+------------------+
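
Each `print` block above re-runs the same `groupBy(...).count()`; both statistics can come out of a single aggregation, as the earlier `.select(F.min('count'), F.max('count'), F.avg('count'))` cell does. The per-key logic itself is simple, shown here as a plain-Python sketch on toy data (the helper name `count_stats` is hypothetical):

```python
from collections import Counter
from statistics import mean


def count_stats(keys):
    """Return (min, avg) of how many times each distinct key occurs."""
    counts = list(Counter(keys).values())
    return min(counts), mean(counts)


# Hypothetical (userId, movieId, rating) rows
toy_ratings = [(1, 10, 4.0), (1, 20, 3.5), (2, 10, 5.0), (3, 10, 2.0)]
print("per movie:", count_stats(movie for _, movie, _ in toy_ratings))
print("per user: ", count_stats(user for user, _, _ in toy_ratings))
```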



## ALS model buildout on MovieLens Data

### ParamGrid and CrossValidation

In [14]:
# Load the data
ratings_df = ratings_data.select('*').repartition(5)

# Split into train and test set.
ratings_df_train, ratings_df_test = ratings_df.randomSplit([0.8, 0.2], seed=SEED)
print(f"Training set: {ratings_df_train.count()}, Testing set: {ratings_df_test.count()}")

# Build ALS model
als_model = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
                nonnegative=True, coldStartStrategy="drop", implicitPrefs=False)

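# Define the evaluator: RMSE between the actual `rating` and the model's `prediction` column
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")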

Training set: 73917, Testing set: 18317


In [15]:
# Creates a ParamGridBuilder
param_grid = (ParamGridBuilder().addGrid(als_model.rank, [10, 25, 40])
                                .addGrid(als_model.maxIter, [10, 50])
                                .addGrid(als_model.regParam, [.05, .1])
                                .build())
print('Number of models to be tested: ', len(param_grid))

# Creates cross validator
cv = CrossValidator(estimator=als_model,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=5)

Number of models to be tested:  12
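
`ParamGridBuilder` expands the grid as the Cartesian product of the listed values, so 3 ranks x 2 iteration counts x 2 regularization values give the 12 parameter maps reported above. With `numFolds=5`, `CrossValidator` trains every map once per fold and then refits the best map on the whole training set, which is why tuning is costly. A plain-Python sketch of that arithmetic, using `itertools.product` as a stand-in for the builder:

```python
from itertools import product

# Same values as the ParamGridBuilder cell above
grid_values = {"rank": [10, 25, 40], "maxIter": [10, 50], "regParam": [0.05, 0.1]}
num_folds = 5

# One dict per combination, mirroring the list of param maps returned by .build()
param_maps = [dict(zip(grid_values, combo)) for combo in product(*grid_values.values())]

fits = len(param_maps) * num_folds + 1  # every map on every fold, plus one final refit
print(len(param_maps), fits)  # 12 61
```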


In [16]:
# Run the cv on the training data (fit() returns a CrossValidatorModel)
start = time.time()
cv_model = cv.fit(ratings_df_train)

# Extract best combination of values from cross validation
best_model = cv_model.bestModel
print(f'''
Best Model: 
{best_model}

What's the cross-validated RMSE for each model? 
{cv_model.avgMetrics}

Consumed time: {(time.time() - start)/60} min
''')


Best Model: 
ALSModel: uid=ALS_9d9cea6c787c, rank=40

What's the cross-validated RMSE for each model? 
[0.9919127949753592, 0.9336891571518917, 0.9864765605851892, 0.931628367682816, 1.0013689891006963, 0.9304171413639495, 0.9752693941316191, 0.9283925967379677, 1.001741585181454, 0.9306970716279516, 0.9671780175431101, 0.9266830257173287]

Consumed time: 8.416968246301016 min
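
The evaluator uses `metricName='rmse'`, so `avgMetrics` holds each parameter map's root-mean-square error averaged over the five folds; lower is better. A plain-Python sketch of the metric on hypothetical toy values follows. Because the model uses `coldStartStrategy='drop'`, rows whose prediction would be NaN (users or movies unseen during training) are removed before the metric is computed, which the sketch mimics by skipping NaN predictions.

```python
import math


def rmse(pairs):
    """Root-mean-square error over (actual, predicted) pairs, skipping NaN predictions."""
    errors = [(actual - pred) ** 2 for actual, pred in pairs if not math.isnan(pred)]
    return math.sqrt(sum(errors) / len(errors))


# Hypothetical (rating, prediction) pairs; the NaN row mimics an unseen user or movie
toy = [(4.0, 3.5), (2.0, 3.0), (5.0, float("nan"))]
print(rmse(toy))  # sqrt((0.25 + 1.0) / 2)
```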



In [17]:
# Generate test set predictions and evaluate using RMSE
df_predictions = best_model.transform(ratings_df_test) 
rmse = evaluator.evaluate(df_predictions)

# Print evaluation metrics and model parameters
# (`_java_obj` is an internal attribute; `parent()` is the ALS estimator that produced the model)
print(f'''
Best Model:
    RMSE                        : {rmse}
    Rank                        : {best_model.rank}
    Rank (Another way to get it): {best_model._java_obj.parent().getRank()}
    MaxIter                     : {best_model._java_obj.parent().getMaxIter()}
    RegParam                    : {best_model._java_obj.parent().getRegParam()}
''')


Best Model:
    RMSE                        : 0.9113285839250678
    Rank                        : 40
    Rank (Another way to get it): 40
    MaxIter                     : 50
    RegParam                    : 0.1
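
The cell above reaches into `_java_obj`, an internal attribute that is not part of PySpark's public API and may change between versions. Since `avgMetrics` is aligned index-by-index with the estimator param maps, the best combination can also be recovered as the map with the lowest average RMSE; in PySpark that would be roughly calling `getEstimatorParamMaps()` on the fitted `CrossValidatorModel` and taking the entry at the arg-min index. A plain-Python sketch using the `avgMetrics` values printed above, assuming the grid is expanded in the same order as `itertools.product` over the `addGrid` calls:

```python
from itertools import product

# Grid in the order the addGrid calls were made (assumed expansion order)
grid_values = {"rank": [10, 25, 40], "maxIter": [10, 50], "regParam": [0.05, 0.1]}
param_maps = [dict(zip(grid_values, combo)) for combo in product(*grid_values.values())]

# Cross-validated RMSE per param map, copied from the output above
avg_metrics = [0.9919127949753592, 0.9336891571518917, 0.9864765605851892,
               0.931628367682816, 1.0013689891006963, 0.9304171413639495,
               0.9752693941316191, 0.9283925967379677, 1.001741585181454,
               0.9306970716279516, 0.9671780175431101, 0.9266830257173287]

# RMSE: smaller is better, so take the arg-min
best_index = min(range(len(avg_metrics)), key=avg_metrics.__getitem__)
print(best_index, param_maps[best_index])
```

The result, rank 40 with 50 iterations and `regParam` 0.1, agrees with the parameters printed above.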



## Model Performance Evaluation

### Recommend for all users

In [18]:
# Generate top n=50 recommendations for all users
top_recommendations = best_model.recommendForAllUsers(50)
top_recommendations.show(10, truncate=True)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   458|[{215171, 4.81490...|
|  1199|[{1916, 3.505082}...|
|  4859|[{104879, 4.19958...|
|  5217|[{290789, 3.90096...|
|  6654|[{165969, 4.13924...|
|  9914|[{292241, 5.16510...|
| 10090|[{231701, 4.24253...|
| 11642|[{201863, 5.51226...|
| 14556|[{170705, 4.51296...|
| 15968|[{292241, 4.04884...|
+------+--------------------+
only showing top 10 rows
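
An ALS model scores a user-movie pair as the dot product of the user's and the movie's latent factor vectors (each of length `rank`), and `recommendForAllUsers(n)` keeps the `n` highest-scoring movies per user. It scores every movie, including ones the user has already rated. A plain-Python sketch of that ranking step with hypothetical rank-2 factors (the helper name is illustrative):

```python
import heapq


def recommend_for_all_users(user_factors, item_factors, n):
    """Return {user: [(item, score), ...]} with the n highest dot-product scores per user."""
    recs = {}
    for user, u_vec in user_factors.items():
        scores = ((item, sum(a * b for a, b in zip(u_vec, i_vec)))
                  for item, i_vec in item_factors.items())
        recs[user] = heapq.nlargest(n, scores, key=lambda pair: pair[1])
    return recs


# Hypothetical rank-2 factors
user_factors = {1: [1.0, 0.5], 2: [0.0, 2.0]}
item_factors = {10: [2.0, 0.0], 20: [1.0, 1.0], 30: [0.0, 1.5]}
print(recommend_for_all_users(user_factors, item_factors, n=2))
```

In PySpark, the learned factors of a fitted `ALSModel` are exposed as the `userFactors` and `itemFactors` DataFrames.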



In [19]:
# Reviewing in detail the content of recommendations column
top_recommendations.limit(1).collect()

[Row(userId=458, recommendations=[Row(movieId=215171, rating=4.814901828765869), Row(movieId=198185, rating=4.7287163734436035), Row(movieId=139195, rating=4.715865135192871), Row(movieId=140713, rating=4.601424694061279), Row(movieId=110281, rating=4.60083532333374), Row(movieId=85312, rating=4.592772006988525), Row(movieId=268438, rating=4.584874629974365), Row(movieId=103057, rating=4.576087951660156), Row(movieId=123, rating=4.5615973472595215), Row(movieId=3224, rating=4.543295383453369), Row(movieId=292241, rating=4.519752502441406), Row(movieId=7505, rating=4.504077911376953), Row(movieId=223876, rating=4.473747253417969), Row(movieId=214955, rating=4.468325614929199), Row(movieId=170705, rating=4.465029716491699), Row(movieId=27523, rating=4.450263977050781), Row(movieId=159817, rating=4.4488043785095215), Row(movieId=50594, rating=4.444928169250488), Row(movieId=73808, rating=4.441458225250244), Row(movieId=109364, rating=4.436191558837891), Row(movieId=231701, rating=4.420706

### Cleaning up recommendation output

In [20]:
top_recommendations.createOrReplaceTempView("top_recommendations")
spark.catalog.listTables()

[Table(name='links', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='movies', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='ratings', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='tags', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='top_recommendations', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [21]:
clean_recommendation = spark.sql("""
    SELECT userId, 
           movieIds_and_ratings.movieId AS movieId,
           movieIds_and_ratings.rating AS recommendation
    FROM top_recommendations
    LATERAL VIEW explode(recommendations) exploded_table AS movieIds_and_ratings
""")
clean_recommendation.show(3)

+------+-------+--------------+
|userId|movieId|recommendation|
+------+-------+--------------+
|   458| 215171|      4.814902|
|   458| 198185|     4.7287164|
|   458| 139195|      4.715865|
+------+-------+--------------+
only showing top 3 rows



In [22]:
# Reviewing in detail - Explode function
temp_df = spark.sql("""
    SELECT userId, 
           explode(recommendations) AS MovieRecommendation
    FROM top_recommendations
""")
temp_df.show(3)

+------+-------------------+
|userId|MovieRecommendation|
+------+-------------------+
|   458| {215171, 4.814902}|
|   458|{198185, 4.7287164}|
|   458| {139195, 4.715865}|
+------+-------------------+
only showing top 3 rows



In [23]:
# Reviewing in detail the column `MovieRecommendation`
temp_df.limit(1).collect()

[Row(userId=458, MovieRecommendation=Row(movieId=215171, rating=4.814901828765869))]
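
`explode` turns each element of the `recommendations` array into its own row, and the struct fields `movieId` and `rating` can then be pulled out with dot notation, which is what the `LATERAL VIEW` query does in one step. A plain-Python sketch of the same flattening on hypothetical nested records:

```python
def explode_recommendations(rows):
    """Flatten (userId, [(movieId, rating), ...]) into (userId, movieId, rating) rows."""
    return [(user_id, movie_id, rating)
            for user_id, recs in rows
            for movie_id, rating in recs]


# Hypothetical nested records; user 3 has an empty list and produces no rows
nested = [(1, [(10, 4.8), (20, 4.7)]), (2, [(30, 3.5)]), (3, [])]
print(explode_recommendations(nested))
```

A DataFrame-API version would be along the lines of `top_recommendations.select('userId', F.explode('recommendations').alias('rec')).select('userId', 'rec.movieId', F.col('rec.rating').alias('recommendation'))`. As in the sketch, `explode` drops rows whose array is empty; `explode_outer` keeps them with nulls.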

### Adding movie title and rating to compare recommendations with already-seen movies

In [24]:
clean_recommendation = (clean_recommendation.join(movies_data, 'movieId', 'left')
                                            .join(ratings_df.drop('timestamp'), ['userId', 'movieId'], 'left')
                                            .filter('rating IS NOT NULL')  # keep only movies the user already rated
                                            .select('userId', 'movieId', 'recommendation', 'rating',
                                                    'genres', 'title'))
clean_recommendation.show(3, truncate=False)

+------+-------+--------------+------+-----------------------------------------------+--------------------------------------------+
|userId|movieId|recommendation|rating|genres                                         |title                                       |
+------+-------+--------------+------+-----------------------------------------------+--------------------------------------------+
|26796 |7361   |4.465444      |5.0   |Drama|Romance|Sci-Fi                           |Eternal Sunshine of the Spotless Mind (2004)|
|175758|79132  |4.5281725     |5.0   |Action|Crime|Drama|Mystery|Sci-Fi|Thriller|IMAX|Inception (2010)                            |
|56264 |3503   |4.0673265     |4.5   |Drama|Mystery|Sci-Fi                           |Solaris (Solyaris) (1972)                   |
+------+-------+--------------+------+-----------------------------------------------+--------------------------------------------+
only showing top 3 rows
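
The `filter('rating IS NOT NULL')` above keeps only recommendations for movies the user has already rated, which is handy for comparing predicted and actual ratings. To discard already-seen movies instead, keep the rows with no matching rating, i.e. an anti-join on `(userId, movieId)`. A plain-Python sketch on toy data, with a hypothetical `drop_seen` helper:

```python
def drop_seen(recommendations, ratings):
    """Keep (user, movie, score) recommendations whose (user, movie) pair has no rating."""
    seen = {(user, movie) for user, movie, _ in ratings}
    return [rec for rec in recommendations if (rec[0], rec[1]) not in seen]


toy_recs = [(1, 10, 4.8), (1, 20, 4.5), (2, 10, 4.1)]
toy_ratings = [(1, 10, 5.0), (2, 30, 3.0)]
print(drop_seen(toy_recs, toy_ratings))  # [(1, 20, 4.5), (2, 10, 4.1)]
```

In PySpark the same idea could be written as `recs.join(ratings_df.select('userId', 'movieId'), ['userId', 'movieId'], 'left_anti')`, where `recs` (a hypothetical name) is the exploded recommendation DataFrame before the rating join; since `rating` has no nulls, switching the filter above to `rating IS NULL` gives the same rows.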



### Ex. 2 - Do recommendations make sense?

Now that we have an understanding of how well our model performed, and have some confidence that it will provide recommendations that are relevant to users, let's actually look at recommendations made to a user and see if they make sense.

The original ratings are available here as `ratings_df`. Take a look at the original ratings of users 73645 and 79563, and compare them to what ALS recommended for them. In your opinion, are the recommendations consistent with their original preferences?

**Instructions:**

1. Use `.filter()` on the original ratings dataframe to keep the rows where `col("userId")` equals `73645`, so you can look at user `73645`'s original ratings. Call the `.sort()` method to sort the output by rating and set the `ascending` argument to `False` to see the highest ratings first.
2. Use the `.filter()` and `.show()` methods on the recommendations dataset to look at user `73645`'s recommendations. ALS returns each user's recommendations in descending order of predicted rating, but the explode and joins above do not preserve row order, so sort by `recommendation` if the order matters.
3. Do the same thing for user `79563`.
4. Do the recommendation genres have some consistency with each user's original ratings?
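
One rough way to put a number on question 4 is to compare the genres of the movies a user rated highly with the genres of their recommendations, for example with a Jaccard overlap of the two genre sets. This is a hypothetical heuristic, not part of the exercise; the helper names, the idea of treating ratings of 4.0 or more as "liked", and the toy genre strings (in the pipe-separated `movies.csv` format) are all assumptions.

```python
def genre_set(genre_strings):
    """Union of pipe-separated genres, e.g. 'Drama|Romance' -> {'Drama', 'Romance'}."""
    return {genre for s in genre_strings for genre in s.split("|")}


def genre_overlap(liked_genres, recommended_genres):
    """Jaccard similarity of two collections of genre strings (0 = disjoint, 1 = identical)."""
    liked, recommended = genre_set(liked_genres), genre_set(recommended_genres)
    if not liked and not recommended:
        return 0.0
    return len(liked & recommended) / len(liked | recommended)


# Hypothetical: genres of movies a user rated >= 4.0 vs genres of their recommendations
liked = ["Drama|Romance|Sci-Fi", "Drama|Mystery"]
recommended = ["Drama", "Action|Sci-Fi"]
print(genre_overlap(liked, recommended))  # 0.4
```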

In [25]:
# Look at user 73645's ratings
print("User 73645's Ratings:")
ratings_df.filter(F.col("userId") == 73645).sort("rating", ascending=False).show()

# Look at the movies recommended to user 73645 (sorted, since joins do not preserve order)
print("User 73645's Recommendations:")
clean_recommendation.filter(F.col("userId") == 73645).sort("recommendation", ascending=False).show(truncate=False)

# Look at user 79563's ratings
print("User 79563's Ratings:")
ratings_df.filter(F.col("userId") == 79563).sort("rating", ascending=False).show()

# Look at the movies recommended to user 79563 (sorted, since joins do not preserve order)
print("User 79563's Recommendations:")
clean_recommendation.filter(F.col("userId") == 79563).sort("recommendation", ascending=False).show(truncate=False)

User 73645's Ratings:
+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
| 73645| 287699|   4.5|2023-09-13 20:13:52|
| 73645| 136445|   1.0|2023-09-13 20:29:23|
+------+-------+------+-------------------+

User 73645's Recommendations:
+------+-------+--------------+------+------+------------------+
|userId|movieId|recommendation|rating|genres|title             |
+------+-------+--------------+------+------+------------------+
|73645 |287699 |4.2920156     |4.5   |Drama |Oppenheimer (2023)|
+------+-------+--------------+------+------+------------------+

User 79563's Ratings:
+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
| 79563|  68157|   5.0|2023-10-11 13:43:42|
| 79563|  88129|   5.0|2023-10-10 09:59:10|
| 79563|  81535|   4.5|2023-10-08 13:39:13|
| 79563| 194396|   4.5|2023-09-13 12:14:44|
| 79563|  77798|   4.0|2023

## Close session

In [26]:
spark.stop()