Skip to content

Commit

Permalink
Fix and migrate tests/unit/modules/test_ipset.py to PyTest
Browse files Browse the repository at this point in the history
  • Loading branch information
s0undt3ch authored and Megan Wilhite committed Sep 21, 2021
1 parent e1c02ac commit 0e46c14
Show file tree
Hide file tree
Showing 3 changed files with 264 additions and 278 deletions.
47 changes: 22 additions & 25 deletions salt/modules/ipset.py
Expand Up @@ -280,8 +280,8 @@ def version():
salt '*' ipset.version
"""
cmd = "{} --version".format(_ipset_cmd())
out = __salt__["cmd.run"](cmd).split()
cmd = [_ipset_cmd(), "--version"]
out = __salt__["cmd.run"](cmd, python_shell=False).split()
return out[1]


Expand Down Expand Up @@ -318,21 +318,21 @@ def new_set(name=None, set_type=None, family="ipv4", comment=False, **kwargs):
if item not in kwargs:
return "Error: {} is a required argument".format(item)

cmd = "{} create {} {}".format(_ipset_cmd(), name, set_type)
cmd = [_ipset_cmd(), "create", name, set_type]

for item in _CREATE_OPTIONS[set_type]:
if item in kwargs:
if item in _CREATE_OPTIONS_WITHOUT_VALUE:
cmd = "{} {} ".format(cmd, item)
cmd.append(item)
else:
cmd = "{} {} {} ".format(cmd, item, kwargs[item])
cmd.extend([item, kwargs[item]])

# Family only valid for certain set types
if "family" in _CREATE_OPTIONS[set_type]:
cmd = "{} family {}".format(cmd, ipset_family)
cmd.extend(["family", cmd, ipset_family])

This comment has been minimized.

Copy link
@ixs

ixs Dec 6, 2022

Contributor

This change is broken.

#63224 has the details.


if comment:
cmd = "{} comment".format(cmd)
cmd.append("comment")

out = __salt__["cmd.run"](cmd, python_shell=False)

Expand Down Expand Up @@ -360,7 +360,7 @@ def delete_set(name=None, family="ipv4"):
if not name:
return "Error: Set needs to be specified"

cmd = "{} destroy {}".format(_ipset_cmd(), name)
cmd = [_ipset_cmd(), "destroy", name]
out = __salt__["cmd.run"](cmd, python_shell=False)

if not out:
Expand Down Expand Up @@ -398,7 +398,7 @@ def rename_set(name=None, new_set=None, family="ipv4"):
if settype:
return "Error: New Set already exists"

cmd = "{} rename {} {}".format(_ipset_cmd(), name, new_set)
cmd = [_ipset_cmd(), "rename", name, new_set]
out = __salt__["cmd.run"](cmd, python_shell=False)

if not out:
Expand All @@ -419,7 +419,7 @@ def list_sets(family="ipv4"):
salt '*' ipset.list_sets
"""
cmd = "{} list -t".format(_ipset_cmd())
cmd = [_ipset_cmd(), "list", "-t"]
out = __salt__["cmd.run"](cmd, python_shell=False)

_tmp = out.split("\n")
Expand Down Expand Up @@ -483,7 +483,7 @@ def add(name=None, entry=None, family="ipv4", **kwargs):

settype = setinfo["Type"]

cmd = "{}".format(entry)
cmd = [_ipset_cmd(), "add", "-exist", name, entry]

if "timeout" in kwargs:
if "timeout" not in setinfo["Header"]:
Expand All @@ -505,14 +505,13 @@ def add(name=None, entry=None, family="ipv4", **kwargs):

for item in _ADD_OPTIONS[settype]:
if item in kwargs:
cmd = "{} {} {}".format(cmd, item, kwargs[item])
cmd.extend([item, kwargs[item]])

current_members = _find_set_members(name)
if cmd in current_members:
return "Warn: Entry {} already exists in set {}".format(cmd, name)
if entry in current_members:
return "Warn: Entry {} already exists in set {}".format(entry, name)

# Using -exist to ensure entries are updated if the comment changes
cmd = "{} add -exist {} {}".format(_ipset_cmd(), name, cmd)
out = __salt__["cmd.run"](cmd, python_shell=False)

if not out:
Expand Down Expand Up @@ -541,7 +540,7 @@ def delete(name=None, entry=None, family="ipv4", **kwargs):
if not settype:
return "Error: Set {} does not exist".format(name)

cmd = "{} del {} {}".format(_ipset_cmd(), name, entry)
cmd = [_ipset_cmd(), "del", name, entry]
out = __salt__["cmd.run"](cmd, python_shell=False)

if not out:
Expand Down Expand Up @@ -625,7 +624,7 @@ def test(name=None, entry=None, family="ipv4", **kwargs):
if not settype:
return "Error: Set {} does not exist".format(name)

