Commit

revert MovieLensALS
mengxr committed May 1, 2015
1 parent f864f5e · commit cb9799a
Showing 1 changed file with 24 additions and 67 deletions.
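
This revert strips the ranking-metrics path from the MovieLensALS example: the --metrics command-line option, the org.apache.spark.mllib.evaluation.RankingMetrics import, and the computeRankingMetrics helper are removed, and the example goes back to reporting only the test RMSE through computeRmse. A condensed sketch of the dropped ranking-metrics computation appears after the diff.
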
@@ -18,12 +18,14 @@
package org.apache.spark.examples.mllib

import scala.collection.mutable

import org.apache.log4j.{Level, Logger}
import scopt.OptionParser

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.evaluation.RankingMetrics

/**
* An example app for ALS on MovieLens data (http://grouplens.org/datasets/movielens/).
@@ -37,15 +39,14 @@ import org.apache.spark.mllib.evaluation.RankingMetrics
object MovieLensALS {

case class Params(
input: String = null,
kryo: Boolean = false,
numIterations: Int = 20,
lambda: Double = 1.0,
rank: Int = 10,
numUserBlocks: Int = -1,
numProductBlocks: Int = -1,
implicitPrefs: Boolean = false,
metrics: String = "rmse") extends AbstractParams[Params]
input: String = null,
kryo: Boolean = false,
numIterations: Int = 20,
lambda: Double = 1.0,
rank: Int = 10,
numUserBlocks: Int = -1,
numProductBlocks: Int = -1,
implicitPrefs: Boolean = false) extends AbstractParams[Params]

def main(args: Array[String]) {
val defaultParams = Params()
@@ -73,9 +74,6 @@ object MovieLensALS {
opt[Unit]("implicitPrefs")
.text("use implicit preference")
.action((_, c) => c.copy(implicitPrefs = true))
opt[String]("metrics")
.text("generate recommendation metrics using rmse/map measures, default: rmse")
.action((x, c) => c.copy(metrics = x))
arg[String]("<input>")
.required()
.text("input paths to a MovieLens dataset of ratings")
@@ -102,7 +100,7 @@
val conf = new SparkConf().setAppName(s"MovieLensALS with $params")
if (params.kryo) {
conf.registerKryoClasses(Array(classOf[mutable.BitSet], classOf[Rating]))
.set("spark.kryoserializer.buffer.mb", "8")
.set("spark.kryoserializer.buffer", "8m")
}
val sc = new SparkContext(conf)

@@ -169,66 +167,25 @@
.setProductBlocks(params.numProductBlocks)
.run(training)

params.metrics match {
case "rmse" =>
val rmse = computeRmse(model, test, params.implicitPrefs)
println(s"Test RMSE = $rmse")
case "map" =>
val (map, users) = computeRankingMetrics(model, training, test, numMovies.toInt)
println(s"Test users $users MAP $map")
case _ => println(s"Metrics not defined, options are rmse/map")
}
val rmse = computeRmse(model, test, params.implicitPrefs)

println(s"Test RMSE = $rmse.")

sc.stop()
}

/** Compute RMSE (Root Mean Squared Error). */
def computeRmse(
model: MatrixFactorizationModel,
data: RDD[Rating],
implicitPrefs: Boolean) : Double = {
val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
val predictionsAndRatings = predictions.map { x =>
((x.user, x.product), mapPredictedRating(x.rating, implicitPrefs))
}.join(data.map(x => ((x.user, x.product), x.rating))).values
math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean())
}
def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], implicitPrefs: Boolean)
: Double = {

def mapPredictedRating(r: Double, implicitPrefs: Boolean) = {
if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r
}

/** Compute MAP (Mean Average Precision) statistics for top N product Recommendation */
def computeRankingMetrics(
model: MatrixFactorizationModel,
train: RDD[Rating],
test: RDD[Rating],
n: Int) : (Double, Long) = {
val ord = Ordering.by[(Int, Double), Double](x => x._2)

val testUserLabels = test.map {
x => (x.user, (x.product, x.rating))
}.groupByKey().map {
case (userId, products) =>
val sortedProducts = products.toArray.sorted(ord.reverse)
(userId, sortedProducts.map(_._1))
}

val trainUserLabels = train.map {
x => (x.user, x.product)
}.groupByKey().map{case (userId, products) => (userId, products.toArray)}

val rankings = model.recommendProductsForUsers(n).join(trainUserLabels).map {
case (userId, (pred, train)) => {
val predictedProducts = pred.map(_.product)
val trainSet = train.toSet
(userId, predictedProducts.filterNot { x => trainSet.contains(x) })
}
}.join(testUserLabels).map {
case (user, (pred, lab)) => (pred, lab)
def mapPredictedRating(r: Double): Double = {
if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r
}

val metrics = new RankingMetrics(rankings)
(metrics.meanAveragePrecision, testUserLabels.count)
val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
val predictionsAndRatings = predictions.map{ x =>
((x.user, x.product), mapPredictedRating(x.rating))
}.join(data.map(x => ((x.user, x.product), x.rating))).values
math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean())
}
}
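
For reference, the ranking-metrics path that this revert removes boils down to the pattern sketched below. This is a minimal, hedged sketch rather than the example's own code: it assumes a trained MatrixFactorizationModel plus training/test RDDs of Ratings are already in scope, and the helper name meanAveragePrecisionTopN is invented for illustration.

import org.apache.spark.SparkContext._
import org.apache.spark.mllib.evaluation.RankingMetrics
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD

// Sketch only: recommend the top n products per user, drop products the user
// already rated in training, and score what remains against the products the
// user rated in the test set, reporting Mean Average Precision.
def meanAveragePrecisionTopN(
    model: MatrixFactorizationModel,
    training: RDD[Rating],
    test: RDD[Rating],
    n: Int): Double = {
  val testLabels = test.map(r => (r.user, r.product)).groupByKey()
  val seenInTraining = training.map(r => (r.user, r.product)).groupByKey()
  val predictionsAndLabels = model.recommendProductsForUsers(n)
    .join(seenInTraining)
    .map { case (user, (recs, seen)) =>
      val seenSet = seen.toSet
      (user, recs.map(_.product).filterNot(seenSet.contains))
    }
    .join(testLabels)
    .map { case (_, (predicted, relevant)) => (predicted, relevant.toArray) }
  new RankingMetrics(predictionsAndLabels).meanAveragePrecision
}

The reverted code additionally sorted each user's test products by rating before taking the labels; RankingMetrics.meanAveragePrecision treats the labels as a set, so the sketch omits that step.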
