Skip to content

Commit

Permalink
[core][observability] Report idle node information in status and dash…
Browse files Browse the repository at this point in the history
…board (ray-project#39638)


---------

Signed-off-by: vitsai <vitsai@cs.stanford.edu>
  • Loading branch information
vitsai authored and simonsays1980 committed Sep 26, 2023
1 parent 2e8f631 commit 1ca9dc4
Show file tree
Hide file tree
Showing 41 changed files with 545 additions and 273 deletions.
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1856,6 +1856,7 @@ ray_cc_test(
deps = [
":gcs_server_lib",
":gcs_test_util_lib",
":ray_mock",
"@com_google_googletest//:gtest_main",
],
)
Expand Down
61 changes: 19 additions & 42 deletions dashboard/datacenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,16 @@ async def get_node_workers(cls, node_id):
return workers

@classmethod
async def get_node_info(cls, node_id):
async def get_node_info(cls, node_id, get_summary=False):
node_physical_stats = dict(DataSource.node_physical_stats.get(node_id, {}))
node_stats = dict(DataSource.node_stats.get(node_id, {}))
node = DataSource.nodes.get(node_id, {})

node_stats.pop("coreWorkersStats", None)
if get_summary:
node_physical_stats.pop("workers", None)
node_stats.pop("workersStats", None)
else:
node_stats.pop("coreWorkersStats", None)
store_stats = node_stats.get("storeStats", {})
used = int(store_stats.get("objectStoreBytesUsed", 0))
# objectStoreBytesAvail == total in the object_manager.cc definition.
Expand All @@ -139,6 +143,8 @@ async def get_node_info(cls, node_id):
node_info["raylet"] = node_stats
node_info["raylet"].update(ray_stats)

node_info["status"] = node["stateSnapshot"]["state"]

# Merge GcsNodeInfo to node physical stats
node_info["raylet"].update(node)
# Add "is_head_node" field
Expand All @@ -148,53 +154,24 @@ async def get_node_info(cls, node_id):
if node_physical_stats.get("ip")
else False
)
# Merge actors to node physical stats
node_info["actors"] = DataSource.node_actors.get(node_id, {})
# Update workers to node physical stats
node_info["workers"] = DataSource.node_workers.get(node_id, [])
await GlobalSignals.node_info_fetched.send(node_info)

return node_info
if not get_summary:
# Merge actors to node physical stats
node_info["actors"] = DataSource.node_actors.get(node_id, {})
# Update workers to node physical stats
node_info["workers"] = DataSource.node_workers.get(node_id, [])

@classmethod
async def get_node_summary(cls, node_id):
node_physical_stats = dict(DataSource.node_physical_stats.get(node_id, {}))
node_stats = dict(DataSource.node_stats.get(node_id, {}))
node = DataSource.nodes.get(node_id, {})
if get_summary:
await GlobalSignals.node_summary_fetched.send(node_info)
else:
await GlobalSignals.node_info_fetched.send(node_info)

node_physical_stats.pop("workers", None)
node_stats.pop("workersStats", None)
store_stats = node_stats.get("storeStats", {})
used = int(store_stats.get("objectStoreBytesUsed", 0))
# objectStoreBytesAvail == total in the object_manager.cc definition.
total = int(store_stats.get("objectStoreBytesAvail", 0))
ray_stats = {
"object_store_used_memory": used,
"object_store_available_memory": total - used,
}

node_summary = node_physical_stats
# Merge node stats to node physical stats
node_summary["raylet"] = node_stats
node_summary["raylet"].update(ray_stats)
# Merge GcsNodeInfo to node physical stats
node_summary["raylet"].update(node)
# Add "is_head_node" field
# TODO(aguo): Grab head node information from a source of truth
node_summary["raylet"]["is_head_node"] = (
cls.head_node_ip == node_physical_stats.get("ip")
if node_physical_stats.get("ip")
else False
)

await GlobalSignals.node_summary_fetched.send(node_summary)

return node_summary
return node_info

