From 5bc0ff6354a399f712c777fa43153f05a50a1ce3 Mon Sep 17 00:00:00 2001 From: Brian Elliott Date: Fri, 2 Nov 2012 19:41:15 +0000 Subject: [PATCH] Add support for resizes to resource tracker. Keep track of additional resources required to resize an instance to a new host. Also hold resources for a revert resize to the original host. This fixes race conditions where the destination host could become overscheduled. (or the source host in the event of a revert) bug 1065267 Change-Id: Ic565d4e2ab9bee40f25fe9f198e1217cdd92ca1b --- nova/compute/api.py | 12 +- nova/compute/claims.py | 35 ++ nova/compute/manager.py | 58 +-- nova/compute/resource_tracker.py | 216 ++++++++++- nova/compute/stats.py | 4 + nova/tests/compute/test_claims.py | 53 ++- nova/tests/compute/test_resource_tracker.py | 400 +++++++++++++++++--- 7 files changed, 675 insertions(+), 103 deletions(-) diff --git a/nova/compute/api.py b/nova/compute/api.py index 5a8bcad2f37..fe83c7fbebf 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -1564,13 +1564,13 @@ def revert_resize(self, context, instance): task_state=task_states.RESIZE_REVERTING, expected_task_state=None) + self.db.migration_update(elevated, migration_ref['id'], + {'status': 'reverting'}) + self.compute_rpcapi.revert_resize(context, instance=instance, migration=migration_ref, host=migration_ref['dest_compute'], reservations=reservations) - self.db.migration_update(elevated, migration_ref['id'], - {'status': 'reverted'}) - @wrap_check_policy @check_instance_lock @check_instance_state(vm_state=[vm_states.RESIZED]) @@ -1588,14 +1588,14 @@ def confirm_resize(self, context, instance): task_state=None, expected_task_state=None) + self.db.migration_update(elevated, migration_ref['id'], + {'status': 'confirming'}) + self.compute_rpcapi.confirm_resize(context, instance=instance, migration=migration_ref, host=migration_ref['source_compute'], reservations=reservations) - self.db.migration_update(elevated, migration_ref['id'], - {'status': 'confirmed'}) - @staticmethod def _resize_quota_delta(context, new_instance_type, old_instance_type, sense, compare): diff --git a/nova/compute/claims.py b/nova/compute/claims.py index 6415ae187ae..375a9742bff 100644 --- a/nova/compute/claims.py +++ b/nova/compute/claims.py @@ -29,6 +29,9 @@ class NopClaim(object): """For use with compute drivers that do not support resource tracking""" + def __init__(self, migration=None): + self.migration = migration + @property def disk_gb(self): return 0 @@ -184,3 +187,35 @@ def _test(self, type_, unit, total, used, requested, limit): LOG.info(msg, instance=self.instance) return can_claim + + +class ResizeClaim(Claim): + """Claim used for holding resources for an incoming resize/migration + operation. + """ + def __init__(self, instance, instance_type, tracker): + super(ResizeClaim, self).__init__(instance, tracker) + self.instance_type = instance_type + self.migration = None + + @property + def disk_gb(self): + return (self.instance_type['root_gb'] + + self.instance_type['ephemeral_gb']) + + @property + def memory_mb(self): + return self.instance_type['memory_mb'] + + @property + def vcpus(self): + return self.instance_type['vcpus'] + + @lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-') + def abort(self): + """Compute operation requiring claimed resources has failed or + been aborted. 
+ """ + LOG.debug(_("Aborting claim: %s") % self, instance=self.instance) + self.tracker.abort_resize_claim(self.instance['uuid'], + self.instance_type) diff --git a/nova/compute/manager.py b/nova/compute/manager.py index be20c7a4001..3acc271df07 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -1593,6 +1593,9 @@ def confirm_resize(self, context, instance, reservations=None, self.driver.confirm_migration(migration, instance, self._legacy_nw_info(network_info)) + rt = self._get_resource_tracker(instance.get('node')) + rt.confirm_resize(context, migration) + self._notify_about_instance_usage( context, instance, "resize.confirm.end", network_info=network_info) @@ -1637,6 +1640,9 @@ def revert_resize(self, context, instance, migration=None, self._terminate_volume_connections(context, instance) + rt = self._get_resource_tracker(instance.get('node')) + rt.revert_resize(context, migration, status='reverted_dest') + self.compute_rpcapi.finish_revert_resize(context, instance, migration, migration['source_compute'], reservations) @@ -1707,8 +1713,8 @@ def finish_revert_resize(self, context, instance, reservations=None, vm_state=vm_states.ACTIVE, task_state=None) - self.db.migration_update(elevated, migration['id'], - {'status': 'reverted'}) + rt = self._get_resource_tracker(instance.get('node')) + rt.revert_resize(context, migration) self._notify_about_instance_usage( context, instance, "resize.revert.end") @@ -1725,6 +1731,29 @@ def _quota_rollback(context, reservations): if reservations: QUOTAS.rollback(context, reservations) + def _prep_resize(self, context, image, instance, instance_type, + reservations, request_spec, filter_properties): + + if not filter_properties: + filter_properties = {} + + same_host = instance['host'] == self.host + if same_host and not CONF.allow_resize_to_same_host: + self._set_instance_error_state(context, instance['uuid']) + msg = _('destination same as source!') + raise exception.MigrationError(msg) + + limits = filter_properties.get('limits', {}) + rt = self._get_resource_tracker(instance.get('node')) + with rt.resize_claim(context, instance, instance_type, limits=limits) \ + as claim: + migration_ref = claim.migration + + LOG.audit(_('Migrating'), context=context, + instance=instance) + self.compute_rpcapi.resize_instance(context, instance, + migration_ref, image, instance_type, reservations) + @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) @reverts_task_state @wrap_instance_fault @@ -1742,30 +1771,9 @@ def prep_resize(self, context, image, instance, instance_type, context, instance, current_period=True) self._notify_about_instance_usage( context, instance, "resize.prep.start") - try: - same_host = instance['host'] == self.host - if same_host and not CONF.allow_resize_to_same_host: - self._set_instance_error_state(context, instance['uuid']) - msg = _('destination same as source!') - raise exception.MigrationError(msg) - - old_instance_type = instance['instance_type'] - - migration_ref = self.db.migration_create(context.elevated(), - {'instance_uuid': instance['uuid'], - 'source_compute': instance['host'], - 'dest_compute': self.host, - 'dest_host': self.driver.get_host_ip_addr(), - 'old_instance_type_id': old_instance_type['id'], - 'new_instance_type_id': instance_type['id'], - 'status': 'pre-migrating'}) - - LOG.audit(_('Migrating'), context=context, - instance=instance) - self.compute_rpcapi.resize_instance(context, instance, - migration_ref, image, instance_type, reservations) - + self._prep_resize(context, image, 
instance, instance_type, + reservations, request_spec, filter_properties) except Exception: # try to re-schedule the resize elsewhere: self._reschedule_resize_or_reraise(context, image, instance, diff --git a/nova/compute/resource_tracker.py b/nova/compute/resource_tracker.py index cc99b7c4f83..d567fb67542 100644 --- a/nova/compute/resource_tracker.py +++ b/nova/compute/resource_tracker.py @@ -20,6 +20,8 @@ """ from nova.compute import claims +from nova.compute import instance_types +from nova.compute import task_states from nova.compute import vm_states from nova import config from nova import context @@ -29,6 +31,7 @@ from nova import notifications from nova.openstack.common import cfg from nova.openstack.common import importutils +from nova.openstack.common import jsonutils from nova.openstack.common import lockutils from nova.openstack.common import log as logging from nova import utils @@ -62,6 +65,7 @@ def __init__(self, host, driver, nodename): self.compute_node = None self.stats = importutils.import_object(CONF.compute_stats_class) self.tracked_instances = {} + self.tracked_migrations = {} @lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-') def instance_claim(self, context, instance_ref, limits=None): @@ -110,10 +114,69 @@ def instance_claim(self, context, instance_ref, limits=None): else: raise exception.ComputeResourcesUnavailable() + @lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-') + def resize_claim(self, context, instance_ref, instance_type, limits=None): + """Indicate that resources are needed for a resize operation to this + compute host. + :param context: security context + :param instance_ref: instance to reserve resources for + :param instance_type: new instance_type being resized to + :param limits: Dict of oversubscription limits for memory, disk, + and CPUs. + :returns: A Claim ticket representing the reserved resources. This + should be turned into finalize a resource claim or free + resources after the compute operation is finished. + """ + if self.disabled: + # compute_driver doesn't support resource tracking, just + # generate the migration record and continue the resize: + migration_ref = self._create_migration(context, instance_ref, + instance_type) + return claims.NopClaim(migration=migration_ref) + + claim = claims.ResizeClaim(instance_ref, instance_type, self) + + if claim.test(self.compute_node, limits): + + migration_ref = self._create_migration(context, instance_ref, + instance_type) + claim.migration = migration_ref + + # Mark the resources in-use for the resize landing on this + # compute host: + self._update_usage_from_migration(self.compute_node, migration_ref) + self._update(context, self.compute_node) + + return claim + + else: + raise exception.ComputeResourcesUnavailable() + + def _create_migration(self, context, instance, instance_type): + """Create a migration record for the upcoming resize. This should + be done while the COMPUTE_RESOURCES_SEMAPHORE is held so the resource + claim will not be lost if the audit process starts. + """ + # TODO(russellb): no-db-compute: Send the old instance type + # info that is needed via rpc so db access isn't required + # here. 
+ old_instance_type_id = instance['instance_type_id'] + old_instance_type = instance_types.get_instance_type( + old_instance_type_id) + + return db.migration_create(context.elevated(), + {'instance_uuid': instance['uuid'], + 'source_compute': instance['host'], + 'dest_compute': self.host, + 'dest_host': self.driver.get_host_ip_addr(), + 'old_instance_type_id': old_instance_type['id'], + 'new_instance_type_id': instance_type['id'], + 'status': 'pre-migrating'}) + def _set_instance_host(self, context, instance_uuid): """Tag the instance as belonging to this host. This should be done - while the COMPUTE_RESOURCES_SEMPAHORE is being held so the resource - claim will not be lost if the audit process starts. + while the COMPUTE_RESOURCES_SEMPAHORE is held so the resource claim + will not be lost if the audit process starts. """ values = {'host': self.host, 'launched_on': self.host} (old_ref, instance_ref) = db.instance_update_and_get_original(context, @@ -131,6 +194,18 @@ def abort_instance_claim(self, instance): ctxt = context.get_admin_context() self._update(ctxt, self.compute_node) + def abort_resize_claim(self, instance_uuid, instance_type): + """Remove usage for an incoming migration""" + if instance_uuid in self.tracked_migrations: + migration, itype = self.tracked_migrations.pop(instance_uuid) + + if instance_type['id'] == migration['new_instance_type_id']: + self.stats.update_stats_for_migration(itype, sign=-1) + self._update_usage(self.compute_node, itype, sign=-1) + + ctxt = context.get_admin_context() + self._update(ctxt, self.compute_node) + @lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-') def update_usage(self, context, instance): """Update the resource usage and stats after a change in an @@ -139,9 +214,10 @@ def update_usage(self, context, instance): if self.disabled: return + uuid = instance['uuid'] + # don't update usage for this instance unless it submitted a resource # claim first: - uuid = instance['uuid'] if uuid in self.tracked_instances: self._update_usage_from_instance(self.compute_node, instance) self._update(context.elevated(), self.compute_node) @@ -159,6 +235,7 @@ def update_available_resource(self, context): declared a need for resources, but not necessarily retrieved them from the hypervisor layer yet. 
""" + LOG.audit(_("Auditing locally available compute resources")) resources = self.driver.get_available_resource(self.nodename) if not resources: # The virt driver does not support this function @@ -177,6 +254,12 @@ def update_available_resource(self, context): # Now calculate usage based on instance utilization: self._update_usage_from_instances(resources, instances) + + # Grab all in-progress migrations: + migrations = db.migration_get_in_progress_by_host(context, self.host) + + self._update_usage_from_migrations(resources, migrations) + self._report_final_resource_view(resources) self._sync_compute_node(context, resources) @@ -260,6 +343,104 @@ def _update(self, context, values, prune_stats=False): self.compute_node['id'], values, prune_stats) self.compute_node = dict(compute_node) + def confirm_resize(self, context, migration, status='confirmed'): + """Cleanup usage for a confirmed resize""" + elevated = context.elevated() + db.migration_update(elevated, migration['id'], + {'status': status}) + self.update_available_resource(elevated) + + def revert_resize(self, context, migration, status='reverted'): + """Cleanup usage for a reverted resize""" + self.confirm_resize(context, migration, status) + + def _update_usage(self, resources, usage, sign=1): + resources['memory_mb_used'] += sign * usage['memory_mb'] + resources['local_gb_used'] += sign * usage['root_gb'] + resources['local_gb_used'] += sign * usage['ephemeral_gb'] + + # free ram and disk may be negative, depending on policy: + resources['free_ram_mb'] = (resources['memory_mb'] - + resources['memory_mb_used']) + resources['free_disk_gb'] = (resources['local_gb'] - + resources['local_gb_used']) + + resources['running_vms'] = self.stats.num_instances + resources['vcpus_used'] = self.stats.num_vcpus_used + + def _update_usage_from_migration(self, resources, migration): + """Update usage for a single migration. The record may + represent an incoming or outbound migration. + """ + uuid = migration['instance_uuid'] + LOG.audit("Updating from migration %s" % uuid) + + incoming = (migration['dest_compute'] == self.host) + outbound = (migration['source_compute'] == self.host) + same_host = (incoming and outbound) + + instance = self.tracked_instances.get(uuid, None) + itype = None + + if same_host: + # same host resize. 
record usage for whichever instance type the + # instance is *not* in: + if (instance['instance_type_id'] == + migration['old_instance_type_id']): + + itype = migration['new_instance_type_id'] + else: + # instance record already has new flavor, hold space for a + # possible revert to the old instance type: + itype = migration['old_instance_type_id'] + + elif incoming and not instance: + # instance has not yet migrated here: + itype = migration['new_instance_type_id'] + + elif outbound and not instance: + # instance migrated, but record usage for a possible revert: + itype = migration['old_instance_type_id'] + + if itype: + instance_type = instance_types.get_instance_type(itype) + self.stats.update_stats_for_migration(instance_type) + self._update_usage(resources, instance_type) + resources['stats'] = self.stats + self.tracked_migrations[uuid] = (migration, instance_type) + + def _update_usage_from_migrations(self, resources, migrations): + + self.tracked_migrations.clear() + + filtered = {} + + # do some defensive filtering against bad migrations records in the + # database: + for migration in migrations: + + instance = migration['instance'] + + if not instance: + # migration referencing deleted instance + continue + + uuid = instance['uuid'] + + # skip migration if instance isn't in a resize state: + if not self._instance_in_resize_state(instance): + LOG.warn(_("Instance not resizing, skipping migration."), + instance_uuid=uuid) + continue + + # filter to most recently updated migration for each instance: + m = filtered.get(uuid, None) + if not m or migration['updated_at'] >= m['updated_at']: + filtered[uuid] = migration + + for migration in filtered.values(): + self._update_usage_from_migration(resources, migration) + def _update_usage_from_instance(self, resources, instance): """Update usage for a single instance.""" @@ -268,7 +449,7 @@ def _update_usage_from_instance(self, resources, instance): is_deleted_instance = instance['vm_state'] == vm_states.DELETED if is_new_instance: - self.tracked_instances[uuid] = 1 + self.tracked_instances[uuid] = jsonutils.to_primitive(instance) sign = 1 if is_deleted_instance: @@ -280,18 +461,7 @@ def _update_usage_from_instance(self, resources, instance): # if it's a new or deleted instance: if is_new_instance or is_deleted_instance: # new instance, update compute node resource usage: - resources['memory_mb_used'] += sign * instance['memory_mb'] - resources['local_gb_used'] += sign * instance['root_gb'] - resources['local_gb_used'] += sign * instance['ephemeral_gb'] - - # free ram and disk may be negative, depending on policy: - resources['free_ram_mb'] = (resources['memory_mb'] - - resources['memory_mb_used']) - resources['free_disk_gb'] = (resources['local_gb'] - - resources['local_gb_used']) - - resources['running_vms'] = self.stats.num_instances - resources['vcpus_used'] = self.stats.num_vcpus_used + self._update_usage(resources, instance, sign=sign) resources['current_workload'] = self.stats.calculate_workload() resources['stats'] = self.stats @@ -329,3 +499,17 @@ def _verify_resources(self, resources): if missing_keys: reason = _("Missing keys: %s") % missing_keys raise exception.InvalidInput(reason=reason) + + def _instance_in_resize_state(self, instance): + vm = instance['vm_state'] + task = instance['task_state'] + + if vm == vm_states.RESIZED: + return True + + if (vm == vm_states.ACTIVE and task in [task_states.RESIZE_PREP, + task_states.RESIZE_MIGRATING, task_states.RESIZE_MIGRATED, + task_states.RESIZE_FINISH]): + return True + + return 
False diff --git a/nova/compute/stats.py b/nova/compute/stats.py index 062fac59ffd..44b92c6de69 100644 --- a/nova/compute/stats.py +++ b/nova/compute/stats.py @@ -114,6 +114,10 @@ def update_stats_for_instance(self, instance): # save updated I/O workload in stats: self["io_workload"] = self.io_workload + def update_stats_for_migration(self, instance_type, sign=1): + x = self.get("num_vcpus_used", 0) + self["num_vcpus_used"] = x + (sign * instance_type['vcpus']) + def _decrement(self, key): x = self.get(key, 0) self[key] = x - 1 diff --git a/nova/tests/compute/test_claims.py b/nova/tests/compute/test_claims.py index f631c1665d3..b780420ec8c 100644 --- a/nova/tests/compute/test_claims.py +++ b/nova/tests/compute/test_claims.py @@ -26,15 +26,27 @@ LOG = logging.getLogger(__name__) +class DummyTracker(object): + icalled = False + rcalled = False + + def abort_instance_claim(self, *args, **kwargs): + self.icalled = True + + def abort_resize_claim(self, *args, **kwargs): + self.rcalled = True + + class ClaimTestCase(test.TestCase): def setUp(self): super(ClaimTestCase, self).setUp() self.resources = self._fake_resources() + self.tracker = DummyTracker() def _claim(self, **kwargs): instance = self._fake_instance(**kwargs) - return claims.Claim(instance, None) + return claims.Claim(instance, self.tracker) def _fake_instance(self, **kwargs): instance = { @@ -47,6 +59,18 @@ def _fake_instance(self, **kwargs): instance.update(**kwargs) return instance + def _fake_instance_type(self, **kwargs): + instance_type = { + 'id': 1, + 'name': 'fakeitype', + 'memory_mb': 1, + 'vcpus': 1, + 'root_gb': 1, + 'ephemeral_gb': 2 + } + instance_type.update(**kwargs) + return instance_type + def _fake_resources(self, values=None): resources = { 'memory_mb': 2048, @@ -109,17 +133,30 @@ def test_disk_insufficient(self): self.assertFalse(claim.test(self.resources, limits)) def test_abort(self): - instance = self._fake_instance(root_gb=10, ephemeral_gb=40) + claim = self._abort() + self.assertTrue(claim.tracker.icalled) - def fake_abort(self): - self._called = True - - self.stubs.Set(claims.Claim, 'abort', fake_abort) + def _abort(self): claim = None try: - with claims.Claim(instance, None) as claim: + with self._claim(memory_mb=4096) as claim: raise test.TestingException("abort") except test.TestingException: pass - self.assertTrue(claim._called) + return claim + + +class ResizeClaimTestCase(ClaimTestCase): + + def setUp(self): + super(ResizeClaimTestCase, self).setUp() + self.instance = self._fake_instance() + + def _claim(self, **kwargs): + instance_type = self._fake_instance_type(**kwargs) + return claims.ResizeClaim(self.instance, instance_type, self.tracker) + + def test_abort(self): + claim = self._abort() + self.assertTrue(claim.tracker.rcalled) diff --git a/nova/tests/compute/test_resource_tracker.py b/nova/tests/compute/test_resource_tracker.py index 3997de133a0..5a7fcac2b32 100644 --- a/nova/tests/compute/test_resource_tracker.py +++ b/nova/tests/compute/test_resource_tracker.py @@ -19,6 +19,8 @@ import uuid +from nova.compute import claims +from nova.compute import instance_types from nova.compute import resource_tracker from nova.compute import task_states from nova.compute import vm_states @@ -40,9 +42,13 @@ class UnsupportedVirtDriver(driver.ComputeDriver): """Pretend version of a lame virt driver""" + def __init__(self): super(UnsupportedVirtDriver, self).__init__(None) + def get_host_ip_addr(self): + return '127.0.0.1' + def get_available_resource(self, nodename): # no support for getting resource usage 
info return {} @@ -59,6 +65,9 @@ def __init__(self): self.memory_mb_used = 0 self.local_gb_used = 0 + def get_host_ip_addr(self): + return '127.0.0.1' + def get_available_resource(self, nodename): d = { 'vcpus': self.vcpus, @@ -83,13 +92,18 @@ def setUp(self): self.flags(reserved_host_disk_mb=0, reserved_host_memory_mb=0) - self.context = context.RequestContext('fake', 'fake') + self.context = context.get_admin_context() self._instances = {} + self._instance_types = {} + self.stubs.Set(db, 'instance_get_all_by_host_and_node', - lambda c, h, n: self._instances.values()) + self._fake_instance_get_all_by_host_and_node) self.stubs.Set(db, 'instance_update_and_get_original', self._fake_instance_update_and_get_original) + self.stubs.Set(db, 'instance_type_get', self._fake_instance_type_get) + + self.host = 'fakehost' def _create_compute_node(self, values=None): compute = { @@ -131,7 +145,7 @@ def _fake_instance(self, *args, **kwargs): instance_uuid = str(uuid.uuid1()) instance = { 'uuid': instance_uuid, - 'vm_state': vm_states.BUILDING, + 'vm_state': vm_states.RESIZED, 'task_state': None, 'memory_mb': 2, 'root_gb': 3, @@ -140,12 +154,35 @@ def _fake_instance(self, *args, **kwargs): 'project_id': '123456', 'vcpus': 1, 'host': None, + 'instance_type_id': 1, } instance.update(kwargs) self._instances[instance_uuid] = instance return instance + def _fake_instance_type_create(self, **kwargs): + instance_type = { + 'id': 1, + 'name': 'fakeitype', + 'memory_mb': FAKE_VIRT_MEMORY_MB, + 'vcpus': FAKE_VIRT_VCPUS, + 'root_gb': FAKE_VIRT_LOCAL_GB / 2, + 'ephemeral_gb': FAKE_VIRT_LOCAL_GB / 2, + 'flavorid': 'fakeflavor' + } + instance_type.update(**kwargs) + + id_ = instance_type['id'] + self._instance_types[id_] = instance_type + return instance_type + + def _fake_instance_get_all_by_host_and_node(self, context, host, nodename): + return [i for i in self._instances.values() if i['host'] == host] + + def _fake_instance_type_get(self, ctxt, id_): + return self._instance_types[id_] + def _fake_instance_update_and_get_original(self, context, instance_uuid, values): instance = self._instances[instance_uuid] @@ -154,8 +191,11 @@ def _fake_instance_update_and_get_original(self, context, instance_uuid, # only used in the subsequent notification: return (instance, instance) - def _tracker(self, unsupported=False): - host = "fakehost" + def _tracker(self, host=None, unsupported=False): + + if host is None: + host = self.host + node = "fakenode" if unsupported: @@ -206,6 +246,23 @@ def test_disabled_updated_usage(self): root_gb=10) self.tracker.update_usage(self.context, instance) + def testDisabledResizeClaim(self): + instance = self._fake_instance() + instance_type = self._fake_instance_type_create() + claim = self.tracker.resize_claim(self.context, instance, + instance_type) + self.assertEqual(0, claim.memory_mb) + self.assertEqual(instance['uuid'], claim.migration['instance_uuid']) + self.assertEqual(instance_type['id'], + claim.migration['new_instance_type_id']) + + def testDisabledResizeContextClaim(self): + instance = self._fake_instance() + instance_type = self._fake_instance_type_create() + with self.tracker.resize_claim(self.context, instance, instance_type) \ + as claim: + self.assertEqual(0, claim.memory_mb) + class MissingServiceTestCase(BaseTestCase): def setUp(self): @@ -246,17 +303,39 @@ def test_enabled(self): self.assertFalse(self.tracker.disabled) -class ResourceTestCase(BaseTestCase): +class BaseTrackerTestCase(BaseTestCase): + def setUp(self): - super(ResourceTestCase, self).setUp() + # setup 
plumbing for a working resource tracker with required + # database models and a compatible compute driver: + super(BaseTrackerTestCase, self).setUp() + self.tracker = self._tracker() + self._migrations = {} + self.stubs.Set(db, 'service_get_all_compute_by_host', self._fake_service_get_all_compute_by_host) self.stubs.Set(db, 'compute_node_update', self._fake_compute_node_update) + self.stubs.Set(db, 'migration_update', + self._fake_migration_update) + self.stubs.Set(db, 'migration_get_in_progress_by_host', + self._fake_migration_get_in_progress_by_host) self.tracker.update_available_resource(self.context) - self.limits = self._basic_limits() + self.limits = self._limits() + + self._assert(FAKE_VIRT_MEMORY_MB, 'memory_mb') + self._assert(FAKE_VIRT_LOCAL_GB, 'local_gb') + self._assert(FAKE_VIRT_VCPUS, 'vcpus') + self._assert(0, 'memory_mb_used') + self._assert(0, 'local_gb_used') + self._assert(0, 'vcpus_used') + self._assert(0, 'running_vms') + self._assert(FAKE_VIRT_MEMORY_MB, 'free_ram_mb') + self._assert(FAKE_VIRT_LOCAL_GB, 'free_disk_gb') + self.assertFalse(self.tracker.disabled) + self.assertEqual(0, self.tracker.compute_node['current_workload']) def _fake_service_get_all_compute_by_host(self, ctx, host): self.compute = self._create_compute_node() @@ -271,36 +350,50 @@ def _fake_compute_node_update(self, ctx, compute_node_id, values, self.compute.update(values) return self.compute - def _basic_limits(self): - """Get basic limits, no oversubscription""" + def _fake_migration_get_in_progress_by_host(self, ctxt, host): + status = ['confirmed', 'reverted'] + migrations = [] + + for migration in self._migrations.values(): + if migration['status'] in status: + continue + + uuid = migration['instance_uuid'] + migration['instance'] = self._instances[uuid] + migrations.append(migration) + + return migrations + + def _fake_migration_update(self, ctxt, migration_id, values): + # cheat and assume there's only 1 migration present + migration = self._migrations.values()[0] + migration.update(values) + return migration + + def _limits(self, memory_mb=FAKE_VIRT_MEMORY_MB, + disk_gb=FAKE_VIRT_LOCAL_GB, vcpus=FAKE_VIRT_VCPUS): + """Create limits dictionary used for oversubscribing resources""" + return { - 'memory_mb': FAKE_VIRT_MEMORY_MB * 2, - 'disk_gb': FAKE_VIRT_LOCAL_GB, - 'vcpu': FAKE_VIRT_VCPUS, + 'memory_mb': memory_mb, + 'disk_gb': disk_gb, + 'vcpu': vcpus } - def test_update_usage_only_for_tracked(self): - instance = self._fake_instance(memory_mb=3, root_gb=1, ephemeral_gb=1, - task_state=None) - self.tracker.update_usage(self.context, instance) + def _assert(self, value, field, tracker=None): - self.assertEqual(0, self.tracker.compute_node['memory_mb_used']) - self.assertEqual(0, self.tracker.compute_node['local_gb_used']) - self.assertEqual(0, self.tracker.compute_node['current_workload']) + if tracker is None: + tracker = self.tracker - claim = self.tracker.instance_claim(self.context, instance, - self.limits) - self.assertNotEqual(0, claim.memory_mb) - self.assertEqual(3, self.tracker.compute_node['memory_mb_used']) - self.assertEqual(2, self.tracker.compute_node['local_gb_used']) + if not field in tracker.compute_node: + raise test.TestingException( + "'%(field)s' not in compute node." 
% locals()) + x = tracker.compute_node[field] - # now update should actually take effect - instance['task_state'] = task_states.SCHEDULING - self.tracker.update_usage(self.context, instance) + self.assertEqual(value, x) - self.assertEqual(3, self.tracker.compute_node['memory_mb_used']) - self.assertEqual(2, self.tracker.compute_node['local_gb_used']) - self.assertEqual(1, self.tracker.compute_node['current_workload']) + +class TrackerTestCase(BaseTrackerTestCase): def test_free_ram_resource_value(self): driver = FakeVirtDriver() @@ -316,13 +409,33 @@ def test_update_compute_node(self): self.assertFalse(self.tracker.disabled) self.assertTrue(self.updated) - def test_claim_and_audit(self): - self.assertEqual(5, self.tracker.compute_node['memory_mb']) - self.assertEqual(0, self.tracker.compute_node['memory_mb_used']) - self.assertEqual(6, self.tracker.compute_node['local_gb']) - self.assertEqual(0, self.tracker.compute_node['local_gb_used']) +class InstanceClaimTestCase(BaseTrackerTestCase): + + def test_update_usage_only_for_tracked(self): + instance = self._fake_instance(memory_mb=3, root_gb=1, ephemeral_gb=1, + task_state=None) + self.tracker.update_usage(self.context, instance) + + self._assert(0, 'memory_mb_used') + self._assert(0, 'local_gb_used') + self._assert(0, 'current_workload') + claim = self.tracker.instance_claim(self.context, instance, + self.limits) + self.assertNotEqual(0, claim.memory_mb) + self._assert(3, 'memory_mb_used') + self._assert(2, 'local_gb_used') + + # now update should actually take effect + instance['task_state'] = task_states.SCHEDULING + self.tracker.update_usage(self.context, instance) + + self._assert(3, 'memory_mb_used') + self._assert(2, 'local_gb_used') + self._assert(1, 'current_workload') + + def test_claim_and_audit(self): claim_mem = 3 claim_disk = 2 instance = self._fake_instance(memory_mb=claim_mem, root_gb=claim_disk, @@ -356,12 +469,6 @@ def test_claim_and_audit(self): self.assertEqual(6 - claim_disk, self.compute['free_disk_gb']) def test_claim_and_abort(self): - self.assertEqual(5, self.tracker.compute_node['memory_mb']) - self.assertEqual(0, self.tracker.compute_node['memory_mb_used']) - - self.assertEqual(6, self.tracker.compute_node['local_gb']) - self.assertEqual(0, self.tracker.compute_node['local_gb_used']) - claim_mem = 3 claim_disk = 2 instance = self._fake_instance(memory_mb=claim_mem, @@ -370,21 +477,17 @@ def test_claim_and_abort(self): self.limits) self.assertNotEqual(None, claim) - self.assertEqual(5, self.compute["memory_mb"]) self.assertEqual(claim_mem, self.compute["memory_mb_used"]) self.assertEqual(5 - claim_mem, self.compute["free_ram_mb"]) - self.assertEqual(6, self.compute["local_gb"]) self.assertEqual(claim_disk, self.compute["local_gb_used"]) self.assertEqual(6 - claim_disk, self.compute["free_disk_gb"]) claim.abort() - self.assertEqual(5, self.compute["memory_mb"]) self.assertEqual(0, self.compute["memory_mb_used"]) self.assertEqual(5, self.compute["free_ram_mb"]) - self.assertEqual(6, self.compute["local_gb"]) self.assertEqual(0, self.compute["local_gb_used"]) self.assertEqual(6, self.compute["free_disk_gb"]) @@ -452,8 +555,6 @@ def test_instance_context_claim(self): self.assertEqual(2, self.compute['local_gb_used']) def test_update_load_stats_for_instance(self): - self.assertFalse(self.tracker.disabled) - self.assertEqual(0, self.tracker.compute_node['current_workload']) instance = self._fake_instance(task_state=task_states.SCHEDULING) with self.tracker.instance_claim(self.context, instance): @@ -495,3 +596,206 @@ def 
test_cpu_stats(self): instance['vm_state'] = vm_states.DELETED self.tracker.update_usage(self.context, instance) self.assertEqual(1, self.tracker.compute_node['vcpus_used']) + + +class ResizeClaimTestCase(BaseTrackerTestCase): + + def setUp(self): + super(ResizeClaimTestCase, self).setUp() + + self.stubs.Set(db, 'migration_create', self._fake_migration_create) + + self.instance = self._fake_instance() + self.instance_type = self._fake_instance_type_create() + + def _fake_migration_create(self, context, values=None): + instance_uuid = str(uuid.uuid1()) + migration = { + 'id': 1, + 'source_compute': 'host1', + 'dest_compute': 'host2', + 'dest_host': '127.0.0.1', + 'old_instance_type_id': 1, + 'new_instance_type_id': 2, + 'instance_uuid': instance_uuid, + 'status': 'pre-migrating', + 'updated_at': timeutils.utcnow() + } + if values: + migration.update(values) + + self._migrations[instance_uuid] = migration + return migration + + def test_claim(self): + self.tracker.resize_claim(self.context, self.instance, + self.instance_type, self.limits) + self._assert(FAKE_VIRT_MEMORY_MB, 'memory_mb_used') + self._assert(FAKE_VIRT_LOCAL_GB, 'local_gb_used') + self._assert(FAKE_VIRT_VCPUS, 'vcpus_used') + self.assertEqual(1, len(self.tracker.tracked_migrations)) + + def test_abort(self): + try: + with self.tracker.resize_claim(self.context, self.instance, + self.instance_type, self.limits): + raise test.TestingException("abort") + except test.TestingException: + pass + + self._assert(0, 'memory_mb_used') + self._assert(0, 'local_gb_used') + self._assert(0, 'vcpus_used') + self.assertEqual(0, len(self.tracker.tracked_migrations)) + + def test_additive_claims(self): + + limits = self._limits(FAKE_VIRT_MEMORY_MB * 2, FAKE_VIRT_LOCAL_GB * 2, + FAKE_VIRT_VCPUS * 2) + self.tracker.resize_claim(self.context, self.instance, + self.instance_type, limits) + instance2 = self._fake_instance() + self.tracker.resize_claim(self.context, instance2, self.instance_type, + limits) + + self._assert(2 * FAKE_VIRT_MEMORY_MB, 'memory_mb_used') + self._assert(2 * FAKE_VIRT_LOCAL_GB, 'local_gb_used') + self._assert(2 * FAKE_VIRT_VCPUS, 'vcpus_used') + + def test_claim_and_audit(self): + self.tracker.resize_claim(self.context, self.instance, + self.instance_type, self.limits) + + self.tracker.update_available_resource(self.context) + + self._assert(FAKE_VIRT_MEMORY_MB, 'memory_mb_used') + self._assert(FAKE_VIRT_LOCAL_GB, 'local_gb_used') + self._assert(FAKE_VIRT_VCPUS, 'vcpus_used') + + def test_same_host(self): + self.limits['vcpu'] = 3 + + src_type = self._fake_instance_type_create(id=2, memory_mb=1, + root_gb=1, ephemeral_gb=0, vcpus=1) + dest_type = self._fake_instance_type_create(id=2, memory_mb=2, + root_gb=2, ephemeral_gb=1, vcpus=2) + + # make an instance of src_type: + instance = self._fake_instance(memory_mb=1, root_gb=1, ephemeral_gb=0, + vcpus=1, instance_type_id=2) + + self.tracker.instance_claim(self.context, instance, self.limits) + + # resize to dest_type: + claim = self.tracker.resize_claim(self.context, self.instance, + dest_type, self.limits) + + self._assert(3, 'memory_mb_used') + self._assert(4, 'local_gb_used') + self._assert(3, 'vcpus_used') + + self.tracker.update_available_resource(self.context) + claim.abort() + + # only the original instance should remain, not the migration: + self._assert(1, 'memory_mb_used') + self._assert(1, 'local_gb_used') + self._assert(1, 'vcpus_used') + self.assertEqual(1, len(self.tracker.tracked_instances)) + self.assertEqual(0, len(self.tracker.tracked_migrations)) + + def 
test_revert(self): + self.tracker.resize_claim(self.context, self.instance, + self.instance_type, self.limits) + migration, itype = self.tracker.tracked_migrations[ + self.instance['uuid']] + self.tracker.revert_resize(self.context, migration) + + self.assertEqual(0, len(self.tracker.tracked_instances)) + self.assertEqual(0, len(self.tracker.tracked_migrations)) + self._assert(0, 'memory_mb_used') + self._assert(0, 'local_gb_used') + self._assert(0, 'vcpus_used') + + def test_revert_reserve_source(self): + # if a revert has started at the API and audit runs on + # the source compute before the instance flips back to source, + # resources should still be help at the source based on the + # migration: + dest = "desthost" + dest_tracker = self._tracker(host=dest) + dest_tracker.update_available_resource(self.context) + + self.instance = self._fake_instance(memory_mb=FAKE_VIRT_MEMORY_MB, + root_gb=FAKE_VIRT_LOCAL_GB, ephemeral_gb=0, + vcpus=FAKE_VIRT_VCPUS, instance_type_id=1) + + values = {'source_compute': self.host, 'dest_compute': dest, + 'old_instance_type_id': 1, 'new_instance_type_id': 1, + 'status': 'post-migrating', + 'instance_uuid': self.instance['uuid']} + migration = self._fake_migration_create(self.context, values) + + # attach an instance to the destination host tracker: + dest_tracker.instance_claim(self.context, self.instance) + + self._assert(FAKE_VIRT_MEMORY_MB, 'memory_mb_used', + tracker=dest_tracker) + self._assert(FAKE_VIRT_LOCAL_GB, 'local_gb_used', + tracker=dest_tracker) + self._assert(FAKE_VIRT_VCPUS, 'vcpus_used', + tracker=dest_tracker) + + # audit and recheck to confirm migration doesn't get double counted + # on dest: + dest_tracker.update_available_resource(self.context) + + self._assert(FAKE_VIRT_MEMORY_MB, 'memory_mb_used', + tracker=dest_tracker) + self._assert(FAKE_VIRT_LOCAL_GB, 'local_gb_used', + tracker=dest_tracker) + self._assert(FAKE_VIRT_VCPUS, 'vcpus_used', + tracker=dest_tracker) + + # apply the migration to the source host tracker: + self.tracker.update_available_resource(self.context) + + self._assert(FAKE_VIRT_MEMORY_MB, 'memory_mb_used') + self._assert(FAKE_VIRT_LOCAL_GB, 'local_gb_used') + self._assert(FAKE_VIRT_VCPUS, 'vcpus_used') + + # flag the instance and migration as reverting and re-audit: + self.instance['vm_state'] = vm_states.RESIZED + self.instance['task_state'] = task_states.RESIZE_REVERTING + self.tracker.update_available_resource(self.context) + + self._assert(FAKE_VIRT_MEMORY_MB, 'memory_mb_used') + self._assert(FAKE_VIRT_LOCAL_GB, 'local_gb_used') + self._assert(FAKE_VIRT_VCPUS, 'vcpus_used') + + def test_resize_filter(self): + instance = self._fake_instance(vm_state=vm_states.ACTIVE, + task_state=task_states.SUSPENDING) + self.assertFalse(self.tracker._instance_in_resize_state(instance)) + + instance = self._fake_instance(vm_state=vm_states.RESIZED, + task_state=task_states.SUSPENDING) + self.assertTrue(self.tracker._instance_in_resize_state(instance)) + + instance = self._fake_instance(vm_state=vm_states.ACTIVE, + task_state=task_states.RESIZE_MIGRATING) + self.assertTrue(self.tracker._instance_in_resize_state(instance)) + + def test_dupe_filter(self): + self._fake_instance_type_create(id=2, memory_mb=1, root_gb=1, + ephemeral_gb=1, vcpus=1) + + instance = self._fake_instance(host=self.host) + + values = {'source_compute': self.host, 'dest_compute': self.host, + 'instance_uuid': instance['uuid'], 'new_instance_type_id': 2} + self._fake_migration_create(self.context, values) + self._fake_migration_create(self.context, values) + 
+ self.tracker.update_available_resource(self.context) + self.assertEqual(1, len(self.tracker.tracked_migrations))
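
Note for reviewers (not part of the patch itself): the resize claim added above reuses the claim-as-context-manager pattern already used for instance claims — prep_resize runs inside "with rt.resize_claim(...) as claim:", and the tests show that when the guarded block raises, the claim is aborted and the reserved memory/disk/vcpus are returned to the tracker. A minimal, self-contained sketch of that pattern follows; SimpleTracker and SimpleClaim are illustrative names only and do not exist in nova.

class SimpleTracker(object):
    """Toy stand-in for the resource tracker; tracks one memory pool."""
    def __init__(self, memory_mb):
        self.free_ram_mb = memory_mb

    def resize_claim(self, memory_mb):
        # Reserve memory up front; refuse the claim if the host would
        # become overscheduled.
        if memory_mb > self.free_ram_mb:
            raise Exception("insufficient memory for resize")
        self.free_ram_mb -= memory_mb
        return SimpleClaim(self, memory_mb)


class SimpleClaim(object):
    """Toy stand-in for a resize claim used as a context manager."""
    def __init__(self, tracker, memory_mb):
        self.tracker = tracker
        self.memory_mb = memory_mb

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # If the guarded operation failed, release the reservation.
        if exc_type is not None:
            self.abort()
        return False  # do not swallow the exception

    def abort(self):
        # Give the reserved memory back to the tracker.
        self.tracker.free_ram_mb += self.memory_mb


tracker = SimpleTracker(memory_mb=2048)
try:
    with tracker.resize_claim(memory_mb=512):
        raise RuntimeError("resize failed after the claim was taken")
except RuntimeError:
    pass
assert tracker.free_ram_mb == 2048  # reservation was released automatically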