-
-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
ldamulticore.py
351 lines (291 loc) · 17.1 KB
/
ldamulticore.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Author: Jan Zikes, Radim Rehurek
# Copyright (C) 2014 Radim Rehurek <me@radimrehurek.com>
# Licensed under the GNU LGPL v2.1 - https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html
"""Online Latent Dirichlet Allocation (LDA) in Python, using all CPU cores to parallelize and speed up model training.
The parallelization uses multiprocessing; in case this doesn't work for you for some reason,
try the :class:`gensim.models.ldamodel.LdaModel` class which is an equivalent, but more straightforward and single-core
implementation.
The training algorithm:
* is **streamed**: training documents may come in sequentially, no random access required,
* runs in **constant memory** w.r.t. the number of documents: size of the
training corpus does not affect memory footprint, can process corpora larger than RAM
Wall-clock `performance on the English Wikipedia <https://radimrehurek.com/gensim/wiki.html>`_ (2G corpus positions,
3.5M documents, 100K features, 0.54G non-zero entries in the final bag-of-words matrix), requesting 100 topics:
====================================================== ==============
algorithm training time
====================================================== ==============
LdaMulticore(workers=1) 2h30m
LdaMulticore(workers=2) 1h24m
LdaMulticore(workers=3) 1h6m
old LdaModel() 3h44m
simply iterating over input corpus = I/O overhead 20m
====================================================== ==============
(Measured on `this i7 server <http://www.hetzner.de/en/hosting/produkte_rootserver/ex40ssd>`_
with 4 physical cores, so that optimal `workers=3`, one less than the number of cores.)
This module allows both LDA model estimation from a training corpus and inference of topic distribution on new,
unseen documents. The model can also be updated with new documents for online training.
The core estimation code is based on the `onlineldavb.py script
<https://github.com/blei-lab/onlineldavb/blob/master/onlineldavb.py>`_, by
Matthew D. Hoffman, David M. Blei, Francis Bach:
`'Online Learning for Latent Dirichlet Allocation', NIPS 2010`_.
.. _'Online Learning for Latent Dirichlet Allocation', NIPS 2010: online-lda_
.. _'Online Learning for LDA' by Hoffman et al.: online-lda_
.. _online-lda: https://papers.neurips.cc/paper/2010/file/71f6278d140af599e06ad9bf1ba03cb0-Paper.pdf
Usage examples
--------------
The constructor estimates Latent Dirichlet Allocation model parameters based on a training corpus
.. sourcecode:: pycon
>>> from gensim.test.utils import common_corpus, common_dictionary
>>>
>>> lda = LdaMulticore(common_corpus, id2word=common_dictionary, num_topics=10)
Save a model to disk, or reload a pre-trained model
.. sourcecode:: pycon
>>> from gensim.test.utils import datapath
>>>
>>> # Save model to disk.
>>> temp_file = datapath("model")
>>> lda.save(temp_file)
>>>
>>> # Load a potentially pretrained model from disk.
>>> lda = LdaMulticore.load(temp_file)
Query, or update the model using new, unseen documents
.. sourcecode:: pycon
>>> other_texts = [
... ['computer', 'time', 'graph'],
... ['survey', 'response', 'eps'],
... ['human', 'system', 'computer']
... ]
>>> other_corpus = [common_dictionary.doc2bow(text) for text in other_texts]
>>>
>>> unseen_doc = other_corpus[0]
>>> vector = lda[unseen_doc] # get topic probability distribution for a document
>>>
>>> # Update the model by incrementally training on the new corpus.
>>> lda.update(other_corpus) # update the LDA model with additional documents
"""
import logging
import queue
from multiprocessing import Pool, Queue, cpu_count
import numpy as np
from gensim import utils
from gensim.models.ldamodel import LdaModel, LdaState
logger = logging.getLogger(__name__)
class LdaMulticore(LdaModel):
    """An optimized implementation of the LDA algorithm, able to harness the power of multicore CPUs.

    Follows a similar API to the parent class :class:`~gensim.models.ldamodel.LdaModel`.

    """
    def __init__(self, corpus=None, num_topics=100, id2word=None, workers=None,
                 chunksize=2000, passes=1, batch=False, alpha='symmetric',
                 eta=None, decay=0.5, offset=1.0, eval_every=10, iterations=50,
                 gamma_threshold=0.001, random_state=None, minimum_probability=0.01,
                 minimum_phi_value=0.01, per_word_topics=False, dtype=np.float32):
        """

        Parameters
        ----------
        corpus : {iterable of list of (int, float), scipy.sparse.csc}, optional
            Stream of document vectors or sparse matrix of shape (`num_documents`, `num_terms`).
            If not given, the model is left untrained (presumably because you want to call
            :meth:`~gensim.models.ldamodel.LdaModel.update` manually).
        num_topics : int, optional
            The number of requested latent topics to be extracted from the training corpus.
        id2word : {dict of (int, str),  :class:`gensim.corpora.dictionary.Dictionary`}
            Mapping from word IDs to words. It is used to determine the vocabulary size, as well as for
            debugging and topic printing.
        workers : int, optional
            Number of workers processes to be used for parallelization. If None all available cores
            (as estimated by `workers=cpu_count()-1`) will be used. **Note** however that for
            hyper-threaded CPUs, this estimation returns a too high number -- set `workers`
            directly to the number of your **real** cores (not hyperthreads) minus one, for optimal performance.
        chunksize :  int, optional
            Number of documents to be used in each training chunk.
        passes : int, optional
            Number of passes through the corpus during training.
        batch : bool, optional
            If True, the model is updated once per pass, after the whole corpus has been processed ("batch" mode).
            If False, it is updated "online", after every `chunksize * workers` documents.
        alpha : {float, numpy.ndarray of float, list of float, str}, optional
            A-priori belief on document-topic distribution, this can be:
                * scalar for a symmetric prior over document-topic distribution,
                * 1D array of length equal to num_topics to denote an asymmetric user defined prior for each topic.

            Alternatively default prior selecting strategies can be employed by supplying a string:
                * 'symmetric': (default) Uses a fixed symmetric prior of `1.0 / num_topics`,
                * 'asymmetric': Uses a fixed normalized asymmetric prior of `1.0 / (topic_index + sqrt(num_topics))`.

            The value 'auto' is rejected with :class:`NotImplementedError`.
        eta : {float, numpy.ndarray of float, list of float, str}, optional
            A-priori belief on topic-word distribution, this can be:
                * scalar for a symmetric prior over topic-word distribution,
                * 1D array of length equal to num_words to denote an asymmetric user defined prior for each word,
                * matrix of shape (num_topics, num_words) to assign a probability for each word-topic combination.

            Alternatively default prior selecting strategies can be employed by supplying a string:
                * 'symmetric': (default) Uses a fixed symmetric prior of `1.0 / num_topics`,
                * 'auto': Learns an asymmetric prior from the corpus.
        decay : float, optional
            A number between (0.5, 1] to weight what percentage of the previous lambda value is forgotten
            when each new document is examined. Corresponds to :math:`\\kappa` from
            `'Online Learning for LDA' by Hoffman et al.`_
        offset : float, optional
            Hyper-parameter that controls how much we will slow down the first steps the first few iterations.
            Corresponds to :math:`\\tau_0` from `'Online Learning for LDA' by Hoffman et al.`_
        eval_every : int, optional
            Log perplexity is estimated every that many updates. Setting this to one slows down training by ~2x.
        iterations : int, optional
            Maximum number of iterations through the corpus when inferring the topic distribution of a corpus.
        gamma_threshold : float, optional
            Minimum change in the value of the gamma parameters to continue iterating.
        minimum_probability : float, optional
            Topics with a probability lower than this threshold will be filtered out.
        random_state : {np.random.RandomState, int}, optional
            Either a randomState object or a seed to generate one. Useful for reproducibility.
            Note that results can still vary due to non-determinism in OS scheduling of the worker processes.
        minimum_phi_value : float, optional
            if `per_word_topics` is True, this represents a lower bound on the term probabilities.
        per_word_topics : bool
            If True, the model also computes a list of topics, sorted in descending order of most likely topics for
            each word, along with their phi values multiplied by the feature length (i.e. word count).
        dtype : {numpy.float16, numpy.float32, numpy.float64}, optional
            Data-type to use during calculations inside model. All inputs are also converted.

        Raises
        ------
        NotImplementedError
            If `alpha` is the string 'auto'.

        """
        # `workers` and `batch` must be set before the parent constructor runs: it receives `corpus`,
        # and `update()` reads both attributes.
        self.workers = max(1, cpu_count() - 1) if workers is None else workers
        self.batch = batch

        if isinstance(alpha, str) and alpha == 'auto':
            raise NotImplementedError("auto-tuning alpha not implemented in LdaMulticore; use plain LdaModel.")

        super().__init__(
            corpus=corpus, num_topics=num_topics,
            id2word=id2word, chunksize=chunksize, passes=passes, alpha=alpha, eta=eta,
            decay=decay, offset=offset, eval_every=eval_every, iterations=iterations,
            gamma_threshold=gamma_threshold, random_state=random_state, minimum_probability=minimum_probability,
            minimum_phi_value=minimum_phi_value, per_word_topics=per_word_topics, dtype=dtype,
        )

    def update(self, corpus, chunks_as_numpy=False):
        """Train the model with new documents, by EM-iterating over `corpus` until the topics converge
        (or until the maximum number of allowed iterations is reached).

        `corpus` must be an iterable. The E step is distributed into the several processes.

        Notes
        -----
        This update also supports updating an already trained model (`self`) with new documents from `corpus`;
        the two models are then merged in proportion to the number of old vs. new documents.
        This feature is still experimental for non-stationary input streams.

        For stationary input (no topic drift in new documents), on the other hand,
        this equals the online update of `'Online Learning for LDA' by Hoffman et al.`_
        and is guaranteed to converge for any `decay` in (0.5, 1].

        If `corpus` turns out to be empty, a warning is logged and the model is left untouched.

        Parameters
        ----------
        corpus : {iterable of list of (int, float), scipy.sparse.csc}, optional
            Stream of document vectors or sparse matrix of shape (`num_documents`, `num_terms`) used to update the
            model.
        chunks_as_numpy : bool
            Whether each chunk passed to the inference step should be a np.ndarray or not. Numpy can in some settings
            turn the term IDs into floats, these will be converted back into integers in inference, which incurs a
            performance hit. For distributed computing it may be desirable to keep the chunks as `numpy.ndarray`.

        Raises
        ------
        RuntimeError
            If the number of documents seen during a pass differs from the corpus length determined up front
            (typically because a one-shot generator was supplied as `corpus`).

        """
        try:
            lencorpus = len(corpus)
        except TypeError:
            logger.warning("input corpus stream has no len(); counting documents")
            lencorpus = sum(1 for _ in corpus)
        if lencorpus == 0:
            logger.warning("LdaMulticore.update() called with an empty corpus")
            return

        self.state.numdocs += lencorpus

        if self.batch:
            updatetype = "batch"
            updateafter = lencorpus
        else:
            updatetype = "online"
            updateafter = self.chunksize * self.workers
        eval_every = self.eval_every or 0
        evalafter = min(lencorpus, eval_every * updateafter)

        updates_per_pass = max(1, lencorpus / updateafter)
        logger.info(
            "running %s LDA training, %s topics, %i passes over the supplied corpus of %i documents, "
            "updating every %i documents, evaluating every ~%i documents, "
            "iterating %ix with a convergence threshold of %f",
            updatetype, self.num_topics, self.passes, lencorpus, updateafter,
            evalafter, self.iterations, self.gamma_threshold
        )

        if updates_per_pass * self.passes < 10:
            logger.warning(
                "too few updates, training might not converge; "
                "consider increasing the number of passes or iterations to improve accuracy"
            )

        # Bounded job queue: `put(block=False)` raises `queue.Full` once 2 * workers jobs are waiting.
        job_queue = Queue(maxsize=2 * self.workers)
        result_queue = Queue()

        # rho is the "speed" of updating; TODO try other fncs
        # pass_ + num_updates handles increasing the starting t for each pass,
        # while allowing it to "reset" on the first pass of each update
        def rho():
            return pow(self.offset + pass_ + (self.num_updates / self.chunksize), -self.decay)

        def process_result_queue(force=False):
            """
            Clear the result queue, merging all intermediate results, and update the
            LDA model if necessary.

            """
            merged_new = False
            while not result_queue.empty():
                other.merge(result_queue.get())
                queue_size[0] -= 1
                merged_new = True

            if (force and merged_new and queue_size[0] == 0) or (other.numdocs >= updateafter):
                self.do_mstep(rho(), other, pass_ > 0)
                other.reset()
                if eval_every > 0 and (force or (self.num_updates / updateafter) % eval_every == 0):
                    # `chunk` is whichever chunk the dispatch loop below handled most recently.
                    self.log_perplexity(chunk, total_docs=lencorpus)

        logger.info("training LDA model using %i processes", self.workers)
        pool = Pool(self.workers, worker_e_step, (job_queue, result_queue, self))
        try:
            for pass_ in range(self.passes):
                # `queue_size` is a one-element list so that `process_result_queue` can decrement it in place.
                queue_size, reallen = [0], 0
                other = LdaState(self.eta, self.state.sstats.shape)

                chunk_stream = utils.grouper(corpus, self.chunksize, as_numpy=chunks_as_numpy)
                for chunk_no, chunk in enumerate(chunk_stream):
                    reallen += len(chunk)  # keep track of how many documents we've processed so far

                    # put the chunk into the workers' input job queue
                    while True:
                        try:
                            job_queue.put((chunk_no, chunk, self.state), block=False)
                            queue_size[0] += 1
                            logger.info(
                                "PROGRESS: pass %i, dispatched chunk #%i = documents up to #%i/%i, "
                                "outstanding queue size %i",
                                pass_, chunk_no, chunk_no * self.chunksize + len(chunk), lencorpus, queue_size[0]
                            )
                            break
                        except queue.Full:
                            # in case the input job queue is full, keep clearing the
                            # result queue, to make sure we don't deadlock
                            process_result_queue()

                    process_result_queue()
                # endfor single corpus pass

                # wait for all outstanding jobs to finish
                while queue_size[0] > 0:
                    process_result_queue(force=True)

                if reallen != lencorpus:
                    raise RuntimeError("input corpus size changed during training (don't use generators as input)")
            # endfor entire update
        finally:
            # Always stop the worker processes, including when a pass raised (e.g. the RuntimeError above);
            # otherwise they would be left behind.
            pool.terminate()
