Skip to content

Commit

Permalink
Fixes for Engine.forget_terminated.
Browse files Browse the repository at this point in the history
There are at least three different issues that should be solved by
this commit:

* Do not drop TERMINATED tasks twice when `Engine.forget_terminated == True`.
* Correctly handle the case of tasks that transition to TERMINATED
  without having been in TERMINATING state first (e.g., task collections)
* Avoid updating TERMINATED tasks after they've been forgotten, a bug
  that would appear as a traceback like this::

        AssertionError: Task.update_state() called on detached task ...
  • Loading branch information
riccardomurri committed Aug 11, 2017
1 parent b672e3d commit c095a8b
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 10 deletions.
17 changes: 11 additions & 6 deletions gc3libs/core.py
Expand Up @@ -1546,7 +1546,8 @@ def progress(self):
elif state == Run.State.TERMINATED:
# task changed state, mark as to remove
transitioned.append(index)
self._terminated.append(task)
if not self.forget_terminated:
self._terminated.append(task)
else:
# if we got to this point, state has an invalid value
gc3libs.log.error(
Expand Down Expand Up @@ -1621,7 +1622,8 @@ def progress(self):
elif old_state == Run.State.RUNNING:
if isinstance(task, Application):
currently_in_flight -= 1
self._terminated.append(task)
if not self.forget_terminated:
self._terminated.append(task)
transitioned.append(index)
# pylint: disable=broad-except
except Exception as err:
Expand Down Expand Up @@ -1676,7 +1678,8 @@ def progress(self):
# task changed state, mark as to remove
transitioned.append(index)
elif state == Run.State.TERMINATED:
self._terminated.append(task)
if not self.forget_terminated:
self._terminated.append(task)
# task changed state, mark as to remove
transitioned.append(index)
# pylint: disable=broad-except
Expand Down Expand Up @@ -1879,9 +1882,11 @@ def progress(self):

if self._store and task.changed:
self._store.save(task)
# remove tasks for which final output has been retrieved
for index in reversed(transitioned):
del self._terminating[index]
if not self.forget_terminated:
# remove tasks for which final output has been retrieved
# (only if TERMINATED tasks have not been dropped already)
for index in reversed(transitioned):
del self._terminating[index]


def redo(self, task, *args, **kwargs):
Expand Down
35 changes: 35 additions & 0 deletions gc3libs/tests/test_engine.py
Expand Up @@ -69,6 +69,27 @@ def test_engine_progress(num_jobs=1, transition_graph=None, max_iter=100):
current_iter += 1


def test_engine_forget_terminated(num_jobs=3, transition_graph=None, max_iter=100):
with temporary_engine() as engine:
engine.forget_terminated = True

# generate some no-op tasks
for n in range(num_jobs):
name = 'app{nr}'.format(nr=n+1)
engine.add(SuccessfulApp(name))

# run them all
current_iter = 0
done = engine.stats()[Run.State.TERMINATED]
while done < num_jobs and current_iter < max_iter:
engine.progress()
done = engine.stats()[Run.State.TERMINATED]
current_iter += 1

# check that they have been forgotten
assert not engine._terminated


def test_engine_progress_collection():
with temporary_engine() as engine:
seq = SimpleSequentialTaskCollection(3)
Expand All @@ -81,6 +102,20 @@ def test_engine_progress_collection():
assert seq.stage().execution.state == 'TERMINATED'


def test_engine_progress_collection_and_forget_terminated():
with temporary_engine() as engine:
engine.forget_terminated = True

seq = SimpleSequentialTaskCollection(3)
engine.add(seq)

# run through sequence
while seq.execution.state != 'TERMINATED':
engine.progress()

assert not engine._terminated


def test_engine_kill_SequentialTaskCollection():
with temporary_engine() as engine:
seq = SimpleSequentialTaskCollection(3)
Expand Down
12 changes: 8 additions & 4 deletions gc3libs/workflow.py
Expand Up @@ -151,7 +151,8 @@ def update_state(self, **extra_args):
Update the running state of all managed tasks.
"""
for task in self.tasks:
self._controller.update_job_state(task, **extra_args)
if task.execution.state not in [Run.State.NEW, Run.State.TERMINATED]:
self._controller.update_job_state(task, **extra_args)

def kill(self, **extra_args):
# XXX: provide default implementation that kills all jobs?
Expand Down Expand Up @@ -449,7 +450,8 @@ def update_state(self, **extra_args):

# update state of current task
task = self.tasks[self._current_task]
task.update_state(**extra_args)
if task.execution.state not in [Run.State. NEW, Run.State.TERMINATED]:
task.update_state(**extra_args)
gc3libs.log.debug("Task `%s` in state %s", task, task.execution.state)

# now set state based on the state of current task:
Expand Down Expand Up @@ -862,7 +864,8 @@ def update_state(self, **extra_args):
for task in self.tasks:
# gc3libs.log.debug("Updating state of %s in collection %s ..."
# % (task, self))
task.update_state(**extra_args)
if task.execution.state not in [Run.State.NEW, Run.State.TERMINATED]:
task.update_state(**extra_args)
self.execution.state = self._state()
if self.execution.state == Run.State.TERMINATED:
self.execution.returncode = (0, 0)
Expand Down Expand Up @@ -1118,7 +1121,8 @@ def update_state(self):
TERMINATED and `self.retry()` is `True`.
"""
own_state_old = self.execution.state
self.task.update_state()
if self.task.execution.state not in [Run.State.NEW, Run.State.TERMINATED]:
self.task.update_state()
own_state_new = self._recompute_state()
if (self.task.execution.state == Run.State.TERMINATED
and own_state_old != Run.State.TERMINATED):
Expand Down

0 comments on commit c095a8b

Please sign in to comment.