In [609]:
from z3 import *

import yaml
import pandas as pd

from dataclasses import dataclass
from pprint import pprint
from typing import Optional
from itertools import product

pd.set_option('display.max_rows', None)

In [610]:
MM_FILE = './assets/metamodels/doml_meta_v2.0.yaml'

# IM_FILE = './assets/doml/2.0/nginx-openstack_v2.0.yaml'
IM_FILE = './assets/doml/2.0/nginx-openstack_v2.0_wrong_vm_iface.yaml'

UB_ELEMS = 1

In [611]:
with open(MM_FILE, 'r') as mm_file:
    mm = yaml.safe_load(mm_file)

with open(IM_FILE, 'r') as im_file:
    im = yaml.safe_load(im_file)

We need to create first the elements by parsing the metamodel

In [612]:
classes = { f'{cat_k}_{elem_k}' : elem_v
    for cat_k, cat_v in mm.items()
    for elem_k, elem_v in cat_v.items() 
}

def merge_superclass(elem):
    e_k, e_v = elem
    sc_k = e_v.get('superclass')
    sc_v = classes.get(sc_k)
    if not sc_k:
        e_v['associations'] = {f'{e_k}::{k}': v for k,v in e_v.get('associations',{}).items()}
        e_v['attributes'] = {f'{e_k}::{k}': v for k,v in e_v.get('attributes',{}).items()}
        return e_v
    else:
        e_v['associations'] = {f'{e_k}::{k}': v for k,v in e_v.get('associations',{}).items()} | sc_v.get('associations',{})
        e_v['attributes'] = {f'{e_k}::{k}': v for k,v in e_v.get('attributes',{}).items()} | sc_v.get('attributes',{})
        return e_v

# all the elements with all the inherited attributes and associations
merged_classes = { 
    k : merge_superclass((k,v))
    for k,v in classes.items()
}

In [613]:

@dataclass
class Class:
    name: str
    attributes: dict[str, dict]
    associations: dict[str, dict]
    ref: Optional[SortRef] = None

@dataclass
class Elem:
    id: str
    name: Optional[str]
    attributes: dict[str, dict]
    associations: dict[str, dict]
    eClass: Optional[Class] = None
    ref: Optional[SortRef] = None
    unbound: bool = False

@dataclass
class AssocRel:
    from_elem: Class
    to_elem: Class
    inverse_of: Optional[str]
    ref: Optional[FuncDeclRef] = None

@dataclass
class AttrRel:
    multiplicity: str
    type: str
    ref: Optional[FuncDeclRef] = None

# Convert ELEMS
CLASSES = {
    class_k : Class(
        class_k, 
        class_v.get('attributes', {}), 
        class_v.get('associations', {})
    )
    for class_k, class_v in merged_classes.items()
}

ELEMS = {
    elem_k : Elem(
        elem_v['id'],
        elem_v['name'],
        elem_v['attrs'],
        elem_v['assocs'],
        CLASSES[elem_v['class']],
    )
    for elem_k, elem_v in im.items()
}

# This also helps catching errors in class/assoc names
ASSOCS = {
    f'{assoc_k}' : AssocRel(class_v, CLASSES[assoc_v['class']], assoc_v.get('inverse_of', None))
    for class_k, class_v in CLASSES.items()
    for assoc_k, assoc_v in class_v.associations.items()
}

# Careful: I decided to default multiplicity to 0..1
ATTRS = {
    f'{attr_k}' : AttrRel(attr_v.get('multiplicity', '0..1'), attr_v['type'])
    for class_k, class_v in CLASSES.items()
    for attr_k, attr_v in class_v.attributes.items()
}

Visualization of all the Elems and Assocs

In [614]:
classes_df = pd.DataFrame.from_dict([
    {
        'name': name
    }
    for name, value in CLASSES.items()
])

classes_df

Unnamed: 0,name
0,commons_DOMLElement
1,commons_Property
2,commons_IProperty
3,commons_SProperty
4,commons_FProperty
5,commons_BProperty
6,commons_Configuration
7,commons_Deployment
8,application_ApplicationLayer
9,application_ApplicationComponent


In [615]:
assoc_df = pd.DataFrame.from_dict([
    {
        'name': name,
        'from': value.from_elem.name,
        'to': value.to_elem.name,
        'inverse': value.inverse_of
    }
    for name, value in ASSOCS.items()
])

assoc_df[['from', 'name', 'to', 'inverse']]

