In [1]:
import os, sys, numpy as np
os.environ['SPARK_HOME']="/Users/paulthompson/spark-1.6.1-bin-hadoop2.4"
sys.path.append("/Users/paulthompson/spark-1.6.1-bin-hadoop2.4/python/")
from pyspark import SparkConf, SparkContext
conf = (SparkConf().setMaster("local").setAppName("My app").set("spark.executor.memory", "1g"))
from pyspark.sql import SQLContext
from pyspark.sql.dataframe import StructType, StructField, IntegerType, FloatType
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

In [2]:
mv_ratings = sc.textFile('/Users/paulthompson/Documents/MSAN_Files/Spr2_Distributed/HW1/movies/ratings.txt')

### Matrix Factorization Background
Start with A matrix with rows = # of users and columns = # items and values = user|item rating. We want to break A into a lower dimension 2 matrix representation: 

    UserMatrix (dim: nUsers x nfactors) * ItemMatrix (dim: nfactors x nItems)  = A
    
where nfactors is a chosen integer parameter. Nfactors could be anywhere from 2 to a much larger integer. Larger nfactors leads to larger run times). The idea is that user preferences and item characteristics can be broken down into some number of common factors.

### Alternating Least Squares (ALS)
The ALS method calculates the entries of the user matrix and item matrix by using the matrix version of the least squares regression (with a regularization term) formula to solve for each row in each matrix alternatingly. For more background go to:
http://bugra.github.io/work/notes/2014-04-19/alternating-least-squares-method-for-collaborative-filtering/

In [3]:
# Creating sparse representation of A matrix with users as rows and items as columns
user_item_ratings = mv_ratings.map(lambda line: (int(line.split(':')[0]), (int(line.split(':')[1]), line.split(':')[2])))
user_item_ratings = user_item_ratings.groupByKey()
print user_item_ratings.take(1)

[(65541, <pyspark.resultiterable.ResultIterable object at 0x106ef4d90>)]


In [4]:
# Creating sparse representation of transposed A matrix with items as rows and rows as columns
item_user_ratings = mv_ratings.map(lambda line: (int(line.split(':')[1]), (int(line.split(':')[0]), line.split(':')[2])))
item_user_ratings = item_user_ratings.groupByKey()
print item_user_ratings.take(1)

[(40964, <pyspark.resultiterable.ResultIterable object at 0x106f0ee90>)]


In [5]:
print "Number of Items (columns of A matrix):", item_user_ratings.count()
print "Number of Users (rows of A matrix):", user_item_ratings.count()

Number of Items (columns of A matrix): 10677
Number of Users (rows of A matrix): 69878


In [6]:
# User Defined Parameters
lambda_ = sc.broadcast(0.1) # Regularization parameter
n_factors = sc.broadcast(3) # nfactors of User matrix and Item matrix
n_iterations = 20 # How many times to iterate over the user and item matrix calculations.

In [7]:
# Initizializing Items Matrix (User matrix doesn't need to be initialized since it is solved for first):
Items = item_user_ratings.map(lambda line: (line[0], 5 * np.random.rand(1, n_factors.value)))

In [8]:
print Items.take(10)

[(40964, array([[ 2.10237545,  2.92632156,  4.53418413]])), (8197, array([[ 2.06417651,  2.77285996,  0.99374318]])), (7, array([[ 2.77878313,  2.60483127,  4.1951319 ]])), (32781, array([[ 1.27154652,  1.57228388,  2.67777938]])), (14, array([[ 2.62251806,  2.08303676,  1.74662785]])), (21, array([[ 2.77898971,  3.59659248,  1.77365023]])), (28, array([[ 2.37920355,  3.96306217,  2.98380573]])), (8225, array([[ 1.27728752,  1.67007072,  2.31082087]])), (35, array([[ 1.79566971,  4.9938116 ,  3.69281481]])), (8232, array([[ 4.02286348,  1.02475063,  2.45012427]]))]


In [9]:
# The item matrix is needed in all partitions when solving for rows of User matrix individually
Items_broadcast = sc.broadcast({
  k: v for (k, v) in Items.collect()
})

In [10]:
j = 0
for k, v in {k: v for (k, v) in Items.collect()}.iteritems():
    print k, v
    j+=1
    if j > 10:
        break

1 [[ 0.62579674  2.58610988  4.70051704]]
2 [[ 3.5604129   1.83881324  3.89177802]]
3 [[ 4.22688547  4.72155475  3.53695258]]
4 [[ 3.7802607   0.75787422  3.62126939]]
5 [[ 0.0843198   1.07852023  2.37622072]]
6 [[ 4.12389119  4.05307618  4.54544272]]
7 [[ 1.62389601  3.14315122  2.51636905]]
8 [[ 4.74227848  4.2887011   3.03817062]]
9 [[ 3.13062674  3.20355597  4.2367267 ]]
10 [[ 0.99020206  3.35375568  1.06835584]]
11 [[ 4.3365692   2.64394156  0.66348743]]


