In [None]:
# The notebook when opened gave error "Notebook does not appear to be JSON"
# So i'll just follow along in my own notebook


# Running ALS on MovieLens (PySpark)
Matrix factorization by ALS (Alternating Least Squares) is a well known collaborative filtering algorithm.

This notebook provides an example of how to utilize and evaluate ALS PySpark ML (DataFrame-based API) implementation, meant for large-scale distributed datasets. We use a smaller dataset in this example to run ALS efficiently on multiple cores of a Data Science Virtual Machine.

Note: This notebook requires a PySpark environment to run properly. Please follow the steps in SETUP.md to install the PySpark environment.

In [1]:
# set the environment path to find Recommenders
import sys
sys.path.append("/Users/gilmer/Recommenders")
import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType

from reco_utils.common.timer import Timer
from reco_utils.dataset import movielens
from reco_utils.common.notebook_utils import is_jupyter
from reco_utils.dataset.spark_splitters import spark_random_split
from reco_utils.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from reco_utils.common.spark_utils import start_or_get_spark

print("System version: {}".format(sys.version))
print("Spark version: {}".format(pyspark.__version__))

System version: 3.6.11 | packaged by conda-forge | (default, Nov 27 2020, 18:51:43) 
[GCC Clang 11.0.0]
Spark version: 2.4.5


In [2]:
# top k items to recommend
TOP_K = 10

# Select MovieLens data size: 100k, 1m, 10m, or 20m
MOVIELENS_DATA_SIZE = '100k'

# 0. Set up Spark Context
The following settings work well for debugging locally on VM - change when running on a cluster. We set up a giant single executor with many threads and specify memory cap.

In [13]:
!pip install findspark

Collecting findspark
  Downloading findspark-1.4.2-py2.py3-none-any.whl (4.2 kB)
Installing collected packages: findspark
Successfully installed findspark-1.4.2


In [3]:
!echo $SPARK_HOME

/opt/anaconda3/lib/python3.7/site-packages/pyspark


In [3]:
!echo $PYTHONPATH

/opt/anaconda3/lib/python3.7/site-packages/pyspark/python:/opt/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip


In [4]:
!echo $PATH

/opt/anaconda3/lib/python3.7/site-packages/pyspark/bin:/opt/anaconda3/lib/python3.7/site-packages/pyspark/python:/opt/anaconda3/lib/python3.7/site-packages/pyspark/bin:/opt/anaconda3/lib/python3.7/site-packages/pyspark/python:/Users/gilmer/.cargo/bin:/opt/anaconda3/envs/reco_pyspark/bin:/opt/anaconda3/condabin:/usr/local/opt/llvm/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:/Applications/Postgres.app/Contents/Versions/latest/bin


In [8]:
import findspark
findspark.init('/opt/anaconda3/lib/python3.7/site-packages/pyspark')

In [11]:
# the following settings work well for debugging locally on VM - change when running on a cluster
# set up a giant single executor with many threads and specify memory cap

spark = start_or_get_spark("ALS PySpark", memory="16g")

Py4JError: org.apache.spark.api.python.PythonUtils.getEncryptionEnabled does not exist in the JVM

# 1. Download the Movie Lens dataset

In [7]:
# Note: The DataFrame-based API for ALS currently only supports integers for user and item ids.
schema = StructType(
    (
        StructField("UserId", IntegerType()),
        StructField("MovieId", IntegerType()),
        StructField("Rating", FloatType()),
        StructField("Timestamp", LongType()),
    )
)

data = movielens.load_spark_df(spark, size=MOVIELENS_DATA_SIZE, schema=schema)
data.show()

100%|██████████| 4.81K/4.81K [00:00<00:00, 6.29KKB/s]


+------+-------+------+---------+
|UserId|MovieId|Rating|Timestamp|
+------+-------+------+---------+
|   196|    242|   3.0|881250949|
|   186|    302|   3.0|891717742|
|    22|    377|   1.0|878887116|
|   244|     51|   2.0|880606923|
|   166|    346|   1.0|886397596|
|   298|    474|   4.0|884182806|
|   115|    265|   2.0|881171488|
|   253|    465|   5.0|891628467|
|   305|    451|   3.0|886324817|
|     6|     86|   3.0|883603013|
|    62|    257|   2.0|879372434|
|   286|   1014|   5.0|879781125|
|   200|    222|   5.0|876042340|
|   210|     40|   3.0|891035994|
|   224|     29|   3.0|888104457|
|   303|    785|   3.0|879485318|
|   122|    387|   5.0|879270459|
|   194|    274|   2.0|879539794|
|   291|   1042|   4.0|874834944|
|   234|   1184|   2.0|892079237|
+------+-------+------+---------+
only showing top 20 rows



# 2. Split the data using the Spark random splitter provided in utilities

In [8]:
train, test = spark_random_split(data, ratio=0.75, seed=123)
print ("N train", train.cache().count())
print ("N test", test.cache().count())

N train 75018
N test 24982


# 3. Train the ALS model on the training data, and get the top-k recommendations for our testing data
To predict movie ratings, we use the rating data in the training set as users' explicit feedback. The hyperparameters used in building the model are referenced from [here](http://mymedialite.net/examples/datasets.html). We do not constrain the latent factors (nonnegative = False) in order to allow for both positive and negative preferences towards movies. Timing will vary depending on the machine being used to train.

In [9]:
header = {
    "userCol": "UserId",
    "itemCol": "MovieId",
    "ratingCol": "Rating",
}

als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=False,
    seed=42,
    **header
)

In [None]:
with Timer() as train_time:
    model = als.fit(train)

print("Took {} seconds for training.".format(train_time.interval))