# Recommender systems

## Implementing a recommender system

### Description

**Item-item collaborative filtering system**

- Baseline Estimate Calculation
    
    Each predicted rating $r_{xi}$ incorporates a baseline estimate $b_{xi}$ that accounts for overall rating trends:

    $$
    b_{xi} = \mu + b_x + b_i
    $$

    Where:
    - $\mu$: Overall mean book rating.
    - $b_x = \bar{r}_x - \mu$: User $x$'s rating deviation from $\mu$.
    - $b_i = \bar{r}_i - \mu$: Book $i$'s rating deviation from $\mu$.

    > overall_mean = ratings_df.select(F.avg("Rating")).first()[0] <br>
    > user_avg = ratings_df.filter(F.col("User_Id") == user_id).select(F.avg("Rating")).first()[0] <br>
    > bx = user_avg - overall_mean if user_avg else 0 <br>
    > book_avg = ratings_df.filter(F.col("Book_Id") == book_id).select(F.avg("Rating")).first()[0] <br>
    > bi = book_avg - overall_mean if book_avg else 0 <br>
    > bxi = overall_mean + bx + bi <br>

- Similarity Calculation
  
    The similarity between two books $i$ and $j$, rated by common users, is computed using the **Pearson correlation coefficient**:

    $$
    s_{ij} = \frac{\sum_{u \in U_{ij}} (r_{ui} - \bar{r}_i)(r_{uj} - \bar{r}_j)}{\sqrt{\sum_{u \in U_{ij}} (r_{ui} - \bar{r}_i)^2} \sqrt{\sum_{u \in U_{ij}} (r_{uj} - \bar{r}_j)^2}}
    $$

    Where:
    - $U_{ij}$: Set of users who rated both books $i$ and $j$.
    - $\bar{r}_i$: Average rating of book $i$ over the users in $U_{ij}$.
    - $\bar{r}_j$: Average rating of book $j$ over the users in $U_{ij}$.
    
    > common_ratings = ratings_i.join(ratings_j, "User_Id")<br>
    > mean_i = common_ratings.select(F.avg("i.Rating")).first()[0]<br>
    > mean_j = common_ratings.select(F.avg("j.Rating")).first()[0]<br>
    > numerator = common_ratings.withColumn("diff_i", F.col("i.Rating") - mean_i) \ <br>
    > $\quad$ .withColumn("diff_j", F.col("j.Rating") - mean_j) \ <br>
    > $\quad$ .withColumn("product", F.col("diff_i") * F.col("diff_j")) \ <br>
    > $\quad$ .select(F.sum("product")).first()[0]
    

- Top-$k$ Nearest Neighbors
    - From the books the user has already rated, only those with a positive similarity $s_{ij}$ to the target book $i$ are kept, and the $k$ most similar of them are selected.
    > similarities = sorted(similarities, key=lambda x: -x[1])[:k]

- Rating Prediction
  
    The predicted rating $r_{xi}$ is calculated as:

    $$
    r_{xi} = b_{xi} + \frac{\sum_{j \in N(i;x)} s_{ij} \cdot (r_{xj} - b_{xj})}{\sum_{j \in N(i;x)} |s_{ij}|}
    $$

    Where:
    - $N(i;x)$: Set of the top-$k$ books most similar to $i$ that user $x$ has rated.
    - $r_{xj}$: User $x$'s rating for book $j$.
    - $b_{xj} = \mu + b_x + b_j$: Baseline estimate for user $x$ on book $j$, where $b_j$ is book $j$'s deviation from $\mu$ (`bxj = overall_mean + bx + bj` in code).

    > numerator = sum(similarity * (rating_j - bxj) for _, similarity, rating_j, bxj in similarities)<br>
    > denominator = sum(abs(similarity) for _, similarity, _, _ in similarities)<br>
    > predicted_rating = bxi + (numerator / denominator if denominator != 0 else 0)<br>

- Edge Cases
  - **Existing Ratings**: If the user already rated the book, the actual rating is returned.
  > if existing_rating:<br>
  > $\quad$ return existing_rating["Rating"]
  - **No Neighbors**: If none of the books rated by the user has a positive similarity to the target book, the baseline estimate $b_{xi}$ is returned.
  > if not similarities:<br>
  > $\quad$ return bxi
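The steps above can be sketched end to end in plain Python, with an in-memory dictionary in place of the Spark DataFrame. This is a minimal sketch under stated assumptions: ratings are held in a hypothetical `{user: {book: rating}}` dict, and `item_item_predict`, `pearson`, `mean` and the toy data are illustrative names, not part of the original code. It follows the same choices as the description: Pearson correlation over common users, positive similarities only, the top-$k$ neighbors, and the two edge cases.

```python
from math import sqrt


def mean(values):
    """Mean of an iterable, or None if it is empty."""
    values = list(values)
    return sum(values) / len(values) if values else None


def pearson(ratings_i, ratings_j):
    """Pearson correlation of two books ({user: rating}) over the users who rated both."""
    common = ratings_i.keys() & ratings_j.keys()
    if not common:
        return 0.0
    mean_i = mean(ratings_i[u] for u in common)
    mean_j = mean(ratings_j[u] for u in common)
    num = sum((ratings_i[u] - mean_i) * (ratings_j[u] - mean_j) for u in common)
    den = sqrt(sum((ratings_i[u] - mean_i) ** 2 for u in common)) * \
        sqrt(sum((ratings_j[u] - mean_j) ** 2 for u in common))
    return num / den if den != 0 else 0.0


def item_item_predict(ratings, user, book, k=5):
    """Hypothetical helper: r_xi = b_xi + sum s_ij (r_xj - b_xj) / sum |s_ij|."""
    # Edge case: the user already rated the book
    if book in ratings.get(user, {}):
        return ratings[user][book]

    # Invert to {book: {user: rating}} for item-item lookups
    by_book = {}
    for u, books in ratings.items():
        for b, r in books.items():
            by_book.setdefault(b, {})[u] = r

    mu = mean(r for books in ratings.values() for r in books.values())
    user_avg = mean(ratings.get(user, {}).values())
    bx = user_avg - mu if user_avg is not None else 0.0

    def baseline(b):
        # b_xb = mu + b_x + b_b, with b_b = 0 for a book without ratings
        book_avg = mean(by_book.get(b, {}).values())
        return mu + bx + (book_avg - mu if book_avg is not None else 0.0)

    # Keep only positive similarities, then the k most similar books
    neighbors = []
    for j, r_xj in ratings.get(user, {}).items():
        s = pearson(by_book.get(book, {}), by_book[j])
        if s > 0:
            neighbors.append((s, r_xj, j))
    neighbors = sorted(neighbors, reverse=True)[:k]

    # Edge case: no neighbors, fall back to the baseline estimate
    if not neighbors:
        return baseline(book)

    num = sum(s * (r_xj - baseline(j)) for s, r_xj, j in neighbors)
    den = sum(abs(s) for s, _, _ in neighbors)
    return baseline(book) + num / den


toy = {
    "alice": {"b1": 5, "b2": 3},
    "bob": {"b1": 4, "b2": 2, "b3": 4},
    "carol": {"b1": 2, "b2": 1, "b3": 1},
    "dave": {"b2": 4, "b3": 5},
}
print(item_item_predict(toy, "alice", "b3"))
```

On the toy data, Alice's two neighbors of `b3` are `b1` (similarity 1.0) and `b2` (about 0.89); she rated `b1` above its baseline and `b2` below it, so the prediction lands slightly above the baseline $b_{xi} \approx 4.23$.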


### Preprocessing

In [None]:
df = spark.read.csv("BookRates_DS.csv", header=False, inferSchema=True)

# Count the rows
n = df.count()

# Keep the first half of the rows to reduce computation; limit() without an
# orderBy() does not guarantee which rows are kept
df = df.limit(n//2)

# Rename columns
df = df.withColumnRenamed("_c0", "User_Id") \
       .withColumnRenamed("_c1", "Book_Id") \
       .withColumnRenamed("_c2", "Rating") \
       .withColumnRenamed("_c3", "Date")

df.show(5, truncate=False)

+--------------+----------+------+----------+
|User_Id       |Book_Id   |Rating|Date      |
+--------------+----------+------+----------+
|AH2L9G3DQHHAJ |0000000116|4.0   |1019865600|
|A2IIIDRK3PRRZY|0000000116|1.0   |1395619200|
|A1TADCM7YWPQ8M|0000000868|4.0   |1031702400|
|AWGH7V0BDOJKB |0000013714|4.0   |1383177600|
|A3UTQPQPM4TQO0|0000013714|5.0   |1374883200|
+--------------+----------+------+----------+
only showing top 5 rows



### Predict

In [None]:
from pyspark.sql import functions as F
from pyspark.sql import Window
from math import sqrt

def compute_similarity(item_i, item_j, ratings_df):
    """
    Compute the Pearson correlation coefficient between two items over the users who rated both.
    """
    # Filter ratings for both items i and j
    ratings_i = ratings_df.filter(F.col("Book_Id") == item_i).select("User_Id", "Rating").alias("i")
    ratings_j = ratings_df.filter(F.col("Book_Id") == item_j).select("User_Id", "Rating").alias("j")
    
    # Join on User_Id
    common_ratings = ratings_i.join(ratings_j, "User_Id")
    
    if common_ratings.count() == 0:
        return 0  # No common users; similarity is 0
    
    # Calculate mean ratings for i and j
    mean_i = common_ratings.select(F.avg("i.Rating")).first()[0]
    mean_j = common_ratings.select(F.avg("j.Rating")).first()[0]
    
    # Compute numerator and denominator for Pearson correlation
    common_ratings = common_ratings.withColumn("diff_i", F.col("i.Rating") - mean_i) \
                                   .withColumn("diff_j", F.col("j.Rating") - mean_j) \
                                   .withColumn("product", F.col("diff_i") * F.col("diff_j"))
    
    numerator = common_ratings.select(F.sum("product")).first()[0]
    denominator = sqrt(common_ratings.select(F.sum(F.col("diff_i") ** 2)).first()[0]) * \
                  sqrt(common_ratings.select(F.sum(F.col("diff_j") ** 2)).first()[0])
    
    if denominator == 0:
        return 0  # Avoid division by zero
    
    return numerator / denominator


def predict_rating(user_id, book_id, ratings_df, k=5):
    """
    Predict the rating for a given user and book using item-item collaborative filtering.
    """
    # Check if the book is already rated by the user
    existing_rating = ratings_df.filter((F.col("User_Id") == user_id) & (F.col("Book_Id") == book_id)).select("Rating").first()
    if existing_rating:
        return existing_rating["Rating"]  # Return actual rating if already exists
    
    # Overall mean rating
    overall_mean = ratings_df.select(F.avg("Rating")).first()[0]
    
    # User bias (bx); 0 if the user has no ratings
    user_avg = ratings_df.filter(F.col("User_Id") == user_id).select(F.avg("Rating")).first()[0]
    bx = user_avg - overall_mean if user_avg is not None else 0
    
    # Book bias (bi); 0 if the book has no ratings
    book_avg = ratings_df.filter(F.col("Book_Id") == book_id).select(F.avg("Rating")).first()[0]
    bi = book_avg - overall_mean if book_avg is not None else 0
    
    # Baseline estimate
    bxi = overall_mean + bx + bi
    
    # Find other books rated by the user
    user_books = ratings_df.filter(F.col("User_Id") == user_id).select("Book_Id", "Rating").collect()
    
    # Compute similarities with the target book
    similarities = []
    for row in user_books:
        book_j = row["Book_Id"]
        rating_j = row["Rating"]
        similarity = compute_similarity(book_id, book_j, ratings_df)
        if similarity > 0:  # Only consider positive similarities
            similarities.append((book_j, similarity, rating_j))
    
    # Select top-k similar books
    similarities = sorted(similarities, key=lambda x: -x[1])[:k]
    
    if not similarities:
        return bxi  # Return baseline estimate if no neighbors are found
    
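    # Weighted average of each neighbor's deviation from its own baseline b_xj
    numerator = 0.0
    for book_j, similarity, rating_j in similarities:
        book_j_avg = ratings_df.filter(F.col("Book_Id") == book_j).select(F.avg("Rating")).first()[0]
        bj = book_j_avg - overall_mean if book_j_avg is not None else 0
        bxj = overall_mean + bx + bj  # baseline estimate for user x on book j
        numerator += similarity * (rating_j - bxj)
    denominator = sum(abs(similarity) for _, similarity, _ in similarities)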
    
    return bxi + (numerator / denominator if denominator != 0 else 0)


# Find a random user
random_user = df.select("User_Id").distinct().orderBy(F.rand()).first()[0]

# Find a book that the user has not rated
rated_books = df.filter(F.col("User_Id") == random_user).select("Book_Id").distinct()
all_books = df.select("Book_Id").distinct()
not_rated_books = all_books.subtract(rated_books)

if not_rated_books.count() > 0:
    random_book = not_rated_books.orderBy(F.rand()).first()[0]
    
    # Predict the rating for the not-rated book
    predicted_rating = predict_rating(random_user, random_book, df)
    print(f"Predicted rating for User {random_user} on Book {random_book}: {predicted_rating}")
else:
    print(f"No books available that User {random_user} has not rated.")


Predicted rating for User AQIY318OX7XYW on Book 0001468685: 1.8137943293929428


## Evaluation 

In [None]:
from pyspark.sql import functions as F
from pyspark.sql import Window
import random
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from math import sqrt

# 1. Select 25% of books randomly
books = df.select("Book_Id").distinct().collect()
sampled_books = random.sample([row["Book_Id"] for row in books], int(0.25 * len(books)))

# 2. Mask 25% of the ratings of the sampled books
candidate_df = df.filter(F.col("Book_Id").isin(sampled_books))

# Randomly pick ~25% of those ratings as the masked (held-out) set;
# a fixed seed keeps the split reproducible
masked_df = candidate_df.withColumn("rand", F.rand(seed=42)) \
    .filter(F.col("rand") <= 0.25) \
    .drop("rand")

# Remaining ratings after masking
available_df = df.subtract(masked_df)

# Ground truth for masked ratings
ground_truth_df = masked_df.select("User_Id", "Book_Id", "Rating")

# Collect the *available* (unmasked) ratings into Python dictionaries, so the
# held-out ratings are never visible to the predictor
ratings_data = available_df.groupBy("Book_Id").agg(F.collect_list(F.struct("User_Id", "Rating")).alias("ratings")).collect()

# Book_Id -> {User_Id: Rating}
item_ratings_dict = {row["Book_Id"]: {x["User_Id"]: x["Rating"] for x in row["ratings"]} for row in ratings_data}

# User_Id -> {Book_Id: Rating}
user_ratings_dict = {}
for book, book_ratings in item_ratings_dict.items():
    for user, rating in book_ratings.items():
        user_ratings_dict.setdefault(user, {})[book] = rating

# Overall mean rating, computed once on the driver instead of once per prediction
all_ratings = [r for book_ratings in item_ratings_dict.values() for r in book_ratings.values()]
overall_mean = sum(all_ratings) / len(all_ratings)

# Broadcast the dictionaries and the mean to the executors
model_broadcast = spark.sparkContext.broadcast((item_ratings_dict, user_ratings_dict, overall_mean))


def pearson_similarity(ratings_i, ratings_j):
    """
    Pearson correlation of two items ({User_Id: Rating} dicts) over the users who rated both.
    """
    common_users = ratings_i.keys() & ratings_j.keys()
    if not common_users:
        return 0.0
    mean_i = sum(ratings_i[u] for u in common_users) / len(common_users)
    mean_j = sum(ratings_j[u] for u in common_users) / len(common_users)
    numerator = sum((ratings_i[u] - mean_i) * (ratings_j[u] - mean_j) for u in common_users)
    denominator = sqrt(
        sum((ratings_i[u] - mean_i) ** 2 for u in common_users) *
        sum((ratings_j[u] - mean_j) ** 2 for u in common_users)
    )
    return numerator / denominator if denominator != 0 else 0.0


def predict_rating_v01(user_id, book_id, item_ratings, user_ratings, overall_mean, k=5):
    """
    Predict the rating for a given user and book using broadcasted data,
    with the same baseline + top-k neighbor formula as predict_rating.
    """
    def bias(values):
        # Deviation of the mean of `values` from the overall mean (0 if there are none)
        values = list(values)
        return sum(values) / len(values) - overall_mean if values else 0.0

    rated_by_user = user_ratings.get(user_id, {})
    bx = bias(rated_by_user.values())

    def baseline(book):
        # b_xj = mu + b_x + b_j
        return overall_mean + bx + bias(item_ratings.get(book, {}).values())

    # Positive similarities between the target book and the books the user rated
    ratings_i = item_ratings.get(book_id, {})
    similarities = []
    for book_j, rating_j in rated_by_user.items():
        if book_j == book_id:
            continue
        similarity = pearson_similarity(ratings_i, item_ratings.get(book_j, {}))
        if similarity > 0:
            similarities.append((book_j, similarity, rating_j))

    # Keep the top-k neighbors; fall back to the baseline estimate if there are none
    similarities = sorted(similarities, key=lambda x: -x[1])[:k]
    if not similarities:
        return float(baseline(book_id))

    numerator = sum(similarity * (rating_j - baseline(book_j)) for book_j, similarity, rating_j in similarities)
    denominator = sum(abs(similarity) for _, similarity, _ in similarities)
    return float(baseline(book_id) + numerator / denominator)


@udf(DoubleType())
def predict_rating_udf(user_id, book_id):
    item_ratings, user_ratings, overall_mean = model_broadcast.value
    return predict_rating_v01(user_id, book_id, item_ratings, user_ratings, overall_mean)

# Generate predictions
predictions_df = ground_truth_df.withColumn(
    "Predicted_Rating",
    predict_rating_udf(F.col("User_Id"), F.col("Book_Id"))
)

# Compute RMSE
rmse = predictions_df.withColumn(
    "squared_error", F.pow(F.col("Rating") - F.col("Predicted_Rating"), 2)
).select(F.avg("squared_error").alias("mse")).withColumn(
    "rmse", F.sqrt(F.col("mse"))
).select("rmse").first()["rmse"]

print(f"RMSE: {rmse}")



RMSE: 4.0301960237651135
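The evaluation protocol above (hide part of the ratings, predict them from the rest, score with RMSE) can be sketched in plain Python. This is a minimal sketch: `holdout_split`, `compute_rmse`, the toy triples and the constant mean predictor are hypothetical stand-ins, not the Spark pipeline above. The property that matters is that predictions are computed only from `train`. As a sanity check, when ratings lie on a 1 to 5 scale and predictions stay within that range, no single error can exceed 4, so the RMSE is at most 4.

```python
import random
from math import sqrt


def holdout_split(ratings, fraction=0.25, seed=0):
    """Shuffle (user, book, rating) triples and hold out `fraction` of them (at least one)."""
    shuffled = list(ratings)
    random.Random(seed).shuffle(shuffled)
    n_test = max(1, int(fraction * len(shuffled)))
    return shuffled[n_test:], shuffled[:n_test]  # (train, test)


def compute_rmse(pairs):
    """Root-mean-square error over (actual, predicted) pairs."""
    pairs = list(pairs)
    return sqrt(sum((actual - predicted) ** 2 for actual, predicted in pairs) / len(pairs))


# Usage: a baseline that always predicts the mean of the training ratings
triples = [("u1", "b1", 5.0), ("u1", "b2", 3.0), ("u2", "b1", 4.0), ("u2", "b3", 2.0),
           ("u3", "b2", 1.0), ("u3", "b3", 4.0), ("u4", "b1", 5.0), ("u4", "b2", 2.0)]
train, test = holdout_split(triples, fraction=0.25, seed=1)
train_mean = sum(r for _, _, r in train) / len(train)  # uses only the training ratings
print(compute_rmse((r, train_mean) for _, _, r in test))
```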


## Saving

In [None]:
import pandas as pd

# Load the input data; reading every column as a string keeps IDs such as
# "0000000116" intact and lets the "?" placeholders be compared directly
df_eval = spark.read.csv("Evaluate_P2.csv", header=False, inferSchema=False)

# Rename columns
df_eval = df_eval.withColumnRenamed("_c0", "User_Id") \
                 .withColumnRenamed("_c1", "Book_Id") \
                 .withColumnRenamed("_c2", "Rating")

# predict_rating launches Spark jobs itself, so it cannot be called inside a
# column expression or a UDF; collect the rows and predict the "?" ones on the driver
data = []
for row in df_eval.select("User_Id", "Book_Id", "Rating").collect():
    rating = row["Rating"]
    if rating is not None and rating.strip() == "?":
        rating = predict_rating(row["User_Id"], row["Book_Id"], df)
    data.append((row["User_Id"], row["Book_Id"], rating))

columns = ["User_Id", "Book_Id", "Rating"]

# Create the DataFrame
Data_df = pd.DataFrame(data, columns=columns)

# Save the DataFrame as a CSV file (this overwrites Evaluate_P2.csv)
Data_df.to_csv("Evaluate_P2.csv", index=False, header=False)
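Filling in the `?` placeholders is a plain loop over rows once a predictor is available. The sketch below assumes rows are `(User_Id, Book_Id, Rating)` tuples of strings; `fill_missing_ratings`, the toy rows, the constant `lambda` predictor and the `io.StringIO` buffer (standing in for the output file) are hypothetical. Keeping IDs as strings preserves leading zeros such as `0000000116`.

```python
import csv
import io


def fill_missing_ratings(rows, predict, missing="?"):
    """Replace `missing` ratings in (user, book, rating) rows with predict(user, book)."""
    filled = []
    for user, book, rating in rows:
        if str(rating).strip() == missing:
            rating = predict(user, book)
        filled.append((user, book, rating))
    return filled


rows = [("u1", "0000000116", "4.0"), ("u2", "0000000116", "?")]
filled = fill_missing_ratings(rows, predict=lambda user, book: 3.5)

# Write without a header, like the to_csv call above
buffer = io.StringIO()
csv.writer(buffer).writerows(filled)
print(buffer.getvalue())
```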