Should we provide functionality for running a function on all workers (or all nodes)? #144

Closed
robertnishihara opened this issue Dec 22, 2016 · 6 comments
Labels
question Just a question :)

Comments

@robertnishihara
Collaborator

robertnishihara commented Dec 22, 2016

Should we expose the ability to run a given function on all workers? This can be useful for a bunch of reasons (for example, properly setting the Python path on each worker).

Should we also expose the ability to run a given function once on each node? This can be useful for setting things up (e.g., installing some stuff, downloading some files, copying application files).

Of course, we shouldn't add any functionality without a very good reason...

@robertnishihara robertnishihara added the question Just a question :) label Dec 22, 2016
@pcmoritz
Contributor

Another use case is to load a dataset in a balanced way.

It is also similar to Environment variables; maybe we can unify these two functionalities in some way, or provide a better mechanism that replaces both.

We also need to think a little about the API, i.e., which arguments the function should have (maybe the total number of nodes or workers and the current worker index). And how does it interact with the scheduler? How do we make sure people don't abuse it to circumvent the scheduler?

I'd say we add it with a disclaimer that it is experimental, encourage people to use it sparingly, and see what it gets used for.
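
For concreteness, the callback signature mentioned above might look something like the sketch below; the function name, the entry point, and the argument names are all illustrative, not an existing Ray API.

```python
# Hypothetical sketch only: neither `run_function_on_all_workers` nor the
# argument names are a finalized Ray API.
def per_worker_setup(worker_index, num_workers):
    # e.g., load shard `worker_index` of a dataset split into `num_workers`
    # pieces, so the data ends up balanced across workers.
    print("worker %d of %d initializing" % (worker_index, num_workers))

# ray.run_function_on_all_workers(per_worker_setup)  # hypothetical entry point
```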

@robertnishihara
Collaborator Author

As we discussed earlier, one elegant approach is to provide a single method for running a function on all workers, and to pass a counter into that function indicating how many workers on that machine have started executing it so far, including itself (using Redis to increment the counter atomically).

So if there are 4 workers on one machine, and we call something like ray.run_function_on_all_workers(f), then one worker will call f(1), one will call f(2), one will call f(3), and one will call f(4).
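
Roughly, the user-facing side might look like this sketch; the entry point and the setup helpers are placeholders, not a finalized API.

```python
# Sketch of the proposal above; `ray.run_function_on_all_workers` and the
# setup steps below are illustrative placeholders.
def f(counter):
    # `counter` is the Redis-incremented count of workers on this machine
    # that have started executing f, so exactly one worker sees counter == 1.
    if counter == 1:
        print("running once-per-node setup")   # e.g., download shared files
    print("running per-worker setup")          # e.g., fix up sys.path

# ray.run_function_on_all_workers(f)  # hypothetical entry point
```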

@atumanov
Contributor

atumanov commented Dec 23, 2016

This is a very interesting question/discussion. To address this specific use case, I think what you are looking for is an anti-affinity placement constraint: the ability to run one task per locality domain (e.g., per node, per worker, per rack). We could provide this functionality in the global scheduler by extending the scheduler API to accept a bag of tasks (collectively defined as a job) and a placement constraint associated with that job. Then, when the placement decision is made for this bag of tasks, it will be done in a way that honors the constraint, or the job is atomically rejected if the constraint cannot be satisfied. Of course, it would be ideal to do this in a general way and to make the placement constraint specification optional (perhaps even as a separate loadable module).

Another take on this would be to approach the problem from an OS/systems perspective. We could think about a basic primitive Ray could expose (for instance, a scoped atomic counter primitive backed by Redis) that enables ensembles of distributed Ray tasks to do leader election, for example. Counters could be node- or worker-scoped and persist for the lifetime of the task ensemble. It is easy to see how node-scoped atomic counters would enable "at most once per node" functionality, while worker-scoped atomic counters would enable "at most once per worker" functionality. So a Ray function that relies on some "once per worker" or "once per node" pre-processing would add a simple if statement that checks the worker-scoped or node-scoped atomic counter and calls the init() function if the counter is zero. The init() could either run in the same task context or as a separate task. The latter requires a mechanism that guarantees init() runs in the same locale as the caller, so some minimal placement/locality awareness is still needed here. But we could make it relative (as opposed to absolute) to preserve the resource abstraction. Locality constraints could be supported in the form "same locale as me" (affinity) or "not the same locale as me" (anti-affinity).
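
As a minimal sketch of the node-scoped counter idea, assuming direct access to a Redis instance and using INCR's return value rather than a separate read (the key scheme and helper names are illustrative only):

```python
# Minimal sketch of a node-scoped atomic counter, assuming a reachable Redis
# instance; the key scheme and function names are illustrative only.
import redis

def run_once_per_node(node_id, init, redis_host="localhost", redis_port=6379):
    r = redis.StrictRedis(host=redis_host, port=redis_port)
    # INCR is atomic, so exactly one caller per node observes the value 1
    # and wins the right to run init(); everyone else skips it.
    if r.incr("init_counter:" + node_id) == 1:
        init()
```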

Attempting to achieve everything we need by using what the system already provides is the way to go. As tempting as it is, I would discourage side-channel (i.e. internal/invisible) data/task propagation/distribution/broadcast. Thinking about and exposing expressive/composable basic system primitives will make Ray feel more and more like a microkernel!

@atumanov atumanov reopened this Dec 23, 2016
@robertnishihara
Collaborator Author

Closing for now.

@AdamGleave
Contributor

Just wanted to second this request. In my context, I want to set some defaults for libraries I use (Python logging, NumPy print options, etc.) on all workers. I'm working around it by stuffing this code in __init__.py, which gets executed on all the workers, but this is pretty nasty.
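
For reference, the setup itself is plain Python; the missing piece is a supported way to have Ray run it on every worker process. Something along these lines:

```python
# Sketch of the per-worker defaults described above; the open question is
# how to have Ray invoke this on every worker process.
import logging
import numpy as np

def configure_worker_defaults():
    logging.basicConfig(level=logging.INFO)
    np.set_printoptions(precision=4, suppress=True)
```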

@liujie329

@robertnishihara
Hi, in the released version 1.13.0, the worker module doesn't have the ray.worker.get_global_worker() function. How can I run a function on all workers now?
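
One workaround that avoids internal worker APIs is to run a setup task once per node by targeting the node:<ip> resource that Ray creates automatically for each node. A sketch, noting that this reaches each node once rather than every worker process, and that the exact dictionary keys may vary across Ray versions:

```python
# Hedged sketch: run a setup task once per live node by pinning tasks to the
# automatically created "node:<ip>" custom resource. This runs once per node,
# not once per worker process, and key names may vary across Ray versions.
import ray

ray.init(address="auto")

@ray.remote
def node_setup():
    # per-node initialization goes here
    return True

futures = []
for node in ray.nodes():
    if not node.get("Alive"):
        continue
    node_resource = "node:" + node["NodeManagerAddress"]
    # Request a tiny amount of the node-specific resource to pin the task.
    futures.append(node_setup.options(resources={node_resource: 0.01}).remote())
ray.get(futures)
```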
