Skip to content

Commit

Permalink
Extended CoreNLP module to do more than just word annotations -- also…
Browse files Browse the repository at this point in the history
… parsing, with the possibility of easily adding more outputs in future.

New module output handling that permits optional outputs, which the config file can choose whether to include.
  • Loading branch information
Mark Granroth-Wilding committed Mar 30, 2016
1 parent 59f5e80 commit 2492d27
Show file tree
Hide file tree
Showing 22 changed files with 355 additions and 149 deletions.
14 changes: 3 additions & 11 deletions src/python/pimlico/cli/check.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,17 @@
import sys

from pimlico.core.config import check_for_cycles, PipelineStructureError
from pimlico.core.modules.base import ModuleInfoLoadError
from pimlico.utils.format import multiline_tablate


def check_cmd(pipeline, opts):
# Metadata has already been loaded if we've got this far
print "All module metadata loaded successfully"

# Output what variants are available and say which we're checking
print "Available pipeline variants: %s" % ", ".join(["main"] + pipeline.available_variants)
print "Checking variant '%s'\n" % pipeline.variant

# Load all the modules' metadata
# This makes sure none of the modules have trouble loading
for name in pipeline.modules:
try:
pipeline[name]
except ModuleInfoLoadError, e:
print "Error loading module %s: %s" % (name, e)
sys.exit(1)
print "All module metadata loaded successfully"

# Check the pipeline for cycles
# This will raise an exception if a cycle is found
try:
Expand Down
1 change: 1 addition & 0 deletions src/python/pimlico/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def schedule_cmd(pipeline, opts):
# Check the status of each of the input datatypes
for input_name in module.input_names:
print " input %s: %s" % (input_name, "ready" if module.input_ready(input_name) else "not ready")
print " outputs: %s" % ", ".join(module.output_names)


def run_cmd(pipeline, opts):
Expand Down
39 changes: 34 additions & 5 deletions src/python/pimlico/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
Reading of various types of config files, in particular a pipeline config.
"""
import ConfigParser
from ConfigParser import SafeConfigParser, RawConfigParser
from cStringIO import StringIO
import os
from ConfigParser import SafeConfigParser, RawConfigParser

import sys
from cStringIO import StringIO

REQUIRED_LOCAL_CONFIG = ["short_term_store", "long_term_store"]

Expand Down Expand Up @@ -74,8 +75,11 @@ def __init__(self, pipeline_config, local_config, raw_module_configs, module_ord
"pipeline section")
check_release(self.pipeline_config["release"])

# Certain standard system-wide settings, loaded from the local config
self.long_term_store = os.path.join(self.local_config["long_term_store"], self.name, self.variant)
self.short_term_store = os.path.join(self.local_config["short_term_store"], self.name, self.variant)
# Number of processes to use for anything that supports multiprocessing
self.processes = int(self.local_config.get("processes", 1))

# Get paths to add to the python path for the pipeline
# Used so that a project can specify custom module types and other python code outside the pimlico source tree
Expand All @@ -86,9 +90,30 @@ def __init__(self, pipeline_config, local_config, raw_module_configs, module_ord
# Add these paths for the python path, so later code will be able to import things from them
sys.path.extend(additional_paths)

# Some modules need to know which of their (potential) outputs get used by other modules when they're loaded
# Since many modules need to load those they're dependent on while loading, this becomes cyclic!
# Build a list of used outputs, before loading any modules
self.used_outputs = {}
for module_name in module_order:
# Do minimal processing to get input connections: more thorough checks are done during instantiation
for opt_name, opt_value in raw_module_configs[module_name].items():
if opt_name == "input" or opt_name.startswith("input_"):
if "." in opt_value:
# Module name + output name
input_module, __, input_module_output = opt_value.partition(".")
else:
# Module name, with default output
input_module = opt_value
input_module_output = None
self.used_outputs.setdefault(input_module, set([])).add(input_module_output)

self._module_info_cache = {}
self._module_schedule = None

# Now that we've got the pipeline instance prepared, load all the module info instances, so they're cached
for module_name in module_order:
self.load_module_info(module_name)

@property
def modules(self):
return self.module_order
Expand Down Expand Up @@ -128,11 +153,11 @@ def load_module_info(self, module_name):

# Pass in all other options to the info constructor
options_dict = dict(module_config)
inputs, options = module_info_class.process_config(options_dict)
inputs, optional_outputs, options = module_info_class.process_config(options_dict)

# Instantiate the module info
self._module_info_cache[module_name] = \
module_info_class(module_name, self, inputs=inputs, options=options)
module_info_class(module_name, self, inputs=inputs, options=options, optional_outputs=optional_outputs)
return self._module_info_cache[module_name]

def get_module_schedule(self):
Expand Down Expand Up @@ -176,6 +201,9 @@ def path_relative_to_config(self, path):

@staticmethod
def load(filename, local_config=None, variant="main"):
if variant is None:
variant = "main"

if local_config is None:
# Use the default locations for local config file
# May want to add other locations here...
Expand Down Expand Up @@ -206,7 +234,7 @@ def load(filename, local_config=None, variant="main"):
# Perform pre-processing of config file to replace includes, etc
config_text, available_variants = preprocess_config_file(os.path.abspath(filename), variant=variant)
# If we were asked to load a particular variant, check it's in the list of available variants
if variant is not None and variant != "main" and variant not in available_variants:
if variant != "main" and variant not in available_variants:
raise PipelineConfigParseError("could not load pipeline variant '%s': it is not declared anywhere in the "
"config file")
text_buffer = StringIO(config_text)
Expand Down Expand Up @@ -241,6 +269,7 @@ def load(filename, local_config=None, variant="main"):
])
except ConfigParser.Error, e:
raise PipelineConfigParseError("could not parse config file. %s" % e)

# Do no further checking or processing at this stage: just keep raw dictionaries for the time being
return PipelineConfig(pipeline_config, local_config_data, raw_module_options, module_order,
filename=filename, variant=variant, available_variants=list(sorted(available_variants)))
Expand Down
80 changes: 63 additions & 17 deletions src/python/pimlico/core/modules/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
is loaded.
"""
from importlib import import_module
import os
from importlib import import_module
from types import FunctionType

