Skip to content

Commit

Permalink
flux cli classes go into a flux.cli module
Browse files Browse the repository at this point in the history
flux-mini also shares the same imported classes to
reduce redundancy.

Signed-off-by: vsoch <vsoch@users.noreply.github.com>
  • Loading branch information
vsoch committed Feb 24, 2023
1 parent aa7ae89 commit 13666ad
Show file tree
Hide file tree
Showing 37 changed files with 697 additions and 2,675 deletions.
5 changes: 1 addition & 4 deletions etc/gen-cmdhelp.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
###############################################################
# Copyright 2020-2023 Lawrence Livermore National Security, LLC
# Copyright 2020 Lawrence Livermore National Security, LLC
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
#
# This file is part of the Flux resource manager framework.
Expand Down Expand Up @@ -30,9 +30,6 @@
sys.path.insert(0, docsdir)
from manpages import man_pages

print(os.listdir(docsdir))
print(os.listdir(os.path.join(docsdir, 'man1')))

entries = []
visited = dict()

Expand Down
7 changes: 7 additions & 0 deletions src/bindings/python/flux/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ nobase_fluxpy_PYTHON = \
memoized_property.py \
debugged.py \
importer.py \
cli/__init__.py \
cli/_base.py \
cli/alloc.py \
cli/batch.py \
cli/bulksubmit.py \
cli/run.py \
cli/submit.py \
core/__init__.py \
core/watchers.py \
core/inner.py \
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -549,9 +549,10 @@ def create_parser(**kwargs):
Create default parser with args for mini subcommands
"""
exclude_io = kwargs.get("exclude_io", False) is True
add_help = kwargs.get("add_help", True) is True
parser = argparse.ArgumentParser(
usage=kwargs.get("usage"),
add_help=True,
add_help=add_help,
description=kwargs.get("description"),
formatter_class=kwargs.get("formatter_class") or argparse.HelpFormatter,
)
Expand Down
174 changes: 174 additions & 0 deletions src/bindings/python/flux/cli/alloc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
##############################################################
# Copyright 2023 Lawrence Livermore National Security, LLC
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
#
# This file is part of the Flux resource manager framework.
# For details, see https://github.com/flux-framework.
#
# SPDX-License-Identifier: LGPL-3.0
##############################################################

import argparse
import os
import sys
import time

import flux
import flux.cli._base as base
import flux.job
from flux.uri import JobURI


class AllocCmd(base.MiniCmd):
    """Implementation of the ``alloc`` subcommand.

    Submits a new Flux instance as a job, then either execs
    ``flux job attach`` to connect to it (the default), or with ``--bg``
    waits for the new instance to finish startup and prints its jobid.

    NOTE(review): relies on base.MiniCmd (project-defined) to provide
    self.parser, self.flux_handle and self.submit() — confirm against
    flux/cli/_base.py.
    """

    def __init__(self, **kwargs):
        """Register alloc-specific command line options on the parser."""
        # First event timestamp seen in bg_wait_cb; later log messages
        # are reported relative to this time.
        self.t0 = None
        # alloc attaches a pty rather than using the normal I/O options,
        # so exclude the base I/O options unless the caller overrides.
        if "exclude_io" not in kwargs:
            kwargs["exclude_io"] = True
        super().__init__(**kwargs)
        base.add_batch_alloc_args(self.parser)
        self.parser.add_argument(
            "-v",
            "--verbose",
            action="count",
            default=0,
            help="Increase verbosity on stderr (multiple use OK)",
        )
        self.parser.add_argument(
            "--bg",
            action="store_true",
            help="Wait for new instance to start, but do not attach to it.",
        )
        self.parser.add_argument(
            "COMMAND",
            nargs=argparse.REMAINDER,
            # BUGFIX: trailing space added so the concatenated help text
            # does not render as "instance.(default …":
            help="Set the initial COMMAND of new Flux instance. "
            + "(default is an interactive shell)",
        )

    def init_jobspec(self, args):
        """Build and return the jobspec for the new Flux instance.

        Raises:
            ValueError: if neither a slot count nor a node count was given.
        """
        # If number of slots not specified, then set it to node count
        # if set, otherwise raise an error.
        if not args.nslots:
            if not args.nodes:
                raise ValueError("Number of slots to allocate must be specified")
            args.nslots = args.nodes
            args.exclusive = True

        # For --bg, do not run an rc2 (initial program) unless
        # the user explicitly specified COMMAND:
        if args.bg and not args.COMMAND:
            args.broker_opts = args.broker_opts or []
            args.broker_opts.append("-Sbroker.rc2_none=1")

        if args.dump:
            args.broker_opts = args.broker_opts or []
            args.broker_opts.append("-Scontent.dump=" + args.dump)

        jobspec = flux.job.JobspecV1.from_nest_command(
            command=args.COMMAND,
            num_slots=args.nslots,
            cores_per_slot=args.cores_per_slot,
            gpus_per_slot=args.gpus_per_slot,
            num_nodes=args.nodes,
            broker_opts=base.list_split(args.broker_opts),
            exclusive=args.exclusive,
        )

        # For --bg, always allocate a pty, but not interactive,
        # since an interactive pty causes the job shell to hang around
        # until a pty client attaches, which may never happen.
        #
        # O/w, allocate an interactive pty only if stdin is a tty
        #
        if args.bg:
            jobspec.setattr_shell_option("pty.capture", 1)
        elif sys.stdin.isatty():
            jobspec.setattr_shell_option("pty.interactive", 1)
        return jobspec

    @staticmethod
    def log(jobid, ts, msg):
        """Emit a progress message for jobid at relative time ts to stderr."""
        print(f"{jobid}: {ts:6.3f}s: {msg}", file=sys.stderr, flush=True)

    def bg_wait_cb(self, future, args, jobid):
        """
        Wait for memo event, connect to child instance, and finally wait
        for rc1 to complete
        """
        event = future.get_event()
        if not event:
            # The job has unexpectedly exited since we're at the end
            # of the eventlog. Run `flux job attach` since this will dump
            # any errors or output, then raise an exception.
            os.system(f"flux job attach {jobid} >&2")
            raise OSError(f"{jobid}: unexpectedly exited")

        if not self.t0:
            self.t0 = event.timestamp
        ts = event.timestamp - self.t0

        if args.verbose and event.name == "alloc":
            self.log(jobid, ts, "resources allocated")
        if event.name == "memo" and "uri" in event.context:
            if args.verbose:
                self.log(jobid, ts, "waiting for instance")

            # Wait for child instance to finish rc1 using state-machine.wait,
            # then stop the reactor to return to caller.
            uri = str(JobURI(event.context["uri"]))
            try:
                child_handle = flux.Flux(uri)
            except OSError as exc:
                # Chain the original error so the cause appears in tracebacks
                raise OSError(f"Unable to connect to {jobid}: {exc}") from exc
            try:
                child_handle.rpc("state-machine.wait").get()
            except OSError:
                raise OSError(f"{jobid}: instance startup failed")

            if args.verbose:
                self.log(jobid, time.time() - self.t0, "ready")
            self.flux_handle.reactor_stop()

    def background(self, args, jobid):
        """Handle the --bg option
        Wait for child instance to be ready to accept jobs before returning.
        Print jobid to stdout once the job is ready.
        """
        jobid = flux.job.JobID(jobid)

        flux.job.event_watch_async(self.flux_handle, jobid).then(
            self.bg_wait_cb, args, jobid
        )
        if args.verbose:
            self.log(jobid, 0.0, "waiting for resources")
        try:
            self.flux_handle.reactor_run()
        except KeyboardInterrupt:
            print(f"\r{jobid}: Interrupt: canceling job", file=sys.stderr)
            flux.job.cancel(self.flux_handle, jobid)
            sys.exit(1)

        print(jobid)

    def main(self, args):
        """Submit the new instance, then attach (default) or wait (--bg)."""
        jobid = self.submit(args)

        if args.bg:
            self.background(args, jobid)
            sys.exit(0)

        # Display job id on stderr if -v
        # N.B. we must flush sys.stderr due to the fact that it is buffered
        # when it points to a file, and os.execvp leaves it unflushed
        if args.verbose > 0:
            print("jobid:", jobid, file=sys.stderr)
            sys.stderr.flush()

        # Build args for flux job attach
        attach_args = ["flux-job", "attach"]
        attach_args.append(jobid.f58.encode("utf-8", errors="surrogateescape"))

        # Exec flux-job attach, searching for it in FLUX_EXEC_PATH.
        os.environ["PATH"] = os.environ["FLUX_EXEC_PATH"] + ":" + os.environ["PATH"]
        os.execvp("flux-job", attach_args)
140 changes: 140 additions & 0 deletions src/bindings/python/flux/cli/batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
##############################################################
# Copyright 2023 Lawrence Livermore National Security, LLC
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
#
# This file is part of the Flux resource manager framework.
# For details, see https://github.com/flux-framework.
#
# SPDX-License-Identifier: LGPL-3.0
##############################################################

import argparse
import logging
import sys

import flux
import flux.cli._base as base
import flux.job
from flux.job.directives import DirectiveParser

LOGGER = logging.getLogger("flux-bulksubmit")


class BatchCmd(base.MiniCmd):
    """Implementation of the ``batch`` subcommand.

    Submits a batch script — read from a file argument or stdin, possibly
    containing RFC 36 submission directives, or synthesized with --wrap —
    as a new Flux instance.

    NOTE(review): relies on base.MiniCmd (project-defined) to provide
    self.parser and self.submit() — confirm against flux/cli/_base.py.
    """

    def __init__(self, **kwargs):
        """Register batch-specific command line options on the parser."""
        super().__init__(**kwargs)
        self.parser.add_argument(
            "--wrap",
            action="store_true",
            help="Wrap arguments or stdin in a /bin/sh script",
        )
        base.add_batch_alloc_args(self.parser)
        self.parser.add_argument(
            "SCRIPT",
            nargs=argparse.REMAINDER,
            help="Batch script and arguments to submit",
        )

    def parse_directive_args(self, name, batchscript):
        """
        Parse any directives in batchscript.directives, then apply
        command line arguments in self.argv. This allows command line
        to override file directives

        Returns:
            tuple: (script text, merged argparse Namespace).
        """
        args = None
        for item in batchscript.directives:
            try:
                # Only SETARGS directives carry argparse-style arguments;
                # other directive actions are ignored here.
                if item.action == "SETARGS":
                    args = self.parser.parse_args(item.args, namespace=args)
            except SystemExit:
                # Argparse exits on error. Give the user a clue
                # about which line failed in the source file:
                LOGGER.error(f"argument parsing failed at {name} line {item.lineno}")
                sys.exit(2)
        # Command line args are parsed last, into the same namespace,
        # so they take precedence over any file directives.
        args = self.parser.parse_args(self.argv, namespace=args)
        return batchscript.script, args

    def process_script(self, args):
        """
        Process a batch script that may contain RFC 36 directives.
        Returns the ingested script and new argparse args Namespace.

        Raises:
            ValueError: if the input is not utf-8 text or directive
                parsing fails.
        """
        if args.SCRIPT:
            if args.wrap:
                # Return script which will be wrapped by caller
                return " ".join(args.SCRIPT) + "\n", args

            # O/w, open script for reading
            name = open_arg = args.SCRIPT[0]
        else:
            name = "stdin"
            open_arg = 0  # when passed to `open`, 0 gives the `stdin` stream
        with open(open_arg, "r", encoding="utf-8") as filep:
            try:
                batchscript = DirectiveParser(filep)
            except UnicodeError:
                raise ValueError(
                    f"{name} does not appear to be a script, "
                    "or failed to encode as utf-8"
                )
            except ValueError as exc:
                # Re-raise with the script name prefixed for context;
                # `from None` suppresses the redundant chained traceback.
                raise ValueError(f"{name}: {exc}") from None
        return self.parse_directive_args(name, batchscript)

    def init_jobspec(self, args):
        """Build and return the jobspec for the batch job.

        Raises:
            ValueError: if neither a slot count nor a node count was given.
        """
        if args.wrap:
            self.script = f"#!/bin/sh\n{self.script}"

        # If number of slots not specified, then set it to node count
        # if set, otherwise raise an error.
        if not args.nslots:
            if not args.nodes:
                raise ValueError("Number of slots to allocate must be specified")
            args.nslots = args.nodes
            args.exclusive = True

        if args.dump:
            args.broker_opts = args.broker_opts or []
            args.broker_opts.append("-Scontent.dump=" + args.dump)

        # If job name is not explicitly set in args, use the script name
        # if a script was provided, else the string "mini-batch" to
        # indicate the script was set on flux mini batch stdin.
        if args.job_name is None:
            if args.SCRIPT:
                args.job_name = args.SCRIPT[0]
            else:
                args.job_name = "mini-batch"

        jobspec = flux.job.JobspecV1.from_batch_command(
            script=self.script,
            jobname=args.job_name,
            args=args.SCRIPT[1:],
            num_slots=args.nslots,
            cores_per_slot=args.cores_per_slot,
            gpus_per_slot=args.gpus_per_slot,
            num_nodes=args.nodes,
            broker_opts=base.list_split(args.broker_opts),
            exclusive=args.exclusive,
        )

        # Default output is flux-{{jobid}}.out
        # overridden by either --output=none or --output=kvs
        if not args.output:
            jobspec.stdout = "flux-{{id}}.out"
        return jobspec

    def main(self, args):
        """Ingest the script and directives, submit, and print the jobid."""
        # Save cmdline argv to mini batch in case it must be reprocessed
        # after applying directive options.
        # self.argv is sys.argv without first two args ["mini", "batch"]
        self.argv = sys.argv[2:]

        # Process file with possible submission directives, returning
        # script and new argparse args Namespace as a result.
        # This must be done before calling self.submit() so that SETARGS
        # directives are available in jobspec_create():
        self.script, args = self.process_script(args)

        jobid = self.submit(args)
        print(jobid, file=sys.stdout)

0 comments on commit 13666ad

Please sign in to comment.