forked from google/grr
/
run_tests.py
279 lines (209 loc) · 8.01 KB
/
run_tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
#!/usr/bin/env python
"""A test runner based on multiprocessing.
This program will run all the tests in separate processes to speed things up.
"""
import curses
import os
import StringIO
import subprocess
import sys
import time
import unittest
import psutil
# These need to register plugins so,
# pylint: disable=unused-import,g-bad-import-order
from grr.lib import server_plugins
from grr.checks import tests
from grr.client import tests
from grr.gui import tests
from grr.lib import flags
from grr.lib import test_lib
from grr.lib import tests
from grr.lib import utils
from grr.parsers import tests
from grr.server.data_server import tests
from grr.tools.export_plugins import tests
from grr.worker import worker_test
# pylint: enable=unused-import,g-bad-import-order
# Command line flags specific to this test runner.

# Destination file for test output; when unset, output goes to stderr.
flags.DEFINE_string("output", None,
                    "The name of the file we write on (default stderr).")

# Names listed here are skipped by the multi-process runner in main().
flags.DEFINE_list("exclude_tests", [],
                  "A comma-separated list of tests to exclude from running.")

# The default of 0 makes main() derive a value from the CPU count.
flags.DEFINE_integer("processes", 0,
                     "Total number of simultaneous tests to run.")
class Colorizer(object):
  """Wraps strings in terminal color escape sequences when possible.

  Coloring is enabled only when the target stream reports itself as a TTY and
  curses manages to initialize the terminal. In every other case Render()
  hands its input back unchanged.
  """

  # The position of each name is the color number passed to the terminal's
  # "setf" / "setb" capability in Render().
  COLORS = "BLACK BLUE GREEN CYAN RED MAGENTA YELLOW WHITE"
  COLOR_MAP = {name: index for index, name in enumerate(COLORS.split())}

  # Class-level default; flipped per instance once the terminal is set up.
  terminal_capable = False

  def __init__(self, stream=None):
    """Probe whether colored output can be used.

    Args:
      stream: Object whose isatty() decides whether coloring is attempted.
        Defaults to sys.stdout.
    """
    if stream is None:
      stream = sys.stdout

    try:
      if stream.isatty():
        curses.setupterm()
        self.terminal_capable = True
    except (AttributeError, curses.error):
      # AttributeError: the stream has no isatty(). curses.error: the
      # terminal could not be initialized (e.g. TERM unset or unknown).
      # Either way we simply fall back to plain, uncolored output.
      pass

  def Render(self, color, string, forground=True):
    """Decorate the string with the terminal escapes for the color.

    Args:
      color: One of the names in COLORS. Unknown names leave the string
        undecorated.
      string: The text to decorate.
      forground: If True use the "setf" (foreground) capability, otherwise
        "setb" (background).

    Returns:
      The string wrapped in the color escape and followed by the "sgr0"
      reset sequence when that is available, or the unmodified string when
      coloring is not possible.
    """
    if not self.terminal_capable or color not in self.COLOR_MAP:
      return string

    escape_seq = curses.tigetstr("setf" if forground else "setb")
    if not escape_seq:
      # The terminal does not offer the requested capability.
      return string

    # tigetstr() yields None for a missing capability; do not let that break
    # the concatenation below.
    reset_seq = curses.tigetstr("sgr0") or ""

    return (curses.tparm(escape_seq, self.COLOR_MAP[color]) + string +
            reset_seq)
class GRREverythingTestLoader(test_lib.GRRTestLoader):
  """Test loader whose base class is test_lib.GRRBaseTest."""

  # NOTE(review): presumably the loader collects subclasses of this class;
  # confirm against test_lib.GRRTestLoader.
  base_class = test_lib.GRRBaseTest
