Dask executor is broken with dask upstream #259

Closed
rabernat opened this issue Jan 12, 2022 · 19 comments · Fixed by #260

@rabernat
Contributor

rabernat commented Jan 12, 2022

In our Dask executor, we create a high-level graph as follows:

hlg = HighLevelGraph(layers, dependencies)
delayed = Delayed(prev_token[0], hlg)
return delayed

Our upstream test (dask commit 725110f9367931291a3e68c9d582544cdb032f77) has revealed the following error, triggered by the line

delayed = Delayed(prev_token[0], hlg)
self = Delayed('finalize_target-8dfa49acbe8cb65c6a2f82ebca37350d')
key = 'finalize_target-8dfa49acbe8cb65c6a2f82ebca37350d'
dsk = HighLevelGraph with 5 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7f338c0b3280>
 0. config
 1. cache_input
 2. prepare_target
 3. store_chunk
 4. finalize_target

length = None, layer = None

    def __init__(self, key, dsk, length=None, layer=None):
        self._key = key
        self._dask = dsk
        self._length = length
    
        # NOTE: Layer is used by `to_delayed` in other collections, but not in normal Delayed use
        self._layer = layer or key
        if isinstance(dsk, HighLevelGraph) and self._layer not in dsk.layers:
>           raise ValueError(
                f"Layer {self._layer} not in the HighLevelGraph's layers: {list(dsk.layers)}"
            )
E           ValueError: Layer finalize_target-8dfa49acbe8cb65c6a2f82ebca37350d not in the HighLevelGraph's layers: ['config', 'cache_input', 'prepare_target', 'store_chunk', 'finalize_target']

It looks like Dask does not like how we are creating the Delayed object. Is this a dask regression, or do we need to change our code?

@TomAugspurger - any insights here?

@TomAugspurger
Contributor

Seems like that check was added in dask/dask#8452 (cc @gjoseph92).

Maybe we need a similar fix as https://github.com/dask/dask-ml/pull/898/files#diff-db396642d72db7ff2df3330275627030696e1ae2dbd1972b86b881529cd519b0R131-R134, by providing the layer name when creating the delayed?

@gjoseph92
Contributor

Yes, this code was previously silently creating invalid Delayed objects; now doing so is an error.

From skimming the code, I think you could do Delayed(prev_token[0], hlg, layer=prev_token[0].split("-")[0]).
Or, the more appropriate thing to do might be to use the same names for HLG layers as you're using for keys. I'm not sure what the purpose is of appending a token to the keys, but not to the layer names—could the layer names just have tokens too?
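For illustration, here is a minimal dask-free sketch of the check shown in the traceback above and of the two fixes suggested in this comment. The function `check_layer` and the shortened token are hypothetical stand-ins; only the logic (the layer defaults to the key and must appear among the layer names) is taken from the traceback.

```python
def check_layer(key, layer_names, layer=None):
    """Hypothetical stand-in for the validation in Delayed.__init__ (see traceback)."""
    layer = layer or key  # the layer name defaults to the key
    if layer not in layer_names:
        raise ValueError(
            f"Layer {layer} not in the HighLevelGraph's layers: {list(layer_names)}"
        )
    return layer


token = "8dfa49ac"  # shortened token, for readability only
stage_names = ["config", "cache_input", "finalize_target"]
key = f"finalize_target-{token}"

# Current situation: keys carry a token, layer names do not, so the check fails.
try:
    check_layer(key, stage_names)
    mismatch_raised = False
except ValueError:
    mismatch_raised = True

# Fix 1: pass the layer name explicitly (needs a dask that accepts `layer=`).
explicit = check_layer(key, stage_names, layer=key.split("-")[0])

# Fix 2: give the layers the same tokenized names as the keys.
tokenized_names = [f"{name}-{token}" for name in stage_names]
matched = check_layer(key, tokenized_names)
```

Note that `key.split("-")[0]` only recovers the stage name when the name itself contains no hyphen, which is one more reason the second fix may be the more robust one.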

@rabernat
Contributor Author

When I try that with dask==2021.08.1, I get

delayed = Delayed(prev_token[0], hlg, layer=stage.name)
E       TypeError: __init__() got an unexpected keyword argument 'layer'

@gjoseph92
Contributor

Correct. Matching HLG layer names to task keys would be the backwards-compatible change.

@TomAugspurger
Contributor

If it isn't time-sensitive, I can take a look at this tomorrow or Friday.

@rabernat
Contributor Author

Tom, I think I see how to fix it.

@rabernat
Contributor Author

rabernat commented Jan 13, 2022

> Matching HLG layer names to task keys would be the backwards-compatible change.

Over in #260 there is a naive attempt at that. It passed for me in my local environment with dask 2021.08.1, but it failed after upgrading to dask 2021.12.0.

If anyone sees a quick workaround, let me know. Edit: nm I think I got it. Otherwise we can go with the layer= approach and bump our dask required version. Edit: that will not work because not even 2021.12.0 (latest release) supports the Delayed(..., layer=) syntax.

@gjoseph92
Contributor

@rabernat I'm just personally curious—why are you using a HLG for this, as opposed to a simple low-level graph? It's evidently adding some extra complexity, but since you're not using Blockwise or anything special like that, I'm wondering what benefit you get from it.

@rabernat
Contributor Author

I don't know. I just vaguely thought it would be better somehow, since our computation certainly fits the HighLevelGraph model well. At the very least, we get a nicer repr for our graph. What would the other possible advantages be?

@gjoseph92
Contributor

IMO there are no advantages (besides the repr I guess) unless you're using non-materialized layers. https://docs.dask.org/en/stable/high-level-graphs.html makes it sound as though there are some special optimizations that are only available by using HLGs, but that's not exactly the case: there are special optimizations available when using non-materialized layers, which can only be used with HLGs. But if you're writing the graphs as dicts, HLGs don't add anything.
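To make the "graphs as dicts" point concrete, here is a small sketch of a plain low-level graph with the same shape as one mapped stage in the executor (a config entry, one task per item, and a checkpoint task). The evaluator `get`, the functions `store_chunk` and `checkpoint`, and the token are hypothetical and written only for this example; `get` is a toy resolver, not dask's scheduler.

```python
def get(dsk, key):
    """Toy evaluator for a dask-style dict graph (illustrative only, no caching)."""

    def resolve(arg):
        # A task is a tuple whose first element is callable.
        if isinstance(arg, tuple) and arg and callable(arg[0]):
            func, *args = arg
            return func(*(resolve(a) for a in args))
        try:
            is_key = arg in dsk
        except TypeError:  # unhashable values (dicts, lists) cannot be keys
            is_key = False
        return resolve(dsk[arg]) if is_key else arg

    return resolve(key)


def store_chunk(i, config):
    return f"{config['target']}/chunk-{i}"


def checkpoint(*results):
    return len(results)


token = "abc123"  # hypothetical token
config_key = f"config-{token}"
stage = f"store_chunk-{token}"

# One flat dict: no layers, no dependencies mapping, no layer-name check to satisfy.
dsk = {config_key: {"target": "s3://bucket"}}
chunk_keys = [(stage, i) for i in range(3)]
for name, i in chunk_keys:
    dsk[(name, i)] = (store_chunk, i, config_key)
final_key = f"store_chunk-checkpoint-{token}"
dsk[final_key] = (checkpoint, *chunk_keys)

n_done = get(dsk, final_key)
first = get(dsk, (stage, 0))
```

Every task is still stored explicitly here, which is exactly the cost that non-materialized layers are meant to avoid.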

@rabernat
Contributor Author

rabernat commented Jan 13, 2022

Can you say more about non-materialized layers? Would that make our HLG consume less memory? If so, that would be great. The documentation does not really explain what a non-materialized layer is or how to construct one.

Edit for clarity: we are creating the graph as a dict because that's the only way I know how to do it. In our other executors (Prefect, Beam), the mapping operation in each layer is implicit, while in dask it is explicit (every task of the map operation is explicitly represented). Perhaps NMLs would allow us to do the same in dask?

@martindurant
Contributor

Along with the from_numpy discussion (which xarray calls), you can end up materialising very large graphs for some datasets like the ones in this project. If you mean to slice this up and compute piece by piece, then a good fraction of those graph items never needed to be made, which is exactly the sort of thing a HLG layer should be able to handle.

Yes, completely agree, the mapping should not, in general, require you to build a dict by hand (although having the option is very useful as an escape hatch).

@martindurant
Contributor

> Can you say more about non-materialized layers

Put simply, a materialised layer is just a wrapper around the previous dict representation, whereas a non-materialised layer is a prescription for generating the tasks. It is like the difference between a list and a generator. The latter must still be iterated over in the scheduler, but if you subselect from it, you can prune the ranges you iterate over without ever creating the items.

However, the whole HLG/layer thing is complex and not much documented. I think <5 people have ever tried to make one. I tried once, and only partially succeeded.
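The list-versus-generator analogy above can be sketched without dask. The class below is purely hypothetical (it is not dask's `Layer` API, and the method name `select` is invented here): it behaves like a mapping of tasks but stores only a function and a range, so subselecting never creates the tasks that are pruned away.

```python
from collections.abc import Mapping


class LazyMapLayer(Mapping):
    """Hypothetical non-materialized layer: a rule for tasks, not a dict of them."""

    def __init__(self, name, func, indices):
        self.name = name
        self.func = func
        self.indices = indices  # a range, so nothing is stored per task

    def __getitem__(self, key):
        if not (isinstance(key, tuple) and len(key) == 2):
            raise KeyError(key)
        name, i = key
        if name != self.name or i not in self.indices:
            raise KeyError(key)
        return (self.func, i)  # the task is built only when asked for

    def __iter__(self):
        return ((self.name, i) for i in self.indices)

    def __len__(self):
        return len(self.indices)

    def select(self, indices):
        """Prune to a sub-range without generating any of the dropped tasks."""
        return LazyMapLayer(self.name, self.func, indices)


def process(i):
    return i * 2


layer = LazyMapLayer("store_chunk", process, range(1_000_000))
total = len(layer)  # one million tasks described, none stored

subset = layer.select(range(10, 13))
subset_keys = list(subset)
task = subset[("store_chunk", 11)]
```

A materialized equivalent would be the dict `{("store_chunk", i): (process, i) for i in range(1_000_000)}`, which holds a million entries before anything is pruned.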

@rabernat
Contributor Author

This is the thing we are trying to turn into a layer:

@dataclass(frozen=True)
class Stage:
    function: StageFunction
    name: str
    mappable: Optional[Iterable] = None
    annotations: Optional[StageAnnotations] = None

It would be great if we did not have to explicitly store each task in the dask graph. Can anyone point me to an example of creating a non-materialized layer?

@martindurant
Contributor

That is a perfect use case, and I have also asked for a clear doc showing how you would turn a dict comprehension (which could also express that operation) into the equivalent Layer.

@gjoseph92
Contributor

> The documentation does not really explain what a non-materialized layer is nor how to construct one.

Agreed! Someday hopefully it will (dask/dask#7709), but it's very unapproachable right now.

> In our other executors (Prefect, Beam), the mapping operation in each layer is implicit, while in dask it is explicit (every task of the map operation is explicitly represented)

This is the sort of thing you could use Blockwise to represent. Blockwise is kind of like NumPy's broadcasting/vectorization logic, applied to patterns of keys in a dask graph. Or einsum for dask graphs. It's basically a way of generalizing the "apply this function to every chunk" logic, with an extremely powerful way of representing how to iterate over the chunks, and how to broadcast together multiple chunked inputs.

I'll write you an example of how to use this in a moment, but first I want to clarify something. I am probably completely misunderstanding pangeo-forge and what we're talking about here, but when I see "pipeline", I think of a pipeline of high-level functions a user is chaining together to produce a dataset. Kind of like Prefect, or Airflow, or something like that. Maybe each of those functions themselves contain dask code, and do all sorts of big array operations and call compute internally or something, but that's irrelevant from pangeo-forge's perspective. Since "pipeline" makes me think of Airflow-like things, I'm imagining the number of tasks in the pipeline is usually in the 10s, maybe the 1000s for crazy ones.

At the thousand-task level, graph size is just so far from being a problem that even though you could use fancy Blockwise, I don't see the point.

Are the graphs of your pangeo-forge pipelines large enough that they're creating problems? Like 100,000+ tasks in the pipeline graph?
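As a rough illustration of the Blockwise idea described in this comment (one compact rule plus index patterns, instead of one stored entry per task), here is a dask-free sketch that expands such a rule into explicit tasks. The function `expand_blockwise` and its signature are invented for this example and are not dask's API; it handles only indices that appear in the output, so no contractions.

```python
from itertools import product
from operator import add


def expand_blockwise(func, out_name, out_ind, *pairs, numblocks):
    """Expand a compact einsum-like rule into an explicit dict of tasks.

    `pairs` alternates input name and input index string, for example
    "x", "i", "y", "j". `numblocks` maps each index letter to a block count.
    Hypothetical helper for illustration only.
    """
    inputs = list(zip(pairs[::2], pairs[1::2]))
    graph = {}
    for coords in product(*(range(numblocks[i]) for i in out_ind)):
        position = dict(zip(out_ind, coords))
        # Each input is addressed only by the indices it declares (broadcasting).
        args = tuple((name, *(position[i] for i in ind)) for name, ind in inputs)
        graph[(out_name, *coords)] = (func, *args)
    return graph


# Rule: z[i, j] = add(x[i], y[j]), with 2 blocks along i and 3 along j.
rule = (add, "z", "ij", "x", "i", "y", "j")
graph = expand_blockwise(*rule, numblocks={"i": 2, "j": 3})
```

The rule itself is a handful of values regardless of how many blocks there are; only the expanded `graph` grows with the number of tasks, which is the part a non-materialized layer would defer.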

@rabernat
Contributor Author

Yes, we have pipelines that process 100000+ files (e.g. #151), creating ~5x that many tasks. In the future, we plan to deploy on orders of magnitude bigger. We have already been struggling with the memory consumption of our Dask graphs (see e.g. #116). We have been able to get around some issues by serializing things more efficiently (e.g. #160). However, the explicit representation of each task in the graph will remain a bottleneck.

The dask pipeline executor is <50 lines of [hopefully pretty readable] code:

class DaskPipelineExecutor(PipelineExecutor[Delayed]):
    @staticmethod
    def compile(pipeline: Pipeline):
        token = dask.base.tokenize(pipeline)
        # we are constructing a HighLevelGraph from scratch
        # https://docs.dask.org/en/latest/high-level-graphs.html
        layers = dict()  # type: Dict[str, Dict[Union[str, Tuple[str, int]], Any]]
        dependencies = dict()  # type: Dict[str, Set[str]]
        # start with just the config as a standalone layer
        # create a custom delayed object for the config
        config_key = append_token("config", token)
        layers[config_key] = {config_key: pipeline.config}
        dependencies[config_key] = set()
        prev_key = config_key
        prev_dependency = ()  # type: Union[Tuple[()], Tuple[str]]
        for stage in pipeline.stages:
            stage_graph = {}  # type: Dict[Union[str, Tuple[str, int]], Any]
            if stage.mappable is None:
                stage_key = append_token(stage.name, token)
                func = wrap_standalone_task(stage.function)
                stage_graph[stage_key] = (func, config_key) + prev_dependency
            else:
                func = wrap_map_task(stage.function)
                checkpoint_args = []
                for i, m in enumerate(stage.mappable):
                    key = (append_token(stage.name, token), i)
                    stage_graph[key] = (func, m, config_key) + prev_dependency
                    checkpoint_args.append(key)
                stage_key = f"{stage.name}-checkpoint-{token}"
                stage_graph[stage_key] = (checkpoint, *checkpoint_args)
            layers[stage_key] = stage_graph
            dependencies[stage_key] = {config_key} | {prev_key}
            prev_dependency = (stage_key,)
            prev_key = stage_key
        hlg = HighLevelGraph(layers, dependencies)
        delayed = Delayed(prev_key, hlg)
        return delayed

    @staticmethod
    def execute(delayed: Delayed):
        delayed.compute()

If anyone wanted to make a PR to refactor this to use blockwise, that would be super appreciated! We have a strong test suite, so it should be straightforward to know if it works or not. This is probably not something I'll spend time on myself soon, but it's great to know the option is there.

@TomAugspurger
Contributor

Still catching up on this thread, but I wanted to comment that I originally wanted to just use HLGs (no low-level dicts) but I ran into some issues building the HLG. I hadn't used Layers before and couldn't figure them out, and so reverted to the low-level approach.

We want to and should be able to avoid the low-level dicts.

gjoseph92 added a commit to gjoseph92/pangeo-forge-recipes that referenced this issue Jan 13, 2022
@gjoseph92
Contributor

> If anyone wanted to make a PR to refactor this to use blockwise, that would be super appreciated!

I have been successfully nerd-sniped #261
