# htcondor.py (forked from dask/dask-jobqueue)
from __future__ import absolute_import, division, print_function

import logging
import re
import shlex
from collections import OrderedDict

import dask
from distributed.utils import parse_bytes

from .core import JobQueueCluster, docstrings

logger = logging.getLogger(__name__)


class HTCondorCluster(JobQueueCluster):
    __doc__ = docstrings.with_indents(""" Launch Dask on an HTCondor cluster with a shared file system

    Parameters
    ----------
    disk : str
        Total amount of disk per job
    job_extra : dict
        Extra submit file attributes for the job
    %(JobQueueCluster.parameters)s

    Examples
    --------
    >>> from dask_jobqueue.htcondor import HTCondorCluster
    >>> cluster = HTCondorCluster(cores=24, memory="4GB", disk="4GB")
    >>> cluster.scale(10)

    >>> from dask.distributed import Client
    >>> client = Client(cluster)

    This also works with adaptive clusters, which automatically launch and kill workers based on load.
    HTCondor can take longer to start jobs than other batch systems, so tune the Adaptive parameters accordingly.

    >>> cluster.adapt(minimum=5, startup_cost='60s')
    """, 4)
    _script_template = """
%(shebang)s

%(job_header)s

Environment = "%(quoted_environment)s"
Arguments = "%(quoted_arguments)s"
Executable = %(executable)s
""".lstrip()

    submit_command = "condor_submit -queue 1 -file"
    cancel_command = "condor_rm"
    job_id_regexp = r'(?P<job_id>\d+\.\d+)'
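    # HTCondor job ids have the form "<ClusterId>.<ProcId>", e.g. "1234.0".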

    # condor sets argv[0] of the executable to "condor_exec.exe", which confuses
    # Python (can't find its libs), so we have to go through the shell.
    executable = "/bin/sh"

    def __init__(self, disk=None, job_extra=None,
                 config_name='htcondor', **kwargs):
        if disk is None:
            disk = dask.config.get("jobqueue.%s.disk" % config_name)
        if disk is None:
            raise ValueError("You must specify how much disk to use per job like ``disk='1 GB'``")
        self.worker_disk = parse_bytes(disk)

        if job_extra is None:
            self.job_extra = dask.config.get("jobqueue.%s.job-extra" % config_name, {})
        else:
            self.job_extra = job_extra

        # Instantiate args and parameters from parent abstract class
        super(HTCondorCluster, self).__init__(config_name=config_name, **kwargs)

        env_extra = kwargs.get("env_extra", None)
        if env_extra is None:
            env_extra = dask.config.get("jobqueue.%s.env-extra" % config_name, default=[])

        # env_extra is an array of export statements -- turn it into a dict
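        # For example (values are illustrative):
        #   ["export LANG=en_US.UTF-8", "FOO=bar"] becomes
        #   {"LANG": "en_US.UTF-8", "FOO": "bar"}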
        self.env_dict = {}
        for env_line in env_extra:
            split_env_line = shlex.split(env_line)
            if split_env_line[0] == "export":
                split_env_line = split_env_line[1:]
            for item in split_env_line:
                if '=' in item:
                    k, v = item.split('=', 1)
                    self.env_dict[k] = v
        self.env_dict["JOB_ID"] = "$F(MY.JobId)"
        self.job_header_dict = {
            "MY.DaskWorkerName": '"htcondor--$F(MY.JobId)--"',
            "RequestCpus": "MY.DaskWorkerCores",
            "RequestMemory": "floor(MY.DaskWorkerMemory / 1048576)",
            "RequestDisk": "floor(MY.DaskWorkerDisk / 1024)",
            "MY.JobId": '"$(ClusterId).$(ProcId)"',
            "MY.DaskWorkerCores": self.worker_cores,
            "MY.DaskWorkerMemory": self.worker_memory,
            "MY.DaskWorkerDisk": self.worker_disk,
        }
        if self.log_directory:
            self.job_header_dict.update({
                "LogDirectory": self.log_directory,
                # $F(...) strips quotes
                "Output": "$(LogDirectory)/worker-$F(MY.JobId).out",
                "Error": "$(LogDirectory)/worker-$F(MY.JobId).err",
                "Log": "$(LogDirectory)/worker-$(ClusterId).log",
                # We kill all the workers to stop them, so we need to stream their
                # output+error if we ever want to see anything
                "Stream_Output": True,
                "Stream_Error": True,
            })
        if self.job_extra:
            self.job_header_dict.update(self.job_extra)

        self.make_job_header()

    def make_job_header(self):
        """ The string version of the submit file attributes """
        self.job_header = "\n".join("%s = %s" % (k, v) for k, v in self.job_header_dict.items())
    def job_script(self):
        """ Construct a job submission script """
        quoted_arguments = quote_arguments(["-c", self._command_template])
        quoted_environment = quote_environment(self.env_dict)
        return self._script_template % {
            'shebang': self.shebang,
            'job_header': self.job_header,
            'quoted_environment': quoted_environment,
            'quoted_arguments': quoted_arguments,
            'executable': self.executable,
        }
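
    # A minimal usage sketch (constructor arguments are illustrative):
    #
    #     cluster = HTCondorCluster(cores=1, memory="1GB", disk="1GB")
    #     print(cluster.job_script())  # inspect the generated submit file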

    def _job_id_from_submit_output(self, out):
        cluster_id_regexp = r"submitted to cluster (\d+)"
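        # On success, condor_submit typically prints a line like
        # "1 job(s) submitted to cluster 1234.", which the regexp above matches.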
        match = re.search(cluster_id_regexp, out)
        if match is None:
            msg = ("Could not parse cluster id from submission command output.\n"
                   "Cluster id regexp is {!r}\n"
                   "Submission command output is:\n{}".format(cluster_id_regexp, out))
            raise ValueError(msg)
        return "%s.0" % match.group(1)


def _double_up_quotes(instr):
    return instr.replace("'", "''").replace('"', '""')


def quote_arguments(args):
    """Quote a string or list of strings using the Condor submit file "new" argument quoting rules.

    Returns
    -------
    str
        The arguments in a quoted form.

    Warnings
    --------
    You will need to surround the result in double-quotes before using it in
    the Arguments attribute.

    Examples
    --------
    >>> quote_arguments(["3", "simple", "arguments"])
    '3 simple arguments'
    >>> quote_arguments(["one", "two with spaces", "three"])
    'one \'two with spaces\' three'
    >>> quote_arguments(["one", "\"two\"", "spacey 'quoted' argument"])
    'one ""two"" \'spacey \'\'quoted\'\' argument\''
    """
    if isinstance(args, str):
        args_list = [args]
    else:
        args_list = args

    quoted_args = []
    for a in args_list:
        qa = _double_up_quotes(a)
        if ' ' in qa or "'" in qa:
            qa = "'" + qa + "'"
        quoted_args.append(qa)
    return " ".join(quoted_args)


def quote_environment(env):
    """Quote a dict of strings using the Condor submit file "new" environment quoting rules.

    Returns
    -------
    str
        The environment in quoted form.

    Warnings
    --------
    You will need to surround the result in double-quotes before using it in
    the Environment attribute.

    Examples
    --------
    >>> quote_environment(OrderedDict([("one", 1), ("two", '"2"'), ("three", "spacey 'quoted' value")]))
    'one=1 two=""2"" three=\'spacey \'\'quoted\'\' value\''
    """
    if not isinstance(env, dict):
        raise TypeError("env must be a dict")

    entries = []
    for k, v in env.items():
        qv = _double_up_quotes(str(v))
        if ' ' in qv or "'" in qv:
            qv = "'" + qv + "'"
        entries.append("%s=%s" % (k, qv))
    return " ".join(entries)