Is there any way of having .map_blocks be even more opaque to dask? #8414

Closed
max-sixty opened this issue Nov 5, 2023 · 23 comments
Labels
enhancement · topic-chunked-arrays (Managing different chunked backends, e.g. dask) · topic-dask

Comments

@max-sixty
Collaborator

Is your feature request related to a problem?

Currently I have a workload which does something a bit like:

import xarray as xr

ds = xr.open_zarr(source)  # `source` and `dest` are placeholder paths
(
    ds.assign(
        x=ds.foo * ds.bar,
        y=ds.foo + ds.bar,
    ).to_zarr(dest)
)

(the actual calc is a bit more complicated! And while I don't have an MCVE of the full calc, I pasted a task graph below)

Dask — while very impressive in many ways — handles this extremely badly, because it attempts to load the whole of ds into memory before writing out any chunks. There are lots of issues on this in the dask repo; it seems like an intractable problem for dask.

[screenshot: task graph]

Describe the solution you'd like

I was hoping to make the internals of this task opaque to dask, so it became a much dumber task runner — just map over the blocks, running the function and writing the result, block by block. I thought I had some success with .map_blocks last week — the internals of the calc are now opaque at least. But the dask cluster is falling over again, I think because the write is seen as a separate task.

Is there any way to make the write more opaque too?

Describe alternatives you've considered

I've built a really hacky homegrown thing that does this on a custom scheduler — it just runs the functions and writes with region. I'd much prefer to use & contribute to the broader ecosystem...
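
Roughly, the shape of it (heavily simplified; `source`, `dest`, and the chunked "time" dimension here are placeholders, not the real workload):

import numpy as np
import xarray as xr

ds = xr.open_zarr(source)

# write the store's metadata once, without computing any data
out = ds.assign(x=ds.foo * ds.bar, y=ds.foo + ds.bar)
out.to_zarr(dest, compute=False)

# then compute and write one slab at a time, with no cluster in the loop
bounds = [0, *np.cumsum(ds.chunks["time"]).tolist()]
for start, stop in zip(bounds[:-1], bounds[1:]):
    block = ds.isel(time=slice(start, stop))
    result = block.assign(x=block.foo * block.bar, y=block.foo + block.bar)
    # region writes require every variable to include the region dimension
    result = result.drop_vars(
        [v for v in result.variables if "time" not in result[v].dims]
    )
    result.to_zarr(dest, region={"time": slice(start, stop)})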

Additional context

(It's also possible I'm making some basic error — and I do remember it working much better last week — so please feel free to direct me / ask me for more examples, if this doesn't ring true)

@dcherian
Contributor

dcherian commented Nov 5, 2023

dask.delayed maybe?

You have a shuffle-like thing going from Tower 3 to Tower 4. That's probably it. What's happening there?

Also, you can write from within map_blocks and return 0. Does that fit your workload?
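
(For illustration, a rough sketch of the dask.delayed flavour of this, where `write_one_block` is a hypothetical helper that opens the source, computes one slab, and writes it with `to_zarr(region=...)`, and `bounds` holds the chunk boundaries:)

import dask

# one delayed task per block; each task does its own read, compute, and
# region write, so the scheduler only sees opaque, independent tasks
tasks = [
    dask.delayed(write_one_block)(start, stop)  # hypothetical per-block writer
    for start, stop in zip(bounds[:-1], bounds[1:])
]
dask.compute(*tasks)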

@spencerkclark
Member

@max-sixty I have run into similar issues, and wrote a package called xpartition to facilitate this kind of crude / pragmatic writing approach. I have been hesitant to advertise it widely as I have many of the same reservations as you, but despite that I find myself using it regularly for important things.

@alexamici
Collaborator

@max-sixty I was doing a similar thing last week, hit the same issue and after trying a thousand strategies I ended up working around it with a custom scheduler inside the map_blocks as you did 😢

@TomNicholas
Contributor

TomNicholas commented Nov 5, 2023

FYI cubed.map_blocks exists, so it might be interesting to try using that on this problem. However xr.map_blocks doesn't (yet) have the ability to defer to cubed instead of dask, so you would have to write your computation using the array-level function cubed.map_blocks.

EDIT: Also doing this

I was hoping to make the internals of this task opaque to dask, so it became a much dumber task runner — just map over the blocks, running the function and writing the result, block by block.

is essentially what I would like to do with xr.map_blocks to make it support using cubed.map_blocks. The reason I haven't made this change yet is that I don't know enough about the internals of xr.map_blocks to know why it doesn't just work by calling dask.array.map_blocks (I assume there is a good reason!).
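
(For reference, this is roughly what the array-level version looks like. The sketch below uses dask.array.map_blocks, with sizes and a reduction invented purely for illustration; the suggestion above is that the same call shape would work with cubed.map_blocks on cubed arrays, though cubed's exact signature isn't verified here.)

import dask.array as da
import numpy as np

a = da.from_array(np.arange(80).reshape(8, 10), chunks=(1, 10))
b = np.arange(10)

def f(block):
    # blockwise calculation followed by a reduction over the last axis
    d = block * b
    return (d * b).sum(axis=-1, keepdims=True)

# each (1, 10) input block maps to a (1, 1) block of the result
result = da.map_blocks(f, a, chunks=(1, 1), dtype=a.dtype).compute()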

@TomNicholas added the topic-chunked-arrays (Managing different chunked backends, e.g. dask) and topic-dask labels on Nov 5, 2023
@max-sixty
Collaborator Author

Eeeek, sounds like this is a common issue then... Without wanting to blast @s anytime anything is a problem, I'll politely CC @martindurant & @mrocklin for info, as I count four alternatives that folks have built, within a few hours of me posting the issue...

(Goes without saying that I am very happy to be wrong, not trying to throw stones at the hard work your team has done)


Also you can write from within map_blocks and return 0

That would work well, but it needs the indexes, and that's effectively prevented by #8409, because we get >1GB task graphs.


You have a shuffle like thing going from Tower 3 to Tower 4. That's probably it. What's happening there?

Yes, good observation.

Speculatively — I think this is splitting the source read by data variable, then shuffling to do the calc, then writing each destination data variable.

If that's correct, then another approach could be to try and keep the tasks for each block together, rather than organizing by each data variable.

I'll try and get to checking whether that assertion is correct.


FYI cubed.map_blocks exists, so it might be interesting to try using that on this problem.

I'm very up for checking this out, and some of the team pointed me towards it (hence my DM last week @TomNicholas!). But I'm already way over budget on trying new things vs. getting work done, and I would need to set up an env for running cubed.

I'd also like to try beam!

@mrocklin
Contributor

mrocklin commented Nov 5, 2023

I'll defer to @fjetter, who has been playing a lot with Dask graph ordering recently and having decent success. He may have a new version for you to try.

If you're able to provide an MCVE that recreates your currently unhappy graph, I'd encourage you to do so. There's pretty good activity in this space right now.

@dcherian
Contributor

dcherian commented Nov 5, 2023

I think this is splitting the source read by data variable, then shuffling to do the calc, then writing each destination data variable.
If that's correct, then another approach could be to try and keep the tasks for each block together, rather than organizing by each data variable.

IDK you have one dask array per variable anyway. But maybe the viz is misleading. This kind of shuffle type workload is where distributed falls apart. Also why are you splitting by variable? map_blocks can handle datasets.

In general, I'm also confused as to how a shuffle type workload gets expressed as a blockwise workload without a major rechunking/rearrangement (i.e. a shuffle) in the middle.

that's effectively prevented by #8409, because we get >1GB task graphs.

Could try with the current state of #8412

@max-sixty
Collaborator Author

IDK you have one dask array per variable anyway. But maybe the viz is misleading. This kind of shuffle type workload is where distributed falls apart. Also why are you splitting by variable? map_blocks can handle datasets.

It wasn't me splitting by variable! 😄 dask seems to be doing it.

I'll try and get together an MCVE...

@max-sixty
Collaborator Author

max-sixty commented Nov 5, 2023

OK, here's an MCVE. It requires more specificity than I expected — I think my read that things were working last week was correct actually, because the calcs were slightly different. It relies on the nested calcs, such that return d doesn't trigger the issue.

(though that was the beauty of .map_blocks for me — that it made the graph opaque to dask, and small changes in the inputs couldn't cause catastrophic runtime impact)

import numpy as np
import xarray as xr

ds = xr.Dataset(
    data_vars=dict(
        a=(("x", "y"), np.arange(80).reshape(8, 10)),
        b=(("y",), np.arange(10)),
    )
).chunk(x=1)  # one block per step along x

def f(ds):
    # the nested calc matters: returning `d` directly doesn't trigger the issue
    d = ds.a * ds.b
    return (d * ds.b).sum("y")

result = ds.map_blocks(f).compute()

Here's the graph — you can see the lack of basic parallelism that @dcherian pointed out above, though obviously less severe than in the big example:

[screenshot: task graph for the MCVE]

@TomNicholas
Contributor

Excellent @max-sixty!

@tomwhite we should add this to the distributed arrays repo examples, and try it with cubed

@dcherian
Contributor

dcherian commented Nov 5, 2023

This is basically the same as pangeo-data/distributed-array-examples#2 which @fjetter tried to fix with dask/dask#10535

EDIT: And yeah the viz was misleading. It's not a shuffle, it's blockwise ops on different arrays, but only purely blockwise if you're unchunked on y

@TomNicholas
Contributor

@dcherian right yeah, I see that now. We should still try it out!

@max-sixty
Collaborator Author

This is basically the same as pangeo-data/distributed-array-examples#2 which @fjetter tried to fix with dask/dask#10535

Nice! Yes I've been following that, it looks quite promising.

Unfortunately I have been running on 2023.10.0, which IIUC includes that fix...

@max-sixty
Collaborator Author

To put numbers onto this:

  • The source array is 2.7TB; the chunks are ~500MB each
  • The result is 92GB (it's doing lots of reduction)
  • Each worker has ~300GB of memory; there are 350 workers, so a total of 105TB of memory
  • I'm running it serially in 10 slices, with .to_zarr(region=...). So these array sizes can be divided by 10. Doing it this way means that some of the batches get through — workers are being killed a lot, but often it eventually manages a couple of batches.
  • So we really have a lot of memory available. While there is an expansion during the computation (the arrays being multiplied have non-overlapping dimensions), this fits into memory fine in my custom executor.
    • I had originally tried doing this without .map_blocks — which possibly doesn't need to expand the array's dimensions out — but this fails too (though possibly for a different reason)

I also tried running with the latest point release 2023.10.1, unfortunately with no luck.

[screenshot]

@fjetter

fjetter commented Nov 6, 2023

Unfortunately I have been running on 2023.10.0, which IIUC includes that fix...

Yes, indeed, that already includes the fix. However, I'm working on a follow-up that should work even better here: dask/dask#10557

At least for your MCVE this new PR works as intended. The following graphs show the prioritization of tasks / the order in which we run tasks.

main / 2023.10.0

Note how all tasks after the 0 in the center are effectively scheduled consecutively before anything else runs. Those tasks are loading data and this is hurting you.

[screenshot: task ordering]

New PR

The data-loading tasks only run when necessary

[screenshot: task ordering]

The PR in question has a couple of performance issues to figure out, but it would help a lot if you could try it out on your real problem and let me know whether it is indeed helping you. FWIW, the performance problems of this PR would show up as a longer delay before your computation kicks off. However, judging by the graph I'm seeing, you may not even be impacted by this.

I really appreciate that you put in the effort to create the minimal example. We can put this into our benchmark suite and add a unit test to dask/dask with this. This is very helpful!

@max-sixty
Collaborator Author

That's awesome @fjetter !

(FYI it's difficult for me to try out code that's not on PyPI for this problem, but I will once it's there)

@max-sixty
Collaborator Author

One thought a couple of days later — having dask execute this graph reasonably would be ideal. But the original issue was whether we could be more opaque to dask, which is a plausible way of reducing the risk of poor execution.

IIUC, .map_blocks only exists as a form of this risk-reduction — otherwise we can just call foo(ds) rather than ds.map_blocks(foo). So would it make sense to make it even more opaque, so that the dataset chunk is executed as a single unit, without creating dependencies between the data variables?

I don't know how feasible it is to do that. I previously used .compute(scheduler="threads") so dask wouldn't schedule back on the central scheduler here, but I generally feel like I'm in a space where I'm not well-informed.
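
(For reference, the scheduler override is just the standard dask keyword passed through .compute, e.g. applied to the MCVE above:)

# run the blockwise work on the local threaded scheduler rather than
# dispatching it back to the distributed scheduler
result = ds.map_blocks(f).compute(scheduler="threads")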

@TomNicholas
Contributor

So would it make sense to make it even more opaque, so that the dataset chunk is executed as a single unit, without creating dependencies between the data variables?

Isn't this pretty much the model of xarray-beam?

@max-sixty
Collaborator Author

IIUC, .map_blocks only exists as a form of this risk-reduction — otherwise we can just call foo(ds) rather than ds.map_blocks(foo). So would it make sense to make it even more opaque, so that the dataset chunk is executed as a single unit, without creating dependencies between the data variables?

This would be a bigger change than I envisioned, because it would need to handle cases where some variables don't share dimensions with the chunked dimension. (I'm actually not sure how xarray-beam handles this — maybe it requires all variables to be chunked on the "distributed" dimension?)


I'm not sure there's much to do here apart from wait for upstream / find alternatives in the meantime. So planning to close unless anyone has ideas for this.

@max-sixty added the plan to close (May be closeable, needs more eyeballs) label on Dec 8, 2023
@TomNicholas
Contributor

I still don't understand what's stopping us rewriting xarray.map_blocks in terms of passing the underlying arrays to dask.array.map_blocks. If we did that then swapping out to try cubed would be trivial. I must be missing something obvious?

@max-sixty removed the plan to close (May be closeable, needs more eyeballs) label on Dec 8, 2023
@fjetter

fjetter commented Dec 12, 2023

I apologize that this is almost off-topic, but I just merged dask/dask#10660, which should fix the performance problems that triggered this conversation. From what I can tell this fixes the issue, and I believe the new approach is more robust to problems like yours. If you encounter more issues like this, please let me know; I believe that with the new implementation we can be more responsive to them.

@dcherian
Contributor

Rewriting xarray.map_blocks in terms of passing the underlying arrays to dask.array.map_blocks.

You have to pass around more than arrays: arbitrary dicts as attrs, for example, and you have to distinguish between coordinate and data variables. It also passes around tuples of expected shapes and dimension names to raise nice error messages. So perhaps it could be done, but it would be quite ugly! I encourage experimenting here :)

Honestly, I'd much rather work on xarray_object.to_delayed. This is lower-level but would be useful for many things that we hack around with map_blocks (arbitrary tasks with no constraints on return values!)

I just merged dask/dask#10660 which should fix the performance problems that triggered this conversation.

YEAH! Thanks @fjetter.

IMO we can close.

@TomNicholas
Contributor

This looks like a big improvement @fjetter ! Thank you for all your hard work here, user experience should be improved greatly by these efforts.

I'm still confused about why we can't just dispatch to map_blocks, but I've split that question off into a separate issue (#8545), so that we can close this as hopefully addressing @max-sixty's original problem.


7 participants