Skip to content
Permalink
Browse files

provide ratio for topN product validation; generate MAP and prec@k me…

…tric for movielens dataset
  • Loading branch information...
Debasish Das
Debasish Das committed Nov 8, 2014
1 parent 9fa063e commit 10cbb37a7881867d801ae6630ffc0d09b3feebf9
Showing with 83 additions and 56 deletions.
  1. +83 −56 examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
@@ -18,16 +18,14 @@
package org.apache.spark.examples.mllib

import scala.collection.mutable

import org.apache.log4j.{ Level, Logger }
import scopt.OptionParser

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.recommendation.{ ALS, MatrixFactorizationModel, Rating }
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.evaluation.RankingMetrics
import scala.math.round
import org.jblas.DoubleMatrix

/**
* An example app for ALS on MovieLens data (http://grouplens.org/datasets/movielens/).
@@ -48,7 +46,8 @@ object MovieLensALS {
rank: Int = 10,
numUserBlocks: Int = -1,
numProductBlocks: Int = -1,
implicitPrefs: Boolean = false) extends AbstractParams[Params]
implicitPrefs: Boolean = false,
validateRecommendation: Double = 0.0) extends AbstractParams[Params]

def main(args: Array[String]) {
val defaultParams = Params()
@@ -76,6 +75,9 @@ object MovieLensALS {
opt[Unit]("implicitPrefs")
.text("use implicit preference")
.action((_, c) => c.copy(implicitPrefs = true))
opt[Double]("validateRecommendation")
.text("ratio for topN product recommendation validation, default : 0.0*numProducts")
.action((x, c) => c.copy(validateRecommendation = x))
arg[String]("<input>")
.required()
.text("input paths to a MovieLens dataset of ratings")
@@ -107,9 +109,9 @@ object MovieLensALS {
val sc = new SparkContext(conf)

Logger.getRootLogger.setLevel(Level.WARN)

val implicitPrefs = params.implicitPrefs

val ratings = sc.textFile(params.input).map { line =>
val fields = line.split("::")
if (implicitPrefs) {
@@ -139,8 +141,12 @@ object MovieLensALS {

println(s"Got $numRatings ratings from $numUsers users on $numMovies movies.")

val splits = ratings.randomSplit(Array(0.8, 0.2))
val training = splits(0).cache()
//val splits = ratings.randomSplit(Array(0.8, 0.2))
val fractions = (0 until numUsers.toInt).map(x => (x + 1, 0.8)).toMap

val training = ratings.map { x => (x.user, x) }.sampleByKey(false, fractions).map { x => x._2 }
val testSplit = ratings.subtract(training)

val test = if (params.implicitPrefs) {
/*
* 0 means "don't know" and positive values mean "confident that the prediction should be 1".
@@ -149,11 +155,14 @@ object MovieLensALS {
* the confidence. The error is the difference between prediction and either 1 or 0,
* depending on whether r is positive or negative.
*/
splits(1).map(x => Rating(x.user, x.product, if (x.rating > 0) 1.0 else 0.0))
testSplit.map(x => Rating(x.user, x.product, if (x.rating > 0) 1.0 else 0.0))
} else {
splits(1)
testSplit
}.cache()

training.cache
test.cache

val numTraining = training.count()
val numTest = test.count()
println(s"Training: $numTraining, test: $numTest.")
@@ -169,60 +178,78 @@ object MovieLensALS {
.setProductBlocks(params.numProductBlocks)
.run(training)

val (rmse, userMap, productMap) =
computeRecommendationMetrics(model, test, params.implicitPrefs)

println(s"Test RMSE = $rmse user MAP = $userMap product MAP = $productMap.")

val rmse = computeRmse(model, test, params.implicitPrefs)

println(s"Test RMSE = $rmse.")

val n = (numMovies * params.validateRecommendation).toInt

if (n > 0) {
val userMap = computeRecommendationMetrics(model,
training, test,
params.implicitPrefs, n)
println(s"Test user MAP = $userMap.")
}

sc.stop()
}

