Skip to content

Loading…

Update to driver_process module to add driver launching via an egg #389

Merged
merged 8 commits into from

2 participants

@wfrench

This update puts the changes in place to start launching drivers from eggs. However, changes still need to be made to IMS or where ever the driver_config dictionary is created to start using this feature.

Also, it appears that the instrument agent no longer uses the packet factory class. So I've removed that code from the instrument agent and driver process module.

@edwardhunter edwardhunter was assigned
@edwardhunter

Looks good.

@edwardhunter edwardhunter merged commit a36f493 into ooici:master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Dec 5, 2012
  1. pre-test changes, only 1 test failing now

    Roger Unwin committed
  2. unit, smoke pass, int has some fail.

    Roger Unwin committed
  3. @wfrench

    Merge branch 'master' of github.com:wfrench/coi-services

    wfrench committed
    Conflicts:
    	ion/agents/instrument/driver_process.py
  4. @wfrench
  5. @wfrench

    merge from ooici

    wfrench committed
  6. @wfrench
Commits on Dec 6, 2012
  1. @wfrench
  2. @wfrench
View
225 ion/agents/instrument/driver_process.py
@@ -18,6 +18,7 @@
import signal
import subprocess
+from urllib2 import Request, urlopen, URLError, HTTPError
from pyon.util.log import log
from ion.agents.instrument.common import BaseEnum
@@ -26,13 +27,15 @@
from ion.agents.instrument.packet_factory_man import create_packet_builder
PYTHON_PATH = 'bin/python'
+CACHE_DIR = '/tmp'
+REPO_BASE = 'http://sddevrepo.oceanobservatories.org/releases/'
class DriverProcessType(BaseEnum):
"""
Base states for driver launcher types.
"""
PYTHON_MODULE = 'ZMQPyClassDriverLauncher'
- EGG = 'ZMQEggDriverLauncherG'
+ EGG = 'ZMQEggDriverLauncher'
class DriverProcess(object):
@@ -48,8 +51,6 @@ class DriverProcess(object):
_command_port = None
_event_port = None
- _packet_factories = None
-
@classmethod
def get_process(cls, driver_config, test_mode = False):
"""
@@ -60,7 +61,7 @@ def get_process(cls, driver_config, test_mode = False):
@param test_mode tell the driver you are running in test mode. Some drivers use this to add a poison pill to
the driver process.
"""
- type = driver_config.get("process_type")
+ type = driver_config.get("process_type")[0]
driver_module = driver_config.get('dvr_mod')
if not type:
@@ -68,17 +69,17 @@ def get_process(cls, driver_config, test_mode = False):
# For some reason the enum wasn't working with the instrument agent. I would see [emum] when called from
# the IA, but (enum,) when called from this module.
- #elif type == DriverProcessType.PYTHON_MODULE:
- elif driver_module:
+ #
+ #
+ elif type == DriverProcessType.PYTHON_MODULE:
return ZMQPyClassDriverProcess(driver_config, test_mode)
elif type == DriverProcessType.EGG:
- raise NotImplementedException()
return ZMQEggDriverProcess(driver_config, test_mode)
-
else:
raise DriverLaunchException("unknown driver process type: %s" % type)
+
def launch(self):
"""
Launch the driver process. Once the process is launched read the two status files that contain the event port
@@ -121,9 +122,6 @@ def poll(self):
return True
-
-
-
def stop(self):
"""
Stop the driver process. We try to stop gracefully using the driver client if we can, otherwise a simple kill
@@ -259,6 +257,23 @@ def _driver_event_port_file(self):
return self._driver_event_file
+ def get_client(self):
+ """
+ Get a python client for the driver process.
+ @return an client object for the driver process
+ """
+ # Start client messaging and verify messaging.
+ if not self._driver_client:
+ try:
+ from mi.core.instrument.zmq_driver_client import ZmqDriverClient
+ driver_client = ZmqDriverClient('localhost', self._command_port, self._event_port)
+ self._driver_client = driver_client
+ except Exception, e:
+ self.stop()
+ log.error('Error starting driver client: %s' % e)
+ raise DriverLaunchException('Error starting driver client.')
+
+ return self._driver_client
class ZMQPyClassDriverProcess(DriverProcess):
"""
@@ -310,113 +325,103 @@ def _process_command(self):
return [ python, '-c', cmd_str ]
- def get_client(self):
+
+class ZMQEggDriverProcess(DriverProcess):
+ '''
+ Object to facilitate ZMQ driver processes using a python egg
+
+ Driver config requirements:
+ dvr_egg :: the filename of the egg
+
+ Example:
+
+ driver_config = {
+ dvr_egg: seabird_sbe37smb_ooicore-0.0.1-py2.7.egg
+
+ process_type: DriverProcessType.EGG
+ }
+ @param driver_config configuration parameters for the driver process
+ @param test_mode should the driver be run in test mode
+ '''
+ def __init__(self, driver_config, test_mode = False):
+ self.config = driver_config
+ self.test_mode = test_mode
+
+ def _check_cache_for_egg(self, egg_name):
"""
- Get a python client for the driver process.
- @return an client object for the driver process
+ Check if the egg is already cached, if so, return the path.
+ @return: egg path if cached, else None
"""
- # Start client messaging and verify messaging.
- if not self._driver_client:
- try:
- from mi.core.instrument.zmq_driver_client import ZmqDriverClient
- driver_client = ZmqDriverClient('localhost', self._command_port, self._event_port)
- self._driver_client = driver_client
- except Exception, e:
- self.stop()
- log.error('Error starting driver client: %s' % e)
- raise DriverLaunchException('Error starting driver client.')
-
- return self._driver_client
+ path = CACHE_DIR + '/' + egg_name
+ if os.path.exists(path):
+ log.debug("_check_cache_for_egg cache hit PATH = " + str(path))
+ return path
+ else:
+ log.debug("_check_cache_for_egg cache miss")
+ return None
- def get_packet_factories(self, stream_info):
+ def _get_remote_egg(self, egg_name):
+ """
+ pull the egg from a remote server if present to the local cache dir.
+ @return: returns the path, throws exception if not found.
"""
- Construct packet factories from PACKET_CONFIG member of the driver_config
- and the given stream_info dict.
+ try:
+ response = urlopen(REPO_BASE + '/' + egg_name)
+ egg_yolk = response.read()
+ log.debug("_fetch_egg GOT YOLK")
+ except HTTPError, e:
+ raise DriverLaunchException(e.code)
+ except URLError, e:
+ raise DriverLaunchException(e.reason)
+
+ path = CACHE_DIR + '/' + egg_name
+
+ try:
+ egg_file = open(CACHE_DIR + '/' + egg_name, "wb")
+ egg_file.write(egg_yolk)
+ except IOError:
+ raise DriverLaunchException("IOError writing egg file to cache")
+ else:
+ egg_file.close()
+
+ log.debug("_fetch_egg GOT EGG, PATH = " + str(path))
+ return path
+
+ def _get_egg(self, egg_name):
+ path = self._check_cache_for_egg(egg_name)
+ if None == path:
+ path = self._get_remote_egg(egg_name) # Will exception out if problem.
- @param stream_info
+ return path
- @retval a dict indexed by stream name of the packet factories defined.
+ def _process_command(self):
"""
+ Build the process command line using the driver_config dict
+ @return a list containing spawn args for the _spawn method
+ 1. check cache (CACHE_DIR = '/tmp')
+ 2. download to cache if not present.
+ 3. construct call command
+ """
- if not self._packet_factories:
- log.info("generating packet factories")
- self._packet_factories = {}
+ path = self._get_egg(self.config.get('dvr_egg'))
- driver_module = self.config.get('dvr_mod')
- if not driver_module:
- raise DriverLaunchException("missing driver config: driver_module")
+ log.debug("cwd: %s" % os.getcwd())
+ driver_package = self.config.get('dvr_egg')
+ ppid = os.getpid() if self.test_mode else None
+
+ python = PYTHON_PATH
+
+ if not driver_package:
+ raise DriverLaunchException("missing driver config: driver_package")
+ if not os.path.exists(python):
+ raise DriverLaunchException("could not find python executable: %s" % python)
+
+ cmd_port_fname = self._driver_command_port_file()
+ evt_port_fname = self._driver_event_port_file()
+
+ cmd_str = "import sys; sys.path.insert(0, '%s/%s'); from mi.main import run; sys.exit(run(command_port_file='%s', event_port_file='%s', ppid=%s))" % \
+ (CACHE_DIR, driver_package, cmd_port_fname, evt_port_fname, str(ppid))
+
+ return [ python, '-c', cmd_str ]
- packet_config = None
- try:
- import_str = 'from %s import PACKET_CONFIG' % driver_module
- exec import_str
- log.debug("PACKET_CONFIG: %s", PACKET_CONFIG)
- packet_config = PACKET_CONFIG
- except:
- log.error('PACKET_CONFIG undefined in driver module %s ' % driver_module)
-
- if packet_config:
- for name in packet_config:
- if not name in stream_info:
- log.error("Name '%s' not found in stream_info" % name)
- continue
-
- stream_config = stream_info[name]
- try:
- packet_builder = create_packet_builder(name, stream_config)
- self._packet_factories[name] = packet_builder
- log.info('created packet builder for stream %s' % name)
- except Exception, e:
- log.error('error creating packet builder: %s' % e)
-
- return self._packet_factories
-
-# TODO remove
-# def get_packet_factories(self):
-# """
-# Construct packet factories from packet_config member of the driver_config.
-# @retval a list of packet factories defined.
-# """
-# if not self._packet_factories:
-# log.info("generating packet factories")
-# self._packet_factories = {}
-#
-# driver_module = self.config.get('dvr_mod')
-# if not driver_module:
-# raise DriverLaunchException("missing driver config: driver_module")
-#
-# # Should we poll the driver process to give us these configurations? If defined in an interface then
-# # this method could be generalized for all driver processes. It also seems like execing an import
-# # might be unsafe.
-# import_str = 'from %s import PACKET_CONFIG' % driver_module
-# try:
-# exec import_str
-# log.debug("PACKET_CONFIG: %s", PACKET_CONFIG)
-# for (name, val) in PACKET_CONFIG.iteritems():
-# if val:
-# try:
-# mod = val[0]
-# cls = val[1]
-# import_str = 'from %s import %s' % (mod, cls)
-# ctor_str = 'ctor = %s' % cls
-# exec import_str
-# exec ctor_str
-# self._packet_factories[name] = ctor
-#
-# except Exception, e:
-# log.error('error creating packet factory: %s', e)
-#
-# else:
-# log.info('created packet factory for stream %s', name)
-# except Exception, e:
-# log.error('Instrument agent %s had error creating packet factories. %s', e)
-#
-# return self._packet_factories
-
-class ZMQEggDriverLauncher(DriverProcess):
- """
- Object to facilitate driver processes launch from an egg as an 'eggsecutable'
- """
- def __init__(self):
- pass
View
32 ion/agents/instrument/instrument_agent.py
@@ -155,10 +155,6 @@ def __init__(self, *args, **kwargs):
# stream_config agent config member during process on_init.
self._data_publishers = {}
- # Factories for stream packets. Constructed by driver
- # configuration information on transition to inactive.
- self._packet_factories = {}
-
def on_init(self):
"""
Instrument agent pyon process initialization.
@@ -384,8 +380,6 @@ def _handler_uninitialized_initialize(self, *args, **kwargs):
# Start the driver and switch to inactive.
self._start_driver(self._dvr_config)
- self._construct_packet_factories()
-
next_state = ResourceAgentState.INACTIVE
return (next_state, result)
@@ -401,7 +395,6 @@ def _handler_inactive_reset(self, *args, **kwargs):
next_state = None
result = self._stop_driver()
- self._clear_packet_factories()
next_state = ResourceAgentState.UNINITIALIZED
return (next_state, result)
@@ -455,7 +448,6 @@ def _handler_idle_reset(self, *args, **kwargs):
self._dvr_client.cmd_dvr('disconnect')
self._dvr_client.cmd_dvr('initialize')
result = self._stop_driver()
- self._clear_packet_factories()
next_state = ResourceAgentState.UNINITIALIZED
return (next_state, result)
@@ -495,7 +487,6 @@ def _handler_stopped_reset(self, *args, **kwargs):
self._dvr_client.cmd_dvr('disconnect')
self._dvr_client.cmd_dvr('initialize')
result = self._stop_driver()
- self._clear_packet_factories()
next_state = ResourceAgentState.UNINITIALIZED
return (next_state, result)
@@ -543,7 +534,6 @@ def _handler_command_reset(self, *args, **kwargs):
self._dvr_client.cmd_dvr('disconnect')
self._dvr_client.cmd_dvr('initialize')
result = self._stop_driver()
- self._clear_packet_factories()
next_state = ResourceAgentState.UNINITIALIZED
return (next_state, result)
@@ -641,7 +631,6 @@ def _handler_streaming_reset(self, *args, **kwargs):
self._dvr_client.cmd_dvr('disconnect')
self._dvr_client.cmd_dvr('initialize')
result = self._stop_driver()
- self._clear_packet_factories()
next_state = ResourceAgentState.UNINITIALIZED
return (next_state, result)
@@ -1136,26 +1125,7 @@ def _construct_data_publishers(self):
publisher for stream %s.', self._proc_name,
stream_name)
- def _construct_packet_factories(self):
- """
- Construct packet factories from packet_config member of the
- driver_config and self.CFG.stream_config.
- @retval None
- """
- self._packet_factories = self._dvr_proc.get_packet_factories(
- self.CFG.stream_config)
- log.info('Insturment agent %s constructed its packet factories.',
- self._proc_name)
-
- def _clear_packet_factories(self):
- """
- Delete packet factories.
- @retval None
- """
- self._packet_factories.clear()
- log.info('Instrument agent %s deleted packet factories.',
- self._proc_name)
-
+
###############################################################################
# Event callback and handling for direct access.
###############################################################################
View
127 ion/agents/instrument/test/test_driver_process.py
@@ -10,22 +10,23 @@
__author__ = 'Bill French'
__license__ = 'Apache 2.0'
+import os
import time
import unittest
-
+from pyon.public import CFG
from nose.plugins.attrib import attr
from pyon.util.unit_test import PyonTestCase
from pyon.public import log
-from ion.agents.instrument.driver_process import DriverProcess, DriverProcessType
+from ion.agents.instrument.driver_process import DriverProcess, DriverProcessType, ZMQEggDriverProcess
+from ion.agents.instrument.exceptions import DriverLaunchException
# Make tests verbose and provide stdout
# bin/nosetests -s -v ion/agents.instrument/test/test_driver_launcher.py
-@unittest.skip("more negitive testing needed")
@attr('UNIT', group='mi')
-class TestPythonClassDriverProcess(PyonTestCase):
+class TestInstrumentDriverProcess(PyonTestCase):
"""
Unit tests for Driver Process using python classes
"""
@@ -33,28 +34,43 @@ def setUp(self):
"""
Setup test cases.
"""
- self._driver_config = { 'host': 'localhost',
+ self._class_driver_config = {
+ 'host': 'localhost',
'cmd_port': 5556,
'evt_port': 5557,
'dvr_mod': 'mi.instrument.seabird.sbe37smb.example.driver',
'dvr_cls': 'InstrumentDriver',
- 'process_type': DriverProcessType.PYTHON_MODULE
+ 'process_type': [DriverProcessType.PYTHON_MODULE]
}
+ self._egg_driver_config = {
+ 'host': 'localhost',
+ 'cmd_port': 5556,
+ 'evt_port': 5557,
+
+ 'dvr_egg': 'seabird_sbe37smb_ooicore-0.0.1-py2.7.egg',
+
+ 'process_type': [DriverProcessType.EGG]
+ }
+
+ self._events = []
+
# Add cleanup handler functions.
# self.addCleanup()
- def event_received(self):
+ def event_received(self, event):
log.debug("Event received")
+ self._events.append(event)
- def test_driver_process(self):
+ def assert_driver_process_launch_success(self, driver_config):
"""
- Test driver process launch
+ Verify that we can launch a driver using a driver config.
+ @param driver_config: driver configuration dictionary
"""
- driver_process = DriverProcess.get_process(self._driver_config, True)
+ driver_process = DriverProcess.get_process(driver_config, True)
self.assertTrue(driver_process)
driver_process.launch()
@@ -67,10 +83,7 @@ def test_driver_process(self):
driver_client = driver_process.get_client()
self.assertTrue(driver_client)
- driver_client.start_messaging(self.event_received())
-
- packet_factories = driver_process.get_packet_factories()
- self.assertTrue(packet_factories)
+ driver_client.start_messaging(self.event_received)
self.assertGreater(driver_process.memory_usage(), 0)
log.info("Driver memory usage before stop: %d", driver_process.memory_usage())
@@ -78,19 +91,87 @@ def test_driver_process(self):
driver_process.stop()
self.assertFalse(driver_process.getpid())
+ def test_driver_process_by_class(self):
+ """
+ Test the driver launching process for a class and module
+ """
+ self.assert_driver_process_launch_success(self._class_driver_config)
+ def test_driver_process_by_egg(self):
+ """
+ Test the driver launching process for a class and module
+ """
+ try:
+ os.unlink("/tmp/seabird_sbe37smb_ooicore-0.0.1-py2.7.egg")
+ except:
+ """
+ # ignore this exception.
+ """
+ # remove egg from cache and run. Verifies download
+ self.assert_driver_process_launch_success(self._egg_driver_config)
+ # Verify egg is in cache then run again
+ self.assert_driver_process_launch_success(self._egg_driver_config)
-@attr('UNIT', group='mi')
-class TestEggDriverProcess(PyonTestCase):
- """
- Unit tests for Driver Process using eggs
- """
-
- def setUp(self):
+ def test_01_check_cache_for_egg(self):
"""
- Setup test cases.
+ Test _check_cache_for_egg checks the cache for the egg,
+ returns path if present locally, or None if not.
+ """
+ # Cleanup on isle one!
+ try:
+ os.unlink("/tmp/seabird_sbe37smb_ooicore-0.0.1-py2.7.egg")
+ except:
+ """
+ # ignore this exception.
+ """
+
+ launcher = ZMQEggDriverProcess("DUMMY_VAL")
+ self.assertEqual(launcher._check_cache_for_egg("NOT_FOUND_EGG"), None)
+ self.assertEqual(launcher._check_cache_for_egg("seabird_sbe37smb_ooicore-0.0.1-py2.7.egg"), None)
+
+
+ def test_02_get_remote_egg(self):
"""
- pass
+ Test _get_remote_egg should return path for cached egg if present,
+ path for cached egg if not present locally, but in repo, or exception if not present locally or in repo.
+ """
+
+ launcher = ZMQEggDriverProcess("DUMMY_VAL")
+ got_exception = False
+ try:
+ self.assertEqual(launcher._get_remote_egg("NOT_FOUND_EGG"), None)
+ except DriverLaunchException:
+ got_exception = True
+ self.assertTrue(got_exception)
+
+ self.assertEqual(launcher._get_remote_egg("seabird_sbe37smb_ooicore-0.0.1-py2.7.egg"), "/tmp/seabird_sbe37smb_ooicore-0.0.1-py2.7.egg")
+
+
+ def test_03_check_cache_for_egg(self):
+ """
+ Test _check_cache_for_egg checks the cache for the egg,
+ returns path if present locally, or None if not.
+ """
+ # Cleanup on isle one!
+
+ launcher = ZMQEggDriverProcess("DUMMY_VAL")
+ self.assertEqual(launcher._check_cache_for_egg("NOT_FOUND_EGG"), None)
+ self.assertEqual(launcher._check_cache_for_egg("seabird_sbe37smb_ooicore-0.0.1-py2.7.egg"), "/tmp/seabird_sbe37smb_ooicore-0.0.1-py2.7.egg")
+
+ def test_04_get_egg(self):
+ """
+ Test _get_egg should return a path to a local egg for existing
+ eggs, and exception for non-existing in the repo.
+ """
+
+ launcher = ZMQEggDriverProcess("DUMMY_VAL")
+ got_exception = False
+ try:
+ self.assertEqual(launcher._get_egg("NOT_FOUND_EGG"), None)
+ except DriverLaunchException:
+ got_exception = True
+ self.assertTrue(got_exception)
+ self.assertEqual(launcher._get_egg("seabird_sbe37smb_ooicore-0.0.1-py2.7.egg"), "/tmp/seabird_sbe37smb_ooicore-0.0.1-py2.7.egg")
Something went wrong with that request. Please try again.