In [None]:
import csv
import math
import os

from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS

In [None]:
# Mount Google Drive at /content/drive so the dataset files stored there can be
# read through ordinary file paths.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Root directory of the dataset folders; the loading cells join
# 'ml-latest-small/ratings.csv' and 'ml-latest-small/movies.csv' onto it.
# NOTE(review): hard-coded to a mounted Google Drive location - adjust when
# running outside that environment.
datasets_path = "/content/drive/MyDrive/CineBuddy-master/datasets"

In [None]:
# Reuse the running SparkContext when this cell is executed again; constructing
# a second SparkContext in the same process raises a ValueError, which made the
# cell fail on every re-run within a live kernel.
sc = SparkContext.getOrCreate()

In [None]:
# Path of the ratings file inside the 'ml-latest-small' dataset folder.
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')

# Read the file as an RDD of text lines and remember the first line (the
# header) so the parsing step can filter it out.
rating = sc.textFile(small_ratings_file)
rating_header = rating.take(1)[0]

In [None]:
# Drop the header line, split every remaining line on commas and keep the first
# three fields as a (userID, movieID, rating) tuple of strings. The parsed RDD
# is cached because several later cells read it.
rating = (
    rating
    .filter(lambda row: row != rating_header)
    .map(lambda row: row.split(","))
    .map(lambda fields: (fields[0], fields[1], fields[2]))
    .cache()
)


In [None]:
# Preview the first three parsed rating tuples.
rating.take(3)


[('1', '31', '2.5'), ('1', '1029', '3.0'), ('1', '1061', '3.0')]

In [None]:
# Path of the movies file inside the 'ml-latest-small' dataset folder.
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')

# Read the file as text lines and remember the header line so it can be skipped.
movie = sc.textFile(small_movies_file)
movie_header = movie.take(1)[0]

# Parse every line with the csv module instead of a plain split(","): titles
# that contain a comma are quoted in the file, and a plain split cuts them at
# the first comma (yielding titles such as '"Hospital') and pushes the genres
# out of the third field. csv.reader honours the quoting, so each tuple is
# (movieID, full title, genres), all as strings.
movie = (
    movie
    .filter(lambda line: line != movie_header)
    .map(lambda line: next(csv.reader([line])))
    .map(lambda tokens: (tokens[0], tokens[1], tokens[2]))
    .cache()
)
movie.take(3)

[('1', 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy'),
 ('2', 'Jumanji (1995)', 'Adventure|Children|Fantasy'),
 ('3', 'Grumpier Old Men (1995)', 'Comedy|Romance')]

In [None]:
# Randomly split the ratings into training, validation and test sets using the
# weights 6:2:2; the split is seeded with 42.
training_RDD, validation_RDD, test_RDD = rating.randomSplit(
    [6, 2, 2],
    seed=42,
)

# Keep only the (userID, movieID) pair of each held-out row; these pairs are
# what gets passed to predictAll() in the evaluation cells.
validation_for_predict_RDD = validation_RDD.map(lambda row: (row[0], row[1]))
test_for_predict_RDD = test_RDD.map(lambda row: (row[0], row[1]))

In [None]:
# Preview ten of the (userID, movieID) pairs held out for validation.
validation_for_predict_RDD.take(10)


[('1', '1263'),
 ('1', '1953'),
 ('2', '10'),
 ('2', '39'),
 ('2', '144'),
 ('2', '161'),
 ('2', '225'),
 ('2', '266'),
 ('2', '272'),
 ('2', '292')]

In [None]:
# ALS hyper-parameters; seed, iterations and regularization_parameter are also
# reused by the later training cells.
seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
# One RMSE slot per candidate rank. Sized from `ranks` so that editing the list
# of candidate ranks cannot make `errors[err]` run past the end of the list.
errors = [0] * len(ranks)
err = 0
tolerance = 0.02  # NOTE(review): not referenced by any visible cell

min_error = float('inf')
best_rank = -1
best_iteration = -1  # NOTE(review): never updated by any visible cell
for rank in ranks:
    # Train on the training split with the candidate rank.
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations, lambda_=regularization_parameter)
    # Predict the validation pairs and key each prediction by (user, movie).
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    # Key the actual ratings the same way (the parsed rows hold strings, hence
    # the int/float casts) and join them with the predictions.
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    # Root-mean-square error over the joined (actual, predicted) pairs.
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print('For rank %s the RMSE is %s' % (rank, error))
    # Remember the rank with the lowest validation RMSE seen so far.
    if error < min_error:
        min_error = error
        best_rank = rank

