Skip to content

Commit

Permalink
[Asyncio] Remove async init legacy code (#8177)
Browse files Browse the repository at this point in the history
* [Asyncio] Remove async init legacy code

* Fix places that call async_init
  • Loading branch information
simon-mo committed Apr 25, 2020
1 parent 9dc6253 commit 13c14ea
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 33 deletions.
4 changes: 2 additions & 2 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1142,8 +1142,8 @@ cdef class CoreWorker:
asyncio.set_event_loop(self.async_event_loop)
# Initialize the async plasma connection.
# Delayed import due to async_api depends on _raylet.
from ray.experimental.async_api import _async_init
self.async_event_loop.run_until_complete(_async_init())
from ray.experimental.async_api import init as plasma_async_init
plasma_async_init()

# Create and attach the monitor object
monitor_state = AsyncMonitorState(self.async_event_loop)
Expand Down
33 changes: 4 additions & 29 deletions python/ray/experimental/async_api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
# Note: asyncio is only compatible with Python 3

import asyncio
import threading

import ray
from ray.experimental.async_plasma import PlasmaEventHandler
Expand All @@ -10,7 +7,10 @@
handler = None


async def _async_init():
def init():
"""Initialize plasma event handlers for asyncio support."""
assert ray.is_initialized(), "Please call ray.init before async_api.init"

global handler
if handler is None:
worker = ray.worker.global_worker
Expand All @@ -20,31 +20,6 @@ async def _async_init():
logger.debug("AsyncPlasma Connection Created!")


def init():
"""
Initialize synchronously.
"""
assert ray.is_initialized(), "Please call ray.init before async_api.init"

# Noop when handler is set.
if handler is not None:
return

loop = asyncio.get_event_loop()
if loop.is_running():
if loop._thread_id != threading.get_ident():
# If the loop is runing outside current thread, we actually need
# to do this to make sure the context is initialized.
asyncio.run_coroutine_threadsafe(_async_init(), loop=loop)
else:
async_init_done = asyncio.get_event_loop().create_task(
_async_init())
# Block until the async init finishes.
async_init_done.done()
else:
asyncio.get_event_loop().run_until_complete(_async_init())


def as_future(object_id):
"""Turn an object_id into a Future object.
Expand Down
4 changes: 2 additions & 2 deletions python/ray/tests/test_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ async def test_asyncio_get(ray_start_regular_shared, event_loop):
loop.set_debug(True)

# This is needed for async plasma
from ray.experimental.async_api import _async_init
await _async_init()
from ray.experimental.async_api import init
init()

# Test Async Plasma
@ray.remote
Expand Down

0 comments on commit 13c14ea

Please sign in to comment.