In [1]:
# Instrument for unit tests. This is only executed in local unit tests, not in Databricks.
if 'dbutils' not in locals():
    import databricks_test
    databricks_test.inject_variables()

In [2]:
import os
# from test_helper import Test


dbfs_dir = '/databricks-datasets/cs110x/ml-20m/data-001'
ratings_filename = dbfs_dir + '/ratings.csv'
movies_filename = dbfs_dir + '/movies.csv'

# The following line is here to enable this notebook to be exported as source and
# run on a local machine with a local copy of the files. Just change the dbfs_dir,
# above.
if os.path.sep != '/':
  # Handle Windows.
  ratings_filename = ratings_filename.replace('/', os.path.sep)
  movie_filename = movie_filename.replace('/', os.path.sep)

In [3]:
display(dbutils.fs.ls(dbfs_dir))

path,name,size
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/README.txt,README.txt,8964
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/links.csv,links.csv,569517
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/links.csv.gz,links.csv.gz,245973
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/movies.csv,movies.csv,1397542
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/movies.csv.gz,movies.csv.gz,498839
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/ratings.csv,ratings.csv,533444411
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/ratings.csv.gz,ratings.csv.gz,132656084
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/tags.csv,tags.csv,16603996
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/tags.csv.gz,tags.csv.gz,4787917


In [4]:
# deal with the comprissed files ending in (.gz)
from pyspark.sql.types import *

ratings_df_schema = StructType(
  [StructField('userId', IntegerType()),
   StructField('movieId', IntegerType()),
   StructField('rating', DoubleType())]
)
movies_df_schema = StructType(
  [StructField('ID', IntegerType()),
   StructField('title', StringType())]
)

In [5]:
# caching the data on top of S3 in memory 
from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import *

raw_ratings_df = sqlContext.read.format('com.databricks.spark.csv').options(header=True, inferSchema=False).schema(ratings_df_schema).load(ratings_filename)
ratings_df = raw_ratings_df.drop('Timestamp')

raw_movies_df = sqlContext.read.format('com.databricks.spark.csv').options(header=True, inferSchema=False).schema(movies_df_schema).load(movies_filename)
movies_df = raw_movies_df.drop('Genres').withColumnRenamed('movieId', 'ID')

ratings_df.cache()
movies_df.cache()

assert ratings_df.is_cached
assert movies_df.is_cached

raw_ratings_count = raw_ratings_df.count()
ratings_count = ratings_df.count()
raw_movies_count = raw_movies_df.count()
movies_count = movies_df.count()


In [6]:
print ('There are %s ratings and %s movies in the datasets' % (ratings_count, movies_count))
print ('Ratings:')
ratings_df.show(3)
print ('Movies:')
movies_df.show(3, truncate=False)

assert raw_ratings_count == ratings_count
assert raw_movies_count == movies_count

In [7]:
# quick verification on data
assert ratings_count == 20000263
assert movies_count == 27278
assert movies_df.filter(movies_df.title == 'Toy Story (1995)').count() == 1
assert ratings_df.filter((ratings_df.userId == 6) & (ratings_df.movieId == 1) & (ratings_df.rating == 5.0)).count() == 1


In [8]:
display(movies_df)

ID,title
1,Toy Story (1995)
2,Jumanji (1995)
3,Grumpier Old Men (1995)
4,Waiting to Exhale (1995)
5,Father of the Bride Part II (1995)
6,Heat (1995)
7,Sabrina (1995)
8,Tom and Huck (1995)
9,Sudden Death (1995)
10,GoldenEye (1995)


In [9]:
display(ratings_df)

userId,movieId,rating
1,2,3.5
1,29,3.5
1,32,3.5
1,47,3.5
1,50,3.5
1,112,3.5
1,151,4.0
1,223,4.0
1,253,4.0
1,260,4.0


In [10]:
# one way to recommend movies is to find the movies with the highest average ratings.
# I will use spark to find the name, number of ratings, and average ratings for the top 20 movies with at least 500 reviews.