For rank 4 the RMSE is 0.940561804825594
For rank 8 the RMSE is 0.9509674656922942
For rank 12 the RMSE is 0.9464591395118106


In [None]:
# Preview ((user, movie), (actual rating, predicted rating)) entries; at this
# point `rates_and_preds` holds the join from the last rank tried in the loop.
rates_and_preds.take(10)

[((1, 1953), (4.0, 3.002893915790655)),
 ((2, 144), (3.0, 3.0556840050756042)),
 ((2, 272), (3.0, 3.577331225126056)),
 ((2, 292), (3.0, 3.6124014714233956)),
 ((2, 616), (3.0, 3.484478481389996)),
 ((3, 595), (2.0, 3.565013580065013)),
 ((3, 44191), (3.5, 3.721427993966543)),
 ((4, 410), (3.0, 3.5038524111025913)),
 ((4, 1210), (5.0, 4.544828637149842)),
 ((4, 1282), (5.0, 4.759239586294552))]

In [None]:
# Retrain on the training split with the rank that gave the lowest validation
# RMSE, then measure the RMSE on the held-out test split.
model = ALS.train(
    training_RDD,
    best_rank,
    seed=seed,
    iterations=iterations,
    lambda_=regularization_parameter,
)

# Key predictions by (user, movie) so they can be joined with the actual ratings.
predictions = model.predictAll(test_for_predict_RDD).map(
    lambda pred: ((pred[0], pred[1]), pred[2])
)

# Actual ratings keyed the same way, joined to give (actual, predicted) pairs.
rates_and_preds = (
    test_RDD
    .map(lambda row: ((int(row[0]), int(row[1])), float(row[2])))
    .join(predictions)
)

# Root-mean-square error over the joined pairs.
error = math.sqrt(
    rates_and_preds.map(lambda pair: (pair[1][0] - pair[1][1])**2).mean()
)

print('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.9515417584748112


In [None]:
# Report the number of rows in the full parsed ratings RDD (the data before it
# was split into training/validation/test sets).
print("There are %s recommendations in the complete dataset" % (rating.count()))


There are 100004 recommendations in the complete dataset


In [None]:
def get_counts_and_averages(ID_and_ratings_tuple):
    """Return ``(ID, (number_of_ratings, average_rating))`` for one entry.

    ``ID_and_ratings_tuple`` is ``(ID, ratings)`` where ``ratings`` is a
    sequence. Every rating is converted with ``float(str(value))`` before being
    added up, so numeric strings and numbers are both accepted. An empty
    ``ratings`` sequence raises ``ZeroDivisionError``.
    """
    entry_id = ID_and_ratings_tuple[0]
    entry_ratings = ID_and_ratings_tuple[1]
    rating_count = len(entry_ratings)

    # Plain left-to-right accumulation keeps the floating-point result
    # identical regardless of how the built-in sum() is implemented.
    running_total = 0
    for raw_value in entry_ratings:
        running_total += float(str(raw_value))

    return entry_id, (rating_count, running_total / rating_count)

# Each rating row has three columns; keep only the movie ID and the rating, then
# group by movie ID so that every movie maps to the list of its ratings:
# movie_id -> [rating1, rating2, ...]
movie_ID_with_ratings_RDD = (
    rating
    .map(lambda row: (row[1], row[2]))
    .groupByKey()
    .map(lambda grouped: (grouped[0], list(grouped[1])))
)

# Compute the number of ratings and the average rating for each movie.
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)

# Re-key by integer movie ID: (movie_id, (rating count, average rating)).
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(
    lambda entry: (int(entry[0]), entry[1])
)


In [None]:
# Preview one (movie_id, (rating count, average rating)) entry.
movie_rating_counts_RDD.take(1)


[(1129, (48, 3.3125))]

In [None]:
new_user_ID = 0

