Skip to content

Commit

Permalink
Add domain sensor support to epum
Browse files Browse the repository at this point in the history
  • Loading branch information
oldpatricka committed Feb 9, 2013
1 parent bba9e6f commit e92d02f
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 16 deletions.
63 changes: 53 additions & 10 deletions epu/epumanagement/decider.py
Expand Up @@ -22,6 +22,7 @@

DEFAULT_ENGINE_CLASS = "epu.decisionengine.impls.simplest.SimplestEngine"
DEFAULT_SENSOR_SAMPLE_PERIOD = 90
DEFAULT_SENSOR_SAMPLE_FUNCTION = 'Average'

class EPUMDecider(object):
"""The decider handles critical sections related to running decision engine cycles.
Expand Down Expand Up @@ -207,11 +208,55 @@ def _get_engine_sensor_state(self, domain):
log.debug("No engine config for sensor available")
return

domain_id = domain.domain_id
user = domain.owner
sensor_type = config.get(CONF_SENSOR_TYPE)
period = 60
monitor_sensors = config.get('monitor_sensors', [])
monitor_domain_sensors = config.get('monitor_domain_sensors', [])
sample_period = config.get('sample_period', DEFAULT_SENSOR_SAMPLE_PERIOD)
sample_function = config.get('sample_function', 'Average')
sample_function = config.get('sample_function', DEFAULT_SENSOR_SAMPLE_FUNCTION)

sensor_aggregator = self._get_sensor_aggregator(config)
if sensor_aggregator is None:
return

# Support only OpenTSDB sensors for now
domain_sensor_state = {}
if sensor_type in (OPENTSDB_SENSOR_TYPE, MOCK_CLOUDWATCH_SENSOR_TYPE):
for metric in monitor_domain_sensors:
start_time = None
end_time = None
dimensions = {}

if sensor_type in (MOCK_CLOUDWATCH_SENSOR_TYPE):
# Only for testing. Won't work with real cloudwatch
end_time = datetime.utcnow()
start_time = end_time - timedelta(seconds=sample_period)
dimensions = {'DomainId': domain_id}
elif sensor_type == OPENTSDB_SENSOR_TYPE:
# OpenTSDB requires local time
end_time = datetime.now()
start_time = end_time - timedelta(seconds=sample_period)
if not instance.hostname:
log.warning("No hostname for '%s'. skipping for now" % instance.iaas_id)
continue
dimensions = {'domain': domain_id, 'user': user}
else:
log.warning("Not sure how to setup '%s' query, skipping" % sensor_type)
continue

state = sensor_aggregator.get_metric_statistics(period, start_time,
end_time, metric, sample_function, dimensions)
for index, metric_result in state.iteritems():
if index not in (domain_id,):
continue
series = metric_result.get(Statistics.SERIES)
if series is not None and series != []:
domain_sensor_state[metric] = metric_result

if domain_sensor_state != {}:
domain.add_domain_sensor_data(domain_sensor_state)


instances = domain.get_instances()
Expand All @@ -227,14 +272,12 @@ def _get_engine_sensor_state(self, domain):
config['access_key'] = credentials.get('access_key')
config['secret_key'] = credentials.get('secret_key')

sensor_aggregator = self._get_sensor_aggregator(config)
if sensor_aggregator is None:
continue

end_time = datetime.utcnow()
start_time = end_time - timedelta(seconds=sample_period)

start_time = None
end_time = None
dimensions = {}
if sensor_type in (CLOUDWATCH_SENSOR_TYPE, MOCK_CLOUDWATCH_SENSOR_TYPE):
end_time = datetime.utcnow()
start_time = end_time - timedelta(seconds=sample_period)
dimensions = {'InstanceId': instance.iaas_id}
elif sensor_type == OPENTSDB_SENSOR_TYPE:
# OpenTSDB requires local time
Expand All @@ -246,8 +289,8 @@ def _get_engine_sensor_state(self, domain):

dimensions = {'host': instance.hostname}
else:
log.warning("Not sure how to set dimensions for '%s' query" % sensor_type)
dimensions = {}
log.warning("Not sure how to setup '%s' query, skipping" % sensor_type)
continue

state = sensor_aggregator.get_metric_statistics(period, start_time,
end_time, metric, sample_function, dimensions)
Expand Down
1 change: 1 addition & 0 deletions epu/epumanagement/reactor.py
Expand Up @@ -118,6 +118,7 @@ def describe_domain(self, caller, domain_id):
return None
domain_desc = dict(name=domain.domain_id,
config=domain.get_all_config(),
sensor_data=domain.get_domain_sensor_data(),
instances=[i.to_dict() for i in domain.get_instances()])
return domain_desc

Expand Down
109 changes: 109 additions & 0 deletions epu/epumanagement/store.py
Expand Up @@ -204,6 +204,25 @@ def add_engine_config(self, conf):
@param conf dictionary mapping strings to JSON-serializable objects
"""

