In [None]:
# Python version of WindowsFirewallRuleParser.cs
# This assumes supporting classes like AddressSet, AddressRange, PortSet, PortRange, and NetworkProtocol are defined.

import ipaddress
import csv
from typing import List, Dict, Generator, Optional

class AddressRange:
    def __init__(self, low: str, high: str):
        self.low = ipaddress.IPv4Address(low)
        self.high = ipaddress.IPv4Address(high)

class AddressSet:
    def __init__(self, contains_all: bool = False, ranges: Optional[List[AddressRange]] = None):
        self.contains_all = contains_all
        self.ranges = ranges or []

class PortRange:
    def __init__(self, low: int, high: int):
        self.low = low
        self.high = high

class PortSet:
    def __init__(self, contains_all: bool = False, ranges: Optional[List[PortRange]] = None):
        self.contains_all = contains_all
        self.ranges = ranges or []

class NetworkProtocol:
    def __init__(self, any_protocol: bool = False, protocol_number: int = 0):
        self.any = any_protocol
        self.protocol_number = protocol_number

    @staticmethod
    def try_get_protocol_number(name: str) -> Optional[int]:
        protocol_map = {"TCP": 6, "UDP": 17, "ICMP": 1}
        return protocol_map.get(name.upper())

class WindowsFirewallRule:
    def __init__(self, name: str, remote_addresses: AddressSet, remote_ports: PortSet,
                 local_ports: PortSet, protocol: NetworkProtocol, enabled: bool, allow: bool):
        self.name = name
        self.remote_addresses = remote_addresses
        self.remote_ports = remote_ports
        self.local_ports = local_ports
        self.protocol = protocol
        self.enabled = enabled
        self.allow = allow

class WindowsFirewallRuleParser:
    REQUIRED_HEADERS = [
        "Name", "Enabled", "Action", "Local Port",
        "Remote Address", "Remote Port", "Protocol"
    ]

    @staticmethod
    def parse(text: str, separator: str) -> Generator[WindowsFirewallRule, None, None]:
        reader = csv.reader(text.strip().splitlines(), delimiter=separator)
        header_line = next(reader)
        header_index = WindowsFirewallRuleParser.parse_header(header_line)

        for i, line in enumerate(reader):
            try:
                yield WindowsFirewallRuleParser.parse_record(header_index, line)
            except Exception as e:
                print(f"Skipping line {i + 2} - {e}")

    @staticmethod
    def parse_header(header_line: List[str]) -> Dict[str, int]:
        header_index = {h.strip(): i for i, h in enumerate(header_line)}
        missing = [h for h in WindowsFirewallRuleParser.REQUIRED_HEADERS if h not in header_index]
        if missing:
            raise ValueError(f"Missing required headers: {', '.join(missing)}")
        return header_index

    @staticmethod
    def parse_record(header_index: Dict[str, int], record: List[str]) -> WindowsFirewallRule:
        def get(field: str) -> str:
            return record[header_index[field]].strip()

        return WindowsFirewallRule(
            name=get("Name"),
            remote_addresses=WindowsFirewallRuleParser.parse_address_set(get("Remote Address")),
            remote_ports=WindowsFirewallRuleParser.parse_port_set(get("Remote Port")),
            local_ports=WindowsFirewallRuleParser.parse_port_set(get("Local Port")),
            protocol=WindowsFirewallRuleParser.parse_network_protocol(get("Protocol")),
            enabled=(get("Enabled") == "Yes"),
            allow=(get("Action") == "Allow")
        )

    @staticmethod
    def parse_address_set(text: str) -> AddressSet:
        if text == "Any":
            return AddressSet(contains_all=True)

        ranges = []
        for part in text.split(','):
            part = part.strip()
            if '-' in part:
                low, high = part.split('-')
                ranges.append(AddressRange(low.strip(), high.strip()))
            else:
                ranges.append(AddressRange(part, part))
        return AddressSet(contains_all=False, ranges=ranges)

    @staticmethod
    def parse_port_set(text: str) -> PortSet:
        if not text:
            raise ValueError("Port is empty")

        if text == "Any":
            return PortSet(contains_all=True)

        unsupported_macros = {
            "RPC Endpoint Mapper", "RPC Dynamic Ports",
            "IPHTTPS", "Edge Traversal", "PlayTo Discovery"
        }
        if text in unsupported_macros:
            raise ValueError(f"Unsupported port macro: {text}")

        ranges = []
        for part in text.split(','):
            part = part.strip()
            if '-' in part:
                low, high = part.split('-')
                ranges.append(PortRange(int(low.strip()), int(high.strip())))
            else:
                val = int(part)
                ranges.append(PortRange(val, val))
        return PortSet(contains_all=False, ranges=ranges)

    @staticmethod
    def parse_network_protocol(text: str) -> NetworkProtocol:
        if text == "Any":
            return NetworkProtocol(any_protocol=True)

        protocol_number = NetworkProtocol.try_get_protocol_number(text)
        if protocol_number is None:
            protocol_number = int(text)

        return NetworkProtocol(any_protocol=False, protocol_number=protocol_number)

# Example of taking input and parsing
if __name__ == '__main__':
    import sys
    
    filename = input("Enter the firewall dump file path: ")
    # separator = input("Enter the column separator (e.g., ','): ")

    with open(filename, 'r') as f:
        text = f.read()

    print("Parsed firewall rules:")
    for rule in WindowsFirewallRuleParser.parse(text, separator='\t'):
        print(vars(rule))

In [2]:
# Python version of WindowsFirewallRuleParser.cs
# This assumes supporting classes like AddressSet, AddressRange, PortSet, PortRange, and NetworkProtocol are defined.

import ipaddress
import csv
from typing import List, Dict, Generator, Optional

class AddressRange:
    def __init__(self, low: str, high: str):
        self.low = ipaddress.IPv4Address(low)
        self.high = ipaddress.IPv4Address(high)

class AddressSet:
    def __init__(self, contains_all: bool = False, ranges: Optional[List[AddressRange]] = None):
        self.contains_all = contains_all
        self.ranges = ranges or []

class PortRange:
    def __init__(self, low: int, high: int):
        self.low = low
        self.high = high

class PortSet:
    def __init__(self, contains_all: bool = False, ranges: Optional[List[PortRange]] = None):
        self.contains_all = contains_all
        self.ranges = ranges or []

class NetworkProtocol:
    def __init__(self, any_protocol: bool = False, protocol_number: int = 0):
        self.any = any_protocol
        self.protocol_number = protocol_number

    @staticmethod
    def try_get_protocol_number(name: str) -> Optional[int]:
        protocol_map = {"TCP": 6, "UDP": 17, "ICMP": 1}
        return protocol_map.get(name.upper())

class WindowsFirewallRule:
    def __init__(self, name: str, remote_addresses: AddressSet, remote_ports: PortSet,
                 local_ports: PortSet, protocol: NetworkProtocol, enabled: bool, allow: bool):
        self.name = name
        self.remote_addresses = remote_addresses
        self.remote_ports = remote_ports
        self.local_ports = local_ports
        self.protocol = protocol
        self.enabled = enabled
        self.allow = allow

    def __str__(self):
        return (
            f"Rule: {self.name}\n"
            f"  Enabled: {self.enabled}, Action: {'Allow' if self.allow else 'Block'}\n"
            f"  Protocol: {'Any' if self.protocol.any else self.protocol.protocol_number}\n"
            f"  Local Ports: {'Any' if self.local_ports.contains_all else [(r.low, r.high) for r in self.local_ports.ranges]}\n"
            f"  Remote Ports: {'Any' if self.remote_ports.contains_all else [(r.low, r.high) for r in self.remote_ports.ranges]}\n"
            f"  Remote Addresses: {'Any' if self.remote_addresses.contains_all else [(str(r.low), str(r.high)) for r in self.remote_addresses.ranges]}"
        )

