In [1]:
%matplotlib inline

import sys, operator, findspark
findspark.init()

import numpy as np

import pyspark
from pyspark.sql import SQLContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating


sc= pyspark.SparkContext()
sqlContext = SQLContext(sc)

In [2]:
rdd = sc.textFile('ml-10M100K/ratings.dat')
rdd = rdd.map(lambda l: l.split("::")).sample(False, 0.1).sortBy(lambda x: x[3])

In [3]:
#find timestamp at 60% and 80% to split rdd
size=rdd.count()
divider_60 = rdd.map(lambda x: x[3]).take(int(0.6*size))[-1]
divider_80 = rdd.map(lambda x: x[3]).take(int(0.8*size))[-1]
train_rdd=rdd.filter(lambda x: x[3]<divider_60)
validation_rdd=rdd.filter(lambda x: x[3]>=divider_60 and x[3]<divider_80)
test_rdd=rdd.filter(lambda x: x[3]>=divider_80)

In [4]:
#drop timestamp
ratings = train_rdd.map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
validation_rdd=validation_rdd.map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
test_rdd=test_rdd.map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

def train_model(train_rdd):
    model = ALS.train(train_rdd,rank=8,seed= None, iterations=10,lambda_ = 0.1)
    return model

In [5]:
#train model
vanilla_model=train_model(ratings)

def find_predictions_and_MSE(rdd):
    #prepare data for predictions
    rdd_for_predict = rdd.map(lambda x: (x[0], x[1]))
    
    #find predictions
    predictions = vanilla_model.predictAll(rdd_for_predict).map(lambda r: ((r[0], r[1]), r[2]))

    #compute RMSE
    ratesAndPreds = rdd.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
    MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
    print("Mean Squared Error " + str(MSE))

print "Validation Data"
find_predictions_and_MSE(validation_rdd)

print "Test Data"
find_predictions_and_MSE(test_rdd)

Validation Data
Mean Squared Error 1.11875060059
Test Data
Mean Squared Error 1.1339822189


<h1>Recommendation System </h1>

In [6]:
#RECOMMEND MOVIES
movies_rdd= sc.textFile('ml-latest/movies.csv')

#parse data
header = movies_rdd.take(1)[0]

# Parse
movies_rdd = movies_rdd.filter(lambda line: line!=header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2]))

movie_titles = movies_rdd.map(lambda x: (int(x[0]),x[1]))

In [7]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movieID_ratings_RDD = (rdd.map(lambda x: (int(x[1]), float(x[2]))).groupByKey())
movieID_ratings_RDD = movieID_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movieID_ratings_RDD.map(lambda x: (x[0], x[1][0]))

In [8]:
#new user ratings

new_user_ratings = [
     (0,260,4), # Star Wars (1977)
     (0,1,3), # Toy Story (1995)
     (0,16,3), # Casino (1995)
     (0,25,4), # Leaving Las Vegas (1995)
     (0,32,4), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,1), # Flintstones, The (1994)
     (0,379,1), # Timecop (1994)
     (0,296,3), # Pulp Fiction (1994)
     (0,858,5) , # Godfather, The (1972)
     (0,50,4) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print 'New user ratings: %s' % new_user_ratings_RDD.take(10)

New user ratings: [(0, 260, 4), (0, 1, 3), (0, 16, 3), (0, 25, 4), (0, 32, 4), (0, 335, 1), (0, 379, 1), (0, 296, 3), (0, 858, 5), (0, 50, 4)]


In [9]:
#add new ratings to original ratings
ratings = rdd.union(new_user_ratings_RDD)

In [10]:
#train model again
new_ratings_model= train_model(ratings.map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2]))))

In [11]:
#get movies rated by new user
new_user_ratings = map(lambda x: x[1], new_user_ratings)

#get list of unwatched movies
unrated_movies = (movies_rdd.filter(lambda x: x[0] not in new_user_ratings).map(lambda x: (0, x[0])))

#get recommendations
recommendations = new_ratings_model.predictAll(unrated_movies)
print recommendations.take(3)

[Rating(user=0, product=384, rating=2.3813291603584616), Rating(user=0, product=4926, rating=2.0686434218891296), Rating(user=0, product=5928, rating=2.7610496247491167)]


In [12]:
# Transform recommendations into(Movie ID, Predicted Rating)
recommendations = recommendations.map(lambda x: (x.product, x.rating))
recommendations = \
    recommendations.join(movie_titles).join(movie_rating_counts_RDD)
    
#flatten it
recommendations = recommendations.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

print movie_rating_counts_RDD.take(3)

[(2048, 79), (3072, 407), (5592, 1)]


In [14]:
#get top 25 ratings
top_movies = recommendations.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))


TOP recommended movies (with more than 25 reviews):
(u'Sunshine (1999)', 4.367679651888162, 35)
(u'"Diving Bell and the Butterfly', 4.365090943548678, 38)
(u'Withnail & I (1987)', 4.325709966404039, 68)
(u'"Endless Summer 2', 4.323567863620477, 34)
(u'American Beauty (1999)', 4.2902778766523895, 2208)
(u'Eternal Sunshine of the Spotless Mind (2004)', 4.289448373623751, 859)
(u'"Big Red One', 4.280803290197588, 32)
(u'Primer (2004)', 4.273502752545498, 74)
(u'City of God (Cidade de Deus) (2002)', 4.213031537475345, 414)
(u'Nadja (1994)', 4.195803692973873, 27)
(u'Koyaanisqatsi (a.k.a. Koyaanisqatsi: Life Out of Balance) (1983)', 4.178680423650899, 130)
(u'Amazon Women on the Moon (1987)', 4.1746981760829005, 37)
(u'"Royal Tenenbaums', 4.167678196877453, 586)
(u'"Samoura\xef', 4.148188858584449, 31)
(u'8 Women (2002)', 4.135325591343863, 42)
(u'Lupin III: The Castle Of Cagliostro (Rupan sansei: Kariosutoro no shiro) (1979)', 4.13521404381985, 38)
(u'Fight Club (1999)', 4.087300470541667,