In [77]:
"""
Classes and methods for working with nondeterministic finite automata.
Source: https://github.com/caleb531/automata
"""

import copy
import abc
import math
from collections import defaultdict, Counter

class AutomatonException(Exception):
    """The base class for all automaton-related errors."""
    pass


class InvalidStateError(AutomatonException):
    """A state is not a valid state for this automaton."""
    pass


class InvalidSymbolError(AutomatonException):
    """A symbol is not a valid symbol for this automaton."""
    pass


class MissingStateError(AutomatonException):
    """A state is missing from the automaton definition."""
    pass


class RejectionException(AutomatonException):
    """The input was rejected by the automaton."""
    pass


class Automaton(metaclass=abc.ABCMeta):
    """An abstract base class for all automata, including Turing machines."""

    @abc.abstractmethod
    def __init__(self):
        """Initialize a complete automaton."""
        raise NotImplementedError

    @abc.abstractmethod
    def validate(self):
        """Return True if this automaton is internally consistent."""
        raise NotImplementedError

    @abc.abstractmethod
    def read_input_stepwise(self, input_str):
        """Return a generator that yields each step while reading input."""
        raise NotImplementedError

    def read_input(self, input_str, validate_final=True):
        """
        Check if the given string is accepted by this automaton.

        Return the automaton's final configuration. If validate_final is true,
        this will return the final configuration only if this string is valid,
        raising RejectionException otherwise.
        """
        validation_generator = self.read_input_stepwise(input_str, validate_final=validate_final)
        for config in validation_generator:
            pass
        return config
    
    
    def accepts_input(self, input_str):
        """Return True if this automaton accepts the given input."""
        try:
            self.read_input(input_str)
            return True
        except RejectionException:
            return False

    def _validate_initial_state(self):
        """Raise an error if the initial state is invalid."""
        if self.initial_state not in self.states:
            raise InvalidStateError(
                '{} is not a valid initial state'.format(self.initial_state))

    def _validate_initial_state_transitions(self):
        """Raise an error if the initial state has no transitions defined."""
        if self.initial_state not in self.transitions:
            raise MissingStateError(
                'initial state {} has no transitions defined'.format(
                    self.initial_state))

    def _validate_final_states(self):
        """Raise an error if any final states are invalid."""
        invalid_states = self.final_states - self.states
        if invalid_states:
            raise InvalidStateError(
                'final states are not valid ({})'.format(
                    ', '.join(str(state) for state in invalid_states)))

    def copy(self):
        """Create a deep copy of the automaton."""
        return self.__class__(**vars(self))

    def __eq__(self, other):
        """Check if two automata are equal."""
        return vars(self) == vars(other)


class FA(Automaton, metaclass=abc.ABCMeta):
    """An abstract base class for finite automata."""
    pass

def ddict2dict(d):
    for k, v in d.items():
        if isinstance(v, dict):
            d[k] = ddict2dict(v)
    return dict(d)

