In [None]:
# Read the raw ratings table from CSV into a pandas DataFrame.

import pandas as pd

ratings_df = pd.read_csv(filepath_or_buffer='sample_data/ratings.csv')

# Plain-text preview of the first five rows.
print(ratings_df.head(n=5))

In [None]:
!pip3 install pyspark

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

In [None]:
# Start a Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("MovieRecommendationALS")
    .getOrCreate()
)

In [None]:
from sklearn.model_selection import train_test_split

# 70% of the ratings go to training; the remaining 30% is halved into
# dev (validation) and test sets. Fixed seeds keep the split repeatable.
train_df, temp_df = train_test_split(ratings_df, test_size=0.3, random_state=42)
dev_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=42)

# Convert each pandas split into a Spark DataFrame for ALS.
train_spark_df, dev_spark_df, test_spark_df = (
    spark.createDataFrame(split) for split in (train_df, dev_df, test_df)
)

In [None]:
def tune_ALS(train_data, validation_data, maxIter, rank, regParam=0.0):
    """Fit one ALS configuration and report its MAE on the validation set.

    Args:
        train_data: Spark DataFrame with ``userId``, ``movieId`` and ``rating``
            columns, used to fit the model.
        validation_data: Spark DataFrame with the same columns, used only for
            scoring.
        maxIter: Maximum number of ALS iterations.
        rank: Number of latent factors.
        regParam: Regularization strength. The default of 0.0 matches the
            value that used to be hard-coded, so existing four-argument calls
            train exactly the same model as before.

    Returns:
        The mean absolute error of the predictions on ``validation_data``.
        The same value is printed together with the hyperparameters, so a
        sweep can be read from the cell output or collected programmatically.
    """
    als = ALS(
        maxIter=maxIter,
        regParam=regParam,
        rank=rank,
        userCol="userId",
        itemCol="movieId",
        ratingCol="rating",
        # Drop rows whose prediction is NaN so they do not turn the metric
        # into NaN.
        coldStartStrategy="drop")
    model = als.fit(train_data)

    # Score the validation set with mean absolute error (MAE).
    predictions = model.transform(validation_data)
    evaluator = RegressionEvaluator(metricName="mae",
                                    labelCol="rating",
                                    predictionCol="prediction")
    mae = evaluator.evaluate(predictions)
    print(f"{rank} latent factors, regularization = {regParam}, max iter = {maxIter}: validation MAE = {mae}")
    return mae

In [None]:
# Sweep iteration count x latent-factor count and print the validation MAE
# for every combination. The loop variables are named so that the builtin
# `iter()` is not rebound in the notebook's global namespace.
for max_iter in [10, 20, 50]:
    for rank in [10, 20, 50]:
        tune_ALS(train_spark_df, dev_spark_df, max_iter, rank)

In [None]:
# 10 latent factors, regularization = 0.01, max iter = 10: validation MAE = 0.8809409648936434
# 20 latent factors, regularization = 0.01, max iter = 10: validation MAE = 0.9865915344359836
# 50 latent factors, regularization = 0.01, max iter = 10: validation MAE = 1.092827599524713
# 10 latent factors, regularization = 0.05, max iter = 10: validation MAE = 0.7407131944113025
# 20 latent factors, regularization = 0.05, max iter = 10: validation MAE = 0.7574510508867892
# 50 latent factors, regularization = 0.05, max iter = 10: validation MAE = 0.753138706570574
# 10 latent factors, regularization = 0.1, max iter = 10: validation MAE = 0.6912527214782043 <- Chosen (best at max iter = 10; rank 10 with max iter = 20 is marginally lower at 0.6899)
# 20 latent factors, regularization = 0.1, max iter = 10: validation MAE = 0.6950250406099087
# 50 latent factors, regularization = 0.1, max iter = 10: validation MAE = 0.6919204329948865 
# 10 latent factors, regularization = 0.01, max iter = 20: validation MAE = 0.892745112963494
# 20 latent factors, regularization = 0.01, max iter = 20: validation MAE = 0.9969077982169593
# 50 latent factors, regularization = 0.01, max iter = 20: validation MAE = 1.0373869455851963
# 10 latent factors, regularization = 0.05, max iter = 20: validation MAE = 0.734153067477066
# 20 latent factors, regularization = 0.05, max iter = 20: validation MAE = 0.7456158548305457
# 50 latent factors, regularization = 0.05, max iter = 20: validation MAE = 0.7354275553562601
# 10 latent factors, regularization = 0.1, max iter = 20: validation MAE = 0.6899199263238752
# 20 latent factors, regularization = 0.1, max iter = 20: validation MAE = 0.6942792276238924
# 50 latent factors, regularization = 0.1, max iter = 20: validation MAE = 0.6921828139817162 

