diff --git a/epu/epumanagement/decider.py b/epu/epumanagement/decider.py index 9c067cc0..188d58d3 100644 --- a/epu/epumanagement/decider.py +++ b/epu/epumanagement/decider.py @@ -22,6 +22,7 @@ DEFAULT_ENGINE_CLASS = "epu.decisionengine.impls.simplest.SimplestEngine" DEFAULT_SENSOR_SAMPLE_PERIOD = 90 +DEFAULT_SENSOR_SAMPLE_FUNCTION = 'Average' class EPUMDecider(object): """The decider handles critical sections related to running decision engine cycles. @@ -207,11 +208,55 @@ def _get_engine_sensor_state(self, domain): log.debug("No engine config for sensor available") return + domain_id = domain.domain_id + user = domain.owner sensor_type = config.get(CONF_SENSOR_TYPE) period = 60 monitor_sensors = config.get('monitor_sensors', []) + monitor_domain_sensors = config.get('monitor_domain_sensors', []) sample_period = config.get('sample_period', DEFAULT_SENSOR_SAMPLE_PERIOD) - sample_function = config.get('sample_function', 'Average') + sample_function = config.get('sample_function', DEFAULT_SENSOR_SAMPLE_FUNCTION) + + sensor_aggregator = self._get_sensor_aggregator(config) + if sensor_aggregator is None: + return + + # Support only OpenTSDB sensors for now + domain_sensor_state = {} + if sensor_type in (OPENTSDB_SENSOR_TYPE, MOCK_CLOUDWATCH_SENSOR_TYPE): + for metric in monitor_domain_sensors: + start_time = None + end_time = None + dimensions = {} + + if sensor_type in (MOCK_CLOUDWATCH_SENSOR_TYPE): + # Only for testing. Won't work with real cloudwatch + end_time = datetime.utcnow() + start_time = end_time - timedelta(seconds=sample_period) + dimensions = {'DomainId': domain_id} + elif sensor_type == OPENTSDB_SENSOR_TYPE: + # OpenTSDB requires local time + end_time = datetime.now() + start_time = end_time - timedelta(seconds=sample_period) + if not instance.hostname: + log.warning("No hostname for '%s'. skipping for now" % instance.iaas_id) + continue + dimensions = {'domain': domain_id, 'user': user} + else: + log.warning("Not sure how to setup '%s' query, skipping" % sensor_type) + continue + + state = sensor_aggregator.get_metric_statistics(period, start_time, + end_time, metric, sample_function, dimensions) + for index, metric_result in state.iteritems(): + if index not in (domain_id,): + continue + series = metric_result.get(Statistics.SERIES) + if series is not None and series != []: + domain_sensor_state[metric] = metric_result + + if domain_sensor_state != {}: + domain.add_domain_sensor_data(domain_sensor_state) instances = domain.get_instances() @@ -227,14 +272,12 @@ def _get_engine_sensor_state(self, domain): config['access_key'] = credentials.get('access_key') config['secret_key'] = credentials.get('secret_key') - sensor_aggregator = self._get_sensor_aggregator(config) - if sensor_aggregator is None: - continue - - end_time = datetime.utcnow() - start_time = end_time - timedelta(seconds=sample_period) - + start_time = None + end_time = None + dimensions = {} if sensor_type in (CLOUDWATCH_SENSOR_TYPE, MOCK_CLOUDWATCH_SENSOR_TYPE): + end_time = datetime.utcnow() + start_time = end_time - timedelta(seconds=sample_period) dimensions = {'InstanceId': instance.iaas_id} elif sensor_type == OPENTSDB_SENSOR_TYPE: # OpenTSDB requires local time @@ -246,8 +289,8 @@ def _get_engine_sensor_state(self, domain): dimensions = {'host': instance.hostname} else: - log.warning("Not sure how to set dimensions for '%s' query" % sensor_type) - dimensions = {} + log.warning("Not sure how to setup '%s' query, skipping" % sensor_type) + continue state = sensor_aggregator.get_metric_statistics(period, start_time, end_time, metric, sample_function, dimensions) diff --git a/epu/epumanagement/reactor.py b/epu/epumanagement/reactor.py index 03749ef7..7221337f 100644 --- a/epu/epumanagement/reactor.py +++ b/epu/epumanagement/reactor.py @@ -118,6 +118,7 @@ def describe_domain(self, caller, domain_id): return None domain_desc = dict(name=domain.domain_id, config=domain.get_all_config(), + sensor_data=domain.get_domain_sensor_data(), instances=[i.to_dict() for i in domain.get_instances()]) return domain_desc diff --git a/epu/epumanagement/store.py b/epu/epumanagement/store.py index d7bc4c88..b715a0aa 100644 --- a/epu/epumanagement/store.py +++ b/epu/epumanagement/store.py @@ -204,6 +204,25 @@ def add_engine_config(self, conf): @param conf dictionary mapping strings to JSON-serializable objects """ + def add_domain_sensor_data(self, sensor_data): + """Store a dictionary of domain sensor data. + + This operation replaces previous sensor data + + data is in the format: + { + 'metric':{ + 'Average': 5 + } + } + + @param sensor_data dictionary mapping strings to JSON-serializable objects + """ + + def get_domain_sensor_data(self): + """Retrieve a dictionary of sensor data from the store + """ + def get_health_config(self, keys=None): """Retrieve the health config dictionary. @@ -402,6 +421,17 @@ def new_instance_sensor(self, instance_id, sensor_data): newinstance = CoreInstance(**d) self.update_instance(newinstance, previous=instance) + def new_domain_sensor(self, sensor_data): + """Record domain sensor change + + @param sensor_data The state + """ + + log.info("Domain %s got sensor data %s", self.domain_id, sensor_data) + + previous_sensor_data = self.get_sensor_data() + self.update_sensor_data(sensor_data, previous=previous_sensor_data) + def ouagent_address(self, instance_id): """Return address to send messages to a particular OU Agent, or None""" instance = self.get_instance(instance_id) @@ -663,6 +693,8 @@ def __init__(self, owner, domain_id, config): self.instances = {} self.instance_heartbeats = {} + self.domain_sensor_data = {} + def is_removed(self): """Whether this domain has been marked for removal @@ -709,6 +741,28 @@ def add_engine_config(self, conf): self.engine_config[k] = json.dumps(v) self.engine_config_version += 1 + def get_domain_sensor_data(self): + """Retrieve a dictionary of sensor data from the store + """ + return self.domain_sensor_data + + def add_domain_sensor_data(self, sensor_data): + """Store a dictionary of domain sensor data. + + This operation replaces previous sensor data + + data is in the format: + { + 'metric':{ + 'Average': 5 + } + } + + @param sensor_data dictionary mapping strings to JSON-serializable objects + """ + print "WRITING: %s" % sensor_data + self.domain_sensor_data = sensor_data + def get_health_config(self, keys=None): """Retrieve the health config dictionary. @@ -854,6 +908,7 @@ def get_engine_state(self): """ s = self.engine_state #TODO not yet dealing with sensors or change lists + s.sensors = self.get_domain_sensor_data() s.instances = dict((i.instance_id, i) for i in self.get_instances()) return s @@ -1217,6 +1272,7 @@ class ZooKeeperDomainStore(DomainStore): SUBSCRIBERS_PATH = "subscribers" INSTANCES_PATH = "instances" INSTANCE_HEARTBEAT_PATH = "heartbeat" + DOMAIN_SENSOR_PATH = "domainsensor" def __init__(self, owner, domain_id, kazoo, path): super(ZooKeeperDomainStore, self).__init__(owner, domain_id) @@ -1227,6 +1283,7 @@ def __init__(self, owner, domain_id, kazoo, path): self.removed_path = self.path + "/" + self.REMOVED_PATH self.subscribers_path = self.path + "/" + self.SUBSCRIBERS_PATH self.instances_path = self.path + "/" + self.INSTANCES_PATH + self.domain_sensor_path = self.path + "/" + self.DOMAIN_SENSOR_PATH self.engine_state = EngineState() @@ -1306,6 +1363,58 @@ def add_engine_config(self, conf): """ self._add_config(EPUM_CONF_ENGINE, conf) + def get_domain_sensor_data(self): + """Retrieve a dictionary of sensor data from the store + """ + path = self.domain_sensor_path + try: + sensor_data = self.kazoo.get(path) + except NoNodeException: + sensor_data = {} + return sensor_data + + def add_domain_sensor_data(self, sensor_data): + """Store a dictionary of domain sensor data. + + This operation replaces previous sensor data + + data is in the format: + { + 'metric':{ + 'Average': 5 + } + } + + @param sensor_data dictionary mapping strings to JSON-serializable objects + + """ + + try: + sensor_json = json.dumps(sensor_data) + except Exception: + log.exception("Could not convert sensor data to JSON") + return + + path = self.domain_sensor_path + version = -1 + + try: + self.kazoo.get(path) + except NoNodeException: + try: + self.kazoo.create(path, sensor_json, makepath=True) + except BadVersionException: + raise WriteConflictError() + except NoNodeException: + raise NotFoundError() + else: + try: + self.kazoo.set(path, sensor_json, version) + except BadVersionException: + raise WriteConflictError() + except NoNodeException: + raise NotFoundError() + def get_health_config(self, keys=None): """Retrieve the health config dictionary. diff --git a/epu/epumanagement/test/mocks.py b/epu/epumanagement/test/mocks.py index db68dba6..b554b564 100644 --- a/epu/epumanagement/test/mocks.py +++ b/epu/epumanagement/test/mocks.py @@ -277,18 +277,21 @@ def get_metric_statistics(self, period, start_time, end_time, metric_name, statistics, dimensions=None): metrics = {} - instanceid = dimensions['InstanceId'] - if instanceid is None: - instance = None + instanceid = dimensions.get('InstanceId') + domainid = dimensions.get('DomainId') + if instanceid is None and domainid is None: + index = None elif isinstance(instanceid, basestring): - instance = instanceid + index = instanceid + elif isinstance(domainid, basestring): + index = domainid else: - instance = instanceid[0] + index = instanceid[0] try: average = sum(self.series_data)/len(self.series_data) except ZeroDivisionError: average = 0 - metrics[instance] = {Statistics.SERIES: self.series_data, Statistics.AVERAGE: average, + metrics[index] = {Statistics.SERIES: self.series_data, Statistics.AVERAGE: average, Statistics.MAXIMUM: max(self.series_data), Statistics.MINIMUM: min(self.series_data), Statistics.SUM: sum(self.series_data), Statistics.SAMPLE_COUNT: len(self.series_data)} return metrics diff --git a/epu/epumanagement/test/test_epumanagement.py b/epu/epumanagement/test/test_epumanagement.py index 7c2e634c..40f21bf2 100644 --- a/epu/epumanagement/test/test_epumanagement.py +++ b/epu/epumanagement/test/test_epumanagement.py @@ -93,6 +93,7 @@ def _config_sensor_domainconf(self, minimum_n): 'minimum_vms': minimum_n, 'metric': 'load', 'monitor_sensors': ['load',], + 'monitor_domain_sensors': ['queuelen',], 'sample_function': 'Average'} return {EPUM_CONF_ENGINE: engine} @@ -224,6 +225,10 @@ def test_sensor_data(self): self.assertEqual(len(domain_desc['instances']), 1) # just make sure it now has sensor_data + self.assertIn("sensor_data", domain_desc) + self.assertIn("queuelen", domain_desc['sensor_data']) + self.assertIn(Statistics.SERIES, domain_desc['sensor_data']['queuelen']) + instance = domain_desc['instances'][0] self.assertIn("instance_id", instance) self.assertIn("state", instance)