# User-User collaborative filtering

In [5]:
from pyspark.sql import SparkSession, Row
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator

### data preparation

In [6]:
#NOT USED

# Initialize Spark session
spark = SparkSession.builder.appName("ALSRecommender").getOrCreate()

# Load data from a CSV file, considering semicolon delimiter and quotes
data = spark.read.csv("Book reviews/BX-Book-Ratings.csv", header=True, inferSchema=True, sep=';', quote='"')

# Select and rename the columns according to the CSV file's format
ratings = data.select(
    col('User-ID').cast('int').alias('userId'), 
    col('ISBN').alias('bookId'), 
    col('Book-Rating').cast('int').alias('rating')
)


# Transform the ISBN string to an index using StringIndexer
stringIndexer = StringIndexer(inputCol="bookId", outputCol="bookIdIndexed")
model = stringIndexer.fit(ratings)
ratingsIndexed = model.transform(ratings)

# filter out users with less than 3 ratings
userRatingTreshold = 3
ratingsFiltered = ratingsIndexed.groupBy("userId").count().filter(col("count") > userRatingTreshold)
ratingsFiltered = ratingsFiltered.select("userId")

ratingsIndexed = ratingsIndexed.join(ratingsFiltered, "userId", "inner")

#filter top 10% of books
bookRatingTreshold = 0.9
ratingsFiltered = ratingsIndexed.groupBy("bookIdIndexed").count().filter(col("count") > bookRatingTreshold)
ratingsFiltered = ratingsFiltered.select("bookIdIndexed")

ratingsIndexed = ratingsIndexed.join(ratingsFiltered, "bookIdIndexed", "inner")


# Split data into training and test sets
seed = 12345
(training, test) = ratingsIndexed.randomSplit([0.8, 0.2], seed=seed)

24/03/26 22:18:32 WARN Utils: Your hostname, Anhs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.215 instead (on interface en0)
24/03/26 22:18:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/26 22:18:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

24/03/26 22:18:46 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
24/03/27 01:41:20 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 955321 ms exceeds timeout 120000 ms
24/03/27 01:41:20 WARN SparkContext: Killing executors is not supported by current scheduler.
24/03/27 01:41:23 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.Rpc

In [3]:
import pandas as pd
import numpy as np
from scipy.sparse import csr_matrix
from sklearn.metrics.pairwise import cosine_similarity
import dask.dataframe as dd


ddf = dd.read_csv('Book reviews/BX-Book-Ratings.csv', sep=';', encoding='latin-1',dtype={'User-ID': 'int32', 'Book-Rating': 'int8', 'ISBN': 'category'})

rating_summary = ddf.groupby('ISBN')['Book-Rating'].mean().compute()

df = ddf.compute()

df = df.head(100000)

#df = df[df['Book-Rating'] > 0] 

# filter out users with less than 3 ratings
user_rating_treshold = 3
user_rating_count = df['User-ID'].value_counts()
active_users = user_rating_count[user_rating_count >= user_rating_treshold].index
df = df[df['User-ID'].isin(active_users)]

# filter top 10% of books
book_rating_treshold = 0.9
book_rating_count = df['ISBN'].value_counts()
popular_books = book_rating_count[book_rating_count >= book_rating_treshold].index
df = df[df['ISBN'].isin(popular_books)]

### Modeling

In [4]:

# create user-item interaction matrix
ratings_matrix = df.pivot(index='User-ID', columns='ISBN', values='Book-Rating').fillna(0)


# calculate the cosine similarity of the users
cosine_sim = cosine_similarity(ratings_matrix, ratings_matrix)

# convert the similarity matrix into a DataFrame
cosine_sim_df = pd.DataFrame(cosine_sim, index=ratings_matrix.index, columns=ratings_matrix.index)

# create a df with the 5 most similar users for each user
top_5_similar_users = {}
for user in cosine_sim_df.columns:
    top_5_similar_users[user] = cosine_sim_df[user].sort_values(ascending=False)[1:6]
    
top_5_similar_users_df = pd.DataFrame(top_5_similar_users)


# predict the rating that a user would give to a book using mean of top 5 similar users
def predict_rating(user_id, book_id):
    top_5_similar = top_5_similar_users_df[user_id]
    
    # get the ratings of the top 5 similar users for the book
    ratings = []
    for user in top_5_similar.index:
        rating = ratings_matrix.loc[user, book_id]
        ratings.append(rating)
        
    # calculate the mean rating
    prediction = np.mean(ratings)
    
    return prediction


# get the mean rating for each row in df and put it in predictions column
df['prediction'] = df.apply(lambda x: predict_rating(x['User-ID'], x['ISBN']), axis=1)

# calculate the RMSE
rmse = np.sqrt(np.mean((df['Book-Rating'] - df['prediction'])**2))
print(rmse)

4.942315987334405
