## Docker for pynusmv

In [None]:
import os

# Pull the louvainverificationlab/pynusmv image (needs a working Docker install, e.g. Docker Desktop)
!docker pull louvainverificationlab/pynusmv

In [None]:
# Smoke test: run a throwaway container (--rm) that imports pynusmv and prints a marker
print("Starting Docker container...")
!docker run --rm louvainverificationlab/pynusmv python3 -c "import pynusmv; print('PyNuSMV ')"

In [None]:
#import os

#current_folder = os.getcwd()
#print(f"Mount folder: {current_folder}")

#!docker run --rm -v "{current_folder}:/work" -w /work louvainverificationlab/pynusmv python3 filename.py

## Inv_mc

### Attempt 1

In [None]:
%%writefile src/inv1_mc.py
# Elia
import pynusmv
import sys

def spec_to_bdd(model, spec):
    """
    Evaluate spec on model with pynusmv.mc.eval_ctl_spec and return the
    resulting BDD, i.e. the set of states of model satisfying spec.
    """
    return pynusmv.mc.eval_ctl_spec(model, spec)

def check_explain_inv_spec(spec):
    """
    Check the invariant spec against the loaded SMV model.

    The state space is explored breadth-first from the initial states, one
    frontier of newly discovered states at a time. Exploration stops as soon
    as a frontier contains a state violating spec, or when no new state is
    found.

    Return (True, None) if no violating state was found. Otherwise return
    (False, trace) where trace is a tuple alternating state and input
    dictionaries (state, inputs, state, ..., state) that ends in a state
    violating spec. When the model has no input variables the input entries
    are empty dictionaries.
    """
    fsm = pynusmv.glob.prop_database().master.bddFsm
    bad_states = -spec_to_bdd(fsm, spec)

    def violates(states):
        # True if at least one state of 'states' falsifies the spec
        return states.intersected(bad_states)

    frontier = fsm.init
    visited = frontier
    layers = [frontier]

    # Forward BFS: each layer holds only the states first reached at that depth
    while frontier.isnot_false() and not violates(frontier):
        frontier = fsm.post(frontier) - visited
        visited = visited + frontier
        layers.append(frontier)

    if not violates(frontier):
        return True, None

    target = fsm.pick_one_state(frontier.intersection(bad_states))
    model_has_inputs = len(fsm.bddEnc.inputsVars) > 0

    # The path is collected from the violating state back to an initial state
    # and flipped once at the end.
    backwards = [target.get_str_values()]
    successor = target
    for layer in reversed(layers[:-1]):
        candidates = layer.intersection(fsm.pre(successor))
        predecessor = fsm.pick_one_state(candidates)

        if model_has_inputs:
            between = fsm.get_inputs_between_states(predecessor, successor)
            backwards.append(fsm.pick_one_inputs(between).get_str_values())
        else:
            backwards.append({})
        backwards.append(predecessor.get_str_values())

        successor = predecessor

    backwards.reverse()
    return False, tuple(backwards)

if __name__ == "__main__":
    # Expect exactly one argument: the SMV model to load
    if len(sys.argv) != 2:
        print("Usage:", sys.argv[0], "filename.smv")
        sys.exit(1)

    pynusmv.init.init_nusmv()
    pynusmv.glob.load_from_file(sys.argv[1])
    pynusmv.glob.compute_model()

    invariant_type = pynusmv.prop.propTypes['Invariant']
    for prop in pynusmv.glob.prop_database():
        spec = prop.expr
        # Only INVARSPEC properties are checked; everything else is reported and skipped
        if prop.type != invariant_type:
            print("Property", spec, "is not an INVARSPEC, skipped.")
            continue

        print("Property", spec, "is an INVARSPEC.")
        holds, trace = check_explain_inv_spec(spec)
        if holds:
            print("Invariant is respected")
        else:
            print("Invariant is not respected")
            print(trace)

    pynusmv.init.deinit_nusmv()

In [None]:
import os
import glob
import subprocess

# Host directory that gets mounted into the container
current_dir = os.getcwd()

# Every SMV model under test/safety is checked
files_to_check = glob.glob(os.path.join("test", "safety", "*.smv"))

print(f"Starting batch processing of {len(files_to_check)} files...\n")

# Loop through every file
for file_path in files_to_check:
    # Path relative to the mounted folder, with forward slashes for the Linux container
    relative_path = os.path.relpath(file_path, current_dir).replace("\\", "/")

    print(f"Running invariant check on: {os.path.basename(file_path)}")

    # Docker Command
    # -v "{current_dir}:/work"        -> Mounts the host folder to /work inside the container
    # -w /work                        -> Makes /work the current directory inside the container
    # python3 src/inv1_mc.py <model>  -> Runs the checker script on one model

    cmd = [
        "docker", "run", "--rm",
        "-v", f"{current_dir}:/work",
        "-w", "/work",
        "louvainverificationlab/pynusmv",
        "python3", "src/inv1_mc.py", relative_path
    ]

    try:
        # Output is captured so stdout and stderr can be printed separately
        result = subprocess.run(cmd, capture_output=True, text=True)
        print(result.stdout)
        if result.stderr:
            print("Errors:", result.stderr)
    except Exception as e:
        print(f"Failed to run Docker: {e}")

print("\nAll checks finished.")

### Attempt 3

In [None]:
%%writefile src/inv3_mc.py
# TODO: check the outputs; I probably missed something
import pynusmv
import sys
from typing import Tuple, Optional, Dict, List, Any

def spec_to_bdd(model, spec):
    """Return the BDD that pynusmv.mc.eval_ctl_spec computes for spec on model."""
    states = pynusmv.mc.eval_ctl_spec(model, spec)
    return states

