## BUILDING A MOVIE RECOMMENDATION ENGINE WITH SPARK MLlib

#### Launch Your Spark Application & Session

In [None]:
sc

<hr style="border:1px solid blue"></hr>

### MEET THE MOVIELENS DATASET

- Take some time to get acquainted with the dataset:   
  https://grouplens.org/datasets/movielens/  
  http://files.grouplens.org/datasets/movielens/ml-latest-README.html  
    
    
- Understand your dataset before writing dataframe queries.  
  The dataset description contains valuable information that can save you the time of figuring things out for yourself.

### LOADING DATA

#### Download Dataset & Load into HDFS

```
sudo su 
cd 
mkdir mov
cd mov

wget http://files.grouplens.org/datasets/movielens/ml-latest.zip
unzip ml-latest.zip
rm -f ml-latest.zip

usermod -a -G hadoop root
hdfs dfs -mkdir /user/root/data/
hdfs dfs -mkdir /user/root/data/MOV/
hdfs dfs -mkdir /user/root/data/MOV/CSV

hdfs dfs -put ml-latest/* /user/root/data/MOV/CSV
```

- Check that your files are loaded correctly in HDFS 
```
hdfs dfs -ls /user/root/data/MOV/CSV
```

- You should have the following output :

```
[root@ip-172-31-17-80 mov]# hdfs dfs -ls /user/root/data/MOV/CSV
Found 7 items
-rw-r--r--   1 root hadoop       9784 2020-11-26 23:57 /user/root/data/MOV/CSV/README.txt
-rw-r--r--   1 root hadoop  414851573 2020-11-26 23:57 /user/root/data/MOV/CSV/genome-scores.csv
-rw-r--r--   1 root hadoop      18103 2020-11-26 23:57 /user/root/data/MOV/CSV/genome-tags.csv
-rw-r--r--   1 root hadoop    1267039 2020-11-26 23:57 /user/root/data/MOV/CSV/links.csv
-rw-r--r--   1 root hadoop    2858223 2020-11-26 23:57 /user/root/data/MOV/CSV/movies.csv
-rw-r--r--   1 root hadoop  759200511 2020-11-26 23:57 /user/root/data/MOV/CSV/ratings.csv
-rw-r--r--   1 root hadoop   39744990 2020-11-26 23:57 /user/root/data/MOV/CSV/tags.csv
```

### INSTALL PANDAS

- From the SSH Terminal 

````
pip install pandas
````

Repeat this step for any other Python library that is missing.

#### Perform a few useful imports


In [None]:
from pyspark.sql.types import *
import pandas as pd
pd.options.display.max_columns = None
from pyspark.sql.functions import *

#### Define schema for Movies Dataset


In [None]:
moviesStruct = [StructField("movieId", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("genres", StringType(), True)]

moviesSchema = StructType(moviesStruct)

#### Read Movies dataframe from HDFS

In [None]:
# Read movies from HDFS as CSV (FIRST TIME ONLY)

moviesDF = spark.read.format("csv") \
    .option("header", "true") \
    .option("delimiter", ",") \
    .schema(moviesSchema) \
    .load("hdfs:///user/root/data/MOV/CSV/movies.csv")
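
Titles in `movies.csv` can contain commas, in which case the field is wrapped in double quotes. The sketch below uses plain Python (no Spark required) to show how a quote-aware CSV parser maps such a row onto the three schema fields. The sample rows are illustrative and only mimic the layout described in the MovieLens README.

```python
import csv
import io

# Illustrative rows in the movies.csv layout (movieId,title,genres).
sample = io.StringIO(
    'movieId,title,genres\n'
    '1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy\n'
    '11,"American President, The (1995)",Comedy|Drama|Romance\n'
)

reader = csv.DictReader(sample)
rows = [
    # Cast movieId to int, mirroring IntegerType in moviesSchema.
    {"movieId": int(r["movieId"]), "title": r["title"], "genres": r["genres"]}
    for r in reader
]

for row in rows:
    print(row)
```

The quoted title stays in one piece even though it contains a comma. Spark's CSV reader treats the double quote as its default quote character, so the cell above should behave the same way on such rows.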

#### Write Movies Dataframe to Parquet File

In [None]:
moviesDF.write.parquet("hdfs:///user/root/data/MOV/PARQUET/movies.parquet")

#### Reload Movies Dataframe From Parquet File 

In [None]:
# Every time after the first export to Parquet (for example, after restarting your kernel/notebook/session)

moviesDF = spark.read.parquet("hdfs:///user/root/data/MOV/PARQUET/movies.parquet")

#### Make Use of Caching Features

In [None]:
# Caching can help a lot, for cross-validation among other things
# See the caching comments for ratingsDF below

moviesDF.cache()

#### Define schema for Ratings Dataset

In [None]:
# Define schema for ratings dataset
ratingsStruct = [StructField("userId", IntegerType(), True),
    StructField("movieId", IntegerType(), True),
    StructField("rating", DoubleType(), True),
    StructField("timestamp", IntegerType(), True)]

ratingsSchema = StructType(ratingsStruct)

#### Read Ratings Dataframe from HDFS

In [None]:
# Read ratings from HDFS (FIRST TIME ONLY)
ratingsDF = spark.read.format("csv") \
    .option("header", "true") \
    .option("delimiter", ",") \
    .schema(ratingsSchema) \
    .load("hdfs:///user/root/data/MOV/CSV/ratings.csv")

ratingsDF.limit(10).toPandas()

#### Write Ratings Dataframe to Parquet File

In [None]:
ratingsDF.write.parquet("hdfs:///user/root/data/MOV/PARQUET/ratings.parquet")

#### Reload Ratings Dataframe From Parquet File 

In [None]:
# Load ratings from the Parquet file (every time after the first export to Parquet)

ratingsDF = spark.read.parquet("hdfs:///user/root/data/MOV/PARQUET/ratings.parquet").drop("timestamp")

#### Make Use of Caching Features

In [None]:
# Caching can help a lot, especially for cross-validation, among other things
# It is recommended when data is re-used in iterative machine learning applications
# Check the size of your data on disk and the total memory available to Spark
# to see how much of your data fits into memory
# Partitions that do not fit in memory are either spilled to disk or recomputed on the fly
# each time they are needed, depending on the storage level

ratingsDF.cache()

<hr style="border:1px solid blue"></hr>

### Data Exploration & Filtering

#### Select Users Dataframe

In [None]:
# Unique Users Id :
usersDF = ratingsDF.select("userId").distinct()

In [None]:
# Total User Count

usersDF.count()

In [None]:
usersDF.columns

#### Group Rating Count by Users

In [None]:
ratingsDF.groupBy("userId").count().show(10, False)

In [None]:
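# Summary
# Example output format: Got 1000209 ratings from 6040 users on 3883 movies.
# (these figures come from the smaller MovieLens 1M dataset; expect much larger numbers with ml-latest)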
print("Got {} ratings from {} users on {} movies.".format(ratingsDF.count(), usersDF.count(), moviesDF.count()))

In [None]:
ratingsDF.columns

#### Select Max User ID 
The maximum ID is useful when adding a new user: the next free ID is the maximum plus one.

In [None]:
ratingsDF.agg({"userId": "max"}).collect()[0][0]
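
As a minimal sketch of how the maximum user ID might be used, assuming that "user incrementation" means giving a new user the next free ID: the snippet below works on plain Python lists, and the IDs and ratings in it are hypothetical.

```python
# Hypothetical existing user IDs and a new user's ratings as (movieId, rating).
existing_user_ids = [1, 2, 283228]
new_user_ratings = [(1, 4.0), (318, 5.0)]

# Next free ID: one more than the current maximum.
new_user_id = max(existing_user_ids) + 1

# Rows in the (userId, movieId, rating) layout of ratingsDF.
new_rows = [(new_user_id, movie_id, rating) for movie_id, rating in new_user_ratings]
print(new_rows)  # [(283229, 1, 4.0), (283229, 318, 5.0)]
```

Rows built this way could then be turned into a DataFrame and appended to the ratings before training, so that the model learns factors for the new user.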

In [None]:
ratingsDF.schema

#### Create Ratings View

In [None]:
# createOrReplaceTempView lets you re-run this cell without a "view already exists" error
ratingsDF.createOrReplaceTempView("RATING")

#### Use SQL to select Ratings for a specific User

In [None]:
%%sql
select * from RATING where userId = 283228

In [None]:
%%sql
select count(*) from RATING where userId = 283228

#### Create a Sample Dataframe 
(example)

In [None]:
sampleDF = moviesDF.sample(fraction=0.001)

In [None]:
sampleDF.count()

<hr style="border:1px solid blue"></hr>

### Exploring Movies Dataset 

Genres are a pipe-separated list,   
and are selected from the following:

Action  
Adventure  
Animation  
Children's  
Comedy  
Crime  
Documentary  
Drama  
Fantasy  
Film-Noir  
Horror  
Musical  
Mystery  
Romance  
Sci-Fi  
Thriller  
War  
Western  
(no genres listed)  
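
Because genres are stored as one pipe-separated string, there are two ways to test for a genre: a substring test on the whole string, or an exact match on the split list. The sketch below illustrates the difference in plain Python; the helper names `split_genres` and `has_genre` are made up for this example.

```python
def split_genres(genres):
    """Split a pipe-separated genres string into a list of genre names."""
    return genres.split("|")


def has_genre(genres, wanted):
    """Exact-match test: True only if `wanted` is one of the listed genres."""
    return wanted in split_genres(genres)


example = "Comedy|Drama|Romance"
print(split_genres(example))          # ['Comedy', 'Drama', 'Romance']
print(has_genre(example, "Comedy"))   # True
print(has_genre(example, "Com"))      # False
print("Com" in example)               # True: a substring test is looser
```

The queries in this notebook use the substring approach (`genres.contains(...)`). An exact-match variant in Spark could plausibly be built from `split` and `array_contains` in `pyspark.sql.functions`, keeping in mind that `split` takes a regular expression, so the pipe has to be escaped.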

#### Select Comedy Movies Only


In [None]:
moviesDF.filter(moviesDF.genres.contains('Comedy')).limit(10).toPandas()

#### Display the TOP rated Comedy Movies (not aggregated)


In [None]:
moviesDF.filter(moviesDF.genres.contains('Comedy')) \
    .join(ratingsDF, "movieId") \
    .sort(col("rating").desc()) \
    .show(20, truncate=False)

#### Count of Comedy Movies Grouped By Rating 

In [None]:
moviesDF.filter(moviesDF.genres.contains('Comedy')) \
    .join(ratingsDF, "movieId") \
    .groupBy(col("rating")).count().orderBy("rating") \
    .show(20, truncate=False)

#### MOST Rated Comedy Movies - No matter the rating

In [None]:
moviesDF.filter(moviesDF.genres.contains('Comedy')) \
    .join(ratingsDF, "movieId") \
    .groupBy(col("movieId")).count().orderBy("count", ascending=False) \
    .show(20, truncate=False)

#### Most Rated Comedy Movies, grouped by Movie & Rating


In [None]:
moviesDF.filter(moviesDF.genres.contains('Comedy')) \
    .join(ratingsDF, "movieId") \
    .groupBy(col("movieId"),col("title"), col("rating")).count().orderBy("count", ascending=False) \
    .show(20, truncate=False)

In [None]:
from pyspark.sql.functions import *

#### Top Rated Comedy Movies with most ratings

In [None]:
genreMovieDF = moviesDF.filter(moviesDF.genres.contains("insert_your_preferred_genre_here"))

# Number of ratings per movie for the chosen genre
moviesByRating_counts = genreMovieDF \
        .join(ratingsDF, "movieId") \
        .groupBy("movieId").count().orderBy(desc("count"))

# Attach titles and genres back to the counts
moviesByRating_Full = genreMovieDF.join(moviesByRating_counts, "movieId") \
                                  .dropDuplicates().orderBy(desc("count"))

moviesByRating_Full.show(20)

In [None]:
moviesByRating_Full.count()

In [None]:
moviesByRating_Full.coalesce(1) \
      .write \
      .option("header","true") \
      .option("sep",",") \
      .mode("overwrite") \
      .csv("file:///path/output/file")

#### Get User Input

In [None]:
# Unfortunately, this cell does not work in Jupyter
# because of PySpark's limited integration with Python libraries.
# It needs py4j integration and a call to Scala/Java.
# The code is provided as a standalone Python program in the demo.


for row in sampleDF.rdd.collect():
    print("Please rate the following movie (1-5 (best), or 0 if not seen):\n" + row.title + ":")
    rate = int(input())

#### Other Request Examples

In [None]:
joinDF = moviesDF.join(ratingsDF, "movieId")
joinDF.show(10, False)

In [None]:
ratingsDF.distinct().groupBy("rating").count().show()

<hr style="border:1px solid blue"></hr>

### MODELING PART

### Model Training

### Splitting data into training & test sets

In [None]:

trainingDF,testDF = ratingsDF.randomSplit([0.8, 0.2], seed=12345)

# Optional: print the size of each split (this triggers a full pass over the data)
#print("Training {}, test {}.".format(trainingDF.count(), testDF.count()))

### Timing Utility

In [None]:
from time import time
from datetime import timedelta

class T():
    def __enter__(self):
        self.start = time()
    def __exit__(self, type, value, traceback):
        self.end = time()
        elapsed = self.end - self.start
        print(str(timedelta(seconds=elapsed)))

In [None]:
trainingDF.cache()

In [None]:
testDF.cache()

In [None]:
trainingDF.select("userId").distinct().count()

In [None]:
testDF.select("userId").distinct().count()

At this point, it is worth noting that some users in the test dataset  
may have no occurrence in the training dataset, as the distinct user counts above can suggest.
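
The sketch below shows, on tiny hypothetical data, how to find users that appear only in the test split. It uses plain Python sets in place of DataFrames, so the row values are made up.

```python
# Hypothetical (userId, movieId, rating) rows standing in for the two splits.
training_rows = [(1, 10, 4.0), (1, 20, 3.5), (2, 10, 5.0)]
test_rows = [(1, 30, 2.0), (3, 10, 4.5)]

training_users = {user for user, _, _ in training_rows}
test_users = {user for user, _, _ in test_rows}

# Users the model never saw during training: ALS has no factors for them.
cold_start_users = test_users - training_users
print(sorted(cold_start_users))  # [3]
```

On the real DataFrames, a similar check could be written as a `left_anti` join between the distinct `userId` columns of `testDF` and `trainingDF`. This is the situation that `coldStartStrategy="drop"` in the ALS cell addresses: rows for which the model cannot produce a prediction are dropped from the prediction output instead of carrying NaN values into the RMSE.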

### Training ALS model on the data


In [None]:

from pyspark.ml.recommendation import ALS

als = ALS(maxIter=5,
          regParam=0.01, 
          implicitPrefs=False, 
          userCol="userId", 
          itemCol="movieId", 
          ratingCol="rating", 
          coldStartStrategy="drop")

In [None]:
# Fit the model (just a test; skip it if you still need to cross-validate)
# Note: the evaluation cells further down use this `model` variable

with T():
    model = als.fit(trainingDF)

<hr style="border:1px solid blue"></hr>

### Cross Validation & Model Selection

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# WARNING: THE COST CAN ESCALATE VERY QUICKLY, EVEN WITH ONLY TWO MODELS TO TEST.
# UNLESS YOU CAN NARROW THE GRID AND LOWER THE ITERATIONS CONSIDERABLY,
# YOU HAD BETTER AVOID THIS STEP ALTOGETHER.

# NOTE: A SIMPLE LOOP THAT MINIMIZES RMSE CAN ALSO DO THE JOB.

#param_grid = ParamGridBuilder() \
#            .addGrid(als.rank, [10, 50, 100, 150]) \
#            .addGrid(als.regParam, [.01, .05, .1, .15]) \
#            .build()

#param_grid = ParamGridBuilder() \
#            .addGrid(als.rank, [50, 100]) \
#            .addGrid(als.regParam, [.05, .1]) \
#            .build()

param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [50]) \
            .addGrid(als.regParam, [.05]) \
            .build()

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Define an RMSE evaluator and print the number of models in the grid
evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="rating", 
           predictionCol="prediction") 

print("Num models to be tested: ", len(param_grid))

In [None]:
cv = CrossValidator(estimator=als,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=5,
                    parallelism=4) # this last parallelism param is crucial for increasing performance
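
To make the cost warning concrete, here is a rough count of training runs for the grids shown in the parameter grid cell, assuming 5 folds. It is a back-of-the-envelope sketch in plain Python; the helper `count_fits` is made up for this example, and it counts one fit per parameter combination per fold plus one final refit of the best combination on the full training set.

```python
from itertools import product


def count_fits(grid, num_folds):
    """Return (combinations, total training runs) for a k-fold grid search."""
    combinations = len(list(product(*grid.values())))
    # One fit per combination per fold, plus one final refit of the best model.
    return combinations, combinations * num_folds + 1


full_grid = {"rank": [10, 50, 100, 150], "regParam": [0.01, 0.05, 0.1, 0.15]}
medium_grid = {"rank": [50, 100], "regParam": [0.05, 0.1]}
small_grid = {"rank": [50], "regParam": [0.05]}

print(count_fits(full_grid, 5))    # (16, 81)
print(count_fits(medium_grid, 5))  # (4, 21)
print(count_fits(small_grid, 5))   # (1, 6)
```

Even the single-combination grid trains ALS six times, which is why the larger grids are left commented out.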

### Fit The Cross Validator 
#### *(Optional - VERY Time Consuming)*

In [None]:

# WARNING: running this from a notebook connected to Spark through Livy
# can end in a timeout after a long wait.
# The job keeps running for a while, but it is then killed abruptly because of a timeout setting.
# To avoid this, extend livy.server.session.timeout to 1h in livy.conf on the server, and restart Livy
# livy.server.session.state-retain.sec = 600s => move to 3600s 
# location: /etc/livy/conf/livy.conf

# check spark web UI
# check generated DAGs
# check memory usage

cvModel = cv.fit(trainingDF)

<hr style="border:1px solid blue"></hr>

### Model Evaluation

In [None]:
# Evaluate the model by computing the RMSE of its rating predictions on the test data

predictions = model.transform(testDF)


In [None]:
predictions.columns

In [None]:
trainingDF.filter(trainingDF.userId == 12).show()

In [None]:
testDF.filter(testDF.userId == "12").show()

In [None]:
testDF.show(10, False)

In [None]:
testDF.groupBy("userId").count().show(10, False)

In [None]:
with T():
    predictions.show(10, False)

In [None]:
predictions.groupBy("userId").count().show(10, False)

### Evaluate using Regression Evaluator

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

# Lower values of RMSE indicate better fit

rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))
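
For reference, RMSE is the square root of the mean squared difference between actual and predicted ratings. The sketch below computes it by hand in plain Python on a few hypothetical (rating, prediction) pairs; it only illustrates the metric and is not how `RegressionEvaluator` is implemented.

```python
from math import sqrt


def rmse(pairs):
    """Root-mean-square error over (actual, predicted) rating pairs."""
    squared_errors = [(actual - predicted) ** 2 for actual, predicted in pairs]
    return sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical (rating, prediction) pairs.
pairs = [(4.0, 3.5), (3.0, 3.5), (5.0, 4.0), (2.0, 3.0)]
print(rmse(pairs))
```

Here the squared errors are 0.25, 0.25, 1.0 and 1.0, so the mean is 0.625 and the RMSE is about 0.79 rating points.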

<hr style="border:1px solid blue"></hr>

### Model Utilization

#### Generate recommendations for each user

In [None]:
# CAUTION: this takes a long time to compute on the full dataset
# Use subsets instead (see the next cell)

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)

# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

#### Generate recommendations for a subset of users

In [None]:
# Generate top 10 movie recommendations for a specified set of users
users = ratingsDF.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)

# Generate top 10 user recommendations for a specified set of movies
movies = ratingsDF.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)

In [None]:
userSubsetRecs.show(10, False)

In [None]:
userSubsetRecs.limit(10).toPandas()

<hr style="border:1px solid blue"></hr>

### Data Formatting

In [None]:
# Get model recommendations for users based on their ratings
# Note that the returned DataFrame has nested values:
# DataFrame[userId: int, recommendations: array<struct<movieId:int,rating:float>>]
# Note also that the predicted ratings can look erratic (for example, fall outside the rating scale)
# This is not strictly an issue, as the value is not used "directly", and it can be ignored
# For more info see: https://stackoverflow.com/q/29051520/426332

recoDF = userSubsetRecs.limit(10)
#.toPandas()

#### Data Formatting & Join for staging & presentation

In [None]:

# Denormalize the recommendation DataFrame: explode the inner list so that each entry becomes a selectable row,
# and drop the rating column, as we do not need it

recoDF.select(col("userId"),explode(col("recommendations"))) \
      .select(col("userId"),col("col.movieId")) \
      .join(moviesDF,"movieId") \
      .orderBy("userId") \
      .select("userId", "movieId", "title") \
      .toPandas()
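
The explode-then-join step can be pictured with ordinary Python structures. In the sketch below, each nested list of recommendations becomes one flat row per (user, movie), and the title is looked up from a dictionary standing in for `moviesDF`. All IDs, ratings and titles are hypothetical sample values.

```python
# Hypothetical rows shaped like the ALS output:
# userId -> list of (movieId, predicted rating) pairs.
recommendations = [
    {"userId": 12, "recommendations": [(318, 4.9), (858, 4.7)]},
    {"userId": 7, "recommendations": [(50, 4.8)]},
]
# Stand-in for moviesDF: movieId -> title.
titles = {
    318: "Shawshank Redemption, The (1994)",
    858: "Godfather, The (1972)",
    50: "Usual Suspects, The (1995)",
}

# "Explode": one output row per (user, recommended movie); the rating is dropped.
flat = [
    (row["userId"], movie_id, titles[movie_id])
    for row in recommendations
    for movie_id, _rating in row["recommendations"]
]

# Order by userId, as the Spark query does.
for user_id, movie_id, title in sorted(flat):
    print(user_id, movie_id, title)
```

Two input rows with three nested recommendations yield three flat rows, which is the shape produced by the Spark query above before `toPandas()`.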

In [None]:
movieSubSetRecs.show(10, False)

In [None]:
from pyspark.sql.functions import col

# Recommendations for a specific user
aUserId = 12
recommendations = userRecs.filter(col("userId") == aUserId)
recommendations.show(1, False)

In [None]:
from pyspark.sql.functions import explode

# Let's flatten the movie recommendations and look at them in detail
userRecommendations = recommendations.select(
  explode(col("recommendations").movieId).alias("movieId")
)

print("Recommendations for user {} :".format(aUserId))

moviesDF.join(userRecommendations, "movieId").show(10, False)

In [None]:
# Ratings from the user

ratingsDF.filter(col("userId") == aUserId) \
    .join(moviesDF, "movieId") \
    .sort(col("rating").desc()) \
    .show(100, truncate=False)

<hr style="border:1px solid blue"></hr>

### Log Model Experiments with MLFlow


- **Before You do Anything :**
#### Make SURE to read CAREFULLY the MLFlow README File first
#### Make SURE you have successfully installed MLFlow by following the instructions  
<br>

- **WARNING :** Missing Any Small Step might result in corrupting your environment.   

- **IN CASE OF DOUBT :** Call the Trainer in. 


In [None]:
import mlflow
import mlflow.spark

In [None]:
mlflow.set_tracking_uri("file:///mlruns")

In [None]:
mlflow.start_run()

In [None]:
mlflow.spark.log_model(model, "ALSmodel_Lite8")

In [None]:
print("Model saved in run %s" % mlflow.active_run().info.run_id)

In [None]:
mlflow.end_run()

**Run MLFlow UI to check your model**
```
mlflow ui --backend-store-uri /mlruns --host 0.0.0.0 &
```

Then go to http://YOUR.IP.ADD.RESS:5000

### Serve Model with MLFlow

```
# Console commands
# Serve & curl request EXAMPLES
# (Make SURE to read the MLFlow README file carefully first)

# Please understand the syntax and adapt it to your case
# Be careful when copy-pasting!

mlflow models serve -m /mlruns/0/6c8050941d0744b8ac3652ff22d40983/artifacts/ALSmodel_Lite2 -h 0.0.0.0 --port 9999 --no-conda


curl -X POST localhost:9999/invocations -H 'Content-Type: application/json; format=pandas-split' -d '{"columns":["userId","movieId","rating"], "data":[[1311,144210,5]]}'

curl --request POST http://localhost:9999/invocations --header 'Content-Type: application/json; format=pandas-split' --data @mlf_data.json

curl -X POST http://localhost:9999/invocations -H 'Content-Type: application/json; format=pandas-split' -d @mlf_data.json
```
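
The curl examples that read `@mlf_data.json` need a file in the same `pandas-split` shape as the inline example: a list of column names and a list of rows, each row holding exactly one value per column. The sketch below builds such a payload in plain Python. The helper `build_payload` and the sample values are made up for this example, and the exact envelope expected by the server may differ between MLflow versions, so check the documentation for the version you installed.

```python
import json


def build_payload(columns, rows):
    """Build a pandas-split style payload, checking that every row fits the columns."""
    for row in rows:
        if len(row) != len(columns):
            raise ValueError(
                "row {} has {} values, expected {}".format(row, len(row), len(columns))
            )
    return {"columns": list(columns), "data": [list(row) for row in rows]}


# Hypothetical request: the same column names as the inline curl example.
payload = build_payload(["userId", "movieId", "rating"], [[1311, 144210, 5]])
body = json.dumps(payload)
print(body)
```

Saving the printed JSON as `mlf_data.json` gives a file that can be passed to curl with `--data @mlf_data.json`. The length check catches the easiest mistake to make here, a row that carries more or fewer values than there are column names.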

