Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
d4bf191
Minor improvements in Ray Core Walkthrough as seen in https://github.…
micahtyong Nov 28, 2020
fd63f77
Resolve merge conflict
micahtyong Jan 14, 2021
f105336
Define node_stats() to return NodeStats object from cluster
micahtyong Jan 14, 2021
2e93b60
Add --group-by and --sort-by capabilities to ray memory script
micahtyong Jan 14, 2021
5bdaf2b
Resolve merge conflict
micahtyong Jan 14, 2021
1ee1e01
Add helper functions for group by and sorting type in memory_utils.py
micahtyong Jan 14, 2021
ca2a89e
Reformat
micahtyong Jan 14, 2021
60ed53b
Merge remote-tracking branch 'upstream/master' into micah/ray-memory-…
micahtyong Jan 14, 2021
7ceb346
Format
micahtyong Jan 14, 2021
667a404
Merge remote-tracking branch 'upstream/master' into micah/ray-memory-…
micahtyong Jan 20, 2021
09930f5
Merge remote-tracking branch 'upstream/master' into micah/ray-memory-…
micahtyong Jan 20, 2021
52dba88
Compartmentalize memory script into get_memory_summary and get_store_…
micahtyong Jan 21, 2021
1915dc1
Modify unit tests in test_mem_stat
micahtyong Jan 21, 2021
5720107
Lint and format
micahtyong Jan 21, 2021
0c09040
Test cases for group_by sort_by
micahtyong Jan 21, 2021
5d0a535
Lint and format
micahtyong Jan 21, 2021
0650221
Merge remote-tracking branch 'upstream/master' into micah/ray-memory-…
micahtyong Jan 24, 2021
d74556f
Fix actor handle failing test case
micahtyong Jan 24, 2021
71e67a8
Update test_memstat.py
micahtyong Jan 24, 2021
6b38f8f
Merge remote-tracking branch 'upstream/master' into micah/ray-memory-…
micahtyong Jan 26, 2021
29949b8
Merge remote-tracking branch 'upstream/master' into micah/ray-memory-…
micahtyong Jan 26, 2021
a6d47ff
Merge remote-tracking branch 'upstream/master' into micah/ray-memory-…
micahtyong Feb 7, 2021
cae4544
Resolve merge conflicts
micahtyong Feb 7, 2021
5fd7738
Merge remote-tracking branch 'upstream/master' into micah/ray-memory-…
micahtyong Feb 8, 2021
51e410b
Adjust ray memory output based on terminal size
micahtyong Feb 9, 2021
ce9e9e9
Formatting and linting
micahtyong Feb 9, 2021
7f0f2bc
Use constant for callsite length
micahtyong Feb 9, 2021
3996689
Switch from OS to shutil for querying terminal size (official python …
Feb 12, 2021
f17fb5e
Linting and formatting
micahtyong Feb 12, 2021
8bbe1ef
Merge remote-tracking branch 'upstream/master' into ray-memory-dashbo…
micahtyong Feb 13, 2021
d819e7b
Lint and format
micahtyong Feb 16, 2021
0d6924b
Merge remote-tracking branch 'upstream/master' into ray-memory-dashbo…
micahtyong Feb 16, 2021
73cd590
Resolve lint issue in walkthrough.rst
micahtyong Feb 16, 2021
236f79c
Merge remote-tracking branch 'upstream/master' into ray-memory-dashbo…
micahtyong Feb 17, 2021
92ac4c7
Merge remote-tracking branch 'upstream/master' into ray-memory-dashbo…
micahtyong Feb 19, 2021
bffeffd
Revert to python 3.6
micahtyong Feb 19, 2021
faa2889
Delete visitor.py
micahtyong Feb 19, 2021
05d70e9
Delete .eggs
micahtyong Feb 19, 2021
b2cb66e
Merge remote-tracking branch 'upstream/master' into ray-memory-dashbo…
micahtyong Feb 21, 2021
6f2aeef
Merge branch 'micah/ray-memory-dashboard-parity-2' of https://github.…
micahtyong Feb 21, 2021
f41e84d
Resolve test_object_spilling.py test case
micahtyong Feb 21, 2021
c3b7eee
Add stats only argument
micahtyong Feb 21, 2021
7e26adb
revert changes on this file
micahtyong Feb 23, 2021
d47cfc5
Remove package-lock.json
micahtyong Feb 23, 2021
0aaf82d
Add back npm installation
micahtyong Feb 23, 2021
ba0bff2
Sync package-lock.json
micahtyong Feb 23, 2021
6cc248d
Merge remote-tracking branch 'upstream/master' into ray-memory-dashbo…
micahtyong Feb 23, 2021
dbe4685
Linting and formatting
micahtyong Feb 23, 2021
8ed212b
Sync with package-lock
micahtyong Feb 23, 2021
ab2bf6b
Sync with package-lock pt 2
micahtyong Feb 23, 2021
851bc7c
Merge remote-tracking branch 'upstream/master' into ray-memory-dashbo…
micahtyong Feb 23, 2021
9e5cbfe
Update documentation in https://docs.ray.io/en/master/memory-manageme…
micahtyong Feb 24, 2021
84a4cea
Merge remote-tracking branch 'upstream/master' into ray-memory-dashbo…
micahtyong Feb 25, 2021
d1c1b17
Add include_memory_info as argument for node_stats
micahtyong Feb 25, 2021
4342e55
Switch object ref and call site positions
micahtyong Feb 25, 2021
d403056
Linting and formatting
micahtyong Feb 25, 2021
de7afb5
Change from MiB to B
micahtyong Feb 28, 2021
496f4c0
Change from stats-only to store-true
micahtyong Feb 28, 2021
2279e5b
Add memory test case
micahtyong Feb 28, 2021
f1bced4
Add memory test case
micahtyong Feb 28, 2021
9d3fe9f
Lint and format
micahtyong Feb 28, 2021
a5d54c6
Correct test in memstat
micahtyong Feb 28, 2021
154bd8d
Change line wrap and stats only to flags
micahtyong Feb 28, 2021
5c00f90
Clarify --stats-only and --no-format in ray memory
micahtyong Feb 28, 2021
91711a2
--stats-only description modified
micahtyong Feb 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 146 additions & 1 deletion dashboard/memory_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import ray

from ray._raylet import (TaskID, ActorID, JobID)
from ray.state import GlobalState
from ray.internal.internal_api import node_stats, store_stats_summary
from ray.ray_constants import REDIS_DEFAULT_PASSWORD
import logging

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -58,6 +61,32 @@ class ReferenceType:
UNKNOWN_STATUS = "UNKNOWN_STATUS"


def get_sorting_type(sort_by: str):
"""Translate string input into SortingType instance"""
sort_by = sort_by.upper()
if sort_by == "PID":
return SortingType.PID
elif sort_by == "OBJECT_SIZE":
return SortingType.OBJECT_SIZE
elif sort_by == "REFERENCE_TYPE":
return SortingType.REFERENCE_TYPE
else:
raise Exception("The sort-by input provided is not one of\
PID, OBJECT_SIZE, or REFERENCE_TYPE.")


def get_group_by_type(group_by: str):
"""Translate string input into GroupByType instance"""
group_by = group_by.upper()
if group_by == "NODE_ADDRESS":
return GroupByType.NODE_ADDRESS
elif group_by == "STACK_TRACE":
return GroupByType.STACK_TRACE
else:
raise Exception("The group-by input provided is not one of\
NODE_ADDRESS or STACK_TRACE.")


class MemoryTableEntry:
def __init__(self, *, object_ref: dict, node_address: str, is_driver: bool,
pid: int):
Expand Down Expand Up @@ -129,7 +158,7 @@ def _is_object_ref_actor_handle(self) -> bool:
actor_random_bits = object_ref_hex[TASKID_RANDOM_BITS_SIZE:
TASKID_RANDOM_BITS_SIZE +
ACTORID_RANDOM_BITS_SIZE]
if (random_bits == "f" * 16 and not actor_random_bits == "f" * 8):
if (random_bits == "f" * 16 and not actor_random_bits == "f" * 24):
return True
else:
return False
Expand Down Expand Up @@ -299,3 +328,119 @@ def construct_memory_table(workers_stats: List,
memory_table = MemoryTable(
memory_table_entries, group_by_type=group_by, sort_by_type=sort_by)
return memory_table


def get_memory_summary(redis_address, redis_password, group_by, sort_by,
line_wrap, stats_only) -> str:
from ray.new_dashboard.modules.stats_collector.stats_collector_head\
import node_stats_to_dict

# Get terminal size
import shutil
size = shutil.get_terminal_size((80, 20)).columns
line_wrap_threshold = 137

# Fetch core memory worker stats, store as a dictionary
state = GlobalState()
state._initialize_global_state(redis_address, redis_password)
core_worker_stats = []
for raylet in state.node_table():
stats = node_stats_to_dict(
node_stats(raylet["NodeManagerAddress"], raylet["NodeManagerPort"],
(not stats_only)))
core_worker_stats.extend(stats["coreWorkersStats"])
assert type(stats) is dict and "coreWorkersStats" in stats

# Build memory table with "group_by" and "sort_by" parameters
group_by, sort_by = get_group_by_type(group_by), get_sorting_type(sort_by)
memory_table = construct_memory_table(core_worker_stats, group_by,
sort_by).as_dict()
assert "summary" in memory_table and "group" in memory_table

# Build memory summary
mem = ""
group_by, sort_by = group_by.name.lower().replace(
"_", " "), sort_by.name.lower().replace("_", " ")
summary_labels = [
"Mem Used by Objects", "Local References", "Pinned Count",
"Pending Tasks", "Captured in Objects", "Actor Handles"
]
summary_string = "{:<19} {:<16} {:<12} {:<13} {:<19} {:<13}\n"

object_ref_labels = [
"IP Address", "PID", "Type", "Call Site", "Size", "Reference Type",
"Object Ref"
]
object_ref_string = "{:<8} {:<3} {:<4} {:<9} {:<4} {:<14} {:<10}\n"
if size > line_wrap_threshold and line_wrap:
object_ref_string = "{:<12} {:<5} {:<6} {:<22} {:<6} {:<18} \
{:<56}\n"

mem += f"Grouping by {group_by}...\
Sorting by {sort_by}...\n\n\n\n"

for key, group in memory_table["group"].items():
# Group summary
summary = group["summary"]
summary["total_object_size"] = str(summary["total_object_size"]) + " B"
mem += f"--- Summary for {group_by}: {key} ---\n"
mem += summary_string\
.format(*summary_labels)
mem += summary_string\
.format(*summary.values()) + "\n"

# Memory table per group
mem += f"--- Object references for {group_by}: {key} ---\n"
mem += object_ref_string\
.format(*object_ref_labels)
for entry in group["entries"]:
entry["object_size"] = str(
entry["object_size"]
) + " B" if entry["object_size"] > -1 else "?"
num_lines = 1
if size > line_wrap_threshold and line_wrap:
call_site_length = 22
entry["call_site"] = [
entry["call_site"][i:i + call_site_length] for i in range(
0, len(entry["call_site"]), call_site_length)
]
num_lines = len(entry["call_site"])
object_ref_values = [
entry["node_ip_address"], entry["pid"], entry["type"],
entry["call_site"], entry["object_size"],
entry["reference_type"], entry["object_ref"]
]
for i in range(len(object_ref_values)):
if not isinstance(object_ref_values[i], list):
object_ref_values[i] = [object_ref_values[i]]
object_ref_values[i].extend(
["" for x in range(num_lines - len(object_ref_values[i]))])
for i in range(num_lines):
row = [elem[i] for elem in object_ref_values]
mem += object_ref_string\
.format(*row)
mem += "\n"
mem += "\n\n\n"
return mem


def get_store_stats_summary(redis_address, redis_password) -> str:
state = GlobalState()
state._initialize_global_state(redis_address, redis_password)
raylet = state.node_table()[0]
stats = node_stats(raylet["NodeManagerAddress"], raylet["NodeManagerPort"])
store_summary = store_stats_summary(stats)
return store_summary


def memory_summary(redis_address,
redis_password=REDIS_DEFAULT_PASSWORD,
group_by="NODE_ADDRESS",
sort_by="OBJECT_SIZE",
line_wrap=True,
stats_only=False):
if stats_only:
return get_store_stats_summary(redis_address, redis_password)
return get_memory_summary(redis_address, redis_password, group_by, sort_by,
line_wrap, stats_only) + get_store_stats_summary(
redis_address, redis_password)
Loading