# The format of each resulting entry is (userID, movieID, rating); the user ID
# is filled in from new_user_ID for every (movieID, rating) pair listed below.
new_user_ratings = [
    (new_user_ID, rated_movie_id, given_rating)
    for rated_movie_id, given_rating in (
        (260, 4),  # Star Wars (1977)
        (1, 3),    # Toy Story (1995)
        (16, 3),   # Casino (1995)
        (25, 4),   # Leaving Las Vegas (1995)
        (32, 4),   # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
        (335, 1),  # Flintstones, The (1994)
        (379, 1),  # Timecop (1994)
        (296, 3),  # Pulp Fiction (1994)
        (858, 5),  # Godfather, The (1972)
        (50, 4),   # Usual Suspects, The (1995)
    )
]

# Distribute the new ratings as an RDD and echo them back for inspection.
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print('New user ratings: %s' % new_user_ratings_RDD.take(10))

New user ratings: [(0, 260, 4), (0, 1, 3), (0, 16, 3), (0, 25, 4), (0, 32, 4), (0, 335, 1), (0, 379, 1), (0, 296, 3), (0, 858, 5), (0, 50, 4)]


In [None]:
# Append the new user's ratings to the full ratings RDD; the combined RDD is the
# training input for the model that produces this user's recommendations.
complete_data_with_new_ratings_RDD = rating.union(new_user_ratings_RDD)


In [None]:
from time import time

# Train ALS on the full ratings plus the new user's ratings, using the rank
# selected on the validation split, and measure the wall-clock training time.
t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print("New model trained in %s seconds" % round(tt,3))


New model trained in 4.064 seconds


In [None]:
# IDs of the movies the new user has already rated, as a set of ints.
# The previous `map(...)` object was a one-shot iterator holding ints, while the
# movie IDs in `movie` are strings, so the membership test below never matched
# and already-rated movies were not excluded from the candidates.
new_user_ratings_ids = {rated[1] for rated in new_user_ratings}

# Keep just the movies that are not on the rated-ID list (thanks Lei Li for
# spotting the error!) and pair each of them with the new user's ID. The movie
# ID is cast to int only for the comparison; the emitted pair is unchanged.
new_user_unrated_movies_RDD = (
    movie
    .filter(lambda x: int(x[0]) not in new_user_ratings_ids)
    .map(lambda x: (new_user_ID, x[0]))
)

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [None]:
# (movie_id as int, title) pairs, used to attach titles to predicted ratings.
complete_movies_titles =  movie.map(lambda x: (int(x[0]),x[1]))
complete_movies_titles.take(1)

[(1, 'Toy Story (1995)')]

In [None]:
# Reduce every prediction to (movie_id, predicted rating).
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(
    lambda prediction: (prediction.product, prediction.rating)
)

# Join on movie_id, first with the titles and then with the per-movie
# (rating count, average rating), giving
# (movie_id, ((predicted rating, title), (rating count, average rating))).
new_user_recommendations_rating_title_and_count_RDD = (
    new_user_recommendations_rating_RDD
    .join(complete_movies_titles)
    .join(movie_rating_counts_RDD)
)

new_user_recommendations_rating_title_and_count_RDD.take(3)

[(81132, ((3.207710379005258, 'Rubber (2010)'), (1, 4.0))),
 (7020, ((3.3292780165416422, 'Proof (1991)'), (1, 4.0))),
 (204,
  ((1.6501908759434194, 'Under Siege 2: Dark Territory (1995)'), (31, 3.0)))]

In [None]:
# Inspect the intermediate join on its own: (movie_id, (predicted rating, title)).
new_user_recommendations_rating_RDD.join(complete_movies_titles).take(1)


[(1084, (3.6464361253112294, 'Bonnie and Clyde (1967)'))]

In [None]:
# Inspect one (movie_id, (rating count, average rating)) entry, the right-hand
# side of the second join.
movie_rating_counts_RDD.take(1)


[(1129, (48, 3.3125))]

In [None]:
# Inspect one fully joined entry:
# (movie_id, ((predicted rating, title), (rating count, average rating))).
new_user_recommendations_rating_title_and_count_RDD.take(1)


[(81132, ((3.207710379005258, 'Rubber (2010)'), (1, 4.0)))]

In [None]:
def _title_rating_count(joined_entry):
    """Turn (movie_id, ((predicted, title), counts)) into (title, predicted, counts)."""
    (predicted_rating, title), count_and_average = joined_entry[1]
    return (title, predicted_rating, count_and_average)