def add_domain_sensor_data(self, sensor_data):
"""Store a dictionary of domain sensor data.
This operation replaces previous sensor data
data is in the format:
{
'metric':{
'Average': 5
}
}
@param sensor_data dictionary mapping strings to JSON-serializable objects
"""

def get_domain_sensor_data(self):
"""Retrieve a dictionary of sensor data from the store
"""

def get_health_config(self, keys=None):
"""Retrieve the health config dictionary.
Expand Down Expand Up @@ -402,6 +421,17 @@ def new_instance_sensor(self, instance_id, sensor_data):
newinstance = CoreInstance(**d)
self.update_instance(newinstance, previous=instance)

def new_domain_sensor(self, sensor_data):
"""Record domain sensor change
@param sensor_data The state
"""

log.info("Domain %s got sensor data %s", self.domain_id, sensor_data)

previous_sensor_data = self.get_sensor_data()
self.update_sensor_data(sensor_data, previous=previous_sensor_data)

def ouagent_address(self, instance_id):
"""Return address to send messages to a particular OU Agent, or None"""
instance = self.get_instance(instance_id)
Expand Down Expand Up @@ -663,6 +693,8 @@ def __init__(self, owner, domain_id, config):
self.instances = {}
self.instance_heartbeats = {}

self.domain_sensor_data = {}


def is_removed(self):
"""Whether this domain has been marked for removal
Expand Down Expand Up @@ -709,6 +741,28 @@ def add_engine_config(self, conf):
self.engine_config[k] = json.dumps(v)
self.engine_config_version += 1

def get_domain_sensor_data(self):
"""Retrieve a dictionary of sensor data from the store
"""
return self.domain_sensor_data

def add_domain_sensor_data(self, sensor_data):
"""Store a dictionary of domain sensor data.
This operation replaces previous sensor data
data is in the format:
{
'metric':{
'Average': 5
}
}
@param sensor_data dictionary mapping strings to JSON-serializable objects
"""
print "WRITING: %s" % sensor_data
self.domain_sensor_data = sensor_data

def get_health_config(self, keys=None):
"""Retrieve the health config dictionary.
Expand Down Expand Up @@ -854,6 +908,7 @@ def get_engine_state(self):
"""
s = self.engine_state
#TODO not yet dealing with sensors or change lists
s.sensors = self.get_domain_sensor_data()
s.instances = dict((i.instance_id, i) for i in self.get_instances())
return s

Expand Down Expand Up @@ -1217,6 +1272,7 @@ class ZooKeeperDomainStore(DomainStore):
SUBSCRIBERS_PATH = "subscribers"
INSTANCES_PATH = "instances"
INSTANCE_HEARTBEAT_PATH = "heartbeat"
DOMAIN_SENSOR_PATH = "domainsensor"

