Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

decode_cf() loads chunked arrays #1372

Closed
darothen opened this issue Apr 12, 2017 · 7 comments · Fixed by #2047
Closed

decode_cf() loads chunked arrays #1372

darothen opened this issue Apr 12, 2017 · 7 comments · Fixed by #2047

Comments

@darothen
Copy link

Currently using xarray version 0.9.2 and dask version 0.14.0.

Suppose you load a NetCDF file with the chunks parameter:

ds = xr.open_dataset("my_data.nc", decode_cf=False, chunks={'lon': 10, 'lat': 10})

The data is loaded as dask arrays, as expected. But if we then manually call xarray.decode_cf(), it'll eagerly load the data. Is this the expected behavior, or should decode_cf() preserve the laziness of the data?

@shoyer
Copy link
Member

shoyer commented Apr 12, 2017

decode_cf() should definitely work lazily on dask array. If not, I would consider that a bug.

@shoyer shoyer added the bug label Apr 12, 2017
@spencerahill
Copy link
Contributor

+1 we have come across this recently also in aospy

@shoyer
Copy link
Member

shoyer commented Apr 13, 2017

A simple test case:

In [16]: ds = xr.Dataset({'foo': ('x', np.arange(100))})

In [17]: chunked = ds.chunk()

In [18]: chunked.foo
Out[18]:
<xarray.DataArray 'foo' (x: 100)>
dask.array<xarray-foo, shape=(100,), dtype=int64, chunksize=(100,)>
Dimensions without coordinates: x

In [19]: xr.decode_cf(chunked).foo
Out[19]:
<xarray.DataArray 'foo' (x: 100)>
array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17,
       18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
       36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53,
       54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71,
       72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89,
       90, 91, 92, 93, 94, 95, 96, 97, 98, 99])
Dimensions without coordinates: x

@shoyer
Copy link
Member

shoyer commented Apr 13, 2017

Ah, so here's the thing: decode_cf is actually lazy, but it uses some old xarray machinery for lazily decoding conventions instead of dask:

In [21]: xr.decode_cf(chunked).foo.variable._data
Out[21]: LazilyIndexedArray(array=dask.array<xarray-foo, shape=(100,), dtype=int64, chunksize=(100,)>, key=(slice(None, None, None),))

You could call .chunk() on it again to load it into dask, but even though the whole thing is lazy, the new dask arrays don't know they are holding dask arrays, which makes life difficult for dask.

You might ask why this separate lazy compute machinery exists. The answer is that dask fails to optimize element-wise operations like (scale * array)[subset] -> scale * array[subset], which is a critical optimization for lazy decoding of large datasets.

See dask/dask#746 for discussion and links to PRs about this. @jcrist had a solution that worked, but it slowed down every dask array operations by 20%, which wasn't a great win.

I wonder if this is worth revisiting with a simpler, less general optimization pass that doesn't bother with broadcasting. See the subclasses of NDArrayMixin in xarray/conventions.py for examples of the sorts of functionality we need:

  • Casting (e.g., array.astype(bool)).
  • Chained arithmetic with scalars (e.g., 0.5 + 0.5 * array).
  • Custom element-wise operations (e.g., map_blocks(convert_to_datetime64, array, dtype=np.datetime64))
  • Custom aggregations that drop a dimension (e.g., map_blocks(characters_to_string, array, drop_axis=-1))

If we could optimize all these operations (and ideally chain them), then we could drop all the lazy loading stuff from xarray in favor of dask, which would be a real win. @mrocklin any thoughts?

@mrocklin
Copy link
Contributor

Optimization has come up a bit recently. In other projects like dask-glm I've actually been thinking about just turning it off entirely. The spread of desires here is wide. I think it would be good to make it a bit more customizable so that different applications can more easily customize their optimization suite (see dask/dask#2206)

But for this application in particular presumably all of these already tasks fuse together into a single composite task, yes? Then we would need to commute certain operations in some order (preferring to push down slicing operations). Then we would need to merge all of the slices, yes? I think it would be interesting to see how fast one could make such an optimization. If the answer is "decently fast" then we might also want to look into doing inplace operations. If the answer is "only sorta-fast" then I would hesitate before adding this as a default to dask.array but would again stress that we should work to make different projects customize optimization to suit their own needs.

