Skip to content

Commit

Permalink
Refactor vendor_passthru to use conductor async workers
Browse files Browse the repository at this point in the history
To achieve sync validation + async vendor action, we used two RPC calls.
Currently we have async workers mechanism in conductor so this patch
changes vendor_passthru flow to:
rpc CALL -> validate -> spawn worker -> return driver info.

Also this patch moves vendor_passthru exclusive lock creation to
conductor level to have common lock for the whole method and prevent
races.

Change-Id: I9b0173841398d72b1818e986caf3d9a486cd205e
  • Loading branch information
max-lobur committed Mar 5, 2014
1 parent 36a9682 commit d7b3158
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 100 deletions.
75 changes: 42 additions & 33 deletions ironic/conductor/manager.py
Expand Up @@ -157,7 +157,7 @@ def _do_sync_power_state(task):
class ConductorManager(service.PeriodicService):
"""Ironic Conductor service main class."""

RPC_API_VERSION = '1.11'
RPC_API_VERSION = '1.12'

def __init__(self, host, topic):
serializer = objects_base.IronicObjectSerializer()
Expand Down Expand Up @@ -279,41 +279,50 @@ def change_node_power_state(self, context, node_id, new_state):
# Release node lock if error occurred.
task.release_resources()

# NOTE(deva): There is a race condition in the RPC API for vendor_passthru.
# Between the validate_vendor_action and do_vendor_action calls, it's
# possible another conductor instance may acquire a lock, or change the
# state of the node, such that validate() succeeds but do() fails.
# TODO(deva): Implement an intent lock to prevent this race. Do this after
# we have implemented intelligent RPC routing so that the do() will be
# guaranteed to land on the same conductor instance that performed
# validate().
def validate_vendor_action(self, context, node_id, driver_method, info):
"""Validate driver specific info or get driver status."""

