Added batch processing possibility to input feeder
Doc map processes can now accept multiple documents at a time, if the input
feeder gives them multiple.

The input feeder batches the docs before sending them to the workers.

The batch size is controlled by a parameter on the input feeder, which is
currently always set to 1. This will be changed in future.
markgw committed Oct 7, 2020
1 parent 8162b54 commit 8a8143c
Showing 3 changed files with 48 additions and 19 deletions.
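
In outline, the feeder now accumulates (archive, filename, docs) tuples into a batch and puts the whole batch on the input queue as a single item, flushing any partial batch once the iterator is exhausted. Below is a minimal sketch of that pattern, not Pimlico's actual code: send() is a hypothetical stand-in for the real queue logic, and feeder_batch_size is the only name taken from the diff that follows.

def feed_batched(iterator, send, feeder_batch_size=1):
    # Accumulate docs in a batch to send in one package to the consumer
    batch = []
    for archive, filename, docs in iterator:
        batch.append((archive, filename, docs))
        if len(batch) < feeder_batch_size:
            # Don't send this batch yet: get some more documents
            continue
        send(batch)  # one queue item carries the whole batch
        batch = []
    # We may still need to send off the final batch
    if batch:
        send(batch)

# Example: batches of 2 from a 3-doc corpus; the final, smaller batch still gets sent
corpus = [("arc0", "doc1", ["text1"]), ("arc0", "doc2", ["text2"]), ("arc0", "doc3", ["text3"])]
feed_batched(corpus, send=print, feeder_batch_size=2)
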
34 changes: 30 additions & 4 deletions src/python/pimlico/core/modules/map/__init__.py
@@ -523,6 +523,8 @@ def __init__(self, input_queue, iterator, complete_callback=None, record_invalid
         self.ended = threading.Event()
         self.exception_queue = Queue(1)

+        self.feeder_batch_size = 1
+
         self.record_invalid = record_invalid
         if record_invalid:
             # Accumulate a list of invalid docs that have been fed, just for information
@@ -600,6 +602,8 @@ def check_invalid(self, archive, filename):

     def run(self):
         try:
+            # Accumulate docs in a batch to send in one package to the processor
+            batch = []
             # Keep feeding inputs onto the queue as long as we've got more
             for i, (archive, filename, docs) in enumerate(self.iterator):
                 if self.cancelled.is_set():
@@ -608,22 +612,44 @@
                 if self.record_invalid:
                     if any(is_invalid_doc(doc) for doc in docs):
                         self.invalid_docs.put((archive, filename))
+                batch.append((archive, filename, docs))
+                if len(batch) < self.feeder_batch_size:
+                    # Don't send this batch yet: get some more documents
+                    continue
                 # If the queue is full, this will block until there's room to put the next one on
                 # It also blocks if the queue is closed/destroyed/something similar, so we need to check now and
                 # again that we've not been asked to give up
                 while True:
                     try:
-                        self.input_queue.put((archive, filename, docs), timeout=0.1)
+                        self.input_queue.put(batch, timeout=0.1)
                     except Full:
                         if self.cancelled.is_set():
                             return
                         # Otherwise try putting again
                     else:
                         break
                 # Record that we've sent these off, so we can write the results out in the right order
-                self._docs_processing.put((archive, filename))
+                for archive, filename, __ in batch:
+                    self._docs_processing.put((archive, filename))
                 # As soon as something's been fed, the output processor can get going
                 self.started.set()
+                # Start a new batch
+                batch = []
+
+            # We may still need to send off the final batch
+            if len(batch) > 0:
+                while True:
+                    try:
+                        self.input_queue.put(batch, timeout=0.1)
+                    except Full:
+                        if self.cancelled.is_set():
+                            return
+                    else:
+                        break
+                for archive, filename, __ in batch:
+                    self._docs_processing.put((archive, filename))
+                self.started.set()
+
             self.feeding_complete.set()
             if self.complete_callback is not None:
                 self.complete_callback()
@@ -720,10 +746,10 @@ def __init__(self, processes):
         # the queue just get bigger and bigger.
         # In this case, the worker has to wait a bit to send its output back
         self.output_queue = self.create_queue(10*processes)
-        # Limit the input queue to 2*processes: there's no point in filling
+        # Limit the input queue to 5*processes: there's no point in filling
         # it up with far more, just enough that the processes can be sure of
         # getting something when they're ready
-        self.input_queue = self.create_queue(2*processes)
+        self.input_queue = self.create_queue(5*processes)
        self.exception_queue = self.create_queue()
        self.processes = processes
        self._queues = [self.output_queue, self.input_queue, self.exception_queue]
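
The second hunk widens the bounded input queue from 2*processes to 5*processes slots. The bound is what provides backpressure: once the queue is full, put() blocks, or raises Full when called with a timeout, which is exactly the case the feeder's retry loop above handles. A small sketch of the idea, using multiprocessing.Queue as a stand-in for whatever create_queue() actually returns:

import multiprocessing
from queue import Full

processes = 4
# Bounded queue: big enough that workers always find something ready,
# small enough that the feeder can't race arbitrarily far ahead of them
input_queue = multiprocessing.Queue(maxsize=5 * processes)

try:
    input_queue.put(["some", "batch"], timeout=0.1)
except Full:
    pass  # queue full: back off, check for cancellation, then retry
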
18 changes: 10 additions & 8 deletions src/python/pimlico/core/modules/map/multiproc.py
@@ -69,18 +69,20 @@ def run(self):
             while not self.stopped.is_set():
                 try:
                     # Timeout and go round the loop again to check whether we're supposed to have stopped
-                    archive, filename, docs = qget(self.input_queue, timeout=0.05)
+                    # The queue feeds us multiple documents at a time: we don't know how many it will be
+                    inputs = qget(self.input_queue, timeout=0.05)
                 except Empty:
                     # Don't worry if the queue is empty: just keep waiting for more until we're shut down
                     pass
                 else:
-                    # Buffer input documents, so that we can process multiple at once if requested
-                    input_buffer.append(tuple([archive, filename] + docs))
-                    if len(input_buffer) >= self.docs_per_batch or self.no_more_inputs.is_set():
-                        results = self.process_documents(input_buffer)
-                        for input_tuple, result in zip(input_buffer, results):
-                            self.output_queue.put(ProcessOutput(input_tuple[0], input_tuple[1], result))
-                        input_buffer = []
+                    for archive, filename, docs in inputs:
+                        # Buffer input documents, so that we can process multiple at once if requested
+                        input_buffer.append(tuple([archive, filename] + docs))
+                        if len(input_buffer) >= self.docs_per_batch or self.no_more_inputs.is_set():
+                            results = self.process_documents(input_buffer)
+                            for input_tuple, result in zip(input_buffer, results):
+                                self.output_queue.put(ProcessOutput(input_tuple[0], input_tuple[1], result))
+                            input_buffer = []
         finally:
             try:
                 self.tear_down()
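
On the worker side, each queue item is now a list of (archive, filename, docs) tuples, and each entry is flattened to (archive, filename, doc1, doc2, ...) before buffering (docs is already a list, presumably one item per module input). A toy illustration of those shapes, with made-up values:

# Toy values, just to show the shapes the worker loop deals with
inputs = [
    ("archive0", "doc001", ["raw text", "tokenized text"]),
    ("archive0", "doc002", ["more raw text", "more tokens"]),
]

input_buffer = []
for archive, filename, docs in inputs:
    # Flatten to (archive, filename, doc1, doc2, ...), as in the worker code
    input_buffer.append(tuple([archive, filename] + docs))

print(input_buffer[0])       # ('archive0', 'doc001', 'raw text', 'tokenized text')
print(input_buffer[0][2:])   # just the docs: ('raw text', 'tokenized text')
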
15 changes: 8 additions & 7 deletions src/python/pimlico/core/modules/map/threaded.py
@@ -54,7 +54,7 @@ def run(self):
                 while not self.stopped.is_set():
                     try:
                         # Timeout and go round the loop again to check whether we're supposed to have stopped
-                        archive, filename, docs = qget(self.input_queue, timeout=0.05)
+                        inputs = qget(self.input_queue, timeout=0.05)
                     except Empty:
                         # Don't worry if the queue is empty: just keep waiting for more until we're shut down
                         pass
@@ -66,12 +66,13 @@
                             continue
                         raise
                     else:
-                        input_buffer.append(tuple([archive, filename] + docs))
-                        if len(input_buffer) >= self.docs_per_batch or self.no_more_inputs.is_set():
-                            results = self.process_documents(input_buffer)
-                            for input_tuple, result in zip(input_buffer, results):
-                                self.output_queue.put(ProcessOutput(input_tuple[0], input_tuple[1], result))
-                            input_buffer = []
+                        for archive, filename, docs in inputs:
+                            input_buffer.append(tuple([archive, filename] + docs))
+                            if len(input_buffer) >= self.docs_per_batch or self.no_more_inputs.is_set():
+                                results = self.process_documents(input_buffer)
+                                for input_tuple, result in zip(input_buffer, results):
+                                    self.output_queue.put(ProcessOutput(input_tuple[0], input_tuple[1], result))
+                                input_buffer = []
             finally:
                 self.tear_down()
         except Exception as e:
