In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### Spark session creation and imports

In [2]:
# Creating SPARK session

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Movie_Recommendation_System") \
  .config("spark.executor.memory", "12g") \
  .config("spark.driver.memory", "12g") \
  .config("spark.memory.offHeap.enabled", "true") \
  .config("spark.memory.offHeap.size", "12g") \
  .getOrCreate()

In [3]:
# All import statements

import numpy as np

from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql import functions as F
from pyspark.sql.functions import col, trim, sum
from pyspark.sql.functions import regexp_extract, when, regexp_replace, least, greatest, lit, floor, avg, count
from pyspark.sql.functions import from_unixtime
from pyspark.sql.functions import broadcast

from pyspark.ml.recommendation import ALS
from pyspark.ml.recommendation import ALSModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.functions import vector_to_array
from pyspark.ml.feature import Word2Vec, Normalizer

from pyspark.storagelevel import StorageLevel


### Creating Parquet files from csv to use Parquet files for faster processing

In [None]:
# Creating parquet files for all csv files

# spark.read.csv("/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/movies.csv", header=True, inferSchema=True,
#                 quote='"', escape='"', multiLine=True).write.mode("overwrite").parquet("/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/Parquet/Movies/")

# spark.read.csv("/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/ratings.csv", header=True, inferSchema=True,
#                 quote='"', escape='"', multiLine=True).write.mode("overwrite").parquet("/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/Parquet/Ratings/")

# spark.read.csv("/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/tags.csv", header=True, inferSchema=True,
#                 quote='"', escape='"', multiLine=True).write.mode("overwrite").parquet("/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/Parquet/Tags/")

# spark.read.csv("/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/links.csv", header=True, inferSchema=True,
#                 quote='"', escape='"', multiLine=True).write.mode("overwrite").parquet("/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/Parquet/Links/")

# spark.read.csv("/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/genome-tags.csv", header=True, inferSchema=True,
#                 quote='"', escape='"', multiLine=True).write.mode("overwrite").parquet("/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/Parquet/Genome_Tags/")

# spark.read.csv("/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/genome-scores.csv", header=True, inferSchema=True,
#                 quote='"', escape='"', multiLine=True).write.mode("overwrite").parquet("/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/Parquet/Genome_Scores/")


### Creating movies_df and pre-processing movies data

In [4]:
movies_df = spark.read.parquet(
    "/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/Parquet/Movies"
).select("movieId", "title", "genres")

# Action: Kept for better understanding of movies_df. Can be commented during actual production deployment.
print("movies_df schema")
movies_df.printSchema()
print("movies_df sample data")
movies_df.show(5, truncate=False)

# Action: Kept for better understanding of movies_df. Can be commented during actual production deployment.
total_movies = movies_df.count()
print("Total no. of movies -", total_movies)
print()

# Null/blank counts check
null_blank_counts = movies_df.agg(
    F.sum(F.col("movieId").isNull().cast("int")).alias("movieId_nulls"),
    F.sum((F.col("title").isNull() | (F.trim(F.col("title")) == "")).cast("int")).alias("title_null_or_blank"),
    F.sum((F.col("genres").isNull() | (F.trim(F.col("genres")) == "")).cast("int")).alias("genres_null_or_blank")
)
print("No movies with null or blank values -")
null_blank_counts.show()

# Transformations: compute year, clean title, normalize genres

movies_df = movies_df.select(
    "movieId",
    F.regexp_extract(F.col("title"), r"\((\d{4})\)", 1).alias("year"),
    F.trim(F.regexp_replace(F.col("title"), r"\s*\(\d{4}\)", "")).alias("title"),
    "genres"
).select(
    "movieId",
    "title",
    F.when(F.col("year") == "", None).otherwise(F.col("year")).alias("year"),
    F.when(F.col("genres") == "(no genres listed)", None).otherwise(F.col("genres")).alias("genres")
)

# Action: Kept for better understanding of movies_df. Can be commented during actual production deployment.
print("Movies data after segregating year -")
movies_df.show(5, truncate=False)

invalid_movies_df = movies_df.filter(
    F.col("year").isNull() | F.col("genres").isNull()
).select("movieId").distinct()

