# 3. Build a Recommendation Engine with Spark Using a Dataset of Your Choice

Import Necessary Libraries

In [16]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col
from pyspark.ml.evaluation import RegressionEvaluator

Initialize Spark Session

In [17]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session that every later cell reads via `spark`.
try:
    spark = (
        SparkSession.builder
        .appName("Movie Recommendation System")
        .getOrCreate()
    )
    # Only surface Spark's ERROR-level logs so cell output stays readable.
    spark.sparkContext.setLogLevel("ERROR")
    print("Spark session initialized successfully.")
except Exception as e:
    print(f"Error initializing Spark session: {e}")
    # Re-raise rather than calling exit(1): inside a notebook kernel exit()
    # does not act like a script exit and discards the traceback, whereas
    # re-raising stops "Run All" at this cell with the real error visible.
    raise

Spark session initialized successfully.


Load the MovieLens Dataset

In [21]:
# Data set: MovieLens (ratings.csv and movies.csv).
# Single place to edit when the CSV files live somewhere other than the
# default directory.
DATA_DIR = "/content"

# header=True takes column names from the first row; inferSchema=True asks
# Spark to derive column types from the data.
ratings = spark.read.csv(f"{DATA_DIR}/ratings.csv", inferSchema=True, header=True)
movies = spark.read.csv(f"{DATA_DIR}/movies.csv", inferSchema=True, header=True)

# Quick sanity check of both frames.
ratings.show(5)
movies.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



Prepare the Ratings Data

In [22]:
# Keep only the three columns handed to ALS (user, item, rating) and pin
# their types in a single projection; the timestamp column is dropped.
ratings = ratings.select(
    col("userId").cast("int").alias("userId"),
    col("movieId").cast("int").alias("movieId"),
    col("rating").cast("float").alias("rating"),
)

Split Data

In [23]:
# 80/20 train/test split. A fixed seed keeps the split, and therefore the
# RMSE reported below, repeatable across kernel restarts.
(training_data, test_data) = ratings.randomSplit([0.8, 0.2], seed=42)

Build and Train ALS Model

In [24]:
from pyspark.ml.recommendation import ALS

# Configure the ALS collaborative-filtering estimator on the
# (userId, movieId, rating) triples prepared above.
als = ALS(
    maxIter=10,
    regParam=0.1,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    # Drop prediction rows that come out as NaN (user or movie not seen
    # during training) so the evaluation metric is not NaN.
    coldStartStrategy="drop",
    # Factor initialization is random; fix the seed so retraining gives
    # repeatable factors and recommendations.
    seed=42,
)

# Fit the latent-factor model on the training split only.
model = als.fit(training_data)

Evaluate Model

In [25]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE between the true "rating" column and the model's "prediction" column.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)

# Score the held-out split, then reduce the predictions to a single number.
predictions = model.transform(test_data)
rmse = evaluator.evaluate(predictions)

print(f"Root-mean-square error = {rmse:.2f}")


Root-mean-square error = 0.88


Generate Recommendations

Top 5 for all users:

In [26]:
# Five recommended movies per user; each row holds the user id and an array
# of (movieId, rating) structs. Preview the first five users untruncated.
user_recs = model.recommendForAllUsers(numItems=5)
user_recs.show(n=5, truncate=False)

+------+---------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                          |
+------+---------------------------------------------------------------------------------------------------------+
|1     |[{184245, 5.5768685}, {179135, 5.5768685}, {171495, 5.5768685}, {134796, 5.5768685}, {117531, 5.5768685}]|
|2     |[{131724, 4.89709}, {58301, 4.7669554}, {3153, 4.762315}, {59814, 4.7587514}, {89753, 4.716039}]         |
|3     |[{70946, 5.032664}, {6835, 4.914385}, {5919, 4.8469124}, {5181, 4.84385}, {7991, 4.7472157}]             |
|4     |[{7700, 6.005274}, {522, 5.5971065}, {3089, 5.4002047}, {25825, 5.3504267}, {132333, 5.2476616}]         |
|5     |[{7700, 4.8395557}, {3089, 4.7664423}, {25825, 4.7065177}, {1203, 4.657878}, {898, 4.6559825}]           |
+------+------------------------------------------------------------------------

Map Movie Titles to Recommendations

In [27]:
# Explode recommendation list into rows
from pyspark.sql.functions import explode

# Flatten to one row per (user, recommended movie): explode the array, then
# pull the struct fields out as top-level columns.
recs = (
    user_recs
    .select("userId", explode("recommendations").alias("rec"))
    .select("userId", "rec.movieId", "rec.rating")
)

# Attach titles by joining on the shared movieId key, then display.
recs_with_titles = recs.join(movies, "movieId")
recs_with_titles.select("userId", "title", "rating").show(truncate=False)


+------+----------------------------------------------------------------------------------------------------+---------+
|userId|title                                                                                               |rating   |
+------+----------------------------------------------------------------------------------------------------+---------+
|1     |De platte jungle (1978)                                                                             |5.5768685|
|1     |Blue Planet II (2017)                                                                               |5.5768685|
|1     |Cosmos                                                                                              |5.5768685|
|1     |Bitter Lake (2015)                                                                                  |5.5768685|
|1     |Watermark (2014)                                                                                    |5.5768685|
|2     |The Jinx: The Life and Deaths of

In [28]:
# Shut down the Spark session created at the top of the notebook; cells that
# use `spark` cannot be re-run after this without re-creating the session.
spark.stop()