Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement interp for interpolating between chunks of data (dask) #4155

Merged
merged 44 commits into from
Aug 11, 2020

Conversation

pums974
Copy link
Contributor

@pums974 pums974 commented Jun 15, 2020

In a project of mine I need to interpolate a dask-based xarray between chunk of data.

When using the current official interp function (xarray v0.15.1), the code:

    datax = xr.DataArray(data=da.from_array(np.arange(0, 4), chunks=2),
                         coords={"x": np.linspace(0, 1, 4)},
                         dims="x")
    datay = xr.DataArray(data=da.from_array(np.arange(0, 4), chunks=2),
                         coords={"y": np.linspace(0, 1, 4)},
                         dims="y")
    data = datax * datay

    # both of these interp call fails
    res = datax.interp(x=np.linspace(0, 1))
    print(res.load())

    res = data.interp(x=np.linspace(0, 1), y=0.5)
    print(res.load())

fails with NotImplementedError: Chunking along the dimension to be interpolated (0) is not yet supported., but succeed with this version

I also want to alert that my version does not work with "advanced interpolation" (as shown in the xarray documentation)
Also, my version cannot be used to make interpolate_na work with chunked data

@pep8speaks
Copy link

pep8speaks commented Jun 15, 2020

Hello @pums974! Thanks for updating this PR. We checked the lines you've touched for PEP 8 issues, and found:

There are currently no PEP 8 issues detected in this Pull Request. Cheers! 🍻

Comment last updated at 2020-07-31 18:56:12 UTC

@pums974
Copy link
Contributor Author

pums974 commented Jun 15, 2020

On my computer it passes pytest:

$> pytest . 
======================= test session starts =================================
platform linux -- Python 3.8.3, pytest-5.4.3, py-1.8.2, pluggy-0.13.1
[...]
===== 3822 passed, 2710 skipped, 77 xfailed, 24 xpassed, 32 warnings in 48.25s ========

$> pip freeze
appdirs==1.4.4
attrs==19.3.0
black==19.10b0
click==7.1.2
dask==2.18.1
flake8==3.8.3
isort==4.3.21
mccabe==0.6.1
more-itertools==8.4.0
numpy==1.18.5
packaging==20.4
pandas==1.0.4
pathspec==0.8.0
pluggy==0.13.1
py==1.8.2
pycodestyle==2.6.0
pyflakes==2.2.0
pyparsing==2.4.7
pytest==5.4.3
python-dateutil==2.8.1
pytz==2020.1
PyYAML==5.3.1
regex==2020.6.8
scipy==1.4.1
six==1.15.0
toml==0.10.1
toolz==0.10.0
typed-ast==1.4.1
wcwidth==0.2.4
-e git+git@github.com:pums974/xarray.git@c47a1d5d8fd7ca401a0dddea67574af00c4d8e3b#egg=xarray

@rabernat
Copy link
Contributor

Thanks for this contribution @pums974! We appreciate your patience in awaiting a review of your PR.

@pums974
Copy link
Contributor Author

pums974 commented Jun 24, 2020

No problem, we are all very busy. But thanks for your message.

@fujiisoup
Copy link
Member

Hi @pums974

Thanks for sending the PR.
I'm working to review it, but it may take more time.

A few comments;
Does it work with an unsorted destination?
e.g.,

da.interp(y=[0, -1, 2])

I'm feeling that the basic algorithm, such as np.interp-equivalence, should be interpreted in upstream.
I'm sure Dask community welcomes this addition.
Do you have an interest on it?

@fujiisoup
Copy link
Member

Also in my local environment, it gives
AttributeError: 'memoryview' object has no attribute 'dtype'

The full stack trace is

_______________________________________________________ test_interpolate_1d[1-y-cubic] ________________________________________________________

