Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add more tests

  • Loading branch information...
commit a4d3370718b8df1ea69dfcad4d03617de8578dea 1 parent 409ec36
@oldpatricka oldpatricka authored
View
4 src/python/epumgmt/defaults/cloudinitd_load.py
@@ -9,9 +9,9 @@ def get_cloudinitd_service(cloudinitd, name):
"""Return the cloudinit.d service by exact name match or raise IncompatibleEnvironment"""
if not cloudinitd:
- raise Exception("requires cloudinitd reference")
+ raise ProgrammingError("requires cloudinitd reference")
if not name:
- raise Exception("requires service name")
+ raise ProgrammingError("requires service name")
noservicemsg = "Cannot find the '%s' service in cloudinit.d run '%s'" % (name, cloudinitd.run_name)
try:
aservice = cloudinitd.get_service(name)
View
3  src/python/tests/mocks/common.py
@@ -22,7 +22,8 @@ class FakeCommon():
"""
- def __init__(self):
+ def __init__(self, p=None):
self.log = FakeLog()
self.trace = False
+ self.p = p
View
110 src/python/tests/test_epumgmt_api.py
@@ -0,0 +1,110 @@
+import os
+import shutil
+import tempfile
+import ConfigParser
+import epumgmt.api
+
+class TestEpumgmtAPI:
+
+ def setup(self):
+
+ class FakeOpts:
+ name = None
+ self.opts = FakeOpts()
+ self.opts.name = "testname"
+
+ self.epu_home = tempfile.mkdtemp()
+ conf_dir = os.path.join(self.epu_home, "etc/epumgmt")
+ os.makedirs(conf_dir)
+ self.main_conf = os.path.join(conf_dir, "main.conf")
+ self.dirs_conf = os.path.join(conf_dir, "dirs.conf")
+ self.internal_conf = os.path.join(conf_dir, "internal.conf")
+
+ main_conf_str = "[otherconfs]\ndirs: dirs.conf\ninternal: internal.conf"
+
+ self.var_dir = "/var/"
+ dirs_conf_str = "[ecdirs]\nvar: %s" % self.var_dir
+
+ internal_conf_str = "[emimpls]\n"
+ internal_conf_str += "Parameters: epumgmt.defaults.DefaultParameters\n"
+ internal_conf_str += "Common: mocks.common.FakeCommon\n"
+
+ with open(self.main_conf, "w") as main:
+ main.write(main_conf_str)
+
+ with open(self.dirs_conf, "w") as dirs:
+ dirs.write(dirs_conf_str)
+
+ with open(self.internal_conf, "w") as internal:
+ internal.write(internal_conf_str)
+
+ def teardown(self):
+ shutil.rmtree(self.epu_home)
+
+ def test_get_default_config(self):
+ from epumgmt.api import get_default_config
+
+ epumgmt_home = "/path/to/epumgmt"
+ default_conf_rel = "etc/epumgmt/main.conf"
+ default_config = os.path.join(epumgmt_home, default_conf_rel)
+ os.environ["EPUMGMT_HOME"] = epumgmt_home
+
+ assert get_default_config() == default_config
+
+ def test_get_default_ac(self):
+ from epumgmt.api import get_default_ac
+ from epumgmt.api.exceptions import InvalidConfig
+
+ os.environ["EPUMGMT_HOME"] = self.epu_home
+
+ ac = get_default_ac()
+
+ assert ac.has_section("ecdirs")
+ assert ac.has_option("ecdirs", "var")
+ assert ac.get("ecdirs", "var") == self.var_dir
+
+
+ def test_get_parameters(self):
+ from epumgmt.api import get_parameters
+ from epumgmt.defaults.parameters import DefaultParameters
+
+
+ ac = ConfigParser.ConfigParser()
+ ac.add_section("emimpls")
+ ac.set("emimpls", "Parameters", "epumgmt.defaults.parameters.DefaultParameters")
+
+ # Test whether we can pass in an allconfigs object
+ p, ret_ac = get_parameters(self.opts, ac)
+
+ default_params_class = DefaultParameters(None, None).__class__
+ assert p.__class__ == default_params_class
+ assert p.get_arg_or_none("name") == self.opts.name
+
+ # Test whether we can get allconfigs from env
+ os.environ["EPUMGMT_HOME"] = self.epu_home
+ p, ret_ac = get_parameters(self.opts)
+
+ assert ret_ac.has_section("emimpls")
+ assert p.__class__ == default_params_class
+ assert p.get_arg_or_none("name") == self.opts.name
+
+
+ def test_get_common(self):
+ from epumgmt.api import get_common
+ from mocks.common import FakeCommon
+
+ try:
+ get_common()
+ except Exception:
+ no_opts_nor_p_exception = True
+
+ assert no_opts_nor_p_exception
+
+ common_class = FakeCommon().__class__
+
+ os.environ["EPUMGMT_HOME"] = self.epu_home
+ c, p, ac = get_common(opts=self.opts)
+
+ assert c.__class__ == common_class
+ assert p.get_arg_or_none("name") == self.opts.name
+ assert ac.has_section("emimpls")
View
106 src/python/tests/test_epumgmt_default_common.py
@@ -0,0 +1,106 @@
+import os
+import shutil
+import logging
+import tempfile
+import ConfigParser
+
+import epumgmt.defaults.common
+from epumgmt.defaults import DefaultCommon
+from epumgmt.defaults import DefaultParameters
+from epumgmt.api.exceptions import InvalidConfig, UnexpectedError
+
+class TestDefaultCommon:
+
+ def setup(self):
+
+ self.logdir = tempfile.mkdtemp()
+
+ self.ac = ConfigParser.ConfigParser()
+ self.ac.add_section("ecdirs")
+ self.ac.add_section("logging")
+ self.ac.set("logging", "stdoutloglevel", "4")
+ self.ac.add_section("emimpls")
+
+ self.p = DefaultParameters(self.ac, None)
+ self.common = DefaultCommon(self.p)
+
+ def teardown(self):
+ shutil.rmtree(self.logdir)
+
+ def test_init(self):
+
+ try:
+ epumgmt.defaults.common.DefaultCommon(None)
+ except InvalidConfig:
+ raised_invalidconfig = True
+
+ assert raised_invalidconfig
+
+ def test_resolve_var_dir(self):
+
+ var_file = "logs"
+
+ try:
+ self.common.resolve_var_dir(var_file)
+ except InvalidConfig:
+ invalid_config_raised = True
+
+ assert invalid_config_raised
+
+
+ self.ac.set("ecdirs", "var", "var/epumgmt")
+ got_vardir = self.common.resolve_var_dir(var_file)
+
+ vardir = os.path.join(os.path.dirname(__file__), "../../..", "var/epumgmt", var_file)
+
+ assert os.path.samefile(got_vardir, vardir)
+
+
+ def test_get_class_by_keyword(self):
+ from mocks.common import FakeCommon
+
+ test_keyword = "TheClass"
+ test_class = "mocks.common.FakeCommon"
+
+ try:
+ self.common.get_class_by_keyword(test_keyword)
+ except UnexpectedError:
+ raised_unexpected_error = True
+
+ assert raised_unexpected_error
+
+ self.ac.set("emimpls", test_keyword, test_class)
+
+ kls = self.common.get_class_by_keyword(test_keyword)
+
+ assert kls == FakeCommon
+
+def test_close_logfile():
+
+ # This test runs outside the test object to avoid closing
+ # the log file before the other tests can use it
+
+ logdir = tempfile.mkdtemp()
+
+ ac = ConfigParser.ConfigParser()
+ ac.add_section("ecdirs")
+ ac.add_section("logging")
+ ac.set("logging", "stdoutloglevel", "4")
+ ac.add_section("emimpls")
+
+ ac.set("logging", "logfiledir", logdir)
+ ac.set("logging", "fileloglevel", "0")
+
+ params = DefaultParameters(ac, None)
+ logging_common = DefaultCommon(params)
+
+ assert os.path.isfile(logging_common.logfilepath)
+ assert logging_common.logfilehandler
+
+ logging_common.close_logfile()
+ assert not logging_common.logfilehandler
+
+ #logging_common.reopen_logfile()
+ #assert logging_common.logfilehandler
+
+ shutil.rmtree(logdir)
View
84 src/python/tests/test_epumgmt_defaults_cloudinitd_load.py
@@ -0,0 +1,84 @@
+import os
+import types
+import tempfile
+import ConfigParser
+
+from cloudinitd.user_api import CloudInitD
+
+import epumgmt.defaults.cloudinitd_load
+import epumgmt.main.em_args as em_args
+from epumgmt.api.exceptions import ProgrammingError, IncompatibleEnvironment
+from epumgmt.defaults.runlogs import DefaultRunlogs
+from epumgmt.defaults.parameters import DefaultParameters
+
+from mocks.common import FakeCommon
+from mocks.modules import FakeModules
+from mocks.modules import build_fake_scp_command_str
+from mocks.remote_svc_adapter import FakeRemoteSvcAdapter
+
+class TestCloudinitdLoad:
+ def setup(self):
+ """
+ Build a fake test environment, with the sleepers cloudinit.d plan.
+
+ We can grab all logged messages from c.log.transcript.
+ """
+
+ self.test_run_name = "TESTRUN"
+
+ self.config = ConfigParser.RawConfigParser()
+ self.config.add_section("events")
+ self.runlogdir = tempfile.mkdtemp()
+ self.config.set("events", "runlogdir", self.runlogdir)
+ self.vmlogdir = tempfile.mkdtemp()
+ self.config.set("events", "vmlogdir", self.vmlogdir)
+ self.optdict = {}
+ self.optdict[em_args.NAME.name] = self.test_run_name
+
+ self.params = DefaultParameters(self.config, None)
+ self.params.optdict = self.optdict
+ remote_svc_adapter = FakeRemoteSvcAdapter()
+ self.common = FakeCommon()
+ self.modules = FakeModules(remote_svc_adapter=remote_svc_adapter)
+
+ # Note that we monkey-patch the get_scp_command_str function
+ # to prepend "echo" to it. That way we can still allow the
+ # command to be run, but we can still see how it actually gets
+ # constructed
+ runlogs = DefaultRunlogs(self.params, self.common)
+ runlogs.validate()
+ self.modules.runlogs = runlogs
+ new_get_scp = build_fake_scp_command_str(runlogs, runlogs.get_scp_command_str)
+ self.modules.runlogs.get_scp_command_str = types.MethodType(new_get_scp, self.modules.runlogs)
+
+ self.test_dir = os.path.dirname(__file__)
+ self.test_db_dir = tempfile.mkdtemp()
+ self.test_cd_config = os.path.join(self.test_dir, "configs/main.conf")
+ self.cloudinitd = CloudInitD(self.test_db_dir, self.test_cd_config, self.test_run_name)
+
+ def test_get_cloudinitd_service(self):
+ from epumgmt.defaults.cloudinitd_load import get_cloudinitd_service
+
+ try:
+ get_cloudinitd_service(None, None)
+ except ProgrammingError:
+ no_cloudinitd_programming_error = True
+
+ assert no_cloudinitd_programming_error
+
+ try:
+ get_cloudinitd_service(self.cloudinitd, None)
+ except ProgrammingError:
+ no_service_name_programming_error = True
+
+ assert no_service_name_programming_error
+
+ nonexistant_svc = "notreal"
+
+ try:
+ service = get_cloudinitd_service(self.cloudinitd, nonexistant_svc)
+ except IncompatibleEnvironment:
+ no_service_incompatible_env = True
+
+ assert no_service_incompatible_env
+
Please sign in to comment.
Something went wrong with that request. Please try again.