Tutorial 7. Choosing a Director

tmcphillips edited this page Feb 15, 2013 · 3 revisions

The final type of component required to implement a workflow is the director. Every Workflow component specifies, typically by reference, the director to associate with the workflow. For example, the hello1.yaml workflow employs the MTDataDrivenDirector:

- id: HelloWorld
  type: Workflow
  properties:
    director: !ref MTDataDrivenDirector
    nodes:
    - !ref CreateGreeting
    - !ref RenderGreeting

What role does a director play in a workflow? As you might expect, the director is responsible for directing the execution of the workflow. In particular, the director schedules workflow steps and dictates how data flows between steps during the workflow run.

###Models of computation

One consequence of specifying workflows declaratively is that the workflow definition itself does not specify the order in which nodes are to be stepped when the workflow is run. All you provide is a list of nodes (that either inline or refer to corresponding actors) and the inflows and outflows for each. That is, you define the relationship between nodes in a workflow implicitly in terms of dataflow dependencies. The advantage of this approach is that the schedule of steps can depend on the nature and quantity of the data provided to the workflow. Depending on the data sets provided to a workflow, actors may step different numbers of times and in different orders. A workflow specified in this manner need not be adapted specifically to handle each input data set.

So the director of a workflow is in charge of determining an appropriate order in which actors should step given the inflow and outflow expressions. But why require each workflow to refer to a director? Why not build the algorithms for scheduling steps into the RestFlow framework itself? The answer is that there is a multitude of possible algorithms and approaches for interpreting a workflow specification, scheduling steps, and managing the flow of data between nodes. Each distinct approach for interpreting and running workflows is called a Model of Computation (MoC). A director is an implementation of a MoC, and there is no single MoC that is appropriate under all circumstances. Furthermore, more than one director may implement the same MoC, but still differ in ways that matter to the workflow designer.

In contrast to RestFlow, many workflow systems employ implicit models of computation. These systems each support only one approach to scheduling steps in a workflow. Assumptions about the supported MoC typically are scattered throughout the code implementing these systems, with the result that adding support for new MoCs or different implementations of these MoCs to such systems can be extremely difficult. With RestFlow, it is possible to develop new directors implementing completely different MoCs than those currently provided with RestFlow.

###The Data-Driven Model of Computation

Before comparing the capabilities of the various directors provided with RestFlow, it is useful to understand the approach to scheduling steps taken by one of the most generally useful MoCs. This data-driven model of computation is sophisticated but straightforward to understand, and RestFlow provides two implementations of it (DataDrivenDirector and MTDataDrivenDirector) as described in the next section. Here we describe the general characteristics of the data-driven MoC:

Communication via FIFO queues. In RestFlow, data flows between nodes with matching inflow and outflow expressions. When using directors based on the data-driven MoC, data arrives at the inflow of a node in the order it left the outflow of the upstream node that sent the data. In addition, a sender of data does not need to wait for downstream nodes to receive and use the data before performing additional computations and potentially sending more data to downstream nodes. Data is queued at the inflows of downstream actors in a First-In-First-Out, or FIFO, manner. As a result, upstream actors may step many times before the actors downstream step even once.
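The decoupling that FIFO queues provide can be illustrated outside RestFlow with a short Python sketch. This is purely conceptual, not RestFlow code; the queue here is just a stand-in for a node's inflow:

```python
# Conceptual sketch: a FIFO queue lets an upstream actor run ahead of
# its downstream neighbor, as in the data-driven MoC.
from queue import Queue

inflow = Queue()  # FIFO buffer standing in for a node's inflow

# The upstream actor steps three times before the downstream actor
# steps even once; it never waits for the receiver.
for value in [1, 2, 3]:
    inflow.put(value)

received = []
while not inflow.empty():
    received.append(inflow.get())  # values arrive in the order sent

print(received)  # → [1, 2, 3]
```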

Dynamic scheduling based on input data availability. MoCs in some other workflow systems pre-compute the schedule for steps. That is, based on the specification given, these systems determine the order in which the steps in the workflow should be carried out prior to running the workflow. Often the assumption is that each actor will step one time only; in other cases the schedule is computed from declarations nodes make about how much data each actor consumes and produces each time it steps. In contrast, directors implementing RestFlow's data-driven MoC do not pre-compute the schedule for stepping nodes. Rather, an actor is stepped when the director detects that sufficient data has arrived at a node's inflows. As a result, workflows employing the data-driven MoC can handle different quantities of data provided to the workflow as a whole. Moreover, actors are allowed to produce variable quantities of data during different steps of the same actor as needed. This allows workflows to contain conditional control-flow (if-then-else), indefinite loop (do-while), and filtering constructs not readily supported by MoCs that employ static schedules.

Optional inputs and dynamic stepping logic. Support for indefinite loops requires not just dynamic scheduling of nodes, but also support for optional inputs. Directors implementing the data-driven MoC allow actors to declare inputs as optional and to detect when an upstream actor will never produce additional data. This enables actors to, for example, accept two incoming dataflows and operate over those flows even when the quantity of data arriving on the flows differs. An example is the IntegerStreamMerger actor discussed in Chapter 9. In addition, actors can indicate dynamically whether they require additional data on particular inputs before stepping again. This allows actors to implement algorithms that respond to the values of incoming data items. This capability, too, is illustrated by the IntegerStreamMerger actor.

Workflows stop automatically. One danger of employing a MoC that is dynamically scheduled is that it can be a challenge for the director to know when the workflow run is complete, i.e. when to stop, particularly when actors accept optional inputs. RestFlow's data-driven model of computation makes it easy for the director to know when a workflow has completed. Nodes in a workflow are repeatedly triggered during a run. When a round of triggerings fails to lead to additional data appearing on outflows, the workflow is assumed to be complete, and the director terminates the workflow.
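The dynamic scheduling and automatic-stop behaviors described above can be sketched in miniature as a round-based trigger loop. This is illustrative Python, not RestFlow's implementation; the tiny two-node "workflow" (a source feeding a doubler) is invented for the example:

```python
# Sketch of data-driven scheduling: each node is triggered whenever
# data is available on its inflows, with no precomputed schedule.
from collections import deque

inputs = deque([5, 10, 15])        # data provided to the workflow
start, doubled = deque(), deque()  # FIFO dataflow queues between nodes

# Each node: (readiness test, step function).
nodes = [
    (lambda: bool(inputs), lambda: start.append(inputs.popleft())),
    (lambda: bool(start),  lambda: doubled.append(start.popleft() * 2)),
]

stepped = True
while stepped:                     # trigger nodes in rounds...
    stepped = False
    for ready, step in nodes:
        if ready():
            step()
            stepped = True
# ...and stop when a whole round produces no new data: the
# workflow has completed, so the "director" terminates it.

print(list(doubled))  # → [10, 20, 30]
```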

###Workflow for comparing the data-driven directors

A workflow specification written to employ a particular director will run correctly using any other director implementing the same MoC. However, certain properties and behaviors not specified by the MoC may differ. Indeed, it is these differences that are key to choosing a particular director implementing a MoC. For example, RestFlow provides two different implementations of the data-driven MoC, DataDrivenDirector and MTDataDrivenDirector. To see one difference between these implementations, write a workflow similar to the one below and save it as timedworkflow1.yaml:

imports:

- classpath:/common/directors.yaml
- classpath:/common/types.yaml

components: 

- id: TimedWorkflow
  type: Workflow
  properties:
    director: !ref DataDrivenDirector
    nodes:
    - !ref StartNode
    - !ref LeftBranchNode
    - !ref RightBranchNode
    - !ref EndNode
    
- id: StartNode
  type: GroovyActorNode
  properties:  
    actor.step: |
      output = input
    constants:
      input: 5
    outflows:
      output: /start/ 
        
- id: LeftBranchNode
  type: GroovyActorNode
  properties:  
    actor.step: |
      output = input
    inflows:
      input: /start/
    outflows:
      output: /left/ 
      
- id: RightBranchNode
  type: GroovyActorNode
  properties:  
    actor.step: |
      output = input
    inflows:
      input: /start/
    outflows:
      output: /right/ 

- id: EndNode
  type: GroovyActorNode
  properties:
    actor.step: |
      println "Left value  = " + left      
      println "Right value = " + right
    inflows:
      left: /left/
      right: /right/

This workflow comprises four Groovy actor nodes, and each node declaration includes the actor code implementing the steps the node represents. The StartNode outputs a number (5) to the /start/ dataflow. The LeftBranchNode and RightBranchNode both receive this value from their inflows and output the received value on the /left/ and /right/ dataflows, respectively. The EndNode receives the values from /left/ and /right/ and displays these values to the terminal. The output of this workflow is not surprising:

$ restflow -f timedworkflow1.yaml
Left value = 5
Right value = 5
$

Note that TimedWorkflow employs the DataDrivenDirector. Try changing the director specified to MTDataDrivenDirector and rerunning the workflow. You should see no difference in the results or behavior of the workflow. As an aside, note that embedding the actor code for each step in the workflow nodes themselves (using the GroovyActorNode type) is reasonable at this point, given the simplicity of the steps.

###Observing task parallelism

Now create a variant of the workflow with the content below and save it as timedworkflow2.yaml. Note that the director is once again DataDrivenDirector.

imports:

- classpath:/common/directors.yaml
- classpath:/common/types.yaml

components: 

- id: TimedWorkflow
  type: Workflow
  properties:
    director: !ref DataDrivenDirector
    nodes:
    - !ref StartNode
    - !ref LeftBranchNode
    - !ref RightBranchNode
    - !ref EndNode
    
- id: StartNode
  type: GroovyActorNode
  properties:  
    actor.step: |
      println "StartNode starting to step"
      Thread.sleep 1000
      output = input
      println "StartNode finished stepping"
    constants:
      input: 5
    outflows:
      output: /start/ 
        
- id: LeftBranchNode
  type: GroovyActorNode
  properties:  
    actor.step: |
      println "LeftBranchNode starting to step"
      Thread.sleep 2000
      output = input
      println "LeftBranchNode finished stepping"
    inflows:
      input: /start/
    outflows:
      output: /left/ 
      
- id: RightBranchNode
  type: GroovyActorNode
  properties:  
    actor.step: |
      println "RightBranchNode starting to step"
      Thread.sleep 2000
      output = input
      println "RightBranchNode finished stepping"
    inflows:
      input: /start/
    outflows:
      output: /right/ 

- id: EndNode
  type: GroovyActorNode
  properties:
    actor.initialize: |
      println "EndNode initializing" 
      startTime = new Date()
    actor.step: |
      println "EndNode starting to step"
      println "Left value  = " + left      
      println "Right value = " + right
      Thread.sleep 1000
      println "EndNode finished stepping."
    actor.dispose: |
      println "EndNode wrapping up."
      endTime = new Date()
      duration = (endTime.time - startTime.time) / 1000.0
      System.err.println "Duration = " + duration + " seconds"
    actor.state:
      startTime:
    inflows:
      left: /left/
      right: /right/

The Thread.sleep statements pause execution of the step for the indicated number of milliseconds, with the result that the StartNode and EndNode steps take 1 second each, and the LeftBranchNode and RightBranchNode steps take 2 seconds each. The println statements allow you to monitor the sequence of events during the workflow run. Finally, the actor.initialize and actor.dispose properties of EndNode provide code that the actor embedded in EndNode executes at the beginning of the workflow run (before any actor steps) and at the end of the run (after all actors have stopped stepping), respectively. Here these code blocks serve to measure the duration of the workflow run as a whole.

Running this workflow should result in something similar to the following:

$ restflow -f timedworkflow2.yaml
EndNode initializing
StartNode starting to step
StartNode finished stepping
LeftBranchNode starting to step
LeftBranchNode finished stepping
RightBranchNode starting to step
RightBranchNode finished stepping
EndNode starting to step
Left value = 5
Right value = 5
EndNode finished stepping
EndNode wrapping up
Duration = 6.02 seconds
$

Running the workflow again should produce the same results, except that the duration in seconds may vary slightly from run to run. Looking at the output it is clear that the steps of different nodes do not overlap in time. EndNode initializes before any actor steps and wraps up after all actors have finished. StartNode begins to step and finishes a second later; LeftBranchNode then steps, completing 2 seconds after that; RightBranchNode steps next, and finally EndNode. The total duration of the run is just over 6 seconds, the sum of the Thread.sleep arguments in the actor steps.

Now change the director referred to by TimedWorkflow from DataDrivenDirector to MTDataDrivenDirector, and rerun the workflow a number of times. You will see something like the following:

$ restflow -f timedworkflow2.yaml
EndNode initializing
StartNode starting to step
StartNode finished stepping
RightBranchNode starting to step
LeftBranchNode starting to step
RightBranchNode finished stepping
LeftBranchNode finished stepping
EndNode starting to step
Left value = 5
Right value = 5
EndNode finished stepping.
EndNode wrapping up.
Duration = 4.02 seconds
$

The first thing you probably notice is that the workflow now takes roughly 4 seconds to run, rather than 6. What else has changed? Looking carefully at the terminal output you will see that after StartNode steps, RightBranchNode and LeftBranchNode both begin to step at the same time, and then finish at roughly the same time (increase the sleep time in each actor if the terminal output is too fast to observe). EndNode steps when LeftBranchNode and RightBranchNode have both finished stepping. So LeftBranchNode and RightBranchNode, each of which takes 2 seconds to step, do so simultaneously. This is why the workflow as a whole takes 4 seconds to run rather than 6 as before.

MTDataDrivenDirector is a multithreaded implementation of the data-driven model of computation. MTDataDrivenDirector differs from DataDrivenDirector in that the former executes each workflow node in a separate thread of execution, whereas the latter employs a single thread to step all of the actors in the workflow. Where the data dependencies given by the inflow and outflow expressions logically allow for parallel stepping of actors, MTDataDrivenDirector enables these actors to step concurrently. Because LeftBranchNode and RightBranchNode are both ready to step as soon as StartNode outputs its result, and neither LeftBranchNode nor RightBranchNode depends on the other for inputs, MTDataDrivenDirector enables both to step at nearly the same time, without waiting for one to finish before starting the other.
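The timing effect can be reproduced with plain Python threads. This is a conceptual sketch, not RestFlow code; the delays are scaled down from the tutorial's 2 seconds to keep the example quick:

```python
# Two independent "branch" steps run concurrently in separate threads,
# so the pair takes about as long as one step, not two.
import threading
import time

def branch_step(delay=0.3):
    time.sleep(delay)  # stand-in for a branch node's computation

t0 = time.time()
left = threading.Thread(target=branch_step)
right = threading.Thread(target=branch_step)
left.start(); right.start()   # both branches begin stepping at once
left.join(); right.join()     # wait for both to finish, as EndNode must
elapsed = time.time() - t0

print(round(elapsed, 1))      # ≈ 0.3: the branches overlap, not 0.6
```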

The workflow specified by timedworkflow2.yaml illustrates task parallelism. LeftBranchNode and RightBranchNode represent two tasks that can take place independently and thus concurrently. MTDataDrivenDirector enables the workflow to run in this manner.

###Achieving pipeline parallelism

A special kind of parallelism occurs when serially connected actors step concurrently. This is called pipeline parallelism, because different steps along a single series, or pipeline, of nodes execute at the same time.

Before demonstrating the pipeline parallelism that MTDataDrivenDirector supports, refactor the timed workflow example to make it easier to modify. Note that StartNode, LeftBranchNode, and RightBranchNode all implement what is essentially the same step. For example, the value of StartNode's actor.step property is:

  actor.step: |
    println "StartNode starting to step..."
    Thread.sleep 1000
    output = input
    println "StartNode finished stepping."

Only the node name in the println arguments and the value of the Thread.sleep argument differ between the actor.step property of StartNode and the steps defined for LeftBranchNode and RightBranchNode. This is an excellent opportunity to factor out the common features of these step definitions and define them in a configurable actor. Create a DelayedRelayer actor component in the same file containing the workflow nodes and update the nodes to reference it. Also change the director back to DataDrivenDirector. Save the new workflow as timedworkflow3.yaml.

imports:

- classpath:/common/directors.yaml
- classpath:/common/types.yaml

components: 

- id: TimedWorkflow
  type: Workflow
  properties:
    director: !ref DataDrivenDirector
    nodes:
    - !ref StartNode
    - !ref LeftBranchNode
    - !ref RightBranchNode
    - !ref EndNode
    
- id: StartNode
  type: Node
  properties:  
    actor: !ref DelayedRelayer
    constants:
      nodeName: StartNode
      delay: 1000
      input: 5
    outflows:
      output: /start/ 
        
- id: LeftBranchNode
  type: Node
  properties:  
    actor: !ref DelayedRelayer
    constants:
      nodeName: LeftBranchNode
      delay: 2000
    inflows:
      input: /start/
    outflows:
      output: /left/ 
      
- id: RightBranchNode
  type: Node
  properties:  
    actor: !ref DelayedRelayer
    constants:
      nodeName: RightBranchNode
      delay: 2000
    inflows:
      input: /start/
    outflows:
      output: /right/ 

- id: EndNode
  type: GroovyActorNode
  properties:
    actor.initialize: |
      println "EndNode initializing" 
      startTime = new Date()
    actor.step: |
      println "EndNode starting to step"
      println "Left value  = " + left      
      println "Right value = " + right
      Thread.sleep 1000
      println "EndNode finished stepping."
    actor.dispose: |
      println "EndNode wrapping up."
      endTime = new Date()
      duration = (endTime.time - startTime.time) / 1000.0
      System.err.println "Duration = " + duration + " seconds"
    actor.state:
      startTime:
    inflows:
      left: /left/
      right: /right/

- id: DelayedRelayer
  type: GroovyActor
  properties:
    step: |
      println nodeName + " starting to step"
      Thread.sleep delay
      output = input
      println nodeName + " finished stepping"

Running timedworkflow3.yaml should give the same result as before. However, this restructuring of the workflow will make it easier to apply changes to all three nodes that now employ DelayedRelayer. This is a typical refactoring pattern that you will employ again and again when designing workflows.

Now modify the DelayedRelayer step definition to print the value the actor receives and to increment it before relaying it to the next node:

- id: DelayedRelayer
  type: GroovyActor
  properties:
    step: |
      println nodeName + " starting to step"
      println nodeName + " received " + input
      Thread.sleep delay
      output = input + 1
      println nodeName + " finished stepping"

Save the new workflow as timedworkflow4.yaml and run it. You should see output similar to the following. Note that the workflow still takes about 6 seconds to run when using DataDrivenDirector, and 4 seconds when using MTDataDrivenDirector.

$ restflow -f timedworkflow4.yaml
EndNode initializing
StartNode starting to step
StartNode received 5
StartNode finished stepping
LeftBranchNode starting to step
LeftBranchNode received 6
LeftBranchNode finished stepping
RightBranchNode starting to step
RightBranchNode received 6
RightBranchNode finished stepping
EndNode starting to step
Left value = 7
Right value = 7
EndNode finished stepping.
EndNode wrapping up.
Duration = 6.035 seconds
$

To witness pipeline parallelism, configure StartNode to emit a sequence of three numbers rather than just one:

- id: StartNode
  type: Node
  properties:  
    actor: !ref DelayedRelayer
    constants:
      nodeName: StartNode
      delay: 1000
    sequences:
      input: 
      - 5
      - 10
      - 15
    outflows:
      output: /start/ 

Run the workflow first using DataDrivenDirector. You will see output similar to the following:

$ restflow -f timedworkflow5.yaml
EndNode initializing
StartNode starting to step
StartNode received 5
StartNode finished stepping
StartNode starting to step
StartNode received 10
StartNode finished stepping
LeftBranchNode starting to step
LeftBranchNode received 6
LeftBranchNode finished stepping
RightBranchNode starting to step
RightBranchNode received 6
RightBranchNode finished stepping
StartNode starting to step
StartNode received 15
StartNode finished stepping
LeftBranchNode starting to step
LeftBranchNode received 11
LeftBranchNode finished stepping
RightBranchNode starting to step
RightBranchNode received 11
RightBranchNode finished stepping
EndNode starting to step
Left value = 7
Right value = 7
EndNode finished stepping.
LeftBranchNode starting to step
LeftBranchNode received 16
LeftBranchNode finished stepping
RightBranchNode starting to step
RightBranchNode received 16
RightBranchNode finished stepping
EndNode starting to step
Left value = 12
Right value = 12
EndNode finished stepping.
EndNode starting to step
Left value = 17
Right value = 17
EndNode finished stepping.
EndNode wrapping up.
Duration = 18.047 seconds
$

EndNode receives pairs of values equaling 7, 12, and 17 as expected. The workflow takes roughly 18 seconds to run, three times the total delays in the four actors. Now run the workflow using the MTDataDrivenDirector:

$ restflow -f timedworkflow5.yaml
EndNode initializing
StartNode starting to step
StartNode received 5
StartNode finished stepping
StartNode starting to step
StartNode received 10
RightBranchNode starting to step
RightBranchNode received 6
LeftBranchNode starting to step
LeftBranchNode received 6
StartNode finished stepping
StartNode starting to step
StartNode received 15
StartNode finished stepping
RightBranchNode finished stepping
LeftBranchNode finished stepping
EndNode starting to step
Left value = 7
Right value = 7
LeftBranchNode starting to step
LeftBranchNode received 11
RightBranchNode starting to step
RightBranchNode received 11
EndNode finished stepping.
LeftBranchNode finished stepping
LeftBranchNode starting to step
LeftBranchNode received 16
RightBranchNode finished stepping
RightBranchNode starting to step
RightBranchNode received 16
EndNode starting to step
Left value = 12
Right value = 12
EndNode finished stepping.
LeftBranchNode finished stepping
RightBranchNode finished stepping
EndNode starting to step
Left value = 17
Right value = 17
EndNode finished stepping.
EndNode wrapping up.
Duration = 8.054 seconds
$

With the multithreaded director, the run takes just over 8 seconds, or 10 seconds less than with the single-threaded director. Perhaps even more impressive, the run takes only 4 seconds longer than the multithreaded run of the workflow in which StartNode emits just one value!

The time savings come from the fact that MTDataDrivenDirector detects that StartNode can begin stepping a second time as soon as it has sent its first output to LeftBranchNode and RightBranchNode. Similarly, the latter two nodes find new values on their inflows as soon as they complete their first steps, and so each begins stepping a second time without delay as well, and so on. In short, serially connected actors step concurrently when given sequences of inputs to work on. This is pipeline parallelism.
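The same effect can be sketched with plain Python threads and FIFO queues. Again this is a conceptual illustration, not RestFlow code; the two-stage pipeline and 0.1-second delays are invented for the example:

```python
# Pipeline parallelism: stage 1 starts its second item while stage 2
# is still working on the first, so three items flow through a
# two-stage pipeline in ~4 delay periods instead of 6.
import threading
import time
from queue import Queue

mid, out = Queue(), Queue()
DONE = object()                 # end-of-stream marker

def stage1():
    for x in [5, 10, 15]:       # like StartNode's input sequence
        time.sleep(0.1)         # stand-in for the node's delay
        mid.put(x + 1)          # increment and relay downstream
    mid.put(DONE)

def stage2():
    while (x := mid.get()) is not DONE:
        time.sleep(0.1)         # stand-in for a branch node's delay
        out.put(x + 1)

t0 = time.time()
threads = [threading.Thread(target=s) for s in (stage1, stage2)]
for t in threads: t.start()
for t in threads: t.join()
elapsed = time.time() - t0

results = [out.get() for _ in range(3)]
print(results, round(elapsed, 1))   # [7, 12, 17] in ~0.4 s, not 0.6
```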

Note that the efficiency observed here is greater than would be realized when employing actors that actually carry out CPU-intensive computations rather than simply sleeping. Two actors that each perform 2 seconds of intense computation will take longer than a total of 2 seconds to execute concurrently if only one CPU core is available. However, as multi-core CPUs, computers with multiple CPUs, and clusters of computers all become more common, support for easily exploiting parallelism is becoming ever more critical. RestFlow makes it easy to take advantage of such resources.

###When to use a single-threaded director

At this point it may appear that MTDataDrivenDirector is superior to DataDrivenDirector in every way, and that the former director should be used exclusively. It is true that this director is perhaps the most generally applicable director included with RestFlow. However, there are two reasons why one might choose to use the single-threaded DataDrivenDirector under some circumstances.

First, the order in which actors step when using MTDataDrivenDirector is non-deterministic. Each time you run the workflows above using the multithreaded director you may see slight or even major variations in the order in which actors step. These variations do not affect the actual data produced by the workflows. (You can verify this by enabling trace dumps as described in Chapter 4.) However, there are some circumstances when not just the outputs of a workflow run, but also the order of events that occur during a run should be reliably repeatable. One example is when a workflow is being tested. It is much easier to tell if a workflow is behaving differently when any change in the order of events can be taken as an indication of a meaningful change in behavior. Workflow runs generally are easier to compare when a deterministic, single-threaded director such as DataDrivenDirector is employed.

Second, there are times when you most definitely do not want actors to step concurrently, for example when two actors both use a resource that can perform only one task at a time. Robots and scientific instruments such as X-ray detectors are examples of such resources an actor might use. Enforcing exclusive access to these resources, i.e. preventing more than one actor from trying to use the resource at the same time, can be critical.

Does the need to enforce exclusive access to resources by actors mean that any workflow with this need cannot realize any of the performance advantages that the multithreaded director yields? The answer is no. Workflows can be structured to employ different directors for different parts of the workflow as described in the next chapter.
