# Hello DALiuGE: Unnecessarily Complicated

### Nicholas Pritchard 

## Introduction

This notebook introduces the essential components of the [DALiuGE](https://github.com/ICRAR/daliuge) framework and presents several trivial example workflows, with the ultimate goal of demonstrating what reproducible data processing involves in practice.

1. We outline some basics about [DALiuGE](https://github.com/ICRAR/daliuge) and DROP-based execution.
2. Several trivial 'Hello World' example workflows are illustrated using raw DROPs
3. These are converted to launchable Logical Graph Templates 
4. We introduce and illustrate several methodologies to reproduce various workflow behaviours.

## DALiuGE and DROPs

The Data Activated Liu (Flow) Graph Engine is a graph-based workflow execution framework. It is in development for eventual deployment with the Science Data Processor (SDP) for the Square Kilometre Array (SKA).

A DROP is the abstraction for a workflow component and encompasses both data and computing elements.

DROPs can:
- Produce input for another DROP
- Consume the output of another DROP

Once deployed by DALiuGE, a DROP fires events describing its runtime state. Producers signal to their consumers when their data is ready. Once all of a DROP's producers have finished, the DROP begins execution. This is the main mechanism driving workflow execution.

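To make the firing mechanism concrete, here is a toy model in plain Python. It is a hypothetical sketch and not DALiuGE's API. `ToyDataDrop`, `ToyAppDrop` and `State` are invented for illustration, and the real framework also handles threading, errors and many more states. In the model, a data DROP notifies its consumers when it completes, and an app DROP runs only after every one of its producers has done so. The value `2` for `State.COMPLETED` matches the status printed by the real DROPs later in this notebook.

```python
from enum import IntEnum


class State(IntEnum):
    # Hypothetical subset of states for this toy model
    INITIALIZED = 0
    COMPLETED = 2


class ToyDataDrop:
    """Holds data and notifies its consumers once it is completed."""

    def __init__(self, name):
        self.name = name
        self.data = b""
        self.status = State.INITIALIZED
        self.consumers = []

    def write(self, payload):
        self.data += payload

    def set_completed(self):
        self.status = State.COMPLETED
        # Fire a "data ready" event at every consumer
        for consumer in self.consumers:
            consumer.producer_finished(self)


class ToyAppDrop:
    """Runs `func` once all of its producers (input DROPs) have completed."""

    def __init__(self, name, func):
        self.name = name
        self.func = func
        self.inputs = []
        self.outputs = []
        self.finished = set()
        self.status = State.INITIALIZED

    def add_input(self, drop):
        self.inputs.append(drop)
        drop.consumers.append(self)

    def add_output(self, drop):
        self.outputs.append(drop)

    def producer_finished(self, drop):
        self.finished.add(id(drop))
        # Start only when *every* producer has signalled completion
        if len(self.finished) == len(self.inputs):
            self.run()

    def run(self):
        result = self.func(*(d.data for d in self.inputs))
        self.status = State.COMPLETED
        for out in self.outputs:
            out.write(result)
            out.set_completed()  # the event propagates further downstream


# Two input DROPs feed one app, which writes to one output DROP
x, y, out = ToyDataDrop("x"), ToyDataDrop("y"), ToyDataDrop("out")
join = ToyAppDrop("join", lambda first, second: first + b" " + second)
join.add_input(x)
join.add_input(y)
join.add_output(out)

x.write(b"Hello")
x.set_completed()  # nothing runs yet: y has not completed
y.write(b"world")
y.set_completed()  # all producers are done, so `join` runs
print(out.data)  # b'Hello world'
print(out.status == State.COMPLETED)  # True
```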
The goal is to rigorously design and test DROPs so that astronomers can reuse them. The details of how an entire workflow graph (Logical Graph Template) is instantiated are beyond the scope of this notebook.

#### Our DROPs

We draw your attention to four types of DROPs used in this notebook:
- File
  - Represents a literal file in a directory.
  - A file DROP acting as a producer is analogous to reading
  - A file DROP acting as a consumer is analogous to writing
- InMemory
  - Represents a datastore in memory.
  - Functions like a file but only exists at runtime
- BashShell
  - Executes a 'command' argument in a bash-shell
  - Can consume or produce 'streaming' input or output which is processed line by line 
- PythonScript
  - Executes a simple Python function similar to a bash script.
  
Eventually, we will see how to specify a new DROP type.

## Hello World - Everyone's First Example

A 'hello world' program is the canonical first example used to demonstrate a new programming language or execution environment.

The goal, of course, is to output 'Hello World' in some form. We extend this to a 'Hello [S]' example, where [S] is an arbitrary string of characters.

Instead of outputting to a screen, we output data to a File DROP.

We now proceed with several ways to achieve this trivial task using DROPs.

First, we set up the preamble.

```python
import os
import pickle
import time
from io import BytesIO

from dlg.apps.bash_shell_app import StreamingInputBashApp, StreamingOutputBashApp
from dlg.apps.pyfunc import PyFuncApp
from dlg.ddap_protocol import DROPStates
from dlg.drop import FileDROP, InMemoryDROP, BarrierAppDROP
from dlg.droputils import allDropContents
```

### Hello World
This example writes 'Hello World' to a file directly.

```python
output_fname = os.path.join(os.getcwd(), 'result.out')

# Initialize our DROP
a = FileDROP('a', 'a', filepath=output_fname)

# There are no other DROPs to link: we write the data ourselves
# and then mark the DROP as completed
a.write(b"Hello world")
a.setCompleted()
```

```python
# Inspect results
print(a)
print(a.status)
print(a.status == DROPStates.COMPLETED)

# Check the file was written correctly
print(allDropContents(a))
```

```
<FileDROP oid=a, uid=a>
2
True
b'Hello world'
```


### Hello World Bash
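A slightly more convoluted setup. One bash app echoes 'Hello world' as streaming output into an in-memory DROP. A second bash app reads that stream and writes it to a File DROP, where `%o0` in its command stands for its first output.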

```python
output_fname = os.path.join(os.getcwd(), 'result.out')

# Initialize our DROPs
a = StreamingOutputBashApp('a', 'a', command=r"echo -en 'Hello world'")
b = InMemoryDROP('b', 'b')
c = StreamingInputBashApp('c', 'c', command="cat > %o0")
d = FileDROP('d', 'd', filepath=output_fname)

# Link DROPs together
a.addOutput(b)
c.addStreamingInput(b)
c.addOutput(d)

# async_execute() returns immediately, so poll the final DROP for up to ~10 s
a.async_execute()
for _ in range(100):
    if d.status == DROPStates.COMPLETED:
        break
    time.sleep(0.1)
```

```python
# Inspect results
for drop in (a, b, c, d):
    print(drop)
    print(drop.status)
    print(drop.status == DROPStates.COMPLETED)

# Check the file was written correctly
print(allDropContents(d))
```

```
<StreamingOutputBashApp oid=a, uid=a>
2
True
<InMemoryDROP oid=b, uid=b>
2
True
<StreamingInputBashApp oid=c, uid=c>
2
True
<FileDROP oid=d, uid=d>
2
True
b'Hello world'
```

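In the command `cat > %o0`, the placeholder `%o0` stands for the app's first output, which here is the File DROP `d`. Below is a minimal sketch of how such placeholders could be expanded. It assumes an analogous `%iN` form for inputs. `expand_placeholders` is a hypothetical helper and not DALiuGE's implementation.

```python
import re


def expand_placeholders(command, input_paths, output_paths):
    """Replace %iN / %oN with the path of the N-th input / output (hypothetical)."""

    def substitute(match):
        kind, index = match.group(1), int(match.group(2))
        paths = input_paths if kind == "i" else output_paths
        return paths[index]

    # A single regex pass: replacement text is never re-scanned
    return re.sub(r"%([io])(\d+)", substitute, command)


cmd = expand_placeholders("cat > %o0", input_paths=[], output_paths=["/tmp/result.out"])
print(cmd)  # cat > /tmp/result.out
```

The expansion uses a single regular-expression pass. As a result, a path that itself contains something like `%o1` is not expanded a second time.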

### Hello World Python
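This example achieves identical functionality to the previous example but uses a Python function. The function's return value is stored in pickled form, so we unpickle the file contents to inspect them.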

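```python
output_fname = os.path.join(os.getcwd(), 'result6.out')


def hello_world():
    return "Hello World"

# PyFuncApp is given the fully qualified (dotted) name of the function
fname = '__main__.hello_world'

# Initialize our DROPs
a = PyFuncApp('a', 'a', func_name=fname)
b = FileDROP('b', 'b', filepath=output_fname)

# Link DROPs together
a.addOutput(b)

# async_execute() returns immediately, so poll the final DROP for up to ~10 s
a.async_execute()
for _ in range(100):
    if b.status == DROPStates.COMPLETED:
        break
    time.sleep(0.1)
```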

```python
# Inspect results
for drop in (a, b):
    print(drop)
    print(drop.status)
    print(drop.status == DROPStates.COMPLETED)

# Check the file was written correctly
print(pickle.loads(allDropContents(b)))
```

```
<PyFuncApp oid=a, uid=a>
2
True
<FileDROP oid=b, uid=b>
2
True
Hello World
```

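`PyFuncApp` receives the function as a dotted name, `'__main__.hello_world'`. Here `__main__` is the module holding the notebook's top-level definitions. The sketch below shows one common way to turn such a name into a callable, using `importlib`. `resolve_function` is hypothetical, and `PyFuncApp`'s own lookup may differ.

```python
import importlib


def resolve_function(qualified_name):
    """Look up a callable from a dotted name such as 'package.module.func'."""
    # Everything before the last dot is the module; the rest is the attribute
    module_name, _, attr_name = qualified_name.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


sqrt = resolve_function("math.sqrt")
print(sqrt(16.0))  # 4.0

# Inside the notebook, the same lookup would find the function defined above:
# resolve_function('__main__.hello_world')() returns 'Hello World'
```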

### Hello S Bash
This is the first 'dynamic' example. One bash script echoes a string as streaming output. A second script reads this stream as its input, prepends 'Hello' to it and echoes the new text. DALiuGE finally writes this result to a file.

```python
output_fname = os.path.join(os.getcwd(), 'result3.out')

# Initialize our DROPs
a = StreamingOutputBashApp('a', 'a', command=r"echo -en 'world'")
b = InMemoryDROP('b', 'b')
c = StreamingInputBashApp('c', 'c', command="echo Hello $(cat) > %o0")
d = FileDROP('d', 'd', filepath=output_fname)

# Link DROPs together
a.addOutput(b)
c.addStreamingInput(b)
c.addOutput(d)

# async_execute() returns immediately, so poll the final DROP for up to ~10 s
a.async_execute()
for _ in range(100):
    if d.status == DROPStates.COMPLETED:
        break
    time.sleep(0.1)
```

```python
# Inspect results
for drop in (a, b, c, d):
    print(drop)
    print(drop.status)
    print(drop.status == DROPStates.COMPLETED)

# Check the file was written correctly
print(allDropContents(d))
```

```
<StreamingOutputBashApp oid=a, uid=a>
2
True
<InMemoryDROP oid=b, uid=b>
2
True
<StreamingInputBashApp oid=c, uid=c>
2
True
<FileDROP oid=d, uid=d>
2
True
b'Hello world\n'
```

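The command `echo Hello $(cat)` prepends 'Hello' once to the whole stream rather than to each line. This happens because the unquoted command substitution is split into words, which `echo` then joins with single spaces. You can check this outside DALiuGE by running the commands directly through Python's `subprocess` module, assuming `bash` and `sed` are available on the PATH. `run_bash` is a hypothetical helper.

```python
import subprocess


def run_bash(command, stdin_bytes):
    """Run `command` under bash, feeding it stdin_bytes, and return its stdout."""
    completed = subprocess.run(
        ["bash", "-c", command],
        input=stdin_bytes,
        stdout=subprocess.PIPE,
        check=True,
    )
    return completed.stdout


# Single-line input: the same result as the DROP example above
print(run_bash("echo Hello $(cat)", b"world"))  # b'Hello world\n'

# Multi-line input: the lines are word-split and joined by spaces,
# so the whole input receives a single 'Hello' prefix
print(run_bash("echo Hello $(cat)", b"world\nagain\n"))  # b'Hello world again\n'

# Prefixing every line needs a line-oriented tool such as sed
print(run_bash("sed 's/^/Hello /'", b"world\nagain\n"))  # b'Hello world\nHello again\n'
```

With the single-line input used in this notebook, `echo Hello $(cat)` behaves as intended. The difference only matters for multi-line strings.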

### Hello S Python
This example achieves identical functionality to the previous example but uses a Python function. Notice that we pickle the input string before writing it, and unpickle the result when inspecting it, using the `pickle` module.

```python
output_fname = os.path.join(os.getcwd(), 'result.out')

def hello_world(s='Everybody'):
    return "Hello " + s

fname = '__main__.hello_world'

# Initialize our DROPs
a = InMemoryDROP('a', 'a')
b = PyFuncApp('b', 'b', func_name=fname)
c = FileDROP('c', 'c', filepath=output_fname)

# Link DROPs together
b.addInput(a)
b.addOutput(c)

# Writing the pickled input and completing its DROP triggers b asynchronously,
# so poll the final DROP for up to ~10 s
a.write(pickle.dumps("World"))
a.setCompleted()
for _ in range(100):
    if c.status == DROPStates.COMPLETED:
        break
    time.sleep(0.1)
```

```python
# Inspect results
for drop in (a, b, c):
    print(drop)
    print(drop.status)
    print(drop.status == DROPStates.COMPLETED)

# Check the file was written correctly
print(pickle.loads(allDropContents(c)))
```

```
<InMemoryDROP oid=a, uid=a>
2
True
<PyFuncApp oid=b, uid=b>
2
True
<FileDROP oid=c, uid=c>
2
True
Hello World
```

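Both ends of this example carry pickled data. The input DROP holds `pickle.dumps("World")`, and the output file holds the pickled return value. The sketch below mimics that exchange without DALiuGE. It assumes inputs are unpickled and passed to the function positionally, in order. `run_like_pyfunc` is hypothetical, and the real `PyFuncApp` may handle arguments differently.

```python
import pickle


def hello_world(s='Everybody'):
    return "Hello " + s


def run_like_pyfunc(func, *pickled_inputs):
    """Unpickle each input, call func positionally and pickle the result (hypothetical)."""
    args = [pickle.loads(blob) for blob in pickled_inputs]
    return pickle.dumps(func(*args))


result = run_like_pyfunc(hello_world, pickle.dumps("World"))
print(pickle.loads(result))  # Hello World

# With no inputs, the function's default argument is used
print(pickle.loads(run_like_pyfunc(hello_world)))  # Hello Everybody
```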

### Hello S Drop
We now see how a simple DROP can be defined.

This example features a new `PrependResult` DROP, which extends `BarrierAppDROP`. It implements a data URL method (`dataURL`), an initialisation method (`initialize`, which lets us add new arguments; in our case, `prefix`) and a `run` method.

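```python
class PrependResult(BarrierAppDROP):
    """Prepends a fixed prefix to every line of its first input."""

    def dataURL(self):
        # This app does not expose a data URL of its own
        pass

    def initialize(self, **kwargs):
        super().initialize(**kwargs)
        # Custom argument supplied at construction time, stored as bytes
        self.prefix = bytes(kwargs['prefix'], 'utf-8')

    def run(self):
        # Read the whole input, then write each line out with the prefix
        curr_drop = self.inputs[0]
        output = self.outputs[0]
        all_lines = BytesIO(allDropContents(curr_drop)).readlines()
        for line in all_lines:
            output.write(self.prefix + line)
```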

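```python
output_fname = os.path.join(os.getcwd(), 'result.out')

# Initialize our DROPs
a = InMemoryDROP('a', 'a')
b = PrependResult('b', 'b', prefix='Hello ')
c = FileDROP('c', 'c', filepath=output_fname)

# Link DROPs together
a.addConsumer(b)
b.addOutput(c)

# Writing to and completing the input DROP triggers b asynchronously,
# so poll the final DROP for up to ~10 s
a.write(b"world")
a.setCompleted()
for _ in range(100):
    if c.status == DROPStates.COMPLETED:
        break
    time.sleep(0.1)
```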

```python
# Inspect results
for drop in (a, b, c):
    print(drop)
    print(drop.status)
    print(drop.status == DROPStates.COMPLETED)

# Check the file was written correctly
print(allDropContents(c))
```

```
<InMemoryDROP oid=a, uid=a>
2
True
<PrependResult oid=b, uid=b>
2
True
<FileDROP oid=c, uid=c>
2
True
b'Hello world'
```

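The line-prefixing logic inside `PrependResult.run` does not depend on DALiuGE. Pulling it into a plain function (a hypothetical refactoring, here named `prepend_lines`) makes it testable without building a graph. It also shows that, unlike `echo Hello $(cat)`, this logic prefixes every line and keeps the line endings.

```python
from io import BytesIO


def prepend_lines(data, prefix):
    """Prefix every line of `data` (bytes) with `prefix` (str), keeping line endings."""
    prefix_bytes = prefix.encode("utf-8")
    # readlines() splits on b"\n" and keeps the newline on each line
    return b"".join(prefix_bytes + line for line in BytesIO(data).readlines())


print(prepend_lines(b"world", "Hello "))          # b'Hello world'
print(prepend_lines(b"world\nagain\n", "Hello "))  # b'Hello world\nHello again\n'
```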

So far, all of these examples have executed DROPs natively and locally.

To leverage remote and likely expansive compute resources, DALiuGE is designed to accept workflow specifications at various levels of specificity, following this methodology:
1. Astronomers design a Logical Graph Template (LGT) specifying a workflow at the highest level of abstraction.
   - Composed of DROPs and Control Components describing how parallelism is to be exploited (GROUP BY, SCATTER/GATHER etc.)
   - Should be the only major astronomer intervention before runtime
   - Each subsequent step occurs within the DALiuGE installation.
2. `Fill`
   - Input: A Logical Graph Template (LGT) file
   - Output: A Logical Graph (LG)
   - Parameterises a graph template with specific design details such as SCATTER axes or other run-specific details. Intended to facilitate LGT reuse across multiple telescope surveys
   - Occurs months before execution
3. `Unroll`
   - Input: A Logical Graph
   - Output: A Physical Graph Template (PGT)
   - Converts a logical graph to a fully atomic PGT comprising all DROPs and all of their relationships.
   - Group controls such as loops are 'unrolled' into a fully verbose DAG workflow.
   - Occurs weeks before execution
4. `Partition`
   - Input: A Physical Graph Template (PGT), machine specification, algorithm choice
   - Output: A partitioned Physical Graph Template (PGT)
   - DROPs are grouped by resource requirements. This is where parallelism is introduced into the execution. Partitions can be launched on separate machines simultaneously.
   - The resulting partitioned PGT can vary greatly depending on the algorithm used and the target machine. We expect a given PGT to undergo several different partitioning rounds to establish the most suitable one.
   - Occurs weeks before execution.
5. `Map`
   - Input: A partitioned Physical Graph Template (PGT), machine specification
   - Output: A Physical Graph (PG)
   - Provides exact IP addresses and machine information for all DROPs. The resulting Physical Graph specifies a computation exactly.
   - Occurs minutes before execution
6. `Submit`
   - Input: A Physical Graph (PG)
   - Output: Data artefacts and/or Reproducibility File
   - DALiuGE instantiates the DROPs specified in the PG and orchestrates the successive firing of DROPs until execution completes

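The stages above can be illustrated with a deliberately tiny model. In this hypothetical sketch, a template is just a list of stages, each with a name and a width (the number of parallel copies, as in a scatter). `fill`, `unroll`, `partition` and `map_to_nodes` are invented names. The round-robin partitioner is a naive stand-in for DALiuGE's partitioning algorithms, real graphs are far richer documents, and `Submit` is omitted.

```python
def fill(lgt, params):
    """LGT -> LG: replace named parameters (strings) with run-specific values."""
    lg = []
    for stage in lgt:
        width = stage["width"]
        if isinstance(width, str):  # still an open parameter in the template
            width = params[width]
        lg.append({**stage, "width": width})
    return lg


def unroll(lg):
    """LG -> PGT: expand each stage into `width` DROPs and link consecutive stages."""
    stages = [[f"{s['name']}_{i}" for i in range(s["width"])] for s in lg]
    drops = [d for stage in stages for d in stage]
    # Every DROP of one stage feeds every DROP of the next (scatter / gather)
    edges = [(u, v) for up, down in zip(stages, stages[1:]) for u in up for v in down]
    return {"drops": drops, "edges": edges}


def partition(pgt, n_partitions):
    """PGT -> partitioned PGT: naive round-robin assignment of DROPs to partitions."""
    return {d: i % n_partitions for i, d in enumerate(pgt["drops"])}


def map_to_nodes(partitions, hosts):
    """Partitioned PGT -> PG: bind every partition to a concrete host address."""
    return {d: hosts[p % len(hosts)] for d, p in partitions.items()}


lgt = [
    {"name": "source", "width": 1},
    {"name": "hello", "width": "nchunks"},  # scatter width left open in the template
    {"name": "gather", "width": 1},
]
lg = fill(lgt, {"nchunks": 3})
pgt = unroll(lg)
pg = map_to_nodes(partition(pgt, 2), ["192.0.2.1", "192.0.2.2"])

print(pgt["drops"])       # ['source_0', 'hello_0', 'hello_1', 'hello_2', 'gather_0']
print(len(pgt["edges"]))  # 6
print(pg["hello_1"])      # 192.0.2.1
```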
## Launchable Templates

## Reproducible Data Processing

### Rerun

### Repeat

### Reproduce

### Replicate