Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add script to block/unblock ICMP traffic on a given ACL table [experimental; do not merge] #1488

Draft
wants to merge 2 commits into
base: 201911
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
293 changes: 293 additions & 0 deletions scripts/set_icmp_state
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
#!/usr/bin/env python
daall marked this conversation as resolved.
Show resolved Hide resolved
"""
Utility for blocking and unblocking ICMP traffic on ACL tables.

The block operation will insert a DENY rule at the top of the table. The unblock operation
will remove an existing DENY rule that has been created by the block operation (i.e. it does
NOT insert an ALLOW rule, only removes DENY rules).

There are two scopes: you can block/allow traffic to all destinations, or block/allow traffic
to specific destinations. The "all" rule will be be given higher priority (9999) than rules to
specific destinations (9995). SONiC supports inserting multiple rules at the same priority, so
all destination-specific rules will be given 9995.

You can also specify a VLAN ID to restrict the block to a given VLAN destination. Note that you
cannot have a rule that blocks all and blocks all on a certain VLAN in the same table (e.g.
if you have blocked all traffic on table A vlan 100, you cannot also block all traffic on table A with
no VLAN ID).

Example:

Block/unblock all ICMP traffic to Vlan101 on V4_VLAN_101_OUT:
./set_icmp_state block all V4_VLAN_101_OUT -v 101
./set_icmp_state allow all V4_VLAN_101_OUT

Block/unblock all ICMP traffic to 10.0.0.37/32 on Vlan101 on V4_VLAN_101_OUT:
./set_icmp_state block destination V4_VLAN_101_OUT 10.0.0.37/32 -v 101
./set_icmp_state allow destination V4_VLAN_101_OUT 10.0.0.37/32
"""

from __future__ import print_function

import syslog
import sys
import click
import ipaddress

from swsssdk import ConfigDBConnector

CONFIG_DB_ACL_TABLE_TABLE = "ACL_TABLE"
CONFIG_DB_ACL_RULE_TABLE = "ACL_RULE"
CONFIG_DB_VLAN_TABLE = "VLAN"

BLOCK_ALL_KEY = "RULE_BLOCK_ALL_ICMP"
BLOCK_KEY_PREFIX = "RULE_BLOCK_ICMP_"

IP_VERSION_V4 = "V4"
IP_VERSION_V6 = "V6"

ICMP_IP_PROTOCOL_NUM = "1"
ICMPV6_NEXT_HEADER_NUM = "58"

BLOCK_ALL_PRIORITY = "9999"
BLOCK_DEST_PRIORITY = "9995"


def notice(msg):
"""Log a NOTICE message to the console and syslog."""
syslog.syslog(syslog.LOG_NOTICE, msg)
print(msg)


def error(msg):
"""Log an ERR message to the console and syslog, and exit the program with an error code."""
syslog.syslog(syslog.LOG_ERR, msg)
print(msg, file=sys.stderr)
sys.exit(1)


def verify_table_exists(configdb, table_name):
"""Verify that a given ACL table exists in Config DB."""
target_table = configdb.get_entry(CONFIG_DB_ACL_TABLE_TABLE, table_name)

if not target_table:
error("Table {} not found, exiting...".format(table_name))

return target_table


def verify_valid_ip_address(ip_address, table_ip_version):
"""Verify that a given IP address is valid.

This checks that 1) the provided address is valid, and 2) that the IP version of the
address matches the IP version of the table this address will be applied to.
"""
try:
destination_ip_version = ipaddress.ip_network(ip_address).version
table_ip_version = 6 if table_ip_version == IP_VERSION_V6 else 4
if destination_ip_version != table_ip_version:
error("Cannot use an IPv{} rule with an IPv{} table, exiting...".format(destination_ip_version, table_ip_version))
except Exception as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exception [](start = 11, length = 9)

Is there a more specific exception type? #Closed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per the docs it will be a ValueError, so we can check for that.

error("Could not parse {} as a valid IP address; exception={}".format(ip_address, e))
daall marked this conversation as resolved.
Show resolved Hide resolved


def verify_valid_vlan_id(configdb, vlan_id):
"""Verify that a given VLAN ID is valid.

This checks that 1) the provided VLAN ID is valid/in-bounds, and 2) that the VLAN
exists in Config DB.
"""
if vlan_id is None:
return

if vlan_id <= 0 or vlan_id >= 4096:
error("Vlan ID {} is out of bounds [1, 4095]".format(vlan_id))

target_vlan = configdb.get_entry(CONFIG_DB_VLAN_TABLE, "Vlan{}".format(vlan_id))

if not target_vlan:
error("Vlan{} not found".format(vlan_id))


def get_table_ip_version(configdb, table_name):
"""Get the IP version of the ACL table.

Returns "V4" for IPv4 and "V6" for IPv6.
"""
table = verify_table_exists(configdb, table_name)

table_type = table.get("type", None)
if not table_type:
error("Table {} has no type, invalid configuration, exiting...".format(table_name))

return IP_VERSION_V6 if table_type in ("L3V6", "MIRRORV6") else IP_VERSION_V4


def get_block_rule_key(destination_ip_address):
"""Get the key that will be used to refer to the ACL rule used to block ICMP traffic to a destination.

Since the rules are all given the same priority in SONiC, we can't identify a rule based on the priority.
So, we use the destination IP being blocked to give each rule a unique name in the system.
"""
return BLOCK_KEY_PREFIX + str(destination_ip_address)
daall marked this conversation as resolved.
Show resolved Hide resolved