from pimlico.core.config import PipelineStructureError
Expand All @@ -33,21 +33,47 @@ class BaseModuleInfo(object):
module_type_name = NotImplemented
module_options = []
module_inputs = []
# Specifies a list of (name, datatype class) pairs
# Specifies a list of (name, datatype class) pairs for outputs that are always written
module_outputs = []
# Specifies a list of (name, datatype class) pairs for outputs that are written only if they're specified
# in the "output" option or used by another module
module_optional_outputs = []
# Whether the module should be executed
# Typically True for almost all modules, except input modules (though some of them may also require execution) and
# filters
module_executable = True
# If specified, this ModuleExecutor class will be used instead of looking one up in the exec Python module
module_executor_override = None

def __init__(self, module_name, pipeline, inputs={}, options={}):
def __init__(self, module_name, pipeline, inputs={}, options={}, optional_outputs=[]):
self.inputs = inputs
self.options = options
self.module_name = module_name
self.pipeline = pipeline

self.default_output_name = (self.module_outputs+self.module_optional_outputs)[0][0]

# Work out what outputs this module will make available
if len(self.module_outputs + self.module_optional_outputs) == 0:
# Need at least one output
if len(self.module_optional_outputs):
raise PipelineStructureError(
"module %s has no outputs. Select at least one optional output from [%s] using the 'output' option"
% (self.module_name, ", ".join(name for name, dt in self.module_optional_outputs))
)
else:
raise PipelineStructureError("module %s defines no outputs" % self.module_name)
# The basic outputs are always available
self.available_outputs = list(self.module_outputs)
# Others may be requested in the config, given to us in optional_outputs
# Also include those that are used as inputs to other modules
used_output_names = self.pipeline.used_outputs.get(self.module_name, [])
# Replace None with the default output name (which could be an optional output if no non-optional are defined)
used_output_names = set([name if name is not None else self.default_output_name for name in used_output_names])
# Include all of these outputs in the final output list
self.available_outputs.extend((name, dt) for (name, dt) in self.module_optional_outputs
if name in set(optional_outputs)|used_output_names)

self._metadata = None

def __repr__(self):
Expand Down Expand Up @@ -97,7 +123,7 @@ def input_names(self):

@property
def output_names(self):
return [name for name, __ in self.module_outputs]
return [name for name, __ in self.available_outputs]