# Single pass counts for missing attributes
missing_year_cnt, missing_genres_cnt = movies_df.agg(
    F.sum(F.col("year").isNull().cast("int")).alias("missing_year"),
    F.sum(F.col("genres").isNull().cast("int")).alias("missing_genres")
).first()

# Action: Kept for better understanding of movies_df. Can be commented during actual production deployment.
print("No. of movies without year -", missing_year_cnt)
print("No. of movies without genres -", missing_genres_cnt)

# Remove movies with blank year and genres
movies_df = movies_df.join(invalid_movies_df, on="movieId", how="left_anti")

# Remove movies with blank year and genres
filtered_movies_df = movies_df.filter(
    (~F.col("year").isNull()) & (~F.col("genres").isNull())
)

# Action: Kept for better understanding of movies_df. Can be commented during actual production deployment.
print("No. of movies after removing invalid entries -", movies_df.count())

movies_df = movies_df.persist(StorageLevel.MEMORY_ONLY)
invalid_movies_df = invalid_movies_df.persist(StorageLevel.MEMORY_ONLY)

movies_df schema
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

movies_df sample data
+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
+-------+----------------------------------+-------------------------------------------+
only showing top 5 rows

Total no. of m

### Creating ratings_df and pre-processing ratings data

In [5]:
ratings_df = spark.read.parquet(
    "/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/Parquet/Ratings"
).select("userId", "movieId", "rating", "timestamp")

# Action: Kept for better understanding of ratings_df. Can be commented during actual production deployment.
print("ratings_df schema")
ratings_df.printSchema()
print("ratings_df sample data")
ratings_df.show(5, truncate=False)
total_ratings = ratings_df.count()
print("Total no. of ratings -", total_ratings)
print()

ratings_df = ratings_df.join(invalid_movies_df, on="movieId", how="left_anti")

# Action: Kept for better understanding of ratings_df. Can be commented during actual production deployment.
total_ratings = ratings_df.count()
print("Total no. of ratings after removing ratings data of invalid movie IDs -", total_ratings)
print()

# Null/blank checks
null_blank_counts = ratings_df.agg(
    F.sum(F.col("userId").isNull().cast("int")).alias("userId_nulls"),
    F.sum(F.col("movieId").isNull().cast("int")).alias("movieId_nulls"),
    F.sum(F.col("rating").isNull().cast("int")).alias("rating_nulls"),
    F.sum(F.col("timestamp").isNull().cast("int")).alias("timestamp_nulls")
)
print("No ratings with null values -")
null_blank_counts.show()

# Distinct ratings - # Action: Kept for better understanding of ratings_df. Can be commented during actual production deployment.
ratings_df.select("rating").distinct().orderBy("rating").show()

# Convert epoch to timestamp
ratings_df = ratings_df.withColumn("rating_date_time", F.from_unixtime(F.col("timestamp")))

# Action: Kept for better understanding of ratings_df. Can be commented during actual production deployment.
print("Ratings data after converting timestamp -")
ratings_df.select("userId", "movieId", "rating", "timestamp", "rating_date_time").show(5, truncate=False)


ratings_df schema
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)

ratings_df sample data
+------+-------+------+----------+
|userId|movieId|rating|timestamp |
+------+-------+------+----------+
|1     |1      |4.0   |1225734739|
|1     |110    |4.0   |1225865086|
|1     |158    |4.0   |1225733503|
|1     |260    |4.5   |1225735204|
|1     |356    |5.0   |1225735119|
+------+-------+------+----------+
only showing top 5 rows

Total no. of ratings - 33832162

Total no. of ratings after removing ratings data of invalid movie IDs - 33747841

No ratings with null values -
+------------+-------------+------------+---------------+
|userId_nulls|movieId_nulls|rating_nulls|timestamp_nulls|
+------------+-------------+------------+---------------+
|           0|            0|           0|              0|
+------------+-------------+------------+---------------+

+------+
|rating|


In [6]:
MIN_RATINGS_PER_USER = 10

# null check
ratings_df = ratings_df.filter(F.col("userId").isNotNull())

