Skip to content

Commit

Permalink
Add fork operator #1281
Browse files Browse the repository at this point in the history
  • Loading branch information
pditommaso committed Aug 21, 2019
1 parent 8af8518 commit 6f89833
Show file tree
Hide file tree
Showing 11 changed files with 542 additions and 33 deletions.
74 changes: 71 additions & 3 deletions docs/operator.rst
Expand Up @@ -1561,6 +1561,7 @@ The forking operators are:

* `branch`_
* `choice`_
* `fork`_
* `into`_
* `separate`_
* `tap`_
Expand All @@ -1573,7 +1574,7 @@ branch
.. warning:: This is an experimental operator. Syntax and behavior may change.
Required version `19.08.0-edge` or later.

The ``choice`` operator allows you to forward the items emitted by a source channel to one
The ``branch`` operator allows you to forward the items emitted by a source channel to one
or more output channels, `choosing` one out of them at a time.

The selection criteria is defined by specifying a :ref:`closure <script-closure>` that provides
Expand Down Expand Up @@ -1679,6 +1680,73 @@ the others into ``queue2``

See also `branch`_ operator.

.. _operator-fork:

fork
----

.. warning:: This is an experimental operator. Syntax and behavior may change.
Required version `19.08.0-edge` or later.

The ``fork`` operator allows you to forward the items emitted by a source channel to two
or more output channels assigning each of them a separate value.

The forking criteria is defined by specifying a :ref:`closure <script-closure>` that specify the
target channels labelled by a unique identifier followed by an expression statement that
evaluates the value to be assigned to such channel.

For example::

Channel
.from(1,2,3,4)
.fork { it ->
foo: it + 1
bar: it * it
}
.set { result }

result.foo.view { "foo $it" }
result.far.view { "bar $it" }

It shows::

foo 2
foo 3
foo 4
bar 1
bar 4
bar 9


.. tip:: The statement expression can be omitted when the value to be emitted is the same as
the following one. If you need just need to forward the same value to multiple channel
you can use the following the shorthand notation showed below.

::

Channel
.from(1,2,3)
.fork { it -> foo: bar: it }
.set { result }

As before creates two channels, however both of them receive the same source items.


.. warning:: The fork evaluation closure must be specified inline, ie. it *cannot* be assigned to a
variable and passed as argument to the operator, how it can be done with other operators.

To create a fork criteria as variable that can be passed as an argument to more than one
``fork`` operator use the ``forkCriteria`` built-in method as shown below::

def criteria = forkCriteria {
small: it < 10
large: it > 10
}

Channel.from(1,2,30).fork(criteria).set { ch1 }
Channel.from(10,20,1).fork(criteria).set { ch2 }


.. _operator-into:

into
Expand Down Expand Up @@ -1781,7 +1849,7 @@ See also `into`_ and `separate`_ operators.
separate
--------

.. warning:: The `separate` operator has been deprecated. Use `branch`_ instead.
.. warning:: The `separate` operator has been deprecated. Use `fork`_ instead.

The ``separate`` operator lets you copy the items emitted by the source channel into multiple
channels, which each of these can receive a `separate` version of the same item.
Expand Down Expand Up @@ -1866,7 +1934,7 @@ The output will look like the following fragment::



See also: `branch`, `into`_, `choice`_ and `map`_ operators.
See also: `fork`_, `into`_, `choice`_ and `map`_ operators.


