<small><i>This notebook was create by Franck Iutzeler, Jerome Malick and Yann Vernaz (2016).</i></small>
<!-- Credit (images) Jeffrey Keating Thompson. -->

<center><img src="UGA.png" width="30%" height="30%"></center>
<center><h3>Master of Science in Industrial and Applied Mathematics (MSIAM)</h3></center>
<hr>
<center><h1>Convex and distributed optimization</h1></center>
<center><h2>Part III - Recommender Systems (3h + 3h home work)</h2></center>

# Outline

In this Lab, we will investigate some gradient-based algorithms on the very well known matrix factorization problem which is the most prominent approach for build a _Recommender Systems_.

Our goal is to implement Large-Scale Matrix Factorization with Distributed Stochastic Gradient Descent in Spark.

# Problem Formulation

The problem of matrix factorization for collaborative filtering captured much attention, especially after the [Netflix prize](https://datajobs.com/data-science-repo/Recommender-Systems-%5BNetflix%5D.pdf). The premise behind this approach is to approximate a large rating matrix $R$ with the multiplication of two low-dimensional factor matrices $P$ and $Q$, i.e. $R \approx \hat{R} = P^TQ$, that model respectively users and items in some latent space. For instance, matrix $R$ has dimension $m \times  n$ where $m$ and $n$ are restrictively the number of users and items, both large; while $P$ has size $m \times  k$ and contains user information in a latent space of size $k<<m,n$, $Q$ has size $n\times k$ and contains item information in the same latent space of size $k << m,n$. Typical values for $m, n$ are $10^6$ while $k$ is in the tens.

For a pair of user and item $(u_i,i_j)$ for which a rating $r_{ij}$ exists, a common approach approach is based on the minimization of the $\ell_2$-regularized quadratic error:
$$  \ell_{u_i,i_j}(P,Q)= \left(r_{ij} - p_{i}^{\top}q_{j}\right)^2 + \lambda(|| p_{i} ||^{2} + || q_{j} ||^2 )  $$
where $p_i$ is the column vector composed of the $i$-th line of $P$ and  $\lambda\geq 0$ is a regularization parameter. The whole matrix factorization problem thus writes
$$ \min_{P,Q} \sum_{i,j : r_{ij} \text{exists}}  \ell_{u_i,i_j}(P,Q). $$
Note that the error $ \ell_{u_i,i_j}(P,Q)$ depends only on $P$ and $Q$ through $p_{i}$ and $q_{j}$; however, item $i_j$ may also be rated by user $u_{i'}$ so that the optimal factor $q_{j}$ depends on both $p_{i}$ and $p_{i'}$.

In [1]:
# set up spark environment (Using Spark Local Mode)
from pyspark import SparkContext, SparkConf

conf = SparkConf()
conf.setMaster("local[*]")
conf.setAppName("MSIAM part III - Matrix Factorization")

sc = SparkContext(conf = conf)

We remind you that you can access this interface by simply opening http://localhost:4040 in a web browser.

We will capitalize on the first lab and take the MovieLens dataset, and thus the RDD routines we already have.

In [2]:
def parseRating(line):
    fields = line.split('::')
    return int(fields[0]), int(fields[1]), float(fields[2])

def parseMovie(line):
    fields = line.split("::")
    return int(fields[0]), fields[1], fields[2]

# path to MovieLens dataset
movieLensHomeDir="data/movielens/medium/"


# movies is an RDD of (movieID, title, genres)
moviesRDD = sc.textFile(movieLensHomeDir + "movies.dat").map(parseMovie).setName("movies").cache()

# ratings is an RDD of (userID, movieID, rating)
ratingsRDD = sc.textFile(movieLensHomeDir + "ratings.dat").map(parseRating).setName("ratings").cache()

numRatings = ratingsRDD.count()
numUsers = ratingsRDD.map(lambda r: r[0]).distinct().count()
numMovies = ratingsRDD.map(lambda r: r[1]).distinct().count()
print("We have %d ratings from %d users on %d movies.\n" % (numRatings, numUsers, numMovies))

M = ratingsRDD.map(lambda r: r[0]).max()
N = ratingsRDD.map(lambda r: r[1]).max()
matrixSparsity = float(numRatings)/float(M*N)
print("We have %d users, %d movies and the rating matrix has %f percent of non-zero value.\n" % (M, N, 100*matrixSparsity))

We have 1000209 ratings from 6040 users on 3706 movies.

We have 6040 users, 3952 movies and the rating matrix has 4.190221 percent of non-zero value.



#  Gradient Descent Algorithms

The goal here is to 
1. Compute gradients of the loss functions.
2. Implement gradient algorithms.
3. Observe the prediction accuracy of the developed methods.

__Question 1__

> Split (ramdomly) the dataset into training versus testing sample. We learn over 70% (for example) of the users, we test over the rest.

> Define a routine that returns the predicted rating from factor matrices. Form a RDD with the following elements `(i,j,true rating,predicted rating)`. 

> Define a routine that returns the Mean Square Error (MSE).


In [3]:
import numpy as np

trainingSample, testingSample = ratingsRDD.randomSplit([70, 30])
print("The number of examples in the dataset : ",ratingsRDD.count())
print("The number of examples in the training dataset : ",trainingSample.count())
print("The number of examples in the testing dataset : ",testingSample.count())

The number of examples in the dataset :  1000209
The number of examples in the training dataset :  699742
The number of examples in the testing dataset :  300467


In [4]:
# Create moviesGenresRDD, a new RDD of (genre, list_of_movies)
genres = moviesRDD.map(lambda x : x[2]).flatMap(lambda x : x.split("|")).distinct()

def getGenresOfMovie(movie):
    return moviesRDD.filter(lambda x : x[1] == movie).map(lambda y : y[2]).flatMap(lambda x : x.split("|")).collect()
    
def getMoviesFromGenre(genre):
    return moviesRDD.filter(lambda x : genre in x[2]).map(lambda y : y[1]).collect()

genres_dict = {}
for g in genres.collect() :
    genres_dict_temp = {g:getMoviesFromGenre(g)}
    genres_dict.update(genres_dict_temp)

moviesGenresRDD = genres.map(lambda genre : (genre,genres_dict[genre]))

##TEST
#g = getGenresOfMovie("Pocahontas (1995)")
#print("Genres of movie \"Pocahontas (1995)\":\n",g,"\n")
#m = getMoviesFromGenre("Comedy")
#print("Five movies from genre \"Comedy\":\n",np.asarray(m)[:5],"\n")
#print("There are",genres.count(),"different genres:\n",genres.collect(),"\n")
#print("Check from dictionnary:\n",len(genres_dict.keys()),"Genres:\n",genres_dict.keys(),"\n")
#print("Number of movies having \"Comedy\" in their genres :",moviesRDD.filter(lambda x : "Comedy" in x[2]).count())
#print("Check from dictionnary:",len(genres_dict["Comedy"]))
#print("Number of movies having",moviesGenresRDD.first()[0],"in their genres :",moviesRDD.filter(lambda x : "Musical" in x[2]).count())
#print("Check from RDD, the number of movies having",moviesGenresRDD.first()[0],"in their genres:",len(moviesGenresRDD.first()[1]))

#genres_dict.clear()

In [22]:
# Compute Pu

movies_dict = {}
for i,title in moviesRDD.map(lambda x : (x[0],x[1])).collect():
    movies_dict_temp = {i:title}
    movies_dict.update(movies_dict_temp)

def update_factors(movieIdListOfGenres,rating,p_u):
    for f in movieIdListOfGenres:
        p_u[f] = (p_u[f][0]+1, p_u[f][1] + rating)
    return p_u

def compute_final_p_u(p_u):
    for f in genres.collect():
        if p_u[f][0] != 0:
            p_u[f] = (p_u[f][0]+1, p_u[f][1] / p_u[f][0])
    return p_u

def get_preferences(user,p_u):
    pref = []
    for g in genres.collect():
        if p_u[g][0]>0:
            pref.append(g)
    return pref
            

def get_genres_list_of_movieId(movie):
    movieTitle = movies_dict[movie]
    #print("movie (id =",movie,"):",movieTitle)
    genreslist = moviesRDD.map(lambda x : x[2]).flatMap(lambda x : x.split("|")).distinct()
    return getGenresOfMovie(movieTitle)
    
def computePu(user,debug):
    p_u = {}
    for g in genres.collect() :
        p_u_temp = {g:(0,0)} # {genre, (number of movie with that genre rated; sum of eatings; average)}
        p_u.update(p_u_temp)
    userRatingsRDD = trainingSample.filter(lambda x : x[0] == user)
    userMoviesList = userRatingsRDD.map(lambda x : x[1]).collect()
    userRatingsList = userRatingsRDD.map(lambda x : x[2]).collect()
    if debug!=0:
        print("Number of movies rated:",len(userMoviesList))
        print("Movies rated:",userMoviesList)
    for l in range(0,len(userMoviesList)):
        movieIdListOfGenres = get_genres_list_of_movieId(userMoviesList[l])
        rating = userRatingsList[l]
        p_u = update_factors(movieIdListOfGenres,rating,p_u)
    p_u = compute_final_p_u(p_u)
    if debug!=0:
        print("User preferences :\n",get_preferences(user,p_u))
    return p_u
    
usersList = trainingSample.map(lambda r: r[0]).distinct().collect()

In [23]:
# Compute Qi

def occurance(_list,x):
    nb=0
    for i in _list:
        if i == x:
            nb = nb + 1
    return nb

def parseGenres(genres):
    fields = genres.split("|")
    return fields

a = moviesRDD.map(lambda x : x[2]).distinct().map(lambda x : parseGenres(x))

def brothers(genre):
    brothersList = []
    for genresList in a.collect():
        if genre in genresList:
            for newGenre in genresList:
                if newGenre not in brothersList:
                    brothersList.append(newGenre)
    return brothersList
    
def notBrothers(genre):
    notBrothersList = []
    for g in genres.collect():
        if genre not in brothers(g):
            notBrothersList.append(g)      
    return notBrothersList

def computeQi(item,debug):
    q_i = {}
    for g in genres.collect() :
        q_i_temp = {g:0} 
        q_i.update(q_i_temp)
        
    itemGenres = get_genres_list_of_movieId(item)
    _list = []
    final_list = []
    for genre in itemGenres:
        q_i[genre] = 1/len(itemGenres)
        for g in notBrothers(genre):
            _list.append(g)            
    for i in genres.collect():
        if (occurance(_list,i)==len(itemGenres)):
            final_list.append(i)
    
    for j in final_list:
        q_i[j] = -1/(len(final_list)+len(itemGenres))
    return q_i       
    
itemsList = moviesRDD.map(lambda r: r[0]).distinct().collect()

In [24]:
# Compute predictive rating
def dict2list(dic,col):

    index = 0
    _list = np.zeros(18)
    for f in genres.collect():
        if col != 0:
            _list[index] = dic[f][col]
        else:
            _list[index] = dic[f]
        index = index + 1
    return _list

def computePS_u_i(item,user,debug):
    if debug!=0:
        print("All genres:\n",genres.collect(),"\n")
        print("User =",user) 
        print("Movie =",item,":",movies_dict[item])

    Pu = dict2list(computePu(user,debug),1)
    Qi = dict2list(computeQi(item,debug),0)

    if debug!=0:
        print("\nPu = \n",Pu) 
        print("\nQi =\n",Qi)

    ps = np.vdot(Pu,Qi)
    if debug!=0:
        print("\nDot product =",ps)

    realRating = trainingSample.filter(lambda x : x[0]==user).filter(lambda x : x[1]==item).collect()
    if debug!=0:
        print("True rating =",realRating)
    return ps

#ps = computePS_u_i(1378,453,1)
#ps = computePS_u_i(2692,68,1)
#print(ps)

In [52]:
# Build factorization matrix

m = len(usersList)
n = len(itemsList)
m = 5
n = 5

def buildFactMatrix(n,m):
    print("m =",m,"n =",n)
    factMatrix = np.eye(m,n)
    for u in range(0,m):
        for i in range(0,n):
            item = itemsList[i]
            user = usersList[u]
            factMatrix[u][i] = computePS_u_i(item,user,0)
            print(round(factMatrix[u][i],1),end = ' ')
        print("\n",end = '')
    return factMatrix

def MSE(user,item,lamb):
    pi = []
    qj = []
    #pi = dict2list(computePu(usersList[i],0),1)
    pi = dict2list(computePu(user,0),1)
    print("pi =\n",pi,"\n")
    #qj = dict2list(computeQi(itemsList[j],0),0)
    qj = dict2list(computeQi(item,0),0)
    print("qj =\n",qj,"\n")
    #rij = trainingSample.filter(lambda x : x[0] == usersList[i]).filter(lambda y : y[1] == itemsList[j]).collect()
    rij = trainingSample.filter(lambda x : x[0] == user).filter(lambda y : y[1] == item).map(lambda z : z[2]).collect()[0]
    rijHat = np.vdot(pi,qj)
    print("Real rating :",rij)
    print("Predicted rating :",rijHat)
    e = pow(rij - rijHat,2) + lamb * (pow(np.linalg.norm(pi,2),2) + pow(np.linalg.norm(qj,2),2))
    return e

print("Mean Square Error =", MSE(1,1,0))

pi =
 [ 4.45454545  4.          0.          4.3125      0.          4.5
  3.66666667  4.          4.          0.          3.75        3.66666667
  0.          4.          5.          4.125       0.          4.09090909] 

qj =
 [ 0.          0.          0.          0.33333333  0.          0.          0.
  0.          0.          0.          0.          0.          0.          0.
  0.          0.33333333  0.          0.33333333] 

Real rating : 5.0
Predicted rating : 4.17613636364
Mean Square Error = 0.678751291322


$$  \ell_{u_i,i_j}(P,Q)= \left(r_{ij} - p_{i}^{\top}q_{j}\right)^2 + \lambda(|| p_{i} ||^{2} + || q_{j} ||^2 )  $$

__Question 2__

> Derive the update rules for gradient descent. 

> Implement a (full) gradient algorithm in `Python` on the training set.  Take a step size (learning rate) $\gamma=0.001$ and stop after a specified number of iterations. Investigate the latent space size (e.g. $K=2,5,10,50$).

> Provide plots and explanations for your experiments. 

> Try to parrallelize it so that the code can be run using `PySpark`. What do you conclude?

Stochastic Gradient Descent (SGD) simply does away with the expectation in the update and computes the gradient of the parameters using only a single or a few training examples. In SGD the learning rate $\gamma$ is typically much smaller than a corresponding learning rate in batch gradient descent because there is much more variance in the update.

__Question 3__
> Implement stochastic gradient descent algorithm for Matrix Factorization.

> Provide plots and explanations for your experiments.

> Compare and discuss the results with the (full) gradient algorithm in terms of MSE versus full data passes.

> Discuss the stepsize choice of SGD (e.g. constant v.s. 1/`nb_iter`).

Now we will implement Large-Scale Matrix Factorization with Distributed Stochastic Gradient Descent (DSGD) in Spark. 
The algorithm is described in the following article: <br \><br \>
_Gemulla, R., Nijkamp, E., Haas, P. J., & Sismanis, Y. (2011). Large-scale matrix factorization with distributed stochastic gradient descent. New York, USA._<br \><br \>
The paper sets forth a solution for matrix factorization using minimization of sum of local losses.  The solution involves dividing the matrix into strata for each iteration and performing sequential stochastic gradient descent within each stratum in parallel.  DSGD is a fully distributed algorithm, i.e. both the data matrix $R$ and factor matrices $P$ and $Q$ can be carefully split and distributed to multiple workers for parallel computation without communication costs between the workers. Hence, it is a good match for implementation in a distributed in-memory data processing system like Spark. 

__Question 4__

> Implement a `PySpark` version of DSGD.

> Test on different number of cores on a local machine (1 core, 2 cores, 4 cores). Ran the ALS method already implemented in MLlib as a reference for comparison.