In [1]:
from time import time

tstart = time()

from math import sqrt
import pandas as pd
import numpy as np
from scipy import spatial
import scipy 

import datetime
import os


# 1,000,000 Records
Ubuntu 16

In [2]:
#Some data exploration using SparkSQL
#Source adapted from http://spark.apache.org/docs/latest/sql-programming-guide.html#dataframes
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlCtx = SQLContext(sc)

In [3]:
# loading 1,000, 000 Movie lens data directly from github
path = "https://raw.githubusercontent.com/ppadebettu/CUNY/Master/IS_643_Recommender_Systems/Final_Project/Data/"
movies_fname = 'movies.dat'
url = path + movies_fname
pd_movies = pd.read_csv(url, sep = ":" , header = None, na_values='NaN', usecols = [0,2], names = ['movieid', 'movietitle'])

users_fname = 'users.dat'
url = path + users_fname
pd_users = pd.read_csv(url, sep = ":" , header = None, na_values='NaN', usecols = [0,2,4,6,8], 
                       names = ['userid', 'gender', 'age', 'occupation', 'zipcode'])

ratings_fname = 'ratings.dat'
url = path + ratings_fname
pd_ratings = pd.read_csv(url, sep = ":" , header = None, na_values='NaN', usecols = [0,2,4,6], 
                       names = ['userid', 'movieid', 'rating', 'timestamp'])

result = pd.merge(pd_ratings,pd_movies, on = ['movieid'] )
movielens = pd.merge(result,pd_users, on = ['userid'] )


# Code to add the new columns that will contain the context data that is obtained from the timestamp

def fdate(x):   
    return datetime.datetime.fromtimestamp(
        int(str(x['timestamp']))).strftime('%Y-%m-%d') 

def ftime(x):   
    return datetime.datetime.fromtimestamp(
        int(str(x['timestamp']))).strftime('%H:%M:%S') 

def fweekday(x):   
    
    if (datetime.datetime.fromtimestamp(int(str(x['timestamp']))).weekday() >= 4):
        return 'Weekend'
    else:
        return 'Weekday'
    
def fagegroup(x):   
    
    if (x['age'] >= 45):
        return '45+'
    
    elif (x['age'] >= 30):
        return '30-44'
    
    elif (x['age'] >= 19):
        return '19-29'
    else:
        return 'below 18' 
    
   
def ftimeofday(x): 
    
    t = datetime.datetime.fromtimestamp(int(str(x['timestamp']))).strftime('%H:%M:%S')
    
    if (t >= '23:00:00'):
        return 'night'
    
    elif (t >= '18:00:00'):
        return 'evening'
    
    elif (t >= '12:00:00'):
        return 'afternoon'
    
    elif (t >= '08:00:00'):
        return 'morning'
    
    else:
        return 'night'
    
def flocation(x):   
    
    start = datetime.datetime.strptime(x['date'], '%Y-%m-%d')
    end = datetime.datetime.strptime(x['releasedate'], '%d-%b-%Y')
    
    if start - end >= datetime.timedelta(180):
        return 'home'
    else:
        return 'theater'       
    
    
movielens['date'] = movielens.apply(fdate, axis=1)
movielens['time'] = movielens.apply(ftime, axis=1)
movielens['weekday'] = movielens.apply(fweekday, axis=1)
#pandas_df['agegroup'] = pandas_df.apply(fagegroup, axis=1)  

movielens['timeofday'] = movielens.apply(ftimeofday, axis=1)
   


In [4]:
movielens.shape

(1000209, 13)

In [5]:
movielens.head()

Unnamed: 0,userid,movieid,rating,timestamp,movietitle,gender,age,occupation,zipcode,date,time,weekday,timeofday
0,1,1193,5,978300760,One Flew Over the Cuckoo's Nest (1975),F,1,10,48067,2000-12-31,17:12:40,Weekend,afternoon
1,1,661,3,978302109,James and the Giant Peach (1996),F,1,10,48067,2000-12-31,17:35:09,Weekend,afternoon
2,1,914,3,978301968,My Fair Lady (1964),F,1,10,48067,2000-12-31,17:32:48,Weekend,afternoon
3,1,3408,4,978300275,Erin Brockovich (2000),F,1,10,48067,2000-12-31,17:04:35,Weekend,afternoon
4,1,2355,5,978824291,"Bug's Life, A (1998)",F,1,10,48067,2001-01-06,18:38:11,Weekend,evening


In [6]:
#Get movie titles
movies_titles = movielens[['movieid', 'movietitle']]
movies_titles =movies_titles.drop_duplicates()
print movies_titles.shape
movies_titles.head()

(3706, 2)


Unnamed: 0,movieid,movietitle
0,1193,One Flew Over the Cuckoo's Nest (1975)
1,661,James and the Giant Peach (1996)
2,914,My Fair Lady (1964)
3,3408,Erin Brockovich (2000)
4,2355,"Bug's Life, A (1998)"


In [7]:
movielens = movielens[['userid','movieid','rating']]

In [8]:
sqlContext = pyspark.sql.SQLContext(sc)
movie_rdd = sqlContext.createDataFrame(movielens)

In [9]:
movie_rdd.show(5)

+------+-------+------+
|userid|movieid|rating|
+------+-------+------+
|     1|   1193|     5|
|     1|    661|     3|
|     1|    914|     3|
|     1|   3408|     4|
|     1|   2355|     5|
+------+-------+------+
only showing top 5 rows



In [10]:
type(movie_rdd)

pyspark.sql.dataframe.DataFrame

