# ALS
https://github.com/microsoft/recommenders/blob/main/examples/03_evaluate/als_movielens_diversity_metrics.ipynb

In [1]:
%load_ext autoreload
%autoreload 2

import sys

import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType, StructType, StructField
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover
from pyspark.ml.feature import HashingTF, CountVectorizer, VectorAssembler

from recommenders.utils.timer import Timer
from recommenders.datasets import movielens
from recommenders.utils.notebook_utils import is_jupyter
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation, SparkDiversityEvaluation
from recommenders.utils.spark_utils import start_or_get_spark

from pyspark.sql.window import Window
import pyspark.sql.functions as F

import numpy as np
import pandas as pd

# Report the interpreter and PySpark versions used for this run (one per line).
print(
    "System version: {}".format(sys.version),
    "Spark version: {}".format(pyspark.__version__),
    sep="\n",
)

System version: 3.7.0 (default, Oct  9 2018, 10:31:47) 
[GCC 7.3.0]
Spark version: 2.4.8


## Set Parameters for Dataset

In [39]:
# Number of top-ranked recommendations kept per user.
TOP_K = 10

# MovieLens variant to load: "100k", "1m", "10m", or "20m".
MOVIELENS_DATA_SIZE = "1m"

# Column names for the user id, the item id and the rating.
COL_USER, COL_ITEM, COL_RATING = "UserId", "MovieId", "Rating"

## Spark environment and Dataset setup

In [40]:
# Obtain the Spark session through the recommenders helper, requesting memory="16g".
spark = start_or_get_spark("ALS PySpark", memory="16g")
spark

Enable cross joins explicitly, since Spark 2.x rejects implicit cartesian products unless `spark.sql.crossJoin.enabled` is set.

In [41]:
# Allow cartesian products in Spark SQL; a user x item cross join is built further down.
spark.conf.set("spark.sql.crossJoin.enabled", "true")

Define the column names and types with a `StructType` schema, then load the built-in MovieLens dataset.

In [42]:
# Note: The DataFrame-based API for ALS currently only supports integers for user and item ids.
# The fields are passed as a list (the documented form): StructType keeps the
# sequence as-is in `schema.fields`, and `schema.add(...)` appends to it, which
# would fail on a tuple.
schema = StructType(
    [
        StructField(COL_USER, IntegerType()),
        StructField(COL_ITEM, IntegerType()),
        StructField(COL_RATING, FloatType()),
        StructField("Timestamp", LongType()),
    ]
)

In [43]:
# Load the ratings with the schema defined above, plus each movie's title and genres.
data = movielens.load_spark_df(
    spark,
    size=MOVIELENS_DATA_SIZE,
    schema=schema,
    title_col="title",
    genres_col="genres",
)

print(data.schema)
data.show()

100%|██████████| 5.78k/5.78k [00:03<00:00, 1.91kKB/s]


StructType(List(StructField(MovieId,IntegerType,true),StructField(UserId,IntegerType,true),StructField(Rating,FloatType,true),StructField(Timestamp,LongType,true),StructField(title,StringType,true),StructField(genres,StringType,true)))
+-------+------+------+----------+--------------+------+
|MovieId|UserId|Rating| Timestamp|         title|genres|
+-------+------+------+----------+--------------+------+
|     26|    18|   4.0| 978157335|Othello (1995)| Drama|
|     26|    69|   4.0| 977882050|Othello (1995)| Drama|
|     26|   229|   4.0|1039503573|Othello (1995)| Drama|
|     26|   342|   4.0| 976338677|Othello (1995)| Drama|
|     26|   524|   3.0| 976169012|Othello (1995)| Drama|
|     26|   655|   3.0| 975699243|Othello (1995)| Drama|
|     26|   748|   5.0| 975463153|Othello (1995)| Drama|
|     26|   881|   3.0|1013536125|Othello (1995)| Drama|
|     26|   890|   3.0| 976504835|Othello (1995)| Drama|
|     26|   918|   4.0| 978241611|Othello (1995)| Drama|
|     26|   963|   4.0|

