## CASE STUDY - Deploying a recommender

For this lab we will be using the MovieLens data:

* [MovieLens Downloads](https://grouplens.org/datasets/movielens/latest/)

The two important pages for documentation are below.

* [Spark MLlib collaborative filtering docs](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html) 
* [Spark ALS docs](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.ALS)


In [1]:
import os
import shutil
import pandas as pd
import numpy as np
import pyspark as ps
from pyspark.ml import Pipeline
from pyspark.sql import Row
from pyspark.sql.types import DoubleType

DATA_DIR = os.path.join("/Users/mayayozhikova/Downloads/Model_Deployment-case-study", "data")
SAVE_DIR = os.path.join("/Users/mayayozhikova/Downloads/Model_Deployment-case-study", "saved-recommender")

if os.path.isdir(SAVE_DIR):
    shutil.rmtree(SAVE_DIR)

In [2]:
## ensure the spark context is available
spark = (ps.sql.SparkSession.builder
        .appName("sandbox")
        .getOrCreate()
        )

sc = spark.sparkContext
print(spark.version) 

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/13 15:39:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/11/13 15:39:08 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
3.3.1


### Ensure the data are downloaded, unzipped, and placed in the data folder of this lab.


In [3]:
movielens_data_dir = os.path.join(DATA_DIR, "ml-latest-small")        
if not os.path.exists(movielens_data_dir):
    print("ERROR: make sure the path to the MovieLens data is correct")

In [4]:
## load the ratings data as a pyspark dataframe
ratings_file = os.path.join(movielens_data_dir, "ratings.csv")
df = spark.read.format("csv").options(header="true", inferSchema="true").load(ratings_file)
df = df.withColumnRenamed("movieID", "movie_id")
df = df.withColumnRenamed("userID", "user_id")
df.show(n=4)

+-------+--------+------+---------+
|user_id|movie_id|rating|timestamp|
+-------+--------+------+---------+
|      1|       1|   4.0|964982703|
|      1|       3|   4.0|964981247|
|      1|       6|   4.0|964982224|
|      1|      47|   5.0|964983815|
+-------+--------+------+---------+
only showing top 4 rows



In [5]:
## load the movies data as a pyspark dataframe
movies_file = os.path.join(movielens_data_dir, "movies.csv") 
movies_df = spark.read.format("csv").options(header="true", inferSchema="true").load(movies_file)
movies_df = movies_df.withColumnRenamed("movieID", "movie_id")
movies_df.show(n=4)


+--------+--------------------+--------------------+
|movie_id|               title|              genres|
+--------+--------------------+--------------------+
|       1|    Toy Story (1995)|Adventure|Animati...|
|       2|      Jumanji (1995)|Adventure|Childre...|
|       3|Grumpier Old Men ...|      Comedy|Romance|
|       4|Waiting to Exhale...|Comedy|Drama|Romance|
+--------+--------------------+--------------------+
only showing top 4 rows




Explore the MovieLens data a little and summarize it.

In [15]:
df_p = df.toPandas()
movies_df_p = movies_df.toPandas()
df_p.describe()

print(f"Unique users:{df_p['user_id'].nunique()}")
print(f"Unique movies:{movies_df_p.movie_id.nunique()}")
df_p['rating_mean'] = df_p.groupby('movie_id').apply(lambda x: x['rating'].mean())
print(f"Movies with rating > 2:{df_p[df_p['rating_mean'] > 2].movie_id.nunique()}")
print(f"Movies with rating > 3:{df_p[df_p['rating_mean'] > 3].movie_id.nunique()}")
print(f"Movies with rating > 4:{df_p[df_p['rating_mean'] > 4].movie_id.nunique()}")

Unique users:610
Unique movies:9742
Movies with rating > 2:2983
Movies with rating > 3:2442
Movies with rating > 4:776
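One caveat on the per-movie mean in the cell above: assigning the result of `groupby('movie_id').apply(...)` to a column aligns on index labels, so the movie-indexed means are matched against row numbers rather than against each row's `movie_id`, and the threshold counts should be read with that in mind. A minimal sketch of an alternative, on invented toy ratings (not MovieLens values), uses `transform('mean')`, which returns one value per original row. Counting movies above a threshold is then simplest on the per-movie Series itself.

```python
import pandas as pd

# Toy ratings, invented for illustration only
toy = pd.DataFrame({
    "user_id":  [1, 1, 2, 2, 3],
    "movie_id": [10, 20, 10, 30, 20],
    "rating":   [4.0, 2.0, 5.0, 3.0, 1.0],
})

# transform('mean') broadcasts each movie's mean back onto its own rows
toy["rating_mean"] = toy.groupby("movie_id")["rating"].transform("mean")

# One mean per movie; thresholds can be counted directly on this Series
movie_means = toy.groupby("movie_id")["rating"].mean()
n_above_2 = int((movie_means > 2).sum())

print(toy)
print(f"Movies with mean rating > 2: {n_above_2}")
```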


In [11]:
df.describe()

DataFrame[summary: string, user_id: string, movie_id: string, rating: string, timestamp: string]


### Find the ten most popular movies. 


1. Create two pyspark dataframes: one with the number of ratings for each movie in df and one with the average rating of each movie in df.
2. Join these two dataframes into a third dataframe. Then filter this dataframe to keep only the movies that have been seen more than 100 times.
3. Use the movies_df dataframe to add the title of each movie to the dataframe created in step 2. Then order the dataframe by descending average rating.
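The three steps above can be sketched as follows. This version uses pandas on invented toy data so that it runs without a Spark session; with pyspark dataframes the same shape would come from `groupBy(...).agg(...)`, `join`, `filter`, and `orderBy`. The names `ratings`, `movies`, and `MIN_COUNT` are assumptions made for the example, and the threshold is lowered from 100 to suit the toy data.

```python
import pandas as pd

# Toy data, invented for illustration (not MovieLens values)
ratings = pd.DataFrame({
    "user_id":  [1, 2, 3, 1, 2, 3],
    "movie_id": [1, 1, 1, 2, 2, 3],
    "rating":   [4.0, 5.0, 3.0, 2.0, 3.0, 5.0],
})
movies = pd.DataFrame({
    "movie_id": [1, 2, 3],
    "title": ["Movie A", "Movie B", "Movie C"],
})

MIN_COUNT = 1  # the exercise uses 100; lowered to suit the toy data

# Step 1: one frame with the rating count, one with the average rating
counts = ratings.groupby("movie_id", as_index=False).agg(count=("rating", "count"))
means = ratings.groupby("movie_id", as_index=False).agg(avg_rating=("rating", "mean"))

# Step 2: join the two frames and keep movies seen more than MIN_COUNT times
popular = counts.merge(means, on="movie_id", how="inner")
popular = popular[popular["count"] > MIN_COUNT]

# Step 3: attach titles and order by descending average rating
popular = (popular.merge(movies, on="movie_id", how="inner")
                  .sort_values("avg_rating", ascending=False)
                  .reset_index(drop=True))

print(popular.head(10))
```

Movie C has the highest average in the toy data but is filtered out because it has a single rating, which is the point of the count threshold.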



In [32]:

## 1_ count of ratings and average rating per movie
movie_counts = df_p.groupby('movie_id', as_index=False).agg(count=('user_id', 'count'))
top_rated = df_p.groupby('movie_id', as_index=False).agg(**{'avg(rating)': ('rating', 'mean')})

## 2_ join the two frames and keep movies rated more than 100 times
top_movies = pd.merge(movie_counts, top_rated, how='inner', on='movie_id')
top_movies = top_movies[top_movies['count'] > 100]

## 3_ add the titles, then convert back to a pyspark dataframe
top_movies = pd.merge(top_movies, movies_df_p, how='inner', on='movie_id')
top_movies = spark.createDataFrame(top_movies)

top_movies.show(10)


+--------+-----+------------------+--------------------+--------------------+
|movie_id|count|       avg(rating)|               title|              genres|
+--------+-----+------------------+--------------------+--------------------+
|       1|  215|3.9209302325581397|    Toy Story (1995)|Adventure|Animati...|
|       2|  110|3.4318181818181817|      Jumanji (1995)|Adventure|Childre...|
|       6|  102| 3.946078431372549|         Heat (1995)|Action|Crime|Thri...|
|      10|  132| 3.496212121212121|    GoldenEye (1995)|Action|Adventure|...|
|      32|  177| 3.983050847457627|Twelve Monkeys (a...|Mystery|Sci-Fi|Th...|
|      34|  128|        3.65234375|         Babe (1995)|      Children|Drama|
|      39|  104| 3.293269230769231|     Clueless (1995)|      Comedy|Romance|
|      47|  203|3.9753694581280787|Seven (a.k.a. Se7...|    Mystery|Thriller|
|      50|  204| 4.237745098039215|Usual Suspects, T...|Crime|Mystery|Thr...|
|     110|  237| 4.031645569620253|   Braveheart (1995)|    Acti

                                                                                

## ALS model

We will now fit an ALS (alternating least squares) model, a matrix factorization model used for rating recommendation. See the [Spark ALS docs](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.ALS)
for example usage.
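To make the idea concrete, here is a minimal NumPy sketch of alternating least squares on a tiny invented rating matrix. It is not Spark's implementation: the function names (`als_explicit`, `observed_rmse`), the toy matrix, and the plain ridge penalty are assumptions made for illustration. It alternates between solving a small regularized least-squares problem for the user factors with the item factors fixed, and the reverse, using only the observed entries.

```python
import numpy as np

def als_explicit(R, mask, rank=2, reg=0.1, n_iters=20, seed=0):
    """Toy ALS: factor R ~ U @ V.T using only entries where mask is True."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    penalty = reg * np.eye(rank)
    for _ in range(n_iters):
        # Fix V, solve a ridge regression for each user's factors
        for u in range(n_users):
            obs = mask[u]
            if obs.any():
                Vo = V[obs]
                U[u] = np.linalg.solve(Vo.T @ Vo + penalty, Vo.T @ R[u, obs])
        # Fix U, solve a ridge regression for each item's factors
        for i in range(n_items):
            obs = mask[:, i]
            if obs.any():
                Uo = U[obs]
                V[i] = np.linalg.solve(Uo.T @ Uo + penalty, Uo.T @ R[obs, i])
    return U, V

def observed_rmse(R, mask, U, V):
    """RMSE between R and U @ V.T over the observed entries only."""
    pred = U @ V.T
    return float(np.sqrt(np.mean((R[mask] - pred[mask]) ** 2)))

# Invented 4 users x 4 movies matrix; zeros mark missing ratings
R = np.array([[5, 4, 0, 1],
              [4, 0, 1, 1],
              [1, 1, 0, 5],
              [0, 1, 5, 4]], dtype=float)
mask = R > 0

U0, V0 = als_explicit(R, mask, n_iters=0)   # random initial factors only
U, V = als_explicit(R, mask, n_iters=20)    # factors after alternating updates
rmse_before = observed_rmse(R, mask, U0, V0)
rmse_after = observed_rmse(R, mask, U, V)

print(f"RMSE before: {rmse_before:.3f}, after: {rmse_after:.3f}")
```

A predicted rating is the dot product of a user's row of `U` and a movie's row of `V`, which is also how missing entries get filled in.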

First, we split the data into training and test sets.

In [33]:
(training, test) = df.randomSplit([0.8, 0.2])

Create a function called **train_model()** that takes two inputs:

1. ``reg_param``: the regularization parameter of the factorization model
2. ``implicit_prefs``: a boolean that indicates whether the model should use explicit or implicit ratings

The function trains an ALS model on the training set, predicts ratings for the test set, and evaluates those predictions.
The output of the function should be the RMSE of the fitted model on the test set.

In [43]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS


def train_model(reg_param, implicit_prefs=False):
    """
    Train and evaluate an ALS model.
    Inputs : the regularization parameter of the ALS model and the implicit_prefs flag
    Output : the fitted model; the test-set RMSE is printed with the regularization parameter used
    """
    
    als = ALS(regParam = reg_param, implicitPrefs = implicit_prefs, userCol = 'user_id', 
               itemCol = 'movie_id', ratingCol = 'rating', coldStartStrategy = 'drop')
    model = als.fit(training)

    predictions = model.transform(test)
    evaluator = RegressionEvaluator(metricName = 'rmse', labelCol= 'rating', predictionCol = 'prediction')

    rmse = evaluator.evaluate(predictions)
    print("regParam={}, RMSE={}".format(reg_param, np.round(rmse, 2)))
    return model
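
The `coldStartStrategy = 'drop'` setting in the cell above matters for the metric: users or movies that appear only in the test split get NaN predictions, and a single NaN would make the RMSE itself NaN. The NumPy sketch below illustrates the effect on invented values; `rmse_dropping_nan` is a hypothetical helper written for this example, not part of Spark.

```python
import numpy as np

def rmse_dropping_nan(actual, predicted):
    """RMSE over the rows whose prediction is not NaN."""
    actual = np.asarray(actual, dtype=float)
    predicted = np.asarray(predicted, dtype=float)
    keep = ~np.isnan(predicted)
    return float(np.sqrt(np.mean((actual[keep] - predicted[keep]) ** 2)))

# Invented values; the NaN stands for a user or movie unseen during training
actual = np.array([4.0, 3.0, 5.0, 2.0])
predicted = np.array([3.0, np.nan, 4.0, 4.0])

naive = float(np.sqrt(np.mean((actual - predicted) ** 2)))  # NaN propagates
score = rmse_dropping_nan(actual, predicted)

print(naive, round(score, 3))
```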


Call the function created above for several ``reg_param`` values to find the best regularization parameter.

In [41]:
for reg_param in [0.01, 0.05, 0.1, 0.15, 0.25]:
    train_model(reg_param)

                                                                                

regParam=0.01, RMSE=1.12


                                                                                

regParam=0.05, RMSE=0.95


                                                                                

regParam=0.1, RMSE=0.89


                                                                                

regParam=0.15, RMSE=0.88


                                                                                

regParam=0.25, RMSE=0.9
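
The best value can also be picked programmatically. The sketch below copies the RMSE values printed above into a dictionary by hand; to automate this, `train_model()` would need to return the RMSE as well, since it currently returns only the model. The name `rmse_by_reg_param` is an assumption made for the example.

```python
# RMSE per regParam, copied from the printed output of the loop above
rmse_by_reg_param = {0.01: 1.12, 0.05: 0.95, 0.1: 0.89, 0.15: 0.88, 0.25: 0.9}

# Pick the regParam whose RMSE is smallest
best_reg_param = min(rmse_by_reg_param, key=rmse_by_reg_param.get)

print(f"best regParam={best_reg_param}, RMSE={rmse_by_reg_param[best_reg_param]}")
```

Because `randomSplit` is called without a seed, the exact RMSE values, and possibly the winner among close candidates such as 0.1 and 0.15, can change from run to run.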



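With the best `regParam`, try using the `implicitPrefs` flag.

>Note that the results here make sense because the data are `explicit` ratings: with `implicitPrefs=True`, ALS treats the values as confidence in a preference rather than as ratings to reproduce, so a much higher RMSE against the actual ratings is expected.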

In [44]:
train_model(reg_param = 0.15, implicit_prefs=True)

                                                                                

regParam=0.15, RMSE=3.23


ALSModel: uid=ALS_7ce0acfa248f, rank=10

In [45]:
model = train_model(reg_param = 0.15,implicit_prefs=False)

                                                                                

regParam=0.15, RMSE=0.88


Recommendation examples

In [46]:
user_recs = model.recommendForAllUsers(10)
user_recs.show(4)



+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|      1|[{170355, 5.76992...|
|      3|[{70946, 4.874582...|
|      5|[{132333, 4.67290...|
|      6|[{3153, 5.0950294...|
+-------+--------------------+
only showing top 4 rows



                                                                                

In [47]:
movie_recs = model.recommendForAllItems(10)
movie_recs.show(4)



+--------+--------------------+
|movie_id|     recommendations|
+--------+--------------------+
|      26|[{43, 4.9194374},...|
|      27|[{543, 5.331378},...|
|      28|[{53, 5.536298}, ...|
|      31|[{53, 4.579166}, ...|
+--------+--------------------+
only showing top 4 rows



                                                                                

Retrain the model with the best ``reg_param`` and ``implicit_prefs`` on the entire dataset and save the trained model in the SAVE_DIR directory.

In [48]:
## re-train using the whole data set
print("...training")
als = ALS(regParam = 0.15, implicitPrefs = False, userCol = 'user_id', 
          itemCol = 'movie_id', ratingCol = 'rating', coldStartStrategy = 'drop')
model = als.fit(df)
 
## save model
print("...saving als model")
model.save(SAVE_DIR)  
print("done.")

...training
...saving als model



22/11/13 17:09:12 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
22/11/13 17:09:12 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
22/11/13 17:09:12 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers


                                                                                

done.


We now want to use ``spark-submit`` to load the saved model and demonstrate that we can interface with it.

Following best practices, we created a Python script (``recommender-submit.py``) in the **scripts** folder that loads the model, creates some hand-crafted data points, and queries the model.

In [50]:
! python /Users/mayayozhikova/Downloads/Model_Deployment-case-study/scripts/recommender-submit.py

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/11/13 17:15:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/11/13 17:15:31 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/11/13 17:15:31 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
best rated [(260,), (2628,), (1196,), (122886,), (187595,), (179819,), (1210,)]
closest_users                                                                   
 [(53,), (543,), (276,), (452,), (99,), (93,), (97,), (475,), (515,), (30,), (1,), (523,), (389,), (12,), (106,), (25,), (122,), (154,), (267,), (371,), (169,), (340,), (52,), (69,), (246,), (80,), (573,), (450,), (574,), (519,), (435,), (491,), (336,), (31,), (171,), (527,), (348,), (224,), (327,), (486,), (413,), (186,), (46,), (585,), (45,), (337,), (555,), (459,