# Part II: 

# 1. Effect of the rank on the ALS mean squared error

In [4]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark import SparkContext
# Reuse the SparkContext that is already running when this cell is executed
# again; a second plain SparkContext() call in the same kernel raises
# "ValueError: Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

# Candidate ALS ranks (number of latent factors) compared in the cells below.
ranks = [5, 7, 10, 20]


In [5]:
# Load the ratings file as an RDD of raw text lines (one string per line).
data = sc.textFile("re_u.data")

In [6]:
# Peek at the first five raw lines to check the comma-separated layout.
data.take(5)

['196,242,3', '186,302,3', '22,377,1', '244,51,2', '166,346,1']

In [7]:
def parse_rating(line):
    """Convert one comma-separated text line into an MLlib ``Rating``.

    The first three fields are read as user id (int), product id (int) and
    rating value (float), in that order.
    """
    fields = line.split(',')
    return Rating(int(fields[0]), int(fields[1]), float(fields[2]))


# The parsed RDD is consumed by every ALS.train call and every evaluation
# below, so cache it instead of re-reading and re-parsing the file each time.
ratings = data.map(parse_rating).cache()

In [8]:
# Number of lines (rating records) in the input file.
data.count()

100000

In [9]:
# Baseline model: first candidate rank, 10 ALS iterations.
rank = ranks[0]
numIterations = 10
# ALS starts from randomly initialised factors; a fixed seed makes the trained
# model, and the error reported for it, repeatable across kernel restarts.
model = ALS.train(ratings, rank, numIterations, seed=42)

In [10]:
def _as_keyed(row):
    """Re-key a (user, product, value) record as ((user, product), value)."""
    return ((row[0], row[1]), row[2])


# The model is scored on the same (user, product) pairs it was trained on,
# so the number printed here is a training-set error.
testdata = ratings.map(lambda row: (row[0], row[1]))
predictions = model.predictAll(testdata).map(_as_keyed)

# Pair every observed rating with its prediction, then average the squared gap.
ratesAndPreds = ratings.map(_as_keyed).join(predictions)
squaredErrors = ratesAndPreds.map(lambda pair: (pair[1][0] - pair[1][1]) ** 2)
MSE = squaredErrors.mean()
print("Mean Squared Error = " + str(MSE))

Mean Squared Error = 0.6212677632591163


Now let us repeat the same training and evaluation for each candidate value of the rank:

In [11]:
# These two RDDs do not depend on the rank, so build them once outside the loop.
# Evaluation uses the training pairs themselves, i.e. this is training-set MSE.
testdata = ratings.map(lambda p: (p[0], p[1]))
actualRatings = ratings.map(lambda r: ((r[0], r[1]), r[2]))

# numIterations keeps the value assigned for the baseline model above.
for rank in ranks:
    # Same seed for every rank so that only the rank differs between runs.
    model = ALS.train(ratings, rank, numIterations, seed=42)

    predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
    ratesAndPreds = actualRatings.join(predictions)
    MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
    print("Mean Squared Error for rank "+str(rank)+" = "+str(MSE))

Mean Squared Error for rank 5 = 0.6209253729167984
Mean Squared Error for rank 7 = 0.5587300511686638
Mean Squared Error for rank 10 = 0.48536346826745874
Mean Squared Error for rank 20 = 0.30847382671479534


# 2. Effect of the number of iterations on the ALS mean squared error

In [12]:
# Iteration counts to compare, all trained at a fixed rank of 20.
numIters = [2, 5, 10, 20]
rank = 20

# These two RDDs do not depend on the iteration count, so build them once.
# Evaluation uses the training pairs themselves, i.e. this is training-set MSE.
testdata = ratings.map(lambda p: (p[0], p[1]))
actualRatings = ratings.map(lambda r: ((r[0], r[1]), r[2]))

for numIterations in numIters:
    # Same seed for every run so that only the iteration count differs.
    model = ALS.train(ratings, rank, numIterations, seed=42)

    predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
    ratesAndPreds = actualRatings.join(predictions)
    MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
    print("Mean Squared Error for number of iterations "+str(numIterations)+" = "+str(MSE))

Mean Squared Error for number of iterations 2 = 0.49406140047992964
Mean Squared Error for number of iterations 5 = 0.34511130849850313
Mean Squared Error for number of iterations 10 = 0.3050194481833983
Mean Squared Error for number of iterations 20 = 0.290924490131114


# 3. Effect of the data size on the ALS mean squared error

In [23]:
# Number of rating lines used for each run.
sizes = [2000, 5000, 10000, 20000, 50000, 100000]
# State the rank explicitly instead of relying on the value an earlier cell
# happened to leave behind (20 after either of the two previous loops).
rank = 20
numIterations = 20

for size in sizes:
    # take() returns the first `size` lines as a local list (not a random
    # sample), so it is turned back into an RDD before parsing.
    pData = sc.parallelize(data.take(size))
    # Use a separate name so the full `ratings` RDD defined earlier is not
    # overwritten by a subset.
    subsetRatings = pData.map(lambda l: l.split(',')).map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
    # Same seed for every run so that only the data size differs.
    model = ALS.train(subsetRatings, rank, numIterations, seed=42)

    # Each model is scored on the subset it was trained on (training-set MSE).
    testdata = subsetRatings.map(lambda p: (p[0], p[1]))
    predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
    ratesAndPreds = subsetRatings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
    MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
    print("Mean Squared Error for data size of "+str(size)+" = "+str(MSE))
    

Mean Squared Error for data size of 2000 = 9.422348627972515e-05
Mean Squared Error for data size of 5000 = 0.0006496501264328184
Mean Squared Error for data size of 10000 = 0.0030010767452570077
Mean Squared Error for data size of 20000 = 0.025351257839588852
Mean Squared Error for data size of 50000 = 0.15151901742331103
Mean Squared Error for data size of 100000 = 0.2901319113611081
