In [1]:
import os

In [2]:
# Show which JDK Spark will run on (a KeyError here means JAVA_HOME is not set)
os.environ["JAVA_HOME"]

'/usr/lib/jvm/java-8-openjdk-amd64'

In [3]:
# Show which Spark installation is configured (a KeyError here means SPARK_HOME is not set)
os.environ["SPARK_HOME"]

'/home/vishnu/hadoop/spark'

In [4]:
# Launch a local Spark session: findspark locates the Spark installation
# and makes `pyspark` importable from this kernel.
import findspark
findspark.init()

# Start the Apache Spark context and the SQL entry point.
import pyspark
from pyspark.sql import SQLContext

# getOrCreate() reuses an already running context instead of raising
# "Cannot run multiple SparkContexts at once" when this cell is re-run.
sc = pyspark.SparkContext.getOrCreate(
    conf=pyspark.SparkConf().setAppName('als_recommender'))
# Later cells use `sqlContext.read` and `sqlContext.sparkSession`.
sqlContext = SQLContext(sc)

print('Master : ', sc.master)
print('Cores  : ', sc.defaultParallelism)

Master :  local[*]
Cores  :  12


In [5]:
# Disable automatic broadcast joins (a threshold of -1 turns the size-based
# broadcast off), so Spark does not ship a join side to every executor.
# NOTE(review): the original note only said "Required Later"; presumably this
# avoids broadcasting large tables during the later ALS/evaluation joins, so confirm.
sqlContext.sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [6]:
# Default Packages (available by Default in Google Colab)
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import seaborn as sns
import random
from pprint import pprint
from matplotlib.lines import Line2D

# PySpark Utilities
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics

# Random seed shared by Spark (randomSplit, ALS) and by the Python / NumPy
# generators, so Python-side sampling (e.g. catalog_coverage, which uses
# random.choices) is reproducible across runs.
SEED = 1492
random.seed(SEED)
np.random.seed(SEED)



In [7]:
# Path to data folder (no trailing slash: later cells append "/<file name>",
# which would otherwise produce paths such as "ml-25m//movies.csv").
DATA_PATH = "ml-25m"

# Path to results folder (no trailing slash, for the same reason).
RESULTS_PATH = "results"

# Preprocessing