LOG.debug(_("RPC validate_vendor_action called for node %s.")
def vendor_passthru(self, context, node_id, driver_method, info):
"""RPC method to encapsulate vendor action.
Synchronously validate driver specific info or get driver status,
and if successful, start background worker to perform vendor action
asynchronously.
:param context: an admin context.
:param node_id: the id or uuid of a node.
:param driver_method: the name of the vendor method.
:param info: vendor method args.
:raises: InvalidParameterValue if supplied info is not valid.
:raises: UnsupportedDriverExtension if current driver does not have
vendor interface.
:raises: NoFreeConductorWorker when there is no free worker to start
async task.
"""
LOG.debug(_("RPC vendor_passthru called for node %s.")
% node_id)
with task_manager.acquire(context, node_id, shared=True) as task:
try:
if getattr(task.driver, 'vendor', None):
return task.driver.vendor.validate(task, task.node,
method=driver_method,
**info)
else:
raise exception.UnsupportedDriverExtension(
driver=task.node.driver,
extension='vendor passthru')
except Exception as e:
with excutils.save_and_reraise_exception():
task.node.last_error = \
_("Failed to validate vendor info. Error: %s") % e
task.node.save(context)
# NOTE(max_lobur): Even though not all vendor_passthru calls may
# require an exclusive lock, we need to do so to guarantee that the
# state doesn't unexpectedly change between doing a vendor.validate
# and vendor.vendor_passthru.
task = task_manager.TaskManager(context, node_id, shared=False)

def do_vendor_action(self, context, node_id, driver_method, info):
"""Run driver action asynchronously."""
try:
if not getattr(task.driver, 'vendor', None):
raise exception.UnsupportedDriverExtension(
driver=task.node.driver,
extension='vendor passthru')

with task_manager.acquire(context, node_id, shared=True) as task:
task.driver.vendor.vendor_passthru(task, task.node,
method=driver_method, **info)
task.driver.vendor.validate(task, task.node, method=driver_method,
**info)
# Start requested action in the background.
thread = self._spawn_worker(task.driver.vendor.vendor_passthru,
task, task.node, method=driver_method,
**info)
# Release node lock at the end of async action.
thread.link(lambda t: task.release_resources())
except Exception:
with excutils.save_and_reraise_exception():
# Release node lock if error occurred.
task.release_resources()

def do_node_deploy(self, context, node_id):
"""RPC method to initiate deployment to a node.
Expand Down
38 changes: 16 additions & 22 deletions ironic/conductor/rpcapi.py
Expand Up @@ -57,10 +57,12 @@ class ConductorAPI(ironic.openstack.common.rpc.proxy.RpcProxy):
1.9 - Added destroy_node.
1.10 - Remove get_node_power_state
1.11 - Added get_console_information, set_console_mode.
1.12 - validate_vendor_action, do_vendor_action replaced by single
vendor_passthru method.
"""

RPC_API_VERSION = '1.11'
RPC_API_VERSION = '1.12'

def __init__(self, topic=None):
if topic is None:
Expand Down Expand Up @@ -138,36 +140,28 @@ def change_node_power_state(self, context, node_id, new_state, topic=None):

def vendor_passthru(self, context, node_id, driver_method, info,
topic=None):
"""Pass vendor specific info to a node driver.
"""Synchronously, acquire lock, validate given parameters and start
the conductor background task for specified vendor action.
:param context: request context.
:param node_id: node id or uuid.
:param driver_method: name of method for driver.
:param info: info for node driver.
:param topic: RPC topic. Defaults to self.topic.
:raises: InvalidParameterValue for parameter errors.
:raises: UnsupportedDriverExtension for unsupported extensions.
:raises: InvalidParameterValue if supplied info is not valid.
:raises: UnsupportedDriverExtension if current driver does not have
vendor interface.
:raises: NoFreeConductorWorker when there is no free worker to start
async task.
"""
topic = topic or self.topic

driver_data = self.call(context,
self.make_msg('validate_vendor_action',
node_id=node_id,
driver_method=driver_method,
info=info),
topic=topic)

# this method can do nothing if 'driver_method' intended only
# for obtain 'driver_data'
self.cast(context,
self.make_msg('do_vendor_action',
node_id=node_id,
driver_method=driver_method,
info=info),
topic=topic)

return driver_data
return self.call(context,
self.make_msg('vendor_passthru',
node_id=node_id,
driver_method=driver_method,
info=info),
topic=topic)

def do_node_deploy(self, context, node_id, topic=None):
"""Signal to conductor service to perform a deployment.
Expand Down
5 changes: 2 additions & 3 deletions ironic/drivers/modules/pxe.py
Expand Up @@ -701,6 +701,7 @@ def validate(self, task, node, **kwargs):

return True

@task_manager.require_exclusive_lock
def _continue_deploy(self, task, node, **kwargs):
"""Resume a deployment upon getting POST data from deploy ramdisk.
Expand Down Expand Up @@ -769,6 +770,4 @@ def vendor_passthru(self, task, node, **kwargs):
kwargs.get('persistent'))

elif method == 'pass_deploy_info':
ctx = task.context
with task_manager.acquire(ctx, node['uuid']) as inner_task:
self._continue_deploy(inner_task, node, **kwargs)
self._continue_deploy(task, node, **kwargs)
91 changes: 71 additions & 20 deletions ironic/tests/conductor/test_manager.py
Expand Up @@ -459,52 +459,103 @@ def test_update_node_invalid_driver(self):
res = objects.Node.get_by_uuid(self.context, node['uuid'])
self.assertEqual(existing_driver, res['driver'])

def test_vendor_action(self):
def test_vendor_passthru_success(self):
n = utils.get_test_node(driver='fake')
self.dbapi.create_node(n)
node = self.dbapi.create_node(n)
info = {'bar': 'baz'}
self.service.do_vendor_action(
self.context, n['uuid'], 'first_method', info)
self.service.start()

def test_validate_vendor_action(self):
n = utils.get_test_node(driver='fake')
self.service.vendor_passthru(
self.context, n['uuid'], 'first_method', info)
# Waiting to make sure the below assertions are valid.
self.service._worker_pool.waitall()

node.refresh(self.context)
self.assertIsNone(node.last_error)
# Verify reservation has been cleared.
self.assertIsNone(node.reservation)

def test_vendor_passthru_node_already_locked(self):
fake_reservation = 'test_reserv'
n = utils.get_test_node(driver='fake', reservation=fake_reservation)
node = self.dbapi.create_node(n)
info = {'bar': 'baz'}
self.service.validate_vendor_action(
self.context, n['uuid'], 'first_method', info)
self.service.start()

self.assertRaises(exception.NodeLocked,
self.service.vendor_passthru,
self.context, n['uuid'], 'first_method', info)

node.refresh(self.context)
self.assertIsNone(node.last_error)
# Verify the existing reservation is not broken.
self.assertEqual(fake_reservation, node.reservation)

def test_validate_vendor_action_unsupported_method(self):
def test_vendor_passthru_unsupported_method(self):
n = utils.get_test_node(driver='fake')
node = self.dbapi.create_node(n)
info = {'bar': 'baz'}
self.service.start()

self.assertRaises(exception.InvalidParameterValue,
self.service.validate_vendor_action,
self.context, n['uuid'], 'abc', info)
self.service.vendor_passthru,
self.context, n['uuid'], 'unsupported_method', info)

node.refresh(self.context)
self.assertIsNotNone(node.last_error)
self.assertIsNone(node.last_error)
# Verify reservation has been cleared.
self.assertIsNone(node.reservation)

def test_validate_vendor_action_no_parameter(self):
def test_vendor_passthru_invalid_method_parameters(self):
n = utils.get_test_node(driver='fake')
node = self.dbapi.create_node(n)
info = {'fake': 'baz'}
info = {'invalid_param': 'whatever'}
self.service.start()

self.assertRaises(exception.InvalidParameterValue,
self.service.validate_vendor_action,
self.service.vendor_passthru,
self.context, n['uuid'], 'first_method', info)

node.refresh(self.context)
self.assertIsNotNone(node.last_error)
self.assertIsNone(node.last_error)
# Verify reservation has been cleared.
self.assertIsNone(node.reservation)

def test_validate_vendor_action_unsupported(self):
def test_vendor_passthru_vendor_interface_not_supported(self):
n = utils.get_test_node(driver='fake')
node = self.dbapi.create_node(n)
info = {'bar': 'baz'}
self.driver.vendor = None
self.service.start()

self.assertRaises(exception.UnsupportedDriverExtension,
self.service.validate_vendor_action,
self.context, n['uuid'], 'foo', info)
self.service.vendor_passthru,
self.context, n['uuid'], 'whatever_method', info)

node.refresh(self.context)
self.assertIsNotNone(node.last_error)
# Verify reservation has been cleared.
self.assertIsNone(node.reservation)

def test_vendor_passthru_worker_pool_full(self):
n = utils.get_test_node(driver='fake')
node = self.dbapi.create_node(n)
info = {'bar': 'baz'}
self.service.start()

with mock.patch.object(self.service, '_spawn_worker') \
as spawn_mock:
spawn_mock.side_effect = exception.NoFreeConductorWorker()

self.assertRaises(exception.NoFreeConductorWorker,
self.service.vendor_passthru,
self.context, n['uuid'], 'first_method', info)
# Waiting to make sure the below assertions are valid.
self.service._worker_pool.waitall()

node.refresh(self.context)
self.assertIsNone(node.last_error)
# Verify reservation has been cleared.
self.assertIsNone(node.reservation)

def test_do_node_deploy_invalid_state(self):
# test node['provision_state'] is not NOSTATE
Expand Down
17 changes: 5 additions & 12 deletions ironic/tests/conductor/test_rpcapi.py
Expand Up @@ -117,18 +117,11 @@ def test_change_node_power_state(self):
new_state=states.POWER_ON)

def test_pass_vendor_info(self):
ctxt = context.get_admin_context()
rpcapi = conductor_rpcapi.ConductorAPI(topic='fake-topic')
expected_retval = 'hello world'

def _fake_rpc_method(*args, **kwargs):
return expected_retval

self.useFixture(fixtures.MonkeyPatch(
'ironic.openstack.common.rpc.call', _fake_rpc_method))
retval = rpcapi.vendor_passthru(ctxt, node_id=self.fake_node['uuid'],
driver_method='foo', info={'bar': 'baz'})
self.assertEqual(expected_retval, retval)
self._test_rpcapi('vendor_passthru',
'call',
node_id=self.fake_node['uuid'],
driver_method='test-driver-method',
info={"test_info": "test_value"})

def test_do_node_deploy(self):
self._test_rpcapi('do_node_deploy',
Expand Down
15 changes: 5 additions & 10 deletions ironic/tests/drivers/test_pxe.py
Expand Up @@ -771,8 +771,7 @@ def fake_deploy(**kwargs):
'ironic.drivers.modules.deploy_utils.deploy',
fake_deploy))

with task_manager.acquire(self.context, self.node.uuid,
shared=True) as task:
with task_manager.acquire(self.context, self.node.uuid) as task:
task.resources[0].driver.vendor.vendor_passthru(task, self.node,
method='pass_deploy_info', address='123456', iqn='aaa-bbb',
key='fake-56789')
Expand All @@ -794,8 +793,7 @@ def fake_deploy(**kwargs):
'ironic.drivers.modules.deploy_utils.deploy',
fake_deploy))

with task_manager.acquire(self.context, [self.node.uuid],
shared=True) as task:
with task_manager.acquire(self.context, [self.node.uuid]) as task:
task.resources[0].driver.vendor.vendor_passthru(task, self.node,
method='pass_deploy_info', address='123456', iqn='aaa-bbb',
key='fake-56789')
Expand All @@ -817,8 +815,7 @@ def fake_deploy(**kwargs):
'ironic.drivers.modules.deploy_utils.deploy',
fake_deploy))

with task_manager.acquire(self.context, [self.node.uuid],
shared=True) as task:
with task_manager.acquire(self.context, [self.node.uuid]) as task:
task.resources[0].driver.vendor.vendor_passthru(task, self.node,
method='pass_deploy_info', address='123456', iqn='aaa-bbb',
key='fake-56789', error='test ramdisk error')
Expand All @@ -832,17 +829,15 @@ def test_continue_deploy_invalid(self):
self.node.provision_state = 'FAKE'
self.node.save(self.context)

with task_manager.acquire(self.context, [self.node.uuid],
shared=True) as task:
with task_manager.acquire(self.context, [self.node.uuid]) as task:
task.resources[0].driver.vendor.vendor_passthru(task, self.node,
method='pass_deploy_info', address='123456', iqn='aaa-bbb',
key='fake-56789', error='test ramdisk error')
self.assertEqual('FAKE', self.node.provision_state)
self.assertEqual(states.POWER_ON, self.node.power_state)

def test_lock_elevated(self):
with task_manager.acquire(self.context, [self.node['uuid']],
shared=True) as task:
with task_manager.acquire(self.context, [self.node['uuid']]) as task:
with mock.patch.object(task.driver.vendor, '_continue_deploy') \
as _continue_deploy_mock:
task.driver.vendor.vendor_passthru(task, self.node,
Expand Down

0 comments on commit d7b3158

Please sign in to comment.