
Add parameter checking and heuristic to check how to query TS

1 parent b6aa283 · commit dd4e64584dcc8531df8456b6b4aa592123106e31 · oldpatricka committed Sep 25, 2012
Showing with 198 additions and 24 deletions.
  1. +91 −7 epu/highavailability/policy.py
  2. +2 −0 epu/sensors/__init__.py
  3. +105 −17 epu/sensors/trafficsentinel.py
98 epu/highavailability/policy.py
@@ -1,10 +1,11 @@
import logging
import datetime
+from epu.sensors import Statistics
from epu.sensors.trafficsentinel import TrafficSentinel
from epu.states import ProcessState, HAState
-log = logging.getLogger(__name__)
+log = logging.getLogger(__name__)
def dummy_schedule_process_callback(*args, **kwargs):
    log.debug("dummy_schedule_process_callback(%s, %s) called" % (args, kwargs))
@@ -238,6 +239,8 @@ def __init__(self, parameters=None, process_definition_id=None,
            username = aggregator_config.get('username')
            password = aggregator_config.get('password')
            self._sensor_aggregator = TrafficSentinel(host, username, password)
+            self.app_metrics = self._sensor_aggregator.app_metrics
+            self.host_metrics = self._sensor_aggregator.host_metrics
        else:
            raise Exception("Don't know what to do with %s aggregator type" % aggregator_type)
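Note: for context, a sketch of an aggregator_config that would reach this branch. Only 'username' and 'password' are visible in this hunk, so the 'type' and 'host' keys are assumptions based on the surrounding aggregator_type and host variables:

aggregator_config = {
    'type': 'trafficsentinel',   # assumed: compared against aggregator_type above
    'host': 'ts.example.com',    # assumed: source of the `host` variable
    'username': 'ts_user',
    'password': 'ts_password',
}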
@@ -251,10 +254,10 @@ def parameters(self):
        sample_period: Number of seconds of sample data to use (e.g. if 3600, use sample data from 1 hour ago until the present time)
        sample_function: Statistical function to apply to sampled data. Choose from Average, Sum, SampleCount, Maximum, Minimum
        cooldown_period: Minimum time in seconds between scale up or scale down actions
-        scaleupthreshhold: If the sampled metric is above this value, scale up the number of processes
-        scaleupnprocesses: Number of processes to scale up by
-        scaledownthreshhold: If the sampled metric is below this value, scale down the number of processes
-        scaledownnprocesses: Number of processes to scale down by
+        scale_up_threshold: If the sampled metric is above this value, scale up the number of processes
+        scale_up_n_processes: Number of processes to scale up by
+        scale_down_threshold: If the sampled metric is below this value, scale down the number of processes
+        scale_down_n_processes: Number of processes to scale down by
        minimum_processes: Minimum number of processes to maintain
        maximum_processes: Maximum number of processes to maintain
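Note: for reference, a parameters dict that would pass the validation added below (values are illustrative; 'load_five' is one of the HOST_METRICS defined in trafficsentinel.py):

parameters = {
    'metric': 'load_five',
    'sample_period': 600,
    'sample_function': 'Average',
    'cooldown_period': 300,
    'scale_up_threshold': 2.0,
    'scale_up_n_processes': 1,
    'scale_down_threshold': 0.5,
    'scale_down_n_processes': 1,
    'minimum_processes': 1,
    'maximum_processes': 5,
}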
@@ -263,14 +266,91 @@ def parameters(self):
    @parameters.setter
    def parameters(self, new_parameters):
-        # TODO: validate parameters
+
+        if new_parameters.get('metric') is None:
+            log.error("'metric' parameter cannot be None")
+            return
+
+        try:
+            sample = int(new_parameters.get('sample_period'))
+            if sample < 0:
+                raise ValueError()
+        except (TypeError, ValueError):
+            log.error("sample_period '%s' is not a non-negative integer" % (
+                new_parameters.get('sample_period')))
+            return
+
+        if new_parameters.get('sample_function') not in Statistics.ALL:
+            log.error("'%s' is not a known sample_function. Choose from %s" % (
+                new_parameters.get('sample_function'), Statistics.ALL))
+            return
+
+        try:
+            cool = int(new_parameters.get('cooldown_period'))
+            if cool < 0:
+                raise ValueError()
+        except (TypeError, ValueError):
+            log.error("cooldown_period '%s' is not a non-negative integer" % (
+                new_parameters.get('cooldown_period')))
+            return
+
+        try:
+            float(new_parameters.get('scale_up_threshold'))
+        except (TypeError, ValueError):
+            log.error("scale_up_threshold '%s' is not a floating point number" % (
+                new_parameters.get('scale_up_threshold')))
+            return
+
+        try:
+            int(new_parameters.get('scale_up_n_processes'))
+        except (TypeError, ValueError):
+            log.error("scale_up_n_processes '%s' is not an integer" % (
+                new_parameters.get('scale_up_n_processes')))
+            return
+
+        try:
+            float(new_parameters.get('scale_down_threshold'))
+        except (TypeError, ValueError):
+            log.error("scale_down_threshold '%s' is not a floating point number" % (
+                new_parameters.get('scale_down_threshold')))
+            return
+
+        try:
+            int(new_parameters.get('scale_down_n_processes'))
+        except (TypeError, ValueError):
+            log.error("scale_down_n_processes '%s' is not an integer" % (
+                new_parameters.get('scale_down_n_processes')))
+            return
+
+        try:
+            minimum_processes = int(new_parameters.get('minimum_processes'))
+            if minimum_processes < 0:
+                raise ValueError()
+        except (TypeError, ValueError):
+            log.error("minimum_processes '%s' is not a non-negative integer" % (
+                new_parameters.get('minimum_processes')))
+            return
+
+        try:
+            maximum_processes = int(new_parameters.get('maximum_processes'))
+            if maximum_processes < 0:
+                raise ValueError()
+        except (TypeError, ValueError):
+            log.error("maximum_processes '%s' is not a non-negative integer" % (
+                new_parameters.get('maximum_processes')))
+            return
+
+        # phew!
        self._parameters = new_parameters
    def status(self):
        return self._status
    def apply_policy(self, all_procs, managed_upids):
+        if self._parameters is None:
+            log.debug("No parameters set, unable to apply policy")
+            return []
+
        time_since_last_scale = datetime.datetime.now() - self.last_scale_action
        if time_since_last_scale.seconds < self._parameters['cooldown_period']:
            log.debug("Returning early from scale test because we're in cooldown")
@@ -309,7 +389,11 @@ def apply_policy(self, all_procs, managed_upids):
        metric_name = self._parameters['metric']
        sample_function = self._parameters['sample_function']
        statistics = [sample_function, ]
-        dimensions = {'hostname': hostnames}
+
+        if metric_name in self.app_metrics:
+            dimensions = {'upid': managed_upids}
+        else:
+            dimensions = {'hostname': hostnames}
        metric_per_host = self._sensor_aggregator.get_metric_statistics(
            period, start_time, end_time, metric_name, statistics, dimensions)
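Note: when an app metric is queried by upid, the aggregator below keys its results by app_name rather than hostname, so the name metric_per_host is slightly misleading. An illustrative (invented) result shape, matching the structure built in get_metric_statistics:

metric_per_host = {
    'myapp': {Statistics.SERIES: ['1.2', '3.4'], Statistics.AVERAGE: 2.3},
}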
2 epu/sensors/__init__.py
@@ -20,6 +20,8 @@ class Statistics(object):
    MAXIMUM = "Maximum"
    MINIMUM = "Minimum"
+    ALL = (SERIES, AVERAGE, SUM, SAMPLE_COUNT, MAXIMUM, MINIMUM)
+
class ISensorAggregator(object):
    """Abstract Sensor Aggregator class. Sensor aggregator implementations.
122 epu/sensors/trafficsentinel.py
@@ -7,14 +7,6 @@
from epu.sensors import ISensorAggregator, Statistics
from epu.exceptions import ProgrammingError
-script = """
-var view = "host";
-var select = "hostname,mem_used,mem_free";
-var where = "";
-var interval = "today";
-
-Query.topN(view, select, where, interval).run().printCSV();
-"""
class TrafficSentinel(ISensorAggregator):
@@ -24,6 +16,8 @@ def __init__(self, host, username=None, password=None):
        self.username = username
        self.password = password
        self.base_url = "https://%s/inmsf/Query" % self.host
+        self.app_metrics = APP_METRICS
+        self.host_metrics = HOST_METRICS
    def get_metric_statistics(self, period, start_time, end_time, metric_name,
                              statistics, dimensions=None):
@@ -43,9 +37,25 @@ def get_metric_statistics(self, period, start_time, end_time, metric_name,
            is a value or list of values to filter on.
            For example:
            dimensions={'hostname': ['alpha.vms.cloud.tld', 'tango.vms.cloud.tld']}
+
+            The dimension 'upid' is a special dimension that gets hashed and mapped to
+            the equivalent value in Traffic Sentinel (TODO).
        """
-        query_type = 'host'
+        # Ugly heuristic to determine where to query a metric from
+        if dimensions and dimensions.get('upid') and metric_name in self.app_metrics:
+            query_type = 'application'
+            index_by = 'app_name'
+        elif dimensions and dimensions.get('upid') and metric_name in self.host_metrics:
+            query_type = 'host'
+            index_by = 'app_name'
+        elif dimensions and dimensions.get('hostname') and metric_name in self.app_metrics:
+            query_type = 'application'
+            index_by = 'hostname'
+        else:
+            query_type = 'host'
+            index_by = 'hostname'
+
        if not isinstance(start_time, datetime):
            raise TypeError("start_time must be a datetime object")
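Note: a minimal usage sketch of the new routing (hostname and credentials invented; 'op_duration' appears only in APP_METRICS, so a 'upid' dimension routes the query to the 'application' view indexed by app_name per the heuristic above):

from datetime import datetime, timedelta

from epu.sensors import Statistics
from epu.sensors.trafficsentinel import TrafficSentinel

ts = TrafficSentinel('ts.example.com', 'ts_user', 'ts_password')
end = datetime.utcnow()
start = end - timedelta(hours=1)
results = ts.get_metric_statistics(
    period=600, start_time=start, end_time=end,
    metric_name='op_duration', statistics=[Statistics.AVERAGE],
    dimensions={'upid': ['some-upid']})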
@@ -55,7 +65,7 @@ def get_metric_statistics(self, period, start_time, end_time, metric_name,
        tformat = "%Y%m%d %H:%M"
        interval = "%s,%s" % (start_time.strftime(tformat), end_time.strftime(tformat))
        time_group = int(period)
-        query_fields = ['hostname', metric_name,]
+        query_fields = [index_by, metric_name]
        script = self._build_script(query_fields, query_type, interval, time_group, dimensions)
@@ -69,17 +79,17 @@ def get_metric_statistics(self, period, start_time, end_time, metric_name,
        request.add_header('Authorization', auth_header)
        reply = urllib2.urlopen(request)
-        hosts = {}
+        results = {}
        reader = csv.reader(reply)
        for metrics in reader:
-            hostname = metrics.pop(0)
-            host = hosts.get(hostname, {Statistics.SERIES: []})
+            index = metrics.pop(0)
+            result = results.get(index, {Statistics.SERIES: []})
            for i, metric in enumerate(metrics):
-                host[Statistics.SERIES].append(metric)
+                result[Statistics.SERIES].append(metric)
-            hosts[hostname] = host
+            results[index] = result
-        for hostname, metric in hosts.iteritems():
+        for index, metric in results.iteritems():
            series = metric[Statistics.SERIES]
            if Statistics.AVERAGE in statistics:
                try:
@@ -95,7 +105,7 @@ def get_metric_statistics(self, period, start_time, end_time, metric_name,
            if Statistics.MINIMUM in statistics:
                metric[Statistics.MINIMUM] = min(map(float, series))
-        return hosts
+        return results
    def _build_query_url(self, base_url, authenticate=None, script=None):
@@ -150,3 +160,81 @@ def _build_script(self, query_fields, query_type, interval, group, dimensions=No
""" % (query_type, formatted_query_fields, where, interval, group)
return script
+
+APP_METRICS = ["agent", "app_attributes", "app_ctxt_attributes",
+"app_ctxt_name", "app_ctxt_operation", "app_error", "app_initiator",
+"app_name", "app_operation", "app_status", "app_target", "clientaddress",
+"clientcountry", "cliententerprise", "clientgroup", "clientname", "clientpath",
+"clientport", "clientsite", "clientsubnet", "clientzone", "destinationaddress",
+"destinationagententerprise", "destinationagentgroup", "destinationagentsite",
+"destinationagentzone", "destinationcountry", "destinationenterprise",
+"destinationgroup", "destinationname", "destinationpath", "destinationport",
+"destinationsite", "destinationsubnet", "destinationzone", "enterprise",
+"fcclient", "fcserver", "group", "hostname", "httpauthuser", "httpdirection",
+"httphost", "httpmethod", "httpmimetype", "httpprotocol", "httprefhost",
+"httprefpath", "httpstatus", "httpurl", "httpuseragent", "httpxff", "ifindex",
+"interface", "ip6client", "ip6server", "ipclient", "ipserver", "macclient",
+"machine_type", "macserver", "memcachecommand", "memcachekey",
+"memcachenumkeys", "memcacheprotocol", "memcachestatus", "op_bytes",
+"op_count", "op_duration", "op_reqbytes", "op_respbytes", "op_type", "os_name",
+"os_release", "protocol", "protocolgroup", "serveraddress", "servercountry",
+"serverenterprise", "servername", "serverpath", "os_name", "os_release",
+"serversubnet", "site", "sourceaddress", "sourceagententerprise",
+"sourceagentsite", "sourceagentzone", "sourcecountry", "sourcegroup",
+"sourcename", "sourcepath", "sourceport", "sourcesite", "sourcesubnet",
+"sourcezone", "time", "uriextension", "urifile", "urifragment", "urihost",
+"uripath", "uriport", "uriquery", "urischeme", "uuid", "uuidclient",
+"uuiddestination", "uuidserver", "uuidsource", "zone"]
+
+HOST_METRICS = ["agent", "app_connections_max", "app_connections_open",
+"app_count", "app_cpusystem", "app_cpuuser", "app_duration", "app_err_badreq",
+"app_err_forbidden", "app_err_internal", "app_err_notfound", "app_err_notimpl",
+"app_err_other", "app_err_timeout", "app_err_toolarge", "app_err_unauth",
+"app_err_unavail", "app_errors", "app_files_max", "app_files_open",
+"app_mem_max", "app_mem_used", "app_name", "app_req_delayed",
+"app_req_dropped", "app_success", "app_workers_active", "app_workers_idle",
+"app_workers_max", "bytes_in", "bytes_out", "bytes_read", "bytes_written",
+"contexts", "cpu_idle", "cpu_intr", "cpu_nice", "cpu_num", "cpu_sintr",
+"cpu_system", "cpu_user", "cpu_util", "cpu_wio", "diskfree", "diskpartmax",
+"disktotal", "drops_in", "drops_out", "enterprise", "errs_in", "errs_out",
+"group", "has_app", "has_host", "has_http", "has_if", "has_java",
+"has_memcache", "has_switch", "has_vm", "hostname", "http_method_connect",
+"http_method_delete", "http_method_get", "http_method_head",
+"http_method_option", "http_method_other", "http_method_post",
+"http_method_put", "http_method_total", "http_method_trace", "http_status_1xx",
+"http_status_2xx", "http_status_3xx", "http_status_4xx", "http_status_5xx",
+"http_status_other", "ifindex", "interface", "interrupts",
+"jvm_classes_loaded", "jvm_classes_total", "jvm_classes_unloaded",
+"jvm_compile_ms", "jvm_fds_max", "jvm_fds_open", "jvm_gc_count", "jvm_gc_ms",
+"jvm_heap_committed", "jvm_heap_initial", "jvm_heap_max", "jvm_heap_used",
+"jvm_name", "jvm_non_heap_committed", "jvm_non_heap_initial",
+"jvm_non_heap_max", "jvm_non_heap_used", "jvm_thread_daemon",
+"jvm_thread_live", "jvm_thread_started", "jvm_vendor", "jvm_version",
+"load_fifteen", "load_five", "load_one", "loadpercpu_fifteen",
+"loadpercpu_five", "loadpercpu_one", "machine_type", "mem_buffers",
+"mem_cached", "mem_cached_pc", "mem_free", "mem_shared", "mem_total",
+"mem_used", "mem_used_pc", "memcache_accepting", "memcache_authcmds",
+"memcache_autherrors", "memcache_bytes", "memcache_bytesread",
+"memcache_byteswritten", "memcache_casbadval", "memcache_cashits",
+"memcache_casmisses", "memcache_cmdflush", "memcache_cmdget",
+"memcache_cmdset", "memcache_cmdtouch", "memcache_connections",
+"memcache_connectionyields", "memcache_cpusystem", "memcache_cpuuser",
+"memcache_currentconnections", "memcache_currentitems", "memcache_decrhits",
+"memcache_decrmisses", "memcache_delhits", "memcache_delmisses",
+"memcache_evictions", "memcache_gethits", "memcache_getmisses",
+"memcache_hits", "memcache_hits_pc", "memcache_incrhits",
+"memcache_incrmisses", "memcache_items", "memcache_limitbytes",
+"memcache_listendisabled", "memcache_misses", "memcache_misses_pc",
+"memcache_ops", "memcache_reclaimed", "memcache_rejected", "memcache_structs",
+"memcache_threads", "memcache_uptime", "memcache_used_pc", "os_name",
+"os_release", "page_in", "page_out", "pkts_in", "pkts_out", "proc_run",
+"proc_total", "read_time", "read_time_mean", "reads", "site", "swap_free",
+"swap_in", "swap_inout", "swap_out", "swap_total", "swap_util_pc", "time",
+"uuid", "v_dsindex", "v_hostname", "v_machine_type", "v_os_name",
+"v_os_release", "v_uuid", "vbytes_in", "vbytes_out", "vbytes_read",
+"vbytes_written", "vcpu_num", "vcpu_pc", "vcpu_state", "vdiskallocation",
+"vdiskavailable", "vdiskcapacity", "vdrops_in", "vdrops_out", "verrs_in",
+"verrs_out", "vmem_free", "vmem_total", "vmem_used", "vmem_used_pc",
+"vnode_cpu_mhz", "vnode_cpus", "vnode_domains", "vnode_mem_free",
+"vnode_mem_total", "vpkts_in", "vpkts_out", "vreads", "vwrites", "write_time",
+"write_time_mean", "writes", "zone"]
