[GSoC 2018] Multistream API for vocabulary building in *2vec #2078

Merged

43 commits
92e6e22  multistream scan vocab for doc2vec, word2vec & fastText (Jun 4, 2018)
2618a2e  fixes (Jun 4, 2018)
7960af8  fix tags for doc2vec (Jun 4, 2018)
b8da97a  fix tests (Jun 4, 2018)
16be716  removed benchmark vocab (Jun 4, 2018)
c2d674a  addressing comments (Jun 7, 2018)
85e689c  make interfaces and documentation more pretty (Jun 7, 2018)
0d5ae38  add word2vec multistream tests (Jun 7, 2018)
df3ae5f  fix pep8 (Jun 8, 2018)
49357cb  iteritems -> items (Jun 8, 2018)
0365eea  more precise test (Jun 8, 2018)
812ab8c  add doc2vec tests (Jun 8, 2018)
f11f44d  add fasttext tests (Jun 8, 2018)
941dfd8  remove prints (Jun 8, 2018)
36e7238  fix seed=42 (Jun 8, 2018)
fa57f7a  fixed tests (Jun 8, 2018)
9ea007d  add build_vocab test for fasttext (Jun 8, 2018)
aec68ea  fix (Jun 8, 2018)
07f3fd4  change size from 10 to 5 in fasttext test because of appveyor memory … (Jun 8, 2018)
8b49fb8  another test with memory error (Jun 8, 2018)
d0c11d9  fix py3 tests (Jun 8, 2018)
5974448  fix iteritems for py3 (Jun 8, 2018)
1419847  fix functools reduce (Jun 8, 2018)
280e826  addressing comments (Jun 12, 2018)
7d489f4  addressing @jayantj comments (Jun 13, 2018)
49a1ee6  fix language (Jun 13, 2018)
1cbad7f  add final vocab pruning in multistream modes (Jun 13, 2018)
d024625  keys -> iterkeys (Jun 14, 2018)
5e4de19  use heapq.nlargest (Jun 15, 2018)
74e7b02  fix (Jun 15, 2018)
0d12d8b  multistream flag to input_streams param (Jun 19, 2018)
25d00cd  fix tests (Jun 19, 2018)
2281265  fix flake 8 (Jun 19, 2018)
543a9e0  fix doc2vec docstrings (Jun 19, 2018)
d520d68  fix merging streams (Jun 19, 2018)
d11a0b8  fix doc2vec (Jun 19, 2018)
ecd8f39  max_vocab_size -> max_vocab_size / workers (Jun 19, 2018)
a96d5a4  fixed (Jun 19, 2018)
0a327b0  / -> // (py3 division) (Jun 19, 2018)
62873fb  fix (Jun 19, 2018)
5f61219  Merge branch 'develop' into feature/gsoc-multistream-vocab (Jun 20, 2018)
c67f964  fix docstring (Jun 20, 2018)
a16cec0  Merge branch 'develop' into feature/gsoc-multistream-vocab (Jun 24, 2018)
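
For orientation, here is a minimal usage sketch of the `input_streams` parameter this PR introduces. The corpus file names are hypothetical placeholders; `LineSentence` and `size` follow the existing gensim API, while `input_streams` and the `min(len(input_streams), workers)` worker cap come from the diff below.

    from gensim.models.word2vec import LineSentence, Word2Vec

    # Two independent corpus streams; each behaves like the usual `sentences` iterable.
    stream_a = LineSentence('corpus_part_a.txt')  # hypothetical file
    stream_b = LineSentence('corpus_part_b.txt')  # hypothetical file

    # Pass a list of streams instead of a single `sentences` iterable.
    # Vocabulary scanning may then use up to min(len(input_streams), workers) processes.
    model = Word2Vec(input_streams=[stream_a, stream_b], size=100, workers=2)

The same `input_streams` argument is also accepted by `build_vocab` and `train`, with the usual `sentences` argument remaining the single-stream default.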
gensim/models/base_any2vec.py: 65 changes (47 additions, 18 deletions)
@@ -43,6 +43,7 @@
from types import GeneratorType
from gensim.utils import deprecated
import warnings
import itertools

try:
from queue import Queue
@@ -130,6 +131,11 @@ def _check_training_sanity(self, epochs=None, total_examples=None, total_words=N
"""Check that the training parameters provided make sense. e.g. raise error if `epochs` not provided."""
raise NotImplementedError()

def _check_input_data_sanity(self, data_iterable=None, data_iterables=None):
"""Check that only one argument is not None."""
if not ((data_iterable is not None) ^ (data_iterables is not None)):
raise ValueError("You must provide only one of singlestream or multistream arguments.")

def _worker_loop(self, job_queue, progress_queue):
"""Train the model, lifting batches of data from the queue.

@@ -322,14 +328,16 @@ def _log_epoch_progress(self, progress_queue, job_queue, cur_epoch=0, total_exam
self.total_train_time += elapsed
return trained_word_count, raw_word_count, job_tally

def _train_epoch(self, data_iterable, cur_epoch=0, total_examples=None,
def _train_epoch(self, data_iterable=None, data_iterables=None, cur_epoch=0, total_examples=None,
total_words=None, queue_factor=2, report_delay=1.0):
"""Train the model for a single epoch.

Parameters
----------
data_iterable : iterable of list of object
The input corpus. This will be split in chunks and these chunks will be pushed to the queue.
data_iterables : iterable of iterables of list of object
The iterable of input streams like `data_iterable`. Use this parameter in multistream mode.
cur_epoch : int, optional
The current training epoch, needed to compute the training parameters for each job.
For example in many implementations the learning rate would be dropping with the number of epochs.
@@ -353,6 +361,7 @@ def _train_epoch(self, data_iterable, cur_epoch=0, total_examples=None,
* Total word count used in training.

"""
self._check_input_data_sanity(data_iterable, data_iterables)
job_queue = Queue(maxsize=queue_factor * self.workers)
progress_queue = Queue(maxsize=(queue_factor + 1) * self.workers)

@@ -363,6 +372,9 @@ def _train_epoch(self, data_iterable, cur_epoch=0, total_examples=None,
for _ in xrange(self.workers)
]

# Chain all input streams into one, because multistream training is not supported yet.
if data_iterables is not None:
data_iterable = itertools.chain(*data_iterables)
workers.append(threading.Thread(
target=self._job_producer,
args=(data_iterable, job_queue),
@@ -378,7 +390,7 @@ def _train_epoch(self, data_iterable, cur_epoch=0, total_examples=None,

return trained_word_count, raw_word_count, job_tally

def train(self, data_iterable, epochs=None, total_examples=None,
def train(self, data_iterable=None, data_iterables=None, epochs=None, total_examples=None,
total_words=None, queue_factor=2, report_delay=1.0, callbacks=(), **kwargs):
"""Train the model for multiple epochs using multiple workers.

@@ -433,8 +445,9 @@ def train(self, data_iterable, epochs=None, total_examples=None,
callback.on_epoch_begin(self)

trained_word_count_epoch, raw_word_count_epoch, job_tally_epoch = self._train_epoch(
data_iterable, cur_epoch=cur_epoch, total_examples=total_examples, total_words=total_words,
queue_factor=queue_factor, report_delay=report_delay)
data_iterable=data_iterable, data_iterables=data_iterables, cur_epoch=cur_epoch,
total_examples=total_examples, total_words=total_words, queue_factor=queue_factor,
report_delay=report_delay)
trained_word_count += trained_word_count_epoch
raw_word_count += raw_word_count_epoch
job_tally += job_tally_epoch
@@ -525,9 +538,9 @@ def _do_train_job(self, data_iterable, job_parameters, thread_private_mem):
def _set_train_params(self, **kwargs):
raise NotImplementedError()

def __init__(self, sentences=None, workers=3, vector_size=100, epochs=5, callbacks=(), batch_words=10000,
trim_rule=None, sg=0, alpha=0.025, window=5, seed=1, hs=0, negative=5, ns_exponent=0.75, cbow_mean=1,
min_alpha=0.0001, compute_loss=False, fast_version=0, **kwargs):
def __init__(self, sentences=None, input_streams=None, workers=3, vector_size=100, epochs=5, callbacks=(),
batch_words=10000, trim_rule=None, sg=0, alpha=0.025, window=5, seed=1, hs=0, negative=5,
ns_exponent=0.75, cbow_mean=1, min_alpha=0.0001, compute_loss=False, fast_version=0, **kwargs):
"""

Parameters
@@ -624,13 +637,20 @@ def __init__(self, sentences=None, workers=3, vector_size=100, epochs=5, callbac
self.neg_labels = zeros(self.negative + 1)
self.neg_labels[0] = 1.

if sentences is not None:
if isinstance(sentences, GeneratorType):
if sentences is not None or input_streams is not None:
self._check_input_data_sanity(data_iterable=sentences, data_iterables=input_streams)
if input_streams is not None:
if not isinstance(input_streams, (tuple, list)):
raise TypeError("You must pass tuple or list as the input_streams argument.")
if any(isinstance(stream, GeneratorType) for stream in input_streams):
raise TypeError("You can't pass a generator as any of input streams. Try an iterator.")
elif isinstance(sentences, GeneratorType):
raise TypeError("You can't pass a generator as the sentences argument. Try an iterator.")
self.build_vocab(sentences, trim_rule=trim_rule)

self.build_vocab(sentences=sentences, input_streams=input_streams, trim_rule=trim_rule)
self.train(
sentences, total_examples=self.corpus_count, epochs=self.epochs, start_alpha=self.alpha,
end_alpha=self.min_alpha, compute_loss=compute_loss)
sentences=sentences, input_streams=input_streams, total_examples=self.corpus_count, epochs=self.epochs,
start_alpha=self.alpha, end_alpha=self.min_alpha, compute_loss=compute_loss)
else:
if trim_rule is not None:
logger.warning(
@@ -763,7 +783,8 @@ def __str__(self):
self.__class__.__name__, len(self.wv.index2word), self.vector_size, self.alpha
)

def build_vocab(self, sentences, update=False, progress_per=10000, keep_raw_vocab=False, trim_rule=None, **kwargs):
def build_vocab(self, sentences=None, input_streams=None, workers=None, update=False, progress_per=10000,
keep_raw_vocab=False, trim_rule=None, **kwargs):
"""Build vocabulary from a sequence of sentences (can be a once-only generator stream).

Parameters
@@ -773,7 +794,13 @@ def build_vocab(self, sentences, update=False, progress_per=10000, keep_raw_voca
consider an iterable that streams the sentences directly from disk/network.
See :class:`~gensim.models.word2vec.BrownCorpus`, :class:`~gensim.models.word2vec.Text8Corpus`
or :class:`~gensim.models.word2vec.LineSentence` module for such examples.
update : bool, optional
input_streams : list or tuple of iterable of iterables
The tuple or list of `sentences`-like arguments. Use it if you have multiple input streams. It is possible
to process streams in parallel, using the `workers` parameter.
workers : int
Used if `input_streams` is passed. Determines how many processes to use for vocab building.
Actual number of workers is determined by `min(len(input_streams), workers)`.
update : bool
If true, the new words in `sentences` will be added to model's vocab.
progress_per : int, optional
Indicates how many words to process before showing/updating the progress.
@@ -797,8 +824,10 @@ def build_vocab(self, sentences=None, input_streams=None, workers=None, update=False, progress_per=10000,
Key word arguments propagated to `self.vocabulary.prepare_vocab`

"""
workers = workers or self.workers
total_words, corpus_count = self.vocabulary.scan_vocab(
sentences, progress_per=progress_per, trim_rule=trim_rule)
sentences=sentences, input_streams=input_streams, progress_per=progress_per, trim_rule=trim_rule,
workers=workers)
self.corpus_count = corpus_count
report_values = self.vocabulary.prepare_vocab(
self.hs, self.negative, self.wv, update=update, keep_raw_vocab=keep_raw_vocab,
@@ -887,7 +916,7 @@ def estimate_memory(self, vocab_size=None, report=None):
)
return report

def train(self, sentences, total_examples=None, total_words=None,
def train(self, sentences=None, input_streams=None, total_examples=None, total_words=None,
epochs=None, start_alpha=None, end_alpha=None, word_count=0,
queue_factor=2, report_delay=1.0, compute_loss=False, callbacks=()):
"""Train the model. If the hyper-parameters are passed, they override the ones set in the constructor.
Expand Down Expand Up @@ -933,8 +962,8 @@ def train(self, sentences, total_examples=None, total_words=None,
self.compute_loss = compute_loss
self.running_training_loss = 0.0
return super(BaseWordEmbeddingsModel, self).train(
sentences, total_examples=total_examples, total_words=total_words,
epochs=epochs, start_alpha=start_alpha, end_alpha=end_alpha, word_count=word_count,
data_iterable=sentences, data_iterables=input_streams, total_examples=total_examples,
total_words=total_words, epochs=epochs, start_alpha=start_alpha, end_alpha=end_alpha, word_count=word_count,
queue_factor=queue_factor, report_delay=report_delay, compute_loss=compute_loss, callbacks=callbacks)

def _get_job_params(self, cur_epoch):
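
Note that the training path does not yet consume multiple streams in parallel: as the `_train_epoch` change above shows, the streams are simply chained into one iterable before being handed to the single job-producer thread. A standalone sketch of that fallback with toy data (not gensim code):

    import itertools

    # Toy stand-in for `data_iterables`: two streams of tokenized sentences.
    input_streams = [
        [['first', 'sentence'], ['second', 'sentence']],  # stream 1
        [['third', 'sentence']],                          # stream 2
    ]

    # Equivalent of the fallback in _train_epoch: merge all streams into a single iterable.
    data_iterable = itertools.chain(*input_streams)
    for sentence in data_iterable:
        print(sentence)

Parallelism in this PR therefore applies to vocabulary scanning (`scan_vocab` with `workers`), not to the training loop itself.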