# get ratings_df into a new dataframe called movie_ids_with_avg_ratings that have
# 1. movieId, 2. no_of ratings per movie, 3. avg rating for all movies
# add movie title column to the movie_ids_with_avg_ratings into a new dataframe called movie_names_with_avg_ratings_df
# will do join for the last step

from pyspark.sql import functions as F

movie_ids_with_avg_ratings_df = ratings_df.groupBy('movieId').agg(F.count(ratings_df.rating).alias('count'), F.avg(ratings_df.rating).alias('average'))

print('movie_ids_with_avg_ratings_df:')

movie_ids_with_avg_ratings_df.show(3, truncate=False)


In [11]:
movie_names_df = movies_df.withColumnRenamed('ID', 'movieId')
movie_names_with_avg_ratings_df = movie_ids_with_avg_ratings_df.join(movie_names_df, on=['movieId'], how='inner')
print('movie_names_with_avg_ratings_df:')
movie_names_with_avg_ratings_df.show(3, truncate=False)

In [12]:
#Now that we have a DataFrame of the movies with highest average ratings, we can use Spark to determine the 20 movies with highest average ratings and at least 500 reviews. 

movies_with_500_ratings_or_more = movie_names_with_avg_ratings_df.filter(F.col('count')>=500)\
  .sort('average')

print ('Movies with highest ratings:')
movies_with_500_ratings_or_more.show(20, truncate=False)

In [13]:
# (2a) Creating a Training Set
# break the ratings_df into three parts:

# A training set (DataFrame), which we will use to train models
# A validation set (DataFrame), which we will use to choose the best model
# A test set (DataFrame), which we will use for our experiments
# To randomly split the dataset into the multiple groups, we can use the pySpark randomSplit() transformation. randomSplit() takes a set of splits and a seed and returns multiple DataFrames.

In [14]:
# We'll hold out 60% for training, 20% of our data for validation, and leave 20% for testing

seed = 42
(split_60_df, split_a_20_df, split_b_20_df) = ratings_df.randomSplit([0.6, 0.2, 0.2], seed = seed)

In [15]:
# cache the resulted datasets
training_df = split_60_df.cache()
validation_df = split_a_20_df.cache()
test_df = split_b_20_df.cache()

print('Training: {0}, validation: {1}, test: {2}\n'.format(
  training_df.count(), validation_df.count(), test_df.count())
)
training_df.show(3)
validation_df.show(3)
test_df.show(3)

After splitting the dataset, the training set has about 12 million entries and the validation and test sets each have about 4 million entries. (The exact number of entries in each dataset varies slightly due to the random nature of the randomSplit() transformation.)

Alternating Least Squares
In this part, we will use the Apache Spark ML Pipeline implementation of Alternating Least Squares, ALS. ALS takes a training dataset (DataFrame) and several parameters that control the model creation process. To determine the best values for the parameters, we will use ALS to train several models, and then we will select the best model and use the parameters from that model in the rest of this lab exercise.

The process we will use for determining the best model is as follows:

Pick a set of model parameters. The most important parameter to model is the rank, which is the number of columns in the Users matrix (green in the diagram above) or the number of rows in the Movies matrix (blue in the diagram above). In general, a lower rank will mean higher error on the training dataset, but a high rank may lead to overfitting. We will train models with ranks of 4, 8, and 12 using the training_df dataset.

Set the appropriate parameters on the ALS object:

The "User" column will be set to the values in our userId DataFrame column.
The "Item" column will be set to the values in our movieId DataFrame column.
The "Rating" column will be set to the values in our rating DataFrame column.
We'll using a regularization parameter of 0.1.
Note: Read the documentation for the ALS class carefully. It will help you accomplish this step.

Have the ALS output transformation (i.e., the result of ALS.fit()) produce a new column called "prediction" that contains the predicted value.

Create multiple models using ALS.fit(), one for each of our rank values. We'll fit against the training data set (training_df).

For each model, we'll run a prediction against our validation data set (validation_df) and check the error.

We'll keep the model with the best error rate.

