Tutorial 8. Subworkflows


###Reusing a workflow as an actor

A key feature of any programming framework is support for reusing code within one program to minimize redundancy, and for sharing code across different programs to minimize the work of writing new programs. So far, the chief unit of code reuse we have seen in RestFlow is the actor. Multiple nodes in one workflow can refer to the same actor definition, and if the actor is defined in a file separate from that declaring the nodes in the workflow, then the actor can be employed by nodes in other workflows as well. But what if you want to reuse larger pieces of one workflow in another? This can be achieved easily in RestFlow, because a workflow can serve as an actor in another workflow.

###Getting data into and out of workflows via portals

The key to using one workflow as an actor in another is arranging for data to flow into and out of the subworkflow. Two special node types are provided for this purpose, InPortal and OutPortal. Portals can be thought of as one-way gateways through which data passes into or out of a workflow. An InPortal serves to route one or more data flows into the subworkflow, and an OutPortal serves to route data flows out of a subworkflow and back into the containing workflow. Here is a simple workflow, complete with an InPortal and an OutPortal, that can be employed as an actor in another workflow:

 
imports:

  - classpath:/common/types.yaml
  - classpath:/common/groovy/actors.yaml
  - classpath:/common/directors.yaml
  
components:

- id: Incrementer
  type: Workflow
  properties:
    director: !ref DataDrivenDirector
    nodes:
      - !ref InputValueAndIncrement
      - !ref IncrementInputValue
      - !ref RenderInputs
      - !ref OutputIncrementedValueAndIncrement
    inputs:
      value: 
        default: 0 
      increment:
        default: 1
    outputs:
      incrementedValue: 
      appliedIncrement:
        
- id: InputValueAndIncrement
  type: InPortal
  properties:
    outflows: 
      value: /inputValue/
      increment: /inputIncrement/

- id: IncrementInputValue
  type: GroovyActorNode
  properties: 
    stepsOnce: true
    actor.step: |
      sum = a + b
    inflows:
      a: /inputValue/
      b: /inputIncrement/
    outflows:
      sum: /incrementedValue/   

- id: RenderInputs
  type: GroovyActorNode
  properties:
    actor.step: |
      println "Incrementer received value=" + value + " and increment=" + increment
    inflows:
      value: /inputValue/
      increment: /inputIncrement/

- id: OutputIncrementedValueAndIncrement
  type: OutPortal
  properties:
    inflows: 
      incrementedValue: /incrementedValue/
      appliedIncrement: /inputIncrement/

Note the components named InputValueAndIncrement and OutputIncrementedValueAndIncrement. These components are of types InPortal and OutPortal, respectively. The former serves to route two data items, named value and increment, into the workflow. The outflows property on the InPortal makes these data available to nodes within this workflow on the /inputValue/ and /inputIncrement/ data flows. Note that these outflows route data to other nodes in this workflow; they do not bind to data flows outside the workflow. From the point of view of the components in the subworkflow, data flows out of the InPortal, which is why an InPortal declares outflows. Similarly, the inflows on the OutPortal bind to the /incrementedValue/ and /inputIncrement/ flows produced within this workflow. OutputIncrementedValueAndIncrement captures the data arriving on these flows and routes them out of the workflow. Nothing in this file indicates which flows outside this workflow are connected to it to provide data to the InPortal or receive data from the OutPortal. This is what makes it possible to reuse this workflow as a subworkflow of many different containing workflows without modification.
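
To isolate the pattern, here is a minimal sketch of a pass-through subworkflow containing nothing but an InPortal and an OutPortal. The component ids and flow names are illustrative only, and the sketch assumes a portal may feed a portal directly; it is not part of the tutorial's examples:

- id: PassThrough
  type: Workflow
  properties:
    director: !ref DataDrivenDirector
    nodes:
      - !ref PassThroughInput
      - !ref PassThroughOutput
    inputs:
      value:
        default: 0
    outputs:
      result:

- id: PassThroughInput
  type: InPortal
  properties:
    outflows:
      value: /value/

- id: PassThroughOutput
  type: OutPortal
  properties:
    inflows:
      result: /value/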

###Running the workflow in standalone mode

The function of this Incrementer workflow is to increment value by increment and output both the incremented value and the increment that was applied. The RenderInputs node serves to print the received value and increment.

Note that the Workflow component named Incrementer differs from Workflow components we have seen previously in that it has inputs and outputs properties, much like an actor. This is because the workflow as a whole is an actor. And like any other actor, default values can be provided for its inputs, as they are here. These defaults are used when the workflow is run in standalone mode, i.e., not as a subworkflow of another workflow that can provide these values.
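
When this workflow is used as a subworkflow, the containing workflow supplies these inputs instead, either through inflows or through constants on the node that employs it. As a preview of the syntax demonstrated later in this tutorial, a containing node might override the default increment like this (the node id and flow names here are illustrative only):

  - id: IncrementByThree
    type: Node
    properties:
      nestedUriPrefix: /IncByThree{STEP}
      actor: !ref Incrementer
      constants:
        increment: 3
      inflows:
        value: /someInputFlow/
      outflows:
        incrementedValue: /someOutputFlow/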

Save the Incrementer workflow above as incrementer1.yaml and run it. You will see the following output:

$ restflow -f incrementer1.yaml
Incrementer received value=0 and increment=1
$

Any workflow designed to be used as a subworkflow can be run standalone as long as default values are provided for every input.

###Running the workflow in subworkflow mode

Now we will write a workflow that uses the Incrementer workflow above as an actor. Save the following as nestedworkflow1.yaml:

imports:

  - classpath:/common/types.yaml
  - classpath:/common/groovy/actors.yaml
  - classpath:/common/directors.yaml
  - workspace:/incrementer1.yaml

components:

  - id: GenerateIntegerSequence
    type: Node
    properties:   
      actor: !ref IntegerSequenceGenerator
      endFlowOnNoOutput: true
      constants:
        initial: 1
        step: 1
        max: 5
      outflows:
        value: /sequence/

  - id: IncrementByDefaultIncrement
    type: Node
    properties: 
      nestedUriPrefix: /IncByDefault{STEP}
      actor: !ref Incrementer
      inflows:
        value: /sequence/
      outflows:
        incrementedValue: /onceIncrementedSequence/
        appliedIncrement: /firstAppliedIncrement/

  - id: RenderIncrementedIntegers
    type: Node
    properties: 
      actor: !ref PrintStreamWriter
      constants:
        name: Once incremented values
        outputImmediately: false
        outputAtWrapup: true
      inflows:
        message: /onceIncrementedSequence/

  - id: RenderFirstIncrement
    type: Node
    properties: 
      actor: !ref PrintStreamWriter
      constants:
        name: First increment values
        outputImmediately: false
        outputAtWrapup: true
      inflows:
        message: /firstAppliedIncrement/

  - id: NestedWorkflow
    type: Workflow
    properties:
      director: !ref MTDataDrivenDirector
      nodes:
        - !ref GenerateIntegerSequence
        - !ref IncrementByDefaultIncrement
        - !ref RenderIncrementedIntegers
        - !ref RenderFirstIncrement

Note that nestedworkflow1.yaml imports incrementer1.yaml, and that the IncrementByDefaultIncrement node employs the Incrementer workflow defined in incrementer1.yaml as its actor. If you try running nestedworkflow1.yaml the way you have executed previous workflows, you will see the following error message:

$ restflow -f nestedworkflow1.yaml 
Must specify one of the following workflows: [Incrementer, NestedWorkflow]
$ 

Because nestedworkflow1.yaml imports incrementer1.yaml, RestFlow finds two components of type Workflow. You must use the -w option to indicate which workflow to run. If you ask RestFlow to run Incrementer, the imported workflow runs in standalone mode:

$ restflow -f nestedworkflow1.yaml -w Incrementer
Incrementer received value=0 and increment=1
$

On the other hand, if you specify NestedWorkflow, then you will see the following:

$ restflow -f nestedworkflow1.yaml -w NestedWorkflow
Incrementer received value=1 and increment=1
Incrementer received value=2 and increment=1
Incrementer received value=3 and increment=1
Incrementer received value=4 and increment=1
Incrementer received value=5 and increment=1
*** Once incremented values ***
2
3
4
5
6
*** First increment values ***
1
1
1
1
1
$

Now NestedWorkflow is being run. The GenerateIntegerSequence node uses the IntegerSequenceGenerator actor to produce the first five counting numbers (1, 2, 3, 4, and 5). The IncrementByDefaultIncrement node uses Incrementer as an actor to increment by 1 (the default increment) each value generated by GenerateIntegerSequence. RenderIncrementedIntegers and RenderFirstIncrement send to the terminal the five incremented values and the five increments applied by IncrementByDefaultIncrement, respectively. Both of these nodes use the PrintStreamWriter actor. When its outputAtWrapup input is set to true, PrintStreamWriter buffers incoming data and prints all of the accumulated data at the end of the workflow run (during wrapup). It outputs a heading (e.g., "*** First increment values ***") based on the value of its name input before dumping its data to the terminal. The result is that when more than one node in a workflow uses PrintStreamWriter, the terminal output from each is separated from the others by these headings.
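
If you instead wanted each value printed as soon as it arrives, the same actor could presumably be configured with the buffering turned off. The following is only a sketch based on the property names used above (assuming outputImmediately behaves as its name suggests); it is not a configuration shown in this tutorial:

  - id: RenderImmediately
    type: Node
    properties:
      actor: !ref PrintStreamWriter
      constants:
        name: Immediately printed values
        outputImmediately: true
        outputAtWrapup: false
      inflows:
        message: /onceIncrementedSequence/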

###Using multiple instances of the same subworkflow

Now modify the nested workflow to use two instances of Incrementer. Insert the following two nodes into nestedworkflow1.yaml:

  - id: IncrementByFive
    type: Node
    properties: 
      nestedUriPrefix: /IncByFive{STEP}
      actor: !ref Incrementer
      constants:
        increment: 5
      inflows:
        value: /onceIncrementedSequence/
      outflows:
        incrementedValue: /twiceIncrementedSequence/

  - id: RenderTwiceIncrementedIntegers
    type: Node
    properties: 
      actor: !ref PrintStreamWriter
      constants:
        name: Twice incremented values
        outputImmediately: false
        outputAtWrapup: true
      inflows:
        message: /twiceIncrementedSequence/

Then add references to these two nodes to the nodes list of NestedWorkflow:

  - id: NestedWorkflow
    type: Workflow
    properties:
      director: !ref MTDataDrivenDirector
      nodes:
        - !ref GenerateIntegerSequence
        - !ref IncrementByDefaultIncrement
        - !ref RenderIncrementedIntegers
        - !ref RenderFirstIncrement
        - !ref IncrementByFive
        - !ref RenderTwiceIncrementedIntegers

Save the new workflow as nestedworkflow2.yaml and run it:

$ restflow -f nestedworkflow2.yaml -w NestedWorkflow
Incrementer received value=1 and increment=1
Incrementer received value=2 and increment=1
Incrementer received value=2 and increment=5
Incrementer received value=3 and increment=1
Incrementer received value=3 and increment=5
Incrementer received value=4 and increment=1
Incrementer received value=4 and increment=5
Incrementer received value=5 and increment=1
Incrementer received value=5 and increment=5
Incrementer received value=6 and increment=5
*** Once incremented values ***
2
3
4
5
6
*** First increment values ***
1
1
1
1
1
*** Twice incremented values ***
7
8
9
10
11
$

The new node IncrementByFive takes the products of IncrementByDefaultIncrement and increments each by 5; the first sequence value 1, for example, becomes 2 after IncrementByDefaultIncrement and 7 after IncrementByFive. NestedWorkflow now employs the Incrementer workflow as the actor for two different nodes that are configured differently.

###Understanding nested workflow behavior

The behavior of a nested workflow becomes clearer when trace dumps are enabled. Add the -t option to the command (as explained in Tutorial 4. Workflow Traces) and run the workflow again. Following the output shown above, you will now see:

*** Node step counts ***
NestedWorkflow: 1
NestedWorkflow.GenerateIntegerSequence: 6
NestedWorkflow.IncrementByDefaultIncrement: 5
NestedWorkflow.IncrementByDefaultIncrement.IncrementInputValue: 5
NestedWorkflow.IncrementByDefaultIncrement.InputValueAndIncrement: 5
NestedWorkflow.IncrementByDefaultIncrement.OutputIncrementedValueAndIncrement: 5
NestedWorkflow.IncrementByDefaultIncrement.RenderInputs: 5
NestedWorkflow.IncrementByFive: 5
NestedWorkflow.IncrementByFive.IncrementInputValue: 5
NestedWorkflow.IncrementByFive.InputValueAndIncrement: 5
NestedWorkflow.IncrementByFive.OutputIncrementedValueAndIncrement: 5
NestedWorkflow.IncrementByFive.RenderInputs: 5
NestedWorkflow.RenderFirstIncrement: 5
NestedWorkflow.RenderIncrementedIntegers: 5
NestedWorkflow.RenderTwiceIncrementedIntegers: 5
*** Published resources ***
/IncByDefault1/incrementedValue: 2
/IncByDefault1/inputIncrement: 1
/IncByDefault1/inputValue: 1
/IncByDefault2/incrementedValue: 3
/IncByDefault2/inputIncrement: 1
/IncByDefault2/inputValue: 2
/IncByDefault3/incrementedValue: 4
/IncByDefault3/inputIncrement: 1
/IncByDefault3/inputValue: 3
/IncByDefault4/incrementedValue: 5
/IncByDefault4/inputIncrement: 1
/IncByDefault4/inputValue: 4
/IncByDefault5/incrementedValue: 6
/IncByDefault5/inputIncrement: 1
/IncByDefault5/inputValue: 5
/IncByFive1/incrementedValue: 7
/IncByFive1/inputIncrement: 5
/IncByFive1/inputValue: 2
/IncByFive2/incrementedValue: 8
/IncByFive2/inputIncrement: 5
/IncByFive2/inputValue: 3
/IncByFive3/incrementedValue: 9
/IncByFive3/inputIncrement: 5
/IncByFive3/inputValue: 4
/IncByFive4/incrementedValue: 10
/IncByFive4/inputIncrement: 5
/IncByFive4/inputValue: 5
/IncByFive5/incrementedValue: 11
/IncByFive5/inputIncrement: 5
/IncByFive5/inputValue: 6
/firstAppliedIncrement/1: 1
/firstAppliedIncrement/2: 1
/firstAppliedIncrement/3: 1
/firstAppliedIncrement/4: 1
/firstAppliedIncrement/5: 1
/onceIncrementedSequence/1: 2
/onceIncrementedSequence/2: 3
/onceIncrementedSequence/3: 4
/onceIncrementedSequence/4: 5
/onceIncrementedSequence/5: 6
/sequence/1: 1
/sequence/2: 2
/sequence/3: 3
/sequence/4: 4
/sequence/5: 5
/twiceIncrementedSequence/1: 7
/twiceIncrementedSequence/2: 8
/twiceIncrementedSequence/3: 9
/twiceIncrementedSequence/4: 10
/twiceIncrementedSequence/5: 11

Look first at the step counts. Note that the two nodes employing the Incrementer subworkflow, IncrementByDefaultIncrement and IncrementByFive, each step it 5 times. Now look at the nodes listed immediately following NestedWorkflow.IncrementByDefaultIncrement in the list of step counts:

NestedWorkflow.IncrementByDefaultIncrement.IncrementInputValue: 5
NestedWorkflow.IncrementByDefaultIncrement.InputValueAndIncrement: 5
NestedWorkflow.IncrementByDefaultIncrement.OutputIncrementedValueAndIncrement: 5
NestedWorkflow.IncrementByDefaultIncrement.RenderInputs: 5

These correspond to the nodes inside the Incrementer subworkflow referred to by the IncrementByDefaultIncrement node. Note that dots are used to indicate workflow nesting.

These step counts illustrate clearly how subworkflows operate in RestFlow. A node that uses a subworkflow as its actor steps the subworkflow like any other actor. Each time a node steps a subworkflow, it runs the subworkflow and waits for that run to complete. So here, each time NestedWorkflow runs once, the Incrementer subworkflow runs ten times, i.e. five times for each of the two nodes that use it as an actor, and each of those ten runs steps the four nodes inside Incrementer once. The step counts listed for a run of NestedWorkflow summarize the activities of actors at all levels of nesting during a single run of NestedWorkflow.

Similarly, the published resources for a single run of NestedWorkflow include all data items sent by nodes to outflows during that run. The outflows of nodes in nested workflows are qualified by the nestedUriPrefix of the containing node, with {STEP} replaced by the step number. For example, the value published to /incrementedValue/ during the first step of IncrementByFive appears as:

/IncByFive1/incrementedValue: 7

###Subworkflows and directors

All workflows must specify a director. Because a single stepping of a subworkflow serving as an actor corresponds to one complete run of that subworkflow, any director may be used in a subworkflow; it does not matter which director the containing workflow uses. In the examples above, NestedWorkflow uses MTDataDrivenDirector, while Incrementer uses DataDrivenDirector. This flexibility yields several significant benefits. First, the most appropriate director can be selected for each subworkflow without regard to the director employed by the containing workflow(s), so nothing needs to be sacrificed when nesting workflows. It also means that a subworkflow can be used by many different containing workflows without modification. Finally, this flexibility allows large workflows to be partitioned into subworkflows that use different directors. A requirement that, say, three particular nodes in a workflow must never run concurrently does not mean that a multithreaded director cannot be used for the remainder of the workflow. The fragment of the workflow containing the concurrency-sensitive nodes can be placed in a subworkflow with a single-threaded director, while a multithreaded director is used for the overall workflow and other subworkflows.
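
As a rough sketch of that last arrangement: the component ids below are illustrative, the portals and node wiring are elided, and it assumes, as the names suggest, that DataDrivenDirector is single-threaded while MTDataDrivenDirector is multithreaded:

- id: OuterWorkflow
  type: Workflow
  properties:
    director: !ref MTDataDrivenDirector      # multithreaded director for the workflow as a whole
    nodes:
      - !ref SomeOtherNode
      - !ref RunSensitiveSection

- id: RunSensitiveSection
  type: Node
  properties:
    nestedUriPrefix: /Sensitive{STEP}
    actor: !ref SensitiveSection             # the subworkflow below serves as this node's actor

- id: SensitiveSection
  type: Workflow
  properties:
    director: !ref DataDrivenDirector        # single-threaded director for the concurrency-sensitive fragment
    nodes:
      - !ref SensitiveNodeA
      - !ref SensitiveNodeB
      - !ref SensitiveNodeC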

###Namespaces: avoiding component id collisions

RestFlow generally does not care how you distribute components between files. You are free to put all of your node declarations and actor definitions in one big file. You can even put multiple Workflow components in one file and use the -w option to specify which workflow in the file to run. However, it is often much more convenient to organize your components in different files.

The important thing to keep in mind when organizing components across multiple files is that the identity of a component is defined by its id; it does not matter which file the component is declared in. The downside of this flexibility is that it is easy to give two components in different files the same id. For example, the incrementer1.yaml file defines a component named RenderInputs. If nestedworkflow2.yaml (which imports incrementer1.yaml) also defined a component named RenderInputs, RestFlow would report an error. All components loaded into RestFlow during a particular workflow run must have unique identities regardless of the files they are defined in.
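
For instance, adding a second definition like the following to nestedworkflow2.yaml would collide with the RenderInputs already defined in incrementer1.yaml. This component is hypothetical and shown only to illustrate the collision:

- id: RenderInputs
  type: GroovyActorNode
  properties:
    actor.step: |
      println "A colliding RenderInputs saw " + value
    inflows:
      value: /sequence/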

Such id collisions can be avoided by placing components into namespaces. Here is an alternate definition of the Incrementer subworkflow, incrementer2.yaml, that uses namespaces to ensure that workflows using it as a subworkflow need not care what component IDs are used within incrementer2.yaml:

namespace:

  organization: ssrl
  module: adders

imports:

- classpath:/common/types.yaml
- actors:actors.yaml
- classpath:/common/directors.yaml

components:

- id: Incrementer
  type: Workflow
  properties:
    director: !ref DataDrivenDirector
    nodes:
      - !ref ssrl.adders.InputValueAndIncrement
      - !ref ssrl.adders.IncrementInputValue
      - !ref ssrl.adders.RenderInputs
      - !ref ssrl.adders.OutputIncrementedValueAndIncrement
    inputs:
      value: 
        default: 0 
      increment:
        default: 1
    outputs:
      incrementedValue: 
      appliedIncrement:
      
- id: InputValueAndIncrement
  type: InPortal
  properties:
    outflows: 
      value: /inputValue/
      increment: /inputIncrement/

- id: IncrementInputValue
  type: GroovyActorNode
  properties: 
    stepsOnce: true
    actor.step: |
      sum = a + b
    inflows:
      a: /inputValue/
      b: /inputIncrement/
    outflows:
      sum: /incrementedValue/   

- id: RenderInputs
  type: GroovyActorNode
  properties:
    actor.step: |
      println "Incrementer received value=" + value + " and increment=" + increment
    inflows:
      value: /inputValue/
      increment: /inputIncrement/

- id: OutputIncrementedValueAndIncrement
  type: OutPortal
  properties:
    inflows: 
      incrementedValue: /incrementedValue/
      appliedIncrement: /inputIncrement/

The namespace section declares that all components in this file are part of the adders module maintained by the ssrl organization. The module portion of a namespace declaration allows you to associate your components with meaningful names. The organization portion prevents collisions of module names between organizations. For example, both SSRL and UCD can publish their own (and different) adders modules as long as they choose different organization names for their namespaces (e.g., ssrl and ucd).
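
For example, a hypothetical UCD version of such a module might begin with the namespace declaration below, and its components would then be referenced with the ucd.adders prefix rather than ssrl.adders:

namespace:

  organization: ucd
  module: adders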

References (using the !ref operator) to the components in incrementer2.yaml must now prefix the ids of these components with the organization and module declared in the namespace. NestedWorkflow can be updated accordingly by making changes such as the one shown below for the IncrementByDefaultIncrement component:

  - id: IncrementByDefaultIncrement
    type: Node
    properties:
      nestedUriPrefix: /IncByDefault{STEP}
      actor: !ref ssrl.adders.Incrementer
      inflows:
        value: /sequence/
      outflows:
        incrementedValue: /onceIncrementedSequence/
        appliedIncrement: /firstAppliedIncrement/

You may have noticed that the IDs of the nodes listed for Incrementer in incrementer2.yaml also are qualified in this way. If this seems too wordy, you may use the !lref (local reference) operator instead of !ref to refer to a component defined in the same namespace:

- id: Incrementer
  type: Workflow
  properties:
    director: !ref DataDrivenDirector
    nodes:
      - !lref InputValueAndIncrement
      - !lref IncrementInputValue
      - !lref RenderInputs
      - !lref OutputIncrementedValueAndIncrement
    inputs:
      value: 
        default: 0 
      increment:
        default: 1
    outputs:
      incrementedValue: 
      appliedIncrement:

As you develop your own library of actors you use in multiple workflows (and especially if you share them with others), you likely will find it convenient to organize your actors in distinct modules and namespaces.
