Skip to content

Commit

Permalink
factor out launching of platform agent to facilitate further adjustme…
Browse files Browse the repository at this point in the history
…nts and testing
  • Loading branch information
carueda committed Sep 18, 2012
1 parent aa0d9b7 commit 59a5ce8
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 205 deletions.
160 changes: 26 additions & 134 deletions ion/agents/platform/platform_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@

from ion.agents.instrument.instrument_fsm import InstrumentFSM

from pyon.event.event import EventSubscriber

from interface.services.cei.iprocess_dispatcher_service import ProcessDispatcherServiceClient
from interface.objects import ProcessDefinition, ProcessStateEnum

from gevent import queue
from ion.agents.platform.platform_agent_launcher import Launcher


PA_MOD = 'ion.agents.platform.platform_agent'
Expand Down Expand Up @@ -125,10 +120,7 @@ def __init__(self):
# {subplatform_id: (ResourceAgentClient, PID), ...}
self._pa_clients = {} # Never None

# process dispatcher elements:
self._pd_client = None
self._event_queue = queue.Queue()
self._event_sub = None
self._launcher = Launcher()

#
# TODO the following defined here as in InstrumentAgent,
Expand All @@ -154,7 +146,7 @@ def _reset(self):
self._platform_id, len(self._pa_clients))
for subplatform_id in self._pa_clients:
_, pid = self._pa_clients[subplatform_id]
self._pd_client.cancel_process(pid)
self._launcher.cancel_process(pid)

self._pa_clients.clear()

Expand Down Expand Up @@ -196,7 +188,6 @@ def _pre_initialize(self):
raise PlatformException(msg)

self._container_name = self._plat_config['container_name']
self._set_process_dispatcher()

if 'platform_topology' in self._plat_config:
self._topology = self._plat_config['platform_topology']
Expand All @@ -206,12 +197,6 @@ def _pre_initialize(self):
self._parent_platform_id = ppid
log.debug("_parent_platform_id set to: %s", self._parent_platform_id)

def _set_process_dispatcher(self):
log.debug("%r: _set_process_dispatcher using name: %r",
self._platform_id, self._container_name)

self._pd_client = ProcessDispatcherServiceClient()

def _construct_data_publishers(self):
"""
Construct the stream publishers from the stream_config agent
Expand Down Expand Up @@ -386,15 +371,29 @@ def add_instruments(self):

def _launch_platform_agent(self, subplatform_id):
"""
Launches a platform agent including the INITIALIZE command,
and returns corresponding ResourceAgentClient instance.
Launches a sub-platform agent, creates ResourceAgentClient, and pings
and initializes the sub-platform agent.
@param subplatform_id Platform ID
"""
log.debug("%r: _launch_platform_agent: subplatform_id=%s",
self._platform_id, subplatform_id)

return self._launch_platform_agent_with_pd(subplatform_id)
agent_config = {
'agent': {'resource_id': subplatform_id},
'stream_config': self.CFG.stream_config,
'test_mode': True
}

log.debug("%r: launching sub-platform agent: pid=%s",
self._platform_id, subplatform_id)
pid = self._launcher.launch(subplatform_id, agent_config)

pa_client = self._create_resource_agent_client(subplatform_id)
self._pa_clients[subplatform_id] = (pa_client, pid)

self._ping_subplatform(subplatform_id)
self._initialize_subplatform(subplatform_id)

def _create_resource_agent_client(self, subplatform_id):
"""
Expand Down Expand Up @@ -456,125 +455,18 @@ def _initialize_subplatform(self, subplatform_id):
log.debug("%r: _initialize_subplatform %r retval = %s",
self._platform_id, subplatform_id, str(retval))

def _launch_platform_agent_with_pd(self, subplatform_id):
"""
Launches a PlatformAgent for the given sub-platform ID
"""

def _state_event_callback(event, *args, **kwargs):
state_str = ProcessStateEnum._str_map.get(event.state)
origin = event.origin
log.debug("%r: _state_event_callback CALLED: state=%s from %s\n "
"event=%s\n args=%s\n kwargs=%s",
self._platform_id, state_str, origin, str(event), str(args), str(kwargs))

