Skip to content

Commit

Permalink
Ensure Engine obeys the "submit" and "in flight" limits.
Browse files Browse the repository at this point in the history
  • Loading branch information
riccardomurri committed May 8, 2018
1 parent cc1b4ff commit c14c523
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 82 deletions.
116 changes: 34 additions & 82 deletions gc3libs/core.py
Expand Up @@ -1112,6 +1112,7 @@ def first_come_first_serve(task_queue, resources, matchmaker=MatchMaker()):
gc3libs.log.debug(
"Scheduler ignored error in submitting task '%s': %s: %s",
task, err.__class__.__name__, err, exc_info=True)
continue
else:
# unsuccessful submission, push task back into queue
task_queue.put(task)
Expand Down Expand Up @@ -1257,7 +1258,14 @@ def __init__(self, controller, tasks=[], store=None,

# init counters/statistics
self._counts = self._Counters(self)
self._counts.init_for(Task) # always gather these
# always gather statistics about `Task` instances, as `Task`
# is the root of the GC3Pie task hierarchy
self._counts.init_for(Task)
# but also count `Application` instances to enforce submission
# and running limits (only real tasks that consume compute
# resources count for those -- "policy" tasks like
# `TaskCollection` should not)
self._counts.init_for(Application)
TaskStateChange.connect(self._on_state_change)

# Engine fully initialized, add all tasks
Expand Down Expand Up @@ -1631,9 +1639,6 @@ def progress(self):
"""
gc3libs.log.debug("Engine.progress(): starting.")

# prepare
currently_submitted = 0
currently_in_flight = 0
# pylint: disable=redefined-variable-type
if self.max_in_flight > 0:
limit_in_flight = self.max_in_flight
Expand Down Expand Up @@ -1683,16 +1688,6 @@ def progress(self):

self._managed.requeue(task)

# do book-keeping
state = task.execution.state
if old_state == Run.State.SUBMITTED:
if isinstance(task, Application):
currently_submitted -= 1
currently_in_flight -= 1
elif old_state == Run.State.RUNNING:
if isinstance(task, Application):
currently_in_flight -= 1

# update status of tasks before launching new ones
queue = self._managed.to_update
if queue:
Expand Down Expand Up @@ -1754,28 +1749,8 @@ def progress(self):
else:
self._managed.requeue(task)

if state == Run.State.SUBMITTED:
# only real applications need to be counted
# against the limit; policy tasks are exempt
# (this applies to all similar clauses below)
if isinstance(task, Application):
if old_state != Run.State.SUBMITTED:
currently_submitted += 1
if old_state not in [
Run.State.SUBMITTED,
Run.State.RUNNING,
]:
currently_in_flight += 1
elif state == Run.State.RUNNING:
if isinstance(task, Application):
if old_state == Run.State.SUBMITTED:
currently_submitted -= 1
if old_state not in [
Run.State.SUBMITTED,
Run.State.RUNNING,
]:
currently_in_flight += 1
if (self.retrieve_running and task.would_output
if self.retrieve_running:
if (state == Run.State.RUNNING and task.would_output
and self.can_retrieve):
# try to get output
try:
Expand All @@ -1786,7 +1761,7 @@ def progress(self):
# pylint: disable=broad-except
except Exception as err:
self.__ignore_or_raise(
err, "fecthing output", task,
err, "fetching output", task,
# context:
# - module
'core',
Expand All @@ -1800,45 +1775,23 @@ def progress(self):
'RUNNING',
'fetch_output',
)
# elif state == Run.State.NEW:
# # can happen after a `.redo()`, requeue
# assert not isinstance(task, Application)
# elif state in [
# Run.State.STOPPED,
# Run.State.TERMINATED,
# Run.State.TERMINATING,
# Run.State.UNKNOWN,
# ]:
# # nothing to do, task has already been requeued
# pass
# else:
# # if we got to this point, state has an invalid value
# gc3libs.log.error(
# "Invalid state `%r` returned by task %s.",
# state, task)
# if not gc3libs.error_ignored(
# # context:
# # - module
# 'core',
# # - class
# 'Engine',
# # - method
# 'progress',
# # - actual error class
# 'InternalError',
# # - additional keywords
# 'state',
# 'update',
# ):
# # propagate exception to caller
# raise gc3libs.exceptions.InternalError(
# "Invalid state '{state!r}' returned by task {task}"
# .format(state=state, task=task))

# reckon how many tasks are "live"; we are only interested in
# tasks that consume real compute resources (i.e.,
# `Application` instances) and exclude "policy" classes (e.g.,
# all `TaskCollections`)
app_counts = self.counts(Application)
currently_submitted = app_counts['SUBMITTED']
currently_in_flight = currently_submitted + app_counts['RUNNING']

# now try to submit NEW tasks
if (self.can_submit and
currently_submitted < limit_submitted and
currently_in_flight < limit_in_flight):
submit_allowance = min(
limit_submitted - currently_submitted,
limit_in_flight - currently_in_flight
)
queue = self._managed.to_submit
if queue:
gc3libs.log.debug(
Expand All @@ -1856,21 +1809,24 @@ def progress(self):
# ... in sched:` line
sched = gc3libs.utils.YieldAtNext(_sched)
for task, resource_name in sched:
# enforce Engine limits
if submit_allowance <= 0:
# we need to put back the task in the queue
# here as we won't go call back into scheduler
self._managed.to_submit.put(task)
break
resource = self._core.resources[resource_name]
try:
self._core.submit(task, targets=[resource])
if self._store and task.changed:
self._store.save(task)
# if we get to this point, we know state is
# either SUBMITTED or RUNNING
if self._store and task.changed:
self._store.save(task)
self._managed.to_update.put(task)
if isinstance(task, Application):
currently_submitted += 1
currently_in_flight += 1
# do book-keeping
state = task.execution.state
submit_allowance -= 1
# notify scheduler
sched.send(state)
sched.send(task.execution.state)
# pylint: disable=broad-except
except Exception as err1:
# record the error in the task's history
Expand Down Expand Up @@ -1901,10 +1857,6 @@ def progress(self):
'scheduler',
'submit',
)
# enforce Engine limits
if (currently_submitted >= limit_submitted
or currently_in_flight >= limit_in_flight):
break

# finally, retrieve output of finished tasks
if self.can_retrieve:
Expand Down
33 changes: 33 additions & 0 deletions gc3libs/tests/test_engine.py
Expand Up @@ -517,6 +517,39 @@ def test_engine_cannot_find_task_by_id_if_no_store():
engine.find_task_by_id(task_id)


@pytest.mark.parametrize("limit_submitted,limit_in_flight", [
(2, 10),
(10, 5),
])
def test_engine_limits(limit_submitted, limit_in_flight,
num_jobs=30, max_iter=100):
"""
Test that `Engine.limit_in_flight` and `Engine.limit_submitted` are honored.
"""
with temporary_engine(max_cores=50) as engine:
# set limits
engine.max_in_flight = 10
engine.max_submitted = 2
# populate with test apps
apps = []
for n in range(num_jobs):
name = 'app{nr}'.format(nr=n)
app = SuccessfulApp(name)
engine.add(app)
apps.append(app)
stats = engine.counts()
iter = 0
while stats['TERMINATED'] < num_jobs and iter < max_iter:
iter += 1
engine.progress()
stats = engine.counts()
submitted = stats['SUBMITTED']
assert submitted <= engine.max_submitted
in_flight = (stats['SUBMITTED'] + stats['RUNNING'])
assert in_flight <= engine.max_in_flight
assert stats["TERMINATED"] == num_jobs


def test_engine_counts(num_jobs=100, max_iter=1000):
"""
Test that `Engine.count()` returns correct results.
Expand Down

0 comments on commit c14c523

Please sign in to comment.