Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ACL] Rollback the code of #1475 "Expand VLAN into VLAN members..." #1775

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 12 additions & 27 deletions config/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4569,31 +4569,14 @@ def get_acl_bound_ports():

return list(ports)


def expand_vlan_ports(port_name):
"""
Expands a given VLAN interface into its member ports.

If the provided interface is a VLAN, then this method will return its member ports.

If the provided interface is not a VLAN, then this method will return a list with only
the provided interface in it.
"""
def get_acl_valid_vlans():
config_db = ConfigDBConnector()
config_db.connect()

if port_name not in config_db.get_keys("VLAN"):
return [port_name]

vlan_members = config_db.get_keys("VLAN_MEMBER")

members = [member for vlan, member in vlan_members if port_name == vlan]

if not members:
raise ValueError("Cannot bind empty VLAN {}".format(port_name))

return members

vlans = set()
vlan_dict = config_db.get_table("VLAN")
for key in vlan_dict:
vlans.add(key)
return list(vlans)

def parse_acl_table_info(table_name, table_type, description, ports, stage):
table_info = {"type": table_type}
Expand All @@ -4608,18 +4591,20 @@ def parse_acl_table_info(table_name, table_type, description, ports, stage):

port_list = []
valid_acl_ports = get_acl_bound_ports()
valid_acl_vlans = get_acl_valid_vlans()

if ports:
for port in ports.split(","):
port_list += expand_vlan_ports(port)
port_list = list(set(port_list)) # convert to set first to remove duplicate ifaces
port_list.append(port)
port_list = set(port_list)
else:
port_list = valid_acl_ports

for port in port_list:
if port not in valid_acl_ports:
if port not in valid_acl_ports and port not in valid_acl_vlans:
raise ValueError("Cannot bind ACL to specified port {}".format(port))

table_info["ports"] = port_list
table_info["ports@"] = ",".join(port_list)

table_info["stage"] = stage

Expand Down
43 changes: 15 additions & 28 deletions tests/acl_config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,32 @@
import config.main as config

from click.testing import CliRunner
from config.main import expand_vlan_ports, parse_acl_table_info

from config.main import parse_acl_table_info

class TestConfigAcl(object):
def test_expand_vlan(self):
assert set(expand_vlan_ports("Vlan1000")) == {"Ethernet4", "Ethernet8", "Ethernet12", "Ethernet16"}

def test_expand_lag(self):
assert set(expand_vlan_ports("PortChannel1001")) == {"PortChannel1001"}

def test_expand_physical_interface(self):
assert set(expand_vlan_ports("Ethernet4")) == {"Ethernet4"}

def test_expand_empty_vlan(self):
with pytest.raises(ValueError):
expand_vlan_ports("Vlan3000")

def test_parse_table_with_vlan_expansion(self):
def test_parse_table_with_vlan(self):
table_info = parse_acl_table_info("TEST", "L3", None, "Vlan1000", "egress")
assert table_info["type"] == "L3"
assert table_info["policy_desc"] == "TEST"
assert table_info["stage"] == "egress"
assert set(table_info["ports"]) == {"Ethernet4", "Ethernet8", "Ethernet12", "Ethernet16"}

port_list = table_info["ports@"].split(",")
assert set(port_list) == {"Vlan1000"}

@pytest.mark.skip('TODO')
def test_parse_table_with_vlan_and_duplicates(self):
table_info = parse_acl_table_info("TEST", "L3", None, "Ethernet4,Vlan1000", "egress")
# Shall not bind Ethernet/PortChannel port and VLAN to the same ACL table
with pytest.raises(ValueError):
parse_acl_table_info("TEST", "L3", None, "Ethernet4,Vlan1000", "egress")

def test_parse_table_with_empty_vlan(self):
table_info = parse_acl_table_info("TEST", "L3", None, "Ethernet4,Vlan3000", "egress")
assert table_info["type"] == "L3"
assert table_info["policy_desc"] == "TEST"
assert table_info["stage"] == "egress"

# Since Ethernet4 is a member of Vlan1000 we should not include it twice in the output
port_set = set(table_info["ports"])
assert len(port_set) == 4
assert set(port_set) == {"Ethernet4", "Ethernet8", "Ethernet12", "Ethernet16"}

def test_parse_table_with_empty_vlan(self):
with pytest.raises(ValueError):
parse_acl_table_info("TEST", "L3", None, "Ethernet4,Vlan3000", "egress")
port_list = table_info["ports@"].split(",")
assert set(port_list) == {"Ethernet4", "Vlan3000"}

def test_parse_table_with_invalid_ports(self):
with pytest.raises(ValueError):
Expand Down Expand Up @@ -77,5 +65,4 @@ def test_acl_add_table_empty_vlan(self):
config.config.commands["acl"].commands["add"].commands["table"],
["TEST", "L3", "-p", "Vlan3000"])

assert result.exit_code != 0
assert "Cannot bind empty VLAN Vlan3000" in result.output
assert result.exit_code == 0