Skip to content

Commit

Permalink
Merge "Calculate virtual free capacity and notify"
Browse files Browse the repository at this point in the history
  • Loading branch information
Jenkins authored and openstack-gerrit committed Dec 2, 2016
2 parents c110122 + b67a416 commit 3c83529
Show file tree
Hide file tree
Showing 10 changed files with 801 additions and 2 deletions.
5 changes: 5 additions & 0 deletions cinder/manager.py
Expand Up @@ -174,6 +174,11 @@ def _publish_service_capabilities(self, context):
self.service_name,
self.host,
self.last_capabilities)
self.scheduler_rpcapi.notify_service_capabilities(
context,
self.service_name,
self.host,
self.last_capabilities)

def _add_to_threadpool(self, func, *args, **kwargs):
self._tp.spawn_n(func, *args, **kwargs)
Expand Down
6 changes: 6 additions & 0 deletions cinder/scheduler/driver.py
Expand Up @@ -103,6 +103,12 @@ def update_service_capabilities(self, service_name, host, capabilities):
host,
capabilities)

def notify_service_capabilities(self, service_name, host, capabilities):
"""Notify capability update from a service node."""
self.host_manager.notify_service_capabilities(service_name,
host,
capabilities)

def host_passes_filters(self, context, host, request_spec,
filter_properties):
"""Check if the specified host passes the filters."""
Expand Down
176 changes: 176 additions & 0 deletions cinder/scheduler/host_manager.py
Expand Up @@ -28,6 +28,7 @@
from cinder import context as cinder_context
from cinder import exception
from cinder import objects
from cinder import utils
from cinder.i18n import _LI, _LW
from cinder.scheduler import filters
from cinder.volume import utils as vol_utils
Expand Down Expand Up @@ -345,6 +346,17 @@ class HostManager(object):

host_state_cls = HostState

REQUIRED_KEYS = frozenset([
'pool_name',
'total_capacity_gb',
'free_capacity_gb',
'allocated_capacity_gb',
'provisioned_capacity_gb',
'thin_provisioning_support',
'thick_provisioning_support',
'max_over_subscription_ratio',
'reserved_percentage'])

def __init__(self):
self.service_states = {} # { <host>: {<service>: {cap k : v}}}
self.host_state_map = {}
Expand All @@ -358,6 +370,7 @@ def __init__(self):

self._no_capabilities_hosts = set() # Hosts having no capabilities
self._update_host_state_map(cinder_context.get_admin_context())
self.service_states_last_update = {}

