# Movie Recommendation Engine using PySpark

## Setup

In [None]:
!pip install pyspark matplotlib

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import (col, isnan, when, count, to_timestamp, explode)
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

import matplotlib.pyplot as plt

# Reuse the active SparkSession if one exists; otherwise start a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

## Data Loading and Exploration

In [None]:
from pyspark.sql.functions import timestamp_seconds

# Load ratings and movie metadata; inferSchema lets Spark choose column types
# (e.g. numeric ids) instead of reading every column as a string.
ratings_df = spark.read.csv('ratings.csv', header=True, inferSchema=True)

movies_df = spark.read.csv('movies.csv', header=True, inferSchema=True)

# Convert the raw "timestamp" column to TimestampType.
# NOTE(review): if schema inference produced an integer column (presumably Unix
# epoch seconds -- confirm against the dataset), convert it with
# timestamp_seconds(). to_timestamp() without a format may coerce numeric input
# through a string cast on newer Spark releases, which can yield nulls.
# Any other column type keeps the original to_timestamp() conversion.
if dict(ratings_df.dtypes).get("timestamp") in ("int", "bigint"):
    ratings_df = ratings_df.withColumn("timestamp", timestamp_seconds("timestamp"))
else:
    ratings_df = ratings_df.withColumn("timestamp", to_timestamp("timestamp"))

ratings_df.show(5)
movies_df.show(5)

print(f"Number of ratings: {ratings_df.count()}")
print(f"Number of unique users: {ratings_df.select('userId').distinct().count()}")
print(f"Number of unique movies: {ratings_df.select('movieId').distinct().count()}")


In [None]:
# Count null-or-NaN entries per column. The timestamp column is skipped,
# presumably because isnan() does not accept a timestamp input.
columns_to_check = [c for c in ratings_df.columns if c != 'timestamp']
missing_value_exprs = [
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in columns_to_check
]
ratings_df.select(missing_value_exprs).show()

In [None]:
# Visualize rating distribution: one bar per distinct rating value.
rating_counts = ratings_df.groupBy("rating").count().orderBy("rating").collect()
ratings = [row["rating"] for row in rating_counts]
counts = [row["count"] for row in rating_counts]

# matplotlib's default bar width is 0.8 x-units, so bars overlap when rating
# values are closer together than that (e.g. half-star steps). Size the bars
# to the smallest gap between adjacent (sorted) ratings instead; for
# integer-spaced ratings this reproduces the 0.8 default.
present_ratings = [r for r in ratings if r is not None]
rating_gaps = [hi - lo for lo, hi in zip(present_ratings, present_ratings[1:])]
bar_width = 0.8 * min(rating_gaps) if rating_gaps else 0.8

plt.figure(figsize=(10, 6))
plt.bar(ratings, counts, width=bar_width)
plt.xlabel("Rating")
plt.ylabel("Number of Ratings")
plt.title("Distribution of Ratings")
plt.show()

## Data Preparation (Type Casting and Train/Test Split)

In [None]:
# ALS expects integer user/item ids and a numeric rating, so cast explicitly
# instead of relying on the inferred CSV schema.
als_columns = [("userId", "integer"), ("movieId", "integer"), ("rating", "float")]
als_data = ratings_df.select(
    *[col(name).cast(dtype).alias(name) for name, dtype in als_columns]
)

# Hold out 20% of the ratings for final evaluation; the fixed seed keeps the
# split stable across runs (given the same input partitioning).
training, test = als_data.randomSplit([0.8, 0.2], seed=42)

## Build Model

In [None]:
from pyspark.ml.recommendation import ALS

