-
Notifications
You must be signed in to change notification settings - Fork 5.6k
/
test_cli_integration.py
255 lines (202 loc) · 8.29 KB
/
test_cli_integration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
from contextlib import contextmanager
import ray
import json
import os
import logging
import sys
import subprocess
from typing import Optional, Tuple
import pytest
logger = logging.getLogger(__name__)
@pytest.fixture
def shutdown_only():
    """Fixture with no setup that shuts Ray down once the test has finished."""
    yield
    # Teardown: everything below the yield executes after the test body.
    ray.shutdown()
    # Also clear the cluster address, in case one was left behind.
    ray._private.utils.reset_ray_address()
@contextmanager
def set_env_var(key: str, val: Optional[str] = None):
    """Temporarily set, or unset, an environment variable.

    While the ``with`` body runs, ``os.environ[key]`` holds ``val``; when
    ``val`` is ``None`` the variable is removed instead. On exit the value
    that was present before entering is put back, or the variable is removed
    if it was not set beforehand.
    """
    previous = os.environ.get(key)

    if val is None:
        os.environ.pop(key, None)
    else:
        os.environ[key] = val

    try:
        yield
    finally:
        # Drop whatever is there now (the body may have changed it), then
        # restore the pre-existing value if there was one.
        os.environ.pop(key, None)
        if previous is not None:
            os.environ[key] = previous
@pytest.fixture
def ray_start_stop():
    """Run `ray start --head` for the test and `ray stop --force` afterwards.

    RAY_ADDRESS is set to "http://127.0.0.1:8265" for the duration of the
    test via set_env_var.
    """
    start_cmd = ["ray", "start", "--head"]
    stop_cmd = ["ray", "stop", "--force"]

    subprocess.check_output(start_cmd)
    try:
        with set_env_var("RAY_ADDRESS", "http://127.0.0.1:8265"):
            yield
    finally:
        # Runs even if the body raised, so the started Ray is always stopped.
        subprocess.check_output(stop_cmd)
@contextmanager
def ray_cluster_manager():
    """
    Context manager that runs `ray start --head` on entry and
    `ray stop --force` on exit.

    Deliberately not a fixture, so callers can set RAY_ADDRESS before
    entering it.
    """
    start_cmd = ["ray", "start", "--head"]
    stop_cmd = ["ray", "stop", "--force"]

    subprocess.check_output(start_cmd)
    try:
        yield
    finally:
        subprocess.check_output(stop_cmd)