Unnamed: 0,from,name,to,inverse
0,concrete_ComputingGroup,commons_DOMLElement::annotations,commons_Property,
1,commons_BProperty,commons_Property::reference,commons_DOMLElement,
2,commons_Configuration,commons_Configuration::deployments,commons_Deployment,
3,commons_Deployment,commons_Deployment::component,application_ApplicationComponent,
4,commons_Deployment,commons_Deployment::node,infrastructure_InfrastructureElement,
5,application_ApplicationLayer,application_ApplicationLayer::components,application_ApplicationComponent,
6,application_DBMS,application_SoftwareComponent::exposedInterfaces,application_SoftwareInterface,
7,application_DBMS,application_SoftwareComponent::consumedInterfaces,application_SoftwareInterface,
8,application_SaaSDBMS,application_SaaS::exposedInterfaces,application_SoftwareInterface,
9,infrastructure_InfrastructureLayer,infrastructure_InfrastructureLayer::nodes,infrastructure_ComputingNode,


## Z3 Sort Definitions
Here we define the Sorts of Elements and Associations as Enums (as they are finite), and the relationship between two elements as a Function `AssocRel :: ElemSort, AssocSort, ElemSort -> BoolSort`, which tells us if two items are in a relationship (returns true) or not.

In [616]:
# Init Z3 solver
s = Solver()

def Iff(a, b):
    return a == b

In [617]:
ub_elems = { f'elem_ub_{i}' : Elem(
    f'elem_ub_{i}',
    f'Unbound Element #{i}',
    {},
    {},
    unbound=True
    ) 
    for i in range(UB_ELEMS)
}

ELEMS |= ub_elems

In [618]:
class_sort, classes = EnumSort('Class', list(CLASSES.keys()))
# Add the Ref to each ELEM
for c in classes:
    CLASSES[str(c)].ref = c

elem_sort, elems = EnumSort('Elem', list(ELEMS.keys()))
for elem in elems:
    ELEMS[str(elem)].ref = elem

# Where assoc_sort is an EnumSort of all associations names...
assoc_sort, assocs = EnumSort('Assoc', list(ASSOCS.keys()))
# Add the Ref to each ASSOC
for assoc in assocs:
    ASSOCS[str(assoc)].ref = assoc

attr_sort, attrs = EnumSort('Attr', list(ATTRS.keys()))
# Add the Ref to each ATTR
for attr in attrs:
    ATTRS[str(attr)].ref = attr


In [619]:
# ElemClass(Elem) -> Class
elem_class_fn = Function('ElemClass', elem_sort, class_sort)

for _, elem in ELEMS.items():
    if not elem.unbound:
        s.assert_and_track(
            elem_class_fn(elem.ref) == elem.eClass.ref, 
            f'ElemClass {elem.id} {elem.eClass.name}'
        )

In [620]:
# AssocRel(Elem, Assoc, Elem) -> Bool
assoc_rel = Function('AssocRel', elem_sort, assoc_sort, elem_sort, BoolSort())

assoc_a = Const('assoc_a', assoc_sort)
for (_, e1), (_, e2) in product(ELEMS.items(), ELEMS.items()):
    if (not e1.unbound and not e2.unbound):
        stmt = ForAll(
            [assoc_a],
            Iff(
                assoc_rel(e1.ref, assoc_a, e2.ref),
                Or(
                    *(
                        assoc_a == ASSOCS[e1_assoc_k].ref
                        for e1_assoc_k, e1_assoc_v in e1.associations.items()
                        if e2.id in e1_assoc_v
                    )
                )
            )
        )
        s.assert_and_track(stmt, f'AssocRel {e1.id} {e2.id}')

The following assertions allow us to ensure that unbound elements (that till now have not been assigned any constraint) have a class enforced onto them if they belong to a certain relationship

In [621]:
elem_a = Const('elem_a', elem_sort)
for assoc_k, assoc_v in ASSOCS.items():
    for _, ub_elem in ELEMS.items():
        if ub_elem.unbound:
            s.assert_and_track(
                ForAll(
                    [elem_a],
                    Implies(
                        assoc_rel(elem_a,  assoc_v.ref, ub_elem.ref),
                        elem_class_fn(ub_elem.ref) == assoc_v.to_elem.ref
                    )
                ),
                f'AssocRel_EnforceClass {assoc_k}'
            )

