Skip to content

Commit

Permalink
Add regrid operator (#232)
Browse files Browse the repository at this point in the history
* added dummy regrid operator

* added regrid operator for workflow

* added wps regrid input parameters method and grid

* update regrid params to provenance
  • Loading branch information
cehbrecht committed Sep 28, 2023
1 parent aeb6b1e commit f51d90e
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 0 deletions.
8 changes: 8 additions & 0 deletions rook/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
)
from rook.utils.subset_utils import run_subset
from rook.utils.concat_utils import run_concat
from rook.utils.regrid_utils import run_regrid
from roocs_utils.utils.file_utils import is_file_list, FileMapper


Expand Down Expand Up @@ -89,6 +90,13 @@ def _get_runner(self):
return run_weighted_average


class Regrid(Operator):
prefix = "regrid"

def _get_runner(self):
return run_regrid


class Concat(Operator):
prefix = "concat"

Expand Down
2 changes: 2 additions & 0 deletions rook/processes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .wps_orchestrate import Orchestrate
from .wps_subset import Subset
from .wps_concat import Concat
from .wps_regrid import Regrid

processes = [
Usage(),
Expand All @@ -15,5 +16,6 @@
AverageByDimension(),
WeightedAverage(),
Concat(),
Regrid(),
Orchestrate(),
]
118 changes: 118 additions & 0 deletions rook/processes/wps_regrid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import logging

from pywps import FORMATS, ComplexOutput, Format, LiteralInput, Process
from pywps.app.Common import Metadata
from pywps.app.exceptions import ProcessError
from pywps.inout.outputs import MetaFile, MetaLink4

from ..director import wrap_director
from ..utils.input_utils import parse_wps_input
from ..utils.metalink_utils import build_metalink
from ..utils.response_utils import populate_response
from ..utils.regrid_utils import run_regrid

LOGGER = logging.getLogger()


class Regrid(Process):
def __init__(self):
inputs = [
LiteralInput(
"collection",
"Collection",
abstract="A dataset identifier or list of comma separated identifiers. "
"Example: c3s-cmip5.output1.ICHEC.EC-EARTH.historical.day.atmos.day.r1i1p1.tas.latest",
data_type="string",
min_occurs=1,
max_occurs=1,
),
LiteralInput(
"method",
"Regrid method",
abstract="Regrid method like consevative or bilinear. Default: nearest_s2d",
data_type="string",
min_occurs=0,
max_occurs=1,
allowed_values=["conservative", "patch", "nearest_s2d", "bilinear"],
default="nearest_s2d",
),
LiteralInput(
"grid",
"Regrid target grid",
abstract="Regrid target grid like 1deg. Default: 1deg",
data_type="string",
min_occurs=0,
max_occurs=1,
allowed_values=["1deg", "2deg_lsm", "0pt25deg_era5_lsm"],
default="1deg",
),
]
outputs = [
ComplexOutput(
"output",
"METALINK v4 output",
abstract="Metalink v4 document with references to NetCDF files.",
as_reference=True,
supported_formats=[FORMATS.META4],
),
ComplexOutput(
"prov",
"Provenance",
abstract="Provenance document using W3C standard.",
as_reference=True,
supported_formats=[FORMATS.JSON],
),
ComplexOutput(
"prov_plot",
"Provenance Diagram",
abstract="Provenance document as diagram.",
as_reference=True,
supported_formats=[
Format("image/png", extension=".png", encoding="base64")
],
),
]

super(Regrid, self).__init__(
self._handler,
identifier="regrid",
title="Regrid",
abstract="Regridding operator for climate model data.",
metadata=[
Metadata("DAOPS", "https://github.com/roocs/daops"),
],
version="1.0",
inputs=inputs,
outputs=outputs,
store_supported=True,
status_supported=True,
)

def _handler(self, request, response):
# from roocs_utils.exceptions import InvalidParameterValue, MissingParameterValue
collection = parse_wps_input(
request.inputs, "collection", as_sequence=True, must_exist=True
)

inputs = {
"collection": collection,
"output_dir": self.workdir,
"apply_fixes": False,
"pre_checked": False,
"method": parse_wps_input(request.inputs, "method", default="nearest_s2d"),
"grid": parse_wps_input(request.inputs, "grid", default="1deg"),
}
# print(inputs)

# Let the director manage the processing or redirection to original files
director = wrap_director(collection, inputs, run_regrid)

ml4 = build_metalink(
"regrid-result",
"regrid result as NetCDF files.",
self.workdir,
director.output_uris,
)

populate_response(response, "regrid", self.workdir, inputs, collection, ml4)
return response
2 changes: 2 additions & 0 deletions rook/provenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ def add_operator(self, operator, parameters, collection, output):
"level",
"dims",
"freq",
"method",
"grid",
"apply_fixes",
"apply_average",
]:
Expand Down
8 changes: 8 additions & 0 deletions rook/utils/regrid_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
def run_regrid(args):
from daops.ops.average import average_over_dims

args["apply_fixes"] = False
args["dims"] = ["latitude", "longitude"]

result = average_over_dims(**args)
return result.file_uris
6 changes: 6 additions & 0 deletions rook/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
AverageByDimension,
WeightedAverage,
Subset,
Regrid,
Concat,
)
from .provenance import Provenance
Expand Down Expand Up @@ -86,6 +87,7 @@ def __init__(self, output_dir):
self.average_time_op = AverageByTime(output_dir)
self.average_dim_op = AverageByDimension(output_dir)
self.weighted_average_op = WeightedAverage(output_dir)
self.regrid_op = Regrid(output_dir)
self.prov = Provenance(output_dir)

def validate(self, wfdoc):
Expand Down Expand Up @@ -154,6 +156,10 @@ def _run_step(self, step_id, step, inputs=None):
collection = step["in"]["collection"]
result = self.weighted_average_op.call(step["in"])
self.prov.add_operator(step_id, step["in"], collection, result)
elif "regrid" == step["run"]:
collection = step["in"]["collection"]
result = self.regrid_op.call(step["in"])
self.prov.add_operator(step_id, step["in"], collection, result)
elif "concat" == step["run"]:
collection = step["in"]["collection"]
result = self.concat_op.call(step["in"])
Expand Down
1 change: 1 addition & 0 deletions tests/test_wps_caps.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def test_wps_caps():
"concat",
"dashboard",
"orchestrate",
"regrid",
"subset",
"usage",
"weighted_average",
Expand Down
22 changes: 22 additions & 0 deletions tests/test_wps_regrid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import pytest

from pywps import Service
from pywps.tests import assert_response_success, client_for

from rook.processes.wps_regrid import Regrid

from .common import PYWPS_CFG, get_output


def test_wps_regrid_cmip6():
# test the case where the inventory is used
client = client_for(Service(processes=[Regrid()], cfgfiles=[PYWPS_CFG]))
datainputs = "collection=c3s-cmip6.ScenarioMIP.INM.INM-CM5-0.ssp245.r1i1p1f1.Amon.rlds.gr1.v20190619"
datainputs += ";method=nearest_s2d"
datainputs += ";grid=1deg"
resp = client.get(
f"?service=WPS&request=Execute&version=1.0.0&identifier=regrid&datainputs={datainputs}"
)
print(resp)
assert_response_success(resp)
assert "output" in get_output(resp.xml)

0 comments on commit f51d90e

Please sign in to comment.