placement: genericize on resource providers
A simple patch that changes a number of the scheduler report client's
methods to take just a resource provider UUID and a set of inventory
records. With upcoming patches, the compute node will no longer be the
only provider of resources in the system, and we want the report client
methods that add resource provider and inventory records to stay as
generic as possible instead of being specific to a compute node.

Change-Id: Ia8fe9d71893e8bf3f4f8cddd329613e269fef210
blueprint: generic-resource-pools-ocata
jaypipes committed Nov 13, 2016
1 parent 3b0c53e commit f1cd928
Showing 2 changed files with 83 additions and 71 deletions.
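In effect, the patch splits "which provider" from "what inventory": callers now flatten a compute node into generic inventory records keyed by resource class and pass only the provider UUID alongside them. A minimal sketch of the new call shape (the client construction, UUID, and inventory values here are illustrative assumptions, not part of the commit):

    import uuid

    from nova.scheduler.client import report

    client = report.SchedulerReportClient()  # assumed already configured
    rp_uuid = str(uuid.uuid4())              # any resource provider UUID

    # Generic inventory records keyed by resource class; a compute node
    # is now just one possible source of such records.
    inv_data = {
        'VCPU': {'total': 8, 'reserved': 0, 'min_unit': 1, 'max_unit': 8,
                 'step_size': 1, 'allocation_ratio': 16.0},
        'MEMORY_MB': {'total': 1024, 'reserved': 0, 'min_unit': 1,
                      'max_unit': 1024, 'step_size': 1,
                      'allocation_ratio': 1.5},
        'DISK_GB': {'total': 10, 'reserved': 0, 'min_unit': 1,
                    'max_unit': 10, 'step_size': 1,
                    'allocation_ratio': 1.0},
    }

    # Before: client._update_inventory(compute_node)
    # After: the provider UUID and the inventory travel separately.
    client._update_inventory(rp_uuid, inv_data)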
71 changes: 37 additions & 34 deletions nova/scheduler/client/report.py
@@ -264,29 +264,29 @@ def _ensure_resource_provider(self, uuid, name=None):
         self._resource_providers[uuid] = rp
         return rp
 
-    def _get_inventory(self, compute_node):
-        url = '/resource_providers/%s/inventories' % compute_node.uuid
+    def _get_inventory(self, rp_uuid):
+        url = '/resource_providers/%s/inventories' % rp_uuid
         result = self.get(url)
         if not result:
             return {'inventories': {}}
         return result.json()
 
-    def _update_inventory_attempt(self, compute_node):
-        """Update the inventory for this compute node if needed.
+    def _update_inventory_attempt(self, rp_uuid, inv_data):
+        """Update the inventory for this resource provider if needed.
 
-        :param compute_node: The objects.ComputeNode for the operation
+        :param rp_uuid: The resource provider UUID for the operation
+        :param inv_data: The new inventory for the resource provider
         :returns: True if the inventory was updated (or did not need to be),
                   False otherwise.
         """
-        inv_data = _compute_node_to_inventory_dict(compute_node)
-        curr = self._get_inventory(compute_node)
+        curr = self._get_inventory(rp_uuid)
 
         # Update our generation immediately, if possible. Even if there
         # are no inventories we should always have a generation but let's
         # be careful.
         server_gen = curr.get('resource_provider_generation')
         if server_gen:
-            my_rp = self._resource_providers[compute_node.uuid]
+            my_rp = self._resource_providers[rp_uuid]
             if server_gen != my_rp.generation:
                 LOG.debug('Updating our resource provider generation '
                           'from %(old)i to %(new)i',
@@ -298,36 +298,38 @@ def _update_inventory_attempt(self, compute_node):
         if inv_data == curr.get('inventories', {}):
             return True
 
-        cur_rp_gen = self._resource_providers[compute_node.uuid].generation
+        cur_rp_gen = self._resource_providers[rp_uuid].generation
         payload = {
             'resource_provider_generation': cur_rp_gen,
             'inventories': inv_data,
         }
-        url = '/resource_providers/%s/inventories' % compute_node.uuid
+        url = '/resource_providers/%s/inventories' % rp_uuid
         result = self.put(url, payload)
         if result.status_code == 409:
             LOG.info(_LI('Inventory update conflict for %s'),
-                     compute_node.uuid)
+                     rp_uuid)
             # Invalidate our cache and re-fetch the resource provider
             # to be sure to get the latest generation.
-            del self._resource_providers[compute_node.uuid]
-            self._ensure_resource_provider(compute_node.uuid,
-                                           compute_node.hypervisor_hostname)
+            del self._resource_providers[rp_uuid]
+            # NOTE(jaypipes): We don't need to pass a name parameter to
+            # _ensure_resource_provider() because we know the resource provider
+            # record already exists. We're just reloading the record here.
+            self._ensure_resource_provider(rp_uuid)
             return False
         elif not result:
-            LOG.warning(_LW('Failed to update inventory for '
+            LOG.warning(_LW('Failed to update inventory for resource provider '
                             '%(uuid)s: %(status)i %(text)s'),
-                        {'uuid': compute_node.uuid,
+                        {'uuid': rp_uuid,
                          'status': result.status_code,
                          'text': result.text})
             return False
 
         if result.status_code != 200:
             LOG.info(
                 _LI('Received unexpected response code %(code)i while '
-                    'trying to update inventory for compute node %(uuid)s'
+                    'trying to update inventory for resource provider %(uuid)s'
                     ': %(text)s'),
-                {'uuid': compute_node.uuid,
+                {'uuid': rp_uuid,
                  'code': result.status_code,
                  'text': result.text})
             return False
@@ -336,23 +338,23 @@ def _update_inventory_attempt(self, compute_node):
         updated_inventories_result = result.json()
         new_gen = updated_inventories_result['resource_provider_generation']
 
-        self._resource_providers[compute_node.uuid].generation = new_gen
+        self._resource_providers[rp_uuid].generation = new_gen
         LOG.debug('Updated inventory for %s at generation %i',
-                  compute_node.uuid, new_gen)
+                  rp_uuid, new_gen)
         return True
 
     @safe_connect
-    def _update_inventory(self, compute_node):
+    def _update_inventory(self, rp_uuid, inv_data):
         for attempt in (1, 2, 3):
-            if compute_node.uuid not in self._resource_providers:
+            if rp_uuid not in self._resource_providers:
                 # NOTE(danms): Either we failed to fetch/create the RP
                 # on our first attempt, or a previous attempt had to
                 # invalidate the cache, and we were unable to refresh
                 # it. Bail and try again next time.
                 LOG.warning(_LW(
                     'Unable to refresh my resource provider record'))
                 return False
-            if self._update_inventory_attempt(compute_node):
+            if self._update_inventory_attempt(rp_uuid, inv_data):
                 return True
             time.sleep(1)
         return False
@@ -365,26 +367,27 @@ def update_resource_stats(self, compute_node):
         compute_node.save()
         self._ensure_resource_provider(compute_node.uuid,
                                        compute_node.hypervisor_hostname)
-        self._update_inventory(compute_node)
+        inv_data = _compute_node_to_inventory_dict(compute_node)
+        self._update_inventory(compute_node.uuid, inv_data)
 
-    def _get_allocations_for_instance(self, compute_node, instance):
+    def _get_allocations_for_instance(self, rp_uuid, instance):
         url = '/allocations/%s' % instance.uuid
         resp = self.get(url)
         if not resp:
             return {}
         else:
             # NOTE(cdent): This trims to just the allocations being
-            # used on this compute node. In the future when there
+            # used on this resource provider. In the future when there
             # are shared resources there might be other providers.
             return resp.json()['allocations'].get(
-                compute_node.uuid, {}).get('resources', {})
+                rp_uuid, {}).get('resources', {})
 
     @safe_connect
-    def _allocate_for_instance(self, compute_node, instance):
+    def _allocate_for_instance(self, rp_uuid, instance):
         url = '/allocations/%s' % instance.uuid
 
         my_allocations = _instance_to_allocations_dict(instance)
-        current_allocations = self._get_allocations_for_instance(compute_node,
+        current_allocations = self._get_allocations_for_instance(rp_uuid,
                                                                  instance)
         if current_allocations == my_allocations:
             allocstr = ','.join(['%s=%s' % (k, v)
@@ -397,7 +400,7 @@ def _allocate_for_instance(self, compute_node, instance):
             'allocations': [
                 {
                     'resource_provider': {
-                        'uuid': compute_node.uuid,
+                        'uuid': rp_uuid,
                     },
                     'resources': my_allocations,
                 },
@@ -435,21 +438,21 @@ def _delete_allocation_for_instance(self, uuid):
 
     def update_instance_allocation(self, compute_node, instance, sign):
         if sign > 0:
-            self._allocate_for_instance(compute_node, instance)
+            self._allocate_for_instance(compute_node.uuid, instance)
         else:
             self._delete_allocation_for_instance(instance.uuid)
 
     @safe_connect
-    def _get_allocations(self, compute_node):
-        url = '/resource_providers/%s/allocations' % compute_node.uuid
+    def _get_allocations(self, rp_uuid):
+        url = '/resource_providers/%s/allocations' % rp_uuid
         resp = self.get(url)
         if not resp:
             return {}
         else:
             return resp.json()['allocations']
 
     def remove_deleted_instances(self, compute_node, instance_uuids):
-        allocations = self._get_allocations(compute_node)
+        allocations = self._get_allocations(compute_node.uuid)
         if allocations is None:
             allocations = {}
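The 409 branch above is an optimistic-concurrency loop against the provider generation: on conflict the client drops its cached provider record, re-reads it to pick up the server's current generation, and retries a bounded number of times. A standalone sketch of the same pattern, with hypothetical put_inventory/refresh_provider helpers rather than the nova API:

    import time

    def update_with_retries(put_inventory, refresh_provider, attempts=3):
        # put_inventory() performs the write and returns an HTTP status
        # code; refresh_provider() reloads the cached provider record so
        # the next attempt uses the server's current generation.
        for _ in range(attempts):
            status = put_inventory()
            if status == 200:
                return True            # write landed at our generation
            if status == 409:
                refresh_provider()     # someone else bumped the generation
            time.sleep(1)              # brief pause before the next attempt
        return False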
83 changes: 46 additions & 37 deletions nova/tests/unit/scheduler/client/test_report.py
@@ -98,6 +98,16 @@ def setUp(self):
         super(SchedulerReportClientTestCase, self).setUp()
         self.context = context.get_admin_context()
         self.ks_sess_mock = mock.Mock()
+        self.compute_node = objects.ComputeNode(
+            uuid=uuids.compute_node,
+            hypervisor_hostname='foo',
+            vcpus=8,
+            cpu_allocation_ratio=16.0,
+            memory_mb=1024,
+            ram_allocation_ratio=1.5,
+            local_gb=10,
+            disk_allocation_ratio=1.0,
+        )
 
         with test.nested(
             mock.patch('keystoneauth1.session.Session',
@@ -398,40 +408,17 @@ def test_compute_node_inventory(self):
                 '_ensure_resource_provider')
     @mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
                 '_update_inventory_attempt')
-    def test_update_resource_stats_rp_fail(self, mock_ui, mock_erp):
-        cn = mock.MagicMock()
+    @mock.patch('nova.objects.ComputeNode.save')
+    def test_update_resource_stats(self, mock_save, mock_ui, mock_erp):
+        cn = self.compute_node
         self.client.update_resource_stats(cn)
-        cn.save.assert_called_once_with()
+        mock_save.assert_called_once_with()
         mock_erp.assert_called_once_with(cn.uuid, cn.hypervisor_hostname)
         self.assertFalse(mock_ui.called)
 
-    @mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
-                '_ensure_resource_provider')
-    @mock.patch.object(objects.ComputeNode, 'save')
-    def test_update_resource_stats_saves(self, mock_save, mock_ensure):
-        cn = objects.ComputeNode(context=self.context,
-                                 uuid=uuids.compute_node,
-                                 hypervisor_hostname='host1')
-        self.client.update_resource_stats(cn)
-        mock_save.assert_called_once_with()
-        mock_ensure.assert_called_once_with(uuids.compute_node, 'host1')
-
 
 class TestInventory(SchedulerReportClientTestCase):
 
-    def setUp(self):
-        super(TestInventory, self).setUp()
-        self.compute_node = objects.ComputeNode(
-            uuid=uuids.compute_node,
-            hypervisor_hostname='foo',
-            vcpus=8,
-            cpu_allocation_ratio=16.0,
-            memory_mb=1024,
-            ram_allocation_ratio=1.5,
-            local_gb=10,
-            disk_allocation_ratio=1.0,
-        )
-
     @mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
                 'get')
     @mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
@@ -463,7 +450,10 @@ def test_update_inventory(self, mock_put, mock_get):
             }
         }
 
-        result = self.client._update_inventory_attempt(compute_node)
+        inv_data = report._compute_node_to_inventory_dict(compute_node)
+        result = self.client._update_inventory_attempt(
+            compute_node.uuid, inv_data
+        )
         self.assertTrue(result)
 
         exp_url = '/resource_providers/%s/inventories' % uuid
@@ -541,7 +531,10 @@ def test_update_inventory_no_update(self, mock_put, mock_get):
                 },
             }
         }
-        result = self.client._update_inventory_attempt(compute_node)
+        inv_data = report._compute_node_to_inventory_dict(compute_node)
+        result = self.client._update_inventory_attempt(
+            compute_node.uuid, inv_data
+        )
         self.assertTrue(result)
         exp_url = '/resource_providers/%s/inventories' % uuid
         mock_get.assert_called_once_with(exp_url)
@@ -569,13 +562,16 @@ def test_update_inventory_conflicts(self, mock_ensure,
         mock_get.return_value = {}
         mock_put.return_value.status_code = 409
 
-        result = self.client._update_inventory_attempt(compute_node)
+        inv_data = report._compute_node_to_inventory_dict(compute_node)
+        result = self.client._update_inventory_attempt(
+            compute_node.uuid, inv_data
+        )
         self.assertFalse(result)
 
         # Invalidated the cache
         self.assertNotIn(uuid, self.client._resource_providers)
         # Refreshed our resource provider
-        mock_ensure.assert_called_once_with(uuid, 'foo')
+        mock_ensure.assert_called_once_with(uuid)
 
     @mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
                 '_get_inventory')
@@ -593,7 +589,10 @@ def test_update_inventory_unknown_response(self, mock_put, mock_get):
         mock_get.return_value = {}
         mock_put.return_value.status_code = 234
 
-        result = self.client._update_inventory_attempt(compute_node)
+        inv_data = report._compute_node_to_inventory_dict(compute_node)
+        result = self.client._update_inventory_attempt(
+            compute_node.uuid, inv_data
+        )
         self.assertFalse(result)
 
         # No cache invalidation
@@ -619,7 +618,10 @@ def test_update_inventory_failed(self, mock_put, mock_get):
         # Thanks py3
         mock_put.return_value.__bool__.return_value = False
 
-        result = self.client._update_inventory_attempt(compute_node)
+        inv_data = report._compute_node_to_inventory_dict(compute_node)
+        result = self.client._update_inventory_attempt(
+            compute_node.uuid, inv_data
+        )
         self.assertFalse(result)
 
         # No cache invalidation
@@ -639,7 +641,9 @@ def test_update_inventory_fails_and_then_succeeds(self, mock_sleep,
         mock_update.side_effect = (False, True)
 
         self.client._resource_providers[cn.uuid] = True
-        result = self.client._update_inventory(cn)
+        result = self.client._update_inventory(
+            cn.uuid, mock.sentinel.inv_data
+        )
         self.assertTrue(result)
 
         # Only slept once
@@ -658,15 +662,20 @@ def test_update_inventory_never_succeeds(self, mock_sleep,
         mock_update.side_effect = (False, False, False)
 
         self.client._resource_providers[cn.uuid] = True
-        result = self.client._update_inventory(cn)
+        result = self.client._update_inventory(
+            cn.uuid, mock.sentinel.inv_data
+        )
         self.assertFalse(result)
 
         # Slept three times
         mock_sleep.assert_has_calls([mock.call(1), mock.call(1), mock.call(1)])
 
         # Three attempts to update
-        mock_update.assert_has_calls([mock.call(cn), mock.call(cn),
-                                      mock.call(cn)])
+        mock_update.assert_has_calls([
+            mock.call(cn.uuid, mock.sentinel.inv_data),
+            mock.call(cn.uuid, mock.sentinel.inv_data),
+            mock.call(cn.uuid, mock.sentinel.inv_data),
+        ])
 
         # Slept three times
         mock_sleep.assert_has_calls([mock.call(1), mock.call(1), mock.call(1)])
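A note on the test idiom: mock.sentinel.inv_data is an opaque, unique object, so asserting on it in the recorded calls proves _update_inventory threads the inventory payload through unchanged without inspecting it. A standalone illustration of the idiom (not nova code):

    from unittest import mock

    def forward(fn, payload):
        # Passes payload through to fn unchanged.
        return fn(payload)

    fn = mock.Mock(return_value=True)
    assert forward(fn, mock.sentinel.payload) is True
    # A sentinel compares equal only to itself, so this proves pass-through.
    fn.assert_called_once_with(mock.sentinel.payload)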
