From 873a40851dd7807c6de0ee73affb7af2be875519 Mon Sep 17 00:00:00 2001 From: Anant Patil Date: Thu, 18 Aug 2016 15:31:23 +0530 Subject: [PATCH] Convergence: basic framework for cancelling workers Implements mechanism to cancel existing workers (in_progress resources). The stack-cancel-update request lands in one of the engines, and if there are any workers in that engine which are working for the stack, they are cancelled first and then other engines are requested to cancel the workers. Change-Id: I464c4fdb760247d436473af49448f7797dc0130d --- heat/db/api.py | 4 + heat/db/sqlalchemy/api.py | 9 ++ heat/engine/worker.py | 65 +++++++++- heat/tests/db/test_sqlalchemy_api.py | 29 +++++ heat/tests/engine/test_check_resource.py | 11 +- heat/tests/engine/test_engine_worker.py | 89 ++++++++++++- heat/tests/engine/test_sync_point.py | 5 + heat/tests/test_resource.py | 156 ++++++++++++----------- 8 files changed, 290 insertions(+), 78 deletions(-) diff --git a/heat/db/api.py b/heat/db/api.py index dd5d0eedf32..8345c42c4f1 100644 --- a/heat/db/api.py +++ b/heat/db/api.py @@ -142,6 +142,10 @@ def resource_get_all_by_root_stack(context, stack_id, filters=None): return IMPL.resource_get_all_by_root_stack(context, stack_id, filters) +def engine_get_all_locked_by_stack(context, stack_id): + return IMPL.engine_get_all_locked_by_stack(context, stack_id) + + def resource_purge_deleted(context, stack_id): return IMPL.resource_purge_deleted(context, stack_id) diff --git a/heat/db/sqlalchemy/api.py b/heat/db/sqlalchemy/api.py index 4a4b032dbf6..40db736d609 100644 --- a/heat/db/sqlalchemy/api.py +++ b/heat/db/sqlalchemy/api.py @@ -410,6 +410,15 @@ def resource_get_all_by_root_stack(context, stack_id, filters=None): return dict((res.id, res) for res in results) +def engine_get_all_locked_by_stack(context, stack_id): + query = context.session.query( + func.distinct(models.Resource.engine_id) + ).filter( + models.Resource.stack_id == stack_id, + models.Resource.engine_id.isnot(None)) + return set(i[0] for i in query.all()) + + def stack_get_by_name_and_owner_id(context, stack_name, owner_id): query = soft_delete_aware_query( context, models.Stack diff --git a/heat/engine/worker.py b/heat/engine/worker.py index f434b9ff992..70fea6bfe08 100644 --- a/heat/engine/worker.py +++ b/heat/engine/worker.py @@ -25,12 +25,16 @@ from heat.common.i18n import _LI from heat.common.i18n import _LW from heat.common import messaging as rpc_messaging +from heat.db import api as db_api from heat.engine import check_resource from heat.engine import sync_point +from heat.rpc import api as rpc_api from heat.rpc import worker_client as rpc_client LOG = logging.getLogger(__name__) +CANCEL_RETRIES = 3 + @profiler.trace_cls("rpc") class WorkerService(service.Service): @@ -107,6 +111,26 @@ def stop_traversal(self, stack): "%(name)s while cancelling the operation."), {'name': stack.name, 'trvsl': old_trvsl}) + def stop_all_workers(self, stack): + # stop the traversal + if stack.status == stack.IN_PROGRESS: + self.stop_traversal(stack) + + # cancel existing workers + cancelled = _cancel_workers(stack, self.thread_group_mgr, + self.engine_id, self._rpc_client) + if not cancelled: + LOG.error(_LE("Failed to stop all workers of stack %(name)s " + ", stack cancel not complete"), + {'name': stack.name}) + return False + + LOG.info(_LI('[%(name)s(%(id)s)] Stopped all active workers for stack ' + '%(action)s'), + {'name': stack.name, 'id': stack.id, 'action': stack.action}) + + return True + @context.request_context def check_resource(self, cnxt, resource_id, current_traversal, data, is_update, adopt_stack_data): @@ -145,5 +169,42 @@ def cancel_check_resource(self, cnxt, stack_id): All the workers running for the given stack will be cancelled. """ - # TODO(ananta): Implement cancel check-resource - LOG.debug('Cancelling workers for stack [%s]', stack_id) + _cancel_check_resource(stack_id, self.engine_id, self.thread_group_mgr) + + +def _cancel_check_resource(stack_id, engine_id, tgm): + LOG.debug('Cancelling workers for stack [%s] in engine [%s]', + stack_id, engine_id) + tgm.send(stack_id, rpc_api.THREAD_CANCEL) + + +def _wait_for_cancellation(stack, wait=5): + # give enough time to wait till cancel is completed + retries = CANCEL_RETRIES + while retries > 0: + retries -= 1 + eventlet.sleep(wait) + engines = db_api.engine_get_all_locked_by_stack( + stack.context, stack.id) + if not engines: + return True + + return False + + +def _cancel_workers(stack, tgm, local_engine_id, rpc_client): + engines = db_api.engine_get_all_locked_by_stack(stack.context, stack.id) + + if not engines: + return True + + # cancel workers running locally + if local_engine_id in engines: + _cancel_check_resource(stack.id, local_engine_id, tgm) + engines.remove(local_engine_id) + + # cancel workers on remote engines + for engine_id in engines: + rpc_client.cancel_check_resource(stack.context, stack.id, engine_id) + + return _wait_for_cancellation(stack) diff --git a/heat/tests/db/test_sqlalchemy_api.py b/heat/tests/db/test_sqlalchemy_api.py index 9b9f7515cd1..a266978a787 100644 --- a/heat/tests/db/test_sqlalchemy_api.py +++ b/heat/tests/db/test_sqlalchemy_api.py @@ -2436,6 +2436,35 @@ def test_resource_purge_deleted_by_stack(self): self.assertRaises(exception.NotFound, db_api.resource_get, self.ctx, resource.id) + def test_engine_get_all_locked_by_stack(self): + values = [ + {'name': 'res1', 'action': rsrc.Resource.DELETE, + 'root_stack_id': self.stack.id, + 'status': rsrc.Resource.COMPLETE}, + {'name': 'res2', 'action': rsrc.Resource.DELETE, + 'root_stack_id': self.stack.id, + 'status': rsrc.Resource.IN_PROGRESS, 'engine_id': 'engine-001'}, + {'name': 'res3', 'action': rsrc.Resource.UPDATE, + 'root_stack_id': self.stack.id, + 'status': rsrc.Resource.IN_PROGRESS, 'engine_id': 'engine-002'}, + {'name': 'res4', 'action': rsrc.Resource.CREATE, + 'root_stack_id': self.stack.id, + 'status': rsrc.Resource.COMPLETE}, + {'name': 'res5', 'action': rsrc.Resource.INIT, + 'root_stack_id': self.stack.id, + 'status': rsrc.Resource.COMPLETE}, + {'name': 'res6', 'action': rsrc.Resource.CREATE, + 'root_stack_id': self.stack.id, + 'status': rsrc.Resource.IN_PROGRESS, 'engine_id': 'engine-001'}, + {'name': 'res6'}, + ] + for val in values: + create_resource(self.ctx, self.stack, **val) + + engines = db_api.engine_get_all_locked_by_stack(self.ctx, + self.stack.id) + self.assertEqual({'engine-001', 'engine-002'}, engines) + class DBAPIStackLockTest(common.HeatTestCase): def setUp(self): diff --git a/heat/tests/engine/test_check_resource.py b/heat/tests/engine/test_check_resource.py index 307c16a06df..bfe405398e9 100644 --- a/heat/tests/engine/test_check_resource.py +++ b/heat/tests/engine/test_check_resource.py @@ -108,11 +108,13 @@ def test_is_update_traversal( self.resource.id, mock.ANY, True) + @mock.patch.object(resource.Resource, 'load') @mock.patch.object(resource.Resource, 'make_replacement') @mock.patch.object(stack.Stack, 'time_remaining') def test_is_update_traversal_raise_update_replace( - self, tr, mock_mr, mock_cru, mock_crc, mock_pcr, mock_csc, - mock_cid): + self, tr, mock_mr, mock_load, mock_cru, mock_crc, mock_pcr, + mock_csc, mock_cid): + mock_load.return_value = self.resource, self.stack, self.stack mock_cru.side_effect = resource.UpdateReplace tr.return_value = 317 self.worker.check_resource( @@ -550,10 +552,13 @@ def setUp(self): self.is_update = False self.graph_key = (self.resource.id, self.is_update) + @mock.patch.object(resource.Resource, 'load') @mock.patch.object(stack.Stack, 'time_remaining') def test_is_cleanup_traversal( - self, tr, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): + self, tr, mock_load, mock_cru, mock_crc, mock_pcr, mock_csc, + mock_cid): tr.return_value = 317 + mock_load.return_value = self.resource, self.stack, self.stack self.worker.check_resource( self.ctx, self.resource.id, self.stack.current_traversal, {}, self.is_update, None) diff --git a/heat/tests/engine/test_engine_worker.py b/heat/tests/engine/test_engine_worker.py index e45488236b5..421cf4981b8 100644 --- a/heat/tests/engine/test_engine_worker.py +++ b/heat/tests/engine/test_engine_worker.py @@ -15,8 +15,10 @@ import mock +from heat.db import api as db_api from heat.engine import check_resource from heat.engine import worker +from heat.rpc import worker_client as wc from heat.tests import common from heat.tests import utils @@ -44,11 +46,11 @@ def test_service_start(self, target_class, rpc_server_method ): + self.worker = worker.WorkerService('host-1', 'topic-1', 'engine_id', mock.Mock()) - self.worker.start() # Make sure target is called with proper parameters @@ -133,3 +135,88 @@ def test_check_resource_adds_and_removes_msg_queue_on_exception( self.assertTrue(mock_tgm.add_msg_queue.called) # ensure remove is also called self.assertTrue(mock_tgm.remove_msg_queue.called) + + @mock.patch.object(worker, '_wait_for_cancellation') + @mock.patch.object(worker, '_cancel_check_resource') + @mock.patch.object(wc.WorkerClient, 'cancel_check_resource') + @mock.patch.object(db_api, 'engine_get_all_locked_by_stack') + def test_cancel_workers_when_no_resource_found(self, mock_get_locked, + mock_ccr, mock_wccr, + mock_wc): + mock_tgm = mock.Mock() + _worker = worker.WorkerService('host-1', 'topic-1', 'engine-001', + mock_tgm) + stack = mock.MagicMock() + stack.id = 'stack_id' + mock_get_locked.return_value = [] + worker._cancel_workers(stack, mock_tgm, 'engine-001', + _worker._rpc_client) + self.assertFalse(mock_wccr.called) + self.assertFalse(mock_ccr.called) + + @mock.patch.object(worker, '_wait_for_cancellation') + @mock.patch.object(worker, '_cancel_check_resource') + @mock.patch.object(wc.WorkerClient, 'cancel_check_resource') + @mock.patch.object(db_api, 'engine_get_all_locked_by_stack') + def test_cancel_workers_with_resources_found(self, mock_get_locked, + mock_ccr, mock_wccr, + mock_wc): + mock_tgm = mock.Mock() + _worker = worker.WorkerService('host-1', 'topic-1', 'engine-001', + mock_tgm) + stack = mock.MagicMock() + stack.id = 'stack_id' + mock_get_locked.return_value = ['engine-001', 'engine-007', + 'engine-008'] + worker._cancel_workers(stack, mock_tgm, 'engine-001', + _worker._rpc_client) + mock_wccr.assert_called_once_with(stack.id, 'engine-001', mock_tgm) + self.assertEqual(2, mock_ccr.call_count) + calls = [mock.call(stack.context, stack.id, 'engine-007'), + mock.call(stack.context, stack.id, 'engine-008')] + mock_ccr.assert_has_calls(calls, any_order=True) + self.assertTrue(mock_wc.called) + + @mock.patch.object(worker, '_cancel_workers') + @mock.patch.object(worker.WorkerService, 'stop_traversal') + def test_stop_all_workers_when_stack_in_progress(self, mock_st, mock_cw): + mock_tgm = mock.Mock() + _worker = worker.WorkerService('host-1', 'topic-1', 'engine-001', + mock_tgm) + stack = mock.MagicMock() + stack.IN_PROGRESS = 'IN_PROGRESS' + stack.status = stack.IN_PROGRESS + stack.id = 'stack_id' + stack.rollback = mock.MagicMock() + _worker.stop_all_workers(stack) + mock_st.assert_called_once_with(stack) + mock_cw.assert_called_once_with(stack, mock_tgm, 'engine-001', + _worker._rpc_client) + self.assertFalse(stack.rollback.called) + + @mock.patch.object(worker, '_cancel_workers') + @mock.patch.object(worker.WorkerService, 'stop_traversal') + def test_stop_all_workers_when_stack_not_in_progress(self, mock_st, + mock_cw): + mock_tgm = mock.Mock() + _worker = worker.WorkerService('host-1', 'topic-1', 'engine-001', + mock_tgm) + stack = mock.MagicMock() + stack.FAILED = 'FAILED' + stack.status = stack.FAILED + stack.id = 'stack_id' + stack.rollback = mock.MagicMock() + _worker.stop_all_workers(stack) + self.assertFalse(mock_st.called) + mock_cw.assert_called_once_with(stack, mock_tgm, 'engine-001', + _worker._rpc_client) + self.assertFalse(stack.rollback.called) + + # test when stack complete + stack.FAILED = 'FAILED' + stack.status = stack.FAILED + _worker.stop_all_workers(stack) + self.assertFalse(mock_st.called) + mock_cw.assert_called_with(stack, mock_tgm, 'engine-001', + _worker._rpc_client) + self.assertFalse(stack.rollback.called) diff --git a/heat/tests/engine/test_sync_point.py b/heat/tests/engine/test_sync_point.py index b3a230eeb81..615f70cd326 100644 --- a/heat/tests/engine/test_sync_point.py +++ b/heat/tests/engine/test_sync_point.py @@ -22,6 +22,11 @@ class SyncPointTestCase(common.HeatTestCase): + def setUp(self): + super(SyncPointTestCase, self).setUp() + self.dummy_event = mock.MagicMock() + self.dummy_event.ready.return_value = False + def test_sync_waiting(self): ctx = utils.dummy_context() stack = tools.get_stack('test_stack', utils.dummy_context(), diff --git a/heat/tests/test_resource.py b/heat/tests/test_resource.py index 6e934ed11a0..e020b9265d6 100644 --- a/heat/tests/test_resource.py +++ b/heat/tests/test_resource.py @@ -12,6 +12,7 @@ # under the License. import collections +import eventlet import itertools import json import os @@ -72,6 +73,7 @@ def setUp(self): env=self.env), stack_id=str(uuid.uuid4())) self.dummy_timeout = 10 + self.dummy_event = eventlet.event.Event() def test_get_class_ok(self): cls = resources.global_env().get_class_to_instantiate( @@ -1880,10 +1882,7 @@ def test_release_ignores_not_found_error(self, mock_sau, mock_get_obj): res._release('engine-id') self.assertFalse(mock_sau.called) - @mock.patch.object(resource.scheduler.TaskRunner, '__init__', - return_value=None) - @mock.patch.object(resource.scheduler.TaskRunner, '__call__') - def test_create_convergence(self, mock_call, mock_init): + def test_create_convergence(self): tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') res = generic_rsrc.GenericResource('test_res', tmpl, self.stack) res.action = res.CREATE @@ -1893,12 +1892,11 @@ def test_create_convergence(self, mock_call, mock_init): (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} pcb = mock.Mock() - res.create_convergence(self.stack.t.id, res_data, 'engine-007', - 60, pcb) + with mock.patch.object(resource.Resource, 'create') as mock_create: + res.create_convergence(self.stack.t.id, res_data, 'engine-007', + -1, pcb) + self.assertTrue(mock_create.called) - mock_init.assert_called_once_with(res.create) - mock_call.assert_called_once_with(timeout=60, progress_callback=pcb) - self.assertEqual(self.stack.t.id, res.current_template_id) self.assertItemsEqual([1, 3], res.requires) self._assert_resource_lock(res.id, None, 2) @@ -1910,9 +1908,9 @@ def test_create_convergence_throws_timeout(self): res_data = {(1, True): {u'id': 1, u'name': 'A', 'attrs': {}}, (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} + pcb = mock.Mock() self.assertRaises(scheduler.Timeout, res.create_convergence, - self.stack.t.id, res_data, 'engine-007', - -1) + self.stack.t.id, res_data, 'engine-007', -1, pcb) def test_create_convergence_sets_requires_for_failure(self): """Ensure that requires are computed correctly. @@ -1930,7 +1928,7 @@ def test_create_convergence_sets_requires_for_failure(self): (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} self.assertRaises(exception.ResourceNotAvailable, res.create_convergence, self.stack.t.id, res_data, - 'engine-007', self.dummy_timeout) + 'engine-007', self.dummy_timeout, self.dummy_event) self.assertItemsEqual([5, 3], res.requires) self._assert_resource_lock(res.id, None, 2) @@ -1945,9 +1943,10 @@ def test_adopt_convergence_ok(self, mock_adopt): self._assert_resource_lock(res.id, None, None) res_data = {(1, True): {u'id': 5, u'name': 'A', 'attrs': {}}, (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} - res.create_convergence(self.stack.t.id, res_data, 'engine-007', - self.dummy_timeout) - + tr = scheduler.TaskRunner(res.create_convergence, self.stack.t.id, + res_data, 'engine-007', self.dummy_timeout, + self.dummy_event) + tr() mock_adopt.assert_called_once_with( resource_data={'resource_id': 'fluffy'}) self.assertItemsEqual([5, 3], res.requires) @@ -1962,15 +1961,13 @@ def test_adopt_convergence_bad_data(self): self._assert_resource_lock(res.id, None, None) res_data = {(1, True): {u'id': 5, u'name': 'A', 'attrs': {}}, (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} - exc = self.assertRaises(exception.ResourceFailure, - res.create_convergence, self.stack.t.id, - res_data, 'engine-007', self.dummy_timeout) + tr = scheduler.TaskRunner(res.create_convergence, self.stack.t.id, + res_data, 'engine-007', self.dummy_timeout, + self.dummy_event) + exc = self.assertRaises(exception.ResourceFailure, tr) self.assertIn('Resource ID was not provided', six.text_type(exc)) - @mock.patch.object(resource.scheduler.TaskRunner, '__init__', - return_value=None) - @mock.patch.object(resource.scheduler.TaskRunner, '__call__') - def test_update_convergence(self, mock_call, mock_init): + def test_update_convergence(self): tmpl = template.Template({ 'HeatTemplateFormatVersion': '2012-12-12', 'Resources': { @@ -1981,6 +1978,7 @@ def test_update_convergence(self, mock_call, mock_init): stack.converge_stack(stack.t, action=stack.CREATE) res = stack.resources['test_res'] res.requires = [2] + res.action = res.CREATE res._store() self._assert_resource_lock(res.id, None, None) @@ -1997,13 +1995,13 @@ def test_update_convergence(self, mock_call, mock_init): res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}}, (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} pcb = mock.Mock() - res.update_convergence(new_temp.id, res_data, 'engine-007', 120, - new_stack, pcb) + with mock.patch.object(resource.Resource, 'update') as mock_update: + tr = scheduler.TaskRunner(res.update_convergence, new_temp.id, + res_data, 'engine-007', 120, new_stack, + pcb) + tr() + self.assertTrue(mock_update.called) - expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name] - mock_init.assert_called_once_with(res.update, expected_rsrc_def) - mock_call.assert_called_once_with(timeout=120, progress_callback=pcb) - self.assertEqual(new_temp.id, res.current_template_id) self.assertItemsEqual([3, 4], res.requires) self._assert_resource_lock(res.id, None, 2) @@ -2030,9 +2028,10 @@ def test_update_convergence_throws_timeout(self): new_temp, stack_id=self.stack.id) res_data = {} - self.assertRaises(scheduler.Timeout, res.update_convergence, - new_temp.id, res_data, 'engine-007', - -1, new_stack) + tr = scheduler.TaskRunner(res.update_convergence, new_temp.id, + res_data, 'engine-007', -1, new_stack, + self.dummy_event) + self.assertRaises(scheduler.Timeout, tr) def test_update_convergence_with_substitute_class(self): tmpl = rsrc_defn.ResourceDefinition('test_res', @@ -2073,9 +2072,10 @@ def test_update_convergence_checks_resource_class(self): new_temp, stack_id=self.stack.id) res_data = {} - self.assertRaises(resource.UpdateReplace, res.update_convergence, - new_temp.id, res_data, 'engine-007', - -1, new_stack) + tr = scheduler.TaskRunner(res.update_convergence, new_temp.id, + res_data, 'engine-007', -1, new_stack, + self.dummy_event) + self.assertRaises(resource.UpdateReplace, tr) def test_update_in_progress_convergence(self): tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') @@ -2088,12 +2088,10 @@ def test_update_in_progress_convergence(self): res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}}, (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} - ex = self.assertRaises(exception.UpdateInProgress, - res.update_convergence, - 'template_key', - res_data, 'engine-007', - self.dummy_timeout, - mock.ANY) + tr = scheduler.TaskRunner(res.update_convergence, 'template_key', + res_data, 'engine-007', self.dummy_timeout, + mock.ANY, self.dummy_event) + ex = self.assertRaises(exception.UpdateInProgress, tr) msg = ("The resource %s is already being updated." % res.name) self.assertEqual(msg, six.text_type(ex)) @@ -2130,9 +2128,10 @@ def test_update_resource_convergence_failed(self, mock_update): new_temp, stack_id=self.stack.id) dummy_ex = exception.ResourceFailure(exc, res, action=res.UPDATE) mock_update.side_effect = dummy_ex - self.assertRaises(exception.ResourceFailure, - res.update_convergence, new_temp.id, res_data, - 'engine-007', 120, new_stack) + tr = scheduler.TaskRunner(res.update_convergence, new_temp.id, + res_data, 'engine-007', 120, new_stack, + self.dummy_event) + self.assertRaises(exception.ResourceFailure, tr) expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name] mock_update.assert_called_once_with(expected_rsrc_def) @@ -2170,9 +2169,10 @@ def test_update_resource_convergence_update_replace(self, mock_update): mock_update.side_effect = resource.UpdateReplace new_stack = parser.Stack(utils.dummy_context(), 'test_stack', new_temp, stack_id=self.stack.id) - self.assertRaises(resource.UpdateReplace, - res.update_convergence, new_temp.id, res_data, - 'engine-007', 120, new_stack) + tr = scheduler.TaskRunner(res.update_convergence, new_temp.id, + res_data, 'engine-007', 120, new_stack, + self.dummy_event) + self.assertRaises(resource.UpdateReplace, tr) expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name] mock_update.assert_called_once_with(expected_rsrc_def) @@ -2202,7 +2202,7 @@ def test_convergence_update_replace_rollback(self): res.restore_prev_rsrc = mock.Mock() tr = scheduler.TaskRunner(res.update_convergence, 'new_tmpl_id', {}, 'engine-007', self.dummy_timeout, - new_stack) + new_stack, self.dummy_event) self.assertRaises(resource.UpdateReplace, tr) self.assertTrue(res.restore_prev_rsrc.called) @@ -2225,15 +2225,13 @@ def test_convergence_update_replace_rollback_restore_prev_rsrc_error(self): 'Simulate rollback') res.restore_prev_rsrc = mock.Mock(side_effect=Exception) tr = scheduler.TaskRunner(res.update_convergence, 'new_tmpl_id', {}, - 'engine-007', self.dummy_timeout, new_stack) + 'engine-007', self.dummy_timeout, new_stack, + self.dummy_event) self.assertRaises(exception.ResourceFailure, tr) self.assertTrue(res.restore_prev_rsrc.called) self.assertEqual((res.UPDATE, res.FAILED), res.state) - @mock.patch.object(resource.scheduler.TaskRunner, '__init__', - return_value=None) - @mock.patch.object(resource.scheduler.TaskRunner, '__call__') - def test_delete_convergence_ok(self, mock_call, mock_init): + def test_delete_convergence_ok(self): tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') res = generic_rsrc.GenericResource('test_res', tmpl, self.stack) res.current_template_id = 1 @@ -2244,11 +2242,13 @@ def test_delete_convergence_ok(self, mock_call, mock_init): res._update_replacement_data = mock.Mock() self._assert_resource_lock(res.id, None, None) pcb = mock.Mock() - res.delete_convergence(2, {}, 'engine-007', 20, pcb) - - mock_init.assert_called_once_with(res.delete) - mock_call.assert_called_once_with(timeout=20, progress_callback=pcb) + with mock.patch.object(resource.Resource, 'delete') as mock_delete: + tr = scheduler.TaskRunner(res.delete_convergence, 2, {}, + 'engine-007', 20, pcb) + tr() + self.assertTrue(mock_delete.called) self.assertTrue(res._update_replacement_data.called) + self._assert_resource_lock(res.id, None, 2) def test_delete_convergence_does_not_delete_same_template_resource(self): tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') @@ -2256,8 +2256,10 @@ def test_delete_convergence_does_not_delete_same_template_resource(self): res.current_template_id = 'same-template' res._store() res.delete = mock.Mock() - res.delete_convergence('same-template', {}, 'engine-007', - self.dummy_timeout) + tr = scheduler.TaskRunner(res.delete_convergence, 'same-template', {}, + 'engine-007', self.dummy_timeout, + self.dummy_event) + tr() self.assertFalse(res.delete.called) def test_delete_convergence_fail(self): @@ -2270,9 +2272,9 @@ def test_delete_convergence_fail(self): res_id = res.id res.handle_delete = mock.Mock(side_effect=ValueError('test')) self._assert_resource_lock(res.id, None, None) - self.assertRaises(exception.ResourceFailure, - res.delete_convergence, 2, {}, 'engine-007', - self.dummy_timeout) + tr = scheduler.TaskRunner(res.delete_convergence, 2, {}, 'engine-007', + self.dummy_timeout, self.dummy_event) + self.assertRaises(exception.ResourceFailure, tr) self.assertTrue(res.handle_delete.called) # confirm that the DB object still exists, and it's lock is released. @@ -2291,9 +2293,10 @@ def test_delete_in_progress_convergence(self): rs = resource_objects.Resource.get_obj(self.stack.context, res.id) rs.update_and_save({'engine_id': 'not-this'}) self._assert_resource_lock(res.id, 'not-this', None) - ex = self.assertRaises(exception.UpdateInProgress, - res.delete_convergence, - 1, {}, 'engine-007', self.dummy_timeout) + + tr = scheduler.TaskRunner(res.delete_convergence, 1, {}, 'engine-007', + self.dummy_timeout, self.dummy_event) + ex = self.assertRaises(exception.UpdateInProgress, tr) msg = ("The resource %s is already being updated." % res.name) self.assertEqual(msg, six.text_type(ex)) @@ -2308,7 +2311,9 @@ def test_delete_convergence_updates_needed_by(self): res.destroy = mock.Mock() input_data = {(1, False): 4, (2, False): 5} # needed_by resource ids self._assert_resource_lock(res.id, None, None) - res.delete_convergence(1, input_data, 'engine-007', self.dummy_timeout) + scheduler.TaskRunner(res.delete_convergence, 1, input_data, + 'engine-007', self.dummy_timeout, + self.dummy_event)() self.assertItemsEqual([4, 5], res.needed_by) @mock.patch.object(resource_objects.Resource, 'get_obj') @@ -2449,7 +2454,9 @@ def test_delete_convergence_deletes_resource_in_init_state(self): res._store() with mock.patch.object(resource_objects.Resource, 'delete') as resource_del: - res.delete_convergence(1, {}, 'engine-007', 1) + tr = scheduler.TaskRunner(res.delete_convergence, 1, {}, + 'engine-007', 1, self.dummy_event) + tr() resource_del.assert_called_once_with(res.context, res.id) def test_delete_convergence_throws_timeout(self): @@ -2458,8 +2465,9 @@ def test_delete_convergence_throws_timeout(self): res.action = res.CREATE res._store() timeout = -1 # to emulate timeout - self.assertRaises(scheduler.Timeout, res.delete_convergence, - 1, {}, 'engine-007', timeout) + tr = scheduler.TaskRunner(res.delete_convergence, 1, {}, 'engine-007', + timeout, self.dummy_event) + self.assertRaises(scheduler.Timeout, tr) @mock.patch.object(parser.Stack, 'load') @mock.patch.object(resource.Resource, '_load_data') @@ -4054,6 +4062,7 @@ def setUp(self): } } self.dummy_timeout = 10 + self.dummy_event = eventlet.event.Event() def create_resource(self): self.stack = parser.Stack(utils.dummy_context(), 'test_stack', @@ -4069,9 +4078,10 @@ def create_convergence_resource(self): stack_id=str(uuid.uuid4())) res_data = {} res = self.stack['bar'] + pcb = mock.Mock() self.patchobject(res, 'lock') - scheduler.TaskRunner(res.create_convergence, self.stack.t.id, - res_data, 'engine-007', self.dummy_timeout)() + res.create_convergence(self.stack.t.id, res_data, 'engine-007', + self.dummy_timeout, pcb) return res def test_update_restricted(self): @@ -4193,7 +4203,8 @@ def test_replace_restricted_type_change_with_convergence(self): {}, 'engine-007', self.dummy_timeout, - self.new_stack)) + self.new_stack, + eventlet.event.Event())) self.assertEqual('ResourceActionRestricted: resources.bar: ' 'replace is restricted for resource.', six.text_type(error)) @@ -4224,7 +4235,8 @@ def test_update_restricted_type_change_with_convergence(self): {}, 'engine-007', self.dummy_timeout, - self.new_stack)) + self.new_stack, + eventlet.event.Event())) self.assertIn('requires replacement', six.text_type(error)) ev.assert_not_called()