WIP: Proposed refactor of read API for backends (#4477)
* add dispatching from api.open_dataset to the stub apiv2

* remove the check for AbstractDataStore input from apiv2

* bugfix typo

* add kwarg engines in _get_backend_cls needed by apiv2

* add alpha support for h5netcdf

* style: clean up unused code, rename some variables/functions

* Add ENGINES entry for cfgrib.

* Define function open_backend_dataset_cfgrib() to be used in apiv2.py.
Add necessary imports for this function.

* Apply black to check formatting.

* Apply black to check formatting.

* add dummy zarr apiv2 backend

* align apiv2.open_dataset to api.open_dataset

* remove unused extra_coords in open_backend_dataset_*

* remove extra_coords in open_backend_dataset_cfgrib

* transform zarr maybe_chunk and get_chunks into classmethods
- so they can be used in apiv2 without instantiating the object

* make the alpha zarr apiv2 backend work

* refactor apiv2.open_dataset:
- modify signature
- move the setting of defaults into the backends

* move dataset_from_backend_dataset out of apiv2.open_dataset

* remove blank lines

* remove blank lines

* style

* Re-write error messages

* Fix code style

* Fix code style

* remove unused import

* replace warning with ValueError for unsupported kwargs in backends

* change zarr.ZarrStore.get_chunks into a static method

* group `backend_kwargs` and `kwargs` into the `extra_tokens` argument of `apiv2.dataset_from_backend_dataset`

* remove **kwargs from the open_backend_dataset_${engine} signatures and the related error message

* black

* Try adding a strategy with an environment variable

* Try adding a strategy with an environment variable

* black

Co-authored-by: TheRed86 <m.rossetti@bopen.eu>
Co-authored-by: Alessandro Amici <a.amici@bopen.eu>
3 people committed Oct 22, 2020
1 parent 2ce1cfc commit cc271e6
Showing 7 changed files with 447 additions and 8 deletions.
3 changes: 3 additions & 0 deletions azure-pipelines.yml
@@ -20,6 +20,9 @@ jobs:
conda_env: py37
py38:
conda_env: py38
py38-backend-api-v2:
conda_env: py38
environment_variables: XARRAY_BACKEND_API=v2
py38-all-but-dask:
conda_env: py38-all-but-dask
py38-upstream-dev:
2 changes: 1 addition & 1 deletion ci/azure/unit-tests.yml
@@ -11,7 +11,7 @@ steps:
# https://github.com/microsoft/azure-pipelines-tasks/issues/9302
- bash: |
source activate xarray-tests
pytest \
$(environment_variables) pytest \
--junitxml=junit/test-results.xml \
--cov=xarray \
--cov-report=xml \
13 changes: 10 additions & 3 deletions xarray/backends/api.py
@@ -1,4 +1,4 @@
import os.path
import os
import warnings
from glob import glob
from io import BytesIO
@@ -163,10 +163,10 @@ def _autodetect_engine(filename_or_obj):
return engine


def _get_backend_cls(engine):
def _get_backend_cls(engine, engines=ENGINES):
"""Select open_dataset method based on current engine"""
try:
return ENGINES[engine]
return engines[engine]
except KeyError:
raise ValueError(
"unrecognized engine for open_dataset: {}\n"
@@ -432,6 +432,13 @@ def open_dataset(
--------
open_mfdataset
"""
if os.environ.get("XARRAY_BACKEND_API", "v1") == "v2":
kwargs = locals().copy()
from . import apiv2

if engine in apiv2.ENGINES:
return apiv2.open_dataset(**kwargs)

if autoclose is not None:
warnings.warn(
"The autoclose argument is no longer used by "
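
The dispatch above is gated by the XARRAY_BACKEND_API environment variable that the CI changes earlier in this commit set for the new py38-backend-api-v2 job. A minimal sketch of opting in from user code (the file name is hypothetical; only engines present in apiv2.ENGINES are routed to the new path, everything else falls back to the v1 implementation):

import os

# opt in to the experimental v2 read path; the default ("v1") keeps the old behaviour
os.environ["XARRAY_BACKEND_API"] = "v2"

import xarray as xr

# "example.nc" is a hypothetical file; with engine="h5netcdf" the call is forwarded
# to apiv2.open_dataset, while an engine not in apiv2.ENGINES (e.g. "netcdf4")
# silently takes the existing v1 code path
ds = xr.open_dataset("example.nc", engine="h5netcdf")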
224 changes: 224 additions & 0 deletions xarray/backends/apiv2.py
@@ -0,0 +1,224 @@
import os

from ..core.utils import is_remote_uri
from . import cfgrib_, h5netcdf_, zarr
from .api import (
_autodetect_engine,
_get_backend_cls,
_normalize_path,
_protect_dataset_variables_inplace,
)

ENGINES = {
"h5netcdf": h5netcdf_.open_backend_dataset_h5necdf,
"zarr": zarr.open_backend_dataset_zarr,
"cfgrib": cfgrib_.open_backend_dataset_cfgrib,
}


def dataset_from_backend_dataset(
ds,
filename_or_obj,
engine,
chunks,
cache,
overwrite_encoded_chunks,
extra_tokens,
):
if not (isinstance(chunks, (int, dict)) or chunks is None):
if chunks != "auto":
raise ValueError(
"chunks must be an int, dict, 'auto', or None. "
"Instead found %s. " % chunks
)

_protect_dataset_variables_inplace(ds, cache)
if chunks is not None and engine != "zarr":
from dask.base import tokenize

# if passed an actual file path, augment the token with
# the file modification time
if isinstance(filename_or_obj, str) and not is_remote_uri(filename_or_obj):
mtime = os.path.getmtime(filename_or_obj)
else:
mtime = None
token = tokenize(filename_or_obj, mtime, engine, chunks, **extra_tokens)
name_prefix = "open_dataset-%s" % token
ds2 = ds.chunk(chunks, name_prefix=name_prefix, token=token)

elif engine == "zarr":

if chunks == "auto":
try:
import dask.array # noqa
except ImportError:
chunks = None

if chunks is None:
return ds

if isinstance(chunks, int):
chunks = dict.fromkeys(ds.dims, chunks)

variables = {
k: zarr.ZarrStore.maybe_chunk(k, v, chunks, overwrite_encoded_chunks)
for k, v in ds.variables.items()
}
ds2 = ds._replace(variables)

else:
ds2 = ds
ds2._file_obj = ds._file_obj

# Ensure source filename always stored in dataset object (GH issue #2550)
if "source" not in ds.encoding:
if isinstance(filename_or_obj, str):
ds.encoding["source"] = filename_or_obj

return ds2


def open_dataset(
filename_or_obj,
*,
engine=None,
chunks=None,
cache=None,
backend_kwargs=None,
**kwargs,
):
"""Open and decode a dataset from a file or file-like object.
Parameters
----------
filename_or_obj : str, Path, file-like or DataStore
Strings and Path objects are interpreted as a path to a netCDF file
or an OpenDAP URL and opened with python-netCDF4, unless the filename
ends with .gz, in which case the file is gunzipped and opened with
scipy.io.netcdf (only netCDF3 supported). Byte-strings or file-like
objects are opened by scipy.io.netcdf (netCDF3) or h5py (netCDF4/HDF).
group : str, optional
Path to the netCDF4 group in the given file to open (only works for
netCDF4 files).
decode_cf : bool, optional
Whether to decode these variables, assuming they were saved according
to CF conventions.
mask_and_scale : bool, optional
If True, replace array values equal to `_FillValue` with NA and scale
values according to the formula `original_values * scale_factor +
add_offset`, where `_FillValue`, `scale_factor` and `add_offset` are
taken from variable attributes (if they exist). If the `_FillValue` or
`missing_value` attribute contains multiple values a warning will be
issued and all array values matching one of the multiple values will
be replaced by NA. mask_and_scale defaults to True except for the
pseudonetcdf backend.
decode_times : bool, optional
If True, decode times encoded in the standard NetCDF datetime format
into datetime objects. Otherwise, leave them encoded as numbers.
autoclose : bool, optional
If True, automatically close files to avoid OS Error of too many files
being open. However, this option doesn't work with streams, e.g.,
BytesIO.
concat_characters : bool, optional
If True, concatenate along the last dimension of character arrays to
form string arrays. Dimensions will only be concatenated over (and
removed) if they have no corresponding variable and if they are only
used as the last dimension of character arrays.
decode_coords : bool, optional
If True, decode the 'coordinates' attribute to identify coordinates in
the resulting dataset.
engine : {"netcdf4", "scipy", "pydap", "h5netcdf", "pynio", "cfgrib", \
"pseudonetcdf", "zarr"}, optional
Engine to use when reading files. If not provided, the default engine
is chosen based on available dependencies, with a preference for
"netcdf4".
chunks : int or dict, optional
If chunks is provided, it is used to load the new dataset into dask
arrays. ``chunks={}`` loads the dataset with dask using a single
chunk for all arrays. When using ``engine="zarr"``, setting
``chunks='auto'`` will create dask chunks based on the variable's zarr
chunks.
lock : False or lock-like, optional
Resource lock to use when reading data from disk. Only relevant when
using dask or another form of parallelism. By default, appropriate
locks are chosen to safely read and write files with the currently
active dask scheduler.
cache : bool, optional
If True, cache data loaded from the underlying datastore in memory as
NumPy arrays when accessed to avoid reading from the underlying data-
store multiple times. Defaults to True unless you specify the `chunks`
argument to use dask, in which case it defaults to False. Does not
change the behavior of coordinates corresponding to dimensions, which
always load their data from disk into a ``pandas.Index``.
drop_variables: str or iterable, optional
A variable or list of variables to exclude from being parsed from the
dataset. This may be useful to drop variables with problems or
inconsistent values.
backend_kwargs: dict, optional
A dictionary of keyword arguments to pass on to the backend. This
may be useful when backend options would improve performance or
allow user control of dataset processing.
use_cftime: bool, optional
Only relevant if encoded dates come from a standard calendar
(e.g. "gregorian", "proleptic_gregorian", "standard", or not
specified). If None (default), attempt to decode times to
``np.datetime64[ns]`` objects; if this is not possible, decode times to
``cftime.datetime`` objects. If True, always decode times to
``cftime.datetime`` objects, regardless of whether or not they can be
represented using ``np.datetime64[ns]`` objects. If False, always
decode times to ``np.datetime64[ns]`` objects; if this is not possible
raise an error.
decode_timedelta : bool, optional
If True, decode variables and coordinates with time units in
{"days", "hours", "minutes", "seconds", "milliseconds", "microseconds"}
into timedelta objects. If False, leave them encoded as numbers.
If None (default), assume the same value of decode_time.
Returns
-------
dataset : Dataset
The newly created dataset.
Notes
-----
``open_dataset`` opens the file with read-only access. When you modify
values of a Dataset, even one linked to files on disk, only the in-memory
copy you are manipulating in xarray is modified: the original file on disk
is never touched.
See Also
--------
open_mfdataset
"""

if cache is None:
cache = chunks is None

if backend_kwargs is None:
backend_kwargs = {}

filename_or_obj = _normalize_path(filename_or_obj)

if engine is None:
engine = _autodetect_engine(filename_or_obj)

backend_kwargs = backend_kwargs.copy()
overwrite_encoded_chunks = backend_kwargs.pop("overwrite_encoded_chunks", None)

open_backend_dataset = _get_backend_cls(engine, engines=ENGINES)
backend_ds = open_backend_dataset(
filename_or_obj,
**backend_kwargs,
**{k: v for k, v in kwargs.items() if v is not None},
)
ds = dataset_from_backend_dataset(
backend_ds,
filename_or_obj,
engine,
chunks,
cache,
overwrite_encoded_chunks,
{**backend_kwargs, **kwargs},
)

return ds
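
The new module can also be exercised directly, bypassing the environment-variable dispatch (a minimal sketch; the store path is hypothetical and assumes zarr and dask are installed):

from xarray.backends import apiv2

# "store.zarr" is a hypothetical path; "overwrite_encoded_chunks" is popped off by
# apiv2.open_dataset itself, and anything left in backend_kwargs is forwarded
# verbatim to the backend's open_backend_dataset_* entry point
ds = apiv2.open_dataset(
    "store.zarr",
    engine="zarr",
    chunks="auto",  # with engine="zarr", dask chunks follow the on-disk zarr chunks
    backend_kwargs={"overwrite_encoded_chunks": False},
)

New backends are expected to plug in the same way: expose an open_backend_dataset_* function and register it in the ENGINES mapping at the top of apiv2.py.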
67 changes: 66 additions & 1 deletion xarray/backends/cfgrib_.py
@@ -1,7 +1,9 @@
import numpy as np

from .. import conventions
from ..core import indexing
from ..core.utils import Frozen, FrozenDict
from ..core.dataset import Dataset
from ..core.utils import Frozen, FrozenDict, close_on_error
from ..core.variable import Variable
from .common import AbstractDataStore, BackendArray
from .locks import SerializableLock, ensure_lock
@@ -69,3 +71,66 @@ def get_encoding(self):
dims = self.get_dimensions()
encoding = {"unlimited_dims": {k for k, v in dims.items() if v is None}}
return encoding


def open_backend_dataset_cfgrib(
filename_or_obj,
*,
decode_cf=True,
mask_and_scale=True,
decode_times=None,
concat_characters=None,
decode_coords=None,
drop_variables=None,
use_cftime=None,
decode_timedelta=None,
lock=None,
indexpath="{path}.{short_hash}.idx",
filter_by_keys={},
read_keys=[],
encode_cf=("parameter", "time", "geography", "vertical"),
squeeze=True,
time_dims=("time", "step"),
):

if not decode_cf:
mask_and_scale = False
decode_times = False
concat_characters = False
decode_coords = False
decode_timedelta = False

store = CfGribDataStore(
filename_or_obj,
indexpath=indexpath,
filter_by_keys=filter_by_keys,
read_keys=read_keys,
encode_cf=encode_cf,
squeeze=squeeze,
time_dims=time_dims,
lock=lock,
)

with close_on_error(store):
vars, attrs = store.load()
file_obj = store
encoding = store.get_encoding()

vars, attrs, coord_names = conventions.decode_cf_variables(
vars,
attrs,
mask_and_scale=mask_and_scale,
decode_times=decode_times,
concat_characters=concat_characters,
decode_coords=decode_coords,
drop_variables=drop_variables,
use_cftime=use_cftime,
decode_timedelta=decode_timedelta,
)

ds = Dataset(vars, attrs=attrs)
ds = ds.set_coords(coord_names.intersection(vars))
ds._file_obj = file_obj
ds.encoding = encoding

return ds
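
Because its full signature is shown above, the cfgrib entry point can also be called on its own, independent of apiv2.open_dataset (a sketch; the GRIB file name and filter keys are hypothetical):

from xarray.backends.cfgrib_ import open_backend_dataset_cfgrib

# hypothetical GRIB file and filter keys; the keyword names mirror the
# open_backend_dataset_cfgrib signature in the diff above
ds = open_backend_dataset_cfgrib(
    "era5-levels.grib",
    filter_by_keys={"typeOfLevel": "isobaricInhPa"},
    squeeze=False,
)
print(ds)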