Split the data; the `ratio` argument is the fraction of rows assigned to the training set.

In [44]:
# Only the user, item and rating columns are needed for ALS; split them 75/25 with a fixed seed.
train_df, test_df = spark_random_split(
    data.select(COL_USER, COL_ITEM, COL_RATING),
    ratio=0.75,
    seed=123,
)

# cache() marks the DataFrame itself, so caching first and counting afterwards
# is the same as counting the value cache() returns.
train_df.cache()
print("N train_df", train_df.count())
train_df.show()

test_df.cache()
print("N test_df", test_df.count())

N train_df 750336
+------+-------+------+
|UserId|MovieId|Rating|
+------+-------+------+
|     5|     29|   5.0|
|    10|   2453|   4.0|
|    10|   2529|   4.0|
|    17|   2529|   4.0|
|    18|     26|   4.0|
|    19|   2529|   3.0|
|    23|     29|   3.0|
|    23|   2529|   3.0|
|    26|   3506|   4.0|
|    29|    474|   2.0|
|    29|   2529|   5.0|
|    33|    474|   5.0|
|    33|   2529|   5.0|
|    35|   3091|   5.0|
|    36|    474|   5.0|
|    36|   2250|   4.0|
|    36|   2529|   5.0|
|    42|   2529|   5.0|
|    45|   1677|   2.0|
|    48|    474|   3.0|
+------+-------+------+
only showing top 20 rows

N test_df 249873


In [45]:
# Cache the full ratings DataFrame and print its total row count.
print(data.cache().count())

1000209


Check average number of ratings per user.

In [46]:
# Ratings per user, most active users first. Group on the configured user column
# (COL_USER) instead of a hard-coded name so this cell follows the config cell.
rowsPerUser = data.groupBy(COL_USER).count().orderBy(F.col("count").desc())
rowsPerUser.show()

# Cache before the repeated aggregations below, then report the number of users.
rowsPerUser.cache()
print(rowsPerUser.count())

# Mean and minimum number of ratings per user.
rowsPerUser.agg({"count": "mean"}).show()
rowsPerUser.agg({"count": "min"}).show()

+------+-----+
|UserId|count|
+------+-----+
|  4169| 2314|
|  1680| 1850|
|  4277| 1743|
|  1941| 1595|
|  1181| 1521|
|   889| 1518|
|  3618| 1344|
|  2063| 1323|
|  1150| 1302|
|  1015| 1286|
|  5795| 1277|
|  4344| 1271|
|  1980| 1260|
|  2909| 1258|
|  1449| 1243|
|  4510| 1240|
|   424| 1226|
|  4227| 1222|
|  5831| 1220|
|  3841| 1216|
+------+-----+
only showing top 20 rows

6040
+-----------------+
|       avg(count)|
+-----------------+
|165.5975165562914|
+-----------------+

+----------+
|min(count)|
+----------+
|        20|
+----------+



## Use cross join to create all possible user-item pairs
Use training data

In [47]:
# Distinct users and distinct items seen in the training split; their cross join
# enumerates every (user, item) pair, i.e. users.count() * items.count() rows.
users = train_df.select(COL_USER).distinct()
items = train_df.select(COL_ITEM).distinct()
user_item = users.crossJoin(items)

In [48]:
# Sizes of the user set, the item set and their cartesian product, one per line.
print(users.count(), items.count(), user_item.count(), sep="\n")
user_item.show()

6040
3670
22166800
+------+-------+
|UserId|MovieId|
+------+-------+
|   148|    463|
|   148|   1591|
|   148|   3918|
|   148|   3175|
|   148|    496|
|   148|   1238|
|   148|   3794|
|   148|   1342|
|   148|    833|
|   148|   2366|
|   148|   1829|
|   148|    471|
|   148|   1959|
|   148|   1580|
|   148|   2142|
|   148|   2659|
|   148|   2866|
|   148|   1088|
|   148|   2122|
|   148|   1645|
+------+-------+
only showing top 20 rows