# Count ratings per user
user_rating_count_df = ratings_df.groupBy("userId").agg(F.count("*").alias("rating_count"))

# Users to KEEP: rating_count >= MIN_RATINGS_PER_USER
users_to_keep_df = user_rating_count_df.filter(F.col("rating_count") >= MIN_RATINGS_PER_USER).select("userId")

# Users to REMOVE (if you still need this DataFrame for reporting)
users_to_remove_df = user_rating_count_df.filter(F.col("rating_count") < MIN_RATINGS_PER_USER).select("userId")

# Keep only ratings from users with sufficient activity
ratings_df = ratings_df.join(users_to_keep_df, on="userId", how="left_semi")

# Action: Kept for better understanding of ratings_df. Can be commented during actual production deployment.
removed_users_count = users_to_remove_df.count()
remaining_ratings_count = ratings_df.count()
print("Users with less than 10 ratings:", removed_users_count)
print("No. of ratings after removing users with less than 10 ratings -", remaining_ratings_count)

# Action: Kept for better understanding of ratings_df. Can be commented during actual production deployment.
sample_removed_user_ids = [r.userId for r in users_to_remove_df.limit(20).collect()]
print("Sample of removed userIds (up to 20):", sample_removed_user_ids)

# Persist only if reusing multiple times below
ratings_df = ratings_df.persist(StorageLevel.MEMORY_ONLY)


Users with less than 10 ratings: 51383
No. of ratings after removing users with less than 10 ratings - 33514031
Sample of removed userIds (up to 20): [1591, 3794, 4519, 7880, 8086, 8592, 8638, 10623, 11748, 12799, 12940, 13832, 14570, 14832, 15447, 15790, 16503, 16574, 21700, 22346]


### Creating tags_df and pre-processing tags data

In [7]:
tags_df = spark.read.parquet(
    "/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/Parquet/Tags"
).select("userId", "movieId", "tag", "timestamp")

# Action: Kept for better understanding of tags_df. Can be commented during actual production deployment.
print("tags_df schema"); tags_df.printSchema()
print("tags_df sample data"); tags_df.show(5, truncate=False)



total_tags = tags_df.count()
print("Total no. of tags -", total_tags)
print()

# Filter out invalid movies
tags_df = tags_df.join(invalid_movies_df, on="movieId", how="left_anti")
print("No. of tags after removing invalid movieId entries -", tags_df.count())

# Remove tags of users with insufficient ratings
tags_df = tags_df.join(users_to_remove_df, on="userId", how="left_anti")

# Action: Kept for better understanding of tags_df. Can be commented during actual production deployment.
print("No. of tags after removing tags of new users -", tags_df.count())
print()

# Null/blank checks
null_blank_counts = tags_df.agg(
    F.sum(F.col("userId").isNull().cast("int")).alias("userId_nulls"),
    F.sum(F.col("movieId").isNull().cast("int")).alias("movieId_nulls"),
    F.sum((F.col("tag").isNull() | (F.trim(F.col("tag")) == "")).cast("int")).alias("tag_null_or_blank"),
    F.sum(F.col("timestamp").isNull().cast("int")).alias("timestamp_nulls")
)
print("No tags with null or blank values -")
null_blank_counts.show()

# Convert epoch to timestamp
tags_df = tags_df.withColumn("tag_date_time", F.from_unixtime(F.col("timestamp")))

# Action: Kept for better understanding of tags_df. Can be commented during actual production deployment.
print("Tags data after converting timestamp -")
tags_df.select("userId", "movieId", "tag", "timestamp", "tag_date_time").show(5, truncate=False)

# Optional: persist if you plan multiple actions on tags_df
tags_df = tags_df.persist(StorageLevel.MEMORY_ONLY)


tags_df schema
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: integer (nullable = true)

tags_df sample data
+------+-------+-------------+----------+
|userId|movieId|tag          |timestamp |
+------+-------+-------------+----------+
|10    |260    |good vs evil |1430666558|
|10    |260    |Harrison Ford|1430666505|
|10    |260    |sci-fi       |1430666538|
|14    |1221   |Al Pacino    |1311600756|
|14    |1221   |mafia        |1311600746|
+------+-------+-------------+----------+
only showing top 5 rows

