# Resource Placement with Dragon and RHAPSODY

This tutorial teaches you how to control **where** your tasks run — which node, which CPUs, which GPUs — using Dragon's [Policy](https://dragonhpc.github.io/dragon/doc/_build/html/ref/dragon.infrastructure.policy.Policy.html#dragon.infrastructure.policy.Policy) system through RHAPSODY's `ComputeTask` API.

You will learn:

1. How to run a basic task with no placement control
2. How to use `ProcessTemplate` for single-process tasks
3. How to use `process_templates` for parallel multi-process jobs
4. How to pin tasks to specific CPUs with `cpu_affinity`
5. How to pin tasks to specific GPUs with `gpu_affinity`
6. How to combine CPU and GPU affinity for full resource control

### Prerequisites

- RHAPSODY installed (`pip install rhapsody-py[dragon]`)
- Dragon runtime available
- Launch this notebook with: `dragon jupyter lab`

### Key Concept: Dragon Policy

A [`Policy`](https://dragonhpc.github.io/dragon/doc/_build/html/ref/dragon.infrastructure.policy.Policy.html#dragon.infrastructure.policy.Policy) tells Dragon **where** and **how** to place a process:

| Parameter | Type | Description |
|-----------|------|-------------|
| `placement` | `Policy.Placement` | Which node to target (`DEFAULT`, `HOST_NAME`, `HOST_ID`, `ANYWHERE`) |
| `host_name` | `str` | Specific hostname (used with `Placement.HOST_NAME`) |
| `host_id` | `int` | Specific host ID (used with `Placement.HOST_ID`) |
| `cpu_affinity` | `list[int]` | List of CPU core IDs to bind the process to |
| `gpu_affinity` | `list[int]` | List of GPU device IDs to bind the process to |
| `distribution` | `Policy.Distribution` | How to distribute across nodes (`ROUNDROBIN`, `BLOCK`) |

You pass a `Policy` into a `ProcessTemplate`, and then pass the template into your `ComputeTask` via `task_backend_specific_kwargs`.

In [None]:
import asyncio
import multiprocessing as mp

from dragon.infrastructure.policy import Policy
from dragon.native.machine import Node, System

from rhapsody.api import ComputeTask, Session
from rhapsody.backends import DragonExecutionBackendV3

---

## 1. Baseline: A Task with No Placement Control

When you submit a task without any `Policy`, Dragon places it **anywhere** in the allocation using its default scheduling. This is the simplest case — you get no control over which node, CPU, or GPU runs your task.

**Important:** There are two execution modes for function tasks:

| Mode | Trigger | `return_value` | `stdout` |
|------|---------|----------------|----------|
| **Function Native** | No `process_template` | Python return value | `None` |
| **Function Process** | With `process_template` | Process exit code (int) | Captured from `print()` |

When using `process_template` for placement control, your function runs inside a Dragon process. To get data back, **print your results** and read them from `task.stdout`.

In [None]:
import json


def report_placement():
    """Report where this task is running (prints JSON for process mode)."""
    import os
    import socket
    result = {
        "hostname": socket.gethostname(),
        "pid": os.getpid(),
        "cuda_visible": os.environ.get("CUDA_VISIBLE_DEVICES", "not set"),
    }
    print(json.dumps(result))
    return result


async def baseline_example():
    backend = await DragonExecutionBackendV3()
    session = Session(backends=[backend])

    # No backend-specific kwargs → Function Native mode
    # return_value contains the Python dict directly
    task = ComputeTask(function=report_placement)

    async with session:
        await session.submit_tasks([task])
        await asyncio.gather(task)

        print(f"Task {task.uid} (native mode):")
        print(f"  return_value type: {type(task.return_value).__name__}")
        print(f"  result: {task.return_value}")

await baseline_example()

The task ran successfully, but you had no say in where it landed. Let's change that.

---

## 2. Using `process_template` for Single-Process Tasks

To control placement for a single-process task, pass a `process_template` dict inside `task_backend_specific_kwargs`. This dict is forwarded directly to Dragon's [`ProcessTemplate`](https://dragonhpc.github.io/dragon/doc/_build/html/ref/dragon.native.process.ProcessTemplate.html) constructor.

The most important key is `policy` — a `Policy` object that specifies the placement.

### How it connects:

```
ComputeTask(
    function=my_func,
    task_backend_specific_kwargs={
        "process_template": {         # → becomes ProcessTemplate(**this_dict)
            "policy": Policy(...)      # → placement rules
        }
    }
)
```

RHAPSODY's `DragonExecutionBackendV3.build_task()` merges your template config with the task's `args`/`kwargs` and creates the `ProcessTemplate` for you.

In [None]:
async def single_process_placement():
    backend = await DragonExecutionBackendV3()
    session = Session(backends=[backend])

    # Discover the first node in the allocation
    sys = System()
    first_node = Node(list(sys.nodes)[0])
    hostname = first_node.hostname
    print(f"Targeting node: {hostname}")

    # Create a policy that pins the task to that specific node
    policy = Policy(
        placement=Policy.Placement.HOST_NAME,
        host_name=hostname,
    )

    # Pass the policy via process_template
    # Note: with process_template, return_value is the exit code (int)
    # The function's print() output is captured in task.stdout
    task = ComputeTask(
        function=report_placement,
        task_backend_specific_kwargs={
            "process_template": {"policy": policy}
        },
    )

    async with session:
        await session.submit_tasks([task])
        await asyncio.gather(task)

        # Parse the JSON from stdout (not return_value!)
        result = json.loads(task.stdout.strip())
        # Note: Dragon may report hostname differently than the FQDN
        # (e.g., "localhost" vs "node01.cluster.local")
        print(f"Task ran on: {result['hostname']} (target was: {hostname})")

await single_process_placement()

---

## 3. Using `process_templates` for Parallel Jobs

For multi-process parallel jobs, use `process_templates` (plural). This is a list of `(nranks, template_config)` tuples, where each tuple defines a group of processes:

```python
task_backend_specific_kwargs={
    "process_templates": [
        (2, {"policy": policy_a}),  # 2 processes with policy_a
        (2, {"policy": policy_b}),  # 2 processes with policy_b
    ]
}
```

This creates a Dragon **Job** with 4 total processes split into two groups. Each group can have its own placement policy.

In [None]:
async def parallel_job_placement():
    backend = await DragonExecutionBackendV3()
    session = Session(backends=[backend])

    # Get available nodes
    sys = System()
    nodes = [Node(huid) for huid in sys.nodes]
    print(f"Available nodes: {[n.hostname for n in nodes]}")

    # Create one policy per node (or reuse the first if single-node)
    policy_a = Policy(
        placement=Policy.Placement.HOST_NAME,
        host_name=nodes[0].hostname,
    )
    policy_b = Policy(
        placement=Policy.Placement.HOST_NAME,
        host_name=nodes[-1].hostname,  # Last node (same as first if single-node)
    )

    # Parallel job: 2 processes on node A + 2 processes on node B
    # Each process will print its placement info to stdout
    task = ComputeTask(
        function=report_placement,
        task_backend_specific_kwargs={
            "process_templates": [
                (2, {"policy": policy_a}),
                (2, {"policy": policy_b}),
            ]
        },
    )

    async with session:
        await session.submit_tasks([task])
        await asyncio.gather(task)

        print(f"Task {task.uid}: {task.state}")
        print(f"Exit code: {task.return_value}")
        # stdout contains output from all processes in the job
        if task.stdout:
            print(f"Stdout:\n{task.stdout}")

await parallel_job_placement()

---

## 4. CPU Affinity: Pinning Tasks to Specific Cores

Use `cpu_affinity` in your `Policy` to bind a process to specific CPU cores. This is critical for:

- **NUMA-aware placement** — keeping memory and compute on the same socket
- **Avoiding contention** — ensuring tasks don't compete for the same cores
- **Reproducible performance** — eliminating OS scheduler variability

`cpu_affinity` takes a list of integer core IDs.

In [None]:
def report_cpu_affinity():
    """Report which CPU cores this process can use (prints JSON)."""
    import json
    import os
    import socket
    result = {
        "hostname": socket.gethostname(),
        "pid": os.getpid(),
        "cpu_affinity": list(os.sched_getaffinity(0)),
    }
    print(json.dumps(result))
    return result


async def cpu_affinity_example():
    backend = await DragonExecutionBackendV3()
    session = Session(backends=[backend])

    # Pin task A to cores 0-3, task B to cores 4-7
    policy_a = Policy(cpu_affinity=[0, 1, 2, 3])
    policy_b = Policy(cpu_affinity=[4, 5, 6, 7])

    task_a = ComputeTask(
        function=report_cpu_affinity,
        task_backend_specific_kwargs={
            "process_template": {"policy": policy_a}
        },
    )
    task_b = ComputeTask(
        function=report_cpu_affinity,
        task_backend_specific_kwargs={
            "process_template": {"policy": policy_b}
        },
    )

    async with session:
        await session.submit_tasks([task_a, task_b])
        await asyncio.gather(task_a, task_b)

        # Parse results from stdout
        result_a = json.loads(task_a.stdout.strip())
        result_b = json.loads(task_b.stdout.strip())

        print(f"Task A cores: {result_a['cpu_affinity']}")
        print(f"Task B cores: {result_b['cpu_affinity']}")

        cores_a = set(result_a["cpu_affinity"])
        cores_b = set(result_b["cpu_affinity"])
        if cores_a.isdisjoint(cores_b):
            print("No overlap — CPU pinning verified.")
        else:
            print("Note: CPU affinity may not be enforced by Dragon Batch API on all systems.")

await cpu_affinity_example()

### CPU Affinity in Parallel Jobs

You can assign different CPU affinities to each process group in a parallel job. This is useful when different ranks have different compute patterns (e.g., one group does I/O-heavy work on fewer cores, another does compute-heavy work on many cores).

In [None]:
async def cpu_affinity_parallel():
    backend = await DragonExecutionBackendV3()
    session = Session(backends=[backend])

    # Group 1: 2 processes pinned to cores 0-3
    # Group 2: 2 processes pinned to cores 4-7
    policy_group1 = Policy(cpu_affinity=[0, 1, 2, 3])
    policy_group2 = Policy(cpu_affinity=[4, 5, 6, 7])

    task = ComputeTask(
        function=report_cpu_affinity,
        task_backend_specific_kwargs={
            "process_templates": [
                (2, {"policy": policy_group1}),
                (2, {"policy": policy_group2}),
            ]
        },
    )

    async with session:
        await session.submit_tasks([task])
        await asyncio.gather(task)

        print(f"Task {task.uid}: {task.state}")
        print(f"Exit code: {task.return_value}")
        if task.stdout:
            print(f"Stdout:\n{task.stdout}")

await cpu_affinity_parallel()

---

## 5. GPU Affinity: Pinning Tasks to Specific GPUs

Use `gpu_affinity` in your `Policy` to bind a process to specific GPU devices. Dragon sets `CUDA_VISIBLE_DEVICES` automatically based on your policy.

For more details, see the Dragon documentation on [Controlling GPU Affinity](https://dragonhpc.github.io/dragon/doc/_build/html/uses/gpus.html).

### Discovering Available GPUs

First, let's discover what GPUs are available across the allocation.

In [None]:
def discover_gpus():
    """Discover all GPUs across all nodes in the Dragon allocation."""
    all_gpus = []
    sys = System()
    for huid in sys.nodes:
        node = Node(huid)
        for gpu_id in node.gpus:
            all_gpus.append((node.hostname, gpu_id))
    return all_gpus


gpus = discover_gpus()
print(f"Found {len(gpus)} GPUs:")
for hostname, gpu_id in gpus:
    print(f"  {hostname} — GPU {gpu_id}")

### Pinning a Single Task to a Specific GPU

Create a `Policy` with `gpu_affinity=[gpu_id]` and `placement=HOST_NAME` to pin the process to a specific GPU on a specific node.

In [None]:
def report_gpu():
    """Report which GPU this process can see (prints JSON)."""
    import json
    import os
    import socket
    result = {
        "hostname": socket.gethostname(),
        "pid": os.getpid(),
        "CUDA_VISIBLE_DEVICES": os.environ.get("CUDA_VISIBLE_DEVICES", "not set"),
    }
    print(json.dumps(result))
    return result


async def single_gpu_example():
    backend = await DragonExecutionBackendV3()
    session = Session(backends=[backend])

    gpus = discover_gpus()
    if not gpus:
        print("No GPUs found — skipping.")
        return

    # Pin to the first GPU
    hostname, gpu_id = gpus[0]
    print(f"Pinning task to {hostname} GPU {gpu_id}")

    policy = Policy(
        placement=Policy.Placement.HOST_NAME,
        host_name=hostname,
        gpu_affinity=[gpu_id],
    )

    task = ComputeTask(
        function=report_gpu,
        task_backend_specific_kwargs={
            "process_template": {"policy": policy}
        },
    )

    async with session:
        await session.submit_tasks([task])
        await asyncio.gather(task)

        result = json.loads(task.stdout.strip())
        print(f"Result: {result}")

await single_gpu_example()

### One Task Per GPU (Round-Robin)

A common pattern is to submit one task per GPU, distributing work evenly across all available accelerators.

In [None]:
async def one_task_per_gpu():
    backend = await DragonExecutionBackendV3()
    session = Session(backends=[backend])

    gpus = discover_gpus()
    if not gpus:
        print("No GPUs found — skipping.")
        return

    # Create one task per GPU, each pinned to its GPU
    tasks = []
    for hostname, gpu_id in gpus:
        policy = Policy(
            placement=Policy.Placement.HOST_NAME,
            host_name=hostname,
            gpu_affinity=[gpu_id],
        )
        task = ComputeTask(
            function=report_gpu,
            task_backend_specific_kwargs={
                "process_template": {"policy": policy}
            },
        )
        tasks.append(task)

    async with session:
        await session.submit_tasks(tasks)
        await asyncio.gather(*tasks)

        print(f"Submitted {len(tasks)} tasks (one per GPU):")
        for t in tasks:
            result = json.loads(t.stdout.strip())
            print(f"  {t.uid} — {result['hostname']} GPU {result['CUDA_VISIBLE_DEVICES']}")

await one_task_per_gpu()

### Many Tasks with Round-Robin GPU Assignment

When you have more tasks than GPUs, assign them round-robin. This is a common HPC pattern for maximizing GPU utilization.

In [None]:
def make_gpu_policies(all_gpus, nprocs):
    """Create per-process policies with round-robin GPU assignment."""
    policies = []
    for i in range(nprocs):
        hostname, gpu_id = all_gpus[i % len(all_gpus)]
        policies.append(
            Policy(
                placement=Policy.Placement.HOST_NAME,
                host_name=hostname,
                gpu_affinity=[gpu_id],
            )
        )
    return policies


async def round_robin_gpu():
    backend = await DragonExecutionBackendV3()
    session = Session(backends=[backend])

    gpus = discover_gpus()
    if not gpus:
        print("No GPUs found — skipping.")
        return

    num_tasks = 8
    policies = make_gpu_policies(gpus, num_tasks)

    tasks = [
        ComputeTask(
            function=report_gpu,
            task_backend_specific_kwargs={
                "process_template": {"policy": policies[i]}
            },
        )
        for i in range(num_tasks)
    ]

    async with session:
        await session.submit_tasks(tasks)
        await asyncio.gather(*tasks)

        print(f"{num_tasks} tasks across {len(gpus)} GPUs (round-robin):")
        for t in tasks:
            result = json.loads(t.stdout.strip())
            print(f"  {t.uid} — {result['hostname']} GPU {result['CUDA_VISIBLE_DEVICES']}")

await round_robin_gpu()

### GPU Affinity in Parallel Jobs

For multi-process jobs where each process group needs its own GPU, use `process_templates` with per-group GPU policies.

In [None]:
async def parallel_gpu_job():
    backend = await DragonExecutionBackendV3()
    session = Session(backends=[backend])

    gpus = discover_gpus()
    if len(gpus) < 2:
        print(f"Need at least 2 GPUs, found {len(gpus)} — skipping.")
        return

    # Group 1: 2 processes on GPU 0
    # Group 2: 2 processes on GPU 1
    policy_gpu0 = Policy(
        placement=Policy.Placement.HOST_NAME,
        host_name=gpus[0][0],
        gpu_affinity=[gpus[0][1]],
    )
    policy_gpu1 = Policy(
        placement=Policy.Placement.HOST_NAME,
        host_name=gpus[1][0],
        gpu_affinity=[gpus[1][1]],
    )

    task = ComputeTask(
        function=report_gpu,
        task_backend_specific_kwargs={
            "process_templates": [
                (2, {"policy": policy_gpu0}),
                (2, {"policy": policy_gpu1}),
            ]
        },
    )

    async with session:
        await session.submit_tasks([task])
        await asyncio.gather(task)

        print(f"Parallel job with 2 GPU groups:")
        print(f"  Task {task.uid}: {task.state}")
        print(f"  Exit code: {task.return_value}")
        if task.stdout:
            print(f"  Stdout:\n{task.stdout}")

await parallel_gpu_job()

---

## 6. Combined CPU + GPU Affinity

For maximum control, combine `cpu_affinity` and `gpu_affinity` in the same `Policy`. This is important for:

- **NUMA-aware GPU placement** — pin the process to CPUs on the same NUMA domain as the GPU
- **Data pipeline tasks** — dedicate specific cores for data loading alongside GPU compute
- **Multi-tenant nodes** — partition resources cleanly between users or jobs

In [None]:
def report_full_placement():
    """Report CPU affinity, GPU visibility, and hostname (prints JSON)."""
    import json
    import os
    import socket
    result = {
        "hostname": socket.gethostname(),
        "pid": os.getpid(),
        "cpu_affinity": list(os.sched_getaffinity(0)),
        "CUDA_VISIBLE_DEVICES": os.environ.get("CUDA_VISIBLE_DEVICES", "not set"),
    }
    print(json.dumps(result))
    return result


async def combined_affinity_example():
    backend = await DragonExecutionBackendV3()
    session = Session(backends=[backend])

    gpus = discover_gpus()
    if not gpus:
        print("No GPUs found — skipping.")
        return

    # Task 1: GPU 0 with cores 0-7 (e.g., NUMA domain 0)
    policy_a = Policy(
        placement=Policy.Placement.HOST_NAME,
        host_name=gpus[0][0],
        cpu_affinity=list(range(0, 8)),
        gpu_affinity=[gpus[0][1]],
    )

    task_a = ComputeTask(
        function=report_full_placement,
        task_backend_specific_kwargs={
            "process_template": {"policy": policy_a}
        },
    )

    tasks = [task_a]

    # Task 2: GPU 1 with cores 8-15 (e.g., NUMA domain 1) — if available
    if len(gpus) >= 2:
        policy_b = Policy(
            placement=Policy.Placement.HOST_NAME,
            host_name=gpus[1][0],
            cpu_affinity=list(range(8, 16)),
            gpu_affinity=[gpus[1][1]],
        )
        task_b = ComputeTask(
            function=report_full_placement,
            task_backend_specific_kwargs={
                "process_template": {"policy": policy_b}
            },
        )
        tasks.append(task_b)

    async with session:
        await session.submit_tasks(tasks)
        await asyncio.gather(*tasks)

        for t in tasks:
            result = json.loads(t.stdout.strip())
            print(f"Task {t.uid}:")
            print(f"  Host : {result['hostname']}")
            print(f"  CPUs : {result['cpu_affinity']}")
            print(f"  GPU  : {result['CUDA_VISIBLE_DEVICES']}")
            print()

await combined_affinity_example()

---

## Quick Reference

### Policy Parameters

```python
from dragon.infrastructure.policy import Policy

policy = Policy(
    placement=Policy.Placement.HOST_NAME,  # DEFAULT, ANYWHERE, HOST_NAME, HOST_ID
    host_name="node01",                    # Target node (with HOST_NAME)
    cpu_affinity=[0, 1, 2, 3],             # Pin to specific CPU cores
    gpu_affinity=[0],                      # Pin to specific GPU devices
    distribution=Policy.Distribution.ROUNDROBIN,  # ROUNDROBIN, BLOCK
)
```

### Passing Policies to RHAPSODY Tasks

```python
# Single process
ComputeTask(
    function=my_func,
    task_backend_specific_kwargs={
        "process_template": {"policy": policy}
    },
)

# Parallel job (multiple process groups)
ComputeTask(
    function=my_func,
    task_backend_specific_kwargs={
        "process_templates": [
            (nranks_group_1, {"policy": policy_1}),
            (nranks_group_2, {"policy": policy_2}),
        ]
    },
)
```

### Return Values vs Stdout

| Execution Mode | `task.return_value` | `task.stdout` |
|---|---|---|
| Function Native (no template) | Python return value | `None` |
| Function Process (`process_template`) | Exit code (int) | Captured from `print()` |
| Function Job (`process_templates`) | Exit code (int) | Captured from `print()` |
| Executable Process/Job | `None` | Command stdout |

**Tip:** When using `process_template` or `process_templates`, have your function `print(json.dumps(result))` and parse it with `json.loads(task.stdout.strip())`.

### Execution Mode Summary

| Task Config | Execution Mode | Dragon API |
|-------------|---------------|------------|
| `function` only | Function Native | `batch.function()` |
| `function` + `process_template` | Function Process | `batch.process(ProcessTemplate(...))` |
| `function` + `process_templates` | Function Job | `batch.job([(N, ProcessTemplate(...)), ...])` |
| `executable` + `process_template` | Executable Process | `batch.process(ProcessTemplate(...))` |
| `executable` + `process_templates` | Executable Job | `batch.job([(N, ProcessTemplate(...)), ...])` |