def check_explain_inv_spec(spec) -> Tuple[bool, Optional[Tuple[Dict[str, str], ...]]]:
    """
    Check an invariant with a breadth-first exploration of the reachable states.

    Starting from the initial states, each iteration computes the successors
    of the current frontier that were not reached before. The exploration
    stops when the frontier is empty or contains a state violating spec.

    Return:
        Tuple:
        - True if no explored state violates the invariant, False otherwise
        - None if True, a counterexample execution if False

        The execution trace is a tuple of states and inputs, starting and
        ending with a state. Each state and input is represented as a
        dictionary mapping variable names to their string values; inputs are
        empty dictionaries when the model has no input variables.
    """
    fsm_model = pynusmv.glob.prop_database().master.bddFsm

    spec_as_bdd = spec_to_bdd(fsm_model, spec)
    negated_spec = ~spec_as_bdd

    def satisfy_spec(states):
        # True when no state of 'states' falsifies the spec
        return states.intersection(negated_spec).is_false()

    reached = fsm_model.init
    current_states = fsm_model.init
    # One entry per BFS depth; entry k holds the states first reached after k steps
    trace_layers = [reached]

    while not current_states.is_false() and satisfy_spec(current_states):
        successors = fsm_model.post(current_states)
        current_states = successors - reached
        reached = reached + current_states
        trace_layers.append(current_states)

    # The loop ended either on an empty frontier (invariant holds) or on a violation
    if satisfy_spec(current_states):
        return True, None

    invalid_states = current_states.intersection(negated_spec)
    last_state = fsm_model.pick_one_state(invalid_states)

    counterexample = [last_state.get_str_values()]
    has_inputs = len(fsm_model.bddEnc.inputsVars) > 0
    next_state = last_state
    possible_predecessors = fsm_model.pre(next_state)

    # Build counterexample backwards using trace layers
    # (the last layer is skipped: it is the one the violating state was picked from)
    for layer in reversed(trace_layers[:-1]):
        intersect = layer.intersection(possible_predecessors)
        if intersect.is_false():
            # No predecessor in this layer: keep the same target and try the previous layer
            continue

        chosen_pre = fsm_model.pick_one_state(intersect)

        if has_inputs:
            inputs_between = fsm_model.get_inputs_between_states(chosen_pre, next_state)
            picked_inputs = fsm_model.pick_one_inputs(inputs_between)
            counterexample.insert(0, picked_inputs.get_str_values())
        else:
            counterexample.insert(0, {})

        counterexample.insert(0, chosen_pre.get_str_values())

        # Continue the backward walk from the state that was just prepended
        next_state = chosen_pre
        possible_predecessors = fsm_model.pre(next_state)

    return False, tuple(counterexample)

if __name__ == "__main__":
    # Expect exactly one argument: the SMV model to load
    if len(sys.argv) != 2:
        print("Usage:", sys.argv[0], "filename.smv")
        sys.exit(1)

    pynusmv.init.init_nusmv()
    filename = sys.argv[1]

    try:
        pynusmv.glob.load_from_file(filename)
    except OSError as err:
        # NuSMV was already initialised, so release it before exiting
        print(f"Error: Could not load file '{filename}': {err}")
        pynusmv.init.deinit_nusmv()
        sys.exit(1)

    pynusmv.glob.compute_model()
    invariant_type = pynusmv.prop.propTypes['Invariant']
    for prop in pynusmv.glob.prop_database():
        spec = prop.expr
        # Only INVARSPEC properties are checked; everything else is reported and skipped
        if prop.type != invariant_type:
            print("Property", spec, "is not an INVARSPEC, skipped.")
            continue

        print("Property", spec, "is an INVARSPEC.")
        holds, trace = check_explain_inv_spec(spec)
        if holds:
            print("Invariant is respected")
        else:
            print("Invariant is not respected")
            print(trace)

    pynusmv.init.deinit_nusmv()

In [None]:
import os
import glob
import subprocess

# Host directory that gets mounted into the container
current_dir = os.getcwd()

# Every SMV model under test/safety is checked
files_to_check = glob.glob(os.path.join("test", "safety", "*.smv"))

print(f"Starting batch processing of {len(files_to_check)} files...\n")

# Loop through every file
for file_path in files_to_check:
    # Path relative to the mounted folder, with forward slashes for the Linux container
    relative_path = os.path.relpath(file_path, current_dir).replace("\\", "/")

    print(f"Running invariant check on: {os.path.basename(file_path)}")

    # Docker Command
    # -v "{current_dir}:/work"        -> Mounts the host folder to /work inside the container
    # -w /work                        -> Makes /work the current directory inside the container
    # python3 src/inv3_mc.py <model>  -> Runs the checker script on one model

    cmd = [
        "docker", "run", "--rm",
        "-v", f"{current_dir}:/work",
        "-w", "/work",
        "louvainverificationlab/pynusmv",
        "python3", "src/inv3_mc.py", relative_path
    ]

    try:
        # Output is captured so stdout and stderr can be printed separately
        result = subprocess.run(cmd, capture_output=True, text=True)
        print(result.stdout)
        if result.stderr:
            print("Errors:", result.stderr)
    except Exception as e:
        print(f"Failed to run Docker: {e}")

print("\nAll checks finished.")

### Attempt 4

In [None]:
%%writefile src/inv4_mc.py
# Slightly different approach
import pynusmv
import sys
from typing import Tuple, Optional, Dict, List, Any

def spec_to_bdd(model, spec):
    """Return the set of states of `model` satisfying `spec`, as a BDD."""
    satisfying = pynusmv.mc.eval_ctl_spec(model, spec)
    return satisfying

def check_explain_inv_spec(spec) -> Tuple[bool, Optional[Tuple[Dict[str, str], ...]]]:
    """
    Check whether the loaded SMV model satisfies the invariant `spec`.

    Returns (True, None) when no violating state is found by the forward
    breadth-first exploration, otherwise (False, trace). The trace is a tuple
    alternating state and input dictionaries, starting with a state and ending
    with a state violating `spec`; input entries are empty dictionaries when
    the model has no input variables.
    """
    fsm = pynusmv.glob.prop_database().master.bddFsm
    violating = ~spec_to_bdd(fsm, spec)

    frontier = fsm.init
    seen = frontier
    layers = [frontier]

    # Part of the current frontier that breaks the invariant
    offenders = frontier.intersection(violating)
    while offenders.is_false() and not frontier.is_false():
        frontier = fsm.post(frontier) - seen
        seen = seen + frontier
        layers.append(frontier)
        offenders = frontier.intersection(violating)

    if offenders.is_false():
        return True, None

    witness = fsm.pick_one_state(offenders)
    with_inputs = len(fsm.bddEnc.inputsVars) > 0

    steps = [witness.get_str_values()]
    successor = witness

    # Walk the layers backwards, prepending one (state, inputs) pair per layer
    for layer in reversed(layers[:-1]):
        candidates = layer.intersection(fsm.pre(successor))
        if candidates.is_false():
            continue

        predecessor = fsm.pick_one_state(candidates)

        if with_inputs:
            between = fsm.get_inputs_between_states(predecessor, successor)
            label = fsm.pick_one_inputs(between).get_str_values()
        else:
            label = {}

        steps[:0] = [predecessor.get_str_values(), label]
        successor = predecessor

    return False, tuple(steps)

if __name__ == "__main__":
    # Expect exactly one argument: the SMV model to load
    if len(sys.argv) != 2:
        print("Usage:", sys.argv[0], "filename.smv")
        sys.exit(1)

    pynusmv.init.init_nusmv()
    filename = sys.argv[1]

    try:
        pynusmv.glob.load_from_file(filename)
    except OSError as e:
        # NuSMV was already initialised, so release it before exiting
        print(f"Error: Could not load file '{filename}': {e}")
        pynusmv.init.deinit_nusmv()
        sys.exit(1)

    pynusmv.glob.compute_model()
    invtype = pynusmv.prop.propTypes['Invariant']
    # Check every INVARSPEC of the model; other property types are only reported
    for prop in pynusmv.glob.prop_database():
        spec = prop.expr
        if prop.type == invtype:
            print("Property", spec, "is an INVARSPEC.")
            res, trace = check_explain_inv_spec(spec)
            if res == True:
                print("Invariant is respected")
            else:
                print("Invariant is not respected")
                print(trace)
        else:
            print("Property", spec, "is not an INVARSPEC, skipped.")

    pynusmv.init.deinit_nusmv()

