In [4]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col, expr
import numpy as np


# Initialize a Spark session
spark = SparkSession.builder.appName("MovieRecommendation").getOrCreate()

# Modify to work with a subset of the data (e.g., 10%)
subset_fraction = 0.1  # Adjust this fraction as needed

# Load the ratings.csv file
ratings_df = spark.read.csv("Data/ratings.csv", header=True, inferSchema=True)

# Take a random sample of the data
subset_ratings_df = ratings_df.sample(subset_fraction, seed=1234)

# Split the subset dataset into training and testing sets
(train_data, test_data) = subset_ratings_df.randomSplit([0.8, 0.2], seed=1234)

# Hyperparameters to tune
rank = 10
max_iter = 12
reg_param = 0.1

# Build the ALS model with hyperparameters
als = ALS(rank=rank, maxIter=max_iter, regParam=reg_param, userCol="userId", itemCol="movieId", ratingCol="rating")
model = als.fit(train_data)

# Generate recommendations for all users
userRecs = model.recommendForAllUsers(10)  # You can adjust the number of recommendations (10 in this example)


[Stage 65:>                                                         (0 + 1) / 1]                                                                                

In [5]:
# Define a function to calculate NDCG for a single user
def calculate_ndcg(user_id, true_items, recommended_items, k=10):
    if user_id not in userRecs.select("userId").rdd.flatMap(lambda x: x).collect():
        return None  # User not in recommendations

    # Calculate DCG (Discounted Cumulative Gain)
    dcg = 0.0
    for i in range(min(k, len(recommended_items))):
        item = recommended_items[i]
        if item in true_items:
            relevance = 1.0
        else:
            relevance = 0.0
        dcg += (2 ** relevance - 1) / np.log2(i + 2)

    # Calculate IDCG (Ideal DCG)
    idcg = sum(1.0 / np.log2(i + 2) for i in range(min(k, len(true_items)))

    # Calculate NDCG
    ndcg = dcg / idcg
    return ndcg

# Calculate NDCG for each user in the test data
ndcg_values = []
for row in test_data.collect():
    user_id = row["userId"]
    true_items = [row["movieId"]]
    recommended_items = userRecs.where(col("userId") == user_id).select("recommendations.movieId").rdd.flatMap(lambda x: x[0]).collect()
    ndcg = calculate_ndcg(user_id, true_items, recommended_items)
    if ndcg is not None:
        ndcg_values.append(ndcg)

# Calculate the average NDCG score
average_ndcg = sum(ndcg_values) / len(ndcg_values)
print("Average NDCG:", average_ndcg)


SyntaxError: invalid syntax (4278770036.py, line 20)