class NFA(FA):
    """A nondeterministic finite automaton."""
    def __init__(self, *, states, input_symbols, transitions,
                 initial_state, final_states, states_by_layer=None):
        """Initialize a complete NFA."""
        self.states = states.copy()
        self.input_symbols = input_symbols.copy()
        self.transitions = copy.deepcopy(transitions)
        self.initial_state = initial_state
        self.final_states = final_states.copy()
        self.validate()

        # --- Counting utilities ---
        self.states_by_layer = states_by_layer
        self.sketch = defaultdict(dict)
        self.n_for_sets = {}
        self.n_for_states = {}
        self.s_for_states = defaultdict(Counter)
        

    def _validate_transition_invalid_symbols(self, start_state, paths):
        """Raise an error if transition symbols are invalid."""
        for input_symbol in paths.keys():
            if input_symbol not in self.input_symbols and input_symbol != '':
                raise InvalidSymbolError(
                    'state {} has invalid transition symbol {}'.format(
                        start_state, input_symbol))

    def _validate_transition_end_states(self, start_state, paths):
        """Raise an error if transition end states are invalid."""
        for end_states in paths.values():
            for end_state in end_states:
                if end_state not in self.states:
                    raise InvalidStateError(
                        'end state {} for transition on {} is '
                        'not valid'.format(end_state, start_state))

    def validate(self):
        """Return True if this NFA is internally consistent."""
        for start_state, paths in self.transitions.items():
            self._validate_transition_invalid_symbols(start_state, paths)
            self._validate_transition_end_states(start_state, paths)
        self._validate_initial_state()
        self._validate_initial_state_transitions()
        self._validate_final_states()
        return True

    def _get_epsilon_closure(self, start_state):
        """
        Return the epsilon closure for the given state.

        The epsilon closure of a state q is the set containing q, along with
        every state that can be reached from q by following only epsilon
        transitions.
        """
        stack = []
        encountered_states = set()
        stack.append(start_state)

        while stack:
            state = stack.pop()
            if state not in encountered_states:
                encountered_states.add(state)
                if '' in self.transitions[state]:
                    stack.extend(self.transitions[state][''])

        return encountered_states

    def _get_next_current_states(self, current_states, input_symbol):
        """Return the next set of current states given the current set."""
        next_current_states = set()

        for current_state in current_states:
            symbol_end_states = self.transitions[current_state].get(
                input_symbol)
            if symbol_end_states:
                for end_state in symbol_end_states:
                    next_current_states.update(
                        self._get_epsilon_closure(end_state))

        return next_current_states

    def _check_for_input_rejection(self, current_states):
        """Raise an error if the given config indicates rejected input."""
        if not (current_states & self.final_states):
            raise RejectionException(
                'the NFA stopped on all non-final states ({})'.format(
                    ', '.join(str(state) for state in current_states)))

    def read_input_stepwise(self, input_str, validate_final=True):
        """
        Check if the given string is accepted by this NFA.

        Yield the current configuration of the NFA at each step.
        """
        current_states = self._get_epsilon_closure(self.initial_state)

        yield current_states
        for input_symbol in input_str:
            current_states = self._get_next_current_states(
                current_states, input_symbol)
            yield current_states
        
        if validate_final:
            self._check_for_input_rejection(current_states)

    def unroll(self, n: int):
        """
        Builds A_unroll with n levels
        to estimate |L(F^n)|
        """
        # for each state q ∈ Q, include copies q_0 , q_1 , ..., q n in A unroll
        new_states = {(q,i) for q in self.states for i in range(n+1)}
        new_states_by_layer = {i: {(q,i) for q in self.states} for i in range(n+1)}
        # for each transition (p, a, q) ∈ ∆ and i ∈ {0, 1, . . . , n − 1}, include
        # transition (p_i, a, q_i+1) in A unroll
        new_transitions = defaultdict(lambda: defaultdict(set))
        for p, trans in self.transitions.items():
            for a, qs in trans.items():
                for q in qs:
                    for i in range(n):
                        new_transitions[p,i][a].add((q,i+1))
        
        
        new_transitions = ddict2dict(new_transitions)
        new_initial_state = (self.initial_state, 0)
        new_final_states = {(q,n) for q in self.final_states}
        
        return NFA(states=new_states,
                   input_symbols=self.input_symbols,
                   transitions=new_transitions,
                   initial_state=new_initial_state,
                   final_states=new_final_states
                   states_by_layer=new_states_by_layer)
    
    def n_for_states_set(self, states: frozenset):
        if states in self.n_for_sets:
            return self.n_for_sets[states]
        total = 0
        states_list = sorted(states) # linear order ≺
        for i in range(len(states_list)):
            # states[i] = (q, i), where q is the original state name in A
            anchor_state = states_list[i] 
            state_name, level = anchor_state
            intersection_rate = 0
            # Now estimate the intersection rate for anchor_state
            for previous_state in states_list[:i]:
                
                
        
    
    
    def count_accepted(self, n:int, eps: float):
        """
        Returns a (1 ± ε)-approximation of |L_n(A_unroll)|
        """
        kappa = math.ceil(n * len(self.states) / eps)
        sample_size = 2 * kappa ** 7
        # For each state q ∈ I, set N(q_0) = |L(q_0)| = 1 
        # and S(q_0) = L(q_0) = {λ}
        for q in self.states_by_layer[0]:
            self.sketch[0][q] = (1, ('',))
            
        # For each i = 1, . . . , n and state q ∈ Q:
        #   (a) Compute N(q i ) given sketch[i − 1]
        #   (b) Sample polynomially many uniform elements from L(q_i) using
        #       N(q_i) and sketch[i − 1], and let S(q_i) be the multiset of
        #       uniform samples obtained
        for i in range(1, n+1):
            for q in self.states_by_layer[i]:
                
                
        

In [65]:
nfa = NFA(
    states={'q0', 'q1', 'q2'},
    input_symbols={'0', '1'},
    transitions={
        'q0': {'0': {'q1'}},
        # Use '' as the key name for empty string (lambda/epsilon) transitions
        'q1': {'0': {'q1'}, '1': {'q0', 'q2'}},
        'q2': {'1': {'q0'}}
    },
    initial_state='q0',
    final_states={'q1'}
)

In [31]:
def bruteforce_save_all(nfa: NFA, n: int):
    accepted_strings = []
    for i in range(2**n):
        string_i = bin(i)[2:].zfill(n)
        if nfa.accepts_input(string_i):
            accepted_strings.append(string_i)
    return accepted_strings

def bruteforce_count_only(nfa: NFA, n: int):
    accepted_count = 0
    for i in range(2**n):
        string_i = bin(i)[2:].zfill(n)
        if nfa.accepts_input(string_i):
            accepted_count += 1
    return accepted_count

In [92]:
%%timeit
bruteforce_count_only(nfa, 17)

921 ms ± 7.06 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [63]:
bruteforce_count_only(nfa, 18)

19513

In [68]:
def count_nfa(nfa: NFA, n: int, eps: float):
    nfa_unroll = nfa.unroll(n)
    return nfa_unroll.count_accepted(n=n, eps=eps)

In [93]:
count_nfa(nfa, 29, 0.99)

1773376316777256096481866


In [91]:
189863754266 / 1e9

189.863754266