-----------------------------------------------------------------
# INM432 Big Data - Coursework 2
#### Implementing a Recommender System on the MovieLens 20M dataset
#### Adrian Ellis and Toby Staines
-----------------------------------------------------------------

**1) Load the MovieLens dataset**

In [113]:
import pixiedust
import os
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from pyspark.sql.functions import avg, sqrt
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.getOrCreate()

# load movie ratings data
ratings_data = 'hdfs://saltdean/data/movielens/ml-20m/ratings.csv'

# define schema (note that inferSchema="true" requires spark to read entire dataset)
ratings_fields = [('userID', IntegerType()), ('movieID', IntegerType()), 
                  ('rating', FloatType()), ('timestamp', IntegerType())]
# note nullable set to False. Leave timestamp as integer type as we are not using it
fields = [StructField(field_name, field_type, False) for
          field_name, field_type in ratings_fields]
schema = StructType(fields)

ratings = spark.read.load(ratings_data, format="csv", sep=",", schema=schema, header="true")
#note above function is documented under pyspark.sql.DataFrameReader()
ratings.drop('timestamp') # drop column as not required

# load movie titles data
movies_data = 'hdfs://saltdean/data/movielens/ml-20m/movies.csv'

# define schema
movies_fields = [('movieID', IntegerType()), ('title', StringType()), ('genres', StringType())]
fields = [StructField(field_name, field_type, False) for
          field_name, field_type in movies_fields]
schema = StructType(fields)

movies = spark.read.load(movies_data, format="csv", sep=",", schema=schema, header="true")
# note that delimiters inside the default ["] quote character are ignored which is what we want
movies.drop('timestamp')

DataFrame[movieID: int, title: string, genres: string]

In [114]:
%%time
# check dataframes loaded properly
dfs = [ratings, movies]
for df in dfs:
    print(df.count())
    print(df.dtypes)
    print(df.head(5))

