In [28]:
#import Spark packages
from __future__ import print_function

from pyspark import SparkContext

from pyspark.mllib.recommendation import ALS, Rating

import itertools
from math import sqrt
from os.path import join

In [29]:
#create an instance of SparkContext
if __name__ == "__main__":
    sc = SparkContext.getOrCreate()

In [30]:
    #read input dataset into RDD
    data = sc.textFile("C:\Users\poonam\Downloads\RecommenderSysDataset\Book-Crossings\BX-CSV-Dump\BookCrossings.txt")

In [31]:
    #count number of entries in RDD
    data.count()

1048574

In [32]:
    #read first 5 elements from RDD
    data.take(5)

[u'276725\t1\t0',
 u'276726\t2\t5',
 u'276727\t3\t0',
 u'276729\t4\t3',
 u'276729\t5\t6']

In [33]:
    bookCrossings = data.map(lambda l: l.split('\t'))

In [34]:
    bookCrossings.take(5)

[[u'276725', u'1', u'0'],
 [u'276726', u'2', u'5'],
 [u'276727', u'3', u'0'],
 [u'276729', u'4', u'3'],
 [u'276729', u'5', u'6']]

In [35]:
    #create an object of class Rating(int user, int product, double rating) 
    ratings = bookCrossings.map(lambda x: Rating(int(x[0]), int(x[1]), float(x[2])))

In [36]:
    ratings.first()

Rating(user=276725, product=1, rating=0.0)

In [37]:
    ratings.take(5)

[Rating(user=276725, product=1, rating=0.0),
 Rating(user=276726, product=2, rating=5.0),
 Rating(user=276727, product=3, rating=0.0),
 Rating(user=276729, product=4, rating=3.0),
 Rating(user=276729, product=5, rating=6.0)]

In [40]:
    # Split dataset into train and test set
    training, test = ratings.randomSplit([0.7,0.3])

In [41]:
    training.cache()
    test.cache()

PythonRDD[3105] at RDD at PythonRDD.scala:48

In [42]:
    training.count()

734211

In [43]:
    test.count()

314363

In [45]:
    # define parameter range for ALS algorithm
    ranks       = [5, 10]
    lambdas     = [0.1, .01]
    numIters    = [5, 10, 15]
    bestModel   = None
    bestValidationRmse = float("inf")
    bestRank    = 0
    bestLambda  = -1.0
    bestNumIter = -1

In [46]:
    # implement collaberative filtering by calling ALS.train()
    for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
        model = ALS.train(training, rank, numIter, lmbda)
        testdata = training.map(lambda p: (p[0], p[1]))
        predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
          
        # calculate RMSE
        predictionsAndRatings = training.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
        MSE = predictionsAndRatings.map(lambda r: (r[1][0] - r[1][1])**2).mean()
        validationRmse = sqrt(MSE)

        print(rank, lmbda, numIter, validationRmse)
        
        if(validationRmse < bestValidationRmse):
            bestValidationRmse = validationRmse
            bestRank = rank
            bestLambda = lmbda
            bestNumIter = numIter

5 0.1 5 1.81138008542
5 0.1 10 1.46911842136
5 0.1 15 1.41978917603
5 0.01 5 1.82814727599
5 0.01 10 1.53291271648
5 0.01 15 1.48682610749
10 0.1 5 1.18489134483
10 0.1 10 1.03095498997
10 0.1 15 0.977239865464
10 0.01 5 1.24245724176
10 0.01 10 0.995344064048
10 0.01 15 0.925562649813


In [47]:
    print("Rank = ",bestRank,", Lambda = ",bestLambda,", Iterations =  ",bestNumIter,", RMSE = ",bestValidationRmse)

    print("RMSE for train:\t\t%.3f" % bestValidationRmse)

Rank =  10 , Lambda =  0.01 , Iterations =   15 , RMSE =  0.925562649813
RMSE for train:		0.926


In [48]:
    # train the model using optimized parameters
    bestModel = ALS.train(training, bestRank, bestNumIter, bestLambda)

In [58]:
    bestModel.recommendProducts(276729, 2)

[Rating(user=276729, product=214408, rating=19.17244874129043),
 Rating(user=276729, product=419578, rating=19.17244874129043)]

In [59]:
    bestModel.recommendUsers(2, 5)

[Rating(user=90467, product=2, rating=25.575049362820664),
 Rating(user=174834, product=2, rating=21.36798048222395),
 Rating(user=111228, product=2, rating=20.50604528447721),
 Rating(user=221967, product=2, rating=19.875966697994627),
 Rating(user=85502, product=2, rating=19.591909139276513)]

In [62]:
    predictTestdata = test.map(lambda p: (p[0], p[1]))
    testPredictions = model.predictAll(predictTestdata).map(lambda r: ((r[0], r[1]), r[2]))
    predictionsAndRatingsOnTest   = test.map(lambda r: ((r[0], r[1]), r[2])).join(testPredictions)
    testMSE = predictionsAndRatingsOnTest.map(lambda r: (r[1][0] - r[1][1])**2).mean()
    testValidationRmse = sqrt(MSE)

In [64]:
    print("RMSE for test:\t%.10f" % testValidationRmse)

RMSE for test:	0.9255626498


In [65]:
    sc.stop()