Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for specifying output locations in pipeline build requests #130

Merged
merged 3 commits into from
Aug 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 96 additions & 9 deletions eta/core/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class PipelineBuildRequestConfig(Config):
def __init__(self, d):
self.pipeline = self.parse_string(d, "pipeline")
self.inputs = self.parse_dict(d, "inputs", default={})
self.outputs = self.parse_dict(d, "outputs", default={})
self.parameters = self.parse_dict(d, "parameters", default={})


Expand All @@ -58,15 +59,18 @@ class PipelineBuildRequest(Configurable):
A pipeline build request is valid if all of the following are true:
- a pipeline of the specified name exists
- all required pipeline inputs are provided and have valid values
- any output paths specified must refer to valid pipeline output names
and the specified paths must be valid paths for each output type
- all required pipeline parameters are provided and have valid values

Note that input/parameter fields set to `None` are ignored (and thus must
be optional in order for the build request to be valid).
Note that any fields set to `None` are ignored, so any inputs/parameters
that are `None` must be optional in order for the request to be valid.

Attributes:
pipeline: the (name of the) pipeline to run
pipeline: the pipeline to run
metadata: the PipelineMetadata instance for the pipeline
inputs: a dictionary mapping input names to input paths
outputs: a dictionary mapping output names to output paths (if any)
parameters: a dictionary mapping <module>.<parameter> names to
parameter values
'''
Expand All @@ -85,9 +89,11 @@ def __init__(self, config):
self.pipeline = config.pipeline
self.metadata = etap.load_metadata(config.pipeline)
self.inputs = etau.remove_none_values(config.inputs)
self.outputs = etau.remove_none_values(config.outputs)
self.parameters = etau.remove_none_values(config.parameters)

self._validate_inputs()
self._validate_outputs()
self._validate_parameters()

def _validate_inputs(self):
Expand All @@ -110,6 +116,20 @@ def _validate_inputs(self):
"Required input '%s' of pipeline '%s' was not "
"supplied" % (miname, self.pipeline))

def _validate_outputs(self):
# Validate outputs
for oname, opath in iteritems(self.outputs):
if not self.metadata.has_output(oname):
raise PipelineBuildRequestError(
"Pipeline '%s' has no output '%s'" % (
self.pipeline, oname))
if not self.metadata.is_valid_output(oname, opath):
raise PipelineBuildRequestError(
"'%s' is not a valid value for output '%s' of pipeline "
"'%s'" % (opath, oname, self.pipeline))
# Convert to absolute paths
self.outputs[oname] = os.path.abspath(opath)

def _validate_parameters(self):
# Validate parameters
for pname, pval in iteritems(self.parameters):
Expand Down Expand Up @@ -154,8 +174,14 @@ class PipelineBuilder(object):
will be generated when the pipeline is run
pipeline_logfile_path: the path to the pipeline logfile that will be
generated when the pipeline is run
outputs: a dictionary mapping pipeline outputs to the paths where they
will be written when the pipeline is run
pipeline_outputs: a dictionary mapping pipeline outputs to the paths
where they will be written when the pipeline is run. These
paths are all within `output_dir`, and they are populated for all
pipeline outputs regardless of whether the output was included in
the `outputs` dictionary of the PipelineBuildRequest
outputs: the outputs dictionary from the PipelineBuildRequest, which
specifies where to publish certain pipeline outputs after the
pipeline is run
'''

def __init__(self, request):
Expand All @@ -168,6 +194,10 @@ def __init__(self, request):
self._concrete_data_params = etat.ConcreteDataParams()
self.reset()

@property
def outputs(self):
return self.request.outputs

def reset(self):
'''Resets the builder so that another pipeline can be built.'''
self.timestamp = None
Expand All @@ -176,7 +206,7 @@ def reset(self):
self.pipeline_config_path = None
self.pipeline_logfile_path = None
self.pipeline_status_path = None
self.outputs = {}
self.pipeline_outputs = {}

