<a href="https://colab.research.google.com/github/rosafilgueira/Workflows_Seminar/blob/main/Intro_Tutorial_dispel4py_2_0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# dispel4py Tutorial

This notebook introduces [dispel4py](https://github.com/StreamingFlow/d4py), a Python library for building and running data-intensive workflows. It is aimed at both novices and experienced developers and covers how to set up dispel4py, how to write workflows, and how to run them with different execution strategies.

## Getting Started with dispel4py

We start by installing dispel4py. There are two ways to do this:

* Version 1: Direct installation using pip, suitable for quickly adding dispel4py and its stream-d4py companion to your Python environment.


* Version 2: For those preferring the latest versions or contributing to development, cloning from the GitHub repository and manual installation offers the cutting edge of dispel4py functionalities.

In [37]:
#Version 1:
#!pip install mpi4py
!pip install stream-d4py

# Version 2:
#!git clone https://github.com/StreamingFlow/d4py.git
#%cd d4py
#!pip install mpi4py
#!python setup.py install



### Functions to copy cells into files

We are going to use the functions below to copy code written in text cells into files, so that later we can run dispel4py from the command line and give it workflows stored as files.

In [75]:
# some magic so we can copy "text cells" into file
from google.colab import _message

def write_cell_above_to_file(search_term, filename):
  cell = get_cell_above(search_term)
  code_block = get_cell_code_block(cell)
  with open(filename, 'w') as fp:
    fp.writelines(code_block)

def get_cell_above(search_term):
  # Load the notebook JSON.
  nb = _message.blocking_request('get_ipynb')

  # Search for current markdown cell (using search term)
  for i, cell in enumerate(nb['ipynb']['cells']):
    if search_term in ''.join(cell['source']):
      return nb['ipynb']['cells'][i - 1]

def get_cell_code_block(cell):
  # Get the code block in previous cell
  cell_lines = cell['source']
  code_block = []
  in_block = False
  for ln in cell_lines:
    if '```' in ln:
      in_block = not in_block  # boolean switch
    else:
      if in_block:
        code_block.append(ln)
  return code_block
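
To see what `get_cell_code_block` does without a Colab session, here is a minimal sketch that applies the same toggle logic to a hand-written cell dictionary. The `sample_cell` contents and the `FENCE` constant are made up for illustration; in the notebook the cell comes from `_message.blocking_request('get_ipynb')`.

```python
FENCE = '`' * 3  # the three-backtick marker that opens and closes a code block


def get_cell_code_block(cell):
    """Return the lines that sit between fence markers in a cell's source."""
    code_block = []
    in_block = False
    for ln in cell['source']:
        if FENCE in ln:
            in_block = not in_block  # a fence line toggles the state and is skipped
        elif in_block:
            code_block.append(ln)
    return code_block


# Hypothetical markdown cell, shaped like an entry of nb['ipynb']['cells'].
sample_cell = {
    'cell_type': 'markdown',
    'source': [
        'Some explanation\n',
        FENCE + '\n',
        'x = 1\n',
        'print(x)\n',
        FENCE + '\n',
        'More text\n',
    ],
}

extracted = get_cell_code_block(sample_cell)
print(''.join(extracted))
```

Only the two lines between the fences are kept; the surrounding prose and the fence lines themselves are dropped, which is why the text cells further down can mix explanation and code.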


## Testing dispel4py

With dispel4py installed, we can check that it runs correctly by executing one of the example workflows that ship with the library.


dispel4py uses "mappings": execution strategies that adapt a workflow to a computing environment, ranging from sequential processing on a single core to distributed computing across multiple nodes. Here we are going to test two mappings:
  * simple: sequential execution
  * multi: parallel (shared memory) execution

In [39]:
## Simple mapping
!dispel4py simple dispel4py.examples.graph_testing.word_count -i 10

Processing 10 iterations.
RUN ARGS: 
Namespace(target='simple', module='dispel4py.examples.graph_testing.word_count', attr=None, file=None, data=None, iter=10, provenance=None)
Inputs: {'RandomWordProducer0': 10}
SimplePE: Processed 1 iteration.
Outputs: {'WordCounter1': {'output': [['Computing', 1], ['Modelling', 1], ['Modelling', 2], ['Modelling', 3], ['Analysis', 1], ['Seismology', 1], ['Earthquake', 1], ['Infrastructure', 1], ['Seismology', 2], ['Modelling', 4]]}}
ELAPSED TIME: 0.0008900165557861328


In [40]:
## Multi mapping
!dispel4py multi dispel4py.examples.graph_testing.word_count -i 10 -n 4

Processing 10 iterations.
RUN ARGS: 
Namespace(target='multi', module='dispel4py.examples.graph_testing.word_count', attr=None, file=None, data=None, iter=10, provenance=None, simple=False, num=4)
Processes: {'RandomWordProducer0': range(0, 1), 'WordCounter1': range(1, 4)}
RandomWordProducer0 (rank 0): Processed 10 iterations.
WordCounter1 (rank 1): Processed 6 iterations.
WordCounter1 (rank 2): Processed 3 iterations.
WordCounter1 (rank 3): Processed 1 iteration.
ELAPSED TIME: 0.03533649444580078


## How to create my first dispel4py workflow

This section introduces the building blocks of dispel4py. We will see how to write dispel4py PEs (processing elements), how to connect them together to form a workflow and how this workflow is executed in different environments.

### How to write a PE

In this section we are going to implement our first PE.

First, decide what kind of processing the PE will do and what data units it processes. In our example we implement a PE that decides whether a number is divisible by another number. The PE is configured with this divisor, and for each input data item it tests whether the number can be divided by the divisor. If the number is not divisible, the PE sends it to its output stream.

### Create a PE class

To start with we create a PE that does only very few things:


In [41]:
from dispel4py.base import IterativePE

class MyFirstPE(IterativePE):

    def __init__(self, divisor):
        IterativePE.__init__(self)
        self.divisor = divisor

In this case we extend the base class `dispel4py.base.IterativePE` which defines one input and one output, which is exactly what we need. We pass the divisor as an initialisation parameter to the object which stores it.

### Implement the processing method

Now the actual work begins: We have to implement the processing method. This is done by overriding the method of the superclass:

In [42]:
def _process(self, data):
    pass  # placeholder, filled in below

We fill in the processing commands, in our case this means that we test if the input data item is divisible by our divisor, and return it if it is not divisible:

In [43]:
def _process(self, data):
    # Return the item only when it is NOT divisible by the divisor.
    if data % self.divisor != 0:
        return data

The data returned by _process is written to the output stream of the PE.
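
The contract between `_process` and the output stream can be pictured with a small stand-in class. This is a simplified sketch and not dispel4py's implementation: `MiniIterativePE`, `run` and the `outputs` list are hypothetical names, and it assumes that a return value of `None` means that nothing is written.

```python
class MiniIterativePE:
    """Toy stand-in for a one-input, one-output PE (not the dispel4py class)."""

    def __init__(self):
        self.outputs = []  # plays the role of the output stream

    def _process(self, data):
        raise NotImplementedError

    def run(self, stream):
        for item in stream:
            result = self._process(item)
            if result is not None:  # assumption: None means "emit nothing"
                self.outputs.append(result)
        return self.outputs


class NotDivisible(MiniIterativePE):
    def __init__(self, divisor):
        super().__init__()
        self.divisor = divisor

    def _process(self, data):
        if data % self.divisor != 0:
            return data


pe = NotDivisible(3)
print(pe.run(range(1, 11)))  # [1, 2, 4, 5, 7, 8, 10]
```

Items for which `_process` falls through without a `return` produce no output, which is how the PE acts as a filter.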

That’s it! Our first PE is complete:

In [44]:
from dispel4py.base import IterativePE

class MyFirstPE(IterativePE):

    def __init__(self, divisor):
        IterativePE.__init__(self)
        self.divisor = divisor
    def _process(self, data):
        #print("divisor is %s, data is %s" %(self.divisor, data))
        if not data % self.divisor == 0:
            print("This: %s,  is not divisible by divisor the %s" %(data,self.divisor))
            return data


#### Summary of our first PE

Here's a breakdown of what MyFirstPE does:

* `__init__(self, divisor)`: This is the initializer method that gets called when an instance of `MyFirstPE` is created. It initializes the parent `IterativePE` class and sets up a `divisor` attribute, which is used to determine if the incoming data should be passed on or not.

* `_process(self, data)`: This method is called for each data item in the stream. The data item is passed to this method as the `data` argument.
  * Inside `_process`, it checks if the data item is not divisible by the `divisor` attribute (`if not data % self.divisor == 0:`). If the condition is true (meaning the data item is not a multiple of the divisor), it prints the data and then returns it.

The PE essentially filters and outputs data items that are not divisible by a specified number. Items that are divisible by this number are effectively ignored, and do not pass through this processing element.
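
The condition `not data % self.divisor == 0` relies on operator precedence, and dropping the `== 0` silently inverts the filter. The short check below, written in plain Python with no dispel4py involved, compares the condition as written, the version without `== 0`, and the explicit `!=` form. The variable names are illustrative only.

```python
divisor = 3
results = {}

for data in (6, 7):
    as_written = not data % divisor == 0   # parsed as: not ((data % divisor) == 0)
    without_eq = not data % divisor        # parsed as: not (data % divisor)
    explicit = data % divisor != 0         # same meaning as as_written, easier to read
    results[data] = (as_written, without_eq, explicit)
    print(data, as_written, without_eq, explicit)
```

For 6 (a multiple of 3) only the version without `== 0` is true, and for 7 it is the other way around, so `data % divisor != 0` is the safest way to spell "not divisible".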

Note that the first print statement, which would report every incoming item, is commented out. The second one is active and prints a message each time a non-divisible item is processed.

### Create a simple workflow

In this section we are going to create a workflow, using the PE that we implemented in the previous section. There’s a useful PE in the library of dispel4py PEs that just produces a sequence of numbers.

We can connect this number producer to our PE which is initialised with the divisor 3 in this example:


In [45]:
from dispel4py.workflow_graph import WorkflowGraph
from dispel4py.examples.graph_testing.testing_PEs import TestProducer

producer = TestProducer()
divide = MyFirstPE(3)

graph = WorkflowGraph()
graph.connect(producer, 'output', divide, 'input')

This workflow produces integers and tests whether they are divisible by 3. Any numbers that are not divisible by 3 will be written to the unconnected output stream of our first PE.

### Execute the workflow

To run this workflow you can use the sequential simple processor:


In [46]:
from dispel4py.new.simple_process import process as simple_process
simple_process(graph, {producer: 20})


Inputs: {'TestProducer8': 20}
This: 1,  is not divisible by divisor the 3
This: 2,  is not divisible by divisor the 3
This: 4,  is not divisible by divisor the 3
This: 5,  is not divisible by divisor the 3
This: 7,  is not divisible by divisor the 3
This: 8,  is not divisible by divisor the 3
This: 10,  is not divisible by divisor the 3
This: 11,  is not divisible by divisor the 3
This: 13,  is not divisible by divisor the 3
This: 14,  is not divisible by divisor the 3
This: 16,  is not divisible by divisor the 3
This: 17,  is not divisible by divisor the 3
This: 19,  is not divisible by divisor the 3
This: 20,  is not divisible by divisor the 3
SimplePE: Processed 1 iteration.
Outputs: {'MyFirstPE9': {'output': [1, 2, 4, 5, 7, 8, 10, 11, 13, 14, 16, 17, 19, 20]}}


### Write a data producer PE

#### Producing the input

Next we will create a ProducerPE that creates the input for our first PE. The test producer that we were using above only produces one number per iteration. In our case we would like to create a PE that produces all the numbers from 2 up to a certain limit.

The implementation looks like this:

In [47]:
from dispel4py.base import ProducerPE

class NumberProducer(ProducerPE):
    def __init__(self, start, limit):
        ProducerPE.__init__(self)
        self.start = start
        self.limit = limit
    def _process(self, inputs):
        for i in range(self.start, self.limit):
            #print("I have produced the data %s" %i)
            self.write('output', i)
            # OR: self.write(ProducerPE.OUTPUT_NAME, i)

This introduces several new concepts. The ProducerPE is a base class which has no inputs and one output ProducerPE.OUTPUT_NAME or "output". We initialise an instance of the NumberProducer PE with the lower and upper bounds for the integers that we want to produce.

In the implementation of the _process() method we iterate over the range of numbers from the lower bound up to (and excluding) the upper bound. Since the processing method generates more than one data item we have to write them one at a time to the output data stream using the write() method.

### Using the producer in the workflow

Now we hook our own producer into the workflow, replacing the TestProducer from the dispel4py library:

In [48]:
from dispel4py.workflow_graph import WorkflowGraph

producer = NumberProducer(2, 100)
divide = MyFirstPE(3)
graph = WorkflowGraph()
graph.connect(producer, 'output', divide, 'input')



Everything else stays the same. We create an instance of the NumberProducer that outputs the range of numbers from 2 to 99 (excluding the upper bound of 100).
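
The bounds and the expected number of results can be checked in plain Python before running the workflow. This sketch only mirrors the arithmetic of `NumberProducer(2, 100)` followed by `MyFirstPE(3)`; it does not use dispel4py.

```python
numbers = list(range(2, 100))                 # what NumberProducer(2, 100) writes
passed = [n for n in numbers if n % 3 != 0]   # what MyFirstPE(3) lets through

print(numbers[0], numbers[-1], len(numbers))  # 2 99 98
print(len(passed), passed[:5], passed[-1])    # 65 [2, 4, 5, 7, 8] 98
```

So the producer writes 98 numbers in a single iteration, and 65 of them reach the output of the filter.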

Now execute the new workflow using the simple mapping:


In [49]:
simple_process(graph, {producer: 1})


Inputs: {'NumberProducer10': 1}
This: 2,  is not divisible by divisor the 3
This: 4,  is not divisible by divisor the 3
This: 5,  is not divisible by divisor the 3
This: 7,  is not divisible by divisor the 3
This: 8,  is not divisible by divisor the 3
This: 10,  is not divisible by divisor the 3
This: 11,  is not divisible by divisor the 3
This: 13,  is not divisible by divisor the 3
This: 14,  is not divisible by divisor the 3
This: 16,  is not divisible by divisor the 3
This: 17,  is not divisible by divisor the 3
This: 19,  is not divisible by divisor the 3
This: 20,  is not divisible by divisor the 3
This: 22,  is not divisible by divisor the 3
This: 23,  is not divisible by divisor the 3
This: 25,  is not divisible by divisor the 3
This: 26,  is not divisible by divisor the 3
This: 28,  is not divisible by divisor the 3
This: 29,  is not divisible by divisor the 3
This: 31,  is not divisible by divisor the 3
This: 32,  is not divisible by divisor the 3
This: 34,  is not divisible 

### Using the command line

Save the PEs (NumberProducer, MyFirstPE) and the graph in a file called `myfirstgraph.py`. Once saved, you can run it from the command line using the sequential simple mapping.

For this we are going to use the `write_cell_above_to_file` function.

Remember that each call to this function copies the code block of the cell above into the given file (e.g. `myfirstgraph.py`), and that it needs a unique identifier each time (e.g. `id:example1a`).

**Note**: Alternatively, you can download `myfirstgraph.py` to your local machine from [here](https://github.com/rosafilgueira/Workflows_Seminar/blob/main/d4py_tutorial_workflows/myfirstgraph.py) and then upload it to this notebook.


```
from dispel4py.base import ProducerPE
from dispel4py.base import IterativePE
from dispel4py.workflow_graph import WorkflowGraph


class NumberProducer(ProducerPE):
    def __init__(self, start, limit):
        ProducerPE.__init__(self)
        self.start = start
        self.limit = limit
    def _process(self, inputs):
        for i in range(self.start, self.limit):
            self.write('output', i)

class MyFirstPE(IterativePE):

    def __init__(self, divisor):
        IterativePE.__init__(self)
        self.divisor = divisor

    def _process(self, data):
        if not data % self.divisor == 0:
            return data

producer = NumberProducer(2, 100)
divide = MyFirstPE(3)
graph = WorkflowGraph()
graph.connect(producer, 'output', divide, 'input')
```

In [106]:
#@markdown <font size='4'>Run the cell above</font>
search_term = 'id:example1a'
write_cell_above_to_file(search_term, 'myfirstgraph.py')

In [109]:
!dispel4py simple myfirstgraph.py

Processing 1 iteration.
RUN ARGS: 
Namespace(target='simple', module='myfirstgraph.py', attr=None, file=None, data=None, iter=1, provenance=None)
Inputs: {'NumberProducer0': 1}
SimplePE: Processed 1 iteration.
Outputs: {'MyFirstPE1': {'output': [2, 4, 5, 7, 8, 10, 11, 13, 14, 16, 17, 19, 20, 22, 23, 25, 26, 28, 29, 31, 32, 34, 35, 37, 38, 40, 41, 43, 44, 46, 47, 49, 50, 52, 53, 55, 56, 58, 59, 61, 62, 64, 65, 67, 68, 70, 71, 73, 74, 76, 77, 79, 80, 82, 83, 85, 86, 88, 89, 91, 92, 94, 95, 97, 98]}}
ELAPSED TIME: 0.0011785030364990234


### Parallel processing

For this very simple case we can easily parallelise the execution of the workflow. To do this we use the dispel4py multi mapping, which executes a workflow in multiple processes using the Python [multiprocessing](https://docs.python.org/2/library/multiprocessing.html) library. Let's run it with 4 processes:

In [120]:
!dispel4py multi myfirstgraph.py -n 4

Processing 1 iteration.
RUN ARGS: 
Namespace(target='multi', module='myfirstgraph.py', attr=None, file=None, data=None, iter=1, provenance=None, simple=False, num=4)
Processes: {'NumberProducer0': range(0, 1), 'MyFirstPE1': range(1, 4)}
NumberProducer0 (rank 0): Processed 1 iteration.
MyFirstPE1 (rank 2): Processed 33 iterations.
MyFirstPE1 (rank 3): Processed 32 iterations.
MyFirstPE1 (rank 1): Processed 33 iterations.
ELAPSED TIME: 0.02984619140625


In this case, MyFirstPE is assigned to processes 1, 2 and 3, so there are three parallel instances. Each instance processes about a third of the data, as you can see from the output of the instances when processing is complete:

```
MyFirstPE1 (rank 1): Processed 33 iterations.
MyFirstPE1 (rank 2): Processed 33 iterations.
MyFirstPE1 (rank 3): Processed 32 iterations.
```

Note that when executing in a parallel environment the output from each PE is not collected as in the simple mapping. You are responsible for collecting this output and printing or storing it.
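
The 33/33/32 split reported above is what you get when 98 items are shared as evenly as possible among three instances. The sketch below reproduces the counts under the assumption that items are handed out in turn (round-robin); how dispel4py actually assigns items to instances may differ, and the names used here are illustrative.

```python
from collections import Counter
from itertools import cycle

items = range(2, 100)   # the 98 numbers written by NumberProducer(2, 100)
ranks = [1, 2, 3]       # the three MyFirstPE instances in the run above

# Assumption: items are handed out in turn; the real scheduler may differ.
assignment = Counter(rank for rank, _ in zip(cycle(ranks), items))
print(dict(assignment))  # {1: 33, 2: 33, 3: 32}
```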

## Even humans are odd!

In this exercise we are going to create a dispel4py workflow that produces random numbers and pairs them up following a ("one odd", "one even") pattern. dispel4py provides different types of PEs: Generic, Iterative, Producer, Consumer, SimpleFunction, ... In this exercise we are going to get familiar with the following ones: GenericPE, IterativePE and ProducerPE.

The first step is to create a PE class that produces one random integer at a time in the range 1 to 1000, as we did in the "prime" workflow.

Because this PE is the first one in the workflow and has no input streams, the most sensible choice is a ProducerPE. However, we could also use a GenericPE. Feel free to modify this notebook and change it as you like.

One quick comment about how to write data to the output streams. There are two options:

* return: it only provides one value. Then the process method is finished.
* self.write: it can produce one or more value(s) during processing. Then it can continue to process (e.g. providing one/several value(s) in a loop).
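
The difference between the two options can be illustrated with a toy class that has an explicit `write` method. This is a hypothetical sketch (`MiniPE`, `run_once` and `streams` are not dispel4py names); it assumes that a returned value is written to the `output` stream on the PE's behalf.

```python
class MiniPE:
    """Toy PE with an explicit write(); hypothetical, for illustration only."""

    def __init__(self):
        self.streams = {'output': []}

    def write(self, name, value):
        self.streams[name].append(value)

    def run_once(self):
        result = self._process(None)
        if result is not None:       # a returned value is written for us
            self.write('output', result)
        return self.streams['output']


class ReturnsOne(MiniPE):
    def _process(self, inputs):
        return 42                    # exactly one value, then the call ends


class WritesMany(MiniPE):
    def _process(self, inputs):
        for i in range(3):
            self.write('output', i)  # several values from a single call


print(ReturnsOne().run_once())  # [42]
print(WritesMany().run_once())  # [0, 1, 2]
```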

For this PE we could use either option, as you can see in the following code. You can comment out the one that you like less.


In [78]:
from dispel4py.base import ProducerPE
import random

class NumberProducer(ProducerPE):
    def __init__(self):
        ProducerPE.__init__(self)

    def _process(self , inputs):
        result= random.randint(1, 1000)
        return result
        #OR: self.write('output', result)

After building the "NumberProducer" PE class, its output stream will be sent to another PE class (Divideby2) to determine whether the number that has just been produced is even or odd. One way to do this is to divide the number by 2 and check the remainder. If the remainder is 0, the number is even; otherwise the number is odd. We use a parameter (called "compare") that is compared with the remainder, so the same PE class can be reused to select either even numbers (compare = 0) or odd numbers (compare = 1).

Because this PE class needs only 1 input and produces 0 or 1 outputs, we are going to create it as an IterativePE.

In [79]:
from dispel4py.base import IterativePE

class Divideby2(IterativePE):

    def __init__(self, compare):
        IterativePE.__init__(self)
        self.compare = compare

    def _process(self, data):
        if data % 2 == self.compare:
            return data


Finally, the last PE in this workflow receives two input streams. It needs two lists for grouping even and odd numbers, so GenericPE is the type of choice for this PE class. With this type of PE we have to declare the input ("odd" and "even") and output ("output") streams in the `__init__` method. Because we need to keep data between iterations, we also create member variables in the `__init__` method.

During the `_process` method of this PE, the numbers received through its inputs are appended to one list or the other.

As you can imagine, those lists can be imbalanced and one could have more elements than the other (because the producer PE has randomly generated more odd numbers than even, or the other way around). To check whether any numbers have not been paired up (are "left over"), we can use the `_postprocess` method to print out the data that was not paired. The `_postprocess` method is called only once per PE, after all processing has completed.
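
Before looking at the PE itself, the pairing logic can be tried out as a plain function. This is a minimal sketch of the idea described above, not the PE implementation: `pair_up` is a hypothetical helper, and it uses `collections.deque` for the two waiting queues.

```python
from collections import deque


def pair_up(numbers):
    """Pair odd and even numbers in arrival order; return (pairs, leftovers)."""
    odd, even = deque(), deque()
    pairs = []
    for n in numbers:
        (odd if n % 2 else even).append(n)
        # Emit a pair as soon as both queues have something waiting.
        while odd and even:
            pairs.append((odd.popleft(), even.popleft()))
    leftovers = {'odd': list(odd), 'even': list(even)}
    return pairs, leftovers


pairs, leftovers = pair_up([7, 4, 10, 12, 3, 8])
print(pairs)      # [(7, 4), (3, 10)]
print(leftovers)  # {'odd': [], 'even': [12, 8]}
```

The leftovers dictionary corresponds to what the PE reports once all processing has finished.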


In [80]:
from dispel4py.core import GenericPE

class PairProducer(GenericPE):

    def __init__(self):
        GenericPE.__init__(self)
        self._add_input("odd")
        self._add_input("even")
        self._add_output("output")
        self.list_odd=[]
        self.list_even=[]

    def _process(self, inputs):
        if "odd" in inputs:
            self.list_odd.append(inputs["odd"])
        if "even" in inputs:
            self.list_even.append(inputs["even"])

        while self.list_odd and self.list_even:
            self.write("output", (self.list_odd.pop(0), self.list_even.pop(0)))

    def _postprocess(self):
        self.log('We are left behind: odd: %s, even: %s' % (self.list_odd, self.list_even))
        self.list_odd = []
        self.list_even = []

Now we only have to create the graph and connect the different PEs. Note that we create two PEs (filter_even and filter_odd) of the same type (Divideby2) to decide whether a number is odd or even. The output stream from the producer is connected to both filter PEs meaning that they both receive a copy of the same stream.

In [81]:
from dispel4py.workflow_graph import WorkflowGraph

producer = NumberProducer()
filter_even = Divideby2(0)
filter_odd = Divideby2(1)
pair = PairProducer()

graph = WorkflowGraph()
graph.connect(producer, 'output', filter_even, 'input')
graph.connect(producer, 'output', filter_odd, 'input')
graph.connect(filter_even, 'output', pair, 'even')
graph.connect(filter_odd, 'output', pair, 'odd')


Finally, let's run this workflow with the sequential simple processor as we did before.

In [82]:
from dispel4py.new.simple_process import process as simple_process
simple_process(graph, {producer: 20})

Inputs: {'NumberProducer19': 20}
PairProducer22: We are left behind: odd: [], even: [144, 314, 510, 550, 412, 978, 742, 368]
SimplePE: Processed 1 iteration.
Outputs: {'PairProducer22': {'output': [(905, 422), (33, 26), (467, 852), (835, 62), (543, 754), (87, 714)]}}


### Using the command line

Once again we need to copy the workflow and the PEs into a file. To do this we are going to use the `write_cell_above_to_file` function.

Remember that each call to this function copies the code block of the cell above into the given file (e.g. `evenodd.py`), and that it needs a unique identifier each time (e.g. `id:example1b`).

**Note**: Alternatively, you can download `evenodd.py` to your local machine from [here](https://github.com/rosafilgueira/Workflows_Seminar/blob/main/d4py_tutorial_workflows/evenodd.py) and then upload it to this notebook.


```
from dispel4py.base import ProducerPE, IterativePE
from dispel4py.core import GenericPE
from dispel4py.workflow_graph import WorkflowGraph
import random

class NumberProducer(ProducerPE):
    def __init__(self):
        ProducerPE.__init__(self)
        
    def _process(self, inputs):
        result= random.randint(1, 1000)
        return result
        #OR: self.write('output', result)

class Divideby2(IterativePE):
    def __init__(self, compare):
        IterativePE.__init__(self)
        self.compare = compare
    def _process(self, data):
        if data % 2 == self.compare:
            return data
          
class PairProducer(GenericPE):
    def __init__(self):
        GenericPE.__init__(self)
        self._add_input("odd")
        self._add_input("even")
        self._add_output("output")
        self.list_odd=[]
        self.list_even=[]
      
    def _process(self, inputs):
        if "odd" in inputs:
            self.list_odd.append(inputs["odd"])
        if "even" in inputs:
            self.list_even.append(inputs["even"])
       
        while self.list_odd and self.list_even:
            self.write("output", (self.list_odd.pop(0), self.list_even.pop(0)))
    
    def _postprocess(self):
        self.log('We are left behind: odd: %s, even: %s' % (self.list_odd, self.list_even))
        self.list_odd = []
        self.list_even = []

producer = NumberProducer()
filter_even = Divideby2(0)
filter_odd = Divideby2(1)
pair = PairProducer()

graph = WorkflowGraph()
graph.connect(producer, 'output', filter_even, 'input')
graph.connect(producer, 'output', filter_odd, 'input')
graph.connect(filter_even, 'output', pair, 'even')
graph.connect(filter_odd, 'output', pair, 'odd')
```

In [110]:
#@markdown <font size='4'>Run the cell above</font>
search_term = 'id:example1b'
write_cell_above_to_file(search_term, 'evenodd.py')

In [111]:
!dispel4py simple evenodd.py -i 20

Processing 20 iterations.
RUN ARGS: 
Namespace(target='simple', module='evenodd.py', attr=None, file=None, data=None, iter=20, provenance=None)
Inputs: {'NumberProducer0': 20}
PairProducer3: We are left behind: odd: [], even: [62, 302]
SimplePE: Processed 1 iteration.
Outputs: {'PairProducer3': {'output': [(297, 148), (391, 14), (339, 800), (25, 66), (399, 530), (991, 570), (187, 786), (539, 410), (525, 484)]}}
ELAPSED TIME: 0.0007524490356445312


In [112]:
!dispel4py multi evenodd.py -i 100 -n 4

Processing 100 iterations.
RUN ARGS: 
Namespace(target='multi', module='evenodd.py', attr=None, file=None, data=None, iter=100, provenance=None, simple=False, num=4)
Processes: {'NumberProducer0': range(0, 1), 'Divideby21': range(1, 2), 'Divideby22': range(2, 3), 'PairProducer3': range(3, 4)}
NumberProducer0 (rank 0): Processed 100 iterations.
Divideby22 (rank 2): Processed 100 iterations.
Divideby21 (rank 1): Processed 100 iterations.
PairProducer3 (rank 3): We are left behind: odd: [], even: [244, 110, 668, 852, 696, 766]
PairProducer3 (rank 3): Processed 100 iterations.
ELAPSED TIME: 0.024795055389404297


## The data streaming classic: WordCount!!

We cannot leave this tutorial without a streaming classic. The WordCount example reads a text and counts how often each word occurs.
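
As a reference point, the same computation fits in a few lines of plain Python. The sketch below is not a dispel4py workflow; its three nested steps mirror the roles of the three PEs defined in this section, and the sample text is the one used in the runs that follow.

```python
from collections import Counter

text = "Hello Hello something more World World World"

counts = Counter()
for line in text.splitlines():     # role of SplitLines
    for word in line.split(" "):   # role of SplitWords
        counts[word] += 1          # role of CountWords

print(dict(counts))
# {'Hello': 2, 'something': 1, 'more': 1, 'World': 3}
```

Splitting the computation into separate PEs is what lets each step run as its own stage of a stream.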


In [113]:
from dispel4py.core import GenericPE

class SplitLines(GenericPE):

    def __init__(self):
        GenericPE.__init__(self)
        self._add_input("input")
        self._add_output("output")

    def _process(self, inputs):
        for line in inputs["input"].splitlines():
            self.write("output", line)

In [114]:
from dispel4py.base import IterativePE

class SplitWords(IterativePE):

    def __init__(self):
        IterativePE.__init__(self)

    def _process(self, data):
        for word in data.split(" "):
            self.write("output", (word,1))

In [115]:
from collections import defaultdict

class CountWords(GenericPE):
    def __init__(self):
        GenericPE.__init__(self)
        self._add_input("input", grouping=[0])
        self._add_output("output")
        self.count=defaultdict(int)

    def _process(self, inputs):
        word, count = inputs['input']
        self.count[word] += count

    def _postprocess(self):
        self.write('output', self.count)

In [116]:
from dispel4py.workflow_graph import WorkflowGraph

split = SplitLines()
words = SplitWords()
count = CountWords()

graph = WorkflowGraph()
graph.connect(split, 'output', words, 'input')
graph.connect(words, 'output', count, 'input')

In [117]:
from dispel4py.new.simple_process import process as simple_process
simple_process(graph, {split: [ {'input' : "Hello Hello something more World World World"}] })

Inputs: {'SplitLines26': [{'input': 'Hello Hello something more World World World'}]}
SimplePE: Processed 1 iteration.
Outputs: {'CountWords28': {'output': [defaultdict(<class 'int'>, {'Hello': 2, 'something': 1, 'more': 1, 'World': 3})]}}


### Using the command line

Once again we need to copy the workflow and the PEs into a file. To do this we are going to use the `write_cell_above_to_file` function.

Remember that each call to this function copies the code block of the cell above into the given file (e.g. `wordcount.py`), and that it needs a unique identifier each time (e.g. `id:example1c`).

**Note**: Alternatively, you can download `wordcount.py` to your local machine from [here](https://github.com/rosafilgueira/Workflows_Seminar/blob/main/d4py_tutorial_workflows/wordcount.py) and then upload it to this notebook.

```
from dispel4py.core import GenericPE
from dispel4py.base import IterativePE
from dispel4py.workflow_graph import WorkflowGraph
import os

class SplitLines(GenericPE):
    def __init__(self):
        GenericPE.__init__(self)
        self._add_input("input")
        self._add_output("output")
        
    def _process(self, inputs):
        for line in inputs["input"].splitlines():
            self.write("output", line)

class SplitWords(IterativePE):

    def __init__(self):
        IterativePE.__init__(self)
        
    def _process(self, data):
        for word in data.split(" "):
            self.write("output", (word,1))


class CountWords(GenericPE):
    def __init__(self):
        
        from collections import defaultdict
        GenericPE.__init__(self)
        self._add_input("input", grouping=[0])
        self._add_output("output")
        self.count=defaultdict(int)
        
    def _process(self, inputs):
        word, count = inputs['input']
        self.count[word] += count
    
    def _postprocess(self):
        self.write('output', self.count)

split = SplitLines()
split.name ='split'
words = SplitWords()
count = CountWords()


graph = WorkflowGraph()
graph.connect(split, 'output', words, 'input')
graph.connect(words, 'output', count, 'input')
```

In [118]:
#@markdown <font size='4'>Run the cell above</font>
search_term = 'id:example1c'
write_cell_above_to_file(search_term, 'wordcount.py')

Pay attention to how we give the input data to the workflow.

In this case it is **very important** that we give a name to the first PE:

`split.name = 'split'`

because we are going to use it later to specify the PE (`split`) that the input dictionary is for:

```
-d '{"split" : [{"input" : "Hello Hello something more World World World"}]}'
```

* -d: This flag stands for 'data', and it allows you to input JSON-formatted data directly into the workflow from the command line.

* `'{"split" : [{"input" : "Hello Hello something more World World World"}]}'`: This is the JSON data being passed to the workflow. The JSON object contains one key, `split`, which is the name we gave to the first PE. The value is a list of dictionaries, each containing an `input` key (the name of that PE's input stream) with a string of text as its value.
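
To check that a string is valid JSON and has the expected shape before passing it to `-d`, you can parse it with Python's standard `json` module. This is only a sanity check on the data; it says nothing about how dispel4py consumes it.

```python
import json

raw = '{"split" : [{"input" : "Hello Hello something more World World World"}]}'
data = json.loads(raw)

# One key per PE name; each value is a list of {input_name: value} dictionaries.
for pe_name, items in data.items():
    print(pe_name, len(items), items[0]["input"])
```

Note that JSON requires double quotes around keys and strings, which is why the whole argument is wrapped in single quotes on the command line.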

In [119]:
!dispel4py simple wordcount.py -d '{"split" : [{"input" : "Hello Hello something more World World World"}]}'

RUN ARGS: 
Namespace(target='simple', module='wordcount.py', attr=None, file=None, data='{"split" : [{"input" : "Hello Hello something more World World World"}]}', iter=1, provenance=None)
Inputs: {'split0': [{'input': 'Hello Hello something more World World World'}]}
SimplePE: Processed 1 iteration.
Outputs: {'CountWords2': {'output': [defaultdict(<class 'int'>, {'Hello': 2, 'something': 1, 'more': 1, 'World': 3})]}}
ELAPSED TIME: 0.0005199909210205078


## Useful dispel4py Flags

-h, --help: Show the help message and exit.

-l, --loglevel: Set the logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL).

-v, --verbose: Enable verbose logging.

-n, --num: Number of processes or threads.

-a ATTR, --attr ATTR: Set an attribute for the graph.

-d DATA, --data DATA: Input data as a JSON string. This is how you provide the initial data for the workflow.

-f FILE, --file FILE: Input data as a JSON file. This is an alternative to -d where you can specify a file containing the JSON data.

-i ITERATIONS, --iterations ITERATIONS: The number of iterations to run.

-t, --trace: Trace the execution of the workflow.

-s, --stream: Stream data between iterations instead of between PEs.



## Summary of dispel4py Mappings


### Sequential

* simple: Executes dataflow graphs sequentially within a single process. This is suitable for smaller, less complex data processing tasks.

### Parallel

#### Fixed workload distribution (supports both stateful and stateless PEs)

* mpi: Utilizes the Message Passing Interface (MPI) to distribute computations across multiple nodes, allowing for parallel processing in a distributed memory environment.
* multi: Runs multiple instances of a dataflow graph in parallel on a single machine using the multiprocessing library in Python.
* zmq_multi: Similar to multi, but uses the ZeroMQ library for managing parallel execution within a single machine.
* redis: Executes multiple instances of a dataflow graph in parallel using Redis as a messaging system.

#### Dynamic workload distribution (supports only stateless PEs)

* dyn_multi: Runs multiple instances with dynamic workload assignment (without autoscaling) using Python’s multiprocessing library.
* dyn_auto_multi: Similar to dyn_multi, but includes autoscaling capabilities and can adjust the number of threads dynamically.
* dyn_redis: Runs multiple instances with dynamic workload assignment (without autoscaling) using Redis.
* dyn_auto_redis: Similar to dyn_redis, but allows for autoscaling and dynamic thread adjustments.

#### Hybrid workload distribution (supports both stateful and stateless PEs)

* hybrid_redis: A hybrid approach that runs multiple instances of a dataflow graph using Redis. Stateless PEs have dynamically assigned workloads, while Stateful PEs have fixed assignments.
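
The difference between fixed and dynamic workload distribution can be illustrated with a toy simulation. This sketch does not use dispel4py and makes two simplifying assumptions: "fixed" means item `i` always goes to worker `i % n_workers`, and "dynamic" means each item goes to whichever worker becomes free first. The function names and the `costs` list (processing time per item) are made up for illustration.

```python
import heapq


def fixed_loads(costs, n_workers):
    """Item i always goes to worker i % n_workers, whatever its cost."""
    loads = [0] * n_workers
    for i, cost in enumerate(costs):
        loads[i % n_workers] += cost
    return loads


def dynamic_loads(costs, n_workers):
    """Each item goes to whichever worker becomes free first."""
    free_at = [(0, w) for w in range(n_workers)]  # (time the worker is free, worker id)
    heapq.heapify(free_at)
    loads = [0] * n_workers
    for cost in costs:
        t, w = heapq.heappop(free_at)
        loads[w] += cost
        heapq.heappush(free_at, (t + cost, w))
    return loads


costs = [5, 1, 1, 5, 1, 1, 5, 1, 1]  # made-up processing times per item
print(fixed_loads(costs, 3))    # [15, 3, 3]
print(dynamic_loads(costs, 3))  # [7, 6, 8]
```

With uneven item costs the dynamic scheme balances the load better. The price is that an item can land on any worker, so a PE cannot rely on state kept in one particular instance, which is consistent with the dynamic mappings supporting only stateless PEs.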