Skip to content

Commit

Permalink
Fix nested process names config resolution
Browse files Browse the repository at this point in the history
This commit fixed the config resolution for processes
executed in nested workflows.

The usual process config rule still applies, however
the user as the option to override the nested process
configuration using the `withName: <fullyQuafiliedProcessName>`
selector to specify the specific process config. For example
for a process executed as:

  workflow flow1 {
    foo()
  }

it's possible to specify the config as:

  process {
     withName: 'flow1:foo' { cpus: 2, memory: '2GB' }
  }
  • Loading branch information
pditommaso committed Aug 21, 2019
1 parent 6f89833 commit ccd8e83
Show file tree
Hide file tree
Showing 14 changed files with 284 additions and 125 deletions.
2 changes: 2 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Const.groovy
Expand Up @@ -145,4 +145,6 @@ class Const {

static public final String DEFAULT_BRANCH = 'master'

static public final String SCOPE_SEP = ':'

}
Expand Up @@ -70,6 +70,9 @@ import static org.codehaus.groovy.ast.tools.GeneralUtils.callThisX
import static org.codehaus.groovy.ast.tools.GeneralUtils.closureX
import static org.codehaus.groovy.ast.tools.GeneralUtils.constX
import static org.codehaus.groovy.ast.tools.GeneralUtils.stmt

import static nextflow.Const.SCOPE_SEP

/**
* Implement some syntax sugars of Nextflow DSL scripting.
*
Expand Down Expand Up @@ -1043,6 +1046,11 @@ class NextflowDSLImpl implements ASTTransformation {
unit.addError( new SyntaxException("Identifier `$name` is already used by another definition", node.lineNumber, node.columnNumber+8) )
return true
}
if( name.contains(SCOPE_SEP) ) {
def offset = 8+2+ name.indexOf(SCOPE_SEP)
unit.addError( new SyntaxException("Process and workflow names cannot contain colon character", node.lineNumber, node.columnNumber+offset) )
return true
}
return false
}

Expand Down
Expand Up @@ -19,6 +19,7 @@ package nextflow.script
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.exception.DuplicateProcessInvocation
import static nextflow.Const.SCOPE_SEP

/**
* Abstract module component which bind itself in the
Expand All @@ -30,8 +31,6 @@ import nextflow.exception.DuplicateProcessInvocation
@CompileStatic
abstract class BindableDef extends ComponentDef {

static final public String SCOPE_SEP = ':'

private Set<String> invocations = new HashSet<>()

abstract Object run(Object[] args)
Expand Down
Expand Up @@ -142,17 +142,24 @@ class ProcessConfig implements Map<String,Object>, Cloneable {
*/
private outputs = new OutputsList()

private Map legacySettings

