Skip to content

Commit

Permalink
Merged changes required by issue #59
Browse files Browse the repository at this point in the history
  • Loading branch information
pditommaso committed Jul 25, 2015
2 parents 0844a2b + c18f568 commit ccc5562
Show file tree
Hide file tree
Showing 17 changed files with 284 additions and 947 deletions.
112 changes: 2 additions & 110 deletions docs/process.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ a script that is executed by it. A basic process looks like the following exampl


more specifically a process may contain five definition blocks, respectively: directives,
inputs, outputs, shares and finally the process script. The syntax is defined as follows:
inputs, outputs, when clause and finally the process script. The syntax is defined as follows:

::

Expand Down Expand Up @@ -51,7 +51,7 @@ Script
The `script` block is a string statement that defines the command that is executed by the process to carry out its task.

A process contains one and only one script block, and it must be the last statement when the process contains
input, output and share declarations.
input and output declarations.

The entered string is executed as a `BASH <http://en.wikipedia.org/wiki/Bash_(Unix_shell)>`_ script in the
`host` system. It can be any command, script or combination of them, that you would normally use in terminal shell
Expand Down Expand Up @@ -1057,114 +1057,6 @@ It is useful to enable/disable the process execution depending the state of vari

}

Shares
=======

.. warning:: This feature has been deprecated and will be removed in future releases

Share declarations are a special type of process parameter that can act as an `input` and `output` parameter at the same time.

The share block is declared by using the syntax shown below::

share:
<share qualifier> <parameter name> [from <source channel>] [into <target channel>] [attributes]


A share definition begins with the share `qualifier` followed by the parameter `name`. Optionally the keyword ``from``
can be used to specify the channel over which data is received, and the keyword ``into``, followed by a channel name,
can be used to specify the channel where the produced data has to be sent.

Share parameters accept only the qualifiers listed in the following table:

=========== =============
Qualifier Semantic
=========== =============
val Lets you access the input value in the process script and/or to send it over the output channel.
file Lets you handle the input value as a file, staging it properly in the execution context and/or send it as result over the output channel.
=========== =============


Share parameters have some important differences compared to input or output parameters:

* A share parameter can only receive a single input value, which is bound in the script evaluation context before the process first execution.
* If a share parameter declares an output channel it emits exactly one value, after the process last execution.
* It allows you to `share` the parameter's value across multiple process executions.
* Whenever a share parameter is declared, the process is executed serially, instead of in a parallel manner.


Share generic values
---------------------

The share ``val`` qualifier allows you to declare a parameter whose value can be accessed,
in the script context, across multiple executions of the same process. For example::

process printCount {
input:
val cheers from 'Bonjour', 'Ciao', 'Hello', 'Hola'

share:
val count from 1 into result

script:
count += count
"echo $cheers world! "

}


result.subscribe { println "Result = $it" }

Will output::

Result = 16


This example shows how the `state` of the ``count`` variable is maintained throughout the process executions, and
how on process termination the final value is sent over the channel declared by the ``into`` keyword.

Some caveats about shared value parameters:

* The ``from`` declaration can be used to initialise the parameter by specifying a variable defined in the
pipeline script or by using a `literal` value (as in the above example).

* When the ``from`` declaration is omitted, the parameter is initialised to the variable's value in the script scope
having the same name as the parameter.

* When a variable with the parameter's name doesn't exist in the script scope and no ``from`` is specified,
the parameter is initialised to ``null``.



Share file
------------

The share ``file`` qualifier allows you to declare a parameter that shares its state using a file. For example::

process saveHello {
input:
val cheers from 'Bonjour', 'Ciao', 'Hello', 'Hola'

share:
file greetings into result

"echo '$cheers' >> $greetings "

}

result.subscribe { println it.text }


It will print::

Bonjour
Ciao
Hello
Hola


This example shows how the file content is shared through the process executions. When the process
completes, the file is sent over the channel declared as output.


.. _process-directives:

Expand Down
36 changes: 1 addition & 35 deletions src/main/groovy/nextflow/ast/NextflowDSLImpl.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,6 @@ public class NextflowDSLImpl implements ASTTransformation {
}
break

case 'share':
if( stm instanceof ExpressionStatement ) {
convertShareMethod( stm.getExpression() )
}
break

case 'exec':
iterator.remove()
execStatements << stm
Expand Down Expand Up @@ -475,32 +469,6 @@ public class NextflowDSLImpl implements ASTTransformation {
}


/*
* handle *shared* parameters
*/

protected void convertShareMethod( Expression expression ) {
log.trace "convert > shared expression: $expression"

if( expression instanceof MethodCallExpression ) {
def methodCall = expression as MethodCallExpression
def methodName = methodCall.getMethodAsString()
def nested = methodCall.objectExpression instanceof MethodCallExpression
log.trace "convert > shared method: $methodName"

if( methodName in ['from','file','val','into','mode'] ) {
if( !nested )
methodCall.setMethod( new ConstantExpression( '_share_' + methodName ) )
fixMethodCall(methodCall)
}

if( methodCall.objectExpression instanceof MethodCallExpression ) {
convertShareMethod(methodCall.objectExpression)
}
}
}


