### Sample code to do "query as inference" on a target pandas table.

Original first-order classes from https://github.com/jacopotagliabue/tarski-2.0; see the original blog post for more references and explanations.
    

In [1]:
from lark import Lark
from random import choice
import pandas as pd
from datetime import date

In [2]:
class FolSemantics:

    def __init__(self, grammar):
        self.grammar = grammar
        return

    def create_appropriate_assignment(self, domain, free_variables):
        return {free_v: choice(domain) for free_v in free_variables}

    def create_modified_assignment(self, original_assignment, modification):
        new_assignment = original_assignment.copy()
        for var, denotation in modification.items():
            new_assignment[var] = denotation

        return new_assignment

    def check_atomic_formula(self, model, assignment, predicate, args):
        # get args denotation through assignment if variable, through model if constant
        current_denotation = []
        for v in args:
            if self.grammar.is_variable(v):
                current_denotation.append(assignment[v])
            elif v.isdigit():
                current_denotation.append(int(v))
            else: 
                current_denotation.append(model['constants'][v])
        # check if denotation is in the predicate extension
        return current_denotation in model['extensions'].get(str(predicate), [])

    def check_formula_satisfaction_by_assignment(self, formula, model, assignment):
        # it's an atom
        if formula.data in ['unary', 'binary']:
            # get arguments for the predicate as an array to match the extensions in the model specs
            args = formula.children[1:]
            return self.check_atomic_formula(model, assignment, formula.children[0], args)
        # it's an AND
        elif formula.data == 'and':
            return self.check_formula_satisfaction_by_assignment(formula.children[0], model,assignment) \
                   and self.check_formula_satisfaction_by_assignment(formula.children[1], model, assignment)
        # it's an OR
        elif formula.data == 'or':
            return self.check_formula_satisfaction_by_assignment(formula.children[0], model, assignment) \
                   or self.check_formula_satisfaction_by_assignment(formula.children[1], model, assignment)
        # it's a negation
        elif formula.data == 'neg':
            return not (self.check_formula_satisfaction_by_assignment(formula.children[0], model, assignment))
        # it's an ex quantifier
        elif formula.data == 'q_ex':
            # first child is variable bounded
            bounded_variable = formula.children[0]
            return any([self.check_formula_satisfaction_by_assignment(formula.children[1],
                                                                      model,
                                                                      self.create_modified_assignment(
                                                                          assignment,
                                                                          {bounded_variable: d}))
                        for d in model['domain']
                        ])
        # it's a universal quantifier
        elif formula.data == 'q_un':
            # first child is variable bounded
            bounded_variable = formula.children[0]
            return all([self.check_formula_satisfaction_by_assignment(formula.children[1],
                                                                      model,
                                                                      self.create_modified_assignment(
                                                                          assignment,
                                                                          {bounded_variable: d}))
                        for d in model['domain']
                        ])
        else:
            raise Exception("Operation not defined!")

    def check_formula_satisfaction_in_model(self, expression, model, verbose=False):
        # get the first children as in the Lark grammar the first node is "start"
        formula = self.grammar.parse_expression_with_grammar(expression).children[0]
        free_vars = self.grammar.get_free_variables_from_formula_recursively(formula, [], [])
        assignment = self.create_appropriate_assignment(model['domain'], free_vars)
        if verbose:
            print(formula.pretty(), free_vars, assignment)

        return self.check_formula_satisfaction_by_assignment(formula, model, assignment)


