<i>Copyright (c) Microsoft Corporation. All rights reserved.</i>

<i>Licensed under the MIT License.</i>

# Running ALS on MovieLens (PySpark)

Matrix factorization by [ALS](https://spark.apache.org/docs/latest/api/python/_modules/pyspark/ml/recommendation.html#ALS) (Alternating Least Squares) is a well known collaborative filtering algorithm.

This notebook provides an example of how to utilize and evaluate ALS PySpark ML (DataFrame-based API) implementation, meant for large-scale distributed datasets. We use a smaller dataset in this example to run ALS efficiently on multiple cores of a [Data Science Virtual Machine](https://azure.microsoft.com/en-gb/services/virtual-machines/data-science-virtual-machines/).

**Note**: This notebook requires a PySpark environment to run properly. Please follow the steps in [SETUP.md](https://github.com/Microsoft/Recommenders/blob/master/SETUP.md#dependencies-setup) to install the PySpark environment.

In [14]:
# set the environment path to find Recommenders
import sys
sys.path.append("../../")
import time
import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType, ArrayType

from reco_utils.dataset import movielens
from reco_utils.common.notebook_utils import is_jupyter
from reco_utils.dataset.spark_splitters import spark_random_split
from reco_utils.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from reco_utils.common.spark_utils import start_or_get_spark

print("System version: {}".format(sys.version))
print("Spark version: {}".format(pyspark.__version__))


System version: 3.7.4 (default, Aug 13 2019, 15:17:50) 
[Clang 4.0.1 (tags/RELEASE_401/final)]
Spark version: 2.4.5


Set the default parameters.

In [15]:
# top k items to recommend
TOP_K = 10

# Select MovieLens data size: 100k, 1m, 10m, or 20m
MOVIELENS_DATA_SIZE = '10k'

### 0. Set up Spark context

The following settings work well for debugging locally on VM - change when running on a cluster. We set up a giant single executor with many threads and specify memory cap. 

In [16]:
# the following settings work well for debugging locally on VM - change when running on a cluster
# set up a giant single executor with many threads and specify memory cap

spark = start_or_get_spark(memory="16g")

### 1. Download the Cell Phone dataset

In [17]:
#Define custom schema
schema = StructType(
    (
        StructField("asin", StringType()),
        StructField("overall", FloatType()),
        StructField("reviewerID", StringType()),
        StructField("reviewText", StringType()),
        StructField("summary", StringType())
    )
)

# data = movielens.load_spark_df(spark, size=MOVIELENS_DATA_SIZE, schema=schema)
data = spark.read.schema(schema).json("data/Cell_Phones_and_Accessories.json").limit(20000)

In [18]:
data.show()

+----------+-------+--------------+--------------------+--------------------+
|      asin|overall|    reviewerID|          reviewText|             summary|
+----------+-------+--------------+--------------------+--------------------+
|098949232X|    5.0|A1GG51FWU0XQYH|If your into spac...|          Five Stars|
|098949232X|    5.0| AVFIDS9RK38E0|   Awesome pictures!|          Five Stars|
|098949232X|    5.0|A2S4AVR5SJ7KMI|Great wall art an...|          Five Stars|
|098949232X|    5.0| AEMMMVOR9BFLI|As always, it is ...|I love it. I buy ...|
|098949232X|    5.0|A2DZXMBTY7KLYP|This is a fantast...|     Great Calendar.|
|098949232X|    5.0| AUD367H6I25FX|It's great, I get...|    Awesome Calendar|
|098949232X|    5.0|A3K6KUWAZ6SWHE|2015 will be my 3...|     Great calendar!|
|098949232X|    5.0|A1FPEO0ME9G4VY|My son loves this...|          Five Stars|
|098949232X|    5.0|A20AOY7UXJA710|A great calendar ...|Great Calendar fo...|
|098949232X|    5.0|A222LHL23AH0GK|Lots and lots of ...|        

In [19]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
#https://stackoverflow.com/questions/48279056/how-to-create-row-index-for-a-spark-dataframe-using-window-partionby
w = Window().partitionBy().orderBy("asin")
data = data.withColumn('productIndex',F.rank().over(w))

w = Window().partitionBy().orderBy("reviewerID")
data = data.withColumn('reviewerIndex',F.rank().over(w))
data.show()

+----------+-------+--------------------+--------------------+--------------------+------------+-------------+
|      asin|overall|          reviewerID|          reviewText|             summary|productIndex|reviewerIndex|
+----------+-------+--------------------+--------------------+--------------------+------------+-------------+
|B00009WCAP|    1.0|A0265436JMR91F9LHBXT|This did not help...|This product is h...|        5748|            1|
|B0002SYC5O|    1.0|A0564474RQMYYH3H95UC|they will not sta...|                Crap|        9151|            2|
|8199900164|    5.0|A0617213KGAVUMXH6NK4|Not only does thi...|Efficient and fai...|         966|            3|
|B0006TIA8Y|    5.0|A0651739GZEV56UR6T54|great product wit...|Best  USB extende...|       13097|            4|
|7508492919|    5.0|A0755549VZ3OU6OE9EHO|perfect exactly w...|             SO cute|         301|            5|
|B00006JPBY|    4.0|A09781781CO6UDP1IQGW|Should note the s...|          Convenient|        5010|            6|
|

## NLTK

In [1]:
import nltk
import random
from nltk.classify.scikitlearn import SklearnClassifier
import pickle
from sklearn.naive_bayes import MultinomialNB, BernoulliNB
from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.svm import SVC, LinearSVC, NuSVC
from nltk.classify import ClassifierI
from statistics import mode
from nltk.tokenize import word_tokenize
import re

### 2. Split the data using the Spark random splitter provided in utilities

In [13]:
# Dropping unnecessary columns
data = data.select([c for c in data.columns if c in ['productIndex','reviewerIndex','overall', 'reviewText', 'summary']])
data.show()

+-------+------------+-------------+
|overall|productIndex|reviewerIndex|
+-------+------------+-------------+
|    1.0|        5748|            1|
|    1.0|        9151|            2|
|    5.0|         966|            3|
|    5.0|       13097|            4|
|    5.0|         301|            5|
|    4.0|        5010|            6|
|    1.0|       11437|            7|
|    1.0|        2172|            8|
|    5.0|       16822|            9|
|    5.0|       14702|           10|
|    5.0|        4814|           11|
|    2.0|       15869|           12|
|    1.0|        5748|           13|
|    1.0|       14702|           14|
|    5.0|        3756|           15|
|    5.0|       13097|           16|
|    4.0|        1083|           17|
|    2.0|        2246|           18|
|    5.0|        2321|           19|
|    5.0|        5309|           20|
+-------+------------+-------------+
only showing top 20 rows



In [8]:
train, test = spark_random_split(data, ratio=0.75, seed=420) #;)
print ("N train", train.cache().count())
print ("N test", test.cache().count())

N train 15029
N test 4971


### 3. Train the ALS model on the training data, and get the top-k recommendations for our testing data

To predict movie ratings, we use the rating data in the training set as users' explicit feedback. The hyperparameters used in building the model are referenced from [here](http://mymedialite.net/examples/datasets.html). We do not constrain the latent factors (`nonnegative = False`) in order to allow for both positive and negative preferences towards movies.
Timing will vary depending on the machine being used to train.

In [9]:
header = {
    "itemCol": "productIndex",
    "userCol": "reviewerIndex",
    "ratingCol": "overall",
}


als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    regParam=0.05,
    coldStartStrategy='drop',
#     nonnegative=TRUE,
    seed=42,
    #TODO figure out what this does
    **header
)

In [10]:
start_time = time.time()
model = als.fit(train)
train_time = time.time() - start_time
print("Took {} seconds for training.".format(train_time))

Took 7.106967210769653 seconds for training.


In the movie recommendation use case, recommending movies that have been rated by the users do not make sense. Therefore, the rated movies are removed from the recommended items.

In order to achieve this, we recommend all movies to all users, and then remove the user-movie pairs that exist in the training dataset.

In [11]:
start_time = time.time()

# Get the cross join of all user-item pairs and score them.
users = train.select('productIndex').distinct()
items = train.select('reviewerIndex').distinct()
user_item = users.crossJoin(items)
dfs_pred = model.transform(user_item)

# Remove seen items.
dfs_pred_exclude_train = dfs_pred.alias("pred").join(
    train.alias("train"),
    (dfs_pred['productIndex'] == train['productIndex']) & (dfs_pred['reviewerIndex'] == train['reviewerIndex']),
    how='outer'
)

top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train["train.overall"].isNull()) \
    .select('pred.' + 'productIndex', 'pred.' + 'reviewerIndex', 'pred.' + "prediction")

# In Spark, transformations are lazy evaluation
# Use an action to force execute and measure the test time 
top_all.cache().count()

test_time = time.time() - start_time
print("Took {} seconds for prediction.".format(test_time))

Took 61.163707971572876 seconds for prediction.


In [12]:
top_all.show()

+------------+-------------+-----------+
|productIndex|reviewerIndex| prediction|
+------------+-------------+-----------+
|           1|          587|  -3.286294|
|           1|         1208| 0.36795348|
|           1|         1348| -3.2554672|
|           1|         1677|-0.38888377|
|           1|         1702| 0.78687894|
|           1|         1720|  0.7981349|
|           1|         2086| -1.1226474|
|           1|         2324|  -3.286294|
|           1|         2483|  -3.905129|
|           1|         2667| 0.38557625|
|           1|         3452|  -3.286294|
|           1|         3468| -1.4968638|
|           1|         3668|  -3.286294|
|           1|         4136|-0.38994172|
|           1|         4949|-0.26167196|
|           1|         5501| -1.1960809|
|           1|         5562| 0.23899707|
|           1|         5668| -0.7484319|
|           1|         5957|  -3.905129|
|           1|         6031| -0.4864055|
+------------+-------------+-----------+
only showing top

### 4. Evaluate how well ALS performs

In [13]:
rank_eval = SparkRankingEvaluation(test, top_all, k = TOP_K, col_user="productIndex", col_item="reviewerIndex", 
                                    col_rating="overall", col_prediction="prediction", 
                                    relevancy_method="top_k")

In [14]:
print("Model:\tALS",
      "Top K:\t%d" % rank_eval.k,
      "MAP:\t%f" % rank_eval.map_at_k(),
      "NDCG:\t%f" % rank_eval.ndcg_at_k(),
      "Precision@K:\t%f" % rank_eval.precision_at_k(),
      "Recall@K:\t%f" % rank_eval.recall_at_k(), sep='\n')

Model:	ALS
Top K:	10
MAP:	0.000375
NDCG:	0.000979
Precision@K:	0.000901
Recall@K:	0.000901


### 5. Evaluate rating prediction

In [15]:
# Generate predicted ratings.
prediction = model.transform(test)
prediction.cache().show()

+-------+------------+-------------+-----------+
|overall|productIndex|reviewerIndex| prediction|
+-------+------------+-------------+-----------+
|    5.0|       16224|        10377|  4.9629183|
|    3.0|       16224|         8061| -2.7615619|
|    4.0|        5529|          631|-0.20651442|
|    4.0|        5529|        17054| 0.08290452|
|    4.0|        5529|         2132|  -1.548662|
|    4.0|       14410|        14817| 0.50653875|
|    2.0|       14410|         4883| 0.20261544|
|    4.0|        8053|         1876|   2.069514|
|    3.0|        8053|        10628|-0.63113046|
|    1.0|        8053|         5745|  1.9729208|
|    3.0|       14370|         7448|  4.1374593|
|    4.0|        4227|        11382| -2.1725307|
|    4.0|       11437|         6912|  2.0627642|
|    2.0|       11437|        10047|  1.3751763|
|    4.0|       13988|         2927|   3.784785|
|    5.0|        8131|         9362|  1.2370882|
|    3.0|       16543|         2142|  3.9550729|
|    1.0|       1654

In [23]:
rating_eval = SparkRatingEvaluation(test, prediction, col_user="productIndex", col_item="reviewerIndex", 
                                    col_rating="overall", col_prediction="prediction")

print("Model:\tALS rating prediction",
      "RMSE:\t%f" % rating_eval.rmse(),
      "MAE:\t%f" % rating_eval.mae(),
      "Explained variance:\t%f" % rating_eval.exp_var(),
      "R squared:\t%f" % rating_eval.rsquared(), sep='\n')

Model:	ALS rating prediction
RMSE:	3.371438
MAE:	2.827619
Explained variance:	-1.371053
R squared:	-5.381247


In [25]:
if is_jupyter():
    # Record results with papermill for tests
    import papermill as pm
    pm.record("map", rank_eval.map_at_k())
    pm.record("ndcg", rank_eval.ndcg_at_k())
    pm.record("precision", rank_eval.precision_at_k())
    pm.record("recall", rank_eval.recall_at_k())
    pm.record("rmse", rating_eval.rmse())
    pm.record("mae", rating_eval.mae())
    pm.record("exp_var", rating_eval.exp_var())
    pm.record("rsquared", rating_eval.rsquared())
    pm.record("train_time", train_time)
    pm.record("test_time", test_time)

AttributeError: module 'papermill' has no attribute 'record'

In [26]:
# cleanup spark instance
spark.stop()