In [12]:
import pandas as pd

In [41]:
PATH = "../../data/raw/ml-100k/"

ratings_columns = ["user_id", "movie_id", "rating", "timestamp"]
ratings = pd.read_csv(
    PATH + "u.data", sep="\t", names=ratings_columns, encoding="latin-1"
)

user_actual_items = ratings.groupby('user_id')['movie_id'].apply(list).to_dict()

In [15]:
movie_columns = [
    "movie_id",
    "title",
    "release_date",
    "video_release_date",
    "IMDb_URL",
    "unknown",
    "Action",
    "Adventure",
    "Animation",
    "Children",
    "Comedy",
    "Crime",
    "Documentary",
    "Drama",
    "Fantasy",
    "Film-Noir",
    "Horror",
    "Musical",
    "Mystery",
    "Romance",
    "Sci-Fi",
    "Thriller",
    "War",
    "Western",
]
movies = pd.read_csv(PATH + "u.item", sep="|", names=movie_columns, encoding="latin-1")

In [7]:
user_columns = ["user_id", "age", "gender", "occupation", "zip_code"]
users = pd.read_csv(PATH + "u.user", sep="|", names=user_columns, encoding="latin-1")

In [12]:
movie_ratings = pd.merge(ratings, movies[['movie_id', 'title']], on='movie_id')

users

Unnamed: 0,user_id,age,gender,occupation,zip_code
0,1,24,M,technician,85711
1,2,53,F,other,94043
2,3,23,M,writer,32067
3,4,24,M,technician,43537
4,5,33,F,other,15213
...,...,...,...,...,...
938,939,26,F,student,33319
939,940,32,M,administrator,02215
940,941,20,M,student,97229
941,942,48,F,librarian,78209


In [42]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import Row

# Initialize Spark Session
spark = SparkSession.builder.appName("MovieLensALS").getOrCreate()

# Load data
data_path = "../../data/raw/ml-100k/u1.base"  # Replace with your path to the u.data file
ratings = spark.read.csv(data_path, sep='\t', inferSchema=True)
ratings = ratings.withColumnRenamed("_c0", "userId") \
                 .withColumnRenamed("_c1", "movieId") \
                 .withColumnRenamed("_c2", "rating")

In [37]:
# Split the data into training and test sets
# (training, test) = ratings.randomSplit([0.8, 0.2])
test = spark.read.csv("../../data/raw/ml-100k/u1.test", sep='\t', inferSchema=True)
test = test.withColumnRenamed("_c0", "userId") \
                 .withColumnRenamed("_c1", "movieId") \
                 .withColumnRenamed("_c2", "rating")

movies_df = spark.read.csv("../../data/raw/ml-100k/u.item", sep='\t', inferSchema=True)
movies_df = test.withColumnRenamed("_c0", "userId") \
                 .withColumnRenamed("_c1", "movieId") \
                 .withColumnRenamed("_c2", "rating")

# Build the recommendation model using ALS
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop", nonnegative=True)


paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [52, 55, 56, 57, 58]) \
    .addGrid(als.regParam, [0.1, 0.15, 0.2]) \
    .build()
# Fit the model to the training data
# model = als.fit(training)

In [7]:
# Evaluate the model by computing the RMSE on the test data
# predictions = model.transform(test)
evaluator = RegressionEvaluator(
    metricName="rmse", labelCol="rating", predictionCol="prediction"
)


crossval = CrossValidator(
    estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3
)

cvModel = crossval.fit(ratings)

bestModel = cvModel.bestModel

# Print best rank and regParam
print("Best Rank:", bestModel._java_obj.parent().getRank())
print("Best regParam:", bestModel._java_obj.parent().getRegParam())


Best Rank: 57
Best regParam: 0.15


In [9]:
# Apply the best model to the test data
predictions = bestModel.transform(test)  # Make sure you have a test_data set

evaluator = RegressionEvaluator(
    metricName="mae", labelCol="rating", predictionCol="prediction"
)

# Evaluate best model
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.7471936652914454


In [45]:
from pyspark.sql.functions import col, lit
from pyspark.sql import DataFrame

def get_top_k_recommendations(als_model, user_id, k, ratings_df, movies_df):
    """
    Get top k recommendations for a given user using PySpark ALS model.
    
    Args:
    als_model (ALSModel): Trained ALS model.
    user_id (int): User ID for whom recommendations are to be made.
    k (int): Number of recommendations.
    ratings_df (DataFrame): PySpark DataFrame of ratings.
    movies_df (DataFrame): PySpark DataFrame of movies.
    spark_session (SparkSession): Active Spark session.
    
    Returns:
    DataFrame: Top k recommendations (movie IDs and predicted ratings).
    """
    # Create a DataFrame of movies the user has already rated
    user_rated_movies = ratings_df.filter(col("user_id") == user_id).select("movie_id")

    # Create a DataFrame of movies the user has not rated yet
    movies_not_rated = movies_df.join(user_rated_movies, "movie_id", "left_anti")

    # Add a column with the user ID to this DataFrame
    movies_not_rated = movies_not_rated.withColumn("user_id", lit(user_id))

    # Predict ratings for these movies
    predictions = als_model.transform(movies_not_rated)

    # Get the top k recommendations
    top_k_recommendations = predictions.orderBy(col("prediction").desc()).limit(k)

    return top_k_recommendations


In [39]:
def precision_at_k(recommended_items, actual_items, k):
    """Calculate precision at k."""
    if not recommended_items:
        return 0
    recommended_at_k = recommended_items[:k]
    hits = set(recommended_at_k).intersection(set(actual_items))
    return len(hits) / len(recommended_at_k)

def recall_at_k(recommended_items, actual_items, k):
    """Calculate recall at k."""
    if not actual_items:
        return 0
    recommended_at_k = recommended_items[:k]
    hits = set(recommended_at_k).intersection(set(actual_items))
    return len(hits) / len(actual_items)


In [29]:
user_ids = ratings['user_id'].unique()

In [44]:
# Assuming you have a DataFrame 'user_actual_items' mapping user IDs to lists of item IDs they've interacted with
# And a list of user IDs 'user_ids'

precisions = []
recalls = []


for k in [5, 10, 20, 50]:  # Different values of k
    user_precisions = []
    user_recalls = []
    
    for user_id in user_ids:
        actual_items = user_actual_items[user_id]
        recommended_items = get_top_k_recommendations(bestModel, user_id, k, ratings, movies)
        
#         user_precisions.append(precision_at_k(recommended_items, actual_items, k))
#         user_recalls.append(recall_at_k(recommended_items, actual_items, k))

#     avg_precision = sum(user_precisions) / len(user_precisions)
#     avg_recall = sum(user_recalls) / len(user_recalls)

#     precisions.append(avg_precision)
#     recalls.append(avg_recall)

#     print(f"Average Precision@{k}: {avg_precision}")
#     print(f"Average Recall@{k}: {avg_recall}")


AttributeError: 'DataFrame' object has no attribute 'user_id'

In [None]:
spark.stop()