## CUNY MSDS DATA643 - Recommender System

---

### Rose Koh
### 06/22/2018
---

## Matrix factorization methods

* Implement a matrix factorization method—such as singular value decomposition (SVD) or,
* Alternating Least Squares (ALS)—in the context of a recommender system.

You may approach this assignment in a number of ways. You are welcome to start with an existing recommender system written by yourself or someone else.  

SVD can be thought of as a pre-processing step for feature engineering. You might easily start with thousands or millions of items, and use SVD to create a much smaller set of “k” items (e.g. 20 or 70).

---

#### Project 3
* SVD from project 2
* ALS

---

## Data description

This dataset describes 5-star rating and free-text tagging activity from [MovieLens](http://movielens.org), a movie recommendation service. It contains 100004 ratings and 1296 tag applications across 9125 movies. These data were created by 671 users between January 09, 1995 and October 16, 2016. This dataset was generated on October 17, 2016.

Users were selected at random for inclusion. All selected users had rated at least 20 movies. No demographic information is included. Each user is represented by an id, and no other information is provided.

For this project, we are going to use the subset of the data.
The data contained in the files as follows: 

* u.data
* u.item
* Movie_Id_Titles

This and other GroupLens data sets are publicly available for download at <http://grouplens.org/datasets/>.


---

## Load Data

In [1]:
import numpy as np
import pandas as pd

column_names = ['user_id', 'item_id', 'rating', 'timestamp']
# user data
df = pd.read_csv('data/u.data', sep='\t', names=column_names)
# movie title data
movie_titles = pd.read_csv("data/Movie_Id_Titles")

# merge
df = pd.merge(df,movie_titles,on='item_id')
df.head()

Unnamed: 0,user_id,item_id,rating,timestamp,title
0,0,50,5,881250949,Star Wars (1977)
1,290,50,5,880473582,Star Wars (1977)
2,79,50,4,891271545,Star Wars (1977)
3,2,50,5,888552084,Star Wars (1977)
4,8,50,5,879362124,Star Wars (1977)


In [2]:
idx_to_movie = {}
# item(movie) data
with open('data/u.item', encoding = "ISO-8859-1") as f:
    for line in f.readlines():
        info = line.split('|')
        idx_to_movie[int(info[0])-1] = info[1]

In [3]:
n_users = df.user_id.unique().shape[0]
n_items = df.item_id.unique().shape[0]
print('Number of users = ' + str(n_users) + ' | Number of movies = ' + str(n_items))

Number of users = 944 | Number of movies = 1682


## Split

In [61]:
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
from sklearn import cross_validation as cv

# Split Data
train_data, test_data = cv.train_test_split(df, test_size=0.25)

In [5]:
# Create two user-item matrices, one for training and another for testing
train_data_matrix = np.zeros((n_users, n_items))
for line in train_data.itertuples():
    train_data_matrix[line[1] - 1, line[2] - 1] = line[3]

test_data_matrix = np.zeros((n_users, n_items))
for line in test_data.itertuples():
    test_data_matrix[line[1] - 1, line[2] - 1] = line[3]

## Function set

* rmse computation
* predict: user-user and item-item similarity
* top k movie names by user mapping function
* predict ratings using dot product of the latent features for users and items

In [6]:
from sklearn.metrics import mean_squared_error
from math import sqrt
import numpy as np

def rmse(pred, actual):
    # Root Mean Squared Error. We will conside only non-zero ratings
    pred = pred[actual.nonzero()].flatten()
    actual = actual[actual.nonzero()].flatten()
    return sqrt(mean_squared_error(pred, actual))

def predict(ratings, similarity, type='user', epsilon=1e20):
    # Predict function to find user-user similarity and item-item similarity.
    # User bias is removed by adjusting the mean user bias before predicting ratings
    if type == 'user':
        mean_user_rating = ratings.mean(axis=1)
    # You use np.newaxis so that mean_user_rating has same format as ratings
        ratings_diff = (ratings - mean_user_rating[:, np.newaxis])
        pred = mean_user_rating[:, np.newaxis] + similarity.dot(ratings_diff) / np.array([np.abs(similarity).sum(axis=1)]).T
    elif type == 'item':
        pred = ratings.dot(similarity) / np.array([np.abs(similarity).sum(axis=1)])
    return pred

def top_k_movies_by_user(pred, mapper, user_idx, k=6):
    # Find the top-k movie names based on the ordered ratings
    return [mapper[x] for x in
            np.argsort(pred[user_idx, np.where(train_data_matrix[user_idx, :] == 0)[0]])[:-k - 1:-1]]

def predictionSGD(P, Q):
    # Predict the ratings using dot product of the latent features for users and items
    return np.dot(P.T, Q)

## Memory based Collaborative Filtering
* user-item filtering: Users who are similar to you also liked ...
* item-item filtering: Users who liked this item also liked...

In [7]:
# Compute the cosine distance and 1-cosine distance yields cosine similarity.
# epsilon is used to handle the divide by zero scenarios
from sklearn.metrics.pairwise import pairwise_distances
epsilon = 1e-9

user_similarity = (1 - pairwise_distances(train_data_matrix, metric='cosine')) + epsilon
item_similarity = (1 - pairwise_distances(train_data_matrix.T, metric='cosine')) + epsilon

user_idx = 23

pred = predict(train_data_matrix, user_similarity, type='user')

user_base_train_rmse = rmse(pred, train_data_matrix)
user_base_test_rmse = rmse(pred, test_data_matrix)

trainRMSE = []
testRMSE = []

trainRMSE.append(user_base_train_rmse)
testRMSE.append(user_base_test_rmse)

In [8]:
print('User-based CF RMSE on train data: ' + str(user_base_train_rmse))
print('User-based CF RMSE on test data: ' + str(user_base_test_rmse) + "\n")
print("Movies recommended to user " + str(user_idx) + " are:")
nearestMovies = top_k_movies_by_user(pred, idx_to_movie, user_idx)
print(str(nearestMovies) + "\n")

pred = predict(train_data_matrix, item_similarity, type='item')

item_base_train_rmse = rmse(pred, train_data_matrix)
item_base_test_rmse = rmse(pred, test_data_matrix)
trainRMSE.append(item_base_train_rmse)
testRMSE.append(item_base_test_rmse)

print('Item-based CF RMSE on train data: ' + str(item_base_train_rmse))
print('Item-based CF RMSE on test data: ' + str(item_base_test_rmse) + "\n")
print("Movies recommended to user " + str(user_idx) + " are:")
nearestMovies = top_k_movies_by_user(pred, idx_to_movie, user_idx)
print(str(nearestMovies) + "\n")

User-based CF RMSE on train data: 2.8973011016563786
User-based CF RMSE on test data: 2.9567967443898864

Movies recommended to user 23 are:
['Disclosure (1994)', "Weekend at Bernie's (1989)", "Monty Python's Life of Brian (1979)", 'Truth About Cats & Dogs, The (1996)', 'Toy Story (1995)', 'I.Q. (1994)']

Item-based CF RMSE on train data: 3.0969544116017405
Item-based CF RMSE on test data: 3.1634832502337686

Movies recommended to user 23 are:
['Man from Down Under, The (1943)', 'Butterfly Kiss (1995)', 'Delta of Venus (1994)', 'Paris, Texas (1984)', 'Twelfth Night (1996)', "C'est arrivé près de chez vous (1992)"]



---

## Singular Value Decomposition

A well-known matrix factorization method is **Singular value decomposition (SVD)**. Collaborative Filtering can be formulated by approximating a matrix `X` by using singular value decomposition. The winning team at the Netflix Prize competition used SVD matrix factorization models to produce product recommendations, for more information I recommend to read articles: [Netflix Recommendations: Beyond the 5 stars](http://techblog.netflix.com/2012/04/netflix-recommendations-beyond-5-stars.html) and [Netflix Prize and SVD](http://buzzard.ups.edu/courses/2014spring/420projects/math420-UPS-spring-2014-gower-netflix-SVD.pdf).
The general equation can be expressed as follows:
<img src="https://latex.codecogs.com/gif.latex?X=USV^T" title="X=USV^T" />


Given `m x n` matrix `X`:
* *`U`* is an *`(m x r)`* orthogonal matrix
* *`S`* is an *`(r x r)`* diagonal matrix with non-negative real numbers on the diagonal
* *V^T* is an *`(r x n)`* orthogonal matrix

Elements on the diagnoal in `S` are known as *singular values of `X`*. 


Matrix *`X`* can be factorized to *`U`*, *`S`* and *`V`*. The *`U`* matrix represents the feature vectors corresponding to the users in the hidden feature space and the *`V`* matrix represents the feature vectors corresponding to the items in the hidden feature space.

In [9]:
import scipy.sparse as sp
from scipy.sparse.linalg import svds
 
#get SVD components from train matrix. Choose k.
u, s, vt = svds(train_data_matrix, k = 30)
s_diag_matrix=np.diag(s)
X_pred = np.dot(np.dot(u, s_diag_matrix), vt)
 
model_base_train_rmse = rmse(X_pred, train_data_matrix)
model_base_test_rmse = rmse(X_pred, test_data_matrix)
trainRMSE.append(model_base_train_rmse)
testRMSE.append(model_base_test_rmse)
 
print('Model-based CF MSE on train data: ' + str(model_base_train_rmse))
print('Model-based CF MSE on test data: ' + str(model_base_test_rmse) + "\n")
 
print("Movies recommended to user " + str(user_idx) + " are:")
nearestMovies = top_k_movies_by_user(X_pred, idx_to_movie, user_idx)
print(str(nearestMovies) + "\n")
 
# Compare true ratings of user with predictions
ratings = pd.DataFrame(columns = ("Actual Rating","Predicted Rating"))
ratings["Actual Rating"] = train_data_matrix[user_idx,np.where(train_data_matrix[user_idx, :] > 0)[0]][0:5]
ratings["Predicted Rating"] = X_pred[user_idx,np.where(train_data_matrix[user_idx, :] > 0)[0]][0:5]
print(ratings)

Model-based CF MSE on train data: 2.2711166477386304
Model-based CF MSE on test data: 2.797821218930048

Movies recommended to user 23 are:
['I.Q. (1994)', 'From Dusk Till Dawn (1996)', 'Braveheart (1995)', 'Gigi (1958)', 'Toy Story (1995)', 'Fish Called Wanda, A (1988)']

   Actual Rating  Predicted Rating
0            4.0          3.412068
1            5.0          1.744381
2            5.0          4.360307
3            5.0          2.174179
4            5.0          3.334611


---

 * SVD does not seem to perform any better than memory based model.
 * SVD approach isn't optimal for dataset with missing data.

---

In [10]:
sparsity=round(1.0-len(df)/float(n_users*n_items),3)
print(('The sparsity level of MovieLens100K is ') +  str(sparsity*100) + '%')

The sparsity level of MovieLens100K is 93.7%


---

## ALS

### Notations
* Let $n_u$ and $n_p$ be the number of users and products.
* Let $R \in \mathbb{R}^{n_u \times n_p}$ be the matrix of ratings, where entry $r_{ij}$, $1 \le i \le n_u$ and $1 \le j \le n_p$, is the rating of user $i$ for product $j$. Entries $r_{ij}$ contain many missing values.
* Let $w_{i,j}$ be an indicator of the existence of a rating for product $j$ by user $i$, i.e., $w_{i,j}=1$ if the rating $(i,j)$ exists, and $w_{i,j}=0$ otherwise.
* Let $k$ be the rank of the matrix factorisation.
* Let $U \in \mathbb{R}^{n_u \times n_k}$ be the user matrix, and $P \in \mathbb{R}^{n_p \times n_k}$ be the products (item) matrix 
* Let $U_i$ be the $i-$th row of $U$, and $P_j$ be the $j-$th row of $P$.

![alt text](matprod.jpg "Ratings matrix factorisation")

* Let $\hat{R}$ be the predicted rating matrix, where all missing values are predicted on the basis of known user ratings. 
* The optimisation function is expressed as 

$$
J(U,P)=||(R-UP^T)||_2
$$

* and predictions $\hat{R}$ given by 

$$
\hat{R}=UP^T
$$

### Alternating Least Squares

* Note that since both $U$ and $P$ are unknown, the optimisation function $J(U,P)$ is non convex.
* However, if we fix $P$ and optimise for $U$ alone, the problem is simply reduced to the problem of linear regression, and can be solved using Ordinary Least Square (OLS):

$$
U^T=(P^TP)^{-1}P^TR^T
$$

Then, fixing $U$ and optimising for P gives

$$
P^T=(U^TU)^{-1}U^TR
$$


* ALS does just that, iteratively optimising $U$ by fixing $P$, and optimising $P$ by fixing $U$. It is guaranteed to converge only to a local minima, which ultimately depends on initial values for $U$ or $P$. 

* **Missing values**: Since R contains missing values, regression must be computed per user (or product), using only those entries for which the ratings are known ($w_{i,j}=1$). This is done by going though all users (or products):
    * For each i, compute $$U^T_i=(\sum_{j, w_{i,j}=1} P_j^TP_j)^{-1} \sum_{j, w_{i,j}=1} P_j^Tr_{ij}$$
    * For each j, compute $$P^T_j=(\sum_{i, w_{i,j}=1} U_i^TU_i)^{-1} \sum_{i, w_{i,j}=1} U_i^Tr_{ij}$$


###  ALS algorithm

1. Initialize the matrix P by assigning to the first column the average rating for each product, and using small random numbers for the remaining columns.
2. Fix P and solve for U_i that minimizes the objective function (RMSE).
3. Fix U and solve for P_j that minimizes the objective function similarly.
4. Repeat steps 2 and 3 until convergence


---

## Data description

For ALS, We are going to try implement pyspark.

This dataset (ml-latest-small) describes 5-star rating and free-text tagging activity from [MovieLens](http://movielens.org), a movie recommendation service. It contains 100004 ratings and 1296 tag applications across 9125 movies. These data were created by 671 users between January 09, 1995 and October 16, 2016. This dataset was generated on October 17, 2016.

Users were selected at random for inclusion. All selected users had rated at least 20 movies. No demographic information is included. Each user is represented by an id, and no other information is provided.

The data are contained in the files as follows: 

* `links.csv` 
* `movies.csv` 
* `ratings.csv`
* `tags.csv`

More details about the contents and use of all these files follows.

This is a *development* dataset. As such, it may change over time and is not an appropriate dataset for shared research results. See available *benchmark* datasets if that is your intent.

This and other GroupLens data sets are publicly available for download at <http://grouplens.org/datasets/>.

In [11]:
import time
import os 

import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib import style

style.use('fivethirtyeight')
%matplotlib inline

## Load Data with pyspark

In [12]:
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

In [13]:
# Define download location
import os

datasets_path = os.path.join(os.getcwd(), 'data')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')

In [14]:
# Proceed download
import urllib

small_f = urllib.request.urlretrieve(small_dataset_url, small_dataset_path)

In [15]:
# Extract the zip file
import zipfile

with zipfile.ZipFile(small_dataset_path, "r") as z:
    z.extractall(datasets_path)

In [16]:
rating_path = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')

#### Data

* ratings.csv: userId, movioeId, rating, timestamp
* movies.csv: movieId, title, genres
    * genres: Genre1 | Genre2 | Genre3...
* tags.csv: userId, movieId, tag, timestamp
* links.csv: UserId, movieId, imdbId,tmdbId

In [17]:
import pandas as pd
rating = pd.read_csv(rating_path)

In [18]:
rating[0:5]

Unnamed: 0,userId,movieId,rating,timestamp
0,1,31,2.5,1260759144
1,1,1029,3.0,1260759179
2,1,1061,3.0,1260759182
3,1,1129,2.0,1260759185
4,1,1172,4.0,1260759205


In [19]:
print(rating.shape)

(100004, 4)


In [20]:
# Split dataset
np.random.seed(23)
i_training = np.random.rand(len(rating)) < 0.8
train= rating[i_training].reset_index(drop=True)
test= rating[~i_training].reset_index(drop=True)

In [21]:
# Drop timestamp
rating = rating.drop("timestamp", axis = 1)

# Split data
train_data, test_data = cv.train_test_split(rating, test_size=0.2)
train = train_data.reset_index(drop = True)
test = test_data.reset_index(drop = True)

In [22]:
# reindex userId, movieId so the rainge is in the interval [0, n_u-1] and [0, n_p-1]
# Keep only those movies and user IDs which are in the training data (no predictions for 'new' users and movies)
test = test[test.movieId.isin(train.movieId)]
test = test[test.userId.isin(train.userId)].reset_index(drop=True)

# remove test dataset user Id and movie Id that is not included in the training set 
# as we will not approach rating new users/movies
# Reindex users and movies so IDs are between 0 and numbers of users/movies
userId = train.userId.unique()
user_mapping = dict(zip(userId,range(len(userId))))
train.userId = train.userId.map(user_mapping)
test.userId = test.userId.map(user_mapping)

movieId = train.movieId.unique()
movie_mapping = dict(zip(movieId,range(len(movieId))))
train.movieId = train.movieId.map(movie_mapping)
test.movieId = test.movieId.map(movie_mapping)

In [23]:
#ratings in train
train.shape

(80003, 3)

In [24]:
#ratings in test
test.shape

(19261, 3)

In [25]:
#Number of unique users
n_u=len(train.userId.unique())
n_u

671

In [26]:
#Number of unique movies
n_p=len(train.movieId.unique())
n_p

8391

In [27]:
train[0:5]

Unnamed: 0,userId,movieId,rating
0,0,0,3.0
1,1,1,4.5
2,2,2,5.0
3,3,3,4.0
4,4,4,3.0


In [28]:
test[0:5]

Unnamed: 0,userId,movieId,rating
0,28,619,3.5
1,132,1567,0.5
2,125,5197,5.0
3,296,986,3.0
4,51,267,4.0


---

## * ALS - Centralised approach

In [29]:
# Initialise k (the ALS rank), and lambda_reg (regularization params)
# Initialize the matrix P by assigning to the first column the average rating for each movie, 
# and using small random numbers for the remaining columns.

k = 1

np.random.seed(23)
P = np.random.rand(n_p,k)

average_rating_movie = np.array(train.groupby('movieId').rating.mean())
P[:,0] = average_rating_movie

### OLS function

The function aims at computing the OLS for a subset of entries of the rating matrix. 

* Inputs
    * Array of 2 columns: set of indices to keep from rating matrix and corresponding ratings.
    * Array: Matrix U or P depending on which matrix is updated. (X)

In [30]:
def OLS(list_id_rating, X, lambda_reg=0.1):
    list_id_rating = np.reshape(np.array(list_id_rating),(-1,2))
    X = np.array(X)
    
    #Get the subset of rows from X for which to compute OLS
    #Convert indices to int
    list_id=[int(elt) for elt in list_id_rating[:,0]]
    X_j = X[list_id,:] 
    
    #Compute OLS
    k = X_j.shape[1]
    XtX = np.dot(np.transpose(X_j),X_j) + lambda_reg * np.identity(k)
    XtY = np.dot(np.transpose(X_j),list_id_rating[:,1])
    OLS_coef = np.transpose(np.dot(np.linalg.inv(XtX),XtY))
    
    return OLS_coef

### Predict function

In [31]:
#This takes U and P matrices, and compute the predictions for pairs userId/movieId in rating
def get_pred(U,P,rating):
    N=rating.shape[0]
    predictions=np.zeros(N)
    
    for i in range(N):
        predictions[i]=np.sum(np.dot(U[rating.userId[i],:],P[rating.movieId[i],:]))
        
    return predictions

### ALS Iterations

In [32]:
#Number of iterations
T = 1

for t in range(T):
    
    print('Iteration: ',t)
    
    U = np.vstack(train.groupby('userId').
                            apply(lambda list_id_rating: OLS(list_id_rating[['movieId','rating']],P)))
    
    P = np.vstack(train.groupby('movieId').
                            apply(lambda list_id_rating: OLS(list_id_rating[['userId','rating']],U)))
    

Iteration:  0


In [33]:
train_pred = get_pred(U,P,train)
rmse(np.array(train.rating),np.array(train_pred))

0.8090376648900013

In [34]:
test_pred=get_pred(U,P,test)
rmse(np.array(test.rating),np.array(test_pred))

0.9074686816484212

In [35]:
train_pred[:5]

array([ 2.983248  ,  4.04853833,  4.00810724,  3.70573411,  2.7067777 ])

In [36]:
train.rating[0:5]

0    3.0
1    4.5
2    5.0
3    4.0
4    3.0
Name: rating, dtype: float64

## * ALS - Map/Reduce approach

### Start Spark context

In [37]:
os.environ['PYSPARK_SUBMIT_ARGS'] = "--conf spark.driver.memory=2g  pyspark-shell"

from pyspark.sql import SparkSession

#Start Spark session with local master and 2 cores
spark = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("ALS") \
    .getOrCreate()

sc=spark.sparkContext

### Transform training and validation in RDD

In [38]:
#Number of partitions
p = 4

train_rdd = sc.parallelize(np.array(train),p)
test_rdd = sc.parallelize(np.array(test),p)

In [39]:
#Same initialisation as centralised ALS
k = 1

np.random.seed(23)
P = np.random.rand(n_p,k)

average_rating_movie = np.array(train.groupby('movieId').rating.mean())
P[:,0] = average_rating_movie

In [40]:
# Adapt the centralized ALS in a Spark Map/Reduce way.

T=1

for t in range(T):
    
    print('Iteration: ',t)
    
    #Grouped by user ID (first column), value is (movieId,rating) (second and third columns)
    R1 = train_rdd.map(lambda x: (x[0],[x[1],x[2]])).groupByKey()
    
    U = np.array(R1.mapValues(lambda list_id_rating:OLS(list(list_id_rating),P)).
                    sortByKey().values().collect())
    
    #Grouped by movie ID (second column), value is (userId,rating) (first and third columns)
    R2 = train_rdd.map(lambda x: (x[1],[x[0],x[2]])).groupByKey()
    P = np.array(R2.mapValues(lambda list_id_rating:OLS(list(list_id_rating),U)).
                    sortByKey().values().collect())
    
    trian_pred = get_pred(U,P,train)
    train_error = rmse(np.array(train.rating),np.array(train_pred))
    
    print("Error training set: "+str(train_error))
    
    test_pred = get_pred(U,P,test)
    test_error = rmse(np.array(test.rating),np.array(test_pred))
    
    print("Error validation set: "+str(test_error))
    

Iteration:  0
Error training set: 0.8090376648900013
Error validation set: 0.9074686816484212


In [41]:
pred_train=get_pred(U,P,train)
rmse(np.array(train.rating),np.array(pred_train))

0.8090376648900013

In [42]:
pred_test=get_pred(U,P,test)
rmse(np.array(test.rating),np.array(pred_test))

0.9074686816484212

In [43]:
pred_train[0:10]

array([ 2.983248  ,  4.04853833,  4.00810724,  3.70573411,  2.7067777 ,
        3.31425152,  4.01361493,  3.18340813,  4.75048029,  4.04618534])

In [44]:
train.rating[0:10]

0    3.0
1    4.5
2    5.0
3    4.0
4    3.0
5    4.0
6    5.0
7    1.5
8    4.0
9    4.0
Name: rating, dtype: float64

## * ALS with Spark ML library 

Spark MLlib library for Machine Learning provides a Collaborative Filtering implementation by using Alternating Least Squares. The implementation in MLlib has these parameters:

* numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).
* rank is the number of latent factors in the model.
* iterations is the number of iterations to run.
* lambda specifies the regularization parameter in ALS.
* implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.
* alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.

See documentation at https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html

In [45]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('recommend').getOrCreate()

In [46]:
data = spark.read.csv(rating_path, inferSchema=True, header=True)
data.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [47]:
# count, mean, std, min & max
data.describe().show()

+-------+------------------+------------------+------------------+--------------------+
|summary|            userId|           movieId|            rating|           timestamp|
+-------+------------------+------------------+------------------+--------------------+
|  count|            100004|            100004|            100004|              100004|
|   mean| 347.0113095476181|12548.664363425463| 3.543608255669773|1.1296390869392424E9|
| stddev|195.16383797819535|26369.198968815268|1.0580641091070326|1.9168582602710962E8|
|    min|                 1|                 1|               0.5|           789652009|
|    max|               671|            163949|               5.0|          1476640644|
+-------+------------------+------------------+------------------+--------------------+



In [48]:
#Split dataset to train and test
train, test = data.randomSplit([0.8, 0.2])

In [49]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [50]:
# Build the recommendation model using ALS on the training data
als = ALS(maxIter=10, regParam=0.1, nonnegative=True, coldStartStrategy="drop",\
          userCol='userId', itemCol='movieId', ratingCol='rating')

als.setSeed(23)
model = als.fit(train)

In [51]:
print('Factorized user matrix with rank = %d' % model.rank)
model.userFactors.show(5)

print('-'*40)

print('Factorized item matrix with rank = %d' % model.rank)
model.itemFactors.show(5)

Factorized user matrix with rank = 10
+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[0.5513963, 0.143...|
| 20|[0.0, 1.1222926, ...|
| 30|[0.38053095, 0.59...|
| 40|[0.73925835, 0.71...|
| 50|[0.26130113, 0.55...|
+---+--------------------+
only showing top 5 rows

----------------------------------------
Factorized item matrix with rank = 10
+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[0.35288718, 0.44...|
| 20|[0.288, 0.9096849...|
| 30|[0.92679757, 0.73...|
| 40|[0.0, 0.5933677, ...|
| 50|[0.77746767, 0.39...|
+---+--------------------+
only showing top 5 rows



In [52]:
print('Recommended top users (e.g. 1 top user) for all items with the corresponding predicted ratings:')
model.recommendForAllItems(1).show(5)

print('-'*90)

print('Recommended top items (e.g. 1 top item) for all users with the corresponding predicted ratings:')
model.recommendForAllUsers(1).show(5)

Recommended top users (e.g. 1 top user) for all items with the corresponding predicted ratings:
+-------+------------------+
|movieId|   recommendations|
+-------+------------------+
|   1580|  [[113, 5.10162]]|
|   5300| [[296, 5.328037]]|
|   6620|[[156, 4.9422774]]|
|   7340|[[123, 4.4566317]]|
|  32460|  [[46, 5.062686]]|
+-------+------------------+
only showing top 5 rows

------------------------------------------------------------------------------------------
Recommended top items (e.g. 1 top item) for all users with the corresponding predicted ratings:
+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471| [[59684, 5.028317]]|
|   463|[[67504, 5.3299212]]|
|   496| [[59684, 5.268713]]|
|   148|[[67504, 5.5336957]]|
|   540| [[59684, 5.393605]]|
+------+--------------------+
only showing top 5 rows



### Make predictions on test_data

In [53]:
pred = model.transform(test)
pred.show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   588|    471|   3.0| 842298526| 3.6431274|
|    86|    471|   4.0| 848161161| 3.9828506|
|    19|    471|   3.0| 855192558| 3.9977357|
|    92|    471|   4.0| 848526594|  3.742887|
|   607|    471|   4.0|1118247731| 3.8046055|
|   358|    471|   5.0| 957479605|  3.457286|
|   659|    471|   4.0| 853412972| 3.6239324|
|    73|    471|   4.0|1296460183| 3.8612459|
|   508|    471|   4.0| 844377075|  4.222822|
|   399|    471|   5.0| 841562601| 3.0694127|
|   296|    833|   4.5|1298158960| 1.9851413|
|   516|   1088|   3.0| 844688144|  3.689806|
|   372|   1088|   4.0| 958004568| 4.1652355|
|    52|   1088|   4.0|1231766626|  4.020903|
|   363|   1088|   2.0| 942345287| 3.5498774|
|   582|   1088|   3.5|1122169870| 3.5549753|
|   306|   1088|   4.0| 939760516| 3.7026877|
|    54|   1088|   5.0|1352836913| 3.1626437|
|   262|   1088|   2.0|1433938031|

### Evaluate predictions using RMSE

In [54]:
# Create an RMSE evaluator using the label and predicted columns
evaluator = RegressionEvaluator(metricName='rmse', predictionCol='prediction', labelCol='rating')
rmse = evaluator.evaluate(pred)
print('Root mean squared error of the test_data: %.4f' % rmse)

Root mean squared error of the test_data: 0.9109


In [58]:
# see historical rating of the user
user_history = train.filter(train['userId']==23)
user_history.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|    23|      1|   3.0|1148729853|
|    23|     11|   3.5|1166728170|
|    23|     16|   4.0|1148672550|
|    23|     19|   2.0|1148669114|
|    23|     20|   1.5|1148720884|
|    23|     24|   3.5|1148673467|
|    23|     32|   4.0|1148730400|
|    23|     47|   4.5|1148669590|
|    23|     58|   3.5|1148672099|
|    23|     89|   4.0|1148669357|
|    23|    104|   3.5|1148730116|
|    23|    110|   3.5|1148669588|
|    23|    150|   3.5|1148672933|
|    23|    153|   3.0|1148720873|
|    23|    154|   4.0|1148672842|
|    23|    172|   2.5|1148386188|
|    23|    185|   3.5|1148778581|
|    23|    224|   4.0|1148729797|
|    23|    235|   4.0|1148729698|
|    23|    236|   4.5|1148386155|
+------+-------+------+----------+
only showing top 20 rows



In [59]:
# a list of movies we are thinking to offer
user_suggest = test.filter(train['userId']==23).select(['movieId', 'userId'])
user_suggest.show()

+-------+------+
|movieId|userId|
+-------+------+
|      6|    23|
|     34|    23|
|     50|    23|
|     62|    23|
|    111|    23|
|    307|    23|
|    337|    23|
|    480|    23|
|    527|    23|
|    541|    23|
|    590|    23|
|    668|    23|
|    841|    23|
|    900|    23|
|    911|    23|
|    926|    23|
|    933|    23|
|    969|    23|
|   1035|    23|
|   1077|    23|
+-------+------+
only showing top 20 rows



In [60]:
# offer movies with a high predicted rating
user_offer = model.transform(user_suggest)
user_offer.orderBy('prediction', ascending=False).show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|    527|    23|  4.400507|
|    969|    23|  4.394819|
|  42418|    23| 4.3877387|
|     50|    23|  4.310225|
|   1254|    23|  4.305702|
|    111|    23| 4.2834864|
|   6787|    23|  4.281431|
|   3871|    23| 4.2510843|
|    926|    23|  4.239252|
|   1263|    23|  4.233564|
|   1931|    23| 4.2314863|
|   2858|    23|  4.192382|
|   1208|    23| 4.1598825|
|   8239|    23|  4.158046|
|   2068|    23| 4.1487956|
|   4878|    23| 4.1448298|
|   1288|    23| 4.1291714|
|    541|    23|  4.121001|
|   4993|    23|  4.120278|
|   1077|    23|  4.088746|
+-------+------+----------+
only showing top 20 rows



### References

* https://datajobs.com/data-science-repo/Recommender-Systems-[Netflix].pdf
* https://www.coursera.org/lecture/matrix-factorization/
* http://www.awesomestats.in/spark-movie-recommendations/
* https://stanford.edu/~rezab/classes/cme323/S15/notes/lec14.pdf
* https://datasciencemadesimpler.wordpress.com/tag/alternating-least-squares/
* https://stanford.edu/~rezab/classes/cme323/S16/projects_reports/parthasarathy_tea.pdf
* https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
* http://yifanhu.net/PUB/cf.pdf
* https://github.com/bdanalytics/Berkeley-Spark/blob/master/CS110x/cs110_lab2_als_prediction.ipynb
