Skip to content
Browse files

Add tests for svc_adapter

  • Loading branch information...
1 parent ea329f0 commit c7a31ffe030f156d4fa0405fb5d41daf0cb6f187 @oldpatricka oldpatricka committed Jul 18, 2011
Showing with 152 additions and 1 deletion.
  1. +4 −0 src/README
  2. +7 −0 src/python/tests/mocks/common.py
  3. +141 −1 src/python/tests/test_epumgmt_defaults_svc_adapter.py
View
4 src/README
@@ -10,6 +10,10 @@ and run something like:
$ nosetests --with-coverage --cover-package=epumgmt.main.em_core_status
+You may also want to avoid slow tests, to do this, use:
+
+$ nosetests -a '!slow'
+
During (test driven) development this can be conveniently combined with:
$ ./sbin/cat-latest-logfile.sh
View
7 src/python/tests/mocks/common.py
@@ -1,3 +1,5 @@
+import sys
+import traceback
class FakeLog():
@@ -16,6 +18,11 @@ def warn(self, msg):
def error(self, msg):
self.transcript.append(("ERROR", msg))
+ def exception(self, msg):
+ exc = sys.exc_info()
+ msg += "".join(traceback.format_exception(exc[0], exc[1], exc[2]))
+ self.transcript.append(("ERROR", msg))
+
class FakeCommon():
"""FakeCommon fakes the common object so we can check what
gets logged by the stuff that calls it
View
142 src/python/tests/test_epumgmt_defaults_svc_adapter.py
@@ -3,8 +3,10 @@
import types
import shutil
import tempfile
+import datetime
import nose.tools
import ConfigParser
+from nose.plugins.attrib import attr
import epumgmt.defaults.svc_adapter
import epumgmt.main.em_args as em_args
@@ -14,8 +16,10 @@
from epumgmt.api import RunVM
from epumgmt.defaults import DefaultParameters
from epumgmt.defaults import DefaultRunlogs
+from epumgmt.defaults import DefaultRemoteSvcAdapter
from mocks.common import FakeCommon
from mocks.modules import FakeModules
+from mocks.event import Event
@nose.tools.raises(ProgrammingError)
@@ -41,7 +45,6 @@ def test_check_init():
class TestDefaultRemoteSvcAdapter:
def setup(self):
- from epumgmt.defaults.svc_adapter import DefaultRemoteSvcAdapter
self.run_name = "TESTRUN"
@@ -218,6 +221,143 @@ def test_intake_query_result_from_file(self):
assert instance.heartbeat_state == heartbeat_state
assert instance.heartbeat_time == heartbeat_time
+ def test_intake_query_result(self):
+
+ provisioner = RunVM()
+ provisioner.runlogdir = tempfile.mkdtemp()
+
+ local_filename = "log.json"
+ remote_filename = "something"
+ local_abs_filepath = os.path.join(provisioner.runlogdir, local_filename)
+
+ controller_name = "test-controller"
+ de_state = "STABLE_DE"
+ de_conf_report = "balala"
+ last_queuelen_size = 5
+ last_queuelen_time = 1234
+ instance_0 = "fashfjsahfjksa"
+ instance_0_state = "600-RUNNING"
+ iaas_state_time = 12312142
+ heartbeat_state = "SOMETHING"
+ heartbeat_time = 5
+
+
+ test_json = """
+ {"%s": {"de_state":"%s", "de_conf_report":"%s", "last_queuelen_size":%s,
+ "last_queuelen_time": %s,
+ "instances": {
+ "%s": {"iaas_state":"%s", "iaas_state_time": %s,
+ "heartbeat_state":"%s", "heartbeat_time": %s}}}}
+ """ % (controller_name, de_state, de_conf_report, last_queuelen_size,
+ last_queuelen_time, instance_0, instance_0_state, iaas_state_time,
+ heartbeat_state, heartbeat_time)
+
+ with open(local_abs_filepath, "w") as j_file:
+ j_file.write(test_json)
+
+ self.svc_adapter.initialize(self.m, self.run_name, self.cloudinitd)
+
+ # Test detection of non-existant files
+ try:
+ fake_file = "not_a_file"
+ self.svc_adapter._intake_query_result(provisioner, fake_file, remote_filename)
+ raised_unexpected_error = False
+ except UnexpectedError:
+ raised_unexpected_error = True
+
+ assert raised_unexpected_error
+
+ # Test reading a file actually works
+ # all of the other values should be tested by test_intake_query_result_from_file
+ map = self.svc_adapter._intake_query_result(provisioner, local_filename, remote_filename)
+ assert map.has_key(controller_name)
+
+
+ def test_run_one_command(self):
+
+ svc_adapter = DefaultRemoteSvcAdapter(self.p, self.c)
+
+ command_to_succeed = "true"
+ succeeded = svc_adapter._run_one_cmd(command_to_succeed)
+ assert succeeded
+
+
+ non_existant_command = "a4f6d2f32r22c3c34c423c2f2g34"
+ succeeded = svc_adapter._run_one_cmd(non_existant_command)
+ assert not succeeded
+
+
+ command_to_fail = "cat fsdbjkfsdy89fsdy89fsfsdfsdfsd"
+ succeeded = svc_adapter._run_one_cmd(command_to_fail)
+ assert not succeeded
+
+
+ @attr("slow")
+ def test_run_one_command_timeout(self):
+ """
+ This tests _run_one_command's timeout feature, so you may want
+ to skip it when running these tests frequently.
+ """
+
+ command_that_will_timeout = "sleep 60"
+ svc_adapter = DefaultRemoteSvcAdapter(self.p, self.c)
+ succeeded = svc_adapter._run_one_cmd(command_that_will_timeout)
+ assert not succeeded
+
+
+ def test_get_pathconfs(self):
+
+ source = "something"
+ username = "fordprefect"
+
+ self.svc_adapter.initialize(self.m, self.run_name, self.cloudinitd)
+
+ abs_homedir, abs_envfile = self.svc_adapter._get_pathconfs(source, username)
+ print abs_homedir, abs_envfile
+ assert abs_homedir == "/home/%s/app" % username
+ assert abs_envfile == "/home/%s/app-venv/bin/activate" % username
+
+ def test_get_provisioner(self):
+
+ self.svc_adapter.initialize(self.m, self.run_name, self.cloudinitd)
+ provisioner = self.svc_adapter._get_provisioner()
+
+ assert provisioner.name == "provisioner"
+
+
+ def test_latest_iaas_status(self):
+
+ self.svc_adapter.initialize(self.m, self.run_name, self.cloudinitd)
+
+ vm = RunVM()
+ vm.instanceid = "i-dfs3f32"
+
+ try:
+ status = self.svc_adapter.latest_iaas_status(None)
+ raised_programming_error = False
+ except ProgrammingError:
+ raised_programming_error = True
+ assert raised_programming_error
+
+
+ # Test response when there's no status in VM object
+ status = self.svc_adapter.latest_iaas_status(vm)
+ assert not status
+
+
+ vm.events = []
+ vm_iaas_state_0 = "RUNNING"
+ vm_timestamp_0 = datetime.datetime(2000, 10, 3)
+ vm_iaas_state_1 = "SOMETHINGELSE"
+ vm_timestamp_1 = datetime.datetime(2000, 10, 5)
+ vm.events.append(Event(name="iaas_state", state=vm_iaas_state_0, timestamp=vm_timestamp_0))
+ vm.events.append(Event(name="iaas_state", state=vm_iaas_state_1, timestamp=vm_timestamp_1))
+
+ status = self.svc_adapter.latest_iaas_status(vm)
+
+ assert status == vm_iaas_state_1
+
+
def make_fake_run_one_cmd(target, real_run_one_cmd):
def fake_run_one_cmd(target, cmd):
cmd = "echo %s" % cmd

0 comments on commit c7a31ff

Please sign in to comment.
Something went wrong with that request. Please try again.