def construct_icmp_block_rule(priority, ip_version, destination_ip_address=None, vlan_id=None):
"""Construct a rule to block ICMP traffic to a given destination.

If no destination IP is specified, all ICMP traffic will be blocked.
"""
rule = {
"PRIORITY": priority,
"PACKET_ACTION": "DROP",
"IP_PROTOCOL": ICMPV6_NEXT_HEADER_NUM if ip_version == IP_VERSION_V6 else ICMP_IP_PROTOCOL_NUM
}

if destination_ip_address is not None:
dst_ip_key = "DST_IPV6" if ip_version == IP_VERSION_V6 else "DST_IP"
rule[dst_ip_key] = destination_ip_address

if vlan_id is not None:
rule["VLAN_ID"] = vlan_id

return rule


@click.group()
def cli():
pass


@cli.group("allow")
def allow():
pass


# ./set_icmp_state allow all <table_name>
@allow.command("all")
@click.argument("table_name", type=click.STRING, required=True)
def all(table_name):
"""Allow all ICMP traffic on a given ACL table.

Note: This only removes deny rules from the table, it does not add any new allow rules.
"""
notice("Allowing ALL ICMP traffic on table {}...".format(table_name))

configdb = ConfigDBConnector()
configdb.connect()

verify_table_exists(configdb, table_name)

block_rule_key = (table_name, BLOCK_ALL_KEY)
block_rule = configdb.get_entry(CONFIG_DB_ACL_RULE_TABLE, block_rule_key)
if not block_rule:
notice("No rule in table {} to block all ICMP traffic, skipping...".format(table_name))
return

configdb.set_entry(CONFIG_DB_ACL_RULE_TABLE, block_rule_key, None)
notice("Deny rule has been deleted, all ICMP traffic is now in ALLOW state.")


# ./set_icmp_state allow destination <table_name> <ip_address>
@allow.command("destination")
@click.argument("table_name", type=click.STRING, required=True)
@click.argument("ip_address", type=click.STRING, required=True)
def destination(table_name, ip_address):
"""Allow all ICMP traffic to a specific destination on a given ACL table.

Note: This only removes deny rules from the table, it does not add any new allow rules.
"""
notice("Allowing ICMP traffic to {} on table {}".format(ip_address, table_name))

configdb = ConfigDBConnector()
configdb.connect()

verify_table_exists(configdb, table_name)

table_ip_version = get_table_ip_version(configdb, table_name)
verify_valid_ip_address(ip_address, table_ip_version)

block_rule_key = (table_name, get_block_rule_key(ip_address))
block_rule = configdb.get_entry(CONFIG_DB_ACL_RULE_TABLE, block_rule_key)
if not block_rule:
notice("No rule in table {} to block ICMP traffic from {}, skipping...".format(table_name, ip_address))
return

configdb.set_entry(CONFIG_DB_ACL_RULE_TABLE, block_rule_key, None)
notice("Deny rule has been deleted, ICMP traffic from {} is now in ALLOW state.".format(ip_address))


@cli.group("block")
def block():
pass


# ./set_icmp_state block all <table_name> [-v <vlan_id>]
@block.command("all")
@click.argument("table_name", type=click.STRING, required=True)
@click.option("-v", "--vlan_id", type=click.INT)
def all(table_name, vlan_id):
"""Block all ICMP traffic on a given ACL table."""
notice("Blocking ALL ICMP traffic on table {}...".format(table_name))

configdb = ConfigDBConnector()
configdb.connect()

verify_table_exists(configdb, table_name)

verify_valid_vlan_id(configdb, vlan_id)

block_rule_key = (table_name, BLOCK_ALL_KEY)
block_rule = configdb.get_entry(CONFIG_DB_ACL_RULE_TABLE, block_rule_key)
if block_rule:
notice("Existing rule in table {} to block all ICMP traffic, skipping...".format(table_name))
return

ip_version = get_table_ip_version(configdb, table_name)
block_rule = construct_icmp_block_rule(
BLOCK_ALL_PRIORITY,
ip_version,
vlan_id=vlan_id)

configdb.set_entry(CONFIG_DB_ACL_RULE_TABLE, block_rule_key, block_rule)
notice("Deny rule has been applied, all ICMP traffic is now in DENY state.")


# ./set_icmp_state block destination <table_name> <ip_address> [-v <vlan_id>]
@block.command("destination")
@click.argument("table_name", type=click.STRING, required=True)
@click.argument("ip_address", type=click.STRING, required=True)
@click.option("-v", "--vlan_id", type=click.INT)
def destination(table_name, ip_address, vlan_id):
"""Block all ICMP traffic to a specific destination on a given ACL table."""
notice("Blocking ICMP traffic to {} on table {}".format(ip_address, table_name))

configdb = ConfigDBConnector()
configdb.connect()

verify_table_exists(configdb, table_name)

table_ip_version = get_table_ip_version(configdb, table_name)
verify_valid_ip_address(ip_address, table_ip_version)

verify_valid_vlan_id(configdb, vlan_id)

block_rule_key = get_block_rule_key(ip_address)
block_rule = configdb.get_entry(CONFIG_DB_ACL_RULE_TABLE, (table_name, block_rule_key))
if block_rule:
notice("Existing rule in table {} to block ICMP traffic to {}, skipping...".format(table_name, ip_address))
return

block_rule = construct_icmp_block_rule(
BLOCK_DEST_PRIORITY,
table_ip_version,
destination_ip_address=ip_address,
vlan_id=vlan_id
)

configdb.set_entry(CONFIG_DB_ACL_RULE_TABLE, (table_name, block_rule_key), block_rule)
notice("Deny rule has been applied, all ICMP traffic is now in DENY state.")


if __name__ == "__main__":
cli()