# ALS Bagging Ensemble for Movie Recommendation
<br>  
This notebook shows how to build a bagging ensemble of PySpark ALS (Alternating Least Squares) recommenders on [MovieLens](https://grouplens.org/datasets/movielens/) data.

In [1]:
import pyspark
import pyspark.sql.functions as F
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    FloatType,
    IntegerType,
    LongType
)

import pandas as pd

from bagging import RecoBagging
from ranking_evaluator import RankingEvaluator

# Get the active Spark session, or start one. The app name labels this
# application (e.g. in the Spark UI); it previously read "SAR pySpark", a
# leftover from a different notebook.
spark = SparkSession.builder.appName("ALS Bagging pySpark").getOrCreate()

In [2]:
NUM_MODELS = 10  # ensemble size: how many ALS models RecoBagging trains and merges
TOP_K = 10       # items recommended per user; also the k of the @k ranking metrics

### Data loading
Load the MovieLens 100k ratings data and randomly split it into training (70%) and test (30%) sets.

In [3]:
url = "http://files.grouplens.org/datasets/movielens/ml-100k/u.data"
column_names = ['userId', 'movieId', 'rating', 'timestamp']

# u.data is tab-separated with no header row, so column names are supplied here.
# Ratings are cast to float to match the FloatType 'rating' field of the Spark schema.
data_pd = (
    pd.read_csv(url, sep='\t', names=column_names)
    .astype({'rating': float})
)
assert len(data_pd) == 100000

data_pd.head()

Unnamed: 0,userId,movieId,rating,timestamp
0,196,242,3.0,881250949
1,186,302,3.0,891717742
2,22,377,1.0,878887116
3,244,51,2.0,880606923
4,166,346,1.0,886397596


In [4]:
# Explicit Spark schema for the ratings frame (all fields nullable, the StructField default).
column_types = [
    ('userId', IntegerType()),
    ('movieId', IntegerType()),
    ('rating', FloatType()),
    ('timestamp', LongType()),
]
schema = StructType([StructField(name, dtype) for name, dtype in column_types])
data_df = spark.createDataFrame(data_pd, schema=schema)

# Seeded 70/30 random split; both halves are cached because they are reused
# for training, recommendation and evaluation.
train, test = data_df.randomSplit([0.7, 0.3], seed=123)
n_train = train.cache().count()
n_test = test.cache().count()
print("Train vs test: {} vs {}".format(n_train, n_test))

Train vs test: 69854 vs 30146


### Training
Train multiple ALS models using bootstrap sampling. To add more diversity to the ensemble, some of the ALS hyperparameters are also randomized for each model.


In [5]:
# ALS hyperparameters handed to RecoBagging. Scalar values are passed through
# unchanged to every model (see the training log: maxIter is 15 for each one).
# 2-tuples appear to be (low, high) ranges that RecoBagging samples per model to
# diversify the ensemble; e.g. the log shows rank 31, 22, 48, 40 drawn from (20, 50).
# NOTE(review): the sampling happens inside bagging.RecoBagging (not shown here);
# confirm the distribution and whether it can be seeded for reproducibility.
params = {
    'userCol': 'userId',
    'itemCol': 'movieId',
    'ratingCol': 'rating',
    'rank': (20, 50),             # range for the number of latent factors
    'maxIter': 15,
    'implicitPrefs': True,        # use ALS's implicit-feedback variant
    'alpha': (0.1, 40.0),         # range for the implicit-feedback confidence constant
    'regParam': (0.01, 0.2),      # range for the regularization parameter
    'coldStartStrategy': 'drop',  # drop rows whose prediction is NaN (unknown user/item)
    'nonnegative': True           # constrain factors to be non-negative
}

# Column names are given both as ALS params (camelCase, above) and as
# RecoBagging's own user_col / item_col / rating_col arguments.
bagging = RecoBagging(
    ALS,
    num_models=NUM_MODELS,
    user_col='userId', item_col='movieId', rating_col='rating',
    **params
)

# Trains NUM_MODELS ALS models on `train` (presumably each on a bootstrap
# sample of it, per the section description; done inside RecoBagging).
bagging.fit(train)