@classmethod
def process_module_options(cls, opt_dict):
Expand Down Expand Up @@ -167,6 +193,19 @@ def extract_input_options(cls, opt_dict, module_name=None):

return inputs

@staticmethod
def get_extra_outputs_from_options(options):
    """
    Return a list of additional optional-output names to produce, decided from the module options.

    Normally, which optional outputs get produced by a module depends on the 'output' option given in the
    config file, plus any outputs that get used by subsequent modules. By overriding this method, module
    types can add extra outputs into the list of those to be included, conditional on other options.
    E.g. the corenlp module includes the 'annotations' output if annotators are specified, so that the
    user doesn't need to give both options.

    :param options: dict of already-processed module options
    :return: list of extra output names to include; the base implementation adds none
    """
    return []

@classmethod
def process_config(cls, config_dict, module_name=None):
"""
Expand All @@ -176,11 +215,18 @@ def process_config(cls, config_dict, module_name=None):
options = dict(config_dict)
# Remove the "type" option if it's still in there
options.pop("type", None)
# Pull out the output option if it's there, to specify optional outputs
output_opt = options.pop("output", "")
outputs = output_opt.split(",") if output_opt else []
# Pull out the input options and match them up with inputs
inputs = cls.extract_input_options(options, module_name=module_name)
# Process the rest of the values as module options
options = cls.process_module_options(options)
return inputs, options

# Get additional outputs to be included on the basis of the options, according to module type's own logic
outputs = set(outputs) | set(cls.get_extra_outputs_from_options(options))

return inputs, outputs, options

def get_module_output_dir(self):
return os.path.join(self.pipeline.short_term_store, self.module_name)
Expand All @@ -189,19 +235,18 @@ def get_output_dir(self, output_name):
return os.path.join(self.get_module_output_dir(), output_name)

def get_output_datatype(self, output_name=None):
if len(self.module_outputs) == 0:
raise PipelineStructureError("%s module has no outputs" % self.module_type_name)
elif output_name is None:
if output_name is None:
# Get the default output
# Often there'll be only one output, so a name needn't be specified
# If there are multiple, the first is the default
output_name, datatype = self.module_outputs[0]
else:
outputs = dict(self.module_outputs)
if output_name not in outputs:
raise PipelineStructureError("%s module does not have an output named '%s'. Available outputs: %s" %
(self.module_type_name, output_name, ", ".join(outputs.keys())))
datatype = outputs[output_name]
output_name = self.default_output_name

outputs = dict(self.available_outputs)
if output_name not in outputs:
raise PipelineStructureError("%s module does not have an output named '%s'. Available outputs: %s" %
(self.module_type_name, output_name, ", ".join(self.output_names)))
datatype = outputs[output_name]

# The datatype might be a dynamic type -- a function that we call to get the type
if type(datatype) is FunctionType:
# Call the function to build the datatype
Expand Down Expand Up @@ -346,10 +391,11 @@ class BaseModuleExecutor(object):
do the work of executing the module on given inputs, writing to given output locations.
"""
def __init__(self, log):
def __init__(self, log, module_instance_info):
self.log = log
self.info = module_instance_info

def execute(self, module_instance_info):
def execute(self):
raise NotImplementedError


Expand Down
2 changes: 1 addition & 1 deletion src/python/pimlico/core/modules/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def execute_module(pipeline, module_name, force_rerun=False, debug=False):
executer = load_module_executor(module)
# Give the module an initial in-progress status
module.status = "STARTED"
executer(log).execute(module)
executer(log, module).execute()

# Update the module status so we know it's been completed
module.status = "COMPLETE"
Expand Down
4 changes: 2 additions & 2 deletions src/python/pimlico/core/modules/inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ def instantiate_output_datatype(self, output_name, output_datatype):
if datatype.requires_data_preparation:
# This module needs to be executed
class DataPreparationExecutor(BaseModuleExecutor):
def execute(self, module_instance_info):
def execute(self):
# Get the datatype instance
datatype_instance = module_instance_info.get_output("data")
datatype_instance = self.info.get_output("data")
# Run the special data preparation method
datatype_instance.prepare_data(self.log)

Expand Down

0 comments on commit 2492d27

Please sign in to comment.