# Running ALS on MovieLens (PySpark)
Matrix factorization by **ALS** (Alternating Least Squares) is a well-known collaborative filtering algorithm.   
This notebook shows how to use and evaluate the ALS implementation in PySpark ML, which is designed for large, distributed datasets. The example uses a small dataset so that ALS runs efficiently on the multiple cores of a *Data Science Virtual Machine*.   
*Note: This notebook requires a PySpark environment to run properly.*

### Set the environment path to find Recommenders

In [3]:
import sys, pyspark, warnings
warnings.simplefilter('ignore', category=FutureWarning)
import pyspark.sql.functions as F
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType
from recommenders.utils.timer import Timer
from recommenders.utils.notebook_utils import is_jupyter
from recommenders.utils.spark_utils import start_or_get_spark
from recommenders.datasets import movielens
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation

print('System version : {}'.format(sys.version))
print('Spark version : {}'.format(pyspark.__version__))

System version : 3.7.13 (default, Mar 29 2022, 02:18:16) 
[GCC 7.5.0]
Spark version : 3.2.1


### Set the default parameters

In [4]:
# Top_k = number of top items to recommend
Top_k = 10

# Select MovieLens data size : 100k, 1m, 10m or 20m
Movielens_Data_Size = '100k'

# Column names for the dataset
Col_User = 'UserId'
Col_Item = 'MovieId'
Col_Rating = 'Rating'
Col_Timestamp = 'Timestamp'


### 0. Set up Spark context
The following settings work well for debugging locally on a VM; change them when running on a cluster. This notebook sets up a single large executor with many threads and specifies a memory cap.

In [5]:
spark = start_or_get_spark('ALS PySpark', memory='16g')
spark.conf.set('spark.sql.analyzer.failAmbiguousSelfJoin', 'false')

22/06/08 14:03:18 WARN Utils: Your hostname, qwer-ND resolves to a loopback address: 127.0.1.1; using 10.140.50.75 instead (on interface enp3s0)
22/06/08 14:03:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/08 14:03:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### 1. Download the MovieLens dataset

In [6]:
# The DataFrame-based API for ALS only supports integers for user and item ids.
schema = StructType((StructField(Col_User, IntegerType()),
                    StructField(Col_Item, IntegerType()),
                    StructField(Col_Rating, FloatType()),
                    StructField(Col_Timestamp, LongType())
                    ))
data = movielens.load_spark_df(spark, size=Movielens_Data_Size, schema=schema)
data.show()


100%|█████████████████████████████████████| 4.81k/4.81k [00:02<00:00, 2.04kKB/s]


+------+-------+------+---------+
|UserId|MovieId|Rating|Timestamp|
+------+-------+------+---------+
|   196|    242|   3.0|881250949|
|   186|    302|   3.0|891717742|
|    22|    377|   1.0|878887116|
|   244|     51|   2.0|880606923|
|   166|    346|   1.0|886397596|
|   298|    474|   4.0|884182806|
|   115|    265|   2.0|881171488|
|   253|    465|   5.0|891628467|
|   305|    451|   3.0|886324817|
|     6|     86|   3.0|883603013|
|    62|    257|   2.0|879372434|
|   286|   1014|   5.0|879781125|
|   200|    222|   5.0|876042340|
|   210|     40|   3.0|891035994|
|   224|     29|   3.0|888104457|
|   303|    785|   3.0|879485318|
|   122|    387|   5.0|879270459|
|   194|    274|   2.0|879539794|
|   291|   1042|   4.0|874834944|
|   234|   1184|   2.0|892079237|
+------+-------+------+---------+
only showing top 20 rows
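The integer-id requirement noted in the cell above matters for datasets whose ids are strings. A minimal sketch of one way to handle that, in plain Python rather than Spark: map each distinct raw id to a contiguous integer and keep the inverse mapping so predictions can be translated back. The helper `encode_ids` and the sample ids are hypothetical and not part of the notebook.

```python
def encode_ids(raw_ids):
    """Map each distinct raw id to a contiguous integer, in first-seen order.

    Hypothetical helper for illustration; not part of Spark or recommenders.
    """
    id_to_index = {}
    for raw in raw_ids:
        if raw not in id_to_index:
            id_to_index[raw] = len(id_to_index)
    # Inverse mapping, used to translate integer ids back to the raw ids
    index_to_id = {index: raw for raw, index in id_to_index.items()}
    return id_to_index, index_to_id


# Made-up string ids standing in for a dataset that is not integer-keyed
raw_users = ['u-ann', 'u-bob', 'u-ann', 'u-cho']
user_to_index, index_to_user = encode_ids(raw_users)
encoded_users = [user_to_index[u] for u in raw_users]
print(encoded_users)
```

MovieLens ids are already integers, so the notebook itself does not need this step. On Spark, `pyspark.ml.feature.StringIndexer` is commonly used for the same purpose.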



### 2. Split the data using the Spark random splitter provided in utilities

In [7]:
train, test = spark_random_split(data, ratio=0.75, seed=27)
print('N train', train.cache().count())
print('N test', test.cache().count())

N train 74920
N test 25080


`spark_random_split` is roughly `data.randomSplit` combined with `process_split_ratio`. See the source:
https://microsoft-recommenders.readthedocs.io/en/latest/_modules/recommenders/datasets/spark_splitters.html?highlight=spark_random_split
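As a rough illustration of what a ratio-based random split does, the sketch below assigns each row to train or test independently with probability `ratio`. It is a plain-Python stand-in with a hypothetical function name, not the `spark_random_split` implementation. Because each row is drawn independently, the split sizes are only approximately proportional to the ratio, which is consistent with the 74920 / 25080 counts above rather than an exact 75000 / 25000.

```python
import random


def random_split(rows, ratio=0.75, seed=27):
    """Send each row to train with probability `ratio`, otherwise to test.

    Hypothetical helper for illustration only.
    """
    rng = random.Random(seed)
    train_rows, test_rows = [], []
    for row in rows:
        if rng.random() < ratio:
            train_rows.append(row)
        else:
            test_rows.append(row)
    return train_rows, test_rows


# 100,000 placeholder rows, matching the size of the 100k dataset
rows = list(range(100_000))
train_rows, test_rows = random_split(rows)
print(len(train_rows), len(test_rows))
```

Fixing the seed makes the split reproducible, which is why the notebook passes `seed=27`.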

### 3. Train the ALS model on the training data and get the top-k recommendations for our testing data
To predict movie ratings, we use the rating data in the training set as the users' explicit feedback. We do not constrain the latent factors (`nonnegative=False`) in order to allow for both positive and negative preferences towards movies.    
For movie recommendation, recommending movies that a user has already rated makes no sense, so rated movies must be removed. To do this, we score every movie for every user and then remove the user-movie pairs that exist in the training dataset.

In [8]:
header = {'userCol':Col_User, 'itemCol':Col_Item, 'ratingCol':Col_Rating}

als = ALS(rank=10, maxIter=15, implicitPrefs=False,
         regParam=0.05, coldStartStrategy='drop', nonnegative=False,
         seed=27, **header)

with Timer() as train_time:
    model = als.fit(train)
print('Took {} seconds for training'.format(train_time.interval))

22/06/08 14:47:14 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/06/08 14:47:14 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/06/08 14:47:14 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


Took 2.284715439000138 seconds for training
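To make the roles of `rank`, `regParam`, and `maxIter` more concrete, here is a minimal NumPy sketch of plain explicit-feedback ALS: user factors are refit by ridge regression with item factors held fixed, then the roles swap. This is an illustrative toy under stated assumptions, not Spark's implementation (Spark's ALS is distributed and, per its documentation, scales the regularization by the number of ratings). The function name and the toy ratings are made up.

```python
import numpy as np


def als_explicit(ratings, n_users, n_items, rank=2, reg=0.05, n_iters=15, seed=27):
    """Plain ALS on (user, item, rating) triples with 0-based ids.

    Returns user factors, item factors, and the regularized loss after each sweep.
    """
    rng = np.random.default_rng(seed)
    user_f = rng.normal(scale=0.1, size=(n_users, rank))
    item_f = rng.normal(scale=0.1, size=(n_items, rank))

    # Group observed ratings by user and by item once, up front
    by_user, by_item = {}, {}
    for u, i, r in ratings:
        by_user.setdefault(u, []).append((i, r))
        by_item.setdefault(i, []).append((u, r))

    def ridge_solve(fixed, observed):
        # Regularized least-squares fit of one factor vector, other side fixed
        idx = [j for j, _ in observed]
        y = np.array([r for _, r in observed], dtype=float)
        a = fixed[idx]
        return np.linalg.solve(a.T @ a + reg * np.eye(rank), a.T @ y)

    def loss():
        # Squared error on observed ratings plus L2 penalty on all factors
        sq = sum((r - user_f[u] @ item_f[i]) ** 2 for u, i, r in ratings)
        return float(sq + reg * ((user_f ** 2).sum() + (item_f ** 2).sum()))

    history = []
    for _ in range(n_iters):
        for u, obs in by_user.items():   # update users, items held fixed
            user_f[u] = ridge_solve(item_f, obs)
        for i, obs in by_item.items():   # update items, users held fixed
            item_f[i] = ridge_solve(user_f, obs)
        history.append(loss())
    return user_f, item_f, history


# Made-up toy ratings: 4 users x 3 items, with some pairs unobserved
toy = [(0, 0, 5.0), (0, 1, 3.0), (1, 0, 4.0), (1, 2, 1.0),
       (2, 1, 2.0), (2, 2, 5.0), (3, 0, 1.0), (3, 2, 4.0)]
user_f, item_f, history = als_explicit(toy, n_users=4, n_items=3)
print(history[0], history[-1])
```

Each half-step solves its subproblem exactly, so the regularized loss in `history` never increases from one sweep to the next. A predicted rating is the dot product of a user vector and an item vector; since the factors are unconstrained, individual entries may be negative, which mirrors the `nonnegative=False` choice above.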


In [66]:
with Timer() as test_time:
    # Build every user-item pair and score it.
    users = train.select(Col_User).distinct()
    items = train.select(Col_Item).distinct()    
    user_item = users.crossJoin(items)
    dfs_pred = model.transform(user_item)
    
    # Remove items the user has already rated in the training set
    dfs_pred_exclude_train = dfs_pred.alias('pred').join(
        train.alias('train'),
        (dfs_pred[Col_User] == train[Col_User]) & (dfs_pred[Col_Item] == train[Col_Item]),
        how='outer')
    
    # Keep only pairs with no matching training rating
    top_all = dfs_pred_exclude_train.filter(
        dfs_pred_exclude_train[f'train.{Col_Rating}'].isNull()
    ).select('pred.' + Col_User, 'pred.' + Col_Item, 'pred.' + 'prediction')
    
    # Use an action to force execute and measure the test time
    top_all.cache().count()
    
print('Took {} seconds for prediction'.format(test_time.interval))

22/06/08 15:21:37 WARN Column: Constructing trivially true equals predicate, 'UserId#0 = UserId#0'. Perhaps you need to use aliases.

Took 5.325648283996998 seconds for prediction




In [67]:
top_all.show()

+------+-------+----------+
|UserId|MovieId|prediction|
+------+-------+----------+
|     1|    587| 3.2906096|
|     1|    869|  2.681578|
|     1|   1208| 1.9780601|
|     1|   1348|  0.792479|
|     1|   1357| 0.7010497|
|     2|     80| 1.9774098|
|     2|    472| 2.9985087|
|     2|    582|  3.589076|
|     2|    838| 2.5580637|
|     2|    975|  3.080968|
|     2|   1260|  1.611711|
|     2|   1381| 3.6324484|
|     2|   1530| 1.8517529|
|     3|     22| 2.4254963|
|     3|     57| 2.9116383|
|     3|     89| 3.5991197|
|     3|    367| 2.3351898|
|     3|   1091| 0.7451291|
|     3|   1167| 0.7089374|
|     3|   1499| 2.7345362|
+------+-------+----------+
only showing top 20 rows
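The "score everything, then drop what was already rated" step can be pictured on plain Python data. The sketch below is a hypothetical, non-Spark illustration: it filters out the seen user-item pairs and keeps the `k` highest-scoring remaining items per user. The notebook itself keeps all unseen pairs in `top_all` and passes `k=Top_k` to the evaluator instead of truncating here. The function name and scores are made up.

```python
from collections import defaultdict


def top_k_unseen(scores, seen, k):
    """Return {user: [item, ...]} with the k best-scoring items not in `seen`.

    scores: {(user, item): predicted rating}
    seen:   set of (user, item) pairs present in the training data
    """
    per_user = defaultdict(list)
    for (user, item), score in scores.items():
        if (user, item) not in seen:          # drop already-rated pairs
            per_user[user].append((item, score))
    return {
        user: [item for item, _ in
               sorted(cands, key=lambda c: c[1], reverse=True)[:k]]
        for user, cands in per_user.items()
    }


# Made-up predictions for 2 users x 3 items
scores = {(1, 10): 4.2, (1, 20): 3.1, (1, 30): 4.8,
          (2, 10): 2.5, (2, 20): 3.9, (2, 30): 1.0}
seen = {(1, 30), (2, 20)}
recs = top_k_unseen(scores, seen, k=2)
print(recs)
```

Note that item 30 has the highest score for user 1 but is excluded because that pair appears in `seen`.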



### 4. Evaluate how well ALS performs

In [68]:
rank_eval = SparkRankingEvaluation(
    test, top_all, k=Top_k,
    col_user=Col_User, col_item=Col_Item, col_rating=Col_Rating,
    col_prediction='prediction', relevancy_method='top_k')

                                                                                

In [69]:
print("Model:\tALS",
      "Top K:\t%d" % rank_eval.k,
      "MAP:\t%f" % rank_eval.map_at_k(),
      "NDCG:\t%f" % rank_eval.ndcg_at_k(),
      "Precision@K:\t%f" % rank_eval.precision_at_k(),
      "Recall@K:\t%f" % rank_eval.recall_at_k(), sep='\n')

Model:	ALS
Top K:	10
MAP:	0.004614
NDCG:	0.044075
Precision@K:	0.047402
Recall@K:	0.016022
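For readers unfamiliar with these ranking metrics, the following sketch computes precision, recall, and NDCG at `k` for a single user using common textbook definitions with binary relevance. It is an assumption-laden illustration, not the `SparkRankingEvaluation` code, and the library may differ in details such as how it averages over users or handles users with fewer than `k` recommendations. The function name and example ids are made up.

```python
import math


def precision_recall_ndcg_at_k(recommended, relevant, k):
    """Single-user ranking metrics with binary relevance.

    recommended: item ids ordered from best to worst predicted score
    relevant:    set of item ids the user actually interacted with in the test set
    """
    top = recommended[:k]
    hits = [1 if item in relevant else 0 for item in top]
    precision = sum(hits) / k
    recall = sum(hits) / len(relevant) if relevant else 0.0
    # DCG discounts a hit at 0-based position `pos` by log2(pos + 2)
    dcg = sum(h / math.log2(pos + 2) for pos, h in enumerate(hits))
    ideal = sum(1 / math.log2(pos + 2) for pos in range(min(k, len(relevant))))
    ndcg = dcg / ideal if ideal else 0.0
    return precision, recall, ndcg


# Made-up example: one of the top-3 recommendations is relevant
precision, recall, ndcg = precision_recall_ndcg_at_k(
    recommended=[10, 20, 30, 40], relevant={20, 40, 50}, k=3)
print(precision, recall, ndcg)
```

In this example item 40 is relevant but sits at position 4, so it does not count at `k=3`.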


### 5. Evaluate rating prediction

In [70]:
prediction = model.transform(test)
prediction.cache().show()

+------+-------+------+---------+----------+
|UserId|MovieId|Rating|Timestamp|prediction|
+------+-------+------+---------+----------+
|   251|    148|   2.0|886272547| 2.7677028|
|   642|    148|   5.0|885604163| 4.0739584|
|    26|    148|   3.0|891377540| 2.2995055|
|    44|    148|   4.0|878346946|  3.126897|
|   916|    148|   2.0|880843892| 2.1912308|
|   236|    148|   4.0|890117028|    2.2681|
|   372|    148|   5.0|876869915|  3.295108|
|   618|    148|   3.0|891309670| 2.5100422|
|   435|    148|   3.0|884133284| 3.0875416|
|     1|    148|   2.0|875240799|  2.741537|
|   178|    148|   4.0|882824325|   3.82051|
|   120|    148|   3.0|889490499| 3.9990516|
|   347|    148|   3.0|881652888|  2.958974|
|   717|    148|   3.0|884642958|   3.07442|
|   391|    148|   3.0|877400062|  2.133016|
|   234|    148|   3.0|891228196| 2.1961553|
|   938|    148|   3.0|891356500| 3.4448173|
|   181|    148|   2.0|878963204| 2.3005846|
|   893|    148|   3.0|874829287|  3.501895|
|   396|  ... (output truncated)

In [72]:
rating_eval = SparkRatingEvaluation(
    test, prediction,
    col_user=Col_User, col_item=Col_Item, col_rating=Col_Rating,
    col_prediction='prediction')

print("Model:\tALS rating prediction",
      "RMSE:\t%f" % rating_eval.rmse(),
      "MAE:\t%f" % rating_eval.mae(),
      "Explained variance:\t%f" % rating_eval.exp_var(),
      "R squared:\t%f" % rating_eval.rsquared(), sep='\n')



Model:	ALS rating prediction
RMSE:	0.971747
MAE:	0.756063
Explained variance:	0.264629
R squared:	0.258666
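The four rating metrics can be written out in a few lines of plain Python. The sketch below uses standard textbook definitions and made-up numbers. It is not the `SparkRatingEvaluation` code, and the function name is hypothetical.

```python
import math


def rating_metrics(actual, predicted):
    """RMSE, MAE, explained variance, and R squared for paired ratings."""
    n = len(actual)
    residuals = [a - p for a, p in zip(actual, predicted)]
    mean_actual = sum(actual) / n
    mean_resid = sum(residuals) / n
    ss_res = sum(e ** 2 for e in residuals)                   # residual sum of squares
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)      # total sum of squares
    ss_resid_centered = sum((e - mean_resid) ** 2 for e in residuals)
    return {
        'rmse': math.sqrt(ss_res / n),
        'mae': sum(abs(e) for e in residuals) / n,
        # Ratio of variances; the 1/n factors cancel, so sums are used directly
        'exp_var': 1 - ss_resid_centered / ss_tot,
        'rsquared': 1 - ss_res / ss_tot,
    }


# Made-up ratings: two exact predictions and one that overshoots by 2
metrics = rating_metrics([1.0, 2.0, 3.0], [1.0, 2.0, 5.0])
print(metrics)
```

Explained variance and R squared coincide only when the residuals average to zero. Otherwise explained variance is the larger of the two, which matches the small gap between the two values reported above.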


In [73]:
# Record the metrics with scrapbook so that automated notebook runs can read them back
if is_jupyter():
    import papermill as pm
    import scrapbook as sb
    sb.glue("map", rank_eval.map_at_k())
    sb.glue("ndcg", rank_eval.ndcg_at_k())
    sb.glue("precision", rank_eval.precision_at_k())
    sb.glue("recall", rank_eval.recall_at_k())
    sb.glue("rmse", rating_eval.rmse())
    sb.glue("mae", rating_eval.mae())
    sb.glue("exp_var", rating_eval.exp_var())
    sb.glue("rsquared", rating_eval.rsquared())
    sb.glue("train_time", train_time.interval)
    sb.glue("test_time", test_time.interval)

In [74]:
spark.stop()