-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.py
310 lines (270 loc) · 10.9 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
from __future__ import absolute_import
import argparse
import logging
import logging.config
import docker
import multiprocessing.pool
import os
import psutil
import random
import shutil
import sys
import traceback
from kernel_matrix_benchmarks.datasets import get_dataset, DATASETS
from kernel_matrix_benchmarks.definitions import (
get_definitions,
list_algorithms,
algorithm_status,
InstantiationStatus,
)
from kernel_matrix_benchmarks.results import get_result_filename
from kernel_matrix_benchmarks.runner import run, run_docker
def positive_int(s):
    """Parse *s* as a strictly positive integer (argparse ``type=`` helper).

    Args:
        s (str): the raw command-line token.

    Returns:
        int: the parsed value, guaranteed to be >= 1.

    Raises:
        argparse.ArgumentTypeError: if *s* is not an integer or is < 1.
    """
    try:
        i = int(s)
    except ValueError:
        # Re-raise as the argparse-specific error so the parser prints a
        # clean usage message instead of a traceback; `from None` drops the
        # uninteresting ValueError context.
        raise argparse.ArgumentTypeError(
            "%r is not a positive integer" % s
        ) from None
    if i < 1:
        raise argparse.ArgumentTypeError("%r is not a positive integer" % s)
    return i
def run_worker(cpu, args, queue):
    """Drain the job queue, executing every experiment definition in it.

    Args:
        cpu (int): worker index / max number of CPUs that should run the job.
        args (parsed arguments): the arguments given by the user when typing
            `python run.py --arguments...`
        queue (multiprocessing Queue): experiment definitions to run.
    """
    while not queue.empty():
        # Definitions are enqueued at the end of this script.
        definition = queue.get()

        if args.local:
            # Local mode: run in-process without Docker, e.g. when writing
            # and testing a pull request on a local machine.
            run(definition=definition, dataset=args.dataset, runs=args.runs)
            continue

        # Docker mode, e.g. when rendering the website. Give the container
        # (almost) all the available RAM, keeping ~500 MB aside for misc
        # host-side work, and let it use every CPU core.
        reserved_bytes = 500e6
        memory_cap = int(psutil.virtual_memory().available - reserved_bytes)
        core_range = "0-%d" % (multiprocessing.cpu_count() - 1)
        run_docker(
            definition=definition,
            dataset=args.dataset,
            runs=args.runs,
            timeout=args.timeout,
            cpu_limit=core_range,
            mem_limit=memory_cap,
        )
