/
JobArray.R
302 lines (259 loc) · 14.8 KB
/
JobArray.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
#' Class for efficient multi-job SLURM submission and management
#'
#' R6 Class that enables easy submission and manipulation of SLURM job arrays.
#'
#' Job arrays are quickly genearated and submitted allowing for efficient creation and execution
#' of many shell jobs in a SLURM cluster, thus facilitating parallelization when needed. This class eliminates the cumbersome task of
#' manually creating SLURM arrays: in its simplest form two lines of code are sufficient for a job array submmission.
#' Additionally, there is an added functionallity to monitor and wait for all jobs to finished after they have been submitted.
#'
#' All jobs in an job array share the same execution requirements.
#' Each element in `commandList` will be submitted as an individual job in the array. Elements of `commandList` should be
#' vectors of shell commands.
#'
#' Submission is achived by creating and executing an sbatch script. For more details on SLURM refer
#' to https://slurm.schedmd.com/. For job arrays refer to https://slurm.schedmd.com/job_array.html
#' Concatenation is possible for most methods.
#'
#' JobArray class - inherits from JobInfo class
#'
#' @usage
#' # x <- JobArray$new(commandList, jobName = NULL, outDir = NULL, partition = NULL, time = NULL, mem = NULL, proc = NULL, totalProc = NULL, nodes = NULL, email = NULL)
#' # x$submit()
#' # x$wait(stopIfFailed = F, verbose = T)
#' # x$length()
#' # x$cancel()
#' # x$getState(simplify = F)
#' # x$getJobNames()
#' # x$clean()
#'
#' @docType class
#' @format R6 class
#' @export
#'
#' @section Method description:
#' \enumerate{
#' \item{\strong{Initialize}}{
#' \cr
#' \code{x <- JobInfo$new(commandList, jobName = NULL, outDir = NULL, partition = NULL, time = NULL, mem = NULL, proc = NULL, totalProc = NULL, nodes = NULL, email = NULL)}
#' \cr Parameters:
#' \itemize{
#' \item{commandList} {: list of character vectors - Each element of the list should be a vector of shell commands. Each element of the list corresponds to a different job in the array.}
#' \item{jobName} {: character - Name of job, if NULL one will be generated of the form rSubmitter_job_[random_alphanumeric]. Equivalent to \code{--job-name} of SLURM sbatch. Most output files use it as a suffix}
#' \item{outDir} {: character - writeable path for the sabtch script as well as the SLRUM STDERR and STDOUT files. If NULL the current working directory will be used}
#' \item{partition} {: character - Partition to use. Equivalent to \code{--partition} of SLURM sbatch}
#' \item{time} {: character - Time requested for job execution, one accepted format is "HH:MM:SS". Equivalent to \code{--time} of SLURM sbatch}
#' \item{mem} {: character - Memory requested for job execution, one accepted format is "xG" or "xMB". Equivalent to \code{--mem} of SLURM sbatch}
#' \item{proc} {: integer - Number of processors requested per task. Equivalent to \code{--cpus-per-task} of SLURM sbatch}
#' \item{totalProc} {: integer - Number of tasks requested for job. Equivalent to \code{--ntasks} of SLURM sbatch}
#' \item{nodes} {: integer - Number of nodes requested for job. Equivalent to \code{--nodes} of SLURM sbatch}
#' \item{email} {: character - email address to send info when job is done. Equivalent to \code{--mail-user=} of SLURM sbatch}
#' }
#' \cr Return: \cr object of class \code{Job}
#' }
#'
#' \item{\strong{Submit job(s)}}{
#' \cr
#' \code{x$submit()}
#' \cr Creates a job array sbatch script to \code{outDir} and submits it through a system call to sbatch. The script, STDERR and STDOUT sbatch files will be written to \code{outDir}. In the case sbatch returns a non-zero status, it will try resubmitting up 12 times with a defined interval time(TIME_WAIT_JOB_STATUS option at ~/.rSubmitter). Each element of the array will have its individual STDERR and STDOUT files with the format \code{jobName_[1-Inf].[err|out]}. Important options pulled from the config file located at ~/.rSubmitter: maximum number of jobs allowed in the queue (MAX_JOBS_ALLOWED:n); maximum length of a job array (MAX_JOB_ARRAY_LENGTH:n)
#' \cr Return: \cr self - for method concatenation
#' }
#'
#' \item{\strong{Wait for job(s) to finish}}{
#' \cr
#' \code{x$wait(stopIfFailed = F, verbose = T)}
#' \cr The time between each job state check is defined in the entry TIME_WAIT_JOB_STATUS:seconds in the config file located at ~/.rSubmitter
#' \cr Parameters:
#' \itemize{
#' \item{stopIfFailed} {: logical - if TRUE stops when one job has failed (only useful for JobArray) it then cancels the
#' rest of the pending and running jobs. If FALSE and one or more Jobs failed it raises a warning for each failed job}
#' \item{verbose} {: logical - if TRUE prints the job state(s) at every check}
#' }
#' \cr Return: \cr self - for method concatenation
#' }
#'
#' \item{\strong{Get length of array}}{
#' \cr
#' \code{x$length()}
#' \cr Return: \cr numeric - number of individual jobs in array
#' }
#'
#' \item{\strong{Cancel job(s)}}{
#' \cr
#' \code{x$cancel()}
#' \cr Return: \cr self - for method concatenation
#' }
#'
#' \item{\strong{Get job(s) state}}{
#' \cr
#' \code{x$getState(simplify = F)}
#' \cr Parameters:
#' \itemize{
#' \item{simplify} {: logical - if TRUE returns a freqeuncy data.frame of job states, otherwise returns individual jobs and their associated job names, job ids, and states}
#' }
#' \cr Return: \cr data.frame - With SLURM states
#' }
#'
#' \item{\strong{Get job name(s)}}{
#' \cr
#' \code{x$getJobNames()}
#' \cr Return: \cr character vector - With individual job names.
#' }
#'
#' \item{\strong{Remove SLURM-associated files}}{
#' \cr
#' \code{x$clean(script = TRUE, out = TRUE, err = TRUE)}
#' \cr Parameters:
#' \itemize{
#' \item{script} {: logical - if TRUE deletes sbatch submission script(s) associated to this object}
#' \item{out} {: logical - if TRUE deletes STDOUT file(s) from SLURM associated to this object}
#' \item{err} {: logical - if TRUE deletes STDERR file(s) from SLURM associated to this object}
#' }
#' \cr Return: \cr self - for method concatenation
#' }
#' }
#'
#'
#' @return \code{\link{R6Class}} with methods and fields for SLURM job array manipulation
#' @examples
#' \dontrun{
#' # Create and submit 10 dummy jobs
#' commands <- list()
#' for(i in 1:10) commands[[i]] <- c("echo adios","sleep 40")
#' jobArray <- JobArray$new(commands, jobName = "dummy", outDir = "~", mem = "1G", time = "02:00", proc = 1)
#' jobArray$submit()
#' jobArray$getState()
#' jobArray$wait()
#'
#' # Create and submit 10 dummy jobs, where one fails and the rest of the jobs will be cancelled
#' commands <- list()
#' for(i in 1:9) commands[[i]] <- c("echo adios","sleep 40")
#' commands[[10]] <- "notAcommand"
#' jobArray <- JobArray$new(commands, jobName = "dummy", outDir = "~", mem = "1G", time = "02:00", proc = 1)
#' jobArray$submit()
#' jobArray$getState()
#' jobArray$wait(stopIfFailed = T)
#'
#' }
JobArray <- R6::R6Class(
classname = "JobArray", inherit = JobInfo,
public = list(
#INSTANCES
#METHODS
initialize = function(commandList, ...) {
#Validate args
if(!is.list(commandList))
stop("commanList has to be a list of character vectors")
if(length(commandList) < 1)
stop("commandList has to be of length >= 1")
if(length(commandList) > rSubmitterOpts$MAX_JOB_ARRAY_LENGTH)
stop("\nThe number of jobs (i.e. `length(commandList)`) is greater than max\n",
"length of a job array specified in the rSubmitter options (", rSubmitterOpts$MAX_JOB_ARRAY_LENGTH, ").\n",
"Consider using less jobs or changing option MAX_JOB_ARRAY_LENGTH:n in the config file ~/.rSubmitter")
#if(length(commandList) > getMaxJobArrayLength())
# warning("\nThe number of jobs is greater than the max length of a job array\n",
# "defined by your SLURM system. The array won't likely be submitted")
for(i in commandList)
if(!is.character(i) | !is.vector(i))
stop("All elements of commandList have to be character vectors")
#Set instances
private$commandList <- commandList
super$initialize(...)
private$isJobArray <- TRUE
# Overwrite err and out paths
private$outPath <- file.path(private$outDir, paste0(private$jobName , "_%A_%a.out"))
private$errPath <- file.path(private$outDir, paste0(private$jobName , "_%A_%a.err"))
},
addJob = function(x) {
if(!is.character(x) | !is.vector(x))
stop("x has to be a character vector")
private$commandList <- c(private$commandList, list(x))
return(invisible(self))
},
#getJob = function(i) {
# private$checkIndex(i)
# return(invisible(private$jobs[[i]]))
#},
# Submit all jobs
submit = function(removeScript = F) {
# Check if submmitted and running; throw an error in that case
private$errorIfSubmittedRunning()
# Write submission script
private$writeSubmissionScript()
# Trying to submit up to 4 times (in case there is latency problems between checking for max jobs and submitting)
timesToTry <- 4
for(i in seq(1, timesToTry)) {
# Wait if the number of Jobs allowed has been exceeded
if (!is.null(private$maxJobs)) {
if(self$length() > private$maxJobs)
stop("Length of array(", self$length(), ") greater than maximum number of jobs allowed(", private$maxJobs,"). Consider edditing the config file ~/.rSubmitter")
while(getJobNumber() + self$length() >= private$maxJobs) {
printTime(" Exceeded max number of jobs on queue, waiting ", private$timeWaitMaxjobs, " seconds")
Sys.sleep(private$timeWaitMaxjobs)
}
}
# Submitting jobs, in the last try if will throw an error if it fails
jobId <- systemSubmit(paste("sbatch --parsable", private$scriptPath), wait = rSubmitterOpts$TIME_WAIT_FAILED_CMD, ignore.stdout = F,
stopIfFailed = ifelse(i == timesToTry, T, F) )
# If succeeding submitting jobs
if(length(jobId) > 0) {
private$jobId <- jobId
break
}
}
# Constuct job ids and names for all jobs
private$createJobIds()
private$createJobNames()
private$isSubmitted<- T
# Delete script if asked for
if (removeScript)
file.remove(private$scriptPath)
return(invisible(self))
},
length = function() {
return(length(private$commandList))
},
getJobNames = function() {
if(is.null(private$jobName))
warning("Job(s) have not yet been submmitted. Job names are meaningless before submission")
private$createJobNames()
return(private$jobNameAll)
}
),
private = list(
#INSTANCES
commandList = NULL,
states = NULL,
jobNameAll = NULL,
jobIdAll = NULL,
#METHODS
# Writes the submission script specific to Job arrays
writeSubmissionScript = function() {
private$createScriptVector()
# Add job Array flag
args <- c(private$scriptVector, paste0("#SBATCH --array=1-", self$length()))
# Creates the bash case control statement
case <- "case $SLURM_ARRAY_TASK_ID in"
for (i in 1:self$length()) {
case <- c(case, paste0(" ", i, " )"))
case <- c(case, " set -euo pipefail")
case <- c(case, paste0(" ", private$commandList[[i]]))
case <- c(case, paste0(" ", ";;"))
}
case <- c(case, "esac")
case <- c(case, "exit 0")
# Concatenates SBATCH option to case statement
args <- c(args, case)
writeLines(args, private$scriptPath)
},
# Creates job ids of the form masterId_1 ... masterId_n
createJobIds = function() {
private$jobIdAll <- paste0(private$jobId, "_", 1:self$length())
},
# Creates job names of the form jobName_1 ... jobName_n
createJobNames = function() {
private$jobNameAll <- paste0(private$jobName, "_", 1:self$length())
}
# Throws error if i is out of bounds in job list
)
)