method = 'cubic', dim = 'y', case = 1

    @pytest.mark.parametrize("method", ["linear", "cubic"])
    @pytest.mark.parametrize("dim", ["x", "y"])
    @pytest.mark.parametrize("case", [0, 1])
    def test_interpolate_1d(method, dim, case):
        if not has_scipy:
            pytest.skip("scipy is not installed.")
    
        if not has_dask and case in [1]:
            pytest.skip("dask is not installed in the environment.")
    
        da = get_example_data(case)
        xdest = np.linspace(0.0, 0.9, 80)
    
        actual = da.interp(method=method, **{dim: xdest})
    
        # scipy interpolation for the reference
        def func(obj, new_x):
            return scipy.interpolate.interp1d(
                da[dim],
                obj.data,
                axis=obj.get_axis_num(dim),
                bounds_error=False,
                fill_value=np.nan,
                kind=method,
            )(new_x)
    
        if dim == "x":
            coords = {"x": xdest, "y": da["y"], "x2": ("x", func(da["x2"], xdest))}
        else:  # y
            coords = {"x": da["x"], "y": xdest, "x2": da["x2"]}
    
        expected = xr.DataArray(func(da, xdest), dims=["x", "y"], coords=coords)
>       assert_allclose(actual, expected)

xarray/tests/test_interp.py:86: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
xarray/testing.py:132: in compat_variable
    return a.dims == b.dims and (a._data is b._data or equiv(a.data, b.data))
xarray/testing.py:31: in _data_allclose_or_equiv
    return duck_array_ops.allclose_or_equiv(arr1, arr2, rtol=rtol, atol=atol)
xarray/core/duck_array_ops.py:221: in allclose_or_equiv
    arr1 = np.array(arr1)
../../../anaconda3/envs/xarray/lib/python3.7/site-packages/dask/array/core.py:1314: in __array__
    x = self.compute()
../../../anaconda3/envs/xarray/lib/python3.7/site-packages/dask/base.py:165: in compute
    (result,) = compute(self, traverse=False, **kwargs)
../../../anaconda3/envs/xarray/lib/python3.7/site-packages/dask/base.py:436: in compute
    results = schedule(dsk, keys, **kwargs)
../../../anaconda3/envs/xarray/lib/python3.7/site-packages/dask/local.py:527: in get_sync
    return get_async(apply_sync, 1, dsk, keys, **kwargs)
../../../anaconda3/envs/xarray/lib/python3.7/site-packages/dask/local.py:494: in get_async
    fire_task()
../../../anaconda3/envs/xarray/lib/python3.7/site-packages/dask/local.py:466: in fire_task
    callback=queue.put,
../../../anaconda3/envs/xarray/lib/python3.7/site-packages/dask/local.py:516: in apply_sync
    res = func(*args, **kwds)
../../../anaconda3/envs/xarray/lib/python3.7/site-packages/dask/local.py:227: in execute_task
    result = pack_exception(e, dumps)
../../../anaconda3/envs/xarray/lib/python3.7/site-packages/dask/local.py:222: in execute_task
    result = _execute_task(task, data)
../../../anaconda3/envs/xarray/lib/python3.7/site-packages/dask/core.py:119: in _execute_task
    return func(*args2)
../../../anaconda3/envs/xarray/lib/python3.7/site-packages/dask/optimization.py:982: in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
../../../anaconda3/envs/xarray/lib/python3.7/site-packages/dask/core.py:149: in get
    result = _execute_task(task, cache)
../../../anaconda3/envs/xarray/lib/python3.7/site-packages/dask/core.py:119: in _execute_task
    return func(*args2)
xarray/core/missing.py:830: in _dask_aware_interpnd
    return _interpnd(var, old_x, new_x, func, kwargs)
xarray/core/missing.py:793: in _interpnd
    x, new_x = _floatize_x(x, new_x)
xarray/core/missing.py:577: in _floatize_x
    if _contains_datetime_like_objects(x[i]):
xarray/core/common.py:1595: in _contains_datetime_like_objects
    return is_np_datetime_like(var.dtype) or contains_cftime_datetimes(var)
xarray/core/common.py:1588: in contains_cftime_datetimes
    return _contains_cftime_datetimes(var.data)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

array = <memory at 0x7f771d6daef0>

    def _contains_cftime_datetimes(array) -> bool:
        """Check if an array contains cftime.datetime objects
        """
        try:
            from cftime import datetime as cftime_datetime
        except ImportError:
            return False
        else:
>           if array.dtype == np.dtype("O") and array.size > 0:
E           AttributeError: 'memoryview' object has no attribute 'dtype'

xarray/core/common.py:1574: AttributeError

@pums974
Copy link
Contributor Author

pums974 commented Jun 26, 2020

Thanks,
That's weird, I have no problem in mine...
What are your versions of dask and numpy ?

As for implementing this in dask, you may be right, it probably belong there,
But I am even less use to their code base, and have no clue where to put it.

And for unsorted destination, that's something I didn't think about.
maybe we can add an argsort at the beggining.

@keewis
Copy link
Collaborator

keewis commented Jun 26, 2020

@pums974, the CI gets the same error (e.g. here) so you should be able to reproduce this by setting up an environment with something like

conda env create -f ci/requirements/py38.yml -n xarray-py38
conda activate xarray-py38

@fujiisoup
Copy link
Member

As for implementing this in dask, you may be right, it probably belong there,
But I am even less use to their code base, and have no clue where to put it.

OK.
Even so, I would suggest restructuring the code base;
maybe we can add an interp1d equivalence into
core.dask_array_ops.interp1d
which works with dask-arrays (non-xarray object).
It'll be easier to test.
The API should be the as same with scipy.interp.interp1d as possible.

In missing.py, we can call this function.

@pums974
Copy link
Contributor Author

pums974 commented Jun 30, 2020

Hum, ok, but I don't see how it would work if all points are between chunks (see my second example)

@fujiisoup
Copy link
Member

Hum, ok, but I don't see how it would work if all points are between chunks (see my second example)

Maybe we can support sequential interpolation only at this moment.
In this case,

res = data.interp(x=np.linspace(0, 1), y=0.5)

can be interpreted as

res = data.interp(x=np.linspace(0, 1)).interp(y=0.5)

which might not be too difficult.

@pums974
Copy link
Contributor Author

pums974 commented Jun 30, 2020

ok, but what about

res = data.interp(y=0.5)

@pums974
Copy link
Contributor Author

pums974 commented Jun 30, 2020

I mean, in this case you have to interpolate in another direction. You cannot consider having a 1d function.

xarray/core/missing.py Outdated Show resolved Hide resolved
@pums974 pums974 closed this Jul 31, 2020
@pums974 pums974 reopened this Jul 31, 2020
@fujiisoup
Copy link
Member

This PR looks good for me.
Maybe we can wait for a few days in case anyone has some comments on it.
If no comments, I'll merge this then.

@fujiisoup fujiisoup merged commit 7daad4f into pydata:master Aug 11, 2020
@fujiisoup
Copy link
Member

Thanks @pums974 :)

@pums974
Copy link
Contributor Author

pums974 commented Aug 12, 2020

You're welcome :)