/**
* Initialize the taskConfig object with the defaults values
*
* @param script The owner {@code BaseScript} configuration object
*/
ProcessConfig( BaseScript script ) {
protected ProcessConfig( BaseScript script ) {
ownerScript = script
configProperties = new LinkedHashMap()
configProperties.putAll( DEFAULT_CONFIG )
}

ProcessConfig( BaseScript script, String name ) {
this(script)
this.processName = name
}

/* Only for testing purpose */
@PackageScope
ProcessConfig( Map delegate ) {
Expand Down Expand Up @@ -180,6 +187,10 @@ class ProcessConfig implements Map<String,Object>, Cloneable {
return this
}

ProcessConfig setLegacySettings( Map map ) {
this.legacySettings = map
return this
}

/**
* Enable special behavior to allow the configuration object
Expand Down Expand Up @@ -307,24 +318,20 @@ class ProcessConfig implements Map<String,Object>, Cloneable {
* ```
*
* @param configDirectives
* A map object modelling the setting defined defined by the user in the nextflow configuration file *
* A map object modelling the setting defined defined by the user in the nextflow configuration file
* @param category
* The type of annotation either {@code withLabel:} or {@code withName:}
* @param processLabel
* A specific label representing the object holding the configuration setting to apply
*/
@PackageScope
void applyConfigForLabel( Map<String,?> configDirectives, String category, String processLabel ) {
protected void applyConfigSelector(Map<String,?> configDirectives, String category, String target ) {
assert category in ['withLabel:','withName:']
assert processName != null
assert processLabel != null

for( String rule : configDirectives.keySet() ) {
if( !rule.startsWith(category) )
continue
final isLabel = category=='withLabel:'
final pattern = rule.substring(category.size()).trim()
final target = isLabel ? processLabel : processName
if( !matchesSelector(target, pattern) )
continue

Expand All @@ -334,7 +341,7 @@ class ProcessConfig implements Map<String,Object>, Cloneable {
applyConfigSettings(settings)
}
else if( settings != null ) {
throw new ConfigParseException("Unknown config settings for label `$processLabel` -- settings=$settings ")
throw new ConfigParseException("Unknown config settings for ${isLabel?"process":'process with name'}: $target -- settings=$settings ")
}
}
}
Expand All @@ -346,14 +353,53 @@ class ProcessConfig implements Map<String,Object>, Cloneable {
return Pattern.compile(pattern).matcher(target).matches() ^ isNegated
}

/**
* Apply the process configuration provided in the nextflow configuration file
* to the process instance
*
* @param configProcessScope The process configuration settings specified
* in the configuration file as {@link Map} object
* @param simpleName The process name
*/
void applyConfig(Map configProcessScope, String simpleName, String fullyQualifiedName=null) {
// -- Apply the directives defined in the config object using the`withLabel:` syntax
final processLabels = this.getLabels() ?: ['']
for( String lbl : processLabels ) {
this.applyConfigSelector(configProcessScope, "withLabel:", lbl)
}

// -- apply setting defined in the config file using the process simple name (ie. w/o execution scope)
this.applyConfigSelector(configProcessScope, "withName:", simpleName)

// -- apply setting defined in the config file using the process qualified name (ie. with the execution scope)
if( fullyQualifiedName && fullyQualifiedName!=simpleName ) {
this.applyConfigSelector(configProcessScope, "withName:", fullyQualifiedName)
}

// -- Apply process specific setting defined using `process.$name` syntax
// NOTE: this is deprecated and will be removed
if( legacySettings ) {
this.applyConfigSettings(legacySettings)
}

// -- Apply defaults
this.applyConfigDefaults(configProcessScope)

// -- check for conflicting settings
if( this.scratch && this.stageInMode == 'rellink' ) {
log.warn("Directives `scratch` and `stageInMode=rellink` conflict each other -- Enforcing default stageInMode for process `$simpleName`")
this.remove('stageInMode')
}
}


/**
* Apply the settings defined in the configuration file to the actual process configuration object
*
* @param settings
* A map object modelling the setting defined defined by the user in the nextflow configuration file
*/
@PackageScope
void applyConfigSettings(Map<String,?> settings) {
protected void applyConfigSettings(Map<String,?> settings) {
if( !settings )
return

Expand Down Expand Up @@ -392,8 +438,7 @@ class ProcessConfig implements Map<String,Object>, Cloneable {
* the same config setting).
*
*/
@PackageScope
void applyConfigDefaults( Map processDefaults ) {
protected void applyConfigDefaults( Map processDefaults ) {
for( String key : processDefaults.keySet() ) {
if( key == 'params' )
continue
Expand Down
90 changes: 57 additions & 33 deletions modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy
Expand Up @@ -18,6 +18,7 @@ package nextflow.script

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.Const
import nextflow.Global
import nextflow.Session
import nextflow.exception.ScriptRuntimeException
Expand All @@ -39,37 +40,59 @@ class ProcessDef extends BindableDef implements ChainableDef {

private Session session = Global.session as Session

/**
* The script owning this process
*/
private BaseScript owner

private String name
/**
* Fully qualified process name ie. it may contain nested workflow execution scopes
*/
private String processName

private ProcessConfig processConfig

private BodyDef taskBody
/**
* Simple process names ie. the name used to declared or import it w/o the execution scope
* This name is used to resolve the configuration
*/
private String simpleName

/**
* The closure holding the process definition body
*/
private Closure<BodyDef> rawBody

private Object output
/**
* The resolved process configuration
*/
private transient ProcessConfig processConfig

/**
* The actual process implementation
*/
private transient BodyDef taskBody

/**
* The result of the process execution
*/
private transient ChannelOut output

ProcessDef(BaseScript owner, Closure<BodyDef> body, String name ) {
this.owner = owner
this.rawBody = body
this.name = name
this.simpleName = name
this.processName = name
}

@Deprecated
ProcessDef(BaseScript owner, String name, ProcessConfig config, BodyDef body) {
this.owner = owner
this.name = name
this.taskBody = body
this.processConfig = config
static String stripScope(String str) {
str.split(Const.SCOPE_SEP).last()
}

protected void config() {
log.trace "Process config > $name"

protected void initialize() {
log.trace "Process config > $processName"
assert processConfig==null

// the config object
processConfig = new ProcessConfig(owner).setProcessName(name)
processConfig = new ProcessConfig(owner,processName)

// Invoke the code block which will return the script closure to the executed.
// As side effect will set all the property declarations in the 'taskConfig' object.
Expand All @@ -83,22 +106,22 @@ class ProcessDef extends BindableDef implements ChainableDef {
throw new ScriptRuntimeException("Missing script in the specified process block -- make sure it terminates with the script string to be executed")

// apply config settings to the process
ProcessFactory.applyConfig(name, session.config, processConfig)
processConfig.applyConfig((Map)session.config.process, simpleName, processName)
}

@Override
ProcessDef clone() {
def result = (ProcessDef)super.clone()
result.@taskBody = taskBody?.clone()
result.@processConfig = processConfig?.clone()
result.@rawBody = (Closure)rawBody?.clone()
return result
}

@Override
ProcessDef cloneWithName(String name) {
def result = clone()
result.@name = name
result.@processName = name
result.@simpleName = stripScope(name)
return result
}

Expand All @@ -108,36 +131,37 @@ class ProcessDef extends BindableDef implements ChainableDef {

BaseScript getOwner() { owner }

String getName() { name }
String getName() { processName }

String getSimpleName() { simpleName }

ProcessConfig getProcessConfig() { processConfig }

@Deprecated
def getOutput() {
ChannelOut getOutput() {
log.warn1 "Property `output` has been deprecated use `${name}.out` instead"
output
return output
}

def getOut() { output }
ChannelOut getOut() { output }

String getType() { 'process' }

Object invoke_a(Object[] args) {
if( processConfig==null )
config()
super.invoke_a(args)
}

private String missMatchErrMessage(String name, int expected, int actual) {
final ch = expected > 1 ? "channels" : "channel"
return "Process `$name` declares ${expected} input ${ch} but ${actual} were specified"
}

@Override
Object run(Object[] args) {
final params = ChannelOut.spread(args)
// initialise process config
initialize()

// get params
final params = ChannelOut.spread(args)
// sanity check
if( params.size() != declaredInputs.size() )
throw new ScriptRuntimeException(missMatchErrMessage(name, declaredInputs.size(), params.size()))
throw new ScriptRuntimeException(missMatchErrMessage(processName, declaredInputs.size(), params.size()))

// set input channels
for( int i=0; i<params.size(); i++ ) {
Expand All @@ -163,12 +187,12 @@ class ProcessDef extends BindableDef implements ChainableDef {
// create the executor
final executor = session
.executorFactory
.getExecutor(name, processConfig, taskBody, session)
.getExecutor(processName, processConfig, taskBody, session)

// create processor class
session
.newProcessFactory(owner)
.newTaskProcessor(name, executor, processConfig, taskBody)
.newTaskProcessor(processName, executor, processConfig, taskBody)
.run()

// the result channels
Expand Down

0 comments on commit ccd8e83

Please sign in to comment.