# Yelp Recommender

## Intro

The purpose of this exercise is to use Spark in a real dataset, instead of just a toy example.

You will use the data from the [Yelp Dataset Challenge](https://www.yelp.de/dataset_challenge), which contains information about businesses, users, reviews and more.

For this exercise, you will need to focus only on the following files:
- yelp_academic_dataset_business.json
- yelp_academic_dataset_review.json

The goal is to build a recommender using [Spark's ALS (Alternating Least Squares)](https://spark.apache.org/docs/2.3.0/ml-collaborative-filtering.html) and then generate recommendations for a given user.

Since the dataset is quite big, you should pick a business category (e.g. Restaurants) and a city (e.g. Edinburgh) and work on the recommender using only this subset of the data.

Please take some time to:
- find out what information you will need to feed as input to Spark's ALS
- check how this information is available in the dataset
- plan how you will tackle this problem

In [None]:
# Download a small version of the Yelp dataset
#!wget https://s3.us-west-2.amazonaws.com/dsr-spark-appliedml/yelp_dataset_small.tar.gz
#!tar -xvzf yelp_dataset_small.tar.gz

In [4]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext('local[*]')
sqlc = SQLContext(sc)

## Business Data

- Load the file ***yelp_academic_dataset_business.json*** and select the following columns:
    - business_id
    - name
    - city
    - stars
    - categories
    - address

In [8]:
df_business = sqlc.read.json('yelp_dataset_small/yelp_academic_dataset_business.json')

In [55]:
df_business = df_business.select('business_id', 
                                 'name', 
                                 'city', 
                                 'stars', 
                                 'categories', 
                                 'address')
df_business.show()

+--------------------+--------------------+--------------------+----------+--------------------+-----+
|             address|         business_id|          categories|      city|                name|stars|
+--------------------+--------------------+--------------------+----------+--------------------+-----+
|227 E Baseline Rd...|0DI8Dt2PJp07XkVvI...|[Tobacco Shops, N...|     Tempe|   Innovative Vapors|  4.5|
|495 S Grand Centr...|LTlCaCGZE14GuaUXU...|[Caterers, Grocer...| Las Vegas|       Cut and Taste|  5.0|
|  979 Bloor Street W|EDqCEAGXVGCH4FJXg...|[Restaurants, Piz...|   Toronto|         Pizza Pizza|  2.5|
|7014 Steubenville...|cnGIivYRLxpF7tBVR...|[Hair Removal, Be...|   Oakdale| Plush Salon and Spa|  4.0|
|   321 Jarvis Street|cdk-qqJ71q6P7TJTw...|[Hotels & Travel,...|   Toronto|         Comfort Inn|  3.0|
|30 Gibson Drive, ...|Q9rsaUiQ-A3NdEAlo...|[Nail Salons, Bea...|   Markham|         A Plus Nail|  2.5|
|10875 N Frankloyd...|Cu4_Fheh7IrzGiK-P...|[Baby Gear & Furn...|Scottsdal

### Choosing a business category

- Define a regular Python function that takes a list of categories and returns 1 if a category of your choice (for instance, 'Restaurants') is contained in the list of categories or 0 otherwise
- Using the Python function, define a Spark's User Defined Function (UDF) with an IntegerType return
- Using the UDF, filter the businesses that belong to the category you chose

In [56]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import IntegerType

def is_restaurant(categories):
    return ('Restaurants' in categories) * 1 if categories is not None else 0

udf = UserDefinedFunction(is_restaurant, returnType='Integer')

df_restaurants = df_business.filter(udf(df_business.categories) == 1)

type:  <class 'pyspark.sql.udf.UserDefinedFunction'>


In [57]:
df_restaurants.show()

+--------------------+--------------------+--------------------+------------+--------------------+-----+
|             address|         business_id|          categories|        city|                name|stars|
+--------------------+--------------------+--------------------+------------+--------------------+-----+
|  979 Bloor Street W|EDqCEAGXVGCH4FJXg...|[Restaurants, Piz...|     Toronto|         Pizza Pizza|  2.5|
|11072 No Frank Ll...|GDnbt3isfhd57T1Qq...|[Tex-Mex, Mexican...|  Scottsdale|           Taco Bell|  2.5|
|1500 N Green Vall...|42romV8altAeuZuP2...|[Hawaiian, Restau...|   Henderson|  Ohana Hawaiian BBQ|  4.0|
|1052 Lionel-Dauna...|DNyYOxVAfu0oUcPNL...|[Restaurants, Cafes]|Boucherville|         Chez Lionel|  3.5|
|2000 Mansfield St...|a1Ba6XeIOP48e64YF...|[Sandwiches, Brea...|    Montréal|             La Prep|  4.0|
|123 Front St, Uni...|826djy6K_9Fp0ptqJ...|[Fast Food, Mexic...|     Toronto|Chipotle Mexican ...|  3.5|
|      5646 W Bell Rd|Mi5uhdFB9OJteXPd0...|[Restaurants

- The UDF approach works just fine, but there is a more straightforward way to perform the same operation
    - hint: look at ***array_contains*** SQL function

In [64]:
import pyspark.sql.functions as F

# you can overwrite the former df_restaurants
df_restaurants = df_business.filter(F.array_contains(df_business.categories, "Restaurants"))

In [66]:
df_restaurants.show()

+--------------------+--------------------+--------------------+------------+--------------------+-----+
|             address|         business_id|          categories|        city|                name|stars|
+--------------------+--------------------+--------------------+------------+--------------------+-----+
|  979 Bloor Street W|EDqCEAGXVGCH4FJXg...|[Restaurants, Piz...|     Toronto|         Pizza Pizza|  2.5|
|11072 No Frank Ll...|GDnbt3isfhd57T1Qq...|[Tex-Mex, Mexican...|  Scottsdale|           Taco Bell|  2.5|
|1500 N Green Vall...|42romV8altAeuZuP2...|[Hawaiian, Restau...|   Henderson|  Ohana Hawaiian BBQ|  4.0|
|1052 Lionel-Dauna...|DNyYOxVAfu0oUcPNL...|[Restaurants, Cafes]|Boucherville|         Chez Lionel|  3.5|
|2000 Mansfield St...|a1Ba6XeIOP48e64YF...|[Sandwiches, Brea...|    Montréal|             La Prep|  4.0|
|123 Front St, Uni...|826djy6K_9Fp0ptqJ...|[Fast Food, Mexic...|     Toronto|Chipotle Mexican ...|  3.5|
|      5646 W Bell Rd|Mi5uhdFB9OJteXPd0...|[Restaurants

### Choosing a city
- Having filtered by the business category, now it is time to filter by the city (for instance, Edinburgh)

In [67]:
df_city_restaurants = df_restaurants.filter(df_restaurants.city == 'Edinburgh')

In [68]:
df_city_restaurants.show()

+--------------------+--------------------+--------------------+---------+--------------------+-----+
|             address|         business_id|          categories|     city|                name|stars|
+--------------------+--------------------+--------------------+---------+--------------------+-----+
|7A Castle Street,...|NsarUMMMPOlMBb6K0...|[Food, Fast Food,...|Edinburgh|      Juice Almighty|  4.5|
|   1 Sighthill Court|m5CY1jy3dvBw8J4ii...|[Nightlife, Bars,...|Edinburgh|            Crofters|  3.5|
|   119 Dundee Street|raixiox15brAXfTUb...|[Fast Food, Halal...|Edinburgh|          Spicy Bite|  3.5|
|Odeon Cinema, 118...|ynjAEdXdIw7JF153u...|[Italian, Restaur...|Edinburgh|      Croma Pizzeria|  3.5|
|        123b High St|aCul-vH-5hCxX6XZs...|[Restaurants, Bri...|Edinburgh|Dubh Prais Restau...|  4.5|
|76 Commercial Street|mwO4cm8qs32djjve5...|[Restaurants, Chi...|Edinburgh|     Chop Chop Leith|  4.0|
|226 Oxgangs Road ...|CZRba6sPzqpGwWpz5...|[Restaurants, Fis...|Edinburgh|        

### Generating numeric IDs
- If you haven't done it yet, take one sample from your already filtered DataFrame and notice that the ***business_id*** contains an alphanumeric value - this is not good for Spark's ALS implementation, which requires IDs for items (in our case, businesses) and users to be numeric
- Use a ***StringIndexer*** to create a new column ***business_idn*** from the conversion of business_id into a numeric value

In [69]:
from pyspark.ml.feature import StringIndexer

stringIndexer = StringIndexer(inputCol="business_id", outputCol="business_idn", handleInvalid='error')

df_city_restaurants = stringIndexer.fit(df_city_restaurants).transform(df_city_restaurants)

In [70]:
df_city_restaurants.take(1)

[Row(address='7A Castle Street, Corstorphine', business_id='NsarUMMMPOlMBb6K04x6hw', categories=['Food', 'Fast Food', 'Restaurants', 'Juice Bars & Smoothies'], city='Edinburgh', name='Juice Almighty', stars=4.5, business_idn=24.0)]

In [71]:
df_city_restaurants.cache()

DataFrame[address: string, business_id: string, categories: array<string>, city: string, name: string, stars: double, business_idn: double]

## Review Data

- Load the file ***yelp_academic_dataset_review.json*** and select the following columns:
    - user_id
    - business-id
    - stars
    - date

In [85]:
df_reviews = sqlc.read.json('yelp_dataset_small/yelp_academic_dataset_review.json')

In [86]:
df_reviews = df_reviews.select('user_id', 
                               'business_id', 
                               'stars', 
                               'date')
df_reviews.show()

+--------------------+----------+-----+--------------------+
|         business_id|      date|stars|             user_id|
+--------------------+----------+-----+--------------------+
|2aFiy99vNLklCx3T_...|2011-10-10|    5|KpkOkG6RIf4Ra25Lh...|
|2aFiy99vNLklCx3T_...|2010-12-29|    5|bQ7fQq1otn9hKX-gX...|
|2aFiy99vNLklCx3T_...|2011-04-29|    5|r1NUhdNmL6yU9Bn-Y...|
|2LfIuF3_sX6uwe-IR...|2014-07-14|    5|aW3ix1KNZAvoM8q-W...|
|2LfIuF3_sX6uwe-IR...|2014-01-15|    4|YOo-Cip8HqvKp_p9n...|
|2LfIuF3_sX6uwe-IR...|2013-04-28|    5|bgl3j8yJcRO-00NkU...|
|2LfIuF3_sX6uwe-IR...|2014-10-12|    4|CWKF9de-nskLYEqDD...|
|2LfIuF3_sX6uwe-IR...|2012-09-18|    5|GJ7PTY7huYORFKKg3...|
|2LfIuF3_sX6uwe-IR...|2015-10-11|    5|rxqp9eXZj1jYTn0UI...|
|2LfIuF3_sX6uwe-IR...|2015-04-05|    5|UU0nHQtHPMAfLidk8...|
|2LfIuF3_sX6uwe-IR...|2014-07-08|    1|A_Hyfk3FcwFVIk1CQ...|
|2LfIuF3_sX6uwe-IR...|2014-08-23|    5|OvD92wp0-uuFoGLBy...|
|2LfIuF3_sX6uwe-IR...|2015-01-13|    4|5NDk-q5mv8PIDvz83...|
|2LfIuF3_sX6uwe-IR...|20

### Keeping reviews for the chosen city only

- You are only interested in reviews of businesses you kept after filtering for category and city - how to filter out everything else? (hint: take a look at the ***join*** operation of DataFrames)

In [89]:
df_city_reviews = (df_reviews
    .join(df_city_restaurants, df_reviews.business_id == df_city_restaurants.business_id)
    .select(df_reviews.user_id, df_city_restaurants.business_idn, df_reviews.stars, 
           df_reviews.date, df_city_restaurants.city))

In [90]:
df_city_reviews.show()

+--------------------+------------+-----+----------+---------+
|             user_id|business_idn|stars|      date|     city|
+--------------------+------------+-----+----------+---------+
|VRVCKQhYDCkzaEDce...|      1208.0|    5|2008-07-06|Edinburgh|
|SxV1Jq7UANuSYpn42...|      1208.0|    1|2010-04-15|Edinburgh|
|soDF6mePh1SuNZI3r...|      1208.0|    3|2015-03-10|Edinburgh|
|LURC3E0DoXYgN9aYT...|       684.0|    2|2012-02-04|Edinburgh|
|W1Nl6_R7amuZ6NStX...|       684.0|    1|2011-02-24|Edinburgh|
|hutJzKEYHuVq6CP-X...|       684.0|    2|2010-08-05|Edinburgh|
|fcMTpwfLS9F5DWTql...|      1351.0|    4|2012-08-23|Edinburgh|
|yuFHrb8YQtVuzu0eE...|      1351.0|    4|2013-04-19|Edinburgh|
|yfXqZkU5iXE07GSHz...|       214.0|    4|2010-06-16|Edinburgh|
|v_lWueG7V_vul4E8r...|       214.0|    4|2010-07-02|Edinburgh|
|LURC3E0DoXYgN9aYT...|       214.0|    4|2012-05-17|Edinburgh|
|IY8cvV2SQuhJjKhRW...|       214.0|    4|2010-07-17|Edinburgh|
|y4uy-FJ9UDamTVJ6Y...|       214.0|    2|2015-06-25|Edi

### Generating numeric IDs

- As it happened with the ***business_id***, you also need to convert ***user_id*** into a numeric value - once again, use a ***StringIndexer*** to create a new column named ***user_idn*** containing the result of the conversion

In [91]:
stringIndexer = StringIndexer(inputCol="user_id", outputCol="user_idn", handleInvalid='error')

df_city_reviews = stringIndexer.fit(df_city_reviews).transform(df_city_reviews)

In [98]:
df_city_reviews = df_city_reviews.drop('user_id')

In [92]:
df_city_reviews.show()

+--------------------+------------+-----+----------+---------+--------+
|             user_id|business_idn|stars|      date|     city|user_idn|
+--------------------+------------+-----+----------+---------+--------+
|VRVCKQhYDCkzaEDce...|      1208.0|    5|2008-07-06|Edinburgh|    63.0|
|SxV1Jq7UANuSYpn42...|      1208.0|    1|2010-04-15|Edinburgh|     6.0|
|soDF6mePh1SuNZI3r...|      1208.0|    3|2015-03-10|Edinburgh|   433.0|
|LURC3E0DoXYgN9aYT...|       684.0|    2|2012-02-04|Edinburgh|     2.0|
|W1Nl6_R7amuZ6NStX...|       684.0|    1|2011-02-24|Edinburgh|    91.0|
|hutJzKEYHuVq6CP-X...|       684.0|    2|2010-08-05|Edinburgh|    54.0|
|fcMTpwfLS9F5DWTql...|      1351.0|    4|2012-08-23|Edinburgh|    15.0|
|yuFHrb8YQtVuzu0eE...|      1351.0|    4|2013-04-19|Edinburgh|    34.0|
|yfXqZkU5iXE07GSHz...|       214.0|    4|2010-06-16|Edinburgh|     3.0|
|v_lWueG7V_vul4E8r...|       214.0|    4|2010-07-02|Edinburgh|    87.0|
|LURC3E0DoXYgN9aYT...|       214.0|    4|2012-05-17|Edinburgh|  

In [93]:
df_city_reviews.cache()

DataFrame[user_id: string, business_idn: double, stars: bigint, date: string, city: string, user_idn: double]

### Adding a sequential number to the user's reviews

- Now add a ***sequential number*** to the user's reviews, that is, for each user, order his/her reviews by date (multiple reviews on the same date can be randomly ordered) and number them (hint: check ***window functions***)
- This sequential number will be useful later to perform a time-wise split of the dataset

In [121]:
from pyspark.sql import Window

window = Window.partitionBy('user_idn').orderBy("date")

df_city_reviews = df_city_reviews.withColumn('sequential_number', F.row_number().over(window))

In [122]:
df_city_reviews.show()

+------------+-----+----------+---------+--------+-----------------+----------+
|business_idn|stars|      date|     city|user_idn|sequential_number|no_reviews|
+------------+-----+----------+---------+--------+-----------------+----------+
|       327.0|    5|2011-04-20|Edinburgh|   299.0|                1|         2|
|       144.0|    5|2011-04-20|Edinburgh|   299.0|                2|         2|
|      1047.0|    2|2011-05-02|Edinburgh|   299.0|                3|         4|
|       228.0|    4|2011-05-02|Edinburgh|   299.0|                4|         4|
|       942.0|    5|2011-05-19|Edinburgh|   299.0|                5|         5|
|        51.0|    5|2011-05-25|Edinburgh|   299.0|                6|         6|
|       850.0|    3|2011-05-30|Edinburgh|   299.0|                7|         8|
|       703.0|    4|2011-05-30|Edinburgh|   299.0|                8|         8|
|      1336.0|    5|2011-06-01|Edinburgh|   299.0|                9|         9|
|       460.0|    5|2011-06-09|Edinburgh

### Subsetting reviews to keep only users with more than 4 reviews

- Some users had rated only 1 or a few businesses - this would pose as a problem to make recommendations - so you would want to keep only users who had rated more than 4 reviews, for instance
- Find the ***total number of reviews*** for each user and then filter them using this information (hint: again, you can use a ***window function***)

In [130]:
window = Window.partitionBy('user_idn')

In [136]:
df_selected = df_city_reviews.withColumn('no_reviews', 
                                             F.max('sequential_number').over(window))

In [137]:
df_selected.show()

+------------+-----+----------+---------+--------+-----------------+----------+
|business_idn|stars|      date|     city|user_idn|sequential_number|no_reviews|
+------------+-----+----------+---------+--------+-----------------+----------+
|       327.0|    5|2011-04-20|Edinburgh|   299.0|                1|        12|
|       144.0|    5|2011-04-20|Edinburgh|   299.0|                2|        12|
|      1047.0|    2|2011-05-02|Edinburgh|   299.0|                3|        12|
|       228.0|    4|2011-05-02|Edinburgh|   299.0|                4|        12|
|       942.0|    5|2011-05-19|Edinburgh|   299.0|                5|        12|
|        51.0|    5|2011-05-25|Edinburgh|   299.0|                6|        12|
|       850.0|    3|2011-05-30|Edinburgh|   299.0|                7|        12|
|       703.0|    4|2011-05-30|Edinburgh|   299.0|                8|        12|
|      1336.0|    5|2011-06-01|Edinburgh|   299.0|                9|        12|
|       460.0|    5|2011-06-09|Edinburgh

In [138]:
df_selected.cache()

DataFrame[business_idn: double, stars: bigint, date: string, city: string, user_idn: double, sequential_number: int, no_reviews: int]

### Calculating mean rating by user

- Now you can calculate the mean rating by user and make it into a dictionary where the key is the ***user_id*** (hint: look at ***rdd*** method of DataFrames and ***collectAsMap*** method of RDDs)

In [149]:
df_selected.select('user_idn', 'stars').groupby('user_idn').mean().show()

+--------+-------------+------------------+
|user_idn|avg(user_idn)|        avg(stars)|
+--------+-------------+------------------+
|   299.0|        299.0| 4.333333333333333|
|   305.0|        305.0|3.9166666666666665|
|   496.0|        496.0| 2.857142857142857|
|   558.0|        558.0|3.1666666666666665|
|   596.0|        596.0| 4.166666666666667|
|   692.0|        692.0|               3.0|
|   769.0|        769.0|               3.8|
|   934.0|        934.0|              3.75|
|  1051.0|       1051.0|               4.5|
|  1761.0|       1761.0|               4.5|
|  2734.0|       2734.0|               3.0|
|  2815.0|       2815.0|               1.0|
|  2862.0|       2862.0|               5.0|
|  3597.0|       3597.0|               5.0|
|  3901.0|       3901.0|               5.0|
|  3980.0|       3980.0|               4.0|
|  4066.0|       4066.0|               5.0|
|  4142.0|       4142.0|               5.0|
|  4800.0|       4800.0|               5.0|
|  5360.0|       5360.0|        

In [150]:
dict_user_means = df_selected.select('user_idn', 'stars') \
                             .groupby('user_idn') \
                             .mean() \
                             .drop('avg(user_idn)') \
                             .rdd \
                             .collectAsMap() \

In [153]:
dict_user_means[299]

4.333333333333333

### Centering rating by user

- The dictionary containing mean ratings by user can be seen as a ***lookup table*** - what is the appropriate way of dealing with those in Spark?
- Once you have figured this out, define a regular Python function that takes two arguments - ***user_id*** (String) and ***rating*** (String, which you will need to convert to float inside the function) - and returns the result of subtracting the mean rating of the user from the rating parameter
- Using the Python function, define a Spark's User Defined Function (UDF) with a DoubleType return
- Using the UDF, create a column in your DataFrame with the centered ratings

In [None]:
from pyspark.sql.types import DoubleType

lookup_user_means = ...

def zero_mean(user_id, rating):
    pass

df_centered = ...

- Once again, the UDF approach is not the most "Sparkonic" way of handling this - can you perform the same operation using only functions from ***pyspark.sql.functions*** (which was imported earlier as F)?
    - hint: you'll need ***Window functions***

In [155]:
# you can overwrite df_centered
window = Window.partitionBy('user_idn')
df_centered = (df_selected
               .withColumn('avg_stars', F.avg('stars').over(window))
               .withColumn('stars', F.expr('stars - avg_stars'))
               .drop('avg_stars'))

In [156]:
df_centered.show()

+------------+--------------------+----------+---------+--------+-----------------+----------+
|business_idn|               stars|      date|     city|user_idn|sequential_number|no_reviews|
+------------+--------------------+----------+---------+--------+-----------------+----------+
|       327.0|   0.666666666666667|2011-04-20|Edinburgh|   299.0|                1|        12|
|       144.0|   0.666666666666667|2011-04-20|Edinburgh|   299.0|                2|        12|
|      1047.0|  -2.333333333333333|2011-05-02|Edinburgh|   299.0|                3|        12|
|       228.0|-0.33333333333333304|2011-05-02|Edinburgh|   299.0|                4|        12|
|       942.0|   0.666666666666667|2011-05-19|Edinburgh|   299.0|                5|        12|
|        51.0|   0.666666666666667|2011-05-25|Edinburgh|   299.0|                6|        12|
|       850.0|  -1.333333333333333|2011-05-30|Edinburgh|   299.0|                7|        12|
|       703.0|-0.33333333333333304|2011-05-30|Edin

## Dataset

### Splitting into training and test sets by time

- In recommender systems, it is common practice to do the training/test split timewise, that is, the test set is composed of the latest reviews
- First, filter only those reviews which have a sequential number smaller than the ***total number of reviews***, by user: this is your training set
- Then, filter only those reviews which have a sequential number identical to the ***total number of reviews***, by user: this is your test set
- Now you can see why you had to add a sequential number to the user's reiews - since some users had done all his/her reviews on the same day, you need to disambiguate them to split the dataset. By doing this, you guarantee your test set will have only 1 review for each user.

In [248]:
df_training = df_centered.filter('sequential_number < no_reviews')
df_test = df_centered.filter('sequential_number == no_reviews')

### If using Spark 2.1 (as in the Docker image), you need to filter out "new" businesses in the test set

In [249]:
businesses = df_training.select('business_idn').distinct()
df_test = df_test.join(businesses, on='business_idn')

## Alternate Least Squares (ALS) Model

- This is the recommender itself - the ALS uses a iterative approach to find the underlying factors that yield the user/item rating matrix
- It takes as input a DataFrame with three columns, representing:
    - userCol: user IDs (numeric - remember the conversion you did)
    - itemCol: item IDs (numeric - remember the conversion you did)
    - ratingCol: rating (numeric, obviously)
    - coldStartStrategy: "drop" (if there is unseen data on the test set, meaning a new user/business, drop it) - ***only available from Spark 2.2 on***
- Its parameters are:
    - rank: the number of factors to consider
    - maxIter: the maximum number of iterations to perform
    - regParam: the regularization parameter
- Use Spark's ALS to fit a model based on your DataFrame

In [237]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

als = ALS(rank=10,
          maxIter=10, 
          regParam=0.1, 
          coldStartStrategy='drop',
          userCol='user_idn',
          itemCol='business_idn',
          ratingCol='stars')
df_training = (df_training.select('user_idn', 'business_idn', 'stars')
      .withColumnRenamed('user_idn', 'user')
      .withColumnRenamed('business_idn', 'item')
      .withColumnRenamed('stars', 'rating')
     )

In [238]:
model = als.fit(df_training)

### Predictions for the training set

- Once the model is trained, make predictions for the training set and use a ***RegressionEvaluator*** to find out the RMSE of the predictions

In [239]:
predictions = model.transform(df_training.select('user', 'item'))

In [240]:
#df_training.join(predictions, on=['user', 'item']).show()

In [241]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol='rating')

In [242]:
train_rmse = evaluator.evaluate(df_training.join(predictions, on=['user', 'item']))

In [243]:
train_rmse

0.3663683763636205

### Predictions for the test set

- Now, make predictions for the test set and use a ***RegressionEvaluator*** to find out the RMSE of the predictions

In [244]:
df_test = (df_test.select('user_idn', 'business_idn', 'stars')
      .withColumnRenamed('user_idn', 'user')
      .withColumnRenamed('business_idn', 'item')
      .withColumnRenamed('stars', 'rating')
     )

In [245]:
predictions_test = model.transform(df_test.select('user', 'item'))

In [246]:
test_rmse = evaluator.evaluate(df_test.join(predictions_test, on=['user', 'item']))

In [247]:
test_rmse

nan

In [232]:
df_test.join(predictions_test, on=['user', 'item']).show()

+------+------+--------------------+------------+
|  user|  item|              rating|  prediction|
+------+------+--------------------+------------+
| 283.0| 709.0|  0.0769230769230771|  0.08602212|
| 816.0| 491.0| -1.2000000000000002|  0.15445091|
| 948.0| 245.0|                 1.5|  0.34028235|
| 978.0| 181.0|                0.25| -0.19238092|
|1003.0|1130.0|                 1.0| -0.14439525|
|1093.0|1227.0|                 2.0|  -1.1584496|
|1125.0| 308.0|                 0.0|         0.0|
|1272.0| 494.0|  0.6666666666666665| 0.109158695|
|1303.0|1300.0| 0.33333333333333304| -0.17399289|
|1418.0| 760.0| 0.33333333333333304|  0.23896167|
|1540.0| 198.0|-0.33333333333333304|  0.10873921|
|1656.0| 815.0|                 0.0|         0.0|
|1915.0| 129.0|                 0.5|-0.018868767|
|2083.0| 390.0|                -1.5|   -0.766736|
|2123.0| 488.0|                 0.0|         0.0|
|2129.0| 893.0|                 0.0|         0.0|
|2490.0|1102.0|                 0.0|         0.0|


In [234]:
predictions_test.filter('item == 767').show()

+------+-----+-----------+
|  user| item| prediction|
+------+-----+-----------+
|4957.0|767.0|        NaN|
|6124.0|767.0|        NaN|
|1956.0|767.0|0.053268258|
|  40.0|767.0|  0.3869143|
|4692.0|767.0|        NaN|
|3677.0|767.0|        NaN|
|1642.0|767.0|-0.04912223|
|5372.0|767.0|        NaN|
| 728.0|767.0|  0.2646231|
|5443.0|767.0|        NaN|
|2516.0|767.0|-0.19701785|
|2800.0|767.0|        NaN|
|3290.0|767.0|        NaN|
|1854.0|767.0| 0.21151784|
|3642.0|767.0|        NaN|
+------+-----+-----------+



## Recommendations

Now, your model is trained, but how can you use it to make recommendations for a given user?

### Organizing business data

- It would not make sense to recommend a place the user has already rated, right? So, generate a dictionary where ***user_idn*** is the key and a list of the already rated ***business_idn*** is the value (hint: when aggregating DataFrames, ***collect_list*** is a VERY useful function to turn multiple records into a list)

In [None]:
from pyspark.sql.functions import collect_list

dict_visited_by_user = ...

- Besides, recommending a given business_id also does not help much, right? So you need to organize the business data in a way it can be shown to the user.
    - Define a regular Python function that takes one argument ***row*** (Row type) and returns a dictionary where ***business_idn*** is the key and the value is yet another dictionary with relevant fields (for instance: name, address, stars, categories)
    - Transform your business DataFrame into an RDD and apply the function you defined - upon collecting, you will end up with a list of dictionaries
    - Transform this list of dictionaries into a single dictionary

In [None]:
def rest_to_json(row):
    pass

rest = ...

dict_rest = {k: v for d in rest for k, v in d.items()}

### Making recommendations for a user

- To actually make the recommendations, we need to build an input DataFrame to feed the model
    - A DataFrame can be created using the SQL Context and a list of Rows, each containg two columns: user_idn and business_idn - the rating will be computed by the model
    - But you only need to have rows for the businesses which were not yet rated by the user - from all businesses, exclude the ones already rated by him/her

In [None]:
from pyspark.sql import Row
from pyspark.sql.functions import desc

user_idn = 317
n_business = len(dict_rest)

visited = ...
not_visited = ...

df_test_user = ...

- Now, you can use the generated DataFrame to make predictions
    - If there are any NA predictions, make sure to turn them into a really bad value (for instance, -5.0) (hint: remember ***na*** method of DataFrames)
- Order the predictions and take the ***business_idn*** of the top 5
- Finally, use this information to fetch the business data from the dictionary you assembled a couple of steps ago

In [None]:
predictions = ...

top_predictions = ...

response = list(map(lambda idn: dict_rest[idn], top_predictions))

In [None]:
response

## Congratulations, you finished the exercise!

In [None]:
sc.stop()