In [None]:
import os
import glob
import subprocess

# Host directory that gets mounted into the container
current_dir = os.getcwd()

# Every SMV model under test/safety is checked
files_to_check = glob.glob(os.path.join("test", "safety", "*.smv"))

print(f"Starting batch processing of {len(files_to_check)} files...\n")

# Loop through every file
for file_path in files_to_check:
    # Path relative to the mounted folder, with forward slashes for the Linux container
    relative_path = os.path.relpath(file_path, current_dir).replace("\\", "/")

    print(f"Running invariant check on: {os.path.basename(file_path)}")

    # Docker Command
    # -v "{current_dir}:/work"        -> Mounts the host folder to /work inside the container
    # -w /work                        -> Makes /work the current directory inside the container
    # python3 src/inv4_mc.py <model>  -> Runs the checker script on one model

    cmd = [
        "docker", "run", "--rm",
        "-v", f"{current_dir}:/work",
        "-w", "/work",
        "louvainverificationlab/pynusmv",
        "python3", "src/inv4_mc.py", relative_path
    ]

    try:
        # Output is captured so stdout and stderr can be printed separately
        result = subprocess.run(cmd, capture_output=True, text=True)
        print(result.stdout)
        if result.stderr:
            print("Errors:", result.stderr)
    except Exception as e:
        print(f"Failed to run Docker: {e}")

print("\nAll checks finished.")

## Response_mc

### Attempt 1

In [None]:
%%writefile src/response1_mc.py
# Based on Elia's version; contains logic-related bugs that can be fixed
import sys
from collections import deque
import pynusmv
from pynusmv_lower_interface.nusmv.parser import parser

# Readable names for the NuSMV parser node-type constants that the
# formula-inspection helpers below compare against `spec.type`.
SPEC_TYPES = {
    'LTLSPEC': parser.TOK_LTLSPEC,
    'CONTEXT': parser.CONTEXT,
    'IMPLIES': parser.IMPLIES,
    'IFF': parser.IFF,
    'OR': parser.OR,
    'XOR': parser.XOR,
    'XNOR': parser.XNOR,
    'AND': parser.AND,
    'NOT': parser.NOT,
    'ATOM': parser.ATOM,
    'NUMBER': parser.NUMBER,
    'DOT': parser.DOT,
    'NEXT': parser.OP_NEXT,
    'OP_GLOBAL': parser.OP_GLOBAL,
    'OP_FUTURE': parser.OP_FUTURE,
    'UNTIL': parser.UNTIL,
    'EQUAL': parser.EQUAL,
    'NOTEQUAL': parser.NOTEQUAL,
    'LT': parser.LT,
    'GT': parser.GT,
    'LE': parser.LE,
    'GE': parser.GE,
    'TRUE': parser.TRUEEXP,
    'FALSE': parser.FALSEEXP
}

# Node types that is_boolean_formula accepts without looking at their children
BASIC_TYPES = {parser.ATOM, parser.NUMBER, parser.TRUEEXP, parser.FALSEEXP, parser.DOT,
               parser.EQUAL, parser.NOTEQUAL, parser.LT, parser.GT, parser.LE, parser.GE}

# Binary boolean connectives; is_boolean_formula recurses into both operands
BOOLEAN_OPS = {parser.AND, parser.OR, parser.XOR, parser.XNOR, parser.IMPLIES, parser.IFF}

# Memo of already converted formulas, keyed by the formula's string form
_bdd_cache = {}

def spec_to_bdd(model, spec):
    """
    Convert a boolean formula (no temporal operators) to a BDD.

    The formula is turned into its string form, which is both the cache key
    and the expression handed to pynusmv.mc.eval_simple_expression. The model
    is not part of the key.
    """
    expression = str(spec)
    if expression in _bdd_cache:
        return _bdd_cache[expression]

    bdd = pynusmv.mc.eval_simple_expression(model, expression)
    _bdd_cache[expression] = bdd
    return bdd

def is_boolean_formula(spec) -> bool:
    """
    Tell whether spec is a boolean combination of basic formulas, i.e.
    contains no temporal operator.
    """
    kind = spec.type
    if kind in BASIC_TYPES:
        return True
    if kind == SPEC_TYPES['NOT']:
        # Negation has a single operand, stored in car
        return is_boolean_formula(spec.car)
    return (
        kind in BOOLEAN_OPS
        and is_boolean_formula(spec.car)
        and is_boolean_formula(spec.cdr)
    )

def get_GF_formula(spec):
    """
    Return f when spec has the shape G F f with f a boolean formula,
    otherwise None.
    """
    if spec.type == SPEC_TYPES['OP_GLOBAL']:
        eventually = spec.car
        if eventually.type == SPEC_TYPES['OP_FUTURE']:
            body = eventually.car
            if is_boolean_formula(body):
                return body
    return None

def get_implication(spec):
    """
    Return the pair (f, g) when spec has the shape GF f -> GF g,
    otherwise None.
    """
    if spec.type != SPEC_TYPES['IMPLIES']:
        return None
    antecedent = get_GF_formula(spec.car)
    consequent = get_GF_formula(spec.cdr)
    both_match = antecedent is not None and consequent is not None
    return (antecedent, consequent) if both_match else None

def parse_react(spec):
    """
    Parse a reactivity formula (GF f1 -> GF g1) & ... & (GF fn -> GF gn).

    Return the list of (fi, gi) pairs, or None when spec is not wrapped in a
    CONTEXT node or one of its conjuncts is not of the form GF f -> GF g.
    NOTE: this follows Elia's approach, not the one stated in the problem.
    """
    if spec.type != SPEC_TYPES['CONTEXT']:
        return None

    # Explicit stack for flattening the nested conjunction
    pending = [spec.cdr]
    pairs = []
    while pending:
        node = pending.pop()
        if node.type == SPEC_TYPES['AND']:
            pending.extend((node.car, node.cdr))
            continue
        pair = get_implication(node)
        if pair is None:
            return None
        pairs.append(pair)
    return pairs

# Symbolic reachability and counterexample
def compute_reach(fsm_model):
    """
    Compute the reachable states with a forward breadth-first exploration.

    Return (reach, frontiers): reach is the union of all states found, and
    frontiers lists the newly discovered states per step, starting with the
    initial states. The last entry is the empty frontier that stopped the
    exploration, unless the initial states were already empty.
    """
    frontier = fsm_model.init
    visited = frontier
    layers = [frontier]

    while not frontier.is_false():
        frontier = fsm_model.post(frontier) - visited
        layers.append(frontier)
        visited = visited + frontier

    return visited, layers

