### Import libraries

In [None]:
# Dataset: MovieLens (GroupLens) - https://grouplens.org/datasets/movielens/
# NOTE(review): presumably the source of movies.csv / ratings.csv loaded below - confirm which release was used.

In [None]:
import pandas as pd
!pip install pyspark
from pyspark.sql.functions import col, explode
from pyspark import SparkContext

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=4d6cc6b500e2da2ba8319690ccfdc49562588a77770029f653550e1c137aaecd
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


### Initiate spark session

In [None]:
from pyspark.sql import SparkSession

# Build the session for this notebook (getOrCreate is used so the cell can be re-run).
spark = SparkSession.builder.appName('Recommendations').getOrCreate()

# Bind `sc` to the session's SparkContext *instance*. Previously `sc` was bound to
# the SparkContext class itself, so instance methods such as setCheckpointDir
# could not be called on it.
sc = spark.sparkContext
# sc.setCheckpointDir('checkpoint')

# 1. Load data

In [None]:
# Read both CSV files, using the first row of each file as the column names.
movies = spark.read.option("header", True).csv("movies.csv")
ratings = spark.read.option("header", True).csv("ratings.csv")

In [None]:
# Preview the first rows of the ratings data exactly as it was read from the CSV
ratings.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [None]:
# Inspect the column types of the freshly loaded ratings DataFrame
ratings.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [None]:
# Convert the id columns to integers and the rating to a float,
# then discard the timestamp column before previewing the result.
column_types = {'userId': 'integer', 'movieId': 'integer', 'rating': 'float'}
for column_name, spark_type in column_types.items():
    ratings = ratings.withColumn(column_name, col(column_name).cast(spark_type))
ratings = ratings.drop('timestamp')
ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
|     1|    163|   5.0|
|     1|    216|   5.0|
|     1|    223|   3.0|
|     1|    231|   5.0|
|     1|    235|   4.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 20 rows



## Calculate sparsity

In [None]:
# Observed ratings: one row per rating in the dataset
numerator = ratings.select("rating").count()

# Distinct users and distinct movies that appear in the ratings
num_users, num_movies = (
    ratings.select(id_column).distinct().count()
    for id_column in ("userId", "movieId")
)

# Possible ratings: every user paired with every movie
denominator = num_users * num_movies

# Sparsity is the percentage of possible ratings that are absent
fill_ratio = float(numerator) / denominator
sparsity = (1.0 - fill_ratio) * 100
print("The ratings dataframe is ", "{:.2f}".format(sparsity) + "% empty.")

The ratings dataframe is  98.30% empty.


## Interpret ratings

In [None]:
# Number of ratings per user, largest counts first
userId_ratings = (
    ratings
    .groupBy("userId")
    .count()
    .orderBy(col("count").desc())
)
userId_ratings.show()

+------+-----+
|userId|count|
+------+-----+
|   414| 2698|
|   599| 2478|
|   474| 2108|
|   448| 1864|
|   274| 1346|
|   610| 1302|
|    68| 1260|
|   380| 1218|
|   606| 1115|
|   288| 1055|
|   249| 1046|
|   387| 1027|
|   182|  977|
|   307|  975|
|   603|  943|
|   298|  939|
|   177|  904|
|   318|  879|
|   232|  862|
|   480|  836|
+------+-----+
only showing top 20 rows



In [None]:
# Number of ratings per movie, largest counts first
movieId_ratings = (
    ratings
    .groupBy("movieId")
    .count()
    .orderBy(col("count").desc())
)
movieId_ratings.show()

+-------+-----+
|movieId|count|
+-------+-----+
|    356|  329|
|    318|  317|
|    296|  307|
|    593|  279|
|   2571|  278|
|    260|  251|
|    480|  238|
|    110|  237|
|    589|  224|
|    527|  220|
|   2959|  218|
|      1|  215|
|   1196|  211|
|     50|  204|
|   2858|  204|
|     47|  203|
|    780|  202|
|    150|  201|
|   1198|  200|
|   4993|  198|
+-------+-----+
only showing top 20 rows



