# MovieLens Latest Datasets
<br>
These datasets will change over time, and are not appropriate for reporting research results. We will keep the download links stable for automated downloads. We will not archive or make available previously released versions.<br>
<br>
Small: 100,000 ratings and 1,300 tag applications applied to 9,000 movies by 700 users. Last updated 10/2016.<br>
<br>
https://grouplens.org/datasets/movielens/

In [1]:
import os
import math
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel

In [2]:
# Start a single-machine ("local") Spark context for this notebook; every later
# cell reaches Spark through `sc`.
sc = SparkContext(master="local", appName="Jupyter_Spark_UserCF")

# NOTE(review): `logData` is not referenced by any other cell visible in this
# notebook -- confirm whether it is still needed.
logFile = "Jupyter/Recommendation/UserCF"
logData = sc.textFile(logFile).cache()

In [3]:
# Training split: read the CSV as text, drop every line equal to the header,
# and keep the first three comma-separated fields of each remaining row as a
# tuple of strings (presumably userId, movieId, rating -- verify against the
# CSV header).
ratings_path_train = os.path.join('Data', 'ml-latest-small', 'ratings_train.csv')

ratings_lines_train = sc.textFile(ratings_path_train)
# The first line of the file is taken to be the header.
ratings_header_train = ratings_lines_train.take(1)[0]


def _parse_train_row(line):
    """Return the first three comma-separated fields of `line` as a tuple."""
    fields = line.split(",")
    return (fields[0], fields[1], fields[2])


ratings_rows_train = ratings_lines_train.filter(lambda line: line != ratings_header_train)
# Cached because later cells reuse this RDD (e.g. once per trained model).
ratings_data_train = ratings_rows_train.map(_parse_train_row).cache()

# Peek at a few parsed rows.
ratings_data_train.take(3)

[('1', '31', '2.5'), ('1', '1061', '3.0'), ('1', '1172', '4.0')]

In [4]:
# Test split: read the CSV as text, drop every line equal to the header, and
# keep the first three comma-separated fields of each remaining row as a tuple
# of strings (presumably userId, movieId, rating -- verify against the CSV
# header).
ratings_path_test = os.path.join('Data', 'ml-latest-small', 'ratings_test.csv')

ratings_lines_test = sc.textFile(ratings_path_test)
# The first line of the file is taken to be the header.
ratings_header_test = ratings_lines_test.take(1)[0]


def _parse_test_row(line):
    """Return the first three comma-separated fields of `line` as a tuple."""
    fields = line.split(",")
    return (fields[0], fields[1], fields[2])


ratings_rows_test = ratings_lines_test.filter(lambda line: line != ratings_header_test)
# Cached because later cells reuse this RDD for prediction and evaluation.
ratings_data_test = ratings_rows_test.map(_parse_test_row).cache()

# Peek at a few parsed rows.
ratings_data_test.take(3)

[('1', '1129', '2.0'), ('1', '1263', '2.0'), ('1', '1339', '3.5')]

In [5]:
# Prediction input: strip the rating from each test record so that only the
# (user, product) pair to be scored remains.
def _user_product_pair(record):
    """Return the first two fields of a three-field test record."""
    user, product, _rating = record
    return (user, product)


ratings_predict_test = ratings_data_test.map(_user_product_pair)
ratings_predict_test.take(5)

[('1', '1129'), ('1', '1263'), ('1', '1339'), ('1', '1343'), ('1', '1371')]

In [6]:
# Rank sweep: train one ALS model per candidate rank on the training split and
# report its RMSE on the test split.

# The keyed test ratings, ((user, product), rating) with numeric types, do not
# depend on the rank. Build them once and cache them so the int/float parsing
# is not repeated for every candidate rank.
ratings_keyed_test = ratings_data_test.map(
    lambda r: ((int(r[0]), int(r[1])), float(r[2]))
).cache()

for i in [2, 3, 4, 5]:
    # Fixed seed so that every rank is trained from a reproducible start.
    model = ALS.train(ratings_data_train, rank=i, seed=852, iterations=10, lambda_=0.1)

    # Re-key the predictions as ((user, product), predicted_rating) so they
    # can be joined with the actual ratings.
    predictions = model.predictAll(ratings_predict_test).map(lambda r: ((r[0], r[1]), r[2]))

    # Inner join: each value becomes (actual_rating, predicted_rating).
    # NOTE(review): test pairs without a matching prediction are dropped by
    # the join and therefore do not contribute to the RMSE -- confirm that
    # this is the intended evaluation.
    ratings_data_predict_test = ratings_keyed_test.join(predictions)
    RMSE_error = math.sqrt(ratings_data_predict_test.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean())
    print('rank =', i, ', RMSE =', RMSE_error)

