
Add many tests, especially to workload and persistence

commit 6b7484c9483e7bf07775df0ec2d258229594ad6a 1 parent 7e34596
oldpatricka authored
2  src/python/epumgmt/defaults/runlogs.py
@@ -123,7 +123,7 @@ def fetch_logs(self, scpcmd):
def _run_one_cmd(self, cmd):
self.c.log.debug("command = '%s'" % cmd)
- timeout = 120.0 # seconds
+ timeout = 3120.0 # seconds
(killed, retcode, stdout, stderr) = child.child(cmd, timeout=timeout)
if killed:
6 src/python/epumgmt/main/em_core_workloadtest.py
@@ -10,6 +10,7 @@
from epumgmt.defaults.log_events import AmqpEvents, TorqueEvents
from epumgmt.main.em_core_load import get_cloudinit_for_destruction
+from epumgmt.api.exceptions import InvalidConfig, ProgrammingError
import epumgmt.main.em_core
import epumgmt.main.em_core_load
import epumgmt.defaults.child
@@ -159,9 +160,8 @@ def submit(self, job):
try:
urllib2.urlopen(submitStr, timeout=180)
retry = 0
- except Exception as e:
- self.c.log.error('Failed to submit task at %s' % job.startsec)
- self.c.log.error('Exception: %s' % e)
+ except:
+ self.c.log.exception('Failed to submit task at %s' % job.startsec)
retry -= 1
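
The switch to a bare except with log.exception drops the hand-rolled formatting: when called from an except block, exception() records the message at ERROR level and appends the traceback of the exception currently being handled, which the FakeLog mock below mimics via sys.exc_info(). A minimal sketch of that behaviour, assuming the underlying logger follows the stdlib logging API (the message and the literal 42 standing in for job.startsec are made up):

import logging

logging.basicConfig()
log = logging.getLogger("workload")

try:
    raise IOError("submit failed")
except:
    # exception() behaves like error() plus the formatted traceback of the
    # exception currently being handled, so no manual "Exception: %s" line is needed.
    log.exception("Failed to submit task at %s" % 42)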
19 src/python/tests/mocks/amqp.py
@@ -0,0 +1,19 @@
+import BaseHTTPServer
+
+class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
+
+ def do_GET(self):
+ self.send_response(200)
+
+ def log_message(self, format, *args):
+ pass
+
+class FakeAMQPServer:
+ """The FakeAMQPServer listens to requests, and OKs any get
+ """
+
+ def __init__(self):
+
+ self.port = 8000
+ self._server = BaseHTTPServer.HTTPServer(("", self.port), Handler)
+ self._server.handle_request()
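
A usage sketch mirroring the TestAMQP setup added later in this commit: FakeAMQPServer binds the hard-coded port 8000, serves exactly one request (handle_request), and answers any GET with 200, so a test can fork a child to run it and hit it once from the parent. The localhost URL, timeout, and waitpid call are illustrative, and the startup race between bind and connect is ignored for brevity:

import os
import sys
import urllib2

from mocks.amqp import FakeAMQPServer

child_pid = os.fork()
if child_pid == 0:
    FakeAMQPServer()  # binds port 8000, handles a single request, then returns
    sys.exit(0)

response = urllib2.urlopen("http://localhost:8000/anything", timeout=10)
assert response.getcode() == 200
os.waitpid(child_pid, 0)  # reap the child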
20 src/python/tests/mocks/common.py
@@ -7,22 +7,22 @@ class FakeLog():
def __init__(self):
self.transcript = []
- def info(self, msg):
- self.transcript.append(("INFO", msg))
+ def info(self, msg, substitution=()):
+ self.transcript.append(("INFO", msg % substitution))
- def debug(self, msg):
- self.transcript.append(("DEBUG", msg))
+ def debug(self, msg, substitution=()):
+ self.transcript.append(("DEBUG", msg % substitution))
- def warn(self, msg):
- self.transcript.append(("WARNING", msg))
+ def warn(self, msg, substitution=()):
+ self.transcript.append(("WARNING", msg % substitution))
- def error(self, msg):
- self.transcript.append(("ERROR", msg))
+ def error(self, msg, substitution=()):
+ self.transcript.append(("ERROR", msg % substitution))
- def exception(self, msg):
+ def exception(self, msg, substitution=()):
exc = sys.exc_info()
msg += "".join(traceback.format_exception(exc[0], exc[1], exc[2]))
- self.transcript.append(("ERROR", msg))
+ self.transcript.append(("ERROR", msg % substitution))
class FakeCommon():
"""FakeCommon fakes the common object so we can check what
5 src/python/tests/mocks/modules.py
@@ -28,3 +28,8 @@ def fake_scp_command_str(target, c, vm, cloudinitd):
return fake_scp_command_str
+def make_fake_execute_cmd(target, real_execute_cmd):
+ def fake_execute_cmd(target, cmd):
+ return real_execute_cmd("echo %s" % cmd)
+
+ return fake_execute_cmd
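
make_fake_execute_cmd returns an unbound function that prefixes the command with echo before handing it to the real implementation; the workload tests below bind it onto an instance with types.MethodType so the scp/ssh command lines end up in the log without anything actually running remotely. A self-contained sketch of the same pattern, where Runner is a made-up stand-in for Torque:

import types

from mocks.modules import make_fake_execute_cmd

class Runner:
    def _execute_cmd(self, cmd):
        print "would run: %s" % cmd  # stand-in for the real ssh/scp runner
        return True

runner = Runner()
fake = make_fake_execute_cmd(runner, runner._execute_cmd)
runner._execute_cmd = types.MethodType(fake, runner)

runner._execute_cmd("scp /some/file example.com:/tmp")
# prints: would run: echo scp /some/file example.com:/tmp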
57 src/python/tests/test_epumgmt_defaults_log_events.py
@@ -133,3 +133,60 @@ def test_get_event_datetimes_dict_badfile(self):
assert len(failed_to_open) == 1
os.chmod(self.producer_ioncontainer_log, old_mode)
+
+
+class TestControllerEvents:
+
+ def setup(self):
+ self.vardir = tempfile.mkdtemp()
+ self.runlogdir = "runlogs"
+ self.vmlogdir = "vmlogs"
+
+ controller_dir = os.path.join(self.vardir, self.runlogdir, "epucontrollerkill_logs")
+ os.makedirs(controller_dir)
+ self.controller_ioncontainer_log = os.path.join(controller_dir, "ioncontainer.log")
+ with open(self.controller_ioncontainer_log, "w") as container_file:
+ container_file.write("contents!")
+
+ self.config = ConfigParser.RawConfigParser()
+ self.config.add_section("events")
+ self.config.set("events", "runlogdir", self.runlogdir)
+ self.config.add_section("ecdirs")
+ self.config.set("ecdirs", "var", self.vardir)
+
+
+ self.p = DefaultParameters(self.config, None)
+ self.c = FakeCommon(self.p)
+ self.controller_events = epumgmt.defaults.log_events.ControllerEvents(self.p, self.c, None, "")
+
+
+ def teardown(self):
+
+ shutil.rmtree(self.vardir)
+
+ def test_set_controllerlog_filenames(self):
+
+ self.controller_events._set_controllerlog_filenames()
+
+ assert self.controller_ioncontainer_log in self.controller_events.controllerlog_filenames
+
+ def test_update_log_filenames(self):
+
+ self.controller_events._update_log_filenames()
+ assert self.controller_ioncontainer_log in self.controller_events.controllerlog_filenames
+
+ def test_create_datetime(self):
+
+ month = 8
+ day = 9
+ year = 1985
+ hour = 1
+ minute = 42
+ second = 43
+ microsecond = 44
+ date_string = "%s - %s - %s" % (year, month, day)
+ time_string = "%s:%s:%s.%s" % (hour, minute, second, microsecond)
+
+ datetime = self.controller_events._create_datetime(date_string, time_string)
+
+ assert datetime.month == month
2  src/python/tests/test_epumgmt_defaults_svc_adapter.py
@@ -299,7 +299,7 @@ def test_run_one_command_timeout(self):
to skip it when running these tests frequently.
"""
- command_that_will_timeout = "sleep 60"
+ command_that_will_timeout = "sleep 10000"
svc_adapter = DefaultRemoteSvcAdapter(self.p, self.c)
succeeded = svc_adapter._run_one_cmd(command_that_will_timeout)
assert not succeeded
322 src/python/tests/test_epumgmt_main_em_core_persistence.py
@@ -0,0 +1,322 @@
+import os
+import shutil
+import tempfile
+import ConfigParser
+
+from epumgmt.main.em_core_persistence import Persistence
+from epumgmt.defaults import DefaultParameters, DefaultCommon
+from epumgmt.api.exceptions import InvalidConfig, ProgrammingError
+from epumgmt.api import RunVM
+
+from mocks.common import FakeCommon
+from cloudyvents.cyvents import CYvent
+
+class TestPersistence:
+
+ def setup(self):
+ self.vardir = tempfile.mkdtemp()
+ persistencedir = "persistence"
+ persistencefile = "epumgmt.db"
+ os.mkdir(os.path.join(self.vardir, persistencedir))
+ config = ConfigParser.RawConfigParser()
+ config.add_section("persistence")
+ config.set("persistence", "persistencedb", persistencefile)
+ config.set("persistence", "persistencedir", persistencedir)
+ config.add_section("ecdirs")
+ config.set("ecdirs", "var", self.vardir)
+
+ params = DefaultParameters(config, None)
+ common = FakeCommon(params)
+
+ self.persistence = Persistence(params, common)
+
+ def teardown(self):
+
+ shutil.rmtree(self.vardir)
+
+
+ def test_find_db_conf(self):
+
+ # Test persistencedb not configured
+ params = DefaultParameters(None, None)
+ persistence = Persistence(params, None)
+ try:
+ persistence._find_db_conf()
+ raised_invalid_config = False
+ except InvalidConfig:
+ raised_invalid_config = True
+ assert raised_invalid_config
+
+
+ # Test URL is passed through
+ sqlite_url = "sqlite:////path/to/epumgmt.db"
+
+ config = ConfigParser.RawConfigParser()
+ config.add_section("persistence")
+ config.set("persistence", "persistencedb", sqlite_url)
+
+ params = DefaultParameters(config, None)
+ common = FakeCommon()
+ persistence = Persistence(params, common)
+
+ dbconf = persistence._find_db_conf()
+ assert dbconf == sqlite_url
+
+
+ # Test relative path, but no persistencedir setting
+ relative_path = "fake/path/to/epumgmt.db"
+
+ config = ConfigParser.RawConfigParser()
+ config.add_section("persistence")
+ config.set("persistence", "persistencedb", relative_path)
+
+ params = DefaultParameters(config, None)
+ common = FakeCommon()
+ persistence = Persistence(params, common)
+
+ try:
+ dbconf = persistence._find_db_conf()
+ raised_invalid_config = False
+ except InvalidConfig:
+ raised_invalid_config = True
+ assert raised_invalid_config
+
+
+ # Test persistence dir isn't a dir
+ _, persistencedir = tempfile.mkstemp() # note path isn't a dir
+ config = ConfigParser.RawConfigParser()
+ config.add_section("persistence")
+ config.set("persistence", "persistencedb", relative_path)
+ config.set("persistence", "persistencedir", persistencedir)
+
+ params = DefaultParameters(config, None)
+ common = FakeCommon()
+ persistence = Persistence(params, common)
+
+ try:
+ dbconf = persistence._find_db_conf()
+ raised_invalid_config = False
+ except InvalidConfig:
+ raised_invalid_config = True
+ os.remove(persistencedir)
+ assert raised_invalid_config
+
+
+ # Test an existing persistence db
+ vardir = tempfile.mkdtemp()
+ persistencedir = "persistence"
+ persistencefile = "epumgmt.db"
+ os.mkdir(os.path.join(vardir, persistencedir))
+ config = ConfigParser.RawConfigParser()
+ config.add_section("persistence")
+ config.set("persistence", "persistencedb", persistencefile)
+ config.set("persistence", "persistencedir", persistencedir)
+ config.add_section("ecdirs")
+ config.set("ecdirs", "var", vardir)
+
+ params = DefaultParameters(config, None)
+ common = FakeCommon(params)
+ persistence = Persistence(params, common)
+
+
+ dbconf = persistence._find_db_conf()
+
+ shutil.rmtree(vardir)
+ assert dbconf.startswith("sqlite:/")
+ assert dbconf.endswith(os.path.join(persistencedir, persistencefile))
+
+ def test_new_vm(self):
+
+ self.persistence.validate()
+
+ run_name = "testrun"
+
+ vm = RunVM()
+ vm.instanceid = "i-4h23ui4"
+ vm.nodeid = "hjk-hjk-hjk-hjk-hjk"
+ vm.hostname = "fake.example.com"
+ vm.service_type = "testservice"
+ vm.parent = "myparent"
+ vm.runlogdir = "/where/my/logs/are"
+ vm.vmlogdir = "/where/my/other/logs/are"
+ event_name = "testevent"
+ vm.events = [CYvent(None, event_name, None, None, None)]
+
+
+ newvm = self.persistence.new_vm(run_name, vm)
+ assert newvm
+ cdb_iaas = self.persistence.cdb.get_iaas_by_runname(run_name)
+ assert len(cdb_iaas) == 1
+ saved_iaas = cdb_iaas[0]
+ assert saved_iaas.iaasid == vm.instanceid
+ assert saved_iaas.nodeid == vm.nodeid
+ assert saved_iaas.hostname == vm.hostname
+ assert saved_iaas.service_type == vm.service_type
+ assert saved_iaas.parent == vm.parent
+ assert saved_iaas.runlogdir == vm.runlogdir
+ assert saved_iaas.vmlogdir == vm.vmlogdir
+ assert saved_iaas.events[0].name == event_name
+
+
+ def test_store_run_vms(self):
+
+ # Test when persistence not yet initialized
+ try:
+ self.persistence.store_run_vms(None, None)
+ raised_programming_error = False
+ except ProgrammingError:
+ raised_programming_error = True
+ assert raised_programming_error
+
+
+ # Regular test
+ self.persistence.validate()
+
+ run_name = "testrun"
+
+ vm = RunVM()
+ vm.instanceid = "i-4h23ui4"
+ vm.nodeid = "hjk-hjk-hjk-hjk-hjk"
+ vm.hostname = "fake.example.com"
+ vm.service_type = "testservice"
+ vm.parent = "myparent"
+ vm.runlogdir = "/where/my/logs/are"
+ vm.vmlogdir = "/where/my/other/logs/are"
+ event_name = "testevent"
+ vm.events = [CYvent(None, event_name, None, None, None)]
+ run_vms = [vm]
+
+ self.persistence.store_run_vms(run_name, run_vms)
+
+ cdb_iaas = self.persistence.cdb.get_iaas_by_runname(run_name)
+ assert len(cdb_iaas) == 1
+ saved_iaas = cdb_iaas[0]
+ assert saved_iaas.iaasid == vm.instanceid
+ assert saved_iaas.nodeid == vm.nodeid
+ assert saved_iaas.hostname == vm.hostname
+ assert saved_iaas.service_type == vm.service_type
+ assert saved_iaas.parent == vm.parent
+ assert saved_iaas.runlogdir == vm.runlogdir
+ assert saved_iaas.vmlogdir == vm.vmlogdir
+ assert saved_iaas.events[0].name == event_name
+
+
+ def test_find_instanceid_byservice(self):
+
+ # Test when persistence not yet initialized
+ try:
+ self.persistence.find_instanceid_byservice(None, None)
+ raised_programming_error = False
+ except ProgrammingError:
+ raised_programming_error = True
+ assert raised_programming_error
+
+ self.persistence.validate()
+
+
+ # Test regular case
+ run_name = "testrun"
+
+ vm0 = RunVM()
+ vm0.instanceid = "i-4h23ui4"
+ vm0.nodeid = "hjk-hjk-hjk-hjk-hjk"
+ vm0.hostname = "fake.example.com"
+ vm0.service_type = "testservice"
+ vm0.parent = "myparent"
+ vm0.runlogdir = "/where/my/logs/are"
+ vm0.vmlogdir = "/where/my/other/logs/are"
+ run_vms = [vm0]
+
+ self.persistence.store_run_vms(run_name, run_vms)
+
+ got_vm = self.persistence.find_instanceid_byservice(run_name, vm0.service_type)
+ assert got_vm == vm0.instanceid
+
+
+ # Test that we get None when we ask for a non-existent service
+ got_vm = self.persistence.find_instanceid_byservice(run_name, "nonexistance-service")
+ assert not got_vm
+
+
+ # Test when we have two services with the same name
+ vm1 = RunVM()
+ vm1.instanceid = "i-x4h23ui4"
+ vm1.nodeid = "xhjk-hjk-hjk-hjk-hjk"
+ vm1.hostname = "fakex.example.com"
+ vm1.service_type = "testservice"
+ vm1.parent = "myparent"
+ vm1.runlogdir = "/where/my/logs/are"
+ vm1.vmlogdir = "/where/my/other/logs/are"
+
+ run_vms.append(vm1)
+ self.persistence.store_run_vms(run_name, run_vms)
+
+ try:
+ got_vm = self.persistence.find_instanceid_byservice(run_name, vm0.service_type)
+ raised_programming_error = False
+ except ProgrammingError:
+ raised_programming_error = True
+ assert raised_programming_error
+
+
+ def test_check_new_instanceid(self):
+
+ self.persistence.validate()
+
+ run_name = "testrun"
+
+ vm = RunVM()
+ vm.instanceid = "i-4h23ui4"
+ vm.nodeid = "hjk-hjk-hjk-hjk-hjk"
+ vm.hostname = "fake.example.com"
+ vm.service_type = "testservice"
+ vm.parent = "myparent"
+ vm.runlogdir = "/where/my/logs/are"
+ vm.vmlogdir = "/where/my/other/logs/are"
+
+ newvm = self.persistence.new_vm(run_name, vm)
+ assert newvm
+
+ assert self.persistence.check_new_instanceid(vm.instanceid)
+ assert not self.persistence.check_new_instanceid("i-haventseenyet")
+
+
+ def test_get_run_vms_or_none(self):
+
+ # Test when persistence not yet validated
+ try:
+ self.persistence.get_run_vms_or_none(None)
+ raised_programming_error = False
+ except ProgrammingError:
+ raised_programming_error = True
+ assert raised_programming_error
+
+ run_name = "testrun"
+
+ self.persistence.validate()
+
+ # Test empty case
+ assert not self.persistence.get_run_vms_or_none(run_name)
+
+ # Test normal case
+ vm = RunVM()
+ vm.instanceid = "i-4h23ui4"
+ vm.nodeid = "hjk-hjk-hjk-hjk-hjk"
+ vm.hostname = "fake.example.com"
+ vm.service_type = "testservice"
+ vm.parent = "myparent"
+ vm.runlogdir = "/where/my/logs/are"
+ vm.vmlogdir = "/where/my/other/logs/are"
+ event_name = "testevent"
+ extra_key = "something"
+ extra_val = "else"
+ vm.events = [CYvent(None, event_name, None, None, {extra_key:extra_val})]
+
+ newvm = self.persistence.new_vm(run_name, vm)
+
+ got_vms = self.persistence.get_run_vms_or_none(run_name)
+
+ assert len(got_vms) == 1
+
+ assert got_vms[0].instanceid == vm.instanceid
+ assert got_vms[0].events[0].extra[extra_key] == extra_val
311 src/python/tests/test_epumgmt_main_em_core_workloadtest.py
@@ -0,0 +1,311 @@
+import re
+import os
+import sys
+import types
+import signal
+import tempfile
+import datetime
+import ConfigParser
+
+from nose.plugins.attrib import attr
+
+import epumgmt.main.em_core_workloadtest
+
+from mocks.amqp import FakeAMQPServer
+from mocks.common import FakeCommon
+from mocks.modules import FakeModules
+from mocks.modules import make_fake_execute_cmd
+from cloudinitd.user_api import CloudInitD
+from epumgmt.defaults import DefaultParameters
+from epumgmt.defaults import DefaultCommon
+from epumgmt.main.em_core_workloadtest import EPUController, Torque, AMQP
+from epumgmt.api.exceptions import InvalidConfig, ProgrammingError
+
+class TestEPUController:
+
+ def setup(self):
+
+ self.run_name = "testrun"
+ runlogdir = tempfile.mkdtemp()
+ os.mkdir(os.path.join(runlogdir, self.run_name))
+
+ config = ConfigParser.RawConfigParser()
+ config.add_section("events")
+ config.set("events", "runlogdir", runlogdir)
+
+ self.m = FakeModules()
+ self.p = DefaultParameters(config, None)
+ self.c = FakeCommon(self.p)
+
+
+ def test_init(self):
+
+ # Test no config provided
+ badp = DefaultParameters(None, None)
+ try:
+ EPUController(badp, None, None, None)
+ raised_invalid_config = False
+ except InvalidConfig:
+ raised_invalid_config = True
+
+ assert raised_invalid_config
+
+ # Test with working config
+ epucontroller = EPUController(self.p, self.c, self.m, self.run_name)
+ assert epucontroller
+ assert epucontroller.controllerlog.endswith("controllerkill.log")
+ assert epucontroller.controllerlog.startswith(epucontroller.controllerlogdir)
+
+
+ def test_log_event(self):
+
+ epucontroller = EPUController(self.p, self.c, self.m, self.run_name)
+
+ log_message = "This is an exciting message. Important, as well."
+
+ epucontroller._log_event(log_message)
+
+ with open(epucontroller.controllerlog) as log:
+ assert log_message in log.read()
+
+ def test_get_log_time(self):
+
+ # Test only that we get a time that is parsable. Assume we're getting correct time
+ epucontroller = EPUController(self.p, self.c, self.m, self.run_name)
+
+ time = epucontroller._get_log_time()
+ parsed = datetime.datetime.strptime(time, "%Y-%m-%d %H:%M:%S.%f")
+ assert parsed
+
+ def test_start(self):
+
+ epucontroller = EPUController(self.p, self.c, self.m, self.run_name)
+ starts = 0
+
+ epucontroller.start()
+ starts += 1
+ with open(epucontroller.controllerlog) as log:
+ log_read = log.read()
+ assert log_read.count("EPU_CONTROLLER_START") == starts
+
+ epucontroller.start(num=2)
+ starts += 2
+ with open(epucontroller.controllerlog) as log:
+ log_read = log.read()
+ assert log_read.count("EPU_CONTROLLER_START") == starts
+
+ def test_terminate(self):
+
+ epucontroller = EPUController(self.p, self.c, self.m, self.run_name)
+ terminates = 0
+
+ epucontroller.terminate()
+ terminates += 1
+ with open(epucontroller.controllerlog) as log:
+ log_read = log.read()
+ assert log_read.count("EPU_CONTROLLER_TERMINATE") == terminates
+
+ epucontroller.terminate(num=2)
+ terminates += 2
+ with open(epucontroller.controllerlog) as log:
+ log_read = log.read()
+ print log_read.count("EPU_CONTROLLER_TERMINATE")
+ assert log_read.count("EPU_CONTROLLER_TERMINATE") == terminates
+
+
+class TestTorque:
+
+ def setup(self):
+
+ self.run_name = "testrun"
+ runlogdir = tempfile.mkdtemp()
+ os.mkdir(os.path.join(runlogdir, self.run_name))
+
+ config = ConfigParser.RawConfigParser()
+ config.add_section("events")
+ config.set("events", "runlogdir", runlogdir)
+
+ self.m = FakeModules()
+ self.p = DefaultParameters(config, None)
+ self.c = FakeCommon(self.p)
+
+ self.test_dir = os.path.dirname(__file__)
+ self.test_db_dir = tempfile.mkdtemp()
+ self.test_cd_config = os.path.join(self.test_dir, "configs/main.conf")
+ self.cloudinitd = CloudInitD(self.test_db_dir, self.test_cd_config, self.run_name)
+
+
+ def test_execute_cmd(self):
+
+ torque = Torque(self.p, self.c, self.m, self.run_name, self.cloudinitd)
+ self.c.trace = True
+
+ command_to_succeed = "true"
+ success = torque._execute_cmd(command_to_succeed)
+ assert success
+
+ non_existant_command = "neo23n3io2j4023um409m23u904m23"
+ success = torque._execute_cmd(non_existant_command)
+ assert not success
+
+ command_to_fail = "cat neo23n3io2j4023um409m23u904m23"
+ success = torque._execute_cmd(command_to_fail)
+ assert not success
+
+ @attr("slow")
+ def test_execute_cmd_timeout(self):
+ """This test is pretty slow (it waits for a timeout). so you
+ may want to skip it.
+ """
+
+ torque = Torque(self.p, self.c, self.m, self.run_name, self.cloudinitd)
+
+ slow_command = "sleep 60"
+ success = torque._execute_cmd(slow_command)
+ assert not success
+
+
+ def test_copy_file(self):
+
+ torque = Torque(self.p, self.c, self.m, self.run_name, self.cloudinitd)
+
+ remote_hostname = "example.com"
+ torque.svc._svc._s.hostname = remote_hostname
+
+ # monkey patch execute command to prepend an echo
+ new_execute_command = make_fake_execute_cmd(torque, torque._execute_cmd)
+ torque._execute_cmd = types.MethodType(new_execute_command, torque)
+
+ test_file = "/path/to/some/file"
+ test_file_base = os.path.basename("/path/to/some/file")
+
+ torque._copy_file(test_file)
+
+ run_commands = [message for (level, message)
+ in self.c.log.transcript
+ if level == "DEBUG"
+ and "command = 'echo " in message]
+
+ print run_commands
+
+ # confirm file is copied
+ assert [scp_command for scp_command
+ in run_commands
+ if re.match(".*scp.*%s.*%s:/tmp" % (test_file, remote_hostname), scp_command)
+ ]
+
+ # confirm file's permissions are fixed
+ assert [ssh_command for ssh_command
+ in run_commands
+ if re.match(".*ssh.*%s.*chmod ugo\+r.*/tmp/.*%s" % (remote_hostname, test_file_base),
+ ssh_command)
+ ]
+
+
+ def test_qsub_job(self):
+
+ torque = Torque(self.p, self.c, self.m, self.run_name, self.cloudinitd)
+ remote_hostname = "example.com"
+ torque.svc._svc._s.hostname = remote_hostname
+ # monkey patch execute command to prepend an echo
+ new_execute_command = make_fake_execute_cmd(torque, torque._execute_cmd)
+ torque._execute_cmd = types.MethodType(new_execute_command, torque)
+
+ job_name = "test.sh"
+
+ torque._qsub_job(job_name)
+ run_commands = [message for (level, message)
+ in self.c.log.transcript
+ if level == "DEBUG"
+ and "command = 'echo " in message]
+
+ print run_commands
+
+ # confirm qsub sent
+ assert [ssh_command for ssh_command
+ in run_commands
+ if re.match(".*ssh.*%s.*qsub /tmp/%s.*" % (remote_hostname, job_name),
+ ssh_command)
+ ]
+
+ def test_submit(self):
+
+ torque = Torque(self.p, self.c, self.m, self.run_name, self.cloudinitd)
+ remote_hostname = "example.com"
+ torque.svc._svc._s.hostname = remote_hostname
+ # monkey patch execute command to prepend an echo
+ new_execute_command = make_fake_execute_cmd(torque, torque._execute_cmd)
+ torque._execute_cmd = types.MethodType(new_execute_command, torque)
+
+ job = epumgmt.main.em_core_workloadtest.WorkItem(0, 1, 1, 1)
+
+ torque.submit(job)
+
+ print self.c.log.transcript
+ run_commands = [message for (level, message)
+ in self.c.log.transcript
+ if level == "DEBUG"
+ and "command = 'echo " in message]
+
+ print run_commands
+
+ # confirm qsub sent
+ assert [ssh_command for ssh_command
+ in run_commands
+ if re.match(".*ssh.*%s.*qsub.*" % (remote_hostname),
+ ssh_command)
+ ]
+
+class TestAMQP:
+
+ def setup(self):
+
+
+ self.host = "localhost"
+ self.port = "8000"
+
+ self.run_name = "testrun"
+ runlogdir = tempfile.mkdtemp()
+ os.mkdir(os.path.join(runlogdir, self.run_name))
+
+ config = ConfigParser.RawConfigParser()
+ config.add_section("events")
+ config.set("events", "runlogdir", runlogdir)
+
+ self.m = FakeModules()
+ self.p = DefaultParameters(config, None)
+ self.c = FakeCommon(self.p)
+
+ self.amqp = AMQP(self.p, self.c, self.m, self.run_name, self.host, self.port)
+
+ self.child_pid = os.fork()
+ if self.child_pid == 0:
+ # This is the child
+ server = FakeAMQPServer()
+ sys.exit(0)
+
+ def teardown(self):
+ # Kill child process just in case...
+ os.kill(self.child_pid, signal.SIGKILL)
+
+ def test_submit_ok(self):
+
+ job = epumgmt.main.em_core_workloadtest.WorkItem(0, 1, 1, 1)
+ self.amqp.submit(job)
+ print self.c.log.transcript
+ message = self.c.log.transcript[0][1]
+ url = (self.host, self.port, self.run_name, job.batchid, job.count, job.sleepsec)
+ assert message == "submit: http://%s:%s/%s-jobs/%s/%s/%s" % url
+
+ def test_submit_not_ok(self):
+ """Ensure that we get an error when server isn't running
+ """
+ os.kill(self.child_pid, signal.SIGKILL)
+
+ job = epumgmt.main.em_core_workloadtest.WorkItem(0, 1, 1, 1)
+ self.amqp.submit(job)
+ print self.c.log.transcript
+ message_type = self.c.log.transcript[-1][0]
+ assert message_type == "ERROR"
+
+
+