## CASE STUDY - Deploying a recommender

We have seen the MovieLens data as a toy dataset; now let's try something a little bigger. You have some
choices.

* [MovieLens Downloads](https://grouplens.org/datasets/movielens/latest/)

If your resources are limited (you're working on a computer with a limited amount of memory)

> continue to use the sample_movielens_ranting.csv

If you have a computer with at least 8GB of RAM

> download the ml-latest-small.zip

If you have the computational resources (access to Spark cluster or high-memory machine)

> download the ml-latest.zip

The two important pages for documentation are below.

* [Spark MLlib collaborative filtering docs](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html) 
* [Spark ALS docs](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.ALS)


In [1]:
import os
import shutil
import pandas as pd
import numpy as np
import pyspark as ps
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.types import DoubleType

In [2]:
## ensure the spark context is available
spark = (ps.sql.SparkSession.builder
        .appName("sandbox")
        .getOrCreate()
        )

sc = spark.sparkContext
print(spark.version) 

2.4.4


In [3]:
## note that this solution uses ml-latest.zip
data_dir = os.path.join("..","data","ml-latest")
ratings_file = os.path.join(data_dir,"ratings.csv")
movies_file = os.path.join(data_dir,"movies.csv")                            
if not os.path.exists(ratings_file):
    print("ERROR make sure the path to the ratings file is correct")

In [4]:
## load the data
df = spark.read.format("csv").options(header="true",inferSchema="true").load(ratings_file)
df.show(n=4)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
+------+-------+------+----------+
only showing top 4 rows



In [5]:
movies_df = pd.read_csv(movies_file)
movies_df.rename(columns={"movieId": "movie_id"},inplace=True)
movies_df.head(5)

| | movie_id | title | genres |
|---|---|---|---|
| 0 | 1 | Toy Story (1995) | Adventure\|Animation\|Children\|Comedy\|Fantasy |
| 1 | 2 | Jumanji (1995) | Adventure\|Children\|Fantasy |
| 2 | 3 | Grumpier Old Men (1995) | Comedy\|Romance |
| 3 | 4 | Waiting to Exhale (1995) | Comedy\|Drama\|Romance |
| 4 | 5 | Father of the Bride Part II (1995) | Comedy |


## QUESTION 1

Explore the MovieLens data a little and summarize it.

In [6]:
## YOUR CODE HERE (summarize the data)
## the CSV columns are named movieId and userId; rename them to snake_case
df = df.withColumnRenamed("movieId", "movie_id")
df = df.withColumnRenamed("userId", "user_id")
df.describe().show()
print('Unique users: {}'.format(df.select('user_id').distinct().count()))
print('Unique movies: {}'.format(df.select('movie_id').distinct().count()))
print('Movies with Rating > 2: {}'.format(df.filter('rating > 2').select('movie_id').distinct().count()))
print('Movies with Rating > 3: {}'.format(df.filter('rating > 3').select('movie_id').distinct().count()))
print('Movies with Rating > 4: {}'.format(df.filter('rating > 4').select('movie_id').distinct().count()))

+-------+------------------+-----------------+------------------+--------------------+
|summary|           user_id|         movie_id|            rating|           timestamp|
+-------+------------------+-----------------+------------------+--------------------+
|  count|          27753444|         27753444|          27753444|            27753444|
|   mean|141942.01557064414|18487.99983414671|3.5304452124932677|1.1931218549319258E9|
| stddev| 81707.40009148757|35102.62524746828| 1.066352750231982|2.1604822852233925E8|
|    min|                 1|                1|               0.5|           789652004|
|    max|            283228|           193886|               5.0|          1537945149|
+-------+------------------+-----------------+------------------+--------------------+

Unique users: 283228
Unique movies: 53889
Movies with Rating > 2: 50735
Movies with Rating > 3: 43107
Movies with Rating > 4: 29374
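The counts above also show how sparse the ratings are, which is the usual motivation for a matrix factorization approach. A quick worked calculation, using only the figures printed in the summary (the variable names are chosen here for illustration):

```python
# figures copied from the summary output above
n_ratings = 27_753_444
n_users = 283_228
n_movies = 53_889

# fraction of all possible user-movie pairs that actually have a rating
density = n_ratings / (n_users * n_movies)

print("density of the user-movie matrix: {:.4%}".format(density))
print("average ratings per user:  {:.1f}".format(n_ratings / n_users))
print("average ratings per movie: {:.1f}".format(n_ratings / n_movies))
```

Roughly 0.18% of the possible user-movie pairs have a rating, so almost all of the matrix is unobserved.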


## QUESTION 2

Find the ten most popular movies, that is, the ten movies with the highest average rating.

>Hint: you may want to subset the movie matrix to only consider movies with a minimum number of ratings

In [7]:
## YOUR CODE HERE

## get the top rated movies with more than 100 ratings
movie_counts = df.groupBy("movie_id").count()
top_rated = df.groupBy("movie_id").avg('rating')
top_rated = top_rated.withColumnRenamed("movie_id", "movie_id_2")
top_movies = top_rated.join(movie_counts, top_rated.movie_id_2 == movie_counts.movie_id)
top_movies = top_movies.filter('count>100').orderBy('avg(rating)',ascending=False).drop("movie_id_2")
top_movies = top_movies.toPandas()

## add the movie titles to data frame
movie_ids = top_movies['movie_id'].values
inds = [np.where(movies_df['movie_id'].values==mid)[0][0] for mid in movie_ids]
top_movies["title"] = movies_df['title'].values[inds]

top_movies.head(10)

| | avg(rating) | movie_id | count | title |
|---|---|---|---|---|
| 0 | 4.486518 | 171011 | 853 | Planet Earth II (2016) |
| 1 | 4.458092 | 159817 | 1384 | Planet Earth (2006) |
| 2 | 4.424188 | 318 | 97999 | Shawshank Redemption, The (1994) |
| 3 | 4.399898 | 170705 | 984 | Band of Brothers (2001) |
| 4 | 4.350559 | 174053 | 1074 | Black Mirror: White Christmas (2014) |
| 5 | 4.343949 | 171495 | 157 | Cosmos |
| 6 | 4.339667 | 172591 | 421 | The Godfather Trilogy: 1972-1990 (1992) |
| 7 | 4.332893 | 858 | 60904 | Godfather, The (1972) |
| 8 | 4.291959 | 50 | 62180 | Usual Suspects, The (1995) |
| 9 | 4.263889 | 176601 | 180 | Black Mirror |
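If you are working with the small sample file, the same ranking can be sketched in pandas. This is a minimal illustration on made-up ratings: the toy data, the placeholder titles, and the `min_ratings` threshold are assumptions, not part of the solution above.

```python
import pandas as pd

# hypothetical toy ratings; the real data come from ratings.csv
ratings = pd.DataFrame({
    "movie_id": [1, 1, 1, 2, 2, 3],
    "rating":   [4.0, 5.0, 4.5, 3.0, 2.0, 5.0],
})
movies = pd.DataFrame({
    "movie_id": [1, 2, 3],
    "title": ["Movie A", "Movie B", "Movie C"],
})

min_ratings = 2  # assumed threshold; the solution above uses count > 100

# one pass gives both the rating count and the average rating per movie
stats = (ratings.groupby("movie_id")["rating"]
         .agg(count="count", avg_rating="mean")
         .reset_index())

# keep movies with enough ratings, rank them, and attach titles with a merge
top = (stats[stats["count"] >= min_ratings]
       .sort_values("avg_rating", ascending=False)
       .merge(movies, on="movie_id", how="left"))

print(top)
```

Movie C has the highest average in the toy data but only a single rating, so the threshold removes it. This is the reason the hint suggests a minimum number of ratings.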


## QUESTION 3

Compare at least 5 different values for the ``regParam``.

Use ``ALS.trainImplicit()`` (part of the RDD-based ``pyspark.mllib`` API) and compare it to the ``.fit()`` method of the DataFrame-based ``pyspark.ml`` API. See the [Spark ALS docs](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.ALS)
for example usage.

In [13]:
## YOUR CODE HERE

(training, test) = df.randomSplit([0.8, 0.2])

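def train_model(reg_param,implicit_prefs=False):
    """Fit ALS on the training split and print the RMSE on the test split."""
    ## coldStartStrategy="drop" removes test rows whose user or movie was
    ## not seen during training, so the RMSE is not computed on NaN predictions
    als = ALS(maxIter=5, regParam=reg_param, userCol="user_id", 
              itemCol="movie_id", ratingCol="rating",
              coldStartStrategy="drop",implicitPrefs=implicit_prefs)
    model = als.fit(training)

    ## score the held-out ratings
    predictions = model.transform(test)
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                    predictionCol="prediction")

    rmse = evaluator.evaluate(predictions)
    print("regParam={}, RMSE={}".format(reg_param,np.round(rmse,2)))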


In [9]:
for reg_param in [0.01, 0.05, 0.1, 0.15, 0.25]:
    train_model(reg_param)

regParam=0.01, RMSE=0.84
regParam=0.05, RMSE=0.83
regParam=0.1, RMSE=0.83
regParam=0.15, RMSE=0.84
regParam=0.25, RMSE=0.87
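To build intuition for what `regParam` controls, here is a minimal NumPy sketch of alternating least squares on a small synthetic matrix. It is not Spark's implementation: the function names and the synthetic data are made up for illustration, and scaling the penalty by each user's or item's rating count is an assumption based on how the Spark collaborative filtering docs describe `regParam`.

```python
import numpy as np


def als_explicit(R, mask, rank=2, reg=0.1, n_iter=10, seed=0):
    """Tiny dense ALS for explicit ratings (illustration only)."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    eye = np.eye(rank)
    for _ in range(n_iter):
        # hold item factors fixed; each user is a small ridge regression
        for u in range(n_users):
            seen = mask[u]
            if seen.any():
                Vu = V[seen]
                A = Vu.T @ Vu + reg * seen.sum() * eye
                U[u] = np.linalg.solve(A, Vu.T @ R[u, seen])
        # hold user factors fixed; same update for each item
        for i in range(n_items):
            seen = mask[:, i]
            if seen.any():
                Ui = U[seen]
                A = Ui.T @ Ui + reg * seen.sum() * eye
                V[i] = np.linalg.solve(A, Ui.T @ R[seen, i])
    return U, V


def rmse(R, mask, U, V):
    """Root mean squared error over the entries selected by mask."""
    pred = U @ V.T
    return float(np.sqrt(np.mean((R[mask] - pred[mask]) ** 2)))


# synthetic rank-2 "ratings" plus noise (made-up data)
rng = np.random.default_rng(42)
true_U = rng.uniform(0.5, 1.5, size=(30, 2))
true_V = rng.uniform(0.5, 1.5, size=(20, 2))
R = true_U @ true_V.T + rng.normal(scale=0.1, size=(30, 20))

# observe about 70% of the entries and hold out about 20% of those
observed = rng.random(R.shape) < 0.7
held_out = observed & (rng.random(R.shape) < 0.2)
train = observed & ~held_out

results = {}
for reg in [0.01, 0.1, 1.0, 10.0]:
    U, V = als_explicit(R, train, reg=reg)
    results[reg] = rmse(R, held_out, U, V)
    print("reg={}, held-out RMSE={:.3f}".format(reg, results[reg]))
```

A very large penalty shrinks the factors toward zero, so the predictions collapse and the held-out error grows. That is the same pattern as the rise in RMSE at `regParam=0.25` in the Spark results above.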


## QUESTION 4

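With your best `regParam`, try using the `implicitPrefs` flag.

>Note that the much higher RMSE here makes sense: the data are `explicit` ratings, but with `implicitPrefs=True` the model treats them as confidence weights and predicts a preference score rather than a rating, so the predictions are not on the 0.5 to 5 rating scale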

In [15]:
## YOUR CODE HERE

train_model(0.1, implicit_prefs=True)

regParam=0.1, RMSE=3.26
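One way to see why the error jumps is to look at what an implicit-feedback model is fitting. In the usual implicit formulation, each rating is turned into a binary preference plus a confidence weight, and the model predicts the preference. The sketch below uses made-up ratings and an assumed `alpha` of 1.0; it illustrates the idea and does not reproduce Spark's internals.

```python
import numpy as np

# hypothetical explicit ratings on the MovieLens 0.5 to 5.0 scale
ratings = np.array([0.5, 3.0, 3.5, 4.5, 5.0])
alpha = 1.0  # assumed confidence scaling

# implicit view: any observed rating counts as a preference of 1,
# and the rating value only sets how confident we are in it
preference = (ratings > 0).astype(float)
confidence = 1.0 + alpha * ratings

# even a model that recovers the preferences perfectly predicts about 1,
# so scoring it against the raw ratings gives a large RMSE
ideal_implicit_prediction = preference
rmse = np.sqrt(np.mean((ratings - ideal_implicit_prediction) ** 2))

print("preference:", preference)
print("confidence:", confidence)
print("RMSE against raw ratings: {:.2f}".format(rmse))
```

The RMSE here measures a scale mismatch, not model quality. Ranking metrics are usually a better fit for evaluating implicit-feedback models.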


## QUESTION 5

Use model persistence to save your finalized model

In [11]:
## YOUR CODE HERE

## re-train using the whole data set
print("...training")
als = ALS(maxIter=5, regParam=0.1, userCol="user_id", 
          itemCol="movie_id", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(df)

## save the model for future use
save_dir = "saved-recommender"
if os.path.isdir(save_dir):
    print("...overwriting saved model")
    shutil.rmtree(save_dir)

## save the top-rated movies (at most the first 10,000 rows)
print("...saving top-movies")
top_movies[:10000].to_csv("top-movies.csv",index=False)
    
## save model
model.save(save_dir)
print("done.")

...training
...overwriting saved model
...saving top-movies
done.


## QUESTION 6

Use ``spark-submit`` to run a script that loads the saved model and demonstrates that you can interface with it.

In [12]:
## YOUR CODE HERE

## see recommender-submit.py
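The contents of `recommender-submit.py` are not shown here, so the following is only a hypothetical sketch of what such a script might look like. The command-line flags, the default values, and the choice of `recommendForUserSubset` as the way to query the model are assumptions; the model directory and the `user_id` column name follow the cells above.

```python
"""Hypothetical sketch of a script such as recommender-submit.py."""
import argparse
import os


def parse_args(argv=None):
    """Parse the (assumed) command-line flags."""
    parser = argparse.ArgumentParser(description="Query a saved ALS model")
    parser.add_argument("--model-dir", default="saved-recommender")
    parser.add_argument("--user-id", type=int, default=1)
    parser.add_argument("--num-items", type=int, default=5)
    return parser.parse_args(argv)


def main(argv=None):
    args = parse_args(argv)
    if not os.path.isdir(args.model_dir):
        print("ERROR make sure the path to the saved model is correct")
        return 1

    # import Spark lazily so the path check above runs without a Spark install
    from pyspark.sql import SparkSession
    from pyspark.ml.recommendation import ALSModel

    spark = SparkSession.builder.appName("recommender-submit").getOrCreate()
    model = ALSModel.load(args.model_dir)

    # the column name must match the userCol used when the model was trained
    users = spark.createDataFrame([(args.user_id,)], ["user_id"])
    recs = model.recommendForUserSubset(users, args.num_items)
    recs.show(truncate=False)

    spark.stop()
    return 0


if __name__ == "__main__":
    main()
```

Under these assumptions the script would be run with something like `spark-submit recommender-submit.py --user-id 1 --num-items 5`, and it would print the top recommended `movie_id` values with their predicted ratings for that user.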