rank = 2 , RMSE = 0.9723081251727486
rank = 3 , RMSE = 0.9864234284863788
rank = 4 , RMSE = 0.9916567716929491
rank = 5 , RMSE = 0.9978878865573982


In [7]:
# Train the model used by the remaining cells, with the rank fixed at 2, and
# report its RMSE on the test split.
i = 2


def _key_actual_rating(record):
    """Turn a (user, product, rating) string record into ((int, int), float)."""
    return ((int(record[0]), int(record[1])), float(record[2]))


def _key_predicted_rating(rating):
    """Turn a prediction into ((user, product), predicted_rating)."""
    return ((rating[0], rating[1]), rating[2])


def _squared_error(joined):
    """Return (actual - predicted) ** 2 for one joined ((user, product), (actual, predicted)) item."""
    return (joined[1][0] - joined[1][1]) ** 2


# NOTE(review): this seed (853) differs from the one used in the rank sweep
# (852), so this is not the same model that was scored there -- confirm that
# the difference is intentional.
model = ALS.train(ratings_data_train, rank=i, seed=853, iterations=10, lambda_=0.1)

predictions = model.predictAll(ratings_predict_test).map(_key_predicted_rating)

# Inner join: each value becomes (actual_rating, predicted_rating).
actual_by_pair = ratings_data_test.map(_key_actual_rating)
ratings_data_predict_test = actual_by_pair.join(predictions)

mean_squared_error = ratings_data_predict_test.map(_squared_error).mean()
RMSE_error = math.sqrt(mean_squared_error)
print('rank =', i, ', RMSE =', RMSE_error)

rank = 2 , RMSE = 0.9768104957264988


In [8]:
# Number of test pairs that ended up in the join of actual ratings with
# predictions, i.e. the number of pairs the RMSE above was computed over.
ratings_data_predict_test.count()

37402

In [9]:
# Ask the trained model for the 10 highest-scored products for one user.
userId = 1

model.recommendProducts(user=userId, num=10)

[Rating(user=1, product=3216, rating=3.7845114112994906),
 Rating(user=1, product=40412, rating=3.7845114112994906),
 Rating(user=1, product=3973, rating=3.6423235664183977),
 Rating(user=1, product=83318, rating=3.578340338039787),
 Rating(user=1, product=67504, rating=3.578340338039787),
 Rating(user=1, product=7064, rating=3.5176169578346617),
 Rating(user=1, product=4086, rating=3.50986179093168),
 Rating(user=1, product=5765, rating=3.4947689870179772),
 Rating(user=1, product=33171, rating=3.392569685351816),
 Rating(user=1, product=6818, rating=3.3673228430238993)]

In [10]:
# Ask the trained model for the 10 highest-scored users for one movie.
movieId = 1

model.recommendUsers(product=movieId, num=10)

[Rating(user=543, product=1, rating=5.135956505732757),
 Rating(user=289, product=1, rating=4.880314944437661),
 Rating(user=46, product=1, rating=4.866589390827418),
 Rating(user=401, product=1, rating=4.825002788182802),
 Rating(user=517, product=1, rating=4.811991645636462),
 Rating(user=296, product=1, rating=4.805640736003248),
 Rating(user=656, product=1, rating=4.772424273542519),
 Rating(user=540, product=1, rating=4.761480394409725),
 Rating(user=298, product=1, rating=4.74829685027636),
 Rating(user=568, product=1, rating=4.735384825741399)]

In [11]:
# Export the evaluation pairs as text lines of the form
# "user,product,actual_rating,predicted_rating".
def _to_csv_line(joined):
    """Format one ((user, product), (actual, predicted)) item as a comma-separated line."""
    (user, product), (actual, predicted) = joined
    return ','.join(str(d) for d in [user, product, actual, predicted])


csv_ratings_data_predict_test = ratings_data_predict_test.map(_to_csv_line)
# NOTE(review): presumably this fails if the output directory already exists
# (e.g. when the notebook is re-run) -- confirm and clean up beforehand if so.
csv_ratings_data_predict_test.saveAsTextFile("Data/temp/User_CF_Test_Result")

In [12]:
# Persist the trained model under Data/temp/ALS_Model, then read it back from
# the same path.
model_path = os.path.join('Data', 'temp', 'ALS_Model')

# Save Model
# NOTE(review): presumably this fails if `model_path` already exists (e.g.
# when the notebook is re-run) -- confirm.
model.save(sc=sc, path=model_path)

# Load Model
load_model = MatrixFactorizationModel.load(sc=sc, path=model_path)