In [130]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import pandas as pd
sc = SparkContext()
sql_sc = SQLContext(sc)

In [17]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

In [6]:
pd_df_ratings = pd.read_csv('C://Users/user/Desktop/ml-latest-small/ratings.csv')

In [41]:
pd_df_movies = pd.read_csv('C://Users/user/Desktop/ml-latest-small/movies.csv')

In [9]:
pyspark_df_ratings = sql_sc.createDataFrame(pd_df_ratings)

In [45]:
pyspark_df_movies = sql_sc.createDataFrame(pd_df_movies)

In [133]:
RDD_df_ratings = sc.textFile("C://Users/user/Desktop/ml-latest-small/ratings.csv")

In [134]:
RDD_df_movies = sc.textFile("C://Users/user/Desktop/ml-latest-small/movies.csv")

In [28]:
d = pyspark_df_ratings.join(pyspark_df_movies, on = 'movieId', how =  'full')

In [37]:
pyspark_df_ratings = pyspark_df_ratings.drop('Timestamp')

In [42]:
pyspark_df_ratings.show(5, truncate=False)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|1     |31     |2.5   |
|1     |1029   |3.0   |
|1     |1061   |3.0   |
|1     |1129   |2.0   |
|1     |1172   |4.0   |
+------+-------+------+
only showing top 5 rows



In [50]:
pyspark_df_ratings.cache()
pyspark_df_movies.cache()

DataFrame[movieId: bigint, title: string]

In [48]:
pyspark_df_movies = pyspark_df_movies.drop('genres')

In [49]:
pyspark_df_movies.show(5, truncate=False)

+-------+----------------------------------+
|movieId|title                             |
+-------+----------------------------------+
|1      |Toy Story (1995)                  |
|2      |Jumanji (1995)                    |
|3      |Grumpier Old Men (1995)           |
|4      |Waiting to Exhale (1995)          |
|5      |Father of the Bride Part II (1995)|
+-------+----------------------------------+
only showing top 5 rows



In [53]:
joint_ratings_movies = pyspark_df_ratings.join(pyspark_df_movies, on='movieId')

In [54]:
joint_ratings_movies.show(5)

+-------+------+------+--------------------+
|movieId|userId|rating|               title|
+-------+------+------+--------------------+
|     31|     1|   2.5|Dangerous Minds (...|
|   1029|     1|   3.0|        Dumbo (1941)|
|   1061|     1|   3.0|     Sleepers (1996)|
|   1129|     1|   2.0|Escape from New Y...|
|   1172|     1|   4.0|Cinema Paradiso (...|
+-------+------+------+--------------------+
only showing top 5 rows



In [55]:
training_df, validation_df, test_df = joint_ratings_movies.randomSplit([0.6, 0.2, 0.2], seed = 42)

In [56]:
training_df.cache()
validation_df.cache()
test_df.cache()

DataFrame[movieId: bigint, userId: bigint, rating: double, title: string]

In [57]:
MAX_ITERATIONS = 10
REG_PARAM = 0.1
SEED_VALUE = 42

In [114]:
als = ALS(maxIter = MAX_ITERATIONS, regParam=REG_PARAM, userCol= 'userId', itemCol='movieId',ratingCol='rating')

In [115]:
als

ALS_4a6e86fda199a64a02a7

In [116]:
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")

In [117]:
reg_eval.read()

<pyspark.ml.util.JavaMLReader at 0x148cfac8>

In [118]:
ranks = [1, 2, 3, 4]
errors = [0, 0, 0, 0]
models = [0, 0, 0, 0]
count = 0
min_error = float('inf')
best_rank = -1

In [119]:
for rank in ranks:
    als.setRank(rank)
    model =als.fit(training_df)
    predict_df = model.transform(validation_df)
    predicted_ratings_df = predict_df.filter(predict_df.prediction != float('nan'))
    error = reg_eval.evaluate(predicted_ratings_df)
    errors[count] = error
    models[count]= model
    print('Rank: %s  RMSE: %s' % (rank,error))
    
    if error < min_error:
        min_error = error
        best_rank = count
    count += 1

als.setRank(ranks[best_rank])
print('Best model: %s' % ranks[best_rank])

Rank: 1  RMSE: 0.91536003322
Rank: 2  RMSE: 0.91947340264
Rank: 3  RMSE: 0.931341261591
Rank: 4  RMSE: 0.937812092655
Best model: 1


In [120]:
best_model = models[best_rank]

In [121]:
test_predict_df = best_model.transform(test_df)

In [122]:
test_predict_df.show(10)

+-------+------+------+--------------------+----------+
|movieId|userId|rating|               title|prediction|
+-------+------+------+--------------------+----------+
|    463|    30|   4.0|Guilty as Sin (1993)| 3.4663312|
|    463|   311|   3.0|Guilty as Sin (1993)| 2.7467191|
|    471|   588|   3.0|Hudsucker Proxy, ...|  4.026064|
|    471|   126|   5.0|Hudsucker Proxy, ...| 4.1335073|
|    471|    19|   3.0|Hudsucker Proxy, ...| 3.9036355|
|    471|   309|   4.0|Hudsucker Proxy, ...| 4.3257484|
|    471|    15|   3.0|Hudsucker Proxy, ...| 2.9689806|
|    471|    73|   4.0|Hudsucker Proxy, ...|  3.865151|
|    471|   487|   4.0|Hudsucker Proxy, ...|  4.436184|
|    471|   508|   4.0|Hudsucker Proxy, ...| 4.3334002|
+-------+------+------+--------------------+----------+
only showing top 10 rows



In [123]:
test_predict_df = test_predict_df.filter(test_predict_df.prediction != float('nan'))


In [124]:
test_RMSE = reg_eval.evaluate(test_predict_df)
print('RMSE on test set: {0}'.format(test_RMSE))

RMSE on test set: 0.918710445384
