From 227ad66302f0d1166b452dc70b7cd3784a6f81e3 Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Thu, 18 Dec 2014 08:18:18 -0500 Subject: [PATCH] Moved prediction methods into model class. --- .../clustering/GaussianMixtureModel.scala | 48 ++++++++++++++++- .../clustering/GaussianMixtureModelEM.scala | 53 +++---------------- 2 files changed, 53 insertions(+), 48 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index 35a9024165d19..a38381e505258 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -17,9 +17,12 @@ package org.apache.spark.mllib.clustering +import breeze.linalg.{DenseVector => BreezeVector} + import org.apache.spark.rdd.RDD import org.apache.spark.mllib.linalg.Matrix import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.stat.impl.MultivariateGaussian /** * Multivariate Gaussian Mixture Model (GMM) consisting of k Gaussians, where points @@ -42,9 +45,50 @@ class GaussianMixtureModel( /** Maps given points to their cluster indices. */ def predict(points: RDD[Vector]): (RDD[Array[Double]],RDD[Int]) = { - val responsibilityMatrix = new GaussianMixtureModelEM() - .predictClusters(points,mu,sigma,weight,k) + val responsibilityMatrix = predictMembership(points,mu,sigma,weight,k) val clusterLabels = responsibilityMatrix.map(r => r.indexOf(r.max)) (responsibilityMatrix, clusterLabels) } + + /** + * Given the input vectors, return the membership value of each vector + * to all mixture components. + */ + def predictMembership( + points: RDD[Vector], + mu: Array[Vector], + sigma: Array[Matrix], + weight: Array[Double], k: Int): RDD[Array[Double]] = { + val sc = points.sparkContext + val dists = sc.broadcast{ + (0 until k).map{ i => + new MultivariateGaussian(mu(i).toBreeze.toDenseVector, sigma(i).toBreeze.toDenseMatrix) + }.toArray + } + val weights = sc.broadcast((0 until k).map(i => weight(i)).toArray) + points.map{ x => + computeSoftAssignments(x.toBreeze.toDenseVector, dists.value, weights.value, k) + } + } + + // We use "eps" as the minimum likelihood density for any given point + // in every cluster; this prevents any divide by zero conditions for + // outlier points. 
+ private val eps = math.pow(2.0, -52) + + /** + * Compute the partial assignments for each vector + */ + private def computeSoftAssignments( + pt: BreezeVector[Double], + dists: Array[MultivariateGaussian], + weights: Array[Double], + k: Int): Array[Double] = { + val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(pt) } + val pSum = p.sum + for (i <- 0 until k){ + p(i) /= pSum + } + p + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala index 5c190120904b9..0c317c0a618b4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala @@ -23,8 +23,6 @@ import breeze.linalg.Transpose import org.apache.spark.rdd.RDD import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors} import org.apache.spark.mllib.stat.impl.MultivariateGaussian -import org.apache.spark.{Accumulator, AccumulatorParam, SparkContext} -import org.apache.spark.SparkContext.DoubleAccumulatorParam import scala.collection.mutable.IndexedSeqView @@ -86,19 +84,19 @@ class GaussianMixtureModelEM private ( private def computeExpectation( weights: Array[Double], dists: Array[MultivariateGaussian]) - (model: ExpectationSum, x: DenseDoubleVector): ExpectationSum = { - val k = model._2.length + (sums: ExpectationSum, x: DenseDoubleVector): ExpectationSum = { + val k = sums._2.length val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(x) } val pSum = p.sum - model._1(0) += math.log(pSum) + sums._1(0) += math.log(pSum) val xxt = x * new Transpose(x) for (i <- 0 until k) { p(i) /= pSum - model._2(i) += p(i) - model._3(i) += x * p(i) - model._4(i) += xxt * p(i) + sums._2(i) += p(i) + sums._3(i) += x * p(i) + sums._4(i) += xxt * p(i) } - model + sums } // number of samples per cluster to use when initializing Gaussians @@ -243,41 +241,4 @@ class GaussianMixtureModelEM private ( (0 until ss.length).foreach(i => cov(i,i) = ss(i) / x.length) cov } - - /** - * Given the input vectors, return the membership value of each vector - * to all mixture components. - */ - def predictClusters( - points: RDD[Vector], - mu: Array[Vector], - sigma: Array[Matrix], - weight: Array[Double], k: Int): RDD[Array[Double]] = { - val sc = points.sparkContext - val dists = sc.broadcast{ - (0 until k).map{ i => - new MultivariateGaussian(mu(i).toBreeze.toDenseVector, sigma(i).toBreeze.toDenseMatrix) - }.toArray - } - val weights = sc.broadcast((0 until k).map(i => weight(i)).toArray) - points.map{ x => - computeSoftAssignments(x.toBreeze.toDenseVector, dists.value, weights.value, k) - } - } - - /** - * Compute the partial assignments for each vector - */ - private def computeSoftAssignments( - pt: DenseDoubleVector, - dists: Array[MultivariateGaussian], - weights: Array[Double], - k: Int): Array[Double] = { - val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(pt) } - val pSum = p.sum - for (i <- 0 until k){ - p(i) /= pSum - } - p - } }
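
Reviewer note (appended after the patch, not part of the diff): the core of the
code being moved is the soft-assignment (E-step) computation shared by
predictMembership and computeExpectation: each point's membership in component
i is proportional to weight(i) * pdf_i(x), floored by eps = 2^-52 so the
normalizing sum can never be zero for an outlier. The sketch below is a
minimal, Spark-free illustration of that step, assuming only the Scala
standard library; a hand-rolled univariate gaussianPdf stands in for
MultivariateGaussian.pdf, and SoftAssignDemo, gaussianPdf, softAssign, and the
example parameters are hypothetical names for illustration, not identifiers
from the patch.

object SoftAssignDemo {

  // Same floor as the patch: 2^-52 keeps every per-cluster likelihood
  // strictly positive, so the normalizing sum is never zero for outliers.
  private val eps = math.pow(2.0, -52)

  // Univariate normal density N(mu, sigma2) evaluated at x; stands in
  // for MultivariateGaussian.pdf, restricted to one dimension for brevity.
  private def gaussianPdf(x: Double, mu: Double, sigma2: Double): Double =
    math.exp(-(x - mu) * (x - mu) / (2.0 * sigma2)) /
      math.sqrt(2.0 * math.Pi * sigma2)

  // Mirrors the shape of computeSoftAssignments: p(i) = eps + weight(i)
  // * pdf_i(x), then divide by the sum so the memberships sum to 1.
  def softAssign(
      x: Double,
      mu: Array[Double],
      sigma2: Array[Double],
      weight: Array[Double]): Array[Double] = {
    val p = weight.indices
      .map(i => eps + weight(i) * gaussianPdf(x, mu(i), sigma2(i)))
      .toArray
    val pSum = p.sum
    p.map(_ / pSum)
  }

  def main(args: Array[String]): Unit = {
    // Two-component mixture: N(0,1) with weight 0.6, N(5,1) with weight 0.4.
    val m = softAssign(1.0, Array(0.0, 5.0), Array(1.0, 1.0), Array(0.6, 0.4))
    println(m.mkString(", ")) // roughly 0.9996, 0.0004: x = 1.0 favors component 0
  }
}

On the design choice: flooring with eps (rather than, say, working in log
space) matches the comment carried over into the model class; for a far
outlier every weighted density can underflow to zero, and the floor keeps the
normalization well defined, at the cost of nudging such points' memberships
slightly toward uniform.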