def _run_cmd(cmd: str, should_fail=False) -> Tuple[str, str]:
"""Convenience wrapper for subprocess.run.
We always run with shell=True to simulate the CLI.
Asserts that the process succeeds/fails depending on should_fail.
Returns (stdout, stderr).
"""
print(f"Running command: '{cmd}'")
p: subprocess.CompletedProcess = subprocess.run(
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
if p.returncode == 0:
print("Command succeeded.")
if should_fail:
raise RuntimeError(
f"Expected command to fail, but got exit code: {p.returncode}."
)
else:
print(f"Command failed with exit code: {p.returncode}.")
if not should_fail:
raise RuntimeError(
f"Expected command to succeed, but got exit code: {p.returncode}."
)
return p.stdout.decode("utf-8"), p.stderr.decode("utf-8")
class TestJobSubmitHook:
    """Tests the RAY_JOB_SUBMIT_HOOK env var."""

    def test_hook(self, ray_start_stop):
        """Submit a job with the hook env var set and check the CLI output."""
        hook_path = "ray._private.test_utils.job_hook"
        with set_env_var("RAY_JOB_SUBMIT_HOOK", hook_path):
            output = _run_cmd("ray job submit -- echo hello")[0]
            assert "hook intercepted: echo hello" in output
class TestRayAddress:
    """
    Integration flavour of the job CLI test, checking that the CLI works
    together with the following components:

    1) Ray client: use of RAY_ADDRESS and ray.init() in job_head.py
    2) Ray dashboard: `ray start --head`
    """

    SUBMIT_ECHO = "ray job submit -- echo hello"

    def test_empty_ray_address(self, ray_start_stop):
        """Submitting with RAY_ADDRESS unset should still succeed."""
        with set_env_var("RAY_ADDRESS", None):
            output = _run_cmd(self.SUBMIT_ECHO)[0]
            for expected in ("hello", "succeeded"):
                assert expected in output

    @pytest.mark.parametrize(
        "ray_client_address", ["127.0.0.1:8265", "ray://127.0.0.1:8265"]
    )
    def test_ray_client_address(self, ray_start_stop, ray_client_address: str):
        """Submitting with these RAY_ADDRESS values is expected to fail."""
        with set_env_var("RAY_ADDRESS", ray_client_address):
            _run_cmd(self.SUBMIT_ECHO, should_fail=True)

    def test_valid_http_ray_address(self, ray_start_stop):
        """Submitting with the RAY_ADDRESS set by ray_start_stop succeeds."""
        output = _run_cmd(self.SUBMIT_ECHO)[0]
        for expected in ("hello", "succeeded"):
            assert expected in output
class TestJobSubmit:
    """Tests for `ray job submit` with and without --no-wait."""

    def test_basic_submit(self, ray_start_stop):
        """Should tail logs and wait for process to exit."""
        script = "sleep 1 && echo hello && sleep 1 && echo hello"
        output = _run_cmd(f"ray job submit -- bash -c '{script}'")[0]
        assert "hello\nhello" in output
        assert "succeeded" in output

    def test_submit_no_wait(self, ray_start_stop):
        """Should exit immediately w/o printing logs."""
        script = "echo hello && sleep 1000"
        output = _run_cmd(f"ray job submit --no-wait -- bash -c '{script}'")[0]
        for unexpected in ("hello", "Tailing logs until the job exits"):
            assert unexpected not in output

    def test_submit_with_logs_instant_job(self, ray_start_stop):
        """Should exit immediately and print logs even if job returns instantly."""
        script = "echo hello"
        output = _run_cmd(f"ray job submit -- bash -c '{script}'")[0]
        assert "hello" in output
class TestRuntimeEnv:
    """Tests for `ray job submit --runtime-env-json`."""

    def test_bad_runtime_env(self, ray_start_stop):
        """Should fail with helpful error if runtime env setup fails."""
        runtime_env_json = '{"pip": ["does-not-exist"]}'
        output = _run_cmd(
            f"ray job submit --runtime-env-json='{runtime_env_json}' -- echo hi"
        )[0]

        expected_fragments = (
            "Tailing logs until the job exits",
            "runtime_env setup failed",
            "No matching distribution found for does-not-exist",
        )
        for fragment in expected_fragments:
            assert fragment in output
class TestJobStop:
    """Tests for `ray job stop` with and without --no-wait."""

    def test_basic_stop(self, ray_start_stop):
        """Should wait until the job is stopped."""
        cmd = "sleep 1000"
        job_id = "test_basic_stop"
        # Use --submission-id rather than the deprecated --job-id alias, in
        # line with the other tests in this file.
        _run_cmd(f"ray job submit --no-wait --submission-id={job_id} -- {cmd}")

        stdout, _ = _run_cmd(f"ray job stop {job_id}")
        assert "Waiting for job" in stdout
        assert f"Job '{job_id}' was stopped" in stdout

    def test_stop_no_wait(self, ray_start_stop):
        """Should not wait until the job is stopped."""
        cmd = "echo hello && sleep 1000"
        job_id = "test_stop_no_wait"
        _run_cmd(
            f"ray job submit --no-wait --submission-id={job_id} -- bash -c '{cmd}'"
        )

        stdout, _ = _run_cmd(f"ray job stop --no-wait {job_id}")
        assert "Waiting for job" not in stdout
        assert f"Job '{job_id}' was stopped" not in stdout
class TestJobList:
    """Tests for `ray job list`."""

    def test_empty(self, ray_start_stop):
        """With no jobs submitted, the listing should contain an empty list."""
        stdout, _ = _run_cmd("ray job list")
        assert "[]" in stdout

    def test_list(self, ray_start_stop):
        """Submitted jobs and their runtime env should appear in the listing."""
        # Use --submission-id rather than the deprecated --job-id alias, in
        # line with the other tests in this file.
        _run_cmd("ray job submit --submission-id='hello_id' -- echo hello")

        runtime_env = {"env_vars": {"TEST": "123"}}
        _run_cmd(
            "ray job submit --submission-id='hi_id' "
            f"--runtime-env-json='{json.dumps(runtime_env)}' -- echo hi"
        )

        stdout, _ = _run_cmd("ray job list")
        # "123" is the env var value passed via the runtime env above.
        assert "123" in stdout
        assert "hello_id" in stdout
        assert "hi_id" in stdout
class TestJobDelete:
    """Tests for `ray job delete`."""

    def test_basic_delete(self, ray_start_stop):
        """Deleting fails for a running job and succeeds for a finished one."""
        running_id = "test_basic_delete"
        _run_cmd(f"ray job submit --no-wait --submission-id={running_id} -- sleep 1000")

        # A job that has not reached a terminal state must not be deletable.
        _, err_output = _run_cmd(f"ray job delete {running_id}", should_fail=True)
        assert "it is in a non-terminal state" in err_output

        # Now submit a job that completes quickly (no --no-wait here).
        finished_id = "test_basic_delete_quick"
        _run_cmd(f"ray job submit --submission-id={finished_id} -- bash -c 'echo hello'")

        # A finished job can be deleted.
        output = _run_cmd(f"ray job delete {finished_id}")[0]
        assert f"Job '{finished_id}' deleted successfully" in output
def test_quote_escaping(ray_start_stop):
    """Nested single quotes inside a double-quoted entrypoint should survive."""
    cmd = "echo \"hello 'world'\""
    job_id = "test_quote_escaping"
    # Use --submission-id rather than the deprecated --job-id alias, in line
    # with the other tests in this file.
    stdout, _ = _run_cmd(
        f"ray job submit --submission-id={job_id} -- {cmd}",
    )
    assert "hello 'world'" in stdout
def test_resources(shutdown_only):
    """Check entrypoint resource flags against a 1 CPU / 1 GPU / 1 Custom Ray."""
    ray.init(num_cpus=1, num_gpus=1, resources={"Custom": 1})

    # Requests that each ask for more than ray.init() above made available.
    oversized_requests = (
        ("entrypoint_num_cpus", "--entrypoint-num-cpus=2"),
        ("entrypoint_num_gpus", "--entrypoint-num-gpus=2"),
        ("entrypoint_resources", '--entrypoint-resources=\'{"Custom": 2}\''),
    )
    for submission_id, resource_flag in oversized_requests:
        _run_cmd(
            f"ray job submit --submission-id={submission_id} "
            f"--no-wait {resource_flag} -- echo hi"
        )
        status_output = _run_cmd(f"ray job status {submission_id}")[0]
        assert "waiting for resources" in status_output

    # A request that matches exactly what is available should run.
    sufficient_cmd = (
        "ray job submit --entrypoint-num-cpus=1 --entrypoint-num-gpus=1 "
        "--entrypoint-resources='{\"Custom\": 1}' -- echo hello"
    )
    output = _run_cmd(sufficient_cmd)[0]
    assert "hello" in output
if __name__ == "__main__":
    # Running this file directly executes its tests verbosely and exits with
    # pytest's return code.
    exit_code = pytest.main(["-v", __file__])
    sys.exit(exit_code)