Skip to content

Commit

Permalink
Moved prediction methods into model class.
Browse files Browse the repository at this point in the history
  • Loading branch information
tgaloppo committed Dec 18, 2014
1 parent 308c8ad commit 227ad66
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

package org.apache.spark.mllib.clustering

import breeze.linalg.{DenseVector => BreezeVector}

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.impl.MultivariateGaussian

/**
* Multivariate Gaussian Mixture Model (GMM) consisting of k Gaussians, where points
Expand All @@ -42,9 +45,50 @@ class GaussianMixtureModel(

/** Maps given points to their cluster indices. */
def predict(points: RDD[Vector]): (RDD[Array[Double]],RDD[Int]) = {
val responsibilityMatrix = new GaussianMixtureModelEM()
.predictClusters(points,mu,sigma,weight,k)
val responsibilityMatrix = predictMembership(points,mu,sigma,weight,k)
val clusterLabels = responsibilityMatrix.map(r => r.indexOf(r.max))
(responsibilityMatrix, clusterLabels)
}

/**
* Given the input vectors, return the membership value of each vector
* to all mixture components.
*/
def predictMembership(
points: RDD[Vector],
mu: Array[Vector],
sigma: Array[Matrix],
weight: Array[Double], k: Int): RDD[Array[Double]] = {
val sc = points.sparkContext
val dists = sc.broadcast{
(0 until k).map{ i =>
new MultivariateGaussian(mu(i).toBreeze.toDenseVector, sigma(i).toBreeze.toDenseMatrix)
}.toArray
}
val weights = sc.broadcast((0 until k).map(i => weight(i)).toArray)
points.map{ x =>
computeSoftAssignments(x.toBreeze.toDenseVector, dists.value, weights.value, k)
}
}

// We use "eps" as the minimum likelihood density for any given point
// in every cluster; this prevents any divide by zero conditions for
// outlier points.
private val eps = math.pow(2.0, -52)

/**
* Compute the partial assignments for each vector
*/
private def computeSoftAssignments(
pt: BreezeVector[Double],
dists: Array[MultivariateGaussian],
weights: Array[Double],
k: Int): Array[Double] = {
val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(pt) }
val pSum = p.sum
for (i <- 0 until k){
p(i) /= pSum
}
p
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import breeze.linalg.Transpose
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors}
import org.apache.spark.mllib.stat.impl.MultivariateGaussian
import org.apache.spark.{Accumulator, AccumulatorParam, SparkContext}
import org.apache.spark.SparkContext.DoubleAccumulatorParam

import scala.collection.mutable.IndexedSeqView

Expand Down Expand Up @@ -86,19 +84,19 @@ class GaussianMixtureModelEM private (
private def computeExpectation(
weights: Array[Double],
dists: Array[MultivariateGaussian])
(model: ExpectationSum, x: DenseDoubleVector): ExpectationSum = {
val k = model._2.length
(sums: ExpectationSum, x: DenseDoubleVector): ExpectationSum = {
val k = sums._2.length
val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(x) }
val pSum = p.sum
model._1(0) += math.log(pSum)
sums._1(0) += math.log(pSum)
val xxt = x * new Transpose(x)
for (i <- 0 until k) {
p(i) /= pSum
model._2(i) += p(i)
model._3(i) += x * p(i)
model._4(i) += xxt * p(i)
sums._2(i) += p(i)
sums._3(i) += x * p(i)
sums._4(i) += xxt * p(i)
}
model
sums
}

// number of samples per cluster to use when initializing Gaussians
Expand Down Expand Up @@ -243,41 +241,4 @@ class GaussianMixtureModelEM private (
(0 until ss.length).foreach(i => cov(i,i) = ss(i) / x.length)
cov
}

/**
* Given the input vectors, return the membership value of each vector
* to all mixture components.
*/
def predictClusters(
points: RDD[Vector],
mu: Array[Vector],
sigma: Array[Matrix],
weight: Array[Double], k: Int): RDD[Array[Double]] = {
val sc = points.sparkContext
val dists = sc.broadcast{
(0 until k).map{ i =>
new MultivariateGaussian(mu(i).toBreeze.toDenseVector, sigma(i).toBreeze.toDenseMatrix)
}.toArray
}
val weights = sc.broadcast((0 until k).map(i => weight(i)).toArray)
points.map{ x =>
computeSoftAssignments(x.toBreeze.toDenseVector, dists.value, weights.value, k)
}
}

/**
* Compute the partial assignments for each vector
*/
private def computeSoftAssignments(
pt: DenseDoubleVector,
dists: Array[MultivariateGaussian],
weights: Array[Double],
k: Int): Array[Double] = {
val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(pt) }
val pSum = p.sum
for (i <- 0 until k){
p(i) /= pSum
}
p
}
}

0 comments on commit 227ad66

Please sign in to comment.