# Movie Recommender Project Report 
<br>
<font size=4> Team members: </font> <br><br>
<font size=3> James Jungsuk Lee [UNI] </font><br>
<font size=3> Ujjwal Peshin [UNI]</font><br>
<font size=3> Bowen Zhou [UNI]</font><br>
<font size=3> Zhongling Jiang [UNI]</font>

In [3]:
import numpy as np
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib.pyplot as plt
import random
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from pyspark.sql import Window
from pyspark.sql.functions import col, expr
import pyspark.sql.functions as F
import pyspark.sql.types as T

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit

'''
from surprise import AlgoBase
from surprise import Dataset
from surprise import accuracy
from surprise import Reader
from surprise import KNNBasic
from surprise.model_selection import KFold
from surprise.model_selection import train_test_split
from surprise.model_selection import cross_validate
import pandas as pd
import random
'''

import itertools
RANDOM_SPLIT_SEED = 24
TRAIN_PROPORTION = 0.9
TEST_PROPORTION = 1 - TRAIN_PROPORTION

### Answer to 
As data scientists of a digital media company, state your objectives in building a
recommendation system. For example, what metrics do you care about, who is this system
built to serve (users or your boss?), and what business rules may you care to introduce?

As data scientists, we want a solution that serves users while also satisfying internal stakeholders such as our boss. Getting key stakeholders to buy into the idea is integral to adding value to the company. Below we list a few metrics and ideas to focus on and note whom each one serves.

Users: <br/>
We want our users to feel engaged with our content, which means recommending items they can relate to and enjoy. Within the scope of this project, we can measure the accuracy of our model with metrics such as RMSE. Other useful measures fall somewhat outside the scope of this project, such as the serendipity of our recommendations and implicit feedback, for example how long users listen to or watch a recommended item even if they never explicitly rate it.
    
Stakeholders: <br/>
Stakeholders want to be sure that the solutions we recommend are indeed better than what they can do or have done in the past. They also want to understand how the system affects the product they own. Accuracy measurements are therefore useful for stakeholders too, but we can additionally favor methods that help them interpret the model's output. This means using simpler methods such as KNN, or exploring Matrix Factorization methods to see whether the reduced-dimension space reveals patterns that make intuitive sense to a human.

## Part I: Data Preparation

**Description:** <br><br>

### Create a smaller development set

In [4]:
spark = SparkSession.builder.appName('proj_1').getOrCreate()
ratings = spark.read.csv('ml-20m/ratings.csv', header = True, 
                         inferSchema=True).cache()
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [5]:
pd.DataFrame(ratings.take(5), columns=ratings.columns)

Unnamed: 0,userId,movieId,rating,timestamp
0,1,2,3.5,1112486027
1,1,29,3.5,1112484676
2,1,32,3.5,1112484819
3,1,47,3.5,1112484727
4,1,50,3.5,1112484580


In [6]:
random.seed(100)

In [7]:
def subsample(movies, n, p):
    """
    Subsample the list of movies that each user has rated, using the following rules:
    (i) If the user has rated n or fewer movies, keep all ratings
    (ii) If the user has rated many movies, keep only a fraction p of them via random selection
    (iii) If number of movies * p is at most n, randomly sample n movies and
         keep these ratings
    """
    if len(movies) <= n:
        return movies
    elif int(p * len(movies)) <= n:
        return random.sample(movies, n)
    else:
        return random.sample(movies, int(p * len(movies)))
N = 5
P = 0.2    
# collect all movies each user has rated
ratings_rdd = ratings.select(['userId', 'movieId']).rdd.map(list)
users_rated_movies = ratings_rdd.groupByKey().mapValues(list)

# call subsample on rated movies
subsampled_users_rated_movies= users_rated_movies.mapValues(lambda m: subsample(m, N, P))

# Now convert these ratings back to dataframe
subsampled_ratings = subsampled_users_rated_movies.flatMapValues(lambda x: x)
subsampled_ratings = spark.createDataFrame(subsampled_ratings, ['userId', 'movieId'])

# check size of the subsampled dataset
subsample_rowcounts = subsampled_ratings.count()
subsample_percentage = subsample_rowcounts * 1.0 / ratings.count() * 100
print ('The subsampled dataset is {0}% size of the original dataset, has {1} rows'.\
       format(subsample_percentage, subsample_rowcounts))

The subsampled dataset is 19.81355445175896% size of the original dataset, has 3962763 rows
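The three subsampling rules can be checked on toy inputs without Spark. The sketch below repeats the `subsample` function so that it runs on its own; the list sizes (4, 20 and 100 ratings) are made up for illustration, while `n = 5` and `p = 0.2` match the values of `N` and `P` used above.

```python
import random


def subsample(movies, n, p):
    """Same three rules as the notebook's subsample function."""
    if len(movies) <= n:
        return movies
    elif int(p * len(movies)) <= n:
        return random.sample(movies, n)
    else:
        return random.sample(movies, int(p * len(movies)))


random.seed(100)
few = list(range(4))     # 4 ratings: at most n, so all are kept
some = list(range(20))   # 20 ratings: 20 * 0.2 = 4 is at most n, so n = 5 are kept
many = list(range(100))  # 100 ratings: 100 * 0.2 = 20 are kept

sizes = [len(subsample(m, 5, 0.2)) for m in (few, some, many)]
print(sizes)  # [4, 5, 20]
```

Because light users keep all or at least `n` of their ratings, the subsample does not have to be exactly 20% of the original data.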


In [None]:
# Join them back to original dataset to get ratings and time stamps, using userId and movieId as key
subsampled_ratings = subsampled_ratings.join(ratings, ['userId','movieId'], 'inner')
subsampled_ratings.persist()
pd.DataFrame(subsampled_ratings.take(5), columns=subsampled_ratings.columns)

Unnamed: 0,userId,movieId,rating,timestamp
0,11,64034,4.5,1251144218
1,14,1028,3.0,1225308749
2,17,4639,4.0,998691698
3,22,3100,2.0,994638190
4,24,2278,2.0,994232903


### Divide into training, validation and test set <br>

**Description**: <br><br>


**Output**: 'train' - training set,  'val' - validation set, 'test' - test set

In [None]:
# cast the column of this DataFrame itself rather than referencing the separate `ratings` DataFrame
subsampled_ratings = subsampled_ratings.withColumn("timestamp", F.col("timestamp").cast(T.TimestampType()))
subsampled_ratings.printSchema()

root
 |-- userId: long (nullable = true)
 |-- movieId: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [None]:
limit = 10
# test size
test_size = 0.05
def udf_user_limit(user_counts):
    if user_counts < limit:
        return -1
    else:
        if (user_counts * (1-test_size)) <= limit:
            return limit
        else:
            return int(np.around(user_counts * (1-test_size)))
user_limit = F.udf(udf_user_limit, T.IntegerType())
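To make the limit rule concrete, here is a plain-Python sketch of the same logic evaluated on a few hypothetical group sizes. The helper name `train_limit` is an assumption for illustration, and Python's built-in `round` stands in for `np.around`.

```python
LIMIT = 10
TEST_SIZE = 0.05


def train_limit(count, limit=LIMIT, test_size=TEST_SIZE):
    """Number of earliest rows that go to the training set, or -1 to drop the group."""
    if count < limit:
        return -1                       # too few ratings: group is filtered out
    if count * (1 - test_size) <= limit:
        return limit                    # keep at least `limit` rows for training
    return int(round(count * (1 - test_size)))


limits = [train_limit(c) for c in (5, 10, 100, 1000)]
print(limits)  # [-1, 10, 95, 950]
```

A group with 100 rows therefore sends its 95 earliest rows to training and its 5 latest rows to the held-out set, while a group with exactly 10 rows contributes nothing to the held-out set.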

In [None]:
# train-test function
def train_test_split(data, col_to_split_on= 'userId', timestamp_col = 'timestamp'):
    # original columns
    orig_cols = data.columns
    # define count col
    count_col = 'count(' + col_to_split_on + ')'
    # do user ratings count
    counts_for_col = data.groupby(col_to_split_on).agg(F.count(col_to_split_on))
    # generate limits on each user based on rules
    limits = counts_for_col.withColumn('train_limit', user_limit(F.col(count_col)))
    # remove users having less than limit no of ratings
    limits_filtered = limits.filter(limits.train_limit > 0)
    # generate row numbers based on temporality
    data_row_num = data.withColumn("row_num", F.row_number().over(Window.partitionBy(col_to_split_on).orderBy(timestamp_col)))
    # join ratings and user counts dfs together
    data_row_num = data_row_num.alias('a')
    limits_filtered = limits_filtered.alias('b')
    merged_data = data_row_num.join(limits_filtered,F.col('b.' + col_to_split_on) == F.col('a.' + col_to_split_on)).select([F.col('a.'+xx) for xx in data_row_num.columns] + [F.col('b.' + count_col),F.col('b.train_limit')])
    # generate selection column based on number limit
    final_train_test = merged_data.withColumn('selection', F.col('row_num') <= F.col('train_limit'))
    # find train and test 
    train = final_train_test.filter(final_train_test.selection == True).select(orig_cols)
    test = final_train_test.filter(final_train_test.selection == False).select(orig_cols)
    return train, test

In [None]:
# to check if test has more entities in any column
def compatibility_test(train, test, cols_to_test = ['userId', 'movieId']):
    cols_greater = []
    for i in cols_to_test:
        train_unique = train.select([i]).distinct().rdd.map(lambda x: x[0]).collect()
        test_unique = test.select([i]).distinct().rdd.map(lambda x: x[0]).collect()
        size = len(set(test_unique) - set(train_unique))
        print("Test has %d more %s" %(size, i))
        # only report columns where test contains values that are unseen in train
        if size > 0:
            cols_greater.append(i)
    return cols_greater

In [None]:
traincv, test = train_test_split(ratings, col_to_split_on= 'movieId', timestamp_col='timestamp')
print((traincv.count(), len(traincv.columns)))
print((test.count(), len(test.columns)))
cols_greater = compatibility_test(traincv, test, cols_to_test=['userId', 'movieId'])

(18966365, 4)
(998468, 4)
Test has 391 more userId
Test has 0 more movieId


In [None]:
for i in cols_greater:
    traincv_unique = traincv.select([i]).distinct().rdd.map(lambda x: x[0]).collect()
    test_unique = test.select([i]).distinct().rdd.map(lambda x: x[0]).collect()
    unique_to_test = list(set(test_unique) - set(traincv_unique))
    test = test[~test[i].isin(unique_to_test)]
# After removing users/ movies not included in training set
print((traincv.count(), len(traincv.columns)))
print((test.count(), len(test.columns)))
cols_greater = compatibility_test(traincv, test, cols_to_test=['userId', 'movieId'])
test.persist()

(18966365, 4)
(980868, 4)
Test has 0 more userId
Test has 0 more movieId


DataFrame[userId: int, movieId: int, rating: double, timestamp: int]

Repeat the same logic on train / validation set split

In [None]:
train, val = train_test_split(traincv, col_to_split_on= 'movieId', timestamp_col='timestamp')
print((train.count(), len(train.columns)))
print((val.count(), len(val.columns)))
cols_greater = compatibility_test(train, val, cols_to_test=['userId', 'movieId'])

(18018048, 4)


In [None]:
for i in cols_greater:
    train_unique = train.select([i]).distinct().rdd.map(lambda x: x[0]).collect()
    val_unique = val.select([i]).distinct().rdd.map(lambda x: x[0]).collect()
    unique_to_val = list(set(val_unique) - set(train_unique))
    val = val[~val[i].isin(unique_to_val)]
print((train.count(), len(train.columns)))
print((val.count(), len(val.columns)))
cols_greater = compatibility_test(train, val, cols_to_test=['userId', 'movieId'])
train.persist()
val.persist()

### Evaluation Metric

#### I. Regression Metric

In [None]:
def regression_metric(ratings, predicted_ratings):
    """
        Calculate regression metrics
        - root mean square error
        - mean absolute error
        - r squared
        - explained variance
        by joining original and predicted rating dataset into 
        ((userId, movieId), (original_rating, predicted_rating)).
        :ratings: original rating dataset. Format:   userId | movieId | rating
        :predicted_ratings: predicted rating dataset
    """
    ratings_tuple = ratings.rdd.map(lambda r: ((r.userId, r.movieId), r.rating))
    predicted_ratings_tuple = predicted_ratings.rdd.map(lambda r: ((r.userId, r.movieId), r.rating))
    score_and_labels = predicted_ratings_tuple.join(ratings_tuple).map(lambda tup: tup[1])
    metrics = RegressionMetrics(score_and_labels)
    print("RMSE = %s" % metrics.rootMeanSquaredError)
    print("MAE = %s" % metrics.meanAbsoluteError)
    print("R-squared = %s" % metrics.r2)
    print("Explained Variance = %s" % metrics.explainedVariance)
    return metrics.rootMeanSquaredError, metrics.meanAbsoluteError, metrics.r2, metrics.explainedVariance
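As a reference for what the first three numbers mean, the following sketch computes RMSE, MAE and R-squared by hand using their textbook definitions. The four rating pairs are invented for illustration, and explained variance is left out because its exact definition in `RegressionMetrics` is not restated here.

```python
import math


def regression_summary(actual, predicted):
    """Return (rmse, mae, r2) for two equally long lists of numbers."""
    n = len(actual)
    errors = [p - a for a, p in zip(actual, predicted)]
    ss_res = sum(e * e for e in errors)                  # residual sum of squares
    mean_actual = sum(actual) / n
    ss_tot = sum((a - mean_actual) ** 2 for a in actual) # total sum of squares
    rmse = math.sqrt(ss_res / n)
    mae = sum(abs(e) for e in errors) / n
    r2 = 1 - ss_res / ss_tot
    return rmse, mae, r2


actual = [4.0, 3.0, 5.0, 2.0]
predicted = [3.5, 3.0, 4.0, 3.0]
rmse, mae, r2 = regression_summary(actual, predicted)
print(rmse, mae, r2)  # 0.75 0.625 0.55
```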

#### II. Ranking Metric

In [None]:
# Ranking metrics
# https://vinta.ws/code/spark-ml-cookbook-pyspark.html
def ranking_metric(ratings, predicted_ratings, k=5):
    """
        Calculate ranking metrics
        - mean average precision
        - precision at k
        - normalized discounted cumulative gain at k
        by collecting the top-k items per user for both datasets, which takes form of
        ([predicted_item1, predicted_item2, ...], [item1, item2, ...])
        :ratings: original rating dataset. Format:   userId | movieId | rating
        :predicted ratings: predicted rating dataset, same format
        :k: top k positions used for comparison
    """
    windowSpec = Window.partitionBy('userId').orderBy(col('rating').desc())
    perUserPredictedItemsDF = predicted_ratings \
        .select('userId', 'movieId', 'rating', F.rank().over(windowSpec).alias('rank')) \
        .where('rank <= {0}'.format(k)) \
        .groupBy('userId') \
        .agg(expr('collect_list(movieId) as itemsId'))
    
    windowSpec = Window.partitionBy('userId').orderBy(col('rating').desc())
    perUserActualItemsDF = ratings \
        .select('userId', 'movieId', 'rating', F.rank().over(windowSpec).alias('rank')) \
        .where('rank <= {0}'.format(k)) \
        .groupBy('userId') \
        .agg(expr('collect_list(movieId) as itemsId')) 
    
    perUserItemsRDD = perUserPredictedItemsDF.join(perUserActualItemsDF, 'userId') \
        .rdd \
        .map(lambda row: (row[1], row[2]))
    rankingMetrics = RankingMetrics(perUserItemsRDD)

    print("mean Average Precision = %s" %rankingMetrics.meanAveragePrecision)
    print("Precision at k = %s" %rankingMetrics.precisionAt(k))
    print("NDCG at k = %s" %rankingMetrics.ndcgAt(k))  
    
    return rankingMetrics.meanAveragePrecision, rankingMetrics.precisionAt(k), rankingMetrics.ndcgAt(k)
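For intuition, here is a small sketch of precision at k and NDCG at k for a single user, using the common binary-relevance definitions. The function names and item ids are illustrative assumptions, and `RankingMetrics` may differ in details such as edge-case handling.

```python
import math


def precision_at_k(predicted, relevant, k):
    """Fraction of the first k predicted items that are relevant."""
    relevant = set(relevant)
    hits = sum(1 for item in predicted[:k] if item in relevant)
    return hits / k


def ndcg_at_k(predicted, relevant, k):
    """Binary-relevance NDCG: a hit at 0-based position i gains 1 / log2(i + 2)."""
    relevant = set(relevant)
    dcg = sum(1.0 / math.log2(i + 2)
              for i, item in enumerate(predicted[:k]) if item in relevant)
    # Ideal ordering places every relevant item at the top of the list
    idcg = sum(1.0 / math.log2(i + 2) for i in range(min(len(relevant), k)))
    return dcg / idcg if idcg > 0 else 0.0


predicted_items = [10, 20, 30, 40, 50]  # hypothetical top-5 by predicted rating
actual_items = [20, 10, 60]             # hypothetical top items by true rating
p_at_5 = precision_at_k(predicted_items, actual_items, 5)
ndcg_at_5 = ndcg_at_k(predicted_items, actual_items, 5)
print(p_at_5)  # 0.4
print(ndcg_at_5)
```

Two of the five predicted items appear in the actual list, so precision at 5 is 0.4. NDCG is higher than that because both hits sit at the very top of the predicted ranking.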

#### III. Area Under Curve

### Baseline Model <br>

**Description:**

In [24]:
class BaselineModel:

    def __init__(self, user_column, item_column,ratings_column):
        self.user_col = user_column
        self.item_col = item_column
        self.ratings_col = ratings_column

    def __find_avg_of_col(self, data, column):
        return data.select(F.mean(F.col(column))).collect()[0][0]

    def __subtract_from_col(self, data, column, value):
        return data.withColumn('normalized_' + column, F.col(column)-value)

    def train(self, training_data):
        user_col = self.user_col
        item_col = self.item_col
        ratings_col = self.ratings_col
        user_bias = {}
        item_bias = {}

        # find average and calculate bias for user and item
        avg_rating = self.__find_avg_of_col(training_data, ratings_col)
        norm_training_data = self.__subtract_from_col(training_data, ratings_col, avg_rating)
        user_bias = norm_training_data.groupby(user_col).agg(F.avg('normalized_' + ratings_col)).\
                                                    rdd.map(lambda x : (x[0],x[1])).collectAsMap()
        item_bias = norm_training_data.groupby(item_col).agg(F.avg('normalized_' + ratings_col)).\
                                                    rdd.map(lambda x : (x[0],x[1])).collectAsMap()

        self.training_data = norm_training_data
        self.avg_rating = avg_rating
        self.user_bias = user_bias
        self.item_bias = item_bias
    
        return avg_rating, user_bias, item_bias

In [25]:
baselineModel = BaselineModel(user_column='userId', item_column='movieId', ratings_column='rating')
avg_rating, user_bias, item_bias = baselineModel.train(train)

In [27]:
def udf_predict(user_id, item_id):
    """
        Predict function: average rating + user bias + item bias. Handle
        corner cases where there is missing user id and item id. 
    """
    if (user_id in user_bias) and (item_id in item_bias) :
        return avg_rating + user_bias[user_id] + item_bias[item_id]
    elif user_id in user_bias:
        return avg_rating + user_bias[user_id]
    elif item_id in item_bias:
        return avg_rating + item_bias[item_id]
    else: 
        return avg_rating
def predict(test_data, user_column, item_column, ratings_column):
    predict_udf = F.udf(udf_predict, T.FloatType())
    return test_data.withColumn(ratings_column, predict_udf(F.col(user_column), F.col(item_column)))
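The baseline rule (global average plus user bias plus item bias, with missing biases treated as zero) can be traced on a toy dataset. This is a minimal plain-Python sketch; `fit_baseline`, `predict_baseline` and the four ratings are assumptions made for illustration and are not part of the notebook.

```python
def fit_baseline(ratings):
    """ratings: list of (user, item, rating). Returns (mean, user_bias, item_bias)."""
    mu = sum(r for _, _, r in ratings) / len(ratings)

    def mean_deviation(index):
        # Average of (rating - mu) grouped by the user (index 0) or item (index 1)
        groups = {}
        for row in ratings:
            groups.setdefault(row[index], []).append(row[2] - mu)
        return {key: sum(vals) / len(vals) for key, vals in groups.items()}

    return mu, mean_deviation(0), mean_deviation(1)


def predict_baseline(mu, user_bias, item_bias, user, item):
    # An unknown user or item contributes a bias of 0, as in the fallback branches above
    return mu + user_bias.get(user, 0.0) + item_bias.get(item, 0.0)


toy = [("u1", "m1", 5.0), ("u1", "m2", 3.0), ("u2", "m1", 4.0), ("u2", "m2", 2.0)]
mu, ub, ib = fit_baseline(toy)
print(mu, ub, ib)  # 3.5 {'u1': 0.5, 'u2': -0.5} {'m1': 1.0, 'm2': -1.0}
print(predict_baseline(mu, ub, ib, "u1", "m1"))  # 5.0
print(predict_baseline(mu, ub, ib, "u3", "m2"))  # 2.5 (unknown user)
print(predict_baseline(mu, ub, ib, "u3", "m9"))  # 3.5 (unknown user and item)
```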

In [36]:
###Do cross join so we can predict on everything, not just the data that is available in the test set
# user_vals = test.select('userId')
# rating_vals = test.select('movieId')
# predict_df = user_vals.crossJoin(rating_vals)
predicted_ratings_baseline = predict(val, user_column='userId', item_column='movieId', ratings_column='rating')

In [37]:
rmse_baseline, mae_baseline, r2_baseline, explainedvar_baseline = \
regression_metric(val, predicted_ratings_baseline)

RMSE = 0.913494648428
MAE = 0.684286236834
R-squared = 0.291004251236
Explained Variance = 0.627566356625


In [38]:
averageprecision_baseline, precision_baseline, ndcg_baseline = \
ranking_metric(val, predicted_ratings_baseline)

mean Average Precision = 0.747433432528
Precision at k = 0.503518355896
NDCG at k = 0.846501517443


### Item Based Model <br> <br>

In [None]:
from surprise import Dataset, Reader

train_df = train.toPandas()
# the validation frame comes from `val`, not from the held-out `test` set
val_df = val.toPandas()
reader = Reader(rating_scale=(0.0,5.0))
train_data = Dataset.load_from_df(train_df[['userId','movieId','rating']],reader)
val_data = Dataset.load_from_df(val_df[['userId','movieId','rating']], reader)

In [None]:
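from surprise import KNNBasic, accuracy

sim_options = {'name':'cosine', 'user_based':False}
algo = KNNBasic(sim_options = sim_options)
# fit() expects a Trainset and test() a list of (user, item, rating) tuples,
# so both Dataset objects are converted first
trainset = train_data.build_full_trainset()
valset = val_data.build_full_trainset().build_testset()
algo.fit(trainset)
predictions = algo.test(valset)
accuracy.rmse(predictions)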

### Matrix Factorization Model

In [39]:
# Example Run: ALS with random chosen parameters
als = ALS(maxIter=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
als_model = als.fit(train)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
predicted_ratings_als = als_model.transform(val)
predicted_ratings_als = predicted_ratings_als.withColumn('rating', predicted_ratings_als.prediction).drop('prediction')

In [40]:
rmse_als, mae_als, r2_als, explainedvar_als= \
regression_metric(val, predicted_ratings_als)

RMSE = 0.949870120165
MAE = 0.717391207724
R-squared = 0.233415437818
Explained Variance = 0.785107962996


In [41]:
averageprecision_als, precision_als, ndcg_als = \
ranking_metric(val, predicted_ratings_als)

mean Average Precision = 0.75692714216
Precision at k = 0.515680348528
NDCG at k = 0.859847192553


#### Hyperparameter Tuning

In [151]:
"""
CAUTION: THIS CELL TAKES LONG TO RUN
Hyperparameter tuning using one validation set. 
"""

grid = {'maxIter':[5], 'regParam': [1.0,2.0]}
param_names = list(grid.keys())

final_results = dict()
# loop variables are named so that they do not overwrite the validation DataFrame `val`
for param_values in itertools.product(*grid.values()):
    inputs = dict(zip(param_names, param_values))
    inputs['userCol'] ='userId'
    inputs['itemCol'] = 'movieId'
    inputs['ratingCol'] = 'rating'
    # drop NaN predictions for unseen users/movies, as in the example run
    inputs['coldStartStrategy'] = 'drop'
    als = ALS(**inputs)
    model = als.fit(train)
    # regression_metric reads the predicted value from the 'rating' column
    predictions = model.transform(val)
    predictions = predictions.withColumn('rating', predictions.prediction).drop('prediction')
    cv_rmse, cv_mae, cv_r2, cv_exp_var = \
    regression_metric(val, predictions)
    final_results[param_values] = [{'rmse': cv_rmse}]


# If you want to run hyperparameter tuning using cross validation. This takes even longer time.
# paramMap = ParamGridBuilder() \
#                     .addGrid(als.rank, [10, 50, 100]) \
#                     .addGrid(als.maxIter, [10]) \
#                     .addGrid(als.regParam, [0.01,0.001]) \
#                     .build()

# evaluatorR = RegressionEvaluator(metricName="rmse", labelCol="rating")

# # Run cross-validation, and choose the best set of parameters.
# cv = CrossValidator(estimator=als,
#                             estimatorParamMaps=paramMap,
#                             evaluator=evaluatorR,
#                            numFolds=5)

# cv_res = cv.fit(subsampled_train)
# final_results
# predicted_ratings_als = cv_res.bestModel.transform(subsampled_test)
# predicted_ratings_als = predicted_ratings_als.withColumn('rating', predicted_ratings_als.prediction).drop('prediction')
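The manual grid search boils down to enumerating every combination of parameter values and keeping the one with the lowest validation error. The sketch below shows that pattern without Spark; `placeholder_score` is a hypothetical stand-in for "fit ALS and return the validation RMSE" and its values carry no meaning.

```python
import itertools

grid = {'maxIter': [5], 'regParam': [1.0, 2.0]}

# One dict of keyword arguments per point in the grid
names = list(grid)
combos = [dict(zip(names, values)) for values in itertools.product(*grid.values())]


def placeholder_score(params):
    # Hypothetical stand-in for training a model and measuring validation RMSE
    return abs(params['regParam'] - 1.0)


scores = {tuple(c.values()): placeholder_score(c) for c in combos}
best = min(scores, key=scores.get)  # parameter tuple with the lowest score
print(combos)  # [{'maxIter': 5, 'regParam': 1.0}, {'maxIter': 5, 'regParam': 2.0}]
print(best)    # (5, 1.0)
```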

In [None]:
rmse_als, mae_als, r2_als, explainedvar_als= \
regression_metric(val, predicted_ratings_als)

In [None]:
averageprecision_als, precision_als, ndcg_als = \
ranking_metric(val, predicted_ratings_als)

### Evaluation Metric

#### I. Ranking Metric

**(i) AUC**

In [None]:
def areaUnderCurve(positiveData, bAllItemIDs, predictFunction,
                   userCol='userId', itemCol='movieId'):
    """
        PySpark port of the Scala routine this cell originally contained.
        What this computes is AUC per user, averaged over users ("mean AUC").
        :positiveData: held-out ratings, treated as the "positive" items
        :bAllItemIDs: broadcast list of all item ids
        :predictFunction: DataFrame -> DataFrame that adds a 'prediction' column,
                          e.g. als_model.transform
    """
    # Take held-out data as the "positive".
    # Make predictions for each of them, including a numeric score
    positivePredictions = predictFunction(positiveData.select(userCol, itemCol)) \
        .withColumnRenamed('prediction', 'positivePrediction')

    # BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
    # small AUC problems, and it would be inefficient, when a direct computation is available.

    def sample_negatives(user_and_items):
        # Create a set of "negative" items for one user. These are randomly chosen
        # from among all of the other items, excluding those that are "positive" for the user.
        user_id, pos_items = user_and_items
        pos_item_set = set(pos_items)
        all_item_ids = bAllItemIDs.value
        negative = []
        i = 0
        # Make at most one pass over all items to avoid an infinite loop.
        # Also stop when number of negative equals positive set size
        while i < len(all_item_ids) and len(negative) < len(pos_item_set):
            item_id = all_item_ids[random.randrange(len(all_item_ids))]
            # Only add IDs that are not positive for this user
            if item_id not in pos_item_set:
                negative.append(item_id)
            i += 1
        # Return the items with user ID added back
        return [(user_id, item_id) for item_id in negative]

    negativeRDD = positiveData.select(userCol, itemCol).rdd \
        .map(lambda r: (r[0], r[1])).groupByKey().flatMap(sample_negatives)
    negativeData = spark.createDataFrame(negativeRDD, [userCol, itemCol])

    # Make predictions on the rest:
    negativePredictions = predictFunction(negativeData) \
        .withColumnRenamed('prediction', 'negativePrediction')

    # Join positive predictions to negative predictions by user, only.
    # This will result in a row for every possible pairing of positive and negative
    # predictions within each user.
    joinedPredictions = positivePredictions.join(negativePredictions, userCol) \
        .select(userCol, 'positivePrediction', 'negativePrediction').cache()

    # Count the number of pairs per user
    allCounts = joinedPredictions.groupBy(userCol).agg(F.count(F.lit(1)).alias('total'))
    # Count the number of correctly ordered pairs per user
    correctCounts = joinedPredictions \
        .filter(F.col('positivePrediction') > F.col('negativePrediction')) \
        .groupBy(userCol).agg(F.count(userCol).alias('correct'))

    # Combine these, compute their ratio, and average over all users
    meanAUC = allCounts.join(correctCounts, userCol, 'left_outer') \
        .select((F.coalesce(F.col('correct'), F.lit(0)) / F.col('total')).alias('auc')) \
        .agg(F.mean('auc')).first()[0]

    joinedPredictions.unpersist()

    return meanAUC
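The pair-counting idea behind mean AUC can be shown on toy scores: for each user, count the fraction of (positive, negative) prediction pairs in which the positive item scores higher, then average over users. This is a plain-Python sketch; the function name `mean_auc` and the scores are assumptions made for illustration.

```python
def mean_auc(positive_scores, negative_scores):
    """Both arguments map a user id to a list of predicted scores."""
    per_user = []
    for user, pos in positive_scores.items():
        neg = negative_scores.get(user, [])
        total = len(pos) * len(neg)          # every positive/negative pairing
        if total == 0:
            continue                         # user has no pairs to compare
        correct = sum(1 for p in pos for n in neg if p > n)
        per_user.append(correct / total)
    return sum(per_user) / len(per_user)


pos = {"u1": [0.9, 0.4], "u2": [0.8]}
neg = {"u1": [0.5, 0.1], "u2": [0.3]}
auc = mean_auc(pos, neg)
print(auc)  # 0.875
```

User `u1` orders 3 of 4 pairs correctly and user `u2` orders its single pair correctly, which gives (0.75 + 1.0) / 2 = 0.875.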

In [46]:
allUserId = subsampled_traincv.select('userId').distinct().rdd.map(lambda x: x[0]).collect()
ballUserId = spark.sparkContext.broadcast(allUserId)