In [622]:
# WORKING - 4 Jan 2023 17:30
# # AttrRel(Elem, Attr, Value) -> Bool
# attr_int_rel = Function('AttrIntRel', elem_sort, attr_sort, IntSort(), BoolSort())
# # ...do the others
# attr_a = Const('attr_a', attr_sort)
# attr_value_b = Const('attr_value_b', IntSort())
# for _, elem in ELEMS.items():
#     if elem.eClass:
#         eClassAttrs = elem.eClass.attributes
#         print(f'{elem.id} {elem.name}')
#         for k, v in eClassAttrs.items():
#             if v['type'] == 'Integer':
#                 print('\t', k)
#         stmt = ForAll(
#             [attr_a],
#             Iff(
#                 attr_int_rel(elem.ref, attr_a, attr_value_b),
#                 Or(
#                     *(
#                         attr_a == ATTRS[attr_name].ref
#                         for attr_name, attr_value in eClassAttrs.items()
#                         if attr_value['type'] == 'Integer'
#                     )
#                 )
#             )
#         )
#         s.assert_and_track(stmt, f'AttrIntRel {elem.id} Int')

# TEST - 4 Jan 2023 17:30 - WORKS
# AttrRel(Elem, Attr) -> Bool
# True if Elem has Attr relationship
# attr_int_exist_rel = Function('AttrIntExistRel', elem_sort, attr_sort, BoolSort())
# # ...do the others
# attr_a = Const('attr_a', attr_sort)
# for _, elem in ELEMS.items():
#     if elem.eClass:
#         eClassAttrs = elem.eClass.attributes
#         print(f'{elem.id} {elem.name}')
#         for k, v in eClassAttrs.items():
#             type = v['type']
#             print(f'\t{k} ({type})')
#         stmt = ForAll(
#             [attr_a],
#             Iff(
#                 attr_int_exist_rel(elem.ref, attr_a),
#                 Or(
#                     *(
#                         attr_a == ATTRS[attr_name].ref
#                         for attr_name, attr_value in eClassAttrs.items()
#                         if attr_value['type'] == 'Integer'
#                     )
#                 )
#             )
#         )
#         s.assert_and_track(stmt, f'AttrIntExistRel {elem.id}')


In [623]:
attr_int_exist_rel = Function('AttrIntExistRel', elem_sort, attr_sort, BoolSort())
elem_a = Const('elem_a', elem_sort)
attr_a = Const('attr_a', attr_sort)
for class_k, class_v in CLASSES.items():
    s.assert_and_track((
        ForAll(
            [elem_a],
            Implies(
                elem_class_fn(elem_a) == class_v.ref,
                ForAll(
                    [attr_a],
                    Iff(
                        attr_int_exist_rel(elem_a, attr_a),
                        Or(
                            *(
                                attr_a == ATTRS[i].ref
                                for i in class_v.attributes.keys()
                                if ATTRS[i].type == 'Integer'
                            )
                        )
                    )    
                )
            )
        )
    ), f'AttrIntExistRel {class_k}')

In [624]:
attr_int_exist_value_rel = Function('AttrIntExistValueRel', elem_sort, attr_sort, BoolSort())
attr_a = Const('attr_a', attr_sort)

for elem_k, elem_v in ELEMS.items():
    if not elem_v.unbound:
        s.assert_and_track((
            ForAll(
                [attr_a],
                        Iff(
                            attr_int_exist_value_rel(elem_v.ref, attr_a),
                            Or(
                                *(
                                    attr_a == ATTRS[eAttr_k].ref
                                    for eAttr_k, eAttr_v in elem_v.attributes.items()
                                    if ATTRS[eAttr_k].type == 'Integer' and len(eAttr_v) == 1
                                )
                            )
                        )
            )
        ), f'AttrIntExistValueRel {elem_k}')

In [625]:
attr_int_value_rel = Function('AttrIntValueRel', elem_sort, attr_sort, IntSort())

for elem_k, elem_v in ELEMS.items():
    if not elem_v.unbound:
        for attr_k, attr_v in elem_v.attributes.items():
            # print(f'{elem_k}({elem_v.name}):\n\t{attr_k}({ATTRS[attr_k].type}, {ATTRS[attr_k].multiplicity}) = {attr_v[0]}')
            if ATTRS[attr_k].type == 'Integer' and len(attr_v) == 1:  
                s.assert_and_track((
                    attr_int_value_rel(elem_v.ref, ATTRS[attr_k].ref) == attr_v[0]
                ), f'AttrIntValueRel {elem_k} {attr_k}')

In [626]:
assert(s.check() == sat)

Since we're done with the setup, we need to be sure that the base model is satisfiable!

Right now we're not checking any requirement. We're just telling the solver which are the valid relationships between stuff.

### Tests

In [627]:
# TEST: VirtualMachine must have only its own attribute relationship, and not others.
# VirtualMachine DOES NOT have Attribute infrastructure_Rule::cidr
# so we're expecting it to be unsatisfiable
vm = Const('vm', elem_sort)