self._event_queue.put(event)

def _subscribe_events(origin):
self._event_sub = EventSubscriber(
event_type="ProcessLifecycleEvent",
callback=_state_event_callback,
origin=origin,
origin_type="DispatchedProcess"
)
self._event_sub.start()

log.debug("%r: _subscribe_events: origin=%s STARTED",
self._platform_id, str(origin))


pa_name = 'PlatformAgent_%s' % subplatform_id

pdef = ProcessDefinition(name=pa_name)
pdef.executable = {
'module': PA_MOD,
'class': PA_CLS
}
pdef_id = self._pd_client.create_process_definition(process_definition=pdef)

pid = self._pd_client.create_process(process_definition_id=pdef_id)

_subscribe_events(pid)

agent_config = {
'agent': {'resource_id': subplatform_id},
'stream_config' : self.CFG.stream_config,
'test_mode' : True
}

log.debug("%r: schedule_process: pid=%s",
self._platform_id, str(pid))

self._pd_client.schedule_process(process_definition_id=pdef_id,
process_id=pid,
configuration=agent_config)

self._await_state_event(pid, ProcessStateEnum.SPAWN)

return pid

def _await_state_event(self, pid, state, timeout=30):
state_str = ProcessStateEnum._str_map.get(state)
log.debug("%r: _await_state_event: state=%s from %s timeout=%s",
self._platform_id, state_str, str(pid), timeout)

#check on the process as it exists right now
process_obj = self._pd_client.read_process(pid)
log.debug("%r: process_obj.process_state: %s", self._platform_id,
ProcessStateEnum._str_map.get(process_obj.process_state))
if state == process_obj.process_state:
self._event_sub.stop()
log.debug("%r: ALREADY in state %s", self._platform_id, state_str)
return

try:
event = self._event_queue.get(timeout=timeout)
except queue.Empty:
msg = "Event timeout! Waited %s seconds for process %s to notifiy state %s" % (
timeout, pid, state_str)
log.error(msg)
raise PlatformException(msg)

log.debug("%r: Got event: %s", self._platform_id, event)
if event.state != state:
msg = "%r: Expecting state %s but got %s" % (
self._platform_id, state, event.state)
log.error(msg)
raise PlatformException(msg)
if event.origin != pid:
msg = "%r: Expecting origin %s but got %s" % (
self._platform_id, pid, event.origin)
log.error(msg)
raise PlatformException(msg)

def _subplatforms_launch(self):
"""
Launches all my sub-platforms storing the corresponding
ResourceAgentClient objects in _pa_clients.
"""
if self._pd_client is None:
msg = "%r: _set_process_dispatcher hasn't been called!" % self._platform_id
log.error(msg)
raise PlatformException(msg)

subplatform_ids = self._plat_driver.get_subplatform_ids()
log.debug("%r: launching subplatforms %s",
self._platform_id, str(subplatform_ids))

self._pa_clients.clear()
for subplatform_id in subplatform_ids:
#
# launch agent, create, ResourceAgentClient, ping, and initialize
#
pid = self._launch_platform_agent(subplatform_id)

pa_client = self._create_resource_agent_client(subplatform_id)
self._pa_clients[subplatform_id] = (pa_client, pid)

self._ping_subplatform(subplatform_id)
self._initialize_subplatform(subplatform_id)


subplatform_ids = self._plat_driver.get_subplatform_ids()
if len(subplatform_ids):
log.debug("%r: launching subplatforms %s",
self._platform_id, str(subplatform_ids))
for subplatform_id in subplatform_ids:
self._launch_platform_agent(subplatform_id)

def _subplatforms_execute_agent(self, command=None, create_command=None,
expected_state=None):
Expand Down
152 changes: 152 additions & 0 deletions ion/agents/platform/platform_agent_launcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
#!/usr/bin/env python

"""
@package ion.agents.platform.platform_agent_launcher
@file ion/agents/platform/platform_agent_launcher.py
@author Carlos Rueda
@brief Helper for launching platform agent processes
"""

__author__ = 'Carlos Rueda'
__license__ = 'Apache 2.0'


