In [1]:
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import itertools
from math import sqrt
import math
from operator import add
from pyspark.mllib.recommendation import ALS
from time import time

In [2]:
#read files
mg_review = spark.read.format("com.databricks.spark.avro")\
.load("hdfs://localhost:54310/user/hduser/mg_review/part-m-00000.avro")

mg_rating_option_vote = spark.read.format("com.databricks.spark.avro")\
.load("hdfs://localhost:54310/user/hduser/mg_rating_option_vote/part-m-00000.avro")

mg_catalog_category_product_index = spark.read.format("com.databricks.spark.avro")\
.load("hdfs://localhost:54310/user/hduser/mg_catalog_category_product_index/part-m-00000.avro")

mg_catalog_product_entity_text = spark.read.format("com.databricks.spark.avro")\
.load("hdfs://localhost:54310/user/hduser/mg_catalog_product_entity_text/part-m-00000.avro")


In [3]:
# |customer_id|entity_pk_value|value|

#join two tables
cond = [mg_review.review_id == mg_rating_option_vote.review_id, mg_rating_option_vote.customer_id > 0]
joinedOne = mg_review.join(mg_rating_option_vote, cond, 'inner').select(mg_rating_option_vote.customer_id, 
                                                                     mg_review.entity_pk_value,
                                                                     mg_rating_option_vote.value
                                                                    ).collect()
#make rdd from list
rddOne = sc.parallelize(joinedOne).cache()


In [4]:
#|product_id|value|category_id| 

#join two tables
cond = [mg_catalog_category_product_index.product_id == mg_catalog_product_entity_text.entity_id]
joinedTwo = mg_catalog_category_product_index.join(mg_catalog_product_entity_text, cond, 'inner').select(
                                                                     mg_catalog_category_product_index.product_id,
                                                                     mg_catalog_product_entity_text.value,
                                                                     mg_catalog_category_product_index.category_id 
                                                                     
                                                                     
                                                                     ).distinct().collect()

joinedTwo = filter(None, joinedTwo)
#make rdd from list
rddTwo = sc.parallelize(joinedTwo).cache()

In [5]:
#### split ratings into train (60%), validation (20%), and test (20%) 

# training, validation, test are all RDDs of (customer_id, entity_pk_value, value)


#numPartitions = 4

training_RDD, validation_RDD, test_RDD = rddOne.randomSplit([6, 2, 2], seed=0)   

rating_data = rddOne.map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))

test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))


numTraining = training_RDD.count()
numValidation = validation_RDD.count()
numTest = test_RDD.count()

print ("Training: %d, validation: %d, test: %d" % (numTraining, numValidation, numTest))



Training: 60135, validation: 19907, test: 19958


In [6]:
#    sc.setCheckpointDir('checkpoint/')
#    ALS.checkpointInterval = 2
seed = 5
iterations = 20
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1

for rank in ranks:
    sc.setCheckpointDir('checkpoint/')
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,lambda_=regularization_parameter)
    ALS.checkpointInterval = 2
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print ('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print ('The best model was trained with rank %s' % best_rank)

For rank 4 the RMSE is 0.9636851542953062
For rank 8 the RMSE is 0.9827735251062966
For rank 12 the RMSE is 0.9839469044533
The best model was trained with rank 4


In [7]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is %s' % (error))


For testing data the RMSE is 0.9773841605064202


In [10]:
product_data = rddTwo.map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

In [12]:
product_titles = product_data.map(lambda x: (int(x[0]),x[1]))
    
print (product_titles.take(1))

[(348, 'Leather. Inside zipper. 3-button outside detail. 4.5" heel, 1" platform, 3.5" equiv. Leather insole and lining. Red sole. Made in Italy.')]


In [14]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

# groupbykey When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
product_ID_with_ratings_RDD = (rating_data.map(lambda x: (x[1], x[2])).groupByKey())
product_ID_with_avg_ratings_RDD = product_ID_with_ratings_RDD.map(get_counts_and_averages)
product_rating_counts_RDD = product_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

In [15]:
new_user_ID = 0

# The format of each line is (customerID, productID, rating)
# example of new user rating
new_user_ratings = [
     (0,260,4), 
     (0,1,3), 
     (0,16,3), 
     (0,25,4), 
     (0,32,4), 
     (0,335,1), 
     (0,379,1), 
     (0,296,3),
     (0,858,5), 
     (0,50,4) 
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ('New user ratings: %s' % new_user_ratings_RDD.take(10))


New user ratings: [(0, 260, 4), (0, 1, 3), (0, 16, 3), (0, 25, 4), (0, 32, 4), (0, 335, 1), (0, 379, 1), (0, 296, 3), (0, 858, 5), (0, 50, 4)]


In [16]:
complete_data_with_new_ratings_RDD = rating_data.union(new_user_ratings_RDD)

In [17]:
t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ("New model trained in %s seconds" % round(tt,3))

New model trained in 2.881 seconds


In [26]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just product IDs
# keep just those not on the ID list 
new_user_unrated_product_RDD = (complete_data_with_new_ratings_RDD.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the products
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_product_RDD)

In [27]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(product_titles).join(product_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)


[(252, ((1.9737859159948208, None), 41)),
 (252,
  ((1.9737859159948208,
    'Ultrasoft, lightweight V-neck tee. 100% cotton. Machine wash.'),
   41)),
 (252, ((1.9737859159948208, None), 41))]

In [28]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [29]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(1, key=lambda x: -x[1])

print ('TOP recommended product (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))

TOP recommended product (with more than 25 reviews):
('Sleek and modern, our form flattering blazer carries a slightly relaxed, yet structured shape. Timeless in any time zone.', 4.043999512271116, 61)
