Skip to content

Commit

Permalink
Convergence: basic framework for cancelling workers
Browse files Browse the repository at this point in the history
Implements a mechanism to cancel existing workers (in-progress resources).
The stack-cancel-update request lands in one of the engines; if any
workers in that engine are working for the stack, they are cancelled
first, and then the other engines are asked to cancel their workers.

Change-Id: I464c4fdb760247d436473af49448f7797dc0130d
  • Loading branch information
Anant Patil authored and Thomas Herve committed Sep 10, 2016
1 parent b9d1e30 commit 873a408
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 78 deletions.
4 changes: 4 additions & 0 deletions heat/db/api.py
Expand Up @@ -142,6 +142,10 @@ def resource_get_all_by_root_stack(context, stack_id, filters=None):
return IMPL.resource_get_all_by_root_stack(context, stack_id, filters)


def engine_get_all_locked_by_stack(context, stack_id):
    """Delegate to the configured backend implementation."""
    locked_engines = IMPL.engine_get_all_locked_by_stack(context, stack_id)
    return locked_engines

def resource_purge_deleted(context, stack_id):
    """Delegate to the configured backend implementation."""
    purged = IMPL.resource_purge_deleted(context, stack_id)
    return purged

Expand Down
9 changes: 9 additions & 0 deletions heat/db/sqlalchemy/api.py
Expand Up @@ -410,6 +410,15 @@ def resource_get_all_by_root_stack(context, stack_id, filters=None):
return dict((res.id, res) for res in results)


def engine_get_all_locked_by_stack(context, stack_id):
    """Return the distinct ids of engines holding locks on the stack.

    A resource row with a non-NULL engine_id is locked by that engine.
    """
    engine_id_col = func.distinct(models.Resource.engine_id)
    query = context.session.query(engine_id_col).filter(
        models.Resource.stack_id == stack_id,
        models.Resource.engine_id.isnot(None))
    return {row[0] for row in query.all()}


def stack_get_by_name_and_owner_id(context, stack_name, owner_id):
query = soft_delete_aware_query(
context, models.Stack
Expand Down
65 changes: 63 additions & 2 deletions heat/engine/worker.py
Expand Up @@ -25,12 +25,16 @@
from heat.common.i18n import _LI
from heat.common.i18n import _LW
from heat.common import messaging as rpc_messaging
from heat.db import api as db_api
from heat.engine import check_resource
from heat.engine import sync_point
from heat.rpc import api as rpc_api
from heat.rpc import worker_client as rpc_client

LOG = logging.getLogger(__name__)

CANCEL_RETRIES = 3


@profiler.trace_cls("rpc")
class WorkerService(service.Service):
Expand Down Expand Up @@ -107,6 +111,26 @@ def stop_traversal(self, stack):
"%(name)s while cancelling the operation."),
{'name': stack.name, 'trvsl': old_trvsl})

def stop_all_workers(self, stack):
# stop the traversal
if stack.status == stack.IN_PROGRESS:
self.stop_traversal(stack)

# cancel existing workers
cancelled = _cancel_workers(stack, self.thread_group_mgr,
self.engine_id, self._rpc_client)
if not cancelled:
LOG.error(_LE("Failed to stop all workers of stack %(name)s "
", stack cancel not complete"),
{'name': stack.name})
return False

LOG.info(_LI('[%(name)s(%(id)s)] Stopped all active workers for stack '
'%(action)s'),
{'name': stack.name, 'id': stack.id, 'action': stack.action})

return True