def build(self):
'''Builds the pipeline and writes the associated config files.'''
Expand All @@ -203,11 +233,62 @@ def build(self):
self.pipeline_logfile_path = self._make_pipeline_logfile_path()
if not self.pipeline_status_path:
self.pipeline_status_path = self._make_pipeline_status_path()
self.outputs = {}
self.pipeline_outputs = {}

self._build_pipeline_config()
self._build_module_configs()

def run(self):
'''Runs the built pipeline and publishes any outputs to their
specified locations.

Raises:
PipelineBuilderError: if the pipeline hasn't been built
'''
if not self.pipeline_config_path:
raise PipelineBuilderError(
"You must build the pipeline before running it")

# Run pipeline
etap.run(self.pipeline_config_path)

# Publish outputs
for oname, opath in iteritems(self.outputs):
ppath = self.pipeline_outputs[oname]
if os.path.isfile(ppath):
# Output is a file, so copy it
etau.copy_file(ppath, opath)
elif os.path.isdir(ppath):
# Output is a directory, so copy it
etau.copy_dir(ppath, opath)
else:
# Output must be a sequence, so copy the base directory
etau.copy_dir(os.path.dirname(ppath), os.path.dirname(opath))

def cleanup(self):
'''Cleans up the configs and output files generated when the pipeline
was run, if necessary. Published outputs are NOT deleted.

Raises:
PipelineBuilderError: if the pipeline hasn't been built
'''
if not self.config_dir or not self.output_dir:
raise PipelineBuilderError(
"You must build and run the pipeline before you can clean "
"it up")

try:
etau.delete_dir(self.config_dir)
logger.info("Deleted config directory '%s'", self.config_dir)
except OSError:
pass

try:
etau.delete_dir(self.output_dir)
logger.info("Deleted output directory '%s'", self.output_dir)
except OSError:
pass

def _build_pipeline_config(self):
# Build job configs
# @todo handle non-py executables
Expand Down Expand Up @@ -269,7 +350,7 @@ def _build_module_configs(self):
for osink in osinks:
if osink.is_pipeline_output:
# Set pipeline output
self.outputs[osink.node] = opath
self.pipeline_outputs[osink.node] = opath
else:
# Pass output to connected module input
module_inputs[osink.module][osink.node] = opath
Expand Down Expand Up @@ -326,6 +407,13 @@ def _get_data_path(self, module, node):
return node.type.gen_path(basedir, params)


class PipelineBuilderError(Exception):
'''Exception raised when an invalid action is taken with a
PipelineBuilder.
'''
pass


def _get_param_value(param, request):
'''Gets the value for the parameter.

Expand All @@ -347,4 +435,3 @@ def _get_param_value(param, request):
val = param.default_value

return val

13 changes: 5 additions & 8 deletions eta/core/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,14 @@ def run(args):
builder.build()

if args.run_now or args.dry_run:
# Run pipeline now
RunPipeline.run(
argparse.Namespace(config=builder.pipeline_config_path))
# Run pipeline
logger.info("Running pipeline '%s'", request.pipeline)
builder.run()

if args.dry_run:
# Cleanup pipeline files
logger.info("***** Dry run complete *****")
logger.info("Deleting config directory '%s'", builder.config_dir)
etau.delete_dir(builder.config_dir)
logger.info("Deleting output directory '%s'", builder.output_dir)
etau.delete_dir(builder.output_dir)
logger.info("Dry run complete; cleaning up")
builder.cleanup()


class GenerateMetadata(Command):
Expand Down
7 changes: 6 additions & 1 deletion eta/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,15 @@ def copy_dir(indir, outdir):
'''
if os.path.isdir(outdir):
shutil.rmtree(outdir)

shutil.copytree(indir, outdir)


def delete_file(path):
'''Deletes the file at the given path and recursively deletes any empty
directories from the resulting directory tree.

Raises:
OSError: if the file did not exist
'''
os.remove(path)
try:
Expand All @@ -298,6 +300,9 @@ def delete_file(path):
def delete_dir(dir_):
'''Deletes the given directory and recursively deletes any empty
directories from the resulting directory tree.

Raises:
OSError: if the directory did not exist
'''
dir_ = os.path.normpath(dir_)
shutil.rmtree(dir_)
Expand Down