Skip to content

Commit

Permalink
Add Service object management to HA agent
Browse files Browse the repository at this point in the history
  • Loading branch information
oldpatricka committed Oct 18, 2012
1 parent 420b6dd commit 7342c6c
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 12 deletions.
92 changes: 82 additions & 10 deletions ion/agents/cei/high_availability_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pyon.core.exception import BadRequest

from interface.objects import AgentCommand, ProcessDefinition, ProcessSchedule,\
ProcessStateEnum, ProcessQueueingMode, ProcessTarget, ProcessRestartMode
ProcessStateEnum, ProcessQueueingMode, ProcessTarget, ProcessRestartMode, Service
from interface.services.cei.iprocess_dispatcher_service import ProcessDispatcherServiceClient
from ion.agents.cei.util import looping_call
from ion.services.cei.process_dispatcher_service import _core_process_definition_from_ion, \
Expand Down Expand Up @@ -64,18 +64,23 @@ def on_init(self):
cfg = self.CFG.get_safe("highavailability")

# use default PD name as the sole PD if none are provided in config
pds = self.CFG.get_safe("highavailability.process_dispatchers",
self.pds = self.CFG.get_safe("highavailability.process_dispatchers",
[ProcessDispatcherService.name])

process_definition_id = self.CFG.get_safe("highavailability.process_definition_id")
process_configuration = self.CFG.get_safe("highavailability.process_configuration")
self.process_definition_id = self.CFG.get_safe("highavailability.process_definition_id")
self.process_configuration = self.CFG.get_safe("highavailability.process_configuration")
aggregator_config = self.CFG.get_safe("highavailability.aggregator")

self.service_id = self._register_service()

# TODO: Allow other core class?
self.core = HighAvailabilityCore(cfg, ProcessDispatcherSimpleAPIClient,
pds, self.policy, process_definition_id=process_definition_id,
self.pds, self.policy, process_definition_id=self.process_definition_id,
parameters=policy_parameters,
process_configuration=process_configuration,
aggregator_config=aggregator_config)
process_configuration=self.process_configuration,
aggregator_config=aggregator_config,
pd_client_kwargs={'container': self.container,
'service_id': self.service_id})

self.policy_thread = looping_call(self.policy_interval, self.core.apply_policy)

Expand Down Expand Up @@ -114,6 +119,51 @@ def on_quit(self):
if self.dashi_handler:
self.dashi_handler.stop()

self._unregister_service()

def _register_service(self):
if not self.process_definition_id:
log.error("No process definition id. Not registering service")
return

if len(self.pds) < 1:
log.error("Must have at least one PD available to register a service")
return

pd_name = self.pds[0]
pd = ProcessDispatcherServiceClient(to_name=pd_name)
definition = pd.read_process_definition(self.process_definition_id)

existing_services, _ = self.container.resource_registry.find_resources(
restype="Service", name=definition.name)

if len(existing_services) > 0:
if len(existing_services) > 1:
log.warning("There is more than one service object for %s. Using the first one" % definition.name)
service_id = existing_services[0]._id
else:
svc_obj = Service(name=definition.name, exchange_name=definition.name)
service_id, _ = self.container.resource_registry.create(svc_obj)

svcdefs, _ = self.container.resource_registry.find_resources(
restype="ServiceDefinition", name=definition.name)

if svcdefs:
self.container.resource_registry.create_association(
self.service_id, "hasServiceDefinition", svcdefs[0]._id)
else:
log.error("Cannot find ServiceDefinition resource for %s",
definition.name)

return service_id

def _unregister_service(self):
if not self.service_id:
log.error("No service id. Cannot unregister service")
return

self.container.resource_registry.delete(self.service_id, del_associations=True)