## Build Out An ALS Model

In [None]:
# Import the required functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [None]:
# Split the ratings 80/20 into train and test sets (seeded split)
train, test = ratings.randomSplit([0.8, 0.2], seed=1234)

# ALS estimator configured for explicit ratings on the userId/movieId/rating columns
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop",
)

# Sanity check: display the type of the estimator that was just created
type(als)

## Tell Spark how to tune your ALS model

In [None]:
# Import the requisite items
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyperparameter grid: each rank value is combined with each regParam value
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 50, 100, 150])
    .addGrid(als.regParam, [.01, .05, .1, .15])
    # .addGrid(als.maxIter, [5, 50, 100, 200])  (left disabled, as in the original grid)
    .build()
)

# RMSE evaluator comparing the "rating" label with the "prediction" column
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)

# Report how many parameter combinations the grid contains
print("Num models to be tested: ", len(param_grid))

Num models to be tested:  16


## Build your cross validation pipeline

In [None]:
# 5-fold cross validation over the parameter grid, scored by the evaluator
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
)

# Printing the object confirms it was constructed
print(cv)

CrossValidator_2bea0fe400bd


## Best Model and Best Model Parameters

In [None]:
# Fit the cross validator to the 'train' dataset (runs the grid search configured in `cv`).
# NOTE(review): with 16 parameter combinations and 5 folds this is likely the slowest cell - confirm runtime.
model = cv.fit(train)

# Extract the best model from the fitted cross-validation model above
best_model = model.bestModel

In [None]:

# Show which model class cross validation selected
print(type(best_model))

print("**Best Model**")

# The hyperparameters are read from the parent of the model's underlying Java
# object; fetch that parent once and query it for each value.
best_estimator = best_model._java_obj.parent()

print("  Rank:", best_estimator.getRank())
print("  MaxIter:", best_estimator.getMaxIter())
print("  RegParam:", best_estimator.getRegParam())

<class 'pyspark.ml.recommendation.ALSModel'>
**Best Model**
  Rank: 50
  MaxIter: 10
  RegParam: 0.15


In [None]:
# Predict ratings for the held-out test set with the best model
test_predictions = best_model.transform(test)

# Additional evaluators (MAE and R2) on the same label / prediction columns
metric_columns = dict(labelCol="rating", predictionCol="prediction")
evaluator_mae = RegressionEvaluator(metricName="mae", **metric_columns)
evaluator_r2 = RegressionEvaluator(metricName="r2", **metric_columns)

# Compute each metric on the test predictions
rmse = evaluator.evaluate(test_predictions)
mae = evaluator_mae.evaluate(test_predictions)
r2 = evaluator_r2.evaluate(test_predictions)

print(f"RMSE: {rmse}, MAE: {mae}, R2: {r2}")

RMSE: 0.8690673442621993, MAE: 0.6773749374363924, R2: 0.29248080689446043


In [None]:
# Compare the actual ratings with the model's predictions on the test set
test_predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   580|   1580|   4.0| 3.4706793|
|   580|  44022|   3.5| 3.3040323|
|   597|    471|   2.0| 4.1068125|
|   108|   1959|   5.0| 3.8204253|
|   368|   2122|   2.0| 1.8162413|
|   436|    471|   3.0| 3.6804643|
|   587|   1580|   4.0| 3.8836937|
|    27|   1580|   3.0| 3.3650382|
|   606|   1580|   2.5| 3.1890929|
|   606|  44022|   4.0| 2.8293602|
|    91|   2122|   4.0| 2.3646014|
|   157|   3175|   2.0| 3.4537184|
|   232|   1580|   3.5| 3.3931732|
|   232|  44022|   3.0| 3.1339257|
|   246|   1645|   4.0| 3.7509031|
|   599|   2366|   3.0| 2.8911195|
|   111|   1088|   3.0| 3.1662276|
|   111|   3175|   3.5| 3.0849307|
|    47|   1580|   1.5| 2.6983342|
|   140|   1580|   3.0| 3.3509948|
+------+-------+------+----------+
only showing top 20 rows



