diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala
index 41adcbb0a3115..b56c4b3bd7789 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala
@@ -38,10 +38,10 @@ object DenseGmmEM {
   }
 
   private def run(inputFile: String, k: Int, convergenceTol: Double) {
-    val conf = new SparkConf().setAppName("Spark EM Sample")
+    val conf = new SparkConf().setAppName("Gaussian Mixture Model EM example")
     val ctx = new SparkContext(conf)
 
-    val data = ctx.textFile(inputFile).map{ line =>
+    val data = ctx.textFile(inputFile).map { line =>
       Vectors.dense(line.trim.split(' ').map(_.toDouble))
     }.cache()
@@ -56,8 +56,8 @@ object DenseGmmEM {
     }
 
     println("Cluster labels (first <= 100):")
-    val (responsibilityMatrix, clusterLabels) = clusters.predict(data)
-    clusterLabels.take(100).foreach{ x =>
+    val clusterLabels = clusters.predictLabels(data)
+    clusterLabels.take(100).foreach { x =>
       print(" " + x)
     }
     println()
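
For context, a minimal sketch of the pipeline this example implements, assuming a live SparkContext `sc`; the input path is hypothetical, and only APIs touched by this patch (setK, run, predictLabels) are used:

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.clustering.GaussianMixtureModelEM

    // Parse one dense vector per space-delimited line, as DenseGmmEM does.
    val points = sc.textFile("data/gmm_points.txt").map { line =>
      Vectors.dense(line.trim.split(' ').map(_.toDouble))
    }.cache()

    // Fit a 2-component mixture, then print hard cluster assignments.
    val model = new GaussianMixtureModelEM().setK(2).run(points)
    model.predictLabels(points).take(100).foreach(x => print(" " + x))
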
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
index c02aa60dd86ca..734d67ea72a26 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
@@ -20,8 +20,7 @@ package org.apache.spark.mllib.clustering
 import breeze.linalg.{DenseVector => BreezeVector}
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.mllib.linalg.Matrix
-import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.linalg.{Matrix, Vector}
 import org.apache.spark.mllib.stat.impl.MultivariateGaussian
 
 /**
@@ -44,10 +43,9 @@ class GaussianMixtureModel(
   def k: Int = weight.length
 
   /** Maps given points to their cluster indices. */
-  def predict(points: RDD[Vector]): (RDD[Array[Double]],RDD[Int]) = {
-    val responsibilityMatrix = predictMembership(points,mu,sigma,weight,k)
-    val clusterLabels = responsibilityMatrix.map(r => r.indexOf(r.max))
-    (responsibilityMatrix, clusterLabels)
+  def predictLabels(points: RDD[Vector]): RDD[Int] = {
+    val responsibilityMatrix = predictMembership(points, mu, sigma, weight, k)
+    responsibilityMatrix.map(r => r.indexOf(r.max))
   }
 
   /**
@@ -58,15 +56,16 @@ class GaussianMixtureModel(
       points: RDD[Vector],
       mu: Array[Vector],
       sigma: Array[Matrix],
-      weight: Array[Double], k: Int): RDD[Array[Double]] = {
+      weight: Array[Double],
+      k: Int): RDD[Array[Double]] = {
     val sc = points.sparkContext
-    val dists = sc.broadcast{
-      (0 until k).map{ i =>
+    val dists = sc.broadcast {
+      (0 until k).map { i =>
         new MultivariateGaussian(mu(i).toBreeze.toDenseVector, sigma(i).toBreeze.toDenseMatrix)
       }.toArray
     }
     val weights = sc.broadcast(weight)
-    points.map{ x =>
+    points.map { x =>
       computeSoftAssignments(x.toBreeze.toDenseVector, dists.value, weights.value, k)
     }
   }
@@ -86,7 +85,7 @@ class GaussianMixtureModel(
       k: Int): Array[Double] = {
     val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(pt) }
     val pSum = p.sum
-    for (i <- 0 until k){
+    for (i <- 0 until k) {
       p(i) /= pSum
     }
     p
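
The predict-to-predictLabels rename leaves two distinct prediction paths on the model. A sketch of both, assuming a trained `model: GaussianMixtureModel` and `data: RDD[Vector]`:

    // Hard assignments: index of the highest-responsibility component per point.
    val labels: RDD[Int] = model.predictLabels(data)

    // Soft assignments: per-point responsibilities across all k components.
    val memberships: RDD[Array[Double]] =
      model.predictMembership(data, model.mu, model.sigma, model.weight, model.k)
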
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala
index b0d8c7a0aafbd..f985f3828952b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala
@@ -17,15 +17,13 @@
 
 package org.apache.spark.mllib.clustering
 
-import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix}
-import breeze.linalg.Transpose
+import scala.collection.mutable.IndexedSeq
+import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix, diag, Transpose}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors}
 import org.apache.spark.mllib.stat.impl.MultivariateGaussian
 
-import scala.collection.mutable.IndexedSeqView
-
 /**
  * This class performs expectation maximization for multivariate Gaussian
  * Mixture Models (GMMs). A GMM represents a composite distribution of
@@ -47,87 +45,34 @@ class GaussianMixtureModelEM private (
     private var k: Int,
     private var convergenceTol: Double,
     private var maxIterations: Int) extends Serializable {
-
-  // Type aliases for convenience
-  private type DenseDoubleVector = BreezeVector[Double]
-  private type DenseDoubleMatrix = BreezeMatrix[Double]
-  private type VectorArrayView = IndexedSeqView[DenseDoubleVector, Array[DenseDoubleVector]]
-
-  private type ExpectationSum = (
-    Array[Double], // log-likelihood in index 0
-    Array[Double], // array of weights
-    Array[DenseDoubleVector], // array of means
-    Array[DenseDoubleMatrix]) // array of cov matrices
-
-  // create a zero'd ExpectationSum instance
-  private def zeroExpectationSum(k: Int, d: Int): ExpectationSum = {
-    (Array(0.0),
-      new Array[Double](k),
-      (0 until k).map(_ => BreezeVector.zeros[Double](d)).toArray,
-      (0 until k).map(_ => BreezeMatrix.zeros[Double](d,d)).toArray)
-  }
-
-  // add two ExpectationSum objects (allowed to use modify m1)
-  // (U, U) => U for aggregation
-  private def addExpectationSums(m1: ExpectationSum, m2: ExpectationSum): ExpectationSum = {
-    m1._1(0) += m2._1(0)
-    var i = 0
-    while (i < m1._2.length) {
-      m1._2(i) += m2._2(i)
-      m1._3(i) += m2._3(i)
-      m1._4(i) += m2._4(i)
-      i = i + 1
-    }
-    m1
-  }
+  /** A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold */
+  def this() = this(2, 0.01, 100)
+
-  // compute cluster contributions for each input point
-  // (U, T) => U for aggregation
-  private def computeExpectation(
-      weights: Array[Double],
-      dists: Array[MultivariateGaussian])
-      (sums: ExpectationSum, x: DenseDoubleVector): ExpectationSum = {
-    val k = sums._2.length
-    val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(x) }
-    val pSum = p.sum
-    sums._1(0) += math.log(pSum)
-    val xxt = x * new Transpose(x)
-    var i = 0
-    while (i < k) {
-      p(i) /= pSum
-      sums._2(i) += p(i)
-      sums._3(i) += x * p(i)
-      sums._4(i) += xxt * p(i)
-      i = i + 1
-    }
-    sums
-  }
-
   // number of samples per cluster to use when initializing Gaussians
   private val nSamples = 5
 
   // an initializing GMM can be provided rather than using the
   // default random starting point
-  private var initialGmm: Option[GaussianMixtureModel] = None
-
-  /** A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold */
-  def this() = this(2, 0.01, 100)
+  private var initialModel: Option[GaussianMixtureModel] = None
 
   /** Set the initial GMM starting point, bypassing the random initialization.
    *  You must call setK() prior to calling this method, and the condition
-   *  (gmm.k == this.k) must be met; failure will result in an IllegalArgumentException
+   *  (model.k == this.k) must be met; failure will result in an IllegalArgumentException
    */
-  def setInitialGmm(gmm: GaussianMixtureModel): this.type = {
-    if (gmm.k == k) {
-      initialGmm = Some(gmm)
+  def setInitialModel(model: GaussianMixtureModel): this.type = {
+    if (model.k == k) {
+      initialModel = Some(model)
     } else {
-      throw new IllegalArgumentException("initialing GMM has mismatched cluster count (gmm.k != k)")
+      throw new IllegalArgumentException("mismatched cluster count (model.k != k)")
     }
     this
  }
 
   /** Return the user supplied initial GMM, if supplied */
-  def getInitialGmm: Option[GaussianMixtureModel] = initialGmm
+  def getInitialModel: Option[GaussianMixtureModel] = initialModel
 
   /** Set the number of Gaussians in the mixture model. Default: 2 */
   def setK(k: Int): this.type = {
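
A sketch of how the renamed setInitialModel is meant to be used, e.g. warm-starting EM from an earlier fit (`coarseModel` and `points` are hypothetical); setK must be called first and the component counts must agree, otherwise the IllegalArgumentException above is thrown:

    val refined = new GaussianMixtureModelEM()
      .setK(coarseModel.k)           // must match the initial model's k
      .setInitialModel(coarseModel)  // bypasses the random initialization
      .run(points)
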
@@ -161,9 +106,6 @@ class GaussianMixtureModelEM private (
    */
   def getConvergenceTol: Double = convergenceTol
 
-  /** Machine precision value used to ensure matrix conditioning */
-  private val eps = math.pow(2.0, -52)
-
   /** Perform expectation maximization */
   def run(data: RDD[Vector]): GaussianMixtureModel = {
     val sc = data.sparkContext
@@ -179,17 +121,17 @@ class GaussianMixtureModelEM private (
     // we start with uniform weights, a random mean from the data, and
     // diagonal covariance matrices using component variances
     // derived from the samples
-    val (weights, gaussians) = initialGmm match {
-      case Some(gmm) => (gmm.weight, gmm.mu.zip(gmm.sigma).map{ case(mu, sigma) =>
+    val (weights, gaussians) = initialModel match {
+      case Some(gmm) => (gmm.weight, gmm.mu.zip(gmm.sigma).map { case(mu, sigma) =>
         new MultivariateGaussian(mu.toBreeze.toDenseVector, sigma.toBreeze.toDenseMatrix)
-      }.toArray)
+      })
 
       case None => {
         val samples = breezeData.takeSample(true, k * nSamples, scala.util.Random.nextInt)
-        (Array.fill[Double](k)(1.0 / k), (0 until k).map{ i =>
+        (Array.fill(k)(1.0 / k), Array.tabulate(k) { i =>
           val slice = samples.view(i * nSamples, (i + 1) * nSamples)
           new MultivariateGaussian(vectorMean(slice), initCovariance(slice))
-        }.toArray)
+        })
       }
     }
 
@@ -197,52 +139,104 @@ class GaussianMixtureModelEM private (
     var llhp = 0.0 // previous log-likelihood
     var iter = 0
-    do {
+    while(iter < maxIterations && Math.abs(llh-llhp) > convergenceTol) {
       // create and broadcast curried cluster contribution function
-      val compute = sc.broadcast(computeExpectation(weights, gaussians)_)
+      val compute = sc.broadcast(ExpectationSum.add(weights, gaussians)_)
 
       // aggregate the cluster contribution for all sample points
-      val (logLikelihood, wSums, muSums, sigmaSums) =
-        breezeData.aggregate(zeroExpectationSum(k, d))(compute.value, addExpectationSums)
+      val sums = breezeData.aggregate(ExpectationSum.zero(k, d))(compute.value, _ += _)
 
       // Create new distributions based on the partial assignments
       // (often referred to as the "M" step in literature)
-      val sumWeights = wSums.sum
-      for (i <- 0 until k) {
-        val mu = muSums(i) / wSums(i)
-        val sigma = sigmaSums(i) / wSums(i) - mu * new Transpose(mu)
-        weights(i) = wSums(i) / sumWeights
+      val sumWeights = sums.weights.sum
+      var i = 0
+      while (i < k) {
+        val mu = sums.means(i) / sums.weights(i)
+        val sigma = sums.sigmas(i) / sums.weights(i) - mu * new Transpose(mu) // TODO: Use BLAS.dsyr
+        weights(i) = sums.weights(i) / sumWeights
         gaussians(i) = new MultivariateGaussian(mu, sigma)
+        i = i + 1
       }
 
       llhp = llh // current becomes previous
-      llh = logLikelihood(0) // this is the freshly computed log-likelihood
+      llh = sums.logLikelihood // this is the freshly computed log-likelihood
       iter += 1
-    } while(iter < maxIterations && Math.abs(llh-llhp) > convergenceTol)
+    }
 
     // Need to convert the breeze matrices to MLlib matrices
-    val means = (0 until k).map(i => Vectors.fromBreeze(gaussians(i).mu)).toArray
-    val sigmas = (0 until k).map(i => Matrices.fromBreeze(gaussians(i).sigma)).toArray
+    val means = Array.tabulate(k) { i => Vectors.fromBreeze(gaussians(i).mu) }
+    val sigmas = Array.tabulate(k) { i => Matrices.fromBreeze(gaussians(i).sigma) }
     new GaussianMixtureModel(weights, means, sigmas)
   }
 
   /** Average of dense breeze vectors */
-  private def vectorMean(x: VectorArrayView): DenseDoubleVector = {
+  private def vectorMean(x: IndexedSeq[BreezeVector[Double]]): BreezeVector[Double] = {
     val v = BreezeVector.zeros[Double](x(0).length)
     x.foreach(xi => v += xi)
-    v / x.length.asInstanceOf[Double]
+    v / x.length.toDouble
   }
 
   /**
    * Construct matrix where diagonal entries are element-wise
    * variance of input vectors (computes biased variance)
    */
-  private def initCovariance(x: VectorArrayView): DenseDoubleMatrix = {
+  private def initCovariance(x: IndexedSeq[BreezeVector[Double]]): BreezeMatrix[Double] = {
     val mu = vectorMean(x)
     val ss = BreezeVector.zeros[Double](x(0).length)
-    val cov = BreezeMatrix.eye[Double](ss.length)
     x.map(xi => (xi - mu) :^ 2.0).foreach(u => ss += u)
-    (0 until ss.length).foreach(i => cov(i,i) = ss(i) / x.length)
-    cov
+    diag(ss / x.length.toDouble)
   }
 }
+
+// companion class to provide zero constructor for ExpectationSum
+private object ExpectationSum {
+  private val eps = math.pow(2.0, -52)
+
+  def zero(k: Int, d: Int): ExpectationSum = {
+    new ExpectationSum(0.0, Array.fill(k)(0.0),
+      Array.fill(k)(BreezeVector.zeros(d)), Array.fill(k)(BreezeMatrix.zeros(d,d)))
+  }
+
+  // compute cluster contributions for each input point
+  // (U, T) => U for aggregation
+  def add(
+      weights: Array[Double],
+      dists: Array[MultivariateGaussian])
+      (sums: ExpectationSum, x: BreezeVector[Double]): ExpectationSum = {
+    val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(x) }
+    val pSum = p.sum
+    sums.logLikelihood += math.log(pSum)
+    val xxt = x * new Transpose(x)
+    var i = 0
+    while (i < sums.k) {
+      p(i) /= pSum
+      sums.weights(i) += p(i)
+      sums.means(i) += x * p(i)
+      sums.sigmas(i) += xxt * p(i) // TODO: use BLAS.dsyr
+      i = i + 1
+    }
+    sums
+  }
+}
+
+// Aggregation class for partial expectation results
+private class ExpectationSum(
+    var logLikelihood: Double,
+    val weights: Array[Double],
+    val means: Array[BreezeVector[Double]],
+    val sigmas: Array[BreezeMatrix[Double]]) extends Serializable {
+
+  val k = weights.length
+
+  def +=(x: ExpectationSum): ExpectationSum = {
+    var i = 0
+    while (i < k) {
+      weights(i) += x.weights(i)
+      means(i) += x.means(i)
+      sigmas(i) += x.sigmas(i)
+      i = i + 1
+    }
+    logLikelihood += x.logLikelihood
+    this
+  }
+}
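
The two ExpectationSum pieces above exist to satisfy the RDD.aggregate contract used in run(). A sketch of the wiring, with names as defined in this file: the curried add binds (weights, gaussians) once so only the (U, T) => U sequence operator is shipped to executors, while += is the (U, U) => U combiner that merges partial sums in place:

    val seqOp: (ExpectationSum, BreezeVector[Double]) => ExpectationSum =
      ExpectationSum.add(weights, gaussians) _
    val combOp: (ExpectationSum, ExpectationSum) => ExpectationSum = _ += _
    val sums = breezeData.aggregate(ExpectationSum.zero(k, d))(seqOp, combOp)
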
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala
index d14adab09177e..2eab5d277827d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.mllib.stat.impl
 
-import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix}
-import breeze.linalg.{Transpose, det, pinv}
+import breeze.linalg.{DenseVector => DBV, DenseMatrix => DBM, Transpose, det, pinv}
 
 /**
  * Utility class to implement the density function for multivariate Gaussian distribution.
@@ -26,12 +25,13 @@ import breeze.linalg.{Transpose, det, pinv}
  * so this class is here so-as to not introduce a new dependency in Spark.
  */
 private[mllib] class MultivariateGaussian(
-    val mu: BreezeVector[Double],
-    val sigma: BreezeMatrix[Double]) extends Serializable {
+    val mu: DBV[Double],
+    val sigma: DBM[Double]) extends Serializable {
   private val sigmaInv2 = pinv(sigma) * -0.5
   private val U = math.pow(2.0 * math.Pi, -mu.length / 2.0) * math.pow(det(sigma), -0.5)
 
-  def pdf(x: BreezeVector[Double]): Double = {
+  /** Returns density of this multivariate Gaussian at given point, x */
+  def pdf(x: DBV[Double]): Double = {
     val delta = x - mu
     val deltaTranspose = new Transpose(delta)
     U * math.exp(deltaTranspose * sigmaInv2 * delta)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala
index e44db28ceb614..d19b23c7b1600 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala
@@ -65,7 +65,7 @@ class GMMExpectationMaximizationSuite extends FunSuite with MLlibTestSparkContex
 
     val gmm = new GaussianMixtureModelEM()
       .setK(2)
-      .setInitialGmm(initialGmm)
+      .setInitialModel(initialGmm)
       .run(data)
 
     assert(gmm.weight(0) ~== Ew(0) absTol 1E-3)
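
One sanity check the new pdf doc comment suggests, though this suite does not yet cover it: at the mean the density reduces to the normalizing constant, 1 / (2 * Pi) for a standard bivariate Gaussian. A sketch only; MultivariateGaussian is private[mllib], so such a test would have to live under org.apache.spark.mllib:

    import breeze.linalg.{DenseMatrix, DenseVector}

    val g = new MultivariateGaussian(
      DenseVector.zeros[Double](2),  // mu = 0
      DenseMatrix.eye[Double](2))    // sigma = identity
    // pdf(mu) = (2 * Pi)^(-d/2) * det(sigma)^(-1/2) = 1 / (2 * Pi) when d = 2
    assert(math.abs(g.pdf(DenseVector.zeros[Double](2)) - 1.0 / (2.0 * math.Pi)) < 1e-9)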