def RunTest(test_suite, stream=None):
  """Run a single named test through test_lib.GrrTestProgram.

  The test name is handed to GrrTestProgram as its only command line
  argument. When a stream is given, the runner's output is collected in
  memory and written to that stream in one piece, prefixed by a
  "Test name:" header, once GrrTestProgram finishes or raises.

  Args:
    test_suite: Name of the test to run. It is passed to GrrTestProgram via
      argv and shown in the report header.
    stream: File-like object that receives the report. When None, the test
      runner writes straight to sys.stderr and no header is emitted.

  Returns:
    None. NOTE(review): earlier comments on this function state that
    GrrTestProgram selects the test from the --tests flag and terminates the
    process with sys.exit(), in which case this function never returns
    normally. Neither is visible here - confirm against test_lib.
  """
  buffered = stream is not None

  # unittest.TextTestRunner on Python 2 does not substitute a default for an
  # explicit stream=None, so fall back to stderr ourselves.
  out_fd = StringIO.StringIO() if buffered else sys.stderr

  try:
    # GRREverythingTestLoader is restricted to the currently selected labels.
    test_lib.GrrTestProgram(
        argv=[sys.argv[0], test_suite],
        testLoader=GRREverythingTestLoader(labels=flags.FLAGS.labels),
        testRunner=unittest.TextTestRunner(stream=out_fd))
  finally:
    # Runs even if GrrTestProgram raises (including SystemExit), so the
    # buffered output is always delivered.
    if buffered:
      stream.write("Test name: %s\n" % test_suite)
      stream.write(out_fd.getvalue())
      stream.flush()
def WaitForAvailableProcesses(processes, max_processes=5, completion_cb=None):
  """Block until at most max_processes of the given children still run.

  Args:
    processes: Dict mapping a name to a metadata dict. Each metadata dict
      holds a "pipe" object offering poll(); once a child has finished its
      status is stored under "exit_code".
    max_processes: Largest number of still-running children that lets this
      function return.
    completion_cb: Optional callable invoked as completion_cb(name, metadata)
      the first time a child is seen to have exited.
  """
  while True:
    still_running = 0

    for test_name, info in processes.items():
      # Children with a recorded exit code were already reported.
      if info.get("exit_code") is None:
        status = info["pipe"].poll()
        if status is not None:
          # The child just finished: remember its status and report it.
          info["exit_code"] = status
          if completion_cb:
            completion_cb(test_name, info)
        else:
          still_running += 1

    if still_running > max_processes:
      # Too many children are busy; check again shortly.
      time.sleep(0.1)
    else:
      return
def ReportTestResult(name, metadata):
  """Print a one-line PASSED/FAILED summary for a finished test process.

  For a failed test the captured output file is appended to the summary.

  Args:
    name: Test name, shown in the first column.
    metadata: Dict describing the run. "exit_code" decides between PASSED and
      FAILED, "start" is the launch timestamp used for the duration, and
      "output_path" names the file holding the test's output (only read for
      failures).
  """
  now = time.time()
  colorizer = Colorizer()

  if metadata["exit_code"] == 0:
    # Test completed successfully:
    result = colorizer.Render("GREEN", "PASSED")
  else:
    result = colorizer.Render("RED", "FAILED")
    try:
      # Close the file promptly instead of leaking the handle.
      with open(metadata["output_path"], "rb") as output_fd:
        result += output_fd.read()
    except (IOError, OSError) as e:
      # A missing or unreadable output file must not take down the whole
      # runner; report the failure without its output instead.
      result += " (could not read test output: %s)" % e

  print("\t{0: <40} {1} in {2: >6.2f}s".format(
      name, result, now - metadata["start"]))
def DoesTestHaveLabels(cls, labels):
  """Tell whether any test attribute of cls carries one of the labels.

  Every attribute of cls whose name starts with "test" is inspected. An
  attribute that has no "labels" attribute of its own is treated as being
  labelled "small".

  Args:
    cls: The class whose attributes are scanned.
    labels: Iterable of label names to look for.

  Returns:
    True if at least one test attribute shares a label with labels, False
    otherwise.
  """
  wanted = set(labels)
  fallback = set(["small"])
  test_names = (attr for attr in dir(cls) if attr.startswith("test"))
  return any(
      wanted.intersection(
          getattr(getattr(cls, attr, None), "labels", fallback))
      for attr in test_names)
