Skip to content

Commit

Permalink
Really make Engine.forget_terminated work as expected.
Browse files Browse the repository at this point in the history
  • Loading branch information
riccardomurri committed Aug 14, 2017
1 parent 76c92d4 commit 9498469
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 24 deletions.
59 changes: 36 additions & 23 deletions gc3libs/core.py
Expand Up @@ -1291,11 +1291,11 @@ def _update_task_counts(self, task, state, increment):


# pylint: disable=too-many-arguments,dangerous-default-value
def __get_task_queue(self, task):
def __get_task_queue(self, task, _override_state=None):
"""
Return the "queue" object to which `task` should be added or removed.
"""
state = task.execution.state
state = _override_state or task.execution.state
if Run.State.NEW == state:
return self._new
elif state in [Run.State.SUBMITTED,
Expand Down Expand Up @@ -1544,9 +1544,11 @@ def progress(self):
transitioned.append(index)
self._terminating.append(task)
elif state == Run.State.TERMINATED:
# task changed state, mark as to remove
transitioned.append(index)
if not self.forget_terminated:
if self.forget_terminated:
self._drop_terminated_task(task, old_state)
else:
# task changed state, mark as to remove
transitioned.append(index)
self._terminated.append(task)
else:
# if we got to this point, state has an invalid value
Expand Down Expand Up @@ -1622,9 +1624,11 @@ def progress(self):
elif old_state == Run.State.RUNNING:
if isinstance(task, Application):
currently_in_flight -= 1
if not self.forget_terminated:
if self.forget_terminated:
self._drop_terminated_task(task, old_state)
else:
self._terminated.append(task)
transitioned.append(index)
transitioned.append(index)
# pylint: disable=broad-except
except Exception as err:
if gc3libs.error_ignored(
Expand Down Expand Up @@ -1678,10 +1682,12 @@ def progress(self):
# task changed state, mark as to remove
transitioned.append(index)
elif state == Run.State.TERMINATED:
if not self.forget_terminated:
if self.forget_terminated:
self._drop_terminated_task(task, Run.State.STOPPED)
else:
# task changed state, mark as to remove
self._terminated.append(task)
# task changed state, mark as to remove
transitioned.append(index)
transitioned.append(index)
# pylint: disable=broad-except
except Exception as err:
if gc3libs.error_ignored(
Expand Down Expand Up @@ -1865,21 +1871,13 @@ def progress(self):
" has been destroyed already.)",
task, err.__class__.__name__, err)
if self.forget_terminated:
try:
# task state is TERMINATED but the queue
# is still `self._terminating` so we need
# to override the choice that
# `self.__get_task_queue` would do
self.remove(task, self._terminating)
gc3libs.log.debug("Dropped TERMINATED task %s", task)
# pylint: disable=broad-except
except Exception as err:
gc3libs.log.debug(
"Could not forget TERMINATED task '%s': %s: %s",
task, err.__class__.__name__, err)
# task state is TERMINATED but the queue
# is still `self._terminating` so we need
# to override the choice that
# `self.__get_task_queue` would do
self._drop_terminated_task(task, Run.State.TERMINATING)
else:
self._terminated.append(task)

if self._store and task.changed:
self._store.save(task)
if not self.forget_terminated:
Expand All @@ -1888,6 +1886,21 @@ def progress(self):
for index in reversed(transitioned):
del self._terminating[index]

def _drop_terminated_task(self, task, old_state):
queue = self.__get_task_queue(task, old_state)
try:
# task state is TERMINATED but the queue
# is still `self._terminating` so we need
# to override the choice that
# `self.__get_task_queue` would do
self.remove(task, queue)
gc3libs.log.debug(
"Dropped TERMINATED task %s (was: %s)", task, old_state)
except Exception as err: # pylint: disable=broad-except
gc3libs.log.debug(
"Could not forget TERMINATED task '%s': %s: %s",
task, err.__class__.__name__, err)


def redo(self, task, *args, **kwargs):
"""
Expand Down
10 changes: 9 additions & 1 deletion gc3libs/tests/test_engine.py
Expand Up @@ -74,9 +74,12 @@ def test_engine_forget_terminated(num_jobs=3, transition_graph=None, max_iter=10
engine.forget_terminated = True

# generate some no-op tasks
tasks = []
for n in range(num_jobs):
name = 'app{nr}'.format(nr=n+1)
engine.add(SuccessfulApp(name))
app = SuccessfulApp(name)
engine.add(app)
tasks.append(app)

# run them all
current_iter = 0
Expand All @@ -88,6 +91,8 @@ def test_engine_forget_terminated(num_jobs=3, transition_graph=None, max_iter=10

# check that they have been forgotten
assert not engine._terminated
for task in tasks:
assert not task._attached


def test_engine_progress_collection():
Expand All @@ -114,6 +119,9 @@ def test_engine_progress_collection_and_forget_terminated():
engine.progress()

assert not engine._terminated
assert not seq._attached
for task in seq.tasks:
assert not task._attached


def test_engine_kill_SequentialTaskCollection():
Expand Down

0 comments on commit 9498469

Please sign in to comment.