# Exercise 2

## Imports

In [35]:
import pickle
import os.path
import pyspark.sql.functions as F
from pyspark import Broadcast
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StringType, ArrayType, FloatType, DoubleType, IntegerType
from itertools import combinations
from typing import Iterable, Any, List, Set, Tuple

import pandas as pd
import numpy as np
from typing import Union

import matplotlib.pyplot as plt

from pyspark.ml.clustering import BisectingKMeans

## Spark initialization

In [2]:
spark = SparkSession.builder \
    .appName('exercise2') \
    .config('spark.master', 'local[*]') \
    .getOrCreate()

# spark.conf.set("spark.sql.shuffle.partitions", "16")

23/04/26 16:23:20 WARN Utils: Your hostname, martinho-MS-7B86 resolves to a loopback address: 127.0.1.1; using 192.168.1.67 instead (on interface enp34s0)
23/04/26 16:23:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/26 16:23:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Prepare the data

*Start with the small dataset first.*

In [3]:
df = (spark.read
    .option('header', 'false')
    .option('sep', '\t')
    .csv('data/ml-100k/u.data')
)

df = (df.withColumnRenamed('_c0', 'user_id') 
    .withColumnRenamed('_c1', 'item_id')
    .withColumnRenamed('_c2', 'rating')
    .drop('_c3')
    .select(F.col('user_id').cast(IntegerType()), F.col('item_id').cast(IntegerType()), F.col('rating').cast(IntegerType()))
)

## Collaborative Filtering

### Calculate baselines.

Explicitly broadcast the `user_baseLine` and `item_baseLine` variables beforehand, as they can take up some memory and will be used for prediciting ratings.

In [41]:
def calculate_user_item_means(df: DataFrame) -> Tuple[Broadcast, Broadcast, float]:
    user_means = {
        row['user_id']:row['user_avg']
        for row in df.groupBy('user_id')
            .agg(F.avg('rating').alias('user_avg'))
            .collect()
    }
    item_means = {
        row['item_id']:row['item_avg']
        for row in df.groupBy('item_id')
            .agg(F.avg('rating').alias('item_avg'))
            .collect()
    }
    mu = df.agg(F.avg('rating')).collect()[0][0]

    user_means = spark.sparkContext.broadcast(user_means)
    item_means = spark.sparkContext.broadcast(item_means)

    return user_means, item_means, mu

In [6]:
def get_user_item_baseline(user_id: int, item_id: int, user_means, item_means, mu) -> float:
    user_avg = user_means[user_id]
    item_avg = item_means[item_id]
    return -mu + user_avg + item_avg

### Clustering the movies

In [7]:
user_number = df.select('user_id').distinct().count()
item_number = df.select('item_id').distinct().count()

In [8]:
@F.udf(returnType=ArrayType(IntegerType(),False))
def utility_matrix_row(elems: Iterable[Any]):

    lista = [0] * user_number
    
    for (user_id,rating) in elems:
        lista[user_id-1] = rating

    return lista

### Find similar items

In [9]:
from pyspark.sql.types import StructType, StructField

@F.udf(returnType=StructType([
    StructField('items_in_cluster', ArrayType(IntegerType(), False)),
    StructField('ratings_in_cluster', ArrayType(ArrayType(DoubleType(), False), False)),
]))
def find_most_similar(user_id: int, items_in_cluster: List[int], ratings_in_cluster: List[List[float]]):
    return tuple(zip(*(
        (item_id, ratings)
        for item_id, ratings in zip(items_in_cluster, ratings_in_cluster)
        if ratings[user_id-1] != 0
    )))

Calculate similarities.

In [10]:
# @F.udf(returnType=DoubleType())
# def predict_rating_no_numpy(user_id: int, item_id: int, ratings: List[float], items_in_cluster: List[int], ratings_in_cluster: List[List[float]]) -> float:

#     # TODO: maybe remove item_id itself?

#     ratings_mean = sum(r for r in ratings if r != 0) / sum(1 for r in ratings if r != 0)
#     ratings_without_mean = [r - ratings_mean for r in ratings]

#     ratings_in_cluster_mean = [sum(r for r in ratings if r != 0) / sum(1 for r in ratings if r != 0) for ratings in ratings_in_cluster]  
#     ratings_in_cluster_without_mean = [[r - mean for r in ratings] for ratings, mean in zip(ratings_in_cluster, ratings_in_cluster_mean)]
    