def find_knot(fsm_model, recur, pre_reach):
    """
    Find a knot state for building lasso-shaped counterexamples.

    A knot is a state of recur that can reach itself in at least one step
    while staying inside pre_reach.

    Parameters:
        fsm_model: the BDD-encoded FSM of the model.
        recur: BDD of the candidate states.
        pre_reach: BDD of the states the loop is allowed to go through.

    Returns (knot, loop_frontiers). loop_frontiers[0] holds the successors of
    the knot inside pre_reach, and each following entry holds the states newly
    reached one step later.

    Raises RuntimeError if no knot is found within the attempt budget.
    """
    max_attempts = 100
    s = fsm_model.pick_one_state(recur)

    for _ in range(max_attempts):
        # The forward-reachable set must start empty for every candidate.
        # If states from a previous candidate were kept, the new candidate
        # (picked from that set) would trivially be "reachable from itself".
        R = pynusmv.dd.BDD.false()
        new = fsm_model.post(s) & pre_reach
        loop_frontiers = [new]

        while not new.is_false():
            R = R + new
            new = (fsm_model.post(new) & pre_reach) - R
            loop_frontiers.append(new)

        # Only candidates reachable from s are of interest
        R = R & recur
        if s.entailed(R):
            return s, loop_frontiers

        # s is not on a cycle: retry from a candidate reachable from it
        s = fsm_model.pick_one_state(R)

    raise RuntimeError("No knot found within maximum attempts")

def _pick_input_values(fsm_model, source, target):
    # Values of one input assignment labelling source -> target, or {} when the
    # model has no input variables or no such input exists.
    if len(fsm_model.bddEnc.inputsVars) == 0:
        return {}
    possible_inputs = fsm_model.get_inputs_between_states(source, target)
    if possible_inputs.is_false():
        return {}
    return fsm_model.pick_one_inputs(possible_inputs).get_str_values()


def _walk_back(fsm_model, target, frontiers):
    # Pick one state per frontier, from the last frontier to the first, so that
    # consecutive picks are linked by a transition and the last frontier's pick
    # leads to target. Returns ([s0, i0, s1, i1, ..., sn, in], first_state)
    # where `in` labels the step into target; first_state is target itself
    # when frontiers is empty.
    steps = []
    curr_state = target
    for frontier in reversed(frontiers):
        pre_states = fsm_model.pre(curr_state) & frontier
        picked_pre_state = fsm_model.pick_one_state(pre_states)
        # Inputs are computed from the picked state, not from the whole
        # predecessor set, so they match the transition put in the trace.
        steps.append(_pick_input_values(fsm_model, picked_pre_state, curr_state))
        steps.append(picked_pre_state.get_str_values())
        curr_state = picked_pre_state
    steps.reverse()
    return steps, curr_state


def build_counterexample(fsm_model, recur, pre_reach, reached_frontiers):
    """
    Build a lasso-shaped counterexample from recur and pre_reach.

    Parameters:
        fsm_model: the BDD-encoded FSM of the model.
        recur, pre_reach: BDDs forwarded to find_knot.
        reached_frontiers: the per-step frontiers of the forward reachability
            computation, starting with the initial states.

    Returns a list alternating state and input dictionaries:
    initial state, ..., knot, ..., knot. The second occurrence of the knot
    closes the loop. If the knot does not appear in any loop frontier, only
    the prefix ending in the knot is returned.
    """
    knot, loop_frontiers = find_knot(fsm_model, recur, pre_reach)
    knot_values = knot.get_str_values()

    # Prefix construction. Frontiers are pairwise disjoint, so the knot lies in
    # exactly one of them; the path runs through every earlier frontier.
    reach_index = next(
        (i for i, frontier in enumerate(reached_frontiers) if knot.entailed(frontier)),
        0,
    )
    prefix_steps, _ = _walk_back(fsm_model, knot, reached_frontiers[:reach_index])
    trace = prefix_steps + [knot_values]

    # Loop construction. loop_frontiers[0] are successors of the knot, so the
    # loop is knot -> one state per frontier before the knot's own -> knot.
    loop_index = next(
        (i for i, frontier in enumerate(loop_frontiers) if knot.entailed(frontier)),
        None,
    )
    if loop_index is not None:
        cycle_steps, first_state = _walk_back(fsm_model, knot, loop_frontiers[:loop_index])
        trace.append(_pick_input_values(fsm_model, knot, first_state))
        trace.extend(cycle_steps)
        trace.append(knot_values)

    return trace

def check_repeatability(fsm_model, bdd_f, bdd_not_g, reach, reached_frontiers):
    """
    Check whether f & !g is repeatable within the reachable states.

    Candidates start as the reachable states satisfying f and !g. Each round
    computes the !g-states that can reach a candidate in at least one step.
    If that set comes to contain every candidate, a counterexample is built;
    otherwise the candidates are narrowed to those in the set.

    Returns (True, None) when the candidates run out, or (False, trace) with
    the trace produced by build_counterexample.
    """
    candidates = reach & bdd_f & bdd_not_g

    while not candidates.is_false():
        frontier = fsm_model.pre(candidates) & bdd_not_g
        backward = pynusmv.dd.BDD.false()

        while not frontier.is_false():
            backward = backward + frontier
            if candidates.entailed(backward):
                # Every candidate can come back to a candidate: a loop exists
                trace = build_counterexample(fsm_model, candidates, backward, reached_frontiers)
                return False, trace
            frontier = (fsm_model.pre(frontier) & bdd_not_g) - backward

        candidates = candidates & backward

    return True, None

# Reactivity check
def check_explain_react_spec(spec):
    """
    Check a reactivity formula on the loaded model.

    Returns None when spec is not a reactivity formula, (True, None) when all
    its conjuncts hold, and (False, counterexample) for the first conjunct
    found violated.
    """
    fsm = pynusmv.glob.prop_database().master.bddFsm
    pairs = parse_react(spec)
    if pairs is None:
        return None

    # Reachability is computed once and shared by all conjuncts
    reach, frontiers = compute_reach(fsm)

    for f, g in pairs:
        f_states = spec_to_bdd(fsm, f)
        not_g_states = ~spec_to_bdd(fsm, g)

        holds, trace = check_repeatability(fsm, f_states, not_g_states, reach, frontiers)
        if not holds:
            return False, trace

    return True, None

def print_counterexample(trace):
    """
    Print a trace that alternates states and inputs, one element per line.

    Elements at even positions are printed as numbered states, elements at
    odd positions as inputs. An empty or missing trace prints a placeholder.
    """
    if not trace:
        print("No counterexample.")
        return
    for position, step in enumerate(trace):
        state_number, is_input = divmod(position, 2)
        if is_input:
            print(f"  Input: {step}")
        else:
            print(f"State {state_number}: {step}")

