
set different environment variables per worker with loky backend #1064

Open
btel opened this issue Jun 10, 2020 · 8 comments

Comments

@btel

btel commented Jun 10, 2020

I would like to pass different environment variables to each worker, so they can be initialized on different resources.

For a bit of context, I would like to parallelize some code on multiple GPUs (on a single machine) by setting CUDA_VISIBLE_DEVICES for each worker. From what I understand, at the moment the environment that I set in the main process will be shared by all workers. Note that the variable has to be set before any GPU-based libraries are imported and resources are allocated.
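
Concretely, each worker would need something along these lines to run before any GPU library is imported (with a different device id in each worker); the torch/tensorflow imports below are just examples of such libraries:

import os

# must run before e.g. `import torch` or `import tensorflow`,
# because those libraries read the variable when they allocate GPU resources
os.environ["CUDA_VISIBLE_DEVICES"] = "1"  # a different id in each worker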

@ogrisel
Contributor

ogrisel commented Jun 10, 2020

Unfortunately we do not have any public API for this use case.

The underlying backend loky.ProcessPoolExecutor accepts an initializer callable and optional initargs tuple (exactly as the concurrent.futures.ProcessPoolExecutor from the standard library in Python 3.7+).

However even with this I am not sure how we could split the list of visible GPU ids to have different initargs values for different workers.
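
For reference, this is roughly what the initializer / initargs API looks like when using the standard-library executor directly (not something joblib exposes); the shared round-robin counter below is only a sketch of one possible assignment scheme, not a supported mechanism:

import os
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Value

def _pin_gpu(counter, gpu_ids):
    # runs once in each worker process, before it executes any task
    with counter.get_lock():
        gpu_id = gpu_ids[counter.value % len(gpu_ids)]
        counter.value += 1
    # must happen before any CUDA library is imported in the worker
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)

if __name__ == "__main__":
    counter = Value("i", 0)  # shared counter for round-robin assignment
    executor = ProcessPoolExecutor(
        max_workers=2,
        initializer=_pin_gpu,
        initargs=(counter, [0, 1]),
    )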

Alternatively it's possible to use parallel_backend("dask") and use a client object that points to a LocalCUDACluster instance that will start one worker process with a single thread per-GPU on your machine and set CUDA_VISIBLE_DEVICES accordingly:

https://github.com/rapidsai/dask-cuda#example

@ogrisel
Contributor

ogrisel commented Jun 10, 2020

@tomMoral if you have any suggestion on how to do that with the loky initializer...

@btel
Author

btel commented Jun 10, 2020

Thanks @ogrisel! I haven't heard of dask-cuda before, I will check it out. Cheers

@ogrisel
Contributor

ogrisel commented Jun 10, 2020

from dask_cuda import LocalCUDACluster
from dask.distributed import Client
from joblib import Parallel, delayed, parallel_backend

cluster = LocalCUDACluster()
client = Client(cluster)

# scope the dask backend with a context manager so joblib dispatches to the cluster
with parallel_backend("dask"):
    results = Parallel()(delayed(my_func)(task_data) for task_data in all_task_data)

@ogrisel
Contributor

ogrisel commented Jun 10, 2020

Alternatively you can use the dask API directly if you do not need the joblib layer:

futures = client.map(my_func, all_task_data)
results = client.gather(futures)

@tomMoral
Contributor

tomMoral commented Jun 10, 2020

Yes, I had the same issue before, and for now this is not easily doable. The dask solution seems nice.

One solution I used before was to set the GPU in each worker based on the pid or the process name (LokyProcess-XXX), but it is unreliable. In particular, when using pytorch and joblib, many workers seem to shut down at the end of a task because of memory management.

A solution I was thinking of for this would be to implement a worker id for each worker, so one can use it in the initializer. The issue with this is to ensure that these worker ids stay consistent even when workers shut down and the executor is resized.
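
For illustration, the process-name trick looks roughly like this (assuming the worker names follow the LokyProcess-XXX pattern mentioned above; as said, it is unreliable because the indices keep growing when workers are shut down and respawned):

import os
from multiprocessing import current_process

def assign_gpu_from_worker_name(n_gpus):
    # call at the start of each task, before any CUDA library is imported;
    # loky workers are named e.g. "LokyProcess-3"
    worker_index = int(current_process().name.rsplit("-", 1)[-1])
    os.environ["CUDA_VISIBLE_DEVICES"] = str(worker_index % n_gpus)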

@btel
Author

btel commented Jun 10, 2020

Yes, I think the dask option looks promising. I ran into some problems with pickling my models, but this is not joblib/dask specific. Once I fix them, I will give dask a try. Should we close the issue?

@lesteve
Member

lesteve commented Jun 10, 2020

Just a remark: if this is new code and you don't have a constraint on keeping joblib, I would recommend using only Dask. I agree it is a slightly bigger change than using the joblib Dask backend (and a change of habit too, which is never easy). At the same time, my feeling is that the Dask joblib glue code is rather tricky, and avoiding it may be a good idea if you don't have a strong need for it.

This is an example of how it would look with pure Dask (i.e. without the joblib backend):

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

cluster = LocalCUDACluster()
client = Client(cluster)

# this uses the futures API; there is a delayed API as well, see the docs for more details
futures = [client.submit(my_func, task_data) for task_data in all_task_data]
results = client.gather(futures)

About the pickling problems with PyTorch, this is something that people tend to bump into, although Dask has some PyTorch-specific serialisers/deserialisers to help and some of the problems have been fixed. One possible work-around is to avoid having to pickle the model by creating it on the worker; you can try something like this:

# creates the model on the worker
model_future = client.submit(create_model_func)

futures = [client.submit(my_func, model_future, task_data) for task_data in all_task_data]
results = client.gather(futures)

This has limitations in practice: work stealing, for example, will try to move the model to idle workers, and moving the model will fail with the same serialisation error. There are probably more robust work-arounds if needed, but that depends a bit on the use case.
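
If work stealing is the blocker, one knob that might help (a general Dask setting, not something specific to this use case) is to disable it in the scheduler config before the cluster is created:

import dask

# possible mitigation: keep the scheduler from moving the model future
# to idle workers by disabling work stealing (set this before the cluster starts)
dask.config.set({"distributed.scheduler.work-stealing": False})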