## Train Model and get top K recommendations

Configure the ALS model.

In [49]:
# Column-name arguments handed to the estimator.
header = dict(userCol=COL_USER, itemCol=COL_ITEM, ratingCol=COL_RATING)

# Explicit-feedback ALS (implicitPrefs=False). To evaluate a different
# algorithm, replace this estimator.
als = ALS(
    **header,
    rank=10,
    maxIter=15,
    regParam=0.05,
    implicitPrefs=False,
    nonnegative=False,
    coldStartStrategy="drop",
    seed=42,
)

Train the model.

In [50]:
# Fit ALS on the training split inside a Timer context so the fit can be timed.
with Timer() as train_time:
    model = als.fit(train_df)

# train_time.interval is the elapsed time reported here.
print("Took {} seconds for training.".format(train_time.interval))

Took 2.611721148714423 seconds for training.


Score every movie for every user, then remove the user-movie pairs that already appear in the training data.

In [51]:
# Score all user-item pairs: run the fitted model over the full cross join
# built from the training users and items.
dfs_pred = model.transform(user_item)
dfs_pred.show()

+------+-------+----------+
|UserId|MovieId|prediction|
+------+-------+----------+
|   148|    148| 1.9932647|
|   463|    148| 2.1618307|
|   471|    148| 3.3621922|
|   496|    148| 2.3596911|
|   833|    148| 3.3021572|
|  1088|    148| 2.5344753|
|  1238|    148|  2.794884|
|  1342|    148| 1.7258394|
|  1580|    148| 2.9216523|
|  1591|    148|  3.609573|
|  1645|    148| 2.4713356|
|  1829|    148| 1.9832127|
|  1959|    148| 3.5405562|
|  2122|    148| 2.5934963|
|  2142|    148| 1.4509761|
|  2366|    148| 1.3390584|
|  2659|    148| 3.3351188|
|  2866|    148| 2.2846713|
|  3175|    148|  3.468598|
|  3749|    148| 2.9621146|
+------+-------+----------+
only showing top 20 rows



In [52]:
# Remove seen items - Remember we only used training data to create user_item.
#
# A left-anti join keeps exactly the rows of dfs_pred whose (user, item) key has
# no match in train_df, and returns only dfs_pred's columns. It replaces the
# previous full outer join + "train.Rating IS NULL" filter + re-select, which
# produced the same rows but carried the training columns through the join.
# Joining on column *names* also avoids DataFrame-object column references,
# which are ambiguous here because dfs_pred is itself derived from train_df.
dfs_pred_exclude_train = dfs_pred.join(
    train_df.select(COL_USER, COL_ITEM),  # only the key columns are needed
    on=[COL_USER, COL_ITEM],
    how="left_anti",
)

# top_all is used by count/show here and by the top-k and evaluation cells
# later; cache it so each of those actions does not recompute (and re-shuffle)
# the scoring and the join.
top_all = dfs_pred_exclude_train.select(COL_USER, COL_ITEM, "prediction").cache()

print(top_all.count())

top_all.show()

21416464


Py4JJavaError: An error occurred while calling o1375.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 69 in stage 2354.0 failed 1 times, most recent failure: Lost task 69.0 in stage 2354.0 (TID 77041, localhost, executor driver): java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:562)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$1.writeValue(UnsafeRowSerializer.scala:69)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:241)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2088)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2107)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:370)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3388)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3369)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3368)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.GeneratedMethodAccessor353.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:562)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$1.writeValue(UnsafeRowSerializer.scala:69)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:241)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [16]:
# Rank each user's unseen items by predicted rating (highest first) and keep
# the first TOP_K of them; the helper "rank" column is dropped afterwards.
window = Window.partitionBy(COL_USER).orderBy(F.desc("prediction"))
top_k_reco = (
    top_all
    .withColumn("rank", F.row_number().over(window))
    .where(F.col("rank") <= TOP_K)
    .drop("rank")
)

print(top_k_reco.count())
top_k_reco.show()

18860
+------+-------+----------+
|UserId|MovieId|prediction|
+------+-------+----------+
|   148|    919| 6.3261137|
|   148|    353| 6.2676616|
|   148|    968| 6.1783676|
|   148|    253| 6.1438556|
|   148|   1512| 5.9449964|
|   148|   1227| 5.8750854|
|   148|   1251| 5.6491814|
|   148|    115|   5.64693|
|   148|    244|  5.636389|
|   148|     57| 5.4875755|
|   148|     89|  5.465199|
|   148|   1085| 5.4451227|
|   148|   1001|  5.392377|
|   148|    703| 5.3899245|
|   148|    805| 5.3898864|
|   148|   1449|  5.308111|
|   148|   1104| 5.3005443|
|   148|    626|  5.295541|
|   148|   1240|  5.290914|
|   148|   1478| 5.2802405|
+------+-------+----------+
only showing top 20 rows



## Metric functions

In [17]:
def get_ranking_results(ranking_eval):
    """Collect the ranking metrics exposed by a ranking evaluator.

    Args:
        ranking_eval: object providing ``precision_at_k()``, ``recall_at_k()``,
            ``ndcg_at_k()`` and ``map_at_k()``; each is called once, in that
            order, without arguments.

    Returns:
        dict: metric label -> value, with the keys "Precision@k", "Recall@k",
        "NDCG@k" and "Mean average precision" (in that order).
    """
    labelled_methods = (
        ("Precision@k", "precision_at_k"),
        ("Recall@k", "recall_at_k"),
        ("NDCG@k", "ndcg_at_k"),
        ("Mean average precision", "map_at_k"),
    )
    # Look up and call one method at a time so the evaluation order is fixed.
    return {label: getattr(ranking_eval, method)() for label, method in labelled_methods}

def get_diversity_results(diversity_eval):
    """Collect the diversity metrics exposed by a diversity evaluator.

    Args:
        diversity_eval: object providing ``catalog_coverage()``,
            ``distributional_coverage()``, ``novelty()``, ``diversity()`` and
            ``serendipity()``; each is called once, in that order, without
            arguments.

    Returns:
        dict: metric name -> value; every key is the name of the method that
        produced the value.
    """
    metric_names = (
        "catalog_coverage",
        "distributional_coverage",
        "novelty",
        "diversity",
        "serendipity",
    )
    return {name: getattr(diversity_eval, name)() for name in metric_names}


def get_rating_results(rating_eval):
    """Collect the rating-prediction metrics exposed by a rating evaluator.

    Args:
        rating_eval: object providing ``rmse()``, ``mae()``, ``rsquared()`` and
            ``exp_var()``; each is called once, in that order, without arguments.

    Returns:
        dict: metric label -> value, with the keys 'rmse',
        'mean absolute error', 'R squared' and 'explained variance'.
    """
    results = {}
    results['rmse'] = rating_eval.rmse()
    results['mean absolute error'] = rating_eval.mae()
    results['R squared'] = rating_eval.rsquared()
    results['explained variance'] = rating_eval.exp_var()
    return results