In [3]:
class FolGrammar:

    FOL_NAMES = ['a', 'b', 'c', 'd', 'e', 'j', 'm', 'r'] + [str(_) for _ in range(0, 10)]
    FOL_VARIABLES = ['x', 'y', 'w', 'z']
    FOL_UNARY = ['I', 'P', 'Q', 'R', 'S', 'T', 'U']
    FOL_BINARY = ['A', 'B', 'C', 'D', 'E', 'F']

    LARK_FOL_GRAMMAR_SCHEMA = '''
        start: formula
          
        formula: UNARY (VARIABLE|NAME)     -> unary
            | BINARY (VARIABLE|NAME) (VARIABLE|NAME) -> binary
            | "(" formula "&" formula ")"  -> and
            | "(" formula "^" formula ")"  -> or
            | "-" formula                  -> neg
            | "$" VARIABLE "(" formula ")" -> q_ex
            | "@" VARIABLE "(" formula ")" -> q_un
         
        UNARY: ({})   
        BINARY: ({})
        VARIABLE: ({})
        NAME: ({})
        WHITESPACE: (" " | "\\n")+
        %ignore WHITESPACE
        '''

    def __init__(self):
        names = '|'.join(['"{}"'.format(n) for n in self.FOL_NAMES])
        variables = '|'.join(['"{}"'.format(v) for v in self.FOL_VARIABLES])
        unary = '|'.join(['"{}"'.format(u) for u in self.FOL_UNARY])
        binary = '|'.join(['"{}"'.format(b) for b in self.FOL_BINARY])
        self.LARK_FOL_GRAMMAR = self.LARK_FOL_GRAMMAR_SCHEMA.format(unary, binary, variables, names)
        # load parser
        self.fol_parser = Lark(self.LARK_FOL_GRAMMAR)

        return

    def parse_expression_with_grammar(self, expression):
        try:
            return self.fol_parser.parse(expression)
        except Exception as ex:
            print(ex)
            print('Expression "{}" cannot be parsed'.format(expression))

        return None

    def get_free_variables_from_formula_recursively(self, formula, free_variables, bound_variables):
        # if it is a quantifier node, mark the variable as bound and go on
        if formula.data in ['q_ex', 'q_un']:
            # first child is variable bounded
            bound_variables.append(formula.children[0])
            self.get_free_variables_from_formula_recursively(formula.children[1], free_variables, bound_variables)
        # if it is a terminal, check that variables are not bound/already included in the list
        elif formula.data in ['unary', 'binary']:
            args = formula.children[1:]
            for a in args:
                if self.is_variable(a) and a not in bound_variables and a not in free_variables:
                    free_variables.append(str(a))
        # if anything else, just continue the examination in all children path
        else:
            for f in formula.children:
                self.get_free_variables_from_formula_recursively(f, free_variables, bound_variables)

        return free_variables

    def get_lark_grammar(self):
        return self.LARK_FOL_GRAMMAR

    def is_variable(self, x):
        return x in self.FOL_VARIABLES

    def is_name(self, x):
        return x in self.FOL_NAMES

In [4]:
def build_domain_from_crm_data_frame(df, grammar):
    unique_customers = list(df.customer.unique())
    payments = list(df[df.payment.notnull()].T.to_dict().values())
    # just use the first letter as the constant for each customer name 
    constants = {n[0] : (_ + 1)  for _, n in enumerate(unique_customers)}
    # enumerate the integers in the constant mapping as the domain + 1 element for each payment
    domain = [_ + 1 for _ in range(0, len(unique_customers) + len(payments))]
    # build predicate extension
    extensions = {
        'I': [[d] for d in list(constants.values())], # predicate for isACustomer, all items with a constant are customers
        'P': [[d] for d in domain[len(constants):]],  # predicate for isPayment, all items in domain after customers are a payment
        'A': [[constants[p['customer'][0]], d] for p, d in zip(payments, domain[len(constants):])],  # binary predicate for MadePayment, relating customers to payments
        'B': [[d, int(p['payment'])] for d,p in zip(domain[len(constants):], payments)],  # binary predicate for HasTotal, relating payment to sums
    }
    
    fol_world = {
        'domain': domain,
        'constants': constants,
        'extensions': extensions
    }
    
    return fol_world

In [5]:
d = [
    {'customer': 'bob', 'payment': 5.0},
    {'customer': 'dana', 'payment': 9.0},
    {'customer': 'ada', 'payment': None},
    {'customer': 'colin', 'payment': None},
]
df = pd.DataFrame(d)
df

Unnamed: 0,customer,payment
0,bob,5.0
1,dana,9.0
2,ada,
3,colin,


In [6]:
grammar = FolGrammar()
semantics = FolSemantics(grammar)
fol_domain = build_domain_from_crm_data_frame(df, grammar)
fol_domain

{'domain': [1, 2, 3, 4, 5, 6],
 'constants': {'b': 1, 'd': 2, 'a': 3, 'c': 4},
 'extensions': {'I': [[1], [2], [3], [4]],
  'P': [[5], [6]],
  'A': [[1, 5], [2, 6]],
  'B': [[5, 5], [6, 9]]}}

In [7]:
TEST_EXPRESSIONS = [ 
    ('@x($y((Py&(-Ix^Axy))))', False), #did all customers pay?
    ('$y(((Py&Ib)&Aby))', True), # did bob pay?
    ('$y(((Py&Ia)&Aay))', False), # did ada pay?
    ('$y(((Py&Aby)&By5))', True), # Did Bob pay five dollars?
    ('$y(((Py&Aby)&By7))', False), # Did Bob pay seven dollars?
]

In [8]:
for ex in TEST_EXPRESSIONS:
    assert semantics.check_formula_satisfaction_in_model(ex[0], fol_domain) == ex[1]