This repository has been archived by the owner on Apr 24, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 64
/
test_subprocess.py
166 lines (123 loc) · 6.76 KB
/
test_subprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import logging
import signal
import subprocess
import time
import unittest
import collections
import pytest
import cook.subprocess as cs
import tests.utils as tu
def find_process_ids_in_group(group_id):
    """Return the ids of all processes that belong to process group *group_id*.

    Runs ``ps -eo pid,pgid,command`` once, groups every listed process by its
    process-group id, and logs each member of the requested group together with
    its command line.

    :param group_id: the process-group id to look up. It may be an int or a str;
        it is compared as a string against the ``ps`` output.
    :return: a set of process ids as strings (the ``ps`` output is not converted
        to int). The set is empty when no process belongs to the group.
    :raises OSError: if ``ps`` cannot be executed.
    :raises subprocess.CalledProcessError: if ``ps`` exits with a non-zero status.
    """
    group_id_to_process_ids = collections.defaultdict(set)
    process_id_to_command = collections.defaultdict(lambda: '')
    # subprocess.run waits for ps and closes its pipes, so no zombie or leaked fd is
    # left behind. check=True makes a failing ps raise instead of yielding an empty
    # result, which would make "no processes left in the group" checks pass vacuously.
    completed = subprocess.run(['ps', '-eo', 'pid,pgid,command'],
                               stdin=subprocess.DEVNULL,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               check=True)
    # Command lines may contain arbitrary bytes; one undecodable entry must not abort the scan.
    ps_output = completed.stdout.decode('utf8', errors='replace')
    for line in ps_output.splitlines():
        line_split = line.split()
        # Skip the "PID PGID COMMAND" header row and any malformed line.
        if len(line_split) < 2 or not (line_split[0].isdigit() and line_split[1].isdigit()):
            continue
        pid = line_split[0]
        pgid = line_split[1]
        command = ' '.join(line_split[2:])
        group_id_to_process_ids[pgid].add(pid)
        process_id_to_command[pid] = command
    group_id_str = str(group_id)
    logging.info("group_id_to_process_ids[{}]: {}".format(group_id, group_id_to_process_ids[group_id_str]))
    for pid in group_id_to_process_ids[group_id_str]:
        logging.info("process (pid: {}) command is {}".format(pid, process_id_to_command[pid]))
    return group_id_to_process_ids[group_id_str]