## Make Recommendations

In [None]:
# Generate the top 10 recommendations for every user with the best model
nrecommendations = best_model.recommendForAllUsers(10)
# Display at most 20 rows to keep the output short
nrecommendations.limit(20).show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{3379, 5.831154}...|
|     2|[{131724, 4.78526...|
|     3|[{70946, 4.947317...|
|     4|[{25825, 4.956715...|
|     5|[{5490, 4.6584444...|
|     6|[{42730, 4.889429...|
|     7|[{3379, 4.4965496...|
|     8|[{3379, 4.6741285...|
|     9|[{3379, 4.9176483...|
|    10|[{71579, 4.507864...|
|    11|[{3379, 5.073489}...|
|    12|[{3925, 5.854617}...|
|    13|[{60943, 5.058846...|
|    14|[{391, 4.4939833}...|
|    15|[{60943, 4.463873...|
|    16|[{3379, 4.710959}...|
|    17|[{3379, 5.256433}...|
|    18|[{3379, 5.061402}...|
|    19|[{3379, 4.1065288...|
|    20|[{5490, 5.0414987...|
+------+--------------------+



Explanation:

* **best_model.recommendForAllUsers(10):** This function generates top n (in this case, 10) recommendations for all users in the dataset using the trained ALS model.
* **nrecommendations.limit(20).show():** This limits the display to the first 20 rows of the recommendation results for easier visualization.

The output of this step is a DataFrame in which each row contains a user ID and a list of recommended movies with their predicted ratings.

In [None]:
# Flatten the per-user recommendation arrays into one row per (userId, movieId, rating).
# The flattened result is stored back under the same name, so the explode step is
# guarded: once the "recommendations" array column is gone the DataFrame is already
# flat, and re-running this cell just shows it again instead of failing on the
# missing column.
if "recommendations" in nrecommendations.columns:
    nrecommendations = (
        nrecommendations
        .withColumn("rec_exp", explode("recommendations"))
        .select('userId', col("rec_exp.movieId"), col("rec_exp.rating"))
    )

nrecommendations.limit(10).show()

+------+-------+---------+
|userId|movieId|   rating|
+------+-------+---------+
|     1|   3379|5.7070365|
|     1|  33649| 5.640919|
|     1|   5490| 5.502497|
|     1| 171495|5.4188676|
|     1|   5416|5.4028587|
|     1|   5328|5.4028587|
|     1|   3951|5.4028587|
|     1|  78836| 5.375047|
|     1|   5915|5.3479133|
|     1| 184245| 5.311106|
+------+-------+---------+



## Do the recommendations make sense?
Let's merge the movie titles and genres into the recommendations for interpretability.

In [None]:
# Recommendations for user 200, joined with the movie title and genres
(
    nrecommendations
    .join(movies, on='movieId')
    .where(col('userId') == 200)
    .show()
)

+-------+------+---------+--------------------+--------------------+
|movieId|userId|   rating|               title|              genres|
+-------+------+---------+--------------------+--------------------+
|  67618|   200| 5.027164|Strictly Sexual (...|Comedy|Drama|Romance|
|  33649|   200|5.0097237|  Saving Face (2004)|Comedy|Drama|Romance|
|   3379|   200|4.9731035| On the Beach (1959)|               Drama|
|  93988|   200|4.8592796|North & South (2004)|       Drama|Romance|
| 171495|   200| 4.840536|              Cosmos|  (no genres listed)|
|   5490|   200| 4.824309|  The Big Bus (1976)|       Action|Comedy|
|   7121|   200|4.8242717|   Adam's Rib (1949)|      Comedy|Romance|
| 184245|   200| 4.823738|De platte jungle ...|         Documentary|
| 179135|   200| 4.823738|Blue Planet II (2...|         Documentary|
| 138966|   200| 4.823738|Nasu: Summer in A...|           Animation|
+-------+------+---------+--------------------+--------------------+



In [None]:
# The 10 highest ratings user 200 actually gave, joined with the movie title and genres
(
    ratings
    .join(movies, on='movieId')
    .where(col('userId') == 200)
    .orderBy(col('rating').desc())
    .limit(10)
    .show()
)

+-------+------+------+--------------------+--------------------+
|movieId|userId|rating|               title|              genres|
+-------+------+------+--------------------+--------------------+
|    441|   200|   5.0|Dazed and Confuse...|              Comedy|
|   1500|   200|   5.0|Grosse Pointe Bla...|Comedy|Crime|Romance|
|    597|   200|   5.0| Pretty Woman (1990)|      Comedy|Romance|
|     39|   200|   5.0|     Clueless (1995)|      Comedy|Romance|
|   1020|   200|   5.0|Cool Runnings (1993)|              Comedy|
|   1196|   200|   5.0|Star Wars: Episod...|Action|Adventure|...|
|   1380|   200|   5.0|       Grease (1978)|Comedy|Musical|Ro...|
|   1197|   200|   5.0|Princess Bride, T...|Action|Adventure|...|
|    318|   200|   5.0|Shawshank Redempt...|         Crime|Drama|
|   1210|   200|   5.0|Star Wars: Episod...|Action|Adventure|...|
+-------+------+------+--------------------+--------------------+



In [None]:
# Recommendations for user 5, joined with the movie title and genres
(
    nrecommendations
    .join(movies, on='movieId')
    .where(col('userId') == 5)
    .show()
)

+-------+------+---------+--------------------+--------------------+
|movieId|userId|   rating|               title|              genres|
+-------+------+---------+--------------------+--------------------+
|   3379|     5|4.5838013| On the Beach (1959)|               Drama|
|   5490|     5|4.5131636|  The Big Bus (1976)|       Action|Comedy|
|   5915|     5| 4.387324|Victory (a.k.a. E...|    Action|Drama|War|
|  33649|     5|4.3603888|  Saving Face (2004)|Comedy|Drama|Romance|
|   6201|     5|4.3282003|    Lady Jane (1986)|       Drama|Romance|
|  86781|     5|4.3003535|    Incendies (2010)|   Drama|Mystery|War|
|   7096|     5| 4.299484|Rivers and Tides ...|         Documentary|
|   6460|     5|4.2978234|Trial, The (Procè...|               Drama|
|   3224|     5| 4.274757|Woman in the Dune...|               Drama|
|  92475|     5|4.2649755|All Watched Over ...|         Documentary|
+-------+------+---------+--------------------+--------------------+



In [None]:
# The 20 highest ratings user 5 actually gave, joined with the movie title and genres
(
    ratings
    .join(movies, on='movieId')
    .where(col('userId') == 5)
    .orderBy(col('rating').desc())
    .limit(20)
    .show()
)

+-------+------+------+--------------------+--------------------+
|movieId|userId|rating|               title|              genres|
+-------+------+------+--------------------+--------------------+
|    290|     5|   5.0|Once Were Warrior...|         Crime|Drama|
|    296|     5|   5.0| Pulp Fiction (1994)|Comedy|Crime|Dram...|
|     58|     5|   5.0|Postman, The (Pos...|Comedy|Drama|Romance|
|    247|     5|   5.0|Heavenly Creature...|         Crime|Drama|
|    590|     5|   5.0|Dances with Wolve...|Adventure|Drama|W...|
|    594|     5|   5.0|Snow White and th...|Animation|Childre...|
|    475|     5|   5.0|In the Name of th...|               Drama|
|    527|     5|   5.0|Schindler's List ...|           Drama|War|
|    595|     5|   5.0|Beauty and the Be...|Animation|Childre...|
|    596|     5|   5.0|    Pinocchio (1940)|Animation|Childre...|
|    457|     5|   4.0|Fugitive, The (1993)|            Thriller|
|    474|     5|   4.0|In the Line of Fi...|     Action|Thriller|
|      1| 