Skip to content

Commit

Permalink
Use blinker instead of generic.event for the internal pub/sub mec…
Browse files Browse the repository at this point in the history
…hanism.
  • Loading branch information
riccardomurri committed May 4, 2018
1 parent 73ae6e9 commit 8189487
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 45 deletions.
15 changes: 8 additions & 7 deletions gc3libs/__init__.py
Expand Up @@ -105,7 +105,7 @@ class Default(object):
LSF_CACHE_TIME = 30


from gc3libs.events import emit, subscribe, TaskStateChange
from gc3libs.events import TaskStateChange
import gc3libs.exceptions
from gc3libs.persistence import Persistable
from gc3libs.url import UrlKeyDict, UrlValueDict
Expand Down Expand Up @@ -296,7 +296,7 @@ def __init__(self, **extra_args):
self._attached = False
self._controller = None
self.changed = True
subscribe(self._on_state_change, TaskStateChange)
TaskStateChange.connect(self._on_state_change, sender=self)

# manipulate the "controller" interface used to control the associated job
def attach(self, controller):
Expand Down Expand Up @@ -629,10 +629,10 @@ def wait(self, interval=60):
# State transition handlers.
#

def _on_state_change(self, event):
if id(event.task) != id(self):
def _on_state_change(self, task, from_state, to_state):
if id(task) != id(self):
return
handler_name = event.to_state.lower()
handler_name = to_state.lower()
gc3libs.log.debug(
"Calling state-transition handler '%s' on %s ...",
handler_name, self)
Expand Down Expand Up @@ -1933,10 +1933,11 @@ def fset(self, value):
self.history.append(
"Transition from state {0} to state {1}"
.format(self._state, value))
# signal state-transition
if self._ref is not None:
self._ref.changed = True
emit(TaskStateChange(self._ref, self._state, value))
# signal state-transition
TaskStateChange.send(
self._ref, from_state=self._state, to_state=value)
# finally, update state
self._state = value

Expand Down
21 changes: 9 additions & 12 deletions gc3libs/core.py
Expand Up @@ -35,7 +35,7 @@
import gc3libs
from gc3libs import Application, Run, Task
import gc3libs.debug
from gc3libs.events import subscribe, TaskStateChange
from gc3libs.events import TaskStateChange
import gc3libs.exceptions
from gc3libs.quantity import Duration
import gc3libs.utils as utils
Expand Down Expand Up @@ -1258,16 +1258,16 @@ def __init__(self, controller, tasks=[], store=None,
# init counters/statistics
self._counts = self._Counters(self)
self._counts.init_for(Task) # always gather these
subscribe(self._on_state_change, TaskStateChange)
TaskStateChange.connect(self._on_state_change)

# Engine fully initialized, add all tasks
for task in tasks:
self.add(task)

def _on_state_change(self, event):
task = event.task
def _on_state_change(self, task, from_state, to_state):
if task in self._managed:
self._counts.transitioned(event)
#gc3libs.log.debug("Task %s transitioned from %s to %s ...", task, from_state, to_state)
self._counts.transitioned(task, from_state, to_state)

class TaskQueue(object):
def __init__(self):
Expand Down Expand Up @@ -1457,22 +1457,19 @@ def remove(self, task):
"""
self._update(task, -1)

def transitioned(self, event):
def transitioned(self, task, from_state, to_state):
"""
Update the counts, following a `TaskStateChange` event.
The counters relative to *event.from_state* are
decremented by unit, and correspondingly the counters for
*event.to_state* are incremented.
The counters relative to *from_state* are decremented by
unit, and correspondingly the counters for *to_state* are
incremented.
This is functionally equivalent to, but more efficient than::
self._update(task, from_state, -1)
self._update(task, to_state, +1)
"""
task = event.task
from_state = event.from_state
to_state = event.to_state
stats_to_increment = [to_state]
if to_state == 'TERMINATED':
if task.execution.returncode == 0:
Expand Down
30 changes: 5 additions & 25 deletions gc3libs/events.py
Expand Up @@ -29,31 +29,11 @@
__docformat__ = 'reStructuredText'


# import these names here, so we can use `from gc3libs.events import
# subscribe` elsewhere in the code, and leave the dependency on
# `generic.events` as an implementation detail
from generic.event import fire as emit, subscribe, unsubscribe
# do not make symbols imported from `blinker` public: use of `blinker`
# here is an implementation detail
from blinker import signal as _signal


# FIXME: rewrite with `attrs` when we drop support for Py2.6!
class TaskStateChange(object):
"""
Fired when a `Task`:class: execution state changes.
TaskStateChange = _signal('task_state_change')

No guarantee is given as to whether *task* is still in the old
*from_state* or has already transitioned to the new *to_state*.
"""

__slots__ = ('task', 'from_state', 'to_state')

def __init__(self, task, from_state, to_state):
self.task = task
self.from_state = from_state
self.to_state = to_state


# implement workaround for https://github.com/andreypopp/generic/issues/1
# i.e., register a dummy event handler for `TaskStateChange`
def _no_action(event):
pass
subscribe(_no_action, TaskStateChange)
TermStatusChange = _signal('task_termstatus_change')
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -209,9 +209,9 @@ def run_tests(self):

# run-time dependencies
install_requires=(version_dependent_requires + [
'blinker',
'coloredlogs',
'dictproxyhack',
'generic',
# prettytable -- format tabular text output
'prettytable',
# pyCLI -- object-oriented command-line app programming
Expand Down

0 comments on commit 8189487

Please sign in to comment.