Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Merge "Timeout support for scheduled tasks"
Browse files Browse the repository at this point in the history
  • Loading branch information
Jenkins authored and openstack-gerrit committed Jan 26, 2016
2 parents bc78f75 + 627fd80 commit 2164e76
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 13 deletions.
1 change: 1 addition & 0 deletions solar/dblayer/solar_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,7 @@ class Task(Model):
errmsg = Field(basestring, default=str)
timelimit = Field(int, default=int)
retry = Field(int, default=int)
timeout = Field(int, default=int)

execution = IndexedField(basestring)
parents = ParentField(default=list)
Expand Down
30 changes: 25 additions & 5 deletions solar/orchestration/executors/zerorpc_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ def run(self):

class LimitedExecutionPuller(PoolBasedPuller):

def __init__(self, *args, **kwargs):
    """Initialize the puller and the greenlet groups it tracks.

    Two separate gevent groups are kept so that run() can join the
    time-limit killers and the timeout callbacks independently on exit.
    """
    super(LimitedExecutionPuller, self).__init__(*args, **kwargs)
    self._timelimit_group, self._timeout_group = (
        gevent.pool.Group(), gevent.pool.Group())

def _handle_event(self, event):
ctxt = event.args[0]
timelimit = ctxt.get('timelimit', 0)
Expand All @@ -73,10 +78,22 @@ def _handle_event(self, event):
# share a pool with them, or it is highly possible that
# it wont be ever executed with low number of greenlets in
# a pool
gevent.spawn_later(
timelimit, self._methods['kill'], ctxt, ctxt['task_id'])
self._timelimit_group.add(gevent.spawn_later(
timelimit, self._methods['kill'],
ctxt, ctxt['task_id']))
self._tasks_pool.spawn(self._async_event, event)

def register_timeout(self, seconds, callable_):
    """Schedule *callable_* to run after *seconds*, tracked for cleanup.

    The spawned greenlet is added to the timeout group so that run()
    waits for it (and surfaces its errors) before returning.
    """
    greenlet = gevent.spawn_later(seconds, callable_)
    self._timeout_group.add(greenlet)

def run(self):
    """Run the puller loop, then drain the tracked greenlet groups.

    join(raise_error=True) re-raises any exception that occurred inside
    a kill/timeout greenlet instead of silently discarding it.
    """
    groups = (self._timelimit_group, self._timeout_group)
    try:
        super(LimitedExecutionPuller, self).run()
    finally:
        for group in groups:
            group.join(raise_error=True)


class Executor(object):

Expand All @@ -85,6 +102,7 @@ def __init__(self, worker, bind_to):
self.bind_to = bind_to
self._tasks_register = {}
worker._executor = self
self._server = LimitedExecutionPuller(methods=self.worker)

def register(self, ctxt):
if 'task_id' in ctxt:
Expand All @@ -96,10 +114,12 @@ def kill(self, task_id, exc):
self._tasks_register[task_id].kill(exc, block=True)
self._tasks_register.pop(task_id)

def register_timeout(self, *args):
self._server.register_timeout(*args)

def run(self):
    """Bind the puller server to the configured endpoint and serve."""
    server = self._server
    server.bind(self.bind_to)
    server.run()


class Client(object):
Expand Down
11 changes: 8 additions & 3 deletions solar/orchestration/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ def save_graph(graph):
# maybe it is possible to store part of information in AsyncResult backend
uid = graph.graph['uid']

# TODO(dshulyak) remove duplication of parameters
# in solar_models.Task and this object
for n in nx.topological_sort(graph):
t = Task.new(
{'name': n,
Expand All @@ -41,7 +43,8 @@ def save_graph(graph):
'args': graph.node[n].get('args', []),
'errmsg': graph.node[n].get('errmsg', '') or '',
'timelimit': graph.node[n].get('timelimit', 0),
'retry': graph.node[n].get('retry', 0)})
'retry': graph.node[n].get('retry', 0),
'timeout': graph.node[n].get('timeout', 0)})
graph.node[n]['task'] = t
for pred in graph.predecessors(n):
pred_task = graph.node[pred]['task']
Expand All @@ -56,6 +59,7 @@ def update_graph(graph, force=False):
task.status = graph.node[n]['status']
task.errmsg = graph.node[n]['errmsg'] or ''
task.retry = graph.node[n].get('retry', 0)
task.timeout = graph.node[n].get('timeout', 0)
task.save(force=force)


