In [1]:
from xml.etree import ElementTree
import datetime
import time

In [2]:
def domain_xml_to_ocaml(domain):
    if domain.tag == 'id':
        ocaml_domain = 'DomainId ({})'.format(domain.text)
    elif domain.tag == 'range':
        ocaml_domain = 'DomainRange ({}, {})'.format(
            domain.findtext('min'),
            domain.findtext('max'))
    else:
        #TODO: account for unbounded domain range
        raise Exception('Unbounded domain')
    return ocaml_domain


def criterian_xml_to_ocaml(criterian):
    ocaml_criterians = []
    if criterian.tag in ['topics', 'partitions']:
        for element in list(criterian):
            ocaml_criterians.append(element.text)
    elif criterian.tag == 'data_tags':
        for element in list(criterian):
            tag = '("{name}", "{value}")'.format(
                name=element.findtext('name'),
                value=element.findtext('value'))
            ocaml_criterians.append(tag)
    return ocaml_criterians
    

def criteria_xml_to_ocaml(criteria):
    ocaml_criterias = {}
    criterian_types = ['topics', 'partitions', 'data_tags']
    for criterian_type in criterian_types:
        criterians = criteria.findall(criterian_type)
        if criterians:
            criterian = ElementTree.Element(criterian_type)
            for _criterian in criterians:
                criterian.extend(_criterian)
            ocaml_criterians = criterian_xml_to_ocaml(criterian)
            ocaml_criterians = '"' + '";"'.join(ocaml_criterians) + '"'
        else:
            ocaml_criterians = []
            ocaml_criterians = ""
        ocaml_criterias[criterian_type] = ocaml_criterians
    
    
    ocaml_string = ('\n{{topics = [{topics}];\n'
                    'partitions = [{partitions}];\n'
                    'tags = [{data_tags}]\n}}')
    ocaml_string = ocaml_string.format(
        topics=ocaml_criterias['topics'],
        partitions=ocaml_criterias['partitions'],
        data_tags=ocaml_criterias['data_tags'],
    )
    return ocaml_string.replace("\n", "\n\t")


def rule_xml_to_ocaml(rule):
    ocaml_domains = []
    for domain in rule.find('domains'):
        ocaml_domain = domain_xml_to_ocaml(domain)
        ocaml_domains.append(ocaml_domain)
    ocaml_domains = '[' + '";"'.join(ocaml_domains) + ']'
    
    if rule.tag == 'allow_rule':
        ocaml_qualifier = 'ALLOW'
    if rule.tag == 'deny_rule':
        ocaml_qualifier = 'DENY'
    ocaml_criterias = {}
    criteria_types = ['publish', 'subscribe', 'relay']
    for criteria_type in criteria_types:
        criterias = rule.findall(criteria_type)
        if criterias:
            criteria = ElementTree.Element(criteria_type)
            for _criteria in criterias:
                criteria.extend(_criteria)
            ocaml_criteria = criteria_xml_to_ocaml(criteria)
            ocaml_criteria = '[{}]'.format(ocaml_criteria)
        else:
            ocaml_criteria = []
            ocaml_criteria = '[]'
        ocaml_criterias[criteria_type] = ocaml_criteria
    
    ocaml_rule = ('\n{{domains = {domains};\n'
                  'qualifier = {qualifier};\n'
                  'publish = {publish};\n'
                  'subscribe = {subscribe};\n'
                  'relay = {relay}\n}}')
    ocaml_rule = ocaml_rule.format(
        domains=ocaml_domains,
        qualifier=ocaml_qualifier,
        publish=ocaml_criterias['publish'],
        subscribe=ocaml_criterias['subscribe'],
        relay=ocaml_criterias['relay'],
    )
    return ocaml_rule.replace("\n", "\n\t")