# Base ALS estimator. rank / regParam / maxIter are only starting values:
# the ParamGridBuilder grid used for cross-validation overrides all three.
als = ALS(
    maxIter=10,
    regParam=0.1,
    rank=20,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    # Drop rows whose user or item was not seen during fitting, so they do not
    # produce NaN predictions that would turn the RMSE into NaN.
    coldStartStrategy="drop",
    # Explicit seed: PySpark's default seed is hash(<class name>), and Python
    # randomizes str hashes per process unless PYTHONHASHSEED is fixed, so the
    # factor initialization would differ between kernel sessions. 42 matches
    # the seeded train/test split.
    seed=42,
)


## Training with Cross-Validated Hyperparameter Tuning

In [None]:
# Hyperparameter grid: 3 ranks x 3 regularization strengths x 2 iteration
# counts = 18 candidate parameter maps, each fitted once per fold.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 20, 30])
    .addGrid(als.regParam, [0.1, 0.01, 0.001])
    .addGrid(als.maxIter, [10, 15])
    .build()
)

# RMSE between the true "rating" column and the model's "prediction" column.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

# 3-fold cross-validation on the training split only; the test split stays
# untouched for the final evaluation. The explicit seed fixes the fold
# assignment so the selected hyperparameters are reproducible across sessions
# (PySpark's default seed depends on Python's per-process str hash), matching
# the seeded randomSplit.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,
    seed=42,
)

cv_model = cv.fit(training)
# bestModel is the best-scoring parameter map refit on the whole training split.
model = cv_model.bestModel

## Evaluation and Recommendations

In [None]:
# Score the held-out test split with the tuned model and preview a few rows.
predictions = model.transform(test)
predictions.show(n=5)

In [None]:

# Reuse the cross-validation RMSE evaluator, now on the unseen test split.
rmse = evaluator.evaluate(dataset=predictions)
print(f"RMSE = {rmse}")

In [None]:
# Top-10 movies for every user, as an array of (movieId, rating) structs.
user_recommendations = model.recommendForAllUsers(numItems=10)
user_recommendations.show(n=5, truncate=False)


In [None]:
# Top-10 users for every movie, as an array of (userId, rating) structs.
movie_recommendations = model.recommendForAllItems(numUsers=10)
movie_recommendations.show(n=5, truncate=False)

In [None]:
# Top-10 recommendations for a single user, requested via a one-row id DataFrame.
user_id = 53
target_users = spark.createDataFrame([(user_id,)], ["userId"])
user_recs = model.recommendForUserSubset(target_users, numItems=10)
user_recs.show(truncate=False)


In [None]:
def get_recommendations_with_titles(user_id, num_recs=10):
    """Return the top-N movie recommendations for one user, joined with titles.

    Relies on the notebook-level ``model`` (fitted ALS model), ``spark``
    session and ``movies_df`` metadata DataFrame.

    Args:
        user_id: Id of the user to recommend for (matched on ``userId``).
        num_recs: Number of recommendations to request from the model.

    Returns:
        A DataFrame with columns ``userId, movieId, title, genres, rating``,
        one row per recommended movie, sorted by predicted ``rating``
        descending. Recommended movies missing from ``movies_df`` keep null
        ``title``/``genres`` (left join).
    """
    user_recs = model.recommendForUserSubset(
        spark.createDataFrame([(user_id,)], ["userId"]),
        num_recs
    )

    # ``recommendations`` is an array of (movieId, rating) structs. Explode the
    # struct array once so each movie stays paired with its own score;
    # exploding the movieId and rating arrays separately would yield a cross
    # product pairing every movie with every score.
    user_recs = (
        user_recs
        .select("userId", explode("recommendations").alias("rec"))
        .select(
            "userId",
            col("rec.movieId").alias("movieId"),
            col("rec.rating").alias("rating"),
        )
    )

    recommendations_with_titles = (
        user_recs
        .join(movies_df, on="movieId", how="left")
        .select("userId", "movieId", "title", "genres", "rating")
        .orderBy("rating", ascending=False)
    )

    return recommendations_with_titles

In [None]:
# Titled top-10 recommendation list for user 53.
user_recommendations = get_recommendations_with_titles(53, num_recs=10)
user_recommendations.show(n=10)