Major changes to add further standard modules, including major extension of CoreNLP wrappers.

Some new docs detailing future plans.
Mark Granroth-Wilding committed Apr 1, 2016
1 parent 2492d27 commit 34f9523
Showing 26 changed files with 866 additions and 148 deletions.
10 changes: 10 additions & 0 deletions docs/plans/berkeley.rst
@@ -0,0 +1,10 @@
===============
Berkeley Parser
===============

https://github.com/slavpetrov/berkeleyparser

Java constituency parser. Pre-trained models are also provided in the GitHub repo.

Probably no need for a Java wrapper here. The parser itself accepts input on stdin and outputs to stdout,
so just use a subprocess with pipes.
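
A rough sketch of the sort of subprocess wrapper this could be (the jar and grammar paths are
placeholders and the ``-gr`` option is from memory, so treat the details as assumptions)::

    import subprocess

    BERKELEY_JAR = "lib/berkeleyParser.jar"  # placeholder path to the parser jar
    GRAMMAR = "models/eng_sm6.gr"            # placeholder path to a pre-trained grammar

    def parse_sentences(sentences):
        """Send one tokenized sentence per line on stdin; read one parse per line from stdout."""
        proc = subprocess.Popen(
            ["java", "-jar", BERKELEY_JAR, "-gr", GRAMMAR],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            universal_newlines=True,
        )
        stdout, _ = proc.communicate("\n".join(sentences) + "\n")
        return stdout.strip().split("\n")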
14 changes: 14 additions & 0 deletions docs/plans/candc.rst
@@ -0,0 +1,14 @@
=========
C&C Tools
=========

http://svn.ask.it.usyd.edu.au/trac/candc/

Can't fetch via the make script, as you need to sign up for a login (free). Need to get the user to do this. Pain in the arse.

I've written wrappers around C&C before using subprocesses (with the REST server). Most of that code can be reused as is.

Consider also whether it's even worth wrapping this. Might be better off just using the new Java implementation,
which is faster and as accurate.

https://www.cl.cam.ac.uk/~sc609/java-candc.html
13 changes: 13 additions & 0 deletions docs/plans/cherry_picker.rst
@@ -0,0 +1,13 @@
=============
Cherry Picker
=============

**Coreference resolver**

http://www.hlt.utdallas.edu/~altaf/cherrypicker/

Requires NER, POS tagging and constituency parsing to be done first. Tools for all of these are included in the
Cherry Picker codebase, but we just need a wrapper around the Cherry Picker tool itself to be able to feed these
annotations in from other modules and perform coref.

Write a Java wrapper and interface with it using Py4J, as with OpenNLP.
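
A minimal sketch of the Python side of such an interface (the entry point and its
``resolveCoreference()`` method are hypothetical names for whatever the Java wrapper ends up exposing)::

    from py4j.java_gateway import JavaGateway, GatewayParameters

    def resolve_coref(ner_annotations, pos_tags, parse_trees, port=25333):
        # Assumes a Java-side GatewayServer wrapping Cherry Picker is already listening on this port
        gateway = JavaGateway(gateway_parameters=GatewayParameters(port=port))
        return gateway.entry_point.resolveCoreference(ner_annotations, pos_tags, parse_trees)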
17 changes: 15 additions & 2 deletions docs/wishlist.rst
@@ -4,11 +4,24 @@

Things I plan to add to Pimlico:

- Easy parallelization for document map module types
- Handle software dependencies within Python

- Those that can be installed directly can be installed as part of the pre-run
checks
- Simply output instructions for others (e.g. system-wide install required)

- *I'll add to this list as I think of things...*
- Further modules:

- :doc:`CherryPicker <plans/cherry_picker>` for coreference resolution
- :doc:`Berkeley Parser <plans/berkeley>` for fast constituency parsing
- :doc:`C&C <plans/candc>` for parsing, tagging, etc.
- OpenNLP coref. I've already wrapped other OpenNLP tools, so this would be pretty easy.
- `Reconcile <https://www.cs.utah.edu/nlp/reconcile/>`_ coref. Seems to incorporate upstream NLP tasks. Would want
to interface such that we can reuse output from other modules and just do coref.
- `Malt dependency parser <http://www.maltparser.org/>`_. I've wrapped this in Python before, so can probably reuse
that. A faster alternative to the CoreNLP dependency parser, which is already wrapped.

- Bundle Pimlico Java code as jars, so the user doesn't need to compile. Dead easy - just add a jar target to the ant
builds and check in the resulting jars

*I'll add to this list as I think of things...*
2 changes: 1 addition & 1 deletion src/python/pimlico/core/modules/base.py
@@ -259,7 +259,7 @@ def instantiate_output_datatype(self, output_name, output_datatype):
particular outputs' datatypes.
"""
return output_datatype(self.get_output_dir(output_name))
return output_datatype(self.get_output_dir(output_name), self.pipeline)

def get_output(self, output_name=None):
"""
6 changes: 6 additions & 0 deletions src/python/pimlico/core/modules/execute.py
@@ -65,8 +65,14 @@ def execute_module(pipeline, module_name, force_rerun=False, debug=False):
class ModuleExecutionError(Exception):
pass


class ModuleNotReadyError(ModuleExecutionError):
pass


class ModuleAlreadyCompletedError(ModuleExecutionError):
pass


class StopProcessing(Exception):
pass
2 changes: 1 addition & 1 deletion src/python/pimlico/core/modules/inputs.py
@@ -39,7 +39,7 @@ class DatatypeInputModuleInfo(InputModuleInfo):
module_options = datatype.input_module_options

def instantiate_output_datatype(self, output_name, output_datatype):
return output_datatype.create_from_options(self.get_module_output_dir(), self.options)
return output_datatype.create_from_options(self.get_module_output_dir(), self.pipeline, self.options)

if datatype.requires_data_preparation:
# This module needs to be executed
64 changes: 59 additions & 5 deletions src/python/pimlico/core/modules/map.py
@@ -1,5 +1,8 @@
from traceback import format_exc

from pimlico.core.modules.base import BaseModuleInfo, BaseModuleExecutor
from pimlico.core.modules.execute import ModuleExecutionError
from pimlico.core.modules.execute import ModuleExecutionError, StopProcessing
from pimlico.datatypes.base import InvalidDocument
from pimlico.datatypes.tar import TarredCorpus, AlignedTarredCorpora, TarredCorpusWriter
from pimlico.utils.core import multiwith
from pimlico.utils.progress import get_progress_bar
@@ -27,7 +30,7 @@ class DocumentMapModuleExecutor(BaseModuleExecutor):
for each individual document.
"""
def process_document(self, filename, *docs):
def process_document(self, archive, filename, *docs):
raise NotImplementedError

def get_writer(self, output_name):
Expand Down Expand Up @@ -68,15 +71,24 @@ def execute(self):
pbar = get_progress_bar(len(input_iterator),
title="%s map" % self.info.module_type_name.replace("_", " ").capitalize())
complete = False
invalid_inputs = 0
invalid_outputs = 0
try:
# Prepare a corpus writer for the output
with multiwith(*self.get_writers()) as writers:
for archive, filename, docs in pbar(input_iterator.archive_iter()):
# Useful to know in output
if any(type(d) is InvalidDocument for d in docs):
invalid_inputs += 1

# Get the subclass to process the doc
results = self.process_document(archive, filename, *docs)

# If the processor only produces a single result and there's only one output, that's fine
if type(results) is not tuple:
if type(results) is InvalidDocument:
# Just got a single invalid document out: write it out to every output
results = [results] * len(writers)
elif type(results) is not tuple:
# If the processor only produces a single result and there's only one output, that's fine
results = (results,)
if len(results) != len(writers):
raise ModuleExecutionError(
@@ -86,15 +98,57 @@ def execute(self):
)
)

# Just for the record (useful output)
if any(type(r) is InvalidDocument for r in results):
invalid_outputs += 1

# Write the result to the output corpora
for result, writer in zip(results, writers):
writer.add_document(archive, filename, result)

complete = True
self.log.info("Input contained %d invalid documents, output contained %d" %
(invalid_inputs, invalid_outputs))
finally:
# Call the finishing-off routine, if one's been defined
if complete:
self.log.info("Document mapping complete. Finishing off")
else:
self.log.info("Document mapping failed. Finishing off")
self.postprocess()
self.postprocess(error=not complete)


def skip_invalid(fn):
"""
Decorator to apply to process_document() methods where you want to skip doing any processing if any of the
input documents are invalid and just pass through the error information.
"""
def _fn(self, archive, filename, *docs):
invalid = [doc for doc in docs if type(doc) is InvalidDocument]
if len(invalid):
# If there's more than one InvalidDocument among the inputs, just return the first one
return invalid[0]
else:
return fn(self, archive, filename, *docs)
return _fn


def invalid_doc_on_error(fn):
"""
Decorator to apply to process_document() methods that causes all exceptions to be caught and an InvalidDocument
to be returned as the result, instead of letting the error propagate up and call a halt to the whole corpus
processing.
"""
def _fn(self, *args, **kwargs):
try:
return fn(self, *args, **kwargs)
except StopProcessing:
# Processing was cancelled, killed or otherwise called to a halt
# Don't report this as an error processing a doc, but raise it
raise
except Exception, e:
# Error while processing the document: output an invalid document, with some error information
return InvalidDocument(self.info.module_name, "%s\n%s" % (e, format_exc()))
return _fn
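
# Hypothetical usage sketch (not part of this commit's code): the two decorators compose, so a
# document map executor can pass invalid input documents straight through and turn any error raised
# during processing into an InvalidDocument, instead of halting the whole corpus.
class ExampleTokenizerExecutor(DocumentMapModuleExecutor):
    @skip_invalid
    @invalid_doc_on_error
    def process_document(self, archive, filename, doc):
        # doc is the raw text of one document; the return value goes to the module's single output
        return "\n".join(" ".join(line.split()) for line in doc.splitlines())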
Empty file.
166 changes: 166 additions & 0 deletions src/python/pimlico/core/parallel/map.py
@@ -0,0 +1,166 @@
"""
Document map modules can in general be fairly easily parallelized.
"""
from multiprocessing import Queue

from itertools import islice

from pimlico.core.modules.execute import ModuleExecutionError
from pimlico.core.modules.map import DocumentMapModuleExecutor
from pimlico.datatypes.base import InvalidDocument
from pimlico.datatypes.tar import AlignedTarredCorpora
from pimlico.utils.core import multiwith
from pimlico.utils.progress import get_progress_bar


class DocumentProcessorPool(object):
"""
Base class for pools that provide an easy implementation of parallelization for document map modules.
"""
def __init__(self, processes):
self.queue = self.create_queue()
self.processes = processes

@staticmethod
def create_queue():
"""
May be overridden by subclasses to provide different implementations of a Queue. By default, uses the
multiprocessing queue type. Whatever is returned, it should implement the interface of Queue.Queue.
"""
return Queue()

def process_document(self, archive, filename, *docs):
"""
Lightweight method to be called from the main process to fire off the processing in a worker
process. The worker should place its output, wrapped in a ProcessOutput, on the output_queue when
finished. This method should return without blocking.
"""
raise NotImplementedError


class ProcessOutput(object):
"""
Wrapper for all result data coming out from a worker.
"""
def __init__(self, archive, filename, data):
self.data = data
self.filename = filename
self.archive = archive


class DocumentMapModuleParallelExecutor(DocumentMapModuleExecutor):
"""
Inherit from this class, instead of DocumentMapModuleExecutor, to provide parallelization.
"""
def preprocess_parallel(self):
""" Defaults to calling preprocess() """
self.preprocess()

def postprocess_parallel(self, error=False):
""" Defaults to calling postprocess() """
self.postprocess(error=error)

def create_pool(self, processes):
"""
Should return an instance of the pool to be used for document processing. Should generally be a
subclass of DocumentProcessorPool.
Always called after postprocess_parallel().
"""
raise NotImplementedError

def execute(self):
"""
Gets called instead of execute() if the config asks for multiple processes to be used.
"""
processes = self.info.pipeline.processes
if processes < 2:
# Not multiprocessing: just use the single-core version
super(DocumentMapModuleExecutor, self).execute()
else:
# We may have multiple inputs, which should be aligned tarred corpora
# If there's only one, this also works
self.input_corpora = [self.info.get_input(input_name)
for input_name in self.info.input_names]
input_iterator = AlignedTarredCorpora(self.input_corpora)

# Call the set-up routine, if one's been defined
self.log.info("Preparing parallel document map execution for %d documents with %d processes" %
(len(input_iterator), processes))
self.preprocess_parallel()

# Start up a pool
pool = self.create_pool(processes)
output_queue = pool.queue
self.log.info("Process pool created for processing %d documents in parallel" % processes)

pbar = get_progress_bar(len(input_iterator),
title="%s map" % self.info.module_type_name.replace("_", " ").capitalize())
complete = False
docs_processing = []
result_buffer = {}
docs_complete = 0
try:
# Prepare a corpus writer for the output
with multiwith(*self.get_writers()) as writers:
# Inputs will be taken from this as they're needed
input_iter = iter(input_iterator.archive_iter())
# Push the first inputs into the pool
for archive, filename, docs in islice(input_iter, processes):
pool.process_document(archive, filename, *docs)
# Keep track of the order we need the results in
docs_processing.append((archive, filename))

# Look for the first document coming off the queue
while len(docs_processing):
result = output_queue.get()
docs_complete += 1
pbar.update(docs_complete)
# We've got some result, but it might not be the one we're looking for
# Add it to a buffer, so we can potentially keep it and only output it when its turn comes up
result_buffer[(result.archive, result.filename)] = result.data
# Before doing anything else, set the next job going
try:
archive, filename, docs = input_iter.next()
except StopIteration:
# No more documents to send: just receive the remaining results until we've got all the
# ones we've already sent
pass
else:
pool.process_document(archive, filename, *docs)
docs_processing.append((archive, filename))

# Write out as many as we can of the docs that have been sent and whose output is available
# while maintaining the order they were put in in
while len(docs_processing) and docs_processing[0] in result_buffer:
archive, filename = docs_processing.pop(0)
next_output = result_buffer.pop((archive, filename))
# Next document processed: output the result precisely as in the single-core case
if type(next_output) is InvalidDocument:
# Just got a single invalid document out: write it out to every output
next_output = [next_output] * len(writers)
elif type(next_output) is not tuple:
# If the processor produces a single result and there's only one output, fine
next_output = (next_output,)
if len(next_output) != len(writers):
raise ModuleExecutionError(
"%s executor's process_document() returned %d results for a document, but the "
"module has %d outputs" % (type(self).__name__, len(next_output), len(writers))
)
# Write the result to the output corpora
for result, writer in zip(next_output, writers):
writer.add_document(archive, filename, result)

pbar.finish()
complete = True
finally:
# Call the finishing-off routine, if one's been defined
if complete:
self.log.info("Document mapping complete. Finishing off")
else:
self.log.info("Document mapping failed. Finishing off")
self.postprocess_parallel(error=not complete)
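
# Hypothetical usage sketch (not part of this commit's code): a minimal pool built directly on
# multiprocessing. process_document() just posts the job to a task queue and returns immediately;
# each worker applies a plain function to the docs and puts the result, wrapped in a ProcessOutput,
# on the shared output queue that the executor above reads from as pool.queue.
from multiprocessing import Process

class FunctionDocumentProcessorPool(DocumentProcessorPool):
    def __init__(self, processes, worker_fn):
        super(FunctionDocumentProcessorPool, self).__init__(processes)
        self.worker_fn = worker_fn
        self.task_queue = Queue()
        self.workers = [Process(target=self._worker_run) for _ in range(processes)]
        for worker in self.workers:
            worker.daemon = True
            worker.start()

    def _worker_run(self):
        while True:
            archive, filename, docs = self.task_queue.get()
            self.queue.put(ProcessOutput(archive, filename, self.worker_fn(*docs)))

    def process_document(self, archive, filename, *docs):
        # Non-blocking: hand the job off to whichever worker picks it up first
        self.task_queue.put((archive, filename, docs))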