def generate_summary(data, algo, k, ranking_metrics, diversity_metrics):
    """Merge run identifiers and metric dictionaries into one flat summary row.

    Args:
        data: dataset label stored under "Data".
        algo: algorithm label stored under "Algo".
        k: number of recommendations per user, stored under "K".
        ranking_metrics (dict or None): ranking metrics keyed as returned by
            ``get_ranking_results``; ``None`` is replaced by NaN placeholders.
        diversity_metrics (dict or None): diversity metrics keyed as returned
            by ``get_diversity_results``; ``None`` is replaced by NaN
            placeholders.

    Returns:
        dict: "Data", "Algo", "K", then the ranking metrics, then the diversity
        metrics (later entries overwrite earlier ones on a key clash). The
        input dictionaries are not modified.
    """
    summary = {"Data": data, "Algo": algo, "K": k}

    if ranking_metrics is None:
        # Placeholder keys must match get_ranking_results and the summary
        # column list; the former "nDCG@k"/"MAP" labels produced extra,
        # mismatched entries.
        ranking_metrics = {
            "Precision@k": np.nan,
            "Recall@k": np.nan,
            "NDCG@k": np.nan,
            "Mean average precision": np.nan,
        }

    if diversity_metrics is None:
        # Same treatment as missing ranking metrics, instead of failing in update().
        diversity_metrics = {
            "catalog_coverage": np.nan,
            "distributional_coverage": np.nan,
            "novelty": np.nan,
            "diversity": np.nan,
            "serendipity": np.nan,
        }

    # dict.update appends new keys after the existing ones, so column order is
    # identifiers, ranking metrics, diversity metrics.
    summary.update(ranking_metrics)
    summary.update(diversity_metrics)
    return summary

## ALS Ranking results

In [18]:
# Ranking evaluation of the ALS predictions against the held-out test split.
# Column names come from the config constants (as in the diversity evaluation)
# rather than repeated string literals, so changing the config keeps this cell valid.
als_ranking_eval = SparkRankingEvaluation(
    test_df,
    top_all,
    k=TOP_K,
    col_user=COL_USER,
    col_item=COL_ITEM,
    col_rating=COL_RATING,
    col_prediction="prediction",
    relevancy_method="top_k",
)

als_ranking_metrics = get_ranking_results(als_ranking_eval)

In [19]:
# One-row table of the ALS ranking metrics; the column labels are the keys
# returned by get_ranking_results.
cols = [ "Precision@k", "Recall@k", "NDCG@k", "Mean average precision"]
ranking_results = pd.DataFrame(columns=cols)

# Assigning the metrics dict to a new index label adds it as the row labelled 1.
ranking_results.loc[1] = als_ranking_metrics

ranking_results.head()

Unnamed: 0,Precision@k,Recall@k,NDCG@k,Mean average precision
1,0.053287,0.038843,0.052833,0.007391


## Diversity results

In [20]:
# Diversity evaluation built from the training interactions and the top-k
# recommendations only (no item feature vectors are passed here).
als_diversity_eval = SparkDiversityEvaluation(
    train_df = train_df, 
    reco_df = top_k_reco,
    col_user = COL_USER, 
    col_item = COL_ITEM
)

als_diversity_metrics = get_diversity_results(als_diversity_eval)

In [21]:
# One-row table of the ALS diversity metrics; the column labels are the keys
# returned by get_diversity_results.
cols = ["catalog_coverage", "distributional_coverage","novelty", "diversity", "serendipity"]
diversity_results = pd.DataFrame(columns=cols)

# Assigning the metrics dict to a new index label adds it as the row labelled 1.
diversity_results.loc[1] = als_diversity_metrics

diversity_results.head()

Unnamed: 0,catalog_coverage,distributional_coverage,novelty,diversity,serendipity
1,0.515003,8.497723,11.391092,0.880526,0.868944


## Combined results

In [22]:
# Combine the run identifiers with the ranking and diversity metrics into one row.
als_results = generate_summary(MOVIELENS_DATA_SIZE, "als", TOP_K, als_ranking_metrics, als_diversity_metrics)
# Column order: identifiers, ranking metrics, diversity metrics.
cols = ["Data", "Algo", "K", "Precision@k", "Recall@k", "NDCG@k", "Mean average precision","catalog_coverage", "distributional_coverage","novelty", "diversity", "serendipity" ]
summary_results = pd.DataFrame(columns = cols)
# Assigning the summary dict to a new index label adds it as the row labelled 1.
summary_results.loc[1] = als_results
summary_results.head()