In [None]:
def test_ALS(train_data, test_data):
    """Train the selected ALS configuration and print its test-set errors.

    The hyperparameters are fixed inside the function (rank 10, 10 iterations,
    regParam 0.1). The model is fitted on ``train_data``, used to predict
    ``test_data``, and the MAE and MSE of those predictions are printed.
    Nothing is returned.
    """
    fitted = ALS(
        userCol="userId",
        itemCol="movieId",
        ratingCol="rating",
        rank=10,
        maxIter=10,
        regParam=0.1,
        coldStartStrategy="drop",
    ).fit(train_data)

    # Predicted ratings for the held-out test rows.
    scored = fitted.transform(test_data)

    def _metric(metric_name):
        # One evaluator per metric, comparing "prediction" with "rating".
        return RegressionEvaluator(
            metricName=metric_name,
            labelCol="rating",
            predictionCol="prediction",
        ).evaluate(scored)

    mae, mse = (_metric(metric_name) for metric_name in ("mae", "mse"))

    print(f"MAE = {mae}, MSE = {mse}")

In [None]:
# Final held-out evaluation: fit on the training split and print MAE / MSE on
# the test split.
test_ALS(train_spark_df, test_spark_df)

In [None]:
# MAE = 0.6899739807562887
# MSE = 0.8052170727133248

In [1]:
def train_ALS(train_data):
    """Fit and return an ALS model with the notebook's fixed hyperparameters.

    ``train_data`` must provide ``userId``, ``movieId`` and ``rating``
    columns. The configuration is rank 10, 10 iterations and regParam 0.1,
    with the cold-start strategy set to "drop".
    """
    estimator = ALS(
        userCol="userId",
        itemCol="movieId",
        ratingCol="rating",
        rank=10,
        maxIter=10,
        regParam=0.1,
        coldStartStrategy="drop",
    )
    return estimator.fit(train_data)

In [2]:
import numpy as np

# Fit the final model on the training split.
trained_model = train_ALS(train_spark_df)

# Bring the learned user and item factor tables over to pandas.
user_factors = trained_model.userFactors
item_factors = trained_model.itemFactors
user_factors_df, item_factors_df = user_factors.toPandas(), item_factors.toPandas()

# The "id" column labels each row; the "features" column holds its factor vector.
user_ids = user_factors_df['id'].tolist()
item_ids = item_factors_df['id'].tolist()
user_factors_matrix = np.array(user_factors_df['features'].tolist())
item_factors_matrix = np.array(item_factors_df['features'].tolist())

# Each predicted score is the dot product of a user vector and an item vector,
# so the full table is users x items (rows = user IDs, columns = item IDs).
user_item_matrix = user_factors_matrix.dot(item_factors_matrix.T)

user_item_df = pd.DataFrame(data=user_item_matrix, index=user_ids, columns=item_ids)


In [3]:
def get_top_10_movies_for_user(user_item_df, user_id, n=10):
    """Return the IDs of the highest-scored movies for one user.

    Args:
        user_item_df: pandas DataFrame of predicted scores, indexed by user ID
            with one column per movie ID.
        user_id: Index label of the user to look up.
        n: Number of movie IDs to return. Defaults to 10, which is the
            previous fixed behaviour.

    Returns:
        A list of at most ``n`` column labels (movie IDs), ordered from the
        highest to the lowest predicted score.

    Raises:
        ValueError: If ``user_id`` is not in the index of ``user_item_df``, or
            if ``n`` is negative.

    Note:
        Only the score table is consulted, so movies the user has already
        rated are not filtered out of the result.
    """
    if user_id not in user_item_df.index:
        raise ValueError(f"User ID {user_id} not found in the DataFrame")
    # A negative n would make .head() return "all but the last |n|" rows,
    # which is never what a top-n request means.
    if n < 0:
        raise ValueError(f"n must be non-negative, got {n}")

    # Rank this user's row of scores from best to worst and keep the first n.
    ranked_scores = user_item_df.loc[user_id].sort_values(ascending=False)
    return ranked_scores.head(n).index.tolist()

(2, 3)

In [None]:
# Example usage
user_id = 1  # Replace with the desired user ID
top_10_movies = get_top_10_movies_for_user(user_item_df, user_id)
print(f"Top 10 movies for user {user_id}: {top_10_movies}")