from pyon.public import log
from pyon.event.event import EventSubscriber

from interface.services.cei.iprocess_dispatcher_service import ProcessDispatcherServiceClient
from interface.objects import ProcessDefinition, ProcessStateEnum

from ion.agents.platform.exceptions import PlatformException

from gevent import queue


PA_MOD = 'ion.agents.platform.platform_agent'
PA_CLS = 'PlatformAgent'


# TODO clean up log-and-throw anti-idiom in several places, which is used
# because the exception alone does not show up in the logs!


class Launcher(object):
"""
Helper for launching platform agent processes.
"""

def __init__(self):
self._pd_client = ProcessDispatcherServiceClient()
self._event_queue = None
self._event_sub = None

def launch(self, platform_id, agent_config, timeout_spawn=30):
"""
Launches a sub-platform agent.
@param platform_id Platform ID
@param agent_config Agent configuration
@param timeout_spawn Timeout in secs for the SPAWN event (by
default 30). If None or zero, no wait is performed.
@retval process ID
"""
log.debug("launch: platform_id=%s, timeout_spawn=%s",
platform_id, str(timeout_spawn))

try:
return self._do_launch(platform_id, agent_config, timeout_spawn)
finally:
self._event_queue = None
self._event_sub = None

def _do_launch(self, platform_id, agent_config, timeout_spawn):

pa_name = 'PlatformAgent_%s' % platform_id

pdef = ProcessDefinition(name=pa_name)
pdef.executable = {
'module': PA_MOD,
'class': PA_CLS
}
pdef_id = self._pd_client.create_process_definition(process_definition=pdef)

pid = self._pd_client.create_process(process_definition_id=pdef_id)

if timeout_spawn:
self._event_queue = queue.Queue()
self._subscribe_events(pid)

log.debug("calling schedule_process: pid=%s", str(pid))

self._pd_client.schedule_process(process_definition_id=pdef_id,
process_id=pid,
configuration=agent_config)

if timeout_spawn:
self._await_state_event(pid, ProcessStateEnum.SPAWN, timeout=timeout_spawn)

return pid

def _state_event_callback(self, event, *args, **kwargs):
state_str = ProcessStateEnum._str_map.get(event.state)
origin = event.origin
log.debug("_state_event_callback CALLED: state=%s from %s\n "
"event=%s\n args=%s\n kwargs=%s",
state_str, origin, str(event), str(args), str(kwargs))

self._event_queue.put(event)

def _subscribe_events(self, origin):
self._event_sub = EventSubscriber(
event_type="ProcessLifecycleEvent",
callback=self._state_event_callback,
origin=origin,
origin_type="DispatchedProcess"
)
self._event_sub.start()

log.debug("_subscribe_events: origin=%s STARTED", str(origin))

def _await_state_event(self, pid, state, timeout):
state_str = ProcessStateEnum._str_map.get(state)
log.debug("_await_state_event: state=%s from %s timeout=%s",
state_str, str(pid), timeout)

#check on the process as it exists right now
process_obj = self._pd_client.read_process(pid)
log.debug("process_obj.process_state: %s",
ProcessStateEnum._str_map.get(process_obj.process_state))

if state == process_obj.process_state:
self._event_sub.stop()
log.debug("ALREADY in state %s", state_str)
return

try:
event = self._event_queue.get(timeout=timeout)
except queue.Empty:
msg = "Event timeout! Waited %s seconds for process %s to notifiy state %s" % (
timeout, pid, state_str)
log.error(msg, exc_info=True)
raise PlatformException(msg)
except:
msg = "Something unexpected happened"
log.error(msg, exc_info=True)
raise PlatformException(msg)

log.debug("Got event: %s", event)
if event.state != state:
msg = "Expecting state %s but got %s" % (state, event.state)
log.error(msg)
raise PlatformException(msg)
if event.origin != pid:
msg = "Expecting origin %s but got %s" % (pid, event.origin)
log.error(msg)
raise PlatformException(msg)

def cancel_process(self, pid):
"""
Helper to terminate a process
"""
self._pd_client.cancel_process(pid)
Loading

0 comments on commit 59a5ce8

Please sign in to comment.