In [1]:
import sys
import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

from recommenders.utils.timer import Timer
from recommenders.datasets import movielens
from recommenders.utils.notebook_utils import is_jupyter
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from recommenders.utils.spark_utils import start_or_get_spark

# Report the interpreter and PySpark versions this notebook was executed with.
version_report = (
    ("System version: {}", sys.version),
    ("Spark version: {}", pyspark.__version__),
)
for template, version in version_report:
    print(template.format(version))

System version: 3.9.15 | packaged by conda-forge | (main, Nov 22 2022, 08:48:25) 
[Clang 14.0.6 ]
Spark version: 3.3.1


In [2]:
# Number of top-ranked items per user to recommend; passed as `k` to the
# ranking evaluation later in the notebook.
TOP_K = 100

<h1>Set up spark context</h1>

In [3]:
# The following settings work well for debugging locally on a VM - change them when running on a cluster.
# Set up a giant single executor with many threads and specify the memory cap.
spark = start_or_get_spark("ALS PySpark", memory="10g")
# Turn off the analyzer's "ambiguous self-join" failure for this session.
# NOTE(review): presumably required because later joins combine DataFrames that
# share lineage with `train` - confirm it is still needed before removing it.
spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/18 22:24:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Directory (relative to the current working directory) used for Spark checkpoints.
# NOTE(review): presumably set so the 100-iteration ALS fit can checkpoint - confirm.
spark.sparkContext.setCheckpointDir('./checkpoints')

<h1>Load the MovieLens dataset</h1>

In [5]:
# Names of the user, item, rating and timestamp columns, in that order.
# They are used for the CSV schema, the ALS estimator and the evaluators.
COL_USER, COL_ITEM, COL_RATING, COL_TIMESTAMP = (
    "UserId",
    "ItemId",
    "Rating",
    "Timestamp",
)

In [6]:
# Explicit schema for the ratings files.
# Note: the DataFrame-based API for ALS currently only supports integers for
# user and item ids, so both id columns are declared as IntegerType.
rating_fields = (
    StructField(COL_USER, IntegerType()),
    StructField(COL_ITEM, IntegerType()),
    StructField(COL_RATING, FloatType()),
    StructField(COL_TIMESTAMP, LongType()),
)
schema = StructType(rating_fields)

# Both splits are comma-separated files with a header row and the same schema.
train = (
    spark.read.format("csv")
    .schema(schema)
    .options(header="true", sep=',')
    .load('../data/sas/train.csv')
)
test = (
    spark.read.format("csv")
    .schema(schema)
    .options(header="true", sep=',')
    .load('../data/sas/test.csv')
)

In [7]:
# Cache each split and force it to materialise with count(), reporting the row counts.
for label, split in (("N train", train), ("N test", test)):
    print(label, split.cache().count())

                                                                                

N train 67791
N test 22487


<h1>Train ALS</h1>

In [10]:
# Column-name arguments handed to the ALS estimator.
header = dict(userCol=COL_USER, itemCol=COL_ITEM, ratingCol=COL_RATING)

# Explicit-feedback ALS (implicitPrefs=False) with a fixed seed for reproducibility.
# coldStartStrategy='drop' is passed so that NaN predictions are dropped rather than
# kept in the output (see the Spark ALS documentation).
als = ALS(
    **header,
    rank=10,
    maxIter=100,
    regParam=0.05,
    implicitPrefs=False,
    nonnegative=False,
    coldStartStrategy='drop',
    seed=42,
)

In [11]:
# Fit the ALS model on the training split and time the fit.
with Timer() as train_time:
    model = als.fit(train)

training_report = "Took {} seconds for training."
print(training_report.format(train_time.interval))

Took 8.455119125000003 seconds for training.


- In the movie recommendation use case, recommending movies that a user has already rated does not make sense, because the user has already seen them.

- Therefore, the rated movies are removed from the recommended items.

- In order to achieve this, we recommend all movies to all users, and then remove the user-movie pairs that exist in the training dataset.

In [12]:
with Timer() as test_time:

    # Build every (user, item) pair that can be formed from the training data
    # and score all of them with the fitted model.
    users = train.select(COL_USER).distinct()
    items = train.select(COL_ITEM).distinct()
    user_item = users.crossJoin(items)
    dfs_pred = model.transform(user_item)

    # Remove seen items.
    # `dfs_pred` is derived from `train`, so `dfs_pred[COL_USER]` and
    # `train[COL_USER]` resolve to the very same attribute; comparing them makes
    # Spark warn about a "trivially true equals predicate". Referring to the
    # columns through the DataFrame aliases keeps both sides of the join
    # condition unambiguous.
    # A left join is sufficient: only rows coming from the prediction side can
    # become recommendations, so rows that exist only in `train` are not needed.
    dfs_pred_exclude_train = dfs_pred.alias("pred").join(
        train.alias("train"),
        (F.col(f"pred.{COL_USER}") == F.col(f"train.{COL_USER}"))
        & (F.col(f"pred.{COL_ITEM}") == F.col(f"train.{COL_ITEM}")),
        how="left",
    )

    # After the join, train.<rating> is null exactly for the pairs that have no
    # counterpart in the training data. These are the entries of interest to us.
    top_all = dfs_pred_exclude_train.filter(
        F.col(f"train.{COL_RATING}").isNull()
    ).select(f"pred.{COL_USER}", f"pred.{COL_ITEM}", "pred.prediction")

    # Spark transformations are evaluated lazily, so use an action to force
    # execution; otherwise the timer would not measure the prediction work.
    top_all.cache().count()

print("Took {} seconds for prediction.".format(test_time.interval))

23/03/18 22:25:06 WARN Column: Constructing trivially true equals predicate, 'UserId#0 = UserId#0'. Perhaps you need to use aliases.