In [8]:
class MovieLensDatasets(object):
    """
    Load and preprocess the MovieLens ratings together with a personal
    IMDb ratings export.

    Parameters
    ----------
    ratings : Spark DataFrame
        MovieLens ratings with columns userId, movieId, rating, timestamp.
    movies : Spark DataFrame
        MovieLens movies table (returned unchanged by get_movies()).
    links : Spark DataFrame
        MovieLens links table; its movieId and imdbId columns map IMDb ids
        onto MovieLens ids.
    personalRatings : Spark DataFrame
        IMDb ratings export with columns 'Const' (IMDb id, presumably with a
        two-character 'tt' prefix) and 'Your Rating' (IMDb 10-point scale).
    debug : bool
        When True, only the first `debugLimit` ratings are kept.
    debugLimit : int
        Number of ratings kept in debug mode.
    """
    def __init__(self, ratings, movies, links, personalRatings, debug=True, debugLimit=10000):
        # Optionally keep only a subset of the ratings for quick tests.
        if debug:
            ratings = ratings.limit(debugLimit)

        self.ratings = ratings
        self.movies = movies
        self.links = links
        self.personalRatings = personalRatings

        # Distinct users of the (possibly truncated) raw ratings.
        self.users = ratings.select('userId').distinct()

    def preprocessing(self):
        """
        Clean the ratings, append the personal ratings as user 0 and add the
        scaled / binarized rating columns.

        Afterwards `self.ratings` has the columns userId (int), movieId (int),
        rating (double), ratingsScaled (double) and ratingsBinary (double).
        """
        # Preprocess MovieLens ratings: typed columns, timestamp dropped.
        self.ratings = self.ratings \
            .withColumn('rating', F.col('rating').cast('double')) \
            .drop('timestamp') \
            .withColumn('userId', F.col('userId').cast('int')) \
            .withColumn('movieId', F.col('movieId').cast('int'))

        # Bring the IMDb export onto the MovieLens scale (x 0.5) and strip the
        # two-character prefix so the id matches links.imdbId.
        self.personalRatings = self.personalRatings \
            .select(['Const', 'Your Rating']) \
            .withColumnRenamed('Const', 'imdbId') \
            .withColumnRenamed('Your Rating', 'personalRating') \
            .withColumn('personalRating', F.col('personalRating').cast('double') * 0.5) \
            .withColumn('imdbId', F.expr("substr(imdbId, 3)"))

        self.personalRatings = self.personalRatings.join(
            self.links.select('movieId', 'imdbId'), ['imdbId'], how='inner')

        # Insert the personal ratings as user 0. union() matches columns by
        # position and widens mismatched types, so an int userId unioned with
        # a string literal (or a string movieId from links) can turn the id
        # columns into strings; keep both sides int.
        self.personalRatings = self.personalRatings \
            .withColumn('userId', F.lit(0)) \
            .withColumn('movieId', F.col('movieId').cast('int')) \
            .select(['userId', 'movieId', 'personalRating']) \
            .toDF('userId', 'movieId', 'rating')
        self.ratings = self.ratings.union(self.personalRatings)

        # Binarize ratings: ratingsScaled = rating - 2.5 and
        # ratingsBinary = 1.0 if rating > 2.5 else 0.0 (with 0.5-step ratings
        # this means rating >= 3.0). Native column expressions avoid a Python
        # UDF round-trip; a null rating yields a null ratingsScaled and a 0.0
        # ratingsBinary instead of failing inside the UDF.
        self.ratings = self.ratings \
            .withColumn('ratingsScaled', F.col('rating') - 2.5) \
            .withColumn('ratingsBinary',
                        F.when(F.col('ratingsScaled') > 0.0, 1.0).otherwise(0.0))

    def get_ratings(self):
        """Return the (preprocessed, once preprocessing() has run) ratings."""
        return self.ratings

    def get_movies(self):
        """Return the movies DataFrame as given to the constructor."""
        return self.movies

    # Displaying Null Values
    @staticmethod
    def spark_df_display_null_values(sparkDf):
        """
        Print per-column counts of NaN values and of null values in `sparkDf`.

        Static because it works on an arbitrary DataFrame, not on instance
        state; it can be called on the class or on an instance.
        """
        print('NaN values ?')
        sparkDf.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in sparkDf.columns]).show()

        print('Null values ?')
        sparkDf.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in sparkDf.columns]).show()

In [11]:
%%time
# Set to True if only testing
# For testing: use debug=True option (subset of all ratings)
debug = False

# Load Datasets
movies = sqlContext \
            .read.format('csv').option("header", "true") \
            .load(DATA_PATH + "/movies.csv")

links = sqlContext \
            .read.format('csv').option("header", "true") \
            .load(DATA_PATH + "/links.csv")

ratings = sqlContext.read \
            .format('csv').option("header", "true") \
            .load(DATA_PATH + "/ratings.csv")

personalRatings = sqlContext \
            .read.format('csv').option("header", "true") \
            .load(DATA_PATH + "/personal_IMDB_ratings.csv")


# Data Preprocessing
movieLensDatasets = MovieLensDatasets(ratings=ratings, movies=movies,
                                      links=links,
                                      personalRatings=personalRatings,
                                      debug=debug)

movieLensDatasets.preprocessing()
dfRatings = movieLensDatasets.get_ratings() 
dfMovies = movieLensDatasets.get_movies()

CPU times: user 17 ms, sys: 6.39 ms, total: 23.3 ms
Wall time: 3.54 s


In [12]:
# Integer user / item ids (ALS expects integer id columns), then a
# reproducible 70 / 30 train-test split.
ratingsPrepare = (
    dfRatings
    .withColumn("userId", F.col("userId").cast('int'))
    .withColumn("movieId", F.col("movieId").cast('int'))
)

dfRatingsTrain, dfRatingsTest = ratingsPrepare.randomSplit(
    [0.7, 0.3], seed=SEED)

