Merge pull request #83 from roocs/director-fixes
Updated director and tests
agstephens committed Dec 19, 2020
2 parents e6d17ba + c74f44f commit f402f51
Showing 21 changed files with 397 additions and 133 deletions.
2 changes: 1 addition & 1 deletion rook/director/__init__.py
@@ -1 +1 @@
from .director import Director
from .director import Director, wrap_director
36 changes: 36 additions & 0 deletions rook/director/director.py
@@ -2,15 +2,28 @@
from pywps.app.exceptions import ProcessError

from daops.utils import is_characterised, fixer
from daops.utils.normalise import ResultSet

# from roocs_utils.project_utils import get_project_name
from roocs_utils.exceptions import InvalidParameterValue

from .inventory import Inventory
from .alignment import SubsetAlignmentChecker
from ..utils.input_utils import clean_inputs


def wrap_director(collection, inputs, runner):
# Ask director whether request should be rejected, use original files or apply WPS process
try:
director = Director(collection, inputs)
director.process(runner)
return director
except Exception as e:
raise ProcessError(f"{e}")


class Director:

def __init__(self, coll, inputs):
self.coll = coll
self.inputs = inputs
@@ -122,3 +135,26 @@ def request_aligns_with_files(self):
self.original_file_urls = required_files

return True

def process(self, runner):
# Either package up the original files (URLs) or
# run the process to generate the outputs

# If original files should be returned, then add the files
if self.use_original_files:
result = ResultSet()

for ds_id, file_urls in self.original_file_urls.items():
result.add(ds_id, file_urls)

file_uris = result.file_uris

# else: run the process to generate the new files
else:
clean_inputs(self.inputs)
try:
file_uris = runner(self.inputs)
except Exception as e:
raise ProcessError(f"{e}")

self.output_uris = file_uris
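
Taken together, a WPS handler now drives the director through `wrap_director` instead of instantiating `Director` directly. A minimal usage sketch based on the call sites later in this commit (the collection and inputs below are illustrative placeholders, not real request values):

    from rook.director import wrap_director
    from rook.utils.subset_utils import run_subset

    # Illustrative values; real handlers build these from parsed WPS inputs.
    collection = ["some.dataset.id"]  # placeholder dataset id
    inputs = {
        "collection": collection,
        "time": "2020-01-01/2020-12-31",  # placeholder time range
        "original_files": False,
    }

    # wrap_director converts any failure into a pywps ProcessError; on
    # success the director exposes either the generated outputs or the
    # original file URLs via output_uris.
    director = wrap_director(collection, inputs, run_subset)
    print(director.use_original_files, director.output_uris)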
104 changes: 61 additions & 43 deletions rook/operator.py
@@ -1,22 +1,15 @@
import os
import tempfile


def _resolve_collection_if_files(outputs):
# If multiple outputs are files with a common directory name, then
# return that as a single output

if len(outputs) > 1:
first_dir = os.path.dirname(outputs[0])

if all([os.path.isfile(output) for output in outputs]):
if os.path.dirname(os.path.commonprefix(outputs)) == first_dir:
return first_dir

return outputs[0]
from rook.utils.subset_utils import run_subset
from rook.utils.average_utils import run_average
from rook.director import wrap_director


class Operator(object):
# Sub-classes require "prefix" property
prefix = NotImplemented

def __init__(self, output_dir, apply_fixes=True):
self.config = {
"output_dir": output_dir,
@@ -26,41 +19,66 @@ def __init__(self, output_dir, apply_fixes=True):
# 'filenamer': dconfig.filenamer,
}

def call(self, args, collection):
raise NotImplementedError("implemented in subclass")
def call(self, args):
args.update(self.config)
args['output_dir'] = self._get_output_dir()
collection = args['collection']

# Quick hack to find out if collection is a list of files
runner = self._get_runner()

if os.path.isfile(collection[0]):
output_uris = runner(args)
else:
# Setting "original_files" to False, to force use of WPS in a workflow
args['original_files'] = False
director = wrap_director(collection, args, runner)
output_uris = director.output_uris

# NOTE (for `Operator.call()`): output_uris might be file paths OR URLs.
# If they are URLs, any subsequent Operators will need to download them.
# How will we do that? In daops.utils.consolidate:
# - run a single: `collection = consolidate_collection(collection)`
# - it would group a sequence of items into:
#   1. dataset ids (from individual ids and/or id patterns)
#   2. URLs to individual NC files:
#      - analyse the URLs and compare paths and file names
#      - if the path and the relevant parts of the file name are
#        the same, group by inferred dataset in separate directories
#      - implement by: (1) stripping the last component of the file
#        paths, (2) creating a collection object to group them, and
#        (3) downloading them into directories related to the collection
#   3. directories
#   4. file paths:
#      - group by common directory, so that xarray will attempt
#        to aggregate them
# - then call the existing consolidate code that loops through each _dset_
#   (a hypothetical sketch of the URL grouping follows this file's diff)
return output_uris

def _get_runner(self):
raise NotImplementedError("implemented in subclass")

def _get_output_dir(self):
return tempfile.mkdtemp(dir=self.config["output_dir"], prefix=f"{self.prefix}_")


class Subset(Operator):
def call(self, args):
# Convert file list to directory if required
collection = _resolve_collection_if_files(args.get("collection"))

# TODO: handle lazy load of daops
from daops.ops.subset import subset

# from .tweaks import subset
kwargs = dict(
collection=collection,
time=args.get("time"),
level=args.get("level"),
area=args.get("area"),
apply_fixes=args.get("apply_fixes"),
)
kwargs.update(self.config)
kwargs["output_dir"] = tempfile.mkdtemp(
dir=self.config["output_dir"], prefix="subset_"
)
result = subset(
**kwargs,
)
return result.file_uris
prefix = 'subset'

def _get_runner(self):
return run_subset


class Average(Operator):
def call(self, args):
return args["collection"]
prefix = 'average'

def _get_runner(self):
return run_average

class Diff(Operator):
def call(self, args):
return args["collection_a"]

Diff = Average
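
The `consolidate_collection` step described in the NOTE above does not exist yet in daops.utils.consolidate; the name and behaviour below are assumptions sketching only the URL-grouping branch (item 2 in the NOTE), not a committed API:

    import os
    from collections import defaultdict

    def consolidate_collection(collection):
        # Hypothetical: group URLs to individual NC files by their inferred
        # dataset (everything up to the last path component), so that each
        # group can later be downloaded into its own directory.
        groups = defaultdict(list)
        for url in collection:
            groups[os.path.dirname(url)].append(url)
        return dict(groups)

Dataset ids, directories and local file paths would pass through the other branches that the NOTE outlines.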
28 changes: 19 additions & 9 deletions rook/processes/wps_orchestrate.py
@@ -4,6 +4,8 @@
from pywps.app.Common import Metadata
from pywps.inout.outputs import MetaLink4, MetaFile

from ..utils.metalink_utils import build_metalink

from rook import workflow

import logging
@@ -71,18 +73,26 @@ def _handler(self, request, response):
# workaround for CDATA issue in pywps
# wfdata = wfdata.replace("<![CDATA[", "").replace("]]>", "")
wf = workflow.WorkflowRunner(output_dir=self.workdir)
output = wf.run(wfdata)
file_uris = wf.run(wfdata)
except Exception as e:
raise ProcessError(f"{e}")
# metalink document with collection of netcdf files
ml4 = MetaLink4(
"workflow-result", "Workflow result as NetCDF files.", workdir=self.workdir
)
for ncfile in output:
mf = MetaFile("NetCDF file", "NetCDF file", fmt=FORMATS.NETCDF)
mf.file = ncfile
ml4.append(mf)

# Metalink document with collection of netcdf files
# ml4 = MetaLink4(
# "workflow-result", "Workflow result as NetCDF files.", workdir=self.workdir
# )

ml4 = build_metalink("workflow-result", "Workflow result as NetCDF files.",
self.workdir, file_uris,
as_urls=False)

# for ncfile in output:
# mf = MetaFile("NetCDF file", "NetCDF file", fmt=FORMATS.NETCDF)
# mf.file = ncfile
# ml4.append(mf)

response.outputs["output"].data = ml4.xml
response.outputs["prov"].file = wf.provenance.write_json()
response.outputs["prov_plot"].file = wf.provenance.write_png()

return response
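
The new `build_metalink` helper in rook/utils/metalink_utils.py is not included in this diff. Judging from the commented-out code above and the `as_urls` flag passed in wps_subset.py below, it plausibly reduces to the following sketch (not the committed implementation):

    from pywps import FORMATS
    from pywps.inout.outputs import MetaLink4, MetaFile

    def build_metalink(identity, description, workdir, file_uris, as_urls=False):
        ml4 = MetaLink4(identity, description, workdir=workdir)
        for uri in file_uris:
            mf = MetaFile("NetCDF file", "NetCDF file", fmt=FORMATS.NETCDF)
            if as_urls:
                mf.url = uri  # original files are referenced by URL
            else:
                mf.file = uri  # generated outputs are local file paths
            ml4.append(mf)
        return ml4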
95 changes: 23 additions & 72 deletions rook/processes/wps_subset.py
@@ -6,7 +6,11 @@
from pywps.app.exceptions import ProcessError
from pywps.inout.outputs import MetaFile, MetaLink4

from ..director import Director
from ..utils.input_utils import parse_wps_input
from ..utils.subset_utils import run_subset
from ..utils.metalink_utils import build_metalink
from ..utils.response_utils import populate_response
from ..director import wrap_director
from ..provenance import Provenance

LOGGER = logging.getLogger()
@@ -122,86 +126,33 @@ def __init__(self):

def _handler(self, request, response):
# TODO: handle lazy load of daops
from daops.ops.subset import subset
from daops.utils.normalise import ResultSet
# from daops.ops.subset import subset
# from daops.utils.normalise import ResultSet

# show me the environment used by the process in debug mode
LOGGER.debug(f"Environment used in subset: {os.environ}")

# from roocs_utils.exceptions import InvalidParameterValue, MissingParameterValue
collection = [dset.data for dset in request.inputs["collection"]]
collection = parse_wps_input(request.inputs, 'collection', as_sequence=True,
must_exist=True)

config_args = {
"output_dir": self.workdir,
"apply_fixes": request.inputs["apply_fixes"][0].data,
"pre_checked": request.inputs["pre_checked"][0].data,
"original_files": request.inputs["original_files"][0].data
# 'chunk_rules': dconfig.chunk_rules,
# 'filenamer': dconfig.filenamer,
}

subset_args = {
inputs = {
"collection": collection,
"output_dir": self.workdir,
"apply_fixes": parse_wps_input(request.inputs, 'apply_fixes', default=False),
"pre_checked": parse_wps_input(request.inputs, 'pre_checked', default=False),
"original_files": parse_wps_input(request.inputs, 'original_files', default=False),
"time": parse_wps_input(request.inputs, 'time', default=None),
"level": parse_wps_input(request.inputs, 'level', default=None),
"area": parse_wps_input(request.inputs, 'area', default=None)
}
if "time" in request.inputs:
subset_args["time"] = request.inputs["time"][0].data
if "level" in request.inputs:
subset_args["level"] = request.inputs["level"][0].data
if "area" in request.inputs:
subset_args["area"] = request.inputs["area"][0].data

subset_args.update(config_args)

# Ask director whether request should be rejected, use original files or apply WPS process
try:
director = Director(collection, subset_args)
except Exception:
# raise ProcessError(f"{e}")
pass

# If original files should be returned...
if False: # director.use_original_files:
result = ResultSet()

for ds_id, file_urls in director.original_file_urls.items():
result.add(ds_id, file_urls)

# else: generate the new subset of files
else:
del subset_args["pre_checked"]
del subset_args["original_files"]
try:
result = subset(**subset_args)
except Exception as e:
raise ProcessError(f"{e}")

# metalink document with collection of netcdf files/ urls
ml4 = MetaLink4(
"subset-result", "Subsetting result as NetCDF files.", workdir=self.workdir
)

# need to handle file URLS
for result in result.file_uris:
mf = MetaFile("NetCDF file", "NetCDF file", fmt=FORMATS.NETCDF)

if False: # director.use_original_files:
mf.url = result
else:
mf.file = result
ml4.append(mf)

response.outputs["output"].data = ml4.xml

# collect provenance
provenance = Provenance(self.workdir)
provenance.start()
urls = []

for f in ml4.files:
urls.extend(f.urls)
# Let the director manage the processing or redirection to original files
director = wrap_director(collection, inputs, run_subset)

provenance.add_operator("subset", subset_args, collection, urls)
response.outputs["prov"].file = provenance.write_json()
response.outputs["prov_plot"].file = provenance.write_png()
ml4 = build_metalink("subset-result", "Subsetting result as NetCDF files.",
self.workdir, director.output_uris,
as_urls=director.use_original_files)

populate_response(response, 'subset', self.workdir, inputs, collection, ml4)
return response
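
`parse_wps_input` in rook/utils/input_utils.py is also not part of this diff; from its call sites above it presumably generalises the repeated `request.inputs[key][0].data` pattern, roughly like this sketch (the exact signature is an assumption):

    from pywps.app.exceptions import ProcessError

    def parse_wps_input(inputs, key, as_sequence=False, must_exist=False, default=None):
        # Missing or empty inputs either raise (must_exist) or fall back
        # to the supplied default.
        if key not in inputs or not inputs[key]:
            if must_exist:
                raise ProcessError(f"Required input not provided: {key}")
            return default
        if as_sequence:
            return [item.data for item in inputs[key]]
        return inputs[key][0].data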
Empty file added rook/utils/__init__.py
Empty file.
17 changes: 17 additions & 0 deletions rook/utils/average_utils.py
@@ -0,0 +1,17 @@
from copy import deepcopy

from .input_utils import resolve_collection_if_files


def run_average(args):
# Convert file list to directory if required
kwargs = deepcopy(args)
original_collection = args.get("collection")
kwargs['collection'] = resolve_collection_if_files(original_collection)

# TODO: handle lazy load of daops
# from daops.ops.average import average

# result = average(**kwargs)
# return result.file_uris
return original_collection
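
`resolve_collection_if_files`, imported above from rook/utils/input_utils.py, is not shown in this diff either; judging by the `_resolve_collection_if_files` function removed from rook/operator.py, it behaves roughly like:

    import os

    def resolve_collection_if_files(collection):
        # Mirrors the removed rook.operator._resolve_collection_if_files:
        # when every item is a local file and all of them share one
        # directory, return that directory so xarray can aggregate them.
        if len(collection) > 1:
            first_dir = os.path.dirname(collection[0])
            if all(os.path.isfile(item) for item in collection):
                if os.path.dirname(os.path.commonprefix(collection)) == first_dir:
                    return first_dir
        return collection[0]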
