Skip to content

Commit

Permalink
fix bug with FluxExecutor jobs output
Browse files Browse the repository at this point in the history
Currently, for some reason, the jobs listing is
passing tests, but when testing with a workflow there
is an error warning printed to the terminal. This
tweak, suggested by Paolo, seems to fix the issue.

Signed-off-by: vsoch <vsoch@users.noreply.github.com>
  • Loading branch information
vsoch committed Nov 23, 2022
1 parent aa974d4 commit 1a04864
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 16 deletions.
3 changes: 3 additions & 0 deletions docs/executor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ Resource requests and other job characteristics can be controlled via the follow
* :ref:`process-queue`
* :ref:`process-time`

Additionally, to have Flux print all output to stderr and stdout (and not to the file given by ``--output``), just add ``--terminal-output``
as one of the ``process.clusterOptions``.

.. note:: Flux does not support specifying memory.


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,30 @@ class FluxExecutor extends AbstractGridExecutor {
List<String> result = ['flux', 'mini', 'submit']
result << '--setattr=cwd=' + quote(task.workDir)
result << '--job-name="' + getJobNameFor(task) + '"'
result << '--output=' + quote(task.workDir.resolve(TaskRun.CMD_LOG)) // -o OUTFILE

// Incrementally save additional cluster options, and special arguments for output
Boolean terminal_output = false
List<String> clusterOptions = []

// Look for special option hidden in cluster options to not set output
if( task.config.clusterOptions ) {
for (String item : task.config.clusterOptions.toString().tokenize(' ')) {
if ( item ) {
if ( item.stripIndent().trim() == "--terminal-output" ) {
terminal_output = true

// Otherwise add to list of cluster options
} else {
clusterOptions << item.stripIndent().trim()
}
}
}
}

// Only write output to file if user doesn't want written entirely to terminal
if ( !terminal_output ) {
result << '--output=' + quote(task.workDir.resolve(TaskRun.CMD_LOG)) // -o OUTFILE
}

if( task.config.cpus > 1 ) {
result << '--cores-per-task=' + task.config.cpus.toString()
Expand All @@ -82,16 +105,9 @@ class FluxExecutor extends AbstractGridExecutor {
result << '--queue=' + (task.config.queue.toString())
}

// Any extra cluster options the user wants!
// Options tokenized with ; akin to OarExecutor
if( task.config.clusterOptions ) {

// Split by space
for (String item : task.config.clusterOptions.toString().tokenize(' ')) {
if ( item ) {
result << item.stripIndent().trim()
}
}
// Add cluster options to result list
for (String item : clusterOptions) {
result << item
}
result << '/bin/bash' << scriptFile.getName()
return result
Expand Down Expand Up @@ -129,17 +145,18 @@ class FluxExecutor extends AbstractGridExecutor {
protected List<String> queueStatusCommand(Object queue) {

// Look at jobs from last 15 minutes
final result = ['flux', 'jobs', '--suppress-header', '--format="{id.f58} {status_abbrev}"', '--since="-15m"']
String command = 'flux jobs --suppress-header --format="{id.f58} {status_abbrev}" --since="-15m"'

if( queue )
result << '--queue' << queue.toString()
command += ' --queue=' + queue.toString()

final user = System.getProperty('user.name')
if( user )
result << '--user' << user
command += ' --user=' + user
else
log.debug "Cannot retrieve current user"

final result = ['sh', '-c', command]
return result
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ class FluxExecutorTest extends Specification {
task.config.clusterOptions = '--tasks-per-node=4 --cpus-per-node=4'
then:
executor.getSubmitCommandLine(task, Paths.get('/some/path/job.sh')) == ['flux', 'mini', 'submit', '--setattr=cwd=/work/path', '--job-name="nf-my_task"', '--output=/work/path/.command.log', '--time-limit=60', '--tasks-per-node=4', '--cpus-per-node=4', '/bin/bash', 'job.sh']

when:
task.config = new TaskConfig()
task.config.clusterOptions = '--terminal-output'
then:
executor.getSubmitCommandLine(task, Paths.get('/some/path/job.sh')) == ['flux', 'mini', 'submit', '--setattr=cwd=/work/path', '--job-name="nf-my_task"', '/bin/bash', 'job.sh']

}

def testWorkDirWithBlanks() {
Expand Down Expand Up @@ -144,7 +151,7 @@ class FluxExecutorTest extends Specification {
def executor = [:] as FluxExecutor
then:
usr
executor.queueStatusCommand(null) == ['flux', 'jobs', '--suppress-header', '--format="{id.f58} {status_abbrev}"', '--since="-15m"', '--user', usr]
executor.queueStatusCommand('xxx') == ['flux', 'jobs', '--suppress-header', '--format="{id.f58} {status_abbrev}"', '--since="-15m"', '--queue', 'xxx', '--user', usr]
executor.queueStatusCommand(null) == ['sh', '-c', "flux jobs --suppress-header --format=\"{id.f58} {status_abbrev}\" --since=\"-15m\" --user=" + usr]
executor.queueStatusCommand('xxx') == ['sh', '-c', "flux jobs --suppress-header --format=\"{id.f58} {status_abbrev}\" --since=\"-15m\" --queue=xxx --user=" + usr]
}
}

0 comments on commit 1a04864

Please sign in to comment.