class WindowsFirewallRuleParser:
    REQUIRED_HEADERS = [
        "Name", "Enabled", "Action", "Local Port",
        "Remote Address", "Remote Port", "Protocol"
    ]

    @staticmethod
    def parse(text: str, separator: str) -> Generator[WindowsFirewallRule, None, None]:
        reader = csv.reader(text.strip().splitlines(), delimiter=separator)
        header_line = next(reader)
        header_index = WindowsFirewallRuleParser.parse_header(header_line)

        for i, line in enumerate(reader):
            try:
                yield WindowsFirewallRuleParser.parse_record(header_index, line)
            except Exception as e:
                print(f"Skipping line {i + 2} - {e}")

    @staticmethod
    def parse_header(header_line: List[str]) -> Dict[str, int]:
        header_index = {h.strip(): i for i, h in enumerate(header_line)}
        missing = [h for h in WindowsFirewallRuleParser.REQUIRED_HEADERS if h not in header_index]
        if missing:
            raise ValueError(f"Missing required headers: {', '.join(missing)}")
        return header_index

    @staticmethod
    def parse_record(header_index: Dict[str, int], record: List[str]) -> WindowsFirewallRule:
        def get(field: str) -> str:
            return record[header_index[field]].strip()

        return WindowsFirewallRule(
            name=get("Name"),
            remote_addresses=WindowsFirewallRuleParser.parse_address_set(get("Remote Address")),
            remote_ports=WindowsFirewallRuleParser.parse_port_set(get("Remote Port")),
            local_ports=WindowsFirewallRuleParser.parse_port_set(get("Local Port")),
            protocol=WindowsFirewallRuleParser.parse_network_protocol(get("Protocol")),
            enabled=(get("Enabled") == "Yes"),
            allow=(get("Action") == "Allow")
        )

    @staticmethod
    def parse_address_set(text: str) -> AddressSet:
        if text == "Any":
            return AddressSet(contains_all=True)

        ranges = []
        for part in text.split(','):
            part = part.strip()
            if '-' in part:
                low, high = part.split('-')
                ranges.append(AddressRange(low.strip(), high.strip()))
            else:
                ranges.append(AddressRange(part, part))
        return AddressSet(contains_all=False, ranges=ranges)

    @staticmethod
    def parse_port_set(text: str) -> PortSet:
        if not text:
            raise ValueError("Port is empty")

        if text == "Any":
            return PortSet(contains_all=True)

        unsupported_macros = {
            "RPC Endpoint Mapper", "RPC Dynamic Ports",
            "IPHTTPS", "Edge Traversal", "PlayTo Discovery"
        }
        if text in unsupported_macros:
            raise ValueError(f"Unsupported port macro: {text}")

        ranges = []
        for part in text.split(','):
            part = part.strip()
            if '-' in part:
                low, high = part.split('-')
                ranges.append(PortRange(int(low.strip()), int(high.strip())))
            else:
                val = int(part)
                ranges.append(PortRange(val, val))
        return PortSet(contains_all=False, ranges=ranges)

    @staticmethod
    def parse_network_protocol(text: str) -> NetworkProtocol:
        if text == "Any":
            return NetworkProtocol(any_protocol=True)

        protocol_number = NetworkProtocol.try_get_protocol_number(text)
        if protocol_number is None:
            protocol_number = int(text)

        return NetworkProtocol(any_protocol=False, protocol_number=protocol_number)

# Example of taking input and parsing
if __name__ == '__main__':
    import sys
    
    filename = input("Enter the firewall dump file path: ")
    # separator = input("Enter the column separator (e.g., ',', '\\t'): ")

    with open(filename, 'r') as f:
        text = f.read()

    print("Parsed firewall rules:")
    for rule in WindowsFirewallRuleParser.parse(text, separator='\t'):
        print(rule)