In [13]:
def catalog_coverage(predicted, catalog, k, seed=None):
    """
    Computes the catalog coverage for k lists of recommendations

    Parameters
    ----------
    predicted : a list of lists
        Ordered predictions
        example: [['X', 'Y', 'Z'], ['X', 'Y', 'Z']]
    catalog: sized collection
        All unique items in the training data; only len(catalog) is used
        (a list, or e.g. a one-column pandas DataFrame with one row per item)
        example: ['A', 'B', 'C', 'X', 'Y', 'Z']
    k: integer
        The number of recommendation lists drawn at random, with
        replacement, from `predicted` in our offline setup
    seed: optional
        Seed for a private random generator. When None (default) the
        module-level `random` generator is used, so the result depends on
        its global state (seed it with random.seed for reproducibility).

    Returns
    ----------
    catalog_coverage: float
        Number of distinct items in the sampled lists divided by
        len(catalog), as an unrounded fraction (not a percentage).
        0.0 when `predicted` is empty or k <= 0.

    Raises
    ----------
    ValueError
        If `catalog` is empty (coverage is undefined).
    ----------
    Metric Definition:
    Ge, M., Delgado-Battenfeld, C., & Jannach, D. (2010, September).
    Beyond accuracy: evaluating recommender systems by coverage and
    serendipity.
    In Proceedings of the fourth ACM conference on Recommender systems
    (pp. 257-260). ACM.
    """
    if len(catalog) == 0:
        raise ValueError("catalog must contain at least one item")
    # random.choices cannot sample from an empty population.
    if len(predicted) == 0 or k <= 0:
        return 0.0

    rng = random if seed is None else random.Random(seed)
    sampling = rng.choices(predicted, k=k)
    distinctItems = {item for recList in sampling for item in recList}
    return len(distinctItems) / float(len(catalog))


def get_rec_sys_results(relevantDocumentsDf, dfMovies):
    """
    Returns a dictionary of recommender system metrics.

    The DataFrame-based Spark ML API has no evaluator for ranked item
    recommendations, so the RDD-based Spark MLlib RankingMetrics is used.

    Parameters
    ----------
    relevantDocumentsDf : Spark DataFrame
        One row per user with array columns `predictions` (ranked
        recommended movieIds) and `groundTruth` (relevant movieIds).
    dfMovies : Spark DataFrame
        Movies table; its distinct movieIds form the catalog for coverage.

    Returns
    ----------
    dict with keys
        Pk20    : RankingMetrics.precisionAt(20)
        NDCGk20 : RankingMetrics.ndcgAt(20)
        CC      : catalog coverage of 100 randomly sampled prediction lists
    """
    # RankingMetrics expects an RDD of (predicted, groundTruth) pairs.
    relevantDocuments = relevantDocumentsDf.rdd \
        .map(lambda row: (row.predictions, row.groundTruth))

    # Catalog coverage needs only the distinct movieIds; no sort before
    # distinct() (its shuffle would discard the order anyway).
    moviesCatalog = [row.movieId for row in
                     dfMovies.select("movieId").distinct().collect()]
    predictionsList = [row.predictions for row in
                       relevantDocumentsDf.select("predictions").collect()]
    cc = catalog_coverage(predicted=predictionsList,
                          catalog=moviesCatalog, k=100)

    # Ranking metrics at cut-off 20.
    metrics = RankingMetrics(relevantDocuments)
    pk20 = metrics.precisionAt(20)
    ndcg20 = metrics.ndcgAt(20)

    return dict(Pk20=pk20, NDCGk20=ndcg20, CC=cc)


def format_recommendations(rowPreds):
    """
    Keep only the recommended item ids from a recommendations value.

    Each element of `rowPreds` must expose a `movieId` attribute (such as
    the rows produced by ALSModel.recommendForAllUsers); predicted ratings
    are discarded and the original order is kept.
    """
    movieIds = []
    for recommendation in rowPreds:
        movieIds.append(recommendation.movieId)
    return movieIds

# Spark UDF mapping a recommendations array (structs with a movieId field)
# to an array of integer movieIds; the lambda looks up
# format_recommendations at call time.
udf_format_recommendations = F.udf(
    lambda recs: format_recommendations(recs),
    ArrayType(IntegerType()),
)