In [11]:
j = 0
for i in user_item_ratings.take(1)[0][1]:
    print i
    j+=1
    if j > 10:
        break

(176, u'5')
(260, u'5')
(541, u'5')
(1060, u'5')
(1073, u'3')
(1077, u'4')
(1093, u'5')
(1136, u'5')
(1196, u'5')
(1197, u'4')
(1220, u'5')


#### ALS User matrix row calculation
    Users[i] = inverse(Items*Items^T + I*lambda) * Items * A[i]^T
Where only rows of item matrix for which user i has ratings are operated upon.

In [12]:
def Update_User(userTuple):
    '''
    This function calculates (userID, Users[i]) using:
        'Users[i] = inverse(Items*Items^T + I*lambda) * Items * A[i]^T'
    Dot product calculations are done differently than normal to allow for sparsity. Rather 
    than row of left matrix times column of right matrix, sum result of column of left matrix  
    * rows of right matrix (skipping items for which user doesn't have a rating).
    '''
    Itemssquare = np.zeros([n_factors.value,n_factors.value])
    for matrixA_item_Tuple in userTuple[1]:
        itemRow = Items_broadcast.value[matrixA_item_Tuple[0]][0]
        for i in range(n_factors.value):
            for j in range(n_factors.value):
                Itemssquare[i,j] += float(itemRow[i]) * float(itemRow[j])
    leftMatrix = np.linalg.inv(Itemssquare + lambda_.value * np.eye(n_factors.value))
    rightMatrix = np.zeros([1,n_factors.value])
    for matrixA_item_Tuple in userTuple[1]:
        for i in range(n_factors.value):
            rightMatrix[0][i] += Items_broadcast.value[matrixA_item_Tuple[0]][0][i] * float(matrixA_item_Tuple[1])
    newUserRow = np.dot(leftMatrix, rightMatrix.T).T
    return (userTuple[0], newUserRow)


In [13]:
Users = user_item_ratings.map(Update_User)

In [14]:
print Users.take(1)

[(65541, array([[ 0.00258184,  0.00211084,  0.00264371]]))]


In [None]:
# The item matrix is needed in all partitions when solving for rows of User matrix individually
Users_broadcast = sc.broadcast({
  k: v for (k, v) in Users.collect()
})

In [None]:
def Update_Item(itemTuple):
    '''
    This function calculates (userID, Users[i]) using:
        'Users[i] = inverse(Items*Items^T + I*lambda) * Items * A[i]^T'
    Dot product calculations are done differently than normal to allow for sparsity. Rather 
    than row of left matrix times column of right matrix, sum result of column of left matrix  
    * rows of right matrix (skipping items for which user doesn't have a rating).
    '''
    Userssquare = np.zeros([n_factors.value,n_factors.value])
    for matrixA_user_Tuple in itemTuple[1]:
        userRow = Users_broadcast.value[matrixA_user_Tuple[0]][0]
        for i in range(n_factors.value):
            for j in range(n_factors.value):
                Userssquare[i,j] += float(userRow[i]) * float(userRow[j])
    leftMatrix = np.linalg.inv(Userssquare + lambda_.value * np.eye(n_factors.value))
    rightMatrix = np.zeros([1,n_factors.value])
    for matrixA_user_Tuple in itemTuple[1]:
        for i in range(n_factors.value):
            rightMatrix[0][i] += Users_broadcast.value[matrixA_user_Tuple[0]][0][i] * float(matrixA_user_Tuple[1])
    newItemRow = np.dot(leftMatrix, rightMatrix.T).T
    return (itemTuple[0], newItemRow)

In [None]:
Items = item_user_ratings.map(Update_Item)

In [None]:
print Items.take(1)

In [None]:
Items_broadcast = sc.broadcast({
  k: v for (k, v) in Items.collect()
})

In [None]:
def getRowSumSquares(userTuple):
    userRow = Users_broadcast.value[userTuple[0]]
    rowSSE = 0.0
    for matrixA_item_Tuple in userTuple[1]:
        predictedRating = 0.0
        for i in range(n_factors.value):
            predictedRating += userRow[0][i] * Items_broadcast.value[matrixA_item_Tuple[0]][0][i]
        SE = (float(matrixA_item_Tuple[1]) - predictedRating) ** 2
        rowSSE += SE
    return rowSSE

        

In [None]:
SSE = user_item_ratings.map(getRowSumSquares).reduce(lambda a, b: a + b)
Count = mv_ratings.count()
MSE = SSE / Count
print "MSE:", MSE


In [None]:
for iter in range(n_iterations):
    Users = user_item_ratings.map(Update_User)
    Users_broadcast = sc.broadcast({k: v for (k, v) in Users.collect()})
    Items = item_user_ratings.map(Update_Item)
    Items_broadcast = sc.broadcast({k: v for (k, v) in Items.collect()})
    SSE = user_item_ratings.map(getRowSumSquares).reduce(lambda a, b: a + b)
    MSE = SSE / Count
    print "MSE:", MSE