/**
* Threshold for predictions are at 0.5
*/

/** Compute RMSE (Root Mean Squared Error). */
def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], implicitPrefs: Boolean) = {
val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
val predictionsAndRatings = predictions.map { x =>
((x.user, x.product), mapPredictedRating(x.rating, implicitPrefs))
}.join(data.map(x => ((x.user, x.product), x.rating))).values
math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean())
}

def mapPredictedRating(r: Double, implicitPrefs: Boolean) = {
if (implicitPrefs) math.max(math.min(r, 1.0), 0.0)
else math.max(round(r), 0.0)
else r
}
/**
* Compute MAP (Mean Average Precision) statistics

/**
* Compute MAP (Mean Average Precision) statistics for top N product Recommendation
*/
def computeMap(predictedAndLabels: RDD[(Int, (Double, Double))]) = {
val ranking = predictedAndLabels.groupByKey.map {
case (user, entries) => {
val predictionValues = entries.toArray
val predicted = predictionValues.map { _._1 }
val labels = predictionValues.map { _._2 }
(predicted, labels)
}
def computeRecommendationMetrics(model: MatrixFactorizationModel,
train: RDD[Rating], test: RDD[Rating],
implicitPrefs: Boolean, n: Int) = {

val testProductLabels = test.map {
x => (x.user, x.product)
}.groupByKey.map {
case (userId, products) => (userId, products.toArray)
}
val metrics = new RankingMetrics[Double](ranking)
metrics.meanAveragePrecision
}

/**
* Compute recommendation metrics (RMSE, MAP)
*/
def computeRecommendationMetrics(model: MatrixFactorizationModel, data: RDD[Rating],
implicitPrefs: Boolean) = {
val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
val predictionAndRatings = predictions.map { x =>
((x.user, x.product), mapPredictedRating(x.rating, implicitPrefs))
}.join(data.map(x => ((x.user, x.product), x.rating)))

val userPredictedAndLabels = predictionAndRatings.map {
case((user, product), predictionValues) => (user, predictionValues)

val trainProducts = train.map { x => ((x.user, x.product), x.rating) }

val rankings = model.userFeatures.cartesian(model.productFeatures).map {
case ((userId, userFeature), (productId, productFeature)) => {
val userVector = new DoubleMatrix(userFeature)
val productVector = new DoubleMatrix(productFeature)
((userId, productId), userVector.dot(productVector))
}
}.leftOuterJoin(trainProducts).filter {
case ((userId, productId), (ratingAll, ratingTrain)) =>
ratingTrain == None
}.map {
case ((userId, productId), (ratingAll, ratingTrain)) =>
(userId, (productId, ratingAll))
}.groupByKey.map {
case (user, predictedProducts) =>
val sortedProducts = predictedProducts.toArray.sortWith(
(predicted1: (Int, Double), predicted2: (Int, Double)) =>
predicted1._2 > predicted2._2).take(n)
(user, sortedProducts.map { _._1 })
}.join(testProductLabels).map {
case (user, (pred, lab)) => (pred, lab)
}

val productPredictedAndLabels = predictionAndRatings.map {
case((user, product), predictionValues) => (product, predictionValues)

val metrics = new RankingMetrics(rankings)
for (i <- 0 until 10) {
val k = (i + 1) * 20
println(s"k $k prec@k ${metrics.precisionAt(k)}")
}
val predictionValues = predictionAndRatings.values

val rmse = math.sqrt(predictionValues.map(x => (x._1 - x._2)*(x._1 - x._2)).mean())
val userMap = computeMap(userPredictedAndLabels)
val productMap = computeMap(productPredictedAndLabels)
(rmse, userMap, productMap)
metrics.meanAveragePrecision
}
}

0 comments on commit 10cbb37

Please sign in to comment.
You can’t perform that action at this time.