s.push()
s.add(
    ForAll(
        [vm],
        Implies(
            elem_class_fn(vm) == CLASSES["infrastructure_VirtualMachine"].ref,
            attr_int_exist_rel(vm, ATTRS["infrastructure_Rule::cidr"].ref)
        )
    )
)
# If this fails, it means that you are assigning attribute relationship wrong!
assert s.check() == unsat
s.pop()

In [628]:
# Same test, for another component, but this time it MUST have its own attribute.
s.push()
s.add(
    ForAll(
        [vm],
        Implies(
            elem_class_fn(vm) == CLASSES["infrastructure_Rule"].ref,
            attr_int_exist_rel(vm, ATTRS["infrastructure_Rule::fromPort"].ref)
        )
    )
)
# If this fails, it means that you are assigning attribute relationship wrong!
assert s.check() == sat
s.pop()

In [629]:
# TEST: VirtualMachine must have a value specified in the DOML 
# for attribute `infrastructure_ComputingNode::cpu_count`
# MAKE SURE THAT in the test DOML/IM.yaml every VM DOES NOT have an attribute
# `infrastructure_ComputingNode::cpu_count`
s.push()
s.add(
    ForAll(
        [vm],
        Implies(
            elem_class_fn(vm) == CLASSES["infrastructure_VirtualMachine"].ref,
            attr_int_exist_value_rel(vm, ATTRS["infrastructure_ComputingNode::cpu_count"].ref)
        )
    )
)
# If this fails, it means that you are assigning attribute relationship wrong!
assert s.check() == unsat
s.pop()

In [630]:
# TEST: VirtualMachine must have a value specified in the DOML 
# for attribute `infrastructure_ComputingNode::memory_mb`
# MAKE SURE THAT in the test DOML/IM.yaml every VM HAS an attribute
# `infrastructure_ComputingNode::memory_mb`
s.push()
s.add(
    ForAll(
        [vm],
        Implies(
            elem_class_fn(vm) == CLASSES["infrastructure_VirtualMachine"].ref,
            attr_int_exist_value_rel(vm, ATTRS["infrastructure_ComputingNode::memory_mb"].ref)
        )
    )
)
# If this fails, it means that you are assigning attribute relationship wrong!
assert s.check() == sat
s.pop()

In [631]:
# TEST: VirtualMachine must have a value specified in the DOML 
# for attribute `infrastructure_ComputingNode::memory_mb` and it has
# to be equal to 2048.
s.push()
s.add(
    ForAll(
        [vm],
        Implies(
            elem_class_fn(vm) == CLASSES["infrastructure_VirtualMachine"].ref,
            attr_int_value_rel(vm, ATTRS["infrastructure_ComputingNode::memory_mb"].ref) == 2048
        )
    )
)
# If this fails, it means that you are assigning attribute relationship wrong!
assert s.check() == sat
s.pop()

In [632]:
# TEST: VirtualMachine must have a value specified in the DOML 
# for attribute `infrastructure_ComputingNode::memory_mb` and it has
# to be equal to 2048. 
s.push()
s.add(
    ForAll(
        [vm],
        Implies(
            elem_class_fn(vm) == CLASSES["infrastructure_VirtualMachine"].ref,
            attr_int_value_rel(vm, ATTRS["infrastructure_ComputingNode::memory_mb"].ref) == 1024
        )
    )
)
# If this fails, it means that you are assigning attribute relationship wrong!
assert s.check() == unsat
s.pop()

Congrats! You passed all the tests!

Time for the real requirements!

## (Real) Requirements

In [633]:
vm, iface = Consts("vm iface", elem_sort)

req_vm_iface = ForAll(
    [vm],
    Implies(
        elem_class_fn(vm) == CLASSES["infrastructure_VirtualMachine"].ref,
        Exists(
            [iface],
            And(
                assoc_rel(vm, ASSOCS["infrastructure_ComputingNode::ifaces"].ref, iface)
            )
        )
    )
)
s.assert_and_track(req_vm_iface, "vm_iface")

In [634]:
memory_attr_ref = ATTRS["infrastructure_ComputingNode::memory_mb"].ref

req_vm_cpucount = ForAll(
    [vm],
    Implies(
        elem_class_fn(vm) == CLASSES["infrastructure_VirtualMachine"].ref,
        And(
            attr_int_exist_rel(vm, memory_attr_ref),
            attr_int_exist_value_rel(vm, memory_attr_ref),
            attr_int_value_rel(vm, memory_attr_ref) == 2048
        )
    )
)
s.assert_and_track(req_vm_cpucount, "vm_memory_mb")
s.check()

