PERF: unnecessary (expensive) concat #4740

Open
jbrockmendel opened this issue Jul 30, 2022 · 8 comments
Labels
P2 Minor bugs or low-priority feature requests Performance 🚀 Performance related issues and pull requests.

Comments

@jbrockmendel
Collaborator

I have a Ray worker that calls PandasDataframeAxisPartition.deploy_axis_func, which in turn runs pandas.concat on 16 DataFrames with MultiIndex indexes. That concat is expensive.

AFAIK there isn't a nice way to see what called deploy_axis_func, so this is a bit speculative.

I think the partitions being collected are exactly the partitions of an existing DataFrame, which I think means that frame's index is already materialized somewhere, so reconstructing it inside concat is unnecessary. i.e. in deploy_axis_func we could do something like

+orig_indexes = [x.index for x in partitions]
+N = 0
+for obj in partitions:
+    obj.index = range(N, N+len(obj))
+    N += len(obj)

dataframe = pandas.concat(list(partitions), axis=axis, copy=False)

+dataframe.index = thing_we_already_know_so_dont_need_to_recompute
+
+for index, obj in zip(orig_indexes, partitions):
+    obj.index = index

If I'm right here, we could see significant savings. E.g. in the script I'm profiling, ATM 5% of the time is spent in _get_concat_axis, and I think a lot more indirectly inside worker processes.

Moreover, if this works, we could do the pinning/unpinning before pickling/unpickling and save on pickle costs.
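A minimal, self-contained sketch of the swap-and-restore idea above, in plain pandas. The helper name concat_with_known_index and its known_index parameter are hypothetical (they are not part of Modin), and the sketch assumes the caller already holds the full index for the concatenated result. It omits copy=False for simplicity.

```python
import numpy as np
import pandas as pd


def concat_with_known_index(partitions, known_index):
    """Concatenate row partitions without rebuilding their combined index.

    Hypothetical helper: `known_index` is assumed to be the already
    materialized index of the full frame, in partition order.
    """
    orig_indexes = [part.index for part in partitions]
    start = 0
    try:
        # Temporarily swap in cheap RangeIndexes so concat has nothing
        # expensive to append.
        for part in partitions:
            part.index = pd.RangeIndex(start, start + len(part))
            start += len(part)
        result = pd.concat(partitions, axis=0)
    finally:
        # Always restore the partitions' original indexes.
        for part, index in zip(partitions, orig_indexes):
            part.index = index
    # Attach the index we already had instead of recomputing it.
    result.index = known_index
    return result


mi = pd.MultiIndex.from_product([range(4), ["a", "b"]], names=["x", "y"])
df = pd.DataFrame({"v": np.arange(8)}, index=mi)
parts = [df.iloc[:3].copy(), df.iloc[3:6].copy(), df.iloc[6:].copy()]
out = concat_with_known_index(parts, df.index)
```

Whether this is a net win depends on how cheaply known_index can be made available where the concat runs.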

@pyrito
Collaborator

pyrito commented Aug 1, 2022

Could you share the script you are using to profile? @jbrockmendel

@jbrockmendel
Collaborator Author

Could you share the script you are using to profile?

I'm not sure that's allowed. If it helps, @YarShev and @anmyachev are looking at the same script.

@pyrito
Collaborator

pyrito commented Aug 1, 2022

@jbrockmendel Ah ok. Is it possible to at least get a minimum reproducible example?

@jbrockmendel
Collaborator Author

Is it possible to at least get a minimum reproducible example?

Well, no. I know concat is getting called bc I put a breakpoint() inside MultiIndex.append. I use ray debug to step through the stack that gets to that call and the last thing I see before ray's main_loop is

> /Users/brock/Desktop/modin/modin/modin/core/dataframe/pandas/partitioning/axis_partition.py(159)deploy_axis_func()
-> dataframe = pandas.concat(list(partitions), axis=axis, copy=False)

AFAICT the tooling doesn't provide a nice way to find out what call got me here.
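For reference, a rough sketch of an alternative to the breakpoint() approach: temporarily wrapping MultiIndex.append so each call records its Python stack. The context manager record_multiindex_appends is a hypothetical helper, not part of pandas, Modin, or Ray. It only captures the stack inside the process where it runs, so in a worker it would have the same limitation described above: it shows the worker-side frames, not the driver-side caller.

```python
import traceback
from contextlib import contextmanager

import pandas as pd


@contextmanager
def record_multiindex_appends():
    """Record the call stack of every MultiIndex.append while active."""
    stacks = []
    original = pd.MultiIndex.append

    def traced_append(self, other):
        # format_stack() returns the current stack as a list of strings.
        stacks.append(traceback.format_stack())
        return original(self, other)

    pd.MultiIndex.append = traced_append
    try:
        yield stacks
    finally:
        # Restore the unpatched method even if the body raised.
        pd.MultiIndex.append = original


mi = pd.MultiIndex.from_tuples([(1, "a"), (2, "b")])
with record_multiindex_appends() as stacks:
    appended = mi.append(mi)
```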

@mvashishtha
Collaborator

@jbrockmendel I think deploy_ray_func is remotely calling deploy_axis_func because of a line in PandasOnRayDataframeVirtualPartition.deploy_axis_func, which as far as I know only gets called in PandasDataframeAxisPartition.apply.

In case you don't know, PandasDataframeAxisPartition is a partition containing a sequence of one or more physical pieces of data (its list_of_blocks) along an axis. When you apply a function to it, a remote task will concatenate the blocks and then apply the function to the result.
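As a rough illustration of that concatenate-then-apply pattern in plain pandas (a simplified sketch, not Modin's actual implementation; concat_then_apply is a hypothetical name):

```python
import pandas as pd


def concat_then_apply(blocks, func, axis=0):
    """Join the blocks along `axis`, then apply `func` to the full frame."""
    # This concat is where the blocks' indexes get appended to each other.
    full = pd.concat(list(blocks), axis=axis)
    return func(full)


blocks = [pd.DataFrame({"a": [1, 2]}), pd.DataFrame({"a": [3]})]
total = concat_then_apply(blocks, lambda df: df["a"].sum())
```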

I didn't know that concatenating the indices would itself be so expensive. In that case, maybe what you suggested in your first post would help. The full index may live in the PandasDataframe (though after #4726, not necessarily). We could plumb this index down to the remote task. But we'd have to pay the cost of putting the index in the object store and then getting it out.

@anmyachev
Collaborator

@jbrockmendel I tried this approach, and our script got slower (because of the cost of going through Ray's object store, as @mvashishtha mentioned). In addition, for some operations, for example groupby.agg, we do not have a pre-computed index that we can pass.

Is there an easy way to speed up concatenating the MultiIndex itself on the pandas side?

@jbrockmendel
Collaborator Author

Is there an easy way to speed up concatenating the MultiIndex itself on the pandas side?

There's a patch that speeds up this particular case, but it may slow down other cases (so I haven't decided yet whether to upstream it to pandas):

import numpy as np
import pandas
from pandas.core.dtypes.missing import array_equivalent

orig_mi_append = pandas.MultiIndex.append

def new_append(self, other):
    if not isinstance(other, list):
        other = [other]

    # Fast path: every operand is a MultiIndex with the same number of levels
    # and level values equivalent to ours, so only the codes need concatenating.
    if all(isinstance(obj, pandas.MultiIndex) for obj in other):
        if all(obj.nlevels == self.nlevels for obj in other):
            if all(
                all(array_equivalent(slev, olev) for slev, olev in zip(self._levels, obj._levels))
                for obj in other
            ):
                objs = [self] + other
                new_codes = []
                for i in range(self.nlevels):
                    lev_codes = np.concatenate([obj.codes[i] for obj in objs])
                    new_codes.append(lev_codes)
                mi = pandas.MultiIndex(codes=new_codes, levels=self.levels, names=self.names)
                return mi
    # Anything else falls back to the original implementation.
    return orig_mi_append(self, other)

pandas.MultiIndex.append = new_append
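To show why the shared-levels case can be cheap, here is a small standalone sketch that does not monkeypatch anything. The function append_shared_levels is a hypothetical name, and it assumes (without checking) that every input has exactly the same levels as the first one, which holds for slices of a single MultiIndex.

```python
import numpy as np
import pandas as pd


def append_shared_levels(first, others):
    """Append MultiIndexes that are assumed to share identical levels."""
    objs = [first, *others]
    # With identical levels, the integer codes mean the same thing in every
    # input, so they can be concatenated level by level.
    codes = [
        np.concatenate([obj.codes[i] for obj in objs])
        for i in range(first.nlevels)
    ]
    return pd.MultiIndex(levels=first.levels, codes=codes, names=first.names)


mi = pd.MultiIndex.from_product([range(3), ["a", "b"]], names=["x", "y"])
# Slicing a MultiIndex keeps its levels and only slices the codes.
pieces = [mi[:2], mi[2:5], mi[5:]]
rebuilt = append_shared_levels(pieces[0], pieces[1:])
```

If the levels differ between inputs, the codes are not comparable and this shortcut would give wrong results, which is why the patch above checks the levels before taking the fast path.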

@anmyachev
Collaborator

This should help us with the to_pandas function, because we most likely already have the index in the main process.

@pyrito added the "Performance 🚀 Performance related issues and pull requests." and "P2 Minor bugs or low-priority feature requests" labels on Aug 31, 2022
4 participants