Skip to content

Commit

Permalink
feature: flux framework scheduler
Browse files Browse the repository at this point in the history
this will add a basic scheduler that can interact
with Flux. I will need guidance on the best way to
run tests, as either we will need a container base
with flux or to mock something.

Signed-off-by: vsoch <vsoch@users.noreply.github.com>
  • Loading branch information
vsoch committed Sep 16, 2022
1 parent 0219a7a commit 67c7280
Show file tree
Hide file tree
Showing 8 changed files with 359 additions and 2 deletions.
1 change: 1 addition & 0 deletions reframe/core/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
]
_launchers = {}
_scheduler_backend_modules = [
'reframe.core.schedulers.flux',
'reframe.core.schedulers.local',
'reframe.core.schedulers.lsf',
'reframe.core.schedulers.pbs',
Expand Down
152 changes: 152 additions & 0 deletions reframe/core/schedulers/flux.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# Copyright 2016-2022 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
# ReFrame Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: BSD-3-Clause

#
# Flux-Framework backend
#
# - Initial version submitted by Vanessa Sochat, Lawrence Livermore National Lab
#

import os
import time

import reframe.core.runtime as rt
from reframe.core.backends import register_scheduler
from reframe.core.exceptions import JobSchedulerError
from reframe.core.schedulers.pbs import PbsJobScheduler


# Just import flux once
try:
import flux
import flux.job
from flux.job import JobspecV1
except ImportError:
flux = None


@register_scheduler('flux')
class FluxJobScheduler(PbsJobScheduler):
def __init__(self):
if not flux:
raise JobSchedulerError(
"Cannot import flux. Is a cluster available to you with Python bindings?"
)
self._fexecutor = flux.job.FluxExecutor()
self._submit_timeout = rt.runtime().get_option(
f'schedulers/@{self.registered_name}/job_submit_timeout'
)

def _prepare_job_formatter(self):
"""
A job doesn't have an easy status command - instead we do a job
listing with a particular format. See src/cmd/flux-jobs.py#L44
in flux-framework/flux-core for more attributes.
"""
jobs_format = (
"{id.f58:>12} {username:<8.8} {name:<10.10} {status_abbrev:>2.2} "
"{ntasks:>6} {nnodes:>6h} {runtime!F:>8h} {success} {exception.occurred}"
"{exception.note} {exception.type} {result} {runtime} {status}"
"{ranks:h} {t_remaining} {annotations}"
)
self.jobs_formatter = flux.job.JobInfoFormat(jobs_format)

# Note there is no attr for "id", its always returned
fields2attrs = {
"id.f58": (),
"username": ("userid",),
"exception.occurred": ("exception_occurred",),
"exception.type": ("exception_type",),
"exception.note": ("exception_note",),
"runtime": ("t_run", "t_cleanup"),
"status": ("state", "result"),
"status_abbrev": ("state", "result"),
"t_remaining": ("expiration", "state", "result"),
}

# Set job attributes we will use later to get job statuses
self.job_attrs = set()
for field in self.jobs_formatter.fields:
if field not in fields2attrs:
self.job_attrs.update((field,))
else:
self.job_attrs.update(fields2attrs[field])


def emit_preamble(self, job):
"""
We don't need to submit with a file, so we don't need a preamble.
"""
return ["echo $PWD"]

def submit(self, job):
"""
Submit a job to the flux executor.
"""
self._prepare_job_formatter()

# Output and error files
script_prefix = job.script_filename.split('.')[0]
output = os.path.join(job.workdir, "%s.out" % script_prefix)
error = os.path.join(job.workdir, "%s.err" % script_prefix)

# Generate the flux job
# Assume the filename includes a hashbang
# flux does not support mem_mb, disk_mb
fluxjob = JobspecV1.from_command(
command=["/bin/bash", job.script_filename],
num_tasks=job.num_tasks_per_core or 1,
cores_per_task=job.num_cpus_per_task or 1
)

# A duration of zero (the default) means unlimited
fluxjob.duration = job.time_limit or 0
fluxjob.stdout = output
fluxjob.stderr = error

# This doesn't seem to be used?
fluxjob.cwd = job.workdir
fluxjob.environment = dict(os.environ)
flux_future = self._fexecutor.submit(fluxjob)
job._jobid = str(flux_future.jobid())
job._submit_time = time.time()
job._flux_future = flux_future


def poll(self, *jobs):
if jobs:
# filter out non-jobs
jobs = [job for job in jobs if job is not None]

if not jobs:
return

# Loop through active jobs and act on status
for job in jobs:
if job._flux_future.done():

# The exit code can help us determine if the job was successful
try:
exit_code = job._flux_future.result(0)
except RuntimeError:
# Assume some runtime issue (suspended)
self.log(f'Job {job.jobid} was likely suspended.')
job._state = 'SUSPENDED'
else:
# the job finished (but possibly with nonzero exit code)
if exit_code != 0:
self.log(f'Job {job.jobid} did not finish successfully')
job._state = 'COMPLETED'
job._completed = True

# Otherwise, we are still running
else:
job._state = 'RUNNING'

def finished(self, job):
if job.exception:
raise job.exception

return job.state == 'COMPLETED'
1 change: 1 addition & 0 deletions reframe/core/schedulers/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def getscheduler(name):


# Import the schedulers modules to trigger their registration
import reframe.core.schedulers.flux # noqa: F401, F403
import reframe.core.schedulers.local # noqa: F401, F403
import reframe.core.schedulers.lsf # noqa: F401, F403
import reframe.core.schedulers.oar # noqa: F401, F403
Expand Down
4 changes: 2 additions & 2 deletions reframe/schemas/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@
"scheduler": {
"type": "string",
"enum": [
"local", "lsf", "oar", "pbs",
"flux", "local", "lsf", "oar", "pbs",
"sge", "slurm", "squeue", "torque"
]
},
Expand Down Expand Up @@ -383,7 +383,7 @@
"properties": {
"name": {
"type": "string",
"enum": ["local", "lsf", "oar", "pbs",
"enum": ["flux", "local", "lsf", "oar", "pbs",
"sge", "slurm", "squeue", "torque"]
},
"ignore_reqnodenotavail": {"type": "boolean"},
Expand Down
12 changes: 12 additions & 0 deletions tutorials/advanced/flux/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM fluxrm/flux-sched:focal
# docker build -f tutorials/advanced/flux/Dockerfile -t flux-reframe .
# docker run -it -v $PWD:/code flux-reframe
# docker run -it flux-reframe
USER root
ENV PATH=/opt/conda/bin:$PATH
WORKDIR /code
COPY . /code
RUN /bin/bash /code/bootstrap.sh
ENV PATH=/code/bin:$PATH
# If you want to develop, you'll need to comment this
# USER fluxuser
90 changes: 90 additions & 0 deletions tutorials/advanced/flux/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Flux Framework Tutorial

This is a demo that will show how to use refame with [Flux Framework](https://github.com/flux-framework/).
First, build the container here from the root of reframe.

```bash
$ docker build -f tutorials/advanced/flux/Dockerfile -t flux-reframe .
```

Then shell inside, optionally binding the present working directory if you want to develop.

```bash
$ docker run -it -v $PWD:/code flux-reframe
$ docker run -it flux-reframe
```

Note that if you build the local repository, you'll need to bootstrap and install
again, as we have over-written the bin!

```bash
./bootstrap.sh
```

And then reframe will again be in the local `bin` directory:

```bash
# which reframe
/code/bin/reframe
```

Then we can run reframe with the custom config [config.py](config.py) for flux.

```bash
# What tests are under tutorials/advanced/flux?
$ reframe -c . -C settings.py -l
```
```console
[ReFrame Setup]
version: 4.0.0-dev.1
command: '/code/bin/reframe -c tutorials/advanced/flux -C tutorials/advanced/flux/settings.py -l'
launched by: root@b1f6650222bc
working directory: '/code'
settings file: 'tutorials/advanced/flux/settings.py'
check search path: '/code/tutorials/advanced/flux'
stage directory: '/code/stage'
output directory: '/code/output'

[List of matched checks]
- EchoRandTest /66b93401
Found 1 check(s)

Log file(s) saved in '/tmp/rfm-ilqg7fqg.log'
```

This also works

```bash
$ reframe -c tutorials/advanced/flux -C tutorials/advanced/flux/settings.py -l
```

And then to run tests, just replace `-l` (for list) with `-r` or `--run` (for run):

```bash
$ reframe -c tutorials/advanced/flux -C tutorials/advanced/flux/settings.py --run
```
```console
root@b1f6650222bc:/code# reframe -c tutorials/advanced/flux -C tutorials/advanced/flux/settings.py --run
[ReFrame Setup]
version: 4.0.0-dev.1
command: '/code/bin/reframe -c tutorials/advanced/flux -C tutorials/advanced/flux/settings.py --run'
launched by: root@b1f6650222bc
working directory: '/code'
settings file: 'tutorials/advanced/flux/settings.py'
check search path: '/code/tutorials/advanced/flux'
stage directory: '/code/stage'
output directory: '/code/output'

[==========] Running 1 check(s)
[==========] Started on Fri Sep 16 20:47:15 2022

[----------] start processing checks
[ RUN ] EchoRandTest /66b93401 @generic:default+builtin
[ OK ] (1/1) EchoRandTest /66b93401 @generic:default+builtin
[----------] all spawned checks have finished

[ PASSED ] Ran 1/1 test case(s) from 1 check(s) (0 failure(s), 0 skipped)
[==========] Finished on Fri Sep 16 20:47:15 2022
Run report saved in '/root/.reframe/reports/run-report.json'
Log file(s) saved in '/tmp/rfm-0avso9nb.log'
```
32 changes: 32 additions & 0 deletions tutorials/advanced/flux/example1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright 2016-2022 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
# ReFrame Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: BSD-3-Clause

# rfmdocstart: echorand
import reframe as rfm
import reframe.utility.sanity as sn


@rfm.simple_test
class EchoRandTest(rfm.RunOnlyRegressionTest):
descr = 'A simple test that echoes a random number'
valid_systems = ['*']
valid_prog_environs = ['*']
lower = variable(int, value=90)
upper = variable(int, value=100)
executable = 'echo'
executable_opts = [
'Random: ',
f'$((RANDOM%({upper}+1-{lower})+{lower}))'
]

@sanity_function
def assert_solution(self):
return sn.assert_bounded(
sn.extractsingle(
r'Random: (?P<number>\S+)', self.stdout, 'number', float
),
self.lower, self.upper
)
# rfmdocend: echorand
69 changes: 69 additions & 0 deletions tutorials/advanced/flux/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Copyright 2016-2022 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
# ReFrame Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: BSD-3-Clause

#
# Generic fallback configuration
#

site_configuration = {
'systems': [
{
'name': 'generic',
'descr': 'Generic example system',
'hostnames': ['.*'],
'partitions': [
{
'name': 'default',
'scheduler': 'flux',
'launcher': 'local',
'environs': ['builtin']
}
]
},
],
'environments': [
{
'name': 'builtin',
'cc': 'cc',
'cxx': '',
'ftn': ''
},
],
'logging': [
{
'handlers': [
{
'type': 'stream',
'name': 'stdout',
'level': 'info',
'format': '%(message)s'
},
{
'type': 'file',
'level': 'debug',
'format': '[%(asctime)s] %(levelname)s: %(check_info)s: %(message)s', # noqa: E501
'append': False
}
],
'handlers_perflog': [
{
'type': 'filelog',
'prefix': '%(check_system)s/%(check_partition)s',
'level': 'info',
'format': (
'%(check_job_completion_time)s|reframe %(version)s|'
'%(check_info)s|jobid=%(check_jobid)s|'
'%(check_perf_var)s=%(check_perf_value)s|'
'ref=%(check_perf_ref)s '
'(l=%(check_perf_lower_thres)s, '
'u=%(check_perf_upper_thres)s)|'
'%(check_perf_unit)s'
),
'append': True
}
]
}
],
}

0 comments on commit 67c7280

Please sign in to comment.