if __name__ == "__main__":
    # Expect exactly one argument: the SMV model to load
    if len(sys.argv) != 2:
        print("Usage:", sys.argv[0], "filename.smv")
        sys.exit(1)

    pynusmv.init.init_nusmv()
    filename = sys.argv[1]
    pynusmv.glob.load_from_file(filename)
    pynusmv.glob.compute_model()
    type_ltl = pynusmv.prop.propTypes['LTL']
    for prop in pynusmv.glob.prop_database():
        spec = prop.expr
        print(spec)
        if prop.type != type_ltl:
            print("property is not LTLSPEC, skipping")
            continue
        # The checker defined in this script is check_explain_react_spec;
        # the previously called check_explain_response_spec does not exist here.
        res = check_explain_react_spec(spec)
        if res is None:
            print('Property is not a response formula, skipping')
        elif res[0]:
            print("Property is respected")
        else:
            print("Property is not respected")
            print("Counterexample:", res[1])

    pynusmv.init.deinit_nusmv()

In [None]:
import os
import glob
import subprocess

# Host directory that gets mounted into the container
current_dir = os.getcwd()

# Sorted so the report order is the same on every run
files_to_check = sorted(glob.glob(os.path.join("test", "response", "*.smv")))

print(f"Starting batch processing of {len(files_to_check)} files...\n")

# Loop through every file
for file_path in files_to_check:
    # Path relative to the mounted folder, with forward slashes for the Linux container
    relative_path = os.path.relpath(file_path, current_dir).replace("\\", "/")

    print(f"Running response check on: {os.path.basename(file_path)}")

    # Docker Command
    # -v "{current_dir}:/work"             -> Mounts the host folder to /work inside the container
    # -w /work                             -> Makes /work the current directory inside the container
    # python3 src/response1_mc.py <model>  -> Runs the checker script on one model

    cmd = [
        "docker", "run", "--rm",
        "-v", f"{current_dir}:/work",
        "-w", "/work",
        "louvainverificationlab/pynusmv",
        "python3", "src/response1_mc.py", relative_path
    ]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except Exception as e:
        # Best effort: report the failure and move on to the next model
        print(f"Failed to run Docker: {e}")
        continue

    print(result.stdout)
    if result.stderr:
        print("Errors:", result.stderr)
    if result.returncode != 0:
        # A crash of the checker would otherwise only show up as a traceback on stderr
        print(f"Exit code: {result.returncode}")

print("\nAll checks finished.")

### Attempt 2

In [None]:
%%writefile src/response2_mc.py
# 
import pynusmv
import sys
from pynusmv_lower_interface.nusmv.parser import parser
from typing import Tuple, Optional, Dict, List, Any

# Readable names for the NuSMV parser node-type constants that the
# formula-inspection helpers below compare against `spec.type`.
specTypes = {'LTLSPEC': parser.TOK_LTLSPEC, 'CONTEXT': parser.CONTEXT,
             'IMPLIES': parser.IMPLIES, 'IFF': parser.IFF, 'OR': parser.OR, 'XOR': parser.XOR, 'XNOR': parser.XNOR,
             'AND': parser.AND, 'NOT': parser.NOT, 'ATOM': parser.ATOM, 'NUMBER': parser.NUMBER, 'DOT': parser.DOT,
             'NEXT': parser.OP_NEXT, 'OP_GLOBAL': parser.OP_GLOBAL, 'OP_FUTURE': parser.OP_FUTURE,
             'UNTIL': parser.UNTIL,
             'EQUAL': parser.EQUAL, 'NOTEQUAL': parser.NOTEQUAL, 'LT': parser.LT, 'GT': parser.GT,
             'LE': parser.LE, 'GE': parser.GE, 'TRUE': parser.TRUEEXP, 'FALSE': parser.FALSEEXP
             }

# Node types that is_boolean_formula accepts without looking at their children
basicTypes = {parser.ATOM, parser.NUMBER, parser.TRUEEXP, parser.FALSEEXP, parser.DOT,
              parser.EQUAL, parser.NOTEQUAL, parser.LT, parser.GT, parser.LE, parser.GE}
# Binary boolean connectives; is_boolean_formula recurses into both operands
booleanOp = {parser.AND, parser.OR, parser.XOR,
             parser.XNOR, parser.IMPLIES, parser.IFF}

def spec_to_bdd(model, spec):
    """
    Convert an expression without temporal operators to a BDD by evaluating
    its string form with pynusmv.mc.eval_simple_expression.
    """
    expression = str(spec)
    return pynusmv.mc.eval_simple_expression(model, expression)

def is_boolean_formula(spec):
    """Tell whether spec is a boolean combination of basic formulas."""
    kind = spec.type
    if kind in basicTypes:
        return True
    if kind == specTypes['NOT']:
        # Negation has a single operand, stored in car
        return is_boolean_formula(spec.car)
    return (
        kind in booleanOp
        and is_boolean_formula(spec.car)
        and is_boolean_formula(spec.cdr)
    )

def parse(spec):
    """
    Parse a Response property of the form G(f -> F g).

    Returns (f_formula, g_formula) when spec is a CONTEXT node wrapping such a
    formula with f and g boolean, otherwise None.
    """
    if spec.type != specTypes['CONTEXT']:
        return None

    globally = spec.cdr
    if globally.type != specTypes['OP_GLOBAL']:  # outermost operator must be G
        return None

    implication = globally.car
    if implication.type != specTypes['IMPLIES']:  # body of G must be f -> ...
        return None

    antecedent = implication.car
    if not is_boolean_formula(antecedent):
        return None

    eventually = implication.cdr
    if eventually.type != specTypes['OP_FUTURE']:  # right-hand side must be F g
        return None

    consequent = eventually.car
    if not is_boolean_formula(consequent):
        return None

    return (antecedent, consequent)

def _compute_reachable(fsm_model) -> Tuple[pynusmv.dd.BDD, List[pynusmv.dd.BDD]]:
    """
    Compute the set of reachable states with a forward BFS.

    Returns (reached, layers) where layers[0] is the set of initial states and
    every following entry is the non-empty set of states first reached one
    step later.
    """
    reached = fsm_model.init
    frontier = reached
    layers = [frontier]

    while not frontier.is_false():
        frontier = fsm_model.post(frontier) - reached
        # An empty frontier is not recorded; it only ends the loop
        if not frontier.is_false():
            layers.append(frontier)
            reached = reached + frontier

    return reached, layers

def _compute_eg(fsm_model, bdd_prop) -> pynusmv.dd.BDD:
    """
    Compute the set of states satisfying EG(bdd_prop).

    Iterates Z = bdd_prop & pre(Z), starting from Z = bdd_prop, until the set
    no longer changes.
    """
    previous = bdd_prop
    current = bdd_prop & fsm_model.pre(previous)
    while not (current == previous):
        previous = current
        current = bdd_prop & fsm_model.pre(previous)
    return previous

