Skip to content

Commit

Permalink
add flux operator python sdk example
Browse files Browse the repository at this point in the history
Signed-off-by: vsoch <vsoch@users.noreply.github.com>
  • Loading branch information
vsoch committed Jul 7, 2023
1 parent d40c3b8 commit 8265dd1
Show file tree
Hide file tree
Showing 2 changed files with 353 additions and 0 deletions.
117 changes: 117 additions & 0 deletions examples/python/sample-flux-operator-job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#!/usr/bin/env python3

import argparse
from kubernetes import config, client
import fluxoperator.models as models

# sample-mpijob.py
# This example will demonstrate full steps to submit a Job via the MPI Operator

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
parser = argparse.ArgumentParser(
description="Submit Kueue Flux Operator Job Example",
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
"--job-name",
help="generateName field to set for job (job prefix does not work here)",
default="hello-world",
)
parser.add_argument(
"--image",
help="container image to use",
default="ghcr.io/flux-framework/flux-restful-api",
)
parser.add_argument(
"--tasks",
help="Number of tasks",
default=1,
type=int,
)
parser.add_argument(
"--quiet",
help="Do not show extra flux output (only hello worlds!)",
action="store_true",
default=False,
)

parser.add_argument(
"--command",
help="command to run",
default="echo",
)
parser.add_argument(
"--args", nargs="+", help="args for container", default=["hello", "world"]
)
return parser


def generate_minicluster_crd(job_name, image, command, args, quiet=False, tasks=1):
"""
Generate a minicluster CRD
"""
container = models.MiniClusterContainer(
command=command + " " + " ".join(args),
resources={
"limits": {
"cpu": 1,
"memory": "2Gi",
}
},
)

# 4 pods and 4 tasks will echo hello-world x 4
spec = models.MiniClusterSpec(
job_labels={"kueue.x-k8s.io/queue-name": "user-queue"},
containers=[container],
size=4,
tasks=tasks,
logging={"quiet": quiet},
)

return models.MiniCluster(
kind="MiniCluster",
api_version="flux-framework.org/v1alpha1",
metadata=client.V1ObjectMeta(
generate_name=job_name,
namespace="default",
),
spec=spec,
)


def main():
"""
Run an MPI job. This requires the MPI Operator to be installed.
"""
parser = get_parser()
args, _ = parser.parse_known_args()

# Generate a CRD spec
minicluster = generate_minicluster_crd(
args.job_name, args.image, args.command, args.args, args.quiet, args.tasks
)
crd_api = client.CustomObjectsApi()

print(f"📦️ Container image selected is {args.image}...")
print(f"⭐️ Creating sample job with prefix {args.job_name}...")
crd_api.create_namespaced_custom_object(
group="flux-framework.org",
version="v1alpha1",
namespace="default",
plural="miniclusters",
body=minicluster,
)
print(
'Use:\n"kubectl get queue" to see queue assignment\n"kubectl get pods" to see pods'
)


if __name__ == "__main__":
main()
236 changes: 236 additions & 0 deletions site/content/en/docs/tasks/run_python_jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,239 @@ Use:

You can also change the container image with `--image` and args with `--args`.
For more customization, you can edit the example script.


### Flux Operator Job

