From b1178cfaad979f26ce02648795af65ecf08685c9 Mon Sep 17 00:00:00 2001
From: Yuhao Yang
Date: Tue, 28 Apr 2015 22:01:41 +0800
Subject: [PATCH] fit into the optimizer framework

---
 .../apache/spark/mllib/clustering/LDA.scala   |  38 +--
 .../spark/mllib/clustering/LDAOptimizer.scala | 288 ++++++++++++++++--
 .../spark/mllib/clustering/OnlineLDA.scala    | 246 ---------------
 3 files changed, 274 insertions(+), 298 deletions(-)
 delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/OnlineLDA.scala

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
index 37bf88b73b911..92e343fbcba3e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
@@ -78,21 +78,15 @@ class LDA private (
    *
    * This is the parameter to a symmetric Dirichlet distribution.
    */
-  def getDocConcentration: Double = {
-    if (this.docConcentration == -1) {
-      (50.0 / k) + 1.0
-    } else {
-      this.docConcentration
-    }
-  }
+  def getDocConcentration: Double = this.docConcentration
 
   /**
    * Concentration parameter (commonly named "alpha") for the prior placed on documents'
    * distributions over topics ("theta").
    *
-   * This is the parameter to a symmetric Dirichlet distribution.
+   * This is the parameter to a symmetric Dirichlet distribution, where larger values
+   * mean more smoothing (more regularization).
    *
-   * This value should be > 1.0, where larger values mean more smoothing (more regularization).
    * If set to -1, then docConcentration is set automatically.
    *  (default = -1 = automatic)
    *
@@ -100,13 +94,12 @@ class LDA private (
    *  - For EM: default = (50 / k) + 1.
    *     - The 50/k is common in LDA libraries.
    *     - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM.
+   *  - For Online: default = (1.0 / k).
+   *     - Follows the implementation from: https://github.com/Blei-Lab/onlineldavb.
    *
-   * Note: The restriction > 1.0 may be relaxed in the future (allowing sparse solutions),
-   *       but values in (0,1) are not yet supported.
+   * Note: For the EM optimizer, this value should be > 1.0.
    */
   def setDocConcentration(docConcentration: Double): this.type = {
-    require(docConcentration > 1.0 || docConcentration == -1.0,
-      s"LDA docConcentration must be > 1.0 (or -1 for auto), but was set to $docConcentration")
     this.docConcentration = docConcentration
     this
   }
@@ -126,13 +119,7 @@ class LDA private (
    * Note: The topics' distributions over terms are called "beta" in the original LDA paper
    * by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009.
    */
-  def getTopicConcentration: Double = {
-    if (this.topicConcentration == -1) {
-      1.1
-    } else {
-      this.topicConcentration
-    }
-  }
+  def getTopicConcentration: Double = this.topicConcentration
 
   /**
    * Concentration parameter (commonly named "beta" or "eta") for the prior placed on topics'
@@ -143,7 +130,6 @@ class LDA private (
    * Note: The topics' distributions over terms are called "beta" in the original LDA paper
    * by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009.
    *
-   * This value should be > 0.0.
    * If set to -1, then topicConcentration is set automatically.
    *  (default = -1 = automatic)
    *
@@ -151,13 +137,12 @@ class LDA private (
    *  - For EM: default = 0.1 + 1.
    *     - The 0.1 gives a small amount of smoothing.
    *     - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM.
+   *  - For Online: default = (1.0 / k).
+   *     - Follows the implementation from: https://github.com/Blei-Lab/onlineldavb.
    *
-   * Note: The restriction > 1.0 may be relaxed in the future (allowing sparse solutions),
-   *       but values in (0,1) are not yet supported.
+   * Note: For the EM optimizer, this value should be > 1.0.
    */
   def setTopicConcentration(topicConcentration: Double): this.type = {
-    require(topicConcentration > 1.0 || topicConcentration == -1.0,
-      s"LDA topicConcentration must be > 1.0 (or -1 for auto), but was set to $topicConcentration")
     this.topicConcentration = topicConcentration
     this
   }
@@ -245,8 +230,7 @@ class LDA private (
    * @return  Inferred LDA model
    */
   def run(documents: RDD[(Long, Vector)]): LDAModel = {
-    val state = ldaOptimizer.initialState(documents, k, getDocConcentration, getTopicConcentration,
-      seed, checkpointInterval)
+    val state = ldaOptimizer.initialize(documents, this)
     var iter = 0
     val iterationTimes = Array.fill[Double](maxIterations)(0)
     while (iter < maxIterations) {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index ffd72a294c6c6..5b6d226cea2b3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -19,13 +19,15 @@ package org.apache.spark.mllib.clustering
 
 import java.util.Random
 
-import breeze.linalg.{DenseVector => BDV, normalize}
+import breeze.linalg.{DenseVector => BDV, DenseMatrix => BDM, sum, normalize, kron}
+import breeze.numerics.{digamma, exp, abs}
+import breeze.stats.distributions.Gamma
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.impl.GraphImpl
 import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer
-import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.linalg.{Matrices, SparseVector, DenseVector, Vector}
 import org.apache.spark.rdd.RDD
 
 /**
@@ -35,7 +37,7 @@ import org.apache.spark.rdd.RDD
  * hold optimizer-specific parameters for users to set.
  */
 @Experimental
-trait LDAOptimizer{
+trait LDAOptimizer {
 
   /*
     DEVELOPERS NOTE:
@@ -49,13 +51,7 @@ trait LDAOptimizer{
    * Initializer for the optimizer. LDA passes the common parameters to the optimizer and
    * the internal structure can be initialized properly.
    */
-  private[clustering] def initialState(
-      docs: RDD[(Long, Vector)],
-      k: Int,
-      docConcentration: Double,
-      topicConcentration: Double,
-      randomSeed: Long,
-      checkpointInterval: Int): LDAOptimizer
+  private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer
 
   private[clustering] def next(): LDAOptimizer
 
@@ -80,12 +76,12 @@ trait LDAOptimizer{
  *
  */
 @Experimental
-class EMLDAOptimizer extends LDAOptimizer{
+class EMLDAOptimizer extends LDAOptimizer {
 
   import LDA._
 
   /**
-   * Following fields will only be initialized through initialState method
+   * The following fields will only be initialized through the initialize() method
    */
   private[clustering] var graph: Graph[TopicCounts, TokenCount] = null
   private[clustering] var k: Int = 0
@@ -98,13 +94,38 @@ class EMLDAOptimizer extends LDAOptimizer{
 
   /**
    * Compute bipartite term/doc graph.
    */
-  private[clustering] override def initialState(
-      docs: RDD[(Long, Vector)],
-      k: Int,
-      docConcentration: Double,
-      topicConcentration: Double,
-      randomSeed: Long,
-      checkpointInterval: Int): LDAOptimizer = {
+  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA):
+    LDAOptimizer = {
+
+    val docConcentration = lda.getDocConcentration
+    val topicConcentration = lda.getTopicConcentration
+    val k = lda.getK
+
+    /**
+     * Note: The restriction > 1.0 may be relaxed in the future (allowing sparse solutions),
+     * but values in (0,1) are not yet supported.
+     */
+    require(docConcentration > 1.0 || docConcentration == -1.0, s"LDA docConcentration must be" +
+      s" > 1.0 (or -1 for auto) for EM Optimizer, but was set to $docConcentration")
+    require(topicConcentration > 1.0 || topicConcentration == -1.0, s"LDA topicConcentration " +
+      s"must be > 1.0 (or -1 for auto) for EM Optimizer, but was set to $topicConcentration")
+
+    /**
+     * - For EM: default = (50 / k) + 1.
+     *   - The 50/k is common in LDA libraries.
+     *   - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM.
+     */
+    this.docConcentration = if (docConcentration == -1) (50.0 / k) + 1.0 else docConcentration
+
+    /**
+     * - For EM: default = 0.1 + 1.
+     *   - The 0.1 gives a small amount of smoothing.
+     *   - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM.
+     */
+    this.topicConcentration = if (topicConcentration == -1) 1.1 else topicConcentration
+
+    val randomSeed = lda.getSeed
+
     // For each document, create an edge (Document -> Term) for each unique term in the document.
     val edges: RDD[Edge[TokenCount]] = docs.flatMap { case (docID: Long, termCounts: Vector) =>
       // Add edges for terms with non-zero counts.
@@ -113,8 +134,6 @@ class EMLDAOptimizer extends LDAOptimizer{
       }
     }
 
-    val vocabSize = docs.take(1).head._2.size
-
     // Create vertices.
     // Initially, we use random soft assignments of tokens to topics (random gamma).
     def createVertices(): RDD[(VertexId, TopicCounts)] = {
@@ -135,10 +154,8 @@ class EMLDAOptimizer extends LDAOptimizer{
     // Partition such that edges are grouped by document
     this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D)
     this.k = k
-    this.vocabSize = vocabSize
-    this.docConcentration = docConcentration
-    this.topicConcentration = topicConcentration
-    this.checkpointInterval = checkpointInterval
+    this.vocabSize = docs.take(1).head._2.size
+    this.checkpointInterval = lda.getCheckpointInterval
     this.graphCheckpointer = new PeriodicGraphCheckpointer[TopicCounts, TokenCount](graph,
       checkpointInterval)
     this.globalTopicTotals = computeGlobalTopicTotals()
@@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
     new DistributedLDAModel(this, iterationTimes)
   }
 }
+
+
+/**
+ * :: Experimental ::
+ *
+ * An online optimizer for LDA. The optimizer implements the Online LDA algorithm, which
+ * processes a subset of the corpus on each call to next() and updates the term-topic
+ * distribution adaptively.
+ *
+ * References:
+ *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
+ */
+@Experimental
+class OnlineLDAOptimizer extends LDAOptimizer {
+
+  // LDA common parameters
+  private var k: Int = 0
+  private var D: Int = 0
+  private var vocabSize: Int = 0
+  private var alpha: Double = 0
+  private var eta: Double = 0
+  private var randomSeed: Long = 0
+
+  // Online LDA specific parameters
+  private var tau_0: Double = -1
+  private var kappa: Double = -1
+  private var batchSize: Int = -1
+
+  // internal data structures
+  private var docs: RDD[(Long, Vector)] = null
+  private var lambda: BDM[Double] = null
+  private var Elogbeta: BDM[Double] = null
+  private var expElogbeta: BDM[Double] = null
+
+  // Count of invocations of next(), used to help decide the weight for each iteration
+  private var iteration = 0
+
+  /**
+   * A (positive) learning parameter that downweights early iterations.
+   */
+  def getTau_0: Double = {
+    if (this.tau_0 == -1) {
+      1024
+    } else {
+      this.tau_0
+    }
+  }
+
+  /**
+   * A (positive) learning parameter that downweights early iterations.
+   * Automatic setting of parameter:
+   *  - default = 1024, which follows the recommendation from the Online LDA paper.
+   */
+  def setTau_0(tau_0: Double): this.type = {
+    require(tau_0 > 0 || tau_0 == -1.0, s"Online LDA tau_0 must be positive (or -1 for auto), but was set to $tau_0")
+    this.tau_0 = tau_0
+    this
+  }
+
+  /**
+   * Learning rate: exponential decay rate.
+   */
+  def getKappa: Double = {
+    if (this.kappa == -1) {
+      0.5
+    } else {
+      this.kappa
+    }
+  }
+
+  /**
+   * Learning rate: exponential decay rate, which should be between
+   * (0.5, 1.0] to guarantee asymptotic convergence.
+   *  - default = 0.5, which follows the recommendation from the Online LDA paper.
+   */
+  def setKappa(kappa: Double): this.type = {
+    require(kappa >= 0 || kappa == -1.0,
+      s"Online LDA kappa must be nonnegative (or -1 for auto), but was set to $kappa")
+    this.kappa = kappa
+    this
+  }
+
+  /**
+   * Mini-batch size, which controls how many documents are used in each iteration.
+   */
+  def getBatchSize: Int = {
+    if (this.batchSize == -1) {
+      D / 100
+    } else {
+      this.batchSize
+    }
+  }
+
+  /**
+   * Mini-batch size, which controls how many documents are used in each iteration.
+   *  - default = 1% of the total documents.
+   */
+  def setBatchSize(batchSize: Int): this.type = {
+    this.batchSize = batchSize
+    this
+  }
+
+  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
+
+    this.k = lda.getK
+    this.D = docs.count().toInt
+    this.vocabSize = docs.first()._2.size
+    this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration
+    this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration
+    this.randomSeed = lda.getSeed
+
+    this.docs = docs
+
+    // Initialize the variational distribution q(beta|lambda)
+    this.lambda = getGammaMatrix(k, vocabSize)
+    this.Elogbeta = dirichlet_expectation(lambda)
+    this.expElogbeta = exp(Elogbeta)
+    this.iteration = 0
+    this
+  }
+
+  /**
+   * Submit a subset (sized by batchSize, e.g. 1%) of the corpus to the Online LDA model,
+   * and it will update the topic distribution adaptively for the terms appearing in the subset.
+   *
+   * @return  Updated optimizer (this).
+   */
+  private[clustering] override def next(): OnlineLDAOptimizer = {
+    iteration += 1
+    val batchSize = getBatchSize
+    val batch = docs.sample(true, batchSize.toDouble / D, randomSeed).cache()
+    if (batch.isEmpty()) return this
+
+    val k = this.k
+    val vocabSize = this.vocabSize
+    val expElogbeta = this.expElogbeta
+    val alpha = this.alpha
+
+    val stats = batch.mapPartitions(docs => {
+      val stat = BDM.zeros[Double](k, vocabSize)
+      docs.foreach(doc => {
+        val termCounts = doc._2
+        val (ids, cts) = termCounts match {
+          case v: DenseVector => (((0 until v.size).toList), v.values)
+          case v: SparseVector => (v.indices.toList, v.values)
+          case v => throw new IllegalArgumentException("Unsupported vector type " + v.getClass)
+        }
+
+        // Initialize the variational distribution q(theta|gamma) for the mini-batch
+        var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t     // 1 * K
+        var Elogthetad = digamma(gammad) - digamma(sum(gammad))         // 1 * K
+        var expElogthetad = exp(Elogthetad)                             // 1 * K
+        val expElogbetad = expElogbeta(::, ids).toDenseMatrix           // K * ids
+
+        var phinorm = expElogthetad * expElogbetad + 1e-100             // 1 * ids
+        var meanchange = 1D
+        val ctsVector = new BDV[Double](cts).t                          // 1 * ids
+
+        // Iterate between gamma and phi until convergence
+        while (meanchange > 1e-5) {
+          val lastgamma = gammad
+          //        1*K                   1 * ids                ids * k
+          gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + alpha
+          Elogthetad = digamma(gammad) - digamma(sum(gammad))
+          expElogthetad = exp(Elogthetad)
+          phinorm = expElogthetad * expElogbetad + 1e-100
          meanchange = sum(abs(gammad - lastgamma)) / k
+        }
+
+        val m1 = expElogthetad.t.toDenseMatrix.t
+        val m2 = (ctsVector / phinorm).t.toDenseMatrix
+        val outerResult = kron(m1, m2)                                  // K * ids
+        for (i <- 0 until ids.size) {
+          stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i))
+        }
+        stat
+      })
+      Iterator(stat)
+    })
+
+    val batchResult = stats.reduce(_ += _)
+    update(batchResult, iteration, batchSize)
+    batch.unpersist()
+    this
+  }
+
+  private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
+    new LocalLDAModel(Matrices.fromBreeze(lambda).transpose)
+  }
+
+  private def update(raw: BDM[Double], iter: Int, batchSize: Int): Unit = {
+
+    val tau_0 = this.getTau_0
+    val kappa = this.getKappa
+
+    // weight of the mini-batch.
+    val weight = math.pow(tau_0 + iter, -kappa)
+
+    // This step finishes computing the sufficient statistics for the M step
+    val stat = raw :* expElogbeta
+
+    // Update lambda based on documents.
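+    // Exponentially weighted update: with rho_t = (tau_0 + t)^(-kappa), the new lambda is
+    // (1 - rho_t) * lambda + rho_t * (eta + (D / batchSize) * stat), rescaling the batch to corpus size.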
+    lambda = lambda * (1 - weight) + (stat * D.toDouble / batchSize.toDouble + eta) * weight
+    Elogbeta = dirichlet_expectation(lambda)
+    expElogbeta = exp(Elogbeta)
+  }
+
+  private def getGammaMatrix(row: Int, col: Int): BDM[Double] = {
+    val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0)
+    val temp = gammaRandomGenerator.sample(row * col).toArray
+    (new BDM[Double](col, row, temp)).t
+  }
+
+  private def dirichlet_expectation(alpha: BDM[Double]): BDM[Double] = {
+    val rowSum = sum(alpha(breeze.linalg.*, ::))
+    val digAlpha = digamma(alpha)
+    val digRowSum = digamma(rowSum)
+    val result = digAlpha(::, breeze.linalg.*) - digRowSum
+    result
+  }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/OnlineLDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/OnlineLDA.scala
deleted file mode 100644
index 72c550144db0a..0000000000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/OnlineLDA.scala
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.clustering
-
-import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, kron, sum}
-import breeze.numerics._
-import breeze.stats.distributions.Gamma
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.mllib.linalg._
-import org.apache.spark.rdd.RDD
-
-/**
- * :: Experimental ::
- * Latent Dirichlet Allocation (LDA), a topic model designed for text documents.
- *
- * Online LDA breaks the massive corps into mini batches and scans the corpus (doc sets) only
- * once. Thus it needs not locally store or collect the documents and can be handily applied to
- * streaming document collections.
- *
- * References:
- * Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
- */
-@Experimental
-class OnlineLDA(
-    private var k: Int,
-    private var numIterations: Int,
-    private var miniBatchFraction: Double,
-    private var tau_0: Double,
-    private var kappa: Double) {
-
-  def this() = this(k = 10, numIterations = 100, miniBatchFraction = 0.01,
-    tau_0 = 1024, kappa = 0.5)
-
-  /**
-   * Number of topics to infer. I.e., the number of soft cluster centers.
-   * (default = 10)
-   */
-  def setK(k: Int): this.type = {
-    require(k > 0, s"OnlineLDA k (number of clusters) must be > 0, but was set to $k")
-    this.k = k
-    this
-  }
-
-  /**
-   * Set the number of iterations for OnlineLDA. Default 100.
-   */
-  def setNumIterations(iters: Int): this.type = {
-    this.numIterations = iters
-    this
-  }
-
-  /**
-   * Set fraction of data to be used for each iteration. Default 0.01.
-   */
-  def setMiniBatchFraction(fraction: Double): this.type = {
-    this.miniBatchFraction = fraction
-    this
-  }
-
-  /**
-   * A (positive) learning parameter that downweights early iterations. Default 1024.
-   */
-  def setTau_0(t: Double): this.type = {
-    this.tau_0 = t
-    this
-  }
-
-  /**
-   * Learning rate: exponential decay rate. Default 0.5.
-   */
-  def setKappa(kappa: Double): this.type = {
-    this.kappa = kappa
-    this
-  }
-
-
-  /**
-   * Learns an LDA model from the given data set, using online variational Bayes (VB) algorithm.
-   * This is just designed as a handy API. For massive corpus, it's recommended to use
-   * OnlineLDAOptimizer directly and call submitMiniBatch in your application, which can help
-   * downgrade time and space complexity by not loading the entire corpus.
-   *
-   * @param documents  RDD of documents, which are term (word) count vectors paired with IDs.
-   *                   The term count vectors are "bags of words" with a fixed-size vocabulary
-   *                   (where the vocabulary size is the length of the vector).
-   *                   Document IDs must be unique and >= 0.
-   * @return  Inferred LDA model
-   */
-  def run(documents: RDD[(Long, Vector)]): LDAModel = {
-    val vocabSize = documents.first._2.size
-    val D = documents.count().toInt // total documents count
-    val onlineLDA = new OnlineLDAOptimizer(k, D, vocabSize, tau_0, kappa)
-
-    val arr = Array.fill(math.ceil(1.0 / miniBatchFraction).toInt)(miniBatchFraction)
-    for(i <- 0 until numIterations){
-      val splits = documents.randomSplit(arr)
-      val index = i % splits.size
-      onlineLDA.submitMiniBatch(splits(index))
-    }
-    onlineLDA.getTopicDistribution()
-  }
-}
-
-/**
- * :: Experimental ::
- * Latent Dirichlet Allocation (LDA), a topic model designed for text documents.
- *
- * An online training optimizer for LDA. The Optimizer processes a subset (like 1%) of the corpus
- * by each call to submitMiniBatch, and update the term-topic distribution adaptively. User can
- * get the result from getTopicDistribution.
- *
- * References:
- * Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
- */
-@Experimental
-private[clustering] class OnlineLDAOptimizer (
-    private var k: Int,
-    private var D: Int,
-    private val vocabSize: Int,
-    private val tau_0: Double,
-    private val kappa: Double) extends Serializable {
-
-  // Initialize the variational distribution q(beta|lambda)
-  private var lambda = getGammaMatrix(k, vocabSize)     // K * V
-  private var Elogbeta = dirichlet_expectation(lambda)  // K * V
-  private var expElogbeta = exp(Elogbeta)               // K * V
-  private var i = 0
-
-  /**
-   * Submit a a subset (like 1%) of the corpus to the Online LDA model, and it will update
-   * the topic distribution adaptively for the terms appearing in the subset (minibatch).
-   * The documents RDD can be discarded after submitMiniBatch finished.
-   *
-   * @param documents  RDD of documents, which are term (word) count vectors paired with IDs.
-   *                   The term count vectors are "bags of words" with a fixed-size vocabulary
-   *                   (where the vocabulary size is the length of the vector).
-   *                   Document IDs must be unique and >= 0.
-   * @return  Inferred LDA model
-   */
-  private[clustering] def submitMiniBatch(documents: RDD[(Long, Vector)]): Unit = {
-    if(documents.isEmpty()){
-      return
-    }
-
-    var stat = BDM.zeros[Double](k, vocabSize)
-    stat = documents.treeAggregate(stat)(gradient, _ += _)
-    update(stat, i, documents.count().toInt)
-    i += 1
-  }
-
-  /**
-   * get the topic-term distribution
-   */
-  private[clustering] def getTopicDistribution(): LDAModel ={
-    new LocalLDAModel(Matrices.fromBreeze(lambda).transpose)
-  }
-
-  private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={
-    // weight of the mini-batch.
-    val weight = math.pow(tau_0 + iter, -kappa)
-
-    // This step finishes computing the sufficient statistics for the M step
-    val stat = raw :* expElogbeta
-
-    // Update lambda based on documents.
-    lambda = lambda * (1 - weight) + (stat * D.toDouble / batchSize.toDouble + 1.0 / k) * weight
-    Elogbeta = dirichlet_expectation(lambda)
-    expElogbeta = exp(Elogbeta)
-  }
-
-  // for each document d update that document's gamma and phi
-  private def gradient(stat: BDM[Double], doc: (Long, Vector)): BDM[Double] = {
-    val termCounts = doc._2
-    val (ids, cts) = termCounts match {
-      case v: DenseVector => (((0 until v.size).toList), v.values)
-      case v: SparseVector => (v.indices.toList, v.values)
-      case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
-    }
-
-    // Initialize the variational distribution q(theta|gamma) for the mini-batch
-    var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t  // 1 * K
-    var Elogthetad = vector_dirichlet_expectation(gammad.t).t    // 1 * K
-    var expElogthetad = exp(Elogthetad.t).t                      // 1 * K
-    val expElogbetad = expElogbeta(::, ids).toDenseMatrix        // K * ids
-
-    var phinorm = expElogthetad * expElogbetad + 1e-100          // 1 * ids
-    var meanchange = 1D
-    val ctsVector = new BDV[Double](cts).t                       // 1 * ids
-
-    // Iterate between gamma and phi until convergence
-    while (meanchange > 1e-5) {
-      val lastgamma = gammad
-      //        1*K                   1 * ids                ids * k
-      gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + 1.0/k
-      Elogthetad = vector_dirichlet_expectation(gammad.t).t
-      expElogthetad = exp(Elogthetad.t).t
-      phinorm = expElogthetad * expElogbetad + 1e-100
-      meanchange = sum(abs((gammad - lastgamma).t)) / gammad.t.size.toDouble
-    }
-
-    val m1 = expElogthetad.t.toDenseMatrix.t
-    val m2 = (ctsVector / phinorm).t.toDenseMatrix
-    val outerResult = kron(m1, m2)  // K * ids
-    for (i <- 0 until ids.size) {
-      stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i))
-    }
-    stat
-  }
-
-  private def getGammaMatrix(row:Int, col:Int): BDM[Double] ={
-    val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0)
-    val temp = gammaRandomGenerator.sample(row * col).toArray
-    (new BDM[Double](col, row, temp)).t
-  }
-
-  private def dirichlet_expectation(alpha : BDM[Double]): BDM[Double] = {
-    val rowSum = sum(alpha(breeze.linalg.*, ::))
-    val digAlpha = digamma(alpha)
-    val digRowSum = digamma(rowSum)
-    val result = digAlpha(::, breeze.linalg.*) - digRowSum
-    result
-  }
-
-  private def vector_dirichlet_expectation(v : BDV[Double]): (BDV[Double]) ={
-    digamma(v) - digamma(sum(v))
-  }
-}
-
-
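
Usage sketch (not part of the patch): the snippet below shows how the new OnlineLDAOptimizer is meant to plug into LDA.run. It assumes an existing SparkContext `sc`, and it assumes LDA exposes a way to select the optimizer (a setOptimizer method is used below but is not shown in this diff); the corpus and parameter values are illustrative only.

  import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}
  import org.apache.spark.mllib.linalg.Vectors

  // Toy corpus: (docId, term-count vector) pairs over a 4-term vocabulary.
  val corpus = sc.parallelize(Seq(
    (0L, Vectors.dense(1.0, 2.0, 0.0, 3.0)),
    (1L, Vectors.dense(0.0, 1.0, 4.0, 0.0)),
    (2L, Vectors.dense(2.0, 0.0, 1.0, 1.0))))

  // Configure the online optimizer; tau_0 = 1024 and kappa = 0.5 are the defaults
  // recommended by Hoffman et al. (2010).
  val optimizer = new OnlineLDAOptimizer()
    .setTau_0(1024)
    .setKappa(0.5)

  // setOptimizer is assumed here; this diff only shows LDA.run delegating to
  // ldaOptimizer.initialize / next / getLDAModel.
  val model = new LDA()
    .setK(3)
    .setMaxIterations(50)
    .setOptimizer(optimizer)
    .run(corpus)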