In [1]:
import os 
from pyspark import SparkContext

In [2]:
# Create sparkContext
sc = SparkContext.getOrCreate()

In [3]:
# Path data
rating_data_path = "./data/ml-latest-small/ratings.csv"
rating_data = sc.textFile(rating_data_path)
# Show name attribute 
header_rating_data = rating_data.take(1)[0]
print(header_rating_data)

userId,movieId,rating,timestamp


In [4]:
# Transfer rating data to RDD
# Remove "timestamp" attribute
rdd_rating_data = rating_data.filter(lambda line : line!=header_rating_data)\
    .map(lambda line: line.split(",")).map(lambda token: (token[0], token[1], token[2])).cache()
# Show result 
rdd_rating_data.take(5)

[('1', '1', '4.0'),
 ('1', '3', '4.0'),
 ('1', '6', '4.0'),
 ('1', '47', '5.0'),
 ('1', '50', '5.0')]

In [5]:
# Read movie data and transfer data to rdd 
movie_data_path = "./data/ml-latest-small/movies.csv"
movie_data = sc.textFile(movie_data_path)

header_movie_data = movie_data.take(1)[0]
print("Header movie data\n" + header_movie_data)

rdd_movie_data = movie_data.filter(lambda line : line!=header_movie_data)\
    .map(lambda line : line.split(',')).map(lambda token: (token[0],token[1])).cache()
print("Show rdd movie data :")
rdd_movie_data.take(2)

Header movie data
movieId,title,genres
Show rdd movie data :


[('1', 'Toy Story (1995)'), ('2', 'Jumanji (1995)')]

***Build model***

In [6]:
# Split data
training_RDD, validation_RDD, test_RDD = rdd_rating_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))
training_RDD.take(5)

[('1', '1', '4.0'),
 ('1', '3', '4.0'),
 ('1', '50', '5.0'),
 ('1', '157', '5.0'),
 ('1', '231', '5.0')]

In [7]:
# Training 
from pyspark.mllib.recommendation import ALS
import math

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    #Train model
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    # Join predict data with validation data
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    # Caculate RMSE
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print ('For rank %s the RMSE is %s'% (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print ('The best model was trained with rank %s'%best_rank)

For rank 4 the RMSE is 0.9112493060035921
For rank 8 the RMSE is 0.921153648289659
For rank 12 the RMSE is 0.9171939783197479
The best model was trained with rank 4


Phần tử thứ 3 là dự đoán của mô hình

In [8]:
predictions.take(3)

[((599, 45208), 1.6128359058406168),
 ((368, 3272), 2.225890425222745),
 ((603, 3272), 3.7075110274932532)]

Dùng tập dữ liệu dự đoán join với tập validation

In [9]:
rates_and_preds.take(3)

[((1, 1089), (5.0, 5.253537400349771)),
 ((2, 80906), (5.0, 3.600419803730944)),
 ((2, 89774), (5.0, 3.3557152415520726))]

In [10]:
# Test on test dataset with best rank
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.9130530382227152


***Make recommendations with small dataset***

In [11]:
complete_movies_file = './data/ml-latest-small/movies.csv'
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print ("There are %s movies in the small dataset" % (complete_movies_titles.count()))

There are 9742 movies in the small dataset


Đếm số lượng xếp hạng của mỗi bộ phim

In [12]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (rdd_rating_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

Tạo ra một người dùng mới với các đá giá ở một số bộ phim

In [13]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,4), # Star Wars (1977)
     (0,1,3), # Toy Story (1995)
     (0,16,3), # Casino (1995)
     (0,25,4), # Leaving Las Vegas (1995)
     (0,32,4), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,1), # Flintstones, The (1994)
     (0,379,1), # Timecop (1994)
     (0,296,3), # Pulp Fiction (1994)
     (0,858,5) , # Godfather, The (1972)
     (0,50,4) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ('New user ratings: %s' % new_user_ratings_RDD.take(10))

New user ratings: [(0, 260, 4), (0, 1, 3), (0, 16, 3), (0, 25, 4), (0, 32, 4), (0, 335, 1), (0, 379, 1), (0, 296, 3), (0, 858, 5), (0, 50, 4)]


Thêm người dùng mới vào tập dữ liệu train

In [14]:
small_data_with_new_ratings_RDD = rdd_rating_data.union(new_user_ratings_RDD)

Train lại với tập dữ liệu mới 

In [15]:
from time import time

t0 = time()
new_ratings_model = ALS.train(small_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ("New model trained in %s seconds" % round(tt,3))

New model trained in 18.31 seconds


Tạo ra một RDD chứa ID của các bộ phim mà người dùng mới chưa rating 

In [16]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (rdd_rating_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

Join các RDD để tạo thành RDD chứa thông tin dựa đoán rating các bộ phim của người dùng mới

In [17]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)

In [18]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [None]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))

In [20]:
my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
individual_movie_rating_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
individual_movie_rating_RDD.take(1)

[Rating(user=0, product=464, rating=2.0852014439195177)]

In [None]:
from pyspark.mllib.recommendation import MatrixFactorizationModel

model_path = os.path.join('E:/TaiLieuDaiHoc/NamTu/HKI/BigDataApplicationsMachinelearningatScale/DoAn/Part1')
print(model_path)
# Save and load model
model.save(sc, model_path)
# same_model = MatrixFactorizationModel.load(sc, model_path)