Intro to minikanren

TOC:
*   implement microkanren
*   write minikanren api through macros

In [1]:
import itertools as it

A $\mu$Kanren program proceeds through the application of a **goal** to a **state**. Goals are often understood by analogy to predicates. Whereas the application of a predicate to an element of its domain can be either true or false, a goal pursued in a given state can either succeed or fail.

A **state** is a pair of a substitution (represented as a dictionary) and a non-negative integer representing a fresh- variable counter.

LogicVariables are ints, until I want to distinguish them from ints. In that case, I check is_logic_var

In [2]:
empty_state = ({}, 0)

class LogicVariable(int):
    """
    An alias to an int
    """
    def __repr__(self):
        return "LV(%s)" % super(LogicVariable, self).__repr__()
    
def is_logic_var(x):
    return isinstance(x, LogicVariable)

The **walk** operator searches for a variable's value in the substitution; the *ext_s* operator extends the substitution with a new binding. When a non-variable term is walked, the term itself is returned. When extending the substitution, the 􏰀first argument is always a variable, and the second is an arbitrary term.

In [3]:
def walk(var, substitutions):
    while is_logic_var(var) and var in substitutions:
        var = substitutions[var]
    return var

def ext_s(var, value, substitutions):
    s = dict(substitutions)
    s[var] = value
    return s

In [4]:
LV = LogicVariable
assert walk(LV(0), {}) == LV(0)
assert walk(LV(0), {LV(0): 1}) == 1
assert walk(LV(0), {LV(0): LV(1), LV(1): "something else"}) == "something else"

Explain unify

In [5]:
import collections

def is_sequence(o):
    """
    I want it to fail on strings
    """
    return hasattr(o, '__iter__')

def unify(x, y, substitutions):
    if substitutions is None:
        return None

    x = walk(x, substitutions)
    y = walk(y, substitutions)

    if is_logic_var(x) and is_logic_var(y) and x == y:
        return substitutions
    elif is_logic_var(x):
        return ext_s(x, y, substitutions)
    elif is_logic_var(y):
        return ext_s(y, x, substitutions)
    elif is_sequence(x) and is_sequence(y):
        for a, b in it.izip_longest(x, y):
            substitutions = unify(a, b, substitutions)
        return substitutions
    elif x == y:
        return substitutions
    return None
        
        

In [6]:
assert unify(LV(0), LV(1), {}) == {0: LV(1)} #  0 is 1
assert unify(LV(0), LV(1), {LV(0): "x", LV(1): "y"}) is None  #  they do not unify
assert unify(LV(0), 5, {}) == {0: 5}
assert unify((LV(0), 1, LV(1)), (1, 2, 3), {}) is None # the second elements differ
assert unify((LV(0), 1, LV(1)), (1, 1, 3), {}) == {LV(0): 1, LV(1): 3} # assign the first and the last one



## Goals builder


1.   `equiv` builds a goal which succeeds if its two arguments unify, i.e. it yields the substitutions which make its arguments unify

In [7]:
def equiv(x, y):
    def _goal(state):
        substitutions, counter = state
        unified = unify(x, y, substitutions)
        if unified is not None:
            yield (unified, counter)
    return _goal

In [8]:
assert list(equiv(LV(0), 5)(empty_state)) == [({LV(0): 5}, 0)]
assert list(equiv(LV(0), LV(1))(empty_state)) == [({LV(0): LV(1)}, 0)]

The call/fresh goal constructor takes a unary function f whose body is a goal, and itself returns a goal. This returned goal, when provided a state s/c, binds the formal parameter of f to a new logic variable (built with the variable construc- tor operator var), and passes a state, with the substitution it originally received and a newly incremented fresh-variable counter, c, to the goal that is the body of f.

In [9]:
def call_fresh(goal):
    def _new_goal(state):
        substitutions, counter = state
        return goal(LogicVariable(counter))((substitutions, counter+1))
    return _new_goal


In [10]:
from functools import partial as p

is_five = p(equiv, 5)
assert list(call_fresh(is_five)(empty_state)) == [({LV(0): 5}, 1)]

Minikanren utility

fresh

In [47]:
import inspect

def f(x):
    def _g(y):
        return equiv(x, y)
    return call_fresh(_g)

list(call_fresh(f)(empty_state))


[({LV(0): LV(1)}, 2)]

In [52]:
def g(x, y):
    return disj(equiv(x, 2), equiv(y, 3), equiv(x, y))

list(call_fresh(lambda x: call_fresh(lambda y: g(x, y)))(empty_state))

[({LV(0): 2}, 2), ({LV(1): 3}, 2), ({LV(0): LV(1)}, 2)]

In [53]:
def fresh_helper(curried_goal, nargs):
    if nargs == 1:
        return call_fresh(curried_goal)
    else:
        return call_fresh(lambda x: fresh_helper(p(curried_goal, x), nargs-1))
    
def fresh(variadic_goal):
    positional_args = inspect.getargspec(variadic_goal).args
    assert positional_args > 0
    return fresh_helper(variadic_goal, len(positional_args))
    
    
            
print list(fresh(f)(empty_state))
print list(fresh(g)(empty_state))

[({LV(0): LV(1)}, 2)]
[({LV(0): 2}, 2), ({LV(1): 3}, 2), ({LV(0): LV(1)}, 2)]


In [11]:
def disj(*goals):
    """
    it returns a goal which succeeds if either of the arguments succeeds
    """
    def _new_goal(state):
        return it.chain.from_iterable([g(state) for g in goals])
    return _new_goal

def conj(*goals):
    """
    It returns a goal which succeeds if all the goals passed as argument succeed
    for that state.
    """
    def _new_goal(state):
        stream = goals[0](state)
        for g in goals[1:]:
            stream = it.chain.from_iterable([g(s) for s in stream])
        return stream
    return _new_goal

In [80]:
def run(goal):
    for s, c in fresh(goal)(empty_state):
        yield s

Examples

In [86]:
def conso(a, b, p):
    return equiv((a, b), p)

list(run(
        lambda x, y: conso(1, x, y)
        ))

[('x', 0), ('y', 1)]


[{LV(1): (1, LV(0))}]

In [125]:
def reify_s(names, o):
    if is_logic_var(o):
        return "_%s" % names.get(o, "dummy")
    elif is_sequence(o):
        return map(p(reify_s, names), o)
    else:
        return o
        
def reify(substitutions, names):
    reified = {reify_s(names, k): reify_s(names, v)
               for k,v in substitutions.iteritems()}
    # I add the free vars to the substitutions
    free_vars = [c for c in names if c not in substitutions]
    reified.update((names[c], '_') for c in free_vars)
    return reified

def run(goal):
    args = inspect.getargspec(goal).args
    names = {n: name for (n, name) in enumerate(args)}
    for s, c in fresh(goal)(empty_state):
        yield reify(s, names)

In [133]:
list(run(
        lambda x, y, z: conso(x, y, (1, (1,2,3,4,5)))
        ))

[{'_x': 1, '_y': [1, 2, 3, 4, 5], 'z': '_'}]

In [135]:
def firsto(a, b):
    return fresh(lambda y: conso(b, a, y))

list(run(
        lambda x: firsto((2,(3,4,5)), x)
        ))

[{'_dummy': ['_x', [2, [3, 4, 5]]], 'x': '_'}]