In [1]:
import os

# Directory holding the ratings and movies CSV files, and the path of each file.
dbfs_dir = '/databricks-datasets/cs110x/ml-20m/data-001'
ratings_filename = dbfs_dir + '/ratings.csv'
movies_filename = dbfs_dir + '/movies.csv'

if os.path.sep != '/':
  # Handle Windows: swap the forward slashes for the platform's separator.
  ratings_filename = ratings_filename.replace('/', os.path.sep)
  # This line previously used the undefined name `movie_filename`, which
  # raised NameError on this branch and left movies_filename unconverted.
  movies_filename = movies_filename.replace('/', os.path.sep)

In [2]:
from pyspark.sql.types import *

# Explicit schema applied when reading the ratings file:
# two integer columns followed by a double-valued rating.
ratings_df_schema = StructType([
  StructField('userId', IntegerType()),
  StructField('movieId', IntegerType()),
  StructField('rating', DoubleType()),
])

# Explicit schema applied when reading the movies file:
# an integer column named 'ID' and a string title.
movies_df_schema = StructType([
  StructField('ID', IntegerType()),
  StructField('title', StringType()),
])

In [3]:
from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import *

# Both files are read with a header row and with schema inference off;
# column types come from the explicit schemas instead.
csv_reader_options = {'header': True, 'inferSchema': False}

raw_ratings_df = (
  sqlContext.read
  .format('com.databricks.spark.csv')
  .options(**csv_reader_options)
  .schema(ratings_df_schema)
  .load(ratings_filename)
)
# NOTE(review): ratings_df_schema declares no 'Timestamp' column, so this
# drop looks like a no-op - confirm.
ratings_df = raw_ratings_df.drop('Timestamp')

raw_movies_df = (
  sqlContext.read
  .format('com.databricks.spark.csv')
  .options(**csv_reader_options)
  .schema(movies_df_schema)
  .load(movies_filename)
)
# NOTE(review): movies_df_schema already names the id column 'ID' and has no
# 'Genres' column, so the drop and the rename both look like no-ops - confirm.
movies_df = raw_movies_df.drop('Genres').withColumnRenamed('movieId', 'ID')

# Mark the working frames for caching; the counts below are the first actions run on them.
ratings_df.cache()
movies_df.cache()


raw_ratings_count = raw_ratings_df.count()
ratings_count = ratings_df.count()
raw_movies_count = raw_movies_df.count()
movies_count = movies_df.count()

print('There are %s ratings and %s movies in the datasets' % (ratings_count, movies_count))
print('Ratings:')
ratings_df.show(3)
print('Movies:')
movies_df.show(3, truncate=False)



##  Basic Recommendations

In [5]:
from pyspark.sql import functions as F
# For every movieId: how many ratings it received ("count") and their mean ("average").
movie_ids_with_avg_ratings_df = (
  ratings_df
  .groupBy('movieId')
  .agg(
    F.count(ratings_df['rating']).alias("count"),
    F.avg(ratings_df['rating']).alias("average"),
  )
)
print('movie_ids_with_avg_ratings_df:')
movie_ids_with_avg_ratings_df.show(3, truncate=False)

# Attach titles by matching the aggregate's movieId against movies_df.ID.
movie_names_df = movie_ids_with_avg_ratings_df.join(
  movies_df,
  movie_ids_with_avg_ratings_df['movieId'] == movies_df['ID'],
)
movie_names_with_avg_ratings_df = movie_names_df.select(
  "average", "title", "count", "movieId"
)

print('movie_names_with_avg_ratings_df:')
movie_names_with_avg_ratings_df.show(3, truncate=False)

###  Movies with Highest Average Ratings and at least 500 reviews

In [7]:
# Keep movies with at least 500 ratings and sort them by average rating,
# best first. Without the sort, show(20) prints 20 arbitrary qualifying rows
# rather than the highest-rated ones the label below promises.
movies_with_500_ratings_or_more = (
  movie_names_with_avg_ratings_df
  .filter("count >= 500")
  .orderBy('average', ascending=False)
)
print('Movies with highest ratings:')
movies_with_500_ratings_or_more.show(20, truncate=False)

In [8]:
seed = 1800009193L
(split_60_df, split_a_20_df, split_b_20_df) = ratings_df.randomSplit([.6,.2,.2],seed)

training_df = split_60_df.cache()
validation_df = split_a_20_df.cache()
test_df = split_b_20_df.cache()

print('Training: {0}, validation: {1}, test: {2}\n'.format(
  training_df.count(), validation_df.count(), test_df.count())
)
training_df.show(3)
validation_df.show(3)
test_df.show(3)

In [9]:
from pyspark.ml.recommendation import ALS

# Configure one ALS estimator that is reused for every candidate rank.
als = ALS()
als.setMaxIter(5)
als.setSeed(seed)
als.setRegParam(0.1)
als.setUserCol("userId")
als.setItemCol("movieId")
als.setRatingCol("rating")
from pyspark.ml.evaluation import RegressionEvaluator
# RMSE between the "rating" label column and the "prediction" column.
reg_eval = RegressionEvaluator(
  predictionCol="prediction",
  labelCol="rating",
  metricName="rmse",
)

tolerance = 0.03
ranks = [4, 8, 12]
errors = [0] * len(ranks)
models = [0] * len(ranks)
err = 0                     # position in `ranks` of the rank being tried
min_error = float('inf')
best_rank = -1              # holds an index into `ranks`, not a rank value
for rank in ranks:
  # Fit on the training split, then score on the validation split.
  als.setRank(rank)
  model = als.fit(training_df)
  predict_df = model.transform(validation_df)
  # Keep only rows whose prediction is not NaN; this relies on Spark SQL
  # treating NaN as equal to NaN in comparisons.
  predicted_ratings_df = predict_df.filter(predict_df.prediction != float('nan'))
  error = reg_eval.evaluate(predicted_ratings_df)
  errors[err], models[err] = error, model
  print('For rank %s the RMSE is %s' % (rank, error))
  if error < min_error:
    min_error, best_rank = error, err
  err += 1

# Leave the estimator set to the winning rank and keep the model fitted with it.
als.setRank(ranks[best_rank])
print('The best model was trained with rank %s' % ranks[best_rank])
my_model = models[best_rank]

### (2c) Testing the Model

In [11]:
# Score the selected model on the held-out test split.
predict_df = my_model.transform(test_df)
# Keep only rows whose prediction is not NaN before computing the metric.
predicted_test_df = predict_df.where(predict_df['prediction'] != float('nan'))
test_RMSE = reg_eval.evaluate(predicted_test_df)

rmse_message = 'The model had a RMSE on the test set of {0}'.format(test_RMSE)
print(rmse_message)

### (2d) Comparing Models

In [13]:
# Baseline: predict the training set's mean rating for every test row.
# (A stray leading '=' on the first statement used to make this cell a SyntaxError.)
avg_rating_df = training_df.agg(F.avg('rating'))
# The aggregate is a single row with a single column; pull out the scalar.
training_avg_rating = avg_rating_df.collect()[0][0]

print('The average rating for movies in the training set is {0}'.format(training_avg_rating))
# Use the constant as the 'prediction' column so reg_eval can score it like a model.
test_for_avg_df = test_df.withColumn('prediction', F.lit(training_avg_rating))
test_avg_RMSE = reg_eval.evaluate(test_for_avg_df)

print("The RMSE on the average set is {0}".format(test_avg_RMSE))

In [14]:
from pyspark.sql import Row
# User id under which the personal ratings below are recorded.
my_user_id = 0
my_rated_movies = [
     # The format of each line is (my_user_id, movie ID, your rating)
     # For example, to give the movie "Star Wars: Episode IV - A New Hope (1977)" a five rating, you would add the following line:
     #   (my_user_id, 260, 5),
     (my_user_id, 318, 5),
     (my_user_id, 858, 3),
     (my_user_id, 50, 3),
     (my_user_id, 6016, 3),
     (my_user_id, 2959, 3),
     (my_user_id, 58559, 4),
     (my_user_id, 2571, 4),
     (my_user_id, 4226, 4),
     (my_user_id, 593, 5),
     (my_user_id, 296, 4),
]

