Skip to content

Commit

Permalink
Doc map progressbar smoother
Browse files Browse the repository at this point in the history
Previously, progress bar was based on docs yielded from the mapper. If
one process gets stuck for a while, results from the others get
buffered. Previously, the progress bar would then sit still, even though
results were coming in.

Now the progressbar is updated within the doc mapper, based on docs
yielded by the worker processes, before buffering is done to get them in
the right order. This results in a smoother and more realistic
representation of the progress.
  • Loading branch information
markgw committed Oct 7, 2020
1 parent 8a8143c commit 899587f
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions src/python/pimlico/core/modules/map/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,9 @@ def execute(self):
input_iter = iter(self.input_iterator.archive_iter(start_after=start_after))

# Set map processing going, using the generic function
mapper = DocumentMapper(self, input_iter, processes=self.processes)
mapper = DocumentMapper(self, input_iter, processes=self.processes, pbar=pbar)
for (archive, doc_name), next_output in mapper.map_documents():
docs_completed_now += 1
pbar.update(docs_completed_now)

# Write the result to the output corpora
for result, writer in zip(next_output, writers):
Expand Down Expand Up @@ -287,7 +286,10 @@ def output_to_document(output, datatype):


class DocumentMapper(object):
def __init__(self, executor, input_iter, processes=1, record_invalid=False):
def __init__(self, executor, input_iter, processes=1, record_invalid=False, pbar=None):
# If pbar is given, it will be updated every time a document is received
# from worker processes
self.pbar = pbar
self.record_invalid = record_invalid
self.processes = processes
self.input_iter = input_iter
Expand Down Expand Up @@ -338,6 +340,7 @@ def map_documents(self):
self.input_feeder.started.wait()
# Check what document we're looking for next
next_document = self.input_feeder.get_next_output_document()
num_docs_received = 0

while next_document is not None:
# Wait for a document coming off the output queue
Expand Down Expand Up @@ -374,6 +377,9 @@ def map_documents(self):
# We've got some result, but it might not be the one we're looking for
# Add it to a buffer, so we can potentially keep it and only output it when its turn comes up
result_buffer[(result.archive, result.filename)] = result.data
num_docs_received += 1
if self.pbar is not None:
self.pbar.update(num_docs_received)

# Write out as many as we can of the docs that have been sent and whose output is available
# while maintaining the order they were put in in
Expand Down

0 comments on commit 899587f

Please sign in to comment.