Improvements to opennlp wrapper modules and Py4J tools in general.
New datatypes for storing annotations along with tokenized text, used by the POS tagger, for example.
Further development of the regex search tool for extracting textual features.
Mark Granroth-Wilding committed Mar 28, 2016
1 parent 2eee27b commit 6dcb008
Showing 12 changed files with 432 additions and 109 deletions.
12 changes: 12 additions & 0 deletions src/java/pimlico/core/Py4JGatewayStarter.java
@@ -3,6 +3,10 @@
import py4j.GatewayServer;
import py4j.Py4JNetworkException;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintStream;
import java.net.BindException;
import java.util.List;

@@ -47,6 +51,14 @@ public static void startGateway(Object entryPoint, int port, int pythonPort) {
System.out.println("" + listening_port);
System.out.flush();
} catch (RuntimeException e) {
// Write the full stack trace out to a file to help identify what went wrong
try {
PrintStream ps = new PrintStream(new File(System.getProperty("user.home") + "/py4jgateway.log"));
e.printStackTrace(ps);
ps.close();
} catch (IOException e1) {
e1.printStackTrace();
}
// If we have any errors starting the server, output them on stderr
System.err.println("Error starting server: " + e);
// Also output a line to stdout, so the caller isn't left hanging waiting for something
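The comments above spell out the startup protocol: the gateway prints its listening port as a single line on stdout (and flushes it), and on failure writes the error to stderr, dumps the stack trace to ~/py4jgateway.log, and still emits a line on stdout so the caller isn't left waiting. A minimal Python-side sketch of reading that protocol follows; the function name and command handling are hypothetical, not Pimlico's actual launcher.

import subprocess

def read_gateway_port(java_command):
    # java_command would be something like ["java", "-cp", classpath, gateway_class, ...]
    proc = subprocess.Popen(java_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # First stdout line: the port the gateway is listening on, or a non-numeric
    # line if startup failed (so we never block here indefinitely)
    port_line = proc.stdout.readline().strip()
    try:
        port = int(port_line)
    except ValueError:
        # The error details are on stderr (and in ~/py4jgateway.log for crashes)
        raise RuntimeError("Py4J gateway failed to start: %s" % proc.stderr.read())
    return proc, port
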
23 changes: 20 additions & 3 deletions src/python/pimlico/core/modules/base.py
@@ -19,6 +19,7 @@
"""
from importlib import import_module
import os
from types import FunctionType

from pimlico.core.config import PipelineStructureError
from pimlico.core.modules.options import process_module_options
@@ -201,6 +202,10 @@ def get_output_datatype(self, output_name=None):
raise PipelineStructureError("%s module does not have an output named '%s'. Available outputs: %s" %
(self.module_type_name, output_name, ", ".join(outputs.keys())))
datatype = outputs[output_name]
# The datatype might be a dynamic type -- a function that we call to get the type
if type(datatype) is FunctionType:
# Call the function to build the datatype
datatype = datatype(self)
return output_name, datatype

def instantiate_output_datatype(self, output_name, output_datatype):
@@ -237,6 +242,15 @@ def get_input_module_connection(self, input_name=None):
previous_module = self.pipeline[previous_module_name]
return previous_module, output_name

def get_input_datatype(self, input_name=None):
"""
Get a datatype class corresponding to one of the inputs to the module.
If an input name is not given, the first input is returned.
"""
previous_module, output_name = self.get_input_module_connection(input_name)
return previous_module.get_output_datatype(output_name)

def get_input(self, input_name=None):
"""
Get a datatype instance corresponding to one of the inputs to the module.
@@ -274,17 +288,20 @@ def typecheck_inputs(self):
module_inputs = dict(self.module_inputs)
for input_name, (dep_module_name, output) in self.inputs.items():
# Check the type of each input in turn
input_type_requirement = module_inputs[input_name]
input_type_requirements = module_inputs[input_name]
# Input types may be tuples, to allow multiple types
if type(input_type_requirements) is not tuple:
input_type_requirements = (input_type_requirements,)
# Load the dependent module
dep_module = self.pipeline[dep_module_name]
# Try to load the named output (or the default, if no name was given)
output_name, dep_module_output = dep_module.get_output_datatype(output_name=output)
# Check that the provided output type is a subclass of (or equal to) the required input type
if not issubclass(dep_module_output, input_type_requirement):
if not issubclass(dep_module_output, input_type_requirements):
raise PipelineStructureError(
"module %s's %s-input is required to be of %s type (or a descendent), but module %s's "
"%s-output provides %s" % (
self.module_name, input_name, input_type_requirement.__name__,
self.module_name, input_name, "/".join(t.__name__ for t in input_type_requirements),
dep_module_name, output_name, dep_module_output.__name__
))

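Taken together, the FunctionType check in get_output_datatype() and the tuple handling in typecheck_inputs() let a module accept any of several input types and declare an output type that is only computed when it is asked for. A hedged sketch of what such a declaration might look like; the base class name and the exact list-of-pairs declaration format are assumptions inferred from this diff, not taken from the commit.

from pimlico.core.modules.base import BaseModuleInfo
from pimlico.datatypes.word_annotations import WordAnnotationCorpus, add_annotation_field
from pimlico.modules.opennlp.tokenize.datatypes import TokenizedCorpus


class ModuleInfo(BaseModuleInfo):
    module_type_name = "example_annotator"
    # A tuple of types: either a plain tokenized corpus or an already-annotated one is accepted
    module_inputs = [("text", (TokenizedCorpus, WordAnnotationCorpus))]
    # A dynamic output type: the function is called with the module info by
    # get_output_datatype(), extending the input's annotation fields with "pos"
    module_outputs = [("documents", add_annotation_field("text", "pos"))]
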
11 changes: 10 additions & 1 deletion src/python/pimlico/core/modules/map.py
@@ -28,6 +28,15 @@ class DocumentMapModuleExecutor(BaseModuleExecutor):
def process_document(self, filename, *docs):
raise NotImplementedError

def get_writer(self, info):
"""
Get the writer instance that will be given processed documents to write. Should return
a subclass of TarredCorpusWriter. The default implementation instantiates a plain
TarredCorpusWriter.
"""
return TarredCorpusWriter(info.get_output_dir("documents"))

def preprocess(self, info):
"""
Allows subclasses to define a set-up procedure to be called before corpus processing begins.
@@ -56,7 +65,7 @@ def execute(self, module_instance_info):
complete = False
try:
# Prepare a corpus writer for the output
with TarredCorpusWriter(module_instance_info.get_output_dir("documents")) as writer:
with self.get_writer(module_instance_info) as writer:
for archive, filename, docs in pbar(input_iterator.archive_iter()):
# Get the subclass to process the doc
result = self.process_document(filename, *docs)
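The new get_writer() hook lets a document-map module swap in a specialised writer without reimplementing execute(). A rough sketch of how a tagger-style executor might use it together with the new annotation writers; the class body here is illustrative, not the POS module's real implementation.

from pimlico.core.modules.map import DocumentMapModuleExecutor
from pimlico.modules.opennlp.pos.datatypes import PosTaggedCorpusWriter


class ModuleExecutor(DocumentMapModuleExecutor):
    def get_writer(self, info):
        # Write word/POS annotations instead of plain text documents
        return PosTaggedCorpusWriter(info.get_output_dir("documents"))

    def process_document(self, filename, doc):
        # Dummy "tagger": return a list of sentences, each a list of [word, tag]
        # field lists -- the structure SimpleWordAnnotationCorpusWriter expects
        return [[[word, "NN"] for word in sentence.split(" ")]
                for sentence in doc.splitlines()]
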
2 changes: 1 addition & 1 deletion src/python/pimlico/datatypes/base.py
@@ -22,7 +22,7 @@ class PimlicoDatatype(object):
Datatypes are used to specify the routines for reading the output from modules. They're also
used to specify how to read pipeline inputs. Most datatypes that have data simply read it in
when required. Some (in particular those use as inputs) need a preparation phase to be run,
when required. Some (in particular those used as inputs) need a preparation phase to be run,
where the raw data itself isn't sufficient to implement the reading interfaces required. In this
case, they should override prepare_data().
6 changes: 4 additions & 2 deletions src/python/pimlico/datatypes/tar.py
@@ -12,6 +12,8 @@

class TarredCorpus(IterableDocumentCorpus):
datatype_name = "tar"
# This may be overridden by subclasses to provide filters for documents applied before main doc processing
document_preprocessors = []

def __init__(self, base_dir, raw_data=False):
"""
@@ -63,7 +65,7 @@ def archive_iter(self, subsample=None, start=0):
filename = tarinfo.name
# Read in the data
with open(os.path.join(tmp_dir, filename), "r") as f:
document = f.read()
document = f.read().decode("utf-8")
# Apply subclass-specific post-processing if we've not been asked to yield just the raw data
if not self.raw_data:
document = self.process_document(document)
@@ -119,7 +121,7 @@ def add_document(self, archive_name, doc_name, data):
self.doc_count += 1

# Add a new document to archive
data_file = StringIO.StringIO(data)
data_file = StringIO.StringIO(data.encode("utf-8"))
info = tarfile.TarInfo(name=doc_name)
info.size = len(data_file.buf)
self.current_archive_tar.addfile(info, data_file)
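With the decode on read and encode on write, TarredCorpus now hands documents back as unicode and TarredCorpusWriter accepts unicode, storing UTF-8 bytes in the archive. A minimal round-trip sketch; the path is illustrative and corpus/pipeline setup details are glossed over.

from pimlico.datatypes.tar import TarredCorpus, TarredCorpusWriter

with TarredCorpusWriter("/tmp/example_corpus") as writer:
    # Unicode in, UTF-8 on disk
    writer.add_document("archive0", "doc1.txt", u"Na\u00efve caf\u00e9 text with non-ASCII")

corpus = TarredCorpus("/tmp/example_corpus")
for archive_name, doc_name, doc in corpus.archive_iter():
    assert isinstance(doc, unicode)  # decoded from UTF-8 on the way back out
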
211 changes: 211 additions & 0 deletions src/python/pimlico/datatypes/word_annotations.py
@@ -0,0 +1,211 @@
from operator import itemgetter
import re

from pimlico.datatypes.base import DatatypeLoadError
from pimlico.datatypes.tar import TarredCorpus, TarredCorpusWriter
from pimlico.modules.opennlp.tokenize.datatypes import TokenizedCorpus


class WordAnnotationCorpus(TarredCorpus):
# Subclasses may provide a list of the fields included for each word
# Doing so allows an extra level of type checking, since datatype users know before the dataset is available
# which fields will be in it
annotation_fields = None

def __init__(self, base_dir):
super(WordAnnotationCorpus, self).__init__(base_dir)
self._sentence_boundary_re = None
self._word_re = None
self._word_boundary = None

def read_annotation_fields(self):
"""
Get the available annotation fields from the dataset's configuration. These are the actual
fields that will be available in the dictionary produced corresponding to each word.
"""
# To make sure the fields are in the order in which they're specified, order by matching group number
return list(map(itemgetter(0), sorted(self.word_re.groupindex.items(), key=itemgetter(1))))

@property
def sentence_boundary_re(self):
if self._sentence_boundary_re is None:
# Load the annotation format from the metadata
if "sentence_boundary" not in self.metadata:
raise DatatypeLoadError(
"word annotation corpus does not have a sentence boundary specified in its metadata"
)
# Prepare a regex for detecting sentence boundaries
boundary = self.metadata["sentence_boundary"].replace("\\n", "\n")
# Interpret the boundary specifier as a regex
self._sentence_boundary_re = re.compile(re.escape(boundary), re.MULTILINE)
return self._sentence_boundary_re

@property
def word_boundary(self):
if self._word_boundary is None:
# Load the annotation format from the metadata
if "word_boundary" not in self.metadata:
raise DatatypeLoadError(
"word annotation corpus does not have a word boundary specified in its metadata"
)
# Allow \n to be used as (or in) the word boundary
self._word_boundary = self.metadata["word_boundary"].replace("\\n", "\n")
return self._word_boundary

@property
def word_re(self):
if self._word_re is None:
# Load the annotation format from the metadata
if "word_format" not in self.metadata:
raise DatatypeLoadError(
"word annotation corpus does not have a word format specified in its metadata"
)
if "nonword_chars" not in self.metadata:
raise DatatypeLoadError(
"word annotation corpus does not have non-word chars specified in its metadata"
)
# Prepare a regex for detecting sentence boundaries
fmt = self.metadata["word_format"].replace("\\n", "\n")
# First escape the whole thing so that we can use characters that have a special meaning in a regex
fmt = re.escape(fmt)
# The word format includes field specifiers of the form {name}
# Replace these to make a regex with a named group
nonwords = self.metadata["nonword_chars"].replace("\\n", "\n")
word_re = re.sub(r"\\{(.+?)\\}", r"(?P<\1>[^%s]*)" % re.escape(nonwords), fmt)
# Require the re to match the full string: it will only be called on individual words
word_re = "%s$" % word_re
# Compile the resulting re
self._word_re = re.compile(word_re)
return self._word_re

def parse_annotations(self, doc):
sentences = []
while len(doc):
# Find the next sentence boundary
sb = self.sentence_boundary_re.search(doc)
if sb is None:
# No more sentence boundaries, the rest must be a single sentence
sentence_text = doc
doc = ""
else:
# Process the text up to the next boundary as a sentence
sentence_text = doc[:sb.start()]
doc = doc[sb.end():]
# Split the sentence on word boundaries
words = sentence_text.split(self.word_boundary)
# Parse each word
word_dicts = [self.word_re.match(word).groupdict() for word in words]
# Check that all the words matched the word re
if None in word_dicts:
raise AnnotationParseError("word did not match regex for word format: %s. Matching using: %s" %
(words[word_dicts.index(None)], self.word_re.pattern))
sentences.append(word_dicts)
return sentences

document_preprocessors = [parse_annotations]

def data_ready(self):
if not super(WordAnnotationCorpus, self).data_ready():
return False
# We now know at least that the data dir exists
# Check the required formats are specified in the metadata
try:
self.sentence_boundary_re
self.word_boundary
self.word_re
except DatatypeLoadError:
return False
else:
return True


class WordAnnotationCorpusWriter(TarredCorpusWriter):
"""
Ensures that the correct metadata is provided for a word annotation corpus. Doesn't take care of
the formatting of the data: that needs to be done by the writing code, or by a subclass.
"""

def __init__(self, sentence_boundary, word_boundary, word_format, nonword_chars, base_dir):
super(WordAnnotationCorpusWriter, self).__init__(base_dir)
self.metadata["sentence_boundary"] = sentence_boundary.replace("\n", "\\n")
self.metadata["word_boundary"] = word_boundary.replace("\n", "\\n")
self.metadata["word_format"] = word_format.replace("\n", "\\n")
self.metadata["nonword_chars"] = nonword_chars.replace("\n", "\\n")


class SimpleWordAnnotationCorpusWriter(WordAnnotationCorpusWriter):
"""
Takes care of writing word annotations in a simple format, where each line contains a sentence, words
are separated by spaces and a series of annotation fields for each word are separated by |s (or a given
separator). This corresponds to the standard tag format for C&C.
"""

def __init__(self, base_dir, field_names, field_sep="|"):
self.field_names = field_names
self.field_sep = field_sep
# Prepare a word format that includes the given field names
word_format = field_sep.join("{%s}" % field for field in field_names)
super(SimpleWordAnnotationCorpusWriter, self).__init__("\n", " ", word_format, " \n%s" % field_sep, base_dir)

def add_document(self, archive_name, doc_name, data):
"""
Takes data in the form of a list of sentences, where each is a list of words, where each
is a list of values for each field (in the same order in which the field names were given).
Encodes it in a format that can be read by a WordAnnotationCorpus.
:param archive_name: current archive
:param doc_name: document being added
:param data: sentence data in the form described above
"""
doc_string = \
"\n".join(" ".join(self.field_sep.join(word_fields) for word_fields in sentence) for sentence in data)
super(SimpleWordAnnotationCorpusWriter, self).add_document(archive_name, doc_name, doc_string)


def add_annotation_field(input_name, add_fields):
"""
Dynamic type constructor that can be used in place of a module's output type. When called
(when the output type is needed), dynamically creates a new type that is a WordAnnotationCorpus
with the same fields as the named input to the module, with the addition of one or more new ones.
Only works if the input datatype explicitly declares the fields it makes available.
:param module_info: ModuleInfo instance
:param input_name: input to the module whose fields we extend
:param add_fields: field or fields to add, string names
:return: new datatype, subclass of WordAnnotationCorpus
"""
# Make it easy to add just a single field, the most common case
if isinstance(add_fields, basestring):
add_fields = [add_fields]

def _builder(module_info):
from pimlico.core.modules.base import ModuleInfoLoadError

input_datatype = module_info.get_input_datatype(input_name)[1]
# Allow the special case where the input datatype is a tokenized corpus
# Pretend it's an annotated corpus with no annotations, just words
if issubclass(input_datatype, TokenizedCorpus):
base_annotation_fields = ["word"]
else:
if not issubclass(input_datatype, WordAnnotationCorpus):
raise ModuleInfoLoadError("cannot construct a dynamic word annotation corpus type, since input we're "
"extending isn't a word annotation corpus. Input '%s' is a %s" %
(input_name, input_datatype.__name__))
if input_datatype.annotation_fields is None:
raise ModuleInfoLoadError("cannot construct a word annotation corpus type by adding fields to input '%s', "
"since the input type, %s, doesn't explicitly declare its annotation fields" %
(input_name, input_datatype.__name__))
base_annotation_fields = input_datatype.annotation_fields

class ExtendedWordAnnotationCorpus(WordAnnotationCorpus):
annotation_fields = base_annotation_fields + add_fields

return ExtendedWordAnnotationCorpus
return _builder


class AnnotationParseError(Exception):
pass
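
To make the metadata-driven format concrete: SimpleWordAnnotationCorpusWriter stores one sentence per line, words separated by spaces, and each word's fields joined by the separator, while WordAnnotationCorpus.parse_annotations() uses the word_format regex to turn that back into per-word dicts. A hedged usage sketch; the directory and data are illustrative.

from pimlico.datatypes.word_annotations import SimpleWordAnnotationCorpusWriter

sentences = [
    [["The", "DT"], ["dog", "NN"], ["barks", "VBZ"]],
    [["It", "PRP"], ["sleeps", "VBZ"]],
]
with SimpleWordAnnotationCorpusWriter("/tmp/annotated_corpus", ["word", "pos"]) as writer:
    # With word_format "{word}|{pos}", the stored document text is:
    #   The|DT dog|NN barks|VBZ
    #   It|PRP sleeps|VBZ
    writer.add_document("archive0", "doc1.txt", sentences)

# Reading back, parse_annotations() produces, for each sentence, dicts like
#   {"word": "The", "pos": "DT"}
# with the fields in the order declared by the word_format metadata.
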
19 changes: 8 additions & 11 deletions src/python/pimlico/modules/opennlp/pos/datatypes.py
@@ -1,20 +1,17 @@
from pimlico.datatypes.tar import TarredCorpus
from pimlico.datatypes.word_annotations import WordAnnotationCorpus, SimpleWordAnnotationCorpusWriter


class PosTaggedCorpus(TarredCorpus):
class PosTaggedCorpus(WordAnnotationCorpus):
"""
Specialized datatype for a tarred corpus that's had POS tagging applied.
Each document is a list of sentences. Each sentence is a list of words. Each word is a list of
pairs (word, POS tag).
Each document is a list of sentences. Each sentence is a list of words. Each word is a dict including the
word and its POS tag.
"""
def process_document(self, data):
return [
[_word_tag_pair(word) for word in sentence.split(" ")] for sentence in data.split("\n")
]
annotation_fields = ["word", "pos"]


def _word_tag_pair(text):
word, __, tag = text.rpartition("|")
return word, tag
class PosTaggedCorpusWriter(SimpleWordAnnotationCorpusWriter):
def __init__(self, base_dir):
super(PosTaggedCorpusWriter, self).__init__(base_dir, ["word", "pos"])
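
Since PosTaggedCorpus is now a WordAnnotationCorpus declaring annotation_fields = ["word", "pos"], downstream code sees each word as a dict rather than a (word, tag) pair. A tiny sketch of consuming one parsed document; how the document is obtained from the corpus is left out here.

# doc: one parsed document -- a list of sentences, each a list of per-word dicts
for sentence in doc:
    print " ".join("%s/%s" % (w["word"], w["pos"]) for w in sentence)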
