
[minor] Perf optimizations for direct actor task submission (#6044)

* merge optimizations

* fix

* fix memory error

* optimize

* fix tests

* fix serialization of method handles

* document weakref

* fix check

* bazel format

* disable on Python 2
ericl committed Nov 1, 2019
1 parent eef4ad3 commit fb34928a2a477d97fc61f61302589695d307b22d
@@ -371,6 +371,7 @@ cc_library(
]),
copts = COPTS,
deps = [
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/container:flat_hash_set",
":core_worker_cc_proto",
":ray_common",
@@ -413,6 +414,8 @@ cc_library(
deps = [
":core_worker_lib",
":gcs",
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/container:flat_hash_set",
"@com_google_googletest//:gtest_main",
],
)
@@ -683,21 +683,38 @@ cdef void push_objects_into_return_vector(
c_vector[shared_ptr[CRayObject]] *returns):

cdef:
c_string metadata_str = RAW_BUFFER_METADATA
c_string raw_data_str
shared_ptr[CBuffer] data
shared_ptr[CBuffer] metadata
shared_ptr[CRayObject] ray_object
int64_t data_size

for serialized_object in py_objects:
data_size = serialized_object.total_bytes
data = dynamic_pointer_cast[
CBuffer, LocalMemoryBuffer](
make_shared[LocalMemoryBuffer](data_size))
stream = pyarrow.FixedSizeBufferWriter(
pyarrow.py_buffer(Buffer.make(data)))
serialized_object.write_to(stream)
ray_object = make_shared[CRayObject](data, metadata)
returns.push_back(ray_object)
if isinstance(serialized_object, bytes):
data_size = len(serialized_object)
raw_data_str = serialized_object
data = dynamic_pointer_cast[
CBuffer, LocalMemoryBuffer](
make_shared[LocalMemoryBuffer](
<uint8_t*>(raw_data_str.data()), raw_data_str.size()))
metadata = dynamic_pointer_cast[
CBuffer, LocalMemoryBuffer](
make_shared[LocalMemoryBuffer](
<uint8_t*>(metadata_str.data()), metadata_str.size()))
ray_object = make_shared[CRayObject](data, metadata, True)
returns.push_back(ray_object)
else:
data_size = serialized_object.total_bytes
data = dynamic_pointer_cast[
CBuffer, LocalMemoryBuffer](
make_shared[LocalMemoryBuffer](data_size))
metadata.reset()
stream = pyarrow.FixedSizeBufferWriter(
pyarrow.py_buffer(Buffer.make(data)))
serialized_object.write_to(stream)
ray_object = make_shared[CRayObject](data, metadata)
returns.push_back(ray_object)


cdef class CoreWorker:
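The hunk above adds a raw-bytes fast path to return-value packing: when a task returns a bytes object, it is wrapped directly in a LocalMemoryBuffer tagged with the raw-buffer metadata (and copied, so the return value outlives the temporary Python string), while all other objects still go through pyarrow serialization into a freshly allocated buffer. A minimal plain-Python sketch of that branching follows; the helper names and the use of pickle as the stand-in serializer are illustrative, not Ray's actual API.

import pickle

RAW_BUFFER_METADATA = b"RAW"  # stand-in for the metadata constant used above

def pack_return_objects(py_objects, returns):
    # Plain-Python sketch of push_objects_into_return_vector's branching.
    for obj in py_objects:
        if isinstance(obj, bytes):
            # Fast path: the bytes are the payload; tag them as a raw buffer and
            # copy them so the buffer outlives the temporary backing string.
            returns.append((bytes(obj), RAW_BUFFER_METADATA))
        else:
            # Slow path: serialize into a new buffer (pickle stands in for the
            # pyarrow-based serialization used in the real code).
            returns.append((pickle.dumps(obj), None))

returns = []
pack_return_objects([b"ok", {"key": 1}], returns)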
@@ -981,7 +998,7 @@ cdef class CoreWorker:
function_descriptor,
args,
int num_return_vals,
resources):
double num_method_cpus):

cdef:
CActorID c_actor_id = actor_id.native()
@@ -992,7 +1009,8 @@ cdef class CoreWorker:
c_vector[CObjectID] return_ids

with self.profile_event(b"submit_task"):
prepare_resources(resources, &c_resources)
if num_method_cpus > 0:
c_resources[b"CPU"] = num_method_cpus
task_options = CTaskOptions(num_return_vals, c_resources)
ray_function = CRayFunction(
LANGUAGE_PYTHON, string_vector_from_list(function_descriptor))
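This hunk replaces the generic resources dict previously passed to submit_actor_task with a single num_method_cpus double, so the common zero-CPU case skips building and validating a resource map entirely. A hedged sketch of the simplification, in plain Python with illustrative names:

def build_actor_task_resources(num_method_cpus):
    # Only touch the resource map when the method actually reserves CPU; the
    # common case (0) stays an empty dict and avoids any resource preparation.
    resources = {}
    if num_method_cpus > 0:
        resources["CPU"] = num_method_cpus
    return resources

assert build_actor_task_resources(0) == {}
assert build_actor_task_resources(0.5) == {"CPU": 0.5}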
@@ -7,6 +7,7 @@
import logging
import six
import sys
import weakref

from abc import ABCMeta, abstractmethod
from collections import namedtuple
@@ -57,9 +58,8 @@ def annotate_method(method):
class ActorMethod(object):
"""A class used to invoke an actor method.
Note: This class is instantiated only while the actor method is being
invoked (so that it doesn't keep a reference to the actor handle and
prevent it from going out of scope).
Note: This class only keeps a weak ref to the actor, unless it has been
passed to a remote function. This avoids delays in GC of the actor.
Attributes:
_actor: A handle to the actor.
@@ -75,8 +75,13 @@ class ActorMethod(object):
"test_decorated_method" in "python/ray/tests/test_actor.py".
"""

def __init__(self, actor, method_name, num_return_vals, decorator=None):
self._actor = actor
def __init__(self,
actor,
method_name,
num_return_vals,
decorator=None,
hardref=False):
self._actor_ref = weakref.ref(actor)
self._method_name = method_name
self._num_return_vals = num_return_vals
# This is a decorator that is used to wrap the function invocation (as
@@ -86,6 +91,11 @@ def __init__(self, actor, method_name, num_return_vals, decorator=None):
# and return the resulting ObjectIDs.
self._decorator = decorator

# Acquire a hard ref to the actor, this is useful mainly when passing
# actor method handles to remote functions.
if hardref:
self._actor_hard_ref = actor

def __call__(self, *args, **kwargs):
raise Exception("Actor methods cannot be called directly. Instead "
"of running 'object.{}()', try "
@@ -96,15 +106,14 @@ def remote(self, *args, **kwargs):
return self._remote(args, kwargs)

def _remote(self, args=None, kwargs=None, num_return_vals=None):
if args is None:
args = []
if kwargs is None:
kwargs = {}
if num_return_vals is None:
num_return_vals = self._num_return_vals

def invocation(args, kwargs):
return self._actor._actor_method_call(
actor = self._actor_ref()
if actor is None:
raise RuntimeError("Lost reference to actor")
return actor._actor_method_call(
self._method_name,
args=args,
kwargs=kwargs,
@@ -116,6 +125,22 @@ def invocation(args, kwargs):

return invocation(args, kwargs)

def __getstate__(self):
return {
"actor": self._actor_ref(),
"method_name": self._method_name,
"num_return_vals": self._num_return_vals,
"decorator": self._decorator,
}

def __setstate__(self, state):
self.__init__(
state["actor"],
state["method_name"],
state["num_return_vals"],
state["decorator"],
hardref=True)


class ActorClassMetadata(object):
"""Metadata for an actor class.
@@ -502,6 +527,14 @@ def __init__(self,
for method_name in self._ray_method_signatures.keys()
}

for method_name in actor_method_names:
method = ActorMethod(
self,
method_name,
self._ray_method_num_return_vals[method_name],
decorator=self._ray_method_decorators.get(method_name))
setattr(self, method_name, method)

def _actor_method_call(self,
method_name,
args=None,
@@ -526,13 +559,15 @@ def _actor_method_call(self,
"""
worker = ray.worker.get_global_worker()

worker.check_connected()

function_signature = self._ray_method_signatures[method_name]
args = args or []
kwargs = kwargs or {}
function_signature = self._ray_method_signatures[method_name]

list_args = signature.flatten_args(function_signature, args, kwargs)
if not args and not kwargs and not function_signature:
list_args = []
else:
list_args = signature.flatten_args(function_signature, args,
kwargs)
if worker.mode == ray.LOCAL_MODE:
function = getattr(worker.actors[self._actor_id], method_name)
object_ids = worker.local_mode_manager.execute(
@@ -541,7 +576,7 @@ def _actor_method_call(self,
object_ids = worker.core_worker.submit_actor_task(
self._ray_actor_id,
self._ray_function_descriptor_lists[method_name], list_args,
num_return_vals, {"CPU": self._ray_actor_method_cpus})
num_return_vals, self._ray_actor_method_cpus)

if len(object_ids) == 1:
object_ids = object_ids[0]
@@ -554,30 +589,6 @@ def _actor_method_call(self,
def __dir__(self):
return self._ray_actor_method_names

def __getattribute__(self, attr):
try:
# Check whether this is an actor method.
actor_method_names = object.__getattribute__(
self, "_ray_actor_method_names")
if attr in actor_method_names:
# We create the ActorMethod on the fly here so that the
# ActorHandle doesn't need a reference to the ActorMethod.
# The ActorMethod has a reference to the ActorHandle and
# this was causing cyclic references which were preventing
# object deallocation from behaving in a predictable
# manner.
return ActorMethod(
self,
attr,
self._ray_method_num_return_vals[attr],
decorator=self._ray_method_decorators.get(attr))
except AttributeError:
pass

# If the requested attribute is not a registered method, fall back
# to default __getattribute__.
return object.__getattribute__(self, attr)

def __repr__(self):
return "Actor({}, {})".format(self._ray_class_name,
self._actor_id.hex())
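With methods now bound eagerly in ActorHandle.__init__ (see the earlier hunk), this on-the-fly __getattribute__ machinery can be deleted: because each ActorMethod holds only a weak back-reference, eager binding no longer creates the ActorHandle -> ActorMethod -> ActorHandle cycle that the old code was working around. A minimal sketch of why that is safe, with illustrative class names:

import weakref

class FakeHandle:
    pass

class FakeBoundMethod:
    def __init__(self, handle):
        # Weak back-reference: attaching this object to the handle does not
        # form a strong reference cycle, so refcounting can reclaim the handle.
        self._handle_ref = weakref.ref(handle)

h = FakeHandle()
h.method = FakeBoundMethod(h)  # eager binding, as in the new __init__
probe = weakref.ref(h)

del h
assert probe() is None  # the handle is collected promptly despite the binding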
@@ -10,19 +10,19 @@
filter_pattern = os.environ.get("TESTS_TO_RUN", "")


@ray.remote
@ray.remote(num_cpus=0)
class Actor(object):
def small_value(self):
return 0
return b"ok"

def small_value_arg(self, x):
return 0
return b"ok"

def small_value_batch(self, n):
ray.get([small_value.remote() for _ in range(n)])


@ray.remote
@ray.remote(num_cpus=0)
class Client(object):
def __init__(self, servers):
if not isinstance(servers, list):
@@ -45,7 +45,7 @@ def small_value_batch_arg(self, n):

@ray.remote
def small_value():
return 0
return b"ok"


@ray.remote
@@ -494,11 +494,16 @@ def getpid(self):
actors = None
[ray.tests.utils.wait_for_pid_to_exit(pid) for pid in pids]


@pytest.mark.skipif(
sys.version_info < (3, 0), reason="This test requires Python 3.")
def test_actor_method_deletion(ray_start_regular):
@ray.remote
class Actor(object):
def method(self):
return 1

# TODO(ekl) this doesn't work in Python 2 after the weak ref method change.
# Make sure that if we create an actor and call a method on it
# immediately, the actor doesn't get killed before the method is
# called.
@@ -292,8 +292,8 @@ def put_object(self, value, object_id=None, return_buffer=None):

if isinstance(value, bytes):
if return_buffer is not None:
raise NotImplementedError(
"returning raw buffers from direct actor calls")
return_buffer.append(value)
return
# If the object is a byte array, skip serializing it and
# use a special metadata to indicate it's raw binary. So
# that this object can also be read by Java.
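The put_object change above lets raw bytes returned from a direct actor call flow straight into the caller-provided return_buffer instead of raising NotImplementedError, while regular puts of bytes continue to be stored unserialized with raw-binary metadata. A plain-Python sketch of that control flow, with a simplified signature and a stand-in store:

def put_object(value, object_store, return_buffer=None):
    # Sketch of the bytes handling above; object_store is a stand-in list.
    if isinstance(value, bytes):
        if return_buffer is not None:
            # Direct actor call return: hand the raw bytes back to the caller.
            return_buffer.append(value)
            return
        # Regular put: store the bytes unserialized, tagged as raw binary so
        # non-Python workers (e.g. Java) can read them.
        object_store.append((value, b"RAW"))
        return
    # Everything else would go through normal serialization here.
    object_store.append((value, None))

store, ret = [], []
put_object(b"ok", store, return_buffer=ret)  # fast path for direct call returns
put_object({"x": 1}, store)                  # normal path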
@@ -25,11 +25,6 @@ class RayObject {
RayObject(const std::shared_ptr<Buffer> &data, const std::shared_ptr<Buffer> &metadata,
bool copy_data = false)
: data_(data), metadata_(metadata), has_data_copy_(copy_data) {
RAY_CHECK(!data || data_->Size())
<< "Zero-length buffers are not allowed when constructing a RayObject.";
RAY_CHECK(!metadata || metadata->Size())
<< "Zero-length buffers are not allowed when constructing a RayObject.";

if (has_data_copy_) {
// If this object is required to hold a copy of the data,
// make a copy if the passed in buffers don't already have a copy.
@@ -39,8 +39,8 @@ void BuildCommonTaskSpec(

// Group object ids according to the corresponding store providers.
void GroupObjectIdsByStoreProvider(const std::vector<ObjectID> &object_ids,
std::unordered_set<ObjectID> *plasma_object_ids,
std::unordered_set<ObjectID> *memory_object_ids) {
absl::flat_hash_set<ObjectID> *plasma_object_ids,
absl::flat_hash_set<ObjectID> *memory_object_ids) {
// There are two cases:
// - for task return objects from direct actor call, use memory store provider;
// - all the others use plasma store provider.
@@ -312,12 +312,12 @@ Status CoreWorker::Get(const std::vector<ObjectID> &ids, int64_t timeout_ms,
std::vector<std::shared_ptr<RayObject>> *results) {
results->resize(ids.size(), nullptr);

std::unordered_set<ObjectID> plasma_object_ids;
std::unordered_set<ObjectID> memory_object_ids;
absl::flat_hash_set<ObjectID> plasma_object_ids;
absl::flat_hash_set<ObjectID> memory_object_ids;
GroupObjectIdsByStoreProvider(ids, &plasma_object_ids, &memory_object_ids);

bool got_exception = false;
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> result_map;
absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> result_map;
auto start_time = current_time_ms();
RAY_RETURN_NOT_OK(plasma_store_provider_->Get(plasma_object_ids, timeout_ms,
worker_context_.GetCurrentTaskID(),
@@ -360,8 +360,8 @@ Status CoreWorker::Wait(const std::vector<ObjectID> &ids, int num_objects,
"Number of objects to wait for must be between 1 and the number of ids.");
}

std::unordered_set<ObjectID> plasma_object_ids;
std::unordered_set<ObjectID> memory_object_ids;
absl::flat_hash_set<ObjectID> plasma_object_ids;
absl::flat_hash_set<ObjectID> memory_object_ids;
GroupObjectIdsByStoreProvider(ids, &plasma_object_ids, &memory_object_ids);

if (plasma_object_ids.size() + memory_object_ids.size() != ids.size()) {
@@ -377,7 +377,7 @@ Status CoreWorker::Wait(const std::vector<ObjectID> &ids, int num_objects,
// a timeout of 0, but that does not address the situation where objects
// become available on the second store provider while waiting on the first.

std::unordered_set<ObjectID> ready;
absl::flat_hash_set<ObjectID> ready;
// Wait from both store providers with timeout set to 0. This is to avoid the case
// where we might use up the entire timeout on trying to get objects from one store
// provider before even trying another (which might have all of the objects available).
@@ -421,8 +421,8 @@ Status CoreWorker::Wait(const std::vector<ObjectID> &ids, int num_objects,

Status CoreWorker::Delete(const std::vector<ObjectID> &object_ids, bool local_only,
bool delete_creating_tasks) {
std::unordered_set<ObjectID> plasma_object_ids;
std::unordered_set<ObjectID> memory_object_ids;
absl::flat_hash_set<ObjectID> plasma_object_ids;
absl::flat_hash_set<ObjectID> memory_object_ids;
GroupObjectIdsByStoreProvider(object_ids, &plasma_object_ids, &memory_object_ids);

RAY_RETURN_NOT_OK(plasma_store_provider_->Delete(plasma_object_ids, local_only,
@@ -2,6 +2,7 @@
#define RAY_CORE_WORKER_CORE_WORKER_H

#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/synchronization/mutex.h"

@@ -421,7 +422,7 @@ class CoreWorker {
std::unique_ptr<CoreWorkerDirectActorTaskSubmitter> direct_actor_submitter_;

/// Map from actor ID to a handle to that actor.
std::unordered_map<ActorID, std::unique_ptr<ActorHandle>> actor_handles_;
absl::flat_hash_map<ActorID, std::unique_ptr<ActorHandle>> actor_handles_;

/* Fields related to task execution. */