def _build_prefix_trace(fsm_model, trace_layers, violating_state_bdd) -> List[Dict[str, str]]:
    """
    Build the prefix of a counterexample, ending in violating_state_bdd.

    The layers (all but the last) are scanned from last to first. Whenever a
    layer contains a predecessor of the current state, one is picked and
    becomes the new current state; layers without a predecessor are skipped.

    Returns a list alternating state and input dictionaries that ends with
    the violating state. Input entries are empty dictionaries when the model
    has no input variables.
    """
    with_inputs = len(fsm_model.bddEnc.inputsVars) > 0

    # Collected from the violating state backwards, flipped once at the end
    backwards = [violating_state_bdd.get_str_values()]
    successor = violating_state_bdd

    for layer in reversed(trace_layers[:-1]):
        candidates = fsm_model.pre(successor) & layer
        if candidates.is_false():
            continue

        predecessor = fsm_model.pick_one_state(candidates)

        if with_inputs:
            between = fsm_model.get_inputs_between_states(predecessor, successor)
            backwards.append(fsm_model.pick_one_inputs(between).get_str_values())
        else:
            backwards.append({})

        backwards.append(predecessor.get_str_values())
        successor = predecessor

    backwards.reverse()
    return backwards


def check_explain_response_spec(spec):
    """
    Check a Response property G(f -> F g) on the loaded model.

    The property is violated exactly when a reachable state satisfies f and
    EG(!g), i.e. f holds and some path from that state never meets g.

    Returns:
        None if spec is not of the form G(f -> F g);
        (True, None) if the property holds;
        (False, trace) otherwise. The trace is a tuple alternating state and
        input dictionaries: a path to a violating state, followed by a path
        inside EG(!g) that ends by revisiting one of its own states (the
        violating state or a later one), which closes the loop.
    """
    fsm_model = pynusmv.glob.prop_database().master.bddFsm

    parsed = parse(spec)
    if parsed is None:
        return None

    f_formula, g_formula = parsed

    bdd_f = spec_to_bdd(fsm_model, f_formula)
    bdd_g = spec_to_bdd(fsm_model, g_formula)
    bdd_not_g = ~bdd_g

    bdd_eg_not_g = _compute_eg(fsm_model, bdd_not_g)

    bdd_bad_states = bdd_f & bdd_eg_not_g

    # Cheap exit before computing reachability
    if bdd_bad_states.is_false():
        return (True, None)

    reachable_states, trace_layers = _compute_reachable(fsm_model)
    violation_states = reachable_states & bdd_bad_states

    if violation_states.is_false():
        return (True, None)

    # Pick reachable violation
    s_violation_bdd = fsm_model.pick_one_state(violation_states)

    # prefix trace (Init -> ... -> s_violation)
    prefix_trace_list = _build_prefix_trace(fsm_model, trace_layers, s_violation_bdd)

    # loop (s_violation -> ... -> s_loop)
    has_inputs = len(fsm_model.bddEnc.inputsVars) > 0
    loop_list = []
    curr_bdd = s_violation_bdd

    # Only states from the violating state onwards may close the loop. Closing
    # on an earlier prefix state would pull prefix states into the loop, and
    # those may satisfy g, which would invalidate the counterexample.
    states_on_path = [prefix_trace_list[-1]]

    while True:
        # Find a successor in EG(¬g)
        possible_successors = fsm_model.post(curr_bdd) & bdd_eg_not_g

        if possible_successors.is_false():
            # Not expected: every state of EG(¬g) has a successor in EG(¬g)
            print("Strange")
            return (False, tuple(prefix_trace_list))

        next_bdd = fsm_model.pick_one_state(possible_successors)
        next_dict = next_bdd.get_str_values()

        # Inputs
        if has_inputs:
            inputs_between = fsm_model.get_inputs_between_states(curr_bdd, next_bdd)
            picked_inputs = fsm_model.pick_one_inputs(inputs_between)
            loop_list.append(picked_inputs.get_str_values())
        else:
            loop_list.append({})

        loop_list.append(next_dict)

        # loop closed check
        if next_dict in states_on_path:
            full_trace = prefix_trace_list + loop_list
            return (False, tuple(full_trace))

        states_on_path.append(next_dict)
        curr_bdd = next_bdd

if __name__ == "__main__":
    # Expect exactly one argument: the SMV model to load
    if len(sys.argv) != 2:
        print("Usage:", sys.argv[0], "filename.smv")
        sys.exit(1)

    pynusmv.init.init_nusmv()
    filename = sys.argv[1]
    try:
        pynusmv.glob.load_from_file(filename)
        pynusmv.glob.compute_model()
    except OSError as e:
        # NuSMV was already initialised, so release it before exiting
        print(f"Error loading file: {e}")
        pynusmv.init.deinit_nusmv()
        sys.exit(1)

    type_ltl = pynusmv.prop.propTypes['LTL']

    # Check every LTL property of the model; other property types are skipped
    for prop in pynusmv.glob.prop_database():
        spec = prop.expr
        print(f"\n Checking property: {spec} ")

        if prop.type != type_ltl:
            print("not LTLSPEC, skip")
            continue

        res = check_explain_response_spec(spec)

        if res is None:
            print("Result: NOT a Response formula, skip")
        elif res[0] == True:
            print("Result: RESPECTED")
        elif res[0] == False:
            print("Result: NOT RESPECTED")
            print("Counterexample:")

            trace = res[1]
            loop_start_index = -1
            last_state = trace[-1]

            # States sit at even indices; find the earliest state (other than
            # the last one) that equals the final state of the trace
            for i in range(0, len(trace) - 2, 2):
                if trace[i] == last_state:
                    loop_start_index = i
                    break

            # Print the trace, marking the position where the loop starts
            for i, step in enumerate(trace):
                if loop_start_index != -1 and i == loop_start_index:
                    print("Loop starts here")

                if i % 2 == 0:
                    print(f"  State {i//2}: {step}")
                else:
                    print(f"  Input:    {step}")
            if loop_start_index != -1:
                print(f"  -- Loops back to State {loop_start_index//2} --")

    pynusmv.init.deinit_nusmv()

In [None]:
import os
import glob
import subprocess

# Batch driver: run src/response2_mc.py inside the PyNuSMV Docker image on
# every SMV model found under test/response.
current_dir = os.getcwd()

# glob returns files in arbitrary order; sort so every run reports the
# models in the same order.
files_to_check = sorted(glob.glob(os.path.join("test", "response", "*.smv")))

print(f"Starting batch processing of {len(files_to_check)} files...\n")

# Loop through every file
for file_path in files_to_check:
    # The container runs Linux, so the path handed to it must use '/'.
    relative_path = os.path.relpath(file_path, current_dir).replace("\\", "/")

    print(f"Running response check on: {os.path.basename(file_path)}")

    # Docker Command
    # -v "{current_dir}:/work"          -> Mounts the host folder to /work inside Linux
    # -w /work                          -> Makes /work the current directory inside Linux
    # python3 src/response2_mc.py ...   -> Runs the response checker on the model

    cmd = [
        "docker", "run", "--rm",
        "-v", f"{current_dir}:/work",
        "-w", "/work",
        "louvainverificationlab/pynusmv",
        "python3", "src/response2_mc.py", relative_path
    ]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except Exception as e:
        # Best-effort batch: report the failure and move on to the next model.
        print(f"Failed to run Docker: {e}")
    else:
        print(result.stdout)
        if result.stderr:
            print("Errors:", result.stderr)
        # Surface a failing container run instead of silently ignoring it.
        if result.returncode != 0:
            print(f"Exit code: {result.returncode}")

