# Building a Recommendation System in PySpark - Lab

## Introduction

In this last lab, we will implement a a movie recommendation system using ALS in Spark programming environment. Spark's machine learning libraray `ml` comes packaged with a very efficient imeplementation of ALS algorithm that we looked at in the previous lesson. The lab will require you to put into pratice your spark programming skills for creating and manipulating pyspark DataFrames. We will go through a step-by-step process into developing a movie recommendation system using ALS and pyspark using the MovieLens Dataset that we used in a previous lab.

Note: You are advised to refer to [PySpark Documentation](http://spark.apache.org/docs/2.2.0/api/python/index.html) heavily for completing this lab as it will introduce a few new methods. 


## Objectives

You will be able to:

* Demonstrate an understanding on how recommendation systems are being used for personalization of online services/products
* Parse and filter datasets into Spark RDDs, performing basic feature selection
* Run a brief hyper-parameter selection activity through a scalable grid search
* Train and evaluate the predictive performance of recommendation system
* Generate predictions from the trained model

## Building a Recommendation System

We have seen how recommender/Recommendation Systems have played an  integral parts in the success of Amazon (Books, Items), Pandora/Spotify (Music), Google (News, Search), YouTube (Videos) etc.  For Amazon these systems bring more than 30% of their total revenues. For Netflix service, 75% of movies that people watch are based on some sort of recommendation.

> The goal of Recommendation Systems is to find what is likely to be of interest to the user. This enables organizations to offer a high level of personalization and customer tailored services.


For online video content services like Netflix and Hulu, the need to build robust movie recommendation systems is extremely important. An example of recommendation system is such as this:

1.    User A watches Game of Thrones and Breaking Bad.
2.    User B performs a search query for Game of Thrones.
3.    The system suggests Breaking Bad to user B from data collected about user A.


This lab will guide you through a step-by-step process into developing such a movie recommendation system. We will use the MovieLens dataset to build a movie recommendation system using the collaborative filtering technique with Spark's Alternating Least Saqures implementation. After building that recommendation system, we will go through the process of adding a new user to the dataset with some new ratings and obtaining new recommendations for that user.

### Importing the Data
To begin with:
* initialize a SparkSession object
* import the dataset found at './data/ratings.csv' into a pyspark DataFrame

In [1]:
# import necessary libraries
!env | grep PYTHONPATH

PYTHONPATH=/usr/local/Cellar/apache-spark/2.4.3/libexec/python:


In [2]:
import pyspark

spark = (pyspark.sql.SparkSession.builder 
  .master("local[*]")
  .getOrCreate())

In [4]:
!ls data/

movies.csv  ratings.csv


In [6]:
!file data/ratings.csv

data/ratings.csv: ASCII text, with CRLF line terminators


In [7]:
!head data/ratings.csv

userId,movieId,rating,timestamp
1,1,4.0,964982703
1,3,4.0,964981247
1,6,4.0,964982224
1,47,5.0,964983815
1,50,5.0,964982931
1,70,3.0,964982400
1,101,5.0,964980868
1,110,4.0,964982176
1,151,5.0,964984041


In [13]:
# read in the dataset into pyspark DataFrame
movie_ratings = spark.read.csv('data/ratings.csv',
                               inferSchema=True,
                               header=True)

Check the data types of each of the values to ensure that they are a type that makes sense given the column.

In [27]:
from pyspark.sql.types import (
    ArrayType,
    AtomicType,
    BinaryType,
    BooleanType,
    ByteType,
    CloudPickleSerializer,
    DataType,
    DataTypeSingleton,
    DateConverter,
    DateType,
    DatetimeConverter,
    DecimalType,
    DoubleType,
    FloatType,
    FractionalType,
    IntegerType,
    IntegralType,
    JavaClass,
    LongType,
    MapType,
    NullType,
    NumericType,
    Row,
    ShortType,
    SparkContext,
    StringType,
    StructField,
    StructType,
    TimestampType,
    UserDefinedType,
)

In [20]:
schema = StructType(
    [
        StructField('userId', IntegerType()),
        StructField('movieId', IntegerType()),
        StructField('rating', FloatType()),
        StructField('timestamp', LongType()),
    ]
)

In [21]:
# read in the dataset into pyspark DataFrame
movie_ratings = spark.read.csv('data/ratings.csv',
                               inferSchema=False,
                               schema=schema,
                               header=True)

In [38]:
movie_ratings.persist()

DataFrame[userId: int, movieId: int, rating: float, timestamp: bigint]

In [22]:
movie_ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: long (nullable = true)



In [28]:
movie_ratings.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



[('userId', 'int'),
 ('movieId', 'int'),
 ('rating', 'double'),
 ('timestamp', 'int')]

We aren't going to need the time stamp, so we can go ahead and remove that column.

In [57]:
movie_ratings = None

### Fitting the Alternating Least Squares Model

Because this dataset is already preprocessed for us, we can go ahead and fit the Alternating Least Squares model.

* Import the ALS module from pyspark.ml.recommendation.
* Use the randomSplit method on the pyspark DataFrame to separate the dataset into a training and test set
* Fit the Alternating Least Squares Model to the training dataset. Make sure to set the userCol, itemCol, and ratingCol to the appropriate names given this dataset. Then fit the data to the training set and assign it to a variable model. 

In [33]:
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.recommendation import ALS, ALSModel
# split into training and testing sets

als = ALS(
    rank=10,
    maxIter=10,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
)

In [35]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics

# fit the ALS model to the training set

als_model = als.fit(movie_ratings)

Now you've fit the model, and it's time to evaluate it to determine just how well it performed.

* import `RegressionEvalutor` from pyspark.ml.evaluation
* generate predictions with your model for the test set by using the `transform` method on your ALS model
* evaluate your model and print out the RMSE from your test set

In [45]:
ALSModel.transform?

In [36]:
predictions = als_model.transform(movie_ratings)

In [39]:
predictions.persist()

DataFrame[userId: int, movieId: int, rating: float, timestamp: bigint, prediction: float]

In [43]:
movie_ratings.show(1)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
+------+-------+------+---------+
only showing top 1 row



In [48]:
predictions.show(1)

+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|   191|    148|   5.0|829760897|  4.912873|
+------+-------+------+---------+----------+
only showing top 1 row



In [56]:
user_factors = als_model.userFactors

In [59]:
user_factors

DataFrame[id: int, features: array<float>]

In [57]:
item_factors = als_model.itemFactors

In [63]:
import numpy as np

In [64]:
billy_row = user_factors[user_factors['id'] == 10].first()
billy_factors = np.array(billy_row['features'])

In [86]:
m_row = item_factors[item_factors['id'] == 296].first()
m_factors = np.array(m_row['features'])

In [87]:
billy_factors

array([ 0.48113078,  0.10100795,  0.06847224, -0.07711513,  1.59972095,
        1.18931723,  0.74921227,  0.002795  , -0.31532204, -0.30787283])

In [88]:
m_factors

array([ 4.06861246e-01, -9.56375822e-02, -2.69766182e-01, -5.80208361e-01,
        1.02675958e-02, -2.74204591e-04,  1.92972398e+00, -2.94304371e-01,
       -3.90635788e-01, -1.19364870e+00])

In [89]:
billy_factors @ m_factors

2.1640822027223257

In [78]:
billy_preds = predictions[predictions['userId'] == 10]

In [84]:
billy_preds.sort('movieId').show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|    10|    296|   1.0|1455303387| 2.1640823|
|    10|    356|   3.5|1455301685| 3.5761378|
|    10|    588|   4.0|1455306173| 3.2734149|
|    10|    597|   3.5|1455357645| 3.2656565|
|    10|    912|   4.0|1455302254| 3.7281077|
|    10|   1028|   0.5|1455306152|  2.921604|
|    10|   1088|   3.0|1455619275|  3.244572|
|    10|   1247|   3.0|1455303518| 2.4162045|
|    10|   1307|   3.0|1455357613| 2.9066176|
|    10|   1784|   3.5|1455301699|  3.095241|
|    10|   1907|   4.0|1455306183| 3.6470838|
|    10|   2571|   0.5|1455356378|  3.017029|
|    10|   2671|   3.5|1455357517|  3.745073|
|    10|   2762|   0.5|1455356388| 2.7671921|
|    10|   2858|   1.0|1455356578| 2.6532393|
|    10|   2959|   0.5|1455356582|  2.424939|
|    10|   3578|   4.0|1455356591| 3.2716172|
|    10|   3882|   3.0|1455398344| 3.1183858|
|    10|   4246|   3.5|1455302676|

In [94]:
recs = als_model.recommendForAllUsers(numItems=10)

In [97]:
recs[recs['userId']==10].first()['recommendations']

[Row(movieId=3086, rating=4.889717102050781),
 Row(movieId=26614, rating=4.750113487243652),
 Row(movieId=68073, rating=4.680994033813477),
 Row(movieId=42730, rating=4.661563396453857),
 Row(movieId=71579, rating=4.630825519561768),
 Row(movieId=8869, rating=4.606689929962158),
 Row(movieId=103372, rating=4.472315311431885),
 Row(movieId=32892, rating=4.447782039642334),
 Row(movieId=113275, rating=4.427666664123535),
 Row(movieId=45503, rating=4.4271650314331055)]

In [1]:
!grep 3086 < data/movies.csv

3086,Babes in Toyland (1934),Children|Comedy|Fantasy|Musical
83086,Burlesque (2010),Drama|Musical|Romance