Why are we doing our own cross-validation?
A challenge for collaborative filtering is how to provide ratings to a new user (a user who has not provided any ratings at all). Some recommendation systems choose to provide new users with a set of default ratings (e.g., an average value across all ratings), while others choose to provide no ratings for new users. Spark's ALS algorithm yields a NaN (Not a Number) value when asked to provide a rating for a new user.

Using the ML Pipeline's CrossValidator with ALS is thus problematic, because cross validation involves dividing the training data into a set of folds (e.g., three sets) and then using those folds for testing and evaluating the parameters during the parameter grid search process. It is likely that some of the folds will contain users that are not in the other folds, and, as a result, ALS produces NaN values for those new users. When the CrossValidator uses the Evaluator (RMSE) to compute an error metric, the RMSE algorithm will return NaN. This will make all of the parameters in the parameter grid appear to be equally good (or bad).

You can read the discussion on Spark JIRA 14489 about this issue. There are proposed workarounds of having ALS provide default values or having RMSE drop NaN values. Both introduce potential issues. We have chosen to have RMSE drop NaN values. While this does not solve the underlying issue of ALS not predicting a value for a new user, it does provide some evaluation value. We manually implement the parameter grid search process using a for loop (below) and remove the NaN values before using RMSE.

For a production application, you would want to consider the tradeoffs in how to handle new users.

Note: This cell will likely take a couple of minutes to run.

In [17]:
from pyspark.ml.recommendation import ALS

als = ALS(maxIter = 5, regParam = 0.1, userCol = 'userId', itemCol = 'movieId', ratingCol='rating', coldStartStrategy = 'drop')
# Now we set the parameters for the method


In [18]:
from pyspark.ml.evaluation import RegressionEvaluator
# Create an RMSE evaluator using the label and predicted columns
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")

tolerance = 0.03
ranks = [4, 8, 12]
errors = [0, 0, 0]
models = [0, 0, 0]
err = 0
min_error = float('inf')
best_rank = -1

for rank in ranks:
  # Set the rank here:
  als.setRank(5)
  # Create the model with these parameters.
  model = als.fit(training_df)
  # Run the model to create a prediction. Predict against the validation_df.
  predict_df = model.transform(validation_df)

  # Remove NaN values from prediction (due to SPARK-14489)
  predicted_ratings_df = predict_df.filter(predict_df.prediction != float('nan'))

  # Run the previously created RMSE evaluator, reg_eval, on the predicted_ratings_df DataFrame
  error = reg_eval.evaluate(predict_df)
  errors[err] = error
  models[err] = model
  print ('For rank %s the RMSE is %s' % (rank, error))
  if error < min_error:
    min_error = error
    best_rank = err
  err += 1

als.setRank(ranks[best_rank])
print ('The best model was trained with rank %s' % ranks[best_rank])
my_model = models[best_rank]

## Testing the Model
So far, we used the training_df and validation_df datasets to select the best model. Since we used these two datasets to determine what model is best, we cannot use them to test how good the model is; otherwise, we would be very vulnerable to overfitting. To decide how good our model is, we need to use the test_df dataset. We will use the best_rank you determined in part (2b) to create a model for predicting the ratings for the test dataset and then we will compute the RMSE.

The steps you should perform are:

* Run a prediction, using my_model as created above, on the test dataset (test_df), producing a new predict_df DataFrame.
* Filter out unwanted NaN values (necessary because of a bug in Spark). We've supplied this piece of code for you.
* Use the previously created RMSE evaluator, reg_eval to evaluate the filtered DataFrame.

In [20]:
# In ML Pipelines, this next step has a bug that produces unwanted NaN values. We
# have to filter them out. See https://issues.apache.org/jira/browse/SPARK-14489
predict_df = my_model.transform(test_df)

# Remove NaN values from prediction (due to SPARK-14489)
predicted_test_df = predict_df.filter(predict_df.prediction != float('nan'))

# Run the previously created RMSE evaluator, reg_eval, on the predicted_test_df DataFrame
test_RMSE = reg_eval.evaluate(predicted_test_df)

