Skip to content

Commit

Permalink
Ignore resource directives if -l option is specified in PBS executor (#…
Browse files Browse the repository at this point in the history
…3015) [ci fast]


Signed-off-by: Ben Sherman <bentshermann@gmail.com>
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
Co-authored-by: Abhinav Sharma <abhi18av@users.noreply.github.com>
Co-authored-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
  • Loading branch information
3 people committed Feb 14, 2023
1 parent 445218a commit 28a6796
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 7 deletions.
21 changes: 21 additions & 0 deletions docs/executor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,16 @@ Resource requests and other job characteristics can be controlled via the follow
* :ref:`process-queue`
* :ref:`process-time`

.. tip::
As of Nextflow version 23.02.0-edge or later, it is possible to specify resource settings with both the ``clusterOptions`` and
the ``cpus`` directives by specifying the cluster options dynamically::

cpus = 2
clusterOptions = { "-l nodes=1:ppn=${task.cpus}:..." }

This technique allows you to specify ``clusterOptions`` once for all processes, including any options that are specific
to your cluster, and use the standard resource directives throughout the rest of your pipeline.


.. _pbspro-executor:

Expand All @@ -481,6 +491,17 @@ Resource requests and other job characteristics can be controlled via the follow
* :ref:`process-queue`
* :ref:`process-time`

.. tip::
As of Nextflow version 23.02.0-edge or later, it is possible to specify resource settings with both the ``clusterOptions`` and
the ``cpus`` and ``memory`` directives by specifying the cluster options dynamically::

cpus = 2
memory = 8.GB
clusterOptions = { "-l select=1:ncpus=${task.cpus}:mem=${task.memory.toMega()}mb:..." }

This technique allows you to specify ``clusterOptions`` once for all processes, including any options that are specific
to your cluster, and use the standard resource directives throughout the rest of your pipeline.


.. _sge-executor:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package nextflow.executor

import java.nio.file.Path
import java.util.regex.Pattern

import groovy.util.logging.Slf4j
import nextflow.processor.TaskRun
Expand All @@ -29,6 +30,8 @@ import nextflow.processor.TaskRun
@Slf4j
class PbsExecutor extends AbstractGridExecutor {

private static Pattern OPTS_REGEX = ~/(?:^|\s)-l.+/

/**
* Gets the directives to submit the specified task to the cluster for execution
*
Expand All @@ -48,8 +51,14 @@ class PbsExecutor extends AbstractGridExecutor {
result << '-q' << (String)task.config.queue
}

// task cpus
if( task.config.cpus > 1 ) {
result << '-l' << "nodes=1:ppn=${task.config.cpus}"
if( matchOptions(task.config.clusterOptions?.toString()) ) {
log.warn1 'cpus directive is ignored when clusterOptions contains -l option\ntip: clusterOptions = { "-l nodes=1:ppn=${task.cpus}:..." }'
}
else {
result << '-l' << "nodes=1:ppn=${task.config.cpus}"
}
}

// max task duration
Expand Down Expand Up @@ -161,4 +170,7 @@ class PbsExecutor extends AbstractGridExecutor {
return p!=-1 ? line.substring(p+prefix.size()).trim() : null
}

static protected boolean matchOptions(String value) {
value ? OPTS_REGEX.matcher(value).find() : null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package nextflow.executor


import groovy.util.logging.Slf4j
import nextflow.processor.TaskRun

/**
* Implements a executor for PBSPro cluster executor
*
Expand Down Expand Up @@ -70,7 +70,12 @@ class PbsProExecutor extends PbsExecutor {
res << "mem=${task.config.getMemory().getMega()}mb".toString()
}
if( res ) {
result << '-l' << "select=1:${res.join(':')}".toString()
if( matchOptions(task.config.clusterOptions?.toString()) ) {
log.warn1 'cpus and memory directives are ignored when clusterOptions contains -l option\ntip: clusterOptions = { "-l select=1:ncpus=${task.cpus}:mem=${task.memory.toMega()}mb:..." }'
}
else {
result << '-l' << "select=1:${res.join(':')}".toString()
}
}

// max task duration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ class PbsExecutorTest extends Specification {
'''
.stripIndent().leftTrim()


when:
task.config = new TaskConfig()
task.config.queue = 'alpha'
Expand All @@ -81,7 +80,6 @@ class PbsExecutorTest extends Specification {
'''
.stripIndent().leftTrim()


when:
task.config = new TaskConfig()
task.config.queue = 'alpha'
Expand All @@ -98,8 +96,6 @@ class PbsExecutorTest extends Specification {
'''
.stripIndent().leftTrim()



when:
task.config = new TaskConfig()
task.config.queue = 'delta'
Expand Down Expand Up @@ -152,6 +148,20 @@ class PbsExecutorTest extends Specification {
'''
.stripIndent().leftTrim()

when:
task.config = new TaskConfig()
task.config.clusterOptions = '-l nodes=1:x86:ppn=4'
task.config.cpus = 2
task.config.memory = '8g'
then:
executor.getHeaders(task) == '''
#PBS -N nf-task_name
#PBS -o /work/dir/.command.log
#PBS -j oe
#PBS -l mem=8gb
#PBS -l nodes=1:x86:ppn=4
'''
.stripIndent().leftTrim()
}

def WorkDirWithBlanks() {
Expand Down Expand Up @@ -278,4 +288,16 @@ class PbsExecutorTest extends Specification {
executor.queueStatusCommand('xxx').each { assert it instanceof String }
}

def 'should match cluster options' () {
expect:
PbsExecutor.matchOptions('-l foo')
PbsExecutor.matchOptions('-lfoo')
PbsExecutor.matchOptions('-x -l foo')
PbsExecutor.matchOptions('-x -lfoo')
and:
!PbsExecutor.matchOptions(null)
!PbsExecutor.matchOptions('')
!PbsExecutor.matchOptions('-x-l foo')
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,33 @@ class PbsProExecutorTest extends Specification {
]
}

def 'should ignore cpus and memory when clusterOptions contains -l option' () {
given:
def executor = Spy(PbsProExecutor)
def WORK_DIR = Paths.get('/foo/bar')

def task = Mock(TaskRun)
task.getWorkDir() >> WORK_DIR
task.getConfig() >> new TaskConfig([
clusterOptions: '-l select=1:ncpus=2:mem=8gb',
cpus: 2,
memory: '8 GB'
])

when:
def result = executor.getDirectives(task, [])
then:
1 * executor.getJobNameFor(task) >> 'foo'
1 * executor.quote( WORK_DIR.resolve(TaskRun.CMD_LOG)) >> '/foo/bar/.command.log'

result == [
'-l select=1:ncpus=2:mem=8gb', '',
'-N', 'foo',
'-o', '/foo/bar/.command.log',
'-j', 'oe'
]
}

def 'should return qstat command line' () {
given:
def executor = [:] as PbsProExecutor
Expand Down

0 comments on commit 28a6796

Please sign in to comment.