Style improvements
Changed ExpectationSum to a private class
tgaloppo committed Dec 20, 2014
1 parent b97fe00 commit 9b2fc2a
Expand Up @@ -38,10 +38,10 @@ object DenseGmmEM {

private def run(inputFile: String, k: Int, convergenceTol: Double) {
val conf = new SparkConf().setAppName("Spark EM Sample")
val conf = new SparkConf().setAppName("Gaussian Mixture Model EM example")
val ctx = new SparkContext(conf)

val data = ctx.textFile(inputFile).map{ line =>
val data = ctx.textFile(inputFile).map { line =>
Vectors.dense(line.trim.split(' ').map(_.toDouble))

Expand All @@ -56,8 +56,8 @@ object DenseGmmEM {

println("Cluster labels (first <= 100):")
val (responsibilityMatrix, clusterLabels) = clusters.predict(data)
clusterLabels.take(100).foreach{ x =>
val clusterLabels = clusters.predictLabels(data)
clusterLabels.take(100).foreach { x =>
print(" " + x)
Expand Up @@ -20,8 +20,7 @@ package org.apache.spark.mllib.clustering
import breeze.linalg.{DenseVector => BreezeVector}

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.{Matrix, Vector}
import org.apache.spark.mllib.stat.impl.MultivariateGaussian

Expand All @@ -44,10 +43,9 @@ class GaussianMixtureModel(
def k: Int = weight.length

/** Maps given points to their cluster indices. */
def predict(points: RDD[Vector]): (RDD[Array[Double]],RDD[Int]) = {
val responsibilityMatrix = predictMembership(points,mu,sigma,weight,k)
val clusterLabels = => r.indexOf(r.max))
(responsibilityMatrix, clusterLabels)
def predictLabels(points: RDD[Vector]): RDD[Int] = {
val responsibilityMatrix = predictMembership(points, mu, sigma, weight, k) => r.indexOf(r.max))

Expand All @@ -58,15 +56,16 @@ class GaussianMixtureModel(
points: RDD[Vector],
mu: Array[Vector],
sigma: Array[Matrix],
weight: Array[Double], k: Int): RDD[Array[Double]] = {
weight: Array[Double],
k: Int): RDD[Array[Double]] = {
val sc = points.sparkContext
val dists = sc.broadcast{
(0 until k).map{ i =>
val dists = sc.broadcast {
(0 until k).map { i =>
new MultivariateGaussian(mu(i).toBreeze.toDenseVector, sigma(i).toBreeze.toDenseMatrix)
val weights = sc.broadcast(weight){ x => { x =>
computeSoftAssignments(x.toBreeze.toDenseVector, dists.value, weights.value, k)
Expand All @@ -86,7 +85,7 @@ class GaussianMixtureModel(
k: Int): Array[Double] = {
val p = { case (weight, dist) => eps + weight * dist.pdf(pt) }
val pSum = p.sum
for (i <- 0 until k){
for (i <- 0 until k) {
p(i) /= pSum
Expand Up @@ -17,15 +17,13 @@

package org.apache.spark.mllib.clustering

import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix}
import breeze.linalg.Transpose
import scala.collection.mutable.IndexedSeq

import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix, diag, Transpose}
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors}
import org.apache.spark.mllib.stat.impl.MultivariateGaussian

import scala.collection.mutable.IndexedSeqView

* This class performs expectation maximization for multivariate Gaussian
* Mixture Models (GMMs). A GMM represents a composite distribution of
Expand All @@ -47,87 +45,34 @@ class GaussianMixtureModelEM private (
private var k: Int,
private var convergenceTol: Double,
private var maxIterations: Int) extends Serializable {

// Type aliases for convenience
private type DenseDoubleVector = BreezeVector[Double]
private type DenseDoubleMatrix = BreezeMatrix[Double]
private type VectorArrayView = IndexedSeqView[DenseDoubleVector, Array[DenseDoubleVector]]

private type ExpectationSum = (
Array[Double], // log-likelihood in index 0
Array[Double], // array of weights
Array[DenseDoubleVector], // array of means
Array[DenseDoubleMatrix]) // array of cov matrices

// create a zero'd ExpectationSum instance
private def zeroExpectationSum(k: Int, d: Int): ExpectationSum = {
new Array[Double](k),
(0 until k).map(_ => BreezeVector.zeros[Double](d)).toArray,
(0 until k).map(_ => BreezeMatrix.zeros[Double](d,d)).toArray)

// add two ExpectationSum objects (allowed to use modify m1)
// (U, U) => U for aggregation
private def addExpectationSums(m1: ExpectationSum, m2: ExpectationSum): ExpectationSum = {
m1._1(0) += m2._1(0)
var i = 0
while (i < m1._2.length) {
m1._2(i) += m2._2(i)
m1._3(i) += m2._3(i)
m1._4(i) += m2._4(i)
i = i + 1
/** A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold */
def this() = this(2, 0.01, 100)

// compute cluster contributions for each input point
// (U, T) => U for aggregation
private def computeExpectation(
weights: Array[Double],
dists: Array[MultivariateGaussian])
(sums: ExpectationSum, x: DenseDoubleVector): ExpectationSum = {
val k = sums._2.length
val p = { case (weight, dist) => eps + weight * dist.pdf(x) }
val pSum = p.sum
sums._1(0) += math.log(pSum)
val xxt = x * new Transpose(x)
var i = 0
while (i < k) {
p(i) /= pSum
sums._2(i) += p(i)
sums._3(i) += x * p(i)
sums._4(i) += xxt * p(i)
i = i + 1

// number of samples per cluster to use when initializing Gaussians
private val nSamples = 5

// an initializing GMM can be provided rather than using the
// default random starting point
private var initialGmm: Option[GaussianMixtureModel] = None

/** A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold */
def this() = this(2, 0.01, 100)
private var initialModel: Option[GaussianMixtureModel] = None

/** Set the initial GMM starting point, bypassing the random initialization.
* You must call setK() prior to calling this method, and the condition
* (gmm.k == this.k) must be met; failure will result in an IllegalArgumentException
* (model.k == this.k) must be met; failure will result in an IllegalArgumentException
def setInitialGmm(gmm: GaussianMixtureModel): this.type = {
if (gmm.k == k) {
initialGmm = Some(gmm)
def setInitialModel(model: GaussianMixtureModel): this.type = {
if (model.k == k) {
initialModel = Some(model)
} else {
throw new IllegalArgumentException("initialing GMM has mismatched cluster count (gmm.k != k)")
throw new IllegalArgumentException("mismatched cluster count (model.k != k)")

/** Return the user supplied initial GMM, if supplied */
def getInitialGmm: Option[GaussianMixtureModel] = initialGmm
def getInitialModel: Option[GaussianMixtureModel] = initialModel

/** Set the number of Gaussians in the mixture model. Default: 2 */
def setK(k: Int): this.type = {
Expand Down Expand Up @@ -161,9 +106,6 @@ class GaussianMixtureModelEM private (
def getConvergenceTol: Double = convergenceTol

/** Machine precision value used to ensure matrix conditioning */
private val eps = math.pow(2.0, -52)

/** Perform expectation maximization */
def run(data: RDD[Vector]): GaussianMixtureModel = {
val sc = data.sparkContext
Expand All @@ -179,70 +121,122 @@ class GaussianMixtureModelEM private (
// we start with uniform weights, a random mean from the data, and
// diagonal covariance matrices using component variances
// derived from the samples
val (weights, gaussians) = initialGmm match {
case Some(gmm) => (gmm.weight,{ case(mu, sigma) =>
val (weights, gaussians) = initialModel match {
case Some(gmm) => (gmm.weight, { case(mu, sigma) =>
new MultivariateGaussian(mu.toBreeze.toDenseVector, sigma.toBreeze.toDenseMatrix)

case None => {
val samples = breezeData.takeSample(true, k * nSamples, scala.util.Random.nextInt)
(Array.fill[Double](k)(1.0 / k), (0 until k).map{ i =>
(Array.fill(k)(1.0 / k), Array.tabulate(k) { i =>
val slice = samples.view(i * nSamples, (i + 1) * nSamples)
new MultivariateGaussian(vectorMean(slice), initCovariance(slice))

var llh = Double.MinValue // current log-likelihood
var llhp = 0.0 // previous log-likelihood

var iter = 0
do {
while(iter < maxIterations && Math.abs(llh-llhp) > convergenceTol) {
// create and broadcast curried cluster contribution function
val compute = sc.broadcast(computeExpectation(weights, gaussians)_)
val compute = sc.broadcast(ExpectationSum.add(weights, gaussians)_)

// aggregate the cluster contribution for all sample points
val (logLikelihood, wSums, muSums, sigmaSums) =
breezeData.aggregate(zeroExpectationSum(k, d))(compute.value, addExpectationSums)
val sums = breezeData.aggregate(, d))(compute.value, _ += _)

// Create new distributions based on the partial assignments
// (often referred to as the "M" step in literature)
val sumWeights = wSums.sum
for (i <- 0 until k) {
val mu = muSums(i) / wSums(i)
val sigma = sigmaSums(i) / wSums(i) - mu * new Transpose(mu)
weights(i) = wSums(i) / sumWeights
val sumWeights = sums.weights.sum
var i = 0
while (i < k) {
val mu = sums.means(i) / sums.weights(i)
val sigma = sums.sigmas(i) / sums.weights(i) - mu * new Transpose(mu) // TODO: Use BLAS.dsyr
weights(i) = sums.weights(i) / sumWeights
gaussians(i) = new MultivariateGaussian(mu, sigma)
i = i + 1

llhp = llh // current becomes previous
llh = logLikelihood(0) // this is the freshly computed log-likelihood
llh = sums.logLikelihood // this is the freshly computed log-likelihood
iter += 1
} while(iter < maxIterations && Math.abs(llh-llhp) > convergenceTol)

// Need to convert the breeze matrices to MLlib matrices
val means = (0 until k).map(i => Vectors.fromBreeze(gaussians(i).mu)).toArray
val sigmas = (0 until k).map(i => Matrices.fromBreeze(gaussians(i).sigma)).toArray
val means = Array.tabulate(k) { i => Vectors.fromBreeze(gaussians(i).mu) }
val sigmas = Array.tabulate(k) { i => Matrices.fromBreeze(gaussians(i).sigma) }
new GaussianMixtureModel(weights, means, sigmas)

/** Average of dense breeze vectors */
private def vectorMean(x: VectorArrayView): DenseDoubleVector = {
private def vectorMean(x: IndexedSeq[BreezeVector[Double]]): BreezeVector[Double] = {
val v = BreezeVector.zeros[Double](x(0).length)
x.foreach(xi => v += xi)
v / x.length.asInstanceOf[Double]
v / x.length.toDouble

* Construct matrix where diagonal entries are element-wise
* variance of input vectors (computes biased variance)
private def initCovariance(x: VectorArrayView): DenseDoubleMatrix = {
private def initCovariance(x: IndexedSeq[BreezeVector[Double]]): BreezeMatrix[Double] = {
val mu = vectorMean(x)
val ss = BreezeVector.zeros[Double](x(0).length)
val cov = BreezeMatrix.eye[Double](ss.length) => (xi - mu) :^ 2.0).foreach(u => ss += u)
(0 until ss.length).foreach(i => cov(i,i) = ss(i) / x.length)
diag(ss / x.length.toDouble)

// companion class to provide zero constructor for ExpectationSum
private object ExpectationSum {
private val eps = math.pow(2.0, -52)

def zero(k: Int, d: Int): ExpectationSum = {
new ExpectationSum(0.0, Array.fill(k)(0.0),
Array.fill(k)(BreezeVector.zeros(d)), Array.fill(k)(BreezeMatrix.zeros(d,d)))

// compute cluster contributions for each input point
// (U, T) => U for aggregation
def add(
weights: Array[Double],
dists: Array[MultivariateGaussian])
(sums: ExpectationSum, x: BreezeVector[Double]): ExpectationSum = {
val p = { case (weight, dist) => eps + weight * dist.pdf(x) }
val pSum = p.sum
sums.logLikelihood += math.log(pSum)
val xxt = x * new Transpose(x)
var i = 0
while (i < sums.k) {
p(i) /= pSum
sums.weights(i) += p(i)
sums.means(i) += x * p(i)
sums.sigmas(i) += xxt * p(i) // TODO: use BLAS.dsyr
i = i + 1

// Aggregation class for partial expectation results
private class ExpectationSum(
var logLikelihood: Double,
val weights: Array[Double],
val means: Array[BreezeVector[Double]],
val sigmas: Array[BreezeMatrix[Double]]) extends Serializable {

val k = weights.length

def +=(x: ExpectationSum): ExpectationSum = {
var i = 0
while (i < k) {
weights(i) += x.weights(i)
means(i) += x.means(i)
sigmas(i) += x.sigmas(i)
i = i + 1
logLikelihood += x.logLikelihood
Expand Up @@ -17,21 +17,21 @@

package org.apache.spark.mllib.stat.impl

import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix}
import breeze.linalg.{Transpose, det, pinv}
import breeze.linalg.{DenseVector => DBV, DenseMatrix => DBM, Transpose, det, pinv}

* Utility class to implement the density function for multivariate Gaussian distribution.
* Breeze provides this functionality, but it requires the Apache Commons Math library,
* so this class is here so-as to not introduce a new dependency in Spark.
private[mllib] class MultivariateGaussian(
val mu: BreezeVector[Double],
val sigma: BreezeMatrix[Double]) extends Serializable {
val mu: DBV[Double],
val sigma: DBM[Double]) extends Serializable {
private val sigmaInv2 = pinv(sigma) * -0.5
private val U = math.pow(2.0 * math.Pi, -mu.length / 2.0) * math.pow(det(sigma), -0.5)

def pdf(x: BreezeVector[Double]): Double = {
/** Returns density of this multivariate Gaussian at given point, x */
def pdf(x: DBV[Double]): Double = {
val delta = x - mu
val deltaTranspose = new Transpose(delta)
U * math.exp(deltaTranspose * sigmaInv2 * delta)
Expand Up @@ -65,7 +65,7 @@ class GMMExpectationMaximizationSuite extends FunSuite with MLlibTestSparkContex

val gmm = new GaussianMixtureModelEM()

assert(gmm.weight(0) ~== Ew(0) absTol 1E-3)
Expand Down