Maths operators
Expand Down
24 changes: 20 additions & 4 deletions modules/nextflow/src/main/groovy/nextflow/Nextflow.groovy
Expand Up @@ -24,8 +24,8 @@ import java.nio.file.Path
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.DataflowVariable
import nextflow.ast.BranchXform
import nextflow.ast.BranchXformImpl
import nextflow.ast.OpXform
import nextflow.ast.OpXformImpl
import nextflow.exception.ProcessUnrecoverableException
import nextflow.exception.StopSplitIterationException
import nextflow.extension.CH
Expand All @@ -35,6 +35,7 @@ import nextflow.file.FileHelper
import nextflow.file.FilePatternSplitter
import nextflow.mail.Mailer
import nextflow.script.TokenBranchDef
import nextflow.script.TokenForkDef
import nextflow.splitter.FastaSplitter
import nextflow.splitter.FastqSplitter
import nextflow.util.ArrayTuple
Expand Down Expand Up @@ -413,14 +414,29 @@ class Nextflow {
* Marker method to create a closure to be passed to {@link OperatorEx#branch(DataflowReadChannel, groovy.lang.Closure)}
* operator.
*
* Despite apparently is doing nothing, this method is needed as marked to apply the {@link BranchXform} AST
* Despite apparently is doing nothing, this method is needed as marker to apply the {@link OpXform} AST
* transformation required to interpret the closure content as required for the branch evaluation.
*
* @see OperatorEx#branch(DataflowReadChannel, Closure)
* @see BranchXformImpl
* @see OpXformImpl
*
* @param closure
* @return
*/
static Closure<TokenBranchDef> branchCriteria(Closure<TokenBranchDef> closure) { closure }

/**
* Marker method to create a closure to be passed to {@link OperatorEx#fork(DataflowReadChannel, Closure)}
* operator.
*
* Despite apparently is doing nothing, this method is needed as marker to apply the {@link OpXform} AST
* transformation required to interpret the closure content as required for the branch evaluation.
*
* @see OperatorEx#fork(DataflowReadChannel, Closure)
* @see OpXformImpl
*
* @param closure
* @return
*/
static Closure<TokenForkDef> forkCriteria(Closure<TokenBranchDef> closure) { closure }
}
Expand Up @@ -24,6 +24,7 @@ import org.codehaus.groovy.ast.expr.BinaryExpression
import org.codehaus.groovy.ast.expr.ClosureExpression
import org.codehaus.groovy.ast.expr.ConstantExpression
import org.codehaus.groovy.ast.expr.ConstructorCallExpression
import org.codehaus.groovy.ast.expr.DeclarationExpression
import org.codehaus.groovy.ast.expr.Expression
import org.codehaus.groovy.ast.expr.MapExpression
import org.codehaus.groovy.ast.expr.MethodCallExpression
Expand All @@ -33,6 +34,7 @@ import org.codehaus.groovy.ast.stmt.BlockStatement
import org.codehaus.groovy.ast.stmt.ExpressionStatement
import org.codehaus.groovy.ast.stmt.ReturnStatement
import org.codehaus.groovy.ast.stmt.Statement
import org.codehaus.groovy.ast.tools.GeneralUtils
import org.codehaus.groovy.control.SourceUnit
import org.codehaus.groovy.syntax.SyntaxException
import org.codehaus.groovy.syntax.Types
Expand Down Expand Up @@ -86,6 +88,10 @@ class ASTHelpers {
return new ConstructorCallExpression(type,args)
}

static Expression declX(Expression left, Expression right) {
new DeclarationExpression(left, GeneralUtils.ASSIGN, right)
}

static MethodCallExpression isMethodCallX(Expression expr) {
return expr instanceof MethodCallExpression ? expr : null
}
Expand Down Expand Up @@ -137,4 +143,5 @@ class ASTHelpers {
static ReturnStatement isReturnS(Statement stmt) {
stmt instanceof ReturnStatement ? stmt : null
}

}
Expand Up @@ -24,9 +24,9 @@ import java.lang.annotation.Target
import org.codehaus.groovy.transform.GroovyASTTransformationClass

/**
* Marker interface which to apply AST transformation to {@code process} declaration
* Declares Nextflow operators AST xforms
*/
@Retention(RetentionPolicy.SOURCE)
@Target(ElementType.METHOD)
@GroovyASTTransformationClass(classes = [BranchXformImpl])
@interface BranchXform {}
@GroovyASTTransformationClass(classes = [OpXformImpl])
@interface OpXform {}

0 comments on commit 6f89833

Please sign in to comment.