Parsed firewall rules:
Rule: @{EnvironmentsApp_10.0.22621.1_neutral__cw5n1h2txyewy?ms-resource://EnvironmentsApp/resources/DisplayName}
  Enabled: True, Action: Allow
  Protocol: Any
  Local Ports: Any
  Remote Ports: Any
  Remote Addresses: Any
Rule: @{EnvironmentsApp_10.0.22621.1_neutral__cw5n1h2txyewy?ms-resource://EnvironmentsApp/resources/DisplayName}
  Enabled: True, Action: Allow
  Protocol: Any
  Local Ports: Any
  Remote Ports: Any
  Remote Addresses: Any
Rule: @{HoloItemPlayerApp_10.0.22621.1_neutral__cw5n1h2txyewy?ms-resource://HoloItemPlayerApp/resources/DisplayName}
  Enabled: True, Action: Allow
  Protocol: Any
  Local Ports: Any
  Remote Ports: Any
  Remote Addresses: Any
Rule: @{HoloItemPlayerApp_10.0.22621.1_neutral__cw5n1h2txyewy?ms-resource://HoloItemPlayerApp/resources/DisplayName}
  Enabled: True, Action: Allow
  Protocol: Any
  Local Ports: Any
  Remote Ports: Any
  Remote Addresses: Any
Rule: @{HoloShell_10.0.22621.1_neutral__cw5n1h2txyewy?ms-resource://HoloShell

In [3]:
# This code checks equivalence of two parsed firewall rule sets using Z3
from z3 import *

class FirewallEquivalenceChecker:
    def __init__(self, rules1, rules2):
        self.rules1 = rules1
        self.rules2 = rules2

    def allows(self, rules, packet):
        allowed_exprs = []
        for rule in rules:
            if not rule.enabled:
                continue

            proto_cond = BoolVal(rule.protocol.any) if rule.protocol.any else packet['protocol'] == rule.protocol.protocol_number
            local_port_cond = self.port_set_expr(packet['local_port'], rule.local_ports)
            remote_port_cond = self.port_set_expr(packet['remote_port'], rule.remote_ports)
            remote_addr_cond = self.address_set_expr(packet['remote_ip'], rule.remote_addresses)
            match_expr = And(proto_cond, local_port_cond, remote_port_cond, remote_addr_cond)

            allowed_exprs.append(And(match_expr, BoolVal(rule.allow)))

        return Or(*allowed_exprs) if allowed_exprs else BoolVal(False)

    def port_set_expr(self, port_var, port_set):
        if port_set.contains_all:
            return BoolVal(True)
        return Or([And(port_var >= r.low, port_var <= r.high) for r in port_set.ranges])

    def address_set_expr(self, ip_var, addr_set):
        if addr_set.contains_all:
            return BoolVal(True)
        return Or([
            And(ip_var >= self.ip_to_int(r.low), ip_var <= self.ip_to_int(r.high))
            for r in addr_set.ranges
        ])

    def ip_to_int(self, ip):
        return int(ip)

    def check_equivalence(self):
        # ctx = Context()
        s = Solver()

        packet = {
            'protocol': Int('proto'),
            'local_port': Int('lport'),
            'remote_port': Int('rport'),
            'remote_ip': Int('rip')
        }

        f1_allows = self.allows(self.rules1, packet)
        f2_allows = self.allows(self.rules2, packet)

        inequivalence = Not(f1_allows == f2_allows)
        s.add(inequivalence)

        if s.check() == unsat:
            print("Firewalls are equivalent.")
        else:
            print("Firewalls are NOT equivalent.")
            m = s.model()
            print("Counterexample:")
            for k, v in packet.items():
                if k == 'remote_ip':
                    print(f"  {k}: {ipaddress.IPv4Address(m[v].as_long())}")
                else :
                    print(f"  {k}: {m[v]}")


if __name__ == '__main__':
    # from windows_firewall_parser import WindowsFirewallRuleParser

    sep = '\t'#input("Enter separator (e.g., ',' or '\\t'): ")
    file1 = input("Enter first firewall file: ")
    file2 = input("Enter second firewall file: ")

    with open(file1) as f1:
        rules1 = list(WindowsFirewallRuleParser.parse(f1.read(), sep))
    with open(file2) as f2:
        rules2 = list(WindowsFirewallRuleParser.parse(f2.read(), sep))

    checker = FirewallEquivalenceChecker(rules1, rules2)
    checker.check_equivalence()

Skipping line 78 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 79 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 80 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 82 - Expected 4 octets in 'Local subnet'
Skipping line 94 - invalid literal for int() with base 10: 'IGMP'
Skipping line 95 - Unsupported port macro: IPHTTPS
Skipping line 96 - invalid literal for int() with base 10: 'IPv6'
Skipping line 97 - Expected 4 octets in 'Local subnet'
Skipping line 98 - Expected 4 octets in 'Local subnet'
Skipping line 99 - Expected 4 octets in 'Local subnet'
Skipping line 100 - Expected 4 octets in 'Local subnet'
Skipping line 101 - invalid literal for int() with base 10: 'ICMPv6'
Skipping line 102 - invalid literal for int() with base 10: 'ICMPv6'
Skipping line 103 - invalid literal for int() with base 10: 'ICMPv6'
Skipping line 104 - invalid literal for int() with base 10: 'ICMPv6'
Skipping line 105 - Expected 4 octets in 'Local subnet'
Skipping line 106 - Expected 4 o

In [2]:
#Final
# Python version of WindowsFirewallRuleParser.cs
# Assumes the existence of supporting classes like AddressSet, AddressRange, PortSet, PortRange, and NetworkProtocol.

import ipaddress
import csv
from typing import List, Dict, Generator, Optional

# Represents an IP address range from 'low' to 'high'
class AddressRange:
    def __init__(self, low: str, high: str):
        self.low = ipaddress.IPv4Address(low)
        self.high = ipaddress.IPv4Address(high)

# Represents a set of address ranges or a wildcard for all addresses
class AddressSet:
    def __init__(self, contains_all: bool = False, ranges: Optional[List[AddressRange]] = None):
        self.contains_all = contains_all
        self.ranges = ranges or []

# Represents a port range from 'low' to 'high'
class PortRange:
    def __init__(self, low: int, high: int):
        self.low = low
        self.high = high

# Represents a set of port ranges or a wildcard for all ports
class PortSet:
    def __init__(self, contains_all: bool = False, ranges: Optional[List[PortRange]] = None):
        self.contains_all = contains_all
        self.ranges = ranges or []

# Represents a network protocol (e.g., TCP, UDP, ICMP)
class NetworkProtocol:
    def __init__(self, any_protocol: bool = False, protocol_number: int = 0):
        self.any = any_protocol
        self.protocol_number = protocol_number

    @staticmethod
    def try_get_protocol_number(name: str) -> Optional[int]:
        """Map protocol name to number"""
        protocol_map = {
            "HOPOPT": 0,
            "ICMP": 1,
            "IGMP": 2,
            "TCP": 6,
            "UDP": 17,
            "IPv6": 41,
            "IPv6-Route": 43,
            "IPv6-Frag": 44,
            "GRE": 47,
            "ICMPv6": 58,
            "IPv6-NoNxt": 59,
            "IPv6-Opts": 60,
            "VRRP": 112,
            "PGM": 113,
            "L2TP": 115
        }
        return protocol_map.get(name.upper())

# Represents a single firewall rule
class WindowsFirewallRule:
    def __init__(self, name: str, remote_addresses: AddressSet, remote_ports: PortSet,
                 local_ports: PortSet, protocol: NetworkProtocol, enabled: bool, allow: bool):
        self.name = name
        self.remote_addresses = remote_addresses
        self.remote_ports = remote_ports
        self.local_ports = local_ports
        self.protocol = protocol
        self.enabled = enabled
        self.allow = allow

    def __str__(self):
        """Provides a human-readable string representation of the rule"""
        return (
            f"Rule: {self.name}\n"
            f"  Enabled: {self.enabled}, Action: {'Allow' if self.allow else 'Block'}\n"
            f"  Protocol: {'Any' if self.protocol.any else self.protocol.protocol_number}\n"
            f"  Local Ports: {'Any' if self.local_ports.contains_all else [(r.low, r.high) for r in self.local_ports.ranges]}\n"
            f"  Remote Ports: {'Any' if self.remote_ports.contains_all else [(r.low, r.high) for r in self.remote_ports.ranges]}\n"
            f"  Remote Addresses: {'Any' if self.remote_addresses.contains_all else [(str(r.low), str(r.high)) for r in self.remote_addresses.ranges]}"
        )

# Parses firewall rules from a CSV or TSV file
class WindowsFirewallRuleParser:
    REQUIRED_HEADERS = [
        "Name", "Enabled", "Action", "Local Port",
        "Remote Address", "Remote Port", "Protocol"
    ]

    @staticmethod
    def parse(text: str, separator: str) -> Generator[WindowsFirewallRule, None, None]:
        """
        Parses the given CSV/TSV text into firewall rules
        """
        reader = csv.reader(text.strip().splitlines(), delimiter=separator)
        header_line = next(reader)
        header_index = WindowsFirewallRuleParser.parse_header(header_line)

        for i, line in enumerate(reader):
            try:
                yield WindowsFirewallRuleParser.parse_record(header_index, line)
            except Exception as e:
                print(f"Skipping line {i + 2} - {e}")

    @staticmethod
    def parse_header(header_line: List[str]) -> Dict[str, int]:
        """Builds a mapping from header names to their index positions"""
        header_index = {h.strip(): i for i, h in enumerate(header_line)}
        missing = [h for h in WindowsFirewallRuleParser.REQUIRED_HEADERS if h not in header_index]
        if missing:
            raise ValueError(f"Missing required headers: {', '.join(missing)}")
        return header_index

    @staticmethod
    def parse_record(header_index: Dict[str, int], record: List[str]) -> WindowsFirewallRule:
        """Parses a single row of the firewall rules CSV into a rule object"""
        def get(field: str) -> str:
            return record[header_index[field]].strip()

        return WindowsFirewallRule(
            name=get("Name"),
            remote_addresses=WindowsFirewallRuleParser.parse_address_set(get("Remote Address")),
            remote_ports=WindowsFirewallRuleParser.parse_port_set(get("Remote Port")),
            local_ports=WindowsFirewallRuleParser.parse_port_set(get("Local Port")),
            protocol=WindowsFirewallRuleParser.parse_network_protocol(get("Protocol")),
            enabled=(get("Enabled") == "Yes"),
            allow=(get("Action") == "Allow")
        )

    @staticmethod
    def parse_address_set(text: str) -> AddressSet:
        """Parses a string representing an address set into an AddressSet object"""
        if text == "Any":
            return AddressSet(contains_all=True)

        ranges = []
        for part in text.split(','):
            part = part.strip()
            if '-' in part:
                low, high = part.split('-')
                ranges.append(AddressRange(low.strip(), high.strip()))
            else:
                ranges.append(AddressRange(part, part))
        return AddressSet(contains_all=False, ranges=ranges)

    @staticmethod
    def parse_port_set(text: str) -> PortSet:
        """Parses a string representing a port set into a PortSet object"""
        if not text:
            raise ValueError("Port is empty")

        if text == "Any":
            return PortSet(contains_all=True)

        unsupported_macros = {
            "RPC Endpoint Mapper", "RPC Dynamic Ports",
            "IPHTTPS", "Edge Traversal", "PlayTo Discovery"
        }
        if text in unsupported_macros:
            raise ValueError(f"Unsupported port macro: {text}")

        ranges = []
        for part in text.split(','):
            part = part.strip()
            if '-' in part:
                low, high = part.split('-')
                ranges.append(PortRange(int(low.strip()), int(high.strip())))
            else:
                val = int(part)
                ranges.append(PortRange(val, val))
        return PortSet(contains_all=False, ranges=ranges)

    @staticmethod
    def parse_network_protocol(text: str) -> NetworkProtocol:
        """Parses the protocol field into a NetworkProtocol object"""
        if text == "Any":
            return NetworkProtocol(any_protocol=True)

        protocol_number = NetworkProtocol.try_get_protocol_number(text)
        if protocol_number is None:
            protocol_number = int(text)

        return NetworkProtocol(any_protocol=False, protocol_number=protocol_number)

# Checks if two sets of firewall rules are logically equivalent using Z3 SMT solver
from z3 import *

class FirewallEquivalenceChecker:
    def __init__(self, rules1, rules2):
        self.rules1 = rules1
        self.rules2 = rules2

    def allows(self, rules, packet):
        """
        Encodes firewall behavior as a logical formula that determines if a packet is allowed
        """
        allowed_exprs = []
        for rule in rules:
            if not rule.enabled:
                continue

            # proto_cond = BoolVal(rule.protocol.any) if rule.protocol.any else packet['protocol'] == rule.protocol.protocol_number
            if rule.protocol.any:
                proto_cond = BoolVal(True)
            else:
                proto_cond = packet['protocol'] == rule.protocol.protocol_number
            local_port_cond = self.port_set_expr(packet['local_port'], rule.local_ports)
            remote_port_cond = self.port_set_expr(packet['remote_port'], rule.remote_ports)
            remote_addr_cond = self.address_set_expr(packet['remote_ip'], rule.remote_addresses)

            match_expr = And(proto_cond, local_port_cond, remote_port_cond, remote_addr_cond)
            allowed_exprs.append(And(match_expr, BoolVal(rule.allow)))

        return Or(*allowed_exprs) if allowed_exprs else BoolVal(False)

    def port_set_expr(self, port_var, port_set):
        """Returns a Z3 expression that matches a port against the given PortSet"""
        if port_set.contains_all:
            return BoolVal(True)
        return Or([And(port_var >= r.low, port_var <= r.high) for r in port_set.ranges])

    def address_set_expr(self, ip_var, addr_set):
        """Returns a Z3 expression that matches an IP address against the given AddressSet"""
        if addr_set.contains_all:
            return BoolVal(True)
        return Or([
            And(ip_var >= self.ip_to_int(r.low), ip_var <= self.ip_to_int(r.high))
            for r in addr_set.ranges
        ])

    def ip_to_int(self, ip):
        """Converts IPv4Address to integer for SMT encoding"""
        return int(ip)

    def check_equivalence(self):
        """Main function that checks if two rule sets are semantically equivalent"""
        s = Solver()

        # Symbolic packet representation
        packet = {
            'protocol': Int('proto'),
            'local_port': Int('lport'),
            'remote_port': Int('rport'),
            'remote_ip': Int('rip')
        }

        f1_allows = self.allows(self.rules1, packet)
        f2_allows = self.allows(self.rules2, packet)

        print(f1_allows)
        print('______________________________________________________________')
        print(f2_allows)

        # Check if there's any packet on which the rules differ
        inequivalence = Not(f1_allows == f2_allows)
        s.add(inequivalence)

        if s.check() == unsat:
            print("Firewalls are equivalent.")
        else:
            print("Firewalls are NOT equivalent.")
            print("Counterexample:")
            m = s.model()
            for k, v in packet.items():
                if k == 'remote_ip':
                    print(f"  {k}: {ipaddress.IPv4Address(m[v].as_long())}")
                else:
                    print(f"  {k}: {m[v]}")

if __name__ == '__main__':
    # Load and compare two firewall rule files
    sep = '\t'  # Assuming TSV format by default
    file1 = 'D:\Documents\GitHub\FirewallChecker\sharanya_1.txt'#input("Enter first firewall file: ")
    file2 = 'D:\Documents\GitHub\FirewallChecker\sharanya_1_out.txt'#input("Enter second firewall file: ")

    with open(file1) as f1:
        rules1 = list(WindowsFirewallRuleParser.parse(f1.read(), sep))
    with open(file2) as f2:
        rules2 = list(WindowsFirewallRuleParser.parse(f2.read(), sep))


Skipping line 69 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 70 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 71 - Unsupported port macro: PlayTo Discovery
Skipping line 72 - Expected 4 octets in 'Local subnet'
Skipping line 74 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 76 - Expected 4 octets in 'Local subnet'
Skipping line 77 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 78 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 79 - Expected 4 octets in 'Local subnet'
Skipping line 81 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 86 - invalid literal for int() with base 10: 'ICMPv6'
Skipping line 87 - invalid literal for int() with base 10: 'ICMPv4'
Skipping line 91 - Unsupported port macro: IPHTTPS
Skipping line 92 - invalid literal for int() with base 10: 'IPv6'
Skipping line 93 - Expected 4 octets in 'Local subnet'
Skipping line 94 - Expected 4 octets in 'Local subnet'
Skipping line 95 - Expected 4 octets in 'Local subnet'
Ski

In [None]:
from z3 import *
import ipaddress

class FirewallEquivalenceChecker:
    def __init__(self, rules1, rules2, block_by_default=True):
        self.rules1 = rules1
        self.rules2 = rules2
        self.block_by_default = block_by_default

    def allows(self, rules, packet):
        """
        Encodes firewall behavior as a logical formula that determines if a packet is allowed.
        This matches the C# logic with Allow, Block, and BlockByDefault handling.
        """
        block_exprs = []
        allow_exprs = []

        for rule in rules:
            if not rule.enabled:
                continue

            # Build match expression
            proto_cond = BoolVal(True) if rule.protocol.any else packet['protocol'] == rule.protocol.protocol_number
            local_port_cond = self.port_set_expr(packet['local_port'], rule.local_ports)
            remote_port_cond = self.port_set_expr(packet['remote_port'], rule.remote_ports)
            remote_addr_cond = self.address_set_expr(packet['remote_ip'], rule.remote_addresses)

            match_expr = And(proto_cond, local_port_cond, remote_port_cond, remote_addr_cond)

            if rule.allow:
                allow_exprs.append(match_expr)
            else:
                block_exprs.append(match_expr)

        has_block = Or(*block_exprs) if block_exprs else BoolVal(False)
        has_allow = Or(*allow_exprs) if allow_exprs else BoolVal(False)

        if not self.block_by_default:
            return Not(has_block)
        else:
            return And(has_allow, Not(has_block))

    def port_set_expr(self, port_var, port_set):
        """Returns a Z3 expression that matches a port against the given PortSet."""
        if port_set.contains_all:
            return BoolVal(True)
        return Or([And(port_var >= r.low, port_var <= r.high) for r in port_set.ranges])

    def address_set_expr(self, ip_var, addr_set):
        """Returns a Z3 expression that matches an IP address against the given AddressSet."""
        if addr_set.contains_all:
            return BoolVal(True)
        return Or([
            And(ip_var >= self.ip_to_int(r.low), ip_var <= self.ip_to_int(r.high))
            for r in addr_set.ranges
        ])

    def ip_to_int(self, ip):
        """Converts IPv4Address to integer for SMT encoding."""
        return int(ip)

    def check_equivalence(self):
        """Main function that checks if two rule sets are semantically equivalent."""
        s = Solver()

        # Symbolic packet
        packet = {
            'protocol': Int('proto'),
            'local_port': Int('lport'),
            'remote_port': Int('rport'),
            'remote_ip': Int('rip')
        }

        f1_allows = self.allows(self.rules1, packet)
        f2_allows = self.allows(self.rules2, packet)

        print("Firewall 1 formula:\n", f1_allows)
        print("______________________________________________________________")
        print("Firewall 2 formula:\n", f2_allows)

        # Look for packet on which behavior differs
        s.add(f1_allows != f2_allows)

        if s.check() == unsat:
            print("✅ Firewalls are equivalent.")
        else:
            print("❌ Firewalls are NOT equivalent.")
            print("Counterexample:")
            m = s.model()
            for k, v in packet.items():
                if k == 'remote_ip':
                    print(f"  {k}: {ipaddress.IPv4Address(m[v].as_long())}")
                else:
                    print(f"  {k}: {m[v]}")
if __name__ == '__main__':
    # Load and compare two firewall rule files
    sep = '\t'  # Assuming TSV format by default
    file1 = 'D:\Documents\GitHub\FirewallChecker\sharanya_1.txt'#input("Enter first firewall file: ")
    file2 = 'D:\Documents\GitHub\FirewallChecker\sharanya_1_out.txt'#input("Enter second firewall file: ")

    with open(file1) as f1:
        rules1 = list(WindowsFirewallRuleParser.parse(f1.read(), sep))
    with open(file2) as f2:
        rules2 = list(WindowsFirewallRuleParser.parse(f2.read(), sep))
    checker = FirewallEquivalenceChecker(rules1, rules2)
    checker.check_equivalence()

Skipping line 69 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 70 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 71 - Unsupported port macro: PlayTo Discovery
Skipping line 72 - Expected 4 octets in 'Local subnet'
Skipping line 74 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 76 - Expected 4 octets in 'Local subnet'
Skipping line 77 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 78 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 79 - Expected 4 octets in 'Local subnet'
Skipping line 81 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 86 - invalid literal for int() with base 10: 'ICMPv6'
Skipping line 87 - invalid literal for int() with base 10: 'ICMPv4'
Skipping line 91 - Unsupported port macro: IPHTTPS
Skipping line 92 - invalid literal for int() with base 10: 'IPv6'
Skipping line 93 - Expected 4 octets in 'Local subnet'
Skipping line 94 - Expected 4 octets in 'Local subnet'
Skipping line 95 - Expected 4 octets in 'Local subnet'
Ski

In [None]:
from z3 import *
import ipaddress

class FirewallEquivalenceChecker:
    def __init__(self, rules1, rules2, block_by_default=True):
        self.rules1 = rules1
        self.rules2 = rules2
        self.block_by_default = block_by_default

    def allows(self, rules, packet):
        block_exprs = []
        allow_exprs = []

        for rule in rules:
            if not rule.enabled:
                continue

            proto_cond = BoolVal(True) if rule.protocol.any else packet['protocol'] == rule.protocol.protocol_number
            local_port_cond = self.port_set_expr(packet['local_port'], rule.local_ports)
            remote_port_cond = self.port_set_expr(packet['remote_port'], rule.remote_ports)
            remote_addr_cond = self.address_set_expr(packet['remote_ip'], rule.remote_addresses)

            match_expr = And(proto_cond, local_port_cond, remote_port_cond, remote_addr_cond)

            if rule.allow:
                allow_exprs.append(match_expr)
            else:
                block_exprs.append(match_expr)

        has_block = Or(*block_exprs) if block_exprs else BoolVal(False)
        has_allow = Or(*allow_exprs) if allow_exprs else BoolVal(False)

        if not self.block_by_default:
            return Not(has_block)
        else:
            return And(has_allow, Not(has_block))

    def port_set_expr(self, port_var, port_set):
        if port_set.contains_all:
            return BoolVal(True)
        return Or([And(port_var >= r.low, port_var <= r.high) for r in port_set.ranges])

    def address_set_expr(self, ip_var, addr_set):
        if addr_set.contains_all:
            return BoolVal(True)
        return Or([
            And(ip_var >= self.ip_to_int(r.low), ip_var <= self.ip_to_int(r.high))
            for r in addr_set.ranges
        ])

    def ip_to_int(self, ip):
        return int(ip)

    def packet_from_model(self, model, packet_vars):
        """
        Extract packet values from model.
        If variable is undefined, replace with "Any".
        """
        packet_values = {}
        for k, v in packet_vars.items():
            if model[v] is not None :
                val = model.eval(v, model_completion=True)
                if k == 'remote_ip':
                    try:
                        packet_values[k] = str(ipaddress.IPv4Address(val.as_long()))
                    except Exception:
                        packet_values[k] = "Any"
                else:
                    packet_values[k] = val.as_long() if hasattr(val, 'as_long') else val
            else :
                packet_values[k] = "Any"
            # val = model.eval(v, model_completion=True)
            # if val is None:
            #     packet_values[k] = "Any"
            # else:
            #     # Special handling for remote_ip (32-bit)
            #     if k == 'remote_ip':
            #         try:
            #             packet_values[k] = str(ipaddress.IPv4Address(val.as_long()))
            #         except Exception:
            #             packet_values[k] = "Any"
            #     else:
            #         packet_values[k] = val.as_long() if hasattr(val, 'as_long') else val
        print("Packet values from model:", packet_values)
        return packet_values

    def rules_matching_packet(self, rules, packet_values):
        """
        Returns list of rule names that match the concrete packet.
        'packet_values' is a dict with concrete values (or 'Any')
        """
        matched_rules = []
        for rule in rules:
            if not rule.enabled:
                continue

            if not self.rule_matches_packet(rule, packet_values):
                continue

            matched_rules.append(rule.name if hasattr(rule, 'name') else "UnnamedRule")
        return matched_rules

    def rule_matches_packet(self, rule, packet_values):
        """
        Check if a rule matches a concrete packet (values, not symbolic)
        Supports 'Any' meaning unconstrained.
        """
        def port_in_set(port, port_set):
            if port == "Any":
                return True
            for r in port_set.ranges:
                if r.low <= port <= r.high:
                    return True
            return port_set.contains_all

        def addr_in_set(ip_str, addr_set):
            if ip_str == "Any":
                return True
            ip_int = int(ipaddress.IPv4Address(ip_str))
            for r in addr_set.ranges:
                low_int = int(r.low)
                high_int = int(r.high)
                if low_int <= ip_int <= high_int:
                    return True
            return addr_set.contains_all

        # Protocol match
        if not rule.protocol.any:
            if packet_values['protocol'] != "Any" and packet_values['protocol'] != rule.protocol.protocol_number:
                return False

        # Ports
        if not port_in_set(packet_values['local_port'], rule.local_ports):
            return False
        if not port_in_set(packet_values['remote_port'], rule.remote_ports):
            return False

        # Addresses
        if not addr_in_set(packet_values['remote_ip'], rule.remote_addresses):
            return False

        return True
    

    def check_equivalence(self, max_counterexamples=10):
        s = Solver()
        packet = {
            'protocol': Int('proto'),
            'local_port': Int('lport'),
            'remote_port': Int('rport'),
            'remote_ip': Int('rip')
        }

        f1_allows = self.allows(self.rules1, packet)
        f2_allows = self.allows(self.rules2, packet)

        s.add(f1_allows != f2_allows)

        found = 0
        while found < max_counterexamples and s.check() == sat:
            m = s.model()
            print(m)
            pkt_vals = self.packet_from_model(m, packet)

            f1_allowed = is_true(m.eval(f1_allows, model_completion=True))
            f2_allowed = is_true(m.eval(f2_allows, model_completion=True))


            f1_matching_rules = self.rules_matching_packet(self.rules1, pkt_vals)
            f2_matching_rules = self.rules_matching_packet(self.rules2, pkt_vals)

            print(f"Counterexample #{found+1}:")
            for k, v in pkt_vals.items():
                print(f"  {k}: {v}")

            print(f"  Firewall 1 allows? {f1_allowed} (Matching rules: {f1_matching_rules})")
            print(f"  Firewall 2 allows? {f2_allowed} (Matching rules: {f2_matching_rules})")
            print("-" * 50)

            # Block this solution for next iteration
            block_current = []
            for k,v in pkt_vals.items():
                if v == "Any":
                    continue  # Don't block on 'Any' values
                else :
                    block_current.append(packet[k] != v)
            print(block_current)
            s.add(Or(block_current))

            found += 1

        if found == 0:
            print("✅ Firewalls are equivalent.")

if __name__ == '__main__':
    # Load and compare two firewall rule files
    sep = '\t'  # Assuming TSV format by default
    # file1 = input("Enter first firewall file: ")
    file1 = 'D:\Documents\GitHub\FirewallChecker\sharanya_1.txt'#
    # file2 = input("Enter second firewall file: ")
    file2 = 'D:\Documents\GitHub\FirewallChecker\sharanya_1_out.txt'#input("Enter second firewall file: ")

    with open(file1) as f1:
        rules1 = list(WindowsFirewallRuleParser.parse(f1.read(), sep))
    with open(file2) as f2:
        rules2 = list(WindowsFirewallRuleParser.parse(f2.read(), sep))
    checker = FirewallEquivalenceChecker(rules1, rules2)
    checker.check_equivalence()

Skipping line 69 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 70 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 71 - Unsupported port macro: PlayTo Discovery
Skipping line 72 - Expected 4 octets in 'Local subnet'
Skipping line 74 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 76 - Expected 4 octets in 'Local subnet'
Skipping line 77 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 78 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 79 - Expected 4 octets in 'Local subnet'
Skipping line 81 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 86 - invalid literal for int() with base 10: 'ICMPv6'
Skipping line 87 - invalid literal for int() with base 10: 'ICMPv4'
Skipping line 91 - Unsupported port macro: IPHTTPS
Skipping line 92 - invalid literal for int() with base 10: 'IPv6'
Skipping line 93 - Expected 4 octets in 'Local subnet'
Skipping line 94 - Expected 4 octets in 'Local subnet'
Skipping line 95 - Expected 4 octets in 'Local subnet'
Ski

In [23]:
checker = FirewallEquivalenceChecker(rules1, rules2)
checker.check_equivalence()

[proto = 17]
Packet values from model: {'protocol': 17, 'local_port': 'Any', 'remote_port': 'Any', 'remote_ip': 'Any'}
Counterexample #1:
  protocol: 17
  local_port: Any
  remote_port: Any
  remote_ip: Any
  Firewall 1 allows? False (Matching rules: ['AnyDesk', 'AnyDesk', 'AnyDesk', 'AnyDesk', 'brave.exe', 'ChatGPT', 'Logi Options+ Agent', 'Logi Options+ Agent', 'Logi Plugin Service', 'Visual Studio Code', 'VLC media player', '@{HoloShell_10.0.22621.1_neutral__cw5n1h2txyewy?ms-resource://HoloShell/resources/DisplayName}', '@{HoloShell_10.0.22621.1_neutral__cw5n1h2txyewy?ms-resource://HoloShell/resources/DisplayName}', '@{Microsoft.AAD.BrokerPlugin_1000.19580.1000.0_neutral_neutral_cw5n1h2txyewy?ms-resource://Microsoft.AAD.BrokerPlugin/resources/PackageDisplayName}', '@{Microsoft.AAD.BrokerPlugin_1000.19580.1000.0_neutral_neutral_cw5n1h2txyewy?ms-resource://Microsoft.AAD.BrokerPlugin/resources/PackageDisplayName}', '@{Microsoft.AAD.BrokerPlugin_1000.19580.1000.0_neutral_neutral_cw5n1h2

In [62]:
from z3 import *
import ipaddress
from itertools import combinations

class FirewallEquivalenceChecker:
    def __init__(self, rules1, rules2, block_by_default=True):
        self.rules1 = rules1
        self.rules2 = rules2
        self.block_by_default = block_by_default

    def allows(self, rules, packet):
        block_exprs = []
        allow_exprs = []

        for rule in rules:
            if not rule.enabled:
                continue

            proto_cond = BoolVal(True) if rule.protocol.any else packet['protocol'] == rule.protocol.protocol_number
            local_port_cond = self.port_set_expr(packet['local_port'], rule.local_ports)
            remote_port_cond = self.port_set_expr(packet['remote_port'], rule.remote_ports)
            remote_addr_cond = self.address_set_expr(packet['remote_ip'], rule.remote_addresses)

            match_expr = And(proto_cond, local_port_cond, remote_port_cond, remote_addr_cond)

            if rule.allow:
                allow_exprs.append(match_expr)
            else:
                block_exprs.append(match_expr)

        has_block = Or(*block_exprs) if block_exprs else BoolVal(False)
        has_allow = Or(*allow_exprs) if allow_exprs else BoolVal(False)

        if not self.block_by_default:
            return Not(has_block)
        else:
            return And(has_allow, Not(has_block))

    def port_set_expr(self, port_var, port_set):
        if port_set.contains_all:
            return BoolVal(True)
        return Or([And(port_var >= r.low, port_var <= r.high) for r in port_set.ranges])

    def address_set_expr(self, ip_var, addr_set):
        if addr_set.contains_all:
            return BoolVal(True)
        return Or([
            And(ip_var >= self.ip_to_int(r.low), ip_var <= self.ip_to_int(r.high))
            for r in addr_set.ranges
        ])

    def ip_to_int(self, ip):
        return int(ip)

    def packet_from_model(self, model, packet_vars):
        """
        Extract packet values from model.
        If variable is undefined, replace with "Any".
        """
        packet_values = {}
        for k, v in packet_vars.items():
            val = model.eval(v, model_completion=True)
            if val is None:
                packet_values[k] = "Any"
            else:
                # Special handling for remote_ip (32-bit)
                if k == 'remote_ip':
                    try:
                        packet_values[k] = str(ipaddress.IPv4Address(val.as_long()))
                    except Exception:
                        packet_values[k] = "Any"
                else:
                    packet_values[k] = val.as_long() if hasattr(val, 'as_long') else val
        return packet_values

    def rules_matching_packet(self, rules, packet_values):
        """
        Returns list of rule names that match the concrete packet.
        'packet_values' is a dict with concrete values (or 'Any')
        """
        matched_rules = []
        for rule in rules:
            if not rule.enabled:
                continue

            if not self.rule_matches_packet(rule, packet_values):
                continue

            matched_rules.append(rule.name if hasattr(rule, 'name') else "UnnamedRule")
        return matched_rules

    def rule_matches_packet(self, rule, packet_values):
        """
        Check if a rule matches a concrete packet (values, not symbolic)
        Supports 'Any' meaning unconstrained.
        """
        def port_in_set(port, port_set):
            if port == "Any":
                return True
            for r in port_set.ranges:
                if r.low <= port <= r.high:
                    return True
            return port_set.contains_all

        def addr_in_set(ip_str, addr_set):
            if ip_str == "Any":
                return True
            ip_int = int(ipaddress.IPv4Address(ip_str))
            for r in addr_set.ranges:
                low_int = int(r.low)
                high_int = int(r.high)
                if low_int <= ip_int <= high_int:
                    return True
            return addr_set.contains_all

        # Protocol match
        if not rule.protocol.any:
            if packet_values['protocol'] != "Any" and packet_values['protocol'] != rule.protocol.protocol_number:
                return False

        # Ports
        if not port_in_set(packet_values['local_port'], rule.local_ports):
            return False
        if not port_in_set(packet_values['remote_port'], rule.remote_ports):
            return False

        # Addresses
        if not addr_in_set(packet_values['remote_ip'], rule.remote_addresses):
            return False

        return True

    def generalize_counterexample(self, model, packet_vars, f1_allows, f2_allows):
        s = Solver()
        base_constraints = []
        fixed_values = {}

        # Evaluate original concrete packet
        concrete_packet = {k: model.eval(v, model_completion=True) for k, v in packet_vars.items()}
        print("Concrete packet from model:", concrete_packet)
        print(packet_vars)

        for var_to_try_any in packet_vars:
            print(var_to_try_any)
            temp_constraints = []
            for k, v in packet_vars.items():
                if k == var_to_try_any:
                    continue
                temp_constraints.append(v == concrete_packet[k])
            print(temp_constraints)
            s.push()
            s.add(*temp_constraints)
            s.add(f1_allows != f2_allows)
            if s.check() != sat:
                print("yes")
                s.pop()
            else:
                s.pop()
                fixed_values[var_to_try_any] = concrete_packet[var_to_try_any]
                print(concrete_packet[var_to_try_any])

        return fixed_values

    def check_equivalence(self, max_counterexamples=10):
        s = Solver()
        packet = {
            'protocol': Int('proto'),
            'local_port': Int('lport'),
            'remote_port': Int('rport'),
            'remote_ip': Int('rip')
        }

        f1_allows = self.allows(self.rules1, packet)
        f2_allows = self.allows(self.rules2, packet)

        s.add(f1_allows != f2_allows)

        seen_patterns = set()
        found = 0

        while found < max_counterexamples and s.check() == sat:
            m = s.model()
            print(m)
            fixed_vars = self.generalize_counterexample(m, packet, f1_allows, f2_allows)
            print(fixed_vars)

            key = tuple((k, fixed_vars[k].as_long()) for k in sorted(fixed_vars))
            if key in seen_patterns:
                break  # Already seen
            seen_patterns.add(key)

            pkt_repr = {}
            for k in packet:
                if k in fixed_vars:
                    if k == 'remote_ip':
                        pkt_repr[k] = str(ipaddress.IPv4Address(fixed_vars[k].as_long()))
                    else:
                        pkt_repr[k] = fixed_vars[k].as_long()
                else:
                    pkt_repr[k] = "Any"

            f1_allowed = is_true(m.eval(f1_allows, model_completion=True))
            f2_allowed = is_true(m.eval(f2_allows, model_completion=True))

            f1_matching = self.rules_matching_packet(self.rules1, pkt_repr)
            f2_matching = self.rules_matching_packet(self.rules2, pkt_repr)

            print(f"Generalized Counterexample #{found + 1}:")
            for k in ['protocol', 'local_port', 'remote_port', 'remote_ip']:
                print(f"  {k}: {pkt_repr[k]}")
            print(f"  Firewall 1 allows? {f1_allowed} (Matching rules: {f1_matching})")
            print(f"  Firewall 2 allows? {f2_allowed} (Matching rules: {f2_matching})")
            print("-" * 50)

            # Block future packets with same fixed assignments
            block_clause = [packet[k] == v for k, v in fixed_vars.items()]
            s.add(Not(And(block_clause)))

            found += 1

        if found == 0:
            print("✅ Firewalls are equivalent.")



if __name__ == '__main__':
    # Load and compare two firewall rule files
    sep = '\t'  # Assuming TSV format by default
    file1 = 'D:\Documents\GitHub\FirewallChecker\sharanya_1.txt'#input("Enter first firewall file: ")
    file2 = 'D:\Documents\GitHub\FirewallChecker\sharanya_1_out.txt'#input("Enter second firewall file: ")

    with open(file1) as f1:
        rules1 = list(WindowsFirewallRuleParser.parse(f1.read(), sep))
    with open(file2) as f2:
        rules2 = list(WindowsFirewallRuleParser.parse(f2.read(), sep))


Skipping line 69 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 70 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 71 - Unsupported port macro: PlayTo Discovery
Skipping line 72 - Expected 4 octets in 'Local subnet'
Skipping line 74 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 76 - Expected 4 octets in 'Local subnet'
Skipping line 77 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 78 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 79 - Expected 4 octets in 'Local subnet'
Skipping line 81 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 86 - invalid literal for int() with base 10: 'ICMPv6'
Skipping line 87 - invalid literal for int() with base 10: 'ICMPv4'
Skipping line 91 - Unsupported port macro: IPHTTPS
Skipping line 92 - invalid literal for int() with base 10: 'IPv6'
Skipping line 93 - Expected 4 octets in 'Local subnet'
Skipping line 94 - Expected 4 octets in 'Local subnet'
Skipping line 95 - Expected 4 octets in 'Local subnet'
Ski

In [63]:
checker = FirewallEquivalenceChecker(rules1, rules2)
checker.check_equivalence()

[proto = 6]
Concrete packet from model: {'protocol': 6, 'local_port': 0, 'remote_port': 0, 'remote_ip': 0}
{'protocol': proto, 'local_port': lport, 'remote_port': rport, 'remote_ip': rip}
protocol
[0 == lport, 0 == rport, 0 == rip]
6
local_port
[6 == proto, 0 == rport, 0 == rip]
0
remote_port
[6 == proto, 0 == lport, 0 == rip]
0
remote_ip
[6 == proto, 0 == lport, 0 == rport]
0
{'protocol': 6, 'local_port': 0, 'remote_port': 0, 'remote_ip': 0}
Generalized Counterexample #1:
  protocol: 6
  local_port: 0
  remote_port: 0
  remote_ip: 0.0.0.0
  Firewall 1 allows? False (Matching rules: ['AnyDesk', 'AnyDesk', 'AnyDesk', 'AnyDesk', 'brave.exe', 'ChatGPT', 'Logi Options+ Agent', 'Logi Options+ Agent', 'Logi Plugin Service', 'Visual Studio Code', 'VLC media player', '@{HoloShell_10.0.22621.1_neutral__cw5n1h2txyewy?ms-resource://HoloShell/resources/DisplayName}', '@{HoloShell_10.0.22621.1_neutral__cw5n1h2txyewy?ms-resource://HoloShell/resources/DisplayName}', '@{Microsoft.AAD.BrokerPlugin_1000

## FINAL

In [None]:
import ipaddress
import csv
from z3 import *
from typing import List, Dict, Generator, Optional

# Represents an IP address range from 'low' to 'high'
class AddressRange:
    def __init__(self, low: str, high: str):
        self.low = ipaddress.IPv4Address(low)
        self.high = ipaddress.IPv4Address(high)

# Represents a set of address ranges or a wildcard for all addresses
class AddressSet:
    def __init__(self, contains_all: bool = False, ranges: Optional[List[AddressRange]] = None):
        self.contains_all = contains_all
        self.ranges = ranges or []

# Represents a port range from 'low' to 'high'
class PortRange:
    def __init__(self, low: int, high: int):
        self.low = low
        self.high = high

# Represents a set of port ranges or a wildcard for all ports
class PortSet:
    def __init__(self, contains_all: bool = False, ranges: Optional[List[PortRange]] = None):
        self.contains_all = contains_all
        self.ranges = ranges or []

# Represents a network protocol (e.g., TCP, UDP, ICMP)
class NetworkProtocol:
    def __init__(self, any_protocol: bool = False, protocol_number: int = 0):
        self.any = any_protocol
        self.protocol_number = protocol_number

    @staticmethod
    def try_get_protocol_number(name: str) -> Optional[int]:
        """Map protocol name to number"""
        protocol_map = {
            "HOPOPT": 0,
            "ICMP": 1,
            "IGMP": 2,
            "TCP": 6,
            "UDP": 17,
            "IPv6": 41,
            "IPv6-Route": 43,
            "IPv6-Frag": 44,
            "GRE": 47,
            "ICMPv6": 58,
            "IPv6-NoNxt": 59,
            "IPv6-Opts": 60,
            "VRRP": 112,
            "PGM": 113,
            "L2TP": 115
        }
        return protocol_map.get(name.upper())

# Represents a single firewall rule
class WindowsFirewallRule:
    def __init__(self, name: str, remote_addresses: AddressSet, remote_ports: PortSet,
                 local_ports: PortSet, protocol: NetworkProtocol, enabled: bool, allow: bool):
        self.name = name
        self.remote_addresses = remote_addresses
        self.remote_ports = remote_ports
        self.local_ports = local_ports
        self.protocol = protocol
        self.enabled = enabled
        self.allow = allow

    def __str__(self):
        """Provides a human-readable string representation of the rule"""
        return (
            f"Rule: {self.name}\n"
            f"  Enabled: {self.enabled}, Action: {'Allow' if self.allow else 'Block'}\n"
            f"  Protocol: {'Any' if self.protocol.any else self.protocol.protocol_number}\n"
            f"  Local Ports: {'Any' if self.local_ports.contains_all else [(r.low, r.high) for r in self.local_ports.ranges]}\n"
            f"  Remote Ports: {'Any' if self.remote_ports.contains_all else [(r.low, r.high) for r in self.remote_ports.ranges]}\n"
            f"  Remote Addresses: {'Any' if self.remote_addresses.contains_all else [(str(r.low), str(r.high)) for r in self.remote_addresses.ranges]}"
        )

# Parses firewall rules from a CSV or TSV file
class WindowsFirewallRuleParser:
    REQUIRED_HEADERS = [
        "Name", "Enabled", "Action", "Local Port",
        "Remote Address", "Remote Port", "Protocol"
    ]

    @staticmethod
    def parse(text: str, separator: str) -> Generator[WindowsFirewallRule, None, None]:
        """
        Parses the given CSV/TSV text into firewall rules
        """
        reader = csv.reader(text.strip().splitlines(), delimiter=separator)
        header_line = next(reader)
        header_index = WindowsFirewallRuleParser.parse_header(header_line)

        for i, line in enumerate(reader):
            try:
                yield WindowsFirewallRuleParser.parse_record(header_index, line)
            except Exception as e:
                print(f"Skipping line {i + 2} - {e}")

    @staticmethod
    def parse_header(header_line: List[str]) -> Dict[str, int]:
        """Builds a mapping from header names to their index positions"""
        header_index = {h.strip(): i for i, h in enumerate(header_line)}
        missing = [h for h in WindowsFirewallRuleParser.REQUIRED_HEADERS if h not in header_index]
        if missing:
            raise ValueError(f"Missing required headers: {', '.join(missing)}")
        return header_index

    @staticmethod
    def parse_record(header_index: Dict[str, int], record: List[str]) -> WindowsFirewallRule:
        """Parses a single row of the firewall rules CSV into a rule object"""
        def get(field: str) -> str:
            return record[header_index[field]].strip()

        return WindowsFirewallRule(
            name=get("Name"),
            remote_addresses=WindowsFirewallRuleParser.parse_address_set(get("Remote Address")),
            remote_ports=WindowsFirewallRuleParser.parse_port_set(get("Remote Port")),
            local_ports=WindowsFirewallRuleParser.parse_port_set(get("Local Port")),
            protocol=WindowsFirewallRuleParser.parse_network_protocol(get("Protocol")),
            enabled=(get("Enabled") == "Yes"),
            allow=(get("Action") == "Allow")
        )

    @staticmethod
    def parse_address_set(text: str) -> AddressSet:
        """Parses a string representing an address set into an AddressSet object"""
        if text == "Any":
            return AddressSet(contains_all=True)

        ranges = []
        for part in text.split(','):
            part = part.strip()
            if '-' in part:
                low, high = part.split('-')
                ranges.append(AddressRange(low.strip(), high.strip()))
            else:
                ranges.append(AddressRange(part, part))
        return AddressSet(contains_all=False, ranges=ranges)

    @staticmethod
    def parse_port_set(text: str) -> PortSet:
        """Parses a string representing a port set into a PortSet object"""
        if not text:
            raise ValueError("Port is empty")

        if text == "Any":
            return PortSet(contains_all=True)

        unsupported_macros = {
            "RPC Endpoint Mapper", "RPC Dynamic Ports",
            "IPHTTPS", "Edge Traversal", "PlayTo Discovery"
        }
        if text in unsupported_macros:
            raise ValueError(f"Unsupported port macro: {text}")

        ranges = []
        for part in text.split(','):
            part = part.strip()
            if '-' in part:
                low, high = part.split('-')
                ranges.append(PortRange(int(low.strip()), int(high.strip())))
            else:
                val = int(part)
                ranges.append(PortRange(val, val))
        return PortSet(contains_all=False, ranges=ranges)

    @staticmethod
    def parse_network_protocol(text: str) -> NetworkProtocol:
        """Parses the protocol field into a NetworkProtocol object"""
        if text == "Any":
            return NetworkProtocol(any_protocol=True)

        protocol_number = NetworkProtocol.try_get_protocol_number(text)
        if protocol_number is None:
            protocol_number = int(text)

        return NetworkProtocol(any_protocol=False, protocol_number=protocol_number)

class FirewallEquivalenceChecker:
    def __init__(self, rules1, rules2, block_by_default=True):
        self.rules1 = rules1
        self.rules2 = rules2
        self.block_by_default = block_by_default

    def allows(self, rules, packet):
        block_exprs = []
        allow_exprs = []

        for rule in rules:
            if not rule.enabled:
                continue

            proto_cond = BoolVal(True) if rule.protocol.any else packet['protocol'] == rule.protocol.protocol_number
            local_port_cond = self.port_set_expr(packet['local_port'], rule.local_ports)
            remote_port_cond = self.port_set_expr(packet['remote_port'], rule.remote_ports)
            remote_addr_cond = self.address_set_expr(packet['remote_ip'], rule.remote_addresses)

            match_expr = And(proto_cond, local_port_cond, remote_port_cond, remote_addr_cond)

            if rule.allow:
                allow_exprs.append(match_expr)
            else:
                block_exprs.append(match_expr)

        has_block = Or(*block_exprs) if block_exprs else BoolVal(False)
        has_allow = Or(*allow_exprs) if allow_exprs else BoolVal(False)

        if not self.block_by_default:
            return Not(has_block)
        else:
            return And(has_allow, Not(has_block))

    def port_set_expr(self, port_var, port_set):
        if port_set.contains_all:
            return BoolVal(True)
        return Or([And(port_var >= r.low, port_var <= r.high) for r in port_set.ranges])

    def address_set_expr(self, ip_var, addr_set):
        if addr_set.contains_all:
            return BoolVal(True)
        return Or([
            And(ip_var >= self.ip_to_int(r.low), ip_var <= self.ip_to_int(r.high))
            for r in addr_set.ranges
        ])

    def ip_to_int(self, ip):
        return int(ip)

    def packet_from_model(self, model, packet_vars):
        """
        Extract packet values from model.
        If variable is undefined, replace with "Any".
        """
        packet_values = {}
        packet_values_constraint = {}
        for k, v in packet_vars.items():
            if model[v] is not None :
                val = model.eval(v, model_completion=True)
                if k == 'remote_ip':
                    try:
                        packet_values_constraint[k] = val
                        packet_values[k] = str(ipaddress.IPv4Address(val.as_long()))
                    except Exception:
                        packet_values[k] = "Any"
                else:
                    packet_values_constraint[k] = val
                    packet_values[k] = val.as_long() if hasattr(val, 'as_long') else val
            else :
                packet_values[k] = "Any"
        return packet_values, packet_values_constraint

    def rules_matching_packet(self, rules, packet_values):
        """
        Returns list of rule names that match the concrete packet.
        'packet_values' is a dict with concrete values (or 'Any')
        """
        matched_rules = []
        for rule in rules:
            if not rule.enabled:
                continue

            if not self.rule_matches_packet(rule, packet_values):
                continue

            action = "Allow" if rule.allow else "Block"
            matched_rules.append((rule.name if hasattr(rule, 'name') else "UnnamedRule", action))
        return matched_rules

    def rule_matches_packet(self, rule, packet_values):
        """
        Check if a rule matches a concrete packet (values, not symbolic)
        Supports 'Any' meaning unconstrained.
        """
        def port_in_set(port, port_set):
            if port == "Any":
                return True
            for r in port_set.ranges:
                if r.low <= port <= r.high:
                    return True
            return port_set.contains_all

        def addr_in_set(ip_str, addr_set):
            if ip_str == "Any":
                return True
            ip_int = int(ipaddress.IPv4Address(ip_str))
            for r in addr_set.ranges:
                low_int = int(r.low)
                high_int = int(r.high)
                if low_int <= ip_int <= high_int:
                    return True
            return addr_set.contains_all

        # Protocol match
        if not rule.protocol.any:
            if packet_values['protocol'] != "Any" and packet_values['protocol'] != rule.protocol.protocol_number:
                return False

        # Ports
        if not port_in_set(packet_values['local_port'], rule.local_ports):
            return False
        if not port_in_set(packet_values['remote_port'], rule.remote_ports):
            return False

        # Addresses
        if not addr_in_set(packet_values['remote_ip'], rule.remote_addresses):
            return False

        return True
    

    def check_equivalence(self, max_counterexamples=10):
        s = Solver()
        packet = {
            'protocol': Int('proto'),
            'local_port': Int('lport'),
            'remote_port': Int('rport'),
            'remote_ip': Int('rip')
        }

        f1_allows = self.allows(self.rules1, packet)
        f2_allows = self.allows(self.rules2, packet)

        s.add(f1_allows != f2_allows)

        found = 0
        print("-" * 50)
        if s.check() == sat:
            print("\n❌ Firewalls are NOT equivalent.\n")
            print("-" * 50)
            
        while found < max_counterexamples and s.check() == sat:
            m = s.model()
            pkt_vals, pkt_vals_const = self.packet_from_model(m, packet)

            f1_allowed = is_true(m.eval(f1_allows, model_completion=True))
            f2_allowed = is_true(m.eval(f2_allows, model_completion=True))


            f1_matching_rules = self.rules_matching_packet(self.rules1, pkt_vals)
            f2_matching_rules = self.rules_matching_packet(self.rules2, pkt_vals)

            print(f"Counterexample #{found+1}:")
            for k, v in pkt_vals.items():
                if k == 'protocol':
                    reversed_protocol_map = {0: "HOPOPT", 1: "ICMP", 2: "IGMP", 6: "TCP", 17: "UDP", 41: "IPv6", 43: "IPv6-Route", 44: "IPv6-Frag", 47: "GRE", 58: "ICMPv6", 59: "IPv6-NoNxt", 60: "IPv6-Opts", 112: "VRRP", 113: "PGM", 115: "L2TP"}
                    v = reversed_protocol_map.get(v, v)  

                print(f"  {k}: {v}")

            print(f"  Firewall 1 allows? {f1_allowed} (Matching rules: {f1_matching_rules})")
            print(f"  Firewall 2 allows? {f2_allowed} (Matching rules: {f2_matching_rules})")
            print("-" * 50)

            # Block this solution for next iteration
            block_current = []
            for k,v in pkt_vals_const.items():
                block_current.append(packet[k] != v)
            s.add(Or(block_current))

            found += 1

        if found == 0:
            print("✅ Firewalls are equivalent.")

if __name__ == '__main__':

    sep = '\t'  
    file1 = 'D:\Documents\GitHub\FirewallChecker\sharanya_1.txt'
    file2 = 'D:\Documents\GitHub\FirewallChecker\sharanya_1_out.txt'

    with open(file1) as f1:
        rules1 = list(WindowsFirewallRuleParser.parse(f1.read(), sep))
    with open(file2) as f2:
        rules2 = list(WindowsFirewallRuleParser.parse(f2.read(), sep))
    checker = FirewallEquivalenceChecker(rules1, rules2)
    checker.check_equivalence()

Skipping line 69 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 70 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 71 - Unsupported port macro: PlayTo Discovery
Skipping line 72 - Expected 4 octets in 'Local subnet'
Skipping line 74 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 76 - Expected 4 octets in 'Local subnet'
Skipping line 77 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 78 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 79 - Expected 4 octets in 'Local subnet'
Skipping line 81 - Expected 4 octets in 'PlayTo Renderers'
Skipping line 86 - invalid literal for int() with base 10: 'ICMPv6'
Skipping line 87 - invalid literal for int() with base 10: 'ICMPv4'
Skipping line 91 - Unsupported port macro: IPHTTPS
Skipping line 92 - invalid literal for int() with base 10: 'IPv6'
Skipping line 93 - Expected 4 octets in 'Local subnet'
Skipping line 94 - Expected 4 octets in 'Local subnet'
Skipping line 95 - Expected 4 octets in 'Local subnet'
Ski