Total no. of tags - 2328315

No. of tags after removing invalid movieId entries - 2299862
No. of tags after removing tags of new users - 2285224

No tags with null or blank values -
+------------+-------------+-----------------+---------------+
|userId_nulls|movieId_nulls|tag_null_or_blank|timestamp_nulls|
+------------+-------------+-----------------+---------------+
|           0|            0| 

### Creating links_df and pre-processing links data - NOT USING LINKS DATA

In [None]:
# # Checking data of links data

# # Read the links file
# links_df = spark.read.parquet("/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/Parquet/Links")

# # Show the first few rows
# links_df.show(5,truncate=False)
# links_df.printSchema()
# print(links_df.count())

# links_df = links_df.filter(~col("movieId").isin(invalid_movie_ids))
# print(links_df.count(),"after removing links of invalid movies.")

# links_df.select([
#     sum(
#         (col(c).isNull() | (trim(col(c)) == "")).cast("int")
#     ).alias(c)
#     for c in links_df.columns
# ]).show()


### Creating genome_tags_df and pre-processing genome tag data - NOT USING GENOME DATA

In [None]:
# # Checking data of genome-tags data

# # Read the genome_tags file
# genome_tags_df = spark.read.parquet("/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/Parquet/Genome_Tags")

# # Show the first few rows
# genome_tags_df.show(5,truncate=False)
# genome_tags_df.printSchema()
# print(genome_tags_df.count())

# genome_tags_df.select([
#     sum(
#         (col(c).isNull() | (trim(col(c)) == "")).cast("int")
#     ).alias(c)
#     for c in genome_tags_df.columns
# ]).show()


### Creating genome_scores_df and pre-processing genome scores data - NOT USING GENOME DATA

In [None]:
# # Checking data of genome-scores data

# # Read the genome_scores file
# genome_scores_df = spark.read.parquet("/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/Parquet/Genome_Scores")

# # Show the first few rows
# genome_scores_df.show(5,truncate=False)
# genome_scores_df.printSchema()
# print(genome_scores_df.count())

# genome_scores_df.select([
#     sum(
#         (col(c).isNull() | (trim(col(c)) == "")).cast("int")
#     ).alias(c)
#     for c in genome_scores_df.columns
# ]).show()

### Creating popular_movies_df and storing 100 popular movies in cache

In [8]:
# Aggregate ratings per movie
popular_movies_df = ratings_df.groupBy("movieId").agg(
    F.avg("rating").alias("avg_rating"),
    F.count("rating").alias("rating_count")
)

# Compute C and m in one pass
stats = popular_movies_df.agg(
    F.avg("rating_count").alias("C"),
    F.avg("avg_rating").alias("m")
).collect()[0]
C, m = stats["C"], stats["m"]

# Compute Bayesian score
popular_movies_df = popular_movies_df.withColumn(
    "bayesian_score",
    (F.col("rating_count") / (F.col("rating_count") + F.lit(C)) * F.col("avg_rating")) +
    (F.lit(C) / (F.col("rating_count") + F.lit(C)) * F.lit(m))
)

# Join with movies before limiting
popular_movies_df = popular_movies_df.join(movies_df, on="movieId", how="left")

# Use Window for top-N instead of full sort
window = Window.orderBy(F.col("bayesian_score").desc())
popular_movies_df = popular_movies_df.withColumn("rank", F.row_number().over(window)) \
                                        .filter(F.col("rank") <= 100) \
                                        .drop("rank")
popular_movies_df.show(100,truncate=False)
popular_movies_df = popular_movies_df.persist(StorageLevel.MEMORY_ONLY)

+-------+------------------+------------+------------------+-----------------------------------------------------------------------+----+-----------------------------------------------+
|movieId|avg_rating        |rating_count|bayesian_score    |title                                                                  |year|genres                                         |
+-------+------------------+------------+------------------+-----------------------------------------------------------------------+----+-----------------------------------------------+
|318    |4.410218395886979 |115341      |4.404924472481345 |Shawshank Redemption, The                                              |1994|Crime|Drama                                    |
|858    |4.3221309434901665|72306       |4.314236117072335 |Godfather, The                                                         |1972|Crime|Drama                                    |
|202439 |4.322354706912366 |11993       |4.276149108818355 |Parasite  