def grant_xml_to_ocaml(grant):
    ocaml_subject_name = '"{subject_name}"'.format(
        subject_name=grant.findtext('subject_name'))
    validity = grant.find('validity')
    not_before = datetime.datetime.strptime(
            validity.findtext('not_before'),
            "%Y-%m-%dT%H:%M:%S").timestamp()
    not_after = datetime.datetime.strptime(
            validity.findtext('not_after'),
            "%Y-%m-%dT%H:%M:%S").timestamp()
    ocaml_validity = '{{low = {low}; high = {high}}}'.format(
        low=int(not_before),
        high=int(not_after))
    ocaml_rules = []
    for rule in grant.findall('*/domains/..'):
        ocaml_rule = rule_xml_to_ocaml(rule)
        ocaml_rules.append(ocaml_rule)
    
    ocaml_rules = '[' + '";"'.join(ocaml_rules) + ']'
    ocaml_default = grant.findtext('default')
    
    
    ocaml_grant = ('\n{{subject_name = {subject_name};\n'
                  'validity = {validity};\n'
                  'rules = {rules};\n'
                  'default = {default}\n}}')
    ocaml_grant = ocaml_grant.format(
        subject_name=ocaml_subject_name,
        validity=ocaml_validity,
        rules=ocaml_rules,
        default=ocaml_default,
    )
    
    return ocaml_grant.replace("\n", "\n\t")


def permissions_xml_to_ocaml(root):
    ocaml_grants = []
    permissions = root.find('permissions')
    for grant in permissions.findall('grant'):
        ocaml_grant = grant_xml_to_ocaml(grant)
        ocaml_grants.append(ocaml_grant)
    
    ocaml_grants = '[' + '";"'.join(ocaml_grants) + ']'
    
    return ocaml_grants        

In [3]:
import pexpect
import shlex
import subprocess
import sys

In [4]:
class BaseProver:
    def __init__(self, logfile=sys.stdout):
        command = 'imandra-repl'
        args = ['-raw']
        try:
            self._init_child(command, args, logfile)
        except:
            try:
                self._reset()
                self._init_child(command, args, logfile)
            except:
                raise Exception('Prover reset error')
        
    def _init_child(self, command, args, logfile):
        self._child = pexpect.spawn(
            command=command,
            args=args,
            encoding='utf-8')
        self._child.logfile = logfile
        try:
            self._child.expect('type `#help;;` to access help')
            self._child.expect('#')
        except:
            raise Exception('Prover init error')
        
    def __del__(self):
        self._child.sendcontrol('d')
        self._child.kill(9)
    
    def _reset(self):
        command = shlex.split('imandra-repl --force-kill')
        completed_process = subprocess.run(
            command,
            stdout=subprocess.PIPE)
        print(completed_process.stdout.decode())

In [66]:
class PermissionProver(BaseProver):
    def __init__(self, logfile=sys.stdout):
        BaseProver.__init__(self, logfile)
        self._init_context('verification/type.ml')
        self._init_context('verification/fnmatch.ml')
        self._init_context('verification/evaluate.ml')
        
        self._child.send('#redef true')
        self._child.send(';;')
        self._child.sendcontrol('m')
        try:
            self._child.expect('#')
        except:
            raise Exception('Prover context error')
        
    def _init_context(self, contex_path):
        with open(contex_path, 'r') as f:
            context = f.read().replace('\n', '\r')
        self._child.send(context)
        self._child.send(';;')
        self._child.sendcontrol('m')
        try:
            self._child.expect('#')
        except:
            raise Exception('Prover context error')

    def check_from_x_to_y(self, perm_x, perm_y, constraints):
        perm_x = perm_x.replace('\n', '\r')
        perm_y = perm_y.replace('\n', '\r')
        constraints = constraints.replace('\n', '\r')
        
        self._child.send(perm_x)
        self._child.send(';;')
        self._child.sendcontrol('m')
        try:
            self._child.expect('#')
        except:
            raise Exception('Prover context error')
        
        self._child.send(perm_y)
        self._child.send(';;')
        self._child.sendcontrol('m')
        try:
            self._child.expect('#')
        except:
            raise Exception('Prover context error')
        
        self._child.send(constraints)
        self._child.send(';;')
        self._child.sendcontrol('m')
        try:
            self._child.expect('#')
        except:
            raise Exception('Prover context error')
        