protected void convertOutputMethod( Expression expression ) {
log.trace "convert > output expression: $expression"

Expand Down Expand Up @@ -563,7 +531,7 @@ public class NextflowDSLImpl implements ASTTransformation {
protected Expression varToStr( Expression expr ) {
if( expr instanceof VariableExpression ) {
def name = ((VariableExpression) expr).getName()
return new ConstantExpression(name)
return newObj( TokenVar, new ConstantExpression(name) )
}

if( expr instanceof TupleExpression ) {
Expand Down Expand Up @@ -640,8 +608,6 @@ public class NextflowDSLImpl implements ASTTransformation {

}



// -- TupleExpression or ArgumentListExpression
if( expr instanceof TupleExpression ) {
def i = 0
Expand Down
31 changes: 2 additions & 29 deletions src/main/groovy/nextflow/processor/ParallelTaskProcessor.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@ import groovyx.gpars.dataflow.operator.DataflowOperator
import groovyx.gpars.dataflow.operator.DataflowProcessor
import groovyx.gpars.dataflow.operator.PoisonPill
import nextflow.Channel
import nextflow.file.FileHolder
import nextflow.script.EachInParam
import nextflow.script.FileSharedParam
import nextflow.script.SharedParam
/**
* Defines the parallel tasks execution logic
*
Expand All @@ -56,7 +53,6 @@ class ParallelTaskProcessor extends TaskProcessor {

// append the shared obj to the input list
def allScalar = config.getInputs().allScalarInputs()
def sharedCount = config.getInputs().count { it instanceof SharedParam }

/*
* check if there are some iterators declaration
Expand Down Expand Up @@ -106,16 +102,10 @@ class ParallelTaskProcessor extends TaskProcessor {
/*
* define the max forks attribute:
* - by default the process execution is parallel using the poolSize value
* - when there is at least one shared variable it is executed in serial mode (maxForks==1) to guarantee thread safe access
* - otherwise use the value defined by the user via 'taskConfig'
*/
def maxForks = session.poolSize
if( sharedCount ) {
log.debug "Process declares shared inputs -- Using thread safe mode (maxForks=1)"
maxForks = 1
blocking = true
}
else if( config.maxForks ) {
if( config.maxForks ) {
maxForks = config.maxForks
blocking = true
}
Expand Down Expand Up @@ -278,24 +268,7 @@ class ParallelTaskProcessor extends TaskProcessor {

@Override
public void afterStop(final DataflowProcessor processor) {
log.debug "<${name}> After stop -- shareObjs ${ParallelTaskProcessor.this.sharedObjs}"

// bind shared outputs
ParallelTaskProcessor.this.sharedObjs?.each { param, obj ->

if( !param.outChannel )
return

log.trace "Binding shared out param: ${param.name} = ${obj}"
if( param instanceof FileSharedParam ) {
if( obj instanceof Collection )
obj = obj[0]
if( obj instanceof FileHolder )
obj = obj.storePath
}

param.outChannel.bind( obj )
}
log.debug "<${name}> After stop"
}

/**
Expand Down
19 changes: 0 additions & 19 deletions src/main/groovy/nextflow/processor/ProcessConfig.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,16 @@ import nextflow.script.EachInParam
import nextflow.script.EnvInParam
import nextflow.script.FileInParam
import nextflow.script.FileOutParam
import nextflow.script.FileSharedParam
import nextflow.script.InParam
import nextflow.script.InputsList
import nextflow.script.OutParam
import nextflow.script.OutputsList
import nextflow.script.SetInParam
import nextflow.script.SetOutParam
import nextflow.script.SharedParam
import nextflow.script.StdInParam
import nextflow.script.StdOutParam
import nextflow.script.ValueInParam
import nextflow.script.ValueOutParam
import nextflow.script.ValueSharedParam
import nextflow.util.ReadOnlyMap
/**
* Holds the process configuration properties
Expand Down Expand Up @@ -186,10 +183,6 @@ class ProcessConfig implements Map<String,Object> {
outputs
}

@Deprecated
List getSharedDefs () {
configProperties.inputs.findAll { it instanceof SharedParam }
}

/*
* note: without this method definition {@link BaseScript#echo} will be invoked
Expand Down Expand Up @@ -254,18 +247,6 @@ class ProcessConfig implements Map<String,Object> {
result
}

/// shared parameters

@Deprecated
SharedParam _share_val( def obj ) {
new ValueSharedParam(this).bind(obj) as SharedParam
}

@Deprecated
SharedParam _share_file( def obj ) {
new FileSharedParam(this).bind(obj) as SharedParam
}


/**
* Defines a special *dummy* input parameter, when no inputs are
Expand Down
2 changes: 1 addition & 1 deletion src/main/groovy/nextflow/processor/TaskContext.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class TaskContext implements Map<String,Object> {
return script.getBinding().getVariable(property.toString())
}

throw new MissingPropertyException("Unknown variable '$property' -- Make sure you didn't misspell it or define somewhere in the script before use it")
throw new MissingPropertyException("Unknown variable '$property' -- Make sure you didn't misspell it or define somewhere in the script before use it", property as String, null)
}

Object invokeMethod(String name, Object args) {
Expand Down

0 comments on commit ccc5562

Please sign in to comment.