Skip to content
Permalink
Browse files

Basic Async Actor Call (#6183)

* Start trying to figure out where to put fibers

* Pass is_async flag from python to context

* Just running things in fiber works

* Yield implemented, need some debugging to make it work

* It worked!

* Remove debug prints

* Lint

* Revert the clang-format

* Remove unnecessary log

* Remove unncessary import

* Add attribution

* Address comment

* Add test

* Missed a merge conflict

* Make test pass and compile

* Address comment

* Rename async -> asyncio

* Move async test to py3 only

* Fix ignore path
  • Loading branch information
simon-mo committed Nov 21, 2019
1 parent c4132b5 commit 29ba6bfc649e3d0aa1bfa3f34602f16c0cb7bcef
@@ -185,8 +185,8 @@ script:
# ray tests
# Python3.5+ only. Otherwise we will get `SyntaxError` regardless of how we set the tester.
- if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then python -c 'import sys;exit(sys.version_info>=(3,5))' || python -m pytest -v --durations=5 --timeout=300 python/ray/experimental/test/async_test.py; fi
- if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then python -c 'import sys;exit(sys.version_info>=(3,5))' || python -m pytest -v --durations=5 --timeout=300 python/ray/tests/py3_args_test.py; fi
- if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then python -m pytest -v --durations=10 --timeout=300 python/ray/tests --ignore=python/ray/tests/perf_integration_tests --ignore=python/ray/tests/py3_args_test.py; fi
- if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then python -c 'import sys;exit(sys.version_info>=(3,5))' || python -m pytest -v --durations=5 --timeout=300 python/ray/tests/py3_test.py; fi
- if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then python -m pytest -v --durations=10 --timeout=300 python/ray/tests --ignore=python/ray/tests/perf_integration_tests --ignore=python/ray/tests/py3_test.py; fi

deploy:
- provider: s3
@@ -1,12 +1,12 @@
# Bazel build
# C/C++ documentation: https://docs.bazel.build/versions/master/be/c-cpp.html

load("@rules_proto//proto:defs.bzl", "proto_library")
load("@rules_cc//cc:defs.bzl", "cc_binary", "cc_library", "cc_proto_library", "cc_test")
load("@com_github_grpc_grpc//bazel:cc_grpc_library.bzl", "cc_grpc_library")
load("@com_github_grpc_grpc//bazel:cython_library.bzl", "pyx_library")
load("@rules_proto_grpc//python:defs.bzl", "python_proto_compile")
load("@rules_proto_grpc//python:defs.bzl", "python_grpc_compile")
load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
load("@//bazel:ray.bzl", "flatbuffer_py_library")

COPTS = ["-DRAY_USE_GLOG"] + select({
"@bazel_tools//src/conditions:windows": [
@@ -358,6 +358,7 @@ cc_library(
":raylet_lib",
":worker_rpc",
":gcs",
"@boost//:fiber",
],
)

@@ -5,8 +5,14 @@

from cpython.exc cimport PyErr_CheckSignals

try:
import asyncio
except ImportError:
# Python2 doesn't have asyncio
asyncio = None
import numpy
import gc
import inspect
import threading
import time
import logging
@@ -71,6 +77,7 @@ from ray.includes.libcoreworker cimport (
CCoreWorker,
CTaskOptions,
ResourceMappingType,
CFiberEvent
)
from ray.includes.task cimport CTaskSpec
from ray.includes.ray_config cimport RayConfig
@@ -120,6 +127,7 @@ include "includes/libcoreworker.pxi"
logger = logging.getLogger(__name__)

MEMCOPY_THREADS = 12
PY3 = cpython.PY_MAJOR_VERSION >= 3


if cpython.PY_MAJOR_VERSION >= 3:
@@ -494,6 +502,7 @@ cdef execute_task(
CoreWorker core_worker = worker.core_worker
JobID job_id = core_worker.get_current_job_id()
CTaskID task_id = core_worker.core_worker.get().GetCurrentTaskId()
CFiberEvent fiber_event

# Automatically restrict the GPUs available to this task.
ray.utils.set_cuda_visible_devices(ray.get_gpu_ids())
@@ -547,7 +556,24 @@ cdef execute_task(
c_resources.find(b"object_store_memory")).second)))

def function_executor(*arguments, **kwarguments):
return execution_info.function(actor, *arguments, **kwarguments)
function = execution_info.function
result_or_coroutine = function(actor, *arguments, **kwarguments)

if PY3 and inspect.iscoroutine(result_or_coroutine):
coroutine = result_or_coroutine
loop = core_worker.create_or_get_event_loop()

future = asyncio.run_coroutine_threadsafe(coroutine, loop)
future.add_done_callback(
lambda future: fiber_event.Notify())

with nogil:
(core_worker.core_worker.get()
.YieldCurrentFiber(fiber_event))

return future.result()

return result_or_coroutine

with core_worker.profile_event(b"task", extra_data=extra_data):
try:
@@ -702,7 +728,10 @@ cdef write_serialized_object(


cdef class CoreWorker:
cdef unique_ptr[CCoreWorker] core_worker
cdef:
unique_ptr[CCoreWorker] core_worker
object async_thread
object async_event_loop

def __cinit__(self, is_driver, store_socket, raylet_socket,
JobID job_id, GcsClientOptions gcs_options, log_dir,
@@ -901,7 +930,8 @@ cdef class CoreWorker:
placement_resources,
c_bool is_direct_call,
int32_t max_concurrency,
c_bool is_detached):
c_bool is_detached,
c_bool is_asyncio):
cdef:
CRayFunction ray_function
c_vector[CTaskArg] args_vector
@@ -923,7 +953,7 @@ cdef class CoreWorker:
CActorCreationOptions(
max_reconstructions, is_direct_call, max_concurrency,
c_resources, c_placement_resources,
dynamic_worker_options, is_detached),
dynamic_worker_options, is_detached, is_asyncio),
&c_actor_id))

