In [30]:
from z3 import *
import itertools

## Sorts

In [31]:
# create uninterpreted sort Node and Epoch
Node = DeclareSort('Node')
Epoch = DeclareSort('Epoch')

In [37]:
# Functions to extract terms and instantiate quantifiers
def extract_terms(expr, bound=[]):
    termdict = {Node: set(), Epoch: set()}
    if expr.sort() in {Node, Epoch}:
        termdict[expr.sort()] = termdict[expr.sort()] | {expr}
    else:
        if type(expr) == QuantifierRef:
            raise ValueError('Unsupported')
        termdict_children = [extract_terms(child) for child in expr.children()]
        for child_termdict in termdict_children:
            termdict[Node] = termdict[Node] | child_termdict[Node]
            termdict[Epoch] = termdict[Epoch] | child_termdict[Epoch]
    return termdict

def instantiate(expr, termdict):
#     if type(expr) != QuantifierRef or not expr.is_forall():
#         raise ValueError('Unsupported')
    quantified_variables, body = expr #expr.body()
    if not isinstance(body,ExprRef):
        raise ValueError('Unsupported')
    #quantified_variables = [Const(expr.var_name(i), expr.var_sort(i)) for i in range(expr.num_vars())]
    instantiations = list(itertools.product(*[termdict[var.sort()] for var in quantified_variables]))
    print(f'quant_vars: {quantified_variables}')
    print(f'instantiations: {list(instantiations)}')
    print(f'body: {body}')
    print(f'instance: {substitute(body, list(zip(quantified_variables, list(instantiations[0]))))}')
    print(f'zip: {list(zip(quantified_variables, list(instantiations[0])))}')
    return And([substitute(body, list(zip(quantified_variables, list(inst)))) for inst in instantiations[:1]])

## State and Model

In [33]:
class DistLockState():
    def __init__(self, name):
        self.name = name

        # relations
        self.ep = Function(f'{name}.ep', Node, Epoch)
        self.held = Function(f'{name}.held', Node, BoolSort())
        self.transfer = Function(f'{name}.transfer', Epoch, Node, BoolSort())
        self.locked = Function(f'{name}.locked', Epoch, Node, BoolSort())

DistLockState('test_pre') # only for testing

<__main__.DistLockState at 0x7fb1a0f26e80>

In [38]:
class DistLockModel():
    def __init__(self):
        # constants
        self.le = Function(f'le', Epoch, Epoch, BoolSort())
        self.zero = Const(f'zero', Epoch)
        self.one = Const(f'one', Epoch)
        self.first = Const(f'first', Node)

        self.states = {}
    
    def get_state(self, name):
        if name not in self.states:
            self.states[name] = DistLockState(name)
        return self.states[name]
    
    def get_axioms(self):
        e1, e2, e3 = Consts('e1 e2 e3', Epoch)
        
        Axioms = ([e1, e2, e3], #ForAll([e1, e2, e3],
            And(
                # reflexivity
                self.le(e1, e1),
                # transitivity
                Implies(And(self.le(e1, e2), self.le(e2, e3)), self.le(e1, e3)),
                # antisymmetry
                Implies(And(self.le(e1, e2), self.le(e2, e1)), e1 == e2),
                # totality
                Or(self.le(e1, e2), self.le(e2, e1)),

                # zero
                self.le(self.zero, e1),
                self.one != self.zero,
            ),
        )

        return Axioms
    
    def get_init_state_cond(self):
        S = self.get_state('init')

        n = Const('n', Node)
        e = Const('e', Epoch)

        cond = ForAll([n, e],
            And(
                S.held(n) == (n == self.first),
                Implies(n != self.first, S.ep(n) == self.zero),
                S.ep(self.first) == self.one,
                S.transfer(e, n) == False,
                S.locked(e, n) == False,
            )
        )

        return cond
    
    def get_interp(self, model: ModelRef):
        # create a dict of all the functions
        interp = {}
        for f in model.decls():
            interp[f.name()] = model.get_interp(f)
        
        return interp

M = DistLockModel()
# only for testing
axiom = M.get_axioms() 
terms = extract_terms(axiom[1])
print(terms)
instantiate(axiom, terms)
#axiom