@shoyer
Copy link
Member

shoyer commented Apr 13, 2017

But for this application in particular presumably all of these already tasks fuse together into a single composite task, yes? Then we would need to commute certain operations in some order (preferring to push down slicing operations). Then we would need to merge all of the slices, yes?

Yes, that sounds right to me.

@rabernat
Copy link
Contributor

rabernat commented Oct 19, 2017

I just hit this issue. I tried to reproduce it with a synthetic dataset, as in @shoyer's example, but I couldn't. I can only reproduce it with data loaded from netcdf4 via open_mfdataset.

I downloaded one year of air-temperature data from NARR:
ftp://ftp.cdc.noaa.gov/Datasets/NARR/Dailies/pressure/

I load it this way (preprocessing is necessary to resolve conflict between missing_value and _FillValue):

def preprocess_narr(ds):
    del ds.air.attrs['_FillValue']
    ds = ds.set_coords(['lon', 'lat', 'Lambert_Conformal', 'time_bnds'])
    return ds
ds = xr.open_mfdataset('air.*.nc', preprocess=preprocess_narr, decode_cf=False)
print(ds)
print(ds.chunks)
<xarray.Dataset>
Dimensions:            (level: 29, nbnds: 2, time: 365, x: 349, y: 277)
Coordinates:
  * level              (level) float32 1000.0 975.0 950.0 925.0 900.0 875.0 ...
    lat                (y, x) float32 1.0 1.10431 1.20829 1.31196 1.4153 ...
    lon                (y, x) float32 -145.5 -145.315 -145.13 -144.943 ...
  * y                  (y) float32 0.0 32463.0 64926.0 97389.0 129852.0 ...
  * x                  (x) float32 0.0 32463.0 64926.0 97389.0 129852.0 ...
    Lambert_Conformal  int32 -2147483647
  * time               (time) float64 1.666e+06 1.666e+06 1.666e+06 ...
    time_bnds          (time, nbnds) float64 1.666e+06 1.666e+06 1.666e+06 ...
Dimensions without coordinates: nbnds
Data variables:
    air                (time, level, y, x) float32 297.475 297.463 297.453 ...
Frozen(SortedKeysDict({'y': (277,), 'x': (349,), 'time': (31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31), 'level': (29,), 'nbnds': (2,)}))

If I try to decode cf, it returns instantly.

ds_decode = xr.decode_cf(ds)
print(ds_decode)
<xarray.Dataset>
Dimensions:            (level: 29, nbnds: 2, time: 365, x: 349, y: 277)
Coordinates:
  * level              (level) float32 1000.0 975.0 950.0 925.0 900.0 875.0 ...
    lat                (y, x) float32 1.0 1.10431 1.20829 1.31196 1.4153 ...
    lon                (y, x) float32 -145.5 -145.315 -145.13 -144.943 ...
  * y                  (y) float32 0.0 32463.0 64926.0 97389.0 129852.0 ...
  * x                  (x) float32 0.0 32463.0 64926.0 97389.0 129852.0 ...
    Lambert_Conformal  int32 -2147483647
  * time               (time) datetime64[ns] 1990-01-01 1990-01-02 ...
    time_bnds          (time, nbnds) float64 1.666e+06 1.666e+06 1.666e+06 ...
Dimensions without coordinates: nbnds
Data variables:
    air                (time, level, y, x) float64 297.5 297.5 297.5 297.4 ...

There are no more chunks: ds_decode.air.chunks is None but ds_decode.air._in_memory is False.

This is already a weird situation, since the data is not in memory, but it is not a dask array either.

If I try to do anything beyond this with the data, it triggers eager computation. Even if I just call type(ds.air.data) it computes. If I do any arithmetic, it computes.

In my case, I could get around this problem if the preprocess function in open_mfdataset were applied before the cf_decoding of the store. But in general, we really need to make decode_cf fully dask compatible.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

5 participants