Training model 0 {'userCol': 'userId', 'itemCol': 'movieId', 'ratingCol': 'rating', 'rank': 31, 'maxIter': 15, 'implicitPrefs': True, 'alpha': 15.023938133735445, 'regParam': 0.012762680426829758, 'coldStartStrategy': 'drop', 'nonnegative': True}
Training model 1 {'userCol': 'userId', 'itemCol': 'movieId', 'ratingCol': 'rating', 'rank': 22, 'maxIter': 15, 'implicitPrefs': True, 'alpha': 33.476069414287075, 'regParam': 0.1558492757743099, 'coldStartStrategy': 'drop', 'nonnegative': True}
Training model 2 {'userCol': 'userId', 'itemCol': 'movieId', 'ratingCol': 'rating', 'rank': 48, 'maxIter': 15, 'implicitPrefs': True, 'alpha': 35.92503483617226, 'regParam': 0.1519447821933119, 'coldStartStrategy': 'drop', 'nonnegative': True}
Training model 3 {'userCol': 'userId', 'itemCol': 'movieId', 'ratingCol': 'rating', 'rank': 40, 'maxIter': 15, 'implicitPrefs': True, 'alpha': 25.814073409363534, 'regParam': 0.172423724155834, 'coldStartStrategy': 'drop', 'nonnegative': True}
Training model 4 {'u

### Testing
Recommend the top-k movies for each user in the test set.

In [6]:
# Merge the per-model top-k lists into one list per test user.
# NOTE(review): merge_by='sum' / scale=True presumably sum (min-max?) scaled
# per-model scores; RecoBagging is defined in bagging.py — confirm there.
recommendations = bagging.recommend_k_items(
    test,
    top_k=TOP_K,
    merge_by='sum',
    scale=True,
).cache()
recommendations.show()

Recommending by 0
Recommending by 1
Recommending by 2
Recommending by 3
Recommending by 4
Recommending by 5
Recommending by 6
Recommending by 7
Recommending by 8
Recommending by 9
+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   148|[[432, 1.26218629...|
|   463|[[277, 0.98066174...|
|   471|[[220, 0.78397213...|
|   496|[[1084, 0.8240563...|
|   833|[[39, 1.758559763...|
|   243|[[283, 1.38636025...|
|   392|[[242, 0.95355752...|
|   540|[[288, 1.18128086...|
|   623|[[210, 0.89439146...|
|   737|[[191, 1.15225412...|
|   858|[[286, 2.58612694...|
|   897|[[148, 0.87238724...|
|    31|[[513, 1.44179504...|
|   516|[[169, 0.76956390...|
|    85|[[1020, 1.3456176...|
|   137|[[50, 2.460963522...|
|   251|[[181, 1.76799780...|
|   451|[[881, 2.24169102...|
|   580|[[300, 1.04780128...|
|   808|[[327, 1.28745436...|
+------+--------------------+
only showing top 20 rows



### Evaluation
Evaluate the recommendation results using [Spark's ranking metrics](https://spark.apache.org/docs/2.2.0/mllib-evaluation-metrics.html#ranking-systems). Because Spark MLlib's ranking metrics are RDD-based, a DataFrame wrapper class, `RankingEvaluator`, is implemented and used here. 
For more information about the evaluation metrics, see [link](https://en.wikipedia.org/wiki/Evaluation_measures_(information_retrieval)).

In [7]:
"""
Result of bagging ensemble of ALS
"""
# Score the merged ensemble recommendations against the held-out ratings.
ranking_evaluator = RankingEvaluator(
    test,
    reco_df=recommendations,
    user_col='userId',
    item_col='movieId',
    rating_col='rating',
    reco_col='recommendations',
)

# One result row with the same keys as the per-model results table.
result_bagging = {
    'model': "Bagging(ALS)",
    'ndcg@k': ranking_evaluator.ndcgAt(TOP_K),
    'precision@k': ranking_evaluator.precisionAt(TOP_K),
    'recall@k': ranking_evaluator.recallAt(TOP_K),
    'map': ranking_evaluator.meanAveragePrecision(),
}

In [10]:
"""
Results of the individual ALS in the ensemble and their max and mean
"""
# Result-table columns, each paired with the RankingEvaluator call that computes it.
# Used for every per-model evaluation below, so the metric list lives in one place.
METRIC_FNS = {
    'ndcg@k': lambda ev: ev.ndcgAt(TOP_K),
    'precision@k': lambda ev: ev.precisionAt(TOP_K),
    'recall@k': lambda ev: ev.recallAt(TOP_K),
    'map': lambda ev: ev.meanAveragePrecision(),
}

# Column -> list of per-model values (one entry per ensemble member).
results = {'model': [], **{name: [] for name in METRIC_FNS}}

for i in range(bagging.num_models):
    # reco_lists_df holds every member's recommendation lists, tagged by a 0-based 'model' id.
    reco = bagging.reco_lists_df.filter(F.col('model') == i)
    rank_eval = RankingEvaluator(
        test,
        reco_df=reco,
        user_col='userId',
        item_col='movieId',
        rating_col='rating',
        reco_col='recommendations',
    )
    # Labels are 1-based for display ("ALS 1" is model 0).
    results['model'].append("ALS " + str(i + 1))
    for name, metric_fn in METRIC_FNS.items():
        results[name].append(metric_fn(rank_eval))

# Best single member and mean member, per metric, as baselines for the ensemble.
result_max = {'model': "Max(ALS)"}
result_avg = {'model': "Avg(ALS)"}
for name in METRIC_FNS:
    scores = results[name]
    result_max[name] = max(scores)
    result_avg[name] = sum(scores) / len(scores)


In [16]:
# Per-model rows followed by the summary rows: Max(ALS), Avg(ALS), Bagging(ALS).
# Build a new dict instead of appending to `results` in place, so re-running
# this cell does not add duplicate summary rows.
summary_rows = [result_max, result_avg, result_bagging]
combined_results = {
    column: values + [row[column] for row in summary_rows]
    for column, values in results.items()
}

result_table = pd.DataFrame.from_dict(combined_results)
result_table

Unnamed: 0,model,ndcg@k,precision@k,recall@k,map
0,ALS 1,0.147004,0.140297,0.075325,0.0298
1,ALS 2,0.116004,0.112513,0.061447,0.022492
2,ALS 3,0.122784,0.121103,0.065097,0.024557
3,ALS 4,0.130224,0.129692,0.067485,0.025508
4,ALS 5,0.144424,0.141145,0.074903,0.028455
5,ALS 6,0.119779,0.117709,0.064514,0.023684
6,ALS 7,0.140104,0.134464,0.074453,0.029743
7,ALS 8,0.133569,0.130753,0.067657,0.02614
8,ALS 9,0.129028,0.125557,0.069247,0.02685
9,ALS 10,0.119595,0.118982,0.06719,0.024682