{Node: set(), Epoch: {one, zero, e1, e2, e3}}
quant_vars: [e1, e2, e3]
instantiations: [(one, one, one), (one, one, zero), (one, one, e1), (one, one, e2), (one, one, e3), (one, zero, one), (one, zero, zero), (one, zero, e1), (one, zero, e2), (one, zero, e3), (one, e1, one), (one, e1, zero), (one, e1, e1), (one, e1, e2), (one, e1, e3), (one, e2, one), (one, e2, zero), (one, e2, e1), (one, e2, e2), (one, e2, e3), (one, e3, one), (one, e3, zero), (one, e3, e1), (one, e3, e2), (one, e3, e3), (zero, one, one), (zero, one, zero), (zero, one, e1), (zero, one, e2), (zero, one, e3), (zero, zero, one), (zero, zero, zero), (zero, zero, e1), (zero, zero, e2), (zero, zero, e3), (zero, e1, one), (zero, e1, zero), (zero, e1, e1), (zero, e1, e2), (zero, e1, e3), (zero, e2, one), (zero, e2, zero), (zero, e2, e1), (zero, e2, e2), (zero, e2, e3), (zero, e3, one), (zero, e3, zero), (zero, e3, e1), (zero, e3, e2), (zero, e3, e3), (e1, one, one), (e1, one, zero), (e1, one, e1), (e1, one, e2), (e1, one, e3),

## Invariants

In [6]:
# Inv: le(E1, E2) & E1 ~= E2 -> le(E1,ep(N1)) | ~le(E2,ep(N1))
# in other words: e1 < e2 ==> e1 <= ep(N1) or e2 > ep(N1)

def get_inv1(M: DistLockModel, S: DistLockState):
    e1, e2 = Consts('e1 e2', Epoch)
    n1 = Const('n1', Node)

    Inv = ForAll([e1, e2, n1],
        Implies(
            And(
                M.le(e1, e2),
                e1 != e2
            ),
            Or(
                M.le(e1, S.ep(n1)),
                Not(
                    M.le(e2, S.ep(n1))
                )
            )
        )
    )
    return Inv

M = DistLockModel()
S = M.get_state('pre')
# only for testing
inv1 = get_inv1(M, S)
inv1

### Invariant parser

In [7]:
invariants = """
le(E1, E2) & E1 ~= E2 -> le(E1,ep(N1)) | ~le(E2,ep(N1))
le(E1, E2) & E1 ~= E2 -> locked(E1,N1) | ~transfer(E1,N1) | ~transfer(E2,N1)
le(E1, E2) & E1 ~= E2 -> locked(E1,N1) | ~transfer(E1,N1) | ~le(E2,ep(N1))
le(E1, E2) & E1 ~= E2 -> le(E1,ep(N1)) | ~locked(E2,N1)
le(E1, E2) & E1 ~= E2 -> locked(E1,N1) | ~transfer(E1,N1) | ~locked(E2,N1)
le(E1, E2) & E1 ~= E2 -> le(E1,ep(N1)) | ~transfer(E1,N1) | ~transfer(E2,N1)
le(E1, E2) & E1 ~= E2 & N1 ~= N2 -> ~le(E2,ep(N1)) | ~le(ep(N1),ep(N2)) | ~le(ep(N2),ep(N1))
le(E1, E2) & E1 ~= E2 & N1 ~= N2 -> le(E1,ep(N1)) | le(ep(N1),ep(N2)) | ~locked(E2,N2)
le(E1, E2) & E1 ~= E2 & N1 ~= N2 -> le(E1,ep(N1)) | ~transfer(E1,N1) | ~locked(E2,N2)
le(E1, E2) & E1 ~= E2 & N1 ~= N2 -> locked(E1,N1) | ~transfer(E1,N1) | ~le(E2,ep(N2))
le(E1, E2) & E1 ~= E2 & N1 ~= N2 -> le(E1,ep(N1)) | ~held(N2) | ~transfer(E2,N1)
le(E1, E2) & E1 ~= E2 & N1 ~= N2 -> le(E1,ep(N1)) | ~transfer(E1,N1) | ~le(E2,ep(N2))
le(E1, E2) & E1 ~= E2 & N1 ~= N2 -> le(E1,ep(N1)) | ~locked(E2,N2) | ~le(ep(N2),ep(N1))
le(E1, E2) & E1 ~= E2 & N1 ~= N2 -> locked(E1,N1) | ~transfer(E1,N1) | ~transfer(E2,N2)
le(E1, E2) & E1 ~= E2 & N1 ~= N2 -> le(E1,ep(N1)) | le(ep(N1),ep(N2)) | ~le(E2,ep(N2))
le(E1, E2) & E1 ~= E2 & N1 ~= N2 -> le(E1,ep(N1)) | ~transfer(E1,N1) | ~transfer(E2,N2)
le(E1, E2) & E1 ~= E2 & N1 ~= N2 -> le(E1,ep(N1)) | ~transfer(E2,N1) | ~le(E2,ep(N2))
le(E1, E2) & E1 ~= E2 & N1 ~= N2 -> le(E1,ep(N1)) | ~le(ep(N2),ep(N1)) | ~le(E2,ep(N2))
le(E1, E2) & E1 ~= E2 & N1 ~= N2 -> locked(E1,N1) | ~transfer(E1,N1) | ~locked(E2,N2)
locked(E1,N1) | ~transfer(E1,N1) | ~le(E1,ep(N1))
locked(E1,N1) | ~held(N1) | ~transfer(E1,N1)
le(E1,ep(N1)) | ~held(N1) | ~transfer(E1,N1)
transfer(E1,N1) | ~locked(E1,N1)
le(E1,ep(N1)) | ~locked(E1,N1)
N1 ~= N2 -> ~le(ep(N1),ep(N2)) | ~le(ep(N2),ep(N1)) | ~first=N1
N1 ~= N2 -> le(E1,ep(N1)) | le(ep(N1),ep(N2)) | ~locked(E1,N2)
N1 ~= N2 -> le(E1,ep(N1)) | le(ep(N1),ep(N2)) | ~le(E1,ep(N2))
N1 ~= N2 -> le(ep(N1),ep(N2)) | ~held(N2)
N1 ~= N2 -> ~held(N1) | ~le(ep(N1),ep(N2))
N1 ~= N2 -> locked(E1,N1) | ~held(N2) | ~transfer(E1,N1)
N1 ~= N2 -> le(ep(N1),ep(N2)) | le(ep(N2),ep(N1))
N1 ~= N2 -> le(E1,ep(N1)) | ~locked(E1,N2) | ~le(ep(N2),ep(N1))
N1 ~= N2 -> ~held(N1) | ~held(N2)
N1 ~= N2 -> locked(E1,N1) | ~transfer(E1,N1) | ~le(E1,ep(N2))
N1 ~= N2 -> ~locked(E1,N1) | ~locked(E1,N2)
N1 ~= N2 -> ~first=N1 | ~first=N2
N1 ~= N2 -> ~transfer(E1,N1) | ~transfer(E1,N2)
N1 ~= N2 -> le(E1,ep(N1)) | ~held(N1) | ~transfer(E1,N2)
N1 ~= N2 -> ~transfer(E1,N1) | ~locked(E1,N2)
N1 ~= N2 -> ~locked(E1,N1) | ~le(ep(N1),ep(N2)) | ~le(ep(N2),ep(N1))
N1 ~= N2 -> le(E1,ep(N1)) | ~held(N1) | ~locked(E1,N2)
N1 ~= N2 -> le(E1,ep(N1)) | ~le(ep(N2),ep(N1)) | ~le(E1,ep(N2))
N1 ~= N2 -> le(E1,ep(N1)) | ~held(N1) | ~le(E1,ep(N2))
N1 ~= N2 -> le(E1,ep(N1)) | ~held(N2) | ~transfer(E1,N1)
N1 ~= N2 -> le(E1,ep(N1)) | ~transfer(E1,N1) | ~le(E1,ep(N2))
""".replace("~=", "!=").strip().split("\n")

def parse(inv_str, vars):
    inv_str = inv_str.strip()

    if inv_str.count("->") > 1:
        raise Exception("Too many ->")

    if '->' in inv_str:
        left, right = inv_str.split('->')
        return f"Implies({parse(left, vars)},{parse(right, vars)})"

    if "&" in inv_str:
        parts = inv_str.split("&")
        return f"And([{','.join([parse(p, vars) for p in parts])}])"

    if "|" in inv_str:
        parts = inv_str.split("|")
        return f"Or([{','.join([parse(p, vars) for p in parts])}])"

    if "~" in inv_str:
        return f"Not({parse(inv_str[1:], vars)})"

    if "!=" in inv_str:
        left, right = inv_str.split("!=")
        return f"{parse(left, vars)} != {parse(right, vars)}"
    
    if "=" in inv_str:
        left, right = inv_str.split("=")
        return f"{parse(left, vars)} == {parse(right, vars)}"

    if inv_str.endswith(")"):
        first_brace_idx = inv_str.index("(")
        name = inv_str[:first_brace_idx]
        args = inv_str[first_brace_idx+1:-1].split(",")
        prefix = "M." if name == 'le' else "S."
        return f"{prefix}{name}({','.join([parse(a, vars) for a in args])})"

    if inv_str == "first":
        return "M.first"

    vars.add(inv_str)
    return inv_str

def get_inv_fn(fn_name, inv_str, only_code=False):
    vars = set()
    inv_fn_str = parse(inv_str, vars)

    nodes = [v for v in vars if v.startswith("N")]
    epochs = [v for v in vars if v.startswith("E")]

    code = []
    if len(nodes) == 1:
        code += [nodes[0] + " = Const('" + nodes[0] + "', Node)"]
    elif len(nodes) > 1:
        code += [", ".join(nodes) + " = Consts('" + " ".join(nodes) + "', Node)"]
    
    if len(epochs) == 1:
        code += [epochs[0] + " = Const('" + epochs[0] + "', Epoch)"]
    elif len(epochs) > 1:
        code += [", ".join(epochs) + " = Consts('" + " ".join(epochs) + "', Epoch)"]
    
    code += ["inv = ForAll([" + ", ".join(vars) + "], " + inv_fn_str + ")"]
    code += ["return inv"]

    code = f"def {fn_name}(M, S):\n\t" + "\n\t".join(code)
    if only_code:
        return code

    ldict = {}
    exec(code)
    exec(f"ldict['fn'] = {fn_name}")
    return ldict['fn']

all_invars = [get_inv_fn("inv_fn_" + str(i), inv) for i, inv in enumerate(invariants)]

In [8]:
M = DistLockModel()
S = M.get_state('pre')

print(invariants[0])
all_invars[0](M, S)

le(E1, E2) & E1 != E2 -> le(E1,ep(N1)) | ~le(E2,ep(N1))


### Safety Condition

In [9]:
# Inv: safety property locked(E, N1) & locked(E, N2) -> N1 = N2

def get_safety_inv(M: DistLockModel, S: DistLockState):
    e1 = Const('e1', Epoch)
    n1, n2 = Consts('n1 n2', Node)

    Inv = ForAll([e1, n1, n2],
        Implies(
            And(
                S.locked(e1, n1),
                S.locked(e1, n2)
            ),
            n1 == n2
        )
    )

    return Inv

M = DistLockModel()
S = M.get_state('pre')
get_safety_inv(M, S) # only for testing

## Actions

In [10]:
# Grant action

def get_grant_action(M: DistLockModel, S1: DistLockState, S2: DistLockState):
    e = Const('e', Epoch)
    n1, n2 = Consts('n1 n2', Node)

    AcceptAction = ForAll([n1, n2, e],
        Implies(
            # precondition
            And(
                S1.held(n1),
                Not(M.le(e, S1.ep(n1)))
            ),
            # postcondition
            And(
                S2.transfer(e, n2),
                S2.held(n1) == False
            )
        )
    )

    return AcceptAction

M = DistLockModel()
S1 = M.get_state('pre')
S2 = M.get_state('post')
get_grant_action(M, S1, S2) # only for testing

In [11]:
# Accept action

def get_accept_action(M: DistLockModel, S1: DistLockState, S2: DistLockState):
    e = Const('e', Epoch)
    n = Const('n', Node)

    AcceptAction = ForAll([n, e],
        Implies(
            And(
                S1.transfer(e, n),
                Not(M.le(e, S1.ep(n)))
            ),
            And(
                S2.held(n),
                S2.ep(n) == e,
                S2.locked(e, n)
            )
        )
    )

    return AcceptAction

M = DistLockModel()
S1 = M.get_state('pre')
S2 = M.get_state('post')
get_accept_action(M, S1, S2) # only for testing

## VCs

In [12]:
def get_init_vc(M: DistLockModel, inv_fn):
    vc = ( #Not(
        Implies(
            M.get_axioms(),
            Implies(
                M.get_init_state_cond(),
                inv_fn(M, M.get_state('init'))
            )
        )
    )
    return vc

M = DistLockModel()
vc = get_init_vc(M, get_inv1)
print(vc.sexpr()) # only for testing

(let ((a!1 (forall ((e1 Epoch) (e2 Epoch) (e3 Epoch))
             (and (le e1 e1)
                  (=> (and (le e1 e2) (le e2 e3)) (le e1 e3))
                  (=> (and (le e1 e2) (le e2 e1)) (= e1 e2))
                  (or (le e1 e2) (le e2 e1))
                  (le zero e1)
                  (distinct one zero))))
      (a!2 (forall ((n Node) (e Epoch))
             (and (= (init.held n) (= n first))
                  (=> (distinct n first) (= (init.ep n) zero))
                  (= (init.ep first) one)
                  (= (init.transfer e n) false)
                  (= (init.locked e n) false))))
      (a!3 (forall ((e1 Epoch) (e2 Epoch) (n1 Node))
             (let ((a!1 (or (le e1 (init.ep n1)) (not (le e2 (init.ep n1))))))
               (=> (and (le e1 e2) (distinct e1 e2)) a!1)))))
  (=> a!1 (=> a!2 a!3)))


In [13]:
def get_inductiveness_vc(M: DistLockModel, S1: DistLockState, action_fn, S2: DistLockState, inv_fn):
    vc = ( #Not(
        Implies(
            M.get_axioms(),
            Implies(
                And(
                    inv_fn(M, S1),
                    action_fn(M, S1, S2),
                ),
                inv_fn(M, S2)
            )
        )
    )
    return vc

M = DistLockModel()
S1 = M.get_state('pre')
S2 = M.get_state('post')
vc = get_inductiveness_vc(M, S1, get_accept_action, S2, get_inv1)
print(vc.sexpr()) # only for testing

(let ((a!1 (forall ((e1 Epoch) (e2 Epoch) (e3 Epoch))
             (and (le e1 e1)
                  (=> (and (le e1 e2) (le e2 e3)) (le e1 e3))
                  (=> (and (le e1 e2) (le e2 e1)) (= e1 e2))
                  (or (le e1 e2) (le e2 e1))
                  (le zero e1)
                  (distinct one zero))))
      (a!2 (forall ((e1 Epoch) (e2 Epoch) (n1 Node))
             (let ((a!1 (or (le e1 (pre.ep n1)) (not (le e2 (pre.ep n1))))))
               (=> (and (le e1 e2) (distinct e1 e2)) a!1))))
      (a!3 (forall ((n Node) (e Epoch))
             (let ((a!1 (and (pre.transfer e n) (not (le e (pre.ep n))))))
               (=> a!1 (and (post.held n) (= (post.ep n) e) (post.locked e n))))))
      (a!4 (forall ((e1 Epoch) (e2 Epoch) (n1 Node))
             (let ((a!1 (or (le e1 (post.ep n1)) (not (le e2 (post.ep n1))))))
               (=> (and (le e1 e2) (distinct e1 e2)) a!1)))))
  (=> a!1 (=> (and a!2 a!3) a!4)))


In [14]:
def get_safety_vc(M: DistLockModel, S: DistLockState, inv_fn, safety_fn):
    vc = ( # Not(
        Implies(
            M.get_axioms(),
            Implies(
                inv_fn(M, S),
                safety_fn(M, S)
            )
        )
    )
    return vc

M = DistLockModel()
S1 = M.get_state('pre')
vc = get_safety_vc(M, S1, get_inv1, get_safety_inv)
print(vc.sexpr()) # only for testing

(let ((a!1 (forall ((e1 Epoch) (e2 Epoch) (e3 Epoch))
             (and (le e1 e1)
                  (=> (and (le e1 e2) (le e2 e3)) (le e1 e3))
                  (=> (and (le e1 e2) (le e2 e1)) (= e1 e2))
                  (or (le e1 e2) (le e2 e1))
                  (le zero e1)
                  (distinct one zero))))
      (a!2 (forall ((e1 Epoch) (e2 Epoch) (n1 Node))
             (let ((a!1 (or (le e1 (pre.ep n1)) (not (le e2 (pre.ep n1))))))
               (=> (and (le e1 e2) (distinct e1 e2)) a!1))))
      (a!3 (forall ((e1 Epoch) (n1 Node) (n2 Node))
             (=> (and (pre.locked e1 n1) (pre.locked e1 n2)) (= n1 n2)))))
  (=> a!1 (=> a!2 a!3)))


In [42]:
# Inductiveness check using instantiatiojn
# ax & inv_pre & trans & \not inv_post
M = DistLockModel()
S1 = M.get_state('pre')
S2 = M.get_state('post')
inv_post = And([inv(M,S2).body() for inv in all_invars[:1]])
# print(inv_post)
inst_terms = extract_terms(inv_post)
# print(inst_terms)
# print(M.get_axioms())
# s1 = Solver()
# s1.add(M.get_axioms())
# print(s1.check())
ax_instantiation = instantiate(M.get_axioms(), inst_terms)
print(ax_instantiation)
# inv_pre_instantiation = And([instantiate(inv(M,S1), inst_terms) for inv in all_invars[:1]])
# trans_instantiation = instantiate(get_accept_action(M,S1,S2), inst_terms)
sol = Solver()
# inv_post
sol.add(ax_instantiation)
print(sol.sexpr())
print(sol.check())
# sol
# sol.add(ax_instantiation)
# sol
#sol.add(simplify(And(ax_instantiation, inv_pre_instantiation, trans_instantiation, Not(inv_post))))
# print('HereIam')
#print(sol.sexpr())
#sol
#sol.check()

{Node: set(), Epoch: {Var(2), Var(1), post.ep(Var(0))}}
([e1, e2, e3], And(le(e1, e1),
    Implies(And(le(e1, e2), le(e2, e3)), le(e1, e3)),
    Implies(And(le(e1, e2), le(e2, e1)), e1 == e2),
    Or(le(e1, e2), le(e2, e1)),
    le(zero, e1),
    one != zero))
quant_vars: [e1, e2, e3]
instantiations: [(Var(2), Var(2), Var(2)), (Var(2), Var(2), Var(1)), (Var(2), Var(2), post.ep(Var(0))), (Var(2), Var(1), Var(2)), (Var(2), Var(1), Var(1)), (Var(2), Var(1), post.ep(Var(0))), (Var(2), post.ep(Var(0)), Var(2)), (Var(2), post.ep(Var(0)), Var(1)), (Var(2), post.ep(Var(0)), post.ep(Var(0))), (Var(1), Var(2), Var(2)), (Var(1), Var(2), Var(1)), (Var(1), Var(2), post.ep(Var(0))), (Var(1), Var(1), Var(2)), (Var(1), Var(1), Var(1)), (Var(1), Var(1), post.ep(Var(0))), (Var(1), post.ep(Var(0)), Var(2)), (Var(1), post.ep(Var(0)), Var(1)), (Var(1), post.ep(Var(0)), post.ep(Var(0))), (post.ep(Var(0)), Var(2), Var(2)), (post.ep(Var(0)), Var(2), Var(1)), (post.ep(Var(0)), Var(2), post.ep(Var(0))), (post.e

In [41]:
s = Solver()
new = DeclareSort('new')
x = Const('x',new)
f = Function('f', new, IntSort())
s.add(And(f(x)==5))
s.check()
s.model()

## End to end examples

### Example 1

In [29]:
M = DistLockModel()
S1 = M.get_state('pre')
S2 = M.get_state('post')

inv = lambda M,S: And(all_invars[0](M,S), all_invars[1](M,S))

solver = Solver()
# solver.add()
cond = Not(And(
    #get_init_vc(M, inv),
    get_inductiveness_vc(M, S1, get_accept_action, S2, inv)
))
#cond
solver.add(cond)
solver.check()

The above tells that `get_inv1()` invariant satisfies the initial condition and is also inductive.

In [15]:
M = DistLockModel()
S1 = M.get_state('pre')
S2 = M.get_state('post')

inv = get_inv1

solver = Solver()
# solver.add()
solver.add(Not(And(
    get_init_vc(M, inv),
    get_inductiveness_vc(M, S1, get_accept_action, S2, inv),
    get_safety_vc(M, S1, inv, get_safety_inv)
)))
solver.check()

However, it does not imply the safety condition.

### Example 2

In [22]:
M = DistLockModel()
S1 = M.get_state('pre')
S2 = M.get_state('post')

inv = get_safety_inv # notice this <-- it's get_safety_inv and not get_inv1.

solver = Solver()
# solver.add()
solver.add(Not(And(
    get_init_vc(M, inv),
    # get_inductiveness_vc(M, S1, get_accept_action, S2, inv)
)))
solver.check()

The safety invariant satisfies the initial condition.

In [23]:
M = DistLockModel()
S1 = M.get_state('pre')
S2 = M.get_state('post')

inv = get_safety_inv # notice this <-- it's get_safety_inv and not get_inv1.

solver = Solver()
# solver.add()
solver.add(Not(And(
    get_init_vc(M, inv),
    get_inductiveness_vc(M, S1, get_accept_action, S2, inv)
)))
solver.check()

However, the safety invariant isn't inductive.

### Example 3

In [24]:
M = DistLockModel()
S1 = M.get_state('pre')

solver = Solver()
solver.add(Not(And(
    get_init_vc(M, get_inv1),
    get_safety_vc(M, S1, get_inv1, get_safety_inv))
))
solver.check()

This shows that `get_inv1` doesn't imply safety condition.

The conditions given to the solver are shown below:

In [25]:
print(solver.sexpr())

(declare-sort Node 0)
(declare-sort Epoch 0)
(declare-fun pre.locked (Epoch Node) Bool)
(declare-fun le (Epoch Epoch) Bool)
(declare-fun pre.ep (Node) Epoch)
(declare-fun zero () Epoch)
(declare-fun one () Epoch)
(declare-fun init.ep (Node) Epoch)
(declare-fun init.locked (Epoch Node) Bool)
(declare-fun init.transfer (Epoch Node) Bool)
(declare-fun first () Node)
(declare-fun init.held (Node) Bool)
(assert (let ((a!1 (forall ((e1 Epoch) (e2 Epoch) (e3 Epoch))
             (and (le e1 e1)
                  (=> (and (le e1 e2) (le e2 e3)) (le e1 e3))
                  (=> (and (le e1 e2) (le e2 e1)) (= e1 e2))
                  (or (le e1 e2) (le e2 e1))
                  (le zero e1)
                  (distinct one zero))))
      (a!2 (forall ((n Node) (e Epoch))
             (and (= (init.held n) (= n first))
                  (=> (distinct n first) (= (init.ep n) zero))
                  (= (init.ep first) one)
                  (= (init.transfer e n) false)
                  (= (init

The counter example given by Z3 is shown below

In [26]:
m = solver.model()
print(m.sexpr())

;; universe for Node:
;;   Node!val!1 Node!val!2 Node!val!0 
;; -----------
;; definitions for universe elements:
(declare-fun Node!val!1 () Node)
(declare-fun Node!val!2 () Node)
(declare-fun Node!val!0 () Node)
;; cardinality constraint:
(forall ((x Node)) (or (= x Node!val!1) (= x Node!val!2) (= x Node!val!0)))
;; -----------
;; universe for Epoch:
;;   Epoch!val!3 Epoch!val!0 Epoch!val!4 Epoch!val!1 Epoch!val!5 Epoch!val!6 Epoch!val!2 
;; -----------
;; definitions for universe elements:
(declare-fun Epoch!val!3 () Epoch)
(declare-fun Epoch!val!0 () Epoch)
(declare-fun Epoch!val!4 () Epoch)
(declare-fun Epoch!val!1 () Epoch)
(declare-fun Epoch!val!5 () Epoch)
(declare-fun Epoch!val!6 () Epoch)
(declare-fun Epoch!val!2 () Epoch)
;; cardinality constraint:
(forall ((x Epoch))
        (or (= x Epoch!val!3)
            (= x Epoch!val!0)
            (= x Epoch!val!4)
            (= x Epoch!val!1)
            (= x Epoch!val!5)
            (= x Epoch!val!6)
            (= x Epoch!val!2)))

Another way to access the counter example model is as shown below:

In [27]:
interp = M.get_interp(m)
set_param(max_lines=1000)
print(interp)

{'zero': Epoch!val!5, 'one': Epoch!val!6, 'le': [else ->
 Or(And(If(Var(0) == Epoch!val!4,
           Epoch!val!4,
           If(Var(0) == Epoch!val!3,
              Epoch!val!3,
              If(Var(0) == Epoch!val!1,
                 Epoch!val!1,
                 If(Var(0) == Epoch!val!5,
                    Epoch!val!5,
                    If(Var(0) == Epoch!val!0,
                       Epoch!val!0,
                       If(Var(0) == Epoch!val!6,
                          Epoch!val!6,
                          Epoch!val!2)))))) ==
        Epoch!val!3,
        If(Var(1) == Epoch!val!4,
           Epoch!val!4,
           If(Var(1) == Epoch!val!3,
              Epoch!val!3,
              If(Var(1) == Epoch!val!1,
                 Epoch!val!1,
                 If(Var(1) == Epoch!val!5,
                    Epoch!val!5,
                    If(Var(1) == Epoch!val!0,
                       Epoch!val!0,
                       If(Var(1) == Epoch!val!6,
                          Epoch!val!6,

## Full verification

This code below first checks if all the invariants produced by DistAI:
- Satisfy the initial state
- Imply Safety
- Are inductive

In [28]:
M = DistLockModel()
S1 = M.get_state('pre')
S2 = M.get_state('post')

inv = lambda M, S: And(*[inv(M, S) for inv in all_invars])

solver = Solver()
solver.add(Not(And(
    get_init_vc(M, inv),
    # get_safety_vc(M, S1, inv, get_safety_inv)
    # get_inductiveness_vc(M, S1, get_accept_action, S2, inv)
)))
solver.check()

Yay, invariants satisfy the initial conditions

In [29]:
M = DistLockModel()
S1 = M.get_state('pre')
S2 = M.get_state('post')

inv = lambda M, S: And(*[inv(M, S) for inv in all_invars])

solver = Solver()
solver.add(Not(And(
    # get_init_vc(M, inv),
    get_safety_vc(M, S1, inv, get_safety_inv)
    # get_inductiveness_vc(M, S1, get_accept_action, S2, inv)
)))
solver.check()

Yay, invariants imply the safety condition

In [34]:
M = DistLockModel()
S1 = M.get_state('pre')
S2 = M.get_state('post')

inv = lambda M, S: And(*[inv(M, S) for inv in all_invars])

solver = Solver()
cond1 = Not(And(
    # get_init_vc(M, inv),
    # get_safety_vc(M, S1, inv, get_safety_inv)
    get_inductiveness_vc(M, S1, get_accept_action, S2, inv)
))
cond1 = z3.simplify(cond1)

#solver.check()

And....Z3 chokes on the inductive invariants (wrt the accept action)

In [None]:
M = DistLockModel()
S1 = M.get_state('pre')
S2 = M.get_state('post')

inv = lambda M, S: And(*[inv(M, S) for inv in all_invars])

solver = Solver()
solver.add(Not(And(
    # get_init_vc(M, inv),
    # get_safety_vc(M, S1, inv, get_safety_inv)
    get_inductiveness_vc(M, S1, get_grant_action, S2, inv)
)))
solver.check()

It also chokes on the inductive invariants (wrt the grant action)