In [1]:
from pyspark.sql import *
import pyspark.sql.functions as F
from pyspark.ml.recommendation import ALS, ALSModel

In [2]:
# Settings consumed by the SparkSession builder: the master URL and the
# application name that Spark reports for this notebook.
# NOTE(review): "evluating" looks like a typo for "evaluating"; it is a
# runtime string, so it is left unchanged here.
master = "local"
appName = "evluating"

In [3]:
# Create (or reuse) the Spark session for this notebook.
# spark.sql.broadcastTimeout is raised to 36000 (seconds, per the Spark SQL
# configuration reference).
spark = (
    SparkSession.builder
    .appName(appName)
    .master(master)
    .config("spark.sql.broadcastTimeout", "36000")
    .getOrCreate()
)

## load model

In [4]:
# Directory that holds the saved ALS artefacts. The two values fill the
# placeholders in the directory name (presumably the hyper-parameters the
# model was trained with -- TODO confirm).
_MODEL_DIR_TEMPLATE = "models/ALS_{}_{}"
TEMP_PATH = _MODEL_DIR_TEMPLATE.format(10, 0.1)

In [5]:
# Sub-directories of TEMP_PATH: one for the ALS estimator, one for the
# fitted ALSModel (both are loaded in the following cells).
ALS_PATH, MODEL_PATH = (TEMP_PATH + suffix for suffix in ("/als", "/als_model"))

In [6]:
# Load the saved ALS estimator from ALS_PATH.
# NOTE(review): `als` is not referenced again in the visible cells.
als = ALS.load(ALS_PATH)

In [7]:
# Load the fitted ALSModel from MODEL_PATH; every evaluation step below uses it.
model = ALSModel.load(MODEL_PATH)

## load train val test

In [8]:
# Read the three pre-split data sets from their parquet directories.
train, validation, testing = (
    spark.read.parquet(path)
    for path in (
        "data/processed/train.parquet/",
        "data/processed/validation.parquet/",
        "data/processed/testing.parquet/",
    )
)

In [9]:
# Expose each split to Spark SQL under a temp view named after it.
for view_name, frame in (
    ("train", train),
    ("validation", validation),
    ("testing", testing),
):
    frame.createOrReplaceTempView(view_name)

## evaluation

### RMSE

In [10]:
from pyspark.ml.evaluation import RegressionEvaluator

In [11]:
# Apply the loaded model to the validation set; the evaluator in the next
# cell reads the resulting `prediction` column.
predictions = model.transform(validation)

In [13]:
# Compare the `prediction` column with the true `rating` column using RMSE
# and print the score.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = ", rmse, sep="")

Root-mean-square error = 2.086216224915346


### other metrics

For each user:
- groundtruth: all of the user's interactions in the validation set
- predictions: the top-500 books the model recommends to that user

In [14]:
# Ask the model for the 500 highest-scoring recommendations for every user.
userRecs = model.recommendForAllUsers(500)

In [15]:
# The ranking evaluator comes from the RDD-based (mllib) API.
# `ALS` is deliberately NOT imported from pyspark.mllib.recommendation here:
# doing so would rebind the `ALS` name imported from pyspark.ml.recommendation
# at the top of the notebook, so re-running the `ALS.load(ALS_PATH)` cell
# afterwards would pick up the wrong class.
from pyspark.mllib.recommendation import Rating
from pyspark.mllib.evaluation import RankingMetrics

In [16]:
# Reuse the per-user recommendations already requested as `userRecs` instead
# of repeating the recommendForAllUsers(500) call, so the recommendation
# cut-off is defined in one place only.
# From here on `predictions` holds the recommendations, not the rating
# predictions used for RMSE above.
predictions = userRecs
# Register the frame for the SQL join with the groundtruth further down.
predictions.createOrReplaceTempView("predictions")

In [17]:
# Display the schema of the per-user recommendations frame.
predictions

DataFrame[user_id: int, recommendations: array<struct<book_id:int,rating:float>>]

In [18]:
from pyspark.sql import functions as F

In [19]:
# One row per user: the distinct set of book_ids that user has in the
# validation split, stored in the `groundtruth` column.
books_per_user = F.collect_set("book_id").alias('groundtruth')
groundtruth = validation.groupby("user_id").agg(books_per_user)
# Register the frame for the SQL join with the recommendations.
groundtruth.createOrReplaceTempView("groundtruth")

In [20]:
# Inner-join groundtruth and recommendations on user_id, so only users that
# appear in both views are kept.
total = spark.sql(
    "SELECT g.user_id, g.groundtruth AS groundtruth, p.recommendations AS predictions "
    "FROM groundtruth g "
    "JOIN predictions p ON g.user_id = p.user_id"
)
total.createOrReplaceTempView("total")

In [21]:
# Display the schema of the joined groundtruth / recommendations frame.
total

DataFrame[user_id: int, groundtruth: array<int>, predictions: array<struct<book_id:int,rating:float>>]

**need to extract `book_id` from each prediction struct and drop the predicted ratings**

In [30]:
# Selecting `predictions.book_id` on the array-of-struct column keeps only
# the book ids (as an array), dropping the predicted ratings; the
# groundtruth array is carried along unchanged.
ranking_columns = ("predictions.book_id", "groundtruth")
data = total.selectExpr(*ranking_columns)
data

DataFrame[book_id: array<int>, groundtruth: array<int>]

In [31]:
# Turn each Row into a plain (recommended book_ids, groundtruth book_ids)
# tuple, the pair layout that RankingMetrics takes.
rdd = data.rdd.map(tuple)

In [32]:
# Build the ranking evaluator over the (recommendations, groundtruth) pairs.
metrics = RankingMetrics(rdd)

In [33]:
# Precision of the top-3 recommendations, averaged over all users.
metrics.precisionAt(3)

0.0028985507246376834

In [34]:
# Mean average precision (MAP) over all users.
metrics.meanAveragePrecision

0.00029646742716241684

In [35]:
# NDCG truncated at rank 3, averaged over all users.
metrics.ndcgAt(3)

0.004080684574110924

In [36]:
# NDCG truncated at rank 10, averaged over all users.
metrics.ndcgAt(10)

0.0025517885947603475