# RECOMMENDER SYSTEM WITH PYSPARK

## 0. INTRODUCTION

The aim of this project is to use PySpark to implement a movie recommender (or recommendation) system.<br>

For this purpose, we will use the well-known MovieLens dataset.<br>
We will use its small version to avoid memory limitations on a CPU-only setup.

Our recommender system will be based on the alternating least squares (ALS) algorithm.

We will train our model and evaluate it with the following metrics: RMSE, recall and MAP.

Finally, we will use our model to recommend movies to a new user.

The structure of this notebook is as follows:

1. FIRST STEPS:<br>
    1.1) Create Spark Session<br>
    1.2) Import data_sets
2. DATA PREPROCESSING
3. RECOMMENDER SYSTEM<br>
    3.1) Data_sets: training, validation and test<br>
    3.2) Quick look at the model (ALS)<br>
    3.3) Hyperparameters tuning
4. TEST<br>
    4.1) RMSE from the test data_set<br>
    4.2) Additional metrics: Recall and MAP<br>
5. OUR RECOMMENDER SYSTEM IN ACTION
6. CONCLUSIONS
7. ACKNOWLEDGMENTS

Let's get started!

## 1. FIRST STEPS:

### 1.1) Create Spark Session

First, we will create our Spark session.

In [1]:
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()
ss

### 1.2) Import data_sets

We will download the MovieLens dataset (https://movielens.org/) from the GroupLens file server.

In [2]:
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

In [3]:
import os

datasets_path = os.path.join('.','datasets')
os.makedirs(datasets_path, exist_ok=True)  # make sure the download folder exists

small_dataset_path = os.path.join(datasets_path,'ml-latest-small.zip')


In [None]:
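import urllib.request  # 'import urllib' alone does not guarantee that urllib.request is loaded

# Download the zip file only the first time (skipped if it is already on disk)
if not os.path.exists(small_dataset_path):
    urllib.request.urlretrieve(small_dataset_url, small_dataset_path)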

In [None]:
import zipfile

# Extract the archive only the first time
if not os.path.exists(os.path.join(datasets_path, 'ml-latest-small')):
    with zipfile.ZipFile(small_dataset_path, "r") as z:
        z.extractall(datasets_path)

In [4]:
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small','ratings.csv')
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')

Then, we will create an RDD for each data_set:

In [5]:
small_ratings_raw_data = ss.sparkContext.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [6]:
#Let's see the name of the columns:
small_ratings_raw_data_header

'userId,movieId,rating,timestamp'

In [7]:
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [8]:
small_movies_raw_data = ss.sparkContext.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

In [9]:
small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()

In [10]:
#Let's see the name of the columns:
small_movies_raw_data_header

'movieId,title,genres'
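The header shows that the second column holds the title. Titles in `movies.csv` that contain a comma are wrapped in double quotes, and splitting each line on every `,` (as in In [9]) cuts them short, which is why truncated titles such as '"Birdcage' appear further below. A minimal sketch with Python's standard `csv` module, on a made-up two-line sample in the same format (`naive_parse` and `csv_parse` are illustrative helpers), shows the difference:

```python
import csv
import io

# Made-up sample in the movies.csv format: a title containing a comma is quoted
sample = 'movieId,title,genres\n141,"Birdcage, The (1996)",Comedy\n1,Toy Story (1995),Adventure|Animation\n'

def naive_parse(text):
    """Split every line on ',' as in In [9] (ignores CSV quoting)."""
    lines = text.strip().split("\n")[1:]  # drop the header
    return [tuple(line.split(",")[:2]) for line in lines]

def csv_parse(text):
    """Parse with the csv module, which honours quoted fields."""
    reader = csv.reader(io.StringIO(text))
    next(reader)  # skip the header
    return [(row[0], row[1]) for row in reader]

print(naive_parse(sample))  # [('141', '"Birdcage'), ('1', 'Toy Story (1995)')]
print(csv_parse(sample))    # [('141', 'Birdcage, The (1996)'), ('1', 'Toy Story (1995)')]
```

Inside Spark, each line could be parsed the same way with `next(csv.reader([line]))` in the `map`, or the file could be loaded with `ss.read.csv(small_movies_file, header=True)`, whose default quote character is `"`. Either change would alter the title counts reported in the rest of this notebook.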

## 2. DATA PREPROCESSING

In [11]:
# Check whether titles are unique or not. 

In [12]:
# We will transform RDDs into DataFrames to work with them more efficiently:
#(We will keep only the columns we need)

In [13]:
movies_df =  ss.createDataFrame(small_movies_data,["item", "title"])
movies_df.show(3)

+----+--------------------+
|item|               title|
+----+--------------------+
|   1|    Toy Story (1995)|
|   2|      Jumanji (1995)|
|   3|Grumpier Old Men ...|
+----+--------------------+
only showing top 3 rows



In [14]:
ratings_df =  ss.createDataFrame(small_ratings_data,["user", "item", "rating"])

In [15]:
movies_df.count()

9742

In [16]:
movies_df.select(['item']).distinct().count()

9742

In [17]:
movies_df.select(['title']).distinct().count()

9624

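There are more distinct movie keys (9742) than distinct titles (9624), which means that some titles have more than one key. Much of this gap likely comes from splitting `movies.csv` on every comma: quoted titles that contain a comma, such as "Birdcage, The (1996)", are truncated (to '"Birdcage'), so different films can end up sharing the same truncated title.<br>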
Keys must be unique, so let's solve this problem:
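Conceptually, the fix gives each distinct title one new consecutive key and then maps every original key onto the new key of its title. Here is a plain-Python sketch of that idea on a hypothetical toy catalogue (the titles and the `rekey` helper are illustrative only; the notebook does the same with Spark SQL's `row_number()` and a join):

```python
def rekey(movies):
    """Map each distinct title to a consecutive new key, and each original key to its title's new key."""
    title_to_new = {}
    for _, title in movies:
        if title not in title_to_new:
            title_to_new[title] = len(title_to_new) + 1  # 1, 2, 3, ... like row_number()
    old_to_new = {old: title_to_new[title] for old, title in movies}
    return title_to_new, old_to_new

# Hypothetical toy catalogue of (original key, title): keys 7 and 9 share a title
movies = [(1, "Alpha (1999)"), (7, "Beta (2001)"), (9, "Beta (2001)"), (4, "Gamma (1995)")]
title_to_new, old_to_new = rekey(movies)
print(title_to_new)  # {'Alpha (1999)': 1, 'Beta (2001)': 2, 'Gamma (1995)': 3}
print(old_to_new)    # {1: 1, 7: 2, 9: 2, 4: 3}
```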

In [18]:
#First, we will create a column with unique records of each movie:

movies_unique_titles = movies_df.select(['title']).distinct()
movies_unique_titles.count()

9624

In [19]:
#Then, we will index the movie titles with a consecutive series of numbers (using SQL):

In [20]:
movies_unique_titles.createOrReplaceTempView('movies_unique_titles_table')  # registerTempTable is deprecated

In [21]:
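# Note: "title" in double quotes is a string literal in Spark SQL, so this orders by a constant and the
# numbering is arbitrary (which is fine here: we only need one unique consecutive key per title).
movies_index = ss.sql('select row_number() over (order by "title") as itemId, * from movies_unique_titles_table')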

In [22]:
movies_index.show(5)

+------+--------------------+
|itemId|               title|
+------+--------------------+
|     1|    Fair Game (1995)|
|     2| If Lucy Fell (1996)|
|     3|           "Birdcage|
|     4| Three Wishes (1995)|
|     5|Heavenly Creature...|
+------+--------------------+
only showing top 5 rows



In [23]:
#Check control: All the movies are included
movies_index.count()

9624

In [24]:
#Check control: There is one and only one key for each movie
movies_index.select('itemId').distinct().count()

9624

In [25]:
# We will create a dataframe with the correspondence between the original indexes 
#(which include some different indexes matching the same movies) and the new unique index for each movie:

movies_corr = movies_df.join(movies_index,on=['title'],how='left')

In [26]:
movies_corr.show(5)

+--------------------+-----+------+
|               title| item|itemId|
+--------------------+-----+------+
|"Abominable Dr. P...| 4195|    24|
|           "Birdcage|  141|     3|
|      "International|66198|    43|
|              "Jetée| 8477|    28|
| "Life Less Ordinary| 1658|    13|
+--------------------+-----+------+
only showing top 5 rows



In [27]:
#Check control: All the original indexes are included
movies_corr.count()

9742

In [28]:
#Check control: All the unique new keys are included
movies_corr.select(['itemId']).distinct().count()

9624

In [29]:
#We will use this correspondence dataframe to double check whether some movies have more than one
#original index.
#For this purpose, we create an RDD that maps each new index to the list of titles of its original
#indexes (one entry per original index).

In [30]:
movies_duplicates = movies_corr.rdd.map(lambda x: (x[2],x[0])).groupByKey()

In [31]:
#Check Control: the number of rows of this RDD must be the same as the number of new indexes
movies_duplicates.count()

9624

In [32]:
#Then, we will define a function to count the number of original indexes per movie (and new index)

In [33]:
def count_duplicates(movies_list):
    return int(len(movies_list[1])), int(movies_list[0]), movies_list[1]

movie_duplicates_count = movies_duplicates.map(count_duplicates)


In [34]:
#And we will use this information to build a dataframe with:
#the new index, the number of original indexes mapped onto it and the titles of the
#related movies.
movie_duplicates_count_df = movie_duplicates_count.toDF(['duplic','id','movies'])

#We will filter those movies with more than one original index:
movie_duplicates_count_df.filter(movie_duplicates_count_df['duplic']>1).take(6) #We show a sample of 6 movies

[Row(duplic=2, id=5200, movies=Row(data=['"Signal', '"Signal'], index=0, maxindex=2)),
 Row(duplic=2, id=1803, movies=Row(data=['"Bourne Identity', '"Bourne Identity'], index=0, maxindex=2)),
 Row(duplic=2, id=7804, movies=Row(data=['"Ladykillers', '"Ladykillers'], index=0, maxindex=2)),
 Row(duplic=2, id=205, movies=Row(data=['"Apartment', '"Apartment'], index=0, maxindex=2)),
 Row(duplic=2, id=7610, movies=Row(data=['"Sex', '"Sex'], index=0, maxindex=2)),
 Row(duplic=2, id=212, movies=Row(data=['"Thing', '"Thing'], index=0, maxindex=2))]

In [35]:
#We can see that we were right!

In [36]:
# Finally, we will include a column with the new keys on the 'ratings' data_frame:

In [37]:
#Let's have a look at the number of rows of the 'ratings' data_frame:
ratings_df.count()

100836

In [38]:
#We add the new_key column ('itemId'):
ratings_corrected_df = ratings_df.join(movies_corr, on=['item'], how='left')

#And we check whether every original row is still there
ratings_corrected_df.count()

100836

In [39]:
#Let's check how many new keys are included on the new column
ratings_corrected_df.select(['itemId']).distinct().count()

9607

In [40]:
#The number is lower than the total number of movies because there are movies that have never
#been rated by any user 

In [41]:
#Taking a look at the new data_frame for ratings:
ratings_corrected_df.show(5)

+------+----+------+--------------------+------+
|  item|user|rating|               title|itemId|
+------+----+------+--------------------+------+
|100553| 105|   4.5|Frozen Planet (2011)|  5659|
|100553| 318|   4.5|Frozen Planet (2011)|  5659|
|102684| 249|   3.5|Only God Forgives...|  9613|
|102684| 380|   4.0|Only God Forgives...|  9613|
|  1090|   1|   4.0|      Platoon (1986)|  9413|
+------+----+------+--------------------+------+
only showing top 5 rows



### Final data_sets:

In [43]:
# We cache the corrected data_sets as they are the data_sets we are going to use in this project:

In [44]:
# Our list of movies:
movies_index.cache()

DataFrame[itemId: int, title: string]

In [45]:
# Our list of ratings: 
#We keep the columns we really need only (user, itemId and rating)

ratings_corrected_final_rdd = ratings_corrected_df.rdd.map(lambda x: (int(x[1]),int(x[4]),float(x[2]))).cache()

In [46]:
# Unpersist cached RDDs that will not be used in this project anymore:

In [47]:
small_movies_data.unpersist()

PythonRDD[7] at RDD at PythonRDD.scala:53

In [48]:
small_ratings_data.unpersist()

PythonRDD[3] at RDD at PythonRDD.scala:53

## 3. RECOMMENDER SYSTEM

We will base our recommendations on collaborative filtering and matrix factorization.

Spark.ml supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. <br>
Spark.ml uses the alternating least squares (ALS) algorithm to learn these latent factors.
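ALS looks for user vectors $x_u$ and movie vectors $y_i$ of length `rank` whose dot products approximate the known ratings, by minimizing

$$\sum_{(u,i)} \left(r_{ui} - x_u^\top y_i\right)^2 + \lambda\left(\sum_u n_u \lVert x_u\rVert^2 + \sum_i n_i \lVert y_i\rVert^2\right)$$

where $n_u$ and $n_i$ are the numbers of ratings of user $u$ and movie $i$ (the Spark docs describe this scaling of the regularization parameter). With the movie vectors fixed, each user vector solves a small linear least-squares problem, and vice versa, so ALS alternates between the two. The pure-Python sketch below only illustrates the idea on toy data; it is not Spark's distributed implementation, and all function names are hypothetical.

```python
import random

def dot(u, v):
    return sum(a * b for a, b in zip(u, v))

def solve(a, b):
    """Solve the small dense system a x = b (Gauss-Jordan elimination with partial pivoting)."""
    n = len(b)
    m = [row[:] + [b[i]] for i, row in enumerate(a)]  # augmented matrix [a | b]
    for col in range(n):
        pivot = max(range(col, n), key=lambda r: abs(m[r][col]))
        m[col], m[pivot] = m[pivot], m[col]
        for r in range(n):
            if r != col:
                f = m[r][col] / m[col][col]
                for c in range(col, n + 1):
                    m[r][c] -= f * m[col][c]
    return [m[i][n] / m[i][i] for i in range(n)]

def update(factors, fixed, by_key, rank, lam):
    """Re-solve every vector in `factors` by least squares while `fixed` is held constant."""
    for key, entries in by_key.items():  # entries: [(other_key, rating), ...]
        n = len(entries)
        # Normal equations: (sum_j y_j y_j^T + lam * n * I) x = sum_j r_j * y_j
        a = [[lam * n if i == j else 0.0 for j in range(rank)] for i in range(rank)]
        b = [0.0] * rank
        for other, r in entries:
            y = fixed[other]
            for i in range(rank):
                b[i] += r * y[i]
                for j in range(rank):
                    a[i][j] += y[i] * y[j]
        factors[key] = solve(a, b)

def als(ratings, rank=2, lam=0.1, iterations=10, seed=42):
    """Explicit-feedback ALS on (user, item, rating) triples; returns factors and the loss per iteration."""
    rng = random.Random(seed)
    by_user, by_item = {}, {}
    for u, i, r in ratings:
        by_user.setdefault(u, []).append((i, r))
        by_item.setdefault(i, []).append((u, r))
    users = {u: [rng.random() for _ in range(rank)] for u in by_user}
    items = {i: [rng.random() for _ in range(rank)] for i in by_item}
    losses = []
    for _ in range(iterations):
        update(users, items, by_user, rank, lam)  # items fixed -> solve every user vector
        update(items, users, by_item, rank, lam)  # users fixed -> solve every item vector
        squared_error = sum((r - dot(users[u], items[i])) ** 2 for u, i, r in ratings)
        penalty = lam * (sum(len(by_user[u]) * dot(x, x) for u, x in users.items())
                         + sum(len(by_item[i]) * dot(y, y) for i, y in items.items()))
        losses.append(squared_error + penalty)
    return users, items, losses

# Toy (user, movie, rating) triples
ratings = [(1, 10, 4.0), (1, 20, 1.0), (2, 10, 5.0), (2, 30, 2.0), (3, 20, 1.5), (3, 30, 4.5)]
users, items, losses = als(ratings)
print(losses[0], losses[-1])     # the regularized loss should not increase
print(dot(users[1], items[30]))  # predicted rating for an unseen (user, movie) pair
```

Because every half-step solves its subproblem exactly, the regularized loss should never increase from one iteration to the next. Spark performs the same independent per-user and per-movie solves in parallel.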



In [49]:
# ALS implementation:

In [50]:
#Let's have a look at our data_set again:
ratings_corrected_final_rdd.take(3)

[(105, 5659, 4.5), (318, 5659, 4.5), (249, 9613, 3.5)]

### 3.1) Data_sets: training, validation and test

In [51]:
#Create train and test data_sets:

training_RDD, test_RDD = ratings_corrected_final_rdd.randomSplit([8, 2], seed=42)
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

#Need to cache the data to speed up training
training_RDD.cache()
test_RDD.cache()

## NOTE: We will carry out the validation process with cross-validation on the training data_set.

PythonRDD[324] at RDD at PythonRDD.scala:53

In [52]:
test_RDD.take(3)

[(59, 9413, 4.0), (64, 9413, 5.0), (83, 9413, 1.5)]

In [53]:
test_for_predict_RDD.take(3)

[(59, 9413), (64, 9413), (83, 9413)]

In [54]:
#Check control: Distribution of the original data_set:
print(training_RDD.count(),test_RDD.count())


80671 20165


### 3.2) Quick look at the model (ALS)

In [55]:
from pyspark.mllib.recommendation import ALS,MatrixFactorizationModel, Rating

seed = 42
iterations = 10
regularization_parameter = 0.1
rank = 5 

model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,lambda_=regularization_parameter)



In [56]:
#Examine the latent features for one product
model.productFeatures().first()

(100,
 array('d', [-0.0350075326859951, -0.18089444935321808, 0.9677019715309143, -0.12788857519626617, -1.0125657320022583]))

In [57]:
#Examine the latent features for one user
model.userFeatures().take(1)

[(100,
  array('d', [0.41765549778938293, 0.7986670136451721, 1.2712528705596924, -0.9706224799156189, -0.8453475832939148]))]

In [58]:
# For Product X, Find N Users to Sell To
model.recommendUsers(242,10)

[Rating(user=53, product=242, rating=3.7267663426784754),
 Rating(user=360, product=242, rating=3.5275400909342216),
 Rating(user=243, product=242, rating=3.51589959464369),
 Rating(user=393, product=242, rating=3.5013989423263823),
 Rating(user=452, product=242, rating=3.423343666810384),
 Rating(user=12, product=242, rating=3.418763211697275),
 Rating(user=154, product=242, rating=3.4111653761396212),
 Rating(user=327, product=242, rating=3.371480869339484),
 Rating(user=548, product=242, rating=3.368568316929615),
 Rating(user=441, product=242, rating=3.3492286076305833)]

In [59]:
# For User Y Find N Products to Promote
model.recommendProducts(196,10)

[Rating(user=196, product=4415, rating=4.858206214589),
 Rating(user=196, product=6026, rating=4.85741371607064),
 Rating(user=196, product=3510, rating=4.852153724357697),
 Rating(user=196, product=4701, rating=4.8187556069240465),
 Rating(user=196, product=6406, rating=4.79168014246466),
 Rating(user=196, product=158, rating=4.757341308809968),
 Rating(user=196, product=2340, rating=4.747375284990344),
 Rating(user=196, product=2921, rating=4.745887407209832),
 Rating(user=196, product=4776, rating=4.712530087031055),
 Rating(user=196, product=1954, rating=4.709794682479856)]

In [60]:
#Predict Single Product for Single User
model.predict(12, 242)

3.418763211697275

In [61]:
#Predict Single Product for Single User
model.predict(196, 2340)

4.747375284990344

### 3.3) Hyperparameters tuning:

In this section, we will try to optimize the hyperparameters of the model.

The most important hyperparameters of Alternating Least Squares (ALS) are:

- maxIter: the maximum number of iterations to run
- rank: the number of latent factors in the model
- regParam: the regularization parameter in ALS



In [62]:
#First, we need to translate the RDD for training into a data_frame: 
training_ratings_df = ss.createDataFrame(training_RDD,["user", "item", "rating"])


In [63]:
from pyspark.ml.recommendation import ALS  # DataFrame-based ALS; this shadows the pyspark.mllib ALS imported earlier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

#We set a seed just in case we need to replicate results.
seed = 42

#Building the model:
alsExplicit = ALS(userCol="user", itemCol="item", ratingCol="rating",coldStartStrategy="drop")
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics

#Fitting a baseline model with the default hyperparameters (not used below):
defaultModel = alsExplicit.fit(training_ratings_df)

#Setting the hyperparameters grid:
paramMapExplicit = ParamGridBuilder() \
                    .addGrid(alsExplicit.rank, [2,8, 12]) \
                    .addGrid(alsExplicit.maxIter, [5,10,20,30,50]) \
                    .addGrid(alsExplicit.regParam, [0.01,0.05,0.1,0.5,1.0,2]) \
                    .build()

#Setting metric for evaluation purposes:
evaluatorR = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

#Setting up the cross-validated grid search for the best hyperparameters over the grid:
cvExplicit = CrossValidator(seed=seed, estimator=alsExplicit, estimatorParamMaps=paramMapExplicit, evaluator=evaluatorR, numFolds=3)

#Feeding the cross_validated grid search with the data_frame:
cvModelExplicit = cvExplicit.fit(training_ratings_df)
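`ParamGridBuilder.build()` expands the grid into the Cartesian product of the value lists, and `CrossValidator` fits one model per parameter map and fold, then refits the best map on the whole training data. A quick plain-Python count of what the search above involves (value lists copied from the cell; the dict keys are only labels):

```python
from itertools import product

# Value lists copied from the ParamGridBuilder cell above
ranks = [2, 8, 12]
max_iters = [5, 10, 20, 30, 50]
reg_params = [0.01, 0.05, 0.1, 0.5, 1.0, 2]
num_folds = 3

# Every combination of the three lists is one candidate parameter map
grid = [{"rank": r, "maxIter": m, "regParam": p}
        for r, m, p in product(ranks, max_iters, reg_params)]

print(len(grid))              # 90 candidate settings
print(len(grid) * num_folds)  # 270 ALS fits during cross-validation, plus one final refit
```

This is why the cell takes a while to run: shrinking any of the lists reduces the number of fits multiplicatively.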

In [64]:
### Now we are going to collect the best hyperparameters from the cross_validated grid search:

In [65]:
best = cvModelExplicit.bestModel
best.rank

2

In [66]:
metrics_cv = cvModelExplicit.avgMetrics

# avgMetrics[i] is the mean cross-validated RMSE of paramMapExplicit[i]; lower is better
best_position = metrics_cv.index(min(metrics_cv))
best_param_cv = (metrics_cv[best_position], paramMapExplicit[best_position])

In [67]:
best_param_cv

(0.9236690660432525,
 {Param(parent='ALS_462e7cb3aa5a', name='rank', doc='rank of the factorization'): 2,
  Param(parent='ALS_462e7cb3aa5a', name='maxIter', doc='max number of iterations (>= 0).'): 20,
  Param(parent='ALS_462e7cb3aa5a', name='regParam', doc='regularization parameter (>= 0).'): 0.1})

Above, we can find the best cross-validated RMSE obtained in the hyperparameter optimization process, together with the corresponding hyperparameters.

## 4. TEST:

### 4.1) RMSE from the test data_set

In [68]:
#Now, we are going to train our ALS model with the best parameters we have found.
#Then, we will use this model to make predictions for the test data_set and
#we will use the RMSE metric to evaluate our model.

In [69]:
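from pyspark.mllib.recommendation import ALS  # back to the RDD-based ALS
import math

seed = 42
best_params = best_param_cv[1]
# Look the values up by Param instead of relying on their position in the dict
best_rank = best_params[alsExplicit.rank] #2
best_iterations = best_params[alsExplicit.maxIter] #20
best_reg_parameter = best_params[alsExplicit.regParam] #0.1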


model = ALS.train(training_RDD, best_rank, seed=seed, iterations=best_iterations,
                      lambda_=best_reg_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.8787836373243412
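The RMSE above is computed on the inner join of the true and predicted ratings, keyed by (user, movie): pairs for which `predictAll` returns no prediction (for example, a user or movie that never appears in the training split) are left out. A plain-Python sketch of the same computation, with dictionaries in place of pair RDDs (toy numbers; `rmse` is a hypothetical helper):

```python
import math

def rmse(actual, predicted):
    """RMSE over the (user, movie) keys present in both dicts, like the RDD join above."""
    common = actual.keys() & predicted.keys()  # inner join: unmatched pairs are skipped
    squared_errors = sum((actual[k] - predicted[k]) ** 2 for k in common)
    return math.sqrt(squared_errors / len(common))

actual = {(1, 10): 4.0, (1, 20): 3.0, (2, 10): 5.0}
predicted = {(1, 10): 3.5, (1, 20): 3.0, (2, 10): 4.0, (3, 30): 2.0}
print(round(rmse(actual, predicted), 4))  # 0.6455
```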


In [70]:
# Since the test RMSE (0.879) is similar to, and even slightly lower than, the cross-validated RMSE (0.924),
# it seems that the model generalizes properly.

### 4.2) Additional metrics:

We are going to use two additional metrics to check our model performance and reliability:

- **4.2.a) Recall:** normalized recall to measure the relevance of the recommendations without taking into account their order.

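### $$\mathrm{recall}@N = \frac{\sum_{i=1}^N rel_i}{\mathrm{min}(N, \sum_{i\in \mathcal{I}_u} 1)}$$

where $rel_i$ is 1 if the $i$-th recommended movie is one of the movies the user rated highly in the test set and 0 otherwise, and $\mathcal{I}_u$ is that set of relevant test movies. Dividing by $\mathrm{min}(N, |\mathcal{I}_u|)$ keeps the result between 0 and 1.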

In [71]:
def recall_at_n(N, test, recommended, train=None):
    """
    :param N: number of recommendations
    :param test: list of movies seen by user in test
    :param train: list of movies seen by user in train. This has to be removed from the recommended list 
    :param recommended: list of movies recommended
    
    :return the recall
    """
    if train is not None: # Remove items already in train
        rec_true = [r for r in recommended if r not in train]        
    else:
        rec_true = recommended 
        
    intersection = len(set(test) & set(rec_true[:N]))
    
    return intersection / float(min(N, len(test)))

- **4.2.b) Mean Average Precision (MAP)**:<br>
Recall does not account for the ranking of the recommendations, i.e. the relative position of a movie within the sorted list of recommendations.
MAP does: it is the mean, over all users, of the Average Precision (AP).

The Average Precision is defined as:

### $$\mathrm{AP}@N = \frac{\sum_{k=1}^N P(k) \times rel(k)}{\mathrm{min}(N, \sum_{i\in \mathcal{I}_u} 1)}$$

where $P(k)$ is the precision at cut-off $k$ in the list of recommendations, i.e. the number of relevant movies among the first $k$ recommendations divided by $k$, and $rel(k)$ is 1 if the $k$-th recommendation is relevant and 0 otherwise. Thus:

### $$\mathrm{AP}@N = \frac{\sum_{k=1}^N \left(\sum_{i=1}^k rel(i)\right)/k \times rel(k)}{\mathrm{min}(N, \sum_{i\in \mathcal{I}_u} 1)}$$

(See http://fastml.com/what-you-wanted-to-know-about-mean-average-precision/)
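As a worked example with made-up numbers, take $N = 5$, a user with $|\mathcal{I}_u| = 3$ relevant test movies, and a top-5 list whose 1st and 3rd movies are relevant, i.e. $rel = (1, 0, 1, 0, 0)$:

$$\mathrm{recall}@5 = \frac{1+0+1+0+0}{\mathrm{min}(5, 3)} = \frac{2}{3} \approx 0.667$$

$$\mathrm{AP}@5 = \frac{\frac{1}{1}\cdot 1 + \frac{1}{2}\cdot 0 + \frac{2}{3}\cdot 1 + \frac{2}{4}\cdot 0 + \frac{2}{5}\cdot 0}{\mathrm{min}(5, 3)} = \frac{1 + \frac{2}{3}}{3} = \frac{5}{9} \approx 0.556$$

If the two hits were at positions 1 and 2 instead, recall@5 would stay at $2/3$ but AP@5 would rise to $(1 + 1)/3 \approx 0.667$: this sensitivity to ranking is exactly what recall lacks.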


The function below is adapted from:<br>
https://github.com/benhamner/Metrics/blob/master/Python/ml_metrics/average_precision.py

In [72]:
def apk(N, test, recommended, train=None):
    """
    Computes the average precision at N given recommendations.
    
    :param N: number of recommendations
    :param test: list of movies seen by user in test
    :param train: list of movies seen by user in train. This has to be removed from the recommended list 
    :param recommended: list of movies recommended
    
    :return The average precision at N over the test set
    """
    if train is not None: 
        rec_true = []
        for r in recommended:
            if r not in train:
                rec_true.append(r)
    else:
        rec_true = recommended    
    predicted = rec_true[:N] # top-k predictions
    
    score = 0.0 # This will store the numerator
    num_hits = 0.0 # This will store the sum of rel(i)

    for i,p in enumerate(predicted):
        if p in test and p not in predicted[:i]: # 'not in predicted[:i]' counts a duplicated recommendation only once
            num_hits += 1.0
            score += num_hits/(i+1.0)


    return score / min(len(test), N)

To compute both additional metrics, we need to build a data_frame that includes the following information for each user:
- The movies with high ratings (>= 4) in the training and test data_sets.
- The recommended movies, sorted by predicted rating.

In [73]:
from pyspark.sql import functions
trainUsersGrouped = training_ratings_df.filter(training_ratings_df['rating']>=4).select('user','item').groupby('user').agg(functions.collect_set('item').alias('item_list_train'))
trainUsersGrouped.show(3)

+----+--------------------+
|user|     item_list_train|
+----+--------------------+
|  26|[1827, 881, 7745,...|
| 474|[3304, 356, 9032,...|
|  29|[844, 6491, 510, ...|
+----+--------------------+
only showing top 3 rows



In [74]:
test_ratings_df = ss.createDataFrame( test_RDD, ["user", "item", "rating"])
testUsersGrouped = test_ratings_df.filter(test_ratings_df['rating']>=4).select('user','item').groupby('user').agg(functions.collect_set('item').alias('item_list_test'))
testUsersGrouped.show(3)

+----+--------------------+
|user|      item_list_test|
+----+--------------------+
| 474|[4495, 1068, 6927...|
|  29|[9358, 4938, 82, ...|
| 418|[3427, 5895, 8102...|
+----+--------------------+
only showing top 3 rows



In [75]:
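def zip_col(x):
    # (user, item, rating) -> (user, (rating, item)): the rating goes first so sorting ranks by it
    return x[0],(x[2],x[1])

In [76]:
def item_only(x):
    # (user, [(rating, item), ...]) -> (user, [item, ...]): keep only the ranked item ids
    return x[0], [i[1] for i in x[1]] 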

In [77]:
test_preds = model.predictAll(test_for_predict_RDD)
test_predict_df = ss.createDataFrame( test_preds, ["user", "item", "rating"])

PredUsersGrouped = test_predict_df.rdd.map(zip_col).toDF(["user","item_rate"]).groupby('user').agg(functions.sort_array(functions.collect_set('item_rate'),asc=False).alias('item_list'))

PredUsersRank = PredUsersGrouped.rdd.map(item_only).toDF(["user","item_list_pred"])

PredUsersRank.show(6)

+----+--------------------+
|user|      item_list_pred|
+----+--------------------+
| 474|[6041, 4040, 9301...|
|  29|[4938, 796, 5673,...|
|  26|[7832, 2162, 1870...|
|  65|[3869, 1300, 5236...|
| 418|[7745, 400, 8102,...|
| 558|[1300, 5222, 6393...|
+----+--------------------+
only showing top 6 rows
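`sort_array(collect_set('item_rate'), asc=False)` ranks the movies by predicted rating because `zip_col` puts the rating first in each (rating, item) struct, and structs are compared field by field (ties are broken by item id). A plain-Python equivalent with tuples, using three of user 65's predictions from this notebook (rounded):

```python
def zip_col(x):
    """(user, item, rating) -> (user, (rating, item)), as in the notebook."""
    return x[0], (x[2], x[1])

# Three (user, item, rating) predictions for user 65, rounded from this notebook's output
rows = [(65, 1300, 4.03), (65, 6690, 3.65), (65, 3869, 4.23)]

# A descending tuple sort mirrors sort_array(..., asc=False) on (rating, item) structs
pairs = sorted((zip_col(r)[1] for r in rows), reverse=True)
ranked_items = [item for _, item in pairs]  # what item_only keeps
print(ranked_items)  # [3869, 1300, 6690]
```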



In [None]:
# Sanity check: user 65's top-ranked movie should carry that user's highest predicted rating:

In [79]:
test_predict_df.filter(test_predict_df['user'] == '65').agg({"rating":"max"}).collect()

[Row(max(rating)=4.230414561253497)]

In [80]:
PredUsersGrouped.filter(PredUsersGrouped['user']=='65').collect()

[Row(user=65, item_list=[Row(_1=4.230414561253497, _2=3869), Row(_1=4.032184338366392, _2=1300), Row(_1=3.927514728445658, _2=5236), Row(_1=3.8808185829599324, _2=6570), Row(_1=3.865653497051426, _2=3368), Row(_1=3.820906888708919, _2=4266), Row(_1=3.7658954913448497, _2=2222), Row(_1=3.6476676693615673, _2=6690)])]

In [82]:
dataForMetrics = trainUsersGrouped.join(testUsersGrouped, on=['user']).join(PredUsersRank, on=['user'])

dataForMetrics.show(5)

+----+--------------------+--------------------+--------------------+
|user|     item_list_train|      item_list_test|      item_list_pred|
+----+--------------------+--------------------+--------------------+
|  29|[844, 6491, 510, ...|[9358, 4938, 82, ...|[4938, 796, 5673,...|
| 474|[3304, 356, 9032,...|[4495, 1068, 6927...|[6041, 4040, 9301...|
|  65|[1591, 3616, 4938...|[6690, 2222, 4266...|[3869, 1300, 5236...|
| 191|[7282, 6346, 7049...|[4226, 4938, 3446...|[5301, 5425, 5571...|
| 418|[9510, 5105, 1425...|[3427, 5895, 8102...|[7745, 400, 8102,...|
+----+--------------------+--------------------+--------------------+
only showing top 5 rows



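This is the data_frame that includes all the data we need. <br>
Keep in mind that `item_list_pred` only contains movies that each user actually rated in the test set (the predictions come from `predictAll(test_for_predict_RDD)`), so the metrics below measure how well the model ranks each user's own test movies rather than the whole catalogue, and they are likely optimistic.

Now, we will calculate the metrics for the first (5, 10, 30, ...) recommendations provided by our model: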

In [83]:
for k in [5, 10, 30, 50, 100]:
    # Each row is (user, item_list_train, item_list_test, item_list_pred); average each metric over users
    recall = dataForMetrics.rdd.map(lambda x: recall_at_n(k, x[2], x[3], x[1])).mean()
    print("recall@%s=%.3f" % (k, recall))

    map_ = dataForMetrics.rdd.map(lambda x: apk(k, x[2], x[3], x[1])).mean()
    print("map@%s=%.3f" % (k, map_))

recall@5=0.801
map@5=0.689
recall@10=0.857
map@10=0.701
recall@30=0.921
map@30=0.719
recall@50=0.948
map@50=0.731
recall@100=0.973
map@100=0.743
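To rank over the whole catalogue instead of only each user's test pairs, every movie the user has not rated in training would be scored and the best N kept. A hedged plain-Python sketch with hypothetical rank-2 factor vectors (in the notebook they would come from `model.userFeatures()` and `model.productFeatures()`; `top_n_unseen` is an illustrative helper):

```python
def top_n_unseen(user_vec, item_vecs, seen, n):
    """Score every catalogue item the user has not seen and return the n best item ids."""
    scores = [(sum(a * b for a, b in zip(user_vec, vec)), item)
              for item, vec in item_vecs.items() if item not in seen]
    scores.sort(reverse=True)  # highest predicted rating first
    return [item for _, item in scores[:n]]

# Hypothetical rank-2 factors
user_vec = [1.0, 0.5]
item_vecs = {10: [2.0, 0.0], 20: [1.0, 1.0], 30: [3.0, 1.0], 40: [0.5, 0.5]}
print(top_n_unseen(user_vec, item_vecs, seen={30}, n=2))  # [10, 20]
```

In PySpark, `MatrixFactorizationModel.recommendProductsForUsers(num)` returns the top `num` products per user over all products known to the model; it does not exclude movies the user already rated, so more than N should be requested and the training movies filtered out afterwards, as `recall_at_n` and `apk` do with their `train` argument.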


In [84]:
# Good enough for the purpose of this notebook!

Since we have finished the model definition stage, we unpersist the RDDs that we are not going to use any more.

In [85]:
training_RDD.unpersist()
test_RDD.unpersist()

PythonRDD[324] at RDD at PythonRDD.scala:53

## 5. OUR RECOMMENDER SYSTEM IN ACTION:

We are going to use our model to give recommendations to a new user.

We can restrict the recommendations to movies with a certain minimum number of ratings. <br>
For that, we need to count the number of ratings per movie:

In [86]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return int(ID_and_ratings_tuple[0]), (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (ratings_corrected_final_rdd.map(lambda x: (x[1], float(x[2]))).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))


In [87]:
movie_rating_counts_RDD.take(3)

[(9200, 1), (7600, 1), (8800, 1)]

In [88]:
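# Adding the new user's ratings.
# NOTE: the movie IDs below are original MovieLens movieIds (in movies.csv, 1 is "Toy Story (1995)"),
# but the model is trained on the re-indexed 'itemId' keys (in the run above, itemId 1 is "Fair Game (1995)"),
# so these ratings refer to other movies unless they are first translated through movies_corr.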

In [89]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,4),
     (0,1,3), 
     (0,16,3), 
     (0,25,4.5), 
     (0,32,4), 
     (0,335,1), 
     (0,379,1), 
     (0,296,3), 
     (0,858,5) , 
     (0,50,4) 
    ]
new_user_ratings_RDD = ss.sparkContext.parallelize(new_user_ratings)
print('New user ratings: %s' % new_user_ratings_RDD.take(10))


New user ratings: [(0, 260, 4), (0, 1, 3), (0, 16, 3), (0, 25, 4.5), (0, 32, 4), (0, 335, 1), (0, 379, 1), (0, 296, 3), (0, 858, 5), (0, 50, 4)]
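Because the model uses the re-indexed itemId keys, ratings written with original movieIds need to be translated before they are added to the training data. A minimal plain-Python sketch, with a hypothetical mapping whose itemId values are made up (`to_model_ids` is an illustrative helper):

```python
def to_model_ids(ratings, movie_to_item):
    """Translate (user, original movieId, rating) into (user, itemId, float rating); skip unknown movies."""
    return [(user, movie_to_item[movie], float(rating))
            for user, movie, rating in ratings if movie in movie_to_item]

# Hypothetical excerpt of the movieId -> itemId correspondence (the itemId values are made up)
movie_to_item = {260: 7001, 1: 42, 858: 512}
raw = [(0, 260, 4), (0, 1, 3), (0, 858, 5), (0, 99999, 2)]
print(to_model_ids(raw, movie_to_item))  # [(0, 7001, 4.0), (0, 42, 3.0), (0, 512, 5.0)]
```

In the notebook, such a dictionary could be collected from `movies_corr`, for instance with `dict(movies_corr.rdd.map(lambda r: (int(r['item']), r['itemId'])).collect())`, since its `item` column holds the original ids as strings.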


In [90]:
# Now, we add the new user to our training dataset

In [95]:
data_with_new_ratings_RDD = ratings_corrected_final_rdd.union(new_user_ratings_RDD)


In [96]:
# Then, we train our model with the slightly extended dataset

In [97]:
from pyspark.mllib.recommendation import ALS

new_ratings_model = ALS.train(data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=best_iterations, lambda_=best_reg_parameter)

In [94]:
#Before predicting ratings, we are going to remove the movies the new user has already rated

In [98]:
new_user_ratings_ids = set(x[1] for x in new_user_ratings)  # a set, since a map() iterator is consumed by the first membership test

In [99]:
data_with_new_ratings_RDD.take(5)

[(105, 5659, 4.5),
 (318, 5659, 4.5),
 (249, 9613, 3.5),
 (380, 9613, 4.0),
 (1, 9413, 4.0)]

In [100]:
#We will make predictions over the movies included in our data_set of ratings
#(their number is lower than the number of movies in the movies dataset)

movies_training_df = data_with_new_ratings_RDD.toDF(['user','item','rating']).select('item').distinct()
movies_training_df.count()

9607

In [101]:
# We will make rating predictions for the new user for every movie within the training data_set
# that the new user has not rated yet.
# Each row is Row(item=...), so the movie id is x[0]; comparing the whole Row with the ids never matches.
new_user_unrated_movies_RDD = (movies_training_df.rdd.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

In [102]:
#Check control:
new_ratings_model.predict(0,8440)

3.1705717180966815

In [103]:
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)


In [104]:
new_user_recommendations_RDD.take(3)

[Rating(user=0, product=4056, rating=5.07765097910405),
 Rating(user=0, product=4680, rating=4.823041746720165),
 Rating(user=0, product=3016, rating=2.241652931754288)]

We have our recommendations ready!!!<br>
Now we can print out the 10 movies with the highest predicted ratings. To do so, we join the predictions with the movies RDD to get the titles, and with the rating counts so that we can keep only movies with a minimum number of ratings.<br>

First we will do the join and see what the result looks like.

In [105]:
#Let's create an RDD of (itemId, title) pairs:
movies_title = movies_index.rdd

In [106]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_RDD.take(3)

[(4056, 5.07765097910405),
 (4680, 4.823041746720165),
 (3016, 2.241652931754288)]

In [108]:
#Then, add their corresponding title and number of times each movie's been rated:
new_user_recommendations_rating_title_and_count_RDD = \
new_user_recommendations_rating_RDD.join(movies_title).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(3050, ((3.175350789017699, 'Purgatory (1999)'), 1)),
 (2745, ((1.8920015902846288, '"Abandoned'), 1)),
 (2440, ((3.9418588837340423, 'Love Crazy (1941)'), 1))]

So we need to flatten this a bit in order to have (Title, Rating, Ratings Count).

In [109]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

new_user_recommendations_rating_title_and_count_RDD.take(3)

[('Purgatory (1999)', 3.175350789017699, 1),
 ('"Abandoned', 1.8920015902846288, 1),
 ('Love Crazy (1941)', 3.9418588837340423, 1)]

Finally, we get the highest rated recommendations for the new user, after removing movies with fewer than 25 ratings.

In [110]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(10, key=lambda x: -x[1])

print('TOP-10 recommended movies (with at least 25 ratings):\n%s' %
        '\n'.join(map(str, top_movies)))


TOP-10 recommended movies (with at least 25 ratings):
('Strangers on a Train (1951)', 5.641325674048375, 25)
('Citizen Kane (1941)', 5.435732335278956, 69)
('Old Boy (2003)', 5.15401514380396, 39)
('Easy Rider (1969)', 5.149853396471215, 29)
('Moonrise Kingdom (2012)', 5.143888156000969, 29)
('"African Queen', 5.106370572496246, 34)
('"Maltese Falcon', 5.104120647988694, 61)
('Annie Hall (1977)', 5.086362612228669, 58)
('There Will Be Blood (2007)', 5.069352695230904, 28)
('Seven Samurai (Shichinin no samurai) (1954)', 5.048519820262911, 48)
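`filter(lambda r: r[2]>=25).takeOrdered(10, key=lambda x: -x[1])` keeps movies with at least 25 ratings and returns the 10 with the highest predicted rating. The same selection in plain Python, on made-up (title, predicted rating, number of ratings) rows, using `heapq.nlargest` (`top_rated` is an illustrative helper):

```python
import heapq

# Made-up (title, predicted rating, number of ratings) rows, shaped like the flattened RDD above
rows = [("Film A", 5.6, 25), ("Film B", 5.9, 3), ("Film C", 4.1, 80), ("Film D", 4.8, 40)]

def top_rated(rows, n, min_count):
    """Keep movies with at least `min_count` ratings, then take the n highest predicted ratings."""
    eligible = (r for r in rows if r[2] >= min_count)
    return heapq.nlargest(n, eligible, key=lambda r: r[1])

print(top_rated(rows, n=2, min_count=25))  # [('Film A', 5.6, 25), ('Film D', 4.8, 40)]
```

Note that ALS predictions are not bounded to the rating scale (several predictions above exceed 5); for display they could be clipped, e.g. with `min(pred, 5.0)`, which keeps the ordering apart from creating ties.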


## 6. CONCLUSIONS

Our ALS model seems to provide decent recommendations to new users, given the size of the training data_set.

The performance of our recommender system could be improved by:<br>
- A larger data_set (e.g. MovieLens also provides a larger data_set besides the small one we have used in this notebook).
- More optimization of the hyperparameters of our model.
- Using a more sophisticated algorithm.

But our main target, which is its implementation with PySpark, has been met.

I hope this notebook helps to make PySpark more understandable.<br>
Thank you very much for your time!!!

## 7. ACKNOWLEDGMENTS

This notebook could be considered as a personal extension of the post by Jose A. Dianes that you can find on the following link:<br>
https://www.codementor.io/jadianes/building-a-recommender-with-apache-spark-python-example-app-part1-du1083qbw

Some of my contributions consist of:
- Data preprocessing (see '2. DATA PREPROCESSING' section).
- Showing some functionalities of ALS from Spark MLlib (see '3.2) Quick look at the model (ALS)' section).
- Hyperparameters tuning using a cross-validated grid search strategy (see '3.3) Hyperparameters tuning' section).
- Additional evaluation metrics (Recall and MAP) for model testing purposes (see '4.2) Additional metrics: Recall and MAP' section).
- Using data_frames (in addition to RDDs) and SQL code.

