From f33aa905dd5c3e7f8f0d2f7817be22565457c8e4 Mon Sep 17 00:00:00 2001 From: Debasish Das Date: Fri, 1 May 2015 08:27:46 -0700 Subject: [PATCH] [SPARK-3066] [MLLIB] Support recommendAll in matrix factorization model This is based on #3098 from debasish83. 1. BLAS' GEMM is used to compute inner products. 2. Reverted changes to MovieLensALS. SPARK-4231 should be addressed in a separate PR. 3. ~~Fixed a bug in topByKey~~ Closes #3098 debasish83 coderxiang Author: Debasish Das Author: Xiangrui Meng Closes #5829 from mengxr/SPARK-3066 and squashes the following commits: 22e6a87 [Xiangrui Meng] topByKey was correct. update its usage 389b381 [Xiangrui Meng] fix indentation 49953de [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-3066 cb9799a [Xiangrui Meng] revert MovieLensALS f864f5e [Xiangrui Meng] update test and fix a bug in topByKey c5e0181 [Xiangrui Meng] use GEMM and topByKey 3a0c4eb [Debasish Das] updated with spark master 98fa424 [Debasish Das] updated with master ee99571 [Debasish Das] addressed initial review comments;merged with master;added tests for batch predict APIs in matrix factorization 3f97c49 [Debasish Das] fixed spark coding style for imports 7163a5c [Debasish Das] Added API for batch user and product recommendation; MAP calculation for product recommendation per user using randomized split d144f57 [Debasish Das] recommendAll API to MatrixFactorizationModel, uses topK finding using BoundedPriorityQueue similar to RDD.top f38a1b5 [Debasish Das] use sampleByKey for per user sampling 10cbb37 [Debasish Das] provide ratio for topN product validation; generate MAP and prec@k metric for movielens dataset 9fa063e [Debasish Das] import scala.math.round 4bbae0f [Debasish Das] comments fixed as per scalastyle cd3ab31 [Debasish Das] merged with AbstractParams serialization bug 9b3951f [Debasish Das] validate user/product on MovieLens dataset through user input and compute map measure along with rmse --- .../spark/mllib/rdd/MLPairRDDFunctions.scala | 4 +- .../MatrixFactorizationModel.scala | 132 +++++++++++++++--- .../mllib/rdd/MLPairRDDFunctionsSuite.scala | 4 +- .../MatrixFactorizationModelSuite.scala | 20 +++ 4 files changed, 138 insertions(+), 22 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala index 9213fd3f595c3..5af55aaf84802 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala @@ -42,13 +42,11 @@ class MLPairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) extends Se self.aggregateByKey(new BoundedPriorityQueue[V](num)(ord))( seqOp = (queue, item) => { queue += item - queue }, combOp = (queue1, queue2) => { queue1 ++= queue2 - queue1 } - ).mapValues(_.toArray.sorted(ord.reverse)) + ).mapValues(_.toArray.reverse) // This is an min-heap, so we reverse the order. } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 36cbf060d9998..88c2148403313 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -20,14 +20,18 @@ package org.apache.spark.mllib.recommendation import java.io.IOException import java.lang.{Integer => JavaInteger} +import scala.collection.mutable + +import com.github.fommil.netlib.BLAS.{getInstance => blas} import org.apache.hadoop.fs.Path import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ -import com.github.fommil.netlib.BLAS.{getInstance => blas} import org.apache.spark.{Logging, SparkContext} import org.apache.spark.api.java.{JavaPairRDD, JavaRDD} +import org.apache.spark.mllib.linalg._ +import org.apache.spark.mllib.rdd.MLPairRDDFunctions._ import org.apache.spark.mllib.util.{Loader, Saveable} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row, SQLContext} @@ -57,7 +61,7 @@ class MatrixFactorizationModel( /** Validates factors and warns users if there are performance concerns. */ private def validateFeatures(name: String, features: RDD[(Int, Array[Double])]): Unit = { - require(features.first()._2.size == rank, + require(features.first()._2.length == rank, s"$name feature dimension does not match the rank $rank.") if (features.partitioner.isEmpty) { logWarning(s"$name factor does not have a partitioner. " @@ -72,19 +76,19 @@ class MatrixFactorizationModel( def predict(user: Int, product: Int): Double = { val userVector = userFeatures.lookup(user).head val productVector = productFeatures.lookup(product).head - blas.ddot(userVector.length, userVector, 1, productVector, 1) + blas.ddot(rank, userVector, 1, productVector, 1) } /** - * Predict the rating of many users for many products. - * The output RDD has an element per each element in the input RDD (including all duplicates) - * unless a user or product is missing in the training set. - * - * @param usersProducts RDD of (user, product) pairs. - * @return RDD of Ratings. - */ + * Predict the rating of many users for many products. + * The output RDD has an element per each element in the input RDD (including all duplicates) + * unless a user or product is missing in the training set. + * + * @param usersProducts RDD of (user, product) pairs. + * @return RDD of Ratings. + */ def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating] = { - val users = userFeatures.join(usersProducts).map{ + val users = userFeatures.join(usersProducts).map { case (user, (uFeatures, product)) => (product, (user, uFeatures)) } users.join(productFeatures).map { @@ -112,7 +116,7 @@ class MatrixFactorizationModel( * recommended the product is. */ def recommendProducts(user: Int, num: Int): Array[Rating] = - recommend(userFeatures.lookup(user).head, productFeatures, num) + MatrixFactorizationModel.recommend(userFeatures.lookup(user).head, productFeatures, num) .map(t => Rating(user, t._1, t._2)) /** @@ -128,7 +132,7 @@ class MatrixFactorizationModel( * recommended the user is. */ def recommendUsers(product: Int, num: Int): Array[Rating] = - recommend(productFeatures.lookup(product).head, userFeatures, num) + MatrixFactorizationModel.recommend(productFeatures.lookup(product).head, userFeatures, num) .map(t => Rating(t._1, product, t._2)) protected override val formatVersion: String = "1.0" @@ -137,20 +141,113 @@ class MatrixFactorizationModel( MatrixFactorizationModel.SaveLoadV1_0.save(this, path) } + /** + * Recommends topK products for all users. + * + * @param num how many products to return for every user. + * @return [(Int, Array[Rating])] objects, where every tuple contains a userID and an array of + * rating objects which contains the same userId, recommended productID and a "score" in the + * rating field. Semantics of score is same as recommendProducts API + */ + def recommendProductsForUsers(num: Int): RDD[(Int, Array[Rating])] = { + MatrixFactorizationModel.recommendForAll(rank, userFeatures, productFeatures, num).map { + case (user, top) => + val ratings = top.map { case (product, rating) => Rating(user, product, rating) } + (user, ratings) + } + } + + + /** + * Recommends topK users for all products. + * + * @param num how many users to return for every product. + * @return [(Int, Array[Rating])] objects, where every tuple contains a productID and an array + * of rating objects which contains the recommended userId, same productID and a "score" in the + * rating field. Semantics of score is same as recommendUsers API + */ + def recommendUsersForProducts(num: Int): RDD[(Int, Array[Rating])] = { + MatrixFactorizationModel.recommendForAll(rank, productFeatures, userFeatures, num).map { + case (product, top) => + val ratings = top.map { case (user, rating) => Rating(user, product, rating) } + (product, ratings) + } + } +} + +object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { + + import org.apache.spark.mllib.util.Loader._ + + /** + * Makes recommendations for a single user (or product). + */ private def recommend( recommendToFeatures: Array[Double], recommendableFeatures: RDD[(Int, Array[Double])], num: Int): Array[(Int, Double)] = { - val scored = recommendableFeatures.map { case (id,features) => + val scored = recommendableFeatures.map { case (id, features) => (id, blas.ddot(features.length, recommendToFeatures, 1, features, 1)) } scored.top(num)(Ordering.by(_._2)) } -} -object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { + /** + * Makes recommendations for all users (or products). + * @param rank rank + * @param srcFeatures src features to receive recommendations + * @param dstFeatures dst features used to make recommendations + * @param num number of recommendations for each record + * @return an RDD of (srcId: Int, recommendations), where recommendations are stored as an array + * of (dstId, rating) pairs. + */ + private def recommendForAll( + rank: Int, + srcFeatures: RDD[(Int, Array[Double])], + dstFeatures: RDD[(Int, Array[Double])], + num: Int): RDD[(Int, Array[(Int, Double)])] = { + val srcBlocks = blockify(rank, srcFeatures) + val dstBlocks = blockify(rank, dstFeatures) + val ratings = srcBlocks.cartesian(dstBlocks).flatMap { + case ((srcIds, srcFactors), (dstIds, dstFactors)) => + val m = srcIds.length + val n = dstIds.length + val ratings = srcFactors.transpose.multiply(dstFactors) + val output = new Array[(Int, (Int, Double))](m * n) + var k = 0 + ratings.foreachActive { (i, j, r) => + output(k) = (srcIds(i), (dstIds(j), r)) + k += 1 + } + output.toSeq + } + ratings.topByKey(num)(Ordering.by(_._2)) + } - import org.apache.spark.mllib.util.Loader._ + /** + * Blockifies features to use Level-3 BLAS. + */ + private def blockify( + rank: Int, + features: RDD[(Int, Array[Double])]): RDD[(Array[Int], DenseMatrix)] = { + val blockSize = 4096 // TODO: tune the block size + val blockStorage = rank * blockSize + features.mapPartitions { iter => + iter.grouped(blockSize).map { grouped => + val ids = mutable.ArrayBuilder.make[Int] + ids.sizeHint(blockSize) + val factors = mutable.ArrayBuilder.make[Double] + factors.sizeHint(blockStorage) + var i = 0 + grouped.foreach { case (id, factor) => + ids += id + factors ++= factor + i += 1 + } + (ids.result(), new DenseMatrix(rank, i, factors.result())) + } + } + } override def load(sc: SparkContext, path: String): MatrixFactorizationModel = { val (loadedClassName, formatVersion, _) = loadMetadata(sc, path) @@ -214,4 +311,5 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { new Path(dataPath(path), "product").toUri.toString } } + } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala index 1ac7c12c4e8e6..cb8fe4dba96f5 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala @@ -24,13 +24,13 @@ import org.apache.spark.mllib.rdd.MLPairRDDFunctions._ class MLPairRDDFunctionsSuite extends FunSuite with MLlibTestSparkContext { test("topByKey") { - val topMap = sc.parallelize(Array((1, 1), (1, 2), (3, 2), (3, 7), (3, 5), (5, 1), (5, 3)), 2) + val topMap = sc.parallelize(Array((1, 1), (1, 2), (3, 2), (3, 7), (5, 1), (3, 5)), 2) .topByKey(2) .collectAsMap() assert(topMap.size === 3) assert(topMap(1) === Array(2, 1)) assert(topMap(3) === Array(7, 5)) - assert(topMap(5) === Array(3, 1)) + assert(topMap(5) === Array(1)) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala index 9801e87576744..2c92866f3893d 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala @@ -72,4 +72,24 @@ class MatrixFactorizationModelSuite extends FunSuite with MLlibTestSparkContext Utils.deleteRecursively(tempDir) } } + + test("batch predict API recommendProductsForUsers") { + val model = new MatrixFactorizationModel(rank, userFeatures, prodFeatures) + val topK = 10 + val recommendations = model.recommendProductsForUsers(topK).collectAsMap() + + assert(recommendations(0)(0).rating ~== 17.0 relTol 1e-14) + assert(recommendations(1)(0).rating ~== 39.0 relTol 1e-14) + } + + test("batch predict API recommendUsersForProducts") { + val model = new MatrixFactorizationModel(rank, userFeatures, prodFeatures) + val topK = 10 + val recommendations = model.recommendUsersForProducts(topK).collectAsMap() + + assert(recommendations(2)(0).user == 1) + assert(recommendations(2)(0).rating ~== 39.0 relTol 1e-14) + assert(recommendations(2)(1).user == 0) + assert(recommendations(2)(1).rating ~== 17.0 relTol 1e-14) + } }