Commit
Basic sensor scaling working in HA Service
Needs lots more tests, and the cool-off period still needs to be implemented.
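The cool-off period exists in this commit only as the cooldown_period parameter and is not enforced anywhere yet. As a minimal sketch, not part of the commit, apply_policy() could call a helper like the one below before taking any scaling action; the in_cooldown name and the idea of remembering the time of the last scale action are assumptions, not existing code.

import datetime

def in_cooldown(last_scale_action, cooldown_seconds):
    """Return True if a scale action happened within the last cooldown_seconds."""
    if last_scale_action is None:
        return False  # never scaled before, so no cooldown applies
    elapsed = datetime.datetime.now() - last_scale_action
    return elapsed < datetime.timedelta(seconds=cooldown_seconds)

The policy would record datetime.datetime.now() whenever it schedules or terminates a process, and skip scaling while in_cooldown(...) returns True.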
oldpatricka committed Sep 26, 2012
1 parent 21ba71b commit 5a0864a
Showing 3 changed files with 299 additions and 28 deletions.
187 changes: 160 additions & 27 deletions epu/highavailability/policy.py
@@ -1,6 +1,7 @@
import logging
import datetime


from epu.sensors.trafficsentinel import TrafficSentinel
from epu.states import ProcessState, HAState
log = logging.getLogger(__name__)

@@ -27,6 +28,22 @@ def apply_policy(self, all_procs, managed_upids):
def status(self):
raise NotImplementedError("'status' is not implemented")

def _get_least_used_pd(self, all_procs):
smallest_n = None
smallest_pd = None
for pd_name, procs in all_procs.iteritems():
if smallest_n is None or smallest_n > len(procs):
smallest_n = len(procs)
smallest_pd = pd_name
return smallest_pd

def _extract_upids_from_all_procs(self, all_procs):
all_upids = []
for pd, procs in all_procs.iteritems():
for proc in procs:
all_upids.append(proc['upid'])
return all_upids


class NPreservingPolicy(IPolicy):
"""
@@ -171,28 +188,13 @@ def _set_status(self, to_rebalance, managed_upids):
def status(self):
return self._status

def _get_least_used_pd(self, all_procs):
smallest_n = None
smallest_pd = None
for pd_name, procs in all_procs.iteritems():
if smallest_n == None or smallest_n > len(procs):
smallest_n = len(procs)
smallest_pd = pd_name
return smallest_pd

def _extract_upids_from_all_procs(self, all_procs):
all_upids = []
for pd, procs in all_procs.iteritems():
for proc in procs:
all_upids.append(proc['upid'])
return all_upids


class HSflowPolicy(IPolicy):
class SensorPolicy(IPolicy):

def __init__(self, parameters=None, process_definition_id=None,
schedule_process_callback=None, terminate_process_callback=None,
ganglia_hostname=None, ganglia_port=None):
aggregator_config=None):
"""Set up the Policy
@param parameters: The parameters used by this policy to determine the
@@ -209,9 +211,14 @@ def __init__(self, parameters=None, process_definition_id=None,
@param terminate_process_callback: A callback to terminate a process on
a PD. Must have signature: terminate(upid)
@param ganglia_hostname: hostname of Ganglia server to connect to
@param ganglia_port: port of Ganglia server to connect to
@param aggregator_config: configuration dict of aggregator. For traffic
sentinel, this should look like:
config = {
'type': 'trafficsentinel',
'host': 'host.name.tld',
'username': 'user',
'password': 'pw'
}
"""

self.schedule_process = schedule_process_callback or dummy_schedule_process_callback
@@ -222,17 +229,40 @@ def __init__(self, parameters=None, process_definition_id=None,
else:
self._parameters = None

self.process_id = process_definition_id
self.previous_all_procs = {}

self._status = HAState.PENDING
self.minimum_n = 1

if aggregator_config is None:
raise Exception("Must provide an aggregator config")

self._ganglia = GangliaClient(hostname=ganglia_hostname, port=ganglia_port)
aggregator_type = aggregator_config.get('type', '').lower()
if aggregator_type == 'trafficsentinel':
host = aggregator_config.get('host')
username = aggregator_config.get('username')
password = aggregator_config.get('password')
self._sensor_aggregator = TrafficSentinel(host, username, password)
else:
raise Exception("Don't know what to do with %s aggregator type" % aggregator_type)

@property
def parameters(self):
"""parameters
a dictionary with TODO
a dictionary of parameters that looks like:
metric: Name of Sensor Aggregator Metric to use for scaling decisions
sample_period: Number of seconds of sample data to use (e.g. if 3600, use sample data from one hour ago until the present time)
sample_function: Statistical function to apply to sampled data. Choose from Average, Sum, SampleCount, Maximum, Minimum
cooldown_period: Minimum time in seconds between scale up or scale down actions
scale_up_threshold: If the sampled metric is above this value, scale up the number of processes
scale_up_n_processes: Number of processes to scale up by
scale_down_threshold: If the sampled metric is below this value, scale down the number of processes
scale_down_n_processes: Number of processes to scale down by
minimum_processes: Minimum number of processes to maintain
maximum_processes: Maximum number of processes to maintain
"""
return self._parameters

@@ -246,13 +276,116 @@ def status(self):

def apply_policy(self, all_procs, managed_upids):

# Query Ganglia
ganglia_info = self._ganglia.query()
# Check for missing upids (From a dead pd for example)
all_upids = self._extract_upids_from_all_procs(all_procs)
for upid in managed_upids:
if upid not in all_upids:
# Process is missing! Remove from managed_upids
managed_upids.remove(upid)

# Check for terminated procs
for pd, procs in all_procs.iteritems():
for proc in procs:

if proc['upid'] not in managed_upids:
continue

if proc.get('state') is None:
# Pyon procs may have no state
continue

state = proc['state']
state_code, state_name = state.split('-')
running_code, running_name = ProcessState.RUNNING.split('-')
if state_code > running_code: # if terminating or exited, etc
managed_upids.remove(proc['upid'])

# Get numbers from metric
hostnames = self._get_hostnames(all_procs, managed_upids)
period = 60
end_time = datetime.datetime.now() # TODO: what TZ does TS use?
seconds = self._parameters['sample_period']
start_time = end_time - datetime.timedelta(seconds=seconds)
metric_name = self._parameters['metric']
sample_function = self._parameters['sample_function']
statistics = [sample_function, ]
dimensions = {'hostname': hostnames}
metric_per_host = self._sensor_aggregator.get_metric_statistics(
period, start_time, end_time, metric_name, statistics, dimensions)

values = []
for host, metric_value in metric_per_host.iteritems():
values.append(metric_value[sample_function])

try:
average_metric = sum(values) / len(values)
except ZeroDivisionError:
average_metric = 0
if average_metric > self._parameters['scale_up_threshold']:
scale_by = self._parameters['scale_up_n_processes']

if len(managed_upids) + scale_by > self._parameters['maximum_processes']:
scale_by = self._parameters['maximum_processes'] - len(managed_upids)

elif average_metric < self._parameters['scale_down_threshold']:
scale_by = - abs(self._parameters['scale_down_n_processes'])

if len(managed_upids) + scale_by < self._parameters['minimum_processes']:
scale_by = self._parameters['minimum_processes'] - len(managed_upids)
else:
scale_by = 0

if scale_by < 0: # remove excess
scale_by = -1 * scale_by
for to_scale in range(0, scale_by):
upid = managed_upids[0]
terminated = self.terminate_process(upid)
elif scale_by > 0:
for to_rebalance in range(0, scale_by):
pd_name = self._get_least_used_pd(all_procs)
new_upid = self.schedule_process(pd_name, self.process_id)

self._set_status(scale_by, managed_upids)

self.previous_all_procs = all_procs

return managed_upids

def _set_status(self, to_rebalance, managed_upids):
if self._status == HAState.FAILED:
# If already in FAILED state, keep this state.
# Requires human intervention
self._status = HAState.FAILED
elif to_rebalance == 0:
self._status = HAState.STEADY
elif len(managed_upids) >= self.minimum_n and self._parameters['minimum_processes'] > 0:
self._status = HAState.READY
else:
self._status = HAState.PENDING

def _get_hostnames(self, all_procs, upids):
"""get hostnames of eeagents that have managed processes
"""

hostnames = []

for pd, procs in all_procs.iteritems():
for proc in procs:

if proc['upid'] not in upids:
continue

hostname = proc.get('hostname')
if hostname is None:
continue

hostnames.append(hostname)

return list(set(hostnames))

policy_map = {
'npreserving': NPreservingPolicy,
'hsflow': HSflowPolicy,
'sensor': SensorPolicy,
}


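For reference, the scaling decision inside SensorPolicy.apply_policy() above reduces to the comparison below. This is a standalone sketch, not code from the commit, using the example parameters and load values that appear in the new test file that follows.

parameters = {
    'scale_up_threshold': 2.0, 'scale_up_n_processes': 1,
    'scale_down_threshold': 0.5, 'scale_down_n_processes': 1,
    'maximum_processes': 5, 'minimum_processes': 1,
}

values = [2.0, 2.5, 4.1]                    # per-host 'Average' of load_five
average_metric = sum(values) / len(values)  # ~2.87

if average_metric > parameters['scale_up_threshold']:
    scale_by = parameters['scale_up_n_processes']          # 2.87 > 2.0: add one process
elif average_metric < parameters['scale_down_threshold']:
    scale_by = -abs(parameters['scale_down_n_processes'])  # too idle: remove processes
else:
    scale_by = 0                                           # within thresholds: steady

print scale_by  # 1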
135 changes: 135 additions & 0 deletions epu/highavailability/test/test_policy.py
@@ -0,0 +1,135 @@

import urllib2
import unittest

from StringIO import StringIO
from mock import Mock

from epu.highavailability.policy import SensorPolicy
from epu.processdispatcher.store import ProcessRecord


class SensorPolicyTest(unittest.TestCase):

def setUp(self):

aggregator_config = {
'type': 'trafficsentinel',
'host': 'fake',
'username': 'fake',
'password': 'fake',
}

self.mock_schedule = Mock()
self.mock_terminate = Mock()

self.policy = SensorPolicy(schedule_process_callback=self.mock_schedule,
terminate_process_callback=self.mock_terminate,
aggregator_config=aggregator_config)

self.original_urlopen = urllib2.urlopen

def tearDown(self):
urllib2.urlopen = self.original_urlopen

def patch_urllib(self, return_string):
self.traffic_sentinel_string = StringIO(return_string)
urllib2.urlopen = Mock(return_value=self.traffic_sentinel_string)


def test_get_hostnames(self):

owner = 'fred'
upids = ['myupid0', 'myupid1', 'myupid2']
hostnames = ['my.hostname0.tld', 'my.hostname1.tld', 'my.hostname2.tld']
definition = None
state = None

all_procs = {
'pd0': [
ProcessRecord.new(owner, upids[0], definition, state, hostname=hostnames[0]),
],
'pd1': [
ProcessRecord.new(owner, upids[1], definition, state, hostname=hostnames[1]),
ProcessRecord.new(owner, upids[2], definition, state, hostname=hostnames[2]),
]
}

got = self.policy._get_hostnames(all_procs, upids)
self.assertEqual(sorted(got), sorted(hostnames))

def test_apply_policy(self):

parameters = {
'metric': 'load_five',
'sample_period': 600,
'sample_function': 'Average',
'cooldown_period': 600,
'scale_up_threshold': 2.0,
'scale_up_n_processes': 1,
'scale_down_threshold': 0.5,
'scale_down_n_processes': 1,
'maximum_processes': 5,
'minimum_processes': 1,
}

self.policy.parameters = parameters

owner = 'fred'
upids = ['myupid0', 'myupid1', 'myupid2']
hostnames = ['my.hostname0.tld', 'my.hostname1.tld', 'my.hostname2.tld']
loads_no_scale = [1.0, 0.5, 1.1]
loads_scale_up = [2.0, 2.5, 4.1]
loads_scale_down = [0.0, 0.5, 0.1]
definition = None
state = None

all_procs = {
'pd0': [
ProcessRecord.new(owner, upids[0], definition, state, hostname=hostnames[0]),
],
'pd1': [
ProcessRecord.new(owner, upids[1], definition, state, hostname=hostnames[1]),
ProcessRecord.new(owner, upids[2], definition, state, hostname=hostnames[2]),
]
}


# Since average is below 2.0, but above 0.5, we shouldn't see any
# scaling activity
self.patch_urllib(make_ts_string(hostnames, loads_no_scale))
self.policy.apply_policy(all_procs, upids)

self.assertEqual(self.mock_schedule.call_count, 0)
self.assertEqual(self.mock_terminate.call_count, 0)
self.mock_schedule.reset_mock()
self.mock_terminate.reset_mock()

# This average is above 2.0, so we should see one process schedule
self.patch_urllib(make_ts_string(hostnames, loads_scale_up))
self.policy.apply_policy(all_procs, upids)

self.assertEqual(self.mock_schedule.call_count, 1)
self.assertEqual(self.mock_terminate.call_count, 0)
self.mock_schedule.reset_mock()
self.mock_terminate.reset_mock()

# This average is below 0.5, so we should see one process terminate
self.patch_urllib(make_ts_string(hostnames, loads_scale_down))
self.policy.apply_policy(all_procs, upids)

self.assertEqual(self.mock_schedule.call_count, 0)
self.assertEqual(self.mock_terminate.call_count, 1)
self.mock_schedule.reset_mock()
self.mock_terminate.reset_mock()


def make_ts_string(hosts, metrics):
traffic_sentinel_string = ""

assert len(hosts) == len(metrics)

for i, host in enumerate(hosts):
traffic_sentinel_string += "%s,%f\n" % (host, metrics[i])

return traffic_sentinel_string
5 changes: 4 additions & 1 deletion epu/sensors/trafficsentinel.py
@@ -82,7 +82,10 @@ def get_metric_statistics(self, period, start_time, end_time, metric_name,
for hostname, metric in hosts.iteritems():
series = metric[Statistics.SERIES]
if Statistics.AVERAGE in statistics:
metric[Statistics.AVERAGE] = sum(map(float, series)) / float(len(series))
try:
metric[Statistics.AVERAGE] = sum(map(float, series)) / float(len(series))
except ZeroDivisionError:
metric[Statistics.AVERAGE] = 0.0
if Statistics.SUM in statistics:
metric[Statistics.SUM] = sum(map(float,series))
if Statistics.SAMPLE_COUNT in statistics:
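For context, here is a rough sketch of how SensorPolicy drives get_metric_statistics(), mirroring the call made in apply_policy() above. It is not part of the commit; the host, credentials, and hostnames are placeholders, and the shape of the return value is inferred from how apply_policy() consumes it.

import datetime
from epu.sensors.trafficsentinel import TrafficSentinel

ts = TrafficSentinel('host.name.tld', 'user', 'pw')  # placeholder credentials

end_time = datetime.datetime.now()
start_time = end_time - datetime.timedelta(seconds=600)  # last ten minutes of samples
metric_per_host = ts.get_metric_statistics(
    60, start_time, end_time, 'load_five', ['Average'],
    {'hostname': ['my.hostname0.tld', 'my.hostname1.tld']})

# Result is keyed by hostname; each value carries the requested statistics.
for host, metric in metric_per_host.iteritems():
    print host, metric['Average']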
