diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala new file mode 100644 index 0000000000000..948c350953e27 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.mllib.clustering.GaussianMixtureEM +import org.apache.spark.mllib.linalg.Vectors + +/** + * An example Gaussian Mixture Model EM app. Run with + * {{{ + * ./bin/run-example org.apache.spark.examples.mllib.DenseGmmEM + * }}} + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ +object DenseGmmEM { + def main(args: Array[String]): Unit = { + if (args.length < 3) { + println("usage: DenseGmmEM [maxIterations]") + } else { + val maxIterations = if (args.length > 3) args(3).toInt else 100 + run(args(0), args(1).toInt, args(2).toDouble, maxIterations) + } + } + + private def run(inputFile: String, k: Int, convergenceTol: Double, maxIterations: Int) { + val conf = new SparkConf().setAppName("Gaussian Mixture Model EM example") + val ctx = new SparkContext(conf) + + val data = ctx.textFile(inputFile).map { line => + Vectors.dense(line.trim.split(' ').map(_.toDouble)) + }.cache() + + val clusters = new GaussianMixtureEM() + .setK(k) + .setConvergenceTol(convergenceTol) + .setMaxIterations(maxIterations) + .run(data) + + for (i <- 0 until clusters.k) { + println("weight=%f\nmu=%s\nsigma=\n%s\n" format + (clusters.weight(i), clusters.mu(i), clusters.sigma(i))) + } + + println("Cluster labels (first <= 100):") + val clusterLabels = clusters.predict(data) + clusterLabels.take(100).foreach { x => + print(" " + x) + } + println() + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureEM.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureEM.scala new file mode 100644 index 0000000000000..bdf984aee4dae --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureEM.scala @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import scala.collection.mutable.IndexedSeq + +import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix, diag, Transpose} +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors} +import org.apache.spark.mllib.stat.impl.MultivariateGaussian +import org.apache.spark.mllib.util.MLUtils + +/** + * This class performs expectation maximization for multivariate Gaussian + * Mixture Models (GMMs). A GMM represents a composite distribution of + * independent Gaussian distributions with associated "mixing" weights + * specifying each's contribution to the composite. + * + * Given a set of sample points, this class will maximize the log-likelihood + * for a mixture of k Gaussians, iterating until the log-likelihood changes by + * less than convergenceTol, or until it has reached the max number of iterations. + * While this process is generally guaranteed to converge, it is not guaranteed + * to find a global optimum. + * + * @param k The number of independent Gaussians in the mixture model + * @param convergenceTol The maximum change in log-likelihood at which convergence + * is considered to have occurred. + * @param maxIterations The maximum number of iterations to perform + */ +class GaussianMixtureEM private ( + private var k: Int, + private var convergenceTol: Double, + private var maxIterations: Int) extends Serializable { + + /** A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold */ + def this() = this(2, 0.01, 100) + + // number of samples per cluster to use when initializing Gaussians + private val nSamples = 5 + + // an initializing GMM can be provided rather than using the + // default random starting point + private var initialModel: Option[GaussianMixtureModel] = None + + /** Set the initial GMM starting point, bypassing the random initialization. + * You must call setK() prior to calling this method, and the condition + * (model.k == this.k) must be met; failure will result in an IllegalArgumentException + */ + def setInitialModel(model: GaussianMixtureModel): this.type = { + if (model.k == k) { + initialModel = Some(model) + } else { + throw new IllegalArgumentException("mismatched cluster count (model.k != k)") + } + this + } + + /** Return the user supplied initial GMM, if supplied */ + def getInitialModel: Option[GaussianMixtureModel] = initialModel + + /** Set the number of Gaussians in the mixture model. Default: 2 */ + def setK(k: Int): this.type = { + this.k = k + this + } + + /** Return the number of Gaussians in the mixture model */ + def getK: Int = k + + /** Set the maximum number of iterations to run. Default: 100 */ + def setMaxIterations(maxIterations: Int): this.type = { + this.maxIterations = maxIterations + this + } + + /** Return the maximum number of iterations to run */ + def getMaxIterations: Int = maxIterations + + /** + * Set the largest change in log-likelihood at which convergence is + * considered to have occurred. + */ + def setConvergenceTol(convergenceTol: Double): this.type = { + this.convergenceTol = convergenceTol + this + } + + /** Return the largest change in log-likelihood at which convergence is + * considered to have occurred. + */ + def getConvergenceTol: Double = convergenceTol + + /** Perform expectation maximization */ + def run(data: RDD[Vector]): GaussianMixtureModel = { + val sc = data.sparkContext + + // we will operate on the data as breeze data + val breezeData = data.map(u => u.toBreeze.toDenseVector).cache() + + // Get length of the input vectors + val d = breezeData.first.length + + // Determine initial weights and corresponding Gaussians. + // If the user supplied an initial GMM, we use those values, otherwise + // we start with uniform weights, a random mean from the data, and + // diagonal covariance matrices using component variances + // derived from the samples + val (weights, gaussians) = initialModel match { + case Some(gmm) => (gmm.weight, gmm.mu.zip(gmm.sigma).map { case(mu, sigma) => + new MultivariateGaussian(mu.toBreeze.toDenseVector, sigma.toBreeze.toDenseMatrix) + }) + + case None => { + val samples = breezeData.takeSample(true, k * nSamples, scala.util.Random.nextInt) + (Array.fill(k)(1.0 / k), Array.tabulate(k) { i => + val slice = samples.view(i * nSamples, (i + 1) * nSamples) + new MultivariateGaussian(vectorMean(slice), initCovariance(slice)) + }) + } + } + + var llh = Double.MinValue // current log-likelihood + var llhp = 0.0 // previous log-likelihood + + var iter = 0 + while(iter < maxIterations && Math.abs(llh-llhp) > convergenceTol) { + // create and broadcast curried cluster contribution function + val compute = sc.broadcast(ExpectationSum.add(weights, gaussians)_) + + // aggregate the cluster contribution for all sample points + val sums = breezeData.aggregate(ExpectationSum.zero(k, d))(compute.value, _ += _) + + // Create new distributions based on the partial assignments + // (often referred to as the "M" step in literature) + val sumWeights = sums.weights.sum + var i = 0 + while (i < k) { + val mu = sums.means(i) / sums.weights(i) + val sigma = sums.sigmas(i) / sums.weights(i) - mu * new Transpose(mu) // TODO: Use BLAS.dsyr + weights(i) = sums.weights(i) / sumWeights + gaussians(i) = new MultivariateGaussian(mu, sigma) + i = i + 1 + } + + llhp = llh // current becomes previous + llh = sums.logLikelihood // this is the freshly computed log-likelihood + iter += 1 + } + + // Need to convert the breeze matrices to MLlib matrices + val means = Array.tabulate(k) { i => Vectors.fromBreeze(gaussians(i).mu) } + val sigmas = Array.tabulate(k) { i => Matrices.fromBreeze(gaussians(i).sigma) } + new GaussianMixtureModel(weights, means, sigmas) + } + + /** Average of dense breeze vectors */ + private def vectorMean(x: IndexedSeq[BreezeVector[Double]]): BreezeVector[Double] = { + val v = BreezeVector.zeros[Double](x(0).length) + x.foreach(xi => v += xi) + v / x.length.toDouble + } + + /** + * Construct matrix where diagonal entries are element-wise + * variance of input vectors (computes biased variance) + */ + private def initCovariance(x: IndexedSeq[BreezeVector[Double]]): BreezeMatrix[Double] = { + val mu = vectorMean(x) + val ss = BreezeVector.zeros[Double](x(0).length) + x.map(xi => (xi - mu) :^ 2.0).foreach(u => ss += u) + diag(ss / x.length.toDouble) + } +} + +// companion class to provide zero constructor for ExpectationSum +private object ExpectationSum { + def zero(k: Int, d: Int): ExpectationSum = { + new ExpectationSum(0.0, Array.fill(k)(0.0), + Array.fill(k)(BreezeVector.zeros(d)), Array.fill(k)(BreezeMatrix.zeros(d,d))) + } + + // compute cluster contributions for each input point + // (U, T) => U for aggregation + def add( + weights: Array[Double], + dists: Array[MultivariateGaussian]) + (sums: ExpectationSum, x: BreezeVector[Double]): ExpectationSum = { + val p = weights.zip(dists).map { + case (weight, dist) => MLUtils.EPSILON + weight * dist.pdf(x) + } + val pSum = p.sum + sums.logLikelihood += math.log(pSum) + val xxt = x * new Transpose(x) + var i = 0 + while (i < sums.k) { + p(i) /= pSum + sums.weights(i) += p(i) + sums.means(i) += x * p(i) + sums.sigmas(i) += xxt * p(i) // TODO: use BLAS.dsyr + i = i + 1 + } + sums + } +} + +// Aggregation class for partial expectation results +private class ExpectationSum( + var logLikelihood: Double, + val weights: Array[Double], + val means: Array[BreezeVector[Double]], + val sigmas: Array[BreezeMatrix[Double]]) extends Serializable { + + val k = weights.length + + def +=(x: ExpectationSum): ExpectationSum = { + var i = 0 + while (i < k) { + weights(i) += x.weights(i) + means(i) += x.means(i) + sigmas(i) += x.sigmas(i) + i = i + 1 + } + logLikelihood += x.logLikelihood + this + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala new file mode 100644 index 0000000000000..11a110db1f7ca --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BreezeVector} + +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.{Matrix, Vector} +import org.apache.spark.mllib.stat.impl.MultivariateGaussian +import org.apache.spark.mllib.util.MLUtils + +/** + * Multivariate Gaussian Mixture Model (GMM) consisting of k Gaussians, where points + * are drawn from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are + * the respective mean and covariance for each Gaussian distribution i=1..k. + * + * @param weight Weights for each Gaussian distribution in the mixture, where weight(i) is + * the weight for Gaussian i, and weight.sum == 1 + * @param mu Means for each Gaussian in the mixture, where mu(i) is the mean for Gaussian i + * @param sigma Covariance maxtrix for each Gaussian in the mixture, where sigma(i) is the + * covariance matrix for Gaussian i + */ +class GaussianMixtureModel( + val weight: Array[Double], + val mu: Array[Vector], + val sigma: Array[Matrix]) extends Serializable { + + /** Number of gaussians in mixture */ + def k: Int = weight.length + + /** Maps given points to their cluster indices. */ + def predict(points: RDD[Vector]): RDD[Int] = { + val responsibilityMatrix = predictMembership(points, mu, sigma, weight, k) + responsibilityMatrix.map(r => r.indexOf(r.max)) + } + + /** + * Given the input vectors, return the membership value of each vector + * to all mixture components. + */ + def predictMembership( + points: RDD[Vector], + mu: Array[Vector], + sigma: Array[Matrix], + weight: Array[Double], + k: Int): RDD[Array[Double]] = { + val sc = points.sparkContext + val dists = sc.broadcast { + (0 until k).map { i => + new MultivariateGaussian(mu(i).toBreeze.toDenseVector, sigma(i).toBreeze.toDenseMatrix) + }.toArray + } + val weights = sc.broadcast(weight) + points.map { x => + computeSoftAssignments(x.toBreeze.toDenseVector, dists.value, weights.value, k) + } + } + + /** + * Compute the partial assignments for each vector + */ + private def computeSoftAssignments( + pt: BreezeVector[Double], + dists: Array[MultivariateGaussian], + weights: Array[Double], + k: Int): Array[Double] = { + val p = weights.zip(dists).map { + case (weight, dist) => MLUtils.EPSILON + weight * dist.pdf(pt) + } + val pSum = p.sum + for (i <- 0 until k) { + p(i) /= pSum + } + p + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala new file mode 100644 index 0000000000000..2eab5d277827d --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.impl + +import breeze.linalg.{DenseVector => DBV, DenseMatrix => DBM, Transpose, det, pinv} + +/** + * Utility class to implement the density function for multivariate Gaussian distribution. + * Breeze provides this functionality, but it requires the Apache Commons Math library, + * so this class is here so-as to not introduce a new dependency in Spark. + */ +private[mllib] class MultivariateGaussian( + val mu: DBV[Double], + val sigma: DBM[Double]) extends Serializable { + private val sigmaInv2 = pinv(sigma) * -0.5 + private val U = math.pow(2.0 * math.Pi, -mu.length / 2.0) * math.pow(det(sigma), -0.5) + + /** Returns density of this multivariate Gaussian at given point, x */ + def pdf(x: DBV[Double]): Double = { + val delta = x - mu + val deltaTranspose = new Transpose(delta) + U * math.exp(deltaTranspose * sigmaInv2 * delta) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index b0d05ae33e1b5..1d07b5dab8268 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -39,7 +39,7 @@ import org.apache.spark.streaming.dstream.DStream */ object MLUtils { - private[util] lazy val EPSILON = { + private[mllib] lazy val EPSILON = { var eps = 1.0 while ((1.0 + (eps / 2.0)) != 1.0) { eps /= 2.0 diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala new file mode 100644 index 0000000000000..23feb82874b70 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.linalg.{Vectors, Matrices} +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.mllib.util.TestingUtils._ + +class GMMExpectationMaximizationSuite extends FunSuite with MLlibTestSparkContext { + test("single cluster") { + val data = sc.parallelize(Array( + Vectors.dense(6.0, 9.0), + Vectors.dense(5.0, 10.0), + Vectors.dense(4.0, 11.0) + )) + + // expectations + val Ew = 1.0 + val Emu = Vectors.dense(5.0, 10.0) + val Esigma = Matrices.dense(2, 2, Array(2.0 / 3.0, -2.0 / 3.0, -2.0 / 3.0, 2.0 / 3.0)) + + val gmm = new GaussianMixtureEM().setK(1).run(data) + + assert(gmm.weight(0) ~== Ew absTol 1E-5) + assert(gmm.mu(0) ~== Emu absTol 1E-5) + assert(gmm.sigma(0) ~== Esigma absTol 1E-5) + } + + test("two clusters") { + val data = sc.parallelize(Array( + Vectors.dense(-5.1971), Vectors.dense(-2.5359), Vectors.dense(-3.8220), + Vectors.dense(-5.2211), Vectors.dense(-5.0602), Vectors.dense( 4.7118), + Vectors.dense( 6.8989), Vectors.dense( 3.4592), Vectors.dense( 4.6322), + Vectors.dense( 5.7048), Vectors.dense( 4.6567), Vectors.dense( 5.5026), + Vectors.dense( 4.5605), Vectors.dense( 5.2043), Vectors.dense( 6.2734) + )) + + // we set an initial gaussian to induce expected results + val initialGmm = new GaussianMixtureModel( + Array(0.5, 0.5), + Array(Vectors.dense(-1.0), Vectors.dense(1.0)), + Array(Matrices.dense(1, 1, Array(1.0)), Matrices.dense(1, 1, Array(1.0))) + ) + + val Ew = Array(1.0 / 3.0, 2.0 / 3.0) + val Emu = Array(Vectors.dense(-4.3673), Vectors.dense(5.1604)) + val Esigma = Array(Matrices.dense(1, 1, Array(1.1098)), Matrices.dense(1, 1, Array(0.86644))) + + val gmm = new GaussianMixtureEM() + .setK(2) + .setInitialModel(initialGmm) + .run(data) + + assert(gmm.weight(0) ~== Ew(0) absTol 1E-3) + assert(gmm.weight(1) ~== Ew(1) absTol 1E-3) + assert(gmm.mu(0) ~== Emu(0) absTol 1E-3) + assert(gmm.mu(1) ~== Emu(1) absTol 1E-3) + assert(gmm.sigma(0) ~== Esigma(0) absTol 1E-3) + assert(gmm.sigma(1) ~== Esigma(1) absTol 1E-3) + } +}