From 0e46c14907f1c63c9bd7206ed3a7dd59e59e0800 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Mon, 13 Sep 2021 14:59:47 +0100 Subject: [PATCH] Fix and migrate ``tests/unit/modules/test_ipset.py`` to PyTest --- salt/modules/ipset.py | 47 ++--- tests/pytests/unit/modules/test_ipset.py | 242 ++++++++++++++++++++++ tests/unit/modules/test_ipset.py | 253 ----------------------- 3 files changed, 264 insertions(+), 278 deletions(-) create mode 100644 tests/pytests/unit/modules/test_ipset.py delete mode 100644 tests/unit/modules/test_ipset.py diff --git a/salt/modules/ipset.py b/salt/modules/ipset.py index 4191932aa1f9..c93c28f4875c 100644 --- a/salt/modules/ipset.py +++ b/salt/modules/ipset.py @@ -280,8 +280,8 @@ def version(): salt '*' ipset.version """ - cmd = "{} --version".format(_ipset_cmd()) - out = __salt__["cmd.run"](cmd).split() + cmd = [_ipset_cmd(), "--version"] + out = __salt__["cmd.run"](cmd, python_shell=False).split() return out[1] @@ -318,21 +318,21 @@ def new_set(name=None, set_type=None, family="ipv4", comment=False, **kwargs): if item not in kwargs: return "Error: {} is a required argument".format(item) - cmd = "{} create {} {}".format(_ipset_cmd(), name, set_type) + cmd = [_ipset_cmd(), "create", name, set_type] for item in _CREATE_OPTIONS[set_type]: if item in kwargs: if item in _CREATE_OPTIONS_WITHOUT_VALUE: - cmd = "{} {} ".format(cmd, item) + cmd.append(item) else: - cmd = "{} {} {} ".format(cmd, item, kwargs[item]) + cmd.extend([item, kwargs[item]]) # Family only valid for certain set types if "family" in _CREATE_OPTIONS[set_type]: - cmd = "{} family {}".format(cmd, ipset_family) + cmd.extend(["family", cmd, ipset_family]) if comment: - cmd = "{} comment".format(cmd) + cmd.append("comment") out = __salt__["cmd.run"](cmd, python_shell=False) @@ -360,7 +360,7 @@ def delete_set(name=None, family="ipv4"): if not name: return "Error: Set needs to be specified" - cmd = "{} destroy {}".format(_ipset_cmd(), name) + cmd = [_ipset_cmd(), "destroy", name] out = __salt__["cmd.run"](cmd, python_shell=False) if not out: @@ -398,7 +398,7 @@ def rename_set(name=None, new_set=None, family="ipv4"): if settype: return "Error: New Set already exists" - cmd = "{} rename {} {}".format(_ipset_cmd(), name, new_set) + cmd = [_ipset_cmd(), "rename", name, new_set] out = __salt__["cmd.run"](cmd, python_shell=False) if not out: @@ -419,7 +419,7 @@ def list_sets(family="ipv4"): salt '*' ipset.list_sets """ - cmd = "{} list -t".format(_ipset_cmd()) + cmd = [_ipset_cmd(), "list", "-t"] out = __salt__["cmd.run"](cmd, python_shell=False) _tmp = out.split("\n") @@ -483,7 +483,7 @@ def add(name=None, entry=None, family="ipv4", **kwargs): settype = setinfo["Type"] - cmd = "{}".format(entry) + cmd = [_ipset_cmd(), "add", "-exist", name, entry] if "timeout" in kwargs: if "timeout" not in setinfo["Header"]: @@ -505,14 +505,13 @@ def add(name=None, entry=None, family="ipv4", **kwargs): for item in _ADD_OPTIONS[settype]: if item in kwargs: - cmd = "{} {} {}".format(cmd, item, kwargs[item]) + cmd.extend([item, kwargs[item]]) current_members = _find_set_members(name) - if cmd in current_members: - return "Warn: Entry {} already exists in set {}".format(cmd, name) + if entry in current_members: + return "Warn: Entry {} already exists in set {}".format(entry, name) # Using -exist to ensure entries are updated if the comment changes - cmd = "{} add -exist {} {}".format(_ipset_cmd(), name, cmd) out = __salt__["cmd.run"](cmd, python_shell=False) if not out: @@ -541,7 +540,7 @@ def delete(name=None, entry=None, family="ipv4", **kwargs): if not settype: return "Error: Set {} does not exist".format(name) - cmd = "{} del {} {}".format(_ipset_cmd(), name, entry) + cmd = [_ipset_cmd(), "del", name, entry] out = __salt__["cmd.run"](cmd, python_shell=False) if not out: @@ -625,7 +624,7 @@ def test(name=None, entry=None, family="ipv4", **kwargs): if not settype: return "Error: Set {} does not exist".format(name) - cmd = "{} test {} {}".format(_ipset_cmd(), name, entry) + cmd = [_ipset_cmd(), "test", name, entry] out = __salt__["cmd.run_all"](cmd, python_shell=False) if out["retcode"] > 0: @@ -654,11 +653,9 @@ def flush(name=None, family="ipv4"): salt '*' ipset.flush set """ - ipset_family = _IPSET_FAMILIES[family] + cmd = [_ipset_cmd(), "flush"] if name: - cmd = "{} flush {}".format(_ipset_cmd(), name) - else: - cmd = "{} flush".format(_ipset_cmd()) + cmd.append(name) out = __salt__["cmd.run"](cmd, python_shell=False) return not out @@ -669,7 +666,7 @@ def _find_set_members(name): Return list of members for a set """ - cmd = "{} list {}".format(_ipset_cmd(), name) + cmd = [_ipset_cmd(), "list", name] out = __salt__["cmd.run_all"](cmd, python_shell=False) if out["retcode"] > 0: @@ -692,7 +689,7 @@ def _find_set_info(name): Return information about the set """ - cmd = "{} list -t {}".format(_ipset_cmd(), name) + cmd = [_ipset_cmd(), "list", "-t", name] out = __salt__["cmd.run_all"](cmd, python_shell=False) if out["retcode"] > 0: @@ -813,8 +810,8 @@ def _compare_member_parts(member_part, entry_part): def _is_network(o): - return isinstance(o, ipaddress.IPv4Network) or isinstance(o, ipaddress.IPv6Network) + return isinstance(o, (ipaddress.IPv4Network, ipaddress.IPv6Network)) def _is_address(o): - return isinstance(o, ipaddress.IPv4Address) or isinstance(o, ipaddress.IPv6Address) + return isinstance(o, (ipaddress.IPv4Address, ipaddress.IPv6Address)) diff --git a/tests/pytests/unit/modules/test_ipset.py b/tests/pytests/unit/modules/test_ipset.py new file mode 100644 index 000000000000..7f6a624d2f82 --- /dev/null +++ b/tests/pytests/unit/modules/test_ipset.py @@ -0,0 +1,242 @@ +""" + :codeauthor: Rupesh Tare +""" +import pytest +import salt.modules.ipset as ipset +from tests.support.mock import MagicMock, patch + + +@pytest.fixture +def configure_loader_modules(): + with patch.object(ipset, "_ipset_cmd", return_value="ipset"): + yield {ipset: {}} + + +def test_version(): + """ + Test for Return version from ipset --version + """ + with patch.object(ipset, "_ipset_cmd", return_value="A"): + mock = MagicMock(return_value="A\nB\nC") + with patch.dict(ipset.__salt__, {"cmd.run": mock}): + assert ipset.version() == "B" + + +def test_new_set(): + """ + Test for Create new custom set + """ + assert ipset.new_set() == "Error: Set Name needs to be specified" + + assert ipset.new_set("s") == "Error: Set Type needs to be specified" + + assert ipset.new_set("s", "d") == "Error: Set Type is invalid" + + assert ipset.new_set("s", "bitmap:ip") == "Error: range is a required argument" + + mock = MagicMock(return_value=False) + with patch.dict(ipset.__salt__, {"cmd.run": mock}): + assert ipset.new_set("s", "bitmap:ip", range="range") + + +def test_delete_set(): + """ + Test for Delete ipset set. + """ + assert ipset.delete_set() == "Error: Set needs to be specified" + + with patch.object(ipset, "_ipset_cmd", return_value="A"): + mock = MagicMock(return_value=True) + with patch.dict(ipset.__salt__, {"cmd.run": mock}): + assert ipset.delete_set("set", "family") + + +def test_rename_set(): + """ + Test for Delete ipset set. + """ + assert ipset.rename_set() == "Error: Set needs to be specified" + + assert ipset.rename_set("s") == "Error: New name for set needs to be specified" + + with patch.object(ipset, "_find_set_type", return_value=False): + assert ipset.rename_set("s", "d") == "Error: Set does not exist" + + with patch.object(ipset, "_find_set_type", return_value=True): + assert ipset.rename_set("s", "d") == "Error: New Set already exists" + + with patch.object(ipset, "_find_set_type", side_effect=[True, False]): + with patch.object(ipset, "_ipset_cmd", return_value="A"): + mock = MagicMock(return_value=True) + with patch.dict(ipset.__salt__, {"cmd.run": mock}): + assert ipset.rename_set("set", "new_set") + + +def test_list_sets(): + """ + Test for List all ipset sets. + """ + with patch.object(ipset, "_ipset_cmd", return_value="A"): + mock = MagicMock(return_value="A:a") + with patch.dict(ipset.__salt__, {"cmd.run": mock}): + assert ipset.list_sets() == [{"A": ""}] + + +def test_check_set(): + """ + Test for Check that given ipset set exists. + """ + assert ipset.check_set() == "Error: Set needs to be specified" + + with patch.object(ipset, "_find_set_info", side_effect=[False, True]): + assert not ipset.check_set("set") + assert ipset.check_set("set") + + +def test_add(): + """ + Test for Append an entry to the specified set. + """ + assert ipset.add() == "Error: Set needs to be specified" + + assert ipset.add("set") == "Error: Entry needs to be specified" + + with patch.object(ipset, "_find_set_info", return_value=None): + assert ipset.add("set", "entry") == "Error: Set set does not exist" + + mock = MagicMock(return_value={"Type": "type", "Header": "Header"}) + with patch.object(ipset, "_find_set_info", mock): + assert ( + ipset.add("set", "entry", timeout=0) + == "Error: Set set not created with timeout support" + ) + + assert ( + ipset.add("set", "entry", packets=0) + == "Error: Set set not created with counters support" + ) + + assert ( + ipset.add("set", "entry", comment=0) + == "Error: Set set not created with comment support" + ) + + mock = MagicMock(return_value={"Type": "bitmap:ip", "Header": "Header"}) + with patch.object(ipset, "_find_set_info", mock): + with patch.object(ipset, "_find_set_members", return_value="entry"): + assert ( + ipset.add("set", "entry") + == "Warn: Entry entry already exists in set set" + ) + + with patch.object(ipset, "_find_set_members", return_value="A"): + mock = MagicMock(return_value="") + with patch.dict(ipset.__salt__, {"cmd.run": mock}): + assert ipset.add("set", "entry") == "Success" + + mock = MagicMock(return_value="out") + with patch.dict(ipset.__salt__, {"cmd.run": mock}): + assert ipset.add("set", "entry") == "Error: out" + + +def test_delete(): + """ + Test for Delete an entry from the specified set. + """ + assert ipset.delete() == "Error: Set needs to be specified" + + assert ipset.delete("s") == "Error: Entry needs to be specified" + + with patch.object(ipset, "_find_set_type", return_value=None): + assert ipset.delete("set", "entry") == "Error: Set set does not exist" + + with patch.object(ipset, "_find_set_type", return_value=True): + with patch.object(ipset, "_ipset_cmd", return_value="A"): + mock = MagicMock(side_effect=["", "A"]) + with patch.dict(ipset.__salt__, {"cmd.run": mock}): + assert ipset.delete("set", "entry") == "Success" + assert ipset.delete("set", "entry") == "Error: A" + + +def test_check(): + """ + Test for Check that an entry exists in the specified set. + """ + assert ipset.check() == "Error: Set needs to be specified" + + assert ipset.check("s") == "Error: Entry needs to be specified" + + with patch.object(ipset, "_find_set_type", return_value=None): + assert ipset.check("set", "entry") == "Error: Set set does not exist" + + with patch.object(ipset, "_find_set_type", return_value="hash:ip"): + with patch.object( + ipset, + "_find_set_members", + side_effect=[ + "entry", + "", + ["192.168.0.4", "192.168.0.5"], + ["192.168.0.3"], + ["192.168.0.6"], + ["192.168.0.4", "192.168.0.5"], + ["192.168.0.3"], + ["192.168.0.6"], + ], + ): + assert ipset.check("set", "entry") + assert not ipset.check("set", "entry") + assert ipset.check("set", "192.168.0.4/31") + assert not ipset.check("set", "192.168.0.4/31") + assert not ipset.check("set", "192.168.0.4/31") + assert ipset.check("set", "192.168.0.4-192.168.0.5") + assert not ipset.check("set", "192.168.0.4-192.168.0.5") + assert not ipset.check("set", "192.168.0.4-192.168.0.5") + + with patch.object(ipset, "_find_set_type", return_value="hash:net"): + with patch.object( + ipset, + "_find_set_members", + side_effect=[ + "entry", + "", + "192.168.0.4/31", + "192.168.0.4/30", + "192.168.0.4/31", + "192.168.0.4/30", + ], + ): + assert ipset.check("set", "entry") + assert not ipset.check("set", "entry") + assert ipset.check("set", "192.168.0.4/31") + assert not ipset.check("set", "192.168.0.4/31") + assert ipset.check("set", "192.168.0.4-192.168.0.5") + assert not ipset.check("set", "192.168.0.4-192.168.0.5") + + +def test_test(): + """ + Test for Test if an entry is in the specified set. + """ + assert ipset.test() == "Error: Set needs to be specified" + + assert ipset.test("s") == "Error: Entry needs to be specified" + + with patch.object(ipset, "_find_set_type", return_value=None): + assert ipset.test("set", "entry") == "Error: Set set does not exist" + + with patch.object(ipset, "_find_set_type", return_value=True): + mock = MagicMock(side_effect=[{"retcode": 1}, {"retcode": -1}]) + with patch.dict(ipset.__salt__, {"cmd.run_all": mock}): + assert not ipset.test("set", "entry") + assert ipset.test("set", "entry") + + +def test_flush(): + """ + Test for Flush entries in the specified set + """ + mock = MagicMock(side_effect=["", "A"]) + with patch.dict(ipset.__salt__, {"cmd.run": mock}): + assert ipset.flush("set") + assert not ipset.flush("set") diff --git a/tests/unit/modules/test_ipset.py b/tests/unit/modules/test_ipset.py deleted file mode 100644 index 69b7530523b2..000000000000 --- a/tests/unit/modules/test_ipset.py +++ /dev/null @@ -1,253 +0,0 @@ -""" - :codeauthor: Rupesh Tare -""" - - -import salt.modules.ipset as ipset -from tests.support.mixins import LoaderModuleMockMixin -from tests.support.mock import MagicMock, patch -from tests.support.unit import TestCase - - -class IpsetTestCase(TestCase, LoaderModuleMockMixin): - """ - Test cases for salt.modules.aptpkg - """ - - def setup_loader_modules(self): - return {ipset: {}} - - def test_version(self): - """ - Test for Return version from ipset --version - """ - with patch.object(ipset, "_ipset_cmd", return_value="A"): - mock = MagicMock(return_value="A\nB\nC") - with patch.dict(ipset.__salt__, {"cmd.run": mock}): - self.assertEqual(ipset.version(), "B") - - def test_new_set(self): - """ - Test for Create new custom set - """ - self.assertEqual(ipset.new_set(), "Error: Set needs to be specified") - - self.assertEqual(ipset.new_set("s"), "Error: Set Type needs to be specified") - - self.assertEqual(ipset.new_set("s", "d"), "Error: Set Type is invalid") - - self.assertEqual( - ipset.new_set("s", "bitmap:ip"), "Error: range is a required argument" - ) - - mock = MagicMock(return_value=False) - with patch.dict(ipset.__salt__, {"cmd.run": mock}): - self.assertTrue(ipset.new_set("s", "bitmap:ip", range="range")) - - def test_delete_set(self): - """ - Test for Delete ipset set. - """ - self.assertEqual(ipset.delete_set(), "Error: Set needs to be specified") - - with patch.object(ipset, "_ipset_cmd", return_value="A"): - mock = MagicMock(return_value=True) - with patch.dict(ipset.__salt__, {"cmd.run": mock}): - self.assertTrue(ipset.delete_set("set", "family")) - - def test_rename_set(self): - """ - Test for Delete ipset set. - """ - self.assertEqual(ipset.rename_set(), "Error: Set needs to be specified") - - self.assertEqual( - ipset.rename_set("s"), "Error: New name for set needs to be specified" - ) - - with patch.object(ipset, "_find_set_type", return_value=False): - self.assertEqual(ipset.rename_set("s", "d"), "Error: Set does not exist") - - with patch.object(ipset, "_find_set_type", return_value=True): - self.assertEqual( - ipset.rename_set("s", "d"), "Error: New Set already exists" - ) - - with patch.object(ipset, "_find_set_type", side_effect=[True, False]): - with patch.object(ipset, "_ipset_cmd", return_value="A"): - mock = MagicMock(return_value=True) - with patch.dict(ipset.__salt__, {"cmd.run": mock}): - self.assertTrue(ipset.rename_set("set", "new_set")) - - def test_list_sets(self): - """ - Test for List all ipset sets. - """ - with patch.object(ipset, "_ipset_cmd", return_value="A"): - mock = MagicMock(return_value="A:a") - with patch.dict(ipset.__salt__, {"cmd.run": mock}): - self.assertEqual(ipset.list_sets(), [{"A": ""}]) - - def test_check_set(self): - """ - Test for Check that given ipset set exists. - """ - self.assertEqual(ipset.check_set(), "Error: Set needs to be specified") - - with patch.object(ipset, "_find_set_info", side_effect=[False, True]): - self.assertFalse(ipset.check_set("set")) - self.assertTrue(ipset.check_set("set")) - - def test_add(self): - """ - Test for Append an entry to the specified set. - """ - self.assertEqual(ipset.add(), "Error: Set needs to be specified") - - self.assertEqual(ipset.add("set"), "Error: Entry needs to be specified") - - with patch.object(ipset, "_find_set_info", return_value=None): - self.assertEqual(ipset.add("set", "entry"), "Error: Set set does not exist") - - mock = MagicMock(return_value={"Type": "type", "Header": "Header"}) - with patch.object(ipset, "_find_set_info", mock): - self.assertEqual( - ipset.add("set", "entry", timeout=0), - "Error: Set set not created with timeout support", - ) - - self.assertEqual( - ipset.add("set", "entry", packets=0), - "Error: Set set not created with counters support", - ) - - self.assertEqual( - ipset.add("set", "entry", comment=0), - "Error: Set set not created with comment support", - ) - - mock = MagicMock(return_value={"Type": "bitmap:ip", "Header": "Header"}) - with patch.object(ipset, "_find_set_info", mock): - with patch.object(ipset, "_find_set_members", return_value="entry"): - self.assertEqual( - ipset.add("set", "entry"), - "Warn: Entry entry already exists in set set", - ) - - with patch.object(ipset, "_find_set_members", return_value="A"): - mock = MagicMock(return_value="") - with patch.dict(ipset.__salt__, {"cmd.run": mock}): - self.assertEqual(ipset.add("set", "entry"), "Success") - - mock = MagicMock(return_value="out") - with patch.dict(ipset.__salt__, {"cmd.run": mock}): - self.assertEqual(ipset.add("set", "entry"), "Error: out") - - def test_delete(self): - """ - Test for Delete an entry from the specified set. - """ - self.assertEqual(ipset.delete(), "Error: Set needs to be specified") - - self.assertEqual(ipset.delete("s"), "Error: Entry needs to be specified") - - with patch.object(ipset, "_find_set_type", return_value=None): - self.assertEqual( - ipset.delete("set", "entry"), "Error: Set set does not exist" - ) - - with patch.object(ipset, "_find_set_type", return_value=True): - with patch.object(ipset, "_ipset_cmd", return_value="A"): - mock = MagicMock(side_effect=["", "A"]) - with patch.dict(ipset.__salt__, {"cmd.run": mock}): - self.assertEqual(ipset.delete("set", "entry"), "Success") - self.assertEqual(ipset.delete("set", "entry"), "Error: A") - - def test_check(self): - """ - Test for Check that an entry exists in the specified set. - """ - self.assertEqual(ipset.check(), "Error: Set needs to be specified") - - self.assertEqual(ipset.check("s"), "Error: Entry needs to be specified") - - with patch.object(ipset, "_find_set_type", return_value=None): - self.assertEqual( - ipset.check("set", "entry"), "Error: Set set does not exist" - ) - - with patch.object(ipset, "_find_set_type", return_value="hash:ip"): - with patch.object( - ipset, - "_find_set_members", - side_effect=[ - "entry", - "", - ["192.168.0.4", "192.168.0.5"], - ["192.168.0.3"], - ["192.168.0.6"], - ["192.168.0.4", "192.168.0.5"], - ["192.168.0.3"], - ["192.168.0.6"], - ], - ): - self.assertTrue(ipset.check("set", "entry")) - self.assertFalse(ipset.check("set", "entry")) - self.assertTrue(ipset.check("set", "192.168.0.4/31")) - self.assertFalse(ipset.check("set", "192.168.0.4/31")) - self.assertFalse(ipset.check("set", "192.168.0.4/31")) - self.assertTrue(ipset.check("set", "192.168.0.4-192.168.0.5")) - self.assertFalse(ipset.check("set", "192.168.0.4-192.168.0.5")) - self.assertFalse(ipset.check("set", "192.168.0.4-192.168.0.5")) - - with patch.object(ipset, "_find_set_type", return_value="hash:net"): - with patch.object( - ipset, - "_find_set_members", - side_effect=[ - "entry", - "", - "192.168.0.4/31", - "192.168.0.4/30", - "192.168.0.4/31", - "192.168.0.4/30", - ], - ): - self.assertTrue(ipset.check("set", "entry")) - self.assertFalse(ipset.check("set", "entry")) - self.assertTrue(ipset.check("set", "192.168.0.4/31")) - self.assertFalse(ipset.check("set", "192.168.0.4/31")) - self.assertTrue(ipset.check("set", "192.168.0.4-192.168.0.5")) - self.assertFalse(ipset.check("set", "192.168.0.4-192.168.0.5")) - - def test_test(self): - """ - Test for Test if an entry is in the specified set. - """ - self.assertEqual(ipset.test(), "Error: Set needs to be specified") - - self.assertEqual(ipset.test("s"), "Error: Entry needs to be specified") - - with patch.object(ipset, "_find_set_type", return_value=None): - self.assertEqual( - ipset.test("set", "entry"), "Error: Set set does not exist" - ) - - with patch.object(ipset, "_find_set_type", return_value=True): - mock = MagicMock(side_effect=[{"retcode": 1}, {"retcode": -1}]) - with patch.dict(ipset.__salt__, {"cmd.run_all": mock}): - self.assertFalse(ipset.test("set", "entry")) - self.assertTrue(ipset.test("set", "entry")) - - def test_flush(self): - """ - Test for Flush entries in the specified set - """ - with patch.object(ipset, "_find_set_type", return_value=None): - self.assertEqual(ipset.flush("set"), "Error: Set set does not exist") - - with patch.object(ipset, "_find_set_type", return_value=True): - mock = MagicMock(side_effect=["", "A"]) - with patch.dict(ipset.__salt__, {"cmd.run": mock}): - self.assertTrue(ipset.flush("set")) - self.assertFalse(ipset.flush("set"))