From 2891e890180aa6da843d08ee1b8aed71d095edc3 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Mon, 12 Jan 2015 12:34:32 -0800 Subject: [PATCH] Prepped LDA main class for PR, but some cleanups remain --- .../spark/examples/mllib/LDAExample.scala | 202 +++++ .../SimpleLatentDirichletAllocation.scala | 692 ------------------ .../apache/spark/mllib/clustering/LDA.scala | 566 ++++++++++++++ .../LatentDirichletAllocation.scala | 223 ------ 4 files changed, 768 insertions(+), 915 deletions(-) create mode 100644 examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala delete mode 100644 examples/src/main/scala/org/apache/spark/examples/mllib/SimpleLatentDirichletAllocation.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/topicmodel/LatentDirichletAllocation.scala diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala new file mode 100644 index 0000000000000..b028fe4fc943d --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import scala.collection.mutable.ArrayBuffer + +import java.text.BreakIterator + +import scopt.OptionParser + +import org.apache.log4j.{Level, Logger} + +import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.clustering.LDA +import org.apache.spark.mllib.clustering.LDA.Document +import org.apache.spark.mllib.linalg.SparseVector +import org.apache.spark.rdd.RDD + + +/** + * An example Latent Dirichlet Allocation (LDA) app. Run with + * {{{ + * ./bin/run-example mllib.DenseKMeans [options] + * }}} + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ +object LDAExample { + + case class Params( + input: Seq[String] = Seq.empty, + k: Int = 20, + topicSmoothing: Double = 0.1, + termSmoothing: Double = 0.1, + vocabSize: Int = 10000) extends AbstractParams[Params] + + def main(args: Array[String]) { + val defaultParams = Params() + + val parser = new OptionParser[Params]("LDAExample") { + head("LDAExample: an example LDA app for plain text data.") + opt[Int]("k") + .text(s"number of topics. default: ${defaultParams.k}") + .action((x, c) => c.copy(k = x)) + opt[Double]("topicSmoothing") + .text(s"amount of topic smoothing to use. default: ${defaultParams.topicSmoothing}") + .action((x, c) => c.copy(topicSmoothing = x)) + opt[Double]("termSmoothing") + .text(s"amount of word smoothing to use. 
default: ${defaultParams.termSmoothing}") + .action((x, c) => c.copy(termSmoothing = x)) + opt[Int]("vocabSize") + .text(s"number of distinct word types to use, chosen by frequency." + + s" default: ${defaultParams.vocabSize}") + .action((x, c) => c.copy(vocabSize = x)) + arg[String]("...") + .text("input paths (directories) to plain text corpora") + .unbounded() + .required() + .action((x, c) => c.copy(input = c.input :+ x)) + } + + parser.parse(args, defaultParams).map { params => + run(params) + }.getOrElse { + parser.showUsageAsError + sys.exit(1) + } + } + + private def run(params: Params) { + val conf = new SparkConf().setAppName(s"LDAExample with $params") + val sc = new SparkContext(conf) + + Logger.getRootLogger.setLevel(Level.WARN) + + val corpus = preprocess(sc, params.input, params.vocabSize) + corpus.cache() + + val lda = new LDA() + lda.setK(params.k) + .setMaxIterations(4) + .setTopicSmoothing(params.topicSmoothing) + .setTermSmoothing(params.termSmoothing) + val ldaModel = lda.run(corpus) + + // TODO: print log likelihood + + } + + /** + * Load documents, tokenize them, create vocabulary, and prepare documents as word count vectors. + */ + private def preprocess( + sc: SparkContext, + paths: Seq[String], + vocabSize: Int): RDD[Document] = { + + val files: Seq[RDD[(String, String)]] = for (p <- paths) yield { + sc.wholeTextFiles(p) + } + + // Dataset of document texts + val textRDD: RDD[String] = + files.reduce(_ ++ _) // combine results from multiple paths + .map { case (path, text) => text } + + // Split text into words + val tokenized: RDD[(Long, IndexedSeq[String])] = textRDD.zipWithIndex().map { case (text, id) => + id -> SimpleTokenizer.getWords(text) + } + + // Counts words: RDD[(word, wordCount)] + val wordCounts: RDD[(String, Int)] = tokenized + .flatMap { case (_, tokens) => tokens.map(_ -> 1) } + .reduceByKey(_ + _) + + // Choose vocabulary: Map[word -> id] + val vocab: Map[String, Int] = wordCounts + .sortBy(_._2, ascending = false) + .take(vocabSize) + .map(_._1) + .zipWithIndex + .toMap + + val documents = tokenized.map { case (id, tokens) => + // Filter tokens by vocabulary, and create word count vector representation of document. + val wc = new scala.collection.mutable.HashMap[Int, Int]() + tokens.foreach { term => + if (vocab.contains(term)) { + val termIndex = vocab(term) + wc(termIndex) = wc.getOrElse(termIndex, 0) + 1 + } + } + val indices = wc.keys.toArray.sorted + val values = indices.map(i => wc(i).toDouble) + + val sb = new SparseVector(vocab.size, indices, values) + LDA.Document(sb, id) + } + + documents + } +} + +/** + * Simple Tokenizer. + * + * TODO: Formalize the interface, and make it a public class in mllib.feature + */ +private object SimpleTokenizer { + + // Matches sequences of Unicode letters + private val allWordRegex = "^(\\p{L}*)$".r + + // Ignore words shorter than this length. + private val minWordLength = 3 + + def getWords(text: String): IndexedSeq[String] = { + + val words = new ArrayBuffer[String]() + + // Use Java BreakIterator to tokenize text into words. 
+ val wb = BreakIterator.getWordInstance + wb.setText(text) + + // current,end index start,end of each word + var current = wb.first() + var end = wb.next() + while (end != BreakIterator.DONE) { + // Convert to lowercase + val word: String = text.substring(current, end).toLowerCase + // Remove short words and strings that aren't only letters + word match { + case allWordRegex(w) if w.length >= minWordLength => + words += word + case _ => + } + + current = end + end = wb.next() + } + words + } + + // TODO: stopwords + +} diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SimpleLatentDirichletAllocation.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SimpleLatentDirichletAllocation.scala deleted file mode 100644 index d3c30b292cb23..0000000000000 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/SimpleLatentDirichletAllocation.scala +++ /dev/null @@ -1,692 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.examples.mllib - -import scopt.OptionParser -import org.apache.spark.{SparkContext, SparkConf} -import scala.collection.mutable.ArrayBuffer -import java.text.BreakIterator -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD -import org.apache.spark.mllib.topicmodel.LatentDirichletAllocation -import org.apache.spark.mllib.topicmodel.LatentDirichletAllocation.Document -import org.apache.spark.mllib.linalg.SparseVector -import org.apache.log4j.{Level, Logger} - -/** - * - * - * @author dlwh - */ -object SimpleLatentDirichletAllocation { - case class Params( - input: Seq[String] = Seq.empty, - numTopics: Int = 20, - wordSmoothing: Double = 0.1, - topicSmoothing: Double = 0.1, - vocabSize: Int = 10000, - minWordCount: Int = 10) extends AbstractParams[Params] - - def main(args: Array[String]) { - - val parser = new OptionParser[Params]("SimpleLatentDirichletAllocation") { - head("SimpleLatentDirichletAllocation: an example LDA app for plain text data.") - opt[Int]("numTopics") - .text("number of topics") - .action((x, c) => c.copy(numTopics = x)) - opt[Double]("wordSmoothing") - .text("amount of word smoothing to use") - .action((x, c) => c.copy(wordSmoothing = x)) - opt[Double]("topicSmoothing") - .text(s"amount of topic smoothing to use") - .action((x, c) => c.copy(topicSmoothing = x)) - opt[Int]("vocabSize") - .text(s"number of distinct word types to use, chosen by frequency (after stopword removal)") - .action((x, c) => c.copy(vocabSize = x)) - opt[Int]("minWordCount") - .text(s"minimum number of times a word must appear to be included in vocab.") - .action((x, c) => c.copy(minWordCount = x)) - arg[String]("...") - .text("input paths (directories) to plain text corpora") - .unbounded() - .required() - .action((x, c) => c.copy(input = c.input :+ 
x)) - } - - val params = parser.parse(args, Params()).getOrElse{parser.showUsageAsError; sys.exit(1)} - - val conf = new SparkConf().setAppName(s"LDA with $params") - val sc = new SparkContext(conf) - - Logger.getRootLogger.setLevel(Level.WARN) - - val corpus = preprocess(sc, params.input, params.vocabSize, params.minWordCount) - corpus.cache() - - val lda = new LatentDirichletAllocation(params.numTopics, - 100, - params.topicSmoothing, - params.wordSmoothing, - 0) - - for (state <- lda.iterations(corpus)) { - println(state.logLikelihood) - } - } - - def preprocess(sc: SparkContext, - paths: Seq[String], - vocabSize: Int, - minWordCount: Int): RDD[Document] = { - val files = for(p <- paths) yield { - sc.wholeTextFiles(p) - } - - val textRDD = files.reduce( _ ++ _ ) - - val tokenized = textRDD.zipWithIndex.map { case ((name, content), id) => - id -> SimpleTokenizer.getWords(content) - } - - val wordCounts: RDD[(String, Int)] = { - tokenized - .flatMap{ case (_, tokens) => tokens.map(_ -> 1)} - .reduceByKey(_ + _) - .filter(_._2 >= minWordCount) - } - - // word -> id - val vocab = ( - wordCounts - .sortBy(_._2, ascending = false) - .take(vocabSize) - .map(_._1) - .zipWithIndex - .toMap - ) - - val documents = tokenized.map { case (id, toks) => - val counts = breeze.linalg.Counter.countTraversable(toks) - - val indexedCounts = counts.iterator.collect { case (k, v) if vocab.contains(k) => - vocab(k) -> v.toDouble - } - - val sb = org.apache.spark.mllib.linalg.Vectors.sparse(vocab.size, indexedCounts.toSeq) - // I do not know why .sparse doesn't return a SparseVector. - LatentDirichletAllocation.Document(sb.asInstanceOf[SparseVector], id) - } - - documents - } - -} - -object SimpleTokenizer { - - val allWordRegex = "^(\\p{L}|\\p{M})*$".r - - def getWords(text: String): IndexedSeq[String] = { - val words = new ArrayBuffer[String]() - val wb = BreakIterator.getWordInstance - wb.setText(text) - - var current = wb.first() - var end = wb.next() - while (end != BreakIterator.DONE) { - val word: String = text.substring(current, end).toLowerCase - - // remove short words, things that aren't only letters, and stop words - if (allWordRegex.unapplySeq(word).nonEmpty && !stopWords(word) && word.length >= 3) { - words += word - } - - current = end - end = wb.next() - } - words - } - - val stopWords = - """ - |a - |able - |about - |above - |abst - |accordance - |according - |accordingly - |across - |act - |actually - |added - |adj - |affected - |affecting - |affects - |after - |afterwards - |again - |against - |ah - |all - |almost - |alone - |along - |already - |also - |although - |always - |am - |among - |amongst - |an - |and - |announce - |another - |any - |anybody - |anyhow - |anymore - |anyone - |anything - |anyway - |anyways - |anywhere - |apparently - |approximately - |are - |aren - |arent - |arise - |around - |as - |aside - |ask - |asking - |at - |auth - |available - |away - |awfully - |b - |back - |be - |became - |because - |become - |becomes - |becoming - |been - |before - |beforehand - |begin - |beginning - |beginnings - |begins - |behind - |being - |believe - |below - |beside - |besides - |between - |beyond - |biol - |both - |brief - |briefly - |but - |by - |c - |ca - |came - |can - |cannot - |can't - |cause - |causes - |certain - |certainly - |co - |com - |come - |comes - |contain - |containing - |contains - |could - |couldnt - |d - |date - |did - |didn't - |different - |do - |does - |doesn't - |doing - |done - |don't - |down - |downwards - |due - |during - |e - |each - |ed - |edu - |effect - |eg 
- |eight - |eighty - |either - |else - |elsewhere - |end - |ending - |enough - |especially - |et - |et-al - |etc - |even - |ever - |every - |everybody - |everyone - |everything - |everywhere - |ex - |except - |f - |far - |few - |ff - |fifth - |first - |five - |fix - |followed - |following - |follows - |for - |former - |formerly - |forth - |found - |four - |from - |further - |furthermore - |g - |gave - |get - |gets - |getting - |give - |given - |gives - |giving - |go - |goes - |gone - |got - |gotten - |h - |had - |happens - |hardly - |has - |hasn't - |have - |haven't - |having - |he - |hed - |hence - |her - |here - |hereafter - |hereby - |herein - |heres - |hereupon - |hers - |herself - |hes - |hi - |hid - |him - |himself - |his - |hither - |home - |how - |howbeit - |however - |hundred - |i - |id - |ie - |if - |i'll - |im - |immediate - |immediately - |importance - |important - |in - |inc - |indeed - |index - |information - |instead - |into - |invention - |inward - |is - |isn't - |it - |itd - |it'll - |its - |itself - |i've - |j - |just - |k - |keep keeps - |kept - |kg - |km - |know - |known - |knows - |l - |largely - |last - |lately - |later - |latter - |latterly - |least - |less - |lest - |let - |lets - |like - |liked - |likely - |line - |little - |'ll - |look - |looking - |looks - |ltd - |m - |made - |mainly - |make - |makes - |many - |may - |maybe - |me - |mean - |means - |meantime - |meanwhile - |merely - |mg - |might - |million - |miss - |ml - |more - |moreover - |most - |mostly - |mr - |mrs - |much - |mug - |must - |my - |myself - |n - |na - |name - |namely - |nay - |nd - |near - |nearly - |necessarily - |necessary - |need - |needs - |neither - |never - |nevertheless - |new - |next - |nine - |ninety - |no - |nobody - |non - |none - |nonetheless - |noone - |nor - |normally - |nos - |not - |noted - |nothing - |now - |nowhere - |o - |obtain - |obtained - |obviously - |of - |off - |often - |oh - |ok - |okay - |old - |omitted - |on - |once - |one - |ones - |only - |onto - |or - |ord - |other - |others - |otherwise - |ought - |our - |ours - |ourselves - |out - |outside - |over - |overall - |owing - |own - |p - |page - |pages - |part - |particular - |particularly - |past - |per - |perhaps - |placed - |please - |plus - |poorly - |possible - |possibly - |potentially - |pp - |predominantly - |present - |previously - |primarily - |probably - |promptly - |proud - |provides - |put - |q - |que - |quickly - |quite - |qv - |r - |ran - |rather - |rd - |re - |readily - |really - |recent - |recently - |ref - |refs - |regarding - |regardless - |regards - |related - |relatively - |research - |respectively - |resulted - |resulting - |results - |right - |run - |s - |said - |same - |saw - |say - |saying - |says - |sec - |section - |see - |seeing - |seem - |seemed - |seeming - |seems - |seen - |self - |selves - |sent - |seven - |several - |shall - |she - |shed - |she'll - |shes - |should - |shouldn't - |show - |showed - |shown - |showns - |shows - |significant - |significantly - |similar - |similarly - |since - |six - |slightly - |so - |some - |somebody - |somehow - |someone - |somethan - |something - |sometime - |sometimes - |somewhat - |somewhere - |soon - |sorry - |specifically - |specified - |specify - |specifying - |still - |stop - |strongly - |sub - |substantially - |successfully - |such - |sufficiently - |suggest - |sup - |sure - |t's take taken tell tends - |th than thank thanks thanx - |that that's thats the their - |theirs them themselves then thence - |there there's thereafter thereby therefore - 
|therein theres thereupon these they - |they'd they'll they're they've think - |third this thorough thoroughly those - |though three through throughout thru - |thus to together too took - |toward towards tried tries truly - |try trying twice two un - |under unfortunately unless unlikely until - |unto up upon us use - |used useful uses using usually - |value various very via viz - |vs want wants was wasn't - |way we we'd we'll we're - |we've welcome well went were - |weren't what what's whatever when - |whence whenever where where's whereafter - |whereas whereby wherein whereupon wherever - |whether which while whither who - |who's whoever whole whom whose - |why will willing wish with - |within without won't wonder would - |wouldn't yes yet you you'd - |you'll you're you've your yours - |yourself yourselves zero - """.stripMargin.split("\\s+").toSet - -} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala new file mode 100644 index 0000000000000..433d102206396 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -0,0 +1,566 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import java.util.Random + +import breeze.linalg.{DenseVector => BDV, DenseMatrix => BDM, sum => brzSum, normalize} + +import org.apache.spark.graphx._ +import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors, Matrix, Matrices} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.{BoundedPriorityQueue, Utils} + + +/** + * Latent Dirichlet Allocation (LDA), a topic model designed for text documents. + * + * Terminology: + * - "word" = "term": an element of the vocabulary + * - "token": instance of a term appearing in a document + * - "topic": multinomial distribution over words representing some concept + * + * Currently, the underlying implementation uses Expectation-Maximization (EM), implemented + * according to the Asuncion et al. (2009) paper referenced below. + * + * References: + * - Original LDA paper (journal version): + * Blei, Ng, and Jordan. "Latent Dirichlet Allocation." JMLR, 2003. + * - Paper which clearly explains several algorithms, including EM: + * Asuncion, Welling, Smyth, and Teh. + * "On Smoothing and Inference for Topic Models." UAI, 2009. + */ +class LDA private ( + private var k: Int, + private var maxIterations: Int, + private var topicSmoothing: Double, + private var termSmoothing: Double, + private var seed: Long) { + + import LDA._ + + def this() = this(k = 10, maxIterations = 10, topicSmoothing = -1, termSmoothing = 0.1, + seed = Utils.random.nextLong()) + + /** + * Number of topics to infer. I.e., the number of soft cluster centers. 
+ * (default = 10) + */ + def getK: Int = k + + def setK(k: Int): this.type = { + this.k = k + this + } + + // TODO: UDPATE alpha, eta to be > 1 automatically for MAP + + /** + * Topic smoothing parameter (commonly named "alpha"). + * + * This is the parameter to the Dirichlet prior placed on the per-document topic distributions + * ("theta"). We use a symmetric Dirichlet prior. + * + * This value should be > 0.0, where larger values mean more smoothing (more regularization). + * If set <= 0, then topicSmoothing is set to equal 50 / k (where k is the number of topics). + * (default = 50 / k) + */ + def getTopicSmoothing: Double = topicSmoothing + + def setTopicSmoothing(alpha: Double): this.type = { + topicSmoothing = alpha + this + } + + /** + * Term smoothing parameter (commonly named "eta"). + * + * This is the parameter to the Dirichlet prior placed on the per-topic word distributions + * (which are called "beta" in the original LDA paper by Blei et al., but are called "phi" in many + * later papers such as Asuncion et al., 2009.) + * + * This value should be > 0.0. + * (default = 0.1) + */ + def getTermSmoothing: Double = termSmoothing + + def setTermSmoothing(eta: Double): this.type = { + termSmoothing = eta + this + } + + /** + * Maximum number of iterations for learning. + * (default = 10) + */ + def getMaxIterations: Int = maxIterations + + def setMaxIterations(maxIterations: Int): this.type = { + this.maxIterations = maxIterations + this + } + + /** Random seed */ + def getSeed: Long = seed + + def setSeed(seed: Long): this.type = { + this.seed = seed + this + } + + /** + * Learn an LDA model using the given dataset. + * + * @param docs RDD of documents, where each document is represented as a vector of term counts. + * Document IDs must be >= 0. + * @return Inferred LDA model + */ + def run(docs: RDD[Document]): DistributedLDAModel = { + var state = + LDA.initialState(docs, k, termSmoothing, topicSmoothing, seed) + var iter = 0 + while (iter < maxIterations) { + state = state.next() + iter += 1 + } + new DistributedLDAModel(state) + } +} + +/** + * Latent Dirichlet Allocation (LDA) model + */ +abstract class LDAModel private[clustering] { + + import LDA._ + + /** Number of topics */ + def k: Int + + /** Vocabulary size (number of terms or terms in the vocabulary) */ + def vocabSize: Int + + /** + * Inferred topics, where each topic is represented by a distribution over terms. + * This is a matrix of size vocabSize x k, where each column is a topic. + * No guarantees are given about the ordering of the topics. + */ + def topicsMatrix: Matrix + + /* TODO + * Computes the estimated log likelihood of data (a set of documents), given the model. + * + * Note that this is an estimate since it requires inference (and exact inference is intractable + * for the LDA model). + * + * @param documents A set of documents, where each is represented as a vector of term counts. + * This must use the same vocabulary (ordering of term counts) as in training. + * Document IDs must be >= 0. + * @return Estimated log likelihood of the data under this model + */ + // TODO + //def logLikelihood(documents: RDD[Document]): Double + + /* TODO + * Compute the estimated topic distribution for each document. + * This is often called “theta” in the literature. + * + * @param documents A set of documents, where each is represented as a vector of term counts. + * This must use the same vocabulary (ordering of term counts) as in training. + * Document IDs must be >= 0. 
+ * @return Estimated topic distribution for each document. + * The returned RDD may be zipped with the given RDD, where each returned vector + * is a multinomial distribution over topics. + */ + // def topicDistributions(documents: RDD[Document]): RDD[(Long, Vector)] +} + +/** + * Local LDA model. + * This model stores only the inferred topics. + * It may be used for computing topics for new documents, but it may give less accurate answers + * than the [[DistributedLDAModel]]. + * + * @param topics Inferred topics (vocabSize x k matrix). + */ +class LocalLDAModel private[clustering] ( + private val topics: Matrix) extends LDAModel with Serializable { + + import LDA._ + + override def k: Int = topics.numCols + + override def vocabSize: Int = topics.numRows + + override def topicsMatrix: Matrix = topics + + // TODO + //override def logLikelihood(documents: RDD[Document]): Double = ??? + + // TODO: + // override def topicDistributions(documents: RDD[Document]): RDD[(Long, Vector)] = ??? + +} + +/** + * Distributed LDA model. + * This model stores the inferred topics, the full training dataset, and the topic distributions. + * When computing topics for new documents, it may give more accurate answers + * than the [[LocalLDAModel]]. + */ +class DistributedLDAModel private[clustering] ( + private val state: LDA.LearningState) extends LDAModel { + + import LDA._ + + override def k: Int = state.k + + override def vocabSize: Int = state.vocabSize + + /** + * Inferred topics, where each topic is represented by a distribution over terms. + * This is a matrix of size vocabSize x k, where each column is a topic. + * No guarantees are given about the ordering of the topics. + * + * WARNING: This matrix is collected from an RDD. Beware memory usage when vocabSize, k are large. + */ + override lazy val topicsMatrix: Matrix = { + // Collect row-major topics + val termTopicCounts: Array[(Int, TopicCounts)] = + state.graph.vertices.filter(_._1 < 0).map { case (termIndex, cnts) => + (index2term(termIndex), cnts) + }.collect() + // Convert to Matrix + val brzTopics = BDM.zeros[Double](vocabSize, k) + termTopicCounts.foreach { case (term, cnts) => + var j = 0 + while (j < k) { + brzTopics(term, j) = cnts(j) + j += 1 + } + } + Matrices.fromBreeze(brzTopics) + } + + // TODO + //override def logLikelihood(documents: RDD[Document]): Double = ??? + + /** + * For each document in the training set, return the distribution over topics for that document + * (i.e., "theta_doc"). + * + * @return RDD of (document ID, topic distribution) pairs + */ + def topicDistributions: RDD[(Long, Vector)] = { + state.graph.vertices.filter(_._1 >= 0).map { case (docID, topicCounts) => + (docID.toLong, Vectors.fromBreeze(topicCounts)) + } + } + + // TODO: + // override def topicDistributions(documents: RDD[Document]): RDD[(Long, Vector)] = ??? + + /* + // TODO: Do this properly + lazy val logLikelihood = { + graph.triplets.aggregate(0.0)({ (acc, triple) => + val scores = triple.srcAttr :* triple.dstAttr + val logScores = breeze.numerics.log(scores) + scores /= brzSum(scores) + brzSum(scores :*= logScores) * triple.attr + }, _ + _) + } + */ + + /** + * + * @param maxTermsPerTopic + * @return Array over topics, where each element is a set of top terms represented + * as (term weight in topic, term index). 
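+   *
+   * Example (illustrative usage; `model` is a trained [[DistributedLDAModel]]):
+   * {{{
+   *   val topTerms = model.getTopics(maxTermsPerTopic = 10)
+   *   // topTerms(t): up to 10 highest-weighted (weight, termIndex) pairs for topic t
+   * }}}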
+ */ + def getTopics(maxTermsPerTopic: Int): Array[Array[(Double, Int)]] = { + val nt = maxTermsPerTopic + state.graph.vertices.filter(_._1 < 0) // select term vertices + .mapPartitions { items => + // Create queue of + val queues = Array.fill(nt)(new BoundedPriorityQueue[(Double, Int)](maxTermsPerTopic)) + for ((termId, factor) <- items) { + var t = 0 + while (t < nt) { + queues(t) += (factor(t) -> termId.toInt) + t += 1 + } + } + Iterator(queues) + }.reduce { (q1, q2) => + q1.zip(q2).foreach { case (a,b) => a ++= b } + q1 + }.map ( q => q.toArray ) + } + +} + +object LDA { + + /* + DEVELOPERS NOTE: + + This implementation uses GraphX, where the graph is bipartite with 2 types of vertices: + - Document vertices + - indexed {0, 1, ..., numDocuments-1} + - Store vectors of length k (# topics). + - Term vertices + - indexed {-1, -2, ..., -vocabSize} + - Store vectors of length k (# topics). + - Edges correspond to terms appearing in documents. + - Edges are directed Document -> Term. + - Edges are partitioned by documents. + + Info on EM implementation. + - We follow Section 2.2 from Asuncion et al., 2009. + - In this implementation, there is one edge for every unique term appearing in a document, + i.e., for every unique (document, term) pair. + - Notation: + - N_{wkj} = count of tokens of term w currently assigned to topic k in document j + - N_{*} where * is missing a subscript w/k/j is the count summed over missing subscript(s) + - gamma_{wjk} = P(z_i = k | x_i = w, d_i = j), + the probability of term x_i in document d_i having topic z_i. + - Data graph + - Document vertices store N_{kj} + - Term vertices store N_{wk} + - Edges store N_{wj}. + - Global data N_k + - Algorithm + - Initial state: + - Document and term vertices store random counts N_{wk}, N_{kj}. + - E-step: For each (document,term) pair i, compute P(z_i | x_i, d_i). + - Aggregate N_k from term vertices. + - Compute gamma_{wjk} for each possible topic k, from each triplet. + using inputs N_{wk}, N_{kj}, N_k. + - M-step: Compute sufficient statistics for hidden parameters phi and theta + (counts N_{wk}, N_{kj}, N_k). + - Document update: + - N_{kj} <- sum_w N_{wj} gamma_{wjk} + - N_j <- sum_k N_{kj} (only needed to output predictions) + - Term update: + - N_{wk} <- sum_j N_{wj} gamma_{wjk} + - N_k <- sum_w N_{wk} + */ + + /** + * Document + * + * @param counts Vector of term (word) counts in the document. + * This is the "bag of words" representation. + * @param id Unique ID associated with this document. + * Documents should be indexed {0, 1, ..., numDocuments-1}. + * + * TODO: Can we remove the id and still be able to zip predicted topics with the Documents? + */ + case class Document(counts: SparseVector, id: VertexId) + + /** + * Vector over topics (length k) of token counts. + * The meaning of these counts can vary, and it may or may not be normalized to be a distribution. + */ + private[clustering] type TopicCounts = BDV[Double] + + private[clustering] type TokenCount = Double + + /** Term vertex IDs are {-1, -2, ..., -vocabSize} */ + private[clustering] def term2index(term: Int): Long = -(1 + term.toLong) + + private[clustering] def index2term(termIndex: Long): Int = -(1 + termIndex).toInt + + private[clustering] def isTermVertex(v: Tuple2[VertexId, _]): Boolean = v._1 < 0 + + private[clustering] def isDocVertex(v: Tuple2[VertexId, _]): Boolean = v._1 >= 0 + + /** + * + * Has all the information needed to run collapsed Gibbs sampling. 
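+   * The graph's document vertices carry the current N_{kj} counts and its term vertices carry the
+   * N_{wk} counts, which drive the EM update performed by next().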
+ * + * @param graph + * @param k + * @param vocabSize + * @param topicSmoothing + * @param termSmoothing + */ + private[clustering] case class LearningState( + graph: Graph[TopicCounts, TokenCount], + k: Int, + vocabSize: Int, + topicSmoothing: Double, + termSmoothing: Double) { + + // TODO: Checkpoint periodically + def next() = copy(graph = step(graph)) + + private def step(graph: Graph[TopicCounts, TokenCount]): Graph[TopicCounts, TokenCount] = { + val eta = termSmoothing + val W = vocabSize + val alpha = topicSmoothing + + // Collect N_k from term vertices. + val N_k = collectTopicTotals() + val sendMsg: EdgeContext[TopicCounts, TokenCount, TopicCounts] => Unit = (edgeContext) => { + // Compute N_{wj} gamma_{wjk} + val N_wj = edgeContext.attr + // E-STEP: Compute gamma_{wjk} (smoothed topic distributions), scaled by token count N_{wj}. + val scaledTopicDistribution: TopicCounts = + computePTopic(edgeContext, N_k, W, eta, alpha) * N_wj + edgeContext.sendToDst(scaledTopicDistribution) + edgeContext.sendToSrc(scaledTopicDistribution) + } + // M-STEP: Aggregation computes new N_{kj}, N_{wk} counts. + val docTopicDistributions: VertexRDD[TopicCounts] = + graph.aggregateMessages[TopicCounts](sendMsg, _ + _) + // Update the vertex descriptors with the new counts. + graph.outerJoinVertices(docTopicDistributions){ (vid, oldDist, newDist) => newDist.get } + } + + /* + /** + * Update document topic distributions, i.e., theta_doc = p(z|doc) for each doc. + * @return Graph with updated document vertex descriptors + */ + private def updateDocs(graph: Graph[TopicCounts, TokenCount]): Graph[TopicCounts, TokenCount] = { + val alpha = topicSmoothing + // Compute smoothed topic distributions for each document (size: numDocuments x k). + val docTopicTotals = updateExpectedCounts(_.srcId) + val newTotals = docTopicTotals.mapValues(total => normalize(total += alpha, 1)) + println(s"E-STEP newTotals.take(1): ${newTotals.take(1)(0)._2}") + // Update document vertices with new topic distributions. + graph.outerJoinVertices(newTotals){ (vid, old, newOpt) => newOpt.getOrElse(old) } + } + + /** + * Update topics, i.e., beta_z = p(w|z) for each topic z. + * (Conceptually, these are the topics. However, they are stored transposed, where each + * term vertex stores the distribution value for each topic.) + * @return Graph with updated term vertex descriptors + */ + private def updateTerms(graph: Graph[TopicCounts, TokenCount]): Graph[TopicCounts, TokenCount] = { + // Compute new topics. + val termTotals = updateExpectedCounts(_.dstId) + // Collect the aggregate counts over terms (summing all topics). + val eta: Double = termSmoothing + val topicTotals = termTotals.map(_._2).fold(BDV.zeros[Double](k))(_ + _) + topicTotals += (eta * vocabSize) + println(s"M-STEP topicTotals: $topicTotals") + // Update term vertices with new topic weights. + graph.outerJoinVertices(termTotals)( (vid, old, newOpt) => + newOpt + .map { counts => (counts += eta) :/= topicTotals } // smooth individual counts; normalize + .getOrElse(old) + ) + } + + private def updateExpectedCounts(sendToWhere: (EdgeTriplet[_, _]) => VertexId): VertexRDD[TopicCounts] = { + // Collect N_k from term vertices. 
+ val N_k = collectTopicTotals() + val eta = termSmoothing + val W = vocabSize + val alpha = topicSmoothing + graph.mapReduceTriplets[TopicCounts]({ + trip => Iterator(sendToWhere(trip) -> computePTopic(trip, N_k, W, eta, alpha)) + }, _ += _) + } + */ + + private def collectTopicTotals(): TopicCounts = { + val numTopics = k + graph.vertices.filter(isTermVertex).map(_._2).fold(BDV.zeros[Double](numTopics))(_ + _) + } + + } + + private def computePTopic(edgeContext: EdgeContext[TopicCounts, TokenCount, TopicCounts], + N_k: TopicCounts, vocabSize: Int, eta: Double, alpha: Double): TopicCounts = { + val smoothed_N_wk: TopicCounts = edgeContext.dstAttr + (eta - 1.0) + val smoothed_N_kj: TopicCounts = edgeContext.srcAttr + (alpha - 1.0) + val smoothed_N_k: TopicCounts = N_k + (vocabSize * (eta - 1.0)) + // proportional to p(w|z) * p(z|d) / p(z) + val unnormalizedGamma = smoothed_N_wk :* smoothed_N_kj :/ smoothed_N_k + // normalize + unnormalizedGamma /= brzSum(unnormalizedGamma) + } + + /* + private def computePTopic(edge: EdgeTriplet[TopicCounts, TokenCount], N_k: TopicCounts, vocabSize: Int, eta: Double, alpha: Double): TopicCounts = { + val smoothed_N_wk: TopicCounts = edge.dstAttr + (eta - 1.0) + val smoothed_N_kj: TopicCounts = edge.srcAttr + (alpha - 1.0) + val smoothed_N_k: TopicCounts = N_k + (vocabSize * (eta - 1.0)) + // proportional to p(w|z) * p(z|d) / p(z) + val unnormalizedGamma = smoothed_N_wk :* smoothed_N_kj :/ smoothed_N_k + // normalize + unnormalizedGamma /= brzSum(unnormalizedGamma) + } + */ + + /** + * Compute bipartite term/doc graph. + * doc ids are shifted by vocabSize to maintain uniqueness + */ + private def initialState( + docs: RDD[Document], + k: Int, + topicSmoothing: Double, + termSmoothing: Double, + randomSeed: Long): LearningState = { + // For each document, create an edge (Document -> Term) for each unique term in the document. + val edges: RDD[Edge[TokenCount]] = docs.mapPartitionsWithIndex { case (partIndex, partDocs) => + partDocs.flatMap { doc: Document => + // Add edges for terms with non-zero counts. + doc.counts.toBreeze.activeIterator.filter(_._2 != 0.0).map { case (term, cnt) => + Edge(doc.id, term2index(term), cnt) + } + } + } + + val vocabSize = docs.take(1).head.counts.size + + // Create vertices. + // Initially, we use random soft assignments of tokens to topics (random gamma). 
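+    // Each (document, term) edge draws its own random gamma over the k topics; each vertex's
+    // initial counts are then the sum of gamma * N_{wj} over its incident edges (createVertices).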
+ val edgesWithGamma: RDD[(Edge[TokenCount], TopicCounts)] = + edges.mapPartitionsWithIndex { case (partIndex, partEdges) => + val random = new Random(partIndex + randomSeed) + partEdges.map { edge => + // Create a random gamma_{wjk} + (edge, normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0)) + } + } + def createVertices(sendToWhere: Edge[TokenCount] => VertexId): RDD[(VertexId, TopicCounts)] = { + val verticesTMP: RDD[(VertexId, TopicCounts)] = + edgesWithGamma.map { case (edge, gamma: TopicCounts) => + val N_wj = edge.attr + (sendToWhere(edge), gamma * N_wj) + } + verticesTMP.foldByKey(BDV.zeros[Double](k))(_ + _) + } + val docVertices = createVertices(_.srcId) + val termVertices = createVertices(_.dstId) + + // Partition such that edges are grouped by document + val graph = Graph(docVertices ++ termVertices, edges) + .partitionBy(PartitionStrategy.EdgePartition1D) + + LearningState(graph, k, vocabSize, topicSmoothing, termSmoothing) + } + +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/topicmodel/LatentDirichletAllocation.scala b/mllib/src/main/scala/org/apache/spark/mllib/topicmodel/LatentDirichletAllocation.scala deleted file mode 100644 index 24bb398904494..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/topicmodel/LatentDirichletAllocation.scala +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.mllib.topicmodel - -import org.apache.spark.rdd.RDD -import java.util.Random -import org.apache.spark.broadcast.Broadcast -import org.apache.spark.util.BoundedPriorityQueue -import org.apache.hadoop.fs.shell.Count - - - - -import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, sum => brzSum, normalize} -import org.apache.spark.mllib.linalg.{DenseVector => SDV, SparseVector => SSV, Vector => SV} - -import org.apache.spark.graphx._ - - -/** - * - * - */ -class LatentDirichletAllocation(var numTopics: Int, - var maxIterations: Int, - var topicSmoothing: Double, - var wordSmoothing: Double, - randomSeed: Long) { - def this(numTopics: Int, maxIterations: Int) = { - this(numTopics, maxIterations, 0.1, 0.1, System.currentTimeMillis()) - } - - def this(numTopics: Int) = this(numTopics, 100) - - - def setNumTopics(k: Int):this.type = {numTopics = k; this} - - def setTopicSmoothing(alpha: Double):this.type = {topicSmoothing = alpha; this} - - def setWordSmoothing(beta: Double):this.type = {wordSmoothing = beta; this} - - import LatentDirichletAllocation._ - - def iterations(docs: RDD[LatentDirichletAllocation.Document]):Iterator[State] = { - val state = initialState(docs, numTopics, wordSmoothing, topicSmoothing, randomSeed) - Iterator.iterate(state)(_.next()).drop(2).take(maxIterations) - } - - def run(docs: RDD[LatentDirichletAllocation.Document]):State = { - import breeze.util.Implicits._ - iterations(docs).last - } -} - -object LatentDirichletAllocation { - case class Document(counts: SSV, id: VertexId) - - private type TopicCounts = BDV[Double] - // Strictly should be an integer, but the algorithm works with Doubles - private type WordCount = Double - - trait State { - def logLikelihood: Double - - def topWords(k: Int):Array[Array[(Double, Int)]] - } - - /** - * - * Has all the information needed to run EM. - * - * The Graph has two kinds of nodes: words and documents. The attr for a word - * is p(w|z). 
The attr for a document is p(z|doc) - * - * @param graph - * @param numTopics - * @param numWords - * @param topicSmoothing - * @param wordSmoothing - * @param numEStepIters - */ - private case class LearningState(graph: Graph[TopicCounts, Double], - numTopics: Int, - numWords: Int, - topicSmoothing: Double, - wordSmoothing: Double, - numEStepIters: Int = 10) extends State { - - def next() = copy(graph = mStep(eStep(graph))) - - // update p(z|doc) for each doc - private def eStep(graph: Graph[TopicCounts, Double]): Graph[TopicCounts, Double] = { - (0 until numEStepIters).foldLeft(graph) { (graph, _) => - // TODO: we should be able to detect which documents have converged and - // filter them so we don't bother with them for the rest of the estep - val docTopicTotals = updateExpectedCounts(graph, _.srcId) - val alpha = topicSmoothing - val newTotals = docTopicTotals.mapValues(total => normalize(total += alpha, 1)) - graph.outerJoinVertices(newTotals){ (vid, old, newOpt) => newOpt.getOrElse(old)} - } - } - - // update p(w|z) for each word - private def mStep(graph: Graph[TopicCounts, Double]): Graph[TopicCounts, Double] = { - val wordTotals = updateExpectedCounts(graph, _.dstId) - val beta: Double = wordSmoothing - val topicTotals = wordTotals.map(_._2).fold(BDV.zeros[Double](numTopics))(_ + _) - // smooth the totals - topicTotals += (beta * numWords) - - graph.outerJoinVertices(wordTotals)( (vid, old, newOpt) => - newOpt - .map ( counts => (counts += beta) :/= topicTotals) // smooth individual counts; normalize - .getOrElse(old) // keep old p(z|doc) vectors - ) - } - - lazy val logLikelihood = { - graph.triplets.aggregate(0.0)({ (acc, triple) => - val scores = triple.srcAttr :* triple.dstAttr - val logScores = breeze.numerics.log(scores) - scores /= brzSum(scores) - brzSum(scores :*= logScores) * triple.attr - }, _ + _) - } - - // cribbed from jegonzal's implementation - def topWords(k: Int): Array[Array[(Double, Int)]] = { - val nt = numTopics - val nw = numWords - graph.vertices.filter { - case (vid, c) => vid < nw - }.mapPartitions { items => - val queues = Array.fill(nt)(new BoundedPriorityQueue[(Double, Int)](k)) - for ((wordId, factor) <- items) { - var t = 0 - while (t < nt) { - queues(t) += (factor(t) -> wordId.toInt) - t += 1 - } - } - Iterator(queues) - }.reduce { (q1, q2) => - q1.zip(q2).foreach { case (a,b) => a ++= b } - q1 - }.map ( q => q.toArray ) - } - - - } - - - private def updateExpectedCounts(wordCountGraph: Graph[TopicCounts, Double], - sendToWhere: (EdgeTriplet[_, _]) => VertexId) = { - wordCountGraph.mapReduceTriplets[TopicCounts]({ - trip => Iterator(sendToWhere(trip) -> computePTopic(trip)) - }, _ += _) - } - - /** - * Compute bipartite term/doc graph. 
doc ids are shifted by numWords to maintain uniqueness - * @param docs - * @param numTopics - * @param randomSeed - * @return - */ - private def initialState(docs: RDD[LatentDirichletAllocation.Document], - numTopics: Int, - topicSmoothing: Double, - wordSmoothing: Double, - randomSeed: Long): LearningState = { - val edges:RDD[Edge[WordCount]] = for { - d <- docs - (word, count) <- d.counts.toBreeze.activeIterator - if count != 0.0 - } yield { - Edge(d.id, word, count) - } - - val numWords = docs.take(1).head.counts.size - - val initialDocTopics = docs.map { doc => - val random: Random = new Random(doc.id + randomSeed) - (numWords + doc.id) -> BDV.fill(numTopics)(random.nextDouble()) - } - val initialWordCounts = docs.sparkContext.parallelize(0 until numWords).map { wid => - val random: Random = new Random(randomSeed + wid) - wid.toLong -> BDV.fill(numTopics)(random.nextDouble()) - } - - // partition such that edges are grouped by document - val graph = ( - Graph(initialDocTopics ++ initialWordCounts, edges) - .partitionBy(PartitionStrategy.EdgePartition1D) - ) - - LearningState(graph, numTopics, numWords, topicSmoothing, wordSmoothing) - - - } - - private def computePTopic(edge: EdgeTriplet[TopicCounts, WordCount]):TopicCounts = { - // \propto p(w|z) * p(z|d) - val scores = (edge.srcAttr :* edge.dstAttr) - // normalize and scale by number of times word occurs - scores *= (edge.attr / brzSum(scores)) - } -} -
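
A minimal usage sketch of the new LDA API added in this patch (assumes an existing SparkContext
`sc`; the corpus and parameter values are illustrative, with smoothing set above 1.0 since the EM
update subtracts 1.0 from alpha and eta):

    import org.apache.spark.mllib.clustering.LDA
    import org.apache.spark.mllib.clustering.LDA.Document
    import org.apache.spark.mllib.linalg.SparseVector

    // Two tiny bag-of-words documents over a 4-term vocabulary.
    val corpus = sc.parallelize(Seq(
      Document(new SparseVector(4, Array(0, 2), Array(3.0, 1.0)), id = 0L),
      Document(new SparseVector(4, Array(1, 3), Array(2.0, 2.0)), id = 1L)))

    val lda = new LDA()
      .setK(3)
      .setTopicSmoothing(2.0)   // alpha: Dirichlet prior on per-document topic distributions
      .setTermSmoothing(1.1)    // eta: Dirichlet prior on per-topic term distributions
      .setMaxIterations(10)
    val model = lda.run(corpus)                        // DistributedLDAModel

    val topics = model.topicsMatrix                    // vocabSize x k matrix, collected to the driver
    val topTerms = model.getTopics(maxTermsPerTopic = 2)
    val thetas = model.topicDistributions              // RDD[(doc ID, topic distribution)]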