Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Browse files

Make Process objects in PD and Service objects in HA

  • Loading branch information...
commit 4eb53a61006a0568254933e938390236b15c813a 1 parent 2a5c205
@oldpatricka oldpatricka authored
View
92 ion/agents/cei/high_availability_agent.py
@@ -6,7 +6,7 @@
from pyon.core.exception import BadRequest
from interface.objects import AgentCommand, ProcessDefinition, ProcessSchedule,\
- ProcessStateEnum, ProcessQueueingMode, ProcessTarget, ProcessRestartMode
+ ProcessStateEnum, ProcessQueueingMode, ProcessTarget, ProcessRestartMode, Service
from interface.services.cei.iprocess_dispatcher_service import ProcessDispatcherServiceClient
from ion.agents.cei.util import looping_call
from ion.services.cei.process_dispatcher_service import _core_process_definition_from_ion, \
@@ -64,18 +64,23 @@ def on_init(self):
cfg = self.CFG.get_safe("highavailability")
# use default PD name as the sole PD if none are provided in config
- pds = self.CFG.get_safe("highavailability.process_dispatchers",
+ self.pds = self.CFG.get_safe("highavailability.process_dispatchers",
[ProcessDispatcherService.name])
- process_definition_id = self.CFG.get_safe("highavailability.process_definition_id")
- process_configuration = self.CFG.get_safe("highavailability.process_configuration")
+ self.process_definition_id = self.CFG.get_safe("highavailability.process_definition_id")
+ self.process_configuration = self.CFG.get_safe("highavailability.process_configuration")
aggregator_config = self.CFG.get_safe("highavailability.aggregator")
+
+ self.service_id = self._register_service()
+
# TODO: Allow other core class?
self.core = HighAvailabilityCore(cfg, ProcessDispatcherSimpleAPIClient,
- pds, self.policy, process_definition_id=process_definition_id,
+ self.pds, self.policy, process_definition_id=self.process_definition_id,
parameters=policy_parameters,
- process_configuration=process_configuration,
- aggregator_config=aggregator_config)
+ process_configuration=self.process_configuration,
+ aggregator_config=aggregator_config,
+ pd_client_kwargs={'container': self.container,
+ 'service_id': self.service_id})
self.policy_thread = looping_call(self.policy_interval, self.core.apply_policy)
@@ -114,6 +119,51 @@ def on_quit(self):
if self.dashi_handler:
self.dashi_handler.stop()
+ self._unregister_service()
+
+ def _register_service(self):
+     """Find or create the Service resource for the configured process definition.
+
+     Reads the process definition through the first configured process
+     dispatcher, reuses a Service resource with the same name if one
+     exists (otherwise creates one), and associates it with the
+     ServiceDefinition resource of that name when one is found.
+
+     Returns the Service resource id, or None when there is no process
+     definition id or no process dispatcher configured.
+     """
+     if not self.process_definition_id:
+         log.error("No process definition id. Not registering service")
+         return
+
+     if len(self.pds) < 1:
+         log.error("Must have at least one PD available to register a service")
+         return
+
+     pd_name = self.pds[0]
+     pd = ProcessDispatcherServiceClient(to_name=pd_name)
+     definition = pd.read_process_definition(self.process_definition_id)
+
+     existing_services, _ = self.container.resource_registry.find_resources(
+         restype="Service", name=definition.name)
+
+     if len(existing_services) > 0:
+         if len(existing_services) > 1:
+             log.warning("There is more than one service object for %s. Using the first one" % definition.name)
+         service_id = existing_services[0]._id
+     else:
+         svc_obj = Service(name=definition.name, exchange_name=definition.name)
+         service_id, _ = self.container.resource_registry.create(svc_obj)
+
+     svcdefs, _ = self.container.resource_registry.find_resources(
+         restype="ServiceDefinition", name=definition.name)
+
+     if svcdefs:
+         # Use the local id: self.service_id is assigned by the caller only
+         # after this method returns, so it is not valid here.
+         self.container.resource_registry.create_association(
+             service_id, "hasServiceDefinition", svcdefs[0]._id)
+     else:
+         log.error("Cannot find ServiceDefinition resource for %s",
+             definition.name)
+
+     return service_id
+
+ def _unregister_service(self):
+     """Delete this agent's Service resource and its associations.
+
+     Logs an error and does nothing when no service id is set.
+     """
+     if self.service_id:
+         self.container.resource_registry.delete(self.service_id, del_associations=True)
+     else:
+         log.error("No service id. Cannot unregister service")
+
def rcmd_reconfigure_policy(self, new_policy):
"""Service operation: Change the parameters of the policy used for service
@@ -205,21 +255,40 @@ class ProcessDispatcherSimpleAPIClient(object):
}
def __init__(self, name, real_client=None, **kwargs):
+     """Wrap a process dispatcher client behind the simple PD API.
+
+     name is the process dispatcher name used as ``to_name`` when no
+     real_client is supplied. The optional ``container`` and
+     ``service_id`` keyword arguments are kept on the instance and are
+     not forwarded; the remaining kwargs go to
+     ProcessDispatcherServiceClient.
+     """
+     # pop() removes each key independently, so one may be passed without
+     # the other and neither leaks into the client constructor.
+     self.container = kwargs.pop('container', None)
+     self.service_id = kwargs.pop('service_id', None)
+
      if real_client is not None:
          self.real_client = real_client
      else:
          self.real_client = ProcessDispatcherServiceClient(to_name=name, **kwargs)
      self.event_pub = EventPublisher()
+ def _associate_process(self, process):
+     """Best-effort: associate the service with a process via "hasProcess".
+
+     Any failure (including a missing container, which surfaces as an
+     AttributeError) is logged and swallowed so that scheduling continues.
+     """
+     try:
+         self.container.resource_registry.create_association(self.service_id,
+             "hasProcess", process.process_id)
+     # "except AttributeError, Exception" caught only AttributeError and
+     # rebound the name Exception; catch the base class instead.
+     except Exception:
+         log.exception("Couldn't associate service %s to process %s" % (self.service_id, process.process_id))
+
+
def create_definition(self, definition_id, definition_type, executable,
      name=None, description=None):
+     """Create a process definition through the wrapped PD client.
+
+     executable is a mapping whose 'module' and 'class' entries are
+     copied into the definition. Raises BadRequest when name is None.
+     Returns the result of create_process_definition.
+     """
+     if name is None:
+         raise BadRequest("create_definition must have a name supplied")
+
      # note: we lose the description
      definition = ProcessDefinition(name=name)
      definition.executable = {'module': executable.get('module'),
              'class': executable.get('class')}
      definition.definition_type = definition_type
-     return self.real_client.create_process_definition(definition, definition_id)
+     created_definition = self.real_client.create_process_definition(
+         definition, definition_id)
+     # Keep returning the client's result, as the method did before.
+     return created_definition
def describe_definition(self, definition_id):
@@ -236,7 +305,7 @@ def schedule_process(self, upid, definition_id, configuration=None,
origin=definition.name, origin_type="DispatchedHAProcess",
state=ProcessStateEnum.RUNNING)
- pid = self.real_client.create_process(definition_id)
+ create_upid = self.real_client.create_process(definition_id)
process_schedule = ProcessSchedule()
if queueing_mode is not None:
@@ -264,9 +333,12 @@ def schedule_process(self, upid, definition_id, configuration=None,
process_schedule.target = target
sched_pid = self.real_client.schedule_process(definition_id,
- process_schedule, configuration=configuration, process_id=upid)
+ process_schedule, configuration=configuration, process_id=create_upid)
proc = self.real_client.read_process(sched_pid)
+
+ self._associate_process(proc)
+
dict_proc = {'upid': proc.process_id,
'state': self.state_map.get(proc.process_state, self.unknown_state),
}
View
64 ion/agents/cei/test/test_haagent.py
@@ -18,6 +18,7 @@
from pyon.util.int_test import IonIntegrationTestCase
from pyon.util.context import LocalContextMixin
from pyon.core import bootstrap
+from pyon.core.exception import Timeout
from ion.agents.cei.high_availability_agent import HighAvailabilityAgentClient, \
ProcessDispatcherSimpleAPIClient
@@ -70,7 +71,8 @@ def setUp(self):
self.pd_cli = ProcessDispatcherServiceClient(to_name="process_dispatcher")
self.process_definition_id = uuid4().hex
- self.process_definition = ProcessDefinition(name='test', executable={
+ self.process_definition_name = 'test'
+ self.process_definition = ProcessDefinition(name=self.process_definition_name, executable={
'module': 'ion.agents.cei.test.test_haagent',
'class': 'TestProcess'
})
@@ -118,7 +120,10 @@ def setUp(self):
def tearDown(self):
+ # Stop self.waiter, terminate the HA Agent process (tolerating failure,
+ # since a test may already have terminated it), then stop the container.
self.waiter.stop()
- self.container.terminate_process(self._haa_pid)
+ try:
+ self.container.terminate_process(self._haa_pid)
+ except Exception:
+ log.exception("Couldn't terminate HA Agent (May have been terminated by a test)")
self._stop_container()
def get_running_procs(self):
@@ -181,6 +186,55 @@ def test_features(self):
self.waiter.await_state_event(state=ProcessStateEnum.TERMINATED)
self.assertEqual(len(self.get_running_procs()), 0)
+ def test_associations(self):
+ """Check Service/Process registry entries across the HA Agent lifecycle.
+
+ Covers three stages: the Service resource exists once the agent is up,
+ a "hasProcess" association appears while one process is running, and
+ both the Process and Service entries are gone after termination.
+ """
+
+ # Ensure that once the HA Agent starts, there is a Service object in
+ # the registry
+ services_registered, _ = self.container.resource_registry.find_resources(
+ restype="Service", name=self.process_definition_name)
+ self.assertEqual(len(services_registered), 1)
+ service = services_registered[0]
+
+ # Ensure that once a process is started, there is an association between
+ # it and the service
+ new_policy = {'preserve_n': 1}
+ self.haa_client.reconfigure_policy(new_policy)
+ self.waiter.await_state_event(state=ProcessStateEnum.RUNNING)
+ self.assertEqual(len(self.get_running_procs()), 1)
+
+ proc = self.get_running_procs()[0]
+
+ processes_associated, _ = self.container.resource_registry.find_resources(
+ restype="Process", name=proc.process_id)
+ self.assertEqual(len(processes_associated), 1)
+
+ has_processes = self.container.resource_registry.find_associations(
+ service, "hasProcess")
+ self.assertEqual(len(has_processes), 1)
+
+ # Ensure that once we terminate that process, there are no associations
+ new_policy = {'preserve_n': 0}
+ self.haa_client.reconfigure_policy(new_policy)
+
+ self.waiter.await_state_event(state=ProcessStateEnum.TERMINATED)
+ self.assertEqual(len(self.get_running_procs()), 0)
+
+ processes_associated, _ = self.container.resource_registry.find_resources(
+ restype="Process", name=proc.process_id)
+ self.assertEqual(len(processes_associated), 0)
+
+ has_processes = self.container.resource_registry.find_associations(
+ service, "hasProcess")
+ self.assertEqual(len(has_processes), 0)
+
+ # Ensure that once we terminate that HA Agent, the Service object is
+ # cleaned up
+ self.container.terminate_process(self._haa_pid)
+
+ services_registered, _ = self.container.resource_registry.find_resources(
+ restype="Service", name=self.process_definition_name)
+ self.assertEqual(len(services_registered), 0)
+
def test_dashi(self):
import dashi
@@ -224,7 +278,6 @@ def test_schedule(self):
self.assertEqual(args[0], definition_id)
self.assertEqual(kwargs['configuration'], configuration)
- self.assertEqual(kwargs['process_id'], upid)
@attr('INT', group='cei')
@@ -297,7 +350,10 @@ def setUp(self):
'module': 'ion.agents.cei.test.test_haagent',
'class': 'TestProcess'
})
- self.pd_cli.create_process_definition(self.process_definition, self.process_definition_id)
+
+ self.pd_cli.create_process_definition(self.process_definition,
+ self.process_definition_id)
+
http_port = 8919
http_port = self._start_webserver(port=http_port)
View
4 ion/services/ans/test/test_workflow.py
@@ -167,7 +167,7 @@ def test_transform_workflow(self):
data_product_stream_ids.append(ctd_stream_id)
#Create and start the workflow
- workflow_id, workflow_product_id = self.workflowclient.create_data_process_workflow(workflow_def_id, ctd_parsed_data_product_id, timeout=30)
+ workflow_id, workflow_product_id = self.workflowclient.create_data_process_workflow(workflow_def_id, ctd_parsed_data_product_id, timeout=300)
workflow_output_ids,_ = self.rrclient.find_subjects(RT.Workflow, PRED.hasOutputProduct, workflow_product_id, True)
assertions(len(workflow_output_ids) == 1 )
@@ -199,7 +199,7 @@ def test_transform_workflow(self):
log.debug("results::: %s" % results)
#Stop the workflow processes
- self.workflowclient.terminate_data_process_workflow(workflow_id, False, timeout=25) # Should test true at some point
+ self.workflowclient.terminate_data_process_workflow(workflow_id, False, timeout=250) # Should test true at some point
#Make sure the Workflow object was removed
objs, _ = self.rrclient.find_resources(restype=RT.Workflow)
View
13 ion/services/cei/process_dispatcher_service.py
@@ -275,7 +275,8 @@ def create_process(self, process_definition_id=''):
process_id = str(process_definition.name or "process") + uuid.uuid4().hex
process_id = create_valid_identifier(process_id, ws_sub='_')
- # TODO: Create a resource object or directory entry here?
+ process = Process(process_id=process_id)
+ self.container.resource_registry.create(process, object_id=process_id)
return process_id
@@ -317,6 +318,12 @@ def schedule_process(self, process_definition_id='', schedule=None, configuratio
process_id = str(process_definition.name or "process") + uuid.uuid4().hex
process_id = create_valid_identifier(process_id, ws_sub='_')
+ try:
+ process = Process(process_id=process_id)
+ self.container.resource_registry.create(process, object_id=process_id)
+ except BadRequest:
+ log.debug("Tried to create Process %s, but already exists. This is normally ok." % process_id)
+
return self.backend.spawn(process_id, process_definition_id, schedule, configuration)
def cancel_process(self, process_id=''):
@@ -329,7 +336,9 @@ def cancel_process(self, process_id=''):
if not process_id:
raise NotFound('No process was provided')
- return self.backend.cancel(process_id)
+ cancel_result = self.backend.cancel(process_id)
+ self.container.resource_registry.delete(process_id, del_associations=True)
+ return cancel_result
def read_process(self, process_id=''):
"""Returns a Process as an object.
Please sign in to comment.
Something went wrong with that request. Please try again.