# DAG Example: Map-Reduce word count

This example DAG counts the words of several books available for free at [Project Gutenberg](https://www.gutenberg.org). A Map task splits each text into chunks and counts the words of every chunk in parallel. Then, for every book, a single reduce function merges the partial counts.
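
The map and reduce steps described above can be sketched locally in plain Python. This is only an illustration and not the code of the deployed actions: the helper names, the word-splitting regular expression, and the way `func_id` selects a chunk are all assumptions made for this sketch.

```python
import re
from collections import Counter


def split_into_chunks(text, parallel):
    """Split the words of `text` into `parallel` chunks of similar size."""
    words = re.findall(r"[a-z']+", text.lower())
    size = -(-len(words) // parallel)  # ceiling division
    return [words[i * size:(i + 1) * size] for i in range(parallel)]


def count_words(text, parallel, func_id):
    """Map step (hypothetical): count the words of the chunk picked by func_id."""
    chunk = split_into_chunks(text, parallel)[func_id]
    return dict(Counter(chunk))


def merge_dicts(dicts):
    """Reduce step (hypothetical): add up the partial counts of every chunk."""
    total = Counter()
    for partial in dicts:
        total.update(partial)
    return dict(total)


text = "the cat and the hat and the bat"
parallel = 3
# In the real workflow every func_id would run as a separate function invocation.
partials = [count_words(text, parallel, func_id) for func_id in range(parallel)]
totals = merge_dicts(partials)
print(totals)
```

Each map call only looks at its own chunk, so the calls are independent of each other and can run in parallel. The reduce step needs all partial results, which is why it runs once per book after every map call has finished.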

#### Prerequisites to run this example:

In order to run this example, you will need:

- An IBM Cloud Functions account. You can register for free and access the IBM Cloud Functions free tier [here](https://www.ibm.com/cloud/free).

- The [IBM Cloud CLI](https://www.ibm.com/cloud/cli) with the IBM Cloud Functions plugin installed and set up. Make sure you are logged in and that the correct region and IBM Cloud Functions namespace are selected.

- A [DockerHub](https://hub.docker.com/) account and [Docker CE](https://docs.docker.com/get-docker/) installed on your system.

- A Triggerflow installation up and running. Please refer to the installation guide to properly install a Triggerflow instance.

#### Setup

In this section we will build the Docker runtime image and create the IBM Cloud Functions actions:

1. Build the Triggerflow runtime for IBM Cloud Functions: Go to the [IBM Cloud Functions runtime directory](../../runtime/ibm_cloud_functions) and build the image that matches your Python version. For example, for Python 3.8:

```bash
$ docker build . -f Dockerfile_v38 -t ${YOUR_DOCKER_USERNAME}/triggerflow_runtime:v3.8-0
```

```bash
$ docker push ${YOUR_DOCKER_USERNAME}/triggerflow_runtime:v3.8-0
```

Replace `${YOUR_DOCKER_USERNAME}` with your DockerHub user name.

2. Create the IBM Cloud Functions actions used in this workflow:

```bash
$ ibmcloud cloud-functions package create triggerflow-count-words
```

```bash
$ ibmcloud cloud-functions action create triggerflow-count-words/count-words --docker ${YOUR_DOCKER_USERNAME}/triggerflow_runtime:v3.8-0 count_words.py
```

```bash
$ ibmcloud cloud-functions action create triggerflow-count-words/merge-dicts --docker ${YOUR_DOCKER_USERNAME}/triggerflow_runtime:v3.8-0 merge_dicts.py
```

## DAG Definition

The cell below describes the DAG tasks and their dependencies. Similarly to Airflow's DAG definition, a Triggerflow DAG is composed of multiple operators that describe how or where a task is executed. In this case, we are executing serverless functions on IBM Cloud. The bitwise shift operator (`>>`) sets the order of execution of the tasks. For example, here the `count_*` tasks will be executed before the `merge_*` tasks. The `merge_*` tasks receive a JSONPath as the value of the `dicts` parameter. This JSONPath is used to pass data from one task to the next. It is parsed and evaluated at runtime, when the task is being prepared to run. In this case, `$.[*]` selects all the elements of the array of results that the `count_*` tasks generate.

In [None]:
from triggerflow.dags import DAG
from triggerflow.dags.operators import IBMCloudFunctionsCallAsyncOperator, IBMCloudFunctionsMapOperator

urls = ['https://www.gutenberg.org/files/1342/1342-0.txt',
        'https://www.gutenberg.org/files/11/11-0.txt',
        'https://www.gutenberg.org/files/1661/1661-0.txt']

# urls = ['http://www.gutenberg.org/files/2701/2701-0.txt']

dag = DAG('count-words')
parallel = 8

for i, url in enumerate(urls):
    count = IBMCloudFunctionsMapOperator(
        task_id='count_{}'.format(i),
        function_name='count-words',
        function_package='triggerflow-count-words',
        invoke_kwargs={'url': url,
                       'parallel': parallel},
        iter_data=('func_id', list(range(parallel))),
        dag=dag)
    
    merge = IBMCloudFunctionsCallAsyncOperator(
        task_id='merge_{}'.format(i),
        function_name='merge-dicts',
        function_package='triggerflow-count-words',
        invoke_kwargs={'dicts': '$.[*]'},
        dag=dag)
    
    count >> merge
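
To make the role of `>>` and of `$.[*]` more concrete, here is a minimal sketch of how an operator class could record dependencies through the shift operator. The `Task` class below is a hypothetical stand-in written for this illustration; it is not Triggerflow's implementation, and the sample results are made up.

```python
class Task:
    """Hypothetical stand-in for an operator; not Triggerflow's own class."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()    # IDs of tasks that must finish before this one
        self.downstream = set()  # IDs of tasks that run after this one

    def __rshift__(self, other):
        # `a >> b` records that b depends on a.
        self.downstream.add(other.task_id)
        other.upstream.add(self.task_id)
        return other  # returning the right-hand task allows a >> b >> c


count = Task('count_0')
merge = Task('merge_0')
count >> merge

# Made-up partial results, as a map task could produce them.
map_results = [{'the': 2}, {'the': 1, 'cat': 1}]
# `$.[*]` selects every element of the array, so the merge task would
# receive the full list of partial results as its `dicts` parameter.
merge_kwargs = {'dicts': list(map_results)}

print(merge.upstream)
print(merge_kwargs)
```

Python calls `__rshift__` on the left operand of `>>`, which is what lets a plain expression such as `count >> merge` declare an ordering without any extra method call.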

We can visualize the DAG tasks and their order of execution using `dag.show()`, which currently uses Graphviz as the backend to plot the graph image.

In [None]:
dag.show()

Triggerflow can save DAGs in a local folder on this system. This way, we can trigger a DAG execution without having to redeclare the whole DAG object as before. Triggerflow stores its DAGs in JSON format.

In [None]:
dag.save()

some_new_dag_object = DAG('count-words')
some_new_dag_object.load()
some_new_dag_object.tasks
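
As a rough idea of what storing a DAG as JSON involves, the sketch below writes a task graph to a file and reads it back. The dictionary layout and the file name are assumptions chosen for this illustration; they do not describe the actual format that `dag.save()` produces.

```python
import json
import os
import tempfile

# Hypothetical layout: every task ID maps to the IDs of the tasks it depends on.
dag_definition = {
    'dag_id': 'count-words',
    'tasks': {
        'count_0': {'upstream': []},
        'merge_0': {'upstream': ['count_0']},
    },
}

# A temporary folder stands in for the local folder where DAGs are kept.
with tempfile.TemporaryDirectory() as folder:
    path = os.path.join(folder, 'count-words.json')
    with open(path, 'w') as f:
        json.dump(dag_definition, f, indent=2)
    with open(path) as f:
        restored = json.load(f)

print(restored['tasks']['merge_0']['upstream'])
```

Because the task graph survives the round trip unchanged, a new process only needs the DAG ID to rebuild the same structure.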

## DAG execution

We can trigger a DAG execution using the `dag.run()` method. This creates a DAGRun object with a unique execution ID. When the DAGRun object is instantiated, it registers all the triggers necessary to orchestrate the DAG and publishes the initial CloudEvent to the event source, so that the DAG starts executing right away.

In [None]:
dagrun = dag.run()

`dagrun.result()` gets the result of the final task. The result consists of the `data` values from the events that activated the final join trigger. A `task_id` can be passed as a parameter to retrieve the results of any other task in the DAG. However, this method call does not block until the DAG execution is completed, so it might raise an exception if the results aren't available yet.

In [None]:
result = dagrun.result()
print(result[1])
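
Since `dagrun.result()` does not block, one possible way to wait for the results is to retry the call until it stops raising. The helper below is a sketch under assumptions: `wait_for_result` is a hypothetical name, and because the exact exception type is not stated here, it catches `Exception` broadly. A fake function replaces `dagrun.result` so that the example runs on its own.

```python
import time


def wait_for_result(get_result, timeout=60.0, interval=2.0):
    """Call get_result() until it succeeds or `timeout` seconds have passed."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            return get_result()
        except Exception:
            # Assumption: any exception means "not ready yet" until the deadline.
            if time.monotonic() >= deadline:
                raise
            time.sleep(interval)


# Stand-in for dagrun.result: fails twice, then returns a value.
attempts = {'count': 0}


def fake_result():
    attempts['count'] += 1
    if attempts['count'] < 3:
        raise RuntimeError('results not available yet')
    return [{'the': 3}]


value = wait_for_result(fake_result, timeout=5.0, interval=0.01)
print(value)
```

With a real run, the call would look like `wait_for_result(dagrun.result)`. Catching a narrower exception type would be preferable once the one raised by the library is known.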

DAGRuns are also stored in the local system's Triggerflow cache. This is useful for long-running DAG executions, because the state of the workflow can be retrieved without having to keep the DAGRun object instantiated. To load a run, its ID is required.

In [None]:
from triggerflow.dags import DAGRun

same_run = DAGRun.load_run('count-words-f123c4dc5bd2')
res = same_run.result()
print(res)