print('The model had a RMSE on the test set of {0}'.format(test_RMSE))

## Comparing the Model
Looking at the RMSE for the results predicted by the model versus the values in the test set is one way to evalute the quality of the model. Another way to evaluate the model is to evaluate the error from a test set where every rating is the average rating for the training set.

### The steps are:

1. Use the training_df to compute the average rating across all movies in that training dataset.
2. Use the average rating that you just determined and the test_df to create a DataFrame (test_for_avg_df) with a prediction column containing the average rating. 
3. Use the previously created reg_eval object to evaluate the test_for_avg_df and calculate the RMSE.

In [22]:
avg_rating_df = training_df.select(F.avg(training_df.rating))
avg_rating_df.show()

In [23]:
# Compute the average rating

# Extract the average rating value. (This is row 0, column 0.)
training_avg_rating = avg_rating_df.collect()[0][0]
training_avg_rating
print('The average rating for movies in the training set is {0}'.format(training_avg_rating))

# Add a column with the average rating
# df.select(lit(5).alias('height')).withColumn('spark_user', lit(True)).take(1)
# [Row(height=5, spark_user=True)]
# test_for_avg_df = test_df.select(lit(training_avg_rating).alias('average')).withColumn('prediction', lit(True)).take(1)
test_for_avg_df = test_df.withColumn('prediction', F.lit(training_avg_rating))

# Run the previously created RMSE evaluator, reg_eval, on the test_for_avg_df DataFrame
test_avg_RMSE = reg_eval.evaluate(test_for_avg_df)

print("The RMSE on the average set is {0}".format(test_avg_RMSE))

Predictions for Yourself
The ultimate goal of this lab exercise is to predict what movies to recommend to yourself. In order to do that, you will first need to add ratings for yourself to the ratings_df dataset.

### Movie Ratings

To help you provide ratings for yourself, we have included the following code to list the names and movie IDs of the 50 highest-rated movies from movies_with_500_ratings_or_more which we created in part 1 the lab.

In [25]:
print ('Most rated movies:')
print ('(average rating, movie name, number of reviews, movie ID)')
display(movies_with_500_ratings_or_more.orderBy(movies_with_500_ratings_or_more['average'].desc()).take(50))

movieId,count,average,title
318,63366,4.446990499637029,"Shawshank Redemption, The (1994)"
858,41355,4.364732196832306,"Godfather, The (1972)"
50,47006,4.334372207803259,"Usual Suspects, The (1995)"
527,50054,4.310175010988133,Schindler's List (1993)
1221,27398,4.275640557704942,"Godfather: Part II, The (1974)"
2019,11611,4.2741796572216,Seven Samurai (Shichinin no samurai) (1954)
904,17449,4.271333600779414,Rear Window (1954)
7502,4305,4.263182346109176,Band of Brothers (2001)
912,24349,4.258326830670664,Casablanca (1942)
922,6525,4.256934865900383,Sunset Blvd. (a.k.a. Sunset Boulevard) (1950)


The user ID 0 is unassigned, so we will use it for the ratings. We set the variable my_user_ID to 0. Next, create a new DataFrame called my_ratings_df, with your ratings for at least 10 movie ratings. Each entry should be formatted as (my_user_id, movieID, rating). As in the original dataset, ratings should be between 1 and 5 (inclusive). If you have not seen at least 10 of these movies, you can increase the parameter passed to ```take()``` in the above cell until there are 10 movies that you have seen (or you can also guess what your rating would be for movies you have not seen).

In [27]:
from pyspark.sql import Row
my_user_id = 0

# Note that the movie IDs are the *last* number on each line. A common error was to use the number of ratings as the movie ID.
my_rated_movies = [
     (5,4973, 4),
     (5, 750, 4),
     (5, 858, 4),
     (10, 1221, 4),
     (400, 593, 3),
     (697, 584, 2),
     (91, 389, 5),
     (100, 924, 3),
     (837, 836, 4),
     (903, 369, 3)
     # The format of each line is (my_user_id, movie ID, your rating)
     # For example, to give the movie "Star Wars: Episode IV - A New Hope (1977)" a five rating, you would add the following line:
     #   (my_user_id, 260, 5),
]