print("\nAll checks finished.")

### Attempt 3

In [None]:
%%writefile src/response3_mc.py
# minor modification of Attempt 2
# I think this is the best version as of now
import pynusmv
import sys
from pynusmv_lower_interface.nusmv.parser import parser
from typing import Tuple, Optional, Dict, List, Any

# Readable names for the parser node-type constants this script compares
# against when inspecting a specification tree.
specTypes = {
    'LTLSPEC': parser.TOK_LTLSPEC,
    'CONTEXT': parser.CONTEXT,
    'IMPLIES': parser.IMPLIES,
    'IFF': parser.IFF,
    'OR': parser.OR,
    'XOR': parser.XOR,
    'XNOR': parser.XNOR,
    'AND': parser.AND,
    'NOT': parser.NOT,
    'ATOM': parser.ATOM,
    'NUMBER': parser.NUMBER,
    'DOT': parser.DOT,
    'NEXT': parser.OP_NEXT,
    'OP_GLOBAL': parser.OP_GLOBAL,
    'OP_FUTURE': parser.OP_FUTURE,
    'UNTIL': parser.UNTIL,
    'EQUAL': parser.EQUAL,
    'NOTEQUAL': parser.NOTEQUAL,
    'LT': parser.LT,
    'GT': parser.GT,
    'LE': parser.LE,
    'GE': parser.GE,
    'TRUE': parser.TRUEEXP,
    'FALSE': parser.FALSEEXP,
}

# Node types that is_boolean_formula accepts without looking at children.
basicTypes = {
    parser.ATOM,
    parser.NUMBER,
    parser.TRUEEXP,
    parser.FALSEEXP,
    parser.DOT,
    parser.EQUAL,
    parser.NOTEQUAL,
    parser.LT,
    parser.GT,
    parser.LE,
    parser.GE,
}

# Binary connectives for which is_boolean_formula checks both children.
booleanOp = {
    parser.AND,
    parser.OR,
    parser.XOR,
    parser.XNOR,
    parser.IMPLIES,
    parser.IFF,
}

def spec_to_bdd(model, spec):
    """Evaluate a temporal-operator-free expression on ``model``.

    ``spec`` is converted to its string form and handed to
    ``pynusmv.mc.eval_simple_expression``; that call's result (a BDD of the
    states satisfying the expression) is returned unchanged.
    """
    expression_text = str(spec)
    return pynusmv.mc.eval_simple_expression(model, expression_text)

def is_boolean_formula(spec):
    """Tell whether ``spec`` is a boolean combination of basic formulas.

    A node qualifies when its type is in ``basicTypes``, when it is a NOT
    whose operand qualifies, or when it is one of the ``booleanOp``
    connectives and both of its children qualify. Anything else (for
    instance a temporal operator) makes the result False.
    """
    node_type = spec.type

    if node_type in basicTypes:
        return True

    if node_type == specTypes['NOT']:
        # Unary node: the operand is the left child.
        return is_boolean_formula(spec.car)

    if node_type not in booleanOp:
        return False

    return is_boolean_formula(spec.car) and is_boolean_formula(spec.cdr)

def parse(spec):
    """Recognise a response property of the form ``G(f -> F g)``.

    ``spec`` must be a CONTEXT node whose right child is ``G(...)``, wrapping
    an implication whose left side ``f`` is a boolean formula and whose right
    side is ``F g`` with ``g`` a boolean formula.

    Returns the pair ``(f, g)`` of sub-formula nodes on a match, otherwise
    ``None``.
    """
    if spec.type != specTypes['CONTEXT']:
        return None

    globally = spec.cdr
    if globally.type != specTypes['OP_GLOBAL']:  # outermost operator must be G
        return None

    implication = globally.car
    if implication.type != specTypes['IMPLIES']:  # body must be "f -> ..."
        return None

    antecedent = implication.car
    if not is_boolean_formula(antecedent):
        return None

    eventually = implication.cdr
    if eventually.type != specTypes['OP_FUTURE']:  # right side must be "F g"
        return None

    consequent = eventually.car
    if not is_boolean_formula(consequent):
        return None

    return (antecedent, consequent)

def _compute_reachable(fsm_model) -> Tuple[pynusmv.dd.BDD, List[pynusmv.dd.BDD]]:
    """Explore the model forward from its initial states, layer by layer.

    Returns ``(reached, layers)``: ``reached`` is the union of all states
    found, and ``layers[i]`` holds the states seen for the first time after
    ``i`` applications of ``post`` (``layers[0]`` is the initial set).
    """
    reached = frontier = fsm_model.init
    layers = [frontier]

    while not frontier.is_false():
        # Keep only successors that have not been seen before.
        frontier = fsm_model.post(frontier) - reached
        if frontier.is_false():
            break
        layers.append(frontier)
        reached = reached + frontier

    return reached, layers

def _compute_eg(fsm_model, bdd_prop) -> pynusmv.dd.BDD:
    """Return the states satisfying ``EG(bdd_prop)``.

    Iterates ``Z = bdd_prop & pre(Z)`` starting from ``Z = bdd_prop`` until
    the set no longer changes, and returns that fixed point.
    """
    current = bdd_prop
    while True:
        # Keep the states of bdd_prop that have a successor inside `current`.
        refined = bdd_prop & fsm_model.pre(current)
        if refined == current:
            return current
        current = refined

def _build_prefix_trace(fsm_model, trace_layers, violating_state_bdd) -> List[Dict[str, str]]:
    """Build a path from an initial state to ``violating_state_bdd``.

    Args:
        fsm_model: the BDD-encoded FSM; its ``pre``, ``pick_one_state``,
            ``pick_one_inputs`` and ``get_inputs_between_states`` are used.
        trace_layers: forward-exploration layers, ``trace_layers[0]`` being
            the initial states and each later layer the newly found states.
        violating_state_bdd: the single state the path must end in.

    Returns:
        A list alternating state dicts and input dicts
        (``[state, inputs, state, ..., state]``), ending with the values of
        ``violating_state_bdd``. Input entries are ``{}`` when the model has
        no input variables.
    """
    has_inputs = len(fsm_model.bddEnc.inputsVars) > 0

    # The target may sit in any layer, not only the last one. Find its layer
    # so the backward walk starts right below it. Starting from the top
    # instead can detour through later layers (back edges, self-loops) and
    # yield a needlessly long prefix. Fall back to the last layer if the
    # state is found in none.
    start_index = len(trace_layers) - 1
    for index, layer in enumerate(trace_layers):
        if not (layer & violating_state_bdd).is_false():
            start_index = index
            break

    curr_state_bdd = violating_state_bdd

    # Collected back-to-front and reversed once at the end; repeated
    # insert(0, ...) would be quadratic in the trace length.
    reversed_trace = [curr_state_bdd.get_str_values()]

    for layer in reversed(trace_layers[:start_index]):
        # Predecessors of the current state that live in this earlier layer.
        possible_predecessors = fsm_model.pre(curr_state_bdd) & layer

        if possible_predecessors.is_false():
            continue

        prev_state_bdd = fsm_model.pick_one_state(possible_predecessors)

        if has_inputs:
            inputs_between = fsm_model.get_inputs_between_states(prev_state_bdd, curr_state_bdd)
            picked_inputs = fsm_model.pick_one_inputs(inputs_between)
            reversed_trace.append(picked_inputs.get_str_values())
        else:
            reversed_trace.append({})

        reversed_trace.append(prev_state_bdd.get_str_values())
        curr_state_bdd = prev_state_bdd

    reversed_trace.reverse()
    return reversed_trace


