# Running ALS on Amazon Electronics Reviews (PySpark)

Matrix factorization by ALS (Alternating Least Squares) is a well-known collaborative filtering algorithm.
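The following is a minimal sketch of the alternating idea in plain NumPy. It is an illustration only and is not Spark's distributed implementation. With the item factors fixed, each user's factor vector is the solution of a small ridge regression over the items that user rated. The roles then swap, and repeating the two steps lowers the squared error on the observed ratings. The toy rating matrix, the rank, and the regularization value are made-up assumptions. Unlike Spark, this sketch does not scale the regularization by each user's or item's number of ratings.

```python
import numpy as np


def als_explicit(R, mask, rank=2, reg=0.1, n_iters=20, seed=0):
    """Toy explicit-feedback ALS on a small dense matrix.

    R    : (n_users, n_items) ratings; entries where mask is False are ignored
    mask : boolean matrix marking the observed ratings
    """
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    X = rng.normal(scale=0.1, size=(n_users, rank))  # user factors
    Y = rng.normal(scale=0.1, size=(n_items, rank))  # item factors
    eye = np.eye(rank)
    for _ in range(n_iters):
        # Y fixed: ridge regression per user over the items that user rated
        for u in range(n_users):
            Yu = Y[mask[u]]
            X[u] = np.linalg.solve(Yu.T @ Yu + reg * eye, Yu.T @ R[u, mask[u]])
        # X fixed: ridge regression per item over the users who rated it
        for i in range(n_items):
            Xi = X[mask[:, i]]
            Y[i] = np.linalg.solve(Xi.T @ Xi + reg * eye, Xi.T @ R[mask[:, i], i])
    return X, Y


def observed_rmse(R, mask, X, Y):
    """RMSE of X @ Y.T measured on the observed entries only."""
    err = (R - X @ Y.T)[mask]
    return float(np.sqrt(np.mean(err ** 2)))


# toy 4x4 matrix; 0 means "not rated" in this example
R = np.array([[5, 4, 0, 1],
              [4, 0, 0, 1],
              [1, 1, 0, 5],
              [0, 1, 5, 4]], dtype=float)
mask = R > 0
X, Y = als_explicit(R, mask, rank=2, reg=0.1)
print("observed RMSE:", observed_rmse(R, mask, X, Y))
```

Each inner solve is a small rank-by-rank linear system. Spark distributes these per-user and per-item solves across executors.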

This notebook shows how to use and evaluate the ALS implementation in PySpark ML (the DataFrame-based API), which is built for large-scale distributed datasets. Here it is applied to the Electronics category of the Amazon customer reviews dataset (about 3.1 million ratings). That dataset is small enough to train efficiently on the cores of a single virtual machine.

Note: This notebook requires a PySpark environment to run properly. Please follow the steps in SETUP.md to install the PySpark environment.


In [1]:
!pip3 install recommenders

Collecting recommenders
  Downloading recommenders-1.1.0-py3-none-manylinux1_x86_64.whl (335 kB)
Collecting memory-profiler<1,>=0.54.0
  Downloading memory_profiler-0.60.0.tar.gz (38 kB)
  Preparing metadata (setup.py) ... done
Collecting category-encoders<2,>=1.3.0
  Downloading category_encoders-1.3.0-py2.py3-none-any.whl (61 kB)
Collecting pandera[strategies]>=0.6.5
  Downloading pandera-0.6.5-py3-none-any.whl (79 kB)
Collecting cornac<2,>=1.1.2
  Downloading cornac-1.14.2-cp36-cp36m-manylinux1_x86_64.whl (12.5 MB)
Collecting transformers<5,>=2.5.0
  Downloading transformers-4.18.0-py3-none-any.whl (4.0 MB)
Collecting sacremoses
  Downloading sacremoses-0.0.53.tar.gz (880 kB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: lightfm, memory-profiler, retrying, scikit-surprise, sacremoses
  Building wheel for lightfm (setup.py) ... done
  Created wheel for lightfm: filename=lightfm-1.16-cp36-cp36m-linux_x86_64.whl size=762032 sha256=5ad8f4cf68c3f3b47d7eb657b3bfde8c54886d8e886da0ef5b9693e5f99ed6ab
  Stored in directory: /home/ec2-user/.cache/pip/wheels/6c/f0/48/ffe9095b572a6b4adde8fda07a20ea92b68ce33577d0e22adb
  Building wheel for memory-profiler (setup.py) ... done
  Created wheel for memory-profiler: filename=memory_profiler-0.60.0-py3-none-any.whl size=31276 sha256=a5e1ac3c8426f201c6a1cd9a63c36dcd52de8e0f69369e1dc71210d7c1232773
  Stored in directory: /home/ec2-user/.cache/pip/wheels/f

In [2]:
# imports: standard library, AWS SDK (boto3), PySpark, and the Recommenders utilities
import sys
import os

import boto3

import pyspark
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

from recommenders.utils.timer import Timer
from recommenders.datasets import movielens
from recommenders.utils.notebook_utils import is_jupyter
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from recommenders.utils.spark_utils import start_or_get_spark

print("System version: {}".format(sys.version))
print("Spark version: {}".format(pyspark.__version__))

System version: 3.6.13 | packaged by conda-forge | (default, Feb 19 2021, 05:36:01) 
[GCC 9.3.0]
Spark version: 2.4.0


In [3]:
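# k for top-k recommendation and ranking evaluation
TOP_K = 10

DATA_PATH = '../data/amazon_reviews_us_Electronics_v1_00.tsv'

COL_USER = "customer_id"
COL_ITEM = "product_parent"
COL_RATING = "star_rating"
COL_PREDICTION = "prediction"  # ALS writes its output to a column named "prediction" by default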
COL_TIMESTAMP = "review_date"

## 0. Set up Spark Context

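The following settings are meant for debugging on a single VM. Change them when running on a cluster. In local mode, the driver and a single multithreaded executor share one process. As a result, spark.driver.memory is the effective memory cap, and spark.executor.instances has no effect. That setting only applies when a cluster manager such as YARN allocates executors.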

In [4]:
# local-debugging settings; change them when running on a cluster
spark = pyspark.sql.SparkSession \
        .builder \
        .config("spark.executor.instances", 10) \
        .config("spark.driver.memory", "16g") \
        .getOrCreate()
# this option exists only in Spark 3.0+; Spark 2.4.0 (used here) does not read it
spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")

In [5]:
BUCKET_NAME = 'amazon-reviews-pds'
LOCAL_DIR = 'data/'
os.makedirs(LOCAL_DIR, exist_ok=True)  # download_file fails if the target directory is missing

s3_client = boto3.client('s3')
s3 = boto3.resource('s3')
my_bucket = s3.Bucket(BUCKET_NAME)

# download every Parquet part file of the Electronics category and print its row count
local_files = []
for object_summary in my_bucket.objects.filter(Prefix="parquet/product_category=Electronics/"):
    object_name = object_summary.key
    local_file = os.path.join(LOCAL_DIR, object_name.split("/")[-1])
    s3_client.download_file(BUCKET_NAME, object_name, local_file)
    print(spark.read.parquet(local_file).count())
    local_files.append(local_file)

# read all part files as one DataFrame instead of chaining union() calls
data = spark.read.parquet(*local_files)
print(data.count())



312489
312064
311812
311393
312698
311962
312445
312888
311731
311456
3120938


In [6]:
data = data.select(COL_USER, COL_ITEM, COL_RATING)

In [7]:
data.show(2)

+-----------+--------------+-----------+
|customer_id|product_parent|star_rating|
+-----------+--------------+-----------+
|   52826068|     822091995|          4|
|   13676500|     662432872|          5|
+-----------+--------------+-----------+
only showing top 2 rows



In [28]:
# ALS needs numeric user and item IDs within the 32-bit integer range;
# values that cannot be cast become null
for c in (COL_USER, COL_ITEM, COL_RATING):
    data = data.withColumn(c, col(c).cast("integer"))

In [9]:
data.count()

3120938

In [10]:
train, test = spark_random_split(data, ratio=0.7, seed=56)
print("N train", train.cache().count())
print("N test", test.cache().count())

N train 2184092
N test 936846
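To our understanding, spark_random_split is a thin wrapper around Spark's DataFrame.randomSplit with weights [0.7, 0.3]. That method draws a random number for each row independently, so the split sizes are only approximately 70/30. In this run, 2,184,092 of 3,120,938 rows (about 69.98%) went to train. The pure-Python sketch below imitates that per-row sampling. It is an illustration only, not Spark's sampler, and the helper name bernoulli_split is hypothetical.

```python
import random


def bernoulli_split(rows, ratio=0.7, seed=56):
    """Send each row to the first split with probability `ratio`, independently."""
    rng = random.Random(seed)
    first, second = [], []
    for row in rows:
        (first if rng.random() < ratio else second).append(row)
    return first, second


toy_rows = list(range(100_000))
toy_train, toy_test = bernoulli_split(toy_rows, ratio=0.7, seed=56)
# close to, but generally not exactly, 70/30
print(len(toy_train), len(toy_test), len(toy_train) / len(toy_rows))
```

Fixing the seed makes the sketch reproducible. Similarly, the notebook passes seed=56 so that the split can be repeated.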


In [11]:
data.count()

3120938

## 3. Train the ALS model on the training data, and get the top-k recommendations for our testing data

To predict ratings, we use the ratings in the training set as users' explicit feedback. The hyperparameters follow the examples at http://mymedialite.net/examples/datasets.html; they were not chosen specifically for the Amazon data.

Here the latent factors are constrained to be nonnegative (nonnegative=True). Setting nonnegative=False would instead allow both positive and negative preferences toward products. Training time varies with the machine used.
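For explicit feedback, ALS fits user factors $\mathbf{x}_u$ and item factors $\mathbf{y}_i$, each of dimension rank. It does so by minimizing the squared error on the set $\Omega$ of observed ratings plus an L2 penalty weighted by regParam ($\lambda$). According to the Spark MLlib documentation, Spark scales $\lambda$ in each subproblem by the number of ratings $n_u$ of the user (or $n_i$ of the item). The resulting objective is roughly:

$$
\min_{X,Y}\ \sum_{(u,i)\in\Omega}\left(r_{ui}-\mathbf{x}_u^\top\mathbf{y}_i\right)^2
+\lambda\left(\sum_u n_u\lVert\mathbf{x}_u\rVert^2+\sum_i n_i\lVert\mathbf{y}_i\rVert^2\right)
$$

With the item factors fixed, setting the gradient with respect to $\mathbf{x}_u$ to zero gives the user update

$$
\mathbf{x}_u=\left(Y_u^\top Y_u+\lambda n_u I\right)^{-1}Y_u^\top\mathbf{r}_u
$$

Here $Y_u$ stacks the factors of the items rated by user $u$, and $\mathbf{r}_u$ holds those ratings. Item updates are symmetric. With nonnegative=True, Spark instead solves each subproblem as a nonnegative least-squares problem, so this closed form no longer applies directly.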

In [32]:
header = {
    "userCol": COL_USER,
    "itemCol": COL_ITEM,
    "ratingCol": COL_RATING,
}

als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=True,
    seed=42,
    **header
)
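With coldStartStrategy='drop', Spark removes prediction rows whose user or item has no learned factors because it never appeared in the training split. Without this setting, those rows would receive NaN predictions and make the RMSE NaN. The plain-Python sketch below shows which test pairs survive that rule. It uses made-up IDs, and the helper name cold_start_drop is hypothetical.

```python
def cold_start_drop(train_pairs, test_pairs):
    """Keep only test (user, item) pairs whose user AND item occur in training,
    mirroring which rows survive coldStartStrategy='drop'."""
    seen_users = {u for u, _ in train_pairs}
    seen_items = {i for _, i in train_pairs}
    return [(u, i) for u, i in test_pairs if u in seen_users and i in seen_items]


train_pairs = [(1, 10), (1, 11), (2, 10), (3, 12)]
test_pairs = [(1, 12), (2, 11), (4, 10), (3, 13)]
print(cold_start_drop(train_pairs, test_pairs))  # [(1, 12), (2, 11)]
```

In the notebook, subtracting predictions.count() from test.count() would give the number of test rows dropped this way.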

In [33]:
with Timer() as train_time:
    model = als.fit(train)

print("Took {} seconds for training.".format(train_time.interval))

Took 62.95778563800013 seconds for training.
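RegressionEvaluator compares the prediction column with star_rating. RMSE is the square root of the mean squared difference between the two. R² is 1 − SS_res/SS_tot, so it becomes negative when the predictions do worse than always predicting the mean rating. The evaluation that follows reports R2=-1.81, and it also shows raw ALS outputs outside the 1 to 5 star range (for example 7.15 and 0.064). These values fall outside the range because ALS predictions are unbounded dot products. Clipping predictions to the rating scale is a common post-processing step, but it is our suggestion, not something this notebook does. The sketch computes both metrics in plain Python on made-up values, with and without clipping. The helper names are hypothetical.

```python
import math


def rmse_score(labels, preds):
    """Root mean squared error."""
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, preds)) / len(labels))


def r2_score(labels, preds):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, preds))
    ss_tot = sum((y - mean) ** 2 for y in labels)
    return 1 - ss_res / ss_tot


def clip_ratings(preds, lo=1.0, hi=5.0):
    """Clamp each prediction into the [lo, hi] rating scale."""
    return [min(max(p, lo), hi) for p in preds]


labels = [5, 5, 1, 4, 3]
preds = [6.2, 3.3, 7.2, 4.3, 0.1]  # toy raw, unbounded predictions
print("raw:    ", rmse_score(labels, preds), r2_score(labels, preds))
print("clipped:", rmse_score(labels, clip_ratings(preds)), r2_score(labels, clip_ratings(preds)))
```

When every true rating lies in [1, 5], clipping can only move a prediction closer to its label. Clipping therefore never increases RMSE.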


In [35]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="rmse", labelCol=COL_RATING, predictionCol="prediction")
predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
print("RMSE=" + str(rmse))
print("R2=" + str(r2))

predictions.show()

RMSE=2.091033207999272
R2=-1.8089138303701344
+-----------+--------------+-----------+-----------+
|customer_id|product_parent|star_rating| prediction|
+-----------+--------------+-----------+-----------+
|   21056281|       3077953|          5|  3.3142796|
|   44800986|       3077953|          5|   4.784516|
|   52233347|       3077953|          5|   6.179378|
|   12966018|       3077953|          5|  2.9326391|
|   45525742|      12263628|          1|  7.1528616|
|   17097300|      12341146|          5|  4.0950546|
|   15485358|      12341146|          5|   3.834104|
|   52742638|      12341146|          5|  1.5699214|
|   20522375|      12341146|          5|  3.5095272|
|   12863291|      12341146|          4|  4.3305893|
|   38928074|      12341146|          1|   4.121529|
|   31867850|      12341146|          5|  1.0948269|
|   22647559|      12341146|          3|  2.0678208|
|   18108139|      12341146|          5|0.064083464|
|   35234567|      12341146|          4|  2.6849165|
