Skip to content
Permalink
Browse files

Dynamic Custom Resources - create and delete resources (#3742)

  • Loading branch information...
romilbhardwaj authored and raulchen committed May 11, 2019
1 parent 351753a commit 004440f526f13586a1740174235f1aaffc4937b9
@@ -495,6 +495,7 @@ flatbuffer_py_library(
"ConfigTableData.py",
"CustomSerializerData.py",
"DriverTableData.py",
"EntryType.py",
"ErrorTableData.py",
"ErrorType.py",
"FunctionTableData.py",
@@ -26,6 +26,7 @@
"ray.core.generated.ActorCheckpointIdData",
"ray.core.generated.ClientTableData",
"ray.core.generated.DriverTableData",
"ray.core.generated.EntryType",
"ray.core.generated.ErrorTableData",
"ray.core.generated.ErrorType",
"ray.core.generated.GcsTableEntry",
@@ -81,7 +81,7 @@ API. The easiest way to do this is to start or connect to a Ray cluster with
ray.worker.global_state.client_table()
# Returns current information about the nodes in the cluster, such as:
# [{'ClientID': '2a9d2b34ad24a37ed54e4fcd32bf19f915742f5b',
# 'IsInsertion': True,
# 'EntryType': 0,
# 'NodeManagerAddress': '1.2.3.4',
# 'NodeManagerPort': 43280,
# 'ObjectManagerPort': 38062,
@@ -154,6 +154,7 @@ flatbuffers_generated_files = [
"ConfigTableData.java",
"CustomSerializerData.java",
"DriverTableData.java",
"EntryType.java",
"ErrorTableData.java",
"ErrorType.java",
"FunctionTableData.java",
@@ -13,6 +13,7 @@
import org.ray.api.runtimecontext.NodeInfo;
import org.ray.runtime.generated.ActorCheckpointIdData;
import org.ray.runtime.generated.ClientTableData;
import org.ray.runtime.generated.EntryType;
import org.ray.runtime.generated.TablePrefix;
import org.ray.runtime.util.UniqueIdUtil;
import org.slf4j.Logger;
@@ -63,7 +64,7 @@ public GcsClient(String redisAddress, String redisPassword) {
ClientTableData data = ClientTableData.getRootAsClientTableData(ByteBuffer.wrap(result));
final UniqueId clientId = UniqueId.fromByteBuffer(data.clientIdAsByteBuffer());

if (data.isInsertion()) {
if (data.entryType() == EntryType.INSERTION) {
//Code path of node insertion.
Map<String, Double> resources = new HashMap<>();
// Compute resources.
@@ -72,12 +73,24 @@ public GcsClient(String redisAddress, String redisPassword) {
for (int i = 0; i < data.resourcesTotalLabelLength(); i++) {
resources.put(data.resourcesTotalLabel(i), data.resourcesTotalCapacity(i));
}

NodeInfo nodeInfo = new NodeInfo(
clientId, data.nodeManagerAddress(), true, resources);
clients.put(clientId, nodeInfo);
} else if (data.entryType() == EntryType.RES_CREATEUPDATE){
Preconditions.checkState(clients.containsKey(clientId));
NodeInfo nodeInfo = clients.get(clientId);
for (int i = 0; i < data.resourcesTotalLabelLength(); i++) {
nodeInfo.resources.put(data.resourcesTotalLabel(i), data.resourcesTotalCapacity(i));
}
} else if (data.entryType() == EntryType.RES_DELETE){
Preconditions.checkState(clients.containsKey(clientId));
NodeInfo nodeInfo = clients.get(clientId);
for (int i = 0; i < data.resourcesTotalLabelLength(); i++) {
nodeInfo.resources.remove(data.resourcesTotalLabel(i));
}
} else {
// Code path of node deletion.
Preconditions.checkState(data.entryType() == EntryType.DELETION);
NodeInfo nodeInfo = new NodeInfo(clientId, clients.get(clientId).nodeAddress,
false, clients.get(clientId).resources);
clients.put(clientId, nodeInfo);
@@ -32,6 +32,7 @@ from ray.includes.libraylet cimport (
from ray.includes.unique_ids cimport (
CActorCheckpointID,
CObjectID,
CClientID,
)
from ray.includes.task cimport CTaskSpecification
from ray.includes.ray_config cimport RayConfig
@@ -368,6 +369,9 @@ cdef class RayletClient:
check_status(self.client.get().NotifyActorResumedFromCheckpoint(
actor_id.native(), checkpoint_id.native()))

def set_resource(self, basestring resource_name, double capacity,
                 ClientID client_id):
    """Forward a dynamic-resource request to the underlying raylet client.

    Args:
        resource_name: Name of the custom resource; it is encoded as
            ASCII before being passed to the native client.
        capacity: Capacity passed through to the native ``SetResource``.
        client_id: ClientID of the target node; converted to the native
            ``CClientID`` from its binary form.

    Raises:
        Whatever ``check_status`` raises when the native call reports a
        non-OK status.
    """
    # SetResource returns a CRayStatus; route it through check_status like
    # the other raylet calls in this class so failures are not silently
    # dropped.
    check_status(self.client.get().SetResource(
        resource_name.encode("ascii"), capacity,
        CClientID.from_binary(client_id.binary())))

# Read-only property: asks the underlying native client for its language via
# GetLanguage() and wraps the result with Language.from_native().
@property
def language(self):
return Language.from_native(self.client.get().GetLanguage())
@@ -10,6 +10,7 @@
SimpleGcsFlushPolicy)
from .named_actors import get_actor, register_actor
from .api import get, wait
from .dynamic_resources import set_resource


def TensorFlowVariables(*args, **kwargs):
@@ -24,5 +25,5 @@ def TensorFlowVariables(*args, **kwargs):
"flush_evicted_objects_unsafe", "_flush_finished_tasks_unsafe_shard",
"_flush_evicted_objects_unsafe_shard", "get_actor", "register_actor",
"get", "wait", "set_flushing_policy", "GcsFlushPolicy",
"SimpleGcsFlushPolicy"
"SimpleGcsFlushPolicy", "set_resource"
]
@@ -0,0 +1,35 @@
import ray


def set_resource(resource_name, capacity, client_id=None):
    """Set a custom resource to a specified capacity.

    This creates, updates or deletes a custom resource for a target
    ClientID. If the resource already exists, its capacity is updated to
    the new value. If the capacity is set to 0, the resource is deleted.
    If client_id is not specified or is None, a nil ClientID is passed to
    the raylet client, which sets the resource on the local client where
    the caller is running.

    Args:
        resource_name (str): Name of the resource to be created.
        capacity (int): Capacity of the new resource. The resource is
            deleted if capacity is 0.
        client_id (str): Hex string of the ClientID of the node where the
            resource is to be set, or None for the local node.

    Returns:
        The result of the raylet client's ``set_resource`` call.

    Raises:
        ValueError: If capacity is negative or is not a whole number
            (including NaN and infinity).
    """
    # Validate the capacity first so that bad input is rejected before any
    # ClientID conversion or raylet call is attempted.
    try:
        is_valid = capacity >= 0 and capacity == int(capacity)
    except (OverflowError, ValueError):
        # int() raises OverflowError for infinity and ValueError for NaN;
        # both are simply invalid capacities.
        is_valid = False
    if not is_valid:
        raise ValueError(
            "Capacity {} must be a non-negative integer.".format(capacity))

    if client_id is None:
        # A nil ClientID stands for "the local node".
        client_id_obj = ray.ClientID.nil()
    else:
        client_id_obj = ray.ClientID(ray.utils.hex_to_binary(client_id))

    return ray.worker.global_worker.raylet_client.set_resource(
        resource_name, capacity, client_id_obj)
@@ -13,6 +13,7 @@

from ray.ray_constants import ID_SIZE
from ray import services
from ray.core.generated.EntryType import EntryType
from ray.utils import (decode, binary_to_object_id, binary_to_hex,
hex_to_binary)

@@ -54,29 +55,43 @@ def parse_client_table(redis_client):
}
client_id = ray.utils.binary_to_hex(client.ClientId())

# If this client is being removed, then it must
if client.EntryType() == EntryType.INSERTION:
ordered_client_ids.append(client_id)
node_info[client_id] = {
"ClientID": client_id,
"EntryType": client.EntryType(),
"NodeManagerAddress": decode(
client.NodeManagerAddress(), allow_none=True),
"NodeManagerPort": client.NodeManagerPort(),
"ObjectManagerPort": client.ObjectManagerPort(),
"ObjectStoreSocketName": decode(
client.ObjectStoreSocketName(), allow_none=True),
"RayletSocketName": decode(
client.RayletSocketName(), allow_none=True),
"Resources": resources
}

# If this client is being updated, then it must
# have previously been inserted, and
# it cannot have previously been removed.
if not client.IsInsertion():
assert client_id in node_info, "Client removed not found!"
assert node_info[client_id]["IsInsertion"], (
"Unexpected duplicate removal of client.")
else:
ordered_client_ids.append(client_id)

node_info[client_id] = {
"ClientID": client_id,
"IsInsertion": client.IsInsertion(),
"NodeManagerAddress": decode(
client.NodeManagerAddress(), allow_none=True),
"NodeManagerPort": client.NodeManagerPort(),
"ObjectManagerPort": client.ObjectManagerPort(),
"ObjectStoreSocketName": decode(
client.ObjectStoreSocketName(), allow_none=True),
"RayletSocketName": decode(
client.RayletSocketName(), allow_none=True),
"Resources": resources
}
assert client_id in node_info, "Client not found!"
assert node_info[client_id]["EntryType"] != EntryType.DELETION, (
"Unexpected updation of deleted client.")
res_map = node_info[client_id]["Resources"]
if client.EntryType() == EntryType.RES_CREATEUPDATE:
for res in resources:
res_map[res] = resources[res]
elif client.EntryType() == EntryType.RES_DELETE:
for res in resources:
res_map.pop(res, None)
elif client.EntryType() == EntryType.DELETION:
pass # Do nothing with the resmap if client deletion
else:
raise RuntimeError("Unexpected EntryType {}".format(
client.EntryType()))
node_info[client_id]["Resources"] = res_map
node_info[client_id]["EntryType"] = client.EntryType()
# NOTE: We return the list comprehension below instead of simply doing
# 'list(node_info.values())' in order to have the nodes appear in the order
# that they joined the cluster. Python dictionaries do not preserve
@@ -757,18 +772,18 @@ def cluster_resources(self):
resources = defaultdict(int)
clients = self.client_table()
for client in clients:
# Only count resources from live clients.
if client["IsInsertion"]:
# Only count resources from latest entries of live clients.
if client["EntryType"] != EntryType.DELETION:
for key, value in client["Resources"].items():
resources[key] += value

return dict(resources)

def _live_client_ids(self):
"""Returns a set of client IDs corresponding to clients still alive."""
return {
client["ClientID"]
for client in self.client_table() if client["IsInsertion"]
for client in self.client_table()
if (client["EntryType"] != EntryType.DELETION)
}

def available_resources(self):
@@ -72,6 +72,7 @@ cdef extern from "ray/raylet/raylet_client.h" nogil:
CActorCheckpointID &checkpoint_id)
CRayStatus NotifyActorResumedFromCheckpoint(
const CActorID &actor_id, const CActorCheckpointID &checkpoint_id)
CRayStatus SetResource(const c_string &resource_name, const double capacity, const CClientID &client_Id)
CLanguage GetLanguage() const
CClientID GetClientID() const
CDriverID GetDriverID() const
@@ -8,6 +8,7 @@
import redis

import ray
from ray.core.generated.EntryType import EntryType

logger = logging.getLogger(__name__)

@@ -175,7 +176,8 @@ def wait_for_nodes(self, timeout=30):
while time.time() - start_time < timeout:
clients = ray.experimental.state.parse_client_table(redis_client)
live_clients = [
client for client in clients if client["IsInsertion"]
client for client in clients
if client["EntryType"] == EntryType.INSERTION
]

expected = len(self.list_all_nodes())

0 comments on commit 004440f

Please sign in to comment.
You can’t perform that action at this time.