@context.request_context
def check_resource(self, cnxt, resource_id, current_traversal, data,
is_update, adopt_stack_data):
Expand Down Expand Up @@ -145,5 +169,42 @@ def cancel_check_resource(self, cnxt, stack_id):
All the workers running for the given stack will be
cancelled.
"""
# TODO(ananta): Implement cancel check-resource
LOG.debug('Cancelling workers for stack [%s]', stack_id)
_cancel_check_resource(stack_id, self.engine_id, self.thread_group_mgr)


def _cancel_check_resource(stack_id, engine_id, tgm):
    # Cancel all local worker threads operating on the given stack by
    # sending a THREAD_CANCEL message through the thread group manager.
    LOG.debug('Cancelling workers for stack [%s] in engine [%s]',
              stack_id, engine_id)
    tgm.send(stack_id, rpc_api.THREAD_CANCEL)


def _wait_for_cancellation(stack, wait=5):
    """Poll until no engine holds a lock on the stack's resources.

    Sleeps *wait* seconds before each poll, retrying up to CANCEL_RETRIES
    times, to give cancellation enough time to complete.

    :returns: True if all locks were released, False on timeout.
    """
    for _attempt in range(CANCEL_RETRIES):
        eventlet.sleep(wait)
        locked_engines = db_api.engine_get_all_locked_by_stack(
            stack.context, stack.id)
        if not locked_engines:
            return True

    return False


def _cancel_workers(stack, tgm, local_engine_id, rpc_client):
    """Cancel the stack's in-progress workers on every engine.

    Workers in the local engine are cancelled directly; every other engine
    holding a resource lock is asked to cancel its workers over RPC. Then
    wait for all locks to be released.

    :returns: True if all workers stopped in time, False otherwise.
    """
    locked_engines = db_api.engine_get_all_locked_by_stack(stack.context,
                                                           stack.id)
    if not locked_engines:
        return True

    # the local engine can be messaged in-process
    if local_engine_id in locked_engines:
        _cancel_check_resource(stack.id, local_engine_id, tgm)
        locked_engines.remove(local_engine_id)

    # every remaining engine is reached over RPC
    for remote_engine_id in locked_engines:
        rpc_client.cancel_check_resource(stack.context, stack.id,
                                         remote_engine_id)

    return _wait_for_cancellation(stack)
29 changes: 29 additions & 0 deletions heat/tests/db/test_sqlalchemy_api.py
Expand Up @@ -2436,6 +2436,35 @@ def test_resource_purge_deleted_by_stack(self):
self.assertRaises(exception.NotFound, db_api.resource_get,
self.ctx, resource.id)

def test_engine_get_all_locked_by_stack(self):
values = [
{'name': 'res1', 'action': rsrc.Resource.DELETE,
'root_stack_id': self.stack.id,
'status': rsrc.Resource.COMPLETE},
{'name': 'res2', 'action': rsrc.Resource.DELETE,
'root_stack_id': self.stack.id,
'status': rsrc.Resource.IN_PROGRESS, 'engine_id': 'engine-001'},
{'name': 'res3', 'action': rsrc.Resource.UPDATE,
'root_stack_id': self.stack.id,
'status': rsrc.Resource.IN_PROGRESS, 'engine_id': 'engine-002'},
{'name': 'res4', 'action': rsrc.Resource.CREATE,
'root_stack_id': self.stack.id,
'status': rsrc.Resource.COMPLETE},
{'name': 'res5', 'action': rsrc.Resource.INIT,
'root_stack_id': self.stack.id,
'status': rsrc.Resource.COMPLETE},
{'name': 'res6', 'action': rsrc.Resource.CREATE,
'root_stack_id': self.stack.id,
'status': rsrc.Resource.IN_PROGRESS, 'engine_id': 'engine-001'},
{'name': 'res6'},
]
for val in values:
create_resource(self.ctx, self.stack, **val)

engines = db_api.engine_get_all_locked_by_stack(self.ctx,
self.stack.id)
self.assertEqual({'engine-001', 'engine-002'}, engines)


class DBAPIStackLockTest(common.HeatTestCase):
def setUp(self):
Expand Down
11 changes: 8 additions & 3 deletions heat/tests/engine/test_check_resource.py
Expand Up @@ -108,11 +108,13 @@ def test_is_update_traversal(
self.resource.id,
mock.ANY, True)

@mock.patch.object(resource.Resource, 'load')
@mock.patch.object(resource.Resource, 'make_replacement')
@mock.patch.object(stack.Stack, 'time_remaining')
def test_is_update_traversal_raise_update_replace(
self, tr, mock_mr, mock_cru, mock_crc, mock_pcr, mock_csc,
mock_cid):
self, tr, mock_mr, mock_load, mock_cru, mock_crc, mock_pcr,
mock_csc, mock_cid):
mock_load.return_value = self.resource, self.stack, self.stack
mock_cru.side_effect = resource.UpdateReplace
tr.return_value = 317
self.worker.check_resource(
Expand Down Expand Up @@ -550,10 +552,13 @@ def setUp(self):
self.is_update = False
self.graph_key = (self.resource.id, self.is_update)

@mock.patch.object(resource.Resource, 'load')
@mock.patch.object(stack.Stack, 'time_remaining')
def test_is_cleanup_traversal(
self, tr, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
self, tr, mock_load, mock_cru, mock_crc, mock_pcr, mock_csc,
mock_cid):
tr.return_value = 317
mock_load.return_value = self.resource, self.stack, self.stack
self.worker.check_resource(
self.ctx, self.resource.id, self.stack.current_traversal, {},
self.is_update, None)
Expand Down
89 changes: 88 additions & 1 deletion heat/tests/engine/test_engine_worker.py
Expand Up @@ -15,8 +15,10 @@

import mock

from heat.db import api as db_api
from heat.engine import check_resource
from heat.engine import worker
from heat.rpc import worker_client as wc
from heat.tests import common
from heat.tests import utils

Expand Down Expand Up @@ -44,11 +46,11 @@ def test_service_start(self,
target_class,
rpc_server_method
):

self.worker = worker.WorkerService('host-1',
'topic-1',
'engine_id',
mock.Mock())

self.worker.start()

# Make sure target is called with proper parameters
Expand Down Expand Up @@ -133,3 +135,88 @@ def test_check_resource_adds_and_removes_msg_queue_on_exception(
self.assertTrue(mock_tgm.add_msg_queue.called)
# ensure remove is also called
self.assertTrue(mock_tgm.remove_msg_queue.called)

@mock.patch.object(worker, '_wait_for_cancellation')
@mock.patch.object(worker, '_cancel_check_resource')
@mock.patch.object(wc.WorkerClient, 'cancel_check_resource')
@mock.patch.object(db_api, 'engine_get_all_locked_by_stack')
def test_cancel_workers_when_no_resource_found(self, mock_get_locked,
mock_ccr, mock_wccr,
mock_wc):
mock_tgm = mock.Mock()
_worker = worker.WorkerService('host-1', 'topic-1', 'engine-001',
mock_tgm)
stack = mock.MagicMock()
stack.id = 'stack_id'
mock_get_locked.return_value = []
worker._cancel_workers(stack, mock_tgm, 'engine-001',
_worker._rpc_client)
self.assertFalse(mock_wccr.called)
self.assertFalse(mock_ccr.called)

@mock.patch.object(worker, '_wait_for_cancellation')
@mock.patch.object(worker, '_cancel_check_resource')
@mock.patch.object(wc.WorkerClient, 'cancel_check_resource')
@mock.patch.object(db_api, 'engine_get_all_locked_by_stack')
def test_cancel_workers_with_resources_found(self, mock_get_locked,
mock_ccr, mock_wccr,
mock_wc):
mock_tgm = mock.Mock()
_worker = worker.WorkerService('host-1', 'topic-1', 'engine-001',
mock_tgm)
stack = mock.MagicMock()
stack.id = 'stack_id'
mock_get_locked.return_value = ['engine-001', 'engine-007',
'engine-008']
worker._cancel_workers(stack, mock_tgm, 'engine-001',
_worker._rpc_client)
mock_wccr.assert_called_once_with(stack.id, 'engine-001', mock_tgm)
self.assertEqual(2, mock_ccr.call_count)
calls = [mock.call(stack.context, stack.id, 'engine-007'),
mock.call(stack.context, stack.id, 'engine-008')]
mock_ccr.assert_has_calls(calls, any_order=True)
self.assertTrue(mock_wc.called)

@mock.patch.object(worker, '_cancel_workers')
@mock.patch.object(worker.WorkerService, 'stop_traversal')
def test_stop_all_workers_when_stack_in_progress(self, mock_st, mock_cw):
mock_tgm = mock.Mock()
_worker = worker.WorkerService('host-1', 'topic-1', 'engine-001',
mock_tgm)
stack = mock.MagicMock()
stack.IN_PROGRESS = 'IN_PROGRESS'
stack.status = stack.IN_PROGRESS
stack.id = 'stack_id'
stack.rollback = mock.MagicMock()
_worker.stop_all_workers(stack)
mock_st.assert_called_once_with(stack)
mock_cw.assert_called_once_with(stack, mock_tgm, 'engine-001',
_worker._rpc_client)
self.assertFalse(stack.rollback.called)

@mock.patch.object(worker, '_cancel_workers')
@mock.patch.object(worker.WorkerService, 'stop_traversal')
def test_stop_all_workers_when_stack_not_in_progress(self, mock_st,
mock_cw):
mock_tgm = mock.Mock()
_worker = worker.WorkerService('host-1', 'topic-1', 'engine-001',
mock_tgm)
stack = mock.MagicMock()
stack.FAILED = 'FAILED'
stack.status = stack.FAILED
stack.id = 'stack_id'
stack.rollback = mock.MagicMock()
_worker.stop_all_workers(stack)
self.assertFalse(mock_st.called)
mock_cw.assert_called_once_with(stack, mock_tgm, 'engine-001',
_worker._rpc_client)
self.assertFalse(stack.rollback.called)

# test when stack complete
stack.FAILED = 'FAILED'
stack.status = stack.FAILED
_worker.stop_all_workers(stack)
self.assertFalse(mock_st.called)
mock_cw.assert_called_with(stack, mock_tgm, 'engine-001',
_worker._rpc_client)
self.assertFalse(stack.rollback.called)
5 changes: 5 additions & 0 deletions heat/tests/engine/test_sync_point.py
Expand Up @@ -22,6 +22,11 @@


class SyncPointTestCase(common.HeatTestCase):
    def setUp(self):
        super(SyncPointTestCase, self).setUp()
        # Shared event stub whose ready() always reports False, for use
        # by the sync-point tests in this case.
        self.dummy_event = mock.MagicMock()
        self.dummy_event.ready.return_value = False

def test_sync_waiting(self):
ctx = utils.dummy_context()
stack = tools.get_stack('test_stack', utils.dummy_context(),
Expand Down

0 comments on commit 873a408

Please sign in to comment.