Improved performance of Dask preprocessing by adding parallelism #1193
This change addresses a fundamental performance bottleneck in the Dask preprocessing pipeline. Previously, every feature was processed sequentially, resulting in a task graph of the following form:

The problem is that, because of Dask's lazy execution, any step that needs to be materialized (for example, computing a metadata statistic) forces the entire graph up to that point to be re-executed. As a result, adding new features slows preprocessing down quadratically: every feature adds a linear amount of redundant work.
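For illustration, the sequential pattern looks roughly like the sketch below. It uses pandas so the snippet is self-contained, but the real pipeline operates on Dask DataFrames, where each `assign` adds another link to a single long task chain; `process` is a hypothetical per-feature transform, not the actual preprocessing function.

```python
import pandas as pd

def process(series):
    # Hypothetical per-feature transform (e.g. standardization).
    return (series - series.mean()) / series.std()

df = pd.DataFrame({"f1": [1.0, 2.0, 3.0], "f2": [4.0, 5.0, 6.0]})

# Sequential pattern: each iteration assigns onto the df produced by the
# previous iteration, so with Dask every step depends on all prior steps.
for name in ["f1", "f2"]:
    df = df.assign(**{f"{name}_proc": process(df[name])})
```

With lazy execution, materializing any intermediate in this loop re-runs every `assign` that came before it.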
The culprit is the use of `assign`, which occurs when we assign a series to a Dask DataFrame. Because each subsequent operation needs to reuse the `df` from the previous iteration, every step creates a task dependency, so no steps can run in parallel and the task graph is one long chain.

The change here is to instead split each feature into an independent subgraph of computation, which (1) improves overall parallelism, and (2) means that computing statistics only needs to process the part of the graph relevant to the given feature. Now the graph looks like the following:
We accomplish this by replacing the intermediate dataframe with a `dict` that stores the processed series for each feature. Once every feature's processed series is available, we assign them all to the final output dataframe in a single "join" operation.

Before this change, processing the Higgs dataset with Dask took over 45 minutes; it is now down to about 30 seconds when running locally, on par with Pandas.
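The restructured pattern can be sketched as follows (again in pandas for a self-contained example, with a hypothetical `process` transform; in the real pipeline these are Dask series, so each dict entry is an independent subgraph that can be computed in parallel):

```python
import pandas as pd

def process(series):
    # Hypothetical per-feature transform (e.g. standardization).
    return (series - series.mean()) / series.std()

df = pd.DataFrame({"f1": [1.0, 2.0, 3.0], "f2": [4.0, 5.0, 6.0]})

# Independent subgraphs: every processed series is derived from the
# original df only, never from a partially-processed intermediate.
processed = {name: process(df[name]) for name in ["f1", "f2"]}

# Single "join" at the end: assign all processed series at once.
out = df.assign(**processed)
```

Because no entry in `processed` depends on any other, materializing a statistic for one feature touches only that feature's subgraph.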
cc @clarkzinzow