-
Notifications
You must be signed in to change notification settings - Fork 5.5k
/
mem_check.py
92 lines (77 loc) · 2.88 KB
/
mem_check.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import argparse
import time
import os
import json
import ray
from ray._private.memory_monitor import MemoryMonitor, get_top_n_memory_usage
from ray._private.test_utils import raw_metrics
from ray.job_submission import JobSubmissionClient, JobStatus
# Initialize ray to avoid autosuspend. This runs at module level (outside the
# __main__ guard); the value returned by ray.init() is kept in `addr` and is
# later passed to raw_metrics() to read the cluster's metrics.
addr = ray.init()
if __name__ == "__main__":
    # Release-test driver:
    #   1. Submit `python workload.py` as a Ray job using --working-dir.
    #   2. Poll job status and print memory usage every 15 s, for at most 3 h.
    #   3. Fail if the job ended FAILED/STOPPED, if no "agent" sample of the
    #      `ray_component_uss_mb` metric is found, or if any such sample is
    #      not below 500.
    #   4. Write the measured memory growth as JSON to $TEST_OUTPUT_JSON.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--working-dir",
        required=True,
        help="working_dir to use for the job within this test.",
    )
    args = parser.parse_args()

    # If using a remote cluster, replace 127.0.0.1 with the head node's IP address.
    # A single client is used for both submitting the job and polling it.
    client = JobSubmissionClient("http://127.0.0.1:8265")
    job_id = client.submit_job(
        # Entrypoint shell command to execute
        entrypoint="python workload.py",
        runtime_env={"working_dir": args.working_dir},
    )
    print(job_id)

    max_runtime_s = 3600 * 3  # Run for 3 hours
    poll_interval_s = 15
    agent_limit_mb = 500
    terminal_states = {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED}

    m = MemoryMonitor()
    # Monotonic clock: elapsed-time measurement must not be affected by
    # wall-clock adjustments during the multi-hour run.
    start = time.monotonic()
    # The first element of get_memory_usage() is treated as used memory in GB.
    initial_used_gb = m.get_memory_usage()[0]

    # Bound before the loop so the post-loop check never hits an unbound name.
    status = None
    while time.monotonic() - start < max_runtime_s:
        print(f"{round((time.monotonic() - start) / 60, 2)}m passed...")
        m.raise_if_low_memory()
        used_gb = m.get_memory_usage()[0]
        print("Used GB: ", used_gb)
        print(get_top_n_memory_usage())
        print("\n\n")

        # Stop polling as soon as the job reaches any terminal state
        # (succeeded, stopped, or failed).
        status = client.get_job_status(job_id)
        print(f"Job status: {status}")
        if status in terminal_states:
            break
        time.sleep(poll_interval_s)

    # NOTE: if the time budget runs out while the job is still in a
    # non-terminal state, execution continues with the checks below.
    ending_used_gb = m.get_memory_usage()[0]
    mem_growth = ending_used_gb - initial_used_gb
    top_n_mem_usage = get_top_n_memory_usage()
    print(top_n_mem_usage)
    print(f"Memory growth: {mem_growth} GB")

    # Explicit raises instead of `assert` so the checks still run under
    # `python -O`; AssertionError is kept as the exception type.
    if status in (JobStatus.FAILED, JobStatus.STOPPED):
        print(client.get_job_logs(job_id))
        raise AssertionError("Job has failed.")

    me = raw_metrics(addr)
    found = False
    for metric, samples in me.items():
        if metric != "ray_component_uss_mb":
            continue
        for sample in samples:
            if sample.labels["Component"] != "agent":
                continue
            print(f"Metrics found memory usage : {sample.value} MB")
            found = True
            # Make sure it doesn't use more than 500MB of data.
            # `not (a < b)` rather than `a >= b` so a NaN sample also fails.
            if not (sample.value < agent_limit_mb):
                raise AssertionError(
                    f"Agent memory usage {sample.value} MB is not below "
                    f"{agent_limit_mb} MB."
                )

    if not found:
        raise AssertionError("Agent memory metrics are not found.")

    results = {
        "memory_growth_gb": mem_growth,
        "success": 1,
        "perf_metrics": [
            {
                "perf_metric_name": "memory_growth_gb",
                "perf_metric_value": mem_growth,
                "perf_metric_type": "LATENCY",
            }
        ],
    }
    with open(os.environ["TEST_OUTPUT_JSON"], "w") as f:
        json.dump(results, f)