Commit

Merge pull request #1 from piskvorky/ziky90-develop
fix bugs in state reset and state init
ziky90 committed Sep 10, 2014
2 parents 0aa9b79 + 3aa7c24 commit 83dda8d
Showing 1 changed file with 38 additions and 37 deletions.
gensim/models/ldamodel.py: 38 additions & 37 deletions (75 changes)
@@ -30,7 +30,7 @@
"""


import os
import logging
import itertools

@@ -184,7 +184,7 @@ class LdaModel(interfaces.TransformationABC):
"""
def __init__(self, corpus=None, num_topics=100, id2word=None, workers=cpu_count(),
chunksize=2000, passes=1, update_every=1, alpha='symmetric', eta=None, decay=0.5,
eval_every=10, iterations=50, gamma_threshold=0.001):
eval_every=16, iterations=50, gamma_threshold=0.001):
"""
If given, start training from the iterable `corpus` straight away. If not given,
the model is left untrained (presumably because you want to call `update()` manually).
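
For orientation, here is a minimal usage sketch of this constructor, as defined in this branch's signature (my own illustration, not part of the commit; the toy documents and the workers=2 choice are arbitrary):

    from gensim import corpora
    from gensim.models.ldamodel import LdaModel

    # toy corpus: tokenized documents converted to bag-of-words vectors
    texts = [["human", "computer", "interaction"],
             ["graph", "trees"],
             ["graph", "minors", "survey"]]
    dictionary = corpora.Dictionary(texts)
    corpus = [dictionary.doc2bow(text) for text in texts]

    # train with 2 worker processes; everything else keeps the defaults above
    lda = LdaModel(corpus=corpus, id2word=dictionary, num_topics=10, workers=2)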
@@ -466,7 +466,7 @@ def update(self, corpus):
"""

# rho is the "speed" of updating; TODO try other fncs
rho = lambda: pow(1.0 + self.num_updates, -decay)
rho = lambda: pow(1.0 + self.num_updates, -self.decay)

try:
lencorpus = len(corpus)
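
The lambda above is the online LDA step size from Hoffman et al. (2010), rho_t = (tau_0 + t)^(-kappa) with tau_0 = 1 and kappa = decay; the fix matters because a bare `decay` is not defined inside this `update(self, corpus)`, so the old lambda would raise a NameError the first time it was called. A standalone sketch of the schedule (my own illustration, with arbitrary example values):

    # the M-step learning rate shrinks as more updates are absorbed;
    # Hoffman et al.'s convergence analysis assumes decay in (0.5, 1]
    def rho(num_updates, decay=0.5):
        return pow(1.0 + num_updates, -decay)

    print([round(rho(t), 3) for t in (0, 1, 10, 100)])
    # -> [1.0, 0.707, 0.302, 0.1]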
@@ -479,88 +479,89 @@

        self.state.numdocs += lencorpus

-        total_chunks = int(lencorpus / self.chunksize) + 1

        if self.update_every:
            updatetype = "online"
-            updateafter = min(lencorpus, self.update_every * self.workers * self.chunksize)
+            updateafter = min(lencorpus, self.update_every * self.chunksize)
        else:
            updatetype = "batch"
            updateafter = lencorpus
-        evalafter = min(lencorpus, (self.eval_every or 0) * self.workers * self.chunksize)
+        evalafter = min(lencorpus, (self.eval_every or 0) * self.chunksize)

        updates_per_pass = max(1, lencorpus / updateafter)
        logger.info("running %s LDA training, %s topics, %i passes over "
                    "the supplied corpus of %i documents, updating model once "
                    "every %i documents, evaluating perplexity every %i documents, "
-                   "iterating %ix with a convergence threshold of %f" %
-                   (updatetype, self.num_topics, self.passes, lencorpus,
-                    updateafter, evalafter, self.iterations,
-                    self.gamma_threshold))
+                   "iterating %ix with a convergence threshold of %f",
+                   updatetype, self.num_topics, self.passes, lencorpus,
+                   updateafter, evalafter, self.iterations,
+                   self.gamma_threshold)

        if updates_per_pass * self.passes < 10:
            logger.warning("too few updates, training might not converge; consider "
                           "increasing the number of passes or iterations to improve accuracy")



-        def worker_e_step(data, result_queue):
+        def worker_e_step(input_queue, result_queue):
            """
-            performing E-step and updating variables for each process
+            Perform E-step for each (model, chunk) pair from the input queue, placing
+            the resulting state into the result queue.
            """
            logger.info("worker process entering E-step loop")
            while True:
-                worker_lda, chunk = data.get()
+                logger.debug("getting a new job")
+                worker_lda, chunk = input_queue.get()
                logger.info("got a job, processing a chunk of %i documents", len(chunk))
                worker_lda.state.reset()
                worker_lda.do_estep(chunk)
                del chunk
                logger.debug("processed chunk, queuing the result")
                result_queue.put(worker_lda.state)
                del worker_lda  # free up some memory
                logger.debug("result put")



logger.info("training LDA model using %i processes" % self.workers)
logger.info("training LDA model using %i processes", self.workers)
job_queue = Queue(maxsize=2 * self.workers)
result_queue = Queue()
pool = Pool(self.workers, worker_e_step, (job_queue, result_queue,))


        for pass_ in xrange(self.passes):
-            other = LdaState(self.eta, self.state.sstats.shape)

-            queue_size = 0
-            reallen = 0
-            for chunk_no, chunk in enumerate(utils.grouper(corpus, self.chunksize, as_numpy=True)):
+            queue_size, reallen = 0, 0

+            chunk_stream = utils.grouper(corpus, self.chunksize, as_numpy=True)
+            for chunk_no, chunk in enumerate(chunk_stream):
                reallen += len(chunk)  # keep track of how many documents we've processed so far

-                if self.eval_every and ((reallen == lencorpus) or ((chunk_no + 1) % (self.eval_every * self.workers) == 0)):
+                if self.eval_every and ((reallen == lencorpus) or ((chunk_no + 1) % self.eval_every == 0)):
                    self.log_perplexity(chunk, total_docs=lencorpus)

-                logger.info('PROGRESS: pass %i, dispatching documents up to #%i/%i' %
-                            (pass_, chunk_no * self.chunksize + len(chunk), lencorpus))
                # Add the data to the queue for the E step
                job_queue.put((self, chunk))
+                logger.info('PROGRESS: pass %i, dispatched documents up to #%i/%i',
+                            pass_, chunk_no * self.chunksize + len(chunk), lencorpus)
                del chunk
                queue_size += 1

-                # FIXME solve what to do in case when self.optimize_alpha ???
+                # FIXME solve what to do in case when self.optimize_alpha?

                # perform an M step. determine when based on update_every, don't do this after every chunk
-                if (self.update_every and (chunk_no + 1) % (self.update_every * self.workers) == 0) or chunk_no == total_chunks:
-                    # wait for all the processes to finish
-                    logger.info("%i chunks dispatched, waiting for the M step" % queue_size)
-                    other = result_queue.get()
-                    for _ in range(queue_size - 1):
-                        other.merge(result_queue.get())
+                if reallen == lencorpus or (self.update_every and (chunk_no + 1) % self.update_every == 0):
+                    logger.info("%i chunks dispatched, waiting for M step", queue_size)
+                    other = LdaState(self.eta, self.state.sstats.shape)
+                    for _ in range(queue_size):
+                        state = result_queue.get()
+                        other.merge(state)
                    self.do_mstep(rho(), other)
                    del other  # free up some mem

-                    other = LdaState(self.eta, self.state.sstats.shape)
                    queue_size = 0
-            #endfor single corpus iteration
+            #endfor single corpus pass
            if reallen != lencorpus:
                raise RuntimeError("input corpus size changed during training (don't use generators as input)")

        #endfor entire corpus update

        pool.close()


    def do_mstep(self, rho, other):
        """
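
The heart of the fix is in the last hunk: the sufficient-statistics accumulator `other` is now created fresh right before each M step (state init), each worker resets its model state before running the E step on a job (state reset), and the final M step now reliably fires on the last chunk (`reallen == lencorpus`) instead of depending on the off-by-one `chunk_no == total_chunks` test. Stripped of the LDA specifics, the dispatch/merge shape of the loop looks roughly like the following self-contained sketch (my own illustration with placeholder numeric "states", not gensim code):

    from multiprocessing import Pool, Queue

    import numpy

    def worker(input_queue, result_queue):
        # each job: start from zeroed statistics ("state reset"), run the
        # E-step stand-in, and ship the partial result back
        while True:
            chunk = input_queue.get()
            sstats = numpy.zeros(3)
            sstats += numpy.sum(chunk, axis=0)
            result_queue.put(sstats)

    if __name__ == "__main__":
        job_queue, result_queue = Queue(maxsize=4), Queue()
        pool = Pool(2, worker, (job_queue, result_queue))

        data = numpy.ones((8, 3))
        queue_size = 0
        for i in range(0, 8, 2):  # dispatch chunks of two documents each
            job_queue.put(data[i:i + 2])
            queue_size += 1

        merged = numpy.zeros(3)  # fresh accumulator per M step ("state init")
        for _ in range(queue_size):
            merged += result_queue.get()  # reduce: merge the workers' results
        print(merged)  # [8. 8. 8.]
        pool.close()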