def _choose_host_filters(self, filter_cls_names):
"""Return a list of available filter names.
Expand Down Expand Up @@ -441,6 +454,24 @@ def update_service_capabilities(self, service_name, host, capabilities):
# Copy the capabilities, so we don't modify the original dict
capab_copy = dict(capabilities)
capab_copy["timestamp"] = timeutils.utcnow() # Reported time

# Set the default capabilities in case None is set.
capab_old = self.service_states.get(host, {"timestamp": 0})
capab_last_update = self.service_states_last_update.get(
host, {"timestamp": 0})

# If the capabilites are not changed and the timestamp is older,
# record the capabilities.

# There are cases: capab_old has the capabilities set,
# but the timestamp may be None in it. So does capab_last_update.

if (not self._get_updated_pools(capab_old, capab_copy)) and (
(not capab_old.get("timestamp")) or
(not capab_last_update.get("timestamp")) or
(capab_last_update["timestamp"] < capab_old["timestamp"])):
self.service_states_last_update[host] = capab_old

self.service_states[host] = capab_copy

LOG.debug("Received %(service_name)s service update from "
Expand All @@ -450,6 +481,34 @@ def update_service_capabilities(self, service_name, host, capabilities):

self._no_capabilities_hosts.discard(host)

def notify_service_capabilities(self, service_name, host, capabilities):
"""Notify the ceilometer with updated volume stats"""
if service_name != 'volume':
return

updated = []
capa_new = self.service_states.get(host, {})
timestamp = timeutils.utcnow()

# Compare the capabilities and timestamps to decide notifying
if not capa_new:
updated = self._get_updated_pools(capa_new, capabilities)
else:
if timestamp > self.service_states[host]["timestamp"]:
updated = self._get_updated_pools(self.service_states[host],
capabilities)
if not updated:
updated = self._get_updated_pools(
self.service_states_last_update.get(host, {}),
self.service_states.get(host, {}))

if updated:
capab_copy = dict(capabilities)
capab_copy["timestamp"] = timestamp
# If capabilities changes, notify and record the capabilities.
self.service_states_last_update[host] = capab_copy
self.get_usage_and_notify(capabilities, updated, host, timestamp)

def has_all_capabilities(self):
return len(self._no_capabilities_hosts) == 0

Expand Down Expand Up @@ -533,3 +592,120 @@ def get_pools(self, context):
all_pools.append(new_pool)

return all_pools

def get_usage_and_notify(self, capa_new, updated_pools, host, timestamp):
context = cinder_context.get_admin_context()
usage = self._get_usage(capa_new, updated_pools, host, timestamp)

self._notify_capacity_usage(context, usage)

def _get_usage(self, capa_new, updated_pools, host, timestamp):
pools = capa_new.get('pools')
usage = []
if pools and isinstance(pools, list):
backend_usage = dict(type='backend',
name_to_id=host,
total=0,
free=0,
allocated=0,
provisioned=0,
virtual_free=0,
reported_at=timestamp)

# Process the usage.
for pool in pools:
pool_usage = self._get_pool_usage(pool, host, timestamp)
if pool_usage:
backend_usage["total"] += pool_usage["total"]
backend_usage["free"] += pool_usage["free"]
backend_usage["allocated"] += pool_usage["allocated"]
backend_usage["provisioned"] += pool_usage["provisioned"]
backend_usage["virtual_free"] += pool_usage["virtual_free"]
# Only the updated pool is reported.
if pool in updated_pools:
usage.append(pool_usage)
usage.append(backend_usage)
return usage

def _get_pool_usage(self, pool, host, timestamp):
total = pool["total_capacity_gb"]
free = pool["free_capacity_gb"]

unknowns = ["unknown", "infinite", None]
if (total in unknowns) or (free in unknowns):
return {}

allocated = pool["allocated_capacity_gb"]
provisioned = pool["provisioned_capacity_gb"]
reserved = pool["reserved_percentage"]
ratio = pool["max_over_subscription_ratio"]
support = pool["thin_provisioning_support"]

virtual_free = utils.calculate_virtual_free_capacity(
total,
free,
provisioned,
support,
ratio,
reserved,
support)

pool_usage = dict(
type='pool',
name_to_id='#'.join([host, pool['pool_name']]),
total=float(total),
free=float(free),
allocated=float(allocated),
provisioned=float(provisioned),
virtual_free=float(virtual_free),
reported_at=timestamp)

return pool_usage

def _get_updated_pools(self, old_capa, new_capa):
# Judge if the capabilities should be reported.

new_pools = new_capa.get('pools', [])
if not new_pools:
return []

if isinstance(new_pools, list):
# If the volume_stats is not well prepared, don't notify.
if not all(
self.REQUIRED_KEYS.issubset(pool) for pool in new_pools):
return []
else:
LOG.debug("The reported capabilities are not well structured...")
return []

old_pools = old_capa.get('pools', [])
if not old_pools:
return new_pools

updated_pools = []

newpools = {}
oldpools = {}
for new_pool in new_pools:
newpools[new_pool['pool_name']] = new_pool

for old_pool in old_pools:
oldpools[old_pool['pool_name']] = old_pool

for key in newpools.keys():
if key in oldpools.keys():
for k in self.REQUIRED_KEYS:
if newpools[key][k] != oldpools[key][k]:
updated_pools.append(newpools[key])
break
else:
updated_pools.append(newpools[key])

return updated_pools

def _notify_capacity_usage(self, context, usage):
if usage:
for u in usage:
vol_utils.notify_about_capacity_usage(
context, u, u['type'], None, None)
LOG.debug("Publish storage capacity: %s.", usage)
9 changes: 9 additions & 0 deletions cinder/scheduler/manager.py
Expand Up @@ -88,6 +88,15 @@ def update_service_capabilities(self, context, service_name=None,
host,
capabilities)

def notify_service_capabilities(self, context, service_name,
host, capabilities):
"""Process a capability update from a service node."""
if capabilities is None:
capabilities = {}
self.driver.notify_service_capabilities(service_name,
host,
capabilities)

def _wait_for_scheduler(self):
# NOTE(dulek): We're waiting for scheduler to announce that it's ready
# or CONF.periodic_interval seconds from service startup has passed.
Expand Down
10 changes: 9 additions & 1 deletion cinder/scheduler/rpcapi.py
Expand Up @@ -58,9 +58,10 @@ class SchedulerAPI(rpc.RPCAPI):
set to 2.3.
3.0 - Remove 2.x compatibility
3.1 - Adds notify_service_capabilities()
"""

RPC_API_VERSION = '3.0'
RPC_API_VERSION = '3.1'
RPC_DEFAULT_VERSION = '3.0'
TOPIC = constants.SCHEDULER_TOPIC
BINARY = 'cinder-scheduler'
Expand Down Expand Up @@ -139,3 +140,10 @@ def update_service_capabilities(self, ctxt, service_name, host,
cctxt.cast(ctxt, 'update_service_capabilities',
service_name=service_name, host=host,
capabilities=capabilities)

def notify_service_capabilities(self, ctxt, service_name,
host, capabilities):
cctxt = self._get_cctxt(version='3.1')
cctxt.cast(ctxt, 'notify_service_capabilities',
service_name=service_name, host=host,
capabilities=capabilities)

0 comments on commit 3c83529

Please sign in to comment.