
Add sensor policy test to HA agent

1 parent 34bb957 commit 02e36a1d382a11344e559961795860b72e2ee125 oldpatricka committed Sep 27, 2012
Showing with 246 additions and 1 deletion.
  1. +3 −1 ion/agents/cei/high_availability_agent.py
  2. +243 −0 ion/agents/cei/test/test_haagent.py
ion/agents/cei/high_availability_agent.py
@@ -59,11 +59,13 @@ def on_init(self):
        pds = self.CFG.get_safe("highavailability.process_dispatchers", [])
        process_definition_id = self.CFG.get_safe("highavailability.process_definition_id")
        process_configuration = self.CFG.get_safe("highavailability.process_configuration")
+        aggregator_config = self.CFG.get_safe("highavailability.aggregator")
        # TODO: Allow other core class?
        self.core = HighAvailabilityCore(cfg, ProcessDispatcherSimpleAPIClient,
            pds, self.policy, process_definition_id=process_definition_id,
            parameters=policy_parameters,
-            process_configuration=process_configuration)
+            process_configuration=process_configuration,
+            aggregator_config=aggregator_config)
        self.policy_thread = looping_call(self.policy_interval, self.core.apply_policy)
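
For context, the new "highavailability.aggregator" lookup expects a config block like the one the test below passes to the agent. A minimal sketch, with key names taken from that test config and the host, port, and credentials purely illustrative:

    # Hypothetical example of the "highavailability.aggregator" section read above;
    # keys mirror the aggregator block in test_haagent.py below, values are illustrative.
    aggregator_config = {
        'type': 'trafficsentinel',   # aggregator backend the sensor policy queries
        'host': 'localhost',
        'port': 8919,
        'protocol': 'http',
        'username': 'user',
        'password': 'pw',
    }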
ion/agents/cei/test/test_haagent.py
@@ -1,6 +1,10 @@
import gevent
import functools
+import BaseHTTPServer
+import socket
+from BaseHTTPServer import HTTPServer
from gevent import queue
+from random import randint
from nose.plugins.attrib import attr
from nose.plugins.skip import SkipTest
@@ -214,3 +218,242 @@ def test_schedule(self):
        self.assertEqual(args[0], definition_id)
        self.assertEqual(kwargs['configuration'], configuration)
        self.assertEqual(kwargs['process_id'], upid)
+
+
+@attr('INT', group='cei')
+class HighAvailabilityAgentSensorPolicyTest(IonIntegrationTestCase):
+
+    def _start_webserver(self, port=None):
+        """ Start a webserver for testing code download
+        Note: tries really hard to get a port, and if it can't use
+        the suggested port, randomly picks another, and returns it
+        """
+        def log_message(self, format, *args):
+            # swallow log messages
+            pass
+
+        class TestRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+            server_version = 'test_server'
+            extensions_map = ''
+            def do_GET(self):
+                self.send_response(200)
+                self.send_header("Content-type", "text/plain")
+                self.send_header("Content-Length", len(self.server.response))
+                self.end_headers()
+                self.wfile.write(self.server.response)
+
+
+        class Server(HTTPServer):
+
+            response = ''
+
+            def serve_forever(self):
+                self._serving = 1
+                while self._serving:
+                    self.handle_request()
+
+            def stop(self):
+                self._serving = 0
+
+        if port is None:
+            port = 8008
+        Handler = TestRequestHandler
+        Handler.log_message = log_message
+
+        for i in range(0, 100):
+            try:
+                self._webserver = Server(("localhost", port), Handler)
+            except socket.error:
+                print "port %s is in use, picking another" % port
+                port = randint(8000, 10000)
+                continue
+            else:
+                break
+
+        self._web_glet = gevent.spawn(self._webserver.serve_forever)
+        return port
+
+    def _stop_webserver(self):
+        if self._webserver is not None:
+            self._web_glet.kill()
+
+    def setUp(self):
+        self._start_container()
+        self.container.start_rel_from_url('res/deploy/r2cei.yml')
+        self.pd_cli = ProcessDispatcherServiceClient(to_name="process_dispatcher")
+
+        self.process_definition_id = uuid4().hex
+        self.process_definition = ProcessDefinition(name='test', executable={
+                'module': 'ion.agents.cei.test.test_haagent',
+                'class': 'TestProcess'
+        })
+        self.pd_cli.create_process_definition(self.process_definition, self.process_definition_id)
+
+        http_port = 8919
+        http_port = self._start_webserver(port=http_port)
+
+        self.resource_id = "haagent_1234"
+        self._haa_name = "high_availability_agent"
+        self._haa_config = {
+            'highavailability': {
+                'policy': {
+                    'interval': 1,
+                    'name': 'sensor',
+                    'parameters': {
+                        'metric': 'app_attributes:ml',
+                        'sample_period': 600,
+                        'sample_function': 'Average',
+                        'cooldown_period': 2,
+                        'scale_up_threshold': 2.0,
+                        'scale_up_n_processes': 1,
+                        'scale_down_threshold': 1.0,
+                        'scale_down_n_processes': 1,
+                        'maximum_processes': 5,
+                        'minimum_processes': 1,
+                    }
+                },
+                'aggregator': {
+                    'type': 'trafficsentinel',
+                    'host': 'localhost',
+                    'port': http_port,
+                    'protocol': 'http',
+                    'username': 'user',
+                    'password': 'pw'
+                },
+                'process_definition_id': self.process_definition_id,
+                "process_dispatchers": [
+                    'process_dispatcher'
+                ]
+            },
+            'agent': {'resource_id': self.resource_id},
+        }
+
+
+        self._base_procs = self.pd_cli.list_processes()
+
+        self.container_client = ContainerAgentClient(node=self.container.node,
+            name=self.container.name)
+        self._haa_pid = self.container_client.spawn_process(name=self._haa_name,
+            module="ion.agents.cei.high_availability_agent",
+            cls="HighAvailabilityAgent", config=self._haa_config)
+
+        # Start a resource agent client to talk with the instrument agent.
+        self._haa_pyon_client = SimpleResourceAgentClient(self.resource_id, process=FakeProcess())
+        log.info('Got haa client %s.', str(self._haa_pyon_client))
+
+        self.haa_client = HighAvailabilityAgentClient(self._haa_pyon_client)
+
+        self.event_queue = queue.Queue()
+        self.event_sub = None
+
+    def tearDown(self):
+        self.container.terminate_process(self._haa_pid)
+        self._stop_webserver()
+        self._stop_container()
+
+    def _event_callback(self, event, *args, **kwargs):
+        self.event_queue.put(event)
+
+    def subscribe_events(self, origin):
+        self.event_sub = EventSubscriber(event_type="ProcessLifecycleEvent",
+            callback=self._event_callback, origin_type="DispatchedProcess")
+        self.event_sub.start()
+
+    def await_state_event(self, pid, state):
+        event = self.event_queue.get(timeout=60)
+        log.debug("Got event: %s", event)
+        self.assertTrue(event.origin.startswith(pid))
+        self.assertEqual(event.state, state)
+        return event
+
+    def get_running_procs(self):
+        """returns a normalized set of running procs (removes the ones that
+        were there at setup time)
+        """
+
+        base = self._base_procs
+        base_pids = [proc.process_id for proc in base]
+        current = self.pd_cli.list_processes()
+        current_pids = [proc.process_id for proc in current]
+        print "filtering base procs %s from %s" % (base_pids, current_pids)
+        normal = [cproc for cproc in current if cproc.process_id not in base_pids and cproc.process_state == ProcessStateEnum.SPAWN]
+        return normal
+
+    def _get_managed_upids(self):
+        result = self.haa_client.dump().result
+        upids = result['managed_upids']
+        return upids
+
+    def _set_response(self, response):
+        self._webserver.response = response
+
+    @needs_epu
+    def test_sensor_policy(self):
+        status = self.haa_client.status().result
+        # Ensure HA hasn't already failed
+        assert status in ('PENDING', 'READY', 'STEADY')
+
+        self.subscribe_events(None)
+        self.await_state_event("test", ProcessStateEnum.SPAWN)
+
+        self.assertEqual(len(self.get_running_procs()), 1)
+
+        for i in range(0, 5):
+            status = self.haa_client.status().result
+            try:
+                self.assertEqual(status, 'STEADY')
+                break
+            except:
+                gevent.sleep(1)
+        else:
+            assert False, "HA Service took too long to get to state STEADY"
+
+        # Set ml for each proc such that we scale up
+        upids = self._get_managed_upids()
+        response = ""
+        for upid in upids:
+            response += "%s,ml=5\n" % upid
+        self._set_response(response)
+        self.await_state_event("test", ProcessStateEnum.SPAWN)
+
+        self.assertEqual(len(self.get_running_procs()), 2)
+
+        # Set ml so we stay steady
+        upids = self._get_managed_upids()
+        response = ""
+        for upid in upids:
+            response += "%s,ml=1.5\n" % upid
+        self._set_response(response)
+
+        self.assertEqual(len(self.get_running_procs()), 2)
+
+        for i in range(0, 5):
+            status = self.haa_client.status().result
+            try:
+                self.assertEqual(status, 'STEADY')
+                break
+            except:
+                gevent.sleep(1)
+        else:
+            assert False, "HA Service took too long to get to state STEADY"
+
+        # Set ml so we scale down
+        upids = self._get_managed_upids()
+        response = ""
+        for upid in upids:
+            response += "%s,ml=0.5\n" % upid
+        self._set_response(response)
+
+        self.await_state_event("test", ProcessStateEnum.TERMINATE)
+
+        self.assertEqual(len(self.get_running_procs()), 1)
+
+        for i in range(0, 5):
+            status = self.haa_client.status().result
+            try:
+                self.assertEqual(status, 'STEADY')
+                break
+            except:
+                gevent.sleep(1)
+        else:
+            assert False, "HA Service took too long to get to state STEADY"
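
To make the scaling behaviour the test drives easier to follow, here is a minimal illustrative sketch of how the stub webserver's "upid,ml=value" responses map onto the thresholds configured in setUp(). This is not the actual HighAvailabilityCore sensor policy, and the function name desired_process_change is hypothetical; only the response format and the threshold values come from the test above.

    # Illustrative sketch only; not the actual sensor policy implementation.
    # The stub webserver above answers each query with lines like "<upid>,ml=<value>";
    # the policy averages the metric and compares it to the thresholds from setUp().
    def desired_process_change(response_text, scale_up_threshold=2.0, scale_down_threshold=1.0):
        values = []
        for line in response_text.splitlines():
            if not line.strip():
                continue
            _upid, metric = line.split(",", 1)           # e.g. "abc123,ml=5"
            values.append(float(metric.split("=", 1)[1]))
        if not values:
            return 0
        average = sum(values) / len(values)              # sample_function 'Average'
        if average > scale_up_threshold:
            return 1       # add scale_up_n_processes
        elif average < scale_down_threshold:
            return -1      # remove scale_down_n_processes
        return 0           # stay STEADY

    # With the responses the test sets: ml=5 -> scale up, ml=1.5 -> steady, ml=0.5 -> scale down.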
