Skip to content

Commit

Permalink
[SPARK-3066] [MLLIB] Support recommendAll in matrix factorization model
Browse files Browse the repository at this point in the history
This is based on apache#3098 from debasish83.

1. BLAS' GEMM is used to compute inner products.
2. Reverted changes to MovieLensALS. SPARK-4231 should be addressed in a separate PR.
3. ~~Fixed a bug in topByKey~~

Closes apache#3098

debasish83 coderxiang

Author: Debasish Das <debasish.das@one.verizon.com>
Author: Xiangrui Meng <meng@databricks.com>

Closes apache#5829 from mengxr/SPARK-3066 and squashes the following commits:

22e6a87 [Xiangrui Meng] topByKey was correct. update its usage
389b381 [Xiangrui Meng] fix indentation
49953de [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-3066
cb9799a [Xiangrui Meng] revert MovieLensALS
f864f5e [Xiangrui Meng] update test and fix a bug in topByKey
c5e0181 [Xiangrui Meng] use GEMM and topByKey
3a0c4eb [Debasish Das] updated with spark master
98fa424 [Debasish Das] updated with master
ee99571 [Debasish Das] addressed initial review comments;merged with master;added tests for batch predict APIs in matrix factorization
3f97c49 [Debasish Das] fixed spark coding style for imports
7163a5c [Debasish Das] Added API for batch user and product recommendation; MAP calculation for product recommendation per user using randomized split
d144f57 [Debasish Das] recommendAll API to MatrixFactorizationModel, uses topK finding using BoundedPriorityQueue similar to RDD.top
f38a1b5 [Debasish Das] use sampleByKey for per user sampling
10cbb37 [Debasish Das] provide ratio for topN product validation; generate MAP and prec@k metric for movielens dataset
9fa063e [Debasish Das] import scala.math.round
4bbae0f [Debasish Das] comments fixed as per scalastyle
cd3ab31 [Debasish Das] merged with AbstractParams serialization bug
9b3951f [Debasish Das] validate user/product on MovieLens dataset through user input and compute map measure along with rmse
  • Loading branch information
Debasish Das authored and jeanlyn committed May 28, 2015
1 parent b5ea7d5 commit f33aa90
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 22 deletions.
Expand Up @@ -42,13 +42,11 @@ class MLPairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) extends Se
self.aggregateByKey(new BoundedPriorityQueue[V](num)(ord))(
seqOp = (queue, item) => {
queue += item
queue
},
combOp = (queue1, queue2) => {
queue1 ++= queue2
queue1
}
).mapValues(_.toArray.sorted(ord.reverse))
).mapValues(_.toArray.reverse) // This is an min-heap, so we reverse the order.
}
}

Expand Down
Expand Up @@ -20,14 +20,18 @@ package org.apache.spark.mllib.recommendation
import java.io.IOException
import java.lang.{Integer => JavaInteger}

import scala.collection.mutable

import com.github.fommil.netlib.BLAS.{getInstance => blas}
import org.apache.hadoop.fs.Path
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import com.github.fommil.netlib.BLAS.{getInstance => blas}