#     pearson_correlation_similarities = [
#         sum(r1 * r2 for r1, r2 in zip(ratings_without_mean, rs)) / (sum(r**2 for r in ratings_without_mean)**.5 * sum(r**2 for r in rs)**.5)
#         for rs in ratings_in_cluster_without_mean
#     ]

#     base_line = get_user_item_baseline(user_id, item_id)

#     other_ratings = [r[user_id-1] for r in ratings_in_cluster]

#     other_baselines = [get_user_item_baseline(user_id, item) for item in items_in_cluster]

#     return base_line + sum(pcs * (ort - obl) for pcs, ort, obl in zip(pearson_correlation_similarities, other_ratings, other_baselines)) / sum(pearson_correlation_similarities)

In [11]:
# @F.udf(returnType=DoubleType())
# def predict_rating(user_id: int, item_id: int, ratings: List[float], items_in_cluster: List[int], ratings_in_cluster: List[List[float]]) -> float:

#     # TODO: maybe remove item_id itself?

#     # TODO: use a pandas UDF instead, since it can be vectorized as well?
#     # TODO: with numpy or not?
#     ratings_np = np.array(ratings).reshape((1, -1))
#     ratings_in_cluster_np = np.array(ratings_in_cluster)

#     ratings_np_pc = ratings_np - np.average(ratings_np[ratings_np.nonzero()])
#     non_zero = np.zeros(ratings_in_cluster_np.shape)
#     non_zero[ratings_in_cluster_np.nonzero()] = 1
#     ratings_in_cluster_np_pc = ratings_in_cluster_np - np.average(ratings_in_cluster_np, axis=1, weights=non_zero, keepdims=True)

#     pearson_correlation_similarities = np.sum(ratings_np_pc * ratings_in_cluster_np_pc, axis=1) / (np.sqrt(np.sum(ratings_in_cluster_np_pc**2, axis=1)) * np.sqrt(np.sum(ratings_np_pc**2, axis=1)))

#     base_line = get_user_item_baseline(user_id, item_id)

#     other_ratings = ratings_in_cluster_np[:, user_id-1]

#     other_baselines = np.array([get_user_item_baseline(user_id, item) for item in items_in_cluster])

#     return base_line + float(np.sum(pearson_correlation_similarities * (other_ratings - other_baselines)) / np.sum(pearson_correlation_similarities))

## Testing

Leave out 10% of the ratings for evaluation.

In [32]:
seed_random = 1
# TODO: maybe instead add random column and filter by that? the randomSplit seems too contiguous
train_set, test_set = df.randomSplit([9.0, 1.0], seed=seed_random)

train_matrix = (train_set.groupBy('item_id')
    .agg(F.collect_list(F.array('user_id','rating')).alias('ratings'))
    .withColumn('ratings', utility_matrix_row(F.col('ratings')).cast(ArrayType(DoubleType(),False)))
)

In [42]:
user_means, item_means, mu = calculate_user_item_means(train_set)

In [13]:
def print_progress(progress: float, message: str):
    print(f"[{progress:3%}] {message:100}", end="\r")

In [14]:
neighbours = 10
bkm = BisectingKMeans(featuresCol='ratings',minDivisibleClusterSize = neighbours,predictionCol = "cluster_id").setK(50).setSeed(1)
model = bkm.fit(train_matrix)

train_matrix_with_clusters = model.transform(train_matrix)

                                                                                

*Cluster info.*

In [None]:
# train_matrix_with_clusters.select('item_id', 'cluster_id').distinct().groupBy('cluster_id').sum('item_id').show(50)

