diff --git a/README.md b/README.md index 3696aca11a..58f2fc3be5 100644 --- a/README.md +++ b/README.md @@ -105,6 +105,7 @@ Currently the following clusters are supported: + [SLURM](https://www.nextflow.io/docs/latest/executor.html#slurm) + [PBS/Torque](https://www.nextflow.io/docs/latest/executor.html#pbs-torque) + [HTCondor (experimental)](https://www.nextflow.io/docs/latest/executor.html#htcondor) + + [Moab (beta)](https://www.nextflow.io/docs/latest/executor.html#moab) For example to submit the execution to a SGE cluster create a file named `nextflow.config`, in the directory where the pipeline is going to be launched, with the following content: diff --git a/docs/executor.rst b/docs/executor.rst index ba59f58257..74f9b4979e 100644 --- a/docs/executor.rst +++ b/docs/executor.rst @@ -159,6 +159,33 @@ The amount of resources requested by each job submission is defined by the follo * :ref:`process-memory` * :ref:`process-clusterOptions` +.. _moab-executor: + +Moab +==== + +The `Moab` executor allows you to run your pipeline script by using the +`Moab <http://www.adaptivecomputing.com/moab-hpc-basic-edition/>`_ resource manager by +`Adaptive Computing <http://www.adaptivecomputing.com/>`_. + +.. warning:: This is an incubating feature. It may change in future Nextflow releases. + +Nextflow manages each process as a separate job that is submitted to the cluster by using the ``msub`` command provided +by the resource manager. + +Being so, the pipeline must be launched from a node where the ``msub`` command is available, that is, in a common usage +scenario, the compute cluster `login` node. + +To enable the `Moab` executor simply set the property ``process.executor = 'moab'`` in the ``nextflow.config`` file. + +The amount of resources requested by each job submission is defined by the following process directives: + +* :ref:`process-cpus` +* :ref:`process-queue` +* :ref:`process-time` +* :ref:`process-memory` +* :ref:`process-clusterOptions` + ..
_condor-executor: HTCondor diff --git a/docs/process.rst b/docs/process.rst index cab01be8c9..ce050cce48 100644 --- a/docs/process.rst +++ b/docs/process.rst @@ -1369,8 +1369,8 @@ You can use it to request non-standard resources or use settings that are specif out of the box by Nextflow. .. note:: This directive is taken in account only when using a grid based executor: - :ref:`sge-executor`, :ref:`lsf-executor`, :ref:`slurm-executor`, :ref:`pbs-executor` and - :ref:`condor-executor` executors. + :ref:`sge-executor`, :ref:`lsf-executor`, :ref:`slurm-executor`, :ref:`pbs-executor`, :ref:`pbspro-executor`, + :ref:`moab-executor` and :ref:`condor-executor` executors. .. _process-disk: @@ -1506,6 +1506,7 @@ Name Executor ``lsf`` The process is executed using the `Platform LSF `_ job scheduler. ``slurm`` The process is executed using the SLURM job scheduler. ``pbs`` The process is executed using the `PBS/Torque `_ job scheduler. +``moab`` The process is executed using the `Moab `_ job scheduler. ``condor`` The process is executed using the `HTCondor `_ job scheduler. ``nqsii`` The process is executed using the `NQSII `_ job scheduler. ``ignite`` The process is executed using the `Apache Ignite `_ cluster. 
diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/ExecutorFactory.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/ExecutorFactory.groovy index 9202c9acf5..d832616174 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/ExecutorFactory.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/ExecutorFactory.groovy @@ -57,6 +57,7 @@ class ExecutorFactory { 'condor': CondorExecutor, 'k8s': K8sExecutor, 'nqsii': NqsiiExecutor, + 'moab': MoabExecutor, 'awsbatch': AwsBatchExecutor ] diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/MoabExecutor.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/MoabExecutor.groovy new file mode 100644 index 0000000000..1ef1d59d14 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/executor/MoabExecutor.groovy @@ -0,0 +1,196 @@ +/* + * Copyright 2013-2019, Centre for Genomic Regulation (CRG) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package nextflow.executor + +import java.nio.file.Path + +import groovy.util.logging.Slf4j +import nextflow.processor.TaskRun +/** + * Implements an executor for the Moab batch scheduler cluster + * + * http://www.adaptivecomputing.com/moab-hpc-basic-edition/ + * + * @author Paolo Di Tommaso + */ +@Slf4j +class MoabExecutor extends AbstractGridExecutor { + + /** + * Gets the directives to submit the specified task to the cluster for execution + * + * See http://docs.adaptivecomputing.com/maui/commands/msub.php + * + * @param task A {@link TaskRun} to be submitted + * @param result The {@link List} instance to which add the job directives + * @return A {@link List} containing all directive tokens and values. + */ + protected List getDirectives( TaskRun task, List result ) { + assert result !=null + + result << '-N' << getJobNameFor(task) + result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG)) + result << '-j' << 'oe' + + // the requested queue name + if( task.config.queue ) { + result << '-q' << (String)task.config.queue + } + + if( task.config.cpus > 1 ) { + result << '-l' << "nodes=1:ppn=${task.config.cpus}" + } + + // max task duration + if( task.config.time ) { + final duration = task.config.getTime() + result << "-l" << "walltime=${duration.format('HH:mm:ss')}" + } + + // task max memory + if( task.config.memory ) { + // https://www.osc.edu/documentation/knowledge_base/out_of_memory_oom_or_excessive_memory_usage + result << "-l" << "mem=${task.config.memory.toString().replaceAll(/[\s]/,'').toLowerCase()}" + } + + // -- at the end append the command script wrapped file name + if( task.config.clusterOptions ) { + result << task.config.clusterOptions.toString() << '' + } + + return result + } + + @Override + String getHeaders( TaskRun task ) { + String result = super.getHeaders(task) + result += "cd ${quote(task.workDir)}\n" + return result + } + + /** + * The command line to submit this job + * + * @param task The {@link TaskRun} instance to submit
for execution to the cluster + * @param scriptFile The file containing the job launcher script + * @return A list representing the submit command line + */ + List getSubmitCommandLine(TaskRun task, Path scriptFile ) { + def cmd = new ArrayList(5) + cmd << 'msub' + cmd << '--xml' + cmd << scriptFile.name + return cmd + } + + protected String getHeaderToken() { '#MSUB' } + + /** + * Parse the string returned by the {@code msub} command and extract the job ID string + * + * @param text The string returned when submitting the job + * @return The actual job ID string + */ + @Override + def parseJobId( String text ) { + String result + try { + result = new XmlSlurper() + .parseText(text) + .job[0] + .@JobID + .toString() + } + catch (Exception e) { + throw new IllegalArgumentException("Invalid Moab submit response:\n$text\n\n", e) + } + + if( !result ) + throw new IllegalArgumentException("Missing Moab submit job ID:\n$text\n\n") + + return result + } + + @Override + protected List getKillCommand() { ['mjobctl', '-c'] } + + @Override + protected List queueStatusCommand(Object queue) { + return ['showq', '--xml', '-w', "user="+System.getProperty('user.name')] + } + + static private Map DECODE_STATUS = [ + 'Running': QueueStatus.RUNNING, + 'Starting': QueueStatus.RUNNING, + 'Idle': QueueStatus.HOLD, + 'UserHold': QueueStatus.HOLD, + 'SystemHold': QueueStatus.HOLD, + 'Deferred': QueueStatus.HOLD, + 'NotQueued': QueueStatus.ERROR, + 'BatchHold': QueueStatus.ERROR, + ] + + protected QueueStatus decode(String status) { + DECODE_STATUS.get(status) + } + + @Override + protected Map parseQueueStatus(String xmlStatus) { + // parse XML string and decode job states + final result = new LinkedHashMap() + try { + def data = new XmlSlurper().parseText(xmlStatus) + for( def queue : data.queue ) { + if( !queue.@count?.toInteger() ) + continue + + def state = queue.@option?.toString() + if( state=="active") + parseQueueJobNodes(queue, result, QueueStatus.RUNNING ) + else if( 
state=="eligible" ) + parseQueueJobNodes(queue, result, QueueStatus.PENDING ) + else + parseQueueJobNodes(queue, result ) + } + } + catch (Exception e) { + throw e + } + return result + } + + protected void parseQueueJobNodes(queueNode, Map result, QueueStatus state=null) { + for( def entry : queueNode.job ) { + final value = state ?: decode(entry.@State?.toString()) + result.put( entry.@JobID?.toString(), value ) + } + } + + + protected List killTaskCommand(def jobId) { + final result = getKillCommand() + if( jobId instanceof Collection ) { + result.add( jobId.join(',') ) + log.trace "Kill command: ${result}" + } + else { + result.add(jobId.toString()) + } + return result + } + +} diff --git a/modules/nextflow/src/test/groovy/nextflow/executor/MoabExecutorTest.groovy b/modules/nextflow/src/test/groovy/nextflow/executor/MoabExecutorTest.groovy new file mode 100644 index 0000000000..6991ec8496 --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/executor/MoabExecutorTest.groovy @@ -0,0 +1,225 @@ +/* + * Copyright 2013-2019, Centre for Genomic Regulation (CRG) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package nextflow.executor + +import java.nio.file.Paths + +import nextflow.processor.TaskConfig +import nextflow.processor.TaskProcessor +import nextflow.processor.TaskRun +import spock.lang.Specification + +/** + * + * @author Paolo Di Tommaso + */ +class MoabExecutorTest extends Specification { + + def 'should get submit command' () { + given: + def exec = new MoabExecutor() + when: + def cmd = exec.getSubmitCommandLine(Mock(TaskRun), Paths.get('/work/dir/script.sh')) + then: + cmd == ['msub', '--xml', 'script.sh'] + } + + def 'should parse job id' () { + given: + def exec = new MoabExecutor() + when: + def result = exec.parseJobId('') + then: + result == '881218' + result instanceof String + + when: + exec.parseJobId('') + then: + def e = thrown(IllegalArgumentException) + e.message == 'Missing Moab submit job ID:\n\n\n' + } + + def 'should cancel a taks' () { + given: + def exec = new MoabExecutor() + + expect: + exec.killTaskCommand('123') == ['mjobctl', '-c', '123'] + exec.killTaskCommand(['111','222','333']) == ['mjobctl', '-c', '111,222,333'] + } + + def 'should get status command' () { + given: + def exec = new MoabExecutor() + when: + def cmd = exec.queueStatusCommand('foo') + then: + cmd == ['showq', '--xml', '-w', "user="+System.getProperty('user.name')] + } + + + def 'should parse queue xml' () { + given: + def exec = new MoabExecutor() + def STATUS = '''\ + + + queue + + + + + + + + + + + + + ''' + .stripIndent() + + when: + def state = exec.parseQueueStatus(STATUS) + then: + state['881196'] == AbstractGridExecutor.QueueStatus.RUNNING + state['881197'] == AbstractGridExecutor.QueueStatus.RUNNING + state['881198'] == AbstractGridExecutor.QueueStatus.PENDING + state['881200'] == AbstractGridExecutor.QueueStatus.ERROR + } + + + def "should validate headers"() { + + setup: + def executor = [:] as MoabExecutor + + // mock process + def proc = Mock(TaskProcessor) + + // task object + def task = new TaskRun() + task.processor = proc + task.workDir = 
Paths.get('/work/dir') + task.name = 'task name' + + when: + task.config = new TaskConfig() + then: + executor.getHeaders(task) == ''' + #MSUB -N nf-task_name + #MSUB -o /work/dir/.command.log + #MSUB -j oe + cd /work/dir + ''' + .stripIndent().leftTrim() + + + when: + task.config = new TaskConfig() + task.config.queue = 'alpha' + task.config.time = '1m' + then: + executor.getHeaders(task) == ''' + #MSUB -N nf-task_name + #MSUB -o /work/dir/.command.log + #MSUB -j oe + #MSUB -q alpha + #MSUB -l walltime=00:01:00 + cd /work/dir + ''' + .stripIndent().leftTrim() + + + when: + task.config = new TaskConfig() + task.config.queue = 'alpha' + task.config.time = '1m' + task.config.memory = '1m' + then: + executor.getHeaders(task) == ''' + #MSUB -N nf-task_name + #MSUB -o /work/dir/.command.log + #MSUB -j oe + #MSUB -q alpha + #MSUB -l walltime=00:01:00 + #MSUB -l mem=1mb + cd /work/dir + ''' + .stripIndent().leftTrim() + + + + when: + task.config = new TaskConfig() + task.config.queue = 'delta' + task.config.time = '10m' + task.config.memory = '5m' + task.config.cpus = 2 + then: + executor.getHeaders(task) == ''' + #MSUB -N nf-task_name + #MSUB -o /work/dir/.command.log + #MSUB -j oe + #MSUB -q delta + #MSUB -l nodes=1:ppn=2 + #MSUB -l walltime=00:10:00 + #MSUB -l mem=5mb + cd /work/dir + ''' + .stripIndent().leftTrim() + + when: + task.config = new TaskConfig() + task.config.queue = 'delta' + task.config.time = '1d' + task.config.memory = '1g' + task.config.cpus = 8 + then: + executor.getHeaders(task) == ''' + #MSUB -N nf-task_name + #MSUB -o /work/dir/.command.log + #MSUB -j oe + #MSUB -q delta + #MSUB -l nodes=1:ppn=8 + #MSUB -l walltime=24:00:00 + #MSUB -l mem=1gb + cd /work/dir + ''' + .stripIndent().leftTrim() + + when: + task.config = new TaskConfig() + task.config.queue = 'delta' + task.config.time = '2d 6h 10m' + task.config.memory = '2g' + then: + executor.getHeaders(task) == ''' + #MSUB -N nf-task_name + #MSUB -o /work/dir/.command.log + #MSUB -j oe + #MSUB -q 
delta + #MSUB -l walltime=54:10:00 + #MSUB -l mem=2gb + cd /work/dir + ''' + .stripIndent().leftTrim() + + } +}