Skip to content

Commit

Permalink
Allow non-document output from process_document()s
Browse files Browse the repository at this point in the history
A process_document() method in document map modules can now output a bytes object (raw data for doc), a dict (internal data for doc), or a Document instance. The latter used to be the only option, but resulted in unnecessary pickling and unpickling. In future, one of the first two will be the recommended output type, since it should result in much faster process communication.

This change is not yet properly documented. In future, it should be well documented in the docs about writing document map modules.
  • Loading branch information
markgw committed Feb 10, 2020
1 parent 99671e0 commit ff33602
Showing 1 changed file with 34 additions and 2 deletions.
36 changes: 34 additions & 2 deletions src/python/pimlico/core/modules/map/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,28 @@ def execute(self):
self.log.info("Document mapping failed. Finishing off")


def output_to_document(output, datatype):
    """
    Convert the output from a worker into a document of the correct type.

    The conversion depends on the exact type of ``output``:

    - If it's a ``dict``, treat it as the internal data dict for the expected
      document type: its items are passed to ``datatype`` as keyword arguments
      to instantiate a document with that data.
    - If it's a ``bytes`` object, treat it as the raw data for the expected
      document type: it is passed to ``datatype`` as the ``raw_data`` keyword
      argument to instantiate a document with that raw data.
    - Otherwise, assume it is already a Document instance of the correct type
      and return it unchanged. To avoid unnecessary type checking, this is not
      verified (not even that it is a Document instance).

    :param output: value produced by a worker for a single module output
    :param datatype: callable used to build the document from keyword arguments
        (in ``map_documents`` this is the value returned by
        ``get_output_datatype()`` for the corresponding output)
    :return: the result of calling ``datatype`` for dict/bytes input, otherwise
        ``output`` itself
    """
    # Exact type comparisons are used (not isinstance), so instances of
    # dict/bytes subclasses fall through and are returned as they are
    output_type = type(output)
    if output_type is dict:
        # Internal data: each key becomes a keyword argument
        return datatype(**output)
    if output_type is bytes:
        # Raw data: handed over whole as a single keyword argument
        return datatype(raw_data=output)
    # Anything else is passed through untouched, without validation
    return output


class DocumentMapper(object):
def __init__(self, executor, input_iter, processes=1, record_invalid=False):
self.record_invalid = record_invalid
Expand Down Expand Up @@ -280,7 +302,11 @@ def map_documents(self):
complete = False
result_buffer = {}

num_outputs = len(executor.info.get_grouped_corpus_output_names())
# Get the expected output datatypes, ready for any possible output type conversion when we get results
output_datatypes = [
executor.info.get_output_datatype(name) for name in executor.info.get_grouped_corpus_output_names()
]
num_outputs = len(output_datatypes)

try:
# Inputs will be taken from the input_iter as they're needed
Expand Down Expand Up @@ -339,12 +365,18 @@ def map_documents(self):
next_output = [next_output] * num_outputs
elif type(next_output) is not tuple:
# If the processor produces a single result and there's only one output, fine
next_output = (next_output,)
next_output = [next_output]
if len(next_output) != num_outputs:
raise ModuleExecutionError(
"%s executor's process_document() returned %d results for a document, but the "
"module has %d outputs" % (type(executor).__name__, len(next_output), num_outputs)
)

# Post-process the returned data to convert to the correct document type,
# if raw data or an internal data dict was given
next_output = tuple(
[output_to_document(output, dt) for (output, dt) in zip(next_output, output_datatypes)]
)
# Provide the result(s) for writing, or passing on to some other process
yield next_document, next_output

Expand Down

0 comments on commit ff33602

Please sign in to comment.