# Column names are given in the same order as the tuple fields above.
my_ratings_columns = ['userId', 'movieId', 'rating']
my_ratings_df = sqlContext.createDataFrame(my_rated_movies, my_ratings_columns)
print('My movie ratings:')
display(my_ratings_df.limit(10))

### (3b) Add custom ratings of Movies to Training Dataset

In [16]:
# Append the personal ratings to the training split.
training_with_my_ratings_df = training_df.unionAll(my_ratings_df)

# Compute the size difference once and reuse it for both the message and the
# check; each count() is a separate Spark action, and the difference was
# previously recomputed from scratch for the assert.
added_entries_count = training_with_my_ratings_df.count() - training_df.count()

print ('The training dataset now has %s more entries than the original training dataset' %
       added_entries_count)
# The union must have added exactly one row per personal rating.
assert added_entries_count == my_ratings_df.count()

In [17]:
# Reconfigure the shared ALS estimator before refitting on the augmented training set.
als.setPredictionCol("prediction")
als.setMaxIter(5)
als.setSeed(seed)
als.setUserCol("userId")
als.setItemCol("movieId")
als.setRatingCol("rating")
# NOTE(review): rank is pinned to 4 here, overriding the ranks[best_rank]
# value set after the rank search - confirm this is intended.
als.setRank(4)

my_ratings_model = als.fit(training_with_my_ratings_df)

In [18]:
# Score the model trained with the personal ratings on the same test split.
my_predict_df = my_ratings_model.transform(test_df)

# Keep only rows whose prediction is not NaN before computing the metric.
predicted_test_my_ratings_df = my_predict_df.where(
  my_predict_df['prediction'] != float('nan')
)

test_RMSE_my_ratings = reg_eval.evaluate(predicted_test_my_ratings_df)
print('The model had a RMSE on the test set of {0}'.format(test_RMSE_my_ratings))

### (3e) Predict Ratings for new user

In [20]:
# Movie ids are the second field of each (user, movie, rating) tuple.
my_rated_movie_ids = [rated[1] for rated in my_rated_movies]
# Every movie whose ID is not among the ones already rated.
already_rated = movies_df["ID"].isin(my_rated_movie_ids)
not_rated_df = movies_df.filter(~already_rated)

# Shape the unrated movies as (movieId, userId) rows so the model can score them.
my_unrated_movies_df = not_rated_df.select(
  not_rated_df.ID.alias("movieId"),
  F.lit(my_user_id).alias("userId"),
)

raw_predicted_ratings_df = my_ratings_model.transform(my_unrated_movies_df)

# Keep only rows whose prediction is not NaN.
predicted_ratings_df = raw_predicted_ratings_df.where(
  raw_predicted_ratings_df['prediction'] != float('nan')
)

In [21]:
# Join each prediction with its movie's title, rating count and average.
# The join must come before the first use of predicted_with_counts_df;
# showing it before it was defined raised NameError on a fresh kernel.
predicted_with_counts_df = predicted_ratings_df.join(
  movie_names_with_avg_ratings_df,
  predicted_ratings_df.movieId == movie_names_with_avg_ratings_df.movieId,
  "inner")
predicted_with_counts_df.show()

# Keep movies with more than 75 ratings and order them by predicted rating, highest first.
predicted_highest_rated_movies_df = (
  predicted_with_counts_df
  .filter(predicted_with_counts_df['count'] > 75)
  .orderBy(predicted_with_counts_df['prediction'].desc())
)

print ('My 25 highest rated movies as predicted (for movies with more than 75 reviews):')
predicted_highest_rated_movies_df.show(25, truncate=False)