class SubprocessTest(unittest.TestCase):
    """Integration tests for launching and killing processes via ``cook.subprocess``.

    Each test launches a real shell command with ``cs.launch_process`` and finds its
    process group with ``cs.find_process_group``. It then checks, via ``ps`` (see
    ``find_process_ids_in_group``), that the kill helpers leave no process behind
    in that group.
    """

    @staticmethod
    def _redirect_output_to_files():
        """Point stdout/stderr at fresh per-task files under ``build/``.

        Uses the ``tests.utils`` redirect helpers. Returns the
        ``(stdout_name, stderr_name)`` paths so callers can read the output and
        pass both paths to ``tu.cleanup_output`` when done.
        """
        task_id = tu.get_random_task_id()
        stdout_name = tu.ensure_directory('build/stdout.{}'.format(task_id))
        stderr_name = tu.ensure_directory('build/stderr.{}'.format(task_id))
        tu.redirect_stdout_to_file(stdout_name)
        tu.redirect_stderr_to_file(stderr_name)
        return stdout_name, stderr_name

    @staticmethod
    def _await_termination(process, shutdown_grace_period_ms):
        """Wait for *process* to exit and return its exit status (``process.poll()``).

        Polls every 10 ms for at most ``10 * shutdown_grace_period_ms - 1``
        iterations and stops as soon as the process has exited. If the process is
        still alive after that, it is killed and reaped, so the returned status is
        never ``None``.

        Assumes *process* is a ``subprocess.Popen``-like object providing
        ``poll``, ``kill`` and ``wait`` — TODO confirm against ``cs.launch_process``.
        """
        for _ in range(1, 10 * shutdown_grace_period_ms):
            if process.poll() is not None:
                break
            time.sleep(0.01)
        if process.poll() is None:
            process.kill()
            # Reap the killed process; without this, poll() may still report None.
            process.wait()
        return process.poll()

    # FIXME - remove the xfail mark once the issue with this test failing is resolved:
    # https://github.com/twosigma/Cook/issues/737
    @pytest.mark.xfail
    @unittest.skip('This test fails occasionally')
    def test_kill_task_terminate_with_sigterm(self):
        """A process that traps SIGTERM should run its handler and exit on kill_process."""
        stdout_name, stderr_name = self._redirect_output_to_files()
        try:
            command = "bash -c 'function handle_term { echo GOT TERM; }; trap handle_term SIGTERM TERM; sleep 200'"
            process = cs.launch_process(command, {})
            shutdown_grace_period_ms = 1000
            group_id = cs.find_process_group(process.pid)
            self.assertGreater(len(find_process_ids_in_group(group_id)), 0)
            cs.kill_process(process, shutdown_grace_period_ms)
            exit_code = self._await_termination(process, shutdown_grace_period_ms)
            # Popen reports death-by-signal N as -N; a wrapping shell exits with 128 + N instead.
            self.assertIn(exit_code, (-1 * signal.SIGTERM, 128 + signal.SIGTERM),
                          'Process exited with code {}'.format(exit_code))
            self.assertEqual(0, len(find_process_ids_in_group(group_id)))
            with open(stdout_name) as f:
                file_contents = f.read()
            self.assertIn('GOT TERM', file_contents)
        finally:
            tu.cleanup_output(stdout_name, stderr_name)

    def test_kill_task_terminate_with_sigkill(self):
        """A process that ignores SIGTERM should still be killed (SIGKILL) by kill_process."""
        stdout_name, stderr_name = self._redirect_output_to_files()
        try:
            command = "trap '' TERM SIGTERM; sleep 200"
            process = cs.launch_process(command, {})
            shutdown_grace_period_ms = 1000
            group_id = cs.find_process_group(process.pid)
            self.assertGreater(len(find_process_ids_in_group(group_id)), 0)
            cs.kill_process(process, shutdown_grace_period_ms)
            exit_code = self._await_termination(process, shutdown_grace_period_ms)
            # Popen reports death-by-signal N as -N; a wrapping shell exits with 128 + N instead.
            self.assertIn(exit_code, (-1 * signal.SIGKILL, 128 + signal.SIGKILL),
                          'Process exited with code {}'.format(exit_code))
            self.assertEqual(len(find_process_ids_in_group(group_id)), 0)
        finally:
            tu.cleanup_output(stdout_name, stderr_name)

    def process_launch_and_kill_helper(self, kill_fn):
        """Launch a shell that spawns several background sleepers, kill it, and check cleanup.

        :param kill_fn: callable taking the launched process's pid. It is expected
            to kill the whole process group.
        """
        stdout_name, stderr_name = self._redirect_output_to_files()
        try:
            start_time = time.time()
            command = 'echo "A.$(sleep 30)" & echo "B.$(sleep 30)" & echo "C.$(sleep 30)" &'
            environment = {}
            process = cs.launch_process(command, environment)
            group_id = cs.find_process_group(process.pid)
            self.assertGreater(group_id, 0)
            # Wait until the shell's background jobs and their subshells/sleeps appear in the group.
            child_process_ids = tu.wait_for(lambda: find_process_ids_in_group(group_id),
                                            lambda data: len(data) >= 7,
                                            default_value=[])
            self.assertGreaterEqual(len(child_process_ids), 7)
            self.assertLessEqual(len(child_process_ids), 10)
            kill_fn(process.pid)
            child_process_ids = tu.wait_for(lambda: find_process_ids_in_group(group_id),
                                            lambda data: len(data) == 0,
                                            default_value=[])
            self.assertEqual(0, len(child_process_ids))
            # Each sleep lasts 30 s, so finishing in under 20 s shows the processes were
            # killed rather than left to run to completion.
            self.assertLess(time.time() - start_time, 20)
        finally:
            tu.cleanup_output(stdout_name, stderr_name)

    def test_process_group_assignment_and_killing_send_process_tree_kill(self):
        self.process_launch_and_kill_helper(lambda pid: cs._send_signal_to_process_tree(pid, signal.SIGKILL))

    def test_process_group_assignment_and_killing_send_process_tree_term(self):
        self.process_launch_and_kill_helper(lambda pid: cs._send_signal_to_process_tree(pid, signal.SIGTERM))

    def test_process_group_assignment_and_killing_send_process_group_kill(self):
        self.process_launch_and_kill_helper(lambda pid: cs._send_signal_to_process_group(pid, signal.SIGKILL))

    def test_process_group_assignment_and_killing_send_process_group_term(self):
        self.process_launch_and_kill_helper(lambda pid: cs._send_signal_to_process_group(pid, signal.SIGTERM))

    def test_process_group_assignment_and_killing_send_signal_kill(self):
        self.process_launch_and_kill_helper(lambda pid: cs.send_signal(pid, signal.SIGKILL))

    def test_process_group_assignment_and_killing_send_signal_term(self):
        self.process_launch_and_kill_helper(lambda pid: cs.send_signal(pid, signal.SIGTERM))