Expand All @@ -81,8 +85,9 @@ def get_graph(uid):
target=t.target or None,
errmsg=t.errmsg or None,
task=t,
timelimit=t.timelimit or 0,
retry=t.retry)
timelimit=t.timelimit,
retry=t.retry,
timeout=t.timeout)
for u in t.parents.all_names():
dg.add_edge(u, t.name)
return dg
Expand Down
26 changes: 21 additions & 5 deletions solar/orchestration/workers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.

from functools import partial
import time

from solar.core.log import log
Expand Down Expand Up @@ -65,6 +66,10 @@ def _do_update(self, plan, task_name, status, errmsg=''):
"""For single update correct state and other relevant data."""
old_status = plan.node[task_name]['status']
if old_status in VISITED:
log.debug(
'Task %s already in visited status %s'
', skipping update to %s',
task_name, old_status, status)
return
retries_count = plan.node[task_name]['retry']

Expand All @@ -84,18 +89,21 @@ def _do_scheduling(self, plan, task_name):
task_type = plan.node[task_name]['type']
plan.node[task_name]['status'] = states.INPROGRESS.name
timelimit = plan.node[task_name].get('timelimit', 0)
timeout = plan.node[task_name].get('timeout', 0)
ctxt = {
'task_id': task_id,
'task_name': task_name,
'plan_uid': plan.graph['uid'],
'timelimit': timelimit}
'timelimit': timelimit,
'timeout': timeout}
log.debug(
'Timelimit for task %s - %s, timeout - %s',
task_id, timelimit, timeout)
self._tasks(
task_type, ctxt,
*plan.node[task_name]['args'])
if timelimit:
log.debug(
'Timelimit for task %s will be %s',
task_id, timelimit)
if timeout:
self._configure_timeout(ctxt, timeout)

def update_next(self, ctxt, status, errmsg):
log.debug(
Expand All @@ -112,6 +120,14 @@ def update_next(self, ctxt, status, errmsg):
log.debug('Scheduled tasks %r', rst)
return rst

def _configure_timeout(self, ctxt, timeout):
    """Arrange for the task in *ctxt* to fail after *timeout* seconds.

    Registers a delayed callback on the executor that moves the task to
    ERROR with a 'Timeout Error' message.

    :raises NotImplementedError: if the executor backend does not
        support timeout registration.
    """
    if not hasattr(self._executor, 'register_timeout'):
        # Bug fix: the original raised `NotImplemented(...)`, but
        # NotImplemented is a non-callable sentinel constant, so that
        # line produced a TypeError. NotImplementedError is the
        # exception class intended here.
        raise NotImplementedError('Timeout is not supported')
    self._executor.register_timeout(
        timeout,
        partial(self.update_next, ctxt,
                states.ERROR.name, 'Timeout Error'))


class SchedulerCallbackClient(object):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import time

import gevent
import mock
import pytest

from solar.errors import ExecutionTimeout
from solar.orchestration import graph
Expand All @@ -39,3 +41,29 @@ def wait_function(timeout):
finished_plan = graph.get_graph(timelimit_plan.graph['uid'])
assert 'ExecutionTimeout' in finished_plan.node['t1']['errmsg']
assert finished_plan.node['t2']['status'] == states.PENDING.name


@pytest.fixture
def timeout_plan(simple_plan):
    """A simple_plan variant whose 'echo_stuff' task times out after 1s."""
    node = simple_plan.node['echo_stuff']
    node['timeout'] = 1
    graph.update_graph(simple_plan, force=True)
    return simple_plan


def test_timeout_plan(timeout_plan, scheduler):
    """A task that never reports back must be moved to ERROR by its timeout."""
    worker, client = scheduler
    # Stub out task execution so the task never completes on its own.
    worker._tasks = mock.Mock()
    plan_uid = timeout_plan.graph['uid']
    client.next({}, plan_uid)

    def wait_for_error(deadline):
        for summary in graph.wait_finish(plan_uid, deadline):
            if summary[states.ERROR.name] == 1:
                break
            time.sleep(0.3)
        return summary

    waiter = gevent.spawn(wait_for_error, 2)
    waiter.get(block=True, timeout=2)
    finished = graph.get_graph(plan_uid)
    assert finished.node['echo_stuff']['status'] == states.ERROR.name

0 comments on commit 2164e76

Please sign in to comment.