Add pool functionality #2105

Closed
richardliaw opened this issue May 20, 2018 · 7 comments

Comments

@richardliaw
Contributor

This utility that multiprocessing provides is quite nice:

    with closing(multiprocessing.Pool()) as pool:
        X = pool.map(fn, [os.urandom(4) for _ in range(num_trials)])
        episode_rewards = np.hstack(X)

We can add something exactly the same into ray.experimental.

@robertnishihara
Collaborator

This is specifically about having a pool of "stateless actors", right?

@richardliaw
Contributor Author

richardliaw commented May 20, 2018

This was actually posted separate from that discussion, but it would make sense for this to include discussion of an API for a pool of stateless actors.

@robertnishihara
Collaborator

We can make a separate issue for that.

What do you like about the pool functionality? E.g., it looks like the code snippet is equivalent to

X = ray.get([fn.remote(os.urandom(4)) for _ in range(num_trials)])
episode_rewards = np.hstack(X)

@richardliaw
Contributor Author

Ideally, it would be something like

items = [os.urandom(4) for _ in range(num_trials)]
X = ray.map(fn, items)

As a user, I like this a lot more than the above provided snippet because I know I want to apply 1 function to a list of items, and I don't need to think about making the function remote, futures, getting the future, whether all these function calls will have enough resources, etc...

The reason I think stateless actors and this functionality are closely related is that, depending on the function, I may want map to be either completely stateless (tasks) or sort-of stateless (stateless actors, e.g. a neural network eval).

@vakker
Contributor

vakker commented May 9, 2019

It's an old thread, but I've come across a use case that could also benefit from some sort of pool feature.
E.g. having the following:

@ray.remote
def fn(i):
    # some setup
    # process input
    return output

has the problem of running the setup in every function call.
The same thing with actors is better because you can just run the setup part once. E.g.:

@ray.remote
class ProcessStuff:
    def __init__(self):
        # some setup

    def fn(self, i):
        # process input
        return output

However, the problem is that you cannot just create a list (or queue) of items and pass it to a list of actors. Instead, you need to iterate over the actors or chunk the input list. It would be better to have a pool of actors where each actor takes an item and processes it until the queue is empty.

The experimental streaming library might be a way to do this, but it seems to me it's a bit more complex than just a queue and a set of actors.

If there's already a neat way to do this, then let me know. Currently I have to fall back to Python multiprocessing to achieve this.
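The queue-and-workers pattern described above can be sketched with the standard library alone. This is only an illustration of the scheduling idea, not Ray code: threads stand in for actors, and the `Worker` class, its `offset` attribute, and the `pool_map` helper are hypothetical names made up for the example. Setup runs once per worker, and each worker pulls the next item as soon as it is free, so the input list never has to be chunked by hand.

```python
import queue
import threading


class Worker:
    """Hypothetical stand-in for an actor: setup once, then process many items."""

    def __init__(self):
        # Placeholder for expensive setup (e.g. loading a model); runs once per worker.
        self.offset = 100

    def process(self, item):
        return item + self.offset


def pool_map(workers, items):
    """Apply worker.process to every item and return results in input order."""
    tasks = queue.Queue()
    for index, item in enumerate(items):
        tasks.put((index, item))

    results = {}
    lock = threading.Lock()

    def drain(worker):
        # Keep pulling items until the shared queue is empty.
        while True:
            try:
                index, item = tasks.get_nowait()
            except queue.Empty:
                return
            value = worker.process(item)
            with lock:
                results[index] = value

    threads = [threading.Thread(target=drain, args=(w,)) for w in workers]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # Results are keyed by input position, so ordering is preserved.
    return [results[i] for i in range(len(results))]


workers = [Worker() for _ in range(4)]
output = pool_map(workers, range(10))
```

Results are stored by input index, so the output order matches the input order even though items may finish out of order. The sketch does no error handling: an exception inside `process` would end that worker's thread and leave its item without a result.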

@ericl
Contributor

ericl commented Oct 17, 2019

Try this?

import ray


class ActorPool(object):
    def __init__(self, actors):
        self.actors = actors

    def map(self, fn, values):
        values = list(values)
        idle = list(self.actors)
        running = {}
        results = {}
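        for i, v in enumerate(values):
            if not idle:
                # All actors are busy: block until one task finishes.
                [r], _ = ray.wait(list(running), num_returns=1)
                j, a = running.pop(r)
                results[j] = ray.get(r)
                idle.append(a)
            a = idle.pop()
            # fn submits the work and returns a future, used as the dict key.
            running[fn(a, v)] = (i, a)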
        while running:
            [r], _ = ray.wait(list(running), num_returns=1)
            j, _ = running.pop(r)
            results[j] = ray.get(r)
        return [results[i] for i in range(len(results))]


if __name__ == "__main__":

    @ray.remote
    class MyActor(object):
        def __init__(self):
            pass
        def f(self, x):
            return x * 2

    ray.init()
    actors = [MyActor.remote() for _ in range(4)]
    pool = ActorPool(actors)
    print(pool.map(
        lambda actor, v: actor.f.remote(v),
        range(10)))

@edoakes
Contributor

edoakes commented Mar 5, 2020

Stale - please open new issue if still relevant

@edoakes edoakes closed this as completed Mar 5, 2020