This repository has been archived by the owner on Jul 3, 2023. It is now read-only.

Dask map_partitions node #143

Closed

elshize opened this issue Jun 27, 2022 · 9 comments

Comments

@elshize

elshize commented Jun 27, 2022

ℹ️ This is in response to a discussion with @elijahbenizzy on Discord.

This is an example of a use case that could be supported with some additional decorators.

The use case is that a node takes one Dask dataframe and possibly some other arguments (either Pandas dataframes or scalars). The node then simply executes map_partitions on the Dask input, broadcasting the remaining arguments.

For example:

import pandas as pd
from dask import dataframe as dd

def node(a: dd.DataFrame, b: pd.DataFrame, c: int) -> dd.DataFrame:
    return a.map_partitions(_node, b, c, align_dataframes=False)

def _node(a: pd.DataFrame, b: pd.DataFrame, c: int) -> pd.DataFrame:
    # actual logic for each partition `a`
    ...

could be:

# this probably should be designed better, I just want to give you an idea;
# `dask_partition` is the hypothetical decorator being proposed here
@dask_partition("a")
def node(a: pd.DataFrame, b: pd.DataFrame, c: int) -> pd.DataFrame:
    # logic
    ...

This would let Hamilton run that map_partitions call automatically. The decorator might need an (at least optional) parameter for passing meta to the map_partitions call.
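
For illustration, a hedged sketch of what that optional parameter might look like (the dask_partition decorator and its meta argument are hypothetical):

# hypothetical: `meta` would simply be forwarded to map_partitions
@dask_partition("a", meta={"result": "float64"})
def node(a: pd.DataFrame, b: pd.DataFrame, c: int) -> pd.DataFrame:
    ...

# roughly equivalent to running:
# a.map_partitions(<per-partition logic>, b, c, align_dataframes=False, meta={"result": "float64"})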

I'm not entirely sure whether this has downsides, or how valuable it would be in general, but I wanted to document what we discussed.

@skrawcz
Collaborator

skrawcz commented Jun 30, 2022

@elshize Thanks for raising this. I think at some point we might have to provide "context hints" depending on how things are being executed.

Questions to help me understand what the desired end state is:

  • What do you think the goal should be by providing such an annotation?
  • Is this a common thing one might want to do with dask?
  • How would this help you?

Otherwise my current train of thought is:

  • How does this impact code portability and reuse?
  • Is making this a decorator better or worse than the original version? It seems like the answer could be yes, it's better.

Alternatively what about:

def _node(a: pd.DataFrame, b: pd.DataFrame, c: int) -> pd.DataFrame:
    # actual logic...
    ...

@config.when(execution='dask')
def node__map_partitions(a: dd.DataFrame, b: pd.DataFrame, c: int) -> pd.DataFrame:
    return a.map_partitions(_node, b, c, align_dataframes=False)

@config.when_not(execution='dask')
def node__regular(a: pd.DataFrame, b: pd.DataFrame, c: int) -> pd.DataFrame:
    return _node(a, b, c)

@elshize
Author

elshize commented Jul 2, 2022

@skrawcz In general, having my own wrapper is not a big deal at all, so from this point of view, this would be a small improvement -- if it were generated from an annotation, for example.

But I was also thinking of how to run this type of thing when Dask already executes the Hamilton DAG. (I am not very familiar with how it works, so please let me know if I get things terribly wrong.)

Let's say I have a bunch of nodes that do not operate on any Dask series/dataframe. One might want to run them such that each node is a delayed object executed by Dask (I think this is possible at the moment, if not fully stable). What would then happen if one of the nodes itself needs to accept a Dask dataframe? Would Dask resolve something like this correctly out of the box, i.e., a delayed object wrapping a Dask dataframe? I was under the impression that we'd need some support from Hamilton to do this. But maybe I'm wrong? Now that I'm writing this, I'm not sure anymore.

Essentially, I'm talking about a use case where there are a bunch of small nodes that could be done in parallel, but there's a larger data frame that does something with smaller data chunks, and this part should be parallelized within the node itself.

I hope that makes sense.

@skrawcz
Collaborator

skrawcz commented Jul 6, 2022

@elshize I want to say that what you're asking for should already work in Hamilton, assuming you do things a certain way:

  1. Distributing arbitrary code with Dask: we do this via functions ✅ (see the sketch after this list).
  2. Dask then also does distributed computing for you if you choose to call a function on one of its distributed types. Hamilton doesn't interfere with this either, but it requires you to transform the data so it's distributed ✅
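
A minimal sketch of (1), assuming the Dask adapter lives at hamilton.experimental.h_dask and takes a Dask client plus a result builder (check the exact import path and constructor for your version); my_functions is a hypothetical module of ordinary pandas-typed Hamilton functions:

from dask.distributed import Client
from hamilton import base, driver
from hamilton.experimental import h_dask

import my_functions  # hypothetical module of plain pandas-typed Hamilton functions

client = Client()  # local Dask cluster
# the adapter wraps node execution so Dask schedules the Hamilton DAG
adapter = h_dask.DaskGraphAdapter(client, base.PandasDataFrameResult())
dr = driver.Driver({}, my_functions, adapter=adapter)
result = dr.execute(["processed_df"], inputs={"location": "data.csv"})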

You said: "Essentially, I'm talking about a use case where there are a bunch of small nodes that could be done in parallel, but there's a larger data frame that does something with smaller data chunks, and this part should be parallelized within the node itself."

If you're instead asking for a way to apply Hamilton within a DAG node itself, then some thoughts:

  1. Do you have a code example to help me understand this a bit more?
  2. I think you'd be limited by Dask's ability here in the end. Dask can distribute data, and then perform some computation over it, e.g. map-reduce style. So you'd be limited in parallelization by how segmented the data is...
  3. It might be possible to model this in Hamilton, e.g. instantiate a driver inside a function node that is itself part of a Hamilton DAG (sketched below), but I'm not convinced it would be helpful?
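
A rough sketch of (3), purely illustrative; inner_transforms is a hypothetical module, and whether nesting drivers like this is worthwhile is exactly the open question:

import pandas as pd
from hamilton import driver

import inner_transforms  # hypothetical module holding the per-chunk logic


def chunk_summary(chunk: pd.DataFrame) -> pd.DataFrame:
    # a node in the outer DAG that builds and runs a nested Hamilton DAG
    inner_dr = driver.Driver({}, inner_transforms)
    return inner_dr.execute(["summary"], inputs={"chunk": chunk})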

In case I am misinterpreting, and your question is actually about how to mix and match data types: I think talking with code might help ground what we're talking about. So here's an example where we read in a file via pandas, but we wrote a function that operates over a Dask dataframe. How do we connect the two?

import pandas as pd
from dask import dataframe as dd


# ----- option 1 -- things might just "work" -- i.e. the same API is used for dask and
# pandas (with a caveat around how we do static type checking when building the DAG --
# I believe with the DaskGraphAdapter pandas dataframes & dask ones are treated as
# equivalent...)
def raw_data(location: str) -> pd.DataFrame:
    return pd.read_csv(location)

def processed_df(raw_data: dd.DataFrame) -> dd.DataFrame:
    # do some logic
    return processed_dataframe

# ----- option 2 -- we explicitly expect a pandas dataframe as input -- and distribute it within the function
def raw_data(location: str) -> pd.DataFrame:
    return pd.read_csv(location)

def processed_df(raw_data: pd.DataFrame) -> dd.DataFrame:
    dd_data = dd.from_pandas(raw_data, npartitions=4)  # pick an appropriate partition count
    # do some logic
    return processed_dataframe

# ----- option 3 -- have two different load steps -- assuming API equivalence between pandas & dask
@config.when(env='local')
def raw_data__pandas(location: str) -> pd.DataFrame:
    return pd.read_csv(location)

@config.when(env='prod')
def raw_data__dask(location: str) -> dd.DataFrame:
    return dd.read_csv(location)

def processed_df(raw_data: dd.DataFrame) -> dd.DataFrame:
    # do some logic
    return processed_dataframe

# ----- option 4 -- have an intermediate step
def raw_data(location: str) -> pd.DataFrame:
    return pd.read_csv(location)

def raw_data_dask(raw_data: pd.DataFrame) -> dd.DataFrame:
    return dd.from_pandas(raw_data, npartitions=4)

def processed_df(raw_data_dask: dd.DataFrame) -> dd.DataFrame:
    # do some logic
    return processed_dataframe

As you can see, there are quite a few ways to model this...

What I'd be thinking about is:

  1. What functions make sense to curate into separate modules? E.g. instead of @config.when you just switch the module you build the DAG from (see the sketch after this list).
  2. If I need to go from "local" data to "distributed" data, then modeling that step as an explicit function seems fine to me.
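
A minimal sketch of (1), with hypothetical module names (pandas_loaders, dask_loaders, transforms); the point is just that the choice happens when you build the DAG, not inside the functions:

from hamilton import driver

import transforms      # shared transform functions (hypothetical module)
import pandas_loaders  # hypothetical: raw_data implemented with pandas
import dask_loaders    # hypothetical: raw_data implemented with dask

run_locally = True  # however you decide this
loaders = pandas_loaders if run_locally else dask_loaders

dr = driver.Driver({}, loaders, transforms)
result = dr.execute(["processed_df"], inputs={"location": "data.csv"})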

@elshize
Author

elshize commented Jul 8, 2022

Hey, sorry for the late reply, I've been quite busy. I'll need some time to experiment with Dask and Hamilton. It's possible that you can already do what I want... I'll try to find some time next week to give this a closer look and educate myself first; and if I can't figure it out, I'll come back with some code to illustrate.

@elijahbenizzy
Collaborator

@elshize any updates on this? I like the general idea of having graph adapters/the framework enable alternation between, say, pandas and dask -- I still need to scope it out more. The "intermediate step" @skrawcz described could easily be part of the framework -- if we're aiming at abstracting the transformations from the infrastructure, figuring this out could be a nice way to do it, enabling you to easily write a new function and scale nicely.

That said, other approaches to scale pandas (modin, for instance) could do this as well...

@elshize
Author

elshize commented Oct 31, 2022

@elijahbenizzy Apologies, I got busy and then it slipped my mind; I haven't had any time at work to revisit this; but now reading this again, I have some thoughts.

I think what I meant to begin with is much simpler than what we eventually started discussing. I was thinking of doing with dask/pandas something similar to what is already done with pandas dataframes/series. For example, when you load data into a DataFrame, you can split it by columns so that nodes operate on the columns independently. What I had in mind is similar, but instead of splitting the Dask dataframe into pandas pieces, I imagined having an annotation on a node that takes the Dask dataframe as an argument, saying: this is a Dask dataframe, but treat it as pandas in this function, and apply the function to all partitions of that Dask dataframe. You could even imagine this being inferred from the type (if pandas is used as the argument type, treat it as per-partition processing), but I'm not sure this kind of implicit mechanism is a good idea; it might lead to bugs.

Just to clarify: this is not functionality that is impossible today -- I can already have a node that takes the Dask dataframe and calls map_partitions with another function. But splitting columns could be implemented the same way, by taking the entire frame and extracting a column; it's an ease-of-use kind of thing (the analogy is sketched below).
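
To make the analogy concrete, a hedged sketch: extract_columns is Hamilton's existing decorator, while dask_partition is the hypothetical annotation being discussed:

import pandas as pd
from hamilton.function_modifiers import extract_columns


# existing ease-of-use pattern: split a loaded dataframe into per-column nodes
@extract_columns("price", "quantity")
def sales_data(location: str) -> pd.DataFrame:
    return pd.read_csv(location)

# analogous, hypothetical pattern: write per-partition pandas logic and let an
# annotation (plus a suitable graph adapter) apply it to every partition of a
# Dask dataframe named `sales_data`
# @dask_partition("sales_data")
# def revenue(sales_data: pd.DataFrame) -> pd.Series:
#     return sales_data["price"] * sales_data["quantity"]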

Having a simple annotation to map partitions without the boilerplate sounds like a good idea to me. That said, I might be missing some downsides, or you might simply not think the use case is important enough to implement. This is just a suggestion; I can certainly live without it if I have to :)

@skrawcz
Collaborator

skrawcz commented Dec 19, 2022

@elshize just getting back around to this.

You said: "I imagined having an annotation on a node that takes the Dask dataframe as an argument, saying: this is a Dask dataframe, but treat it as pandas in this function, and apply the function to all partitions of that Dask dataframe."

This is interesting. I think this could be possible, especially with a custom graph adapter. As you point out, we'd just need to think about whether this helps and leads to fewer bugs, rather than more...

E.g. brainstorming an API here, would something like this work?

@tag(dask_functionality="map_partitions")
def named_function(name_of_dataframe: pd.DataFrame, other_arg: int, other_scalar: float) -> pd.DataFrame:
    # implicitly, if run with the appropriate graph adapter, this would cause it to do:
    #   result = name_of_dataframe.map_partitions(named_function, other_arg, other_scalar)
    # if run with a plain graph adapter, the above would not happen.
    ...
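
As a very rough sketch of how a custom graph adapter might honor such a tag -- assuming (not verified against the codebase) that the adapter hook is execute_node(node, kwargs) and that the node exposes its tags and original function as node.tags / node.callable:

from dask import dataframe as dd
from hamilton import base


class MapPartitionsGraphAdapter(base.SimplePythonDataFrameGraphAdapter):
    def execute_node(self, node, kwargs):
        if node.tags.get("dask_functionality") == "map_partitions":
            # split inputs into the single dask dataframe and the arguments to broadcast
            dask_inputs = {k: v for k, v in kwargs.items() if isinstance(v, dd.DataFrame)}
            other = {k: v for k, v in kwargs.items() if k not in dask_inputs}
            (name, ddf), = dask_inputs.items()  # assume exactly one dask dataframe input
            return ddf.map_partitions(lambda part: node.callable(**{name: part}, **other))
        return super().execute_node(node, kwargs)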

@elshize
Author

elshize commented Dec 20, 2022

@skrawcz Yeah, this is essentially what I was thinking of.

Again, this is nothing crucial, in the sense that it doesn't provide new functionality. To me, it makes sense and would be a nice thing to have. I'm imagining a scenario where I'm working on pandas dataframes/series and at some point decide that one part is large, so I load that part with Dask instead and just add the annotation to what I've already implemented.

I suppose ultimately it boils down to how difficult this is to implement and/or how it fits with the rest of the codebase. For example, it's probably not important enough to rewrite internal mechanisms to achieve it. I'm not that familiar with the codebase itself, but knowing how annotations work in general, the implementation should be straightforward...

@elijahbenizzy
Collaborator

We are moving repositories! Please see the new version of this issue at DAGWorks-Inc/hamilton#32. Also, please give us a star/update any of your internal links.

Note that everything else (slack community, pypi packages, etc...) will all be the same.