import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
import org.apache.spark.mllib.util.{Loader, Saveable}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
Expand Down Expand Up @@ -57,7 +61,7 @@ class MatrixFactorizationModel(

/** Validates factors and warns users if there are performance concerns. */
private def validateFeatures(name: String, features: RDD[(Int, Array[Double])]): Unit = {
require(features.first()._2.size == rank,
require(features.first()._2.length == rank,
s"$name feature dimension does not match the rank $rank.")
if (features.partitioner.isEmpty) {
logWarning(s"$name factor does not have a partitioner. "
Expand All @@ -72,19 +76,19 @@ class MatrixFactorizationModel(
def predict(user: Int, product: Int): Double = {
  // Fetch the factor vector stored for each id. `.head` fails if the id has no factor vector.
  val userVector = userFeatures.lookup(user).head
  val productVector = productFeatures.lookup(product).head
  // The predicted rating is the inner product of the two factor vectors over `rank` components.
  // A single ddot call suffices; an earlier duplicate call whose result was discarded is removed.
  blas.ddot(rank, userVector, 1, productVector, 1)
}

/**
* Predict the rating of many users for many products.
* The output RDD has an element per each element in the input RDD (including all duplicates)
* unless a user or product is missing in the training set.
*
* @param usersProducts RDD of (user, product) pairs.
* @return RDD of Ratings.
*/
* Predict the rating of many users for many products.
* The output RDD has an element per each element in the input RDD (including all duplicates)
* unless a user or product is missing in the training set.
*
* @param usersProducts RDD of (user, product) pairs.
* @return RDD of Ratings.
*/
def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating] = {
val users = userFeatures.join(usersProducts).map{
val users = userFeatures.join(usersProducts).map {
case (user, (uFeatures, product)) => (product, (user, uFeatures))
}
users.join(productFeatures).map {
Expand Down Expand Up @@ -112,7 +116,7 @@ class MatrixFactorizationModel(
* recommended the product is.
*/
def recommendProducts(user: Int, num: Int): Array[Rating] =
  // Score every product against this user's factor vector via the companion-object helper,
  // then wrap each (productId, score) pair in a Rating that carries the requested user id.
  // `.head` fails if the user has no factor vector.
  MatrixFactorizationModel.recommend(userFeatures.lookup(user).head, productFeatures, num)
    .map { case (product, score) => Rating(user, product, score) }

/**
Expand All @@ -128,7 +132,7 @@ class MatrixFactorizationModel(
* recommended the user is.
*/
def recommendUsers(product: Int, num: Int): Array[Rating] =
  // Score every user against this product's factor vector via the companion-object helper,
  // then wrap each (userId, score) pair in a Rating that carries the requested product id.
  // `.head` fails if the product has no factor vector.
  MatrixFactorizationModel.recommend(productFeatures.lookup(product).head, userFeatures, num)
    .map { case (user, score) => Rating(user, product, score) }

protected override val formatVersion: String = "1.0"
Expand All @@ -137,20 +141,113 @@ class MatrixFactorizationModel(
MatrixFactorizationModel.SaveLoadV1_0.save(this, path)
}

/**
 * Recommends the top `num` products for every user.
 *
 * @param num number of products to return per user.
 * @return an RDD of (userId, ratings) pairs. Each element of `ratings` is a Rating holding that
 *         same userId, a recommended productId, and the score in the rating field. The score
 *         has the same meaning as in the recommendProducts API.
 */
def recommendProductsForUsers(num: Int): RDD[(Int, Array[Rating])] = {
  val topPerUser =
    MatrixFactorizationModel.recommendForAll(rank, userFeatures, productFeatures, num)
  topPerUser.map { case (user, scoredProducts) =>
    // Turn each (productId, score) pair into a Rating keyed by this user.
    (user, scoredProducts.map { case (product, score) => Rating(user, product, score) })
  }
}


/**
 * Recommends the top `num` users for every product.
 *
 * @param num number of users to return per product.
 * @return an RDD of (productId, ratings) pairs. Each element of `ratings` is a Rating holding a
 *         recommended userId, that same productId, and the score in the rating field. The
 *         score has the same meaning as in the recommendUsers API.
 */
def recommendUsersForProducts(num: Int): RDD[(Int, Array[Rating])] = {
  val topPerProduct =
    MatrixFactorizationModel.recommendForAll(rank, productFeatures, userFeatures, num)
  topPerProduct.map { case (product, scoredUsers) =>
    // Turn each (userId, score) pair into a Rating keyed by this product.
    (product, scoredUsers.map { case (user, score) => Rating(user, product, score) })
  }
}
}

object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {

import org.apache.spark.mllib.util.Loader._

/**
 * Makes recommendations for a single user (or product).
 *
 * @param recommendToFeatures factor vector of the user (or product) receiving recommendations
 * @param recommendableFeatures (id, factor vector) pairs of the candidates to be scored
 * @param num number of recommendations to return
 * @return up to `num` (id, score) pairs selected by `RDD.top` under an ordering on the score
 */
private def recommend(
    recommendToFeatures: Array[Double],
    recommendableFeatures: RDD[(Int, Array[Double])],
    num: Int): Array[(Int, Double)] = {
  // Score each candidate by the inner product of its factors with the target's factors.
  // `scored` is defined exactly once; a duplicated definition would not compile.
  val scored = recommendableFeatures.map { case (id, features) =>
    (id, blas.ddot(features.length, recommendToFeatures, 1, features, 1))
  }
  // Order candidates by score only; the id does not take part in the comparison.
  scored.top(num)(Ordering.by(_._2))
}
}

object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
/**
 * Makes recommendations for all users (or products).
 *
 * Both feature sets are grouped into blocks, every source block is paired with every
 * destination block, and each pair is scored with one matrix multiplication. Every
 * (srcId, (dstId, score)) tuple of a block pair is materialized before the per-key top-`num`
 * selection.
 *
 * @param rank rank
 * @param srcFeatures src features to receive recommendations
 * @param dstFeatures dst features used to make recommendations
 * @param num number of recommendations for each record
 * @return an RDD of (srcId: Int, recommendations), where recommendations are stored as an array
 *         of (dstId, rating) pairs.
 */
private def recommendForAll(
    rank: Int,
    srcFeatures: RDD[(Int, Array[Double])],
    dstFeatures: RDD[(Int, Array[Double])],
    num: Int): RDD[(Int, Array[(Int, Double)])] = {
  val blockedSrc = blockify(rank, srcFeatures)
  val blockedDst = blockify(rank, dstFeatures)
  val scoredPairs = blockedSrc.cartesian(blockedDst).flatMap {
    case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
      // Entry (row, col) of the product is the score of srcIds(row) against dstIds(col).
      val scores = srcFactors.transpose.multiply(dstFactors)
      val buffer = new Array[(Int, (Int, Double))](srcIds.length * dstIds.length)
      var pos = 0
      scores.foreachActive { (row, col, score) =>
        buffer(pos) = (srcIds(row), (dstIds(col), score))
        pos += 1
      }
      buffer.toSeq
  }
  // Keep the `num` best-scoring destinations for each source id.
  scoredPairs.topByKey(num)(Ordering.by(_._2))
}

import org.apache.spark.mllib.util.Loader._
/**
 * Blockifies features to use Level-3 BLAS.
 *
 * Within each partition, consecutive (id, factor) records are grouped into chunks of at most
 * `blockSize` records. Each chunk yields the array of its ids and a DenseMatrix built from the
 * concatenated factors, with `rank` rows and one column per record in the chunk.
 */
private def blockify(
    rank: Int,
    features: RDD[(Int, Array[Double])]): RDD[(Array[Int], DenseMatrix)] = {
  val blockSize = 4096 // TODO: tune the block size
  val blockStorage = rank * blockSize
  features.mapPartitions { records =>
    records.grouped(blockSize).map { chunk =>
      val idBuilder = mutable.ArrayBuilder.make[Int]
      idBuilder.sizeHint(blockSize)
      val factorBuilder = mutable.ArrayBuilder.make[Double]
      factorBuilder.sizeHint(blockStorage)
      chunk.foreach { case (id, factor) =>
        idBuilder += id
        factorBuilder ++= factor
      }
      // The chunk size equals the number of records appended above.
      (idBuilder.result(), new DenseMatrix(rank, chunk.size, factorBuilder.result()))
    }
  }
}

override def load(sc: SparkContext, path: String): MatrixFactorizationModel = {
val (loadedClassName, formatVersion, _) = loadMetadata(sc, path)
Expand Down Expand Up @@ -214,4 +311,5 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
new Path(dataPath(path), "product").toUri.toString
}
}

}
Expand Up @@ -24,13 +24,13 @@ import org.apache.spark.mllib.rdd.MLPairRDDFunctions._

/** Tests for the pair-RDD helper `topByKey`. */
class MLPairRDDFunctionsSuite extends FunSuite with MLlibTestSparkContext {
  test("topByKey") {
    // Key 1 has two values, key 3 has three, and key 5 has a single value, so the fixture
    // also covers a key with fewer than `num` values.
    val topMap = sc.parallelize(Array((1, 1), (1, 2), (3, 2), (3, 7), (5, 1), (3, 5)), 2)
      .topByKey(2)
      .collectAsMap()

    assert(topMap.size === 3)
    assert(topMap(1) === Array(2, 1))
    assert(topMap(3) === Array(7, 5))
    assert(topMap(5) === Array(1))
  }
}
Expand Up @@ -72,4 +72,24 @@ class MatrixFactorizationModelSuite extends FunSuite with MLlibTestSparkContext
Utils.deleteRecursively(tempDir)
}
}

test("batch predict API recommendProductsForUsers") {
  // Build the model from the suite's fixtures and collect the per-user recommendations.
  val mfModel = new MatrixFactorizationModel(rank, userFeatures, prodFeatures)
  val numPerUser = 10
  val recsByUser = mfModel.recommendProductsForUsers(numPerUser).collectAsMap()

  // Check the first-listed recommendation for users 0 and 1.
  assert(recsByUser(0)(0).rating ~== 17.0 relTol 1e-14)
  assert(recsByUser(1)(0).rating ~== 39.0 relTol 1e-14)
}

test("batch predict API recommendUsersForProducts") {
  // Build the model from the suite's fixtures and collect the per-product recommendations.
  val mfModel = new MatrixFactorizationModel(rank, userFeatures, prodFeatures)
  val numPerProduct = 10
  val recsByProduct = mfModel.recommendUsersForProducts(numPerProduct).collectAsMap()

  // Product 2: user 1 is listed first, followed by user 0.
  val forProduct2 = recsByProduct(2)
  assert(forProduct2(0).user == 1)
  assert(forProduct2(0).rating ~== 39.0 relTol 1e-14)
  assert(forProduct2(1).user == 0)
  assert(forProduct2(1).rating ~== 17.0 relTol 1e-14)
}
}

0 comments on commit f33aa90

Please sign in to comment.