Skip to content

Commit

Permalink
weighted average operator (#230)
Browse files Browse the repository at this point in the history
* added operator weighted_average

* added test for weigthed average
  • Loading branch information
cehbrecht committed Sep 26, 2023
1 parent dfb895a commit 6c6aff5
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 1 deletion.
2 changes: 2 additions & 0 deletions rook/processes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from .wps_dashboard import DashboardProcess
from .wps_average_time import AverageByTime
from .wps_average_dim import AverageByDimension
from .wps_average_weighted import WeightedAverage
from .wps_orchestrate import Orchestrate
from .wps_subset import Subset
from .wps_concat import Concat
Expand All @@ -12,6 +13,7 @@
Subset(),
AverageByTime(),
AverageByDimension(),
WeightedAverage(),
Concat(),
Orchestrate(),
]
2 changes: 1 addition & 1 deletion rook/processes/wps_average_dim.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def _handler(self, request, response):
request.inputs, "dims", as_sequence=True, default=None
),
}
print(inputs)
# print(inputs)

# Let the director manage the processing or redirection to original files
director = wrap_director(collection, inputs, run_average_by_dim)
Expand Down
100 changes: 100 additions & 0 deletions rook/processes/wps_average_weighted.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import logging
import os

from pywps import FORMATS, ComplexOutput, Format, LiteralInput, Process
from pywps.app.Common import Metadata
from pywps.app.exceptions import ProcessError
from pywps.inout.outputs import MetaFile, MetaLink4

from ..director import wrap_director
from ..utils.input_utils import parse_wps_input
from ..utils.metalink_utils import build_metalink
from ..utils.response_utils import populate_response
from ..utils.average_utils import run_weighted_average

LOGGER = logging.getLogger()


class WeightedAverage(Process):
def __init__(self):
inputs = [
LiteralInput(
"collection",
"Collection",
abstract="A dataset identifier or list of comma separated identifiers. "
"Example: c3s-cmip5.output1.ICHEC.EC-EARTH.historical.day.atmos.day.r1i1p1.tas.latest",
data_type="string",
min_occurs=1,
max_occurs=1,
),
]
outputs = [
ComplexOutput(
"output",
"METALINK v4 output",
abstract="Metalink v4 document with references to NetCDF files.",
as_reference=True,
supported_formats=[FORMATS.META4],
),
ComplexOutput(
"prov",
"Provenance",
abstract="Provenance document using W3C standard.",
as_reference=True,
supported_formats=[FORMATS.JSON],
),
ComplexOutput(
"prov_plot",
"Provenance Diagram",
abstract="Provenance document as diagram.",
as_reference=True,
supported_formats=[
Format("image/png", extension=".png", encoding="base64")
],
),
]

super(WeightedAverage, self).__init__(
self._handler,
identifier="weighted_average",
title="Weighted Average",
abstract="Run weighted averaging on climate model data.",
metadata=[
Metadata("DAOPS", "https://github.com/roocs/daops"),
],
version="1.0",
inputs=inputs,
outputs=outputs,
store_supported=True,
status_supported=True,
)

def _handler(self, request, response):
# from roocs_utils.exceptions import InvalidParameterValue, MissingParameterValue
collection = parse_wps_input(
request.inputs, "collection", as_sequence=True, must_exist=True
)

inputs = {
"collection": collection,
"output_dir": self.workdir,
"apply_fixes": False,
"pre_checked": False,
"dims": ["latitude", "longitude"],
}
# print(inputs)

# Let the director manage the processing or redirection to original files
director = wrap_director(collection, inputs, run_weighted_average)

ml4 = build_metalink(
"weighted-average-result",
"Weighted averaging result as NetCDF files.",
self.workdir,
director.output_uris,
)

populate_response(
response, "weighted_average", self.workdir, inputs, collection, ml4
)
return response
7 changes: 7 additions & 0 deletions rook/utils/average_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,10 @@ def run_average_by_dim(args):

result = average_over_dims(**args)
return result.file_uris


def run_weighted_average(args):
from daops.ops.average import average_over_dims

result = average_over_dims(**args)
return result.file_uris
20 changes: 20 additions & 0 deletions tests/test_wps_average_weighted.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import pytest

from pywps import Service
from pywps.tests import assert_response_success, client_for

from rook.processes.wps_average_weighted import WeightedAverage

from .common import PYWPS_CFG, get_output


def test_wps_average_weighted_cmip6():
# test the case where the inventory is used
client = client_for(Service(processes=[WeightedAverage()], cfgfiles=[PYWPS_CFG]))
datainputs = "collection=c3s-cmip6.ScenarioMIP.INM.INM-CM5-0.ssp245.r1i1p1f1.Amon.rlds.gr1.v20190619"
resp = client.get(
f"?service=WPS&request=Execute&version=1.0.0&identifier=weighted_average&datainputs={datainputs}"
)
print(resp)
assert_response_success(resp)
assert "output" in get_output(resp.xml)
1 change: 1 addition & 0 deletions tests/test_wps_caps.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ def test_wps_caps():
"orchestrate",
"subset",
"usage",
"weighted_average",
]

0 comments on commit 6c6aff5

Please sign in to comment.