In [14]:
class PopularityRecommenderModel(object):
    """
    Class for generating movie item recommendations based on movie popularity
    (average rating multiplied by the natural log of the number of ratings,
    which penalizes movies with few ratings). By default, only the top 10
    rated movies are selected. Every user receives the same list.

    Approach studied during MS Big Data's SD 701 Big Data Mining Course.

    Parameters
    ----------
    ratingsTrain : Spark DataFrame
        Ratings used to rank the movies (columns movieId, userId, rating).
    ratingsTest : Spark DataFrame
        Ratings used to build each user's ground truth (columns userId,
        movieId, ratingsBinary).
    """
    def __init__(self, ratingsTrain, ratingsTest):
        self.ratingsTrain = ratingsTrain
        self.ratingsTest = ratingsTest

    def generate_recommendations_for_all_users(self, topk=10):
        """
        Pair the global top-`topk` movies with each user's relevant test movies.

        Returns a Spark DataFrame with columns userId, predictions (the same
        ranked movieId array for every user) and groundTruth (test movieIds
        with ratingsBinary == 1.0). Side effect: sets self.topRatedMovies.
        """
        # Get list of top k rated movies
        topRated = self._get_top_k_best_movies(topk=topk)

        # Ground truth: only test ratings with ratingsBinary == 1.0
        # (rating > 2.5). No global sort before groupby: collect_list order
        # after the shuffle is not guaranteed, and RankingMetrics uses the
        # ground truth as an unordered set.
        results = self.ratingsTest \
            .filter(F.col("ratingsBinary") == 1.0) \
            .withColumn('movieId', F.col('movieId').cast('int')) \
            .groupby("userId") \
            .agg(F.collect_list("movieId").alias('groundTruth')) \
            .withColumn('predictions', F.array([F.lit(el) for el in topRated])) \
            .select(['userId', 'predictions', 'groundTruth'])

        return results

    def _get_top_k_best_movies(self, topk):
        """
        Return the movieIds (ints) of the `topk` best ranked training movies.

        Score = (sumRating / nbRatings) * ln(nbRatings); ties are broken by
        fewer ratings first. The ranked DataFrame (movieId, sumRating,
        nbRatings, meanLogUserRating) is stored in self.topRatedMovies.
        """
        # Explicitly aliased aggregations: agg() with a dict is not documented
        # to preserve the order of its output columns, so renaming them
        # positionally with toDF() could swap sumRating and nbRatings.
        ranking = self.ratingsTrain.groupBy('movieId') \
            .agg(F.sum('rating').alias('sumRating'),
                 F.count('userId').alias('nbRatings'))

        # Native expression instead of a Python UDF (F.log is the natural log).
        ranking = ranking \
            .withColumn("meanLogUserRating",
                        (F.col("sumRating") / F.col("nbRatings"))
                        * F.log(F.col("nbRatings"))) \
            .sort(['meanLogUserRating', 'nbRatings'], ascending=[False, True]) \
            .limit(topk)

        self.topRatedMovies = ranking
        ratingsTop = [int(row.movieId) for row in ranking.collect()]

        return ratingsTop

In [15]:
%%time
pbrModel = PopularityRecommenderModel(dfRatingsTrain, dfRatingsTest)
resultsPop = pbrModel.generate_recommendations_for_all_users(topk=20)

CPU times: user 69.4 ms, sys: 12.4 ms, total: 81.8 ms
Wall time: 32.1 s


In [16]:
%%time
# Peek at a few users' predictions next to their ground truth.
# NOTE(review): limit(5) runs before orderBy, so these are 5 arbitrary users
# (sorted among themselves), not the 5 smallest userIds.
resultsPop.limit(5).orderBy('userId').show(5)