# Drop the movie ID and flatten every joined entry to
# (title, predicted rating, (rating count, average rating)).
rating_title_and_count = new_user_recommendations_rating_title_and_count_RDD.map(_title_rating_count)
rating_title_and_count.take(10)

[('Rubber (2010)', 3.207710379005258, (1, 4.0)),
 ('Proof (1991)', 3.3292780165416422, (1, 4.0)),
 ('Under Siege 2: Dark Territory (1995)', 1.6501908759434194, (31, 3.0)),
 ('Kate & Leopold (2001)', 1.02981923825444, (10, 2.8)),
 ('Exit Wounds (2001)', 1.081200602619809, (2, 2.25)),
 ('Career Girls (1997)', 2.0104387343658807, (3, 2.3333333333333335)),
 ('Rain (1932)', 1.5503804852018828, (1, 3.0)),
 ('"Hospital', 3.338271622711792, (1, 3.5)),
 ('Grosse Pointe Blank (1997)', 3.3707815804713697, (64, 3.96875)),
 ('"Scout', 1.227754032044226, (2, 2.5))]

In [None]:
# Minimum number of ratings a movie must have to be recommended. The filter
# keeps movies whose count is >= this value, so the printed heading says
# "at least" (it previously said "more than 25", which did not match the filter).
min_rating_count = 25

# Ten eligible movies with the highest predicted rating; the key is negated
# because takeOrdered returns the smallest keys first.
top_movies = (
    rating_title_and_count
    .filter(lambda r: r[2][0] >= min_rating_count)
    .takeOrdered(10, key=lambda x: -x[1])
)
print('TOP recommended movies (with at least %s reviews):\n%s' %
      (min_rating_count, '\n'.join(map(str, top_movies))))

TOP recommended movies (with more than 25 reviews):
('Cinema Paradiso (Nuovo cinema Paradiso) (1989)', 4.382523039538473, (46, 4.260869565217392))
('Brokeback Mountain (2005)', 4.2157450585284835, (29, 3.6206896551724137))
('"Remains of the Day', 4.182096515359948, (46, 4.043478260869565))
('Modern Times (1936)', 4.167324500109649, (32, 4.359375))
('All About Eve (1950)', 4.1567886519244475, (38, 4.434210526315789))
('Bringing Up Baby (1938)', 4.131534398577507, (30, 4.066666666666666))
('"Room with a View', 4.104672436174003, (31, 3.8225806451612905))
('Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1964)', 4.059253469484851, (105, 4.20952380952381))
('"Third Man', 4.05854109321448, (38, 4.25))
('"Amelie (Fabuleux destin d\'Amélie Poulain', 4.053343805797935, (125, 4.096))


In [None]:
# Display the selected (title, predicted rating, (rating count, average rating)) tuples.
top_movies

[('Cinema Paradiso (Nuovo cinema Paradiso) (1989)',
  4.382523039538473,
  (46, 4.260869565217392)),
 ('Brokeback Mountain (2005)', 4.2157450585284835, (29, 3.6206896551724137)),
 ('"Remains of the Day', 4.182096515359948, (46, 4.043478260869565)),
 ('Modern Times (1936)', 4.167324500109649, (32, 4.359375)),
 ('All About Eve (1950)', 4.1567886519244475, (38, 4.434210526315789)),
 ('Bringing Up Baby (1938)', 4.131534398577507, (30, 4.066666666666666)),
 ('"Room with a View', 4.104672436174003, (31, 3.8225806451612905)),
 ('Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1964)',
  4.059253469484851,
  (105, 4.20952380952381)),
 ('"Third Man', 4.05854109321448, (38, 4.25)),
 ('"Amelie (Fabuleux destin d\'Amélie Poulain',
  4.053343805797935,
  (125, 4.096))]

In [None]:
import pandas as pd
# Wrap the recommendations in a one-column DataFrame; each cell of 'Info' holds
# one whole (title, predicted rating, (rating count, average rating)) tuple.
data=pd.DataFrame({
    'Info':top_movies
})

In [None]:
# Write the recommendations to 'Top_10_movies.csv' in the current working
# directory, without the DataFrame index column.
data.to_csv('Top_10_movies.csv',index=False)