add example MPI operator job
We determined that the MPI Job example failed because the order in
which Kueue and the MPI Operator are installed matters: the MPI
Operator must be installed first! Once that is done, both the
YAML-applied example and the Python example work. Here we are adding
the Python example.

Signed-off-by: vsoch <vsoch@users.noreply.github.com>
vsoch committed Jul 25, 2023
1 parent 10ae081 commit a1ebb9a
Showing 3 changed files with 245 additions and 0 deletions.
2 changes: 2 additions & 0 deletions site/content/en/docs/tasks/run_mpi_jobs.md
@@ -105,3 +105,5 @@ spec:
cpu: 1
memory: 1Gi
```

For equivalent instructions for doing this in Python, see [Run Python Jobs](/docs/tasks/run_python_jobs/#mpi-operator-job).
100 changes: 100 additions & 0 deletions site/content/en/docs/tasks/run_python_jobs.md
@@ -277,3 +277,103 @@ hello world

You can further customize the job, and can ask questions on the [Flux Operator issues board](https://github.com/flux-framework/flux-operator/issues).
Finally, for instructions for how to do this with YAML outside of Python, see [Run A Flux MiniCluster](/docs/tasks/run_flux_minicluster/).

### MPI Operator Job

For this example, we will use the [MPI Operator](https://www.kubeflow.org/docs/components/training/mpi/)
to submit a job, specifically through its [Python SDK](https://github.com/kubeflow/mpi-operator/tree/master/sdk/python/v2beta1). Given the Python environment created in the [setup](#before-you-begin), we can install the SDK directly into it as follows:

```bash
git clone --depth 1 https://github.com/kubeflow/mpi-operator /tmp/mpijob
cd /tmp/mpijob/sdk/python/v2beta1
python setup.py install
cd -
```

Importantly, the MPI Operator *must be installed before Kueue* for this to work! Let's start from scratch with a new Kind cluster.
We will need to [install the MPI operator](https://github.com/kubeflow/mpi-operator/tree/master#installation) first and then Kueue. Here we install
the versions tested with this example (note that the MPI Operator manifest tracks the `master` branch, while Kueue is pinned to v0.4.0):

```bash
kubectl apply -f https://raw.githubusercontent.com/kubeflow/mpi-operator/master/deploy/v2beta1/mpi-operator.yaml
kubectl apply -f https://github.com/kubernetes-sigs/kueue/releases/download/v0.4.0/manifests.yaml
```
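
Because the install order matters, it can help to confirm that the MPIJob CRD is registered before applying the Kueue manifest. The following is a minimal sketch and not part of the tested example. It assumes the CRD is named `mpijobs.kubeflow.org` (inferred from the `kubeflow.org` group and `mpijobs` plural used by the sample script below), and the helper names `has_crd` and `list_crd_names` are hypothetical.

```python
def has_crd(crd_names, wanted="mpijobs.kubeflow.org"):
    """Return True if the wanted CRD name appears in crd_names."""
    # The default CRD name is an assumption, see the text above
    return wanted in set(crd_names)


def list_crd_names():
    """List the names of all CRDs in the current cluster (needs cluster access)."""
    # Imported lazily so has_crd can be used without the kubernetes package
    from kubernetes import client, config

    config.load_kube_config()
    api = client.ApiextensionsV1Api()
    crds = api.list_custom_resource_definition().items
    return [crd.metadata.name for crd in crds]
```

With a running cluster, `has_crd(list_crd_names())` should return `True` once the MPI Operator manifest has been applied, at which point installing Kueue is the next step.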

You need to wait until Kueue is ready. You can determine this as follows:

```bash
# Wait until all pods in the kueue-system namespace are Running
kubectl get pods -n kueue-system
```
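
If you would rather wait from Python, the same check can be sketched as a polling loop. This is an illustration under assumptions: the function names, timeout, and interval are hypothetical, and the check only mirrors the `Running` phase shown by the command above rather than verifying container readiness.

```python
import time


def all_running(phases):
    """True when there is at least one pod and every phase is 'Running'."""
    return bool(phases) and all(phase == "Running" for phase in phases)


def wait_for_kueue(namespace="kueue-system", timeout=300, interval=5):
    """Poll pod phases in the namespace until all are Running or we time out."""
    # Imported lazily so all_running can be used without the kubernetes package
    from kubernetes import client, config

    config.load_kube_config()
    core = client.CoreV1Api()
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        pods = core.list_namespaced_pod(namespace).items
        if all_running([pod.status.phase for pod in pods]):
            return True
        time.sleep(interval)
    return False
```

Calling `wait_for_kueue()` returns `True` as soon as every pod reports `Running`, or `False` if the (arbitrary) five minute timeout passes first.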

When Kueue is ready, apply the single ClusterQueue setup:

```bash
kubectl apply -f https://raw.githubusercontent.com/kubernetes-sigs/kueue/main/site/static/examples/single-clusterqueue-setup.yaml
```

Now try running the example MPI job.

```bash
python sample-mpijob.py
```
```console
📦️ Container image selected is mpioperator/mpi-pi:openmpi...
⭐️ Creating sample job with prefix pi...
Use:
"kubectl get queue" to see queue assignment
"kubectl get jobs" to see jobs
```

{{% include "python/sample-mpijob.py" "python" %}}

After submitting, you can see that the queue has an admitted workload!

```bash
$ kubectl get queue
```
```console
NAME CLUSTERQUEUE PENDING WORKLOADS ADMITTED WORKLOADS
user-queue cluster-queue 0 1
```
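
The same information can be read from Python through the custom objects API. This is a hedged sketch: it assumes the queue is a `LocalQueue` served under group `kueue.x-k8s.io`, version `v1beta1`, plural `localqueues`, and that its status exposes `pendingWorkloads` and `admittedWorkloads`. Please check these against your installed Kueue version. The helper names are hypothetical.

```python
def summarize_queues(items):
    """Map each queue name to a (pending, admitted) tuple of workload counts."""
    summary = {}
    for item in items:
        status = item.get("status", {})
        # Field names are assumptions about the LocalQueue status schema
        summary[item["metadata"]["name"]] = (
            status.get("pendingWorkloads", 0),
            status.get("admittedWorkloads", 0),
        )
    return summary


def list_local_queues(namespace="default"):
    """Fetch LocalQueue objects as plain dictionaries (needs cluster access)."""
    # Imported lazily so summarize_queues works without the kubernetes package
    from kubernetes import client, config

    config.load_kube_config()
    api = client.CustomObjectsApi()
    result = api.list_namespaced_custom_object(
        group="kueue.x-k8s.io",
        version="v1beta1",
        namespace=namespace,
        plural="localqueues",
    )
    return result.get("items", [])
```

For the state shown above, `summarize_queues(list_local_queues())` would be expected to map `user-queue` to zero pending and one admitted workload.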

And that the job "pi-launcher" has started:

```bash
$ kubectl get jobs
NAME COMPLETIONS DURATION AGE
pi-launcher 0/1 9s 9s
```

The MPI Operator works by way of a central launcher that interacts with the worker pods over SSH. We can inspect
the logs of a worker and of the launcher to get a glimpse of how both work:

```bash
$ kubectl logs pods/pi-worker-1
```
```console
Server listening on 0.0.0.0 port 22.
Server listening on :: port 22.
Accepted publickey for mpiuser from 10.244.0.8 port 51694 ssh2: ECDSA SHA256:rgZdwufXolOkUPA1w0bf780BNJC8e4/FivJb1/F7OOI
Received disconnect from 10.244.0.8 port 51694:11: disconnected by user
Disconnected from user mpiuser 10.244.0.8 port 51694
Received signal 15; terminating.
```

The job is fairly quick, and we can see the output of pi in the launcher:

```bash
$ kubectl logs pods/pi-launcher-f4gqv
```
```console
Warning: Permanently added 'pi-worker-0.pi-worker.default.svc,10.244.0.7' (ECDSA) to the list of known hosts.
Warning: Permanently added 'pi-worker-1.pi-worker.default.svc,10.244.0.9' (ECDSA) to the list of known hosts.
Rank 1 on host pi-worker-1
Workers: 2
Rank 0 on host pi-worker-0
pi is approximately 3.1410376000000002
```

That looks like pi! 🎉️🥧️
If you are interested in running this same example with YAML outside of Python, see [Run an MPIJob](/docs/tasks/run_mpi_jobs/).
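
To read the result from Python instead of `kubectl logs`, one option is to find the launcher pod by its name prefix and parse the log text. This is a minimal sketch, assuming the launcher pod name starts with `pi-launcher` (as in the output above) and that the log contains the line `pi is approximately <number>`. The helper names are hypothetical.

```python
import re


def parse_pi(log_text):
    """Extract the number after 'pi is approximately', or None if absent."""
    match = re.search(r"pi is approximately\s+([0-9.]+)", log_text)
    return float(match.group(1)) if match else None


def launcher_log(prefix="pi-launcher", namespace="default"):
    """Return the log of the first pod whose name starts with prefix, or None."""
    # Imported lazily so parse_pi can be used without the kubernetes package
    from kubernetes import client, config

    config.load_kube_config()
    core = client.CoreV1Api()
    for pod in core.list_namespaced_pod(namespace).items:
        if pod.metadata.name.startswith(prefix):
            return core.read_namespaced_pod_log(pod.metadata.name, namespace)
    return None
```

Since the sample job sets `ttl_seconds_after_finished=60`, the launcher pod may be cleaned up shortly after the job finishes, so `launcher_log()` can return `None` if it is called too late.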

143 changes: 143 additions & 0 deletions site/static/examples/python/sample-mpijob.py
@@ -0,0 +1,143 @@
#!/usr/bin/env python3

import argparse
from kubernetes import config, client
import mpijob.models as models

# sample-mpijob.py
# This example will demonstrate full steps to submit a Job via the MPI Operator

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue MPI Operator Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--job-name",
        help="name to set for the job (a generateName prefix does not work here)",
        default="pi",
    )
    parser.add_argument(
        "--image",
        help="container image to use",
        default="mpioperator/mpi-pi:openmpi",
    )
    parser.add_argument(
        "--command",
        help="command to run",
        default="mpirun",
    )
    parser.add_argument(
        "--args",
        nargs="+",
        help="args for container",
        default=["-n", "2", "/home/mpiuser/pi"],
    )
    return parser


def generate_job_crd(job_name, image, command, args):
    """
    Generate an equivalent job CRD to sample-job.yaml
    """
    metadata = client.V1ObjectMeta(
        name=job_name, labels={"kueue.x-k8s.io/queue-name": "user-queue"}
    )

    # containers for launcher and worker
    launcher_container = client.V1Container(
        image=image,
        name="mpi-launcher",
        command=[command],
        args=args,
        security_context=client.V1SecurityContext(run_as_user=1000),
        resources={
            "limits": {
                "cpu": 1,
                "memory": "1Gi",
            }
        },
    )

    worker_container = client.V1Container(
        image=image,
        name="mpi-worker",
        command=["/usr/sbin/sshd"],
        args=["-De", "-f", "/home/mpiuser/.sshd_config"],
        security_context=client.V1SecurityContext(run_as_user=1000),
        resources={
            "limits": {
                "cpu": 1,
                "memory": "1Gi",
            }
        },
    )

    # Create the Launcher and worker replica specs
    launcher = models.V2beta1ReplicaSpec(
        replicas=1,
        template=client.V1PodTemplateSpec(
            spec=client.V1PodSpec(containers=[launcher_container])
        ),
    )

    worker = models.V2beta1ReplicaSpec(
        replicas=2,
        template=client.V1PodTemplateSpec(
            spec=client.V1PodSpec(containers=[worker_container])
        ),
    )

    # runPolicy for jobspec
    policy = models.V2beta1RunPolicy(
        clean_pod_policy="Running", ttl_seconds_after_finished=60
    )

    # Create the jobspec
    jobspec = models.V2beta1MPIJobSpec(
        slots_per_worker=1,
        run_policy=policy,
        ssh_auth_mount_path="/home/mpiuser/.ssh",
        mpi_replica_specs={"Launcher": launcher, "Worker": worker},
    )
    return models.V2beta1MPIJob(
        metadata=metadata,
        api_version="kubeflow.org/v2beta1",
        kind="MPIJob",
        spec=jobspec,
    )


def main():
    """
    Run an MPI job. This requires the MPI Operator to be installed.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    # Generate a CRD spec
    crd = generate_job_crd(args.job_name, args.image, args.command, args.args)
    crd_api = client.CustomObjectsApi()

    print(f"📦️ Container image selected is {args.image}...")
    print(f"⭐️ Creating sample job with prefix {args.job_name}...")
    crd_api.create_namespaced_custom_object(
        group="kubeflow.org",
        version="v2beta1",
        namespace="default",
        plural="mpijobs",
        body=crd,
    )
    print(
        'Use:\n"kubectl get queue" to see queue assignment\n"kubectl get jobs" to see jobs'
    )


if __name__ == "__main__":
    main()
