# Hotel Recomendation System

In [189]:
hotels_file_name = 'hdfs:/user/hdfs/Hotels.csv'
reviews_file_name = 'hdfs:/user/hdfs/Reviews.csv'

hotels_raw_data = sc.textFile(hotels_file_name)
hotels_raw_data_header = hotels_raw_data.take(1)[0]
reviews_raw_data = sc.textFile(reviews_file_name)
reviews_raw_data_header = reviews_raw_data.take(1)[0]

In [190]:
hotels_raw_data.take(2)

[u'Hotel_ID,Hotel_Name,Average_Score,lat,lng',
 u'0,Hotel Arena,7.7,52.36057589999999,4.915968299999999']

In [191]:
reviews_raw_data.take(4)

[u'Reviewer_ID,Hotel_ID,Reviewer_Score',
 u'16708,0,2.9',
 u'9808,0,7.5',
 u'1407,0,7.1']

In [192]:
reviews_data = reviews_raw_data.filter(lambda line: line!=reviews_raw_data_header)\
            .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [193]:
hotels_data = hotels_raw_data.filter(lambda line: line!=hotels_raw_data_header)\
            .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [194]:
hotels_data.take(3)

[(u'0', u'Hotel Arena', u'7.7'),
 (u'1', u'K K Hotel George', u'8.5'),
 (u'2', u'Apex Temple Court Hotel', u'9.2')]

In [195]:
reviews_data.take(3)

[(u'16708', u'0', u'2.9'), (u'9808', u'0', u'7.5'), (u'1407', u'0', u'7.1')]

In [196]:
print("Number of different reviwers: " + str(reviews.distinct().count()))
print("Number of different hotels: " + str(hotels.distinct().count()))
print("Number of hotels with at least one rating strictly higher than 4: " + str(reviews_data.filter(lambda x: x[2] > '4').count()))

Number of different reviwers: 328599
Number of different hotels: 1494
Number of hotels with at least one rating strictly higher than 4: 389157


In [197]:
training_RDD, validation_RDD, test_RDD = reviews_data.randomSplit([6, 2, 2], seed=0L)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [198]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5L
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print 'For rank %s the RMSE is %s' % (rank, error)
    if error < min_error:
        min_error = error
        best_rank = rank

print 'The best model was trained with rank %s' % best_rank


For rank 4 the RMSE is 1.54232092523
For rank 8 the RMSE is 1.54266701814
For rank 12 the RMSE is 1.54269061797
The best model was trained with rank 4


In [199]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print 'For testing data the RMSE is %s' % (error)

For testing data the RMSE is 1.54369699574


In [200]:
complete_hotels = hotels_data.map(lambda x: (int(x[0]),x[1]))
    
print "There are %s hotels in the complete dataset" % (complete_hotels.count())

There are 1493 hotels in the complete dataset


In [203]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

hotel_ID_with_ratings_RDD = (hotels_data.map(lambda x: (x[1], x[2])).groupByKey())
hotel_ID_with_avg_ratings_RDD = hotel_ID_with_ratings_RDD.map(get_counts_and_averages)
hotel_rating_counts_RDD = hotel_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

In [204]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,0,7), # Star Wars (1977)
     (0,1,8), # Toy Story (1995)
     (0,2,6), # Casino (1995)
     (0,3,4), # Leaving Las Vegas (1995)
     (0,4,4), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,5,5), # Flintstones, The (1994)
     (0,6,6), # Timecop (1994)
     (0,7,9), # Pulp Fiction (1994)
     (0,8,9) , # Godfather, The (1972)
     (0,9,7) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print 'New user ratings: %s' % new_user_ratings_RDD.take(10)

New user ratings: [(0, 0, 7), (0, 1, 8), (0, 2, 6), (0, 3, 4), (0, 4, 4), (0, 5, 5), (0, 6, 6), (0, 7, 9), (0, 8, 9), (0, 9, 7)]


In [205]:
complete_data_with_new_ratings_RDD = reviews_data.union(new_user_ratings_RDD)

In [206]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print "New model trained in %s seconds" % round(tt,3)

New model trained in 3.482 seconds


In [207]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (hotels_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)


In [208]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_hotels).join(hotel_rating_counts_RDD)
new_user_recommendations_rating_RDD.take(3)

[(1084, 5.302013021563428),
 (1410, 6.774312647106895),
 (667, 6.963404579670925)]

In [219]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0], r[1][0]))

In [220]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))

Py4JJavaError: An error occurred while calling o3320.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 4132.0 failed 1 times, most recent failure: Lost task 2.0 in stage 4132.0 (TID 860, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 101, in main
    process()
  File "/usr/lib/spark/python/pyspark/worker.py", line 96, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/serializers.py", line 236, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-203-b15ac9c52042>", line 3, in get_counts_and_averages
TypeError: unsupported operand type(s) for +: 'int' and 'unicode'

	at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)
	at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
	at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