Unnamed: 0,Data,Algo,K,Precision@k,Recall@k,NDCG@k,Mean average precision,catalog_coverage,distributional_coverage,novelty,diversity,serendipity
1,100k,als,20,0.053287,0.038843,0.052833,0.007391,0.515003,8.497723,11.391092,0.880526,0.868944


## Add rating evaluation

In [23]:
# Rating evaluation of the ALS predictions against the held-out test split.
# Column names come from the config constants rather than repeated string
# literals, so changing the config keeps this cell valid.
als_rating_eval = SparkRatingEvaluation(
    test_df,
    top_all,
    col_user=COL_USER,
    col_item=COL_ITEM,
    col_rating=COL_RATING,
    col_prediction="prediction",
)

rating_results = get_rating_results(als_rating_eval)

In [24]:
# One-row table of the ALS rating metrics; the column labels are the keys
# returned by get_rating_results.
cols = ['rmse', 'mean absolute error', 'R squared','explained variance']

rating_res = pd.DataFrame(columns=cols)
# Assigning the metrics dict to a new index label adds it as the row labelled 1.
rating_res.loc[1] = rating_results
rating_res.head()

Unnamed: 0,rmse,mean absolute error,R squared,explained variance
1,0.971761,0.757398,0.254696,0.259558


## Content based version

### Extract the title and genres of each movie

In [25]:
# Get movie features "title" and "genres": one row per distinct movie.
movies = (
    data.select(COL_ITEM, "title", "genres")
    .distinct()  # replaces groupBy(...).count().drop("count")
    .na.drop()  # remove rows with null values
    # Raw strings keep the regex backslashes intact without relying on
    # Python's deprecated handling of unknown escapes such as "\|" and "\(".
    .withColumn("genres", F.split(F.col("genres"), r"\|"))  # convert to array of genres
    # Strip parentheses, commas, colons, carets and all digits from the title;
    # this removes the trailing "(year)" but also any other digits in the title.
    .withColumn("title", F.regexp_replace(F.col("title"), r"[\(),:^0-9]", ""))
)
movies.show(5)

+-------+--------------------+---------+
|MovieId|               title|   genres|
+-------+--------------------+---------+
|    167|   Private Benjamin | [Comedy]|
|   1343|         Lotto Land |  [Drama]|
|   1607|  Hurricane Streets |  [Drama]|
|    966|Affair to Remembe...|[Romance]|
|      9|   Dead Man Walking |  [Drama]|
+-------+--------------------+---------+
only showing top 5 rows



### Tokenize the title and remove stop words

In [26]:
# tokenize "title" column
# (the tokens are written to a new "title_words" column)
title_tokenizer = Tokenizer(inputCol="title", outputCol="title_words")
tokenized_data = title_tokenizer.transform(movies)
tokenized_data.show(10)

