In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Obtain the Spark session for this notebook, creating it under the
# application name "Q3" if one is not already available.
spark = (
    SparkSession.builder
    .appName("Q3")
    .getOrCreate()
)

In [None]:
# Load the ratings file; the first line is a header and column types are
# inferred from the data rather than declared up front.
ratings = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/content/ratings.csv")
)
ratings.show(n=5)

# Load the movie metadata the same way (header row, inferred types).
movies = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/content/movies.csv")
)
movies.show(n=5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Keep only the three columns the recommender consumes; any other column of
# `ratings` is left out of `data`.
data = ratings.select(["userId", "movieId", "rating"])

# 80/20 random split into training and held-out test rows, with a fixed seed.
training, test = data.randomSplit(weights=[0.8, 0.2], seed=42)

In [None]:
# Explicit-feedback ALS recommender over (userId, movieId, rating).
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",  # avoid NaN predictions
    nonnegative=True,
    implicitPrefs=False,
    rank=10,
    maxIter=10,
    regParam=0.1,
    # Pin the estimator's seed (same value as the train/test split) so that
    # rerunning the notebook yields a repeatable model and RMSE instead of
    # depending on whatever default seed the session happens to pick.
    seed=42,
)

# Fit on the training split only; the test split stays unseen until scoring.
model = als.fit(training)

# Score the held-out rows; this adds a "prediction" column next to "rating".
predictions = model.transform(test)
predictions.show(5)

# RMSE between the true "rating" and the model's "prediction".
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)

rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse:.2f}")

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   148|   4896|   4.0|  3.429373|
|   148|   5618|   3.0| 3.6807697|
|   148|   7153|   3.0| 3.4784653|
|   148|  40629|   5.0| 3.2575603|
|   148|  40815|   4.0| 3.5905585|
+------+-------+------+----------+
only showing top 5 rows

Root-mean-square error = 0.89


In [None]:

from pyspark.sql.functions import explode

# Ask the fitted model for 5 recommendations per user.
user_recs = model.recommendForAllUsers(5)

# Turn the per-user "recommendations" array into one row per recommendation,
# holding each array element in a column named "rec".
user_recs_exploded = user_recs.select(
    "userId",
    explode("recommendations").alias("rec"),
)

# Lift the movieId and rating fields out of "rec" into ordinary columns.
user_recs_flat = user_recs_exploded.select(
    "userId",
    user_recs_exploded["rec"]["movieId"].alias("movieId"),
    user_recs_exploded["rec"]["rating"].alias("predicted_rating"),
)

# Attach the movie metadata columns by joining on movieId.
user_recs_with_titles = user_recs_flat.join(movies, on="movieId")
user_recs_with_titles.show(5)

# Compact view: who, which title, and the predicted rating (titles untruncated).
user_recs_with_titles.select(
    "userId",
    "title",
    "predicted_rating",
).show(10, truncate=False)

+-------+------+----------------+--------------------+------------------+
|movieId|userId|predicted_rating|               title|            genres|
+-------+------+----------------+--------------------+------------------+
| 184245|     1|       5.8248386|De platte jungle ...|       Documentary|
| 171495|     1|       5.8248386|              Cosmos|(no genres listed)|
| 134796|     1|       5.8248386|  Bitter Lake (2015)|       Documentary|
| 117531|     1|       5.8248386|    Watermark (2014)|       Documentary|
|  86237|     1|       5.8248386|  Connections (1978)|       Documentary|
+-------+------+----------------+--------------------+------------------+
only showing top 5 rows

+------+----------------------------------------------------+----------------+
|userId|title                                               |predicted_rating|
+------+----------------------------------------------------+----------------+
|1     |De platte jungle (1978)                             |5.8248386  