For this example, we will be using the [Flux Operator](https://github.com/flux-framework/flux-operator)
to submit a job, and specifically using the [Python SDK](https://github.com/flux-framework/flux-operator/tree/main/sdk/python/v1alpha1) to do this easily. Given our Python environment created in the [setup](#before-you-begin), we can install this Python SDK directly to it as follows:

```bash
pip install fluxoperator
```

We will also need to [install the Flux operator](https://flux-framework.org/flux-operator/getting_started/user-guide.html#quick-install).

```bash
kubectl apply -f https://raw.githubusercontent.com/flux-framework/flux-operator/main/examples/dist/flux-operator.yaml
```

Write the following script to `sample-flux-operator-job.py`:

```python
#!/usr/bin/env python3

import argparse
from kubernetes import config, client
import fluxoperator.models as models

# sample-mpijob.py
# This example will demonstrate full steps to submit a Job via the MPI Operator

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
parser = argparse.ArgumentParser(
description="Submit Kueue Flux Operator Job Example",
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
"--job-name",
help="generateName field to set for job (job prefix does not work here)",
default="hello-world",
)
parser.add_argument(
"--image",
help="container image to use",
default="ghcr.io/flux-framework/flux-restful-api",
)
parser.add_argument(
"--tasks",
help="Number of tasks",
default=1,
type=int,
)
parser.add_argument(
"--quiet",
help="Do not show extra flux output (only hello worlds!)",
action="store_true",
default=False,
)

parser.add_argument(
"--command",
help="command to run",
default="echo",
)
parser.add_argument(
"--args", nargs="+", help="args for container", default=["hello", "world"]
)
return parser


def generate_minicluster_crd(job_name, image, command, args, quiet=False, tasks=1):
"""
Generate a minicluster CRD
"""
container = models.MiniClusterContainer(
command=command + " " + " ".join(args),
resources={
"limits": {
"cpu": 1,
"memory": "2Gi",
}
},
)

# 4 pods and 4 tasks will echo hello-world x 4
spec = models.MiniClusterSpec(
job_labels={"kueue.x-k8s.io/queue-name": "user-queue"},
containers=[container],
size=4,
tasks=tasks,
logging={"quiet": quiet},
)

return models.MiniCluster(
kind="MiniCluster",
api_version="flux-framework.org/v1alpha1",
metadata=client.V1ObjectMeta(
generate_name=job_name,
namespace="default",
),
spec=spec,
)


def main():
"""
Run an MPI job. This requires the MPI Operator to be installed.
"""
parser = get_parser()
args, _ = parser.parse_known_args()

# Generate a CRD spec
minicluster = generate_minicluster_crd(
args.job_name, args.image, args.command, args.args, args.quiet, args.tasks
)
crd_api = client.CustomObjectsApi()

print(f"📦️ Container image selected is {args.image}...")
print(f"⭐️ Creating sample job with prefix {args.job_name}...")
crd_api.create_namespaced_custom_object(
group="flux-framework.org",
version="v1alpha1",
namespace="default",
plural="miniclusters",
body=minicluster,
)
print(
'Use:\n"kubectl get queue" to see queue assignment\n"kubectl get pods" to see pods'
)


if __name__ == "__main__":
main()
```

Now try running the example:

```bash
python sample-flux-operator-job.py
```
```console
📦️ Container image selected is ghcr.io/flux-framework/flux-restful-api...
⭐️ Creating sample job with prefix hello-world...
Use:
"kubectl get queue" to see queue assignment
"kubectl get pods" to see pods
```

You'll be able to almost immediately see the MiniCluster job admitted to the queue:

```bash
kubectl get queue
```
```console
NAME CLUSTERQUEUE PENDING WORKLOADS ADMITTED WORKLOADS
user-queue cluster-queue 0 1
```

And the 4 pods running (we are creating a networked cluster with 4 nodes):

```bash
kubectl get pods
```
```console
NAME READY STATUS RESTARTS AGE
hello-world7qgqd-0-wp596 1/1 Running 0 7s
hello-world7qgqd-1-d7r87 1/1 Running 0 7s
hello-world7qgqd-2-rfn4t 1/1 Running 0 7s
hello-world7qgqd-3-blvtn 1/1 Running 0 7s
```

If you look at logs of the main broker pod (index 0 of the job above), there is a lot of
output for debugging, and you can see "hello world" running at the end:

```bash
kubectl logs hello-world7qgqd-0-wp596
```

<details>

<summary>Flux Operator Lead Broker Output</summary>

```console
🌀 Submit Mode: flux start -o --config /etc/flux/config -Scron.directory=/etc/flux/system/cron.d -Stbon.fanout=256 -Srundir=/run/flux -Sstatedir=/var/lib/flux -Slocal-uri=local:///run/flux/local -Slog-stderr-level=6 -Slog-stderr-mode=local flux submit -n 1 --quiet --watch echo hello world
broker.info[0]: start: none->join 0.399725ms
broker.info[0]: parent-none: join->init 0.030894ms
cron.info[0]: synchronizing cron tasks to event heartbeat.pulse
job-manager.info[0]: restart: 0 jobs
job-manager.info[0]: restart: 0 running jobs
job-manager.info[0]: restart: checkpoint.job-manager not found
broker.info[0]: rc1.0: running /etc/flux/rc1.d/01-sched-fluxion
sched-fluxion-resource.info[0]: version 0.27.0-15-gc90fbcc2
sched-fluxion-resource.warning[0]: create_reader: allowlist unsupported
sched-fluxion-resource.info[0]: populate_resource_db: loaded resources from core's resource.acquire
sched-fluxion-qmanager.info[0]: version 0.27.0-15-gc90fbcc2
broker.info[0]: rc1.0: running /etc/flux/rc1.d/02-cron
broker.info[0]: rc1.0: /etc/flux/rc1 Exited (rc=0) 0.5s
broker.info[0]: rc1-success: init->quorum 0.485239s
broker.info[0]: online: hello-world7qgqd-0 (ranks 0)
broker.info[0]: online: hello-world7qgqd-[0-3] (ranks 0-3)
broker.info[0]: quorum-full: quorum->run 0.354587s
hello world
broker.info[0]: rc2.0: flux submit -n 1 --quiet --watch echo hello world Exited (rc=0) 0.3s
broker.info[0]: rc2-success: run->cleanup 0.308392s
broker.info[0]: cleanup.0: flux queue stop --quiet --all --nocheckpoint Exited (rc=0) 0.1s
broker.info[0]: cleanup.1: flux cancel --user=all --quiet --states RUN Exited (rc=0) 0.1s
broker.info[0]: cleanup.2: flux queue idle --quiet Exited (rc=0) 0.1s
broker.info[0]: cleanup-success: cleanup->shutdown 0.252899s
broker.info[0]: children-complete: shutdown->finalize 47.6699ms
broker.info[0]: rc3.0: running /etc/flux/rc3.d/01-sched-fluxion
broker.info[0]: rc3.0: /etc/flux/rc3 Exited (rc=0) 0.2s
broker.info[0]: rc3-success: finalize->goodbye 0.212425s
broker.info[0]: goodbye: goodbye->exit 0.06917ms
```

</details>

If you submit and ask for four tasks, you'll see "hello world" four times:

```bash
python sample-flux-operator-job.py --tasks 4
```
```console
...
broker.info[0]: quorum-full: quorum->run 23.5812s
hello world
hello world
hello world
hello world
```

You can further customize the job, and can ask questions on the [Flux Operator issues board](https://github.com/flux-framework/flux-operator/issues) if you have any.

0 comments on commit 8265dd1

Please sign in to comment.