# Submitting Cluster Jobs

## Overview

### Questions

* How do I submit workflows on HPC systems? 
* How can I combine many simulations into a single cluster job?

### Objectives

* Demonstrate the use of **resources** to execute with **MPI domain decomposition**.
* Demonstrate the use of **groups** to execute an action on multiple jobs _in parallel_.

In [1]:
import os

# Progress bars do not format well in notebook output.
os.environ["ROW_NO_PROGRESS"] = "true"
# We do not want to encourage users to use --yes, but the tutorials are not interactive.
os.environ["ROW_YES"] = "true"
# Pretend to be on the anvil cluster
os.environ["ROW_CLUSTER"] = "anvil"

## Cluster Jobs

On HPC systems, you submit **cluster jobs** to a queue, and the scheduler executes them on the compute nodes when resources become available.
**Row** generates **cluster job** submission scripts that execute the **actions** in your **workflow**.
HPC systems provide access to many more CPU cores or GPUs than are available on the typical workstation.
Use them to execute the most time-consuming **actions**.
You can increase the **throughput** (number of simulations per unit time) by executing many simulations *in parallel*, reduce **latency** (time to complete a single simulation) by executing each simulation on more than one MPI rank with **domain decomposition**, or combine the two.

This tutorial will demonstrate how to use **row** to submit HOOMD-blue **actions** in **cluster jobs**.
See [Parallel Simulations with MPI](../03-Parallel-Simulations-With-MPI/00-index.ipynb) to learn more about MPI **domain decomposition**. See the [row user documentation](https://row.readthedocs.io) and your HPC center's documentation for an introduction to HPC job queues.

## Domain decomposition

The **resources** table in `workflow.toml` instructs **row** to request the chosen resources in the **cluster job**.
The values used in this tutorial are for example purposes only.
You should choose a number of MPI ranks, equilibration steps, and cluster job walltime appropriate for your project.

HOOMD-blue uses MPI for interprocess communication.
In this case, the 2 *processes per directory* equate to 2 **MPI ranks** per directory using domain decomposition:

```toml
[action.resources]
processes.per_directory = 2
```

You also need to select the `mpi` launcher:

```toml
[[action]]
...
launchers = ["mpi"]
```
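
As a rough illustration of what this request implies, the sketch below computes how many MPI ranks a **cluster job** needs when every **directory** in it runs at the same time. The function name `total_ranks` is hypothetical and is not part of **row**; it only mirrors the arithmetic assumed here (ranks per directory times directories per job).

```python
def total_ranks(processes_per_directory: int, directories_in_job: int) -> int:
    """Ranks needed when all directories in one cluster job run concurrently.

    Assumption: each directory receives exactly `processes_per_directory` ranks.
    """
    return processes_per_directory * directories_in_job


# Values from this tutorial: 2 ranks per directory, 3 directories.
print(total_ranks(processes_per_directory=2, directories_in_job=3))  # 6
```

With this tutorial's values the result is 6 ranks. Larger projects would scale the same product up to the size of the **group**.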

## Executing many simulations in parallel

Some HPC systems schedule jobs *only by full node* or limit the *number of cluster jobs* you can queue at one time.
You may also have many thousands of **directories** and want to reduce the time spent waiting in queue.
In both of these cases, you can use MPI to **group** the execution of an **action** on many **directories** into one **cluster job** (see [Parallel Simulations with MPI](../03-Parallel-Simulations-With-MPI/00-index.ipynb) for an introduction to MPI **partitions**).
You can use MPI partitions alone (one rank per **directory**) or in combination with MPI **domain decomposition** (more than one rank per **directory**).

**Row** *groups* eligible **directories** together every time it **submits** an **action** for execution.
It **groups** *ALL* eligible directories *by default*.
The *randomize* and *compress* **actions** in the previous section take `*jobs` as an argument and loop over the directories *in serial*.
When executing in serial like this, the **wall time** scales with the number of directories.
Those actions correspondingly set `resources.walltime.per_directory` and **row** computes the total **wall time** needed for the **cluster job** based on the **group's** size.
*Randomize* and *compress* execute very quickly, so the total wall times to process the directories *in serial* are manageable.
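
To make the scaling concrete, here is a minimal sketch of the serial wall time estimate, assuming the total is simply the per-directory value multiplied by the **group** size. The helper `serial_walltime` is a hypothetical name used only for illustration; the 10 minute value is the one this tutorial sets for *compress*.

```python
from datetime import timedelta


def serial_walltime(per_directory: timedelta, group_size: int) -> timedelta:
    """Wall time for a job that loops over its directories one after another."""
    return per_directory * group_size


# The compress action in this tutorial requests 10 minutes per directory.
compress = timedelta(minutes=10)

for group_size in (3, 30, 300):
    # The request grows linearly with the number of directories in the group.
    print(group_size, serial_walltime(compress, group_size))
```

The linear growth is harmless for short **actions**, but it quickly exceeds typical queue limits when each directory needs hours rather than minutes.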

The *equilibrate* **action** takes much longer, so it is not feasible to execute in serial.
To execute *equilibrate* in parallel, you need to:
1) Launch up to `maximum_size` directories per **cluster job**.
   Set `walltime.per_submission` so that the total **wall time** does not scale with the number of directories.
   ```toml
   [action.group]
   maximum_size = 64

   [action.resources]
   walltime.per_submission = "12:00:00"
   ```

2) Choose the job based on the partition in `equilibrate.py` (shown in the previous section):
   ```python
   communicator = hoomd.communicator.Communicator(
       ranks_per_partition=RANKS_PER_PARTITION
   )
   job = jobs[communicator.partition]
   ```

## Submitting cluster jobs

The example is small for demonstration purposes.
It uses 2 MPI ranks per **directory** and executes all three **directories** in one **cluster job** (in parallel).
In production work you should choose the number of ranks per **directory** (`processes.per_directory`) and the number of **directories** per **cluster job** (`maximum_size`) to utilize an integer number of whole nodes in each **cluster job** leaving no empty cores or GPUs.
For example, use 16 ranks per **directory** and 32 **directories** per **cluster job** to use 4 whole 128-core nodes per **cluster job**.
When the total number of **directories** is not an integer multiple of `maximum_size`, **row** will form one smaller *tail* **group** to cover the remainder.
If the *tail* **group** doesn't fill a whole node, **row** will submit the *tail* **cluster job** to a shared queue (if available on the HPC system).
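
The following sketch works through that arithmetic. It assumes **groups** are filled to `maximum_size` with one smaller tail **group** for the remainder, as described above, and that nodes have 128 cores. The helpers `plan_groups` and `nodes_used` are hypothetical names for illustration and are not part of **row**.

```python
def plan_groups(n_directories: int, maximum_size: int) -> list[int]:
    """Sizes of the groups: full groups first, then one smaller tail group."""
    full_groups, tail = divmod(n_directories, maximum_size)
    sizes = [maximum_size] * full_groups
    if tail:
        sizes.append(tail)
    return sizes


def nodes_used(group_size: int, ranks_per_directory: int, cores_per_node: int = 128) -> float:
    """Number of nodes (possibly fractional) occupied by one group."""
    return group_size * ranks_per_directory / cores_per_node


# Hypothetical project: 100 directories, 16 ranks each, 32 directories per job.
for size in plan_groups(n_directories=100, maximum_size=32):
    print(size, nodes_used(size, ranks_per_directory=16))
```

In this hypothetical project, each full **group** of 32 occupies exactly 4 nodes, while the tail **group** of 4 **directories** occupies half a node.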

Here is the full `workflow.toml` that would allow **cluster jobs** to use up to one full 128-core node:

In [2]:
with open("workflow.toml", "w") as workflow:
    workflow.write("""
[default.action]
command = "python project.py --action $ACTION_NAME {directories}"
submit_options.anvil.account = "my_account"

[[action]]
name = "randomize"
products = ["random.gsd"]
resources.walltime.per_directory = "00:05:00"

[[action]]
name = "compress"
previous_actions = ["randomize"]
products = ["compressed.gsd"]
resources.walltime.per_directory = "00:10:00"

[[action]]
name = "equilibrate"
previous_actions = ["compress"]
products = ["trajectory.gsd"]
launchers = ["mpi"]

[action.group]
maximum_size = 64

[action.resources]
processes.per_directory = 2
walltime.per_submission = "12:00:00"
""")

This tutorial assumes that you are using the [Purdue Anvil](https://www.rcac.purdue.edu/knowledge/anvil) cluster.
It sets a placeholder cluster account in the default action `submit_options.anvil.account`.
Adjust the account name and/or the cluster name as appropriate for your HPC system.

In [3]:
! row show status

Action      Completed Submitted Eligible Waiting Remaining cost
randomize           3         0        0       0
compress            3         0        0       0
equilibrate         0         0        3       0   72 CPU-hours

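The remaining cost reported for *equilibrate* is consistent with a simple product of the requested resources. The sketch below reproduces the number under the assumption that the cost is the total rank count multiplied by the requested wall time; the exact accounting inside **row** is not shown in this tutorial, and `remaining_cpu_hours` is a hypothetical helper.

```python
def remaining_cpu_hours(
    eligible_directories: int,
    processes_per_directory: int,
    walltime_hours: float,
) -> float:
    """Assumed cost model: (total ranks) x (wall time in hours)."""
    return eligible_directories * processes_per_directory * walltime_hours


# 3 eligible directories, 2 ranks each, 12 hours per submission.
print(remaining_cpu_hours(3, 2, 12))  # 72
```
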

The *equilibrate* step is ready to execute.
**Row** automates the **cluster job** submission process with `row submit`.
Use the `--dry-run` flag first to ensure that the generated **cluster jobs** are correct (`--dry-run` displays the generated submission script(s) and skips the submission step):

In [4]:
! row submit --dry-run

#!/bin/bash
#SBATCH --job-name=equilibrate-59363805e6f46a715bc154b38dffc4e4+2
#SBATCH --output=equilibrate-%j.out
#SBATCH --partition=shared
#SBATCH --ntasks=6
#SBATCH --mem-per-cpu=1800M
#SBATCH --time=720
#SBATCH --account=my_account

directories=(
59363805e6f46a715bc154b38dffc4e4
972b10bd6b308f65f0bc3a06db58cf9d
c1a59a95a0e8b4526b28cf12aa0a689e
)

export ACTION_WORKSPACE_PATH=workspace
export ACTION_CLUSTER=anvil
export ACTION_NAME=equilibrate
export ACTION_PROCESSES=6
export ACTION_WALLTIME_IN_MINUTES=720
export ACTION_PROCESSES_PER_DIRECTORY=2

trap 'printf %s\\n "${directories[@]}" | /Users/joaander/.cargo/bin/row scan --no-progress -a equilibrate - || exit 3' EXIT
srun --mpi=pmi2 --ntasks=6 python project.py --action $ACTION_NAME "${directories[@]}" || { >&2 echo "[row] Error executing command."; exit 1; }


In this configuration, **row** submits one **cluster job** that launches `project.py` with 6 ranks.

The partitioned communicator
```python
hoomd.communicator.Communicator(ranks_per_partition=2)
```
will form 3 partitions, each using 2 MPI ranks to **domain decompose** the simulation.
The partition index selects which directory each partition will execute:
```python
job = jobs[communicator.partition]
```
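
The mapping from ranks to **directories** can be sketched in plain Python without MPI. This example assumes that consecutive ranks are placed in the same partition (partition index equal to `rank // ranks_per_partition`); the helper `partition_of` is hypothetical and only imitates what `communicator.partition` would report on each rank.

```python
RANKS_PER_PARTITION = 2

# Directory names copied from the dry-run output above.
directories = [
    "59363805e6f46a715bc154b38dffc4e4",
    "972b10bd6b308f65f0bc3a06db58cf9d",
    "c1a59a95a0e8b4526b28cf12aa0a689e",
]


def partition_of(rank: int, ranks_per_partition: int) -> int:
    """Assumed mapping: consecutive ranks share one partition."""
    return rank // ranks_per_partition


n_ranks = RANKS_PER_PARTITION * len(directories)
assignment = {
    rank: directories[partition_of(rank, RANKS_PER_PARTITION)]
    for rank in range(n_ranks)
}

for rank, directory in assignment.items():
    print(f"rank {rank} -> partition {partition_of(rank, RANKS_PER_PARTITION)} -> {directory}")
```

Under this assumption, ranks 0 and 1 work on the first **directory**, ranks 2 and 3 on the second, and ranks 4 and 5 on the third.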

If there were more than 64 **directories** in this workspace, **row** would generate more than one **cluster job** because `maximum_size = 64` limits each **group** to 64 **directories**.

When you are sure that the resources are configured correctly, you can execute `row submit` on a cluster to submit the job to the queue.
**Row** will track the **cluster jobs** and prevent you from submitting an action on a directory that is still in queue or running.

## Summary

In this section of the tutorial, you configured the *equilibrate* action to execute on many directories in parallel and used **row** to generate a **cluster job** for submission.

This is the end of the tutorial on organizing and executing simulations.

This tutorial only teaches the basics of **row**.
Read the [row documentation](https://row.readthedocs.io/) to learn more.