Merge pull request #87 from roocs/c3s_mapping
C3s mapping
agstephens committed Jan 28, 2021
2 parents 2127a61 + f898ed4 commit dd01dab
Showing 41 changed files with 370 additions and 219 deletions.
2 changes: 1 addition & 1 deletion .cruft.json
@@ -20,4 +20,4 @@
"_template": "https://github.com/bird-house/cookiecutter-birdhouse.git"
}
}
}
}
12 changes: 6 additions & 6 deletions .pre-commit-config.yaml
@@ -32,12 +32,12 @@ repos:
# hooks:
# - id: autopep8
# args: ['--global-config=setup.cfg','--in-place']
- repo: https://github.com/timothycrosley/isort
rev: 5.6.4
hooks:
- id: isort
language_version: python3
args: ['--profile', 'black']
#- repo: https://github.com/timothycrosley/isort
# rev: 5.6.4
# hooks:
# - id: isort
# language_version: python3
# args: ['--profile', 'black']
#- repo: https://github.com/pycqa/pydocstyle
# rev: 5.0.2
# hooks:
2 changes: 2 additions & 0 deletions CHANGES.rst
@@ -6,6 +6,8 @@ Unreleased
* ``apply_fixes`` and ``original_files`` option added for WPS processes and the ``Operator`` class.
* ``director`` module added. This makes decisions on what is returned - NetCDF files or original file URLs.
* ``python-dateutil>=2.8.1`` added as a new dependency.
* Allow a no-inventory option when processing datasets.
* ``c3s-cmip6`` dataset ids must now be identified by the use of ``c3s-cmip6``.

0.2.0 (2020-11-19)
==================
1 change: 0 additions & 1 deletion LICENSE.txt
@@ -199,4 +199,3 @@ Apache License
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

2 changes: 1 addition & 1 deletion rook/__init__.py
@@ -1,10 +1,10 @@
# -*- coding: utf-8 -*-

"""Top-level package for rook."""

from .__version__ import __author__, __email__, __version__ # noqa: F401

from roocs_utils.config import get_config

import rook

CONFIG = get_config(rook)
51 changes: 41 additions & 10 deletions rook/director/alignment.py
@@ -1,11 +1,12 @@
from roocs_utils.parameter import time_parameter
import dateutil.parser as parser
import os

import dateutil.parser as parser
from roocs_utils.parameter import time_parameter


class SubsetAlignmentChecker:
def __init__(self, input_files, inputs):
self.input_files = input_files
self.input_files = sorted(input_files)
self.is_aligned = False
self.aligned_files = []

@@ -50,24 +51,54 @@ def _get_file_times(self, fpath):
return start, end

def _check_time_alignment(self, start, end):
# ...loop through the files to see if the `start` happens to match the start
# of a file and the `end` happens to match the end of a file
exact_matches = 0

"""
Loops through all data files to check if the `start` and `end` can be aligned
with the exact start or end time in the file(s).
If both the `start` and the `end` are aligned then the following properties
are set:
- self.aligned_files = [list of matching files in range]
- self.is_aligned = True
If the `start` is before the start time of the first file and/or
the `end` is after the end time of the last file then that is considered
a valid match to the required time range.
"""
# Set matches as a counter to see if we get valid time alignment.
# Must result in matches==2 in order to be valid.
matches = 0

# First of all truncate requested range to actual range if it extends
# beyond the actual range in the files
start_in_files, _ = self._get_file_times(self.input_files[0])
_, end_in_files = self._get_file_times(self.input_files[-1])

if start < start_in_files:
start = start_in_files

if end > end_in_files:
end = end_in_files

# Now go through files to check alignment
for fpath in self.input_files:

fstart, fend = self._get_file_times(fpath)

# Break out if start of file is beyond end of requested range
if fstart > end:
break

if fstart == start:
exact_matches += 1
matches += 1

if fend == end:
exact_matches += 1
matches += 1

if fstart >= start or fend <= end:
self.aligned_files.append(fpath)

if exact_matches != 2:
# If there were not exactly two matches, the range is not aligned
if matches != 2:
self.aligned_files.clear()
return

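The docstring above describes the new alignment rule; here is a minimal standalone sketch of the same logic. The file names and the `FILE_TIMES` mapping are invented for illustration, standing in for `_get_file_times()`:

from datetime import datetime

# Invented stand-in for SubsetAlignmentChecker._get_file_times():
# maps each (fake) file path to its (start, end) time range.
FILE_TIMES = {
    "tas_2000.nc": (datetime(2000, 1, 1), datetime(2000, 12, 31)),
    "tas_2001.nc": (datetime(2001, 1, 1), datetime(2001, 12, 31)),
    "tas_2002.nc": (datetime(2002, 1, 1), datetime(2002, 12, 31)),
}

def check_time_alignment(input_files, start, end):
    """Return the aligned files, or an empty list if the range is not aligned."""
    input_files = sorted(input_files)
    matches = 0
    aligned = []

    # Truncate the requested range to the range actually covered by the files.
    start = max(start, FILE_TIMES[input_files[0]][0])
    end = min(end, FILE_TIMES[input_files[-1]][1])

    for fpath in input_files:
        fstart, fend = FILE_TIMES[fpath]
        # Stop once a file starts beyond the end of the requested range.
        if fstart > end:
            break
        if fstart == start:
            matches += 1
        if fend == end:
            matches += 1
        if fstart >= start or fend <= end:
            aligned.append(fpath)

    # Exactly two boundary matches (one start, one end) means aligned.
    return aligned if matches == 2 else []

# A request from 2000-01-01 to 2001-12-31 hits file boundaries exactly:
print(check_time_alignment(list(FILE_TIMES), datetime(2000, 1, 1), datetime(2001, 12, 31)))
# -> ['tas_2000.nc', 'tas_2001.nc']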
36 changes: 19 additions & 17 deletions rook/director/director.py
@@ -1,15 +1,16 @@
from collections import OrderedDict
from pywps.app.exceptions import ProcessError

from daops.utils import is_characterised, fixer
from daops.utils import fixer, is_characterised
from daops.utils.normalise import ResultSet

# from roocs_utils.project_utils import get_project_name
from pywps.app.exceptions import ProcessError
from roocs_utils.exceptions import InvalidParameterValue
from roocs_utils.project_utils import get_project_name

from rook import CONFIG

from .inventory import Inventory
from .alignment import SubsetAlignmentChecker
from ..utils.input_utils import clean_inputs
from .alignment import SubsetAlignmentChecker
from .inventory import Inventory


def wrap_director(collection, inputs, runner):
@@ -23,22 +24,24 @@ def wrap_director(collection, inputs, runner):


class Director:

def __init__(self, coll, inputs):
self.coll = coll
self.inputs = inputs

# self.project = get_project_name(coll[0])
self.project = "c3s-cmip6"

try:
self.inv = Inventory(self.project)
except Exception:
self.invalid_collection()
self.project = get_project_name(coll[0])
# self.project = "c3s-cmip6"

self.use_original_files = False
self.original_file_urls = None
self._resolve()
self.output_uris = None

if CONFIG[f"project:{self.project}"].get("use_inventory"):
try:
self.inv = Inventory(self.project)
except Exception:
self.invalid_collection()

self._resolve()

def _resolve(self):
"""
@@ -137,9 +140,8 @@ def request_aligns_with_files(self):
return True

def process(self, runner):
# Either packages up orginal files (URLs) or
# Either packages up original files (URLs) or
# runs the process to generate the outputs

# If original files should be returned, then add the files
if self.use_original_files:
result = ResultSet()
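The notable change in `Director.__init__` is that the inventory is now loaded only when the project's configuration asks for it. A minimal sketch of that gate, assuming a plain dict in place of the real roocs `CONFIG` object and a hypothetical `load_inventory` helper:

# Plain-dict stand-in for the roocs CONFIG object.
CONFIG = {"project:c3s-cmip6": {"use_inventory": True}}

def load_inventory(project):
    # Hypothetical stand-in for rook.director.inventory.Inventory.
    return {"project": project, "contents": {}}

class DirectorSketch:
    def __init__(self, project):
        self.project = project
        self.inv = None
        self.use_original_files = False
        self.output_uris = None
        # Only load an inventory when the project opts in; otherwise the
        # dataset is processed with no inventory at all.
        if CONFIG[f"project:{self.project}"].get("use_inventory"):
            try:
                self.inv = load_inventory(self.project)
            except Exception:
                raise ValueError(f"Invalid collection for project {self.project}")

This matches the "no-inventory option" entry in CHANGES.rst: projects without `use_inventory` skip the `Inventory` lookup entirely.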
1 change: 1 addition & 0 deletions rook/director/inv_cache.py
@@ -1,4 +1,5 @@
import os

import requests

from rook import CONFIG
5 changes: 3 additions & 2 deletions rook/director/inventory.py
@@ -1,7 +1,8 @@
import os
import yaml
from collections import OrderedDict

import yaml

from rook import CONFIG

from .inv_cache import inventory_cache
@@ -20,7 +21,7 @@ def _load(self):
_contents = yaml.load(reader, Loader=yaml.SafeLoader)

self.base_dir = _contents[0]["base_dir"]
self.contents = dict([(dset["ds_id"], dset) for dset in _contents[1:]])
self.contents = {dset["ds_id"]: dset for dset in _contents[1:]}

# def __contains__(self, dset):
# TODO: ds_id = convert_to_ds_id(dset)
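The only functional line changed in `_load` swaps `dict([...])` for an equivalent, more idiomatic dict comprehension; a quick illustration with invented inventory records:

_contents = [
    {"base_dir": "/data"},                            # header record
    {"ds_id": "c3s-cmip6.example.one", "files": 3},   # invented ds_ids
    {"ds_id": "c3s-cmip6.example.two", "files": 12},
]

# Before: dict([(dset["ds_id"], dset) for dset in _contents[1:]])
# After (same result):
contents = {dset["ds_id"]: dset for dset in _contents[1:]}
assert contents["c3s-cmip6.example.one"]["files"] == 3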
8 changes: 1 addition & 7 deletions rook/etc/roocs.ini
@@ -1,23 +1,17 @@
[project:cmip5]
data_node_root =
inventory_url =

[project:cmip6]
data_node_root =
inventory_url =

[project:cordex]
data_node_root =
inventory_url =

[project:c3s-cmip5]
data_node_root =
inventory_url =

[project:c3s-cmip6]
data_node_root = https://data.mips.copernicus-climate.eu/thredds/fileServer/esg_c3s-cmip6/
inventory_url = https://raw.githubusercontent.com/cp4cds/c3s_34g_manifests/master/inventories/c3s-cmip6/c3s-cmip6_files_latest.yml

[project:c3s-cordex]
data_node_root =
inventory_url =
inventory_url =
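For reference, these sections can be read with the standard library's `configparser` (a sketch only; rook actually resolves them through `roocs_utils.config.get_config`, and the `use_inventory` key checked in `director.py` does not appear in this file):

import configparser

cfg = configparser.ConfigParser()
cfg.read("rook/etc/roocs.ini")

section = cfg["project:c3s-cmip6"]
print(section["data_node_root"])  # the THREDDS fileServer root above
print(section["inventory_url"])   # the c3s-cmip6_files_latest.yml URL above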
51 changes: 17 additions & 34 deletions rook/operator.py
@@ -1,9 +1,12 @@
import os
import tempfile
from copy import deepcopy

from rook.utils.subset_utils import run_subset
from rook.utils.average_utils import run_average
from rook.director import wrap_director
from rook.utils.input_utils import resolve_to_file_paths
from rook.utils.average_utils import run_average
from rook.utils.subset_utils import run_subset
from roocs_utils.utils.file_utils import is_file_list, FileMapper


class Operator(object):
@@ -21,43 +24,23 @@ def __init__(self, output_dir, apply_fixes=True):

def call(self, args):
args.update(self.config)
args['output_dir'] = self._get_output_dir()
collection = args['collection']
args["output_dir"] = self._get_output_dir()
collection = args["collection"] # collection is a list

# Quick hack to find out if collection is a list of files
runner = self._get_runner()

if os.path.isfile(collection[0]):
output_uris = runner(args)
if is_file_list(collection):
# This block is called if this is NOT the first stage of a workflow, and
# the collection will be a file list (one or more files)
kwargs = deepcopy(args)
file_paths = resolve_to_file_paths(args.get("collection"))
kwargs["collection"] = FileMapper(file_paths)
output_uris = runner(kwargs) # this needs to be in a list
else:
# Setting "original_files" to False, to force use of WPS in a workflow
args['original_files'] = False
# This block is called when this is the first stage of a workflow
director = wrap_director(collection, args, runner)
output_uris = director.output_uris

# In rook.operator, within the `Operator.call()` function, we need...
#
# NOTE: output_uris might be file paths OR URLs
# If they are URLs: then any subsequent Operators will need to download them
# How will we do that?
# In daops.utils.consolidate:
# - run a single: `collection = consolidate_collection(collection)`
# - it would group a sequence of items into:
# 1. dataset ids (from individual ids and/or id patterns)
# 2. URLs to individual NC files
# - analyse URLs and compare path and file names,
# - if path and relevant parts of file name are the same:
# - group by inferred dataset in separate directories
# - implement by: 1. strip the last component of files
# 2. create collection object to group them
# 3. download them into directories related to collection
# 3. Directories
# 4. File paths:
# - Group by common directory()
# - so that xarray will attempt to aggregate them
#
# - then call the existing consolidate code that loops through each _dset_
#
return output_uris

def _get_runner(self):
@@ -68,14 +51,14 @@ def _get_output_dir(self):


class Subset(Operator):
prefix = 'subset'
prefix = "subset"

def _get_runner(self):
return run_subset


class Average(Operator):
prefix = 'average'
prefix = "average"

def _get_runner(self):
return run_average
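The reworked `call()` branches on whether the collection is already a list of files (a later stage in a workflow) or dataset identifiers (the first stage). A condensed sketch with stub helpers in place of the daops/roocs machinery; the runner lambda and the dataset id are invented:

import os

def is_file_list(collection):
    # Simplified stand-in for roocs_utils.utils.file_utils.is_file_list.
    return all(os.path.isfile(item) or item.endswith(".nc") for item in collection)

def call(collection, runner):
    if is_file_list(collection):
        # Not the first stage of a workflow: operate directly on the files
        # handed over by the previous operator (the real code resolves the
        # paths and wraps them in a FileMapper first).
        return runner({"collection": collection})
    # First stage: the real code hands off to wrap_director, which chooses
    # between original file URLs and a WPS-generated result; here we simply
    # run the process on the dataset ids.
    return runner({"collection": collection, "original_files": False})

run_stub = lambda args: list(args["collection"])  # invented runner
print(call(["c3s-cmip6.example.dataset.id"], run_stub))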
2 changes: 1 addition & 1 deletion rook/processes/__init__.py
@@ -1,6 +1,6 @@
from .wps_subset import Subset
from .wps_average import Average
from .wps_orchestrate import Orchestrate
from .wps_subset import Subset

processes = [
Subset(),
7 changes: 3 additions & 4 deletions rook/processes/wps_average.py
@@ -1,8 +1,7 @@
from pywps import Process, LiteralInput, ComplexOutput
from pywps import FORMATS, Format
from pywps.app.exceptions import ProcessError
from pywps import FORMATS, ComplexOutput, Format, LiteralInput, Process
from pywps.app.Common import Metadata
from pywps.inout.outputs import MetaLink4, MetaFile
from pywps.app.exceptions import ProcessError
from pywps.inout.outputs import MetaFile, MetaLink4

from ..provenance import Provenance

21 changes: 12 additions & 9 deletions rook/processes/wps_orchestrate.py
@@ -1,14 +1,13 @@
from pywps import Process, ComplexInput, ComplexOutput
from pywps import FORMATS, Format
from pywps.app.exceptions import ProcessError
from pywps.app.Common import Metadata
from pywps.inout.outputs import MetaLink4, MetaFile
import logging

from ..utils.metalink_utils import build_metalink
from pywps import FORMATS, ComplexInput, ComplexOutput, Format, Process
from pywps.app.Common import Metadata
from pywps.app.exceptions import ProcessError
from pywps.inout.outputs import MetaFile, MetaLink4

from rook import workflow

import logging
from ..utils.metalink_utils import build_metalink

LOGGER = logging.getLogger()

@@ -82,8 +81,12 @@ def _handler(self, request, response):
# "workflow-result", "Workflow result as NetCDF files.", workdir=self.workdir
# )

ml4 = build_metalink("workflow-result", "Workflow result as NetCDF files.",
self.workdir, file_uris)
ml4 = build_metalink(
"workflow-result",
"Workflow result as NetCDF files.",
self.workdir,
file_uris,
)

# for ncfile in output:
# mf = MetaFile("NetCDF file", "NetCDF file", fmt=FORMATS.NETCDF)
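The commented-out MetaLink4/MetaFile lines in this hunk show the raw pywps pattern that `build_metalink` presumably wraps; a sketch under that assumption (the function body is inferred, not the actual rook implementation):

from pywps import FORMATS
from pywps.inout.outputs import MetaFile, MetaLink4

def build_metalink_sketch(identity, description, workdir, file_uris):
    # Assumed shape of rook.utils.metalink_utils.build_metalink, inferred
    # from the commented-out MetaLink4/MetaFile code visible in this diff.
    ml4 = MetaLink4(identity, description, workdir=workdir)
    for uri in file_uris:
        mf = MetaFile("NetCDF file", "NetCDF file", fmt=FORMATS.NETCDF)
        mf.file = uri  # register the file with this metalink entry
        ml4.append(mf)
    return ml4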