def __init__(self, owner, domain_id, kazoo, path):
super(ZooKeeperDomainStore, self).__init__(owner, domain_id)
Expand All @@ -1227,6 +1283,7 @@ def __init__(self, owner, domain_id, kazoo, path):
self.removed_path = self.path + "/" + self.REMOVED_PATH
self.subscribers_path = self.path + "/" + self.SUBSCRIBERS_PATH
self.instances_path = self.path + "/" + self.INSTANCES_PATH
self.domain_sensor_path = self.path + "/" + self.DOMAIN_SENSOR_PATH

self.engine_state = EngineState()

Expand Down Expand Up @@ -1306,6 +1363,58 @@ def add_engine_config(self, conf):
"""
self._add_config(EPUM_CONF_ENGINE, conf)

def get_domain_sensor_data(self):
"""Retrieve a dictionary of sensor data from the store
"""
path = self.domain_sensor_path
try:
sensor_data = self.kazoo.get(path)
except NoNodeException:
sensor_data = {}
return sensor_data

def add_domain_sensor_data(self, sensor_data):
"""Store a dictionary of domain sensor data.
This operation replaces previous sensor data
data is in the format:
{
'metric':{
'Average': 5
}
}
@param sensor_data dictionary mapping strings to JSON-serializable objects
"""

try:
sensor_json = json.dumps(sensor_data)
except Exception:
log.exception("Could not convert sensor data to JSON")
return

path = self.domain_sensor_path
version = -1

try:
self.kazoo.get(path)
except NoNodeException:
try:
self.kazoo.create(path, sensor_json, makepath=True)
except BadVersionException:
raise WriteConflictError()
except NoNodeException:
raise NotFoundError()
else:
try:
self.kazoo.set(path, sensor_json, version)
except BadVersionException:
raise WriteConflictError()
except NoNodeException:
raise NotFoundError()

def get_health_config(self, keys=None):
"""Retrieve the health config dictionary.
Expand Down
15 changes: 9 additions & 6 deletions epu/epumanagement/test/mocks.py
Expand Up @@ -277,18 +277,21 @@ def get_metric_statistics(self, period, start_time, end_time, metric_name,
statistics, dimensions=None):

metrics = {}
instanceid = dimensions['InstanceId']
if instanceid is None:
instance = None
instanceid = dimensions.get('InstanceId')
domainid = dimensions.get('DomainId')
if instanceid is None and domainid is None:
index = None
elif isinstance(instanceid, basestring):
instance = instanceid
index = instanceid
elif isinstance(domainid, basestring):
index = domainid
else:
instance = instanceid[0]
index = instanceid[0]
try:
average = sum(self.series_data)/len(self.series_data)
except ZeroDivisionError:
average = 0
metrics[instance] = {Statistics.SERIES: self.series_data, Statistics.AVERAGE: average,
metrics[index] = {Statistics.SERIES: self.series_data, Statistics.AVERAGE: average,
Statistics.MAXIMUM: max(self.series_data), Statistics.MINIMUM: min(self.series_data),
Statistics.SUM: sum(self.series_data), Statistics.SAMPLE_COUNT: len(self.series_data)}
return metrics
5 changes: 5 additions & 0 deletions epu/epumanagement/test/test_epumanagement.py
Expand Up @@ -93,6 +93,7 @@ def _config_sensor_domainconf(self, minimum_n):
'minimum_vms': minimum_n,
'metric': 'load',
'monitor_sensors': ['load',],
'monitor_domain_sensors': ['queuelen',],
'sample_function': 'Average'}
return {EPUM_CONF_ENGINE: engine}

Expand Down Expand Up @@ -224,6 +225,10 @@ def test_sensor_data(self):
self.assertEqual(len(domain_desc['instances']), 1)

# just make sure it now has sensor_data
self.assertIn("sensor_data", domain_desc)
self.assertIn("queuelen", domain_desc['sensor_data'])
self.assertIn(Statistics.SERIES, domain_desc['sensor_data']['queuelen'])

instance = domain_desc['instances'][0]
self.assertIn("instance_id", instance)
self.assertIn("state", instance)
Expand Down

0 comments on commit e92d02f

Please sign in to comment.