### Collaborative Filtering

In [9]:
# Splitting train and test data based on userID

# Sort by timestamp for time-based split
window = Window.partitionBy("userId").orderBy("timestamp")

# Add row number per user
ratings_df_als = ratings_df.withColumn("row_num", F.row_number().over(window))

# Compute split index (e.g., 80% train, 20% test)
user_counts = ratings_df_als.groupBy("userId").agg(F.max("row_num").alias("max_row"))
ratings_df_als = ratings_df_als.join(user_counts, on="userId")
ratings_df_als = ratings_df_als.withColumn("split", F.when(F.col("row_num") <= F.col("max_row") * 0.8, "train").otherwise("test"))

train_df = ratings_df_als.filter(F.col("split") == "train").select("userId", "movieId", "rating")
test_df = ratings_df_als.filter(F.col("split") == "test").select("userId", "movieId", "rating")

# Action: Kept for better understanding of training and test split. Can be commented during actual production deployment.
print("No. of rating data rows for training -",train_df.count())
print("No. of rating data rows for test -",test_df.count())

No. of rating data rows for training - 26704697
No. of rating data rows for test - 6809334


In [10]:
# Creating ALS model for collaborative filtering and saving as file

# Build the ALS model
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    rank=30, # Number of latent factors
    regParam=0.05,
    maxIter=15,
    implicitPrefs=False,
    coldStartStrategy="drop" # Drop users/items not in the training set
)

# Train the ALS model
als_model = als.fit(train_df)

# Evaluate the ALS model with a RegressionEvaluator
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
als_predictions = als_model.transform(test_df)

als_predictions = als_predictions.withColumn(
    "prediction",
    least(
        greatest((floor(col("prediction") * 2 + 0.5) / 2).cast("float"), lit(0.5)),
        lit(5.0)
    )
)

rmse = evaluator.evaluate(als_predictions)
print(f"ALS RMSE: {rmse}")

# Save model to a directory

als_model.write().overwrite().save(
    "/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/Parquet/ALSModel"
)


ALS RMSE: 0.8255941291572584


In [None]:
# ALS model was tuned using below code to reduce RMSE error.

# # Define ALS model (without fixed hyperparameters)
# als = ALS(
#     userCol="userId",
#     itemCol="movieId",
#     ratingCol="rating",
#     coldStartStrategy="drop",
#     nonnegative=True
# )

# # Build parameter grid for tuning
# paramGrid = ParamGridBuilder() \
#     .addGrid(als.rank, [10, 20, 30]) \
#     .addGrid(als.regParam, [0.05, 0.1, 0.2]) \
#     .addGrid(als.maxIter, [10, 15]) \
#     .build()

# # Define evaluator
# evaluator = RegressionEvaluator(
#     metricName="rmse",
#     labelCol="rating",
#     predictionCol="prediction"
# )

# # Configure CrossValidator
# crossval = CrossValidator(
#     estimator=als,
#     estimatorParamMaps=paramGrid,
#     evaluator=evaluator,
#     numFolds=3,  # 3-fold cross-validation
#     parallelism=4  # Run in parallel for speed
# )

# # Train and select best model
# cvModel = crossval.fit(train_df)
# bestModel = cvModel.bestModel

# # Extract parameter map from bestModel
# param_map = bestModel.extractParamMap()

# print("Best rank:", bestModel.rank)

# # Print all parameters in param_map
# print("Best Model Parameters:")
# for param, value in param_map.items():
#     print(f"{param.name}: {value}")

# # Evaluate on test data
# als_predictions = bestModel.transform(test_df)

# # Round predictions to nearest 0.5 and clamp between 0.5 and 5.0
# als_predictions = als_predictions.withColumn(
#     "prediction",
#     least(
#         greatest((floor(col("prediction") * 2 + 0.5) / 2).cast("float"), lit(0.5)),
#         lit(5.0)
#     )
# )

# rmse = evaluator.evaluate(als_predictions)
# print(f"Best Model RMSE: {rmse}")