In [67]:
permission_prover = PermissionProver()

            .__      /\ .__                           .___            
     _____  |__|    / / |__| _____ _____    ____    __| _/___________   
     \__  \ |  |   / /  |  |/     \__   \  /    \  / __ |\_  __ \__  \  
      / __ \|  |  / /   |  |  Y Y  \/ __ \|   |  \/ /_/ | |  | \// __ \_
     (____  /__| / /    |__|__|_|  (____  /___|  /\____ | |__|  (____  /
          \/     \/              \/     \/     \/      \/            \/ 
----------------------------------------------------------------------------
 Imandra Commander v1.0.3 - (c)Copyright Aesthetic Integration Ltd, 2014-18
----------------------------------------------------------------------------
type `#help;;` to access help
type qualifier =
  | ALLOW
  | DENY
  | NONE

type criteria = {
  topics : string list;
  partitions : string list;
  tags : (string * string) list
}

type domain =
  | DomainId of int
  | DomainRange of int * int

type rule = {
  domains : domain list;
  qualifier : qualifier;
  publish : criteria list

In [68]:
talker = ElementTree.parse('tests/data/talker.xml').getroot()
talker = 'let perms_x = ' + permissions_xml_to_ocaml(talker)

listener = ElementTree.parse('tests/data/listener.xml').getroot()
listener = 'let perms_y = ' + permissions_xml_to_ocaml(listener)

In [69]:
constraints = """
let test (x:subject) (y:subject) bool =
    let x_allowed = (evaluate perms_x x = ALLOW) in
    let y_allowed = (evaluate perms_y y = ALLOW) in
    
    let common_domain = (x.domainId = y.domainId) in
    let common_topic = (x.topic = y.topic) in
    let common_partition = (x.partition = y.partition) in
    let common_dataTag = (x.dataTag = y.dataTag) in
    let complementary_action = (x.action <> y.action) in
    let qos_match = common_domain 
        && common_topic 
        && common_partition 
        && common_dataTag 
        && complementary_action in
    
    let coincide_time = (x.time = y.time) in
    
    x_allowed && y_allowed && coincide_time && qos_match

instance test
"""

In [70]:
# constraints = """
# let test (x:subject) (y:subject) bool =
#     let x_allowed = (evaluate perms_x x = ALLOW) in
#     let y_allowed = (evaluate perms_y y = ALLOW) in
    
#     let common_domain = (x.domainId = y.domainId) in
#     let common_topic = (x.topic = y.topic) in
#     let common_partition = (x.partition = y.partition) in
#     let common_dataTag = (x.dataTag = y.dataTag) in
#     let complementary_action = (x.action <> y.action) in
#     let qos_match = common_domain 
#         && common_topic 
#         && common_partition 
#         && common_dataTag 
#         && complementary_action in
    
#     let coincide_time = (x.time = y.time) in
    
#     x_allowed && y_allowed && coincide_time && qos_match
# ;;
# #program;;
# > let my_wrapper (f_name : string) =
#      match Verify_frontend.top f_name with
#       | Verify_frontend.V_proved _ -> "proved"
#       | Verify_frontend.V_refuted _ -> "refuted"
#       | Verify_frontend.V_unknown _ -> "unknown"
# ;;
# #logic;;
# my_wrapper "test"
# """

In [71]:
permission_prover.check_from_x_to_y(talker, listener, constraints)

let perms_x = [
	{subject_name = "CN=talker";
	validity = {low = 1382770800; high = 1698385530};
	rules = [
		{domains = [DomainId (0)];
		qualifier = ALLOW;
		publish = [
			{topics = ["rt/chatter"];
			partitions = [];
			tags = []
			}];
		subscribe = [
			{topics = ["rt/clock";"rt/parameter_events"];
			partitions = [];
			tags = []
			}];
		relay = []
		}];
	default = DENY
	}]                                      ;;
val perms_x : grant list =
  [{subject_name = "CN=talker";
    validity = {low = 1382770800; high = 1698385530};
    rules =
     [{domains = [DomainId 0]; qualifier = ALLOW;
       publish = [{topics = ["rt/chatter"]; partitions = []; tags = []}];
       subscribe =
        [{topics = ["rt/clock"; "rt/parameter_events"]; partitions = [];
          tags = []}];
       relay = []}];
    default = DENY}]
let perms_y = [
	{subject_name = "CN=listener";
	validity = {low = 1382770800; high = 1698385530};
	rules = [
		{domains = [DomainId (0)];
		qualifier = ALLOW;
		publish

In [72]:
permission_prover._child.expect(['[✗].*\r','[✓].*\r'])

val test : subject -> subject -> 'a -> bool = <fun>
2 base ground recursive instances
Instance (after 22 steps, 0.513s):
 let _x_0 =
   {subName = "CN=talker"; action = PUBLISH; domainId = 0;
    topic = "rt/chatter"; partition = ""; dataTag = ("", "");
    time = 1382771546}
 let _x_1 =
   {subName = "CN=listener"; action = SUBSCRIBE; domainId = 0;
    topic = "rt/chatter"; partition = ""; dataTag = ("", "");
    time = 1382771546}
 let _x_2 = ()
[32;1m[✔][0m Instance found.
module CX : sig val _x_0 : subject val _x_1 : subject val _x_2 : unit end
# 

TIMEOUT: Timeout exceeded.
<pexpect.pty_spawn.spawn object at 0x7f59909c66a0>
command: /usr/local/bin/imandra-repl
args: [b'/usr/local/bin/imandra-repl', b'-raw']
buffer (last 100 chars): 'Instance found.\r\n\r\x1b[2K\rmodule CX : sig val _x_0 : subject val _x_1 : subject val _x_2 : unit end\r\n# '
before (last 100 chars): 'Instance found.\r\n\r\x1b[2K\rmodule CX : sig val _x_0 : subject val _x_1 : subject val _x_2 : unit end\r\n# '
after: <class 'pexpect.exceptions.TIMEOUT'>
match: None
match_index: None
exitstatus: None
flag_eof: False
pid: 14709
child_fd: 63
closed: False
timeout: 30
delimiter: <class 'pexpect.exceptions.EOF'>
logfile: <ipykernel.iostream.OutStream object at 0x7f5994d8a0f0>
logfile_read: None
logfile_send: None
maxread: 2000
ignorecase: False
searchwindowsize: None
delaybeforesend: 0.05
delayafterclose: 0.1
delayafterterminate: 0.1
searcher: searcher_re:
    0: re.compile("[✗].*")
    1: re.compile("[✓].*")

In [47]:
print(permission_prover._child.before)

                               
let test (x:subject) (y:subject) bool =
    let x_allowed = (evaluate perms_x x = ALLOW) in
    let y_allowed = (evaluate perms_y y = ALLOW) in
    
    let common_domain = (x.domainId = y.domainId) in
    let common_topic = (x.topic = y.topic) in
    let common_partition = (x.partition = y.partition) in
    let common_dataTag = (x.dataTag = y.dataTag) in
    let complementary_action = (x.action <> y.action) in
    let qos_match = common_domain 
        && common_topic 
        && common_partition 
        && common_dataTag 
        && complementary_action in
    
    let coincide_time = (x.time = y.time) in
    
    x_allowed && y_allowed && coincide_time && qos_match

instance test
val perms_y : grant list =
  [{subject_name = "CN=listener";
    validity = {low = 1382770800; high = 1698385530};
    rules =
     [{domains = [DomainId 0]; qualifier = ALLOW; publish = [];
       subscribe =
        [{topics = ["rt/clock"; "rt/parameter_events"; "rt/chatte

In [None]:
# permission_prover._child.expect([
#     '- : string = "proved"',
#     '- : string = "refuted"',
#     '- : string = "unknown"'])

In [18]:
# print(permission_prover._child.before)

In [64]:
del permission_prover