Took 16.840148750000004 seconds for prediction.


                                                                                

In [16]:
# Preview the first rows of the predictions for unseen (user, item) pairs.
top_all.show()

+------+------+----------+
|UserId|ItemId|prediction|
+------+------+----------+
|     0|   190|0.91622233|
|     0|   632|0.90225005|
|     0|   716| 1.0188911|
|     0|   915| 1.1059035|
|     0|  1218| 1.0256288|
|     0|  1237| 1.2798171|
|     0|  1265|0.96745914|
|     0|  1327| 1.6871678|
|     0|  1478| 0.9677968|
|     0|  1578|0.99901414|
|     0|  1761|0.98501194|
|     0|  1790| 1.2123132|
|     0|  1866|0.93400866|
|     0|  2248| 1.0757623|
|     1|   587| 3.0049047|
|     1|   869| 1.0433747|
|     1|  1208| 1.8330684|
|     1|  1348|  4.342155|
|     1|  1357| 1.3046756|
|     1|  1677| 1.5294855|
+------+------+----------+
only showing top 20 rows



<h1>Evaluate how well ALS performs</h1>

In [17]:
# The ranking metrics (Top-K, NDCG, etc.) need all the predictions, so the
# evaluator is given the complete `top_all` frame.
ranking_eval_options = dict(
    k=TOP_K,
    col_user=COL_USER,
    col_item=COL_ITEM,
    col_rating=COL_RATING,
    col_prediction="prediction",
    relevancy_method="top_k",
)
rank_eval = SparkRankingEvaluation(test, top_all, **ranking_eval_options)

                                                                                

In [19]:
# One tab-separated "name<TAB>value" line per ranking metric.
ranking_report = [
    "Model:\tALS",
    "Top K:\t%d" % rank_eval.k,
    "MAP:\t%f" % rank_eval.map_at_k(),
    "NDCG:\t%f" % rank_eval.ndcg_at_k(),
    "Precision@K:\t%f" % rank_eval.precision_at_k(),
    "Recall@K:\t%f" % rank_eval.recall_at_k(),
]
print("\n".join(ranking_report))



Model:	ALS
Top K:	100
MAP:	0.003648
NDCG:	0.018429
Precision@K:	0.002615
Recall@K:	0.049072


                                                                                

<h1>Evaluate rating prediction</h1>

In [20]:
# Predict ratings for the held-out (user, item) pairs, cache them and show a preview.
prediction = model.transform(test).cache()
prediction.show()



+------+------+------+----------+----------+
|UserId|ItemId|Rating| Timestamp|prediction|
+------+------+------+----------+----------+
|  2580|   148|   1.0|1497895920|  1.206405|
|  2572|   148|   1.0|1497838440| 1.2108516|
|  5490|   148|   1.0|1494433680| 1.0447582|
|  3287|   148|   1.0|1495538100| 0.7834258|
|  3587|   148|   3.0|1498161420|  3.643561|
|  4526|   148|   1.0|1496456760|0.99113685|
|   122|   148|   1.0|1497617580| 0.9657242|
|  4280|   148|   1.0|1496850360| 1.3617122|
|  4839|   148|   2.0|1498768860| 2.1397343|
|  2824|   148|   1.0|1495063440|  1.037982|
|  2150|   148|   2.0|1497262380| 2.3734658|
|  1901|   148|   6.0|1496875560|  8.262979|
|  2019|   148|   1.0|1497797160|0.92047983|
|  5470|   148|   4.0|1498070340| 1.9968414|
|  1720|   148|   2.0|1496330760|0.95702773|
|  2598|   148|   1.0|1496922600| 1.3155339|
|   900|   148|   6.0|1497556140|  5.480857|
|   794|   148|   1.0|1498646520| 4.5575533|
|  3202|   148|   1.0|1498859820| 1.0086267|
|  4156|  

                                                                                

In [21]:
# Compare the predicted ratings with the true ratings of the test split.
rating_eval_options = dict(
    col_user=COL_USER,
    col_item=COL_ITEM,
    col_rating=COL_RATING,
    col_prediction="prediction",
)
rating_eval = SparkRatingEvaluation(test, prediction, **rating_eval_options)

# One tab-separated "name<TAB>value" line per rating metric.
rating_report = [
    "Model:\tALS rating prediction",
    "RMSE:\t%f" % rating_eval.rmse(),
    "MAE:\t%f" % rating_eval.mae(),
    "Explained variance:\t%f" % rating_eval.exp_var(),
    "R squared:\t%f" % rating_eval.rsquared(),
]
print("\n".join(rating_report))

                                                                                

Model:	ALS rating prediction
RMSE:	1.765943
MAE:	0.728060
Explained variance:	0.516858
R squared:	0.507510


In [15]:
# NOTE(review): this block only runs when is_jupyter() is false - confirm that this
# matches how the papermill test run is meant to be detected.
if not is_jupyter():
    # Record results with papermill for tests
    import scrapbook as sb

    # (name, zero-argument getter) pairs; each value is computed immediately
    # before it is glued, in the order listed here.
    recorded_metrics = (
        ("map", rank_eval.map_at_k),
        ("ndcg", rank_eval.ndcg_at_k),
        ("precision", rank_eval.precision_at_k),
        ("recall", rank_eval.recall_at_k),
        ("rmse", rating_eval.rmse),
        ("mae", rating_eval.mae),
        ("exp_var", rating_eval.exp_var),
        ("rsquared", rating_eval.rsquared),
        ("train_time", lambda: train_time.interval),
        ("test_time", lambda: test_time.interval),
    )
    for metric_name, read_metric in recorded_metrics:
        sb.glue(metric_name, read_metric())

In [22]:
# Clean up: stop the Spark session that was started for this notebook.
spark.stop()