In [1]:
import os
import sys
import random
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.sql.window import Window
from pyspark.ml.evaluation import RegressionEvaluator, RankingEvaluator
import pandas as pd
import numpy as np
from itertools import chain
from datetime import datetime
import altair as alt
import matplotlib.pyplot as plt
import seaborn as sns
import warnings

warnings.filterwarnings("ignore")

In [2]:
# Build (or reuse) a local Spark session that runs on every available core
# with a 2 GB driver heap.
session_builder = (
    SparkSession.builder
    .appName("MovieRecs")
    .master("local[*]")
    .config("spark.driver.memory", "2g")
)
spark = session_builder.getOrCreate()

print(f"Spark Version: {spark.version}")

# "ALL" enables every log level, so expect very verbose driver output.
spark.sparkContext.setLogLevel("ALL")
print("Log Level set to ALL")

Spark Version: 4.0.1
Log Level set to ALL


# 1. Load Data

In [3]:
# Column names applied to the ratings DataFrame after loading.
ratings_cols = [
    "userId",
    "movieId",
    "rating",
    "timestamp",
]

In [4]:
# Explicit schema for ratings.csv so Spark does not have to infer column types.
ratings_schema = StructType([
    StructField("userId", IntegerType(), True),
    StructField("movieId", IntegerType(), True),
    StructField("rating", FloatType(), True),
    StructField("timestamp", IntegerType(), True),
])

ratings_df = (
    spark.read
    .option("delimiter", ",")
    .option("header", True)
    .csv("ml-32m/ratings.csv", schema=ratings_schema)
    .toDF(*ratings_cols)
)

In [5]:
# Load the same ratings file eagerly with pandas, using compact dtypes.
ratings_pd_dtypes = {
    "userId": np.int32,
    "movieId": np.int32,
    "rating": np.float32,
    "timestamp": np.int64,
}
ratings_df_pd = pd.read_csv(
    "ml-32m/ratings.csv",
    sep=",",
    header=0,
    dtype=ratings_pd_dtypes,
)

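The PySpark cell returns almost immediately while the pandas cell takes much longer, but the two are not directly comparable: `spark.read.csv` with an explicit schema is lazy and only defines the DataFrame, whereas `pd.read_csv` eagerly loads all 32M rows into memory. A fair comparison would time a Spark action such as `ratings_df.count()`; PySpark's real advantage on large datasets is that it can process them in parallel without materialising everything in driver memory.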

In [6]:
# movies.csv and tags.csv hold free-text fields (titles, user tags). Such files
# normally escape a quote inside a quoted field by doubling it (""), whereas
# Spark's CSV reader expects a backslash by default. Setting escape='"' makes
# Spark parse those fields correctly; it has no effect on rows without
# embedded quotes.
movies_cols = ['movieId', 'title', 'genres']
movies_schema = StructType([
    StructField('movieId', IntegerType(), True),
    StructField('title', StringType(), True),
    StructField('genres', StringType(), True)
])
movies_df = (
    spark.read
    .option("delimiter", ",")
    .option("header", True)
    .option("quote", '"')
    .option("escape", '"')
    .csv('ml-32m/movies.csv', schema=movies_schema)
    .toDF(*movies_cols)
)

tags_cols = ['userId', 'movieId', 'tag', 'timestamp']
tags_schema = StructType([
    StructField('userId', IntegerType(), True),
    StructField('movieId', IntegerType(), True),
    StructField('tag', StringType(), True),
    StructField('timestamp', IntegerType(), True)
])
tags_df = (
    spark.read
    .option("delimiter", ",")
    .option("header", True)
    .option("quote", '"')
    .option("escape", '"')
    .csv('ml-32m/tags.csv', schema=tags_schema)
    .toDF(*tags_cols)
)

# links.csv contains only numeric identifiers, so the default options suffice.
# NOTE(review): imdbId is read as an integer, which would drop any leading
# zeros if the source IDs are zero-padded -- confirm this is acceptable.
links_cols = ['movieId', 'imdbId', 'tmdbId']
links_schema = StructType([
    StructField('movieId', IntegerType(), True),
    StructField('imdbId', IntegerType(), True),
    StructField('tmdbId', IntegerType(), True)
])
links_df = (
    spark.read
    .option("delimiter", ",")
    .option("header", True)
    .csv('ml-32m/links.csv', schema=links_schema)
    .toDF(*links_cols)
)

In [7]:
# Add a `datetime` column by casting the integer `timestamp` column to a
# Spark timestamp; the same expression serves both frames.
epoch_as_ts = F.col("timestamp").cast("timestamp")
ratings_df = ratings_df.withColumn("datetime", epoch_as_ts)
tags_df = tags_df.withColumn("datetime", epoch_as_ts)

In [8]:
# printSchema() writes the schema tree to stdout itself and returns None, so it
# is called directly; wrapping it in print() would add a stray "None" line
# after every schema.
for schema_df in (movies_df, ratings_df, tags_df, links_df):
    schema_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

None
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- datetime: timestamp (nullable = true)

None
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- datetime: timestamp (nullable = true)

None
root
 |-- movieId: integer (nullable = true)
 |-- imdbId: integer (nullable = true)
 |-- tmdbId: integer (nullable = true)

None


In [9]:
# Attach movie metadata to every rating with an inner join on movieId.
merged_df = ratings_df.join(movies_df, on="movieId", how="inner")

In [10]:
# merged_df.show(5)

In [11]:
# Calendar year in which each rating was given.
rating_year = F.year(F.col("datetime"))
ratings_df = ratings_df.withColumn("year", rating_year)

In [12]:
# Pull the four-digit release year from a trailing "(YYYY)" in the title.
# Whitespace after the closing parenthesis is tolerated, so a title such as
# "Some Movie (1999) " still matches. The result stays a string column; titles
# without a trailing year yield "" (regexp_extract's no-match value).
movies_df = movies_df.withColumn(
    "releaseYear",
    F.regexp_extract(F.col("title"), r'\((\d{4})\)\s*$', 1),
)

In [13]:
# One row per (movie, genre): split the pipe-delimited genre string, explode
# it into rows, then drop the original combined column.
genre_per_row = F.explode(F.split(F.col("genres"), "\\|"))
movies_genre_df = movies_df.withColumn("genre", genre_per_row).drop("genres")

# 3. Modeling

RQs:

RQ 1: How does the choice of feedback signal (Explicit Ratings vs. Implicit Binary) impact the Top-N ranking quality of ALS when evaluated under a realistic retrieval protocol?

RQ 2: Does introducing non-linearity via Neural Collaborative Filtering (NCF/AutoRec) provide a statistically significant improvement over linear ALS on sparse data?

RQ 3: Does incorporating sequential dynamics (SASRec) outperform static collaborative filtering (ALS/NCF) for predicting immediate next-item interactions?

## Data Preparation

In [None]:
# Keep only the ratings from the final year of the dataset.

max_datetime = ratings_df.agg(F.max("datetime")).collect()[0][0]
if max_datetime is None:
    # max() over an empty frame (or an all-null column) returns NULL.
    raise ValueError("ratings_df has no non-null 'datetime' values; cannot derive a cutoff")

try:
    cutoff_datetime = max_datetime.replace(year=max_datetime.year - 1)
except ValueError:
    # 29 February has no counterpart in the previous (non-leap) year, so fall
    # back to 28 February instead of crashing.
    cutoff_datetime = max_datetime.replace(year=max_datetime.year - 1, day=28)

ratings_small_df = ratings_df.filter(F.col("datetime") >= F.lit(cutoff_datetime))
print(f"New Dataset Size: {ratings_small_df.count()} rows")

New Dataset Size: 988745 rows


In [15]:
# Temporal split: the approximate 90th percentile of the rating timestamps is
# the boundary; earlier ratings form the training set, the rest the test set.
quantiles = ratings_small_df.stat.approxQuantile("timestamp", [0.9], 0.001)
split_timestamp = quantiles[0]
print(f"Splitting data at datetime: {datetime.fromtimestamp(split_timestamp)}")

# Aggregations shared by both splits, so each summary costs a single Spark job
# instead of three separate counts.
split_summary_aggs = [
    F.count("*").alias("interactions"),
    F.countDistinct("userId").alias("users"),
    F.countDistinct("movieId").alias("items"),
]


def _print_split_summary(title, metrics):
    """Print the interaction, user and item counts of one split."""
    print(title)
    print(f"Interaction: {metrics['interactions']}")
    print(f"Users: {metrics['users']}")
    print(f"Items: {metrics['items']}")


rating_ts = F.col("timestamp")

# Training split; cached because the summary below, the distinct user/item
# lists and the model-fitting cells all reuse it. The aggregation is the first
# action on it and therefore populates the cache.
train_df = ratings_small_df.filter(rating_ts < split_timestamp).cache()
train_metrics = train_df.agg(*split_summary_aggs).collect()[0]
_print_split_summary("Train", train_metrics)

# Users and movies known at training time.
train_users_distinct = train_df.select("userId").distinct()
train_items_distinct = train_df.select("movieId").distinct()

# Everything at or after the boundary is a test candidate.
test_df_raw = ratings_small_df.filter(rating_ts >= split_timestamp)

# Keep only test rows whose user AND movie both occur in training. The
# distinct lists are broadcast so the joins avoid a shuffle sort-merge.
test_df_clean = (
    test_df_raw
    .join(F.broadcast(train_users_distinct), on="userId", how="inner")
    .join(F.broadcast(train_items_distinct), on="movieId", how="inner")
)

test_metrics = test_df_clean.agg(*split_summary_aggs).collect()[0]
_print_split_summary("\nTest", test_metrics)

Splitting data at datetime: 2023-09-10 18:00:00
Train
Interaction: 889317
Users: 9562
Items: 38046

Test
Interaction: 25305
Users: 2118
Items: 8078


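The training user–item matrix is about 99.76% sparse: 889,317 observed interactions out of 9,562 users × 38,046 items ≈ 363.8M possible pairs.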

In [16]:
# For implicit feedback, set rating threshold
rating_threshold = 3.5
train_i_df = train_df.withColumn("implicit_label", F.when(F.col("rating") >= rating_threshold, 1.0).otherwise(0.0))
test_i_df_clean = test_df_clean.withColumn("implicit_label", F.when(F.col("rating") >= rating_threshold, 1.0).otherwise(0.0))

# 3. Collaborative Filtering

RQ 1: How does the choice of feedback signal (Explicit Ratings vs. Implicit Binary) impact the Top-N ranking quality of ALS when evaluated under a realistic retrieval protocol?

### ALS

In [17]:
# Explicit-feedback ALS: factorise the raw star ratings. A model saved by a
# previous run is reused when its directory already exists.

ALS_E_MODEL_PATH = "models/als_e_model"

if not os.path.exists(ALS_E_MODEL_PATH):
    print("Training new ALS model...")
    als_e_params = dict(
        rank=30,
        maxIter=10,
        regParam=0.1,
        numUserBlocks=10,
        numItemBlocks=10,
        implicitPrefs=False,
        alpha=1.0,
        userCol='userId',
        itemCol='movieId',
        seed=42,
        ratingCol='rating',
        nonnegative=False,
        checkpointInterval=10,
        coldStartStrategy="drop",
    )
    als = ALS(**als_e_params)
    als_e_model = als.fit(train_df)
    # Persist the fitted factors so later runs can skip training.
    als_e_model.save(ALS_E_MODEL_PATH)
else:
    print("Loading existing explicit ALS model...")
    als_e_model = ALSModel.load(ALS_E_MODEL_PATH)

Training new ALS model...


In [18]:
# Implicit-feedback ALS: train on the binary `implicit_label` column of
# train_i_df (1.0 = rating at or above the threshold, else 0.0).
#
# The model must read `implicit_label`, not `rating`: with ratingCol='rating'
# the binarised column would be ignored and the raw star ratings would be used
# as the implicit confidence signal instead.
#
# NOTE: a model already saved at ALS_I_MODEL_PATH by an earlier run (possibly
# trained on the raw ratings) is loaded as-is; delete that directory to
# retrain on the binary labels.

ALS_I_MODEL_PATH = "models/als_i_model"

if os.path.exists(ALS_I_MODEL_PATH):
    print("Loading existing implicit ALS model...")
    als_i_model = ALSModel.load(ALS_I_MODEL_PATH)
else:
    print("Training new ALS model...")
    als = ALS(rank=30,
            maxIter=10,
            regParam=0.1,
            numUserBlocks=10,
            numItemBlocks=10,
            implicitPrefs=True,
            alpha=1.0,
            userCol='userId',
            itemCol='movieId',
            seed=42,
            ratingCol='implicit_label',
            nonnegative=False,
            checkpointInterval=10,
            coldStartStrategy="drop")
    als_i_model = als.fit(train_i_df)
    als_i_model.save(ALS_I_MODEL_PATH)

Training new ALS model...


In [19]:
# Number of training ratings per user, i.e. how many items each user has
# already seen.
seen_items_count = F.count(F.col("movieId")).alias("seenItemsCount")
user_movie_count_df = train_df.groupBy("userId").agg(seen_items_count)

In [21]:
# The five users with the most training ratings.
user_movie_count_df.orderBy(F.desc("seenItemsCount")).show(5)

+------+--------------+
|userId|seenItemsCount|
+------+--------------+
|103013|          2991|
|108412|          2917|
| 87324|          2741|
|161180|          2720|
|  1668|          2544|
+------+--------------+
only showing top 5 rows


In [22]:
def precision_recall_ndcg_at_k_full_ranking(test_df, train_df, model, k=10,
                                            num_candidates=3500, rating_threshold=3.5):
    """Evaluate an ALS model with a full-ranking Top-K protocol.

    For every user in ``test_df`` the model's ``num_candidates`` best items are
    retrieved, items the user already rated in ``train_df`` are removed, and
    the ``k`` highest-scoring remaining items are compared with the movies the
    user rated at or above ``rating_threshold`` in ``test_df``.

    Despite the function name, NDCG is not computed; the third returned value
    is the hit rate.

    Args:
        test_df: DataFrame with at least ``userId``, ``movieId``, ``rating``.
        train_df: DataFrame with at least ``userId``, ``movieId``; its
            (user, movie) pairs are excluded from the recommendations.
        model: fitted model exposing ``recommendForUserSubset``.
        k: number of recommendations scored per user (must be >= 1).
        num_candidates: number of items retrieved per user before removing
            training items. It must be >= ``k``. If too few unseen items
            remain among the candidates, a user is scored on fewer than ``k``
            recommendations.
        rating_threshold: minimum test rating for a movie to count as liked.

    Returns:
        Tuple ``(precision_at_k, recall_at_k, hit_at_k)``. Only users that have
        both recommendations and at least one liked test movie are scored;
        ``hit_at_k`` is ``None`` when there are no such users.

    Raises:
        ValueError: if ``k`` < 1 or ``num_candidates`` < ``k``.
    """
    if k < 1:
        raise ValueError(f"k must be >= 1, got {k}")
    if num_candidates < k:
        raise ValueError(f"num_candidates ({num_candidates}) must be >= k ({k})")

    # Candidate retrieval for the users that appear in the test set.
    userid_test_df = test_df.select('userId').distinct()
    raw_recs = model.recommendForUserSubset(userid_test_df, num_candidates)
    exploded_recs = raw_recs \
        .select("userId", F.explode("recommendations").alias("rec")) \
        .select("userId",
                F.col("rec.movieId").alias("movieId"),
                F.col("rec.rating").alias("prediction"))

    # Remove items already rated in training; only the key columns are needed
    # for the anti-join.
    seen_pairs = train_df.select("userId", "movieId")
    new_item_recs = exploded_recs.join(seen_pairs, on=["userId", "movieId"], how="left_anti")

    # Keep the k best remaining items per user. RankingEvaluator expects arrays
    # of doubles, hence the cast.
    rank_window = Window.partitionBy("userId").orderBy(F.desc("prediction"))
    final_recs = new_item_recs \
        .withColumn("rank", F.row_number().over(rank_window)) \
        .filter(F.col("rank") <= k) \
        .groupBy("userId") \
        .agg(F.collect_list(F.col("movieId").cast("double")).alias("predicted_movieIds"))

    # Ground truth: movies each user liked in the test period.
    ground_truth_per_user = test_df.filter(F.col("rating") >= rating_threshold) \
        .groupBy('userId') \
        .agg(F.collect_set(F.col('movieId').cast("double")).alias('liked_movieIds'))

    # hit = 1.0 when at least one recommended movie was liked.
    recommendations_with_truth = final_recs \
        .join(ground_truth_per_user, on='userId', how='inner') \
        .withColumn("hit", F.expr("array_intersect(predicted_movieIds, liked_movieIds)")) \
        .withColumn("hit", F.when(F.size(F.col("hit")) > 0, 1.0).otherwise(0.0))

    # Three actions run on this frame (hit rate, precision, recall). Caching
    # avoids recomputing the recommendation pipeline for each of them.
    recommendations_with_truth = recommendations_with_truth.cache()
    try:
        # hit@k
        hit_at_k = recommendations_with_truth.agg(F.avg("hit")).collect()[0][0]

        # precision@k
        p_evaluator = RankingEvaluator(
            metricName="precisionAtK",
            k=k,
            predictionCol="predicted_movieIds",
            labelCol="liked_movieIds"
        )

        # recall@k
        r_evaluator = RankingEvaluator(
            metricName="recallAtK",
            k=k,
            predictionCol="predicted_movieIds",
            labelCol="liked_movieIds"
        )

        precision_at_k = p_evaluator.evaluate(recommendations_with_truth)
        recall_at_k = r_evaluator.evaluate(recommendations_with_truth)
    finally:
        # Release the cached blocks even if an evaluation step fails.
        recommendations_with_truth.unpersist()

    return precision_at_k, recall_at_k, hit_at_k

In [23]:
# Full-ranking Top-10 evaluation of the explicit-feedback ALS model.
explicit_scores = precision_recall_ndcg_at_k_full_ranking(
    test_df_clean,
    train_df,
    als_e_model,
    k=10,
)
test_e_precision_fr, test_e_recall_fr, test_e_hit_fr = explicit_scores
print(f"Explicit ALS Model - Test Precision@10: {test_e_precision_fr:.4f}, Recall@10: {test_e_recall_fr:.4f}, Hit@10: {test_e_hit_fr:.4f}")

Explicit ALS Model - Test Precision@10: 0.0001, Recall@10: 0.0001, Hit@10: 0.0011


In [None]:
# Full-ranking Top-10 evaluation of the implicit-feedback ALS model.
implicit_scores = precision_recall_ndcg_at_k_full_ranking(
    test_df_clean,
    train_df,
    als_i_model,
    k=10,
)
test_i_precision_fr, test_i_recall_fr, test_i_hit_fr = implicit_scores
print(f"Implicit ALS Model - Test Precision@10: {test_i_precision_fr:.4f}, Recall@10: {test_i_recall_fr:.4f}, Hit@10: {test_i_hit_fr:.4f}")