@classmethod
async def get_all_node_summary(cls):
return [
await DataOrganizer.get_node_summary(node_id)
await DataOrganizer.get_node_info(node_id, get_summary=True)
for node_id in DataSource.nodes.keys()
]

Expand Down
3 changes: 2 additions & 1 deletion dashboard/modules/node/node_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import time
import grpc
from itertools import chain

import aiohttp.web

Expand Down Expand Up @@ -261,7 +262,7 @@ async def get_nodes_logical_resources(self) -> dict:

per_node_resources = {}
# TODO(rickyx): we should just return structure data rather than strings.
for node in cluster_status.healthy_nodes:
for node in chain(cluster_status.active_nodes, cluster_status.idle_nodes):
if not node.resource_usage:
continue

Expand Down
13 changes: 6 additions & 7 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1337,13 +1337,12 @@ def init(
# If available, use RAY_ADDRESS to override if the address was left
# unspecified, or set to "auto" in the call to init
address_env_var = os.environ.get(ray_constants.RAY_ADDRESS_ENVIRONMENT_VARIABLE)
if address_env_var:
if address is None or address == "auto":
address = address_env_var
logger.info(
f"Using address {address_env_var} set in the environment "
f"variable {ray_constants.RAY_ADDRESS_ENVIRONMENT_VARIABLE}"
)
if address_env_var and (address is None or address == "auto"):
address = address_env_var
logger.info(
f"Using address {address_env_var} set in the environment "
f"variable {ray_constants.RAY_ADDRESS_ENVIRONMENT_VARIABLE}"
)

if address is not None and "://" in address:
# Address specified a protocol, use ray client
Expand Down
7 changes: 7 additions & 0 deletions python/ray/autoscaler/_private/autoscaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,22 @@
@dataclass
class AutoscalerSummary:
active_nodes: Dict[NodeType, int]
idle_nodes: Optional[Dict[NodeType, int]]
pending_nodes: List[Tuple[NodeIP, NodeType, NodeStatus]]
pending_launches: Dict[NodeType, int]
failed_nodes: List[Tuple[NodeIP, NodeType]]
node_availability_summary: NodeAvailabilitySummary = field(
default_factory=lambda: NodeAvailabilitySummary({})
)
# A dictionary of node IP to a list of reasons the node is not idle.
node_activities: Optional[Dict[str, Tuple[NodeIP, List[str]]]] = None
pending_resources: Dict[str, int] = field(default_factory=lambda: {})
# A mapping from node name (the same key as `usage_by_node`) to node type.
# Optional for deployment modes which have the concept of node types and
# backwards compatibility.
node_type_mapping: Optional[Dict[str, str]] = None
# Whether the autoscaler summary is v1 or v2.
legacy: bool = False


class NonTerminatedNodes:
Expand Down Expand Up @@ -1486,6 +1491,7 @@ def summary(self) -> Optional[AutoscalerSummary]:
return AutoscalerSummary(
# Convert active_nodes from counter to dict for later serialization
active_nodes=dict(active_nodes),
idle_nodes=None,
pending_nodes=[
(ip, node_type, status) for _, ip, node_type, status in pending_nodes
],
Expand All @@ -1494,6 +1500,7 @@ def summary(self) -> Optional[AutoscalerSummary]:
node_availability_summary=self.node_provider_availability_tracker.summary(),
pending_resources=pending_resources,
node_type_mapping=node_type_mapping,
legacy=True,
)

def info_string(self):
Expand Down
66 changes: 51 additions & 15 deletions python/ray/autoscaler/_private/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,7 @@ def get_per_node_breakdown_as_dict(
def get_per_node_breakdown(
lm_summary: LoadMetricsSummary,
node_type_mapping: Optional[Dict[str, float]],
node_activities: Optional[Dict[str, List[str]]],
verbose: bool,
) -> str:
sio = StringIO()
Expand All @@ -750,17 +751,28 @@ def get_per_node_breakdown(
node_type_mapping = {}

print(file=sio)
for node_ip, usage in lm_summary.usage_by_node.items():
for node_id, usage in lm_summary.usage_by_node.items():
print(file=sio) # Print a newline.
node_string = f"Node: {node_ip}"
if node_ip in node_type_mapping:
node_type = node_type_mapping[node_ip]
node_string = f"Node: {node_id}"
if node_id in node_type_mapping:
node_type = node_type_mapping[node_id]
node_string += f" ({node_type})"

print(node_string, file=sio)
print(" Usage:", file=sio)
for line in parse_usage(usage, verbose):
print(f" {line}", file=sio)
# Don't print anything if not provided.
if not node_activities:
continue
print(" Activity:", file=sio)
if node_id not in node_activities:
print(" (no activity)", file=sio)
else:
# Note: We have node IP here.
_, reasons = node_activities[node_id]
for reason in reasons:
print(f" {reason}", file=sio)

return sio.getvalue()

Expand Down Expand Up @@ -791,10 +803,22 @@ def format_info_string(
header += "Autoscaler iteration time: " f"{autoscaler_update_time:3f}s\n"

available_node_report_lines = []
for node_type, count in autoscaler_summary.active_nodes.items():
line = f" {count} {node_type}"
available_node_report_lines.append(line)
available_node_report = "\n".join(available_node_report_lines)
if not autoscaler_summary.active_nodes:
available_node_report = " (no active nodes)"
else:
for node_type, count in autoscaler_summary.active_nodes.items():
line = f" {count} {node_type}"
available_node_report_lines.append(line)
available_node_report = "\n".join(available_node_report_lines)

if not autoscaler_summary.idle_nodes:
idle_node_report = " (no idle nodes)"
else:
idle_node_report_lines = []
for node_type, count in autoscaler_summary.idle_nodes.items():
line = f" {count} {node_type}"
idle_node_report_lines.append(line)
idle_node_report = "\n".join(idle_node_report_lines)

pending_lines = []
for node_type, count in autoscaler_summary.pending_launches.items():
Expand Down Expand Up @@ -846,12 +870,18 @@ def format_info_string(

usage_report = get_usage_report(lm_summary, verbose)
demand_report = get_demand_report(lm_summary)

formatted_output = f"""{header}
Node status
{separator}
Healthy:
{available_node_report}
Active:
{available_node_report}"""

if not autoscaler_summary.legacy:
formatted_output += f"""
Idle:
{idle_node_report}"""

formatted_output += f"""
Pending:
{pending_report}
{failure_report}
Expand All @@ -863,10 +893,16 @@ def format_info_string(
{"Total " if verbose else ""}Demands:
{demand_report}"""

if verbose and lm_summary.usage_by_node:
formatted_output += get_per_node_breakdown(
lm_summary, autoscaler_summary.node_type_mapping, verbose
)
if verbose:
if lm_summary.usage_by_node:
formatted_output += get_per_node_breakdown(
lm_summary,
autoscaler_summary.node_type_mapping,
autoscaler_summary.node_activities,
verbose,
)
else:
formatted_output += "\n"

return formatted_output.strip()

Expand Down
8 changes: 6 additions & 2 deletions python/ray/autoscaler/v2/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class NodeInfo:
failure_detail: Optional[str] = None
# Descriptive details.
details: Optional[str] = None
# Activity on the node.
node_activity: Optional[List[str]] = None

def total_resources(self) -> Dict[str, float]:
if self.resource_usage is None:
Expand Down Expand Up @@ -177,8 +179,10 @@ class Stats:

@dataclass
class ClusterStatus:
# Healthy nodes information (alive)
healthy_nodes: List[NodeInfo] = field(default_factory=list)
# Healthy nodes information (non-idle)
active_nodes: List[NodeInfo] = field(default_factory=list)
# Idle node information
idle_nodes: List[NodeInfo] = field(default_factory=list)
# Pending launches.
pending_launches: List[LaunchRequest] = field(default_factory=list)
# Failed launches.
Expand Down
Loading

0 comments on commit 1ca9dc4

Please sign in to comment.