cmd = "{} test {} {}".format(_ipset_cmd(), name, entry)
cmd = [_ipset_cmd(), "test", name, entry]
out = __salt__["cmd.run_all"](cmd, python_shell=False)

if out["retcode"] > 0:
Expand Down Expand Up @@ -654,11 +653,9 @@ def flush(name=None, family="ipv4"):
salt '*' ipset.flush set
"""

ipset_family = _IPSET_FAMILIES[family]
cmd = [_ipset_cmd(), "flush"]
if name:
cmd = "{} flush {}".format(_ipset_cmd(), name)
else:
cmd = "{} flush".format(_ipset_cmd())
cmd.append(name)
out = __salt__["cmd.run"](cmd, python_shell=False)

return not out
Expand All @@ -669,7 +666,7 @@ def _find_set_members(name):
Return list of members for a set
"""

cmd = "{} list {}".format(_ipset_cmd(), name)
cmd = [_ipset_cmd(), "list", name]
out = __salt__["cmd.run_all"](cmd, python_shell=False)

if out["retcode"] > 0:
Expand All @@ -692,7 +689,7 @@ def _find_set_info(name):
Return information about the set
"""

cmd = "{} list -t {}".format(_ipset_cmd(), name)
cmd = [_ipset_cmd(), "list", "-t", name]
out = __salt__["cmd.run_all"](cmd, python_shell=False)

if out["retcode"] > 0:
Expand Down Expand Up @@ -813,8 +810,8 @@ def _compare_member_parts(member_part, entry_part):


def _is_network(o):
return isinstance(o, ipaddress.IPv4Network) or isinstance(o, ipaddress.IPv6Network)
return isinstance(o, (ipaddress.IPv4Network, ipaddress.IPv6Network))


def _is_address(o):
return isinstance(o, ipaddress.IPv4Address) or isinstance(o, ipaddress.IPv6Address)
return isinstance(o, (ipaddress.IPv4Address, ipaddress.IPv6Address))
242 changes: 242 additions & 0 deletions tests/pytests/unit/modules/test_ipset.py
@@ -0,0 +1,242 @@
"""
:codeauthor: Rupesh Tare <rupesht@saltstack.com>
"""
import pytest
import salt.modules.ipset as ipset
from tests.support.mock import MagicMock, patch


@pytest.fixture
def configure_loader_modules():
with patch.object(ipset, "_ipset_cmd", return_value="ipset"):
yield {ipset: {}}


def test_version():
"""
Test for Return version from ipset --version
"""
with patch.object(ipset, "_ipset_cmd", return_value="A"):
mock = MagicMock(return_value="A\nB\nC")
with patch.dict(ipset.__salt__, {"cmd.run": mock}):
assert ipset.version() == "B"


def test_new_set():
"""
Test for Create new custom set
"""
assert ipset.new_set() == "Error: Set Name needs to be specified"

assert ipset.new_set("s") == "Error: Set Type needs to be specified"

assert ipset.new_set("s", "d") == "Error: Set Type is invalid"

assert ipset.new_set("s", "bitmap:ip") == "Error: range is a required argument"

mock = MagicMock(return_value=False)
with patch.dict(ipset.__salt__, {"cmd.run": mock}):
assert ipset.new_set("s", "bitmap:ip", range="range")


def test_delete_set():
"""
Test for Delete ipset set.
"""
assert ipset.delete_set() == "Error: Set needs to be specified"

with patch.object(ipset, "_ipset_cmd", return_value="A"):
mock = MagicMock(return_value=True)
with patch.dict(ipset.__salt__, {"cmd.run": mock}):
assert ipset.delete_set("set", "family")


def test_rename_set():
"""
Test for Delete ipset set.
"""
assert ipset.rename_set() == "Error: Set needs to be specified"

assert ipset.rename_set("s") == "Error: New name for set needs to be specified"

with patch.object(ipset, "_find_set_type", return_value=False):
assert ipset.rename_set("s", "d") == "Error: Set does not exist"

with patch.object(ipset, "_find_set_type", return_value=True):
assert ipset.rename_set("s", "d") == "Error: New Set already exists"

with patch.object(ipset, "_find_set_type", side_effect=[True, False]):
with patch.object(ipset, "_ipset_cmd", return_value="A"):
mock = MagicMock(return_value=True)
with patch.dict(ipset.__salt__, {"cmd.run": mock}):
assert ipset.rename_set("set", "new_set")


def test_list_sets():
"""
Test for List all ipset sets.
"""
with patch.object(ipset, "_ipset_cmd", return_value="A"):
mock = MagicMock(return_value="A:a")
with patch.dict(ipset.__salt__, {"cmd.run": mock}):
assert ipset.list_sets() == [{"A": ""}]


def test_check_set():
"""
Test for Check that given ipset set exists.
"""
assert ipset.check_set() == "Error: Set needs to be specified"

with patch.object(ipset, "_find_set_info", side_effect=[False, True]):
assert not ipset.check_set("set")
assert ipset.check_set("set")


def test_add():
"""
Test for Append an entry to the specified set.
"""
assert ipset.add() == "Error: Set needs to be specified"

assert ipset.add("set") == "Error: Entry needs to be specified"

with patch.object(ipset, "_find_set_info", return_value=None):
assert ipset.add("set", "entry") == "Error: Set set does not exist"

mock = MagicMock(return_value={"Type": "type", "Header": "Header"})
with patch.object(ipset, "_find_set_info", mock):
assert (
ipset.add("set", "entry", timeout=0)
== "Error: Set set not created with timeout support"
)

assert (
ipset.add("set", "entry", packets=0)
== "Error: Set set not created with counters support"
)

assert (
ipset.add("set", "entry", comment=0)
== "Error: Set set not created with comment support"
)

mock = MagicMock(return_value={"Type": "bitmap:ip", "Header": "Header"})
with patch.object(ipset, "_find_set_info", mock):
with patch.object(ipset, "_find_set_members", return_value="entry"):
assert (
ipset.add("set", "entry")
== "Warn: Entry entry already exists in set set"
)

with patch.object(ipset, "_find_set_members", return_value="A"):
mock = MagicMock(return_value="")
with patch.dict(ipset.__salt__, {"cmd.run": mock}):
assert ipset.add("set", "entry") == "Success"

mock = MagicMock(return_value="out")
with patch.dict(ipset.__salt__, {"cmd.run": mock}):
assert ipset.add("set", "entry") == "Error: out"


def test_delete():
"""
Test for Delete an entry from the specified set.
"""
assert ipset.delete() == "Error: Set needs to be specified"

assert ipset.delete("s") == "Error: Entry needs to be specified"

with patch.object(ipset, "_find_set_type", return_value=None):
assert ipset.delete("set", "entry") == "Error: Set set does not exist"

with patch.object(ipset, "_find_set_type", return_value=True):
with patch.object(ipset, "_ipset_cmd", return_value="A"):
mock = MagicMock(side_effect=["", "A"])
with patch.dict(ipset.__salt__, {"cmd.run": mock}):
assert ipset.delete("set", "entry") == "Success"
assert ipset.delete("set", "entry") == "Error: A"


def test_check():
"""
Test for Check that an entry exists in the specified set.
"""
assert ipset.check() == "Error: Set needs to be specified"

assert ipset.check("s") == "Error: Entry needs to be specified"

with patch.object(ipset, "_find_set_type", return_value=None):
assert ipset.check("set", "entry") == "Error: Set set does not exist"

with patch.object(ipset, "_find_set_type", return_value="hash:ip"):
with patch.object(
ipset,
"_find_set_members",
side_effect=[
"entry",
"",
["192.168.0.4", "192.168.0.5"],
["192.168.0.3"],
["192.168.0.6"],
["192.168.0.4", "192.168.0.5"],
["192.168.0.3"],
["192.168.0.6"],
],
):
assert ipset.check("set", "entry")
assert not ipset.check("set", "entry")
assert ipset.check("set", "192.168.0.4/31")
assert not ipset.check("set", "192.168.0.4/31")
assert not ipset.check("set", "192.168.0.4/31")
assert ipset.check("set", "192.168.0.4-192.168.0.5")
assert not ipset.check("set", "192.168.0.4-192.168.0.5")
assert not ipset.check("set", "192.168.0.4-192.168.0.5")

with patch.object(ipset, "_find_set_type", return_value="hash:net"):
with patch.object(
ipset,
"_find_set_members",
side_effect=[
"entry",
"",
"192.168.0.4/31",
"192.168.0.4/30",
"192.168.0.4/31",
"192.168.0.4/30",
],
):
assert ipset.check("set", "entry")
assert not ipset.check("set", "entry")
assert ipset.check("set", "192.168.0.4/31")
assert not ipset.check("set", "192.168.0.4/31")
assert ipset.check("set", "192.168.0.4-192.168.0.5")
assert not ipset.check("set", "192.168.0.4-192.168.0.5")


def test_test():
"""
Test for Test if an entry is in the specified set.
"""
assert ipset.test() == "Error: Set needs to be specified"

assert ipset.test("s") == "Error: Entry needs to be specified"

with patch.object(ipset, "_find_set_type", return_value=None):
assert ipset.test("set", "entry") == "Error: Set set does not exist"

with patch.object(ipset, "_find_set_type", return_value=True):
mock = MagicMock(side_effect=[{"retcode": 1}, {"retcode": -1}])
with patch.dict(ipset.__salt__, {"cmd.run_all": mock}):
assert not ipset.test("set", "entry")
assert ipset.test("set", "entry")


def test_flush():
"""
Test for Flush entries in the specified set
"""
mock = MagicMock(side_effect=["", "A"])
with patch.dict(ipset.__salt__, {"cmd.run": mock}):
assert ipset.flush("set")
assert not ipset.flush("set")

0 comments on commit 0e46c14

Please sign in to comment.