Import the necessary packages

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import RankingMetrics
from hyperopt import fmin, tpe, hp, Trials, STATUS_OK
import mlflow
import mlflow.spark
import pandas as pd
import numpy as np
import pyspark
import os

Initiliaze Spark and Load data

In [None]:
# Clear stale Spark context references
pyspark.SparkContext._gateway = None
pyspark.SparkContext._jvm = None
pyspark.SparkContext._active_spark_context = None

# Ensure local IP is set correctly to avoid network conflicts
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"

# Initialize Spark session
spark = SparkSession.builder \
    .appName("MovieLensALS") \
    .config("spark.driver.memory", "12g") \
    .config("spark.executor.memory", "6g") \
    .config("spark.memory.fraction", "0.6") \
    .config("spark.memory.offHeap.enabled", True) \
    .config("spark.memory.offHeap.size", "4g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.default.parallelism", "8") \
    .config("spark.driver.maxResultSize", "2g") \
    .getOrCreate()

# Load movies and ratings data from S3
movies_df = spark.read.csv("data/movies.csv", header=True, inferSchema=True)
ratings_df = spark.read.csv("data/ratings.csv", header=True, inferSchema=True)

ratings_df.printSchema()
movies_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- year: string (nullable = true)



Preprocessing and splitting the data

In [3]:
# Select relevant columns and rename them for ALS
matrix = ratings_df.selectExpr("userId as user", "movieId as item", "rating")

# Check schema and sample data
matrix.printSchema()
matrix.show(5)

# Split the data into training and test sets (80%-20%)
(training, test) = matrix.randomSplit([0.8, 0.2], seed=42)

print("Training data count:", training.count())
print("Testing data count:", test.count())

root
 |-- user: integer (nullable = true)
 |-- item: integer (nullable = true)
 |-- rating: double (nullable = true)

+----+----+------+
|user|item|rating|
+----+----+------+
|   1|  17|   4.0|
|   1|  25|   1.0|
|   1|  29|   2.0|
|   1|  30|   5.0|
|   1|  32|   5.0|
+----+----+------+
only showing top 5 rows

Training data count: 25572748
Testing data count: 6394124


Experiment Tracking Setup with MLFlow and Hyperopt

In [4]:
def objective(params):
    with mlflow.start_run():
        als = ALS(userCol="user", itemCol="item", ratingCol="rating", coldStartStrategy="drop", nonnegative=True,
                  rank=int(params['rank']),
                  maxIter=int(params['maxIter']),
                  regParam=params['regParam'])

        model = als.fit(training)
        predictions = model.transform(test)

        evaluator_rmse = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
        evaluator_mae = RegressionEvaluator(metricName="mae", labelCol="rating", predictionCol="prediction")
        
        rmse = evaluator_rmse.evaluate(predictions)
        mae = evaluator_mae.evaluate(predictions)

        mlflow.log_param("rank", params['rank'])
        mlflow.log_param("maxIter", params['maxIter'])
        mlflow.log_param("regParam", params['regParam'])
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("mae", mae)

        return {'loss': rmse, 'status': STATUS_OK}

Define Hyperparameter Search Space

In [6]:
# Define Hyperparameter Search Space
search_space = {
    'rank': hp.choice('rank', [10, 20, 30, 50]),
    'maxIter': hp.quniform('maxIter', 5, 15, 5),
    'regParam': hp.loguniform('regParam', np.log(0.001), np.log(1))
}

Execute Hyperparameter Optimization

In [7]:
# Execute Hyperparameter Optimization
trials = Trials()
best_params = fmin(fn=objective,
                   space=search_space,
                   algo=tpe.suggest,
                   max_evals=20,
                   trials=trials)

print("Best Parameters:", best_params)

  0%|          | 0/20 [00:00<?, ?trial/s, best loss=?]

100%|██████████| 20/20 [46:42<00:00, 140.12s/trial, best loss: 0.7710100154492384]
Best Parameters: {'maxIter': np.float64(15.0), 'rank': np.int64(3), 'regParam': np.float64(0.05392921721979053)}


Training model with the best params (lowest MAE tracked using MLFlow)

In [None]:
# Final selected hyperparameters (based on MAE optimization)
final_params = {
    'maxIter': 15,
    'rank': 30,
    'regParam': 0.0287
}

# Train final ALS model with selected hyperparameters
final_als = ALS(userCol="user", itemCol="item", ratingCol="rating",
                coldStartStrategy="drop", nonnegative=True,
                rank=final_params['rank'],
                maxIter=final_params['maxIter'],
                regParam=final_params['regParam'])

final_model = final_als.fit(training)

# Save ALS model as Parquet
final_model.save("models/best_als_model")

print("✅ Best ALS model saved with Rank={final_params['rank']}, MaxIter={final_params['maxIter']}, RegParam={final_params['regParam']}.")

Visualizing Actual vs Predicted Ratings for a random user

In [12]:
# Load the best ALS model
final_model = ALSModel.load("models/best_als_model")

# Perform predictions on the test set
predictions = final_model.transform(test)

from pyspark.sql.functions import col, expr, explode, min, max

# Select a random user from the test set
random_user = test.select("user").distinct().orderBy(expr("rand()")).limit(1).collect()[0]["user"]

# Get actual ratings for this user
actual_ratings = test.filter(col("user") == random_user).join(movies_df, test.item == movies_df.movieId).select("title", "rating")

# Generate recommendations for the selected user
top_n_recommendations = final_model.recommendForAllUsers(10)
user_recommendations = top_n_recommendations.filter(col("user") == random_user)

# Explode recommendations into separate rows
user_recommendations = user_recommendations.select("user", explode("recommendations").alias("recommendation"))

# Extract movie ID and predicted rating
user_recommendations = user_recommendations.select(col("user"), col("recommendation.item").alias("movieId"), col("recommendation.rating").alias("prediction"))

# Compute min and max predictions
min_pred = user_recommendations.agg(min("prediction")).collect()[0][0]
max_pred = user_recommendations.agg(max("prediction")).collect()[0][0]

# Apply Min-Max Scaling: Scale values into [0.5, 5.0] range
user_recommendations = user_recommendations.withColumn(
    "prediction",
    ((col("prediction") - min_pred) / (max_pred - min_pred)) * (5.0 - 0.5) + 0.5
)

# Join with movie titles
predicted_ratings = user_recommendations.join(movies_df, user_recommendations.movieId == movies_df.movieId) \
    .select("title", "prediction") \
    .orderBy(col("prediction").desc())

# Show results
print(f"Actual Ratings for User {random_user}:")
actual_ratings.show(truncate=False)

print(f"Predicted Ratings for User {random_user} (Top 10 Unseen Movies):")
predicted_ratings.show(truncate=False)

Actual Ratings for User 46404:
+-----------------------------------------------------+------+
|title                                                |rating|
+-----------------------------------------------------+------+
|Braveheart (1995)                                    |4.0   |
|Star Wars: Episode V - The Empire Strikes Back (1980)|3.5   |
|Big Lebowski, The (1998)                             |3.5   |
|Fight Club (1999)                                    |3.0   |
|Departed, The (2006)                                 |3.0   |
|Bourne Ultimatum, The (2007)                         |3.0   |
+-----------------------------------------------------+------+

Predicted Ratings for User 46404 (Top 10 Unseen Movies):
+-------------------------------------------------+------------------+
|title                                            |prediction        |
+-------------------------------------------------+------------------+
|Africa addio (1966)                              |5.0              

In [13]:
spark.stop()