Defining blocks w/o using dask #13

Open · nbren12 opened this issue May 6, 2021 · 14 comments

@nbren12 (Contributor) commented May 6, 2021

Xarray offers limited lazy array functionality for indexing and merging that does not require dask. This can be achieved by using xarray.open_zarr(path, chunks=None).

Currently xpartition requires dask arrays, but it seems like we could decouple the notion of chunks from xarray/dask by allowing the user to specify the chunks/sizes directly in the BlocksAccessor class.

Users could then use this basic feature to plan jobs, e.g.:

blocks = Blocks(chunks={"time": 10}, sizes=dataset.sizes)
for i in range(len(blocks)):
    idata = dataset.isel(blocks.indexers(time=i))
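A minimal sketch of what such a dask-free Blocks helper could look like, assuming regular chunk sizes along each chunked dimension (the Blocks class, the block_counts helper, and the indexers signature are hypothetical, taken from the usage above, not an existing xpartition API):

import math
from typing import Dict, Mapping

class Blocks:
    """Hypothetical dask-free block planner defined by chunks and sizes only."""

    def __init__(self, chunks: Mapping[str, int], sizes: Mapping[str, int]):
        self.chunks = dict(chunks)
        self.sizes = dict(sizes)

    def block_counts(self) -> Dict[str, int]:
        # number of blocks along each chunked dimension (the last block may be short)
        return {dim: math.ceil(self.sizes[dim] / c) for dim, c in self.chunks.items()}

    def __len__(self) -> int:
        # total number of blocks, i.e. the product over chunked dimensions
        total = 1
        for count in self.block_counts().values():
            total *= count
        return total

    def indexers(self, **block_indices: int) -> Dict[str, slice]:
        # translate block indices into positional slices for Dataset.isel
        return {
            dim: slice(i * self.chunks[dim],
                       min((i + 1) * self.chunks[dim], self.sizes[dim]))
            for dim, i in block_indices.items()
        }

With only time chunked, len(blocks) equals the number of time blocks, so the loop above visits every block exactly once.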
@shoyer commented May 27, 2021

Do you think sizes is an important part of Blocks, or is chunks enough? In Xarray-Beam, we've mostly just been using Mapping[str, int] for chunks, which seems to suffice for many cases.

I guess the other extreme would be to make a fully lazy "schema" for an xarray.Dataset, which could include chunking as well. This is important for cases like writing a new Zarr file. So far I've resisted this in favor of using actual xarray.Dataset objects full of Dask arrays, but it's easy to make a Dataset full of dask arrays that is very expensive to serialize (due to large dask graphs), so this isn't ideal.
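For what it's worth, such a lazy schema could be quite small; the following is a sketch only, and the class and field names are hypothetical rather than an existing xarray or xarray-beam API:

from dataclasses import dataclass
from typing import Dict, Tuple

import numpy as np

@dataclass
class VariableSchema:
    dims: Tuple[str, ...]   # dimension names, e.g. ("time", "x")
    dtype: np.dtype         # array dtype, e.g. np.dtype("float32")

@dataclass
class DatasetSchema:
    """Just enough metadata to plan work or initialize a Zarr store,
    with no array data and no dask graph to serialize."""
    sizes: Dict[str, int]                 # global dimension sizes
    chunks: Dict[str, int]                # target chunk size per dimension
    variables: Dict[str, VariableSchema]  # schema for each data variable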

@nbren12 (Contributor, Author) commented May 27, 2021

sizes gives the blocks information about the global size of the index space; e.g., len(blocks) needs to know this. Basically, sizes[dim] / chunks[dim] (rounded up) is the number of chunks along a dimension.

Formalizing a small set theory for index space would be a good foundation IMO. I have something like the following in mind:

Block = Mapping[str, slice]
select(block: Block, ds: Dataset) -> Dataset
intersect(block, ...)
union(block, ...)
local_indexes(super_block, sub_block)
global_indexes(super_block, local_block)

Partition = Sequence[Block]
union(*partition) = sizes
intersect(partition[i], partition[j]) = None
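As a rough sketch of a couple of these operations, assuming every block spells out explicit integer start/stop slices over the same dimensions (the intersection of axis-aligned blocks is itself a block, so only that case is shown; a general union is not representable as a single block):

from typing import Mapping, Optional

import xarray as xr

Block = Mapping[str, slice]

def select(block: Block, ds: xr.Dataset) -> xr.Dataset:
    # positional selection of a block from a dataset
    return ds.isel(dict(block))

def intersect(a: Block, b: Block) -> Optional[Block]:
    # the intersection of two axis-aligned blocks is another block, or empty
    out = {}
    for dim in a:
        start = max(a[dim].start, b[dim].start)
        stop = min(a[dim].stop, b[dim].stop)
        if start >= stop:
            return None
        out[dim] = slice(start, stop)
    return out

def local_indexes(super_block: Block, sub_block: Block) -> Block:
    # re-express sub_block's global slices relative to super_block's origin
    return {
        dim: slice(sub_block[dim].start - super_block[dim].start,
                   sub_block[dim].stop - super_block[dim].start)
        for dim in sub_block
    }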

It’s a different point, but I’m +1 on some kind of DatasetSchema. It can be awkward to make dask work for this purpose.

@spencerkclark (Owner) commented:

The current BlocksAccessor operates like an interface to dask.array.Array.blocks on dask-backed DataArrays with labeled dimensions; e.g. data_array.blocks.isel(x=slice(1, 5), y=1) will return the same data as dask_array.blocks[1:5, 1]. @nbren12 is correct that the main reason for requiring the sizes information is determining the number of chunks along each dimension. We currently use this to do things like bounds checking on "block indexers" in the example above, and our method of partitioning work also depends on knowing how many dask chunks there are to partition across workers.
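A small self-contained illustration of that correspondence (the data_array.blocks.isel call is quoted from the description above; the rest is plain dask.array):

import dask.array
import xarray as xr

dask_array = dask.array.zeros((10, 8), chunks=(3, 2))
data_array = xr.DataArray(dask_array, dims=["x", "y"])

# xpartition, as described above:  data_array.blocks.isel(x=slice(1, 5), y=1)
# plain dask, indexing blocks positionally:
subset = dask_array.blocks[1:5, 1]  # blocks 1-3 along x, block 1 along y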

Like @shoyer, I think this dependence on dask-backed DataArrays was primarily out of convenience. It is convenient because dask gives us an easy way to lazily construct a Dataset, regardless of how the data is stored on disk (i.e. xpartition does not need to know that our source data is split across multiple netCDF files; xarray and dask can handle that). This worked well for the use-case I initially wrote things for -- the dask graph wasn't large enough to be inconvenient, but the data involved was large enough (30+ TB in some cases) to cause problems if you tried to execute things through a single dask client. That said, I totally get that it can be easy to create dask graphs that are themselves difficult to work with, so I would be very supportive of working towards a way to define blocks without dask.

For the use-case of xarray.open_zarr(path, chunks=None) this seems fairly straightforward, since the data can all be accessed via a single entry point, but for a dataset stored in a collection of netCDF files this seems a little more involved. Maybe though one would just consider these separate use-cases, which we would not expect to be handled by the same tool.

@spencerkclark (Owner) commented May 28, 2021

@nbren12 is correct that the main reason for requiring the sizes information is determining the number of chunks along each dimension.

I suppose if we have the tuple version of chunks (this is what we use in xpartition already), i.e. the explicit size of each chunk along each dimension, we can always infer the full array size information (it's just the sum of the chunk sizes along each dimension).

In [1]: import dask.array

In [2]: dask.array.zeros((10, 5), chunks=(3, 3)).chunks
Out[2]: ((3, 3, 3, 1), (3, 2))
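For completeness, a quick check of that observation:

import dask.array

arr = dask.array.zeros((10, 5), chunks=(3, 3))
# the global shape is just the sum of the chunk sizes along each axis
shape = tuple(sum(sizes) for sizes in arr.chunks)
assert shape == arr.shape == (10, 5)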

@nbren12 (Contributor, Author) commented May 28, 2021

For sure. My thought is that we should make an abstract interface that can be implemented with either chunk tuples, or sizes, or whatever. Also, "chunking" is just a special kind of partition, one that is orthogonal.

@shoyer commented May 29, 2021

I suppose if we have the tuple version of chunks (this is what we use in xpartition already), i.e. the explicit size of each chunk along each dimension, we can always infer the full array size information (it's just the sum of the chunk sizes along each dimension).

In [1]: import dask.array

In [2]: dask.array.zeros((10, 5), chunks=(3, 3)).chunks
Out[2]: ((3, 3, 3, 1), (3, 2))

So far I've mostly been sticking to "non-expanded" chunks in public APIs for xarray-beam, e.g., just {'x': 3, 'y': 3}. They're a little easier to reason about than irregular chunks, are guaranteed to be cheap to serialize, and are guaranteed to fit into a Zarr file.

I do still need irregular chunks when splitting for rechunking, but they don't have any particular representation in the public API, other than the "intersection of source and target chunks". You can figure out things like number of chunks in the intersection and how to split chunks for rechunking with a bit of math, e.g.,
https://github.com/pangeo-data/rechunker/pull/89/files#diff-616f56a1b0f9fb20aef64eaf7d777bc391df69cc86a92cbf2769b71e0fc27584R138
https://github.com/google/xarray-beam/blob/2bd11396895f74ea6dfcc30092fe6e59139b6ca0/xarray_beam/_src/rechunk.py#L176-L197
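For reference, here is a small standalone illustration of that math along a single dimension (not the rechunker/xarray-beam code linked above): merge the chunk boundaries of the source and target chunkings, and the differences between consecutive boundaries give the pieces each chunk must be split into.

import itertools

def intersect_chunks(source, target):
    """Chunk sizes of the intersection of two 1-D chunkings of the same extent."""
    assert sum(source) == sum(target)
    # offsets where either chunking places a boundary
    bounds = sorted(set(itertools.accumulate(source)) | set(itertools.accumulate(target)))
    return tuple(b - a for a, b in zip([0] + bounds[:-1], bounds[1:]))

# rechunking (3, 3, 3, 1) -> (5, 5) passes through the pieces (3, 2, 1, 3, 1)
assert intersect_chunks((3, 3, 3, 1), (5, 5)) == (3, 2, 1, 3, 1)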

Also, "chunking" is just a special kind of partition, one that is orthogonal.

Do you have plans for non-orthogonal partitions? 😱

@nbren12 (Contributor, Author) commented May 29, 2021

So far I've mostly been sticking to "non-expanded" chunks in public APIs for xarray-beam

I think Spencer's (and my) point was that with "non-expanded" chunks you also need sizes to know how big the global index space is. Alternatively, you can explicitly list all the chunk-size tuples.

Do you have plans for non-orthogonal partitions?

I may be wrong, but xpartition may already support this. It combines multiple chunks into larger rectangles based on the number of "ranks" the user selects, and I don't think we can assume that all the edges of these line up any longer. Imagine a large cube embedded in a sea of smaller cubes. This is quite a nice way to allocate work when the individual chunks are too small.

@spencerkclark (Owner) commented:

I had a similar reaction to @shoyer regarding non-orthogonal partitions :). Just to make sure I am on the same page -- by "orthogonal" partition do we mean a region that can be completely defined with a single Mapping[str, slice]? By this definition, xpartition currently only supports orthogonal partitionings of chunks; since chunks themselves are orthogonal partitionings of the underlying data, this means xpartition's partitions are also always orthogonal partitionings of the underlying data.

The algorithm for splitting arrays across N ranks is to iteratively go dimension by dimension in the "blocks" space, splitting each dimension into as many partitions as possible such that the total number of partitions remains less than or equal to the number of available ranks (here with respect to the underlying data, the partitions are "chunks of chunks," or "meta-chunks" if you will). Sometimes not all ranks will be used.
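A hedged sketch of that greedy idea (this is not xpartition's actual code; the helper name and the assumption that dimensions are visited in a fixed order are mine):

def partition_counts(block_counts, ranks):
    """Split dimensions one at a time, in the given order, so that the total
    number of partitions never exceeds the number of available ranks."""
    splits = {}
    total = 1
    for dim, n_blocks in block_counts.items():
        # split this dimension as finely as the remaining budget allows
        splits[dim] = min(n_blocks, max(1, ranks // total))
        total *= splits[dim]
    return splits

# e.g. a 4 x 4 grid of blocks across 6 ranks: x is split 4 ways, y is left whole,
# so only 4 of the 6 ranks end up with work
assert partition_counts({"x": 4, "y": 4}, ranks=6) == {"x": 4, "y": 1}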

@nbren12 (Contributor, Author) commented May 29, 2021

Ah I see. It goes one dimension at a time.

Just to straighten up terminology and make it line up with set theory:

  1. A "chunk/block" is a Chunk = Mapping[str, slice] that describes a subset of the global index space "A".
  2. A partition of a set A is a collection of pairwise disjoint subsets of A whose union is "A" (see Wikipedia). A partition would have the type Sequence[Chunk].
    1. I show a picture of the "orthogonal partition" below. Let's call it a lattice; it sounds better. The nice thing about a lattice is that it is easy to coarsen into another lattice, which is basically what xpartition does. However, it may be easier to load-balance across many ranks with the more general "partition".

This is described using the types in my comment above: #13 (comment).

[image: a 3x3 lattice of chunks C11 through C33, referenced in the comment below]

@nbren12 (Contributor, Author) commented May 29, 2021

For instance, in my picture above, there is no good way to coarsen the lattice into another lattice with 4 ranks. One worker will have to do C11, C21, C12, and C22; another will do C33; and the remaining two will do C31/C32 and C13/C23.

@shoyer commented May 31, 2021

Right, I can see how non-orthogonal partitioning could potentially be useful, but it's complex enough that I definitely would not bother to try to support it until there is a clear use-case :).

I thought I would mention two other nuances around data structures that have come up for me in xarray-beam:

  1. In addition to partitioning across dimensions with "chunks", there is partitioning across variables (particularly data variables). Splitting into separate variables is often the most favorable way to partition a dataset, because communication patterns across different variables tend to be sparse (or even non-existent).
  2. The other data type that we needed is a "key" that uniquely identifies a chunk. In principle this could be the same as "chunk", but the upper bound on each slice is redundant, so I used a hashable Mapping[str, int] in xarray-beam, which is a little easier to dynamically construct and update. I'm pretty happy with this, though it probably also needs to include some indication of the partitioned variables as well, per (1) -- see Indicate variables in xarray-beam keys google/xarray-beam#9 for my detailed proposal. A rough sketch follows below.
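A minimal sketch of such a key, assuming integer offsets per dimension plus an optional set of variable names (illustrative only, not xarray-beam's actual Key type):

from dataclasses import dataclass
from typing import FrozenSet, Mapping, Optional, Tuple

@dataclass(frozen=True)
class ChunkKey:
    """Hashable identifier for a chunk: integer offsets per dimension, plus
    (optionally) the subset of data variables the chunk carries."""
    offsets: Tuple[Tuple[str, int], ...]   # sorted (dim, offset) pairs
    variables: Optional[FrozenSet[str]] = None

def make_key(offsets: Mapping[str, int], variables=None) -> ChunkKey:
    return ChunkKey(tuple(sorted(offsets.items())),
                    frozenset(variables) if variables is not None else None)

key = make_key({"time": 0, "x": 30}, variables={"temperature"})
assert key == make_key({"x": 30, "time": 0}, {"temperature"})  # order-insensitive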

Interestingly, the current functionality of xpartition and xarray-beam is actually almost entirely orthogonal, aside from the ability to partition a dataset (although even this is done in fairly different ways). To me this is an argument for focusing on a handful of core data structures and defining a library of helper functions that act on them, something that could underlie the current functionality of both xpartition and xarray-beam.

@nbren12 (Contributor, Author) commented Jun 1, 2021

Right, I can see how non-orthogonal partitioning could potentially be useful, but it's complex enough that I definitely would not bother to try to support it until there is a clear use-case :).

The main reason I mention it is that xpartition provides a List[Chunk] abstraction and is therefore slightly more general than lattice-based partitions. This abstraction is a key simplification of the library IMO, since most computational back-ends support iterating over a list, but few can handle a multidimensional array. It's just a side benefit that it leaves the door open to future improvements, e.g. non-orthogonal partitions.
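A quick sketch of why the flat list is convenient: any orthogonal chunking can be flattened into a List[Chunk] that a backend only needs to iterate over, regardless of dimensionality (the function name here is made up for illustration):

import itertools
from typing import Dict, List

def lattice_to_list(chunks: Dict[str, int], sizes: Dict[str, int]) -> List[Dict[str, slice]]:
    """Flatten an orthogonal chunking into a flat list of positional indexers."""
    per_dim = {
        dim: [slice(start, min(start + size, sizes[dim]))
              for start in range(0, sizes[dim], size)]
        for dim, size in chunks.items()
    }
    dims = list(per_dim)
    return [dict(zip(dims, combo)) for combo in itertools.product(*per_dim.values())]

# a 2 x 2 lattice of blocks becomes a flat list of 4 chunk indexers
assert len(lattice_to_list({"x": 5, "y": 5}, {"x": 10, "y": 10})) == 4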

@nbren12 (Contributor, Author) commented Jun 1, 2021

The other data type that we needed is a "key" that uniquely identifies a chunk. In principle this could be the same as "chunk", but upper bound on slices is redundant so I used hashable Mapping[str, int] in xarray-beam

It's also a bit redundant with any coordinate info inside of the xarray objects. Presumably if all the dimensions of an xarray object had coordinate info, we wouldn't need to pass around any separate metadata.
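For example, if the global coordinates are known, sorted, and unique (exactly the assumption questioned in the reply below), a chunk's offset can be recovered from its own coordinates rather than carried as a separate key:

import numpy as np
import xarray as xr

global_time = np.arange(100)  # assumed known, sorted, and unique
chunk = xr.Dataset({"a": ("time", np.zeros(10))},
                   coords={"time": global_time[30:40]})

# recover the chunk's global offset along "time" from its first coordinate value
offset = int(np.searchsorted(global_time, chunk["time"].values[0]))
assert offset == 30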

@shoyer commented Jun 1, 2021

The other data type that we needed is a "key" that uniquely identifies a chunk. In principle this could be the same as "chunk", but upper bound on slices is redundant so I used hashable Mapping[str, int] in xarray-beam

It's also a bit redundant with any coordinate info inside of the xarray objects. Presumably if all the dimensions of an xarray object had coordinate info, we wouldn't need to pass around any separate metadata.

This is true, but I didn't want to make any assumptions about the existence or sortedness of coordinate values.
