In [1]:
import findspark
findspark.init()

from pyspark.mllib.recommendation import ALS, Rating
from pyspark import SparkContext, SQLContext
sc = SparkContext("local", "test")
sqlContext = SQLContext(sc)

In [2]:
TRAIN_FILE = "./debug_data/ratings-train-debug.dat/"
VALIDATION_FILE = "./data/ratings-validation.dat/"
TEST_FILE = "./data/ratings-test.dat/"

In [3]:
def prepare_data(data):
    return (
        data
        .map(lambda l: l.split(','))
        .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
    )  

In [4]:
def prepare_data2(data):
    return (
        data
        .map(lambda l: l.split('::'))
        .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
    )  

In [7]:
# Load and parse the data
ratings_train_text = sc.textFile(TRAIN_FILE)
ratings_train = prepare_data2(ratings_train_text)
ratings_train.take(10)

[Rating(user=45211, product=2126, rating=4.0),
 Rating(user=45186, product=8, rating=3.0),
 Rating(user=45186, product=805, rating=5.0),
 Rating(user=45186, product=1003, rating=4.0),
 Rating(user=43733, product=3994, rating=2.0),
 Rating(user=45186, product=230, rating=4.0),
 Rating(user=45211, product=2253, rating=5.0),
 Rating(user=45186, product=367, rating=5.0),
 Rating(user=45211, product=3439, rating=5.0),
 Rating(user=45186, product=1918, rating=5.0)]

In [8]:
ratings_validation_text = sc.textFile(VALIDATION_FILE)
ratings_validation = prepare_data2(ratings_validation_text)
ratings_validation.take(10)

[Rating(user=64661, product=1100, rating=3.0),
 Rating(user=64661, product=1147, rating=4.0),
 Rating(user=64661, product=1005, rating=1.0),
 Rating(user=64661, product=999, rating=3.5),
 Rating(user=64661, product=886, rating=2.0),
 Rating(user=50311, product=1889, rating=5.0),
 Rating(user=64661, product=818, rating=3.0),
 Rating(user=64661, product=694, rating=3.0),
 Rating(user=64661, product=612, rating=1.5),
 Rating(user=64661, product=688, rating=1.0)]

In [9]:
ratings_test_text = sc.textFile(TEST_FILE)
ratings_test = prepare_data2(ratings_validation_text)
ratings_test.take(10)

[Rating(user=64661, product=1100, rating=3.0),
 Rating(user=64661, product=1147, rating=4.0),
 Rating(user=64661, product=1005, rating=1.0),
 Rating(user=64661, product=999, rating=3.5),
 Rating(user=64661, product=886, rating=2.0),
 Rating(user=50311, product=1889, rating=5.0),
 Rating(user=64661, product=818, rating=3.0),
 Rating(user=64661, product=694, rating=3.0),
 Rating(user=64661, product=612, rating=1.5),
 Rating(user=64661, product=688, rating=1.0)]

#### Calculate the general mean u for all ratings

In [10]:
global_mean = ratings_train.map(lambda r: (r[2])).mean()

In [11]:
global_mean

4.090000000000001

##### calculate item-specific bias, according to the paper we referenced, for each item i, its bias is equal to the summation of difference between all ratings of to the same item and global mean and then the result is divided by the sum of a regulation parameter and the quantity of the ratings.


In [12]:
#convert training data to dataframe with attribute
df = sqlContext.createDataFrame(ratings_train, ['userId', 'movieId', 'ratings'])

In [13]:
#sort the data by movie
df_orderByMovie = df.orderBy(df.movieId)

In [14]:
#group the movie and count each movie
movie_count = df_orderByMovie.groupBy(df_orderByMovie.movieId).count()

In [15]:
#calculate the sum of the ratings of each movie
sum_byMovie = df_orderByMovie.groupBy(['movieId']).sum()

In [16]:
#drop some unrelated column
drop_column1 = sum_byMovie.drop(sum_byMovie[1])
final_drop = drop_column1.drop(drop_column1[1])

In [17]:
#join the sum of count and sum of rating for each movie
movie_sorted = movie_count.join(final_drop, "movieId")

In [18]:
#sorted the dataset by each movie
new_movie_sorted = movie_sorted.orderBy(movie_sorted.movieId)

In [19]:
#calculate item specific bias
item_bias = new_movie_sorted.map(lambda r: [r[0], (r[2] - r[1]*global_mean)/(25+r[1])])

In [20]:
new_item_bias = sqlContext.createDataFrame(item_bias, ['movieId', 'item_bias'])

#### Caculate the user-specific bias

In [21]:
#order the training set by user
df_orderByUser = df.orderBy(df.userId)

In [22]:
#join the item bias dataset to with the same movieId
contain_itemBias = df_orderByUser.join(new_item_bias, "movieId")

In [23]:
#sorted the dataset by user
sorted_byUser = contain_itemBias.orderBy(['userId'])

In [24]:
#calculate the numerical part of item specific bais
subtraction = sorted_byUser.map(lambda r: [r[1], r[2] - global_mean - r[3]])

In [25]:
user_bias_part1 = sqlContext.createDataFrame(subtraction, ['userId', 'subtraction'])

In [26]:
sum_byUser = user_bias_part1.groupBy(['userId']).sum()

In [27]:
#count the user 
sum_UserCollect = user_bias_part1.groupBy(['userId']).count()