### Content Based Filtering

In [11]:
# Aggregate tags per movie
tags_clean_df = tags_df.filter(
    (F.col("tag").isNotNull()) & (F.trim(F.col("tag")) != "")
).select("movieId", F.lower(F.col("tag")).alias("tag"))

# deduplicate tags
tags_agg = tags_clean_df.groupBy("movieId").agg(
    F.collect_set("tag").alias("tags_array")
)

# Prepare genres as ARRAY
movies_base = movies_df.select(
    "movieId",
    F.lower(F.col("genres")).alias("genres")
)

# Convert genres string to array; if NULL, use empty array
movies_base = movies_base.select(
    "movieId",
    F.when(F.col("genres").isNotNull(), F.split(F.col("genres"), r"\|"))
     .otherwise(F.array().cast("array<string>")).alias("genres_array")
)

# Join movies with aggregated tags
movies_with_tags = movies_base.join(broadcast(tags_agg), on="movieId", how="left")

# Replace NULL tags_array with empty array to simplify downstream ops
movies_with_tags = movies_with_tags.select(
    "movieId",
    "genres_array",
    F.when(F.col("tags_array").isNotNull(), F.col("tags_array"))
     .otherwise(F.array().cast("array<string>")).alias("tags_array")
)

# Combine genres and tags as ARRAY
movies_with_tags = movies_with_tags.select(
    "movieId",
    F.array_union(F.col("genres_array"), F.col("tags_array")).alias("words")
)

# Optional sanity check (limited show to avoid triggering big jobs)
movies_with_tags.show(5, truncate=False)

# Train Word2Vec on combined words (genres + tags)
word2vec = Word2Vec(
    vectorSize=100,
    minCount=1,
    windowSize=5,          # default; tune as needed
    inputCol="words",
    outputCol="features"
)
word2vec_model = word2vec.fit(movies_with_tags)

# Transform to get movie embeddings
movie_vectors = word2vec_model.transform(movies_with_tags).select("movieId", "features")

# Normalize vectors for cosine similarity
normalizer = Normalizer(inputCol="features", outputCol="norm_features")
movie_vectors = normalizer.transform(movie_vectors).select("movieId", "norm_features")

# Save as Parquet for content-based filtering
output_path = "/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/Parquet/movie_word2vec_vectors"
movie_vectors.write.mode("overwrite").parquet(output_path)
print(f"Word2Vec vectors written to {output_path}")


+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### Methods popularMoviesRecommender, contentFilteringRecommender and collaborativeFilteringRecommender

In [12]:
def popularMoviesRecommender(n=20):

  recommendations_df = popular_movies_df.select("movieId", "title", "year", "genres", "bayesian_score").limit(n)
  return recommendations_df

