
Commit

Added new modules for extracting features from documents. Some improvements to CoreNLP and regex modules
Mark Granroth-Wilding committed Apr 4, 2016
1 parent f90ac5b commit 603fc99
Showing 15 changed files with 255 additions and 34 deletions.
6 changes: 6 additions & 0 deletions docs/wishlist.rst
@@ -23,5 +23,11 @@ Things I plan to add to Pimlico:

- Bundle Pimlico Java code as jars, so user doesn't need to compile. Dead easy - just add jar target to ant
builds and check in resulting jars
- Reconsider TarredCorpus as the common dataset format. There's no obvious advantage over just grouping documents
in directories, which would also allow freer parallelization. However, this might run into filesystem
trouble where a corpus has a very large number of docs (many inodes in subdirectories)
- Or consider parallelizing at the archive level: allow TarredCorpus to iterate over specific archives, then send
each archive off to be processed in parallel, meaning we can write files linearly within an archive, but have many
(roughly equally sized) archives going at once

*I'll add to this list as I think of things...*
16 changes: 15 additions & 1 deletion src/python/pimlico/cli/run.py
@@ -1,5 +1,7 @@
import argparse
import sys
from operator import itemgetter

from pimlico.cli.check import check_cmd
from pimlico.core.config import PipelineConfig, PipelineConfigParseError
from pimlico.core.modules.execute import execute_module, ModuleExecutionError
@@ -44,6 +46,10 @@ def list_variants(pipeline, opts):
parser.add_argument("--debug", "-d", help="Output verbose debugging info", action="store_true")
parser.add_argument("--variant", "-v", help="Load a particular variant of a pipeline. For a list of available "
"variants, use the 'variants' command", default="main")
parser.add_argument("--override-local-config", "--olc",
help="Override a parameter set in the local config files (usually ~/.pimlico.conf). For just "
"this execution. Specify as param=value. Use this option multiple times to override "
"more than one parameter", action="append")
subparsers = parser.add_subparsers(help="Select a sub-command")

check = subparsers.add_parser("check",
@@ -68,9 +74,17 @@ def list_variants(pipeline, opts):

opts = parser.parse_args()

if opts.override_local_config is not None:
override_local = dict(
itemgetter(0, 2)(param.partition("=")) for param in opts.override_local_config
)
else:
override_local = {}

# Read in the pipeline config from the given file
try:
pipeline = PipelineConfig.load(opts.pipeline_config, variant=opts.variant)
pipeline = PipelineConfig.load(opts.pipeline_config, variant=opts.variant,
override_local_config=override_local)
except PipelineConfigParseError, e:
print >>sys.stderr, "Error reading pipeline config: %s" % e
sys.exit(1)
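As an aside, the override parsing above is compact enough to be easy to misread. A minimal standalone sketch of what it does (the parameter names and values here are made up for illustration): each repeated --override-local-config argument is split on its first "=" with str.partition, and itemgetter(0, 2) keeps just the key and value from the resulting 3-tuple.

from operator import itemgetter

# Hypothetical values, as if passed via repeated --override-local-config options
raw_overrides = ["long_term_store=/data/store", "short_term_store=/tmp/store"]

# partition("=") returns (key, "=", value); itemgetter(0, 2) picks out key and value.
# Splitting only on the first "=" means values may themselves contain "=".
overrides = dict(itemgetter(0, 2)(param.partition("=")) for param in raw_overrides)

print(overrides)  # -> {'long_term_store': '/data/store', 'short_term_store': '/tmp/store'} (order may vary)
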
49 changes: 31 additions & 18 deletions src/python/pimlico/core/config.py
@@ -54,7 +54,7 @@ class PipelineConfig(object):
directive is found, is given after a space.
"""
def __init__(self, pipeline_config, local_config, raw_module_configs, module_order, filename=None,
def __init__(self, name, pipeline_config, local_config, raw_module_configs, module_order, filename=None,
variant="main", available_variants=[]):
self.available_variants = available_variants
self.variant = variant
@@ -64,16 +64,7 @@ def __init__(self, pipeline_config, local_config, raw_module_configs, module_ord
self.raw_module_configs = raw_module_configs
self.pipeline_config = pipeline_config
self.filename = filename

# Process configs to get out the core things we need
if "name" not in self.pipeline_config:
raise PipelineConfigParseError("pipeline name must be specified as 'name' attribute in pipeline section")
self.name = self.pipeline_config["name"]
# Check that this pipeline is compatible with the Pimlico version being used
if "release" not in self.pipeline_config:
raise PipelineConfigParseError("Pimlico release version must be specified as 'release' attribute in "
"pipeline section")
check_release(self.pipeline_config["release"])
self.name = name

# Certain standard system-wide settings, loaded from the local config
self.long_term_store = os.path.join(self.local_config["long_term_store"], self.name, self.variant)
@@ -110,10 +101,6 @@ def __init__(self, pipeline_config, local_config, raw_module_configs, module_ord
self._module_info_cache = {}
self._module_schedule = None

# Now that we've got the pipeline instance prepared, load all the module info instances, so they've cached
for module_name in module_order:
self.load_module_info(module_name)

@property
def modules(self):
return self.module_order
@@ -200,7 +187,9 @@ def path_relative_to_config(self, path):
return os.path.abspath(os.path.join(config_dir, path))

@staticmethod
def load(filename, local_config=None, variant="main"):
def load(filename, local_config=None, variant="main", override_local_config={}):
from pimlico.core.modules.base import ModuleInfoLoadError

if variant is None:
variant = "main"

@@ -226,6 +215,8 @@ def load(filename, local_config=None, variant="main"):
local_config_parser = SafeConfigParser()
local_config_parser.readfp(local_text_buffer)
local_config_data = dict(local_config_parser.items("main"))
# Allow parameters to be overridden on the command line
local_config_data.update(override_local_config)

for attr in REQUIRED_LOCAL_CONFIG:
if attr not in local_config_data:
@@ -270,9 +261,31 @@ def load(filename, local_config=None, variant="main"):
except ConfigParser.Error, e:
raise PipelineConfigParseError("could not parse config file. %s" % e)

# Process configs to get out the core things we need
if "name" not in pipeline_config:
raise PipelineConfigParseError("pipeline name must be specified as 'name' attribute in pipeline section")
name = pipeline_config["name"]

# Check that this pipeline is compatible with the Pimlico version being used
if "release" not in pipeline_config:
raise PipelineConfigParseError("Pimlico release version must be specified as 'release' attribute in "
"pipeline section")
check_release(pipeline_config["release"])

# Do no further checking or processing at this stage: just keep raw dictionaries for the time being
return PipelineConfig(pipeline_config, local_config_data, raw_module_options, module_order,
filename=filename, variant=variant, available_variants=list(sorted(available_variants)))
pipeline = PipelineConfig(
name, pipeline_config, local_config_data, raw_module_options, module_order,
filename=filename, variant=variant, available_variants=list(sorted(available_variants))
)

# Now that we've got the pipeline instance prepared, load all the module info instances, so they've cached
for module_name in module_order:
try:
pipeline.load_module_info(module_name)
except ModuleInfoLoadError, e:
raise PipelineConfigParseError("error loading module metadata for module '%s': %s" % (module_name, e))

return pipeline


def var_substitute(option_val, vars):
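Putting the two changes together, a minimal usage sketch (the config filename and parameter value are hypothetical): overrides parsed from the command line are passed through to PipelineConfig.load, which applies them on top of the values read from the local config file before the required-parameter check, and then loads each module's metadata, converting any ModuleInfoLoadError into a PipelineConfigParseError.

from pimlico.core.config import PipelineConfig, PipelineConfigParseError

try:
    pipeline = PipelineConfig.load(
        "mypipeline.conf",              # hypothetical pipeline config file
        variant="main",
        # Hypothetical override of a local-config parameter, for this run only
        override_local_config={"long_term_store": "/data/pimlico"},
    )
except PipelineConfigParseError as e:
    print("Error reading pipeline config: %s" % e)
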
4 changes: 2 additions & 2 deletions src/python/pimlico/core/modules/base.py
@@ -479,8 +479,8 @@ def load_module_info(path):
try:
mod = import_module(info_path)
except ImportError:
raise ModuleInfoLoadError("module %s could not be imported, could not import %s" % (path, info_path))
raise ModuleInfoLoadError("module type '%s' could not be found (could not import %s)" % (path, info_path))

if not hasattr(mod, "ModuleInfo"):
raise ModuleInfoLoadError("could not load class %s.ModuleInfo" % info_path)
raise ModuleInfoLoadError("invalid module type code: could not load class %s.ModuleInfo" % info_path)
return mod.ModuleInfo
2 changes: 2 additions & 0 deletions src/python/pimlico/core/modules/execute.py
@@ -13,6 +13,8 @@ def execute_module(pipeline, module_name, force_rerun=False, debug=False):
log.info("Loaded pipeline %s" % pipeline_name)

# Load the module instance
if module_name not in pipeline.modules:
raise ModuleExecutionError("%s pipeline doesn't have a module called '%s'" % (pipeline.name, module_name))
module = pipeline[module_name]
log.info("Checking module config")
# Run basic checks on the config for this module
114 changes: 114 additions & 0 deletions src/python/pimlico/datatypes/features.py
@@ -0,0 +1,114 @@
from operator import itemgetter

from pimlico.core.modules.map import skip_invalid, invalid_doc_on_error
from pimlico.datatypes.tar import TarredCorpus, TarredCorpusWriter, pass_up_invalid


class KeyValueListCorpus(TarredCorpus):
def __init__(self, base_dir, pipeline):
super(KeyValueListCorpus, self).__init__(base_dir, pipeline)
self.separator = self.metadata.get("separator", " ")
self.fv_separator = self.metadata.get("fv_separator", "=")

@skip_invalid
def process_document(self, data):
# Read a set of feature-value pairs from each line
data_points = []
for line in data.splitlines():
# Skip blank lines
if line.strip():
# Split up the various feature assignments
fvs = line.strip().split(self.separator)
# Now we've split on sep, unescape any instances that were escaped
fvs = [unescape_sep(self.separator, "ITEMSEP", fv) for fv in fvs]
# Split each one into a feature-value pair
fvs = [itemgetter(0, 2)(fv.split(self.fv_separator)) for fv in fvs]
# Unescape the fv sep within feature names and feature values
fvs = [
(unescape_sep(self.fv_separator, "FVSEP", fv[0]), unescape_sep(self.fv_separator, "FVSEP", fv[1]))
for fv in fvs
]
data_points.append(fvs)
return data_points


class KeyValueListCorpusWriter(TarredCorpusWriter):
def __init__(self, base_dir, separator=" ", fv_separator="="):
super(KeyValueListCorpusWriter, self).__init__(base_dir)
self.fv_separator = fv_separator
self.separator = separator
# Put the separators in the metadata, so we know how to read the data in again
self.metadata["separator"] = separator
self.metadata["fv_separator"] = fv_separator

@pass_up_invalid
def add_document(self, archive_name, doc_name, data):
# Input should be a list of data points, where each is a list of feature-value pairs
# One data point per line
data = "\n".join([
# Fv pairs are separated by separator
self.separator.join([
# Make sure they don't include the separator
escape_sep(
self.separator, "ITEMSEP",
# Feature and value are separated by fv_separator
"%s%s%s" % (
# Make sure they don't include the fv separator
escape_sep(self.fv_separator, "FVSEP", feature_name),
self.fv_separator,
escape_sep(self.fv_separator, "FVSEP", feature_value)
)
) for (feature_name, feature_value) in data_point
]) for data_point in data
])
super(KeyValueListCorpusWriter, self).add_document(archive_name, doc_name, data)


class TermFeatureListCorpus(KeyValueListCorpus):
"""
Special case of KeyValueListCorpus, where one special feature "term" is always present and the other
feature types are counts of the occurrence of a particular feature with this term in each data point.
"""
def __init__(self, base_dir, pipeline):
super(TermFeatureListCorpus, self).__init__(base_dir, pipeline)

@skip_invalid
@invalid_doc_on_error
def process_document(self, data):
data = super(TermFeatureListCorpus, self).process_document(data)

data_points = []
for data_point in data:
# Pull out the special "term" feature (usually at the beginning)
try:
term = (value for (feature, value) in data_point if feature == "term").next()
except StopIteration:
# No "term" feature found -- uh-oh! Catch as invalid doc
raise ValueError("data point has no 'term' feature: %s" % data_point)
# The rest of the features are feature counts
features = dict((feature, int(value)) for (feature, value) in data_point if feature != "term")
data_points.append((term, features))
return data_points


class TermFeatureListCorpusWriter(KeyValueListCorpusWriter):
def __init__(self, base_dir, separator=" ", fv_separator="="):
super(TermFeatureListCorpusWriter, self).__init__(base_dir, separator=" ", fv_separator="=")

@pass_up_invalid
def add_document(self, archive_name, doc_name, data):
# Input should be a list of data points, where each is a (term, feature count) pair
data = [
[("term", term)] + [(feature, str(count)) for (feature, count) in feature_counts.items()]
for (term, feature_counts) in data
]
super(TermFeatureListCorpusWriter, self).add_document(archive_name, doc_name, data)


def escape_sep(sep, sep_type, text):
return text.replace(sep, "~~%s~~" % sep_type)


def unescape_sep(sep, sep_type, text):
return text.replace("~~%s~~" % sep_type, sep)
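

As a rough illustration of the on-disk format this writer/reader pair implies (feature names and values below are invented examples): each data point becomes one line of feature=value pairs joined by the separator, and any literal occurrences of the separators inside names or values are escaped so the line can be split unambiguously when read back. A simplified round trip following the same escaping scheme:

# Default separators, as in KeyValueListCorpusWriter
sep, fv_sep = " ", "="

def escape_sep(sep, sep_type, text):
    return text.replace(sep, "~~%s~~" % sep_type)

def unescape_sep(sep, sep_type, text):
    return text.replace("~~%s~~" % sep_type, sep)

# One term-feature data point: the special "term" feature plus a feature count
data_point = [("term", "big dog"), ("bites", "2")]

# Write: escape the fv separator inside names/values, join each pair with fv_sep,
# escape the item separator in the whole pair, then join pairs with sep
line = sep.join(
    escape_sep(sep, "ITEMSEP",
               "%s%s%s" % (escape_sep(fv_sep, "FVSEP", f), fv_sep,
                           escape_sep(fv_sep, "FVSEP", v)))
    for f, v in data_point
)
# line == 'term=big~~ITEMSEP~~dog bites=2'

# Read: split on sep, unescape the item separator, then split each pair on the
# first fv_sep and unescape it (simplified relative to process_document above)
pairs = [unescape_sep(sep, "ITEMSEP", fv) for fv in line.split(sep)]
pairs = [tuple(unescape_sep(fv_sep, "FVSEP", part) for part in fv.split(fv_sep, 1))
         for fv in pairs]
# pairs == [('term', 'big dog'), ('bites', '2')]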
8 changes: 8 additions & 0 deletions src/python/pimlico/datatypes/xml.py
@@ -6,6 +6,7 @@
"""
import gzip
import os
import traceback
from multiprocessing import Pool, Queue

import time
@@ -179,6 +180,13 @@ def count_files_parallel(filenames, truncate, document_node_type, processes):


def count_files_process(filename, document_node_type):
# BS can go very slowly if it tries to use chardet to detect input encoding
# Remove chardet and cchardet from the Python modules, so that import fails and it doesn't try to use them
# This prevents it getting stuck on reading long input files
import sys
sys.modules["cchardet"] = None
sys.modules["chardet"] = None
# Now we can import BS
from bs4 import BeautifulSoup

if filename.endswith(".gz"):
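The trick above works because Python treats a None entry in sys.modules as a module whose import has been disabled: any subsequent import of that name raises ImportError (ModuleNotFoundError on Python 3), so BeautifulSoup falls back to not using chardet. A tiny self-contained illustration of the mechanism (not Pimlico code):

import sys

sys.modules["chardet"] = None   # pretend the package is unavailable

try:
    import chardet
except ImportError:
    print("chardet import blocked, as intended")
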
3 changes: 3 additions & 0 deletions src/python/pimlico/modules/corenlp/exec.py
@@ -108,6 +108,9 @@ def preprocess(self):
self.corenlp.start()
self.log.info("CoreNLP server started on %s" % self.corenlp.server_url)
self.log.info("Calling CoreNLP with annotators: %s" % ", ".join(annotators))
self.log.info("Annotations that will be available on the output: %s" % ", ".join(
self.info.get_output_datatype("annotations")[1].annotation_fields
))
self.properties = {
"annotators": ",".join(annotators),
"outputFormat": "json",
14 changes: 5 additions & 9 deletions src/python/pimlico/modules/corenlp/info.py
@@ -13,20 +13,16 @@ def annotation_fields_from_options(module_info):
from pimlico.core.modules.base import ModuleInfoLoadError

input_datatype = module_info.get_input_datatype("documents")[1]
# Allow the special case where the input datatype is a tokenized corpus
# Pretend it's an annotated corpus with no annotations, just words
if issubclass(input_datatype, (TokenizedCorpus, TarredCorpus)):
base_annotation_fields = []
else:
if not issubclass(input_datatype, WordAnnotationCorpus):
raise ModuleInfoLoadError("cannot construct a dynamic word annotation corpus type, since input we're "
"extending isn't a word annotation corpus. Input is a %s" %
input_datatype.__name__)
if issubclass(input_datatype, WordAnnotationCorpus):
if input_datatype.annotation_fields is None:
raise ModuleInfoLoadError("cannot construct a word annotation corpus type by adding fields to input, "
"since the input type, %s, doesn't explicitly declare its annotation fields" %
input_datatype.__name__)
base_annotation_fields = input_datatype.annotation_fields
else:
# Allow the special case where the input datatype is a tokenized corpus
# Pretend it's an annotated corpus with no annotations, just words
base_annotation_fields = []

# Look at the options to see what annotations are going to be added
add_fields = module_info.options["annotators"].split(",") if module_info.options["annotators"] else []
Empty file.
Empty file.
58 changes: 58 additions & 0 deletions src/python/pimlico/modules/features/term_feature_compiler/info.py
@@ -0,0 +1,58 @@
import random

from pimlico.core.modules.map import DocumentMapModuleInfo
from pimlico.datatypes.features import TermFeatureListCorpus, KeyValueListCorpus
from pimlico.datatypes.tar import TarredCorpus


# Subclass TermFeatureListCorpus so that inputs expecting one can accept this
# TODO There should be a better superclass for doc-doc filters like this
class TermFeatureListCorpusFilter(TermFeatureListCorpus):
def __init__(self, input_datatype, pipeline, **kwargs):
TarredCorpus.__init__(self, None, pipeline, **kwargs)
self.input_datatype = input_datatype

def __len__(self):
return len(self.input_datatype)

def archive_iter(self, subsample=None, start=0):
# TODO Implement this, which does the key
tarballs = self.tarballs

current_archive = 0
current_archive_count = 0

for file_num, (doc_name, doc) in enumerate(self.input_datatype):
# Allow the first portion of the corpus to be skipped
if file_num < start:
continue
# If subsampling, decide whether to extract this file
if subsample is not None and random.random() > subsample:
# Reject this file
continue

# Check whether we've put enough files in the current archive to move onto the next
if current_archive_count == self.archive_size:
current_archive += 1
current_archive_count = 0

yield tarballs[current_archive], doc_name, doc

def data_ready(self):
return self.input_datatype.data_ready()


class ModuleInfo(DocumentMapModuleInfo):
module_type_name = "term_feature_list_filter"
module_inputs = [("key_values", KeyValueListCorpus)]
module_outputs = [("term_features", TermFeatureListCorpusFilter)]
module_options = [
# TODO Add some options
]
module_executable = False

def instantiate_output_datatype(self, output_name, output_datatype):
if output_name == "term_features":
return TermFeatureListCorpusFilter(self.pipeline, self.get_input("key_values"))
else:
return super(ModuleInfo, self).instantiate_output_datatype(output_name, output_datatype)
