# Executing Heterogeneous DAG Workflows with Parsl-RP (RPEX)

RPEX integrates the powerful runtime engine and workload manager of RADICAL-Pilot with the flexible and parallel workflow manager of Parsl. RPEX offers the best of both worlds by enabling users to run heterogeneous regular and ``MPI`` workflows, such as executables and Python functions, within the same environments on different HPC platforms. Users can express and manage these workflows via Parsl.

In this tutorial, we will explore creating an ``MPI`` version of the **Monte Carlo workflow** to calculate the value of **PI**, which is similar to the ``non-MPI`` version in Parsl examples [here](https://parsl.readthedocs.io/en/stable/1-parsl-introduction.html#Monte-Carlo-workflow). This will allow us to utilize both MPI (Message Passing Interface) and regular tasks using the Parsl API. The execution of the workflow's tasks will be managed by the RPEX executor.

By the end, this tutorial will have demonstrated how to combine Parsl's data flow manager with RADICAL-Pilot's workload manager to execute the tasks of the **Monte Carlo workflow** concurrently.




## Overview

The tutorial will cover the following key steps:

1. **Configuring the RPEX Executor**:
    - Setting up the RPEX executor and binding it to the DAG for task execution.


2. **Constructing a Heterogeneous Monte Carlo workflow**:
    - Using the Parsl API and its app decorators to define a heterogeneous workflow with both MPI and non-MPI tasks,
      made up of Python functions and executables.


3. **Executing the DAG**:
    - Running the DAG with RPEX on the local host.

#### The serial version of the Parsl Pi example, which uses the `HighThroughputExecutor`, is as follows:

- Each `pi` function takes 10 ** 6 points and the three calls run in parallel, so 3,000,000 points are processed in total.

```
App Calls   pi(1M)     pi(1M)     pi(1M)
               \         |         /
Futures         a        b        c
                 \       |       /
App Call            avg_points()   
                         |
Future               avg_mpi_pi
```

#### The MPI version of Pi workflow example that we will build and execute in this tutorial is as follows:

- Each `mpi_pi` rank takes 10 ** 6 points and the three ranks run in parallel, so 3,000,000 points are processed in total.

```
App Calls            mpi_pi(3M)
                         |        
Futures                  a
                       / | \
App Call            avg_points()   
                         |
Future               avg_mpi_pi
```
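
To make the two diagrams concrete, here is a minimal pure-Python sketch of the Monte Carlo estimate they describe: three independent estimates of PI followed by their average. It does not use Parsl or RP; the names `estimate_pi`, `estimates`, and `avg_pi` are illustrative only, and the point count is reduced to keep the sketch fast.

```python
import random

def estimate_pi(num_points, seed):
    """Estimate pi from `num_points` uniform random points in the unit square."""
    rng = random.Random(seed)          # seeded so the sketch is reproducible
    inside = 0
    for _ in range(num_points):
        x, y = rng.random(), rng.random()
        if x**2 + y**2 < 1:            # point falls inside the quarter circle
            inside += 1
    return 4 * inside / num_points

# three independent estimates, mirroring the three pi() calls in the diagram
# (10**5 points each here instead of 10**6)
estimates = [estimate_pi(10**5, seed) for seed in (1, 2, 3)]
avg_pi = sum(estimates) / len(estimates)
print(estimates, avg_pi)
```

The fraction of points inside the quarter circle approximates PI / 4, which is why the count is multiplied by 4.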

First, as a best practice, let's ensure that RADICAL-Pilot and Parsl are installed in the notebook environment.

In [None]:
!pip show parsl && echo "==============" && radical-stack

Let's import the Parsl and RP Python modules in our application, alongside the `RadicalPilotExecutor` (RPEX) from Parsl.

In [None]:
import parsl
import radical.pilot as rp

from parsl.config import Config
from parsl.app.app import python_app, bash_app
from parsl.executors.radical import ResourceConfig
from parsl.executors.radical import RadicalPilotExecutor

RP provides a set of environment variables that control the log level, turn the progress report on or off, and enable or disable the report animation:

In [None]:
%env RADICAL_LOG_LVL=OFF
%env RADICAL_REPORT=TRUE
%env RADICAL_REPORT_ANIME=FALSE
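
The `%env` magic only works inside a notebook. When running the same steps as a plain Python script, a rough equivalent is to set the variables through `os.environ`. This is a sketch that assumes RP reads these variables from the process environment, so they are set before `radical.pilot` is imported.

```python
import os

# set these before importing radical.pilot (assumption: RP picks them up
# from the process environment)
os.environ['RADICAL_LOG_LVL'] = 'OFF'
os.environ['RADICAL_REPORT'] = 'TRUE'
os.environ['RADICAL_REPORT_ANIME'] = 'FALSE'
```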

## Configuring the RPEX Executor

RPEX uses `ResourceConfig`, which is a data class that gives the flexibility to define advanced execution constraints for the RADICAL-Pilot runtime system, such as the number of workers and number of CPUs or GPUs per worker and more.

For the purpose of this tutorial, we will use an `MPI` worker by setting the `worker_type` attribute of the `ResourceConfig` instance. This deploys one MPI worker with 4 CPU cores and 0 GPUs.

In [None]:
rpex_cfg = ResourceConfig()
rpex_cfg.worker_type = 'MPI'
rpex_cfg.cores_per_worker = 4

<div class="alert alert-block alert-info">
⚠️ NOTE:
    
The ***cores*** on the executor level represent the total number of cores available to the executor, including those of the MPI worker. This creates a clean separation between the cores used by the MPI workers, which are responsible for function execution, and the remaining resources, which are used for other work such as running executable tasks.
</div>

Once we have created the `ResourceConfig`, we pass it to the RPEX executor at initialization. This tells the executor to deploy 1 MPI worker with 4 cores; the remaining 4 of the 8 cores are left for executable tasks, if any.

In [None]:
config = Config(executors=[RadicalPilotExecutor(
                           label='rpex-heterogeneous',
                           rpex_cfg=rpex_cfg,
                           resource='local.localhost_test',
                           runtime=30, cores=8)])

radical_executor = config.executors[0]
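
The core split in this configuration can be checked with simple arithmetic. The sketch below is plain Python and does not query RPEX; the variable names are illustrative, and it assumes a single MPI worker and one core per MPI rank.

```python
# values taken from the tutorial configuration
total_cores = 8          # `cores` passed to RadicalPilotExecutor
cores_per_worker = 4     # rpex_cfg.cores_per_worker
n_workers = 1            # assumed: a single MPI worker, as stated in the text

worker_cores = n_workers * cores_per_worker
executable_cores = total_cores - worker_cores
assert executable_cores >= 0, 'workers request more cores than the executor has'

# assumption: each MPI rank occupies one worker core, so a 3-rank
# function has to fit into the cores owned by the MPI worker
ranks = 3
assert ranks <= worker_cores

print(worker_cores, executable_cores)
```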

Now, let's tell Parsl to use the RPEX executor by invoking the ``load`` function with the config that holds the `RadicalPilotExecutor`.

In [None]:
parsl.load(config)

In [None]:
@python_app
def mpi_pi(num_points, comm=None, parsl_resource_specification={'ranks':3}):
    """
    Estimate PI from `num_points` random points (3 million in this case),
    distributed evenly across the MPI ranks (3 ranks), so that each rank
    processes a fair share of the work (1 million points).
    """

    import numpy

    rank = comm.Get_rank()
    ranks = comm.Get_size()

    def compute_pi(samples):
        # count the points that fall inside the unit quarter circle
        inside = 0
        for x, y in samples:
            if x**2 + y**2 < 1:
                inside += 1

        # divide by the number of samples, not by the sample array itself
        pi = inside * 4 / len(samples)

        return pi

    # rank 0 generates one chunk of N (x, y) points per rank
    if rank == 0:
        N = num_points // ranks
        samples = numpy.random.random((ranks, N, 2))
    else:
        samples = None

    # every rank receives its own (N, 2) chunk
    samples = comm.scatter(samples, root=0)

    # each rank calculates its own estimate of pi and reports it back;
    # the estimates are averaged later by the `mean` task
    mypi = compute_pi(samples)

    return mypi
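
The scatter step in `mpi_pi` can be hard to picture without an MPI runtime at hand. The following sketch emulates it in plain Python: a "root" builds one chunk of points per rank, and each rank then works only on its own chunk. It uses neither `mpi4py` nor RP; `chunks` and `per_rank_pi` are hypothetical names, and the point count is scaled down for speed.

```python
import random

def compute_pi(samples):
    """Estimate pi from a list of (x, y) points in the unit square."""
    inside = sum(1 for x, y in samples if x**2 + y**2 < 1)
    return 4 * inside / len(samples)

ranks = 3
num_points = 3 * 10**4                 # scaled down from 3 * 10**6
N = num_points // ranks
rng = random.Random(0)                 # seeded so the sketch is reproducible

# "root" builds one chunk of N points per rank, like the (ranks, N, 2) array
chunks = [[(rng.random(), rng.random()) for _ in range(N)]
          for _ in range(ranks)]

# scatter semantics: rank i receives chunks[i]; here we simply loop over ranks
per_rank_pi = [compute_pi(chunks[rank]) for rank in range(ranks)]
print(per_rank_pi)
```

In a real MPI run the loop disappears: every rank executes the same function body at the same time and sees only the chunk it was sent.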

In [None]:
future_means = mpi_pi(3 * 10 ** 6, comm=None)

In [None]:
# Bash app that computes the mean of the three PI values
@bash_app
def mean(future_means, stdout='mean_task.stdout',
         parsl_resource_specification={'ranks':1, 'mode': rp.TASK_EXECUTABLE}):
    """
    Create a Python file named `mean.py`
    and run it as an executable (python3 mean.py).
    """
    import os
    exec_path = os.path.join(os.getcwd(), 'mean.py')
    with open(exec_path, 'w') as f:
        # the resolved values are written into the script as a Python literal
        cmd = f'print(sum({future_means}) / len({future_means}))'
        f.write(cmd)

    return f'python3 {exec_path}'
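
To see what the generated script looks like, the sketch below applies the same f-string interpolation to a hypothetical list of estimates and runs the resulting one-liner in a fresh interpreter. It assumes the MPI app's result arrives as a list of plain floats; the values in `values` are made up for illustration.

```python
import subprocess
import sys

values = [3.1, 3.2, 3.3]   # hypothetical per-rank estimates

# same interpolation as in the bash app: the list's repr becomes Python source
script = f'print(sum({values}) / len({values}))'
print(script)

# run the one-line script in a fresh interpreter and capture its stdout
out = subprocess.run([sys.executable, '-c', script],
                     capture_output=True, text=True, check=True).stdout
print(out.strip())
```

Because the values are embedded as source text, this pattern only works for results whose `repr` is valid Python on its own.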

In [None]:
# Compute the mean of the three estimates
future_mean_pi = mean(future_means)
# wait for the mean_pi to finish
future_mean_pi.result()
!echo "Average PI value: " && cat mean_task.stdout

Finally, shut down the executor; otherwise it will stay ready to accept more tasks.

In [None]:
radical_executor.shutdown()