In [1]:
import os
import math
import time
!pip install pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row, SparkSession
from pyspark.mllib.recommendation import ALS



In [2]:
# Getting the SparkContext
sc = SparkContext()
# Initializing the SQLContext
sqlContext = SQLContext(sc)
# Initializing Spark Session
spark = SparkSession \
    .builder \
    .appName("netflix-recommendation-system") \
    .getOrCreate()

In [11]:
seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

user_ratings = [
    (0, 225, 4),
    (0, 322, 3),
    (0, 492, 4),
    (0, 104, 5),
    (0, 335, 3),
    (0, 640, 2),
    (0, 773, 3),
    (0, 348, 3),
    (0, 723, 2),
    (0, 354, 4),
    (0, 346, 4),
    (0, 923, 4),
    (0, 425, 3),
]

In [3]:
# Creating the Dataframe for the small dataset using SQLContext
small_file = os.path.join('ratings.csv')
small_raw_data = sc.textFile(small_file)
small_raw_data_header = small_raw_data.take(1)[0]
small_raw_data_DF = sqlContext.read.csv(small_file, header=True, inferSchema=True)
small_raw_data_DF.show(10)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
|     1|   1263|   2.0|1260759151|
|     1|   1287|   2.0|1260759187|
|     1|   1293|   2.0|1260759148|
|     1|   1339|   3.5|1260759125|
|     1|   1343|   2.0|1260759131|
+------+-------+------+----------+
only showing top 10 rows



In [4]:
# Creating dataframe for visualization in temp table 'D'
data = sc.textFile(small_file)
data = data.filter(lambda line: line != small_raw_data_header).map(lambda line: line.split(',')). \
    map(lambda x: Row(userId=int(x[0]), movieId=int(x[1]), rating=float(x[2]), timestamp=str(x[3])))
dataDF = sqlContext.createDataFrame(data)
dataDF.registerTempTable("D")

In [5]:
# Displaying the temp table "D"
# print(spark.sql("Select * from D").show())

# Creating RDD using only userID, movieID, rating since we don't need timestamp
small_data = small_raw_data \
    .filter(lambda line: line != small_raw_data_header) \
    .map(lambda line: line.split(",")) \
    .map(lambda tokens: (tokens[0], tokens[1], tokens[2])).cache()


In [6]:
# Creating the small dataset Dataframe
small_movies_file = os.path.join('movies.csv')
small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]
small_movies_raw_data = sc.textFile(small_movies_file)
data = small_movies_raw_data.filter(lambda line: line != small_movies_raw_data_header).map(
    lambda line: line.split(',')). \
    map(lambda x: Row(movieId=int(x[0]), title=(x[1]).encode('utf-8')))
dataDF = sqlContext.createDataFrame(data)

In [7]:
# Displaying the dataframe schema
print(dataDF.select("movieId", "title").show())