In [28]:
#order the data set by user
ordered_sum_UserCollect = sum_UserCollect.orderBy(sum_UserCollect.userId)

In [29]:
drop_column2 = sum_byUser.drop(sum_byUser[1])

In [30]:
final_drop2 = drop_column2.orderBy(drop_column2.userId)

In [31]:
user_bias_table = final_drop2.join(ordered_sum_UserCollect, 'userId')

In [32]:
ordered_userBiaTable = user_bias_table.orderBy(user_bias_table.userId)

In [33]:
user_bias = ordered_userBiaTable.map(lambda r: [r[0], r[1]/(10+r[2])])

In [34]:
user_specific_bias = sqlContext.createDataFrame(user_bias, ['userId', 'user_bias'])

In [35]:
merge1 = df_orderByUser.join(user_specific_bias, 'userId')

In [36]:
merge2 = merge1.join(new_item_bias, 'movieId')

In [37]:
new_ratings_train = merge2.map(lambda r: [r[0], r[1], r[2] - r[3] - r[4]])

In [38]:
temp = sqlContext.createDataFrame(new_ratings_train, ['movieId', 'userId', 'new_ratings'])

In [39]:
final_new_ratings_train = temp.orderBy(temp.userId)

In [40]:
final_new_ratings_train.take(10)

[Row(movieId=479, userId=33700, new_ratings=4.017884615384615),
 Row(movieId=3409, userId=33700, new_ratings=4.017884615384615),
 Row(movieId=1032, userId=43733, new_ratings=4.128919413919414),
 Row(movieId=3638, userId=43733, new_ratings=5.090457875457876),
 Row(movieId=2078, userId=43733, new_ratings=3.167380952380953),
 Row(movieId=2081, userId=43733, new_ratings=2.2058424908424916),
 Row(movieId=2881, userId=43733, new_ratings=5.093663003663004),
 Row(movieId=2085, userId=43733, new_ratings=3.167380952380953),
 Row(movieId=2087, userId=43733, new_ratings=4.128919413919414),
 Row(movieId=2687, userId=43733, new_ratings=3.167380952380953)]

In [41]:
#now, we perform the same procedure as task1
#first, we sort the data by timestamp. 
new_ratings_byTime = final_new_ratings_train.join(df, ['userId', 'movieId'])

In [42]:
#example of dataset
new_ratings_byTime.take(20)

[Row(userId=45211, movieId=2542, new_ratings=4.393715574548908, ratings=5.0),
 Row(userId=43733, movieId=1032, new_ratings=4.128919413919414, ratings=3.0),
 Row(userId=45186, movieId=74, new_ratings=4.008408584169454, ratings=4.0),
 Row(userId=43733, movieId=3638, new_ratings=5.090457875457876, ratings=4.0),
 Row(userId=45211, movieId=153, new_ratings=4.393715574548908, ratings=5.0),
 Row(userId=45186, movieId=2881, new_ratings=4.011613712374583, ratings=4.0),
 Row(userId=45211, movieId=2959, new_ratings=4.393715574548908, ratings=5.0),
 Row(userId=45186, movieId=2485, new_ratings=4.008408584169454, ratings=4.0),
 Row(userId=45211, movieId=2762, new_ratings=4.393715574548908, ratings=5.0),
 Row(userId=45186, movieId=292, new_ratings=4.008408584169454, ratings=4.0),
 Row(userId=45186, movieId=2094, new_ratings=4.008408584169454, ratings=4.0),
 Row(userId=45186, movieId=2496, new_ratings=3.0468701226309927, ratings=3.0),
 Row(userId=33700, movieId=479, new_ratings=4.017884615384615, rati

In [43]:
new_ratings_byTime = new_ratings_byTime.drop(new_ratings_byTime[3])

In [44]:
def prepare_validation(validation):
    return validation.map(lambda p: (p[0], p[1]))

In [45]:
import math

In [57]:
new_ratings_byTime.cache()
ratings_validation.cache()

PythonRDD[666] at RDD at PythonRDD.scala:43

In [83]:
print type(new_ratings_byTime)
print type(ratings_validation)

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.rdd.PipelinedRDD'>


In [None]:
# Evaluate the model on training data

def train_evaluate_als(train, validation, rank, iterations_num, lambda_val):
    model = ALS.train(train, rank, iterations_num, lambda_val)
    predictions = model.predictAll(prepare_validation(validation)).map(lambda r: ((r[0], r[1]), r[2]))
    ratesAndPreds = validation.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
    MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
    RMSE = math.sqrt(MSE)
    return MSE, RMSE


In [None]:
ranks = [10, 20, 30, 40, 50]
lambda_values = [0.01,0.1,1.0,10.0]
ITERATIONS = 10

In [None]:
def report_mse_results(rank, lambda_value, mse, rmse):
    print("Rank=%d, Lambda=%0.2f, MSE=%s, RMSE=%s" % (rank, lambda_value, mse, rmse))

In [None]:
def evaluate_parameters(train, validation, ranks, lambda_values):
    for r in ranks:
        for l in lambda_values:
            mse, rmse = train_evaluate_als(new_ratings_byTime.rdd, validation, r, ITERATIONS, l)
            report_mse_results(r, l, mse, rmse)

In [None]:
evaluate_parameters(new_ratings_byTime, ratings_validation, ranks, lambda_values)