-
-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
lsi_worker.py
executable file
·193 lines (142 loc) · 5.76 KB
/
lsi_worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2010 Radim Rehurek <radimrehurek@seznam.cz>
# Licensed under the GNU LGPL v2.1 - http://www.gnu.org/licenses/lgpl.html
"""Worker ("slave") process used in computing distributed Latent Semantic Indexing (LSI,
:class:`~gensim.models.lsimodel.LsiModel`) models.
Run this script on every node in your cluster. If you wish, you may even run it multiple times on a single machine,
to make better use of multiple cores (just beware that memory footprint increases linearly).
How to use distributed LSI
--------------------------
#. Install needed dependencies (Pyro4) ::
pip install gensim[distributed]
#. Setup serialization (on each machine) ::
export PYRO_SERIALIZERS_ACCEPTED=pickle
export PYRO_SERIALIZER=pickle
#. Run nameserver ::
python -m Pyro4.naming -n 0.0.0.0 &
#. Run workers (on each machine) ::
python -m gensim.models.lsi_worker &
#. Run dispatcher ::
python -m gensim.models.lsi_dispatcher &
#. Run :class:`~gensim.models.lsimodel.LsiModel` in distributed mode:
.. sourcecode:: pycon
>>> from gensim.test.utils import common_corpus, common_dictionary
>>> from gensim.models import LsiModel
>>>
>>> model = LsiModel(common_corpus, id2word=common_dictionary, distributed=True)
Command line arguments
----------------------
.. program-output:: python -m gensim.models.lsi_worker --help
:ellipsis: 0, -3
"""
import os
import sys
import logging
import argparse
import threading
import tempfile
import queue as Queue
import Pyro4
from gensim.models import lsimodel
from gensim import utils
# Module-level logger; all worker progress messages in this file go through it.
logger = logging.getLogger(__name__)
# Debugging aid: when non-zero, the worker saves its intermediate model to a file in the
# temporary directory after every SAVE_DEBUG processed jobs (0 for never).
SAVE_DEBUG = 0  # save intermediate models after every SAVE_DEBUG updates (0 for never)
class Worker:
    """Worker object that accumulates a partial LSI model from jobs handed out by a dispatcher.

    The methods decorated with ``@Pyro4.expose`` are the ones meant to be called remotely.
    Methods decorated with ``@utils.synchronous('lock_update')`` are serialized through
    ``self.lock_update``, which is created in :meth:`~gensim.models.lsi_worker.Worker.initialize`.

    """

    def __init__(self):
        """Partly initialize the model.

        A full initialization requires a call to :meth:`~gensim.models.lsi_worker.Worker.initialize`.

        """
        # `None` marks the worker as "not initialized yet"; `requestjob` checks for it.
        self.model = None

    @Pyro4.expose
    def initialize(self, myid, dispatcher, **model_params):
        """Fully initialize the worker.

        Parameters
        ----------
        myid : int
            An ID number used to identify this worker in the dispatcher object.
        dispatcher : :class:`~gensim.models.lsi_dispatcher.Dispatcher`
            The dispatcher responsible for scheduling this worker.
        **model_params
            Keyword parameters to initialize the inner LSI model, see :class:`~gensim.models.lsimodel.LsiModel`.

        """
        # Lock used by the `@utils.synchronous('lock_update')` methods below.
        self.lock_update = threading.Lock()
        self.jobsdone = 0  # how many jobs has this worker completed?
        # id of this worker in the dispatcher; just a convenience var for easy access/logging TODO remove?
        self.myid = myid
        self.dispatcher = dispatcher
        # Set to True by `getstate`, which makes `requestjob` stop polling for work.
        self.finished = False
        logger.info("initializing worker #%s", myid)
        self.model = lsimodel.LsiModel(**model_params)

    @Pyro4.expose
    @Pyro4.oneway
    def requestjob(self):
        """Ask the dispatcher for a job and process it.

        The dispatcher is polled repeatedly until it returns a job or until the worker has been
        marked as finished by :meth:`~gensim.models.lsi_worker.Worker.getstate`. At most one job
        is processed per call; after processing, the dispatcher is notified through `jobdone`.

        Raises
        ------
        RuntimeError
            If `self.model` is None (i.e. worker not initialized).

        """
        if self.model is None:
            raise RuntimeError("worker must be initialized before receiving jobs")

        job = None
        while job is None and not self.finished:
            try:
                job = self.dispatcher.getjob(self.myid)
            except Queue.Empty:
                # no new job: try again, unless we're finished with all work
                continue

        if job is not None:
            logger.info("worker #%s received job #%i", self.myid, self.jobsdone)
            self.processjob(job)
            self.dispatcher.jobdone(self.myid)
        else:
            # The loop ended without a job, i.e. `self.finished` was set in the meantime.
            logger.info("worker #%i stopping asking for jobs", self.myid)

    @utils.synchronous('lock_update')
    def processjob(self, job):
        """Incrementally process the job and potentially save a debugging snapshot of the model.

        Parameters
        ----------
        job : iterable of list of (int, float)
            Corpus in BoW format.

        """
        self.model.add_documents(job)
        self.jobsdone += 1
        # Optional debugging snapshot, controlled by the module-level SAVE_DEBUG constant.
        if SAVE_DEBUG and self.jobsdone % SAVE_DEBUG == 0:
            fname = os.path.join(tempfile.gettempdir(), 'lsi_worker.pkl')
            self.model.save(fname)

    @Pyro4.expose
    @utils.synchronous('lock_update')
    def getstate(self):
        """Log and get the LSI model's current projection.

        Also marks the worker as finished, so that a pending
        :meth:`~gensim.models.lsi_worker.Worker.requestjob` stops polling the dispatcher.

        Returns
        -------
        :class:`~gensim.models.lsimodel.Projection`
            The current projection.

        """
        logger.info("worker #%i returning its state after %s jobs", self.myid, self.jobsdone)
        assert isinstance(self.model.projection, lsimodel.Projection)
        self.finished = True
        return self.model.projection

    @Pyro4.expose
    @utils.synchronous('lock_update')
    def reset(self):
        """Reset the worker by replacing its current projection with an empty one."""
        logger.info("resetting worker #%i", self.myid)
        self.model.projection = self.model.projection.empty_like()
        # Allow `requestjob` to poll for work again.
        self.finished = False

    # `exit` must be exposed like the other remotely called methods of this class; without
    # `@Pyro4.expose`, Pyro4's default configuration rejects remote calls to it, so the worker
    # could not be shut down remotely.
    @Pyro4.expose
    @Pyro4.oneway
    def exit(self):
        """Terminate the worker."""
        logger.info("terminating worker #%i", self.myid)
        # exit the whole process immediately (not just the calling thread)
        os._exit(0)
if __name__ == '__main__':
    # Script entry point: configure logging, parse the command line (only `--help` is
    # meaningful, no options are defined) and serve a `Worker` object through a Pyro daemon.
    logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
    # The help text is the module docstring minus its last 135 characters
    # (NOTE(review): presumably the trailing Sphinx "Command line arguments" section - confirm).
    # `__doc__` is None when docstrings are stripped (`python -OO`), so fall back to an empty
    # description instead of failing with a TypeError on `None[:-135]`.
    description = (__doc__ or "")[:-135]
    parser = argparse.ArgumentParser(description=description, formatter_class=argparse.RawTextHelpFormatter)
    _ = parser.parse_args()

    logger.info("running %s", " ".join(sys.argv))
    utils.pyro_daemon('gensim.lsi_worker', Worker(), random_suffix=True)
    logger.info("finished running %s", parser.prog)