# Movie Recommendation Engine

We are using [Collaborative Filtering](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html), a commonly used recommender technique, to predict movie recommendations.

Collaborative filtering fills in the missing entries of a user-item association matrix (e.g. user and movies). `spark.ml` performs distributed collaborative filtering using the alternating least square (ALS) algorithm to predict missing entries (e.g. user has no movies or movies with no users).  In our scenario, we will be the [Spark Training Dataset](https://github.com/databricks/spark-training/tree/master/data/movielens/medium); specifically:
* `movies.dat` containing a list of movies 
* `ratings.dat` containing a list of user_ids and their associated ratings of variious movies

Ready to take this notebook for a spin?
* If you have a Databricks account, import this notebook into your workspace where you can modify it.
* If you do not have a Databricks account, [click here](https://databricks.com/try-databricks) to set-up a free trial to begin playing with this notebook today.

In [2]:
# Clear widgets if any exist
dbutils.widgets.removeAll()

* Extract the `movies` and `ratings`datasets hosted at [Spark Training Dataset](https://github.com/databricks/spark-training/tree/master/data/movielens/medium)
* Create the `ratings` and `movies` tables using the [Databricks Guide > Create a table using the UI](https://docs.databricks.com/user-guide/tables.html#create-a-table-using-the-ui)
  * Note that the delimiter is `::`

* The code snippet below will select 10 random movies out of the the 200 top most rated movies  
* You will use **Databricks Widgets** to personalize your movie ratings

In [6]:
spark.sql("""
    select 
      movie_id, movies.name, count(*) as times_rated 
    from 
      ratings
    join 
      movies on ratings.movie_id = movies.id
    group by 
      movie_id, movies.name, movies.year
    order by 
      times_rated desc
    limit
      200
    """
).createOrReplaceTempView("most_rated_movies")

In [7]:
# Take a sample of 10 movies
most_rated_movies_sample = spark.table("most_rated_movies").rdd.takeSample(True, 10)

# Update widgets with movies for you to rate
for i in range(0, len(most_rated_movies_sample)):
  dbutils.widgets.dropdown("movie_%i" % i, "5", ["1", "2", "3", "4", "5"], most_rated_movies_sample[i].name)

* Change the values on top to be your own personal ratings **before** proceeding.
* The following code snippet will process your personal movie ratings.

In [9]:
# Create DataFrame based on your own personal ratings
from datetime import datetime
from pyspark.sql import Row
ratings = []
for i in range(0, len(most_rated_movies_sample)):
  ratings.append(
    Row(user_id = 0,
        movie_id = most_rated_movies_sample[i].movie_id,
        rating = float(dbutils.widgets.get("movie_%i" %i))
        )
  )
myRatingsDF = spark.createDataFrame(ratings)

# Create myRatings DataFrame with specific column order to match `ratings`
myRatings = myRatingsDF.select("user_id", "movie_id", "rating")
myRatings.createOrReplaceTempView("myRatings")

In [10]:
%sql
-- Display your ratings
select f.movie_id, m.name, f.rating from myRatings f inner join most_rated_movies m on m.movie_id = f.movie_id

movie_id,name,rating
1307,When Harry Met Sally...,5.0
648,Mission: Impossible,4.0
1208,Apocalypse Now,5.0
2395,Rushmore,4.0
2054,"Honey, I Shrunk the Kids",4.0
1221,"Godfather: Part II, The",4.0
3751,Chicken Run,3.0
736,Twister,3.0
2406,Romancing the Stone,4.0
39,Clueless,4.0


In [11]:
from pyspark.sql import functions

ratings = spark.table("ratings")
ratings = ratings.withColumn("rating", ratings.rating.cast("float"))
ratings = ratings.drop('timestamp')

# Split our data for our training and test datasets
(training, test) = ratings.randomSplit([0.8, 0.2])

In [12]:
from pyspark.ml.recommendation import ALS

# Run ALS collaborative filtering
als = ALS(maxIter=5, regParam=0.01, userCol="user_id", itemCol="movie_id", ratingCol="rating")

# Run training model which includes your own ratings
model = als.fit(training.unionAll(myRatings))

In [13]:
predictions = model.transform(test).dropna()
predictions.createOrReplaceTempView("predictions")
display(predictions)

user_id,movie_id,rating,prediction
4169,148,3.0,2.6594105
4387,148,1.0,2.1560543
3053,148,3.0,2.3044345
673,148,5.0,3.9411874
3184,148,4.0,3.1768913
1605,148,2.0,2.5832853
424,148,4.0,2.9555023
2456,148,2.0,3.359384
3841,463,3.0,2.44702
4858,463,3.0,1.7236093


In [14]:
# Evaluate the model
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

# Display using `displayHTML`
displayHTML("<span style='font-size:14pt;color:purple'>The Root Mean Square Error is %s</span>" % str(rmse))

In [15]:
# Ensure the columns are the same order as the model is expecting
myRatings = myRatingsDF.select("user_id", "movie_id", "rating")

# Run the same ALS model on my personalized Ratings
myPersonalizedMovies = model.transform(myRatings).dropna()
myPersonalizedMovies.createOrReplaceTempView("myPersonalizedMovies")

# Display the model's predicted ratings
display(myPersonalizedMovies)

user_id,movie_id,rating,prediction
0,1307,5.0,4.5559483
0,1208,5.0,4.6152587
0,2054,4.0,3.5948129
0,39,4.0,4.2730675
0,736,3.0,3.1322343
0,648,4.0,3.6775858
0,2395,4.0,3.8880582
0,3751,3.0,3.2379358
0,1221,4.0,4.3292947
0,2406,4.0,4.370348


In [16]:
# Evaluate model for my personalized ratings movies
my_rmse = evaluator.evaluate(myPersonalizedMovies)

# Display using `displayHTML`
displayHTML("<span style='font-size:14pt;color:purple'>The Root Mean Square Error is %s</span>" % str(my_rmse))

In [17]:
# Take the list of the movies you already rated (myRatings), the list of most_rated_movies (removing the ones you already rated)
# and assign them to yourself (user_id = 0) 
sqlQuery = """
select null as user_id, null as movie_id, null as rating union all 
select user_id, movie_id, rating from myRatings union all
select cast(0 as bigint) as user_id, movie_id, cast(0 as float) as rating from most_rated_movies where movie_id not in (select movie_id from myRatings)
"""
most_rated_movies_n = spark.sql(sqlQuery)
most_rated_movies_n.createOrReplaceTempView("most_rated_movies_n")

# Applying and then removing NULL to ensure original ratings and most_rated_movies_0 schema match
most_rated_movies_0 = spark.sql("select * from most_rated_movies_n where user_id = 0")

# Re-apply our ALS model for all movies
movies_predicted_0 = model.transform(most_rated_movies_0).dropna()
movies_predicted_0.createOrReplaceTempView("movies_predicted_0")
display(movies_predicted_0)

user_id,movie_id,rating,prediction
0,1580,0.0,3.9885874
0,3175,0.0,4.2089124
0,858,0.0,4.529265
0,1127,0.0,4.353883
0,1721,0.0,3.5465503
0,1270,0.0,4.271156
0,1265,0.0,4.316811
0,588,0.0,3.872023
0,296,0.0,3.9212306
0,2396,0.0,4.551294


In [18]:
%sql
-- Show Your Top 10 movies
select m.name, f.prediction
  from movies_predicted_0 f
    inner join most_rated_movies m
      on m.movie_id = f.movie_id
  order by f.prediction desc
  limit 10

name,prediction
As Good As It Gets,5.2447743
Good Will Hunting,5.2286706
Thelma & Louise,5.104697
"Crying Game, The",4.981147
Full Metal Jacket,4.9566545
Seven,4.9440475
"Shawshank Redemption, The",4.934167
Sling Blade,4.9324512
Twelve Monkeys,4.9319396
Run Lola Run,4.9237776