def main(argv=None):
  """Run one test in this process, or one child process per registered test.

  With --tests set, the (single) named test is run in this process through
  RunTest(), optionally redirecting stdout/stderr to the --output file.

  Without --tests, every class registered in test_lib.GRRBaseTest.classes
  that matches the selected labels and is not excluded is run in a child
  process started with --tests <name>. Results are reported as children
  finish, followed by a summary. The process exits with status -1 if any
  child returned a non-zero exit code.

  Args:
    argv: Unused by this function.

  Raises:
    ValueError: If --tests names anything other than exactly one test.
  """
  if flags.FLAGS.tests:
    stream = sys.stderr

    if flags.FLAGS.output:
      # Opened in append mode, so existing content of the file is kept.
      stream = open(flags.FLAGS.output, "ab")
      # Close the original stdout/stderr descriptors and point the Python
      # level streams at the output file instead.
      os.close(sys.stderr.fileno())
      os.close(sys.stdout.fileno())
      sys.stderr = stream
      sys.stdout = stream

    # Rebuild sys.argv so that only these two switches are carried over.
    sys.argv = [""]
    if flags.FLAGS.verbose:
      sys.argv.append("--verbose")

    if flags.FLAGS.debug:
      sys.argv.append("--debug")

    suites = flags.FLAGS.tests
    if len(suites) != 1:
      raise ValueError("Only a single test is supported in single "
                       "processing mode, but %i were specified" %
                       len(suites))

    test_suite = suites[0]
    print("Running test %s in single process mode" % test_suite)
    sys.stdout.flush()
    RunTest(test_suite, stream=stream)

  else:
    processes = {}
    print("Running tests with labels %s" % ",".join(flags.FLAGS.labels))

    # The concurrency limit does not change between tests, so work it out
    # once. psutil.cpu_count() may return None when the count cannot be
    # determined, and a non-positive --processes value would make the waits
    # below never terminate, so both fall back to the CPU based default.
    max_processes = flags.FLAGS.processes
    if not max_processes or max_processes < 0:
      max_processes = max((psutil.cpu_count() or 1) - 1, 1)

    with utils.TempDirectory() as temp_dir:
      start = time.time()
      labels = set(flags.FLAGS.labels)

      for name, cls in test_lib.GRRBaseTest.classes.items():
        if name.startswith("_"):
          continue

        if labels and not DoesTestHaveLabels(cls, labels):
          continue

        if name in flags.FLAGS.exclude_tests:
          print("Skipping test %s" % name)
          continue

        result_filename = os.path.join(temp_dir, name)

        # Re-invoke this script for a single test, forwarding our own
        # command line.
        child_argv = [sys.executable] + sys.argv[:]
        if "--output" not in child_argv:
          child_argv.extend(["--output", result_filename])

        if flags.FLAGS.config:
          child_argv.extend(["--config", flags.FLAGS.config])

        child_argv.extend(["--tests", name])
        child_argv.extend(["--labels", ",".join(flags.FLAGS.labels)])

        # Wait for a free slot BEFORE launching, so that never more than
        # max_processes children run at once. (Waiting after the launch for
        # "<= max_processes" allowed max_processes + 1 of them.)
        WaitForAvailableProcesses(
            processes, max_processes=max_processes - 1,
            completion_cb=ReportTestResult)

        # Maintain metadata about each test.
        processes[name] = dict(pipe=subprocess.Popen(child_argv),
                               start=time.time(),
                               output_path=result_filename,
                               test=name)

      # Wait for all jobs to finish. This has to happen while the temporary
      # directory still exists because the reports read their output from it.
      WaitForAvailableProcesses(processes, max_processes=0,
                                completion_cb=ReportTestResult)

      passed_tests = [p for p in processes.values() if p["exit_code"] == 0]
      failed_tests = [p for p in processes.values() if p["exit_code"] != 0]

      print("\nRan %s tests in %0.2f sec, %s tests passed, %s tests failed." % (
          len(processes), time.time() - start, len(passed_tests),
          len(failed_tests)))

      if failed_tests:
        colorizer = Colorizer()

        print("Failing tests: ")
        for metadata in failed_tests:
          print(colorizer.Render("RED", metadata["test"]))

        sys.exit(-1)
# Only start the runner when executed as a script, not when imported.
if __name__ == "__main__":
  flags.StartMain(main)