-
Notifications
You must be signed in to change notification settings - Fork 5.6k
/
metric_utils.py
136 lines (105 loc) · 3.89 KB
/
metric_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import threading
import time
def get_ray_spilled_and_restored_mb():
    """Return the object-store spill and restore totals reported by Ray.

    The numbers are scraped from the text returned by
    ``ray._private.internal_api.memory_summary(stats_only=True)`` by looking
    for the phrases ``Spilled <N> MiB`` and ``Restored <N> MiB``.

    Returns:
        A ``(spilled_mb, restored_mb)`` tuple of ints. A value is 0 when the
        corresponding phrase does not appear in the summary.
    """
    # Imported lazily so that importing this module does not require Ray.
    import ray._private.internal_api as internal_api
    import re

    summary_str = internal_api.memory_summary(stats_only=True)

    # Raw strings: "\d" in a normal string literal is an invalid escape
    # sequence and triggers a SyntaxWarning on recent Python versions.
    match = re.search(r"Spilled (\d+) MiB", summary_str)
    spilled_mb = int(match.group(1)) if match else 0

    match = re.search(r"Restored (\d+) MiB", summary_str)
    restored_mb = int(match.group(1)) if match else 0

    return spilled_mb, restored_mb
class MaxMemoryUtilizationTracker:
    """Track the lowest amount of available system memory over a time window.

    A background thread samples ``psutil.virtual_memory().available`` every
    ``sample_interval_s`` seconds between ``start()`` and ``stop()`` and
    remembers the smallest value it saw. The "available" figure is reported
    directly by psutil; see
    https://psutil.readthedocs.io/en/latest/#psutil.virtual_memory for more
    information.

    A tracker is single-use: once ``stop()`` has been called it can be
    neither started nor stopped again.
    """

    def __init__(self, sample_interval_s: float):
        """Prepare (but do not start) the sampling thread.

        Args:
            sample_interval_s: Seconds to wait between two memory samples.
        """
        # Filled in by the worker thread right before it exits.
        self._results = {}
        self._stop_event = threading.Event()
        self._print_updates = False
        self._thread = threading.Thread(
            target=self._track_memory_utilization,
            args=(
                sample_interval_s,
                self._print_updates,
                self._results,
                self._stop_event,
            ),
            # Daemon thread: if the caller fails before reaching stop(), the
            # sampler must not keep the interpreter alive forever.
            daemon=True,
        )

    @staticmethod
    def _track_memory_utilization(
        sample_interval_s: float,
        print_updates: bool,
        output_dict: dict,
        stop_event: threading.Event,
    ):
        """Thread body: sample available memory until ``stop_event`` is set.

        Args:
            sample_interval_s: Seconds to wait between samples.
            print_updates: When true, print every new minimum (in GiB).
            output_dict: Receives the result under ``"min_available_bytes"``
                once sampling has finished.
            stop_event: Set by ``stop()`` to end the loop.
        """
        import psutil

        min_available = float("inf")
        while True:
            # Sample before checking the stop flag so that at least one real
            # measurement is recorded even if stop() is called immediately.
            memory_stats = psutil.virtual_memory()
            if memory_stats.available < min_available:
                if print_updates:
                    print(
                        "{before:.02f} -> {after:.02f}".format(
                            before=min_available / (1 << 30),
                            after=memory_stats.available / (1 << 30),
                        )
                    )
                min_available = memory_stats.available

            # Event.wait doubles as an interruptible sleep: it returns True as
            # soon as stop() sets the event, so stop() does not have to wait
            # out the remainder of the sampling interval.
            if stop_event.wait(sample_interval_s):
                break

        output_dict["min_available_bytes"] = min_available

    def start(self) -> None:
        """Start the background sampling thread.

        Raises:
            AssertionError: If the tracker has already been stopped.
        """
        assert (
            not self._stop_event.is_set()
        ), "Can't start a thread that has been stopped."
        self._thread.start()

    def stop(self) -> int:
        """Stop sampling and return the minimum available memory observed.

        Returns:
            The smallest ``available`` value seen, in bytes.

        Raises:
            AssertionError: If the tracker has already been stopped.
        """
        assert (
            not self._stop_event.is_set()
        ), "Can't stop a thread that has been stopped."
        self._stop_event.set()
        self._thread.join()
        return self._results["min_available_bytes"]
def determine_if_memory_monitor_is_enabled_in_latest_session(
    raylet_log_path: str = "/tmp/ray/session_latest/logs/raylet.out",
) -> bool:
    """Grep the raylet log to see whether the memory monitor is enabled.

    By default this inspects the ``session_latest`` raylet log, so it is
    really only helpful when you're interested in session_latest; use with
    care.

    Args:
        raylet_log_path: Raylet log file to search. Defaults to the
            ``session_latest`` log under ``/tmp/ray``; pass another path when
            the log lives elsewhere.

    Returns:
        True if the log contains "MemoryMonitor initialized", else False.

    Raises:
        AssertionError: If grep exits with a status other than 0 (match) or
            1 (no match), or if it writes anything to stdout or stderr.
    """
    import subprocess

    completed_proc = subprocess.run(
        [
            "grep",
            "-q",
            "MemoryMonitor initialized",
            raylet_log_path,
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # grep -q prints nothing and signals the outcome through its exit status:
    # 0 means a match was found, 1 means no match. Anything else is an error.
    assert completed_proc.returncode in [
        0,
        1,
    ], f"Unexpected returncode {completed_proc.returncode}"
    assert not completed_proc.stdout, f"Unexpected stdout {completed_proc.stdout}"
    assert not completed_proc.stderr, f"Unexpected stderr {completed_proc.stderr}"

    return completed_proc.returncode == 0
def test_max_mem_util_tracker():
    """Manual smoke test for ``MaxMemoryUtilizationTracker``.

    Starts a tracker with a one-second sampling interval, allocates and
    touches a 1 GiB numpy array while it is running, then stops the tracker
    and prints the minimum available memory it observed, in GiB.
    """
    max_mem_tracker = MaxMemoryUtilizationTracker(sample_interval_s=1)
    max_mem_tracker.start()

    import numpy as np

    time.sleep(4)
    print("create numpy")
    large_tensor = np.random.randint(10, size=1 << 30, dtype=np.uint8)
    # Write to the array so its memory is actually touched.
    large_tensor += 1
    print("done create numpy")
    time.sleep(2)

    # stop() returns the minimum available memory directly as a byte count,
    # not a dict, so it must not be subscripted.
    min_available_bytes = max_mem_tracker.stop()

    min_available_gb = min_available_bytes / (1 << 30)
    print(f"{min_available_gb:.02f}")
# When executed as a script, run the tracker smoke test defined above.
if __name__ == "__main__":
    test_max_mem_util_tracker()