# Building a Movie Recommendation System in PySpark - Lab Code-along
![images of vhs tapes on shelf](img/movies.jpg)

## Introduction

In this last lab, we will implement a movie recommendation system using Alternating Least Squares (ALS) in the Spark programming environment.<br> Spark's machine learning library `ml` comes packaged with a very efficient implementation of the ALS algorithm.

The lab will require you to put your Spark programming skills into practice by creating and manipulating PySpark DataFrames. We will go step by step through developing a movie recommendation system with ALS and PySpark, using the MovieLens dataset.

Note: You are advised to refer to [PySpark Documentation](http://spark.apache.org/docs/2.2.0/api/python/index.html) heavily for completing this lab as it will introduce a few new methods. 

In [1]:
pwd

'/home/jovyan/work'

## Objectives

You will be able to:

* Identify the key components of ALS
* Demonstrate an understanding of how recommendation systems are used to personalize online services/products
* Parse and filter datasets into Spark DataFrames, performing basic feature selection
* Run a brief hyper-parameter selection activity through a scalable grid search
* Train and evaluate the predictive performance of a recommendation system
* Generate predictions from the trained model

## Building a Recommendation System

We have seen how recommendation systems have played an integral part in the success of Amazon (books, items), Pandora/Spotify (music), Google (news, search), YouTube (videos), etc. For Amazon, these systems bring in more than 30% of total revenue. For Netflix, 75% of the movies people watch are based on some sort of recommendation.

> The goal of Recommendation Systems is to find what is likely to be of interest to the user. This enables organizations to offer a high level of personalization and customer tailored services.

### We sort of get the concept

For online video services like Netflix and Hulu, building a robust movie recommendation system is extremely important. Here is an example of such a system at work:

1.    User A watches Game of Thrones and Breaking Bad.
2.    User B performs a search query for Game of Thrones.
3.    The system suggests Breaking Bad to user B from data collected about user A.


This lab will guide you step by step through developing such a movie recommendation system. We will use the MovieLens dataset and the collaborative filtering technique, with Spark's Alternating Least Squares implementation. After building that recommendation system, we will go through the process of adding a new user to the dataset with some new ratings and obtaining new recommendations for that user.

## Will Nightengale like Toy Story?

Collaborative filtering and matrix decomposition allow us to use the rating histories of other users, along with the full set of community ratings, to answer that question.

![image1](img/collab.png)


## Person vs vegetable

It's important to realize that there are two sides to recommendation: user-based and item-based.

![image2](img/item_user_based.png)

## Code for model

If we wanted, we could jump to the code right now to make this happen.

But would we understand it?
```
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel

# movie_ratings is the ratings DataFrame we load later in this lab
als = ALS(
    rank=10,
    maxIter=10,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
)

als_model = als.fit(movie_ratings)
```

## Documentation Station

Let's explore the [documentation](http://spark.apache.org/docs/2.4.3/api/python/pyspark.ml.html#module-pyspark.ml.recommendation) together to maybe get a better idea of what is happening. 

- which parameters make sense?
- which are completely foreign?

## Rank

What's all this rank of the factorization business?<br>
[The source code documentation](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala) describes that variable as the "Rank of the feature matrices".

## Assumptions

Matrix decomposition is built on the theory that every individual (user, movie) score is actually the **dot product** of two separate vectors:
- user characteristics 
- movie characteristics

Wait, do you mean like gender, whether the movie is sci-fi or action? Do we have that data?

![beyonce-gif](img/beyonce.gif)

## Huh?
![what](img/what.gif)

In [2]:
2000 * 2000

4000000

In [5]:
(2000 * 100) + (2000 * 100)

400000
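Reading the two cells above as 2,000 users and 2,000 movies with 100 hidden factors (our interpretation; the cells only show the arithmetic), the point is storage: the full rating matrix needs one entry per (user, movie) pair, while the factor matrices need one row of `rank` numbers per user and per movie. A small sketch with a hypothetical helper, `factorization_sizes`:

```python
def factorization_sizes(n_users, n_items, rank):
    """Return (entries in the full user-item matrix, entries in the two factor matrices)."""
    full = n_users * n_items                    # one cell per (user, item) pair
    factored = n_users * rank + n_items * rank  # P: users x rank, Q: items x rank
    return full, factored

print(factorization_sizes(2000, 2000, 100))  # (4000000, 400000)
```

The gap widens as the catalogue grows: the full matrix scales with users × items, the factors only with users + items.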

## The hidden matrices
![image4](img/matrix_decomp.png)

## Embeddings

Embeddings are low-dimensional hidden factors for items and users.

For example, say we have 5-dimensional (i.e., **rank** = 5) embeddings for both items and users (5 is chosen arbitrarily; this could be any number, as we saw with PCA and dimensionality reduction).

For user-X and movie-A, the 5 numbers in the movie embedding might represent 5 different characteristics of the movie, e.g.:

- How political movie-A is
- How recent the movie is
- How many special effects are in movie-A
- How dialogue-driven the movie is
- How linear the movie's narrative is

In a similar way, 5 numbers in the user embedding matrix might represent:

- How much does user-X like sci-fi movies
- How much does user-X like recent movies … and so on.

But in practice we have *no real idea* what those factors represent: the model learns them from the ratings alone, without any labels.
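One way to see why the factors have no fixed meaning: rotating every user vector and every item vector by the same orthogonal matrix leaves every dot product (and every vector length, so the regularization term too) unchanged, so the ratings cannot tell the two solutions apart. A NumPy sketch with random, made-up factors:

```python
import numpy as np

rng = np.random.default_rng(0)
P = rng.normal(size=(4, 3))  # made-up factors: 4 users x 3 latent factors
Q = rng.normal(size=(5, 3))  # made-up factors: 5 items x 3 latent factors

# A random 3x3 orthogonal matrix (A @ A.T == I), taken from a QR decomposition.
A, _ = np.linalg.qr(rng.normal(size=(3, 3)))

P_alt = P @ A  # rotated user factors
Q_alt = Q @ A  # the same rotation applied to item factors

same_predictions = np.allclose(P @ Q.T, P_alt @ Q_alt.T)  # P A A^T Q^T = P Q^T
same_lengths = np.allclose(np.linalg.norm(P, axis=1), np.linalg.norm(P_alt, axis=1))
different_factors = not np.allclose(P, P_alt)
print(same_predictions, same_lengths, different_factors)  # True True True
```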

### If we knew the feature embeddings in advance, it would look something like this:

In [6]:
import numpy as np

# the original matrix of ratings (np.nan marks a rating we don't know)
R = np.array([[2, np.nan, np.nan, 1, 4],
              [5, 1, 2, np.nan, 2],
              [3, np.nan, np.nan, 3, np.nan],
              [1, np.nan, 4, 2, 1]])

# users X factors
P = np.array([[-0.63274434,  1.33686735, -1.55128517],
              [-2.23813661,  0.5123861 ,  0.14087293],
              [-1.0289794 ,  1.62052691,  0.21027516],
              [-0.06422255,  1.62892864,  0.33350709]])

# items X factors (so Q.T is factors X items)
Q = np.array([[-2.09507374,  0.52351075,  0.01826269],
              [-0.45078775, -0.07334991,  0.18731052],
              [-0.34161766,  2.46215058, -0.18942263],
              [-1.0925736 ,  1.04664756,  0.69963111],
              [-0.78152923,  0.89189076, -1.47144019]])

What about that `np.nan` in the third row, last column? How would that user rate that item?

In [7]:
print(P[2])
print(Q.T[:,4])
P[2].dot(Q.T[:,4])

[-1.0289794   1.62052691  0.21027516]
[-0.78152923  0.89189076 -1.47144019]


1.9401031341455333

## Wait, I saw a transpose in there - what's the actual formula?

Terms:<br>
$R$ is the full user-item rating matrix.

$P$ is a matrix that contains the users and the $k$ factors, represented as (user, factor).

$Q^T$ is a matrix that contains the items and the $k$ factors, represented as (factor, item).

$\hat{r}_{u,i}$ represents our prediction for the true rating $r_{u,i}$. To get an individual rating, take the dot product of row $u$ of $P$ and column $i$ of $Q^T$ (equivalently, row $i$ of $Q$).

For the entire matrix:
$$ R \approx PQ^T $$

or for individual ratings:

$$ \hat{r}_{u,i} = q_i^T p_u $$

### Let's get the whole matrix!

In [8]:
P.dot(Q.T)

array([[ 1.99717984, -0.10339773,  3.80157388,  1.00522135,  3.96947118],
       [ 4.95987359,  0.99772807,  1.9994742 ,  3.08017572,  1.99887552],
       [ 3.00799117,  0.38437256,  4.30166793,  2.96747131,  1.94010313],
       [ 0.99340337, -0.02806164,  3.96943336,  2.00841398,  1.01228247]])

### Look at those results

Are they _exactly_ correct?
![check](img/check.gif)
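Close, but not exactly. Comparing the reconstructed matrix with the known ratings in $R$ (skipping the `np.nan` cells) shows the factors reproduce each known rating only to within a few hundredths. A sketch reusing the toy matrices from above:

```python
import numpy as np

R = np.array([[2, np.nan, np.nan, 1, 4],
              [5, 1, 2, np.nan, 2],
              [3, np.nan, np.nan, 3, np.nan],
              [1, np.nan, 4, 2, 1]])
P = np.array([[-0.63274434,  1.33686735, -1.55128517],
              [-2.23813661,  0.5123861 ,  0.14087293],
              [-1.0289794 ,  1.62052691,  0.21027516],
              [-0.06422255,  1.62892864,  0.33350709]])
Q = np.array([[-2.09507374,  0.52351075,  0.01826269],
              [-0.45078775, -0.07334991,  0.18731052],
              [-0.34161766,  2.46215058, -0.18942263],
              [-1.0925736 ,  1.04664756,  0.69963111],
              [-0.78152923,  0.89189076, -1.47144019]])

R_hat = P @ Q.T      # every predicted rating
known = ~np.isnan(R) # only compare cells where a true rating exists
max_error = np.max(np.abs(R[known] - R_hat[known]))
print(max_error)
```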

## ALS benefit: Loss Function

The loss function $L$ can be calculated as:

$$ L = \sum_{(u,i) \in \kappa} \left[ (r_{u,i} - q_i^T p_u)^2 + \lambda \left( \|q_i\|^2 + \|p_u\|^2 \right) \right] $$

where $\kappa$ is the set of $(u,i)$ pairs for which $r_{u,i}$ is known.

To avoid overfitting, the loss function also includes a regularization term weighted by the parameter $\lambda$. ALS looks for the factor vectors $p_u$ and $q_i$ that minimize the squared difference between the known ratings in $R$ and our predictions, plus this penalty; $\lambda$ itself is a hyperparameter we choose (in Spark's `ALS` it is the `regParam` parameter).
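The loss translates almost line for line into NumPy. A sketch, where the function name `als_loss` and the tiny matrices are made up for illustration and $\kappa$ is the set of non-`nan` cells:

```python
import numpy as np

def als_loss(R, P, Q, lam):
    """Regularized squared error over the known (non-NaN) cells of R.

    R: (n_users, n_items) ratings, np.nan where unknown
    P: (n_users, k) user factors; Q: (n_items, k) item factors
    """
    users, items = np.where(~np.isnan(R))        # the (u, i) pairs in kappa
    preds = np.sum(P[users] * Q[items], axis=1)  # q_i^T p_u for each known pair
    squared_error = np.sum((R[users, items] - preds) ** 2)
    penalty = lam * np.sum(np.sum(Q[items] ** 2, axis=1) + np.sum(P[users] ** 2, axis=1))
    return squared_error + penalty

R = np.array([[3.0, np.nan], [np.nan, 3.0]])
P = np.array([[1.0, 0.0], [0.0, 1.0]])
Q = np.array([[2.0, 0.0], [0.0, 3.0]])
print(als_loss(R, P, Q, 0.0), als_loss(R, P, Q, 0.1))  # 1.0 2.5
```

With $\lambda = 0$ only the one wrong prediction (2 instead of 3) counts; a positive $\lambda$ adds a cost for large factor values.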

There's the **least squares** part of ALS, got it!

## So now we use gradient descent, right?

![incorrect](img/incorrect.gif)

### Here comes the alternating part

ALS alternates between holding all the $q_i$'s fixed and holding all the $p_u$'s fixed.

While all $q_i$'s are held fixed, each $p_u$ is computed independently by solving a least squares problem.<br>
After that step, all the $p_u$'s are held fixed while each $q_i$ is updated by solving its own least squares problem, again independently.<br>
This process repeats until the factors converge (ideally) or a maximum number of iterations is reached.
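Each of those per-user (or per-item) least squares problems is a small ridge regression with a closed-form solution, which is what makes the alternation cheap. Below is a minimal dense NumPy sketch on the toy `R` from earlier. It is not Spark's implementation (Spark distributes and blocks the work); the function name `als`, the defaults for `k`, `lam`, `n_iters`, and the random initialization are our own choices. The penalty is scaled by each user's (or item's) number of ratings so that every step exactly minimizes the loss written above, which means the loss never increases from one sweep to the next.

```python
import numpy as np

def als(R, k=3, lam=0.1, n_iters=10, seed=0):
    """Toy dense ALS. R holds ratings with np.nan marking unknown entries."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    known = ~np.isnan(R)
    P = rng.normal(scale=0.1, size=(n_users, k))  # user factors
    Q = rng.normal(scale=0.1, size=(n_items, k))  # item factors
    eye = np.eye(k)
    for _ in range(n_iters):
        # Hold Q fixed: solve one small ridge regression per user.
        for u in range(n_users):
            rated = known[u]
            if rated.any():
                Qu, ru = Q[rated], R[u, rated]
                P[u] = np.linalg.solve(Qu.T @ Qu + lam * rated.sum() * eye, Qu.T @ ru)
        # Hold P fixed: solve one small ridge regression per item.
        for i in range(n_items):
            raters = known[:, i]
            if raters.any():
                Pi, ri = P[raters], R[raters, i]
                Q[i] = np.linalg.solve(Pi.T @ Pi + lam * raters.sum() * eye, Pi.T @ ri)
    return P, Q

R = np.array([[2, np.nan, np.nan, 1, 4],
              [5, 1, 2, np.nan, 2],
              [3, np.nan, np.nan, 3, np.nan],
              [1, np.nan, 4, 2, 1]])
P, Q = als(R)
print(np.round(P @ Q.T, 2))
```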

# Enough - let's get to the data

### Importing the Data
To begin with:
* initialize a SparkSession object
* import the dataset found at './data/ratings.csv' into a pyspark DataFrame

In [9]:
import pyspark

spark = (pyspark.sql.SparkSession.builder 
  .master("local[*]")
  .getOrCreate())

In [10]:
!ls data/

movies.csv  ratings.csv



In [12]:
!head data/ratings.csv

userId,movieId,rating,timestamp
1,1,4.0,964982703
1,3,4.0,964981247
1,6,4.0,964982224
1,47,5.0,964983815
1,50,5.0,964982931
1,70,3.0,964982400
1,101,5.0,964980868
1,110,4.0,964982176
1,151,5.0,964984041


In [13]:
# read in the dataset into pyspark DataFrame
movie_ratings = spark.read.csv('data/ratings.csv',
                               inferSchema=True,
                               header=True)

Check the data types of each of the values to ensure that they are a type that makes sense given the column.

In [14]:
movie_ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



If a column's type were ever inferred incorrectly, we could supply an explicit schema instead:

In [16]:
from pyspark.sql.types import (
    FloatType,
    IntegerType,
    LongType,
    StructField,
    StructType,
)

In [18]:
schema = StructType(
    [
        StructField('userId', IntegerType()),
        StructField('movieId', IntegerType()),
        StructField('rating', FloatType()),
        StructField('timestamp', LongType()),
    ]
)

In [19]:
# read in the dataset into pyspark DataFrame
movie_ratings = spark.read.csv('data/ratings.csv',
                               inferSchema=False,
                               schema=schema,
                               header=True)

In [20]:
movie_ratings.persist()

DataFrame[userId: int, movieId: int, rating: float, timestamp: bigint]

In [21]:
movie_ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: long (nullable = true)



In [22]:
type(movie_ratings)

pyspark.sql.dataframe.DataFrame

In [23]:
movie_ratings.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



We aren't going to need the timestamp, so we can go ahead and remove that column.

In [25]:
movie_ratings.columns[:-1]

['userId', 'movieId', 'rating']

In [26]:
# keep every column except the last one (timestamp); movie_ratings.drop('timestamp') does the same
movie_ratings = movie_ratings.select(movie_ratings.columns[:-1])

In [27]:
movie_ratings.show(5)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
+------+-------+------+
only showing top 5 rows



### Fitting the Alternating Least Squares Model

Because this dataset is already preprocessed for us, we can go ahead and fit the Alternating Least Squares model.

* Use the `randomSplit` method on the PySpark DataFrame to separate the dataset into a training set and a test set.
* Import the `ALS` class from `pyspark.ml.recommendation`.
* Create an `ALS` estimator, setting `userCol`, `itemCol`, and `ratingCol` to the appropriate column names for this dataset. Then fit it to the training set and assign the resulting model to a variable, `als_model`.

### Train-Test Split

Split the data into training and testing sets.

In [28]:
movie_ratings.count()

100836

In [29]:
train_data, test_data = movie_ratings.randomSplit([0.8, 0.2])

In [30]:
train_data.count()

80743

In [31]:
test_data.count()

20093

In [32]:
train_data.count() / movie_ratings.count()

0.8007358483081439

In [34]:
test_data.count() / movie_ratings.count()

0.19926415169185607
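The split comes out close to, but not exactly, 80/20 because each row is assigned by a random draw rather than by exact counting. Below is a pure-Python sketch of that idea (a hypothetical `random_split` helper, not Spark's partition-wise implementation). Note that `DataFrame.randomSplit` also accepts a `seed` argument if you want the same split on every run.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row independently to a split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    cutoffs, running = [], 0.0
    for w in weights:                 # cumulative boundaries, e.g. [0.8, 1.0]
        running += w / total
        cutoffs.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()           # uniform in [0, 1)
        for idx, cut in enumerate(cutoffs):
            if draw < cut:
                splits[idx].append(row)
                break
        else:
            splits[-1].append(row)    # guard against floating-point round-off near 1.0
    return splits

train, test = random_split(range(100836), [0.8, 0.2], seed=42)
print(len(train), len(test), len(train) / 100836)
```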

In [35]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel

als = ALS(
    rank=10,
    maxIter=10,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
)

In [36]:
# Build the recommendation model using ALS on the training data
# fit the ALS model to the training set

als_model = als.fit(train_data)

Now you've fit the model, and it's time to evaluate it to determine just how well it performed.

* Import `RegressionEvaluator` from `pyspark.ml.evaluation` ([documentation](http://spark.apache.org/docs/2.4.3/api/python/pyspark.ml.html#pyspark.ml.evaluation.RegressionEvaluator))
* Generate predictions with your model for the training and test sets by using the `transform` method on your ALS model
* Evaluate your model and print out the RMSE on your test set ([options for evaluating regressors](http://spark.apache.org/docs/2.4.3/api/python/pyspark.ml.html#pyspark.ml.evaluation.RegressionEvaluator.metricName))
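`RegressionEvaluator` reports RMSE by default (`metricName="rmse"`). As a plain-Python sketch of what that number means (a hypothetical `rmse` helper, not Spark's code), here it is applied to the five test rows shown further down:

```python
import math

def rmse(labels, predictions):
    """Root-mean-squared error between true ratings and predicted ratings."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    total = sum((y - y_hat) ** 2 for y, y_hat in zip(labels, predictions))
    return math.sqrt(total / len(labels))

print(rmse([3.0, 4.0, 3.0, 4.0, 5.0],
           [3.3652458, 3.0683546, 3.3030767, 2.7493901, 3.6308029]))
```

Because the errors are squared before averaging, a few large misses raise RMSE more than many small ones.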

In [37]:
train_predictions = als_model.transform(train_data)
test_predictions = als_model.transform(test_data)

In [38]:
train_predictions.persist()
test_predictions.persist()

DataFrame[userId: int, movieId: int, rating: float, prediction: float]

In [39]:
train_predictions.show(5)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   191|    148|   5.0| 4.9399343|
|   133|    471|   4.0| 3.3171768|
|   597|    471|   2.0| 4.0303655|
|   385|    471|   4.0|  3.662281|
|    91|    471|   1.0| 2.2268791|
+------+-------+------+----------+
only showing top 5 rows



In [40]:
test_predictions.show(5)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   436|    471|   3.0| 3.3652458|
|   602|    471|   4.0| 3.0683546|
|   409|    471|   3.0| 3.3030767|
|   218|    471|   4.0| 2.7493901|
|   520|    471|   5.0| 3.6308029|
+------+-------+------+----------+
only showing top 5 rows



In [42]:
user_factors = als_model.userFactors

In [46]:
user_factors.show(5)

+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[-0.24594471, 0.6...|
| 20|[0.37253463, -0.6...|
| 30|[-0.57537425, -0....|
| 40|[-0.42906234, -0....|
| 50|[-0.03952579, -0....|
+---+--------------------+
only showing top 5 rows



In [44]:
item_factors = als_model.itemFactors


In [49]:
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='rating')
evaluator.evaluate(train_predictions)

0.5654019178368366


In [51]:
evaluator.evaluate(test_predictions)

nan

In [52]:
test_predictions.filter(test_predictions.prediction.isNotNull()).show(2500)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   436|    471|   3.0| 3.3652458|
|   602|    471|   4.0| 3.0683546|
|   409|    471|   3.0| 3.3030767|
|   218|    471|   4.0| 2.7493901|
|   520|    471|   5.0| 3.6308029|
|   216|    471|   3.0|  3.181619|
|   492|    833|   4.0| 2.4571538|
|   474|   1088|   3.5| 2.9498584|
|   489|   1088|   4.5|  3.263337|
|   286|   1088|   3.5| 3.4048371|
|    51|   1088|   4.0| 3.6054924|
|   600|   1088|   3.5| 2.2608473|
|    19|   1238|   3.0| 3.0327106|
|   223|   1342|   1.0| 1.7232274|
|   593|   1580|   1.5| 2.9181647|
|   597|   1580|   3.0|  3.943423|
|    34|   1580|   2.5|  3.281466|
|   233|   1580|   3.0| 2.8749175|
|   355|   1580|   4.0| 3.8901284|
|   603|   1580|   4.0| 2.9344745|
|     1|   1580|   3.0|  4.332275|
|   474|   1580|   4.5|  2.945593|
|   283|   1580|   4.0| 3.2414956|
|   579|   1580|   4.0|  4.003756|
|   263|   1580|   3.0| 3.5929573|
|   586|   1580|   5

In [53]:
print(test_predictions.count())
test_predictions = test_predictions.na.drop()
print(test_predictions.count())

20093
19293


In [54]:
evaluator.evaluate(test_predictions)

0.8834857303247121
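The `nan` from the first evaluation comes from cold starts: when a user or movie in the test split never appeared in the training split, the ALS model has no factor vector for it, and `transform` returns `NaN` for that row. A single `NaN` is enough to make the mean, and so the RMSE, `NaN`, which is why dropping those rows with `na.drop()` gives a usable number. (Spark's `ALS` also accepts `coldStartStrategy="drop"` to drop such rows automatically.) A small NumPy illustration with made-up numbers:

```python
import numpy as np

ratings = np.array([4.0, 3.0, 5.0, 2.0])
preds = np.array([3.5, np.nan, 4.5, 2.5])  # the NaN stands in for a cold-start user or movie

squared_errors = (ratings - preds) ** 2
rmse_all = np.sqrt(np.mean(squared_errors))         # NaN propagates through the mean
keep = ~np.isnan(preds)
rmse_kept = np.sqrt(np.mean(squared_errors[keep]))  # same idea as na.drop() before evaluating
print(rmse_all, rmse_kept)  # nan 0.5
```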

### Important Question

Will Billy (user 10) like movie m (movie 296)?

In [55]:
billy_row = user_factors[user_factors['id'] == 10].first()
billy_factors = np.array(billy_row['features'])

In [56]:
m_row = item_factors[item_factors['id'] == 296].first()
m_factors = np.array(m_row['features'])

In [57]:
billy_factors

array([-0.24594471,  0.663706  , -1.06943846,  1.06504226, -1.19796598,
        0.44404691, -0.17034313,  0.62352639,  0.09987149, -0.50399488])

In [58]:
m_factors

array([ 0.16406441, -0.42935836,  0.01214545,  0.83501232, -0.93340421,
        0.93720305, -1.30829847, -1.0222249 , -0.5035938 , -0.92789817])

In [59]:
billy_factors @ m_factors

2.08820142094766

In [70]:
billy_preds = test_predictions[test_predictions['userId'] == 10]

In [71]:
billy_preds.sort('movieId').show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|    10|    356|   3.5| 3.7165873|
|    10|    597|   3.5|   3.51923|
|    10|   1307|   3.0| 3.0251868|
|    10|   2959|   0.5| 2.5241995|
|    10|   4306|   4.5| 3.6072264|
|    10|   5952|   4.0| 2.5943336|
|    10|   6155|   3.0| 3.8278906|
|    10|   6535|   4.0| 3.0703773|
|    10|   7151|   3.0| 2.3716252|
|    10|   7293|   3.5| 4.0405393|
|    10|   8961|   2.5| 2.8267362|
|    10|  30749|   3.5| 2.7336738|
|    10|  51662|   3.0| 2.4111786|
|    10|  51705|   4.5| 2.9769802|
|    10|  58559|   4.5| 2.6603174|
|    10|  59421|   2.0| 3.3664527|
|    10|  60950|   2.0| 1.8235226|
|    10|  61250|   2.5| 2.6568308|
|    10|  63113|   3.5| 2.8833141|
|    10|  69844|   3.0| 2.7508037|
+------+-------+------+----------+
only showing top 20 rows



In [72]:
!grep 356 < data/movies.csv

356,Forrest Gump (1994),Comedy|Drama|Romance|War
1356,Star Trek: First Contact (1996),Action|Adventure|Sci-Fi|Thriller
2356,Celebrity (1998),Comedy
3563,"Crow: Salvation, The (2000)",Action|Horror
3564,"Flintstones in Viva Rock Vegas, The (2000)",Children|Comedy
3565,Where the Heart Is (2000),Comedy|Drama
3566,"Big Kahuna, The (2000)",Comedy|Drama
3567,Bossa Nova (2000),Comedy|Drama|Romance
3568,Smiling Fish and Goat on Fire (1999),Comedy|Romance
3569,"Idiots, The (Idioterne) (1998)",Comedy|Drama
4356,Gentlemen Prefer Blondes (1953),Comedy|Musical|Romance
5356,"Giant Spider Invasion, The (1975)",Horror|Sci-Fi
33564,Divorce - Italian Style (Divorzio all'italiana) (1961),Comedy
43560,Nanny McPhee (2005),Children|Comedy|Fantasy
50356,The Little World of Don Camillo (1952),Comedy
72356,Partly Cloudy (2009),Animation|Children|Comedy|Fantasy
73569,Project A 2 ('A' gai wak juk jap) (1987),Action|Comedy|Crime
88356,"Smurfs, The (2011)",Animation|Children|Comed
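`grep 356` matches any line containing those digits, which is why Star Trek (1356) and many others show up. One way to look up exact IDs is a small `csv`-based helper; `titles_for` is a hypothetical name, and the code assumes `movies.csv` starts with a `movieId,title,genres` header (an assumption, since we haven't printed the header here). The sample rows below are copied from the output above.

```python
import csv
import io

def titles_for(movie_ids, movies_file):
    """Return {movieId: title} for exact movieId matches (unlike grep, which matches substrings)."""
    wanted = set(movie_ids)
    found = {}
    for row in csv.DictReader(movies_file):  # assumes a movieId,title,genres header
        movie_id = int(row["movieId"])
        if movie_id in wanted:
            found[movie_id] = row["title"]
    return found

# With the real file:
# with open("data/movies.csv", newline="", encoding="utf-8") as f:
#     print(titles_for([356, 8869], f))

sample = io.StringIO(
    "movieId,title,genres\n"
    "356,Forrest Gump (1994),Comedy|Drama|Romance|War\n"
    "1356,Star Trek: First Contact (1996),Action|Adventure|Sci-Fi|Thriller\n"
    '3563,"Crow: Salvation, The (2000)",Action|Horror\n'
)
result = titles_for([356, 3563], sample)
print(result)
```

The `csv` module also handles quoted titles that contain commas, which a plain `split(",")` would break.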

## Okay, what *will* Billy like?

In [73]:
recs = als_model.recommendForAllUsers(numItems=10)

In [74]:
recs[recs['userId']==10].first()['recommendations']

[Row(movieId=7121, rating=4.9583210945129395),
 Row(movieId=5034, rating=4.869685649871826),
 Row(movieId=8235, rating=4.82766056060791),
 Row(movieId=515, rating=4.744690418243408),
 Row(movieId=3548, rating=4.693119049072266),
 Row(movieId=72641, rating=4.676361560821533),
 Row(movieId=6201, rating=4.6520795822143555),
 Row(movieId=4495, rating=4.6520795822143555),
 Row(movieId=2295, rating=4.6520795822143555),
 Row(movieId=8869, rating=4.610371112823486)]
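`recommendForAllUsers(numItems=10)` returns, for each user, the 10 movies with the highest predicted rating. Below is a minimal NumPy sketch of that idea for a single user. It assumes the factor vectors have already been pulled out of `userFactors`/`itemFactors` into arrays, and `top_k_items` is a hypothetical name, not Spark's code. The sketch does not filter out movies the user has already rated; you may want to do that yourself.

```python
import numpy as np

def top_k_items(user_vec, item_factors, item_ids, k=10):
    """Rank every item for one user by predicted rating (a dot product) and keep the k best."""
    scores = item_factors @ user_vec  # one predicted rating per item
    best = np.argsort(-scores)[:k]    # positions of the k highest scores
    return [(item_ids[j], float(scores[j])) for j in best]

# Tiny made-up example: 2 latent factors, 3 movies.
user_vec = np.array([1.0, 0.5])
item_factors = np.array([[1.0, 0.0],
                         [3.0, 2.0],
                         [2.0, -1.0]])
item_ids = [10, 20, 30]
print(top_k_items(user_vec, item_factors, item_ids, k=2))  # [(20, 4.0), (30, 1.5)]
```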

In [84]:
!grep 8869 < data/movies.csv

8869,First Daughter (2004),Comedy|Romance
88697,SUBWAYStories: Tales from the Underground (1997),Drama
88699,Death Race 2 (2010),Action|Sci-Fi|Thriller


## Objective Review

* Identify the key components of ALS
* Demonstrate an understanding of how recommendation systems are used to personalize online services/products
* Parse and filter datasets into Spark DataFrames, performing basic feature selection
* Run a brief hyper-parameter selection activity through a scalable grid search
* Train and evaluate the predictive performance of a recommendation system
* Generate predictions from the trained model

## Some great technical resources:

- [Stanford ALS Theory and Implementation](http://stanford.edu/~rezab/classes/cme323/S15/notes/lec14.pdf)
- [The Netflix Recommendation Project](https://www.netflixprize.com/assets/GrandPrize2009_BPC_BellKor.pdf)