return ActorID(c_actor_id.Binary())
@@ -1060,3 +1090,15 @@ cdef class CoreWorker:
else:
write_serialized_object(
serialized_object, returns[0][i].get().GetData())

def create_or_get_event_loop(self):
if self.async_event_loop is None:
self.async_event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.async_event_loop)
if self.async_thread is None:
self.async_thread = threading.Thread(
target=lambda: self.async_event_loop.run_forever()
)
self.async_thread.start()

return self.async_event_loop
@@ -358,7 +358,8 @@ def _remote(self,
is_direct_call=None,
max_concurrency=None,
name=None,
detached=False):
detached=False,
is_asyncio=False):
"""Create an actor.
This method allows more flexibility than the remote method because
@@ -381,6 +382,8 @@ def _remote(self,
name: The globally unique name for the actor.
detached: Whether the actor should be kept alive after driver
exits.
is_asyncio: Turn on async actor calls. This only works with direct
actor calls.
Returns:
A handle to the newly created actor.
@@ -400,6 +403,12 @@ def _remote(self,
if max_concurrency < 1:
raise ValueError("max_concurrency must be >= 1")

if is_asyncio and not is_direct_call:
raise ValueError(
"Setting is_asyncio requires is_direct_call=True.")
if is_asyncio and max_concurrency != 1:
raise ValueError("Setting is_asyncio requires max_concurrency=1.")

worker = ray.worker.get_global_worker()
if worker.mode is None:
raise Exception("Actors cannot be created before ray.init() "
@@ -487,7 +496,7 @@ def _remote(self,
function_descriptor.get_function_descriptor_list(),
creation_args, meta.max_reconstructions, resources,
actor_placement_resources, is_direct_call, max_concurrency,
detached)
detached, is_asyncio)

actor_handle = ActorHandle(
actor_id,
@@ -217,7 +217,7 @@ cdef extern from "ray/core_worker/common.h" nogil:
const unordered_map[c_string, double] &resources,
const unordered_map[c_string, double] &placement_resources,
const c_vector[c_string] &dynamic_worker_options,
c_bool is_detached)
c_bool is_detached, c_bool is_asyncio)

cdef extern from "ray/gcs/gcs_client_interface.h" nogil:
cdef cppclass CGcsClientOptions "ray::gcs::GcsClientOptions":
@@ -48,6 +48,12 @@ cdef extern from "ray/core_worker/profiling.h" nogil:
cdef cppclass CProfileEvent "ray::worker::ProfileEvent":
void SetExtraData(const c_string &extra_data)

cdef extern from "ray/core_worker/transport/direct_actor_transport.h" nogil:
cdef cppclass CFiberEvent "ray::FiberEvent":
CFiberEvent()
void Wait()
void Notify()

cdef extern from "ray/core_worker/core_worker.h" nogil:
cdef cppclass CCoreWorker "ray::CoreWorker":
CCoreWorker(const CWorkerType worker_type, const CLanguage language,
@@ -125,3 +131,5 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CRayStatus Delete(const c_vector[CObjectID] &object_ids,
c_bool local_only, c_bool delete_creating_tasks)
c_string MemoryUsageString()

void YieldCurrentFiber(CFiberEvent &coroutine_done)
@@ -3,6 +3,7 @@
from __future__ import division
from __future__ import print_function

import asyncio
import pytest

import ray
@@ -94,3 +95,32 @@ def test_function(fn, remote_fn):
local_method = local_actor.cls_args_intertwined
test_function(local_method, actor_method)
ray.get(remote_test_function.remote(local_method, actor_method))

def test_asyncio_actor(ray_start_regular):
@ray.remote
class AsyncBatcher(object):
def __init__(self):
self.batch = []
# The event currently need to be created from the same thread.
# We currently run async coroutines from a different thread.
self.event = None

async def add(self, x):
if self.event is None:
self.event = asyncio.Event()
self.batch.append(x)
if len(self.batch) >= 3:
self.event.set()
else:
await self.event.wait()
return sorted(self.batch)

a = AsyncBatcher.options(is_direct_call=True, is_asyncio=True).remote()
x1 = a.add.remote(1)
x2 = a.add.remote(2)
x3 = a.add.remote(3)
r1 = ray.get(x1)
r2 = ray.get(x2)
r3 = ray.get(x3)
assert r1 == [1, 2, 3]
assert r1 == r2 == r3
@@ -2852,3 +2852,4 @@ def ping(self):
run_string_as_driver(driver_script)
detached_actor = ray.experimental.get_actor(actor_name)
assert ray.get(detached_actor.ping.remote()) == "pong"

@@ -199,6 +199,11 @@ int TaskSpecification::MaxActorConcurrency() const {
return message_->actor_creation_task_spec().max_concurrency();
}

bool TaskSpecification::IsAsyncioActor() const {
RAY_CHECK(IsActorCreationTask());
return message_->actor_creation_task_spec().is_asyncio();
}

bool TaskSpecification::IsDetachedActor() const {
RAY_CHECK(IsActorCreationTask());
return message_->actor_creation_task_spec().is_detached();
@@ -229,6 +234,8 @@ std::string TaskSpecification::DebugString() const {
stream << ", actor_creation_task_spec={actor_id=" << ActorCreationId()
<< ", max_reconstructions=" << MaxActorReconstructions()
<< ", is_direct_call=" << IsDirectCall()
<< ", max_concurrency=" << MaxActorConcurrency()
<< ", is_asyncio_actor=" << IsAsyncioActor()
<< ", is_detached=" << IsDetachedActor() << "}";
} else if (IsActorTask()) {
// Print actor task spec.
@@ -150,6 +150,8 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {

int MaxActorConcurrency() const;

bool IsAsyncioActor() const;

bool IsDetachedActor() const;

ObjectID ActorDummyObject() const;
@@ -95,7 +95,8 @@ class TaskSpecBuilder {
TaskSpecBuilder &SetActorCreationTaskSpec(
const ActorID &actor_id, uint64_t max_reconstructions = 0,
const std::vector<std::string> &dynamic_worker_options = {},
bool is_direct_call = false, int max_concurrency = 1, bool is_detached = false) {
bool is_direct_call = false, int max_concurrency = 1, bool is_detached = false,
bool is_asyncio = false) {
message_->set_type(TaskType::ACTOR_CREATION_TASK);
auto actor_creation_spec = message_->mutable_actor_creation_task_spec();
actor_creation_spec->set_actor_id(actor_id.Binary());
@@ -105,6 +106,7 @@ class TaskSpecBuilder {
}
actor_creation_spec->set_is_direct_call(is_direct_call);
actor_creation_spec->set_max_concurrency(max_concurrency);
actor_creation_spec->set_is_asyncio(is_asyncio);
actor_creation_spec->set_is_detached(is_detached);
return *this;
}
@@ -105,14 +105,15 @@ struct ActorCreationOptions {
const std::unordered_map<std::string, double> &resources,
const std::unordered_map<std::string, double> &placement_resources,
const std::vector<std::string> &dynamic_worker_options,
bool is_detached)
bool is_detached, bool is_asyncio)
: max_reconstructions(max_reconstructions),
is_direct_call(is_direct_call),
max_concurrency(max_concurrency),
resources(resources),
placement_resources(placement_resources),
dynamic_worker_options(dynamic_worker_options),
is_detached(is_detached){};
is_detached(is_detached),
is_asyncio(is_asyncio){};

/// Maximum number of times that the actor should be reconstructed when it dies
/// unexpectedly. It must be non-negative. If it's 0, the actor won't be reconstructed.
@@ -132,6 +133,8 @@ struct ActorCreationOptions {
/// Whether to keep the actor persistent after driver exit. If true, this will set
/// the worker to not be destroyed after the driver shutdown.
const bool is_detached = false;
/// Whether to use async mode of direct actor call. is_direct_call must be true.
const bool is_asyncio = false;
};

} // namespace ray
@@ -98,6 +98,7 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) {
current_actor_id_ = task_spec.ActorCreationId();
current_actor_is_direct_call_ = task_spec.IsDirectCall();
current_actor_max_concurrency_ = task_spec.MaxActorConcurrency();
current_actor_is_asyncio_ = task_spec.IsAsyncioActor();
} else if (task_spec.IsActorTask()) {
RAY_CHECK(current_job_id_ == task_spec.JobId());
RAY_CHECK(current_actor_id_ == task_spec.ActorId());
@@ -135,6 +136,8 @@ int WorkerContext::CurrentActorMaxConcurrency() const {
return current_actor_max_concurrency_;
}

bool WorkerContext::CurrentActorIsAsync() const { return current_actor_is_asyncio_; }

WorkerThreadContext &WorkerContext::GetThreadContext(bool for_main_thread) {
if (thread_context_ == nullptr) {
thread_context_ = std::unique_ptr<WorkerThreadContext>(new WorkerThreadContext());
@@ -48,6 +48,8 @@ class WorkerContext {

int CurrentActorMaxConcurrency() const;

bool CurrentActorIsAsync() const;

int GetNextTaskIndex();

int GetNextPutIndex();
@@ -60,6 +62,7 @@ class WorkerContext {
bool current_actor_is_direct_call_ = false;
bool current_task_is_direct_call_ = false;
int current_actor_max_concurrency_ = 1;
bool current_actor_is_asyncio_ = false;

/// The id of the (main) thread that constructed this worker context.
boost::thread::id main_thread_id_;

0 comments on commit 29ba6bf

Please sign in to comment.
You can’t perform that action at this time.