Permalink
Browse files

Merge pull request #389 from wfrench/master

Update to driver_process module to add driver launching via an egg
  • Loading branch information...
2 parents 68e381b + d71efa0 commit a36f4934a2f6eb57ceebe6396d0d5963ad54e875 Edward Hunter committed Dec 6, 2012
@@ -18,6 +18,7 @@
import signal
import subprocess
+from urllib2 import Request, urlopen, URLError, HTTPError
from pyon.util.log import log
from ion.agents.instrument.common import BaseEnum
@@ -26,13 +27,15 @@
from ion.agents.instrument.packet_factory_man import create_packet_builder
PYTHON_PATH = 'bin/python'
+CACHE_DIR = '/tmp'
+REPO_BASE = 'http://sddevrepo.oceanobservatories.org/releases/'
class DriverProcessType(BaseEnum):
"""
Base states for driver launcher types.
"""
PYTHON_MODULE = 'ZMQPyClassDriverLauncher'
- EGG = 'ZMQEggDriverLauncherG'
+ EGG = 'ZMQEggDriverLauncher'
class DriverProcess(object):
@@ -48,8 +51,6 @@ class DriverProcess(object):
_command_port = None
_event_port = None
- _packet_factories = None
-
@classmethod
def get_process(cls, driver_config, test_mode = False):
"""
@@ -60,25 +61,25 @@ def get_process(cls, driver_config, test_mode = False):
@param test_mode tell the driver you are running in test mode. Some drivers use this to add a poison pill to
the driver process.
"""
- type = driver_config.get("process_type")
+ type = driver_config.get("process_type")[0]
driver_module = driver_config.get('dvr_mod')
if not type:
raise DriverLaunchException("missing driver config: process_type")
# For some reason the enum wasn't working with the instrument agent. I would see [emum] when called from
# the IA, but (enum,) when called from this module.
- #elif type == DriverProcessType.PYTHON_MODULE:
- elif driver_module:
+ #
+ #
+ elif type == DriverProcessType.PYTHON_MODULE:
return ZMQPyClassDriverProcess(driver_config, test_mode)
elif type == DriverProcessType.EGG:
- raise NotImplementedException()
return ZMQEggDriverProcess(driver_config, test_mode)
-
else:
raise DriverLaunchException("unknown driver process type: %s" % type)
+
def launch(self):
"""
Launch the driver process. Once the process is launched read the two status files that contain the event port
@@ -121,9 +122,6 @@ def poll(self):
return True
-
-
-
def stop(self):
"""
Stop the driver process. We try to stop gracefully using the driver client if we can, otherwise a simple kill
@@ -259,6 +257,23 @@ def _driver_event_port_file(self):
return self._driver_event_file
+ def get_client(self):
+ """
+ Get a python client for the driver process.
+ @return an client object for the driver process
+ """
+ # Start client messaging and verify messaging.
+ if not self._driver_client:
+ try:
+ from mi.core.instrument.zmq_driver_client import ZmqDriverClient
+ driver_client = ZmqDriverClient('localhost', self._command_port, self._event_port)
+ self._driver_client = driver_client
+ except Exception, e:
+ self.stop()
+ log.error('Error starting driver client: %s' % e)
+ raise DriverLaunchException('Error starting driver client.')
+
+ return self._driver_client
class ZMQPyClassDriverProcess(DriverProcess):
"""
@@ -310,113 +325,103 @@ def _process_command(self):
return [ python, '-c', cmd_str ]
- def get_client(self):
+
+class ZMQEggDriverProcess(DriverProcess):
+ '''
+ Object to facilitate ZMQ driver processes using a python egg
+
+ Driver config requirements:
+ dvr_egg :: the filename of the egg
+
+ Example:
+
+ driver_config = {
+ dvr_egg: seabird_sbe37smb_ooicore-0.0.1-py2.7.egg
+
+ process_type: DriverProcessType.EGG
+ }
+ @param driver_config configuration parameters for the driver process
+ @param test_mode should the driver be run in test mode
+ '''
+ def __init__(self, driver_config, test_mode = False):
+ self.config = driver_config
+ self.test_mode = test_mode
+
+ def _check_cache_for_egg(self, egg_name):
"""
- Get a python client for the driver process.
- @return an client object for the driver process
+ Check if the egg is already cached, if so, return the path.
+ @return: egg path if cached, else None
"""
- # Start client messaging and verify messaging.
- if not self._driver_client:
- try:
- from mi.core.instrument.zmq_driver_client import ZmqDriverClient
- driver_client = ZmqDriverClient('localhost', self._command_port, self._event_port)
- self._driver_client = driver_client
- except Exception, e:
- self.stop()
- log.error('Error starting driver client: %s' % e)
- raise DriverLaunchException('Error starting driver client.')
-
- return self._driver_client
+ path = CACHE_DIR + '/' + egg_name
+ if os.path.exists(path):
+ log.debug("_check_cache_for_egg cache hit PATH = " + str(path))
+ return path
+ else:
+ log.debug("_check_cache_for_egg cache miss")
+ return None
- def get_packet_factories(self, stream_info):
+ def _get_remote_egg(self, egg_name):
+ """
+ pull the egg from a remote server if present to the local cache dir.
+ @return: returns the path, throws exception if not found.
"""
- Construct packet factories from PACKET_CONFIG member of the driver_config
- and the given stream_info dict.
+ try:
+ response = urlopen(REPO_BASE + '/' + egg_name)
+ egg_yolk = response.read()
+ log.debug("_fetch_egg GOT YOLK")
+ except HTTPError, e:
+ raise DriverLaunchException(e.code)
+ except URLError, e:
+ raise DriverLaunchException(e.reason)
+
+ path = CACHE_DIR + '/' + egg_name
+
+ try:
+ egg_file = open(CACHE_DIR + '/' + egg_name, "wb")
+ egg_file.write(egg_yolk)
+ except IOError:
+ raise DriverLaunchException("IOError writing egg file to cache")
+ else:
+ egg_file.close()
+
+ log.debug("_fetch_egg GOT EGG, PATH = " + str(path))
+ return path
+
+ def _get_egg(self, egg_name):
+ path = self._check_cache_for_egg(egg_name)
+ if None == path:
+ path = self._get_remote_egg(egg_name) # Will exception out if problem.
- @param stream_info
+ return path
- @retval a dict indexed by stream name of the packet factories defined.
+ def _process_command(self):
"""
+ Build the process command line using the driver_config dict
+ @return a list containing spawn args for the _spawn method
+ 1. check cache (CACHE_DIR = '/tmp')
+ 2. download to cache if not present.
+ 3. construct call command
+ """
- if not self._packet_factories:
- log.info("generating packet factories")
- self._packet_factories = {}
+ path = self._get_egg(self.config.get('dvr_egg'))
- driver_module = self.config.get('dvr_mod')
- if not driver_module:
- raise DriverLaunchException("missing driver config: driver_module")
+ log.debug("cwd: %s" % os.getcwd())
+ driver_package = self.config.get('dvr_egg')
+ ppid = os.getpid() if self.test_mode else None
+
+ python = PYTHON_PATH
+
+ if not driver_package:
+ raise DriverLaunchException("missing driver config: driver_package")
+ if not os.path.exists(python):
+ raise DriverLaunchException("could not find python executable: %s" % python)
+
+ cmd_port_fname = self._driver_command_port_file()
+ evt_port_fname = self._driver_event_port_file()
+
+ cmd_str = "import sys; sys.path.insert(0, '%s/%s'); from mi.main import run; sys.exit(run(command_port_file='%s', event_port_file='%s', ppid=%s))" % \
+ (CACHE_DIR, driver_package, cmd_port_fname, evt_port_fname, str(ppid))
+
+ return [ python, '-c', cmd_str ]
- packet_config = None
- try:
- import_str = 'from %s import PACKET_CONFIG' % driver_module
- exec import_str
- log.debug("PACKET_CONFIG: %s", PACKET_CONFIG)
- packet_config = PACKET_CONFIG
- except:
- log.error('PACKET_CONFIG undefined in driver module %s ' % driver_module)
-
- if packet_config:
- for name in packet_config:
- if not name in stream_info:
- log.error("Name '%s' not found in stream_info" % name)
- continue
-
- stream_config = stream_info[name]
- try:
- packet_builder = create_packet_builder(name, stream_config)
- self._packet_factories[name] = packet_builder
- log.info('created packet builder for stream %s' % name)
- except Exception, e:
- log.error('error creating packet builder: %s' % e)
-
- return self._packet_factories
-
-# TODO remove
-# def get_packet_factories(self):
-# """
-# Construct packet factories from packet_config member of the driver_config.
-# @retval a list of packet factories defined.
-# """
-# if not self._packet_factories:
-# log.info("generating packet factories")
-# self._packet_factories = {}
-#
-# driver_module = self.config.get('dvr_mod')
-# if not driver_module:
-# raise DriverLaunchException("missing driver config: driver_module")
-#
-# # Should we poll the driver process to give us these configurations? If defined in an interface then
-# # this method could be generalized for all driver processes. It also seems like execing an import
-# # might be unsafe.
-# import_str = 'from %s import PACKET_CONFIG' % driver_module
-# try:
-# exec import_str
-# log.debug("PACKET_CONFIG: %s", PACKET_CONFIG)
-# for (name, val) in PACKET_CONFIG.iteritems():
-# if val:
-# try:
-# mod = val[0]
-# cls = val[1]
-# import_str = 'from %s import %s' % (mod, cls)
-# ctor_str = 'ctor = %s' % cls
-# exec import_str
-# exec ctor_str
-# self._packet_factories[name] = ctor
-#
-# except Exception, e:
-# log.error('error creating packet factory: %s', e)
-#
-# else:
-# log.info('created packet factory for stream %s', name)
-# except Exception, e:
-# log.error('Instrument agent %s had error creating packet factories. %s', e)
-#
-# return self._packet_factories
-
-class ZMQEggDriverLauncher(DriverProcess):
- """
- Object to facilitate driver processes launch from an egg as an 'eggsecutable'
- """
- def __init__(self):
- pass
@@ -155,10 +155,6 @@ def __init__(self, *args, **kwargs):
# stream_config agent config member during process on_init.
self._data_publishers = {}
- # Factories for stream packets. Constructed by driver
- # configuration information on transition to inactive.
- self._packet_factories = {}
-
def on_init(self):
"""
Instrument agent pyon process initialization.
@@ -384,8 +380,6 @@ def _handler_uninitialized_initialize(self, *args, **kwargs):
# Start the driver and switch to inactive.
self._start_driver(self._dvr_config)
- self._construct_packet_factories()
-
next_state = ResourceAgentState.INACTIVE
return (next_state, result)
@@ -401,7 +395,6 @@ def _handler_inactive_reset(self, *args, **kwargs):
next_state = None
result = self._stop_driver()
- self._clear_packet_factories()
next_state = ResourceAgentState.UNINITIALIZED
return (next_state, result)
@@ -455,7 +448,6 @@ def _handler_idle_reset(self, *args, **kwargs):
self._dvr_client.cmd_dvr('disconnect')
self._dvr_client.cmd_dvr('initialize')
result = self._stop_driver()
- self._clear_packet_factories()
next_state = ResourceAgentState.UNINITIALIZED
return (next_state, result)
@@ -495,7 +487,6 @@ def _handler_stopped_reset(self, *args, **kwargs):
self._dvr_client.cmd_dvr('disconnect')
self._dvr_client.cmd_dvr('initialize')
result = self._stop_driver()
- self._clear_packet_factories()
next_state = ResourceAgentState.UNINITIALIZED
return (next_state, result)
@@ -543,7 +534,6 @@ def _handler_command_reset(self, *args, **kwargs):
self._dvr_client.cmd_dvr('disconnect')
self._dvr_client.cmd_dvr('initialize')
result = self._stop_driver()
- self._clear_packet_factories()
next_state = ResourceAgentState.UNINITIALIZED
return (next_state, result)
@@ -641,7 +631,6 @@ def _handler_streaming_reset(self, *args, **kwargs):
self._dvr_client.cmd_dvr('disconnect')
self._dvr_client.cmd_dvr('initialize')
result = self._stop_driver()
- self._clear_packet_factories()
next_state = ResourceAgentState.UNINITIALIZED
return (next_state, result)
@@ -1136,26 +1125,7 @@ def _construct_data_publishers(self):
publisher for stream %s.', self._proc_name,
stream_name)
- def _construct_packet_factories(self):
- """
- Construct packet factories from packet_config member of the
- driver_config and self.CFG.stream_config.
- @retval None
- """
- self._packet_factories = self._dvr_proc.get_packet_factories(
- self.CFG.stream_config)
- log.info('Insturment agent %s constructed its packet factories.',
- self._proc_name)
-
- def _clear_packet_factories(self):
- """
- Delete packet factories.
- @retval None
- """
- self._packet_factories.clear()
- log.info('Instrument agent %s deleted packet factories.',
- self._proc_name)
-
+
###############################################################################
# Event callback and handling for direct access.
###############################################################################
Oops, something went wrong.

0 comments on commit a36f493

Please sign in to comment.