In [91]:
predictions = (test_set
    .withColumnRenamed('rating', 'true_rating')
    .join(train_matrix_with_clusters, on='item_id', how='inner') # get cluster and ratings of test item
    # these should be after the join so that we remove items and users that were not trained on (never seen before)
    .withColumn('item_mean', F.udf(lambda x: item_means.value[x], returnType=DoubleType())('item_id'))
    .withColumn('user_mean', F.udf(lambda x: user_means.value[x], returnType=DoubleType())('user_id'))
    .withColumn('ratings_pearson', F.transform('ratings', lambda x: x - F.col('item_mean')))
    .join(train_matrix_with_clusters
        .withColumnsRenamed({
            'item_id': 'other_item_id',
            'ratings': 'other_ratings',
        })
        .withColumn('other_item_mean', F.udf(lambda x: item_means.value[x], returnType=DoubleType())('other_item_id'))
        .withColumn('other_ratings_pearson', F.transform('other_ratings', lambda x: x - F.col('other_item_mean'))),
        on='cluster_id',
        how='inner')
    .withColumn('user_other_rating', F.col('other_ratings')[F.col('user_id') - 1])
    .filter(F.col('user_other_rating') != 0) # should remove the test item since it never gave got a rating from the test user
    .withColumn('similarity',
        F.aggregate(
            F.zip_with('ratings_pearson', 'other_ratings_pearson', lambda x1, x2: x1 * x2),
            initialValue=F.lit(0.0),
            merge=lambda acc, x: acc + x
        )
        /
        (
            F.sqrt(F.aggregate(
                F.transform('ratings_pearson', lambda x: x**2),
                initialValue=F.lit(0.0),
                merge=lambda acc, x: acc + x
            ))
            *
            F.sqrt(F.aggregate(
                F.transform('other_ratings_pearson', lambda x: x**2),
                initialValue=F.lit(0.0),
                merge=lambda acc, x: acc + x
            ))
        )
    )
    .withColumn('baseline', -mu + F.col('user_mean') + F.col('item_mean')) # b_xi
    .withColumn('other_baseline', -mu + F.col('user_mean') + F.col('other_item_mean')) # b_xj
    .groupBy('user_id', 'item_id', 'baseline', 'true_rating')
    .agg((F.col('baseline') + F.sum(F.col('similarity') * (F.col('user_other_rating') - F.col('other_baseline'))) / F.sum('similarity')).alias('predicted_rating'))
    .drop('baseline')
)

In [92]:
predictions.show()

[Stage 611:>                                                        (0 + 1) / 1]

+-------+-------+-----------+------------------+
|user_id|item_id|true_rating|  predicted_rating|
+-------+-------+-----------+------------------+
|    218|      5|          3| 2.914431715167286|
|    804|     33|          4|3.8720220265457024|
|    352|     39|          5|2.1025641025641026|
|    354|     60|          5| 4.175480342029669|
|    922|     67|          3| 3.221525184744525|
|    577|    100|          4| 4.730644829477191|
|    479|    154|          3|3.9223404709660836|
|    201|    185|          5|3.5395898775773946|
|    650|    193|          3| 3.819501276798148|
|    532|    210|          5| 4.783328543273073|
|    778|    226|          4|2.9201318303701083|
|     92|    226|          3|3.1599671967590885|
|    332|    255|          4|  4.38444386904236|
|      6|    259|          1| 2.213027091654288|
|    345|    272|          5| 4.145343824396781|
|    385|    273|          2|3.1033294281552393|
|    110|    288|          4|3.6212777142150343|
|    544|    312|   

                                                                                

In [None]:
# TODO: show how many of the items in the test set could not be predicted because they were not in the train set
# TODO: only use users that were in the train set
# predictions = (test_set
#     .withColumnRenamed('rating', 'true_rating')
#     .join(train_matrix_with_clusters.select('item_id', 'ratings', 'cluster_id'), on='item_id', how='inner') # obtain ratings and cluster_id of item_id
#     .join(
#         train_matrix_with_clusters.groupBy('cluster_id').agg(F.collect_list('item_id').alias('items_in_cluster'), F.collect_list('ratings').alias('ratings_in_cluster')),
#         on='cluster_id',
#         how='inner'
#     )
#     .withColumn('find_most_similar_results', find_most_similar('user_id', 'items_in_cluster', 'ratings_in_cluster'))
#     .withColumns({
#         'items_in_cluster': F.col('find_most_similar_results').items_in_cluster,
#         'ratings_in_cluster': F.col('find_most_similar_results').ratings_in_cluster,
#     })
#     .drop('find_most_similar_results')
#     .withColumn('predicted_rating', predict_rating_no_numpy('user_id', 'item_id', 'ratings', 'items_in_cluster', 'ratings_in_cluster'))
#     .select('user_id', 'item_id', 'true_rating', 'predicted_rating')
# )

Use evaluation metric on the 10% (RMSE).

In [94]:
total_tested = test_set.join(train_matrix_with_clusters, on='item_id', how='inner').count()

In [96]:
rmse = ((predictions
    .withColumn('square_error', (F.col('true_rating') - F.col('predicted_rating'))**2)
    .select('square_error')
    .groupBy()
    .sum('square_error')
    .collect()[0]['sum(square_error)']
    ) / (total_tested)) ** 0.5

                                                                                

In [97]:
print('RMSE:', rmse)

RMSE: 0.9710793061099181
