Skip to content

Commit

Permalink
add asyncio support to flux Python
Browse files Browse the repository at this point in the history
Problem: we currently cannot integrate flux with Python asyncio
Description: This will allow us to run tasks using asyncio. It
adds a few extra functions to Flux futures to support being
run alongside asyncio tasks, and a flux.asyncio module that
provides a FluxEventLoop and a custom selector to handle
integration of the flux native watchers with Python native
asyncio.

Signed-off-by: vsoch <vsoch@users.noreply.github.com>
  • Loading branch information
vsoch committed Dec 15, 2022
1 parent 70ad0ef commit 979fb50
Show file tree
Hide file tree
Showing 15 changed files with 440 additions and 17 deletions.
3 changes: 0 additions & 3 deletions .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ RUN python3 -m pip install IPython && \
# Assuming installing to /usr/local
ENV LD_LIBRARY_PATH=/usr/local/lib

# Assuming installing to /usr/local
ENV LD_LIBRARY_PATH=/usr/local/lib

# extra interactive utilities and compilation_database generation
RUN apt-get update \
&& apt-get -qq install -y --no-install-recommends \
Expand Down
2 changes: 2 additions & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
"terminal.integrated.defaultProfile.linux": "bash",

// Ensure that Python autocomplete works out of the box
// If you want to test Python just do (after make install)
// export PYTHONPATH=/usr/local/lib/python3.8/site-packages
"python.autoComplete.extraPaths": [
"/usr/local/lib/flux/python3.8",
"/usr/local/lib/python3.8/site-packages",
Expand Down
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,5 @@ repos:
rev: v1.4.2
hooks:
- id: vermin
args: ['-t=3.6-', '--violations']
args: ['-t=3.6-', '--violations', '--exclude', 'asyncio.get_running_loop']
# We exclude the asyncio function because we fall back to one supported by Python 3.6
54 changes: 50 additions & 4 deletions doc/python/job_submission.rst
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,10 @@ restricted to instance owners.
Asynchronous interfaces
-----------------------

There are two primary asynchronous interfaces to job manipulations. The first is
an event-loop interface, which is closer to the native C interface, and consists of
functions like ``flux.job.submit_async`` and ``flux.job.result_async`` (note the
There are two primary asynchronous interfaces to job manipulations, and a third
that uses Python asyncio to submit jobs. The first is an event-loop interface,
which is closer to the native C interface, and consists of functions like
``flux.job.submit_async`` and ``flux.job.result_async`` (note the
functions are the same as in the synchronous interface, only with an "_async"
suffix). The second is an interface which is almost identical to the
`concurrent.futures <https://docs.python.org/3/library/concurrent.futures.html>`_
Expand All @@ -149,7 +150,12 @@ futures, the difference being that the ``FluxExecutor`` is designed so that
all futures fulfill in the background, and there is no need for user code
to enter the Flux event loop, while the event-loop-based interface
requires the user to call into the Flux event loop in order for futures
to fulfill and for callbacks to trigger.
to fulfill and for callbacks to trigger. For the third interface (using asyncio)
we are still using Flux' implementation of futures, but we create some of
the backend watchers (e.g., a file descriptor, timer, or signal watcher)
within the context of a customized asyncio event loop. This means that we are
able to have Flux run alongside a more traditional (and native) Python
asyncronous interface.

Our general recommendation is that you use the ``FluxExecutor`` interface
unless you are familiar with event-loop based programming.
Expand Down Expand Up @@ -240,3 +246,43 @@ as futures complete.
.. autofunction:: flux.job.result_async

.. autofunction:: flux.job.wait_async


The ``FluxEventLoop`` (asyncio) interface
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you are familiar with `asyncio <https://docs.python.org/3/library/asyncio.html>`_
Flux has support for using these native Python event loops to submit jobs. As an example,
here we use the ``FluxEventLoop`` to submit a job, along with running an ``asyncio.sleep`` job.


.. code:: python
# ensure the script is running in an active flux instance
import asyncio
from flux.asyncio import loop
import flux.job
fluxsleep = flux.job.JobspecV1.from_command(['sleep', '2'])
fluxecho = flux.job.JobspecV1.from_command(['echo', 'pancakes'])
tasks = [
loop.create_task(asyncio.sleep(5)),
loop.create_task(flux.asyncio.submit(fluxecho)),
loop.create_task(flux.asyncio.submit(fluxsleep)),
]
asyncio.set_event_loop(loop)
results = loop.run_until_complete(asyncio.gather(*tasks))
# [JobID(456004999315456), JobID(456004999315457)]
For the above, you get back the job ID. Note that we also use a common event loop
and handle that ``flux.asyncio`` defines.
You are, however, free to use the ``FluxEventLoop`` (loop already created in the
example above) class to define your own, using your own Flux handle, which is
accepted as the only init argument to the loop class.

.. autofunction:: flux.asyncio.submit

.. autofunction:: flux.asyncio.FluxEventLoop
6 changes: 6 additions & 0 deletions etc/docker/ubuntu/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ Build the main `flux-ubuntu` image:
$ docker build -f etc/docker/ubuntu/Dockerfile -t ghcr.io/flux-framework/flux-ubuntu .
```

If you want to shell inside (and bind for re-compile and development):

```bash
$ docker run -it -v $PWD:/code ghcr.io/flux-framework/flux-ubuntu
```

And then tests:

```bash
Expand Down
3 changes: 2 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ exclude = benchmarks docs
max-line-length = 100
ignore = E1 E2 E5 W5
per-file-ignores =
src/bindings/python/flux/asyncio/__init__.py:F401
src/bindings/python/flux/resource/__init__.py:F401
src/bindings/python/flux/job/frobnicator/__init__.py:F401
src/bindings/python/flux/job/__init__.py:F401
Expand All @@ -22,4 +23,4 @@ per-file-ignores =
src/bindings/python/flux/uri/resolvers/pid.py:E713
src/bindings/python/flux/job/validator/plugins/jobspec.py:F541
src/bindings/python/flux/util.py:E713
src/bindings/python/flux/uri/resolvers/pid.py:E713
src/bindings/python/flux/uri/resolvers/pid.py:E713
3 changes: 3 additions & 0 deletions src/bindings/python/flux/Makefile.am
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
nobase_fluxpy_PYTHON = \
__init__.py \
asyncio/__init__.py \
asyncio/events.py \
asyncio/selector.py \
kvs.py \
wrapper.py \
rpc.py \
Expand Down
1 change: 1 addition & 0 deletions src/bindings/python/flux/asyncio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from flux.asyncio.events import FluxEventLoop, loop, submit
70 changes: 70 additions & 0 deletions src/bindings/python/flux/asyncio/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
###############################################################
# Copyright 2022 Lawrence Livermore National Security, LLC
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
#
# This file is part of the Flux resource manager framework.
# For details, see https://github.com/flux-framework.
#
# SPDX-License-Identifier: LGPL-3.0
###############################################################

import asyncio

import flux
import flux.core.watchers
from flux.asyncio.selector import FluxSelector

# The loop *must* have the same handle as the submit or else the loop will
# run forever.
HANDLE = flux.Flux()


async def submit(jobspec, flux_handle=None):
"""Submit a Flux jobspec and return a job ID.
Example usage
=============
# ensure the script is running in an active flux instance
import asyncio
from flux.asyncio import loop
import flux.job
fluxsleep = flux.job.JobspecV1.from_command(['sleep', '2'])
fluxecho = flux.job.JobspecV1.from_command(['echo', 'pancakes'])
tasks = [
loop.create_task(asyncio.sleep(5)),
loop.create_task(flux.asyncio.submit(fluxecho)),
loop.create_task(flux.asyncio.submit(fluxsleep)),
]
asyncio.set_event_loop(loop)
results = loop.run_until_complete(asyncio.gather(*tasks))
# [JobID(456004999315456), JobID(456004999315457)]
"""
handle = flux_handle or HANDLE
uid = await flux.job.submit_async(handle, jobspec)
return uid


class FluxEventLoop(asyncio.SelectorEventLoop):
"""An asyncio loop that handles running Flux."""

def __init__(self, flux_handle=None):
# This can be provided by a user that knows what they are doing.
# The handle to the submit job and loop must be the same.
if not flux_handle:
flux_handle = HANDLE
selector = FluxSelector(flux_handle)

# Reverse reference is needed for watchers
selector.loop = self
super().__init__(selector)

@property
def selector(self):
return self._selector


# This loop needs to be accessible from all places!
loop = FluxEventLoop() # pylint: disable=invalid-name
151 changes: 151 additions & 0 deletions src/bindings/python/flux/asyncio/selector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
###############################################################
# Copyright 2022 Lawrence Livermore National Security, LLC
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
#
# This file is part of the Flux resource manager framework.
# For details, see https://github.com/flux-framework.
#
# SPDX-License-Identifier: LGPL-3.0
###############################################################

import selectors
import signal

import flux.constants
from flux.core.watchers import FDWatcher


def stop_callback(handle, _watcher, _fd_int, events, args=None):
"""
This is called by the watcher, and stops the reactor add adds ready jobs.
"""
handle.reactor_stop()

# TimerWatcher won't know a file descriptor, just needs to stop the reactor
if not args:
return
selector = args["select"]
selector_event = get_selector_event(events)
selector.ready.add((args["key"], selector_event & args["key"].events))


def get_selector_event(events):
"""
Given an int, return the corresponding flux identifier.
"""
event = 0
if events & flux.constants.FLUX_POLLIN:
event |= selectors.EVENT_READ
if events & flux.constants.FLUX_POLLOUT:
event |= selectors.EVENT_WRITE
return event


def get_flux_event(events):
"""
Given an int, return the corresponding selector identifier
"""
event = 0
if events & selectors.EVENT_READ:
event |= flux.constants.FLUX_POLLIN
if events & selectors.EVENT_WRITE:
event |= flux.constants.FLUX_POLLOUT
return event


class FluxSelector(
selectors._BaseSelectorImpl
): # pylint: disable=protected-access # type: ignore
"""
A Flux selector supports registering file objects to be monitored for
specific I/O events (for Flux).
"""

def __init__(self, handle):
super().__init__()
self.handle = handle
self.ready = set()
self._watchers = {}

def register(self, fileobj, events, data=None):
"""
Register a new file descriptor event.
"""
key = super().register(fileobj, events, data)
watcher = FDWatcher(
self.handle,
fileobj,
get_flux_event(events),
stop_callback,
args={"key": key, "select": self},
)
watcher.start()
self._watchers[fileobj] = watcher
return key

def unregister(self, fileobj):
"""
Remove the key and the watcher.
"""
try:
key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
self._watchers[key.fileobj].stop()
except KeyError:
raise KeyError("{!r} is not registered".format(fileobj)) from None
return key

def select(self, timeout=None):
"""
Perform the actual selection, until some monitored file objects are
ready or a timeout expires.
Parameters:
timeout -- if timeout > 0, this specifies the maximum wait time, in
seconds, waited for by a flux TimerWatcher
if timeout <= 0, the select() call won't block, and will
report the currently ready file objects
if timeout is None, select() will block until a monitored
file object becomes ready
Returns:
list of (key, events) for ready file objects
`events` is a bitwise mask of EVENT_READ|EVENT_WRITE
"""
reactor = self.handle.get_reactor()
reactor_interrupted = False

def reactor_interrupt(handle, *_args):
# ensure reactor_interrupted from enclosing scope:
nonlocal reactor_interrupted
reactor_interrupted = True
handle.reactor_stop(reactor)

with self.handle.signal_watcher_create(signal.SIGINT, reactor_interrupt):
with self.handle.in_reactor():

# Ensure previous events are cleared
self.ready.clear()

# 0 == "run until I tell you to stop"
if timeout is not None:
if timeout > 0:

# Block for a specified timeout
with self.handle.timer_watcher_create(timeout, stop_callback):
watcher_count = self.handle.flux_reactor_run(reactor, 0)

# If timeout <= 0, select won't block
else:
watcher_count = self.handle.flux_reactor_run(
reactor, flux.constants.FLUX_REACTOR_NOWAIT
)

# If timeout is None, block until a monitored object ready
else:
watcher_count = self.handle.flux_reactor_run(reactor, 0)

if reactor_interrupted:
raise KeyboardInterrupt

if watcher_count < 0:
self.handle.raise_if_exception()

return list(self.ready)

0 comments on commit 979fb50

Please sign in to comment.