# Preliminaries

Upload your Kaggle API token (`kaggle.json`). You can download it from your [Kaggle account settings](https://www.kaggle.com/settings) under "Create new token".

In [None]:
# Open Colab's file picker so the user can upload kaggle.json; the lines below
# assume the uploaded file lands in the current working directory.
from google.colab import files
files.upload()
# Restrict the token file to owner read/write (mode 600), then move it into ~/.kaggle/,
# the directory the `kaggle` CLI used further down is expected to read it from.
!chmod 600 kaggle.json
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/

Set `limit_data` to `m` to keep only the first `m` reviews, or to `None` to use the entire dataset.

In [None]:
limit_data = 150000  # maximum number of reviews to load; None loads every review

Set `n` to `m` to use the `m` nearest neighbors when computing the collaborative-filtering utility matrix.

In [None]:
n: int = 100  # neighbourhood size: how many ranked users per user feed the utility matrix

# Setup

## Download dependencies

In [None]:
# Quietly install the Kaggle CLI (used to download the dataset) and PySpark.
!pip install -q kaggle pyspark

In [None]:
import pyspark
import pyspark.sql.functions as SF
import pyspark.sql.window as SW
import pyspark.ml.feature as SFeat

## Download and load dataset

In [None]:
# Download only the review file (-f) from the yelp-dataset/yelp-dataset Kaggle dataset,
# then extract it; `-n` tells unzip never to overwrite an already-extracted file.
!kaggle datasets download -f yelp_academic_dataset_review.json yelp-dataset/yelp-dataset
!unzip -n yelp_academic_dataset_review.json.zip

In [None]:
# Reuse the active Spark session if there is one, otherwise create one with default settings.
spark = (
    pyspark.sql.SparkSession
    .builder
    .getOrCreate()
)

In [None]:
# Load only the three columns the recommender needs: who reviewed which business, and the rating.
data = (
    spark.read
    .json("yelp_academic_dataset_review.json")
    .select(["user_id", "business_id", "stars"])
)
# Optionally cap the number of rows to keep the pairwise computations below tractable.
if limit_data is not None:
    data = data.limit(limit_data)
# Shorter column names used throughout the rest of the notebook.
data = (
    data
    .withColumnRenamed("user_id", "user")
    .withColumnRenamed("business_id", "business")
)

### Serialize IDs

In [None]:
# Convert IDs to serial integers
def serialize(df, col):
  return SFeat.StringIndexer(inputCol=col, outputCol=col+"_s").fit(df).transform(df).drop(col).withColumnRenamed(col+"_s",col)

# Index users first, then businesses (same order as nesting the two calls).
data = serialize(data, "user")
data = serialize(data, "business")

### Split into training and test sets

In [None]:
# Hold out roughly 25% of the reviews as a test set; the rest is the training data
# (`reviews`). Spark normalises the weights, so [0.75, 0.25] means a 75/25 split.
# The fixed seed makes the split, and therefore every RMSE reported below,
# reproducible from one run to the next.
split_seed = 42
reviews, test_set = data.randomSplit([0.75, 0.25], seed=split_seed)
reviews.show()

# Populating the utility matrix

## Non-normalized version

### Calculating the cosine similarity

In [None]:
# Helpers that pair up users who reviewed the same business (input to the cosine similarity)
def addColumnSuffix(df, suff):
  """Return `df` with every column renamed to `<name><suff>`, keeping column order."""
  renamed = [SF.col(name).alias(f"{name}{suff}") for name in df.columns]
  return df.select(*renamed)

def buildPairs(df, pair_col, group_col, val_col):
  """Self-join `df` into all pairs of distinct `pair_col` values that share a `group_col` value.

  Each unordered pair appears once per group (left id < right id). Every column is
  suffixed with "_1"/"_2", except `group_col`, which is kept once under its
  original name. `val_col` is currently unused.
  """
  left = addColumnSuffix(df, "_1")
  right = addColumnSuffix(df, "_2")
  ordered_pair = SF.col(pair_col + "_1") < SF.col(pair_col + "_2")
  same_group = SF.col(group_col + "_1") == SF.col(group_col + "_2")
  joined = left.join(right, ordered_pair & same_group)
  return joined.withColumnRenamed(group_col + "_1", group_col).drop(group_col + "_2")

# Every pair of users who reviewed the same business, with both users' ratings of it.
paired = (
    buildPairs(reviews, "user", "business", "stars")
    .select("user_1", "user_2", "business", "stars_1", "stars_2")
)
paired.show()

In [None]:
# For each co-reviewed business, multiply the two users' ratings.
prod = paired.withColumn("prod", paired["stars_1"] * paired["stars_2"])
prod.show()

In [None]:
# Summing those products per user pair gives the dot product of the two users'
# rating vectors (only co-reviewed businesses contribute non-zero terms).
dot = prod.groupBy("user_1", "user_2").agg(SF.sum("prod").alias("dot"))
dot.show()

In [None]:
# Euclidean norm of each user's rating vector
def getNorms(df, group_col, val_col):
  """Return one row per `group_col` value holding sqrt(sum(`val_col`**2)).

  Output columns: `group_col`, "norm".
  """
  sum_of_squares = SF.sum(SF.col(val_col) ** 2)
  return df.groupBy(group_col).agg((sum_of_squares ** .5).alias("norm"))

# Per-user norms, computed over all of that user's training ratings.
norm = getNorms(df=reviews, group_col="user", val_col="stars")
norm.show()

In [None]:
# Cosine similarity = dot / (norm_1 * norm_2). The norms span ALL of each user's
# ratings, i.e. unrated businesses count as 0 in both vectors.
dotnorm = (
    dot
    .join(norm, SF.col("user_1") == SF.col("user"))
    .withColumnRenamed("norm", "norm_1")
    .drop("user")
    .join(norm, SF.col("user_2") == SF.col("user"))
    .withColumnRenamed("norm", "norm_2")
    .drop("user")
)
cosine = dotnorm.select(
    "user_1",
    "user_2",
    (SF.col("dot") / (SF.col("norm_1") * SF.col("norm_2"))).alias("cosine"),
)
cosine.show()

### Inferring the missing entries

In [None]:
def rankDistances(df, group_col, val_col, ascending=False):
  """Add a "rank" column numbering rows 1, 2, ... within each `group_col` partition.

  Rows are ordered by `val_col`. The default is DESCENDING order with nulls last,
  because the callers in this notebook pass cosine *similarity*: the most similar
  (nearest) neighbour must get rank 1, so that `rank <= n` keeps the n nearest
  neighbours. Sorting ascending would keep the n LEAST similar users, and would
  also rank null similarities first. Pass `ascending=True` when `val_col` is a true
  distance (smaller = closer); that restores the old ascending, nulls-first order.
  Ties are broken arbitrarily by row_number().
  """
  column = SF.col(val_col)
  order = column.asc() if ascending else column.desc()
  w = SW.Window.partitionBy(group_col).orderBy(order)
  return df.withColumn("rank", SF.row_number().over(w))

# Cosine similarity is symmetric, so add the mirrored (user_2, user_1) rows. union
# matches columns by position, so the swapped select lines up with user_1/user_2.
# Then keep, for each user_1, the neighbours ranked 1..n by rankDistances.
ranked = (
    rankDistances(
        cosine.union(cosine.select("user_2", "user_1", "cosine")),
        "user_1",
        "cosine",
    )
    .drop("cosine")
    .where(SF.col("rank") <= n)
)
ranked.show()

In [None]:
# Predict each (user, business) rating as the plain, unweighted mean of the selected
# neighbours' ratings for that business. Businesses the user already rated also get
# a prediction whenever a neighbour rated them.
utility = (
    ranked
    .join(reviews, SF.col("user_2") == SF.col("user"))
    .groupBy("user_1", "business")
    .agg(SF.avg("stars").alias("stars_pred"))
    .withColumnRenamed("user_1", "user")
)
utility.show()

## Normalized version

### Normalizing

In [None]:
# Mean rating per partition (user), used to centre and later un-centre ratings

def avgize(df, val_col, part_col):
  """Return the mean of `val_col` for each `part_col` value, in a column named "avg(<val_col>)"."""
  return df.groupBy(part_col).agg(SF.avg(val_col))

# Each user's mean training rating.
avgs = avgize(df=reviews, val_col="stars", part_col="user")
avgs.show()

In [None]:
def normalize(df, avg_df, avg_col, val_col, part_col):
  """Centre `val_col` by subtracting the per-`part_col` mean stored in `avg_df[avg_col]`.

  The result has `part_col`, the remaining columns of `df` and a new
  `<val_col>_norm` column; `avg_col` and the original `val_col` are dropped.
  This is an inner join, so rows of `df` whose key is missing from `avg_df` are lost.
  """
  centred = SF.col(val_col) - SF.col(avg_col)
  joined = avg_df.join(df, part_col)
  return joined.withColumn(val_col + "_norm", centred).drop(avg_col, val_col)

# Keep the raw ratings (ALS is trained on them later). Then replace `reviews` with
# mean-centred ratings, still named "stars" so the same pipeline can be reused.
reviews_nonnorm = reviews
reviews = (
    normalize(reviews, avgs, "avg(stars)", "stars", "user")
    .withColumnRenamed("stars_norm", "stars")
)
reviews.show()

### Calculating the utility matrix

In [None]:
# Same pipeline as the non-normalized section, now applied to the mean-centred ratings:
# pairs -> dot products -> norms -> cosine similarity -> top-n neighbours -> mean prediction.
paired = buildPairs(reviews, "user", "business", "stars").select(["user_1", "user_2", "business", "stars_1", "stars_2"])
prod = paired.withColumn("prod", SF.col("stars_1")*SF.col("stars_2"))
dot = prod.groupBy(["user_1","user_2"]).sum("prod").withColumnRenamed("sum(prod)", "dot")
norm = getNorms(reviews, "user", "stars")
dotnorm = dot.join(norm, SF.col("user_1") == SF.col("user")).withColumnRenamed("norm", "norm_1").drop("user").join(norm, SF.col("user_2") == SF.col("user")).withColumnRenamed("norm", "norm_2").drop("user")
# After centring, a user whose ratings all equal their own mean (e.g. a user with a
# single review) has norm 0. A plain division would then raise DIVIDE_BY_ZERO when
# Spark runs in ANSI mode (enabled by default in Spark 4.x). `when` without
# `otherwise` yields null for such pairs instead, which matches Spark's non-ANSI
# result for x / 0.
cosine = dotnorm.withColumn(
    "cosine",
    SF.when(
        (SF.col("norm_1") * SF.col("norm_2")) != 0,
        SF.col("dot") / (SF.col("norm_1") * SF.col("norm_2")),
    ),
).select(["user_1", "user_2", "cosine"])
ranked = rankDistances(cosine.union(cosine.select("user_2", "user_1", "cosine")), "user_1", "cosine").drop("cosine").filter(SF.col("rank")<=n)
utility_norm_norm = ranked.join(reviews, SF.col("user_2")==SF.col("user")).groupBy(["user_1", "business"]).mean("stars").withColumnRenamed("avg(stars)", "stars_pred").withColumnRenamed("user_1", "user")
utility_norm_norm.show()

### Denormalizing

In [None]:
# Denormalizing: shift centred values back onto the original rating scale

def denormalize(df, avg_df, avg_col, val_col, part_col):
  """Add the per-`part_col` mean stored in `avg_df[avg_col]` back onto `val_col`.

  The result has `part_col`, the remaining columns of `df` and a new
  `<val_col>_denorm` column; `avg_col` and the original `val_col` are dropped.
  This is an inner join, so rows whose key is missing from `avg_df` are lost.
  """
  shifted = SF.col(val_col) + SF.col(avg_col)
  return (
    avg_df.join(df, part_col)
    .withColumn(val_col + "_denorm", shifted)
    .drop(avg_col, val_col)
  )

# Map the centred predictions back to the star scale using each user's mean rating.
utility_norm = (
    denormalize(utility_norm_norm, avgs, "avg(stars)", "stars_pred", "user")
    .withColumnRenamed("stars_pred_denorm", "stars_pred")
)
utility_norm.show()

# Evaluating the results

## Comparing against ALS

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

# Reference model: Spark's ALS with its default hyper-parameters, trained on the
# raw (non-centred) training ratings.
als = ALS(ratingCol="stars", userCol="user", itemCol="business")
model = als.fit(reviews_nonnorm)

### Non-normalized version

In [None]:
# Let ALS score every (user, business) in the CF utility matrix; this adds its "prediction" column.
compare = model.transform(dataset=utility)
compare.show()

In [None]:
# RMSE between the CF predictions and the ALS predictions (ALS output acts as the label).
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="prediction",
    predictionCol="stars_pred",
)
rmse = evaluator.evaluate(compare)
print(f"RMSE: {rmse}")

### Normalized version

In [None]:
# Let ALS score every (user, business) in the normalized CF utility matrix.
compare = model.transform(dataset=utility_norm)
compare.show()

In [None]:
# Reuse the current `evaluator` on the normalized comparison.
rmse = evaluator.evaluate(compare)
print(f"RMSE: {rmse}")

## Evaluating on the held-out test set

### Non-normalized version

In [None]:
# Keep only the predictions whose (user, business) pair appears in the held-out test set.
compare = (
    utility
    .join(test_set, ["user", "business"])
    .select("user", "business", "stars", "stars_pred")
)
compare.show()

In [None]:
# RMSE of the CF predictions against the actual held-out ratings.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="stars",
    predictionCol="stars_pred",
)
rmse = evaluator.evaluate(compare)
print(f"RMSE: {rmse}")

### Normalized version

In [None]:
# Same held-out comparison, this time for the normalized CF predictions.
compare = (
    utility_norm
    .join(test_set, ["user", "business"])
    .select("user", "business", "stars", "stars_pred")
)
compare.show()

In [None]:
# Reuse the current `evaluator` on the normalized comparison.
rmse = evaluator.evaluate(compare)
print(f"RMSE: {rmse}")