Skip to content

Commit

Permalink
[ray_client] Passing actors to actors (#12585)
Browse files Browse the repository at this point in the history
* start building tests around passing handles to handles

Change-Id: Ie8c3de5c8ce789c3ec8d29f0702df80ba598279f

* clean up the switch statements by moving to a method, implement state tranfer, extend test

Change-Id: Ie7b6493db3a6c203d3a0b262b8fbacb90e5cdbc5

* passing

Change-Id: Id88dc0a41da1c9d5ba68f754c5b57141aae47beb

* flush out tests

Change-Id: If77c0f586e9e99449d494be4e85f854e4a7a4952

* formatting

Change-Id: I497c07cee70b52453b221ed4393f04f6f560061e

* fix python3.6 and other attributes

Change-Id: I5a2c5231e8a021184d9dfc3e346df7f71fc93257

* address documentation

Change-Id: I049d841ed1f85b7350c17c05da4a4d81d5cb03df

* formatting

Change-Id: I6a2b32a2466ffc9f03fc91ac17901b9c1a49505c

* use the pickled handle as the id bytes for actors

Change-Id: I9ddcb41d614de65d42d6f0382fe0faa7ad2c2ade

* pydoc

Change-Id: I9b32a0f383d5ff5ac052e61929b7ae3e42a89fc5

* format

Change-Id: Iac0010bb990a4025a98139ab88700030b2e9e7f5

* todos

Change-Id: I7b550800cf7499403e8a17b77484bc46f20f0afc

* tests

Change-Id: If8ebf6a335baeb113c1332acc930c41a6b4f5384

* fix lint

Change-Id: I019f41e0ec341d39bbbbd39aa43d9fb5f8b57cf0

* nits

Change-Id: I2e6813d8db34f4ce008326faa095d414c10eee95

* add some tricky, python3.6-troublesome type checking

Change-Id: Ib887fc943a6e7084002bc13dfbe113b69b4d9317
  • Loading branch information
barakmich committed Dec 9, 2020
1 parent d534719 commit dc4b5c7
Show file tree
Hide file tree
Showing 7 changed files with 490 additions and 129 deletions.
97 changes: 72 additions & 25 deletions python/ray/experimental/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,88 @@

logger = logging.getLogger(__name__)

# _client_api has to be external to the API stub, below.
# Otherwise, ray.remote() that contains ray.remote()
# contains a reference to the RayAPIStub, therefore a
# reference to the _client_api, and then tries to pickle
# the thing.
# About these global variables: Ray 1.0 uses exported module functions to
# provide its API, and we need to match that. However, we want different
# behaviors depending on where, exactly, in the client stack this is running.
#
# The reason for these differences depends on what's being pickled and passed
# to functions, or functions inside functions. So there are three cases to care
# about
#
# (Python Client)-->(Python ClientServer)-->(Internal Raylet Process)
#
# * _client_api should be set if we're inside the client
# * _server_api should be set if we're inside the clientserver
# * Both will be set if we're running both (as in a test)
# * Neither should be set if we're inside the raylet (but we still need to shim
# from the client API surface to the Ray API)
#
# The job of RayAPIStub (below) delegates to the appropriate one of these
# depending on what's set or not. Then, all users importing the ray object
# from this package get the stub which routes them to the appropriate APIImpl.
_client_api: Optional[APIImpl] = None
_server_api: Optional[APIImpl] = None

# The reason for _is_server is a hack around the above comment while running
# tests. If we have both a client and a server trying to control these static
# variables then we need a way to decide which to use. In this case, both
# _client_api and _server_api are set.
# This boolean flips between the two
_is_server: bool = False


@contextmanager
def stash_api_for_tests(in_test: bool):
api = None
global _is_server
is_server = _is_server
if in_test:
api = stash_api()
yield api
_is_server = True
yield _server_api
if in_test:
restore_api(api)
_is_server = is_server


def _set_client_api(val: Optional[APIImpl]):
global _client_api
global _is_server
if _client_api is not None:
raise Exception("Trying to set more than one client API")
_client_api = val
_is_server = False


def _set_server_api(val: Optional[APIImpl]):
global _server_api
global _is_server
if _server_api is not None:
raise Exception("Trying to set more than one server API")
_server_api = val
_is_server = True

def stash_api() -> Optional[APIImpl]:

def reset_api():
global _client_api
a = _client_api
global _server_api
global _is_server
_client_api = None
return a
_server_api = None
_is_server = False


def restore_api(api: Optional[APIImpl]):
def _get_client_api() -> APIImpl:
global _client_api
_client_api = api
global _server_api
global _is_server
api = None
if _is_server:
api = _server_api
else:
api = _client_api
if api is None:
# We're inside a raylet worker
from ray.experimental.client.server.core_ray_api import CoreRayAPI
return CoreRayAPI()
return api


class RayAPIStub:
Expand All @@ -43,11 +97,10 @@ def connect(self,
secure: bool = False,
metadata: List[Tuple[str, str]] = None,
stub=None):
global _client_api
from ray.experimental.client.worker import Worker
_client_worker = Worker(
conn_str, secure=secure, metadata=metadata, stub=stub)
_client_api = ClientAPI(_client_worker)
_set_client_api(ClientAPI(_client_worker))

def disconnect(self):
global _client_api
Expand All @@ -56,15 +109,9 @@ def disconnect(self):
_client_api = None

def __getattr__(self, key: str):
global _client_api
self.__check_client_api()
return getattr(_client_api, key)

def __check_client_api(self):
global _client_api
if _client_api is None:
from ray.experimental.client.server.core_ray_api import CoreRayAPI
_client_api = CoreRayAPI()
global _get_client_api
api = _get_client_api()
return getattr(api, key)


ray = RayAPIStub()
Expand Down
84 changes: 77 additions & 7 deletions python/ray/experimental/client/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,105 @@

from abc import ABC
from abc import abstractmethod
from typing import TYPE_CHECKING, Any, Union
if TYPE_CHECKING:
from ray.experimental.client.common import ClientStub
from ray.experimental.client.common import ClientObjectRef
from ray._raylet import ObjectRef

# Use the imports for type checking. This is a python 3.6 limitation.
# See https://www.python.org/dev/peps/pep-0563/
PutType = Union[ClientObjectRef, ObjectRef]


class APIImpl(ABC):
"""
APIImpl is the interface to implement for whichever version of the core
Ray API that needs abstracting when run in client mode.
"""

@abstractmethod
def get(self, *args, **kwargs):
def get(self, *args, **kwargs) -> Any:
"""
get is the hook stub passed on to replace `ray.get`
Args:
args: opaque arguments
kwargs: opaque keyword arguments
"""
pass

@abstractmethod
def put(self, *args, **kwargs):
def put(self, vals: Any, *args,
**kwargs) -> Union["ClientObjectRef", "ObjectRef"]:
"""
put is the hook stub passed on to replace `ray.put`
Args:
vals: The value or list of values to `put`.
args: opaque arguments
kwargs: opaque keyword arguments
"""
pass

@abstractmethod
def wait(self, *args, **kwargs):
"""
wait is the hook stub passed on to replace `ray.wait`
Args:
args: opaque arguments
kwargs: opaque keyword arguments
"""
pass

@abstractmethod
def remote(self, *args, **kwargs):
"""
remote is the hook stub passed on to replace `ray.remote`.
This sets up remote functions or actors, as the decorator,
but does not execute them.
Args:
args: opaque arguments
kwargs: opaque keyword arguments
"""
pass

@abstractmethod
def call_remote(self, f, kind, *args, **kwargs):
def call_remote(self, instance: "ClientStub", *args, **kwargs):
"""
call_remote is called by stub objects to execute them remotely.
This is used by stub objects in situations where they're called
with .remote, eg, `f.remote()` or `actor_cls.remote()`.
This allows the client stub objects to delegate execution to be
implemented in the most effective way whether it's in the client,
clientserver, or raylet worker.
Args:
instance: The Client-side stub reference to a remote object
args: opaque arguments
kwargs: opaque keyword arguments
"""
pass

@abstractmethod
def close(self, *args, **kwargs):
def close(self) -> None:
"""
close cleans up an API connection by closing any channels or
shutting down any servers gracefully.
"""
pass


class ClientAPI(APIImpl):
"""
The Client-side methods corresponding to the ray API. Delegates
to the Client Worker that contains the connection to the ClientServer.
"""

def __init__(self, worker):
self.worker = worker

Expand All @@ -55,10 +125,10 @@ def wait(self, *args, **kwargs):
def remote(self, *args, **kwargs):
return self.worker.remote(*args, **kwargs)

def call_remote(self, f, kind, *args, **kwargs):
return self.worker.call_remote(f, kind, *args, **kwargs)
def call_remote(self, instance: "ClientStub", *args, **kwargs):
return self.worker.call_remote(instance, *args, **kwargs)

def close(self, *args, **kwargs):
def close(self) -> None:
return self.worker.close()

def __getattr__(self, key: str):
Expand Down

0 comments on commit dc4b5c7

Please sign in to comment.