20000263
[('userID', 'int'), ('movieID', 'int'), ('rating', 'float'), ('timestamp', 'int')]
[Row(userID=1, movieID=2, rating=3.5, timestamp=1112486027), Row(userID=1, movieID=29, rating=3.5, timestamp=1112484676), Row(userID=1, movieID=32, rating=3.5, timestamp=1112484819), Row(userID=1, movieID=47, rating=3.5, timestamp=1112484727), Row(userID=1, movieID=50, rating=3.5, timestamp=1112484580)]
27278
[('movieID', 'int'), ('title', 'string'), ('genres', 'string')]
[Row(movieID=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'), Row(movieID=2, title='Jumanji (1995)', genres='Adventure|Children|Fantasy'), Row(movieID=3, title='Grumpier Old Men (1995)', genres='Comedy|Romance'), Row(movieID=4, title='Waiting to Exhale (1995)', genres='Comedy|Drama|Romance'), Row(movieID=5, title='Father of the Bride Part II (1995)', genres='Comedy')]
CPU times: user 24 ms, sys: 36 ms, total: 60 ms
Wall time: 27.7 s


Other things we could do:
- Use Dataframe SQL statements to print dimensions of data
- Use pixiedust to show histogram of number of ratings by user
- Calculate dataset sparsity

In [115]:
%%time
DEBUGGING = True
# Downsample ratings dataframe for development
if DEBUGGING:
    ratings = ratings.sample(withReplacement=False, fraction=0.005, seed=0) #Set seed for reproducibiity
    print("Ratings downsampled to {} for testing purposes".format(ratings.count()))

# split ratings dataframe into training and test sets
# Note - ideally we would split the dataset according to a proportion of the userIDs
ratings_train, ratings_test = ratings.randomSplit([0.9, 0.1], seed=0)
# cache and save dataframes to reduce execution time
ratings_train.cache()
ratings_test.cache()

Ratings downsampled to 100187 for testing purposes
CPU times: user 24 ms, sys: 8 ms, total: 32 ms
Wall time: 15 s


In [None]:
# DELETE CELL IF NOT REQUIRED / CAN'T GET TO WORK
# Can't get saving to parquet to work atm - try os(path.join()?)
# ! rmdir 'ratings_train.pqt' # delete if they already exist to avoid an error below
# ! rmdir 'ratings_test.pqt'
# ratings_train.write.parquet('ratings_train.pqt')
# ratings_test.write.parquet('ratings_test.pqt')
# # note mode='overwrite' raises error as trying to overwrite same directory as deleting - needs a temporary directory

#### 2) Test ALS model (without pipeline)
We will use Alternative Least Squares (ALS) to create a recommendation model using latent factors

First implement without a pipeline so we explore the results qualitatively by calling methods of the ALS Model (note when using a pipeline we don't seem to have access to these methods as a Pipeline Model object is returned)



In [116]:
als = ALS(userCol="userID", itemCol="movieID", ratingCol="rating",
          rank=10, maxIter=10, regParam=0.1, coldStartStrategy='drop', seed=0)
          # use coldStartStrategy='drop' above to avoid 'nan' values for IDs not seen in training data
    
# fit the model
%time model=als.fit(ratings_train)

CPU times: user 28 ms, sys: 0 ns, total: 28 ms
Wall time: 4.03 s


#### 2.1) Evaluate the prediction results

In [117]:
%time predictions = model.transform(ratings_test)
# Join ratings with movie title
predictions = predictions.join(movies, predictions.movieID == movies.movieID, 'inner') \
                         .select('userID', 'title', 'rating', 'prediction')
predictions.show(5, truncate = False)

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 23.6 ms
+------+---------------------------+------+-----------+
|userID|title                      |rating|prediction |
+------+---------------------------+------+-----------+
|109469|Hudsucker Proxy, The (1994)|4.0   |2.974759   |
|67352 |Hudsucker Proxy, The (1994)|3.5   |2.2586203  |
|106543|High School High (1996)    |3.0   |-0.05509311|
|106572|Dirty Dancing (1987)       |3.0   |-0.9867803 |
|31550 |Dirty Dancing (1987)       |4.5   |-0.06630593|
+------+---------------------------+------+-----------+
only showing top 5 rows



In [118]:
## Qualitative results
# Show predictions vs. ratings for first userID
firstID = predictions.first().userID
firstDF = predictions.filter(predictions.userID == firstID)
firstDF.show(truncate=False)

# Show top 10 recommendations for first userID
firstRecs = model.recommendForUserSubset(firstDF, 10).select('recommendations')
firstRecs.collect()

+------+---------------------------+------+----------+
|userID|title                      |rating|prediction|
+------+---------------------------+------+----------+
|109469|Hudsucker Proxy, The (1994)|4.0   |2.974759  |
|109469|Gilda (1946)               |4.0   |2.6662097 |
+------+---------------------------+------+----------+



[Row(recommendations=[Row(movieID=8183, rating=4.394659996032715), Row(movieID=5135, rating=4.393545150756836), Row(movieID=56949, rating=4.320899963378906), Row(movieID=71108, rating=4.23151969909668), Row(movieID=73881, rating=4.0969929695129395), Row(movieID=5644, rating=4.080875396728516), Row(movieID=57183, rating=4.0242509841918945), Row(movieID=8957, rating=3.9853291511535645), Row(movieID=7215, rating=3.981336832046509), Row(movieID=2365, rating=3.950181722640991)])]

TODO:
- Fix above output so we can display a list of the movie titles and predicted rating

Other things we could do:
- Compare predictions vs. ratings for the userID with the highest number of ratings
- Calculate the MSRE by user and:
- Show predictions vs. ratings for the best / worst userID error
- Use pixiedust to plot a histogram of MSRE by userID

In [119]:
# Quantitative results
# Calculate model accuracy
predictions = predictions.withColumn('SquaredError', (predictions.rating - predictions.prediction)**2)
predictions.select(sqrt(avg(predictions["SquaredError"]))).show()


+-----------------------+
|SQRT(avg(SquaredError))|
+-----------------------+
|      2.565372059862994|
+-----------------------+



#### 3) Find best model using ALS and ML Pipeline

In [120]:
# Create pipeline
pipeline = Pipeline(stages=[als])
#print ("Pipeline:",pipeline.explainParams())
#print ("ALS:",als.explainParams())

TODO: test with different training set sizes either by:
- running code below multiple times for each training set
- if it works, adding a pipeline set for sampling the training set and addding this as a param to the paramgrid

In [121]:
# Set performance evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
#rmse = evaluator.evaluate(predictions)
#print("Root-mean-square error = " + str(rmse))

In [122]:
# Create parameter grid over ALS rank and regularisation parameter
# TODO: add sample size if we are doing it this way
# regParam must be >=0 (default=0.1)
paramGrid = ParamGridBuilder()\
    .addGrid(als.rank,[3, 5])\
    .addGrid(als.regParam,[0.2, 0.5])\
    .build()

TODO: Enter more values for param grid e.g.
als.rank,[1, 2, 3, 5, 10, 20]
als.regParam,[0, 0.1, 0.3, 0.5, 1, 5, 10]

In [123]:
# Set train/validation split
validator = TrainValidationSplit(trainRatio=0.9, seed=0)\
            .setEstimator(pipeline).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid)

In [124]:
# Execute pipeline
%time tunedModel = validator.fit(ratings_train)

CPU times: user 376 ms, sys: 124 ms, total: 500 ms
Wall time: 32.2 s


In [125]:
# function provided by Tillman
def bestValidationParamters(validatedModel,parameterGrid):
    """ Find the paramter map that produced the highest result in a validation (TrainValidationSplit or CrossValidation) 
        
        Positional arguments:
        validatedModel: the model returned by cv.fit() or tvs.fit()
        parameterGrid: the paramterGrid used in the fitting
    """
    # link the measured metric results to the paramter maps in the grid
    metricParamPairs = zip(validatedModel.validationMetrics,parameterGrid)
    # for our metrics, higher is better and 0 is the minimum
    bestMetric = 0 # initialize with the minimal value
    # now iterate through all tested parameter maps
    for metric,params in metricParamPairs:
        if metric > bestMetric: # if metric is better than current best
            bestParams = params # then keept the corresponding parameter map 
    return bestParams # and return the final best paramters

In [126]:
# Find parameters for best model
bestValidationParamters(tunedModel,paramGrid)

{Param(parent='ALS_4ce88fef3c32d548d763', name='rank', doc='rank of the factorization'): 5,
 Param(parent='ALS_4ce88fef3c32d548d763', name='regParam', doc='regularization parameter (>= 0).'): 0.5}

In [127]:
# Calculate peformance for best model
print("Training accuracy for tuned model =",evaluator.evaluate(tunedModel.transform(ratings_train)))
print("Test accuracy for tuned model =",evaluator.evaluate(tunedModel.transform(ratings_test)))

Training accuracy for tuned model = 0.6410849858403498
Test accuracy for tuned model = 1.6411222575909927


Other things we can do:
- Tabulate / plot performance by sample size and hyperparameter settings