In [635]:
vm = Const("vm", elem_sort)

cpu_attr_ref = ATTRS["infrastructure_ComputingNode::cpu_count"].ref

req_vm_cpucount = ForAll(
    [vm],
    Implies(
        elem_class_fn(vm) == CLASSES["infrastructure_VirtualMachine"].ref,
        And(
            attr_int_exist_rel(vm, cpu_attr_ref),
            # attr_int_exist_value_rel(vm, cpu_attr_ref),
            attr_int_value_rel(vm, cpu_attr_ref) > 4
        )
    )
)
s.assert_and_track(req_vm_cpucount, "vm_cpu_count")
s.check()

In [636]:
iface = Const("iface", elem_sort)

endpoint_attr_ref = ATTRS["infrastructure_NetworkInterface::endPoint"].ref

req_iface_endpoint = ForAll(
    [iface],
    Implies(
        elem_class_fn(iface) == CLASSES["infrastructure_NetworkInterface"].ref,
        And(
            attr_int_exist_rel(iface, endpoint_attr_ref),
            # attr_int_exist_value_rel(vm, cpu_attr_ref),
            attr_int_value_rel(iface, endpoint_attr_ref) >= 16777216 
            # 16777216 ==toIP=> 1.0.0.0 first valid IP class-A ip address
        )
    )
)
s.assert_and_track(req_iface_endpoint, "vm_iface_endpoint")
s.check()

In `req_vm_cpucount` when using
```python
attr_int_exist_rel(vm, ATTRS["infrastructure_ComputingNode::cpu_count"].ref)
```
it works (produces *sat*) since `cpu_count` is indeed a property of VM.

If we use another attr. key instead, such as `infrastructure_KeyPair::keyfile`,
we get *unsat* as a result, which is consistent, since `keyfile` doesn't belong to the
properties of a VM.

In [637]:
isSat = s.check()
assert(isSat == sat)
print(isSat)

sat


With `ub_elems = 1` the above assert should fail, because the solver didn't assign any new element as a NetworkInterface to `vm1`.

In [638]:
s.check()
model = s.model()

In [639]:
vmCpuValueRef = model.eval(attr_int_value_rel(vm, cpu_attr_ref))
print(vmCpuValueRef)
# If arg0:<vm = elem_1234> arg1:<val_if_true> arg2:<val_if_false>
assignedElem = vmCpuValueRef.arg(0)
assignedValue = vmCpuValueRef.arg(1)
# arg0:vm = arg1:elem_1234
elem = ELEMS[str(assignedElem.arg(1))] # which is 'vm1'!
print(f'{elem.id} ({elem.name}) has been assigned {str(cpu_attr_ref)} = {assignedValue}')

If(vm == elem_139682454814288, 5, 2)
elem_139682454814288 (vm1) has been assigned infrastructure_ComputingNode::cpu_count = 5


In [640]:
# A value that is not assigned by anything is equal to 2???
cost_val = model.eval(attr_int_value_rel(vm, ATTRS['infrastructure_ComputingNode::cost'].ref))
print(cost_val)

2


In [641]:
elem_ub_0_class = model.eval(elem_class_fn(ELEMS['elem_ub_0'].ref))
assert elem_ub_0_class == CLASSES['infrastructure_NetworkInterface'].ref

In [642]:
assert model.eval(assoc_rel(ELEMS['elem_139682454814288'].ref, ASSOCS['infrastructure_ComputingNode::ifaces'].ref, ELEMS['elem_ub_0'].ref))
# Is vm1.ifaces = [elem_ub_0]? Should be TRUE

In [643]:
ifaceEndpointValueRef = s.model().eval(attr_int_value_rel(iface, endpoint_attr_ref))
print(ifaceEndpointValueRef)
# # If arg0:<vm = elem_1234> arg1:<val_if_true> arg2:<val_if_false>
# assignedElem = vmCpuValueRef.arg(0)
# assignedValue = vmCpuValueRef.arg(1)
# # arg0:vm = arg1:elem_1234
# elem = ELEMS[str(assignedElem.arg(1))] # which is 'vm1'!
# print(f'{elem.id} ({elem.name}) has been assigned {str(cpu_attr_ref)} = {assignedValue}')

If(iface == elem_ub_0, 16777216, 2)


In [644]:
# # List all model stuff
# model_df = pd.DataFrame([str(x).split(' ') for x in list(model)])
# model_df.sort_values(by=0)