def rcmd_reconfigure_policy(self, new_policy):
"""Service operation: Change the parameters of the policy used for service
Expand Down Expand Up @@ -205,21 +255,40 @@ class ProcessDispatcherSimpleAPIClient(object):
}

def __init__(self, name, real_client=None, **kwargs):
self.container = kwargs.get('container')
if self.container:
del(kwargs['container'])
self.service_id = kwargs.get('service_id')
if self.container:
del(kwargs['service_id'])

if real_client is not None:
self.real_client = real_client
else:
self.real_client = ProcessDispatcherServiceClient(to_name=name, **kwargs)
self.event_pub = EventPublisher()

def _associate_process(self, process):
try:
self.container.resource_registry.create_association(self.service_id,
"hasProcess", process.process_id)
except Exception:
log.exception("Couldn't associate service %s to process %s" % (self.service_id, process.process_id))


def create_definition(self, definition_id, definition_type, executable,
name=None, description=None):

if name is None:
raise BadRequest("create_definition must have a name supplied")

# note: we lose the description
definition = ProcessDefinition(name=name)
definition.executable = {'module': executable.get('module'),
'class': executable.get('class')}
definition.definition_type = definition_type
return self.real_client.create_process_definition(definition, definition_id)
created_definition = self.real_client.create_process_definition(
definition, definition_id)

def describe_definition(self, definition_id):

Expand All @@ -236,7 +305,7 @@ def schedule_process(self, upid, definition_id, configuration=None,
origin=definition.name, origin_type="DispatchedHAProcess",
state=ProcessStateEnum.RUNNING)

pid = self.real_client.create_process(definition_id)
create_upid = self.real_client.create_process(definition_id)

process_schedule = ProcessSchedule()
if queueing_mode is not None:
Expand Down Expand Up @@ -264,9 +333,12 @@ def schedule_process(self, upid, definition_id, configuration=None,
process_schedule.target = target

sched_pid = self.real_client.schedule_process(definition_id,
process_schedule, configuration=configuration, process_id=upid)
process_schedule, configuration=configuration, process_id=create_upid)

proc = self.real_client.read_process(sched_pid)

self._associate_process(proc)

dict_proc = {'upid': proc.process_id,
'state': self.state_map.get(proc.process_state, self.unknown_state),
}
Expand Down
36 changes: 34 additions & 2 deletions ion/agents/cei/test/test_haagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from pyon.util.int_test import IonIntegrationTestCase
from pyon.util.context import LocalContextMixin
from pyon.core import bootstrap
from pyon.core.exception import Timeout

from ion.agents.cei.high_availability_agent import HighAvailabilityAgentClient, \
ProcessDispatcherSimpleAPIClient
Expand Down Expand Up @@ -70,7 +71,8 @@ def setUp(self):
self.pd_cli = ProcessDispatcherServiceClient(to_name="process_dispatcher")

self.process_definition_id = uuid4().hex
self.process_definition = ProcessDefinition(name='test', executable={
self.process_definition_name = 'test'
self.process_definition = ProcessDefinition(name=self.process_definition_name, executable={
'module': 'ion.agents.cei.test.test_haagent',
'class': 'TestProcess'
})
Expand Down Expand Up @@ -181,6 +183,33 @@ def test_features(self):
self.waiter.await_state_event(state=ProcessStateEnum.TERMINATED)
self.assertEqual(len(self.get_running_procs()), 0)

def test_associations(self):

services_registered, _ = self.container.resource_registry.find_resources(
restype="Service", name=self.process_definition_name)
self.assertEqual(len(services_registered), 1)

new_policy = {'preserve_n': 1}
self.haa_client.reconfigure_policy(new_policy)
self.waiter.await_state_event(state=ProcessStateEnum.RUNNING)
self.assertEqual(len(self.get_running_procs()), 1)

proc = self.get_running_procs()[0]

processes_associated, _ = self.container.resource_registry.find_resources(
restype="Process", name=proc.process_id)
self.assertEqual(len(processes_associated), 1)

new_policy = {'preserve_n': 0}
self.haa_client.reconfigure_policy(new_policy)

self.waiter.await_state_event(state=ProcessStateEnum.TERMINATED)
self.assertEqual(len(self.get_running_procs()), 0)

processes_associated, _ = self.container.resource_registry.find_resources(
restype="Process", name=proc.process_id)
self.assertEqual(len(processes_associated), 0)

def test_dashi(self):

import dashi
Expand Down Expand Up @@ -297,7 +326,10 @@ def setUp(self):
'module': 'ion.agents.cei.test.test_haagent',
'class': 'TestProcess'
})
self.pd_cli.create_process_definition(self.process_definition, self.process_definition_id)

self.pd_cli.create_process_definition(self.process_definition,
self.process_definition_id)


http_port = 8919
http_port = self._start_webserver(port=http_port)
Expand Down

0 comments on commit 7342c6c

Please sign in to comment.