Experimental asyncio support #2015

Merged: 103 commits into ray-project:master, Dec 7, 2018

Conversation

@suquark (Member) commented May 8, 2018

What do these changes do?

This is a prototype implementation for #493, providing an awaitable interface for ray.wait and Ray's ObjectID.

As a prototype, this code is meant to be revised later.

How do these changes work?

  1. AsyncPlasmaClient is implemented to override the original pyarrow.plasma.PlasmaClient. pyarrow.plasma.PlasmaClient is created by pyarrow.plasma.connect and is attached to ray.worker.global_worker to handle basic Ray functions. It also creates an interface for wrapping Ray's ObjectID.
  2. AsyncPlasmaSocket is created for async socket messaging with the PlasmaStore and PlasmaManager. It is the core of the async support. pyarrow.plasma.PlasmaClient does not use an event loop and only creates a single socket connection, which is why the original Ray does not support many async functions. AsyncPlasmaSocket uses the asyncio event loop and can create multiple socket connections to the PlasmaManager (see the sketch after this list).
  3. plasma.fbs under the format directory needs to be compiled with FlatBuffers ahead of time.
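
For illustration, here is a minimal sketch of the idea behind AsyncPlasmaSocket: one request/response exchange over an asyncio Unix-socket connection. The framing shown (an 8-byte type plus an 8-byte length header) is hypothetical; the real implementation speaks the FlatBuffers protocol defined in plasma.fbs.

import asyncio
import struct

async def plasma_request(socket_path, message_type, payload):
    # Open a dedicated connection; AsyncPlasmaSocket keeps several of
    # these so that multiple requests can be in flight at once.
    reader, writer = await asyncio.open_unix_connection(socket_path)
    try:
        # Hypothetical framing: 8-byte message type + 8-byte body length.
        writer.write(struct.pack("<qq", message_type, len(payload)))
        writer.write(payload)
        await writer.drain()
        reply_type, length = struct.unpack("<qq", await reader.readexactly(16))
        body = await reader.readexactly(length)
        return reply_type, body
    finally:
        writer.close()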

Related issue number

#493

cc @mitar

@suquark (Member, Author) commented May 8, 2018

Here's a piece of test code. It shows how this PR works. I will integrate it into the codebase later.

import asyncio
import ray

from ray.plasma.plasma_client import AsyncPlasmaClient

def cvt(s):
    # Convert Ray ObjectIDs into pyarrow.plasma ObjectIDs.
    return [ray.pyarrow.plasma.ObjectID(t.id()) for t in s]


address_dict = ray.init()

async def test_wait(client: AsyncPlasmaClient):
    a = ray.put(2342342)

    @ray.remote
    def delay():
        import time
        time.sleep(10)
        return 'ready'

    b = delay.remote()
    print(ray.wait([a, b]))
    print(await client.wait(cvt([a, b])))
    await asyncio.sleep(5)
    print(await client.wait(cvt([a, b])))        
    print(ray.wait([a, b]))
    await asyncio.sleep(10)
    print(await client.wait(cvt([a, b])))        
    print(ray.wait([a, b]))
    print(await client.wait(cvt([b])))
    print(ray.wait([b]))

async def test_await_get(client):
    @ray.remote
    def delay():
        import time
        time.sleep(10)
        return 'ready'

    k = delay.remote()
    k = client.wrap_objectid_with_future(k)
    result = await k
    print(result)
    
async def test_client(client: AsyncPlasmaClient):
    await client.connect()
    print("store_capacity = %d" % client.store_capacity)
    await test_wait(client)
    await test_await_get(client)

object_store_address = address_dict['object_store_addresses'][0]

client = AsyncPlasmaClient(
    store_socket_name=object_store_address.name,
    manager_socket_name=object_store_address.manager_name)

loop = asyncio.get_event_loop()

try:
    loop.run_until_complete(test_client(client))
except KeyboardInterrupt:
    client.disconnect()

@suquark (Member, Author) commented May 9, 2018

There are still some problems remaining to be solved.

  1. pyarrow.plasma.PlasmaClient is part of Arrow. Currently I use AsyncPlasmaClient as a wrapper, so it is not very efficient (for example, it falls back to wait in places). A better way would be to change Arrow's code.
  2. Ray's ObjectID cannot be used as a base class, so currently I can only create a new class that contains an ObjectID.
  3. Currently the awaitable ray.put is implemented by making use of wait. pyarrow.plasma.PlasmaClient's get is complex and there is no easy way to create an async version. The easiest way may still be to change Arrow's code.

@mitar (Member) commented May 9, 2018

Looking at this code, it seems like a lot of it is just copied over to the new classes unmodified?

Is that because inheritance does not work well?

@@ -0,0 +1,331 @@
// Licensed to the Apache Software Foundation (ASF) under one

Collaborator: This file is not needed, right? (same with python/ray/plasma/format/__init__.py)

@suquark (Member, Author) commented May 10, 2018

This takes a few words to clarify.

Currently, Ray has its own plasma manager (src/plasma/plasma_manager.cc) and shares some code and formats with Arrow; src/plasma/format/plasma.fbs is one of them.

However, Ray does not implement its own plasma client. Instead, it uses Arrow's client, which uses Arrow's plasma.fbs.

Now, Ray's plasma manager uses an event loop (so it's async) but Arrow's plasma client doesn't, so I have to override parts of Arrow's plasma client. To do that, Arrow's plasma.fbs is copied over from Arrow's repo.

Arrow's plasma.fbs is almost identical to Ray's, but there are still small differences between them. If we want to keep only one copy of plasma.fbs, a better way may be to abandon Arrow's plasma client (by copying the Arrow plasma client's code back into Ray and adding event loops). However, this would permanently change Ray's behavior, making Ray totally async. A workaround may be to keep two different plasma client APIs, one async and one blocking.

@robertnishihara @mitar

@robertnishihara (Collaborator)

Thanks a lot for the PR. I'm hoping to try it out this weekend.

This seems like really useful functionality to have. It also seems like it belongs more in Arrow than in Ray. E.g., as the plasma client API changes we'll want to make sure this code gets updated/tested.

@mitar (Member) commented May 12, 2018

How hard is it to turn async code into blocking code? Would it be reasonable to have the underlying implementation be fully async, and then just expose a blocking API for those who prefer that?

@robertnishihara (Collaborator)

The challenge with making async the default implementation is that it requires some sort of event loop, and users who just want to use the blocking version may not have an event loop on hand.

We can expose non-blocking calls along with a socket for receiving notifications from the store. We have something like this that is used internally by the local scheduler and object manager (both of which have an event loop).

@mitar (Member) commented May 12, 2018

The challenge with making async the default implementation is that it requires some sort of event loop, and users who just want to use the blocking version may not have an event loop on hand.

So, yeah. A blocking implementation would get a loop, make the call, wait in the loop for the call to finish (blocking), then end the loop, ending the blocking call, and return the value.

For example:

import asyncio
import aiohttp

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

def block_fetch(url):
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(fetch(url))

print(block_fetch('https://common.tnode.com'))

@suquark (Member, Author) commented May 14, 2018

I think we have to make some choices:

Options group A:

  1. Change Arrow (change its plasma client).
  2. Change Ray (borrow the plasma client from Arrow and change it).

Options group B:

  1. Change the C++ code: implement event loops in C++ and export async callbacks to Python via Cython (harder to integrate with asyncio; higher performance).
  2. Change the C++ code (keeping the parts without I/O) and write the I/O-related parts in Python with asyncio's event loop (normal performance).
  3. Just change the Python code (may hurt performance, but easier to implement).

Options group C:

  1. Keep both async and blocking implementations (higher performance, more code).
  2. Make everything async; create an event loop and wait on it for every blocking API call.
  3. Make everything async; keep one event loop per worker that is used by every blocking API call.

Currently I prefer A2, B2, and C3.

@robertnishihara (Collaborator)

For group A, modifying Arrow makes a lot more sense to me: since Plasma is part of Arrow, it makes sense to do all Plasma development there. Otherwise, updates to Plasma will break the async client, so it's important to have the client tested in Arrow's CI.

For group B, I think it makes sense to do as much as possible in C++. This will make it easier to wrap from other languages like Java. It'd be ok to do something quick in Python using the existing C++ API, but that doesn't feel like a long-term solution.

For group C, creating an event loop for every blocking call sounds very heavyweight to me, but I could be completely wrong about that, so it's a question of performance. However, if we want to use an underlying async implementation, then that would require the async implementation to be in C++.

@mitar (Member) commented May 14, 2018

creating an event loop for every blocking call sounds very heavyweight to me, but I could be completely wrong about that, so it's a question of performance

I do not think the code above creates an event loop every time; it just reuses the "main" one. (There is also new_event_loop.) From the documentation:

The default policy defines context as the current thread, and manages an event loop per thread that interacts with asyncio. If the current thread doesn’t already have an event loop associated with it, the default policy’s get_event_loop() method creates one when called from the main thread, but raises RuntimeError otherwise.

Not sure how we would benchmark this code to compare a regular blocking call with a non-blocking one.

@mitar (Member) commented May 14, 2018

I tried:

import asyncio
import time

def block_sleep():
    time.sleep(1)

async def sleep():
    block_sleep()

def loop_sleep():
    # Wrap the coroutine in a blocking call on the thread's event loop.
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(sleep())

print("start blocking")
results = []
for i in range(100):
    before = time.perf_counter()
    block_sleep()
    after = time.perf_counter()
    results.append(after - before)
print("end blocking", sum(results) / len(results))

print("start async")
results = []
for i in range(100):
    before = time.perf_counter()
    loop_sleep()
    after = time.perf_counter()
    results.append(after - before)
print("end async", sum(results) / len(results))

Results:

start blocking
end blocking 1.001045424739932
start async
end async 1.0013151963998825

This does not look like a big difference?

@suquark (Member, Author) commented Jun 2, 2018

Currently, I am implementing a new async plasma client. Here are some ideas:

C++ Part of Arrow

  1. Every client has two socket pools: one for the plasma_manager and another for the plasma_store. Each pool has a maximum size.
  2. When a client is asked to perform a Ray task (put, get, transfer, etc.), a C++ coroutine-like object (we call it a PlasmaCoroutine) will be created.
  3. When a PlasmaCoroutine needs a socket to communicate with the plasma_manager, a new socket connection will be created and added to the pool. If the pool has reached its maximum size, the PlasmaCoroutine will be paused. The same applies to the plasma_store.
  4. Every client maintains a Map<int socket_fd, PlasmaCoroutine task>, where socket_fd is the socket the PlasmaCoroutine is currently pending on. All socket_fds are watched by epoll.
  5. Every client also maintains a queue for those PlasmaCoroutines that have not yet been allocated sockets.
  6. When a PlasmaCoroutine is finished, a finished flag will be set. There will also be flags for exceptions.
  7. There will be a ray_poll function. When called, it will use epoll to get all socket_fds that are ready and resume the related PlasmaCoroutines. Then ray_poll will return the finished PlasmaCoroutines' results.

Python Part of Arrow

  1. An asyncio-compatible selector based on ray_poll will be implemented (a rough sketch follows below).
  2. Each client will have an asyncio-based event loop equipped with that selector.
  3. All Ray tasks will be added to the event loop so we can poll them.

So then we can have asyncio-friendly async Ray tasks.
The main problem is that C++ does not support coroutines well (until C++20), so it takes a lot of hard work to turn the original functions into coroutine-like objects.
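
To make the Python part concrete, here is a minimal sketch of the glue between asyncio futures and the proposed ray_poll function. Both ray_poll and its return format are hypothetical here, and a real implementation would plug the epoll-backed selector into the event loop rather than re-scheduling a poll callback as below.

import asyncio

class RayPollBridge:
    def __init__(self, client, loop):
        self.client = client
        self.loop = loop
        self.futures = {}  # PlasmaCoroutine id -> asyncio.Future

    def submit(self, coroutine_id):
        # One asyncio future per pending PlasmaCoroutine; awaiting it
        # suspends the caller until ray_poll reports the result.
        future = self.loop.create_future()
        self.futures[coroutine_id] = future
        return future

    def poll_once(self):
        # Assumed: ray_poll resumes every PlasmaCoroutine whose socket_fd
        # is ready and returns (coroutine_id, result) pairs for the
        # finished ones.
        for coroutine_id, result in self.client.ray_poll(timeout=0):
            self.futures.pop(coroutine_id).set_result(result)
        # Keep polling while the event loop runs (busy-polling only to
        # keep the sketch short).
        self.loop.call_soon(self.poll_once)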

@robertnishihara (Collaborator)

@suquark the ray.wait implementation is moving into the Ray codebase at the moment (see #2162), since as you mentioned it relies on Ray-specific components. This should make it easier to prototype an async wait in Ray without copying the .fbs files.

The easiest way to implement an async ray.wait may be to ignore the backend wait implementation and to put most of the wait implementation in the client. Basically, the client can call subscribe (in Python) to get notifications whenever a new object is available, and then implement the wait logic itself. What do you think about something like that? Would that work?
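
For concreteness, here is a rough sketch of that client-side wait, assuming the plasma client's subscribe() / get_next_notification() interface; treat the exact signatures as approximate.

import time

def wait_via_notifications(client, object_ids, num_returns, timeout_s):
    # Subscribe first so no object-sealed notification is missed.
    client.subscribe()
    ready = {oid for oid in object_ids if client.contains(oid)}
    pending = set(object_ids) - ready
    deadline = time.time() + timeout_s
    while len(ready) < num_returns and pending and time.time() < deadline:
        # Assumed to block until the store seals another object.
        object_id, _, _ = client.get_next_notification()
        if object_id in pending:
            pending.remove(object_id)
            ready.add(object_id)
    return list(ready), list(pending)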

@suquark (Member, Author) commented Jun 4, 2018

@robertnishihara That's cool. I think it could work, but a better idea may be to use ray.wait as a selector like Linux's poll, so we can implement a general and asyncio-friendly async programming model. I think today's asynchronous socket model gives us a good example: in that model, ray.wait is very similar to Linux's poll, and the subscription mechanism is very similar to epoll. A viable and asyncio-compliant solution is to implement an event loop based on poll or epoll, and then implement generic asynchronous operations on top of that event loop.
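
As a comparison point (a much simpler technique than the selector-based event loop described above, not a substitute for it), the blocking ray.wait can already be made awaitable by running it in a thread-pool executor:

import asyncio
import ray

async def async_wait(object_ids, num_returns=1, timeout=None):
    # Run the blocking ray.wait in the default thread-pool executor so
    # the event loop itself is never blocked.
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(
        None, lambda: ray.wait(object_ids, num_returns, timeout))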

And about subscribe, do you mean ray.worker.global_worker.plasma_client.subscribe?

@robertnishihara (Collaborator) commented Jun 4, 2018 via email

if sys.version_info >= (3, 5):
    from ray.experimental import async_api
    # Initialize
    async_api.init()

Contributor: We shouldn't init unless the user imports async_api, right?

Member (Author): But it depends on ray.init(). Because users typically call ray.init() after importing modules, I have to put it here.

Contributor: We already init on as_future(), which already depends on ray.init(). So we can remove these lines?

Member (Author): But when initializing it, we should ensure the event loop is not running. I don't think it's a good idea to make users judge whether it is safe to use as_future() for initialization. However, in most cases ray.init() is called before the event loop starts, so initializing it in connect is safe, and it also works for remote functions/actors.

@ericl (Contributor) left a comment: Looks good, but we don't need to init() on worker start, right?

@ericl (Contributor) commented Dec 6, 2018

@suquark I made some edits to clarify that async_api.init() or to_future() must be called before the event loop starts. Let me know if this works.

I also deleted the init block in worker.py. Basically the issue there is that you would be running this code even if the user is not using the async api, which is too risky for an experimental API.
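
For reference, a usage sketch of the experimental API based on the names used in this thread (async_api.init() and as_future()); exact details may differ from the merged version.

import asyncio
import ray
from ray.experimental import async_api

ray.init()
async_api.init()  # must be called before the event loop starts

@ray.remote
def f():
    return "ready"

async def main():
    # as_future wraps an ObjectID so that it can be awaited.
    result = await async_api.as_future(f.remote())
    print(result)

asyncio.get_event_loop().run_until_complete(main())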

@suquark (Member, Author) commented Dec 6, 2018

@ericl That looks good to me, thanks.

@pcmoritz merged commit c2c501b into ray-project:master on Dec 7, 2018
@mitar (Member) commented Dec 7, 2018

Awesome! Thanks @suquark for this.