def check_explain_response_spec(spec):
    """Check a response property ``G(f -> F g)`` on the loaded model.

    Returns:
        ``None`` if ``spec`` is not recognised by ``parse`` as a response
        formula; ``(True, None)`` if no reachable state satisfies
        ``f & EG(!g)``; otherwise ``(False, trace)`` where ``trace`` is a
        tuple alternating state dicts and input dicts. The trace goes from an
        initial state to a violating state and then follows states inside
        ``EG(!g)`` until its last state repeats a state visited at or after
        the violating one, which closes the loop.
    """
    fsm_model = pynusmv.glob.prop_database().master.bddFsm

    parsed = parse(spec)
    if parsed is None:
        return None

    f_formula, g_formula = parsed

    bdd_f = spec_to_bdd(fsm_model, f_formula)
    bdd_g = spec_to_bdd(fsm_model, g_formula)
    bdd_not_g = ~bdd_g

    bdd_eg_not_g = _compute_eg(fsm_model, bdd_not_g)

    # A state violates the property when f holds and g can be avoided forever.
    bdd_bad_states = bdd_f & bdd_eg_not_g

    if bdd_bad_states.is_false():
        return (True, None)

    reachable_states, trace_layers = _compute_reachable(fsm_model)
    violation_states = reachable_states & bdd_bad_states

    if violation_states.is_false():
        return (True, None)

    # Pick reachable violation
    s_violation_bdd = fsm_model.pick_one_state(violation_states)

    # prefix trace (Init -> ... -> s_violation)
    prefix_trace_list = _build_prefix_trace(fsm_model, trace_layers, s_violation_bdd)

    # loop (s_violation -> ... -> s_loop)
    has_inputs = len(fsm_model.bddEnc.inputsVars) > 0
    loop_list = []
    curr_bdd = s_violation_bdd

    # Only states from the violation onward may close the loop. Prefix states
    # before it are not guaranteed to avoid g, so closing the lasso on one of
    # them would make the loop re-run a segment where g may hold, and the
    # trace would no longer be a counterexample.
    states_on_loop_path = [prefix_trace_list[-1]]

    while True:
        # Find a successor in EG(!g)
        possible_successors = fsm_model.post(curr_bdd) & bdd_eg_not_g

        if possible_successors.is_false():
            # Not expected: the EG fixed point only keeps states that have a
            # successor inside it. Kept as a defensive fallback.
            print("Strange")
            return (False, tuple(prefix_trace_list))

        next_bdd = fsm_model.pick_one_state(possible_successors)
        next_dict = next_bdd.get_str_values()

        # Inputs
        if has_inputs:
            inputs_between = fsm_model.get_inputs_between_states(curr_bdd, next_bdd)
            picked_inputs = fsm_model.pick_one_inputs(inputs_between)
            loop_list.append(picked_inputs.get_str_values())
        else:
            loop_list.append({})

        loop_list.append(next_dict)

        # loop closed check
        if next_dict in states_on_loop_path:
            full_trace = prefix_trace_list + loop_list
            return (False, tuple(full_trace))

        states_on_loop_path.append(next_dict)
        curr_bdd = next_bdd

if __name__ == "__main__":
    # Command-line driver: load one SMV model and report, for each of its
    # LTL properties, the outcome of check_explain_response_spec.
    if len(sys.argv) != 2:
        print("Usage:", sys.argv[0], "filename.smv")
        sys.exit(1)

    pynusmv.init.init_nusmv()
    filename = sys.argv[1]
    try:
        pynusmv.glob.load_from_file(filename)
        pynusmv.glob.compute_model()
    except OSError as e:
        print(f"Error loading file: {e}")
        # Release NuSMV before bailing out.
        pynusmv.init.deinit_nusmv()
        sys.exit(1)

    type_ltl = pynusmv.prop.propTypes['LTL']

    for prop in pynusmv.glob.prop_database():
        spec = prop.expr
        print(spec)

        # Only LTL properties are candidates for the response check.
        if prop.type != type_ltl:
            print("property is not LTLSPEC, skipping")
            continue

        res = check_explain_response_spec(spec)

        if res is None:
            print('Property is not a response formula, skipping')
            continue

        if res[0]:
            print("Property is respected")
        else:
            print("Property is not respected")
            print("Counterexample:", res[1])

    pynusmv.init.deinit_nusmv()

In [None]:
import os
import glob
import subprocess

# Batch driver: run src/response3_mc.py inside the PyNuSMV Docker image on
# every SMV model found under test/response.
current_dir = os.getcwd()

# glob returns files in arbitrary order; sort so every run reports the
# models in the same order.
files_to_check = sorted(glob.glob(os.path.join("test", "response", "*.smv")))

print(f"Starting batch processing of {len(files_to_check)} files...\n")

# Loop through every file
for file_path in files_to_check:
    # The container runs Linux, so the path handed to it must use '/'.
    relative_path = os.path.relpath(file_path, current_dir).replace("\\", "/")

    print(f"Running response check on: {os.path.basename(file_path)}")

    # Docker Command
    # -v "{current_dir}:/work"          -> Mounts the host folder to /work inside Linux
    # -w /work                          -> Makes /work the current directory inside Linux
    # python3 src/response3_mc.py ...   -> Runs the response checker on the model

    cmd = [
        "docker", "run", "--rm",
        "-v", f"{current_dir}:/work",
        "-w", "/work",
        "louvainverificationlab/pynusmv",
        "python3", "src/response3_mc.py", relative_path
    ]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except Exception as e:
        # Best-effort batch: report the failure and move on to the next model.
        print(f"Failed to run Docker: {e}")
    else:
        print(result.stdout)
        if result.stderr:
            print("Errors:", result.stderr)
        # Surface a failing container run instead of silently ignoring it.
        if result.returncode != 0:
            print(f"Exit code: {result.returncode}")

print("\nAll checks finished.")