In [13]:
def contentFilteringRecommender(user_id_to_recommend: int, n: int = 20, weight_by_rating: bool = True):
    # Load normalized content vectors once
    cb_path = "/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/Parquet/movie_word2vec_vectors"
    cb_df = spark.read.parquet(cb_path).select("movieId", "norm_features")

    # Convert normalized feature vector to array for SQL operations
    cb_df = cb_df.withColumn("vec", vector_to_array(col("norm_features"))) \
                 .select("movieId", "vec")

    # Get user's rated movies
    user_rated_df = ratings_df.filter(col("userId") == user_id_to_recommend) \
                              .select("movieId", "rating").distinct()

    # Join to fetch source vectors for rated movies
    user_vecs = user_rated_df.join(cb_df, on="movieId", how="inner") \
                             .select(col("movieId").alias("src_movieId"),
                                     col("rating"),
                                     col("vec").alias("src_vec"))

    # Candidates: all movies with vectors (rename cols)
    candidates = cb_df.select(col("movieId").alias("cand_movieId"),
                              col("vec").alias("cand_vec"))

    # Compute cosine similarity
    sim_pairs = candidates.crossJoin(broadcast(user_vecs)) \
        .withColumn(
            "dot",
            F.expr("""
                aggregate(
                    zip_with(cand_vec, src_vec, (x, y) -> x * y),
                    cast(0.0 as double),
                    (acc, v) -> acc + v
                )
            """)
        )

    # Optional weighting by user's rating (map ratings to [0,1] via /5.0)
    if weight_by_rating:
        sim_pairs = sim_pairs.withColumn("sim", col("dot") * (col("rating") / F.lit(5.0)))
    else:
        sim_pairs = sim_pairs.withColumn("sim", col("dot"))

    # Aggregate similarity across all source movies per candidate
    agg_sim = sim_pairs.groupBy("cand_movieId").agg(F.max("sim").alias("similarity"))

    # Exclude movies the user already rated (left-anti join)
    rated_ids = user_rated_df.select(col("movieId").alias("cand_movieId")).distinct()
    agg_sim = agg_sim.join(broadcast(rated_ids), on="cand_movieId", how="left_anti")

    # Top-N & enrich with movie metadata
    topn = agg_sim.orderBy(col("similarity").desc()).limit(n)

    # Join with movies metadata for display
    meta_cols = ["movieId", "title", "year", "genres"]

    # Alias both sides
    t = topn.alias("t")  # columns: cand_movieId, similarity
    m = movies_df.select(*meta_cols).alias("m")

    # Join with qualified columns, then select explicitly to avoid ambiguity
    recs = (
        t.join(m, F.col("t.cand_movieId") == F.col("m.movieId"), how="left")
        .select(
            F.col("m.movieId").alias("movieId"),
            F.col("m.title").alias("title"),
            F.col("m.year").alias("year"),
            F.col("m.genres").alias("genres"),
            F.col("t.similarity").alias("similarity")
        )
    )


    return recs


In [14]:
def collaborativeFilteringRecommender(
    user_id: int,
    n: int = 10,
):

    ratings_clean = ratings_df.select(
        F.col("userId").cast(IntegerType()),
        F.col("movieId").cast(IntegerType())
    )
    movies_clean = movies_df.select("movieId", "title", "year", "genres") \
                            .withColumn("movieId", F.col("movieId").cast(IntegerType()))

    # Movies this user has rated
    movies_rated_by_user = ratings_clean.filter(F.col("userId") == int(user_id)).select("movieId")
    unrated_movies = movies_clean.select("movieId") \
                                 .join(F.broadcast(movies_rated_by_user), on="movieId", how="left_anti")

    # Create candidate pairs
    user_unrated_pairs = unrated_movies.withColumn("userId", F.lit(int(user_id)))

    # Score with ALS
    als_predictions = als_model.transform(user_unrated_pairs) \
                               .withColumnRenamed("prediction", "als_prediction")

    # Drop null predictions
    als_predictions = als_predictions.filter(F.col("als_prediction").isNotNull())

    # Round to nearest 0.5 & clamp to [0.5, 5.0]
    als_predictions = als_predictions.withColumn(
        "als_prediction",
        F.least(
            F.greatest((F.floor(F.col("als_prediction") * 2 + F.lit(0.5)) / F.lit(2)).cast("float"), F.lit(0.5)),
            F.lit(5.0)
        )
    )

    # Top-N first
    top_n = als_predictions.orderBy(F.col("als_prediction").desc()).limit(n)

    # Join movie metadata
    top_n_recommendations = top_n.join(F.broadcast(movies_clean), on="movieId", how="left") \
                                 .select("movieId", "title", "year", "genres", "als_prediction")

    return top_n_recommendations


### User Recommendations

In [18]:
userId = int(input('Enter user Id for movie recommendation : '))
number = int(input('Enter number of movies to be recommended : '))

rated_movie_count = 0
rated_movie_count = ratings_df.filter(col('userId')==userId).count()

if rated_movie_count < 10:
  print("This is a new user. Recommend most popular movies")
  recommendations = popularMoviesRecommender(number)

elif rated_movie_count < 100:
  print("This is an ocassional user. Recommend movies based on content filtering.")
  recommendations = contentFilteringRecommender(userId, number) #User ID - 1089

else:
  print("User has rated more than 100 movies. Recommend movies based on collaborative filtering.")
  recommendations = collaborativeFilteringRecommender(userId, number) #User ID - 189