my_ratings_df = sqlContext.createDataFrame(my_rated_movies, ['userId','movieId','rating'])
print ('My movie ratings:')
display(my_ratings_df.limit(10))

userId,movieId,rating
5,4973,4
5,750,4
5,858,4
10,1221,4
400,593,3
697,584,2
91,389,5
100,924,3
837,836,4
903,369,3


### Add Your Movies to Training Dataset

Now that you have ratings for yourself, you need to add your ratings to the training dataset so that the model you train will incorporate your preferences. Spark's unionAll() transformation combines two DataFrames; use unionAll() to create a new training dataset that includes your ratings and the data in the original training dataset.

In [29]:
training_with_my_ratings_df = training_df.unionAll(my_ratings_df)

print ('The training dataset now has %s more entries than the original training dataset' %
       (training_with_my_ratings_df.count() - training_df.count()))

In [30]:
training_with_my_ratings_df.count() - training_df.count() == my_ratings_df.count()

### Train a Model with Your Ratings
Now, train a model with your ratings added and the parameters you used in in part (2b) and (2c). Mke sure you include all of the parameters.

In [32]:
# Reset the parameters for the ALS object.
als = ALS(maxIter = 5, regParam = 0.1, userCol = 'userId', itemCol = 'movieId', ratingCol='rating', coldStartStrategy = 'drop')


# Create the model with these parameters.
my_ratings_model = als.fit(training_with_my_ratings_df)

In [33]:
my_predict_df = my_ratings_model.transform(training_with_my_ratings_df)

# Remove NaN values from prediction (due to SPARK-14489)
predicted_test_my_ratings_df = my_predict_df.filter(my_predict_df.prediction != float('nan'))

# Run the previously created RMSE evaluator, reg_eval, on the predicted_test_my_ratings_df DataFrame
test_RMSE_my_ratings = reg_eval.evaluate(predicted_test_my_ratings_df)
print('The model had a RMSE on the test set of {0}'.format(test_RMSE_my_ratings))


In [34]:
movies_df.show(3)
not_rated_df.show(3)
training_with_my_ratings_df.show(3)

In [35]:
# Create a list of my rated movie IDs
my_rated_movie_ids = [x[1] for x in my_rated_movies]

# Filter out the movies I already rated.
not_rated_df = movies_df.filter(~ movies_df.ID.isin(my_rated_movie_ids))

# Rename the "ID" column to be "movieId", and add a column with my_user_id as "userId".
my_unrated_movies_df = not_rated_df.withColumnRenamed('ID', 'movieId')\
  .withColumn('userId', F.lit(my_user_id))

# Use my_rating_model to predict ratings for the movies that I did not manually rate.
raw_predicted_ratings_df = my_ratings_model.transform(my_unrated_movies_df)

predicted_ratings_df = raw_predicted_ratings_df.filter(raw_predicted_ratings_df['prediction'] != float('nan'))


## Predict Your Ratings
We have our predicted ratings. Now we can print out the 25 movies with the highest predicted ratings.

The steps:

1. Join your predicted_ratings_df DataFrame with the movie_names_with_avg_ratings_df DataFrame to obtain the ratings counts for each movie.
2. Sort the resulting DataFrame (predicted_with_counts_df) by predicted rating (highest ratings first), and remove any ratings with a count of 75 or less.
3. Print the top 25 movies that remain.

In [37]:
predicted_with_counts_df = movie_names_with_avg_ratings_df.join(predicted_ratings_df.select('movieId'), 'movieId')
# predicted_highest_rated_movies_df = predicted_with_counts_df.sort(predicted_with_counts_df.prediction.desc()).collect()

# print ('My 25 highest rated movies as predicted (for movies with more than 75 reviews):')
# predicted_highest_rated_movies_df.filter(F.col('count')> 75)


In [38]:
predicted_ratings_df
