# SETUP

In [None]:
# When the notebook runs on Google Colab, clone the project repository and
# move its (non-hidden) contents into the current working directory.
if "google.colab" in str(get_ipython()):
    !git clone https://github.com/lukebella/YelpRecommenderSystem.git
    !mv YelpRecommenderSystem/* .
    # Remove the leftover clone directory.
    !rm -fr YelpRecommenderSystem


In [None]:
import os

os.environ['KAGGLE_USERNAME'] = "xxxxxx"
os.environ['KAGGLE_KEY'] = "xxxxxx"
!kaggle datasets download -p ./data -d yelp-dataset/yelp-dataset
!unzip -n ./data/yelp-dataset.zip -d ./data


In [None]:
# Paths (relative to the working directory) of the review, user and business
# JSON files that are loaded into Spark below.
review_filename = 'data/yelp_academic_dataset_review.json'
user_filename = 'data/yelp_academic_dataset_user.json'
business_filename = 'data/yelp_academic_dataset_business.json'

In [None]:
from pyspark.sql import SparkSession

# Optional tuning, currently disabled: .master().config('spark.driver.memory', "15g")
# Reuse the active Spark session if there is one, otherwise create a default one.
spark = SparkSession.builder.getOrCreate()
# sc = pyspark.SparkContext().getOrCreate()
# SparkContext backing the session (used later to create broadcast variables).
sc = spark.sparkContext


In [None]:
def from_json_to_RDD(filename):
    """ Load a JSON dataset with Spark and expose it as an RDD of rows
    Args:
        filename (str): path of the JSON file handed to `spark.read.json`
    Returns:
        RDD: the `.rdd` view of the DataFrame that was read
    """
    dataset_df = spark.read.json(filename)
    # Side effect: print a preview of the loaded rows.
    dataset_df.show()
    return dataset_df.rdd

# Load the three datasets as RDDs of rows (each call also prints a preview).
raw_review_RDD = from_json_to_RDD(review_filename)
raw_user_RDD = from_json_to_RDD(user_filename)
raw_business_RDD = from_json_to_RDD(business_filename)


# RETRIEVE RDDs

# TODO ADD REPARTITION

In [None]:
def get_review_tuple(entry):
    """ Convert one row of the review dataset into a plain, positionally indexed tuple
    Args:
        entry (pyspark.sql.types.Row): a row of the review dataset (any mapping
            indexable by field name works)
    Returns:
        tuple: (review_id, user_id, business_id, stars, useful, funny, cool, text)
    """
    # (field, cast) pairs listed in output order; the position is the tuple index.
    schema = (
        ("review_id", str),    # 0
        ("user_id", str),      # 1
        ("business_id", str),  # 2
        ("stars", int),        # 3
        ("useful", int),       # 4
        ("funny", int),        # 5
        ("cool", int),         # 6
        ("text", str),         # 7
    )
    return tuple(cast(entry[field]) for field, cast in schema)


# Parse every raw review row into a plain tuple (see get_review_tuple).
review_RDD = raw_review_RDD.map(get_review_tuple)

# Count the parsed reviews (this triggers a Spark job).
review_count = review_RDD.count()

# Report the dataset size and show one parsed record as a sanity check.
print(f'There are {review_count} reviews in the dataset')
print(f'Reviews: {review_RDD.first()}')


In [None]:
def get_user_tuple(entry):
    """ Convert one row of the user dataset into a plain, positionally indexed tuple
    Args:
        entry (pyspark.sql.types.Row): a row of the user dataset (any mapping
            indexable by field name works)
    Returns:
        tuple: (user_id, name, review_count, average_stars, useful, funny, cool, fans)
    """
    # (field, cast) pairs listed in output order; the position is the tuple index.
    schema = (
        ("user_id", str),          # 0
        ("name", str),             # 1
        ("review_count", int),     # 2
        ("average_stars", float),  # 3
        ("useful", int),           # 4
        ("funny", int),            # 5
        ("cool", int),             # 6
        ("fans", int),             # 7
    )
    return tuple(cast(entry[field]) for field, cast in schema)


# Parse every raw user row into a plain tuple (see get_user_tuple).
user_RDD = raw_user_RDD.map(get_user_tuple)

# Count the parsed users (this triggers a Spark job).
user_count = user_RDD.count()

# Report the dataset size and show one parsed record as a sanity check.
print(f'There are {user_count} users in the dataset')
print(f'Users: {user_RDD.first()}')


In [None]:
def get_business_tuple(entry):
    """ Parse a row in the business dataset from pyspark.sql.Row to tuple
    Args:
        entry (pyspark.sql.types.Row): a row in the business dataset (any mapping
            indexable by field name works)
    Returns:
        tuple: (business_id, name, city, state, stars, review_count, categories)
               where categories is a list of strings obtained by splitting the
               raw field on ", " (empty list when the field is missing or empty)
    """

    raw_categories = entry["categories"]
    # A missing (None) or empty field means "no categories"; splitting an empty
    # string would otherwise produce the bogus list [''].
    if raw_categories is None or str(raw_categories) == "":
        categories = []
    else:
        categories = str(raw_categories).split(", ")

    return (str(entry["business_id"]),  # 0
            str(entry["name"]),         # 1
            str(entry["city"]),         # 2
            str(entry["state"]),        # 3
            float(entry["stars"]),      # 4
            int(entry["review_count"]), # 5
            categories)                 # 6

# TODO: decide whether the business "attributes" field should be parsed as well.

# Parse every raw business row into a plain tuple (see get_business_tuple).
business_RDD = raw_business_RDD.map(get_business_tuple)

# Count the parsed businesses (this triggers a Spark job).
business_count = business_RDD.count()

# Report the dataset size and show one parsed record as a sanity check.
print(f'There are {business_count} business in the dataset')
print(f'Business: {business_RDD.first()}')

# Calculate overall

$$
  \Delta = \dfrac{1}{2} 
              \left( 
                  \dfrac{\text{useful} + \dfrac{1}{2}(\text{funny} + \text{cool})} 
                        {\text{best\ useful} + \dfrac{1}{2}(\text{best\ funny} + \text{best\ cool})}
                        + 
                  \dfrac{\text{fans}}
                        {\text{best\ fans}}
              \right)
$$
$$
\Delta \in [0, 1]
$$
$$
  \text{overall} = \begin{cases} 
              \text{stars} + \Delta & \text{if stars } \ge 3\\ 
              \text{stars} - \Delta & \text{if stars } \lt 3
            \end{cases}
$$

In [None]:
# TODO fix all comment

#review_RDD.filter(lambda x: x[3]==0).count()


# inputs: stars - useful - funny - cool - number of fans


# if stars < 3
    # stars - {[useful + (funny + cool)/2]/[best useful + (best funny + best cool)/2] + (number of fans/best user fans)}/2
# else
    # stars + {[useful + (funny + cool)/2]/[best useful + (best funny + best cool)/2] + (number of fans/best user fans)}/2

    # missing data: number of fans; best fans; best useful/funny/cool per restaurant
# rdd_test_ufc = (restaurant id, best useful, best funny, best cool)
# best fans = query on the user dataset --> user_RDD.max().first()

# output: review_id - user_id - restaurant_id - overall


In [None]:
# get_overall_rating: function that generates the bias used to increase or decrease the importance (overall rating) of a review

# TODO fix function

#+ (fans/best_user_fans)) / 2

# def get_overall_rating(stars, best_useful, best_funny, best_cool, useful, funny, cool):
#     overall = (useful + (funny + cool)/2) / (best_useful + (best_funny + best_cool)/2) 
#     if stars < 3:
#         return stars - overall
#     else:
#         return stars + overall


In [None]:
# review_best_ufc_RDD tuple: (business_id, (best useful, best funny, best cool)):
# the element-wise maxima of useful, funny and cool over all reviews of a business.
review_best_ufc_RDD = (
    review_RDD
    # (business_id, (useful, funny, cool)) for every review
    .map(lambda review: (review[2], review[4:7]))
    # keep, position by position, the larger of the two vote counts
    .reduceByKey(lambda best, other: tuple(map(max, best, other)))
)

# TODO split map and reduce
# review_best_ufc_RDD.take(5)


In [None]:
# TODO add map to turn the result in a simple tuple

# (business_id, ((review_id, user_id, stars, useful, funny, cool), (best useful, best funny, best cool)))
review_ufc_RDD = (
    review_RDD
    # key each review by business_id; drop business_id (index 2) and text (index 7) from the value
    .map(lambda review: (review[2], review[:2] + review[3:7]))
    .join(review_best_ufc_RDD)
)
# review_ufc_RDD.first()


In [None]:
# TODO rewrite after map in previous cell

def _partial_overall(votes, best_votes):
    """ Weighted vote score of a review relative to the best votes of its business
    Args:
        votes (tuple): (useful, funny, cool) of the review
        best_votes (tuple): (best useful, best funny, best cool) of the business
    Returns:
        number: [useful + (funny + cool)/2] / [best useful + (best funny + best cool)/2],
                or 0 when that denominator is 0
    """
    useful, funny, cool = votes
    best_useful, best_funny, best_cool = best_votes
    denominator = best_useful + (best_funny + best_cool) / 2
    # Guard on the denominator itself rather than on sum(best_votes): if any vote
    # count is negative the plain sum can be non-zero while the weighted
    # denominator is zero (ZeroDivisionError), or zero while the denominator is not.
    if denominator == 0:
        return 0
    return (useful + (funny + cool) / 2) / denominator


# input example:
# ('A2q7d-CBM2-81tVkmS4JMw',(('RB8UpF_kT2xoOC51OzXEeA', 'EZjT2qJN0mOXypMAqZdSrQ', 2, 1, 1, 0),(21, 13, 18)))

# tuple: (review_id, user_id, business_id, stars, partial overall)
partial_review_overall_RDD = review_ufc_RDD.map(
    lambda x: (x[1][0][0], x[1][0][1], x[0], x[1][0][2],
               _partial_overall(x[1][0][3:6], x[1][1])))
# partial_review_overall_RDD.first()

# Still to be added (next cells): the fans term of the formula
# {[useful + (funny + cool)/2]/[best useful + (best funny + best cool)/2] + (number of fans/best user fans)}/2


In [None]:
# tuple: (user_id, fans)
user_fans_RDD = user_RDD.map(lambda user: (user[0], user[7]))

# TODO add map to turn the result in a simple tuple
# tuple: (user_id, ((review_id, business_id, stars, partial overall), fans))
partial_overall_fans_RDD = (
    partial_review_overall_RDD
    # key each review by its author so it can be joined with the fan counts
    .map(lambda review: (review[1], (review[0], review[2], review[3], review[4])))
    .join(user_fans_RDD)
)

# partial_overall_fans_RDD.first()


In [None]:
# user with the highest number of fans
best_user_fans = user_RDD.max(lambda x: x[7])
print(best_user_fans)


def _overall_rating(stars, partial_overall, fans, best_fans):
    """ Apply the bias Delta to the star rating of a review
    Args:
        stars (number): star rating of the review
        partial_overall (number): vote-based part of Delta
        fans (number): fans of the review's author
        best_fans (number): highest fan count among all users
    Returns:
        number: stars + Delta if stars >= 3, stars - Delta otherwise, with
                Delta = (partial_overall + fans / best_fans) / 2
    """
    # If nobody has fans the fans term contributes nothing (avoids dividing by zero).
    fans_ratio = fans / best_fans if best_fans != 0 else 0
    delta = (partial_overall + fans_ratio) / 2
    return stars + delta if stars >= 3 else stars - delta


# TODO rewrite after map in previous cell
# input tuple:  (user_id, ((review_id, business_id, stars, partial overall), fans))
# output tuple: (review_id, user_id, business_id, overall)
review_overall_RDD = partial_overall_fans_RDD.map(
    lambda x: (x[1][0][0], x[0], x[1][0][1],
               _overall_rating(x[1][0][2], x[1][0][3], x[1][1], best_user_fans[7])))

# Show the 20 reviews with the highest overall rating.
review_overall_RDD.sortBy(lambda x:x[3], ascending=False).take(20)


# Basic Recommendation

In [None]:
#business_RDD.sortBy(lambda x: x[5], ascending=False).take(10)

# business_review_count_mean = business_RDD.map(lambda x: x[5]).stats()
# print(business_review_count_mean)
# business_RDD.map(lambda x: x[5]).filter(lambda x: x >=15).stats()  #Keeping at least half of the restaurants present in the dataset


In [None]:
# (business_id, [(review_id, user_id, overall), ...]), keeping only the
# businesses that have at least 20 entries in review_overall_RDD
review_by_business_filtered_RDD = (
    review_overall_RDD
    .map(lambda review: (review[2], (review[0], review[1], review[3])))
    .groupByKey()
    .mapValues(list)
    .filter(lambda business: len(business[1]) >= 20)
)
# review_by_business_filtered_RDD.first()


In [None]:
def sortFunction(tuple):
    """ Construct the sort string (does not perform actual sorting)
    Args:
        tuple: (key, (payload, rating)) where key is a string and rating is a
            number or None. NOTE(review): presumably the
            (business_id, ((name, categories), overall)) records produced by the
            leftOuterJoin in the "top recommended" cell - confirm.
    Returns:
        str: the value to sort with, '<rating with 3 decimals> <key>',
             or ' <key>' when the rating is None
    """
    rating = tuple[1][1]
    # A None rating sorts as an empty prefix.
    value = '' if rating is None else '{:.3f}'.format(rating)
    key = tuple[0]
    return value + ' ' + key


# (business_id, mean overall rating over the business's reviews)
business_overall_RDD = review_by_business_filtered_RDD.map(
    lambda business: (business[0],
                      sum(review[2] for review in business[1]) / len(business[1]))
)


In [None]:
# top recommended businesses
# business_RDD.map(lambda x: (x[0], (x[1], x[6]))).leftOuterJoin(business_overall_RDD).sortBy(sortFunction, False).take(10)  #x[2], x[3],.filter(lambda x: 'Restaurants' in x[1][1])


# Collaborative Filtering

In [None]:
#TODO in the collaborative part, take into consideration the categories and the State
#TODO based on the number of reviews, multiply by the percentage of the reviews made in a particular state (same for the categories)
# print(dict(sorted(business_RDD.flatMap(lambda x: tuple(x[6])).countByValue().items(), key=lambda x: x[1], reverse=True)))  # list of the most frequent categories


## Preprocessing

In [None]:
# Preprocessing
# Because the differences in the quality of items and the rating scales of users are
# such important factors in determining the missing elements of the matrix M , it
# is often useful to remove these influences before doing anything else. The idea
# was introduced in Section 9.3.1. We can subtract from each nonblank element
# mij the average rating of user i. Then, the resulting matrix can be modified
# by subtracting the average rating (in the modified matrix) of item j. It is also
# possible to first subtract the average rating of item j and then subtract the
# average rating of user i in the modified matrix. The results one obtains from
# doing things in these two different orders need not be the same, but will tend
# to be close. A third option is to normalize by subtracting from mij the average
# of the average rating of user i and item j, that is, subtracting one half the sum
# of the user average and the item average.
# If we choose to normalize M , then when we make predictions, we need to
# undo the normalization. That is, if whatever prediction method we use results
# in estimate e for an element mij of the normalized matrix, then the value
# we predict for mij in the true utility matrix is e plus whatever amount was
# subtracted from row i and from column j during the normalization process.

#review_overall_RDD.flatMap(lambda x : ((x[1], x[2]), 1)).reduceByKey(lambda x, y : x + y).sortBy(lambda x: x[1], False)


In [None]:
# Average overall rating given by each user: (user_id, mean overall).
# Sum and count are reduced separately because reduceByKey needs an associative
# and commutative function: a pairwise (x + y) / 2 is not the mean for more than
# two ratings, and its result depends on how the data is partitioned.
average_user_RDD = (
    review_overall_RDD
    .map(lambda x: (x[1], (x[3], 1)))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    .mapValues(lambda total: total[0] / total[1])
)
# average_user_RDD.take(10)


In [None]:
# Average overall rating received by each business: (business_id, mean overall).
# Sum and count are reduced separately because reduceByKey needs an associative
# and commutative function: a pairwise (x + y) / 2 is not the mean for more than
# two ratings, and its result depends on how the data is partitioned.
average_business_RDD = (
    review_overall_RDD
    .map(lambda x: (x[2], (x[3], 1)))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    .mapValues(lambda total: total[0] / total[1])
)
# average_business_RDD.take(10)


In [None]:
# Attach to every review the average rating of its author
# tuple: (user_id, business_id, overall, average_user)
partial_normalized_RDD = (
    review_overall_RDD
    .map(lambda review: (review[1], (review[2], review[3])))
    .join(average_user_RDD)
    # flatten (user_id, ((business_id, overall), average_user))
    .map(lambda joined: (joined[0], *joined[1][0], joined[1][1]))
)
# partial_normalized_RDD.take(10)


In [None]:
# Normalize each rating by subtracting half the sum of the user average and the
# business average, then merge multiple ratings of the same (user, business)
# pair into their mean.
# tuple: (user_id, business_id, overall - normalized_value)
normalized_RDD = (
    partial_normalized_RDD
    .map(lambda x: (x[1], (x[0], x[2], x[3])))
    .join(average_business_RDD)
    # ((user_id, business_id), (normalized overall, 1))
    .map(lambda x: ((x[1][0][0], x[0]), (x[1][0][1] - (x[1][1] + x[1][0][2]) / 2, 1)))
    # Sum and count separately: a pairwise (x + y) / 2 is not associative and is
    # not the mean when a pair has more than two ratings.
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    .map(lambda x: (*x[0], x[1][0] / x[1][1]))
)

normalized_RDD.first()


## Initialization

In [1]:
# We need to train and predict the vectors U and V:
#   train(trainingRDD, rank, iterations, regularization_parameter)  --> (U, V)
# We then need to calculate the RMSE between the utility matrix and our matrix.

# Length of the per-user / per-business vectors that are initialised below.
rank = 2
# Planned number of optimization rounds (not used by the cells visible here).
iterations = 5
# regularization_parameter = 0.1

# U_RDD = user_RDD.map(lambda x: (x[0], [1 for _ in range(rank)])).sortBy(lambda x: x[0])
# V_RDD = business_RDD.map(lambda x: (x[0], [1 for _ in range(rank)])).sortBy(lambda x: x[0])

# U_RDD.take(5)
# V_RDD.take(5)


In [None]:
# tuple: (id_user, [1, ... ,1], [reviews])
# where the middle element has `rank` ones and each review is (business_id, normalized overall)
U_RDD = (
    normalized_RDD
    .map(lambda rating: (rating[0], (rating[1], rating[2])))
    .groupByKey()
    .mapValues(list)
    .map(lambda user: (user[0], [1] * rank, user[1]))
)
U_RDD.first()


In [None]:
# tuple: (id_business, [1, ... ,1], [reviews])
# where the middle element has `rank` ones and each review is (user_id, normalized overall)
V_RDD = (
    normalized_RDD
    .map(lambda rating: (rating[1], (rating[0], rating[2])))
    .groupByKey()
    .mapValues(list)
    .map(lambda business: (business[0], [1] * rank, business[1]))
)
V_RDD.first()


## Performing Optimization

In [None]:
# (non-blank - (U x V))^2 + ...

# CV : 
# for (iter):
#     calc U
#     calc V
# RMSE 


In [None]:
# Broadcast {business_id: vector} so it can be looked up inside Spark tasks.
business_vectors = V_RDD.map(lambda business: (business[0], business[1])).collectAsMap()
V_BC = sc.broadcast(business_vectors)

V_BC.value


In [None]:
# Broadcast {user_id: vector} so it can be looked up inside Spark tasks.
user_vectors = U_RDD.map(lambda user: (user[0], user[1])).collectAsMap()
U_BC = sc.broadcast(user_vectors)

U_BC.value


In [None]:
def update_U(entry, V=None):
    """ Recompute the vector of one user while the business vectors stay fixed
    Each component s is set, in turn, to
        sum_j V[j][s] * (m_j - sum_{k != s} U[k] * V[j][k]) / sum_j V[j][s]**2
    where j ranges over the businesses reviewed by the user and m_j is the
    rating stored with the review. Components that were already updated are
    used when computing the following ones. If the denominator is 0 it is
    replaced by 1.
    Args:
        entry (tuple): (user_id, U_value, reviews), with reviews a list of
            (business_id, rating) pairs
        V (dict, optional): business_id -> vector; when omitted the broadcast
            value `V_BC.value` is used
    Returns:
        tuple: (user_id, updated U_value); the list passed in `entry` is not modified
    """
    user_id = entry[0]
    reviews = entry[2]  # normalized
    if V is None:
        V = V_BC.value

    # Work on a copy so the caller's vector is never mutated.
    U_value = list(entry[1])
    # The number of components comes from the vector itself (equal to `rank`
    # for the vectors built in this notebook).
    size = len(U_value)

    for s in range(size):
        # Indices of all components except the one being updated (loop-invariant).
        others = [k for k in range(size) if k != s]
        numerator = 0
        denominator = 0
        for review in reviews:
            v_j = V[review[0]]  # vector of the reviewed business
            # Contribution of the other components to the predicted rating.
            predicted = sum(U_value[k] * v_j[k] for k in others)
            numerator += v_j[s] * (review[1] - predicted)
            denominator += v_j[s] ** 2
        U_value[s] = numerator / (denominator if denominator > 0 else 1)

    return (user_id, U_value)


# Preview the updated vectors of the first 20 users.
U_RDD.map(update_U).take(20)
