-
Notifications
You must be signed in to change notification settings - Fork 97
/
slurm-submit.py
executable file
·132 lines (115 loc) · 4.21 KB
/
slurm-submit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#!/usr/bin/env python3
import sys
import re
import argparse
import subprocess
from snakemake.utils import read_job_properties
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
"--help", help="Display help message.", action="store_true")
parser.add_argument(
"positional", action="append",
nargs="?", metavar="POS",
help="additional arguments not in slurm parser group to pass to sbatch")
# A subset of SLURM-specific arguments
slurm_parser = parser.add_argument_group("slurm-specific arguments")
slurm_parser.add_argument(
"-a", "--array", help="job array index values")
slurm_parser.add_argument(
"-A", "--account", help="charge job to specified account")
slurm_parser.add_argument(
"--begin", help="defer job until HH:MM MM/DD/YY")
slurm_parser.add_argument(
"-c", "--cpus-per-task", help="number of cpus required per task")
slurm_parser.add_argument(
"-d", "--dependency",
help="defer job until condition on jobid is satisfied")
slurm_parser.add_argument(
"-D", "--workdir", help="set working directory for batch script")
slurm_parser.add_argument(
"-e", "--error", help="file for batch script's standard error")
slurm_parser.add_argument(
"-J", "--job-name", help="name of job")
slurm_parser.add_argument(
"--mail-type", help="notify on state change: BEGIN, END, FAIL or ALL")
slurm_parser.add_argument(
"--mail-user", help="who to send email notification for job state changes")
slurm_parser.add_argument(
"-n", "--ntasks", help="number of tasks to run")
slurm_parser.add_argument(
"-N", "--nodes", help="number of nodes on which to run (N = min[-max])")
slurm_parser.add_argument(
"-o", "--output", help="file for batch script's standard output")
slurm_parser.add_argument(
"-p", "--partition", help="partition requested")
slurm_parser.add_argument(
"-Q", "--quiet", help="quiet mode (suppress informational messages)")
slurm_parser.add_argument(
"-t", "--time", help="time limit")
slurm_parser.add_argument(
"--wrap", help="wrap command string in a sh script and submit")
slurm_parser.add_argument(
"-C", "--constraint", help="specify a list of constraints")
slurm_parser.add_argument(
"--mem", help="minimum amount of real memory")
args = parser.parse_args()
if args.help:
parser.print_help()
sys.exit(0)
jobscript = sys.argv[-1]
job_properties = read_job_properties(jobscript)
extras = ""
if args.positional:
for m in args.positional:
if m is not None:
extras = extras + " " + m
arg_dict = dict(args.__dict__)
# Process resources
if "resources" in job_properties:
resources = job_properties["resources"]
if arg_dict["time"] is None:
if "runtime" in resources:
arg_dict["time"] = resources["runtime"]
elif "walltime" in resources:
arg_dict["time"] = resources["walltime"]
if "mem" in resources and arg_dict["mem"] is None:
arg_dict["mem"] = resources["mem"]*1e9
# Threads
if "threads" in job_properties:
arg_dict["ntasks"] = job_properties["threads"]
opt_keys = ["array", "account", "begin", "cpus_per_task",
"depedency", "workdir", "error", "job_name", "mail_type",
"mail_user", "ntasks", "nodes", "output", "partition",
"quiet", "time", "wrap", "constraint", "mem"]
# Set default partition
if arg_dict["partition"] is None:
if not "":
# partitions and SLURM - If not specified, the default behavior is to
# allow the slurm controller to select the default partition as
# designated by the system administrator.
opt_keys.remove("partition")
else:
arg_dict["partition"] = ""
opts = ""
for k, v in arg_dict.items():
if k not in opt_keys:
continue
if v is not None:
opts += " --{} \"{}\" ".format(k, v)
if arg_dict["wrap"] is not None:
cmd = "sbatch {opts}".format(opts=opts)
else:
cmd = "sbatch {opts} {extras}".format(opts=opts, extras=extras)
try:
res = subprocess.run(cmd, check=True, shell=True, stdout=subprocess.PIPE)
except subprocess.CalledProcessError as e:
raise e
# Get jobid
res = res.stdout.decode()
try:
m = re.search("Submitted batch job (\d+)", res)
jobid = m.group(1)
print(jobid)
except Exception as e:
print(e)
raise