<div>
<center><img src="../assets/Flux-logo.svg" width="400"/></center>
</div>

# Chapter 2: Python Submission API 🐍️

Flux also provides first-class Python bindings that can be used to submit jobs programmatically.

### Importing the flux package

Flux requires Python to build, so if you have Flux installed, you have at least one working Python installation. You can also run `pip install flux-python` to get the `flux` package in a separate Python installation (which you may have to do in this notebook!).

In [1]:
!python3 -c "import flux; print(flux.Flux())"

<flux.core.handle.Flux object at 0x155546ed0130>


In [41]:
import concurrent.futures
import os
import json
import flux
import flux.job
from flux.job import JobspecV1
from flux.job.JobID import JobID

### `flux.job.JobspecV1` to create job specifications

Flux describes work using a standard called the [Jobspec](https://flux-framework.readthedocs.io/projects/flux-rfc/en/latest/spec_25.html). While you could write the YAML or JSON by hand, it's much easier to use the provided Python functions, which take high-level metadata (command, resources, etc.) and generate the jobspec for you. We can then replicate our previous example of submitting multiple heterogeneous jobs using these Python helpers and test that Flux co-schedules them.

In [26]:
# connect to the running Flux instance
f = flux.Flux()

# Create the Jobspec from a command (here, `sleep 0`) and specify resources
compute_jobreq = JobspecV1.from_command(
    command=["sleep", "0"], num_tasks=1, num_nodes=1, cores_per_task=1
    )
compute_jobreq.environment = {'TMPDIR': '/tmp/'}
id = flux.job.submit(f, compute_jobreq)

# When we submit, we get back the job identifier (JobID)
print(f'{id.f58 = }')

id.f58 = 'fAsfFYwuh'


`flux.job.submit` _blocks_ until Flux assigns a job ID, then returns it. But perhaps we want more information about the job, too.

In [46]:
# flux.job.result() waits for the job to finish, then returns its JobInfo
info = flux.job.result(f, id)
print(f'{info.to_dict() = }')

# Uncomment to submit the same jobspec 100 times (job IDs are not printed)
# for i in range(100):
#     flux.job.submit(f, compute_jobreq)

info.to_dict() = {'t_run': 1747934727.7275984, 't_cleanup': 1747934732.8789754, 'duration': 0.0, 'result': 'COMPLETED', 'waitstatus': 0, 'id': JobID(26095936077824), 't_submit': 1747934727.7054133, 'runtime': 5.151376962661743, 'returncode': 0, 'dependencies': [], 'annotations': {}, 'exception': {'occurred': False}}


When we submit a jobspec from Python, we get back a job identifier (jobid). We can then use the Flux handle, our connection to the running Flux instance, to get information about that job.
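Job IDs are integers under the hood; `.f58` renders them in Flux's compact "F58" form. The sketch below is a pure-Python illustration, assuming F58 is base58 with the Bitcoin alphabet and an `f` prefix (our reading of Flux RFC 19). `to_f58` and `from_f58` are hypothetical helpers, not part of the `flux` package; in real code use `JobID(...).f58`. It checks itself against the `get_job` output further down, where the job printed as `fEj9nMUnK` reports `JobID(30308443357184)`.

```python
# Hypothetical helpers sketching Flux's F58 job ID encoding.
# Assumption: base58 with the Bitcoin alphabet plus an "f" prefix (RFC 19).
# Prefer flux.job.JobID(...).f58 in real code.

F58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"


def to_f58(jobid: int) -> str:
    """Encode an integer job ID as an F58 string."""
    if jobid == 0:
        return "f" + F58_ALPHABET[0]
    digits = []
    while jobid > 0:
        jobid, rem = divmod(jobid, 58)
        digits.append(F58_ALPHABET[rem])
    # divmod yields the least significant digit first, so reverse
    return "f" + "".join(reversed(digits))


def from_f58(text: str) -> int:
    """Decode an F58 string (including its leading 'f') back to an integer."""
    value = 0
    for ch in text[1:]:
        value = value * 58 + F58_ALPHABET.index(ch)
    return value


# Integer ID reported by get_job() for the job submitted as fEj9nMUnK
print(to_f58(30308443357184))  # → fEj9nMUnK
```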

### `flux.job.get_job(handle, jobid)` to get job info

In [47]:
# Let's submit again to retrieve (and save) the job identifier
compute_jobreq = JobspecV1.from_command(
    command=["sleep", "15"], num_tasks=1, num_nodes=1, cores_per_task=1
    )
compute_jobreq.environment = {'TMPDIR': '/tmp/'}

# To pass your entire current environment to the job instead, use:
# import os
# compute_jobreq.environment = dict(os.environ)

fluxjob = flux.job.submit(f, compute_jobreq)
fluxjobid = JobID(fluxjob.f58)
print(f"🎉️ Hooray, we just submitted {fluxjobid}!\n")

# Here is how to get your info. The first argument is the flux handle, then the jobid
jobinfo = flux.job.get_job(f, fluxjobid)
print(jobinfo)

🎉️ Hooray, we just submitted fEj9nMUnK!

{'t_depend': 1747934978.8019178, 't_run': 0.0, 't_cleanup': 0.0, 't_inactive': 0.0, 'duration': 0.0, 'expiration': 0.0, 'name': 'sleep', 'cwd': '', 'queue': '', 'project': '', 'bank': '', 'ntasks': 1, 'ncores': 1, 'nnodes': 1, 'priority': 16, 'ranks': '', 'nodelist': '', 'success': '', 'result': '', 'waitstatus': '', 'id': JobID(30308443357184), 't_submit': 1747934978.791045, 't_remaining': 0.0, 'state': 'SCHED', 'username': 'hobbs17', 'userid': 60943, 'urgency': 16, 'runtime': 0.0, 'status': 'SCHED', 'returncode': '', 'dependencies': [], 'annotations': {}, 'exception': {'occurred': '', 'severity': '', 'type': '', 'note': ''}}


Look at what came back from `.get_job()`. It didn't block, and the job hasn't started yet: `t_run` (the timestamp at which the job started running) is still 0, and `runtime` is 0 even though this `sleep 15` job should run for about 15 seconds. Notice that the state is also `SCHED`, meaning the job is waiting to be scheduled.
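To make these timestamps concrete, here is a small pure-Python sketch (no Flux connection needed) that derives queue wait and run time from job-info dicts like the ones printed above. `summarize` is a hypothetical helper. It assumes a timestamp of `0.0` means "not yet", and that run time is `t_cleanup - t_run`, which agrees with the `runtime` value (about 5.1514 s) in the completed job's info.

```python
# Hypothetical helper: derive timing figures from a job-info dict such as
# info.to_dict() or flux.job.get_job() output.
# Assumption: timestamps are epoch seconds, and 0.0 means "has not happened yet".


def summarize(info: dict) -> dict:
    """Return queue wait and run time in seconds, or None when not yet known."""
    t_submit, t_run, t_cleanup = info["t_submit"], info["t_run"], info["t_cleanup"]
    started = t_run > 0
    finished = started and t_cleanup > 0
    return {
        "queue_wait": (t_run - t_submit) if started else None,
        "runtime": (t_cleanup - t_run) if finished else None,
    }


# Values copied from the two outputs above
pending = {"t_submit": 1747934978.791045, "t_run": 0.0, "t_cleanup": 0.0}
done = {"t_submit": 1747934727.7054133, "t_run": 1747934727.7275984,
        "t_cleanup": 1747934732.8789754}

print(summarize(pending))  # both None: the job has not started
print(summarize(done))     # runtime is roughly 5.15 s
```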

You can now run `flux jobs` to see the jobs that we submitted from Python.

In [29]:
!flux jobs -a --name="sleep"

       JOBID USER     NAME       ST NTASKS NNODES     TIME INFO
   fC3FJfwtT hobbs17  sleep       R      1      1   5.548s tuolumne1024
   fAsfFYwuh hobbs17  sleep      CD      1      1   0.138s tuolumne1024
   fAky1B2Y7 hobbs17  sleep      CD      1      1   0.151s tuolumne1024
   f9zoXKtcw hobbs17  sleep       F      1      1   0.040s tuolumne1024
   f9epp5D1Z hobbs17  sleep       F      1      1   0.041s tuolumne1024
   f9cud159Z hobbs17  sleep       F      1      1   0.046s tuolumne1024
   f8qZEDKUj hobbs17  sleep       F      1      1   0.044s tuolumne1024
   f8TYaVmcK hobbs17  sleep       F      1      1   0.041s tuolumne1024
   f86WhWidm hobbs17  sleep      CA      1      1   13.83s tuolumne1024
   f7ycr4RKd hobbs17  sleep       F      1      1   0.040s tuolumne1024
   f7npt1f99 hobbs17  sleep       F      1      1   0.040s tuolumne1024

Under the hood, the `Jobspec` class builds a Python dictionary that follows the Jobspec schema (which the specification expresses in YAML). It is serialized as JSON and sent to Flux for ingestion, validation, queueing, scheduling, and eventually execution. We can dump the raw JSON jobspec that gets submitted to see the exact resources requested and the tasks to be executed on those resources.

In [30]:
print(compute_jobreq.dumps(indent=2))

{
  "resources": [
    {
      "type": "node",
      "count": 1,
      "with": [
        {
          "type": "slot",
          "count": 1,
          "with": [
            {
              "type": "core",
              "count": 1
            }
          ],
          "label": "task"
        }
      ]
    }
  ],
  "tasks": [
    {
      "command": [
        "sleep",
        "15"
      ],
      "slot": "task",
      "count": {
        "per_slot": 1
      }
    }
  ],
  "attributes": {
    "system": {
      "duration": 0,
      "environment": {
        "TMPDIR": "/tmp/"
      }
    }
  },
  "version": 1
}
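To see how the initializer arguments map onto this structure, here is a pure-Python sketch that builds the same dictionary by hand. `make_jobspec` is a hypothetical helper, not part of the `flux` package. For `num_nodes=1, num_tasks=1, cores_per_task=1` it reproduces the output above exactly; for other values it follows our reading of that structure (task slots split evenly across nodes), which may differ from `JobspecV1.from_command` in edge cases.

```python
import json


def make_jobspec(command, num_nodes=1, num_tasks=1, cores_per_task=1, environment=None):
    """Hypothetical helper: build a version-1 jobspec dict by hand.

    Only covers the num_nodes / num_tasks / cores_per_task case;
    JobspecV1.from_command supports many more options.
    """
    if num_tasks % num_nodes != 0:
        # Assumption for this sketch: tasks must divide evenly across nodes
        raise ValueError("num_tasks must be a multiple of num_nodes")
    slot = {
        "type": "slot",
        "count": num_tasks // num_nodes,  # task slots per node
        "with": [{"type": "core", "count": cores_per_task}],
        "label": "task",
    }
    return {
        "resources": [{"type": "node", "count": num_nodes, "with": [slot]}],
        # one task per slot, so num_tasks tasks in total
        "tasks": [{"command": list(command), "slot": "task", "count": {"per_slot": 1}}],
        "attributes": {"system": {"duration": 0, "environment": environment or {}}},
        "version": 1,
    }


spec = make_jobspec(["sleep", "15"], environment={"TMPDIR": "/tmp/"})
print(json.dumps(spec, indent=2))
```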


### Playing with the synchronous job submission API

One slight wrinkle when creating jobspecs in Python is that some attributes are set in the _initializer_ (`.from_command` or another constructor), while others are set afterwards through properties or methods of the jobspec object. Below is an example, and here's a brief table for reference:

| Operation                                      | CLI                                                                       | Python                                                                                     |
|------------------------------------------------|---------------------------------------------------------------------------|--------------------------------------------------------------------------------------------|
| Setting nodes, tasks, and cores                | -N _n_, -n _n_, and -c _n_, respectively                                  | in the initializer: num_nodes, num_tasks, cores_per_task                                   |
| Setting node exclusivity                       | `-x` or enforced by policy                                                | in the initializer: exclusive=True                                                         |
| Setting time limits                            | `-t MINUTES\|FSD`                                                         | `jobspec.duration=[seconds\|FSD]`                                                          |
| Set working directory                          | Automatically set to current directory or `--cwd=`                        | `jobspec.cwd=[string of path]`                                                             |
| Set environment                                | Default set to current environment, or `--env=[modifier]`                 | `jobspec.environment=[dict]`                                                               |
| Setting stdout and stderr                      | `--output [OUT] --error [ERR]` or your terminal by default for alloc/run  | `jobspec.stdout=[path]`, `jobspec.stderr=[path]`                                           |
| Setting system attributes                      | `-S KEY[=VAL]`                                                            | `jobspec.setattr(key, val)`                                                                |
| Set shell attributes                           | `-o KEY[=VAL]`                                                            | `jobspec.setattr_shell_option(key, val)`                                                   |
| Modifying configuration of a subinstance       | `--conf` for batch/alloc                                                  | in the initializer, requires `.from_batch_command` or `.from_nest_command`                 |
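As an aside on the time-limit row: a bare number means minutes to the CLI's `-t` but seconds in `jobspec.duration`. This pure-Python sketch converts Flux Standard Duration (FSD) strings to seconds, assuming only the `s`, `m`, `h`, and `d` suffixes (our reading of Flux RFC 23). `fsd_to_seconds` is a hypothetical helper; Flux's own parser should be preferred in real code.

```python
# Hypothetical helper sketching FSD parsing.
# Assumption: suffixes s, m, h, d; a bare number means seconds (as in jobspec.duration).

_FSD_MULTIPLIERS = {"s": 1, "m": 60, "h": 3600, "d": 86400}


def fsd_to_seconds(fsd) -> float:
    """Convert a number of seconds or an FSD string such as '5m' to seconds."""
    if isinstance(fsd, (int, float)):
        return float(fsd)
    text = fsd.strip()
    if text and text[-1] in _FSD_MULTIPLIERS:
        return float(text[:-1]) * _FSD_MULTIPLIERS[text[-1]]
    return float(text)


# `-t 5` on the CLI (5 minutes) corresponds to any of these duration values
print(fsd_to_seconds("5m"), fsd_to_seconds(300), fsd_to_seconds("300s"))  # → 300.0 300.0 300.0
```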

The example below is stolen from the [El Cap documentation maintained by Ramesh Pankajakshan](https://hpc.llnl.gov/documentation/user-guides/using-el-capitan-systems/introduction-and-quickstart/flux).

_"Submitting jobs through Python is the civilized way of life" -Ramesh_

In [36]:
handle = flux.Flux()
# Node exclusivity is requested in the initializer (see the table above)
jobspec = JobspecV1.from_command(
    command=["sleep", "5"], num_tasks=4, num_nodes=1, exclusive=True,
)
jobspec.cwd = os.getcwd()
jobspec.duration = "5m"
jobspec.environment = {"TMPDIR": "/tmp/"}
# {{id}} is replaced with the job ID in the output file names
jobspec.stdout = "PYEXAMPLE.{{id}}.out"
jobspec.stderr = "PYEXAMPLE.{{id}}.err"
id = flux.job.submit(handle, jobspec)

# Block until the job's "finish" event is posted to its eventlog
info = flux.job.event_wait(handle, id, "finish")
print(info)

1747934732.87898: finish {'status': 0}
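The `status` in the `finish` event above is, as far as we understand Flux's eventlog format, a POSIX wait status (as returned by `waitpid`), not a plain exit code; the `waitstatus` and `returncode` fields in the job info earlier are the raw and decoded forms. A minimal sketch using only Python's `os` module (POSIX only); `describe_status` is a hypothetical helper:

```python
import os


def describe_status(status: int) -> str:
    """Interpret a wait status such as the 'status' in a finish event (POSIX only)."""
    if os.WIFEXITED(status):
        return f"exited with code {os.WEXITSTATUS(status)}"
    if os.WIFSIGNALED(status):
        return f"killed by signal {os.WTERMSIG(status)}"
    return f"unrecognized status {status}"


print(describe_status(0))    # the finish event above: a clean exit
print(describe_status(256))  # what an exit code of 1 would look like
print(describe_status(9))    # what termination by signal 9 (SIGKILL) would look like
```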


### `FluxExecutor` for bulk submission

The `FluxExecutor` class lets us submit large numbers of jobs to Flux. Its interface resembles Python's `concurrent.futures` executors: `submit()` returns a future for each job.

In [38]:
from flux.job import FluxExecutor
with FluxExecutor() as executor:
    compute_jobspec = JobspecV1.from_command(["sleep", "3"])
    compute_jobspec.environment = {"TMPDIR": "/tmp/"}
    futures = [executor.submit(compute_jobspec) for _ in range(10)]
    # fut.jobid() blocks until Flux assigns an ID, a proxy for the job being submitted
    jobids = [fut.jobid() for fut in futures]
# Leaving the with-block waits for all of the submitted jobs to finish
print("I'm all done")

I'm all done


In [40]:
# Submit the FluxExecutor based script.
# flux python bulksubmit_executor.py -n200 /bin/sleep 0

In [42]:
jobspec = flux.job.JobspecV1.from_command(["/bin/true"])
jobspec.environment = {"TMPDIR": "/tmp/"}
with flux.job.FluxExecutor() as executor:
    futs = [executor.submit(jobspec) for _ in range(5)]
    # Use `fut` (not `f`) so the Flux handle `f` is not overwritten
    for fut in concurrent.futures.as_completed(futs):
        # result() is the job's exit status once it completes
        print(fut.result())

0
0
0
0
0
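To see how closely this mirrors the standard library, here is the same submit / `as_completed` pattern written with `concurrent.futures.ThreadPoolExecutor`. It is a local stand-in only: it does not talk to Flux, and `run_task` is a hypothetical helper that runs a trivial Python command via `subprocess` in place of a Flux job.

```python
import concurrent.futures
import subprocess
import sys


def run_task(argv):
    """Run one command locally and return its exit code (stand-in for a Flux job)."""
    return subprocess.run(argv).returncode


# A trivial command that exits 0, analogous to /bin/true above
cmd = [sys.executable, "-c", "pass"]

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futs = [executor.submit(run_task, cmd) for _ in range(5)]
    for fut in concurrent.futures.as_completed(futs):
        print(fut.result())  # exit code of each finished task
```

The main difference is what `submit()` takes: `ThreadPoolExecutor.submit` takes a callable and its arguments, while `FluxExecutor.submit` takes a jobspec and lets Flux place and run the work.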


### `flux.job.event_watch` to watch events

If you want to get the output of a job (or, more generally, stream events from one of its eventlogs), you can use `flux.job.event_watch`. Let's submit a quick job and then look at its output.


In [43]:
# Create the Jobspec from a command (here, `echo`) and specify resources
f = flux.Flux()
jobspec = flux.job.JobspecV1.from_command(
    command=["echo", "The Tales of Beedle the Bard"], num_tasks=1, num_nodes=1, cores_per_task=1)
jobspec.environment = {"TMPDIR": "/tmp/"}
# waitable=True is required so that flux.job.wait() can be used on this job
jobid = flux.job.submit(f, jobspec, waitable=True)

# Wait until the job finishes
flux.job.wait(f, jobid)
print(jobid)

# Iterate over the events in the job's guest.output eventlog (its stdout/stderr)
for line in flux.job.event_watch(f, jobid, "guest.output"):
    print(line)

fDtB89axw
1747934867.70586: header {'version': 1, 'encoding': {'stdout': 'UTF-8', 'stderr': 'UTF-8'}, 'count': {'stdout': 1, 'stderr': 1}, 'options': {}}
1747934867.81982: data {'stream': 'stdout', 'rank': '0', 'data': 'The Tales of Beedle the Bard\n'}
1747934867.81990: data {'stream': 'stderr', 'rank': '0', 'eof': True}
1747934867.81991: data {'stream': 'stdout', 'rank': '0', 'eof': True}
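Each `data` event carries a chunk of output tagged with its stream and task rank, and an `eof` marker closes each stream. The sketch below reassembles the text per stream and rank. So that it runs without Flux, it hard-codes the event names and payloads printed above instead of reading them from `event_watch`; `collect_output` is a hypothetical helper.

```python
from collections import defaultdict

# Event names and payloads copied from the guest.output events printed above
events = [
    ("header", {"version": 1, "encoding": {"stdout": "UTF-8", "stderr": "UTF-8"},
                "count": {"stdout": 1, "stderr": 1}, "options": {}}),
    ("data", {"stream": "stdout", "rank": "0", "data": "The Tales of Beedle the Bard\n"}),
    ("data", {"stream": "stderr", "rank": "0", "eof": True}),
    ("data", {"stream": "stdout", "rank": "0", "eof": True}),
]


def collect_output(events):
    """Join 'data' chunks per (stream, rank), skipping the header event."""
    chunks = defaultdict(str)
    for name, context in events:
        if name != "data":
            continue
        # eof markers carry no "data" key, so they add nothing to the text
        chunks[(context["stream"], context["rank"])] += context.get("data", "")
    return dict(chunks)


print(collect_output(events))
```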


### `flux.job.output.JobOutputWatchLines` to synchronously watch for job output


In [44]:
# Create the Jobspec from a command (here, `echo` run as four tasks) and specify resources
f = flux.Flux()
jobspec = JobspecV1.from_command(
    command=["echo", "The Tales of Beedle the Bard"], num_tasks=4, num_nodes=1
)
jobspec.environment = {"TMPDIR": "/tmp/"}
jobid = flux.job.submit(f, jobspec)

# getline() returns one line of output as [stream, text], so only one of the
# four tasks' lines is printed here
t = flux.job.output.JobOutputWatchLines(f, jobid).getline()
print(t)

['stdout', 'The Tales of Beedle the Bard']


### `flux.job.output.JobOutputWatchLines` to watch for job output with asynchronous job submission

In [45]:
import concurrent.futures
import flux
import flux.job

## Define something we want to happen when each future is fulfilled
def print_output(fut):
    # Callbacks may run in the executor's own thread, so open a separate
    # handle rather than sharing the notebook's `f`
    handle = flux.Flux()
    # getline() returns one line of output as [stream, text]
    t = flux.job.output.JobOutputWatchLines(handle, fut.jobid()).getline()
    for line in t:
        print(f'{fut.jobid()}: {line}')

## Submit all of the jobs using the executor.
jobspec = flux.job.JobspecV1.from_command(["echo", "Wingardium Leviosa!"])
jobspec.environment = {"TMPDIR": "/tmp/"}

with flux.job.FluxExecutor() as executor:
    futs = [executor.submit(jobspec) for _ in range(5)]
    for fut in futs:
        fut.add_done_callback(print_output)

fE4zRHhoM: stdout
fE4zRHhoM: Wingardium Leviosa!
fE4zSmh5h: stdout
fE4zSmh5h: Wingardium Leviosa!
fE4zSmh5i: stdout
fE4zSmh5i: Wingardium Leviosa!
fE4zSmh5j: stdout
fE4zSmh5j: Wingardium Leviosa!
fE4zSmh5k: stdout
fE4zSmh5k: Wingardium Leviosa!


### `flux.job.job_list` to list jobs

Finally, it can be really helpful to get a listing of all jobs. Note that `job_list` sends a remote procedure call (RPC) and returns an RPC object; we call `.get()` on it to wait for and retrieve the response.
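In the raw RPC response, `state` and `result` are numeric codes rather than names. The pure-Python sketch below decodes them, assuming the bit values from Flux's job state machine (NEW=1 through INACTIVE=64, and COMPLETED=1, FAILED=2, CANCELED=4, TIMEOUT=8); check these against your Flux version's documentation. `describe` is a hypothetical helper, and the sample entry is an abbreviated copy of the first job in the listing below.

```python
# Assumed bit values for Flux job states and results (per Flux's RFC 21).
JOB_STATES = {1: "NEW", 2: "DEPEND", 4: "PRIORITY", 8: "SCHED",
              16: "RUN", 32: "CLEANUP", 64: "INACTIVE"}
JOB_RESULTS = {1: "COMPLETED", 2: "FAILED", 4: "CANCELED", 8: "TIMEOUT"}


def describe(job: dict) -> str:
    """Render one entry of job_list(...).get()['jobs'] as a short summary."""
    state = JOB_STATES.get(job["state"], str(job["state"]))
    # 'result' is only meaningful once the job is INACTIVE
    result = JOB_RESULTS.get(job.get("result"), "-")
    return f"{job['id']} {job['name']:>8} {state:>8} {result}"


# Abbreviated copy of the first entry in the listing below
job = {"id": 324704392970240, "name": "sleep", "state": 64, "result": 1}
print(describe(job))
```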

In [34]:
flux.job.job_list(f).get()

{'jobs': [{'id': 324704392970240,
   'userid': 1000,
   'urgency': 16,
   'priority': 16,
   't_submit': 1747878454.2070212,
   't_depend': 1747878454.2199697,
   't_run': 1747878454.2341714,
   't_cleanup': 1747878469.3649247,
   't_inactive': 1747878469.369873,
   'state': 64,
   'name': 'sleep',
   'ntasks': 1,
   'ncores': 1,
   'duration': 0.0,
   'nnodes': 1,
   'ranks': '0',
   'nodelist': 'a100cf62dc4f',
   'success': True,
   'exception_occurred': False,
   'result': 1,
   'expiration': 0.0,
   'annotations': {'sched': {'resource_summary': 'rank0/core0'}},
   'waitstatus': 0},
  {'id': 324870806175744,
   'userid': 1000,
   'urgency': 16,
   'priority': 16,
   't_submit': 1747878464.1258276,
   't_depend': 1747878464.141535,
   't_run': 1747878464.1594179,
   't_cleanup': 1747878464.350665,
   't_inactive': 1747878464.3524082,
   'state': 64,
   'name': 'echo',
   'ntasks': 1,
   'ncores': 1,
   'duration': 0.0,
   'nnodes': 1,
   'ranks': '2',
   'nodelist': 'a100cf62dc4f',
 