### Movie Recommendation System using MovieLens Dataset
In this notebook, we will explore two approaches to build a recommendation system using collaborative filtering algorithms: memory-based and model-based. Our analysis is based on a sampled MovieLens dataset with model training and inference implemented on Spark platform.

#### Table of Contents
1. [Data Import](#import)
2. [Sampling Ratings Dataset](#sampling)
3. [ALS Model-Based Collaborative Filtering](#als) <br>
    3.1. [Training ALS Model](#alstrain) <br>
    3.2. [Predicting on Test Dataset](#alspredict) <br>
    3.3. [Top-k Recommendations](#alsrecommend) <br>
    3.4. [Hyperparameter Tuning](#alshyperparameter)

In [7]:
import time
import pandas as pd

from pyspark import SparkContext, SQLContext

from pyspark.sql.functions import *
from pyspark.sql import functions as F

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#### 1. Data Import  <a id = import></a>
Something Something

In [30]:
sc = SparkContext()
sqlContext = SQLContext(sc)

csvf = 'com.databricks.spark.csv'
ratings = sqlContext.read.format(csvf).options(header='true', inferschema='true').load('data/raw/ratings.csv')

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at <ipython-input-8-18027a7511b3>:1 

In [9]:
df=pd.read_csv('/Users/mohit/Documents/Columbia Data Science/Personalization Theory/MovieLens/MovieLens/data/raw/ratings.csv')

#### 2. Sampling Ratings Dataset <a id = sampling></a>
Something Something

In [3]:
ratings_count = ratings.groupby(['userId']).count()
quantile = ratings_count.approxQuantile('count', [0.25, 0.75], 0)

print("Ratings Count by User: 25th Percentile = "+str(quantile[0]))
print("Ratings Count by User: 75th Percentile = "+str(quantile[1]))

Ratings Count by User: 25th Percentile = 35.0
Ratings Count by User: 75th Percentile = 155.0


In [4]:
ratings_count = ratings_count.withColumn(
    'user_class', when(col('count') < quantile[0], 1).when(col('count') < quantile[1], 2).otherwise(3))
ratings_count = ratings_count.withColumnRenamed('userId', 'userId2')
ratings = ratings.join(ratings_count, ratings['userId'] == ratings_count['userId2'])
ratings = ratings.select(['userId', 'movieId', 'rating', 'timestamp', 'user_class'])

In [5]:
ratings_sampled = ratings.sampleBy('user_class', fractions = {1: 0, 2: 0.0001, 3: 0.005}, seed = 10)
print("Total Ratings in Sample = "+str(ratings_sampled.count()))
print("Distinct Users = "+str(ratings_sampled.select('userId').distinct().count())+
      " & Distinct Movies = "+str(ratings_sampled.select('movieId').distinct().count()))

Total Ratings in Sample = 69169
Distinct Users = 27074 & Distinct Movies = 8111


In [12]:
def data_sampling (df,item_nos=600,item_split=[0.90,0.10]):
    
    ##############################    Data Preprocessing from User Perspective   #########################
    
    #Frequency of movie rating by each user
    user_rtgs_cnt = (df.groupby(['userId']).count()).iloc[:,0:1].reset_index().rename(columns={"movieId":"rating_cnt"})
    print ("Original number of users in dataset : ",len(user_rtgs_cnt))
    
    quantile_user=user_rtgs_cnt.quantile([0.1,.25,.75,0.9], axis = 0).drop(["userId"],axis=1)
    print("Data distribution of frequency of movies rated by users : \n ", quantile_user)
    
    #Removing the lower 10% of the outliers.
    user_rtgs_cnt=user_rtgs_cnt[user_rtgs_cnt.rating_cnt>=quantile_user.iloc[0,0]]
    print ("Number of users in dataset post removal of bias based on user activity: ",len(user_rtgs_cnt))
    
    #These users are then removed from the dataset
    df=df.merge(user_rtgs_cnt[['userId']],on="userId", how="inner")   
    
    ##############################  Data Preprocessing from Item Perspective   #########################
    
    #Count of Ratings per movie
    item_count = (df[["movieId","rating"]].groupby(['movieId']).count()).reset_index().rename(columns={"rating":"rating_per_item"})
    print("Original number of movies in dataset :\n ",len(item_count))
    
    quantile_item=item_count.quantile([0.1,.25,.75,1], axis = 0).drop(["movieId"],axis=1)
    print("Data distribution of frequency of ratings per movie : \n ", quantile_item)
    
    #Removing all items which have less than 3 user counts i.e Q1 or based on a fixed number 
    #item_count=item_count[item_count.rating_per_item>=quantile_item.iloc[1,0]].reset_index(drop=True)
    item_count=item_count[item_count.rating_per_item>=5].reset_index(drop=True)
    item_count["item_subset"]=np.where(item_count.rating_per_item < quantile_item.iloc[2,0],1,2)
    print("Total number of movies in dataset post removal of low rated movies: ",len(item_count))
    
    
    ######################################################  Data Sampling   #########################
    
    sampled_ratings=pd.DataFrame()
    j=len(item_split)-1
    
    for i in item_count.item_subset.unique():
        sampled_ratings=sampled_ratings.append(item_count[item_count.item_subset==i].sample(n=int(item_split[j]*item_nos), random_state=10))
        j=j-1
        
    sampled_ratings.reset_index(drop=True, inplace=True)   
    print ("Sum of all the ratings for the selected movies : ",sampled_ratings['rating_per_item'].sum())
    
    
    #Select user rows for only those movies which have been sampled
    df=df.merge(sampled_ratings[['movieId']],on="movieId", how="inner")
    
    #Since not all items are selected it may happen that we again get items with only user frequency.
    #Removing single frequency users so as to reduce sparsity and enable item-item comparison between pairs
    
    user_rtgs_cnt_2=(df.groupby(['userId']).count()).iloc[:,0:1].reset_index().rename(columns={"movieId":"user_freq"})
    df=df.merge(user_rtgs_cnt_2,on="userId", how="inner")
    
    #For any personalized recommendation to a user, we are setting a rule that user should have watched 5 movies before. 
    #Before that only popular recommendations to him
    df=df[df.user_freq>7] 
    df.drop(['user_freq'],axis=1, inplace=True)
    df=df.reset_index(drop=True)
    print("Number of rows in total sampled dataset : ", len(df))
    
    #############################################   Train-Test Split   ###################################
    
    df_train=df.groupby(['userId']).apply(lambda x : x.sample(frac=0.8,random_state=10)).reset_index(drop=True)
    z=df.merge(df_train,how='outer',on=['userId','movieId','rating','timestamp'],indicator=True)
    df_test=z.query('_merge != "both"')
    df_test=df_test.drop(['_merge'],axis=1)
    df_test.reset_index(drop=True,inplace=True)
        
    return [df, df_train, df_test]

In [18]:
ratings_sampled, train, test=data_sampling(df)

Original number of users in dataset :  138493
Data distribution of frequency of movies rated by users : 
        rating_cnt
0.10        24.0
0.25        35.0
0.75       155.0
0.90       334.0
Number of users in dataset post removal of bias based on user activity:  125431
Original number of movies in dataset :
  26737
Data distribution of frequency of ratings per movie : 
        rating_per_item
0.10              1.0
0.25              3.0
0.75            204.0
1.00          65080.0
Total number of movies in dataset post removal of low rated movies:  18328
Sum of all the ratings for the selected movies :  250958
Number of rows in total sampled dataset :  90256

Time Elapsed = 8.707414865493774 secs


#### 1. Objective
This proof of concept explores two approaches of building a recommendation system. In this notebook, we implement two collaborative filtering algorithms - memory based and model based - and compare the performance of each in terms of accuracy, coverage, and scalability. However, the objective is not just to evaluate the two CF algorithms, but to show with conclusive evidence the value of having a recommendation system on our platform. 

In contrast with existing (baseline) logic of recommending movies, which is not curated for each user, we strive to learn user preferences from the ratings given to movies and provide recommendation based on that. This would improve the overall user experience by providing more targeted recommendations. And for stakeholders invested in this project, a good recommendation engine translates into customer retention and increase in revenue.

However, at this stage our focus is to build a system that would provide recommendations for users that have rated at least 5 movies. This is required in order for us to understand something about the user's taste and provide relevant recommendations. Though our system does not provide any recommendations for new users, we can include this in our project pipeline and make use of implicit user behavior to provide recommendations.

In [29]:
# Do not delete this code, need to integrate this logic with other sections, functions
(training, test) = ratings_sampled.randomSplit([0.8, 0.2])
ratings_sampled = sqlContext.createDataFrame(ratings_sampled)
# type(ratings_sampled)

AttributeError: 'DataFrame' object has no attribute 'randomSplit'

#### 3. ALS Model-Based Collaborative Filtering <a id = als></a>
Alternating Least Squares (ALS) is a matrix factorization algorithm that works well for large scale collaborative filtering problems. It performs well with sparse ratings dataset, and scales well to very large datasets - two features that make it a good choice for real world recommendation systems.

In the background, ALS is an optimization problem that is trying to minimize the objective function: 

$$Minimize: J = \frac{1}{2}\left \| R - UV^{T} \right \|; Constraints: U\geq 0, V\geq 0$$

where U and V represent the latent factors of our user-movie matrix, R corresponds to the actual rating and hence J is the loss that we are trying to minimize. 

ALS is a two-step iterative optimization process where it first holds the user matrix fixed and runs gradient descent with the movie matrix, and then holds the movie matrix fixed and runs gradient descent with the item matrix. We have implemented this below using the Spark ML library in Python.

##### 3.1 Training ALS Model  <a id = alstrain></a>
In the function defined below, we have fit the ALS model on our training dataset. The various design choices made in training the model are detailed below:
1. The nonnegative hyperparameter to the ALS model is set to *True* which puts the constraints on U, V to be greater than 0 as rating values are non-negative
2. We have chosen to tune two of the most important hyperparameters of ALS: **rank** - the number of latent factors and **regParam** - the regularization factor over a set of values that are built into a parameter grid
3. We are learning the optimal values of hyperparameters using cross validation with 4 folds and our evaluation metric is root-mean squared error (RMSE)

In [28]:
def als_model_train(train):
    # Initializing implicit ALS with user, movie and ratings column
    als = ALS(userCol="userId", 
              itemCol="movieId", 
              ratingCol="rating",
              nonnegative=True,
              coldStartStrategy="drop")
    
    # We use a ParamGridBuilder to construct a grid of parameters to search over
    param_grid = ParamGridBuilder() \
        .addGrid(als.rank, [50, 75, 100]) \
        .addGrid(als.regParam, [0.01, 0.1, 1.0]) \
        .build()
    
    # Defining the evaluation criteria for choosing best set of hyperparameters
    evaluator = RegressionEvaluator(metricName="rmse", 
                                    labelCol="rating", 
                                    predictionCol="prediction")
    
    # To try all combinations of hyperparameters and determine best model using evaluator
    hypertuned = CrossValidator(estimator=als, 
                                estimatorParamMaps=param_grid, 
                                evaluator=evaluator,
                                numFolds=4)
    
    # Choosing the best set of hyperparameters from cross validation
    cvModel = hypertuned.fit(train)
    
    return cvModel

In [27]:
start = time.time()
print('Initiating ALS Model Training...')
model = als_model_train(train)
print('ALS Model Training Complete')
end = time.time()
print('\nTime Elapsed = '+str(end - start)+' secs')

Initiating ALS Model Training...


TypeError: unhashable type: 'Column'

##### 3.2 Prediction on Test Dataset <a id = alspredict></a>
In the function defined below, we use the best trained model from our hyperparameter tuning to predict ratings for user-movie pairs present in the test dataset. This allows us to gauge the performance of our model on unseen data. We are evaluating the performance of our ALS model using RMSE. Prior to calculating RMSE, we are capping all predicted ratings of more than 5 to 5 as the actual ratings data is in the range 0-5.

In [1]:
def als_model_predict(model, test):
    predictions = model.bestModel.transform(test)
    # Capping all predictions that exceed 5 to the max rating 5
    predictions = predictions.withColumn('prediction', 
                                         when(col('prediction') > 5, 5).otherwise(col('prediction')))
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
    rmse = evaluator.evaluate(predictions)
    return rmse

##### 3.3 Top-k Recommendations <a id = alsrecommend></a>
In the function defined below, we are using the inbuilt Spark ALS method *recommendForAllUsers( )* method to generate recommendations for all users. It will only return recommendations for users for whom we have data when training our ALS model. Essentially our recommendation system built on ALS would not be able to provide recommendations for new users, i.e. users who have not provided any ratings.

Before providing the final set of recommendations for each user, we filter out the movies the user has already rated from the set of top-k recommendations we provide for the user. We are performing this filteration as our end objective is to recommend to users, the top-k movies that they have not rated/consumed. 

In [339]:
def als_model_recommend(model, k = 10):
    user_recs = model.bestModel.recommendForAllUsers(ratings_sampled.select('movieId').distinct().count())
    user_recs_pd = userRecs.toPandas()
    user_rated = ratings_sampled.toPandas()
    
    # Populating a dictionary for each user with the list of movies that they have rated
    user_rated_movies = user_rated.groupby('userId')['movieId'].apply(lambda x: x.values.tolist()).to_dict()
    user_movie_recs = pd.DataFrame(columns = ['userId', 'recommendations'])
    
    for i in range(len(user_recs_pd)):
        userID = user_recs_pd['userId'][i]
        user_movie_recs.loc[i, 'userId'] = userID
        rated_movies = user_rated_movies.get(userID)
        
        count = 0
        recommendations = []
        for j in range(len(user_recs_pd.loc[i, 'recommendations'])):
            
            # Only movies not rated by a user makes it into their recommendation 
            if(user_recs_pd.loc[i, 'recommendations'][j][0] not in rated_movies):
                recommendations.append((user_recs_pd.loc[i, 'recommendations'][j][0], 
                                        user_recs_pd.loc[i, 'recommendations'][j][1]))
                count = count + 1

            # Stopping as soon as we have our top-k recommendations ready
            if(count == k):
                user_movie_recs.loc[i, 'recommendations'] = recommendations
                break
    
    return user_movie_recs

##### 3.4 Hyperparameter Tuning  <a id = alsrecommend></a>
In order to learn the hyperparameters of our ALS model, that give us the best results, we essentially trained 9 models with different combinations of *rank* and *regParam* values as specified. Observing RMSE values from the models trained with different hyperparameters can give us an indication if we need to further tune our parameters as there might be scope to extract more performance. Finally, we would use the best fit model for predictions and recommendations.

In [None]:
params = [{p.name: v for p, v in m.items()} for m in model.getEstimatorParamMaps()]
tuning_results = pd.DataFrame.from_dict([
    {model.getEvaluator().getMetricName(): metric, **ps} 
    for ps, metric in zip(params, model.avgMetrics)
])

tuning_results.head(9)

In [349]:
start = timeit.timeit()
model = als_model_train(training)
# y = als_model_predict(x, test)
end = timeit.timeit()
print(end - start)

Py4JJavaError: An error occurred while calling o11126.fit.
: org.apache.spark.SparkException: Job 379 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:932)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:930)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:930)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2128)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2041)
	at org.apache.spark.SparkContext$$anonfun$stop$6.apply$mcV$sp(SparkContext.scala:1949)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:1948)
	at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:575)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
	at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1168)
	at org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:1031)
	at org.apache.spark.ml.recommendation.ALS$$anonfun$fit$1.apply(ALS.scala:676)
	at org.apache.spark.ml.recommendation.ALS$$anonfun$fit$1.apply(ALS.scala:658)
	at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:185)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:185)
	at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:658)
	at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:569)
	at sun.reflect.GeneratedMethodAccessor323.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