In [11]:
movie_rdd.take(5)

[Row(userid=1, movieid=1193, rating=5),
 Row(userid=1, movieid=661, rating=3),
 Row(userid=1, movieid=914, rating=3),
 Row(userid=1, movieid=3408, rating=4),
 Row(userid=1, movieid=2355, rating=5)]

In [12]:
#Split data into training and test datasets
training_RDD, validation_RDD, test_RDD = movie_rdd.rdd.randomSplit([6, 2, 2], seed=0L)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [13]:
test_for_predict_RDD.take(5)

[(1, 1193), (1, 1287), (1, 2804), (1, 594), (1, 919)]

In [14]:
validation_for_predict_RDD.take(5)

[(1, 3408), (1, 2355), (1, 938), (1, 2918), (1, 2018)]

In [15]:
training_RDD.take(3)

[Row(userid=1, movieid=661, rating=3),
 Row(userid=1, movieid=914, rating=3),
 Row(userid=1, movieid=1197, rating=3)]

In [16]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5L
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    ratesAndpreds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(ratesAndpreds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print 'For rank %s the RMSE is %s' % (rank, error)
    if error < min_error:
        min_error = error
        best_rank = rank

print 'The best model was trained with rank %s' % best_rank

For rank 4 the RMSE is 0.884217682053
For rank 8 the RMSE is 0.875185078879
For rank 12 the RMSE is 0.875303351425
The best model was trained with rank 8


In [17]:
#Let's check our predictions
predictions.take(3)

[((4551, 347), 2.9735000904684004),
 ((4551, 163), 2.6672770802723273),
 ((4551, 858), 4.384040013031293)]

In [18]:
#Let's compare predictions vs actuals (ratings)
ratesAndpreds.take(3)

[((5569, 597), (5.0, 3.972421976074028)),
 ((967, 597), (5.0, 3.859913585358328)),
 ((3290, 2688), (3.0, 2.363323866272773))]

In [19]:
#Let's test the selected model
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndpreds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(ratesAndpreds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print 'For testing data the RMSE is %s' % (error)

For testing data the RMSE is 0.875154490879


In [20]:
#Create a list of ratings for a new user (998)
new_user_ID_1 = 998

# The format of each line is (userID, movieID, rating)
new_user_ratings_1 = [
 (998, 242, 4), # Kolya (1996)
 (998, 51, 3),  # Legends of the Fall (1994)
 (998, 465, 1),  # Jungle Book, The (1994)
 (998 , 86, 2), # Remains of the Day, The (1993)
 (998, 222, 3), # Star Trek: First Contact (1996)
 (998, 274, 4), # Sabrina (1995)
 (998, 1042, 3),  # Just Cause (1995)
 (998, 1184, 3), # Endless Summer 2, The (1994)
 (998, 265, 2), # Hunt for Red October, The (1990)
 (998, 302, 3) # L.A. Confidential (1997)
]
new_user_ratings_RDD_1 = sc.parallelize(new_user_ratings_1)
print 'New user ratings: %s' % new_user_ratings_RDD_1.take(10)

New user ratings: [(998, 242, 4), (998, 51, 3), (998, 465, 1), (998, 86, 2), (998, 222, 3), (998, 274, 4), (998, 1042, 3), (998, 1184, 3), (998, 265, 2), (998, 302, 3)]


In [21]:
# Merge new user ratings to the existing RDD
data_with_new_ratings_RDD = movie_rdd.rdd.union(new_user_ratings_RDD_1)

In [22]:
#Train the ALS model using new dataset and all the parameters we selected before
from time import time

t0 = time()
new_ratings_model = ALS.train(data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print "New model trained in %s seconds" % round(tt,3)

New model trained in 18.885 seconds


In [23]:
#Getting top recommendations
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings_1) # get just movie IDs
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = movie_rdd.rdd.filter(lambda x: x[0] not in new_user_ratings_ids)\
                               .map(lambda x:(new_user_ID_1, x[0]))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [24]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
# Use distinct() here
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.distinct().map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_RDD.take(3)

[(1783, 3.060338282873792),
 (2575, 3.7763104345544116),
 (2253, 2.7780416703225352)]

In [25]:
#Get movie titles
movies = sqlContext.createDataFrame(movies_titles)
print movies.take(3)
m = movies.map(lambda x: (int(x[0]),x[1]))

[Row(movieid=1193, movietitle=u"One Flew Over the Cuckoo's Nest (1975)"), Row(movieid=661, movietitle=u'James and the Giant Peach (1996)'), Row(movieid=914, movietitle=u'My Fair Lady (1964)')]


In [26]:
m.take(2)

[(1193, u"One Flew Over the Cuckoo's Nest (1975)"),
 (661, u'James and the Giant Peach (1996)')]

In [27]:
#Merge movie titles and recommendations for the new user so that the results are meaningful
#movies_titles = ratings_data.map(lambda x: (int(x[0]),x[1]))
new_user_recommendations_rating_title_RDD = new_user_recommendations_rating_RDD.join(m)
new_user_recommendations_rating_title_RDD.distinct().sortBy(lambda x: x[1][0], ascending=False).take(3)

[(572, (5.105234326202673, u'Foreign Student (1994)')),
 (1851, (5.00454326917589, u'Leather Jacket Love Story (1997)')),
 (3233, (4.8539271025667805, u'Smashing Time (1967)'))]

In [28]:
print "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"
tstop = time() - tstart

print "Total run time =  %s minutes" % round(tstop/60,3)

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
Total run time =  5.402 minutes
