## Required installations

In [None]:
# Install the third-party packages used below: gdown (downloads the rating files
# by Google Drive id in the load cells) and pyspark (Spark session, KMeans).
# NOTE(review): both installs are unpinned; consider `%pip install pkg==X.Y.Z`
# so re-runs get the same versions (newer PySpark releases change defaults such
# as ANSI casting, which the load cells are sensitive to).
!pip install gdown
!pip install pyspark

## Required imports

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col
from sklearn.metrics import recall_score, precision_score, f1_score,mean_squared_error
from sklearn.metrics.pairwise import cosine_similarity
import pandas as pd
import numpy as np

## Spark Session

In [None]:
# Local Spark session using all available cores.
# NOTE(review): in local mode executors run inside the driver JVM, so the
# driver memory setting is the one that actually bounds the job.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Recommender")
    .config("spark.driver.memory", "16g")
    .config("spark.executor.memory", "16g")
    .getOrCreate()
)

# The training cell pivots ratings into one column per distinct movieId; raise
# Spark's cap on the number of pivot values (default 10000) so that pivot can
# produce all of them.
spark.conf.set("spark.sql.pivotMaxValues", "71567")

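## Load and format the ratings data

Run **exactly one** of the three cells below. Each one downloads a dataset and assigns `ratings_df`, so running all of them leaves only the last one (10M) loaded.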


### 100k

In [None]:
!gdown 1lwPW7OefaJnwsaqYBQs-wgcIGiatYLXb

def load_100k() :
    data = spark.read.option("delimiter", "\t")\
                    .option("header", "False")\
                    .csv('/kaggle/working/u.data')\
                    .select('_c0','_c1','_c2')\
                    .withColumnRenamed('_c0','userId')\
                    .withColumnRenamed('_c1', 'movieId') \
                    .withColumnRenamed('_c2', 'rating')
    data = data.select([F.col(c).cast("int") for c in data.columns])
    return data
    
ratings_df = load_100k()

### 1M

In [None]:
!gdown 18sHWE7Eu28hDqXib2PvesBYMea5AQmZs

def load_1m() :
    data = spark.read.option("delimiter", "::")\
                    .option("header", "False")\
                    .csv('/kaggle/working/ratings.dat')\
                    .select('_c0','_c1','_c2')\
                    .withColumnRenamed('_c0','userId')\
                    .withColumnRenamed('_c1', 'movieId') \
                    .withColumnRenamed('_c2', 'rating')
    data = data.select([F.col(c).cast("int") for c in data.columns])
    return data
    
ratings_df = load_1m()

### 10M

In [None]:
!gdown 1e064MFX83PYtPDcISjYQw4fTQtv-PG38

def load_10m() :
    data = spark.read.option("delimiter", "::")\
                    .option("header", "False")\
                    .csv('/kaggle/working/ratings.dat')\
                    .select('_c0','_c1','_c2')\
                    .withColumnRenamed('_c0','userId')\
                    .withColumnRenamed('_c1', 'movieId') \
                    .withColumnRenamed('_c2', 'rating')
    data = data.select([F.col(c).cast("int") for c in data.columns])
    return data
    
ratings_df = load_10m()

## Split, cluster users and predict test ratings

In [None]:
# Split the ratings 90/10 into train and test (seeded for reproducibility).
train, test = ratings_df.randomSplit([0.9, 0.1], 1111)

# Number of most-similar cluster mates consulted for each prediction.
N = 10

# --- 1. Cluster users by their TRAINING rating patterns ----------------------
# User x movie matrix: one row per user, one column per movieId, 0 where unrated.
ratings_matrix = train.groupBy("userId").pivot("movieId").agg({"rating": "avg"}).na.fill(0)
# Pack every movie column into a single vector column for Spark ML.
assembler = VectorAssembler(inputCols=ratings_matrix.columns[1:], outputCol="features")
features_df = assembler.transform(ratings_matrix).select("userId", "features")
kmeans = KMeans(k=10, maxIter=10, tol=0.1, seed=123, distanceMeasure='cosine')
model = kmeans.fit(features_df)
cluster_assignments = model.transform(features_df).select("userId", "prediction")

# --- 2. Per-cluster user-user cosine similarity -------------------------------
# Attach the cluster id to TRAINING ratings only. Joining the full `ratings_df`
# here would put the held-out test ratings into the similarity matrices and the
# neighbour-rating lookups below (data leakage into the evaluation).
ratings_df_with_clusters = train.join(cluster_assignments, on='userId')
clustered_ratings_pd = ratings_df_with_clusters.toPandas()

# Iterate over the cluster ids that actually occur. KMeans may leave some of its
# k centres without members, in which case iterating range(<distinct count>)
# would skip a populated cluster or build a matrix for an empty one.
matrices = {}
for cluster_id, cluster_pd in clustered_ratings_pd.groupby('prediction'):
    matrix = cluster_pd.pivot_table(index='userId', columns='movieId', values='rating')
    matrices[cluster_id] = pd.DataFrame(
        cosine_similarity(matrix.fillna(0)), index=matrix.index, columns=matrix.index
    )
num_clusters = len(matrices)

# --- 3. Lookup tables so each prediction avoids scanning the whole frame ------
user_cluster = clustered_ratings_pd.groupby('userId')['prediction'].first().to_dict()
ratings_by_movie = {
    movie_id: group.set_index('userId')['rating']
    for movie_id, group in clustered_ratings_pd.groupby('movieId')
}


def predict_rating(user_id, movie_id, n_neighbors=N):
    """Predict `user_id`'s rating of `movie_id` from similar users in its cluster.

    The prediction is the cosine-similarity-weighted average of the training
    ratings given to `movie_id` by those of the user's `n_neighbors` most similar
    cluster mates who rated it.

    Returns None when the user has no training ratings (hence no cluster), when
    none of the neighbours rated the movie, or when all their similarities are 0.
    """
    cluster_id = user_cluster.get(user_id)
    if cluster_id is None:
        return None
    # Drop the user explicitly instead of assuming it sorts first: another user
    # with an identical profile also has similarity 1.0.
    neighbors = matrices[cluster_id][user_id].drop(user_id).nlargest(n_neighbors)
    movie_ratings = ratings_by_movie.get(movie_id)
    if movie_ratings is None:
        return None
    raters = neighbors.index.intersection(movie_ratings.index)
    if raters.empty:
        return None
    weights = neighbors.loc[raters].to_numpy()
    if weights.sum() == 0:
        return None
    return np.average(movie_ratings.loc[raters].to_numpy(), weights=weights)


# --- 4. Predict every test pair (None where no prediction is possible) --------
predictions = [predict_rating(row['userId'], row['movieId']) for row in test.collect()]

## Evaluate

In [None]:
def fillNa(lst):
    """Impute missing (None) entries with the mean of the present entries.

    Returns a new list with every None replaced by that mean. If there are no
    present values at all (empty or all-None input), `lst` itself is returned
    unchanged, because there is nothing to average.
    """
    known = list(filter(lambda value: value is not None, lst))
    if not known:
        return lst
    fill = sum(known) / len(known)
    filled = []
    for value in lst:
        filled.append(fill if value is None else value)
    return filled

def create_binarised_output(ratings, threshold=None):
    """Turn ratings into binary "liked" labels.

    Args:
        ratings: iterable of numeric ratings (actual or predicted).
        threshold: ratings >= threshold map to 1, others to 0. When omitted, the
            notebook-level ``treshold`` is looked up at call time, matching the
            original behaviour.

    Returns:
        list of 0/1 ints, one per rating, in input order.
    """
    if threshold is None:
        # Fall back to the global so existing calls keep working.
        threshold = treshold
    return [1 if rating >= threshold else 0 for rating in ratings]


# Ratings at or above this value count as "liked" for precision/recall/F1.
# (create_binarised_output reads this global, so the name is kept as-is.)
treshold = 3.5

# Ground-truth ratings of the test set. `predictions` was produced by iterating
# `test.collect()`, so the two lists are matched by position.
# NOTE(review): this assumes both Spark actions return `test` rows in the same
# order; caching `test` right after the split would make that safer.
y = test.select('rating').rdd.flatMap(lambda x : x ).collect()
if len(y) != len(predictions):
    raise ValueError(
        f"{len(predictions)} predictions for {len(y)} test ratings; re-run the prediction cell"
    )

# Keep only the test pairs for which a prediction could be made.
filtered_y = [actual for actual, pred in zip(y, predictions) if pred is not None]
filtered_pred = [pred for pred in predictions if pred is not None]
print(f"Coverage: {len(filtered_pred)}/{len(y)} test ratings predicted")

if not filtered_pred:
    # fillNa would leave Nones in place and sklearn would reject them.
    print("No predictions could be made; metrics are undefined.")
else:
    y_binary = create_binarised_output(filtered_y)
    pred_binary = create_binarised_output(filtered_pred)

    # Calculate RMSE over the whole test set: pairs without a prediction are
    # imputed with the mean of the predictions that were made (see fillNa).
    rmse = np.sqrt(mean_squared_error(y, fillNa(predictions)))
    print("RMSE:", rmse)

    # Classification metrics on liked / not-liked labels, over predicted pairs only.
    precision = precision_score(y_binary, pred_binary)
    print("Precision:", precision)

    recall = recall_score(y_binary, pred_binary)
    print("Recall:", recall)

    f1 = f1_score(y_binary, pred_binary)
    print("f1-score:", f1)