+------+--------------------+--------------------+
|userId|         predictions|         groundTruth|
+------+--------------------+--------------------+
|   148|[318, 296, 858, 5...|[858, 1089, 1136,...|
|   463|[318, 296, 858, 5...|[32, 648, 780, 78...|
|   471|[318, 296, 858, 5...|[95167, 2571, 521...|
|   496|[318, 296, 858, 5...|[3910, 44555, 122...|
|   833|[318, 296, 858, 5...|[45431, 46578, 98...|
+------+--------------------+--------------------+

CPU times: user 55.8 ms, sys: 30 ms, total: 85.7 ms
Wall time: 1min 33s


In [17]:
%%time
# Precision@20, NDCG@20 and catalog coverage of the popularity baseline.
results = get_rec_sys_results(resultsPop, dfMovies)

CPU times: user 562 ms, sys: 58.1 ms, total: 620 ms
Wall time: 1min 58s


In [18]:
# Popularity-baseline metrics (Pk20, NDCGk20, CC).
pprint(results)

{'CC': 0.00032039472630280506,
 'NDCGk20': 0.12632832305435426,
 'Pk20': 0.10125806312494992}


In [19]:
%%time
_ = pbrModel.generate_recommendations_for_all_users(topk=20)
pbrModel.topRatedMovies \
    .join(dfMovies.select(['movieId', 'title']), 'movieId') \
    .orderBy('meanLogUserRating', ascending=False) \
    .show(20, truncate=False)

+-------+---------+---------+------------------+------------------------------------------------------------------------------+
|movieId|sumRating|nbRatings|meanLogUserRating |title                                                                         |
+-------+---------+---------+------------------+------------------------------------------------------------------------------+
|318    |251444.5 |56981    |48.32200642241223 |Shawshank Redemption, The (1994)                                              |
|296    |233476.0 |55743    |45.77335501425202 |Pulp Fiction (1994)                                                           |
|858    |159121.0 |36803    |45.45532488277934 |Godfather, The (1972)                                                         |
|50     |165948.5 |38720    |45.27630330905071 |Usual Suspects, The (1995)                                                    |
|527    |179594.0 |42294    |45.23353708263301 |Schindler's List (1993)                                 

# ALS

In [20]:
%%time
tempALS = ALS(maxIter=10, rank=10, regParam=0.1, nonnegative=True,
              userCol='userId', itemCol='movieId', ratingCol='rating',
              coldStartStrategy='drop', implicitPrefs=False, seed=SEED)

mlALSFitted = tempALS.fit(dfRatingsTrain)

CPU times: user 63.7 ms, sys: 44.8 ms, total: 108 ms
Wall time: 59.6 s


In [21]:
# Persist the fitted model. overwrite() lets this cell be re-run (e.g. on
# Restart & Run All) without failing because the target path already exists.
mlALSFitted.write().overwrite().save(RESULTS_PATH+"/ALS_MovieLens_25M")

In [23]:
# Reload the persisted model so the following cells can run without re-fitting.
mlALSFitted = ALSModel.load(RESULTS_PATH+"/ALS_MovieLens_25M")

### RMSE

In [24]:
%%time
predictions = mlALSFitted.transform(dfRatingsTest)
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating',
                                predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
print('RMSE (Test Set):', rmse)

RMSE (Test Set): 0.8147259683304076
CPU times: user 66.4 ms, sys: 25.4 ms, total: 91.8 ms
Wall time: 36.1 s


In [25]:
# Top-20 ALS recommendations per user, reduced to plain movieId arrays.
resultsALS = (
    mlALSFitted
    .recommendForAllUsers(20)
    .withColumn('recommendations',
                udf_format_recommendations(F.col("recommendations")))
    .toDF('userId', 'predictions')
)

In [27]:
# Downloaded Packages (not available by Default)
import databricks.koalas


In [28]:
# One row per (user, recommended movie), with the movie metadata attached.
resultsALSExpanded = (
    resultsALS
    .withColumn("movieId", F.explode("predictions"))
    .drop('predictions')
    .join(dfMovies, "movieId")
)

# Koalas view used to count how many users get each movie recommended.
resultsALSKdf = resultsALSExpanded.to_koalas()

MostRecommendedMoviesForAllUsers = (
    resultsALSKdf
    .groupby(["movieId", "title"])['userId']
    .count()
    .sort_values(ascending=False)
)

In [29]:
%%time
# The 20 movies recommended to the most users by ALS.
# NOTE(review): the list is dominated by obscure titles, which suggests items
# with very few training ratings dominate the ALS top-N; a minimum-ratings
# filter may be needed (confirm).
MostRecommendedMoviesForAllUsers.head(20)

CPU times: user 5.85 ms, sys: 744 µs, total: 6.59 ms
Wall time: 20.6 ms


movieId  title                                             
203882   Dead in the Water (2006)                              144255
183947   NOFX Backstage Passport 2                             142233
194434   Adrenaline (1990)                                     136780
196787   The Law and the Fist (1964)                           119244
165689   Head Trauma (2006)                                     98235
192089   National Theatre Live: One Man, Two Guvnors (2011)     96258
143422   2 (2007)                                               90441
166812   Seeing Red: Stories of American Communists (1983)      86752
117352   A Kind of America 2 (2008)                             84742
194334   Les Luthiers: El Grosso Concerto (2001)                82590
121919   The Good Mother (2013)                                 74576
128667   Wiseguy (1996)                                         74542
197355   Once Upon a Ladder (2016)                              72643
165559   Ο Θανάσης στη χώρα τη

In [30]:
# Attach each user's ground truth (test movies with ratingsBinary == 1.0) to
# the ALS recommendations; the inner join keeps only users having at least
# one such movie. The guard keeps the cell idempotent: re-running it would
# otherwise join a second 'groundTruth' column onto resultsALS.
if 'groundTruth' not in resultsALS.columns:
    groundTruthALS = dfRatingsTest \
        .filter(F.col("ratingsBinary") == 1.0) \
        .withColumn('movieId', F.col('movieId').cast('int')) \
        .groupby("userId") \
        .agg(F.collect_list("movieId").alias("groundTruth"))
    resultsALS = resultsALS.join(groundTruthALS, 'userId')

In [31]:
%%time
# Precision@20, NDCG@20 and catalog coverage of the ALS recommendations.
resultsALSMetrics = get_rec_sys_results(resultsALS, dfMovies)

CPU times: user 1.29 s, sys: 117 ms, total: 1.4 s
Wall time: 5min 23s


In [32]:
# ALS ranking metrics (compare with the popularity-baseline metrics).
pprint(resultsALSMetrics)

{'CC': 0.004293289332457588,
 'NDCGk20': 4.063936848185454e-06,
 'Pk20': 3.080467984696237e-06}


In [33]:
# FIXME(review): hard-coded copy of the popularity-baseline metrics printed
# earlier, presumably so the comparison table can be built after a kernel
# restart without recomputing them. It overwrites the computed `results`
# and will go stale if the data, split or model change.
results = {'CC': 0.00032039472630280506,
 'NDCGk20': 0.12632832305435426,
 'Pk20': 0.10125806312494992}

In [34]:
# The popularity baseline predicts no ratings, so its RMSE is missing.
# NaN (instead of the string 'N/A') keeps the metrics table numeric.
results['RMSE'] = np.nan
resultsALSMetrics['RMSE'] = rmse

In [35]:
# One row per model in the frame; displayed transposed (one row per metric).
modelResults = pd.DataFrame(
    [results, resultsALSMetrics],
    index=['PopRec', 'CF-ALS'],
)

modelResults.T

Unnamed: 0,PopRec,CF-ALS
CC,0.000320395,0.00429329
NDCGk20,0.126328,4.06394e-06
Pk20,0.101258,3.08047e-06
RMSE,,0.814726


In [36]:
%%time
# Number of personal (userId 0) ratings that landed in the training split.
dfRatingsTrain.filter(F.col("userId")==0).count()

CPU times: user 17.5 ms, sys: 8.32 ms, total: 25.8 ms
Wall time: 27.2 s


48

In [37]:
%%time
predictionsPerso = resultsALS.filter(F.col("userId")==0) \
                             .select(F.explode("predictions") \
                             .alias("movieId")) \
                             .join(dfMovies.select(["movieId", "title"]),
                                   "movieId") \
                             .join(dfRatings.filter(F.col("userId")==0),
                                   ['movieId'], how='left')

predictionsPerso.select(["title", "rating"]).show(10, truncate=False)

+-------------------------------------------+------+
|title                                      |rating|
+-------------------------------------------+------+
|The Country Cousin (1936)                  |null  |
|Foster (2018)                              |null  |
|Cássia (2015)                              |null  |
|Insane (2016)                              |null  |
|Olga (2004)                                |null  |
|Argo (2004)                                |null  |
|.hack Liminality In the Case of Yuki Aihara|null  |
|NOFX Backstage Passport 2                  |null  |
|.hack Liminality In the Case of Kyoko Tohno|null  |
|Red, Honest, in Love (1984)                |null  |
+-------------------------------------------+------+
only showing top 10 rows

CPU times: user 75.4 ms, sys: 36.3 ms, total: 112 ms
Wall time: 2min 43s
