diff --git a/config/main.py b/config/main.py index 786ae67d40..a5f3ef4595 100644 --- a/config/main.py +++ b/config/main.py @@ -4569,7 +4569,6 @@ def get_acl_bound_ports(): return list(ports) - def expand_vlan_ports(port_name): """ Expands a given VLAN interface into its member ports. @@ -4594,10 +4593,21 @@ def expand_vlan_ports(port_name): return members +def get_acl_valid_vlans(): + config_db = ConfigDBConnector() + config_db.connect() + vlans = set() + vlan_dict = config_db.get_table("VLAN") + for key in vlan_dict: + vlans.add(key) + return list(vlans) def parse_acl_table_info(table_name, table_type, description, ports, stage): table_info = {"type": table_type} + expand_vlan_asic = ["mellanox"] + is_expand_vlan = True if asic_type in expand_vlan_asic else False + if description: table_info["policy_desc"] = description else: @@ -4608,16 +4618,25 @@ def parse_acl_table_info(table_name, table_type, description, ports, stage): port_list = [] valid_acl_ports = get_acl_bound_ports() + valid_acl_vlans = get_acl_valid_vlans() + if ports: for port in ports.split(","): - port_list += expand_vlan_ports(port) + if is_expand_vlan: + port_list += expand_vlan_ports(port) + else: + port_list.append(port) port_list = list(set(port_list)) # convert to set first to remove duplicate ifaces else: port_list = valid_acl_ports for port in port_list: - if port not in valid_acl_ports: - raise ValueError("Cannot bind ACL to specified port {}".format(port)) + if is_expand_vlan: + if port not in valid_acl_ports: + raise ValueError("Cannot bind ACL to specified port {}".format(port)) + else: + if port not in valid_acl_ports and port not in valid_acl_vlans: + raise ValueError("Cannot bind ACL to specified port {}".format(port)) table_info["ports"] = port_list diff --git a/tests/acl_config_test.py b/tests/acl_config_test.py index ff397e760d..56e24c78e8 100644 --- a/tests/acl_config_test.py +++ b/tests/acl_config_test.py @@ -4,9 +4,18 @@ from click.testing import CliRunner from config.main import expand_vlan_ports, parse_acl_table_info - +from swsssdk import ConfigDBConnector class TestConfigAcl(object): + @pytest.fixture(scope="function", params= ["mellanox", "broadcom"]) + def asic_type(self, request): + org_asic_type = config.asic_type + config.asic_type = request.param + + yield request.param + + config.asic_type = org_asic_type + def test_expand_vlan(self): assert set(expand_vlan_ports("Vlan1000")) == {"Ethernet4", "Ethernet8", "Ethernet12", "Ethernet16"} @@ -20,27 +29,43 @@ def test_expand_empty_vlan(self): with pytest.raises(ValueError): expand_vlan_ports("Vlan3000") - def test_parse_table_with_vlan_expansion(self): + def test_parse_table_with_vlan_expansion(self, asic_type): table_info = parse_acl_table_info("TEST", "L3", None, "Vlan1000", "egress") + assert table_info["type"] == "L3" assert table_info["policy_desc"] == "TEST" assert table_info["stage"] == "egress" - assert set(table_info["ports"]) == {"Ethernet4", "Ethernet8", "Ethernet12", "Ethernet16"} - def test_parse_table_with_vlan_and_duplicates(self): + if asic_type == "mellanox": + assert set(table_info["ports"]) == {"Ethernet4", "Ethernet8", "Ethernet12", "Ethernet16"} + else: + assert set(table_info["ports"]) == {"Vlan1000"} + + def test_parse_table_with_vlan_and_duplicates(self, asic_type): table_info = parse_acl_table_info("TEST", "L3", None, "Ethernet4,Vlan1000", "egress") assert table_info["type"] == "L3" assert table_info["policy_desc"] == "TEST" assert table_info["stage"] == "egress" - # Since Ethernet4 is a member of Vlan1000 we should not include it twice in the output port_set = set(table_info["ports"]) - assert len(port_set) == 4 - assert set(port_set) == {"Ethernet4", "Ethernet8", "Ethernet12", "Ethernet16"} - def test_parse_table_with_empty_vlan(self): - with pytest.raises(ValueError): - parse_acl_table_info("TEST", "L3", None, "Ethernet4,Vlan3000", "egress") + if asic_type == "mellanox": + # Since Ethernet4 is a member of Vlan1000 we should not include it twice in the output + assert len(port_set) == 4 + assert port_set == {"Ethernet4", "Ethernet8", "Ethernet12", "Ethernet16"} + else: + assert port_set == {"Ethernet4", "Vlan1000"} + + def test_parse_table_with_empty_vlan(self, asic_type): + if asic_type == "mellanox": + with pytest.raises(ValueError): + parse_acl_table_info("TEST", "L3", None, "Ethernet4,Vlan3000", "egress") + else: + table_info = parse_acl_table_info("TEST", "L3", None, "Ethernet4,Vlan3000", "egress") + assert table_info["type"] == "L3" + assert table_info["policy_desc"] == "TEST" + assert table_info["stage"] == "egress" + assert set(table_info["ports"]) == {"Ethernet4", "Vlan3000"} def test_parse_table_with_invalid_ports(self): with pytest.raises(ValueError): @@ -70,12 +95,15 @@ def test_acl_add_table_empty_string_port_list(self): assert result.exit_code != 0 assert "Cannot bind empty list of ports" in result.output - def test_acl_add_table_empty_vlan(self): + def test_acl_add_table_empty_vlan(self, asic_type): runner = CliRunner() result = runner.invoke( config.config.commands["acl"].commands["add"].commands["table"], ["TEST", "L3", "-p", "Vlan3000"]) - assert result.exit_code != 0 - assert "Cannot bind empty VLAN Vlan3000" in result.output + if asic_type == "mellanox": + assert result.exit_code != 0 + assert "Cannot bind empty VLAN Vlan3000" in result.output + else: + assert result.exit_code == 0 \ No newline at end of file