
Parallelization with python dask and dask-learn. Proposal. #304

Open

ghgr opened this issue Nov 5, 2016 · 24 comments

Comments

@ghgr

ghgr commented Nov 5, 2016

Following up on issue #177, I would like to propose work on a distributed/cluster environment. I believe that would be a major advancement in the applicability of TPOT to real-world problems. At the moment I am working with a dataset of roughly (200000, 500), and it smashes one core while the rest sit idle.

There has been discussion on python-dask [1], which, for the unfamiliar, works something like TensorFlow: you symbolically build a graph of the operations and then call the compute method. The computation is automatically distributed, and intermediate results are cached. I'd recommend taking a look at the official documentation (it took me 20 minutes on a train ride, so no big deal) to see some examples [2-5].
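
For readers who haven't used it, here is a minimal sketch of the idea with dask.delayed (the load/clean/total functions are made-up placeholders, not part of any library):

from dask import delayed

@delayed
def load():
    return list(range(10))

@delayed
def clean(data):
    return [x for x in data if x % 2 == 0]

@delayed
def total(data):
    return sum(data)

# Nothing has executed yet: we have only described a graph of operations.
result = total(clean(load()))

# compute() hands the graph to a scheduler, which runs it in parallel where
# possible and reuses intermediate nodes that are shared between branches.
print(result.compute())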

The first step I would like to propose is to model a single individual (sklearn pipeline) as a graph. Some work has been carried out in the Dask-learn project [6]. This project builds a graph for a single Pipeline.

The next step would be to model the entire population of each generation as a graph. As far as I know there is no work in this direction, but it should not be particularly complex to generalize. Since many individuals share many components, Dask will automatically execute each shared component once and reuse the result for the others, dramatically increasing performance. At the same time it distributes computations across many cores on a single node or across multiple nodes.
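
A rough sketch of the shared-component idea for two individuals that reuse the same preprocessing step (illustrative only, not TPOT code; PCA and the two models stand in for pipeline components):

from dask import delayed, compute
from sklearn.datasets import load_digits
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier

X, y = load_digits(return_X_y=True)

def preprocess(X):
    return PCA(n_components=10).fit_transform(X)

def fit_score(model, X, y):
    return model.fit(X, y).score(X, y)

# Both "individuals" reference the same delayed preprocessing node, so when the
# two graphs are merged by compute(), Dask runs preprocess() only once.
Xp = delayed(preprocess)(X)
ind1 = delayed(fit_score)(LogisticRegression(max_iter=1000), Xp, y)
ind2 = delayed(fit_score)(DecisionTreeClassifier(), Xp, y)

print(compute(ind1, ind2))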

At this point the whole generation will be calculated at the same time.

The advantages are clear, mainly (a) distributed computation and (b) caching of intermediate results.
As disadvantages I see:

  • If an individual takes 1000x longer than the rest, the system will be mostly idle waiting for it to finish. That can be partially solved by allowing an individual to use many cores, but that opens a new set of problems (like potentially having more threads running than cores).
  • Intermediate states of the graph are not cached between generations. The solution for this is trickier. Maybe it is possible to build a graph for all generations at the same time, but that would imply reimplementing DEAP as a dask graph, and just the idea makes my head spin.

I'd love to hear your comments/considerations. As said above, I believe this kind of near-unlimited scalability is worth the effort, and a large part of the work has already been done by the dask-learn project. In addition, development of this new feature can proceed orthogonally to the development of core features.

Regards,

Eduardo

[1] http://dask.pydata.org/en/latest/
[2] https://github.com/dask/dask-tutorial/blob/master/01-Array.ipynb
[3] https://github.com/dask/dask-tutorial/blob/master/02-Foundations.ipynb
[4] https://github.com/dask/dask-tutorial/blob/master/03a-DataFrame.ipynb
[5] https://github.com/dask/dask-tutorial/blob/master/04-Imperative.ipynb
[6] http://blaze.pydata.org/blog/2015/10/19/dask-learn/

@danthedaniel
Contributor

danthedaniel commented Dec 14, 2016

With dask, is there any way to prevent constant duplication of a dataset in memory? As it is, it's fairly easy to parallelize TPOT with the python multiprocessing module, but that requires cloning the entire environment with pickle through some IPC. This is a problem for large data-sets, as the memory copying takes up a lot of time, and in the end you need a shit-ton of RAM.

@ghgr
Author

ghgr commented Dec 15, 2016

AFAIK with dask you build the computation graph and then you execute it. The memory allocation, redundancy elimination, and job distribution (in the case of a cluster) are entirely Dask's problem.

@QuantumDamage

The more Dask videos I watch (for example https://www.youtube.com/watch?v=RA_2qdipVng), the more I believe this could make a lot of sense.
Have there been any attempts to test TPOT with Dask?

@QuantumDamage

There seem to be some approaches to accelerating scikit-learn algorithms with dask: https://dask-ml.readthedocs.io/en/latest/

@TomAugspurger
Contributor

With dask, is there any way to prevent constant duplication of a dataset in memory?

Dask lets you swap out the scheduler (threaded, multiprocessing, distributed) easily. The threaded scheduler will avoid the need to clone the datasets multiple times, but for the best performance the algorithms should release the GIL.
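
For reference, a tiny sketch of switching schedulers (this uses the current Dask API; the 2017-era releases spelled this slightly differently via dask.set_options):

from dask import delayed

def double(x):
    return x * 2

total = delayed(sum)([delayed(double)(i) for i in range(4)])

print(total.compute(scheduler='threads'))    # shared memory, no dataset copies; scales best when the work releases the GIL
print(total.compute(scheduler='processes'))  # process pool; inputs are pickled out to the worker processes

# Creating a dask.distributed Client() makes the distributed scheduler the default instead.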

I spent a bit of time a few weeks ago looking into this, but I'm not too familiar with tpot. The joblib-based parallelization added in the dev branch seems to do pretty well for coarse-grained parallelism like "fit these 8 models in parallel".

If you're able to build the entire graph ahead of time, and there are redundant computations across multiple branches, dask will be able to avoid recomputing them.

If anyone more familiar with tpot is interested in prototyping something, I'd be happy to support from the dask side. Otherwise, I'll try to take another look in a few weeks.

@QuantumDamage

@TomAugspurger
Maybe if there were a way to predict memory allocation, we could swap the scheduler to better fit the available resources? Some kind of heuristic that takes the available CPUs, threads, RAM, and swap as parameters?

@TomAugspurger
Contributor

TomAugspurger commented Nov 1, 2017 via email

@mrocklin
Contributor

With dask, is there any way to prevent constant duplication of a dataset in memory? As it is, it's fairly easy to parallelize TPOT with the python multiprocessing module, but that requires cloning the entire environment with pickle through some IPC. This is a problem for large data-sets, as the memory copying takes up a lot of time, and in the end you need a shit-ton of RAM.

I suspect that Dask's memory management heuristics will handle this without significant issue.

Intermediate states of the graph are not cached between generations. The solution for this is trickier. Maybe it is possible to build a graph for all generations at the same time, but that would imply reimplementing DEAP as a dask graph, and just the idea makes my head spin.

Note that with the newer scheduler you can evolve a computation on the fly, so you don't need to decide everything ahead of time. The right API to look at for this is probably the concurrent.futures API: you can submit many tasks, wait for a few of them to come back, and then, based on those results, submit more that are more likely to be helpful. Here is a talk from the last SciPy conference that covers some of the more real-time features of the dask schedulers: https://www.youtube.com/watch?v=ZxNPVTHQwGo
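
A small sketch of that pattern with the distributed scheduler's futures interface (evaluate() here is a made-up stand-in for fitting and scoring one pipeline):

import random
from dask.distributed import Client, as_completed

client = Client()  # local cluster by default; pass a scheduler address to use an existing cluster

def evaluate(candidate):
    # stand-in for fitting and scoring one pipeline
    return candidate, random.random()

ac = as_completed(client.submit(evaluate, c) for c in range(20))

# React to results as they arrive and submit follow-up work on the fly,
# instead of having to build the whole graph ahead of time.
for future in ac:
    candidate, score = future.result()
    if score > 0.95:
        # e.g. mutate a promising candidate and evaluate its offspring
        ac.add(client.submit(evaluate, candidate + 1000))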

@mrocklin
Contributor

FWIW I'm enthusiastic about this pairing. From my perspective as a Dask developer TPOT's workload is a nice example of something that is clearly parallelizable, but requires more sophistication than your typical big data system (or at least, this is based on a guess of what I think your workloads look like). This plays nicely to Dask's strengths.

I suspect that the efficient parallelism would provide some convenient speedups (especially if you're currently spending time repeatedly shipping data off with the multiprocessing module), but that you might actually find more value in the visual diagnostics, parallel profiling, etc. that come along for free.

@westurner

Would these memory mapping and zero-copy approaches help with parallelism here?
https://arrow.apache.org/docs/python/memory.html

https://github.com/maartenbreddels/vaex

@mrocklin
Contributor

Would these memory mapping and zero-copy approaches help with parallelism here?

Generally for workloads like this my recommendation would be to stay within a single process per node if possible, so zero-copy isn't really a concern. This would differ if you're handling mostly text data; in that case serialization will kill you anyway. Generally speaking, my guess is that very few workloads of this type are at the point where zero-copy is something they should worry about.

@mrocklin
Contributor

Is there an obvious central place within TPOT from where most computation is planned that would make sense for someone familiar with dask to look at?

@rhiever
Contributor

rhiever commented Jul 12, 2018

TPOT pipeline parallelization is primarily done here, where we're currently using scikit-learn's port of joblib.

Some additional considerations:

  • The parallelized evaluations need to be interruptible, both because TPOT has options to stop evaluation on a per-pipeline basis (e.g., each pipeline gets 5 minutes to evaluate) and options to stop the optimization process after a fixed amount of time (e.g., TPOT gets 1 hour to evaluate as many pipelines as possible). Preferably the evaluations can be self-interruptible.

  • The parallelization procedure needs to be optional, currently achieved with the if statement checking whether n_jobs==1.

  • The parallelization procedure needs to be customizable by the user through (preferably) one option in the instantiation of TPOTClassifier/TPOTRegressor. Currently it's customizable with the n_jobs parameter. I am open to adding more customization options, if useful.

@mrocklin
Contributor

The first thing to try is probably just using the joblib.parallel_backend('dask') solution and seeing how that performs on a distributed system. My guess is that we'll eventually want to go further in order to avoid recomputation of shared results and such, but it would be good to have a baseline.
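
A minimal sketch of that baseline (this assumes a dask.distributed cluster is reachable and that the joblib calls inside TPOT's fit() pick up the active backend; depending on versions the dask backend may need to be registered via dask_ml.joblib or distributed.joblib):

import joblib
from dask.distributed import Client
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from tpot import TPOTClassifier

client = Client()  # or Client('scheduler-address:8786') for an existing cluster

X_train, X_test, y_train, y_test = train_test_split(*load_digits(return_X_y=True))

tpot = TPOTClassifier(generations=5, population_size=20, n_jobs=-1, verbosity=2)

with joblib.parallel_backend('dask'):
    # joblib.Parallel calls made during fit() are routed to the Dask workers
    tpot.fit(X_train, y_train)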

Is there a standard workflow to try this out on?

This might be a good blogpost around the dask-joblib integration being useful for things outside of just Scikit-Learn.

Also cc @stsievert who might find this conversation interesting.

@rhiever
Contributor

rhiever commented Jul 13, 2018

Sure, here's a basic TPOT workflow:

from tpot import TPOTClassifier
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split

digits = load_digits()
X_train, X_test, y_train, y_test = train_test_split(digits.data, digits.target,
                                                    train_size=0.75, test_size=0.25)

tpot = TPOTClassifier(generations=100, population_size=100, # TPOT will evaluate 100x100 pipelines
                      verbosity=2, # TPOT will show a progress bar during optimization
                      n_jobs=-1, # Enable multiprocessing
)
tpot.fit(X_train, y_train)
print(tpot.score(X_test, y_test))

More examples available in the docs here.

@westurner

westurner commented Jul 13, 2018 via email

@stsievert

My guess is that we'll eventually want to go further in order to avoid recomputation of shared results

Related: dask-searchcv has some caching to avoid repeated tuning for sections of pipelines: https://dask-ml.readthedocs.io/en/latest/hyper-parameter-search.html#avoid-repeated-work. This would be most useful when some of the first elements in a pipeline take a long time and have a couple parameters to tune (e.g., text feature extraction).
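
A rough illustration of the kind of pipeline where that helps (tiny made-up text data; dask-ml's GridSearchCV merges shared pipeline prefixes, so each TfidfVectorizer setting is fitted once per CV split and reused across the classifier parameters instead of once per parameter combination):

from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDClassifier
from dask_ml.model_selection import GridSearchCV

texts = ["good movie", "bad movie", "great film", "terrible film"] * 25
labels = [1, 0, 1, 0] * 25

pipe = Pipeline([("tfidf", TfidfVectorizer()),
                 ("clf", SGDClassifier(max_iter=1000, tol=1e-3))])

param_grid = {"tfidf__ngram_range": [(1, 1), (1, 2)],
              "clf__alpha": [1e-4, 1e-3, 1e-2]}

search = GridSearchCV(pipe, param_grid, cv=3)
search.fit(texts, labels)
print(search.best_params_)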

@mrocklin
Contributor

It looks like we might also want to dive into the _wrapped_cross_val_score function and wrap around the _fit_and_score calls:

def _wrapped_cross_val_score(sklearn_pipeline, features, target,
                             cv, scoring_function, sample_weight=None, groups=None):
    """Fit estimator and compute scores for a given dataset split.
    Parameters
    ----------
    sklearn_pipeline : pipeline object implementing 'fit'
        The object to use to fit the data.
    features : array-like of shape at least 2D
        The data to fit.
    target : array-like, optional, default: None
        The target variable to try to predict in the case of
        supervised learning.
    cv: int or cross-validation generator
        If CV is a number, then it is the number of folds to evaluate each
        pipeline over in k-fold cross-validation during the TPOT optimization
        process. If it is an object then it is an object to be used as a
        cross-validation generator.
    scoring_function : callable
        A scorer callable object / function with signature
        ``scorer(estimator, X, y)``.
    sample_weight : array-like, optional
        List of sample weights to balance (or un-balance) the dataset target as needed
    groups: array-like {n_samples, }, optional
        Group labels for the samples used while splitting the dataset into train/test set
    """
    sample_weight_dict = set_sample_weight(sklearn_pipeline.steps, sample_weight)

    features, target, groups = indexable(features, target, groups)

    cv = check_cv(cv, target, classifier=is_classifier(sklearn_pipeline))
    cv_iter = list(cv.split(features, target, groups))
    scorer = check_scoring(sklearn_pipeline, scoring=scoring_function)

    try:
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            scores = [_fit_and_score(estimator=clone(sklearn_pipeline),
                                    X=features,
                                    y=target,
                                    scorer=scorer,
                                    train=train,
                                    test=test,
                                    verbose=0,
                                    parameters=None,
                                    fit_params=sample_weight_dict)
                                for train, test in cv_iter]
            CV_score = np.array(scores)[:, 0]
            return np.nanmean(CV_score)
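
A hedged sketch of what that could look like (this is not TPOT's actual code; it borrows the argument names from the excerpt above and assumes a scikit-learn version where _fit_and_score returns a list whose first element is the test score):

import dask
import numpy as np
from sklearn.base import clone
from sklearn.model_selection._validation import _fit_and_score

def _dask_cross_val_score(sklearn_pipeline, features, target, scorer, cv_iter, fit_params):
    """Build one lazy _fit_and_score task per CV fold and let Dask schedule them."""
    fold_scores = [
        dask.delayed(_fit_and_score)(estimator=clone(sklearn_pipeline),
                                     X=features, y=target, scorer=scorer,
                                     train=train, test=test, verbose=0,
                                     parameters=None, fit_params=fit_params)
        for train, test in cv_iter
    ]
    # The folds (and any work shared with other individuals) run in parallel
    # on whichever scheduler is active.
    scores = dask.compute(*fold_scores)
    return np.nanmean(np.array(scores)[:, 0])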

@rhiever
Contributor

rhiever commented Jul 14, 2018

That would be nice; it would allow for simultaneous parallelization of the CV evaluations.

@karan10111

karan10111 commented Jul 20, 2018

For people who already have a Spark setup: we have had a first cut of parallelising TPOT (+ DEAP) with Spark in our private fork for quite some time now. It's (alpha) tested and fairly well profiled in terms of memory.

We are using parallel delayed calls to fit_and_score. We also tested it with sklearn's transformer caching to remove redundant computations and have seen impressive results (though it takes a huge amount of disk space if the number of individuals and generations is high).

We might think of ways to send similar tasks to the same executors so that transformer caching can be more effective.
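
For reference, the scikit-learn transformer caching mentioned above is switched on through the Pipeline's memory argument; a minimal sketch with placeholder steps:

from tempfile import mkdtemp
from sklearn.pipeline import Pipeline
from sklearn.decomposition import PCA
from sklearn.svm import SVC

cachedir = mkdtemp()

# Fitted transformers are cached on disk, so another pipeline (or another fold)
# that applies the same PCA step to the same data loads the cached result instead
# of recomputing it. This on-disk cache is what grows quickly when the number of
# individuals and generations is large.
pipe = Pipeline([("pca", PCA(n_components=10)), ("svc", SVC())], memory=cachedir)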

gp_deap.py
Check out the _wrapped_cross_val_score_spark method, line 458.

We'll open a separate PR soon. Suggestions would be highly appreciated. Thanks.

PS: We had to make some changes to DEAP (DEAP#268). These were serialisation-related changes.

@westurner

Pandas on Ray may have some performance optimizations that could be helpful for Spark as well.


@TomAugspurger
Contributor

For those following this issue, #730 has been merged into development. If you're interested you could try that out with tpot dev and dask-ml >= 0.9.0.

#730 solved the relatively easy task of training many individuals in parallel (on a cluster). It did not address some of the points in the original issue like some individuals in a generation being relatively slow, or caching between generations, or parallelizing the crossover and mutation stage. If anyone is interested in working on those I could assist, but I don't have plans to work on it right now.
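
A minimal sketch of trying it out (this assumes tpot dev with dask-ml >= 0.9.0 as noted above and a dask.distributed Client; the digits data mirrors the earlier example):

from dask.distributed import Client
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from tpot import TPOTClassifier

client = Client()  # or Client('scheduler-address:8786') for an existing cluster

X_train, X_test, y_train, y_test = train_test_split(*load_digits(return_X_y=True))

tpot = TPOTClassifier(generations=5, population_size=20,
                      n_jobs=-1, use_dask=True, verbosity=2)
tpot.fit(X_train, y_train)
print(tpot.score(X_test, y_test))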

@ballcap231

ballcap231 commented Apr 25, 2020

Just to be clear: if I specify use_dask=True in the TPOT API, do I still need to specify memory='auto' if I want caching between transformers? That is, does using dask with TPOT ensure that models with the same configurations aren't recomputed?