def main():
    """Entry point for `python run.py --arguments...`.

    Parses the CLI, loads the dataset's metadata, builds the list of
    experiment definitions from the YAML file, filters out experiments
    whose results already exist on disk, and runs the remainder either
    locally or through Docker via `run_worker`.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument(
        "--dataset",
        metavar="NAME",
        help="the dataset to load training points from",
        default="product-sphere-D3-E1-M1000-N1000-inverse-distance",
        choices=DATASETS.keys(),
    )
    parser.add_argument(
        "--hardware",
        metavar="INSTANCE",
        help="the type of instance that is currently running the script",
        default="CPU",
        choices=["CPU", "GPU"],
    )
    parser.add_argument(
        "--definitions",
        metavar="FILE",
        help="load algorithm definitions from FILE",
        default="algos.yaml",
    )
    parser.add_argument(
        "--algorithm", metavar="NAME", help="run only the named algorithm", default=None
    )
    parser.add_argument(
        "--docker-tag",
        metavar="NAME",
        help="run only algorithms in a particular docker image",
        default=None,
    )
    parser.add_argument(
        "--list-algorithms",
        help="print the names of all known algorithms and exit",
        action="store_true",
    )
    parser.add_argument(
        "--force",
        help="re-run algorithms even if their results already exist",
        action="store_true",
    )
    parser.add_argument(
        "--runs",
        metavar="COUNT",
        type=positive_int,
        help="run each algorithm instance %(metavar)s times and use only"
        " the best result. This is especially useful for methods that rely on"
        " just-in-time compiling: the first run includes compiling times"
        " whereas the second doesn't.",
        default=2,
    )
    parser.add_argument(
        "--timeout",
        type=int,
        help="Timeout (in seconds) for each individual algorithm run, or -1"
        "if no timeout should be set",
        default=2 * 600,  # Max 10mn per run to keep costs manageable.
    )
    parser.add_argument(
        "--local",
        action="store_true",
        help="If set, then will run everything locally (inside the same "
        "process) rather than using Docker",
    )
    parser.add_argument(
        "--max-n-algorithms",
        type=int,
        help="Max number of algorithms to run (just used for testing)",
        default=-1,
    )
    parser.add_argument(
        "--run-disabled",
        help="run algorithms that are disabled in algos.yml",
        action="store_true",
    )
    args = parser.parse_args()
    # `--timeout -1` is the documented "no timeout" sentinel; run_docker
    # receives None in that case.
    if args.timeout == -1:
        args.timeout = None
    if args.list_algorithms:
        # `python run.py --list-algorithms`
        # -> We just display all the possible algorithms and exit.
        list_algorithms(args.definitions)
        sys.exit(0)
    # Logging configuration is read from a file next to this script.
    logging.config.fileConfig("logging.conf")
    logger = logging.getLogger("kmb")
    # Load the dataset (an opened HDF5 file plus its ambient dimension):
    dataset, dimension = get_dataset(args.dataset)
    # Properties of the dataset, read from the HDF5 attributes:
    kernel = dataset.attrs["kernel"]
    task = dataset.attrs["task"]
    normalize_rows = dataset.attrs.get("normalize_rows", False)
    # Don't forget to close the HDF5 file:
    dataset.close()
    # Definition of the input problem.
    # These correspond to the experiments listed in algos.yaml
    definitions = get_definitions(
        definition_file=args.definitions,
        dimension=dimension,
        dataset=args.dataset,
        task=task,
        hardware=args.hardware,
        kernel=kernel,
        normalize_rows=normalize_rows,
        run_disabled=args.run_disabled,
    )
    # Filter out, from the loaded definitions, all those query argument groups
    # that correspond to experiments that have already been run. (This might
    # mean removing a definition altogether, so we can't just use a list
    # comprehension.)
    filtered_definitions = []
    for definition in definitions:
        query_argument_groups = definition.query_argument_groups  # = [{}] in most cases
        # Filter out, for this specific "definition" (i.e. Python object + parameters)
        # what are the arguments "at query time" that have already been tried.
        not_yet_run = []
        for query_arguments in query_argument_groups:
            # The result filename looks like
            # "results/dataset/algorithm/M_4_L_0_5.hdf5"
            fn = get_result_filename(args.dataset, definition, query_arguments)
            # `--force` re-runs everything regardless of existing results.
            if args.force or not os.path.exists(fn):
                not_yet_run.append(query_arguments)
        if not_yet_run:  # ...is not empty, i.e. some experiments remain to be run:
            if definition.query_argument_groups:
                # Definitions are immutable namedtuple-like records, hence
                # `_replace` instead of attribute assignment.
                definition = definition._replace(query_argument_groups=not_yet_run)
            filtered_definitions.append(definition)
    # "definitions" is a list of "experiment definitions" whose results do not
    # already appear on the hard drive.
    definitions = filtered_definitions
    # N.B.: We shuffle all the experiments. This could help us to avoid
    # unnecessary bias against e.g. the last experiments in "algos.yaml",
    # since some GPUs may overheat and experience a sharp decline in performance
    # after several hours of continuous work.
    random.shuffle(definitions)
    # If the user has specified a single algorithm,
    # we filter out all the other experiments:
    if args.algorithm:
        logger.info(f"running only {args.algorithm}")
        definitions = [d for d in definitions if d.algorithm == args.algorithm]
    # Case 1: The user is working with Docker, i.e. "not on the local machine".
    if not args.local:
        # See which Docker images we have available
        docker_client = docker.from_env()
        docker_tags = set()
        for image in docker_client.images.list():
            for tag in image.tags:
                # Keep only the repository part, e.g. "kmb-algo" from
                # "kmb-algo:latest".
                tag = tag.split(":")[0]
                docker_tags.add(tag)
        # If the user has specified a single Docker image,
        # we filter out all the other experiments:
        if args.docker_tag:
            logger.info(f"running only {args.docker_tag}")
            definitions = [d for d in definitions if d.docker_tag == args.docker_tag]
        # If some docker images are referenced in "algos.yaml" but cannot
        # be found in the docker environment, we add a warning in the log file
        # and silently drop the affected definitions:
        if set(d.docker_tag for d in definitions).difference(docker_tags):
            logger.info(f"not all docker images available, only: {set(docker_tags)}")
            logger.info(
                f"missing docker images: "
                f"{str(set(d.docker_tag for d in definitions).difference(docker_tags))}"
            )
            definitions = [d for d in definitions if d.docker_tag in docker_tags]
    # Case 2: The user is working without Docker, i.e. "on the local machine".
    else:
        # Check that all the modules referenced in "algos.yaml" can actually
        # be loaded.
        def _test(df):
            # Returns True if the definition's module and constructor can be
            # imported; raises for a broken definition; False to skip it.
            status = algorithm_status(df)
            # If the module was loaded but doesn't actually have a constructor
            # of the right name, then the definition is broken
            if status == InstantiationStatus.NO_CONSTRUCTOR:
                raise Exception(
                    "%s.%s(%s): error: the module '%s' does not"
                    " expose the named constructor"
                    % (df.module, df.constructor, df.arguments, df.module)
                )
            if status == InstantiationStatus.NO_MODULE:
                # If the module couldn't be loaded (presumably because
                # of a missing dependency), print a warning and remove
                # this definition from the list of things to be run
                # NOTE(review): this uses the root logger (`logging.warning`)
                # rather than the "kmb" logger used elsewhere — confirm
                # whether that is intentional.
                logging.warning(
                    "%s.%s(%s): the module '%s' could not be "
                    "loaded; skipping"
                    % (df.module, df.constructor, df.arguments, df.module)
                )
                return False
            else:
                return True

        # Keep the modules that can be loaded:
        definitions = [d for d in definitions if _test(d)]
    # For debugging, the user may wish to only run the first "n" methods:
    if args.max_n_algorithms >= 0:
        definitions = definitions[: args.max_n_algorithms]
    if len(definitions) == 0:
        raise Exception("Nothing to run")
    else:
        logger.info(f"Order: {definitions}")
    # Multiprocessing magic to farm this out to all CPUs
    queue = multiprocessing.Queue()
    for definition in definitions:
        queue.put(definition)
    # NOTE(review): `range(1)` spawns a single worker process, so experiments
    # run sequentially (run_worker itself uses all cores per job) — confirm
    # this is the intended level of parallelism.
    workers = [
        multiprocessing.Process(target=run_worker, args=(i + 1, args, queue))
        for i in range(1)
    ]
    [worker.start() for worker in workers]
    [worker.join() for worker in workers]
    # TODO: need to figure out cleanup handling here