In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS, Rating

In [None]:
'''
u.data       :       user id | item id | rating | timestamp

u.item       :       movie id | movie title | release date | video release date |
                     IMDb URL | unknown | Action | Adventure | Animation |
                     Children's | Comedy | Crime | Documentary | Drama | Fantasy |
                     Film-Noir | Horror | Musical | Mystery | Romance | Sci-Fi |
                     Thriller | War | Western |
'''

In [3]:
def loadMovieNames(path="/home/jovyan/work/MoviesDataFrame/ml-100k/u.ITEM"):
    """Build a lookup table from movie id to movie title.

    Every non-blank line of the file is split on '|'; the first field is
    parsed as the integer movie id and the second field is stored as its
    title. Any remaining fields are ignored.

    Args:
        path: Location of the pipe-delimited item file. Defaults to the
            path this notebook was originally written against.

    Returns:
        dict mapping int movie id -> str movie title.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if the first field of a non-blank line is not an integer.
        IndexError: if a non-blank line contains no '|' separator.
    """
    movieNames = {}
    # The file is opened as ISO-8859-1 (Latin-1), matching the original cell.
    with open(path, encoding="ISO-8859-1") as f:
        for line in f:
            # A blank line (e.g. a trailing newline at end of file) carries no
            # record; skip it instead of failing on int('').
            if not line.strip():
                continue
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

In [4]:
# Run Spark locally on all available cores ("local[*]") under a recognisable app name.
conf = SparkConf().setMaster("local[*]").setAppName("MovieRecommendationsALS")
# getOrCreate() returns the already-running context when there is one, so
# re-executing this cell in a live kernel no longer fails with
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf = conf)
# Checkpoint data is written to ./checkpoint (relative to the working directory).
sc.setCheckpointDir('checkpoint')

In [5]:
# Announce the step, then build the movie-id -> title dictionary that the
# later cells use to turn numeric ids into readable titles.
print("\nLoading movie names...")
nameDict = loadMovieNames()


Loading movie names...


In [6]:
# Read the raw ratings file as an RDD with one string per line.
data = sc.textFile("/home/jovyan/work/MoviesDataFrame/ml-100k/u.data")
# Peek at the first 10 lines to check the layout before parsing.
data.take(10)

['0\t50\t5\t881250949',
 '0\t172\t5\t881250949',
 '0\t133\t1\t881250949',
 '196\t242\t3\t881250949',
 '186\t302\t3\t891717742',
 '22\t377\t1\t878887116',
 '244\t51\t2\t880606923',
 '166\t346\t1\t886397596',
 '298\t474\t4\t884182806',
 '115\t265\t2\t881171488']

In [7]:
# ratings: one Rating(user id, film id, rating) per input line.
# Each line is split on whitespace; only the first three fields are used.
ratings = (
    data
    .map(lambda line: line.split())
    .map(lambda fields: Rating(int(fields[0]), int(fields[1]), float(fields[2])))
)

In [8]:
# Build the recommendation model using Alternating Least Squares
print("\nTraining recommendation model...")
# Number of latent factors in the factorisation.
rank = 10
# Lowered numIterations to ensure it works on lower-end systems
numIterations = 6
# Without an explicit seed ALS is seeded from the system time, so every run
# produced a different model and different recommendations. Fixing the seed
# makes the notebook's results repeatable.
model = ALS.train(ratings, rank, numIterations, seed=42)


Training recommendation model...


In [9]:
# User whose existing ratings are listed and for whom recommendations are generated below.
userID = 0

In [10]:
# Show what the chosen user has already rated, as "<title>:<rating>" lines.
print("\nRatings for user ID " + str(userID) + ":")
userRatings = ratings.filter(lambda entry: entry.user == userID)
for seen in userRatings.collect():
    # Rating fields are (user, product, rating); product is the movie id.
    print(nameDict[seen.product] + ":" + str(seen.rating))


Ratings for user ID 0:
Star Wars (1977):5.0
Empire Strikes Back, The (1980):5.0
Gone with the Wind (1939):1.0


In [11]:
# Ask the trained model for the user's ten highest-scoring movies and print
# each title next to its predicted score.
print("\nTop 10 recommendations:")
recommendations = model.recommendProducts(userID, 10)
for pick in recommendations:
    title = nameDict[int(pick.product)]
    print(title + " score " + str(pick.rating))


Top 10 recommendations:
Perfect Candidate, A (1996) score 9.24707640483211
Inspector General, The (1949) score 8.288007711390177
Unzipped (1995) score 8.046732159893878
Pillow Book, The (1995) score 7.2988743510097
unknown score 7.285977638290147
National Lampoon's Senior Trip (1995) score 7.170937580010669
American Buffalo (1996) score 6.724625637161964
Gay Divorcee, The (1934) score 6.676343010872559
Great Race, The (1965) score 6.67000402892201
Army of Darkness (1993) score 6.620410418036052