print(f"{number} recommended movies for userId {userId} is : ")
recommendations.show(number, truncate=False)


Enter user Id for movie recommendation : 189
Enter number of movies to be recommended : 20
User has rated more than 100 movies. Recommend movies based on collaborative filtering.
20 recommended movies for userId 189 is : 
+-------+-------------------------------------------+----+---------------------+--------------+
|movieId|title                                      |year|genres               |als_prediction|
+-------+-------------------------------------------+----+---------------------+--------------+
|275201 |Gensan Punch                               |2021|Action|Drama         |5.0           |
|27718  |Injury to One, An                          |2002|Documentary          |4.5           |
|188155 |The Godfather Legacy                       |2012|Documentary          |4.5           |
|70712  |Abandon Ship! (Seven Waves Away)           |1957|Adventure|Drama      |4.5           |
|163050 |Desperate Man Blues                        |2003|Documentary          |4.5           |
|187247 |T

In [None]:
from pyspark.ml.recommendation import ALSModel
from pyspark.sql.functions import col, explode, collect_set, size, array_intersect
import os


BASE = "/content/drive/MyDrive/Movie_Recommendation_Project/big_dataset/Parquet"
ALS_MODEL_PATH = f"{BASE}/ALSModel"
ALS_SAVE_DIR   = f"{BASE}/ALS"
RATINGS_PATH   = f"{BASE}/Ratings"

os.makedirs(ALS_SAVE_DIR, exist_ok=True)


als_model  = ALSModel.load(ALS_MODEL_PATH)
ratings_df = spark.read.parquet(RATINGS_PATH)

print("ALS model and ratings loaded")


active_users_df = (
    ratings_df.groupBy("userId")
              .count()
              .filter(col("count") >= 100)
              .select("userId")
)

print("Active users (>=100 ratings):", active_users_df.count())

# Random Split (Train/Test)
train_df, test_df = ratings_df.randomSplit([0.8, 0.2], seed=42)


test_actual_path = f"{ALS_SAVE_DIR}/test_actual_active"

if not os.path.exists(test_actual_path):
    print("Saving test_actual for active users...")

    test_actual = (
        test_df.join(active_users_df, "userId")
               .groupBy("userId")
               .agg(collect_set("movieId").alias("actual_movies"))
    )

    test_actual.write.mode("overwrite").parquet(test_actual_path)
    print("Saved test_actual_active")
else:
    print("test_actual_active already exists")


K = 100
rec_path = f"{ALS_SAVE_DIR}/recommendations_active_K_BigDataset{K}"

if not os.path.exists(rec_path):
    print(f"Generating recommendations for active users with K={K}...")

    als_model.recommendForUserSubset(active_users_df, K) \
             .write.mode("overwrite").parquet(rec_path)

    print("Saved recommendations for active users")
else:
    print(f"recommendations_active_K{K} already exists")


pred_df     = spark.read.parquet(rec_path)
test_actual = spark.read.parquet(test_actual_path)


pred_movies = (
    pred_df
        .withColumn("rec", explode("recommendations"))
        .select("userId", col("rec.movieId").alias("pred_movie"))
        .groupBy("userId")
        .agg(collect_set("pred_movie").alias("predicted_movies"))
)

eval_df = pred_movies.join(test_actual, "userId")

precision_df = (
    eval_df
        .withColumn("hits", array_intersect("predicted_movies", "actual_movies"))
        .withColumn("precision", size(col("hits")) / K)
)

precision_value = precision_df.selectExpr("avg(precision)").first()[0]
print(f"\n Precision@{K} (Users >=100 ratings) = {precision_value:.4f}\n")


recall_df = (
    eval_df
        .withColumn("hits", array_intersect("predicted_movies", "actual_movies"))
        .withColumn("recall", size(col("hits")) / size(col("actual_movies")))
)

recall_value = recall_df.selectExpr("avg(recall)").first()[0]
print(f"\n  Recall@{K} (Users >=100 ratings) = {recall_value:.4f}\n")


In [None]:

# Remove reference to model
# del als_model

# Clear Spark cache
# spark.catalog.clearCache()

# Stop Spark session
# spark.stop()
