diff --git a/heat/db/api.py b/heat/db/api.py index dd5d0eedf32..8345c42c4f1 100644 --- a/heat/db/api.py +++ b/heat/db/api.py @@ -142,6 +142,10 @@ def resource_get_all_by_root_stack(context, stack_id, filters=None): return IMPL.resource_get_all_by_root_stack(context, stack_id, filters) +def engine_get_all_locked_by_stack(context, stack_id): + return IMPL.engine_get_all_locked_by_stack(context, stack_id) + + def resource_purge_deleted(context, stack_id): return IMPL.resource_purge_deleted(context, stack_id) diff --git a/heat/db/sqlalchemy/api.py b/heat/db/sqlalchemy/api.py index 4a4b032dbf6..40db736d609 100644 --- a/heat/db/sqlalchemy/api.py +++ b/heat/db/sqlalchemy/api.py @@ -410,6 +410,15 @@ def resource_get_all_by_root_stack(context, stack_id, filters=None): return dict((res.id, res) for res in results) +def engine_get_all_locked_by_stack(context, stack_id): + query = context.session.query( + func.distinct(models.Resource.engine_id) + ).filter( + models.Resource.stack_id == stack_id, + models.Resource.engine_id.isnot(None)) + return set(i[0] for i in query.all()) + + def stack_get_by_name_and_owner_id(context, stack_name, owner_id): query = soft_delete_aware_query( context, models.Stack diff --git a/heat/engine/worker.py b/heat/engine/worker.py index f434b9ff992..70fea6bfe08 100644 --- a/heat/engine/worker.py +++ b/heat/engine/worker.py @@ -25,12 +25,16 @@ from heat.common.i18n import _LI from heat.common.i18n import _LW from heat.common import messaging as rpc_messaging +from heat.db import api as db_api from heat.engine import check_resource from heat.engine import sync_point +from heat.rpc import api as rpc_api from heat.rpc import worker_client as rpc_client LOG = logging.getLogger(__name__) +CANCEL_RETRIES = 3 + @profiler.trace_cls("rpc") class WorkerService(service.Service): @@ -107,6 +111,26 @@ def stop_traversal(self, stack): "%(name)s while cancelling the operation."), {'name': stack.name, 'trvsl': old_trvsl}) + def stop_all_workers(self, stack): + # stop the traversal + if stack.status == stack.IN_PROGRESS: + self.stop_traversal(stack) + + # cancel existing workers + cancelled = _cancel_workers(stack, self.thread_group_mgr, + self.engine_id, self._rpc_client) + if not cancelled: + LOG.error(_LE("Failed to stop all workers of stack %(name)s " + ", stack cancel not complete"), + {'name': stack.name}) + return False + + LOG.info(_LI('[%(name)s(%(id)s)] Stopped all active workers for stack ' + '%(action)s'), + {'name': stack.name, 'id': stack.id, 'action': stack.action}) + + return True + @context.request_context def check_resource(self, cnxt, resource_id, current_traversal, data, is_update, adopt_stack_data): @@ -145,5 +169,42 @@ def cancel_check_resource(self, cnxt, stack_id): All the workers running for the given stack will be cancelled. """ - # TODO(ananta): Implement cancel check-resource - LOG.debug('Cancelling workers for stack [%s]', stack_id) + _cancel_check_resource(stack_id, self.engine_id, self.thread_group_mgr) + + +def _cancel_check_resource(stack_id, engine_id, tgm): + LOG.debug('Cancelling workers for stack [%s] in engine [%s]', + stack_id, engine_id) + tgm.send(stack_id, rpc_api.THREAD_CANCEL) + + +def _wait_for_cancellation(stack, wait=5): + # give enough time to wait till cancel is completed + retries = CANCEL_RETRIES + while retries > 0: + retries -= 1 + eventlet.sleep(wait) + engines = db_api.engine_get_all_locked_by_stack( + stack.context, stack.id) + if not engines: + return True + + return False + + +def _cancel_workers(stack, tgm, local_engine_id, rpc_client): + engines = db_api.engine_get_all_locked_by_stack(stack.context, stack.id) + + if not engines: + return True + + # cancel workers running locally + if local_engine_id in engines: + _cancel_check_resource(stack.id, local_engine_id, tgm) + engines.remove(local_engine_id) + + # cancel workers on remote engines + for engine_id in engines: + rpc_client.cancel_check_resource(stack.context, stack.id, engine_id) + + return _wait_for_cancellation(stack) diff --git a/heat/tests/db/test_sqlalchemy_api.py b/heat/tests/db/test_sqlalchemy_api.py index 9b9f7515cd1..a266978a787 100644 --- a/heat/tests/db/test_sqlalchemy_api.py +++ b/heat/tests/db/test_sqlalchemy_api.py @@ -2436,6 +2436,35 @@ def test_resource_purge_deleted_by_stack(self): self.assertRaises(exception.NotFound, db_api.resource_get, self.ctx, resource.id) + def test_engine_get_all_locked_by_stack(self): + values = [ + {'name': 'res1', 'action': rsrc.Resource.DELETE, + 'root_stack_id': self.stack.id, + 'status': rsrc.Resource.COMPLETE}, + {'name': 'res2', 'action': rsrc.Resource.DELETE, + 'root_stack_id': self.stack.id, + 'status': rsrc.Resource.IN_PROGRESS, 'engine_id': 'engine-001'}, + {'name': 'res3', 'action': rsrc.Resource.UPDATE, + 'root_stack_id': self.stack.id, + 'status': rsrc.Resource.IN_PROGRESS, 'engine_id': 'engine-002'}, + {'name': 'res4', 'action': rsrc.Resource.CREATE, + 'root_stack_id': self.stack.id, + 'status': rsrc.Resource.COMPLETE}, + {'name': 'res5', 'action': rsrc.Resource.INIT, + 'root_stack_id': self.stack.id, + 'status': rsrc.Resource.COMPLETE}, + {'name': 'res6', 'action': rsrc.Resource.CREATE, + 'root_stack_id': self.stack.id, + 'status': rsrc.Resource.IN_PROGRESS, 'engine_id': 'engine-001'}, + {'name': 'res6'}, + ] + for val in values: + create_resource(self.ctx, self.stack, **val) + + engines = db_api.engine_get_all_locked_by_stack(self.ctx, + self.stack.id) + self.assertEqual({'engine-001', 'engine-002'}, engines) + class DBAPIStackLockTest(common.HeatTestCase): def setUp(self): diff --git a/heat/tests/engine/test_check_resource.py b/heat/tests/engine/test_check_resource.py index 307c16a06df..bfe405398e9 100644 --- a/heat/tests/engine/test_check_resource.py +++ b/heat/tests/engine/test_check_resource.py @@ -108,11 +108,13 @@ def test_is_update_traversal( self.resource.id, mock.ANY, True) + @mock.patch.object(resource.Resource, 'load') @mock.patch.object(resource.Resource, 'make_replacement') @mock.patch.object(stack.Stack, 'time_remaining') def test_is_update_traversal_raise_update_replace( - self, tr, mock_mr, mock_cru, mock_crc, mock_pcr, mock_csc, - mock_cid): + self, tr, mock_mr, mock_load, mock_cru, mock_crc, mock_pcr, + mock_csc, mock_cid): + mock_load.return_value = self.resource, self.stack, self.stack mock_cru.side_effect = resource.UpdateReplace tr.return_value = 317 self.worker.check_resource( @@ -550,10 +552,13 @@ def setUp(self): self.is_update = False self.graph_key = (self.resource.id, self.is_update) + @mock.patch.object(resource.Resource, 'load') @mock.patch.object(stack.Stack, 'time_remaining') def test_is_cleanup_traversal( - self, tr, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): + self, tr, mock_load, mock_cru, mock_crc, mock_pcr, mock_csc, + mock_cid): tr.return_value = 317 + mock_load.return_value = self.resource, self.stack, self.stack self.worker.check_resource( self.ctx, self.resource.id, self.stack.current_traversal, {}, self.is_update, None) diff --git a/heat/tests/engine/test_engine_worker.py b/heat/tests/engine/test_engine_worker.py index e45488236b5..421cf4981b8 100644 --- a/heat/tests/engine/test_engine_worker.py +++ b/heat/tests/engine/test_engine_worker.py @@ -15,8 +15,10 @@ import mock +from heat.db import api as db_api from heat.engine import check_resource from heat.engine import worker +from heat.rpc import worker_client as wc from heat.tests import common from heat.tests import utils @@ -44,11 +46,11 @@ def test_service_start(self, target_class, rpc_server_method ): + self.worker = worker.WorkerService('host-1', 'topic-1', 'engine_id', mock.Mock()) - self.worker.start() # Make sure target is called with proper parameters @@ -133,3 +135,88 @@ def test_check_resource_adds_and_removes_msg_queue_on_exception( self.assertTrue(mock_tgm.add_msg_queue.called) # ensure remove is also called self.assertTrue(mock_tgm.remove_msg_queue.called) + + @mock.patch.object(worker, '_wait_for_cancellation') + @mock.patch.object(worker, '_cancel_check_resource') + @mock.patch.object(wc.WorkerClient, 'cancel_check_resource') + @mock.patch.object(db_api, 'engine_get_all_locked_by_stack') + def test_cancel_workers_when_no_resource_found(self, mock_get_locked, + mock_ccr, mock_wccr, + mock_wc): + mock_tgm = mock.Mock() + _worker = worker.WorkerService('host-1', 'topic-1', 'engine-001', + mock_tgm) + stack = mock.MagicMock() + stack.id = 'stack_id' + mock_get_locked.return_value = [] + worker._cancel_workers(stack, mock_tgm, 'engine-001', + _worker._rpc_client) + self.assertFalse(mock_wccr.called) + self.assertFalse(mock_ccr.called) + + @mock.patch.object(worker, '_wait_for_cancellation') + @mock.patch.object(worker, '_cancel_check_resource') + @mock.patch.object(wc.WorkerClient, 'cancel_check_resource') + @mock.patch.object(db_api, 'engine_get_all_locked_by_stack') + def test_cancel_workers_with_resources_found(self, mock_get_locked, + mock_ccr, mock_wccr, + mock_wc): + mock_tgm = mock.Mock() + _worker = worker.WorkerService('host-1', 'topic-1', 'engine-001', + mock_tgm) + stack = mock.MagicMock() + stack.id = 'stack_id' + mock_get_locked.return_value = ['engine-001', 'engine-007', + 'engine-008'] + worker._cancel_workers(stack, mock_tgm, 'engine-001', + _worker._rpc_client) + mock_wccr.assert_called_once_with(stack.id, 'engine-001', mock_tgm) + self.assertEqual(2, mock_ccr.call_count) + calls = [mock.call(stack.context, stack.id, 'engine-007'), + mock.call(stack.context, stack.id, 'engine-008')] + mock_ccr.assert_has_calls(calls, any_order=True) + self.assertTrue(mock_wc.called) + + @mock.patch.object(worker, '_cancel_workers') + @mock.patch.object(worker.WorkerService, 'stop_traversal') + def test_stop_all_workers_when_stack_in_progress(self, mock_st, mock_cw): + mock_tgm = mock.Mock() + _worker = worker.WorkerService('host-1', 'topic-1', 'engine-001', + mock_tgm) + stack = mock.MagicMock() + stack.IN_PROGRESS = 'IN_PROGRESS' + stack.status = stack.IN_PROGRESS + stack.id = 'stack_id' + stack.rollback = mock.MagicMock() + _worker.stop_all_workers(stack) + mock_st.assert_called_once_with(stack) + mock_cw.assert_called_once_with(stack, mock_tgm, 'engine-001', + _worker._rpc_client) + self.assertFalse(stack.rollback.called) + + @mock.patch.object(worker, '_cancel_workers') + @mock.patch.object(worker.WorkerService, 'stop_traversal') + def test_stop_all_workers_when_stack_not_in_progress(self, mock_st, + mock_cw): + mock_tgm = mock.Mock() + _worker = worker.WorkerService('host-1', 'topic-1', 'engine-001', + mock_tgm) + stack = mock.MagicMock() + stack.FAILED = 'FAILED' + stack.status = stack.FAILED + stack.id = 'stack_id' + stack.rollback = mock.MagicMock() + _worker.stop_all_workers(stack) + self.assertFalse(mock_st.called) + mock_cw.assert_called_once_with(stack, mock_tgm, 'engine-001', + _worker._rpc_client) + self.assertFalse(stack.rollback.called) + + # test when stack complete + stack.FAILED = 'FAILED' + stack.status = stack.FAILED + _worker.stop_all_workers(stack) + self.assertFalse(mock_st.called) + mock_cw.assert_called_with(stack, mock_tgm, 'engine-001', + _worker._rpc_client) + self.assertFalse(stack.rollback.called) diff --git a/heat/tests/engine/test_sync_point.py b/heat/tests/engine/test_sync_point.py index b3a230eeb81..615f70cd326 100644 --- a/heat/tests/engine/test_sync_point.py +++ b/heat/tests/engine/test_sync_point.py @@ -22,6 +22,11 @@ class SyncPointTestCase(common.HeatTestCase): + def setUp(self): + super(SyncPointTestCase, self).setUp() + self.dummy_event = mock.MagicMock() + self.dummy_event.ready.return_value = False + def test_sync_waiting(self): ctx = utils.dummy_context() stack = tools.get_stack('test_stack', utils.dummy_context(), diff --git a/heat/tests/test_resource.py b/heat/tests/test_resource.py index 6e934ed11a0..e020b9265d6 100644 --- a/heat/tests/test_resource.py +++ b/heat/tests/test_resource.py @@ -12,6 +12,7 @@ # under the License. import collections +import eventlet import itertools import json import os @@ -72,6 +73,7 @@ def setUp(self): env=self.env), stack_id=str(uuid.uuid4())) self.dummy_timeout = 10 + self.dummy_event = eventlet.event.Event() def test_get_class_ok(self): cls = resources.global_env().get_class_to_instantiate( @@ -1880,10 +1882,7 @@ def test_release_ignores_not_found_error(self, mock_sau, mock_get_obj): res._release('engine-id') self.assertFalse(mock_sau.called) - @mock.patch.object(resource.scheduler.TaskRunner, '__init__', - return_value=None) - @mock.patch.object(resource.scheduler.TaskRunner, '__call__') - def test_create_convergence(self, mock_call, mock_init): + def test_create_convergence(self): tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') res = generic_rsrc.GenericResource('test_res', tmpl, self.stack) res.action = res.CREATE @@ -1893,12 +1892,11 @@ def test_create_convergence(self, mock_call, mock_init): (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} pcb = mock.Mock() - res.create_convergence(self.stack.t.id, res_data, 'engine-007', - 60, pcb) + with mock.patch.object(resource.Resource, 'create') as mock_create: + res.create_convergence(self.stack.t.id, res_data, 'engine-007', + -1, pcb) + self.assertTrue(mock_create.called) - mock_init.assert_called_once_with(res.create) - mock_call.assert_called_once_with(timeout=60, progress_callback=pcb) - self.assertEqual(self.stack.t.id, res.current_template_id) self.assertItemsEqual([1, 3], res.requires) self._assert_resource_lock(res.id, None, 2) @@ -1910,9 +1908,9 @@ def test_create_convergence_throws_timeout(self): res_data = {(1, True): {u'id': 1, u'name': 'A', 'attrs': {}}, (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} + pcb = mock.Mock() self.assertRaises(scheduler.Timeout, res.create_convergence, - self.stack.t.id, res_data, 'engine-007', - -1) + self.stack.t.id, res_data, 'engine-007', -1, pcb) def test_create_convergence_sets_requires_for_failure(self): """Ensure that requires are computed correctly. @@ -1930,7 +1928,7 @@ def test_create_convergence_sets_requires_for_failure(self): (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} self.assertRaises(exception.ResourceNotAvailable, res.create_convergence, self.stack.t.id, res_data, - 'engine-007', self.dummy_timeout) + 'engine-007', self.dummy_timeout, self.dummy_event) self.assertItemsEqual([5, 3], res.requires) self._assert_resource_lock(res.id, None, 2) @@ -1945,9 +1943,10 @@ def test_adopt_convergence_ok(self, mock_adopt): self._assert_resource_lock(res.id, None, None) res_data = {(1, True): {u'id': 5, u'name': 'A', 'attrs': {}}, (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} - res.create_convergence(self.stack.t.id, res_data, 'engine-007', - self.dummy_timeout) - + tr = scheduler.TaskRunner(res.create_convergence, self.stack.t.id, + res_data, 'engine-007', self.dummy_timeout, + self.dummy_event) + tr() mock_adopt.assert_called_once_with( resource_data={'resource_id': 'fluffy'}) self.assertItemsEqual([5, 3], res.requires) @@ -1962,15 +1961,13 @@ def test_adopt_convergence_bad_data(self): self._assert_resource_lock(res.id, None, None) res_data = {(1, True): {u'id': 5, u'name': 'A', 'attrs': {}}, (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} - exc = self.assertRaises(exception.ResourceFailure, - res.create_convergence, self.stack.t.id, - res_data, 'engine-007', self.dummy_timeout) + tr = scheduler.TaskRunner(res.create_convergence, self.stack.t.id, + res_data, 'engine-007', self.dummy_timeout, + self.dummy_event) + exc = self.assertRaises(exception.ResourceFailure, tr) self.assertIn('Resource ID was not provided', six.text_type(exc)) - @mock.patch.object(resource.scheduler.TaskRunner, '__init__', - return_value=None) - @mock.patch.object(resource.scheduler.TaskRunner, '__call__') - def test_update_convergence(self, mock_call, mock_init): + def test_update_convergence(self): tmpl = template.Template({ 'HeatTemplateFormatVersion': '2012-12-12', 'Resources': { @@ -1981,6 +1978,7 @@ def test_update_convergence(self, mock_call, mock_init): stack.converge_stack(stack.t, action=stack.CREATE) res = stack.resources['test_res'] res.requires = [2] + res.action = res.CREATE res._store() self._assert_resource_lock(res.id, None, None) @@ -1997,13 +1995,13 @@ def test_update_convergence(self, mock_call, mock_init): res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}}, (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} pcb = mock.Mock() - res.update_convergence(new_temp.id, res_data, 'engine-007', 120, - new_stack, pcb) + with mock.patch.object(resource.Resource, 'update') as mock_update: + tr = scheduler.TaskRunner(res.update_convergence, new_temp.id, + res_data, 'engine-007', 120, new_stack, + pcb) + tr() + self.assertTrue(mock_update.called) - expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name] - mock_init.assert_called_once_with(res.update, expected_rsrc_def) - mock_call.assert_called_once_with(timeout=120, progress_callback=pcb) - self.assertEqual(new_temp.id, res.current_template_id) self.assertItemsEqual([3, 4], res.requires) self._assert_resource_lock(res.id, None, 2) @@ -2030,9 +2028,10 @@ def test_update_convergence_throws_timeout(self): new_temp, stack_id=self.stack.id) res_data = {} - self.assertRaises(scheduler.Timeout, res.update_convergence, - new_temp.id, res_data, 'engine-007', - -1, new_stack) + tr = scheduler.TaskRunner(res.update_convergence, new_temp.id, + res_data, 'engine-007', -1, new_stack, + self.dummy_event) + self.assertRaises(scheduler.Timeout, tr) def test_update_convergence_with_substitute_class(self): tmpl = rsrc_defn.ResourceDefinition('test_res', @@ -2073,9 +2072,10 @@ def test_update_convergence_checks_resource_class(self): new_temp, stack_id=self.stack.id) res_data = {} - self.assertRaises(resource.UpdateReplace, res.update_convergence, - new_temp.id, res_data, 'engine-007', - -1, new_stack) + tr = scheduler.TaskRunner(res.update_convergence, new_temp.id, + res_data, 'engine-007', -1, new_stack, + self.dummy_event) + self.assertRaises(resource.UpdateReplace, tr) def test_update_in_progress_convergence(self): tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') @@ -2088,12 +2088,10 @@ def test_update_in_progress_convergence(self): res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}}, (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} - ex = self.assertRaises(exception.UpdateInProgress, - res.update_convergence, - 'template_key', - res_data, 'engine-007', - self.dummy_timeout, - mock.ANY) + tr = scheduler.TaskRunner(res.update_convergence, 'template_key', + res_data, 'engine-007', self.dummy_timeout, + mock.ANY, self.dummy_event) + ex = self.assertRaises(exception.UpdateInProgress, tr) msg = ("The resource %s is already being updated." % res.name) self.assertEqual(msg, six.text_type(ex)) @@ -2130,9 +2128,10 @@ def test_update_resource_convergence_failed(self, mock_update): new_temp, stack_id=self.stack.id) dummy_ex = exception.ResourceFailure(exc, res, action=res.UPDATE) mock_update.side_effect = dummy_ex - self.assertRaises(exception.ResourceFailure, - res.update_convergence, new_temp.id, res_data, - 'engine-007', 120, new_stack) + tr = scheduler.TaskRunner(res.update_convergence, new_temp.id, + res_data, 'engine-007', 120, new_stack, + self.dummy_event) + self.assertRaises(exception.ResourceFailure, tr) expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name] mock_update.assert_called_once_with(expected_rsrc_def) @@ -2170,9 +2169,10 @@ def test_update_resource_convergence_update_replace(self, mock_update): mock_update.side_effect = resource.UpdateReplace new_stack = parser.Stack(utils.dummy_context(), 'test_stack', new_temp, stack_id=self.stack.id) - self.assertRaises(resource.UpdateReplace, - res.update_convergence, new_temp.id, res_data, - 'engine-007', 120, new_stack) + tr = scheduler.TaskRunner(res.update_convergence, new_temp.id, + res_data, 'engine-007', 120, new_stack, + self.dummy_event) + self.assertRaises(resource.UpdateReplace, tr) expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name] mock_update.assert_called_once_with(expected_rsrc_def) @@ -2202,7 +2202,7 @@ def test_convergence_update_replace_rollback(self): res.restore_prev_rsrc = mock.Mock() tr = scheduler.TaskRunner(res.update_convergence, 'new_tmpl_id', {}, 'engine-007', self.dummy_timeout, - new_stack) + new_stack, self.dummy_event) self.assertRaises(resource.UpdateReplace, tr) self.assertTrue(res.restore_prev_rsrc.called) @@ -2225,15 +2225,13 @@ def test_convergence_update_replace_rollback_restore_prev_rsrc_error(self): 'Simulate rollback') res.restore_prev_rsrc = mock.Mock(side_effect=Exception) tr = scheduler.TaskRunner(res.update_convergence, 'new_tmpl_id', {}, - 'engine-007', self.dummy_timeout, new_stack) + 'engine-007', self.dummy_timeout, new_stack, + self.dummy_event) self.assertRaises(exception.ResourceFailure, tr) self.assertTrue(res.restore_prev_rsrc.called) self.assertEqual((res.UPDATE, res.FAILED), res.state) - @mock.patch.object(resource.scheduler.TaskRunner, '__init__', - return_value=None) - @mock.patch.object(resource.scheduler.TaskRunner, '__call__') - def test_delete_convergence_ok(self, mock_call, mock_init): + def test_delete_convergence_ok(self): tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') res = generic_rsrc.GenericResource('test_res', tmpl, self.stack) res.current_template_id = 1 @@ -2244,11 +2242,13 @@ def test_delete_convergence_ok(self, mock_call, mock_init): res._update_replacement_data = mock.Mock() self._assert_resource_lock(res.id, None, None) pcb = mock.Mock() - res.delete_convergence(2, {}, 'engine-007', 20, pcb) - - mock_init.assert_called_once_with(res.delete) - mock_call.assert_called_once_with(timeout=20, progress_callback=pcb) + with mock.patch.object(resource.Resource, 'delete') as mock_delete: + tr = scheduler.TaskRunner(res.delete_convergence, 2, {}, + 'engine-007', 20, pcb) + tr() + self.assertTrue(mock_delete.called) self.assertTrue(res._update_replacement_data.called) + self._assert_resource_lock(res.id, None, 2) def test_delete_convergence_does_not_delete_same_template_resource(self): tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') @@ -2256,8 +2256,10 @@ def test_delete_convergence_does_not_delete_same_template_resource(self): res.current_template_id = 'same-template' res._store() res.delete = mock.Mock() - res.delete_convergence('same-template', {}, 'engine-007', - self.dummy_timeout) + tr = scheduler.TaskRunner(res.delete_convergence, 'same-template', {}, + 'engine-007', self.dummy_timeout, + self.dummy_event) + tr() self.assertFalse(res.delete.called) def test_delete_convergence_fail(self): @@ -2270,9 +2272,9 @@ def test_delete_convergence_fail(self): res_id = res.id res.handle_delete = mock.Mock(side_effect=ValueError('test')) self._assert_resource_lock(res.id, None, None) - self.assertRaises(exception.ResourceFailure, - res.delete_convergence, 2, {}, 'engine-007', - self.dummy_timeout) + tr = scheduler.TaskRunner(res.delete_convergence, 2, {}, 'engine-007', + self.dummy_timeout, self.dummy_event) + self.assertRaises(exception.ResourceFailure, tr) self.assertTrue(res.handle_delete.called) # confirm that the DB object still exists, and it's lock is released. @@ -2291,9 +2293,10 @@ def test_delete_in_progress_convergence(self): rs = resource_objects.Resource.get_obj(self.stack.context, res.id) rs.update_and_save({'engine_id': 'not-this'}) self._assert_resource_lock(res.id, 'not-this', None) - ex = self.assertRaises(exception.UpdateInProgress, - res.delete_convergence, - 1, {}, 'engine-007', self.dummy_timeout) + + tr = scheduler.TaskRunner(res.delete_convergence, 1, {}, 'engine-007', + self.dummy_timeout, self.dummy_event) + ex = self.assertRaises(exception.UpdateInProgress, tr) msg = ("The resource %s is already being updated." % res.name) self.assertEqual(msg, six.text_type(ex)) @@ -2308,7 +2311,9 @@ def test_delete_convergence_updates_needed_by(self): res.destroy = mock.Mock() input_data = {(1, False): 4, (2, False): 5} # needed_by resource ids self._assert_resource_lock(res.id, None, None) - res.delete_convergence(1, input_data, 'engine-007', self.dummy_timeout) + scheduler.TaskRunner(res.delete_convergence, 1, input_data, + 'engine-007', self.dummy_timeout, + self.dummy_event)() self.assertItemsEqual([4, 5], res.needed_by) @mock.patch.object(resource_objects.Resource, 'get_obj') @@ -2449,7 +2454,9 @@ def test_delete_convergence_deletes_resource_in_init_state(self): res._store() with mock.patch.object(resource_objects.Resource, 'delete') as resource_del: - res.delete_convergence(1, {}, 'engine-007', 1) + tr = scheduler.TaskRunner(res.delete_convergence, 1, {}, + 'engine-007', 1, self.dummy_event) + tr() resource_del.assert_called_once_with(res.context, res.id) def test_delete_convergence_throws_timeout(self): @@ -2458,8 +2465,9 @@ def test_delete_convergence_throws_timeout(self): res.action = res.CREATE res._store() timeout = -1 # to emulate timeout - self.assertRaises(scheduler.Timeout, res.delete_convergence, - 1, {}, 'engine-007', timeout) + tr = scheduler.TaskRunner(res.delete_convergence, 1, {}, 'engine-007', + timeout, self.dummy_event) + self.assertRaises(scheduler.Timeout, tr) @mock.patch.object(parser.Stack, 'load') @mock.patch.object(resource.Resource, '_load_data') @@ -4054,6 +4062,7 @@ def setUp(self): } } self.dummy_timeout = 10 + self.dummy_event = eventlet.event.Event() def create_resource(self): self.stack = parser.Stack(utils.dummy_context(), 'test_stack', @@ -4069,9 +4078,10 @@ def create_convergence_resource(self): stack_id=str(uuid.uuid4())) res_data = {} res = self.stack['bar'] + pcb = mock.Mock() self.patchobject(res, 'lock') - scheduler.TaskRunner(res.create_convergence, self.stack.t.id, - res_data, 'engine-007', self.dummy_timeout)() + res.create_convergence(self.stack.t.id, res_data, 'engine-007', + self.dummy_timeout, pcb) return res def test_update_restricted(self): @@ -4193,7 +4203,8 @@ def test_replace_restricted_type_change_with_convergence(self): {}, 'engine-007', self.dummy_timeout, - self.new_stack)) + self.new_stack, + eventlet.event.Event())) self.assertEqual('ResourceActionRestricted: resources.bar: ' 'replace is restricted for resource.', six.text_type(error)) @@ -4224,7 +4235,8 @@ def test_update_restricted_type_change_with_convergence(self): {}, 'engine-007', self.dummy_timeout, - self.new_stack)) + self.new_stack, + eventlet.event.Event())) self.assertIn('requires replacement', six.text_type(error)) ev.assert_not_called()