
Commit: change OnlineLDA to class
hhbyyh committed Apr 2, 2015
1 parent 97b9e1a commit d19ef55
Showing 1 changed file with 72 additions and 31 deletions.
103 changes: 72 additions & 31 deletions mllib/src/main/scala/org/apache/spark/mllib/clustering/OnlineLDA.scala
@@ -24,7 +24,6 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg._
import org.apache.spark.rdd.RDD

-
/**
 * :: Experimental ::
 * Latent Dirichlet Allocation (LDA), a topic model designed for text documents.
@@ -37,7 +36,58 @@ import org.apache.spark.rdd.RDD
 * Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
 */
@Experimental
-object OnlineLDA{
+class OnlineLDA(
+    private var k: Int,
+    private var numIterations: Int,
+    private var miniBatchFraction: Double,
+    private var tau_0: Double,
+    private var kappa: Double) {
+
+  def this() = this(k = 10, numIterations = 100, miniBatchFraction = 0.01,
+    tau_0 = 1024, kappa = 0.5)
+
+  /**
+   * Number of topics to infer. I.e., the number of soft cluster centers.
+   * (default = 10)
+   */
+  def setK(k: Int): this.type = {
+    require(k > 0, s"OnlineLDA k (number of clusters) must be > 0, but was set to $k")
+    this.k = k
+    this
+  }
+
+  /**
+   * Set the number of iterations for OnlineLDA. Default 100.
+   */
+  def setNumIterations(iters: Int): this.type = {
+    this.numIterations = iters
+    this
+  }
+
+  /**
+   * Set fraction of data to be used for each iteration. Default 0.01.
+   */
+  def setMiniBatchFraction(fraction: Double): this.type = {
+    this.miniBatchFraction = fraction
+    this
+  }
+
+  /**
+   * A (positive) learning parameter that downweights early iterations. Default 1024.
+   */
+  def setTau_0(t: Double): this.type = {
+    this.tau_0 = t
+    this
+  }
+
+  /**
+   * Learning rate: exponential decay rate. Default 0.5.
+   */
+  def setKappa(kappa: Double): this.type = {
+    this.kappa = kappa
+    this
+  }
+

  /**
   * Learns an LDA model from the given data set, using online variational Bayes (VB) algorithm.
@@ -49,33 +99,18 @@ object OnlineLDA{
   * The term count vectors are "bags of words" with a fixed-size vocabulary
   * (where the vocabulary size is the length of the vector).
   * Document IDs must be unique and >= 0.
-   * @param k Number of topics to infer.
-   * @param batchNumber Number of batches to split input corpus. For each batch, recommendation
-   *                    size is [4, 16384]. -1 for automatic batchNumber.
   * @return Inferred LDA model
   */
-  def run(documents: RDD[(Long, Vector)], k: Int, batchNumber: Int = -1): LDAModel = {
-    require(batchNumber > 0 || batchNumber == -1,
-      s"batchNumber must be greater or -1, but was set to $batchNumber")
-    require(k > 0, s"LDA k (number of clusters) must be > 0, but was set to $k")
-
+  def run(documents: RDD[(Long, Vector)]): LDAModel = {
    val vocabSize = documents.first._2.size
    val D = documents.count().toInt // total documents count
-    val batchSize =
-      if (batchNumber == -1) { // auto mode
-        if (D / 100 > 16384) 16384
-        else if (D / 100 < 4) 4
-        else D / 100
-      }
-      else {
-        D / batchNumber
-      }
-
-    val onlineLDA = new OnlineLDAOptimizer(k, D, vocabSize)
-    val actualBatchNumber = Math.ceil(D.toDouble / batchSize).toInt
-    for(i <- 1 to actualBatchNumber){
-      val batch = documents.sample(true, batchSize.toDouble / D)
-      onlineLDA.submitMiniBatch(batch)
+    val onlineLDA = new OnlineLDAOptimizer(k, D, vocabSize, tau_0, kappa)
+
+    val arr = Array.fill(math.ceil(1.0 / miniBatchFraction).toInt)(miniBatchFraction)
+    for(i <- 0 until numIterations){
+      val splits = documents.randomSplit(arr)
+      val index = i % splits.size
+      onlineLDA.submitMiniBatch(splits(index))
    }
    onlineLDA.getTopicDistribution()
  }
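
With this commit the entry point becomes a small estimator class: training is configured through setters instead of extra run() arguments. A minimal usage sketch of the new API (the corpus RDD and all parameter values here are illustrative assumptions, not part of the commit):

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Sketch only: `corpus` is an assumed RDD[(Long, Vector)] of (doc ID, term counts).
def trainExample(corpus: RDD[(Long, Vector)]): LDAModel = {
  new OnlineLDA()
    .setK(20)                   // number of topics
    .setNumIterations(50)       // number of mini-batch iterations
    .setMiniBatchFraction(0.05) // sample roughly 5% of documents per iteration
    .setTau_0(1024)             // downweight early iterations
    .setKappa(0.5)              // learning-rate decay exponent
    .run(corpus)
}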
@@ -93,10 +128,12 @@ object OnlineLDA{
 * Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
 */
@Experimental
-class OnlineLDAOptimizer (
+private[clustering] class OnlineLDAOptimizer (
    private var k: Int,
    private var D: Int,
-    private val vocabSize:Int) extends Serializable {
+    private val vocabSize: Int,
+    private val tau_0: Double,
+    private val kappa: Double) extends Serializable {

  // Initialize the variational distribution q(beta|lambda)
  private var lambda = getGammaMatrix(k, vocabSize) // K * V
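
getGammaMatrix itself is not shown in this diff. A plausible sketch of such a helper, following Hoffman et al.'s reference implementation, which initializes each entry of lambda from Gamma(100, 1/100) (using breeze's Gamma distribution with an explicit RandBasis is an assumption here, not the committed code):

import breeze.linalg.{DenseMatrix => BDM}
import breeze.stats.distributions.{Gamma, RandBasis}
import org.apache.commons.math3.random.MersenneTwister

// Hypothetical helper, not the committed code: fill a row x col matrix with
// Gamma(100, 1/100) draws, i.e. entries with mean 1 and variance 0.01.
def gammaMatrixSketch(row: Int, col: Int): BDM[Double] = {
  val randBasis = new RandBasis(new MersenneTwister(42L))
  val g = new Gamma(100.0, 1.0 / 100.0)(randBasis)
  new BDM[Double](row, col, g.sample(row * col).toArray)
}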
@@ -115,7 +152,11 @@ class OnlineLDAOptimizer (
   * Document IDs must be unique and >= 0.
   * @return Inferred LDA model
   */
-  def submitMiniBatch(documents: RDD[(Long, Vector)]): Unit = {
+  private[clustering] def submitMiniBatch(documents: RDD[(Long, Vector)]): Unit = {
+    if(documents.isEmpty()){
+      return
+    }
+
    var stat = BDM.zeros[Double](k, vocabSize)
    stat = documents.treeAggregate(stat)(gradient, _ += _)
    update(stat, i, documents.count().toInt)
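
The mini-batch E step is a single treeAggregate pass: `gradient` (defined elsewhere in this file, not shown in this hunk) folds one document's expected sufficient statistics into a local K x V matrix, and partitions are merged with +=. A schematic of that aggregation shape, with a stand-in seqOp (an assumption, not the committed code):

import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Schematic only: the real seqOp performs the per-document variational E step.
def statsSketch(docs: RDD[(Long, Vector)], k: Int, vocabSize: Int): BDM[Double] = {
  docs.treeAggregate(BDM.zeros[Double](k, vocabSize))(
    seqOp = (stat, doc) => stat, // stand-in: fold doc's statistics into stat
    combOp = (a, b) => a += b    // merge per-partition statistics
  )
}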
@@ -125,13 +166,13 @@ class OnlineLDAOptimizer (
  /**
   * get the topic-term distribution
   */
-  def getTopicDistribution(): LDAModel ={
+  private[clustering] def getTopicDistribution(): LDAModel ={
    new LocalLDAModel(Matrices.fromBreeze(lambda).transpose)
  }

  private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={
-    // weight of the mini-batch. 1024 helps down weights early iterations
-    val weight = math.pow(1024 + iter, -0.5)
+    // weight of the mini-batch.
+    val weight = math.pow(tau_0 + iter, -kappa)

    // This step finishes computing the sufficient statistics for the M step
    val stat = raw :* expElogbeta
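
The hardcoded step size math.pow(1024 + iter, -0.5) becomes the configurable schedule from Hoffman et al., rho_t = (tau_0 + t)^(-kappa). A quick illustration of the decay with the default values (the numbers below are plain arithmetic, not output of the committed code):

// Illustrative only: per-mini-batch weight rho_t = (tau_0 + t)^(-kappa).
// A larger tau_0 suppresses early updates; a larger kappa decays faster.
val tau0 = 1024.0
val kappa = 0.5
val rho = (0 to 3).map(t => math.pow(tau0 + t, -kappa))
// rho(0) = 1024^(-0.5) = 0.03125, then slowly decreasing toward 0.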