@pums974 pums974 deleted the chunked_interp branch August 13, 2020 15:53
dcherian added a commit to rpgoldman/xarray that referenced this pull request Aug 14, 2020
* 'master' of github.com:pydata/xarray: (260 commits)
  Increase support window of all dependencies (pydata#4296)
  Implement interp for interpolating between chunks of data (dask) (pydata#4155)
  Add @mathause to current core developers. (pydata#4335)
  install sphinx-autosummary-accessors from conda-forge (pydata#4332)
  Use sphinx-accessors-autosummary (pydata#4323)
  ndrolling fixes (pydata#4329)
  DOC: fix typo argmin -> argmax in DataArray.argmax docstring (pydata#4327)
  pin sphinx to 3.1(pydata#4326)
  nd-rolling (pydata#4219)
  Implicit dask import 4164 (pydata#4318)
  allow customizing the inline repr of a duck array (pydata#4248)
  silence the known docs CI issues (pydata#4316)
  enh: fixed pydata#4302 (pydata#4315)
  Remove all unused and warn-raising methods from AbstractDataStore (pydata#4310)
  Fix map_blocks example (pydata#4305)
  Fix docstring for missing_dims argument to isel methods (pydata#4298)
  Support for PyCharm remote deployment (pydata#4299)
  Update map_blocks and map_overlap docstrings (pydata#4303)
  Lazily load resource files (pydata#4297)
  warn about the removal of the ufuncs (pydata#4268)
  ...
@cyhsu
Copy link

cyhsu commented Aug 14, 2020

Hi
Just curious about this. I followed the discussion since this issue addressed. Is this chunk interpolation solved already?

@fujiisoup
Copy link
Member

@cyhsu
Yes, in the current master.

@cyhsu
Copy link

cyhsu commented Aug 15, 2020

@fujiisoup Thanks for letting me know. But I am still unable to do even though I have updated my xarray via "conda update xarray".

@fujiisoup
Copy link
Member

@cyhsu Yes, because it is not yet released.
(I'm not sure when the next release will be, but maybe a few months later)
If you do pip install git+https://github.com/pydata/xarray, the current master will be installed in your system and interpolation over the chunks can be used.
But note that this means you will install (a kind of) beta version.

dcherian added a commit to dcherian/xarray that referenced this pull request Aug 15, 2020
* upstream/master: (34 commits)
  Fix bug in computing means of cftime.datetime arrays (pydata#4344)
  fix some str accessor inconsistencies (pydata#4339)
  pin matplotlib in ci/requirements/doc.yml (pydata#4340)
  Clarify drop_vars return value. (pydata#4244)
  Support explicitly setting a dimension order with to_dataframe() (pydata#4333)
  Increase support window of all dependencies (pydata#4296)
  Implement interp for interpolating between chunks of data (dask) (pydata#4155)
  Add @mathause to current core developers. (pydata#4335)
  install sphinx-autosummary-accessors from conda-forge (pydata#4332)
  Use sphinx-accessors-autosummary (pydata#4323)
  ndrolling fixes (pydata#4329)
  DOC: fix typo argmin -> argmax in DataArray.argmax docstring (pydata#4327)
  pin sphinx to 3.1(pydata#4326)
  nd-rolling (pydata#4219)
  Implicit dask import 4164 (pydata#4318)
  allow customizing the inline repr of a duck array (pydata#4248)
  silence the known docs CI issues (pydata#4316)
  enh: fixed pydata#4302 (pydata#4315)
  Remove all unused and warn-raising methods from AbstractDataStore (pydata#4310)
  Fix map_blocks example (pydata#4305)
  ...
dcherian added a commit to dcherian/xarray that referenced this pull request Aug 16, 2020
* upstream/master: (40 commits)
  Fix bug in computing means of cftime.datetime arrays (pydata#4344)
  fix some str accessor inconsistencies (pydata#4339)
  pin matplotlib in ci/requirements/doc.yml (pydata#4340)
  Clarify drop_vars return value. (pydata#4244)
  Support explicitly setting a dimension order with to_dataframe() (pydata#4333)
  Increase support window of all dependencies (pydata#4296)
  Implement interp for interpolating between chunks of data (dask) (pydata#4155)
  Add @mathause to current core developers. (pydata#4335)
  install sphinx-autosummary-accessors from conda-forge (pydata#4332)
  Use sphinx-accessors-autosummary (pydata#4323)
  ndrolling fixes (pydata#4329)
  DOC: fix typo argmin -> argmax in DataArray.argmax docstring (pydata#4327)
  pin sphinx to 3.1(pydata#4326)
  nd-rolling (pydata#4219)
  Implicit dask import 4164 (pydata#4318)
  allow customizing the inline repr of a duck array (pydata#4248)
  silence the known docs CI issues (pydata#4316)
  enh: fixed pydata#4302 (pydata#4315)
  Remove all unused and warn-raising methods from AbstractDataStore (pydata#4310)
  Fix map_blocks example (pydata#4305)
  ...
@cyhsu
Copy link

cyhsu commented Aug 16, 2020

@fujiisoup Thanks for the response. Since I have not updated my xarray package through this beta version. I hope you can answer my additional question for me. By considering the interpolation, which way is faster? a. chunk the dataset, and then do interpolation or b. chunk the interpolation list and then do interpolation?

a.

datax = xr.DataArray(data=da.from_array(np.arange(0, 4), chunks=2),
                     coords={"x": np.linspace(0, 1, 4)},
                     dims="x")
datay = xr.DataArray(data=da.from_array(np.arange(0, 4), chunks=2),
                     coords={"y": np.linspace(0, 1, 4)},
                     dims="y")
data = datax * datay

# both of these interp call fails
res = datax.interp(x=np.linspace(0, 1))
print(res.load())

res = data.interp(x=np.linspace(0, 1), y=0.5)
print(res.load())

b.

datax = xr.DataArray(data=np.arange(0, 4),
                     coords={"x": np.linspace(0, 1, 4)},
                     dims="x")
datay = xr.DataArray(data=np.arange(0, 4),
                     coords={"y": np.linspace(0, 1, 4)},
                     dims="y")
data = datax * datay

x = xr.DataArray(data = da.from_array(np.linspace(0,1), chunks=2), dims='x')
res = data.interp(x=x)

@pums974
Copy link
Contributor Author

pums974 commented Aug 16, 2020

@cyhsu I can answer this question.

For best performance you should chunk the input array on the non interpolated dimensions and chunk the destination. Aka :

datax = xr.DataArray(data=np.arange(0, 4),
                     coords={"x": np.linspace(0, 1, 4)},
                     dims="x")
datay = xr.DataArray(data=da.from_array(np.arange(0, 4), chunks=2),
                     coords={"y": np.linspace(0, 1, 4)},
                     dims="y")
data = datax * datay

x = xr.DataArray(data = da.from_array(np.linspace(0,1), chunks=2), dims='x')

res = data.interp(x=x)

@cyhsu
Copy link

cyhsu commented Aug 16, 2020

@pums974 then how about if we do the interpolation by using chunk input array to the chunk interpolated dimension?

@pums974
Copy link
Contributor Author

pums974 commented Aug 16, 2020

If the input array is chunked in the interpolated dimension, the chunks will be merged during the interpolation.

This may induce a large memory cost at some point, but I do not know how to avoid it...

@pums974
Copy link
Contributor Author

pums974 commented Aug 16, 2020

Do this answer your question?

@cyhsu
Copy link

cyhsu commented Aug 16, 2020

Gotcha! Yes, it is. If I have many points in lat, lon, depth, and time, I should better chunk my input arrays at this stage to speed up the performance. The reason why I asked this question is I thought chunking the input array to do the interpolation should faster than if I didn't chunk the input array. But in my test case, it is not. Please see the attached.

Screen Shot 2020-08-16 at 4 47 00 PM

The results I show here is the parallel one way slower than the normal case.

@pums974
Copy link
Contributor Author

pums974 commented Aug 16, 2020

In your case, each task (20 000²) will have the entire input (100²), and interpolate a few points (5²).

Maybe the overhead comes with duplicating the input array 20 000² times, maybe it comes with the fact that you are doing 20 000² small interpolation instead of 1 big interpolation

@pums974
Copy link
Contributor Author

pums974 commented Aug 16, 2020

I forgot to take into account that the interpolations are orthogonal
So in sequential we are doing 2 interpolation first x then y
In parallel we do the same:
The fist interpolation will have 20 000 tasks, each task will have the totality of the input array, and compute an interpolation of 5 point of the output (x) producing an array of 5x100 per task or 100 000x100 full result as an intermediate array.
The second interpolation will have 20 000² tasks each task will have a block of 5x100 point of the intermediate array and compute an interpolation on 5 point of the output (y) resulting in a 5² array per task and the 100 000² full result.

So plenty of room for overhead...

@pums974
Copy link
Contributor Author

pums974 commented Aug 16, 2020

And I forgot to take into account that your interpolation only need 48² points of the input array, so the input array will be reduced at the start of the process (you can replace every 100 by 48 in my previous answers)

@lazyoracle
Copy link

@max-sixty Is there a timeline on when we can expect this feature in a stable release? Is it scheduled for the next minor release and to be made available on conda and pip?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Feature request: Implement interp for interpolating between chunks of data (dask)
9 participants