def worker_e_step(input_queue, result_queue, worker_lda):
    """Serve E-step jobs from `input_queue` in an endless loop.

    For every job, the state received with the job is installed on `worker_lda`, synchronised and reset,
    the E-step is run on the job's chunk, and the resulting state is pushed to `result_queue`.
    The function contains no exit condition, so it never returns on its own.

    Parameters
    ----------
    input_queue : queue of (int, list of (int, float), :class:`~gensim.models.ldamodel.LdaState`)
        Each element is a job: the chunk number, the corpus chunk to be processed in BOW format, and the
        model state to start the E-step from.
    result_queue : queue of :class:`~gensim.models.ldamodel.LdaState`
        After a job is finished, the state held by `worker_lda` is appended to this queue.
    worker_lda : :class:`~gensim.models.ldamulticore.LdaMulticore`
        LDA instance used to perform the E-step.

    """
    logger.debug("worker process entering E-step loop")
    while True:
        logger.debug("getting a new job")
        job_no, docs, shared_state = input_queue.get()
        logger.debug("processing chunk #%i of %i documents", job_no, len(docs))

        # Adopt the state shipped with the job, then clear it before running the E-step on it.
        worker_lda.state = shared_state
        worker_lda.sync_state()
        worker_lda.state.reset()

        worker_lda.do_estep(docs)  # TODO: auto-tune alpha?
        del docs  # drop this function's reference to the chunk before queuing the result

        logger.debug("processed chunk, queuing the result")
        result_queue.put(worker_lda.state)
        # NOTE(review): presumably cleared so the model does not keep holding the state between jobs -- confirm.
        worker_lda.state = None
        logger.debug("result put")