
# Scalable Recommendation System using ALS in Spark (PySpark)

# Introduction

In the current age of information, recommendation systems are a cornerstone of the majority of our online interactions with products and social networks. The hyper-consumption of media and consumer products online has seen machine-learning-powered recommendation systems rise to prominence on platforms such as Amazon, Facebook and Netflix. By leveraging behavioral data and machine learning, these platforms serve internet users tailored, personalised experiences when shopping and consuming online media. The widespread acknowledgement of the practical value that recommendation systems provide has driven rapid advancements in scalable compute frameworks and parallelised machine learning techniques. One of the most notable of these is Apache Spark, a fast and scalable unified analytics engine for big data processing, together with its machine learning library, Spark MLlib.

The following notebook outlines the implementation of a collaborative filtering recommendation system using Apache Spark. Spark is used for scalable, parallel, in-memory data processing, so the implementation can scale with the size of the compute cluster it is run on. Spark's ML library is accessed via PySpark, Spark's high-level Python API, and the dataset used for the recommendation system is the open-source MovieLens dataset. Movies are recommended using collaborative filtering with the Alternating Least Squares (ALS) algorithm, which learns latent user and movie factors from the patterns in users' observed ratings.


# Social and Ethical Implications of Recommendation Systems

Utilitarian ethics suggests that the actions of an agent or system should be taken in the best interest of overall happiness and wellbeing. An ethical framework based on maximising value for users is therefore adopted when developing this recommendation system. Recommendation systems create value for their users by intelligently suggesting items such as products or movies, based on the historical interactions of similar users with the system. It is plausible to suggest that the utility the recommendation system affords its users outweighs the privacy concerns raised by collecting and manipulating behavioral user data.

In the context of this particular system, the dataset used to train the model is open-sourced for academic use, which largely removes concerns of privacy exploitation. When collecting and cleaning behavioral user data, I believe it is imperative to clearly communicate the intended use of the data to users, and to model the data in a meaningful way that benefits both the user and the platform in question. This intent can be communicated through end-user license agreements and terms of use, as is evident on platforms such as Amazon and Netflix.

Users' misplaced trust in a platform's privacy policies can have detrimental social and ethical implications. This was highlighted by the Cambridge Analytica Facebook scandal, in which personal user data was wrongfully collected and used to build politically targeted advertising campaigns that pushed a political agenda. This misuse of data ultimately resulted in Facebook incurring a $5 billion fine in July 2019.

Evidently, the malicious misuse of user data when building a recommendation system such as the one in this project should be avoided at all costs.

# Data Exploration and Pre-processing

This ALS implementation leverages the distributed processing power of Spark, which is built on Resilient Distributed Datasets (RDDs). The Apache foundation defines an RDD as an immutable distributed collection of objects, where each dataset is divided into logical partitions that may be computed on different nodes of the cluster. The `pyspark.ml` ALS estimator used below operates on Spark DataFrames, which are themselves built on top of RDDs. This design is highly scalable, and Spark reports running workloads up to 100x faster than Hadoop MapReduce when data is held in memory.

In order to pre-process the MovieLens data, it is first loaded into pandas dataframes and later converted into Spark DataFrames. The dataset consists of two files, 'movies.csv' and 'ratings.csv', which are hosted in this project's GitHub repository.



```python
import pandas as pd

# Print wide dataframes on a single line instead of wrapping columns
pd.set_option('display.expand_frame_repr', False)

# Raw MovieLens CSV files hosted in the project's GitHub repository
movies_url = 'https://raw.githubusercontent.com/patricksabry/UTS_ML2019_ID98106545/master/movies.csv'
ratings_url = 'https://raw.githubusercontent.com/patricksabry/UTS_ML2019_ID98106545/master/ratings.csv'
df_movies = pd.read_csv(movies_url)
df_ratings = pd.read_csv(ratings_url)
```

For ALS, the ratings must be represented as a distributed collection of triplets in the following format, with the `movies` data used only to attach titles and genres:

$R \rightarrow \mathrm{RDD}\big((u, i, r_{ui}), \ldots\big)$

Each triplet records the rating $r_{ui}$ given to item $i$ by user $u$. Together, the triplets define a highly sparse ratings matrix $R$, with users indexing the rows and items indexing the columns, where each observed explicit rating is a single entry. Most entries are missing because each user rates only a small fraction of the movies, and the ultimate goal of the ALS algorithm is to fill in these missing entries with predicted ratings.
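To make the triplet-to-matrix relationship concrete, the sketch below pivots a handful of hypothetical (userId, movieId, rating) triplets, not taken from MovieLens, into a users × movies matrix with pandas. Unobserved pairs appear as `NaN`, which are exactly the entries ALS is meant to predict. The variable names `triplets`, `R` and `density` are illustrative only.

```python
import pandas as pd

# Hypothetical toy triplets (userId, movieId, rating); not taken from MovieLens
triplets = pd.DataFrame(
    [(1, 10, 4.0), (1, 20, 3.5), (2, 10, 5.0), (3, 30, 2.0)],
    columns=["userId", "movieId", "rating"],
)

# Pivot the triplets into a users x movies matrix; unobserved pairs become NaN
R = triplets.pivot(index="userId", columns="movieId", values="rating")

# Fraction of cells that hold an observed rating
density = R.notna().to_numpy().mean()

print(R)
print(f"density = {density:.3f}")
```

Here 4 of the 9 cells are observed; on the real dataset the observed fraction is far smaller.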


```python
# Join each rating with its movie's title and genres
df = pd.merge(df_ratings, df_movies, on='movieId')
print(df.head())

# Summary counts of the ratings data
num_ratings = df.rating.count()
num_movies = df['movieId'].nunique()
num_users = df['userId'].nunique()

print("\n")
print("Observed " + str(num_ratings) + " ratings from " + str(num_users) + " users on " + str(num_movies) + " movies")
```

```
   userId  movieId  rating   timestamp             title                                       genres
0       1        1     4.0   964982703  Toy Story (1995)  Adventure|Animation|Children|Comedy|Fantasy
1       5        1     4.0   847434962  Toy Story (1995)  Adventure|Animation|Children|Comedy|Fantasy
2       7        1     4.5  1106635946  Toy Story (1995)  Adventure|Animation|Children|Comedy|Fantasy
3      15        1     2.5  1510577970  Toy Story (1995)  Adventure|Animation|Children|Comedy|Fantasy
4      17        1     4.5  1305696483  Toy Story (1995)  Adventure|Animation|Children|Comedy|Fantasy


Observed 100836 ratings from 610 users on 9724 movies
```
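The counts printed above are enough to quantify how sparse the ratings matrix is. The short calculation below hard-codes those three printed values (rather than reusing the notebook variables) and divides the number of observed ratings by the number of cells in the full users × movies matrix.

```python
# Counts reported by the output above
num_ratings, num_users, num_movies = 100836, 610, 9724

# Number of cells in the full users x movies matrix
total_cells = num_users * num_movies

density = num_ratings / total_cells
print(f"{total_cells} cells, density {density:.2%}, sparsity {1 - density:.2%}")
```

Roughly 1.70% of the 5,931,640 possible (user, movie) pairs carry a rating, so about 98.3% of the matrix is missing and must be inferred by the model.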


# Implementation Methodology

The system learns latent factors by minimising an error measure with respect to the ground-truth ratings matrix $R$, using the Alternating Least Squares (ALS) algorithm fitted on the pre-processed data.

The rating predicted by the ALS model can be defined as

$\hat{r}_{ui} = \sum_{f=1}^{n_{\text{factors}}} H_{u,f} \, W_{f,i}$

where the predicted rating of item $i$ by user $u$ is the dot product of user $u$'s latent vector (row $u$ of the user factor matrix $H$) and item $i$'s latent vector (column $i$ of the item factor matrix $W$).
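As a worked example of the prediction formula, the sketch below uses small, made-up factor matrices with three latent factors (the values are illustrative, not learned from MovieLens). It evaluates the sum over $f$ explicitly for one (user, item) pair and then checks it against the full matrix product $HW$, which computes every prediction at once.

```python
import numpy as np

# Hypothetical learned factors with n_factors = 3 (values chosen for illustration)
H = np.array([[1.2, 0.4, 0.8],   # latent vector of user 0
              [0.3, 1.5, 0.1]])  # latent vector of user 1
W = np.array([[1.0, 0.5],        # factor 0 for items 0 and 1
              [0.2, 1.8],        # factor 1
              [1.5, 0.3]])       # factor 2

u, i = 0, 0
# r_hat_ui = sum over f of H[u, f] * W[f, i]
r_hat = sum(H[u, f] * W[f, i] for f in range(H.shape[1]))
print(round(r_hat, 2))  # 1.2*1.0 + 0.4*0.2 + 0.8*1.5 = 2.48

# The matrix product gives the predicted rating for every (user, item) pair
R_hat = H @ W
print(R_hat)
```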

The ALS algorithm achieves this by factoring the ratings matrix $R$ into two constituent matrices: the user factors $H$ and the item factors $W$. These matrices are initialised with random values, and their product $HW$ should approximate the ground-truth ratings matrix $R$ on the observed entries. The algorithm then alternates between the two factor matrices: it holds $W$ fixed and solves a regularised least-squares problem for each row of $H$, then holds $H$ fixed and solves for each column of $W$. Because each of these sub-problems is an ordinary least-squares problem, it has a closed-form solution, and the alternation is repeated for a fixed number of iterations (`maxIter` in Spark) or until the error between $HW$ and $R$ stops decreasing. Once training is complete, the product of the factor matrices can be used to recommend items to users and, conversely, users to particular items.
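The alternating procedure can be sketched in a few lines of NumPy. This is a minimal, single-machine illustration under stated assumptions, not Spark's distributed implementation: the functions `als` and `observed_rmse` are hypothetical names, the toy matrix treats `0` as "not rated", and a plain regularisation weight `reg` is used. Spark's ALS additionally scales the regularisation term by the number of ratings each user or item has (the ALS-WR scheme).

```python
import numpy as np

def als(R, mask, k=2, reg=0.1, n_iters=20, seed=0):
    """Minimal dense ALS sketch for explicit ratings (not Spark's implementation).

    R    : (n_users, n_items) ratings; values where mask is False are ignored
    mask : boolean array, True where a rating is observed
    Returns H (n_users, k) and W (k, n_items) so that H @ W approximates R on observed cells.
    """
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    H = rng.random((n_users, k))  # random initial user factors
    W = rng.random((k, n_items))  # random initial item factors
    I = np.eye(k)
    for _ in range(n_iters):
        # Hold W fixed: solve a regularised least-squares problem for each user row
        for u in range(n_users):
            obs = mask[u]
            Wo = W[:, obs]                                # (k, n_observed)
            H[u] = np.linalg.solve(Wo @ Wo.T + reg * I, Wo @ R[u, obs])
        # Hold H fixed: solve a regularised least-squares problem for each item column
        for i in range(n_items):
            obs = mask[:, i]
            Ho = H[obs]                                   # (n_observed, k)
            W[:, i] = np.linalg.solve(Ho.T @ Ho + reg * I, Ho.T @ R[obs, i])
    return H, W

def observed_rmse(R, mask, H, W):
    """RMSE of H @ W measured only on the observed ratings."""
    err = (R - H @ W)[mask]
    return float(np.sqrt(np.mean(err ** 2)))

# Hypothetical toy ratings matrix; 0 means "not rated"
R = np.array([[5, 3, 0, 1],
              [4, 0, 0, 1],
              [1, 1, 0, 5],
              [0, 1, 5, 4]], dtype=float)
mask = R > 0

H, W = als(R, mask, k=2, reg=0.1, n_iters=20)
print(observed_rmse(R, mask, H, W))
print(np.round(H @ W, 2))  # includes predictions for the unrated cells
```

Each inner `np.linalg.solve` call is the closed-form update for one user or item; Spark distributes exactly these independent solves across the cluster, which is what makes ALS parallelise well.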


## Downloading Spark dependencies & initializing the environment


```
# Install Java 8 and download a pre-built Spark 2.3.4 release (Colab shell commands)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.3.4/spark-2.3.4-bin-hadoop2.7.tgz
!tar xf spark-2.3.4-bin-hadoop2.7.tgz
!pip install -q findspark
```

```python
import os

# Set the Java SDK and Spark environment paths
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.4-bin-hadoop2.7"

# Use findspark to make PySpark importable, then initialise a local Spark session
# that uses all available cores. Note: `sc` holds a SparkSession, not a SparkContext.
import findspark
findspark.init()
from pyspark.sql import SparkSession
sc = SparkSession.builder.master("local[*]").getOrCreate()
```

## Building and optimizing the ALS model using k-fold cross-validation

```python
import time
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

# Convert the pandas dataframes to Spark DataFrames
spark_df = sc.createDataFrame(df)
spark_df_movies = sc.createDataFrame(df_movies)

# Split the input data into training (70%) and validation (30%) datasets
X_train, X_val = spark_df.randomSplit([0.7, 0.3])

# Instantiate the baseline ALS estimator:
# - nonnegative=True constrains the latent factors to be non-negative
# - coldStartStrategy='drop' drops rows whose prediction is NaN (users or movies not
#   seen during training) so they do not pollute the evaluation metric
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
          nonnegative=True, coldStartStrategy='drop')

# Define the grid search parameters to apply to the ALS model
# Note: as this application of ALS uses explicit ratings, the alpha parameter is ignored
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [6, 8, 10]) \
            .addGrid(als.maxIter, [5, 8, 10]) \
            .addGrid(als.regParam, [0.01, 0.1, 0.14, 0.18]) \
            .build()

# Define the evaluator used to measure RMSE during cross-validation
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

# Instantiate 3-fold cross-validation
# If time permits, 5-fold cross-validation should be used instead for more reliable estimates
start = time.time()
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)

# Run the grid search with cross-validation on the training data
model = cv.fit(X_train)

end = time.time()
gridsearch_time_elapsed = end - start

# Extract the best model found by the grid search
best_model = model.bestModel

# Generate predictions and evaluate them against the held-out validation subset
predictions = best_model.transform(X_val)
rmse = evaluator.evaluate(predictions)

# Expose evaluation metrics and best model parameters
# (maxIter and regParam are read through the underlying Java estimator)
print("Best model achieved an RMSE of " + str(round(rmse, 4)) + " with parameters ["
      + "maxIter: " + str(best_model._java_obj.parent().getMaxIter()) + " ranks: "
      + str(best_model.rank) + " regParam: " + str(best_model._java_obj.parent().getRegParam()) + "]")

print("Gridsearching best model took " + str(round(gridsearch_time_elapsed, 2)) + " seconds")

# Generate the top 10 movie recommendations for each user
userRecs = best_model.recommendForAllUsers(10)
```


```
Best model achieved an RMSE of 0.8841 with parameters [maxIter: 10 ranks: 10 regParam: 0.18]
Gridsearching best model took 883.6228220462799seconds
```
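The `coldStartStrategy='drop'` setting matters because a random split can place a user or movie in the validation set that never appears in the training set; ALS has no learned factors for it and predicts `NaN`, which would make the RMSE `NaN`. The pandas sketch below, using hypothetical train/validation triplets, shows how such cold-start rows can be identified. It illustrates the idea only and is not how Spark implements the drop internally.

```python
import pandas as pd

# Hypothetical train/validation splits of (userId, movieId, rating) triplets
train = pd.DataFrame({"userId": [1, 1, 2, 3], "movieId": [10, 20, 10, 30],
                      "rating": [4.0, 3.5, 5.0, 2.0]})
val = pd.DataFrame({"userId": [2, 3, 4], "movieId": [20, 40, 10],
                    "rating": [4.0, 3.0, 1.0]})

# A validation row is cold-start if its user or its movie never appears in training
known_user = val["userId"].isin(train["userId"])
known_movie = val["movieId"].isin(train["movieId"])
cold_start = val[~(known_user & known_movie)]

print(cold_start)
```

Here movie 40 and user 4 are unseen during training, so those two rows would receive `NaN` predictions and be dropped before evaluation.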


# Evaluation

The recommendation model's performance was evaluated by comparing baseline and grid-search-tuned models across several configurations, using different train/test splits and different error measures (RMSE and MAE) to determine the best model. Values marked -- are not reported.

Train | Test | Model | Input | Tuning | Measure | Value | Baseline
--- | --- | --- | --- | --- | --- | --- | ---
X_train (70%) | X_val (30%) | ALS | RDD(user, item, rating) | Default parameters | RMSE | -- | Yes
X_train (70%) | X_val (30%) | ALS | RDD(user, item, rating) | Default parameters | MAE | -- | Yes
X_train (80%) | X_val (20%) | ALS | RDD(user, item, rating) | Default parameters | RMSE | -- | Yes
X_train (80%) | X_val (20%) | ALS | RDD(user, item, rating) | Default parameters | MAE | -- | Yes
X_train (70%) | X_val (30%) | ALS | RDD(user, item, rating) | 3-fold CV grid search | RMSE | 0.8841 | No
X_train (70%) | X_val (30%) | ALS | RDD(user, item, rating) | 3-fold CV grid search | MAE | -- | No
X_train (80%) | X_val (20%) | ALS | RDD(user, item, rating) | 3-fold CV grid search | RMSE | 0.8841 | No
X_train (80%) | X_val (20%) | ALS | RDD(user, item, rating) | 3-fold CV grid search | MAE | -- | No
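The two measures in the table differ in how they weight errors: RMSE squares each error before averaging, so large misses dominate, while MAE averages absolute errors. The plain-Python sketch below defines both on hypothetical ratings and predictions. In the Spark pipeline, the MAE rows could be filled by creating a second `RegressionEvaluator` with `metricName="mae"` and the same `labelCol`/`predictionCol`; the helper names `rmse_score` and `mae_score` are illustrative and chosen so they do not overwrite the notebook's `rmse` variable.

```python
import math

def rmse_score(actual, predicted):
    """Root mean squared error: penalises large errors more heavily."""
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual))

def mae_score(actual, predicted):
    """Mean absolute error: the average magnitude of the errors."""
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)

# Hypothetical true ratings and model predictions
actual = [4.0, 3.0, 5.0, 2.0]
predicted = [3.5, 3.0, 4.0, 3.0]

print(rmse_score(actual, predicted))  # sqrt((0.25 + 0 + 1 + 1) / 4) = 0.75
print(mae_score(actual, predicted))   # (0.5 + 0 + 1 + 1) / 4 = 0.625
```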



## Time and space complexity
* Discuss the `time()` measurements of the grid search and of real-time prediction output.
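A useful starting point for this discussion is the number of ALS fits the grid search performs. The sketch below counts them from the parameter grid in the cross-validation cell: one fit per (parameter combination, fold), plus one final refit of the best combination on the whole training set, which `CrossValidator` performs to produce `bestModel`. It then divides the recorded elapsed time by that count. The average is only a rough figure, since fits with larger `rank` and `maxIter` take longer and the timing also includes evaluation. A commonly cited estimate is that one ALS iteration costs roughly $O(|\Omega| k^2 + (m + n) k^3)$ for $|\Omega|$ observed ratings, $m$ users, $n$ items and rank $k$, so `rank` affects runtime far more than `regParam`.

```python
# Grid from the cross-validation cell
ranks, max_iters, reg_params = [6, 8, 10], [5, 8, 10], [0.01, 0.1, 0.14, 0.18]
num_folds = 3

num_combinations = len(ranks) * len(max_iters) * len(reg_params)
# One fit per (combination, fold), plus one refit of the best combination on all of X_train
num_fits = num_combinations * num_folds + 1

elapsed = 883.6228220462799  # seconds, taken from the recorded output
print(num_combinations, num_fits, round(elapsed / num_fits, 2))  # 36 109 8.11
```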


```python
from pyspark.sql.functions import col, explode

# Fetch and display a given user's ten highest original movie ratings (input data)
def get_original_user_ratings(user_id):
    ratings_df = spark_df.filter(spark_df.userId == user_id) \
                         .select("movieId", "rating", "title", "genres")

    df = ratings_df.toPandas()
    df = df.sort_values('rating', ascending=False).head(10)
    print("Original Ratings provided by User " + str(user_id))
    print(df)

# Utility function for `format_user_recommendations` which unpacks a user's array of
# recommendations into one row per recommended movie with its predicted rating
def get_user_recommendations(recs):
    return recs.select(explode("recommendations").alias("rec")) \
               .select(col("rec.movieId").alias("movieId"),
                       col("rec.rating").alias("predicted_rating")) \
               .toPandas()

# Display recommended movies for a given user, including the predicted rating scores,
# joined with movie attributes such as title and genres for easier interpretation
def format_user_recommendations(user_id):
    # Filter the recommendations to the current user's row only
    user_df = userRecs.filter(userRecs.userId == user_id)
    ratings_df = get_user_recommendations(user_df)
    movies_df = spark_df_movies.toPandas()
    formatted_matrix = pd.merge(ratings_df, movies_df, on='movieId')
    print("Movie recommendations for User " + str(user_id))
    print(formatted_matrix)


get_original_user_ratings(14)
print("\n")
format_user_recommendations(14)
```

# Conclusion

* Discuss the lack of computing power
* Discuss the lack of data (the ~100k-rating MovieLens dataset was used instead of the 1M, 10M etc. datasets due to limited resources)
* Discuss the limitations of explicit collaborative filtering: data collection is contingent on users actively rating items

# References 

* http://stanford.edu/~rezab/classes/cme323/S15/notes/lec14.pdf
* https://en.wikipedia.org/wiki/Facebook%E2%80%93Cambridge_Analytica_data_scandal#Use_of_the_data
* https://www.tutorialspoint.com/apache_spark/apache_spark_rdd.htm
* https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#module-pyspark.mllib.recommendation
* https://core.ac.uk/download/pdf/76649438.pdf
* http://www.cs.carleton.edu/cs_comps/0607/recommend/recommender/collaborativefiltering.html
