Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge branch 'master' of github.com:wfrench/coi-services

Conflicts:
	ion/agents/instrument/driver_process.py
  • Loading branch information...
commit 9ce60e07fe95b5dbc5a2487e84d9886334f36d53 2 parents af15cdd + 8742114
@wfrench wfrench authored
View
197 ion/agents/instrument/driver_process.py
@@ -18,6 +18,7 @@
import signal
import subprocess
+from urllib2 import Request, urlopen, URLError, HTTPError
from pyon.util.log import log
from ion.agents.instrument.common import BaseEnum
@@ -26,13 +27,19 @@
from ion.agents.instrument.packet_factory_man import create_packet_builder
PYTHON_PATH = 'bin/python'
+CACHE_DIR = '/tmp'
+REPO_BASE = 'http://sddevrepo.oceanobservatories.org/releases/'
class DriverProcessType(BaseEnum):
"""
Base states for driver launcher types.
"""
PYTHON_MODULE = 'ZMQPyClassDriverLauncher'
+<<<<<<< HEAD
EGG = 'ZMQEggDriverLauncherG'
+=======
+ EGG = 'ZMQEggDriverLauncher'
+>>>>>>> 8742114278a502a40ebd28480e69b0bdae763a46
class DriverProcess(object):
@@ -60,25 +67,29 @@ def get_process(cls, driver_config, test_mode = False):
@param test_mode tell the driver you are running in test mode. Some drivers use this to add a poison pill to
the driver process.
"""
- type = driver_config.get("process_type")
+ type = driver_config.get("process_type")[0]
driver_module = driver_config.get('dvr_mod')
+ log.error("ROGER TYPE " + repr(type))
+
+
+
if not type:
raise DriverLaunchException("missing driver config: process_type")
# For some reason the enum wasn't working with the instrument agent. I would see [emum] when called from
# the IA, but (enum,) when called from this module.
- #elif type == DriverProcessType.PYTHON_MODULE:
- elif driver_module:
+ #
+ #
+ elif type == DriverProcessType.PYTHON_MODULE:
return ZMQPyClassDriverProcess(driver_config, test_mode)
elif type == DriverProcessType.EGG:
- raise NotImplementedException()
return ZMQEggDriverProcess(driver_config, test_mode)
-
else:
raise DriverLaunchException("unknown driver process type: %s" % type)
+
def launch(self):
"""
Launch the driver process. Once the process is launched read the two status files that contain the event port
@@ -121,9 +132,6 @@ def poll(self):
return True
-
-
-
def stop(self):
"""
Stop the driver process. We try to stop gracefully using the driver client if we can, otherwise a simple kill
@@ -259,6 +267,23 @@ def _driver_event_port_file(self):
return self._driver_event_file
+ def get_client(self):
+ """
+ Get a python client for the driver process.
+ @return an client object for the driver process
+ """
+ # Start client messaging and verify messaging.
+ if not self._driver_client:
+ try:
+ from mi.core.instrument.zmq_driver_client import ZmqDriverClient
+ driver_client = ZmqDriverClient('localhost', self._command_port, self._event_port)
+ self._driver_client = driver_client
+ except Exception, e:
+ self.stop()
+ log.error('Error starting driver client: %s' % e)
+ raise DriverLaunchException('Error starting driver client.')
+
+ return self._driver_client
class ZMQPyClassDriverProcess(DriverProcess):
"""
@@ -310,24 +335,14 @@ def _process_command(self):
return [ python, '-c', cmd_str ]
- def get_client(self):
- """
- Get a python client for the driver process.
- @return an client object for the driver process
+ def get_packet_factories(self, stream_info):
"""
- # Start client messaging and verify messaging.
- if not self._driver_client:
- try:
- from mi.core.instrument.zmq_driver_client import ZmqDriverClient
- driver_client = ZmqDriverClient('localhost', self._command_port, self._event_port)
- self._driver_client = driver_client
- except Exception, e:
- self.stop()
- log.error('Error starting driver client: %s' % e)
- raise DriverLaunchException('Error starting driver client.')
+ Construct packet factories from PACKET_CONFIG member of the driver_config
+ and the given stream_info dict.
- return self._driver_client
+ @param stream_info
+<<<<<<< HEAD
def get_packet_factories(self, stream_info):
"""
Construct packet factories from PACKET_CONFIG member of the driver_config
@@ -335,6 +350,8 @@ def get_packet_factories(self, stream_info):
@param stream_info
+=======
+>>>>>>> 8742114278a502a40ebd28480e69b0bdae763a46
@retval a dict indexed by stream name of the packet factories defined.
"""
@@ -414,9 +431,139 @@ def get_packet_factories(self, stream_info):
#
# return self._packet_factories
+<<<<<<< HEAD
class ZMQEggDriverLauncher(DriverProcess):
+=======
+
+class ZMQEggDriverProcess(DriverProcess):
+>>>>>>> 8742114278a502a40ebd28480e69b0bdae763a46
"""
Object to facilitate driver processes launch from an egg as an 'eggsecutable'
"""
- def __init__(self):
- pass
+ def __init__(self, driver_config, test_mode = False):
+ self.config = driver_config
+ self.test_mode = test_mode
+
+
+
+ def _check_cache_for_egg(self, egg_name):
+ """
+ Check if the egg is already cached, if so, return the path.
+ @return: egg path if cached, else None
+ """
+ path = CACHE_DIR + '/' + egg_name
+ if os.path.exists(path):
+ log.debug("_check_cache_for_egg cache hit PATH = " + str(path))
+ return path
+ else:
+ log.debug("_check_cache_for_egg cache miss")
+ return None
+
+ def _get_remote_egg(self, egg_name):
+ """
+ pull the egg from a remote server if present to the local cache dir.
+ @return: returns the path, throws exception if not found.
+ """
+ try:
+ response = urlopen(REPO_BASE + '/' + egg_name)
+ egg_yolk = response.read()
+ log.debug("_fetch_egg GOT YOLK")
+ except HTTPError, e:
+ raise DriverLaunchException(e.code)
+ except URLError, e:
+ raise DriverLaunchException(e.reason)
+
+ path = CACHE_DIR + '/' + egg_name
+
+ try:
+ egg_file = open(CACHE_DIR + '/' + egg_name, "wb")
+ egg_file.write(egg_yolk)
+ except IOError:
+ raise DriverLaunchException("IOError writing egg file to cache")
+ else:
+ egg_file.close()
+
+ log.debug("_fetch_egg GOT EGG, PATH = " + str(path))
+ return path
+
+ def _get_egg(self, egg_name):
+ path = self._check_cache_for_egg(egg_name)
+ if None == path:
+ path = self._get_remote_egg(egg_name) # Will exception out if problem.
+
+ return path
+
+ def _process_command(self):
+ """
+ Build the process command line using the driver_config dict
+ @return a list containing spawn args for the _spawn method
+
+ 1. check cache (CACHE_DIR = '/tmp')
+ 2. download to cache if not present.
+ 3. construct call command
+ """
+
+ path = self._get_egg(self.config.get('dvr_egg'))
+
+ log.debug("cwd: %s" % os.getcwd())
+ driver_package = self.config.get('dvr_egg')
+ ppid = os.getpid() if self.test_mode else None
+
+ python = PYTHON_PATH
+
+ if not driver_package:
+ raise DriverLaunchException("missing driver config: driver_package")
+ if not os.path.exists(python):
+ raise DriverLaunchException("could not find python executable: %s" % python)
+
+ cmd_port_fname = self._driver_command_port_file()
+ evt_port_fname = self._driver_event_port_file()
+
+ cmd_str = "import sys; sys.path.insert(0, '%s/%s'); from mi.main import run; sys.exit(run(command_port_file='%s', event_port_file='%s', ppid=%s))" % \
+ (CACHE_DIR, driver_package, cmd_port_fname, evt_port_fname, str(ppid))
+
+ return [ python, '-c', cmd_str ]
+
+ def get_packet_factories(self, stream_info):
+ """
+ Construct packet factories from PACKET_CONFIG member of the driver_config
+ and the given stream_info dict.
+
+ @param stream_info
+
+ @retval a dict indexed by stream name of the packet factories defined.
+ """
+
+
+ if not self._packet_factories:
+ log.info("generating packet factories")
+ self._packet_factories = {}
+
+ driver_module = self.config.get('dvr_mod')
+ if not driver_module:
+ raise DriverLaunchException("missing driver config: driver_module")
+
+ packet_config = None
+ try:
+ import_str = 'from %s import PACKET_CONFIG' % driver_module
+ exec import_str
+ log.debug("PACKET_CONFIG: %s", PACKET_CONFIG)
+ packet_config = PACKET_CONFIG
+ except:
+ log.error('PACKET_CONFIG undefined in driver module %s ' % driver_module)
+
+ if packet_config:
+ for name in packet_config:
+ if not name in stream_info:
+ log.error("Name '%s' not found in stream_info" % name)
+ continue
+
+ stream_config = stream_info[name]
+ try:
+ packet_builder = create_packet_builder(name, stream_config)
+ self._packet_factories[name] = packet_builder
+ log.info('created packet builder for stream %s' % name)
+ except Exception, e:
+ log.error('error creating packet builder: %s' % e)
+
+ return self._packet_factories
View
135 ion/agents/instrument/test/test_driver_process.py
@@ -10,22 +10,23 @@
__author__ = 'Bill French'
__license__ = 'Apache 2.0'
+import os
import time
import unittest
-
+from pyon.public import CFG
from nose.plugins.attrib import attr
from pyon.util.unit_test import PyonTestCase
from pyon.public import log
-from ion.agents.instrument.driver_process import DriverProcess, DriverProcessType
+from ion.agents.instrument.driver_process import DriverProcess, DriverProcessType, ZMQEggDriverProcess
+from ion.agents.instrument.exceptions import DriverLaunchException
# Make tests verbose and provide stdout
# bin/nosetests -s -v ion/agents.instrument/test/test_driver_launcher.py
-@unittest.skip("more negitive testing needed")
@attr('UNIT', group='mi')
-class TestPythonClassDriverProcess(PyonTestCase):
+class TestInstrumentDriverProcess(PyonTestCase):
"""
Unit tests for Driver Process using python classes
"""
@@ -33,28 +34,43 @@ def setUp(self):
"""
Setup test cases.
"""
- self._driver_config = { 'host': 'localhost',
+ self._class_driver_config = {
+ 'host': 'localhost',
'cmd_port': 5556,
'evt_port': 5557,
'dvr_mod': 'mi.instrument.seabird.sbe37smb.example.driver',
'dvr_cls': 'InstrumentDriver',
- 'process_type': DriverProcessType.PYTHON_MODULE
+ 'process_type': [DriverProcessType.PYTHON_MODULE]
}
+ self._egg_driver_config = {
+ 'host': 'localhost',
+ 'cmd_port': 5556,
+ 'evt_port': 5557,
+
+ 'dvr_egg': 'seabird_sbe37smb_ooicore-0.0.1-py2.7.egg',
+
+ 'process_type': [DriverProcessType.EGG]
+ }
+
+ self._events = []
+
# Add cleanup handler functions.
# self.addCleanup()
- def event_received(self):
+ def event_received(self, event):
log.debug("Event received")
+ self._events.append(event)
- def test_driver_process(self):
+ def assert_driver_process_launch_success(self, driver_config):
"""
- Test driver process launch
+ Verify that we can launch a driver using a driver config.
+ @param driver_config: driver configuration dictionary
"""
- driver_process = DriverProcess.get_process(self._driver_config, True)
+ driver_process = DriverProcess.get_process(driver_config, True)
self.assertTrue(driver_process)
driver_process.launch()
@@ -67,10 +83,16 @@ def test_driver_process(self):
driver_client = driver_process.get_client()
self.assertTrue(driver_client)
- driver_client.start_messaging(self.event_received())
+ driver_client.start_messaging(self.event_received)
+ #
+ # Do we need to verify events here? Is it deterministic behavior?
+ #
+
+ stream_info = CFG.get('stream_config', None)
- packet_factories = driver_process.get_packet_factories()
- self.assertTrue(packet_factories)
+ #@TODO re-enable this when stream_info is defined
+ #packet_factories = driver_process.get_packet_factories(stream_info)
+ #self.assertTrue(packet_factories)
self.assertGreater(driver_process.memory_usage(), 0)
log.info("Driver memory usage before stop: %d", driver_process.memory_usage())
@@ -78,19 +100,88 @@ def test_driver_process(self):
driver_process.stop()
self.assertFalse(driver_process.getpid())
+ def test_driver_process_by_class(self):
+ """
+ Test the driver launching process for a class and module
+ """
+ self.assert_driver_process_launch_success(self._class_driver_config)
+ def test_driver_process_by_egg(self):
+ """
+ Test the driver launching process for a class and module
+ """
+ try:
+ os.unlink("/tmp/seabird_sbe37smb_ooicore-0.0.1-py2.7.egg")
+ except:
+ """
+ # ignore this exception.
+ """
+ # remove egg from cache and run. Verifies download
+ self.assert_driver_process_launch_success(self._egg_driver_config)
-@attr('UNIT', group='mi')
-class TestEggDriverProcess(PyonTestCase):
- """
- Unit tests for Driver Process using eggs
- """
-
- def setUp(self):
+ # Verify egg is in cache then run again
+ self.assert_driver_process_launch_success(self._egg_driver_config)
+
+ def test_01_check_cache_for_egg(self):
"""
- Setup test cases.
+ Test _check_cache_for_egg checks the cache for the egg,
+ returns path if present locally, or None if not.
+ """
+ # Cleanup on isle one!
+ try:
+ os.unlink("/tmp/seabird_sbe37smb_ooicore-0.0.1-py2.7.egg")
+ except:
+ """
+ # ignore this exception.
+ """
+
+ launcher = ZMQEggDriverProcess("DUMMY_VAL")
+ self.assertEqual(launcher._check_cache_for_egg("NOT_FOUND_EGG"), None)
+ self.assertEqual(launcher._check_cache_for_egg("seabird_sbe37smb_ooicore-0.0.1-py2.7.egg"), None)
+
+
+ def test_02_get_remote_egg(self):
"""
- pass
+ Test _get_remote_egg should return path for cached egg if present,
+ path for cached egg if not present locally, but in repo, or exception if not present locally or in repo.
+ """
+
+ launcher = ZMQEggDriverProcess("DUMMY_VAL")
+ got_exception = False
+ try:
+ self.assertEqual(launcher._get_remote_egg("NOT_FOUND_EGG"), None)
+ except DriverLaunchException:
+ got_exception = True
+ self.assertTrue(got_exception)
+
+ self.assertEqual(launcher._get_remote_egg("seabird_sbe37smb_ooicore-0.0.1-py2.7.egg"), "/tmp/seabird_sbe37smb_ooicore-0.0.1-py2.7.egg")
+
+
+ def test_03_check_cache_for_egg(self):
+ """
+ Test _check_cache_for_egg checks the cache for the egg,
+ returns path if present locally, or None if not.
+ """
+ # Cleanup on isle one!
+
+ launcher = ZMQEggDriverProcess("DUMMY_VAL")
+ self.assertEqual(launcher._check_cache_for_egg("NOT_FOUND_EGG"), None)
+ self.assertEqual(launcher._check_cache_for_egg("seabird_sbe37smb_ooicore-0.0.1-py2.7.egg"), "/tmp/seabird_sbe37smb_ooicore-0.0.1-py2.7.egg")
+
+ def test_04_get_egg(self):
+ """
+ Test _get_egg should return a path to a local egg for existing
+ eggs, and exception for non-existing in the repo.
+ """
+
+ launcher = ZMQEggDriverProcess("DUMMY_VAL")
+ got_exception = False
+ try:
+ self.assertEqual(launcher._get_egg("NOT_FOUND_EGG"), None)
+ except DriverLaunchException:
+ got_exception = True
+ self.assertTrue(got_exception)
+ self.assertEqual(launcher._get_egg("seabird_sbe37smb_ooicore-0.0.1-py2.7.egg"), "/tmp/seabird_sbe37smb_ooicore-0.0.1-py2.7.egg")
Please sign in to comment.
Something went wrong with that request. Please try again.