+-------+--------------------+--------------------+--------------------+
|MovieId|               title|              genres|         title_words|
+-------+--------------------+--------------------+--------------------+
|    167|   Private Benjamin |            [Comedy]| [private, benjamin]|
|   1343|         Lotto Land |             [Drama]|       [lotto, land]|
|   1607|  Hurricane Streets |             [Drama]|[hurricane, streets]|
|    966|Affair to Remembe...|           [Romance]|[affair, to, reme...|
|      9|   Dead Man Walking |             [Drama]|[dead, man, walking]|
|   1230|Ready to Wear Pre...|            [Comedy]|[ready, to, wear,...|
|   1118|        Up in Smoke |            [Comedy]|     [up, in, smoke]|
|    673|          Cape Fear |[Film-Noir, Thril...|        [cape, fear]|
|    879|     Peacemaker The |[Action, Thriller...|   [peacemaker, the]|
|     66|While You Were Sl...|   [Comedy, Romance]|[while, you, were...|
+-------+--------------------+--------------------+

In [27]:
# remove stop words
# The filtered tokens go to a new "text" column; the raw "title" and
# "title_words" columns are dropped afterwards.
remover = StopWordsRemover(inputCol="title_words", outputCol="text")
clean_data = remover.transform(tokenized_data).drop("title", "title_words")
clean_data.show(10)

+-------+--------------------+--------------------+
|MovieId|              genres|                text|
+-------+--------------------+--------------------+
|    167|            [Comedy]| [private, benjamin]|
|   1343|             [Drama]|       [lotto, land]|
|   1607|             [Drama]|[hurricane, streets]|
|    966|           [Romance]|  [affair, remember]|
|      9|             [Drama]|[dead, man, walking]|
|   1230|            [Comedy]|[ready, wear, pre...|
|   1118|            [Comedy]|             [smoke]|
|    673|[Film-Noir, Thril...|        [cape, fear]|
|    879|[Action, Thriller...|        [peacemaker]|
|     66|   [Comedy, Romance]|          [sleeping]|
+-------+--------------------+--------------------+
only showing top 10 rows



### convert text input into feature vectors

In [28]:
# step 1: perform HashingTF on column "text"
# numFeatures=1024 sets the length of the resulting "text_features" vector.
text_hasher = HashingTF(inputCol="text", outputCol="text_features", numFeatures=1024)
hashed_data = text_hasher.transform(clean_data)
hashed_data.show(5)

+-------+---------+--------------------+--------------------+
|MovieId|   genres|                text|       text_features|
+-------+---------+--------------------+--------------------+
|    167| [Comedy]| [private, benjamin]|(1024,[128,544],[...|
|   1343|  [Drama]|       [lotto, land]|(1024,[38,300],[1...|
|   1607|  [Drama]|[hurricane, streets]|(1024,[592,821],[...|
|    966|[Romance]|  [affair, remember]|(1024,[389,502],[...|
|      9|  [Drama]|[dead, man, walking]|(1024,[11,342,101...|
+-------+---------+--------------------+--------------------+
only showing top 5 rows



In [29]:
# step 2: fit a CountVectorizerModel from column "genres".
# The fitted model is then applied to the same data, adding "genres_features".
count_vectorizer = CountVectorizer(inputCol="genres", outputCol="genres_features")
count_vectorizer_model = count_vectorizer.fit(hashed_data)
vectorized_data = count_vectorizer_model.transform(hashed_data)
vectorized_data.show(5)

+-------+---------+--------------------+--------------------+---------------+
|MovieId|   genres|                text|       text_features|genres_features|
+-------+---------+--------------------+--------------------+---------------+
|    167| [Comedy]| [private, benjamin]|(1024,[128,544],[...| (19,[1],[1.0])|
|   1343|  [Drama]|       [lotto, land]|(1024,[38,300],[1...| (19,[0],[1.0])|
|   1607|  [Drama]|[hurricane, streets]|(1024,[592,821],[...| (19,[0],[1.0])|
|    966|[Romance]|  [affair, remember]|(1024,[389,502],[...| (19,[4],[1.0])|
|      9|  [Drama]|[dead, man, walking]|(1024,[11,342,101...| (19,[0],[1.0])|
+-------+---------+--------------------+--------------------+---------------+
only showing top 5 rows



In [30]:
# step 3: assemble features into a single vector
assembler = VectorAssembler(
    inputCols=["text_features", "genres_features"],
    outputCol="features",
)

# Select the item id through the configured column name (COL_ITEM): feature_data
# is later handed to SparkDiversityEvaluation together with col_item=COL_ITEM.
feature_data = assembler.transform(vectorized_data).select(COL_ITEM, "features")

# truncate=False prints the full sparse-vector representation.
feature_data.show(10, truncate=False)

+-------+---------------------------------------------+
|MovieId|features                                     |
+-------+---------------------------------------------+
|167    |(1043,[128,544,1025],[1.0,1.0,1.0])          |
|1343   |(1043,[38,300,1024],[1.0,1.0,1.0])           |
|1607   |(1043,[592,821,1024],[1.0,1.0,1.0])          |
|966    |(1043,[389,502,1028],[1.0,1.0,1.0])          |
|9      |(1043,[11,342,1014,1024],[1.0,1.0,1.0,1.0])  |
|1230   |(1043,[597,740,902,1025],[1.0,1.0,1.0,1.0])  |
|1118   |(1043,[702,1025],[1.0,1.0])                  |
|673    |(1043,[169,690,1027,1040],[1.0,1.0,1.0,1.0]) |
|879    |(1043,[909,1026,1027,1034],[1.0,1.0,1.0,1.0])|
|66     |(1043,[256,1025,1028],[1.0,1.0,1.0])         |
+-------+---------------------------------------------+
only showing top 10 rows



The `features` column holds `SparseVector` objects. For example, in the feature vector (1043,[128,544,1025],[1.0,1.0,1.0]), 1043 is the vector length, meaning each item is described by 1043 features. The values at index positions 128, 544 and 1025 are 1.0, and the values at all other positions are 0.

### diversity metrics applied

In [31]:
# Diversity evaluation again, this time also passing the content feature vectors
# (feature_data) and item_sim_measure="item_feature_vector".
als_eval = SparkDiversityEvaluation(
    train_df = train_df, 
    reco_df = top_k_reco,
    item_feature_df = feature_data, 
    item_sim_measure="item_feature_vector",
    col_user = COL_USER, 
    col_item = COL_ITEM
)

als_metrics = get_diversity_results(als_eval)

In [32]:
# One-row table of the diversity metrics from the feature-vector based evaluation.
cols = ["catalog_coverage", "distributional_coverage","novelty", "diversity", "serendipity"]
content_diversity_results = pd.DataFrame(columns=cols)
# Assigning the metrics dict to a new index label adds it as the row labelled 1.
content_diversity_results.loc[1] = als_metrics
content_diversity_results.head()

Unnamed: 0,catalog_coverage,distributional_coverage,novelty,diversity,serendipity
1,0.515003,8.497723,11.391092,0.871292,0.886252


In [33]:
# Re-display the diversity metrics computed without item features, for comparison.
diversity_results.head()

Unnamed: 0,catalog_coverage,distributional_coverage,novelty,diversity,serendipity
1,0.515003,8.497723,11.391092,0.880526,0.868944


## DISPLAY ALL RESULTS

In [34]:
# Header line naming the K and dataset size taken from the config constants.
print("ALS results for : {} of the Movielens size: {} ".format(TOP_K,MOVIELENS_DATA_SIZE))
print()

ALS results for : 20 of the Movielens size: 100k 



In [35]:
# Diversity metrics from the evaluation that used no item feature vectors.
print("DIVERSITY METRICS - COLLABORATIVE")
print()
diversity_results.head()

DIVERSITY METRICS - COLLABORATIVE



Unnamed: 0,catalog_coverage,distributional_coverage,novelty,diversity,serendipity
1,0.515003,8.497723,11.391092,0.880526,0.868944


In [36]:
# Diversity metrics from the evaluation that used the item feature vectors.
print("DIVERSITY METRICS - CONTENT BASED")
print()
content_diversity_results.head()

DIVERSITY METRICS - CONTENT BASED



Unnamed: 0,catalog_coverage,distributional_coverage,novelty,diversity,serendipity
1,0.515003,8.497723,11.391092,0.871292,0.886252


In [37]:
# Rating-prediction metrics table (rating_res).
print("RATING METRICS")
print()
rating_res.head()

RATING METRICS



Unnamed: 0,rmse,mean absolute error,R squared,explained variance
1,0.971761,0.757398,0.254696,0.259558


In [38]:
# Ranking metrics table (ranking_results).
print("RANKING METRICS")
print()
ranking_results.head()

RANKING METRICS



Unnamed: 0,Precision@k,Recall@k,NDCG@k,Mean average precision
1,0.053287,0.038843,0.052833,0.007391
