Skip to content

Commit

Permalink
BgEngine: New mechanism for running one-shot functions before or afte…
Browse files Browse the repository at this point in the history
…r the main loop.
  • Loading branch information
riccardomurri committed Apr 6, 2017
1 parent 2bf9a89 commit 4ce1d97
Showing 1 changed file with 79 additions and 10 deletions.
89 changes: 79 additions & 10 deletions gc3libs/core.py
Expand Up @@ -2060,6 +2060,12 @@ def __init__(self, lib, *args, **kwargs):
self._queue = []
self._queue_locked = lock()

# queues for before/after `Engine.progress()` triggers
self._after_progress_triggers = []
self._after_progress_triggers_locked = lock()
self._before_progress_triggers = []
self._before_progress_triggers_locked = lock()

assert len(args) > 0, (
"`BgEngine()` must be called"
" either with an `Engine` instance as second (and last) argument,"
Expand Down Expand Up @@ -2127,37 +2133,66 @@ def _perform(self):
- Run `Engine.progress()` to ensure that GC3Pie tasks are updated.
"""
gc3libs.log.debug("%s: _perform() started", self)
self.__run_delayed_operations()
self.__run_before_triggers()
self.__run_engine_progress()
self.__run_after_triggers()

def __run_delayed_operations(self):
# quickly grab a local copy of the command queue, and
# reset it to the empty list -- we do not want to hold
# the lock on the queue for a long time, as that would
# make the API unresponsive
with self._queue_locked:
queue = self._queue
self._queue = list()
# execute delayed operations
for fn, args, kwargs in queue:
self._queue = []
self.__run_hooks(queue)

def __run_before_triggers(self):
with self._before_progress_triggers_locked:
before_progress_triggers = self._before_progress_triggers
self._before_progress_triggers = []
self.__run_hooks(before_progress_triggers)

def __run_after_triggers(self):
with self._after_progress_triggers_locked:
after_progress_triggers = self._after_progress_triggers
self._after_progress_triggers = []
self.__run_hooks(after_progress_triggers)

@staticmethod
def __run_hooks(queue):
"""
Call all the functions listed in `queue`, in the order given.
Any exceptions raised will be logged at a WARNING level but
otherwise ignored.
"""
for func, args, kwargs in queue:
gc3libs.log.debug(
"Executing delayed call %s(*%r, **%r) ...",
fn.__name__, args, kwargs)
# pylint: disable=broad-except
func.__name__, args, kwargs)
try:
fn(*args, **kwargs)
except Exception, err:
func(*args, **kwargs)
except Exception as err: # pylint: disable=broad-except
gc3libs.log.warning(
"Ignoring '%s' error,"
" occurred while executing delayed call %s(*%r, **%r): %s",
err.__class__.__name__,
fn.__name__, args, kwargs,
func.__name__, args, kwargs,
err, exc_info=__debug__)
# update GC3Pie tasks

def __run_engine_progress(self):
"""
Call the `.progress()` method of the wrapped `Engine` instance.
"""
gc3libs.log.debug(
"%s: calling `progress()` on Engine %s ...",
self, self._engine)
# pylint: disable=broad-except
try:
self._engine.progress()
self._progress_last_run = time.time()
except Exception, err:
except Exception as err:
gc3libs.log.warning(
"Ignoring '%s' error,"
" occurred while running"
Expand All @@ -2166,6 +2201,40 @@ def _perform(self):
gc3libs.log.debug("%s: _perform() done", self)


def trigger_before_progress(self, func, *args, **kwargs):
"""
Call a function *before* running `Engine.progress()` in the main loop.
Exceptions raised during the call will be logged at WARNING level but
otherwise ignored.
The function call will be triggered only *once* at the next run of the
main loop; it will not be fired repeatedly at every re-run of the main
loop.
Any suppplemental positional arguments or keyword-arguments that are
supplied will be passed unchanged to the trigger function.
"""
with self._before_progress_triggers_locked:
self._before_progress_triggers.append((func, args, kwargs))


def trigger_after_progress(self, func, *args, **kwargs):
"""
Call a function *after* running `Engine.progress()` in the main loop.
Exceptions raised during the call will be logged at WARNING level but
otherwise ignored.
The function call will be triggered only *once* at the next run of the
main loop; it will not be fired repeatedly at every re-run of the main
loop.
Any suppplemental positional arguments or keyword-arguments that are
supplied will be passed unchanged to the trigger function.
"""
with self._after_progress_triggers_locked:
self._after_progress_triggers.append((func, args, kwargs))


@staticmethod
def at_most_once_per_cycle(fn): # pylint: disable=invalid-name
"""
Expand Down

0 comments on commit 4ce1d97

Please sign in to comment.