diff --git a/doc/whats-new.rst b/doc/whats-new.rst index fc5135dc598..0c26c2802ea 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -41,6 +41,9 @@ New Features - Allow passing chunks in ``**kwargs`` form to :py:meth:`Dataset.chunk`, :py:meth:`DataArray.chunk`, and :py:meth:`Variable.chunk`. (:pull:`6471`) By `Tom Nicholas `_. +- Expose the ``inline_array`` kwarg from :py:func:`dask.array.from_array` in :py:func:`open_dataset`, :py:func:`open_dataarray`, :py:meth:`Dataset.chunk`, + :py:meth:`DataArray.chunk`, and :py:meth:`Variable.chunk`. (:pull:`6471`) + By `Tom Nicholas `_. - :py:meth:`xr.polyval` now supports :py:class:`Dataset` and :py:class:`DataArray` args of any shape, is faster and requires less memory. (:pull:`6548`) By `Michael Niklas `_. diff --git a/xarray/backends/api.py b/xarray/backends/api.py index 9967b0a08c0..1672d4f7ad2 100644 --- a/xarray/backends/api.py +++ b/xarray/backends/api.py @@ -274,6 +274,7 @@ def _chunk_ds( engine, chunks, overwrite_encoded_chunks, + inline_array, **extra_tokens, ): from dask.base import tokenize @@ -292,6 +293,7 @@ def _chunk_ds( overwrite_encoded_chunks=overwrite_encoded_chunks, name_prefix=name_prefix, token=token, + inline_array=inline_array, ) return backend_ds._replace(variables) @@ -303,6 +305,7 @@ def _dataset_from_backend_dataset( chunks, cache, overwrite_encoded_chunks, + inline_array, **extra_tokens, ): if not isinstance(chunks, (int, dict)) and chunks not in {None, "auto"}: @@ -320,6 +323,7 @@ def _dataset_from_backend_dataset( engine, chunks, overwrite_encoded_chunks, + inline_array, **extra_tokens, ) @@ -346,6 +350,7 @@ def open_dataset( concat_characters=None, decode_coords=None, drop_variables=None, + inline_array=False, backend_kwargs=None, **kwargs, ): @@ -430,6 +435,12 @@ def open_dataset( A variable or list of variables to exclude from being parsed from the dataset. This may be useful to drop variables with problems or inconsistent values. + inline_array: bool, optional + How to include the array in the dask task graph. 
+ By default (``inline_array=False``) the array is included in a task by + itself, and each chunk refers to that task by its key. With + ``inline_array=True``, Dask will instead inline the array directly + in the values of the task graph. See :py:func:`dask.array.from_array`. backend_kwargs: dict Additional keyword arguments passed on to the engine open function, equivalent to `**kwargs`. @@ -505,6 +516,7 @@ def open_dataset( chunks, cache, overwrite_encoded_chunks, + inline_array, drop_variables=drop_variables, **decoders, **kwargs, @@ -526,6 +538,7 @@ def open_dataarray( concat_characters=None, decode_coords=None, drop_variables=None, + inline_array=False, backend_kwargs=None, **kwargs, ): @@ -613,6 +626,12 @@ def open_dataarray( A variable or list of variables to exclude from being parsed from the dataset. This may be useful to drop variables with problems or inconsistent values. + inline_array: bool, optional + How to include the array in the dask task graph. + By default (``inline_array=False``) the array is included in a task by + itself, and each chunk refers to that task by its key. With + ``inline_array=True``, Dask will instead inline the array directly + in the values of the task graph. See :py:func:`dask.array.from_array`. backend_kwargs: dict Additional keyword arguments passed on to the engine open function, equivalent to `**kwargs`. @@ -660,6 +679,7 @@ def open_dataarray( chunks=chunks, cache=cache, drop_variables=drop_variables, + inline_array=inline_array, backend_kwargs=backend_kwargs, use_cftime=use_cftime, decode_timedelta=decode_timedelta, diff --git a/xarray/core/dataarray.py b/xarray/core/dataarray.py index d15cbd00c0d..1bca3e6d87a 100644 --- a/xarray/core/dataarray.py +++ b/xarray/core/dataarray.py @@ -1113,6 +1113,7 @@ def chunk( name_prefix: str = "xarray-", token: str = None, lock: bool = False, + inline_array: bool = False, **chunks_kwargs: Any, ) -> DataArray: """Coerce this array's data into a dask arrays with the given chunks. 
@@ -1137,6 +1138,9 @@ def chunk( lock : optional Passed on to :py:func:`dask.array.from_array`, if the array is not already as dask array. + inline_array : bool, optional + Passed on to :py:func:`dask.array.from_array`, if the array is not + already a dask array. **chunks_kwargs : {dim: chunks, ...}, optional The keyword arguments form of ``chunks``. One of chunks or chunks_kwargs must be provided. @@ -1144,6 +1148,13 @@ def chunk( Returns ------- chunked : xarray.DataArray + + See Also + -------- + DataArray.chunks + DataArray.chunksizes + xarray.unify_chunks + dask.array.from_array """ if chunks is None: warnings.warn( @@ -1162,7 +1173,11 @@ def chunk( chunks = either_dict_or_kwargs(chunks, chunks_kwargs, "chunk") ds = self._to_temp_dataset().chunk( - chunks, name_prefix=name_prefix, token=token, lock=lock + chunks, + name_prefix=name_prefix, + token=token, + lock=lock, + inline_array=inline_array, ) return self._from_temp_dataset(ds) diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index 76776b4bc44..987248bf7ff 100644 --- a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -239,6 +239,7 @@ def _maybe_chunk( lock=None, name_prefix="xarray-", overwrite_encoded_chunks=False, + inline_array=False, ): from dask.base import tokenize @@ -250,7 +251,7 @@ def _maybe_chunk( # subtle bugs result otherwise. 
see GH3350 token2 = tokenize(name, token if token else var._data, chunks) name2 = f"{name_prefix}{name}-{token2}" - var = var.chunk(chunks, name=name2, lock=lock) + var = var.chunk(chunks, name=name2, lock=lock, inline_array=inline_array) if overwrite_encoded_chunks and var.chunks is not None: var.encoding["chunks"] = tuple(x[0] for x in var.chunks) @@ -1994,6 +1995,7 @@ def chunk( name_prefix: str = "xarray-", token: str = None, lock: bool = False, + inline_array: bool = False, **chunks_kwargs: Any, ) -> Dataset: """Coerce all arrays in this dataset into dask arrays with the given @@ -2018,6 +2020,9 @@ def chunk( lock : optional Passed on to :py:func:`dask.array.from_array`, if the array is not already as dask array. + inline_array : bool, optional + Passed on to :py:func:`dask.array.from_array`, if the array is not + already a dask array. **chunks_kwargs : {dim: chunks, ...}, optional The keyword arguments form of ``chunks``. One of chunks or chunks_kwargs must be provided @@ -2031,6 +2036,7 @@ def chunk( Dataset.chunks Dataset.chunksizes xarray.unify_chunks + dask.array.from_array """ if chunks is None and chunks_kwargs is None: warnings.warn( diff --git a/xarray/core/variable.py b/xarray/core/variable.py index 05c70390b46..2445921f4a0 100644 --- a/xarray/core/variable.py +++ b/xarray/core/variable.py @@ -1023,6 +1023,7 @@ def chunk( ) = {}, name: str = None, lock: bool = False, + inline_array: bool = False, **chunks_kwargs: Any, ) -> Variable: """Coerce this array's data into a dask array with the given chunks. @@ -1046,6 +1047,9 @@ def chunk( lock : optional Passed on to :py:func:`dask.array.from_array`, if the array is not already as dask array. + inline_array : bool, optional + Passed on to :py:func:`dask.array.from_array`, if the array is not + already a dask array. **chunks_kwargs : {dim: chunks, ...}, optional The keyword arguments form of ``chunks``. One of chunks or chunks_kwargs must be provided. 
@@ -1053,6 +1057,13 @@ def chunk( Returns ------- chunked : xarray.Variable + + See Also + -------- + Variable.chunks + Variable.chunksizes + xarray.unify_chunks + dask.array.from_array """ import dask.array as da @@ -1098,7 +1109,9 @@ def chunk( if utils.is_dict_like(chunks): chunks = tuple(chunks.get(n, s) for n, s in enumerate(self.shape)) - data = da.from_array(data, chunks, name=name, lock=lock, **kwargs) + data = da.from_array( + data, chunks, name=name, lock=lock, inline_array=inline_array, **kwargs + ) return self._replace(data=data) @@ -2710,7 +2723,7 @@ def values(self, values): f"Please use DataArray.assign_coords, Dataset.assign_coords or Dataset.assign as appropriate." ) - def chunk(self, chunks={}, name=None, lock=False): + def chunk(self, chunks={}, name=None, lock=False, inline_array=False): # Dummy - do not chunk. This method is invoked e.g. by Dataset.chunk() return self.copy(deep=False) diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py index 81bfeb11a1e..e3ed220faaf 100644 --- a/xarray/tests/test_backends.py +++ b/xarray/tests/test_backends.py @@ -3825,6 +3825,27 @@ def test_load_dataarray(self): # load_dataarray ds.to_netcdf(tmp) + @pytest.mark.skipif( + ON_WINDOWS, + reason="counting number of tasks in graph fails on windows for some reason", + ) + def test_inline_array(self): + with create_tmp_file() as tmp: + original = Dataset({"foo": ("x", np.random.randn(10))}) + original.to_netcdf(tmp) + chunks = {"time": 10} + + def num_graph_nodes(obj): + return len(obj.__dask_graph__()) + + not_inlined = open_dataset(tmp, inline_array=False, chunks=chunks) + inlined = open_dataset(tmp, inline_array=True, chunks=chunks) + assert num_graph_nodes(inlined) < num_graph_nodes(not_inlined) + + not_inlined = open_dataarray(tmp, inline_array=False, chunks=chunks) + inlined = open_dataarray(tmp, inline_array=True, chunks=chunks) + assert num_graph_nodes(inlined) < num_graph_nodes(not_inlined) + @requires_scipy_or_netCDF4 @requires_pydap