Update average parameters (#191)
* using aggregation type

* update average tests

* using latest daops

* use average_time

* added average_dim process

* added test for average_dim process

* added average test with lat/lon

* added workflow test for average latlon

* added orchestrate test for average latlon

* use dims parameter in wps process

* update smoke tests for average operator
cehbrecht committed Mar 17, 2022
1 parent 56c8b1a commit 0ab2d71
Showing 22 changed files with 406 additions and 149 deletions.
2 changes: 1 addition & 1 deletion environment.yml
@@ -17,7 +17,7 @@ dependencies:
 - netcdf4>=1.4
 - bottleneck>=1.3.1,<1.4
 #- daops>=0.7.0,<0.8
-- clisops>=0.8.0,<0.9
+#- clisops>=0.8.0,<0.9
 - roocs-utils>=0.5.0,<0.6
 # workflow
 - networkx
2 changes: 1 addition & 1 deletion requirements.txt
@@ -5,7 +5,7 @@ psutil
 # daops
 # daops>=0.7.0,<0.8
 daops @ git+https://github.com/roocs/daops@master#egg=daops
-clisops>=0.8.0,<0.9
+#clisops>=0.8.0,<0.9
 roocs-utils>=0.5.0,<0.6
 xarray>=0.20
 dask[complete]
20 changes: 15 additions & 5 deletions rook/operator.py
@@ -4,7 +4,10 @@

 from rook.director import wrap_director
 from rook.utils.input_utils import resolve_to_file_paths
-from rook.utils.average_utils import run_average
+from rook.utils.average_utils import (
+    run_average_by_time,
+    run_average_by_dim,
+)
 from rook.utils.subset_utils import run_subset
 from roocs_utils.utils.file_utils import is_file_list, FileMapper

@@ -58,11 +61,18 @@ def _get_runner(self):
         return run_subset


-class Average(Operator):
-    prefix = "average"
+class AverageByTime(Operator):
+    prefix = "average_time"

     def _get_runner(self):
-        return run_average
+        return run_average_by_time


-Diff = Average
+class AverageByDimension(Operator):
+    prefix = "average_dim"
+
+    def _get_runner(self):
+        return run_average_by_dim
+
+
+Diff = AverageByTime
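
The new operator classes are thin wrappers that map a workflow step's inputs onto the matching runner function. A minimal sketch of calling them directly, following the call pattern used in rook/workflow.py further down (the dataset identifier and output directory are illustrative, not taken from this commit):

from rook.operator import AverageByTime, AverageByDimension

# Operator instances are constructed with an output directory and invoked
# with the step's "in" dict, as BaseWorkflow/_run_step does below.
avg_time = AverageByTime("/tmp/rook-output")
result = avg_time.call({
    "collection": ["c3s-cmip5.output1.ICHEC.EC-EARTH.historical.day.atmos.day.r1i1p1.tas.latest"],
    "freq": "year",
})

avg_dim = AverageByDimension("/tmp/rook-output")
result = avg_dim.call({
    "collection": ["c3s-cmip5.output1.ICHEC.EC-EARTH.historical.day.atmos.day.r1i1p1.tas.latest"],
    "dims": ["latitude", "longitude"],
})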
6 changes: 4 additions & 2 deletions rook/processes/__init__.py
@@ -1,13 +1,15 @@
 from .wps_usage import Usage
 from .wps_dashboard import DashboardProcess
-from .wps_average import Average
+from .wps_average_time import AverageByTime
+from .wps_average_dim import AverageByDimension
 from .wps_orchestrate import Orchestrate
 from .wps_subset import Subset

 processes = [
     Usage(),
     DashboardProcess(),
     Subset(),
-    Average(),
+    AverageByTime(),
+    AverageByDimension(),
     Orchestrate(),
 ]
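
The processes list is what rook hands to PyWPS, so the two new classes become available as WPS operations as soon as they are registered here. Roughly (the configuration file path is an assumption):

from pywps import Service
from rook.processes import processes

# Illustrative only: expose the registered processes, including the new
# average_time and average_dim operations, through a PyWPS service.
service = Service(processes, cfgfiles=["pywps.cfg"])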
134 changes: 134 additions & 0 deletions rook/processes/wps_average_dim.py
@@ -0,0 +1,134 @@
import logging
import os

from pywps import FORMATS, ComplexOutput, Format, LiteralInput, Process
from pywps.app.Common import Metadata
from pywps.app.exceptions import ProcessError
from pywps.inout.outputs import MetaFile, MetaLink4

from ..director import wrap_director
from ..utils.input_utils import parse_wps_input
from ..utils.metalink_utils import build_metalink
from ..utils.response_utils import populate_response
from ..utils.average_utils import run_average_by_dim

LOGGER = logging.getLogger()


class AverageByDimension(Process):
    def __init__(self):
        inputs = [
            LiteralInput(
                "collection",
                "Collection",
                abstract="A dataset identifier or list of comma separated identifiers. "
                "Example: c3s-cmip5.output1.ICHEC.EC-EARTH.historical.day.atmos.day.r1i1p1.tas.latest",
                data_type="string",
                min_occurs=1,
                max_occurs=1,
            ),
            LiteralInput(
                "dims",
                "Dimensions",
                abstract="Dimensions used for aggregation. Example: level",
                allowed_values=["time", "level", "latitude", "longitude"],
                data_type="string",
                min_occurs=1,
                max_occurs=4,
            ),
            LiteralInput(
                "pre_checked",
                "Pre-Checked",
                data_type="boolean",
                abstract="Use checked data only.",
                default="0",
                min_occurs=1,
                max_occurs=1,
            ),
            LiteralInput(
                "apply_fixes",
                "Apply Fixes",
                data_type="boolean",
                abstract="Apply fixes to datasets.",
                default="1",
                min_occurs=1,
                max_occurs=1,
            ),
        ]
        outputs = [
            ComplexOutput(
                "output",
                "METALINK v4 output",
                abstract="Metalink v4 document with references to NetCDF files.",
                as_reference=True,
                supported_formats=[FORMATS.META4],
            ),
            ComplexOutput(
                "prov",
                "Provenance",
                abstract="Provenance document using W3C standard.",
                as_reference=True,
                supported_formats=[FORMATS.JSON],
            ),
            ComplexOutput(
                "prov_plot",
                "Provenance Diagram",
                abstract="Provenance document as diagram.",
                as_reference=True,
                supported_formats=[
                    Format("image/png", extension=".png", encoding="base64")
                ],
            ),
        ]

        super(AverageByDimension, self).__init__(
            self._handler,
            identifier="average_dim",
            title="Average by Dimensions",
            abstract="Run averaging by dimensions on climate model data.",
            metadata=[
                Metadata("DAOPS", "https://github.com/roocs/daops"),
            ],
            version="1.0",
            inputs=inputs,
            outputs=outputs,
            store_supported=True,
            status_supported=True,
        )

    def _handler(self, request, response):
        # show me the environment used by the process in debug mode
        LOGGER.debug(f"Environment used in average: {os.environ}")

        # from roocs_utils.exceptions import InvalidParameterValue, MissingParameterValue
        collection = parse_wps_input(
            request.inputs, "collection", as_sequence=True, must_exist=True
        )

        inputs = {
            "collection": collection,
            "output_dir": self.workdir,
            "apply_fixes": parse_wps_input(request.inputs, "apply_fixes", default=True),
            "pre_checked": parse_wps_input(
                request.inputs, "pre_checked", default=False
            ),
            "dims": parse_wps_input(
                request.inputs, "dims", as_sequence=True, default=None
            ),
        }
        print(inputs)

        # Let the director manage the processing or redirection to original files
        director = wrap_director(collection, inputs, run_average_by_dim)

        ml4 = build_metalink(
            "average-dim-result",
            "Averaging by dimension result as NetCDF files.",
            self.workdir,
            director.output_uris,
        )

        populate_response(
            response, "average_dim", self.workdir, inputs, collection, ml4
        )
        return response
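
Once deployed, the new average_dim process can be called like any other rook WPS process. A hedged sketch using the birdy WPS client (the service URL and dataset identifier are placeholders; parameter names follow the LiteralInputs defined above):

from birdy import WPSClient

wps = WPSClient("http://localhost:5000/wps")  # assumed local rook instance
resp = wps.average_dim(
    collection="c3s-cmip5.output1.ICHEC.EC-EARTH.historical.day.atmos.day.r1i1p1.tas.latest",
    dims="latitude",  # allowed: time, level, latitude, longitude
    pre_checked=False,
    apply_fixes=True,
)
print(resp.get())  # references the Metalink document with the averaged NetCDF files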
rook/processes/wps_average.py → rook/processes/wps_average_time.py
@@ -10,12 +10,12 @@
 from ..utils.input_utils import parse_wps_input
 from ..utils.metalink_utils import build_metalink
 from ..utils.response_utils import populate_response
-from ..utils.average_utils import run_average
+from ..utils.average_utils import run_average_by_time

 LOGGER = logging.getLogger()


-class Average(Process):
+class AverageByTime(Process):
     def __init__(self):
         inputs = [
             LiteralInput(
@@ -28,9 +28,10 @@ def __init__(self):
                 max_occurs=1,
             ),
             LiteralInput(
-                "dims",
-                "Dimensions",
-                abstract="Dimensions to average over. Example: time",
+                "freq",
+                "Frequency",
+                abstract="Aggregation time frequency. Example: year",
+                allowed_values=["year", "month", "day"],
                 data_type="string",
                 min_occurs=1,
                 max_occurs=1,
@@ -80,11 +81,11 @@ def __init__(self):
             ),
         ]

-        super(Average, self).__init__(
+        super(AverageByTime, self).__init__(
             self._handler,
-            identifier="average",
-            title="Average",
-            abstract="Run averaging on climate model data. Calls daops operators.",
+            identifier="average_time",
+            title="Average by Time",
+            abstract="Run averaging by time on climate model data.",
             metadata=[
                 Metadata("DAOPS", "https://github.com/roocs/daops"),
             ],
@@ -96,8 +97,6 @@ def __init__(self):
         )

     def _handler(self, request, response):
-        # TODO: handle lazy load of daops
-
         # show me the environment used by the process in debug mode
         LOGGER.debug(f"Environment used in average: {os.environ}")

@@ -109,24 +108,24 @@ def _handler(self, request, response):
         inputs = {
             "collection": collection,
             "output_dir": self.workdir,
-            "apply_fixes": parse_wps_input(
-                request.inputs, "apply_fixes", default=False
-            ),
+            "apply_fixes": parse_wps_input(request.inputs, "apply_fixes", default=True),
             "pre_checked": parse_wps_input(
                 request.inputs, "pre_checked", default=False
             ),
-            "dims": parse_wps_input(request.inputs, "dims", default=None),
+            "freq": parse_wps_input(request.inputs, "freq", default=None),
         }

         # Let the director manage the processing or redirection to original files
-        director = wrap_director(collection, inputs, run_average)
+        director = wrap_director(collection, inputs, run_average_by_time)

         ml4 = build_metalink(
-            "average-result",
-            "Averaging result as NetCDF files.",
+            "average-time-result",
+            "Averaging by time result as NetCDF files.",
             self.workdir,
             director.output_uris,
         )

-        populate_response(response, "average", self.workdir, inputs, collection, ml4)
+        populate_response(
+            response, "average_time", self.workdir, inputs, collection, ml4
+        )
         return response
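
The renamed process is now addressed by the identifier average_time and takes a freq input instead of dims. An illustrative WPS 1.0.0 KVP Execute request against a local rook instance (host and dataset identifier are placeholders) could look like:

http://localhost:5000/wps?service=WPS&version=1.0.0&request=Execute&identifier=average_time&DataInputs=collection=c3s-cmip5.output1.ICHEC.EC-EARTH.historical.day.atmos.day.r1i1p1.tas.latest;freq=year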
6 changes: 5 additions & 1 deletion rook/provenance.py
@@ -110,10 +110,14 @@ def add_operator(self, operator, parameters, collection, output):
             "area",
             "level",
             "dims",
+            "freq",
             "apply_fixes",
         ]:
             if param in parameters:
-                attributes[ROOCS[param]] = parameters[param]
+                value = parameters[param]
+                if isinstance(value, list):
+                    value = ",".join(value)
+                attributes[ROOCS[param]] = value
         op = self._execution_activity(
             identifier=ROOCS[f"{operator}_{uuid.uuid4()}"],
             label=operator,
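
The added branch flattens list-valued parameters (such as dims) into a comma-separated string before recording them as provenance attributes, for example:

value = ["latitude", "longitude"]
if isinstance(value, list):
    value = ",".join(value)
# value is now the string "latitude,longitude"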
9 changes: 8 additions & 1 deletion rook/utils/average_utils.py
@@ -1,5 +1,12 @@
-def run_average(args):
+def run_average_by_time(args):
+    # TODO: handle lazy load of daops
     from daops.ops.average import average_time

     result = average_time(**args)
     return result.file_uris


+def run_average_by_dim(args):
+    from daops.ops.average import average_over_dims
+
+    result = average_over_dims(**args)
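
Both runners unpack the args dict assembled by the WPS handlers into the corresponding daops operation. A direct call might look like this (illustrative values; the keys mirror the inputs dict built in wps_average_dim.py and are assumed to be accepted by daops.ops.average.average_over_dims):

from rook.utils.average_utils import run_average_by_dim

file_uris = run_average_by_dim({
    "collection": ["c3s-cmip5.output1.ICHEC.EC-EARTH.historical.day.atmos.day.r1i1p1.tas.latest"],
    "dims": ["latitude", "longitude"],
    "output_dir": "/tmp/rook-output",
    "apply_fixes": True,
})
# expected to return the output file URIs, as run_average_by_time does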
18 changes: 14 additions & 4 deletions rook/workflow.py
@@ -6,7 +6,12 @@
 import yaml

 from .exceptions import WorkflowValidationError
-from .operator import Average, Diff, Subset
+from .operator import (
+    AverageByTime,
+    AverageByDimension,
+    Diff,
+    Subset,
+)
 from .provenance import Provenance

 LOGGER = logging.getLogger()
@@ -76,7 +81,8 @@ def provenance(self):
 class BaseWorkflow(object):
     def __init__(self, output_dir):
         self.subset_op = Subset(output_dir)
-        self.average_op = Average(output_dir)
+        self.average_time_op = AverageByTime(output_dir)
+        self.average_dim_op = AverageByDimension(output_dir)
         self.diff_op = Diff(output_dir)
         self.prov = Provenance(output_dir)

@@ -134,9 +140,13 @@ def _run_step(self, step_id, step, inputs=None):
             collection = step["in"]["collection"]
             result = self.subset_op.call(step["in"])
             self.prov.add_operator(step_id, step["in"], collection, result)
-        elif "average" == step["run"]:
+        elif "average_time" == step["run"]:
             collection = step["in"]["collection"]
-            result = self.average_op.call(step["in"])
+            result = self.average_time_op.call(step["in"])
             self.prov.add_operator(step_id, step["in"], collection, result)
+        elif "average_dim" == step["run"]:
+            collection = step["in"]["collection"]
+            result = self.average_dim_op.call(step["in"])
+            self.prov.add_operator(step_id, step["in"], collection, result)
         elif "diff" == step["run"]:
             result = self.diff_op.call(step["in"])
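
With these branches in place, a workflow step can name average_time or average_dim as its run target. A rough sketch of the steps section of such a workflow, written here as a Python dict (the step wiring and time format are assumptions based on existing rook workflows, not defined in this commit):

steps = {
    "subset_tas": {
        "run": "subset",
        "in": {"collection": "inputs/tas", "time": "2000-01-01/2010-12-31"},
    },
    "average_latlon": {
        "run": "average_dim",
        # dims given as a comma-separated string, matching the WPS input
        "in": {"collection": "subset_tas/output", "dims": "latitude,longitude"},
    },
}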
