In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 22 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 50.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=69a67a7f1adadae1a1e2095a5204ea179b35a9772936de63fa98aa077a92f0cf
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [2]:
!pip install recommenders

Collecting recommenders
  Downloading recommenders-1.1.0-py3-none-manylinux1_x86_64.whl (335 kB)
[?25l[K     |█                               | 10 kB 24.2 MB/s eta 0:00:01[K     |██                              | 20 kB 27.3 MB/s eta 0:00:01[K     |███                             | 30 kB 19.6 MB/s eta 0:00:01[K     |████                            | 40 kB 12.4 MB/s eta 0:00:01[K     |████▉                           | 51 kB 5.5 MB/s eta 0:00:01[K     |█████▉                          | 61 kB 6.4 MB/s eta 0:00:01[K     |██████▉                         | 71 kB 7.3 MB/s eta 0:00:01[K     |███████▉                        | 81 kB 5.8 MB/s eta 0:00:01[K     |████████▉                       | 92 kB 6.5 MB/s eta 0:00:01[K     |█████████▊                      | 102 kB 7.1 MB/s eta 0:00:01[K     |██████████▊                     | 112 kB 7.1 MB/s eta 0:00:01[K     |███████████▊                    | 122 kB 7.1 MB/s eta 0:00:01[K     |████████████▊                   | 133 kB

In [3]:
import sys
import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

from recommenders.utils.timer import Timer
from recommenders.datasets import movielens
from recommenders.utils.notebook_utils import is_jupyter
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from recommenders.utils.spark_utils import start_or_get_spark

print("System version: {}".format(sys.version))
print("Spark version: {}".format(pyspark.__version__))

System version: 3.7.13 (default, Mar 16 2022, 17:37:17) 
[GCC 7.5.0]
Spark version: 3.2.1


In [4]:
TOP_K = 10

# Select MovieLens data size: 100k, 1m, 10m, or 20m
#MOVIELENS_DATA_SIZE = '100k'

# Column names for the dataset
COL_USER = "UserId"
COL_ITEM = "MovieId"
COL_RATING = "Rating"
COL_TIMESTAMP = "Timestamp"

In [5]:
spark = start_or_get_spark("ALS PySpark", memory="16g")
spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")

In [20]:
schema = StructType(
    (
        StructField(COL_USER, IntegerType()),
        StructField(COL_ITEM, IntegerType()),
        StructField(COL_RATING, FloatType()),
        StructField(COL_TIMESTAMP, LongType()),
    )
)

data = spark.read.csv('/content/ratings.csv',schema=schema, header=True)
data.show()

+------+-------+------+---------+
|UserId|MovieId|Rating|Timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [21]:
train, test = spark_random_split(data, ratio=0.80, seed=123)

In [22]:
print ("N train", train.cache().count())
print ("N test", test.cache().count())

N train 80712
N test 20124


In [23]:
header = {
    "userCol": COL_USER,
    "itemCol": COL_ITEM,
    "ratingCol": COL_RATING,
}


als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=False,
    seed=42,
    **header
)

In [24]:
with Timer() as train_time:
    model = als.fit(train)

print("Took {} seconds for training.".format(train_time.interval))

Took 16.71842151299984 seconds for training.


In [25]:
with Timer() as test_time:

    # Get the cross join of all user-item pairs and score them.
    users = train.select(COL_USER).distinct()
    items = train.select(COL_ITEM).distinct()
    user_item = users.crossJoin(items)
    dfs_pred = model.transform(user_item)

    # Remove seen items.
    dfs_pred_exclude_train = dfs_pred.alias("pred").join(
        train.alias("train"),
        (dfs_pred[COL_USER] == train[COL_USER]) & (dfs_pred[COL_ITEM] == train[COL_ITEM]),
        how='outer'
    )

    top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train[f"train.{COL_RATING}"].isNull()) \
        .select('pred.' + COL_USER, 'pred.' + COL_ITEM, 'pred.' + "prediction")

    # In Spark, transformations are lazy evaluation
    # Use an action to force execute and measure the test time 
    top_all.cache().count()

print("Took {} seconds for prediction.".format(test_time.interval))

Took 68.681948401 seconds for prediction.


In [26]:
top_all.show()

+------+-------+----------+
|UserId|MovieId|prediction|
+------+-------+----------+
|     1|    587| 4.2639647|
|     1|    869| 3.2915387|
|     1|   1357|  4.521576|
|     1|   1702| 3.9532385|
|     1|   1892|  4.268973|
|     1|   2202|  4.780009|
|     1|   2324| 5.2737136|
|     1|   2870| 3.9986348|
|     1|   3452| 3.6135578|
|     1|   3468| 4.9207306|
|     1|   3477| 3.7892077|
|     1|   3668| 4.6151853|
|     1|   4367|  3.266737|
|     1|   4979| 3.8057027|
|     1|   5501|  2.024569|
|     1|   5562|  2.481118|
|     1|   5668| 3.4021716|
|     1|   5957| 3.3796196|
|     1|   6031| 3.7804754|
|     1|   6271| 1.4388893|
+------+-------+----------+
only showing top 20 rows



In [27]:
rank_eval = SparkRankingEvaluation(test, top_all, k = TOP_K, col_user=COL_USER, col_item=COL_ITEM, 
                                    col_rating=COL_RATING, col_prediction="prediction", 
                                    relevancy_method="top_k")

In [28]:
print("Model:\tALS",
      "Top K:\t%d" % rank_eval.k,
      "MAP:\t%f" % rank_eval.map_at_k(),
      "NDCG:\t%f" % rank_eval.ndcg_at_k(),
      "Precision@K:\t%f" % rank_eval.precision_at_k(),
      "Recall@K:\t%f" % rank_eval.recall_at_k(), sep='\n')

Model:	ALS
Top K:	10
MAP:	0.000670
NDCG:	0.009532
Precision@K:	0.009836
Recall@K:	0.002543


In [29]:
prediction = model.transform(test)
prediction.cache().show()

+------+-------+------+----------+----------+
|UserId|MovieId|Rating| Timestamp|prediction|
+------+-------+------+----------+----------+
|   602|    471|   4.0| 840876085| 4.3295484|
|   409|    471|   3.0| 967912821| 5.2120667|
|   541|    471|   3.0| 835643551| 3.7046933|
|   609|    833|   3.0| 847221080| 1.6043167|
|   307|    833|   1.0|1186172725|  0.704576|
|   111|   1088|   3.0|1516153967| 3.5111418|
|    47|   1088|   4.0|1496205519| 2.9350314|
|   169|   1088|   4.5|1059427717| 4.2832947|
|   381|   1088|   3.5|1168664508|  3.934904|
|   594|   1088|   4.5|1109035643|  4.815674|
|   307|   1088|   3.0|1186162146| 2.8536115|
|    84|   1088|   3.0| 860398568| 3.1737502|
|   509|   1088|   3.0|1435992808|  3.313357|
|   221|   1088|   3.0|1111178147| 3.2720041|
|    68|   1088|   3.5|1158534614| 3.4710298|
|   116|   1088|   4.5|1337195649| 3.8503232|
|   517|   1088|   1.0|1487958398|  2.167795|
|   503|   1342|   1.0|1335214611|  3.269649|
|   600|   1342|   2.5|1237851755|

In [30]:
rating_eval = SparkRatingEvaluation(test, prediction, col_user=COL_USER, col_item=COL_ITEM, 
                                    col_rating=COL_RATING, col_prediction="prediction")

print("Model:\tALS rating prediction",
      "RMSE:\t%f" % rating_eval.rmse(),
      "MAE:\t%f" % rating_eval.mae(),
      "Explained variance:\t%f" % rating_eval.exp_var(),
      "R squared:\t%f" % rating_eval.rsquared(), sep='\n')

Model:	ALS rating prediction
RMSE:	0.941770
MAE:	0.716980
Explained variance:	0.192507
R squared:	0.171728