+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|      1|[54 6F 79 20 53 7...|
|      2|[4A 75 6D 61 6E 6...|
|      3|[47 72 75 6D 70 6...|
|      4|[57 61 69 74 69 6...|
|      5|[46 61 74 68 65 7...|
|      6|[48 65 61 74 20 2...|
|      7|[53 61 62 72 69 6...|
|      8|[54 6F 6D 20 61 6...|
|      9|[53 75 64 64 65 6...|
|     10|[47 6F 6C 64 65 6...|
|     11|[22 41 6D 65 72 6...|
|     12|[44 72 61 63 75 6...|
|     13|[42 61 6C 74 6F 2...|
|     14|[4E 69 78 6F 6E 2...|
|     15|[43 75 74 74 68 7...|
|     16|[43 61 73 69 6E 6...|
|     17|[53 65 6E 73 65 2...|
|     18|[46 6F 75 72 20 5...|
|     19|[41 63 65 20 56 6...|
|     20|[4D 6F 6E 65 79 2...|
+-------+--------------------+
only showing top 20 rows

None


In [8]:
# Validation datasets
training_RDD, validation_RDD, test_RDD = small_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))


In [12]:
# ALS algorithm training step
min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations, lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_predictions = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_predictions.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean())
    errors[err] = error
    err += 1
    if error < min_error:
        min_error = error
        best_rank = rank
print('The best model was trained with rank %s' % best_rank)

The best model was trained with rank 4


In [13]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations, lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD) \
    .map(lambda r: ((r[0], r[1]), r[2]))
rates_and_predictions = test_RDD \
    .map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))) \
    .join(predictions)
error = math.sqrt(rates_and_predictions.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean())
print('For testing data the RMSE is %s' % error)

For testing data the RMSE is 0.9380956550634307


In [14]:
large_file = os.path.join('movies.csv')
large_raw_data = sc.textFile(large_file)
large_raw_data_header = large_raw_data.take(1)[0]

In [15]:
large_data = large_raw_data \
    .filter(lambda line: line != large_raw_data_header) \
    .map(lambda line: line.split(",")) \
    .map(lambda tokens: (int(tokens[0]), tokens[1], tokens[2])).cache()
large_titles = large_data.map(lambda x: (int(x[0]), x[1]))
print("There are %s movies in the large dataset" % (large_titles.count()))

There are 9125 movies in the large dataset


In [18]:
# Large dataset file parsing
complete_file = os.path.join('ratings.csv')
complete_raw_data = sc.textFile(complete_file)
complete_raw_data_header = complete_raw_data.take(1)[0]
complete_data = complete_raw_data \
    .filter(lambda line: line != complete_raw_data_header) \
    .map(lambda line: line.split(",")) \
    .map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2]))) \
    .cache()
print('There are %s recommendations in the large dataset' % (complete_data.count()))

There are 100004 recommendations in the large dataset


In [19]:
# Counts and averages of the ratings
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1])) / nratings)


In [20]:
# Mapping the ratings, avg_ratings and counts
movie_ID_with_ratings_RDD = (complete_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

In [21]:
# New user id
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = user_ratings

# parallelize the datasets
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print('New user ratings: %s' % new_user_ratings_RDD.take(10))

New user ratings: [(0, 225, 4), (0, 322, 3), (0, 492, 4), (0, 104, 5), (0, 335, 3), (0, 640, 2), (0, 773, 3), (0, 348, 3), (0, 723, 2), (0, 354, 4)]


In [22]:
# Join the small data ratings and new user ratings
complete_data_with_new_ratings_RDD = small_data.union(new_user_ratings_RDD)

# Time taken to train new model
t0 = time.time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, iterations=iterations, lambda_=regularization_parameter)
tt = time.time() - t0
print("New model trained in %s seconds" % round(tt, 3))

New model trained in 4.891 seconds


In [23]:
# New user recommendation ratings
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings)
new_user_unrated_movies_RDD = large_data \
    .filter(lambda x: x[0] not in new_user_ratings_ids) \
    .map(lambda x: (new_user_ID, x[0]))
print(new_user_unrated_movies_RDD.count())

9125


In [24]:
# Predicting the new ratings
recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
print(recommendations_RDD.take(5))

[Rating(user=0, product=1084, rating=3.1833908423799393), Rating(user=0, product=7942, rating=2.68375742232333), Rating(user=0, product=6400, rating=3.2677270046367823), Rating(user=0, product=3702, rating=3.4565404889363656), Rating(user=0, product=142192, rating=2.1915337432439177)]


In [25]:
# Transform recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
recommendations_rating_RDD = recommendations_RDD.map(lambda x: (x.product, x.rating))
recommendations_rating_title_and_count_RDD = \
    recommendations_rating_RDD.join(large_titles).join(movie_rating_counts_RDD)
print(recommendations_rating_title_and_count_RDD.take(6))


[(81132, ((3.5688057523365764, 'Rubber (2010)'), 1)), (7020, ((2.5046098641511243, 'Proof (1991)'), 1)), (204, ((3.502963439279013, 'Under Siege 2: Dark Territory (1995)'), 31)), (4992, ((3.7327699640560983, 'Kate & Leopold (2001)'), 10)), (4224, ((2.826918121274484, 'Exit Wounds (2001)'), 2)), (1596, ((0.9763275803039493, 'Career Girls (1997)'), 3))]


In [26]:
# Take and display the recommendations
recommendations_rating_title_and_count_RDD = \
    recommendations_rating_title_and_count_RDD \
        .map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))
top_movies = recommendations_rating_title_and_count_RDD \
    .filter(lambda r: r[2] >= 15) \
    .takeOrdered(15, key=lambda x: -x[1])

print('Recommended movies for you:\n%s' %
      '\n'.join(map(str, top_movies)))

Recommended movies for you:
('Despicable Me (2010)', 4.887035224020957, 16)
('"Girl', 4.848748098906941, 20)
('Stardust (2007)', 4.700040856731036, 23)
('The Martian (2015)', 4.677594891349678, 25)
('Ant-Man (2015)', 4.623240453201322, 17)
('Red (2010)', 4.563298171853312, 18)
('"Amazing Spider-Man', 4.55524812413573, 20)
('How to Train Your Dragon (2010)', 4.487703458305262, 33)
('Pay It Forward (2000)', 4.457568450130303, 23)
('"Lord of the Rings: The Return of the King', 4.454542569830214, 176)
('"Shawshank Redemption', 4.4502876094974635, 311)
('"Avengers', 4.433616440885272, 46)
('Star Trek: First Contact (1996)', 4.4278923143818645, 82)
('"Lord of the Rings: The Two Towers', 4.409318166792802, 188)
('Excalibur (1981)', 4.396798019913279, 16)
