Skip to content

Commit

Permalink
Add a condp implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
brandonwillard committed Jan 24, 2023
1 parent ddf3607 commit c66811a
Show file tree
Hide file tree
Showing 2 changed files with 282 additions and 0 deletions.
167 changes: 167 additions & 0 deletions kanren/condp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
from itertools import tee
from typing import Mapping, Optional, Sequence, Tuple, Union

from cons import car, cdr
from toolz import interleave
from unification import reify
from unification.utils import transitive_get as walk

from .core import lall, lconj_seq, ldisj_seq


def collect(s: Mapping, f_lists: Optional[Sequence] = None):
"""A function that produces suggestions (for `condp`) based on the values of
partially reified terms.
This goal takes a list of suggestion function, variable pairs lists and
evaluates them at their current, partially reified variable values
(i.e. ``f(walk(x, s))`` for pair ``(f, x)``). Each evaluated function should
return ``None``, a string label in a corresponding `condp` clause, or the
string ``"use-maybe"``.
Each list of suggestion functions is evaluated in order, their output is
concatenated, and, if the output contains a ``"use-maybe"`` string, the
next list of suggestion functions is evaluated.
Parameters
==========
s
miniKanren state.
f_lists
A collection of function + variable pair collections (e.g.
``[[(f0, x0), ...], ..., [(f, x), ...]]``).
"""
if isinstance(f_lists, Sequence):
# TODO: Would be cool if this was lazily evaluated, no?
# Seems like this whole thing would have to become a generator
# function, though.
ulos = ()
# ((f0, x0), ...), ((f, x), ...)
for f_list in f_lists:
f, args = car(f_list), cdr(f_list)
_ulos = f(*(walk(a, s) for a in args))
ulos += _ulos
if "use-maybe" not in _ulos:
return ulos
return ulos
else:
return ()


def condp(global_sugs: Tuple, branches: Union[Sequence, Mapping]):
"""A goal generator that produces a `conde`-like relation driven by
suggestions potentially derived from partial miniKanren state values.
From [1]_.
Parameters
==========
global_sugs
A tuple containing tuples of suggestion functions and their
logic variable arguments. Each suggestion function is evaluated
using the reified versions of its corresponding logic variables (i.e.
their "projected" values). Each suggestion function is expected to
return a tuple of branch labels corresponding to the keys in
`branches`.
branches
Sequence or mapping of string labels--for each branch in a conde-like
goal--to a tuple of goals pairs.
.. [1] Boskin, Benjamin Strahan, Weixi Ma, David Thrane Christiansen, and Daniel
P. Friedman, "A Surprisingly Competitive Conditional Operator."
"""
if isinstance(branches, Mapping):
branches_: Sequence = tuple(branches.items())
else:
branches_ = branches

def _condp(s):
global_los = collect(s, global_sugs)
yield from ldisj_seq(lconj_seq(g) for k, g in branches_ if k in global_los)(s)

return _condp


def collectseq(branch_s: Mapping, f_lists: Optional[Sequence] = None):
"""A version of `collect` that takes a `dict` of branches-to-states.
Parameters
==========
branch_s
Branch labels to miniKanren state/replacements dictionaries.
f_lists
A collection of function + variable pair collections (e.g.
``[[(f0, x0), ...], ..., [(f, x), ...]]``).
"""
if isinstance(f_lists, Sequence):
ulos = ()
for f_list in f_lists:
f, args = f_list
_ulos = f({k: reify(args, s) for k, s in branch_s.items()})
ulos += _ulos
if "use-maybe" not in _ulos:
return ulos
return ulos
else:
return ()


def condpseq(branches: Union[Sequence[Sequence], Mapping]):
r"""An experimental version of `condp` that passes branch-state-reified
maps to branch-specific suggestion functions.
In other words, each branch-specific suggestion function is passed a `dict`
with branch-label keys and the its function arguments are reified against
the state resulting from said branch.
.. note::
Only previously evaluated branches will show up in these `dict`\s, so
branch order will determine the information available to each suggestion
function.
Parameters
==========
branches
Ordered map or a sequence of sequences mapping string labels--for each
branch in a `conde`-like goal--to a tuple starting with a single
suggestion function followed by the branch goals.
"""
if isinstance(branches, Mapping):
branches_: Sequence = tuple(branches.items())
else:
branches_ = branches

def _condpseq(s, __bm=branches_):
__bm, local_items = tee(__bm)

# Provide each branch-specific suggestion function a copy of the state
# after the preceding branch's goals have been evaluated.
def f(items):
los = set()
branch_s = {}
for k, goals_branch_sugs in local_items:
# Branch suggestions can be `None` and all branch
# goals will be added.
branch_sugs = car(goals_branch_sugs)
goals = cdr(goals_branch_sugs)

if branch_sugs:
# We only expect one suggestion function per-branch.
branch_sugs = (branch_sugs,)
los |= set(collectseq(branch_s or {k: s}, branch_sugs))

if branch_sugs is None or k in los:
# TODO: Refactor!
a, b = tee(lall(*goals)(s))
branch_s[k] = next(a)
yield b

branch_s.setdefault(k, None)

yield from interleave(f(local_items))

return _condpseq
115 changes: 115 additions & 0 deletions tests/test_condp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from cons import car, cons
from cons.core import ConsNull, ConsPair
from unification import isvar, var

from kanren.condp import condp, condpseq
from kanren.core import Zzz, conde, eq, run
from kanren.goals import nullo


def test_condp():
"""Test `condp` using the example from [1]_.
.. [1] Boskin, Benjamin Strahan, Weixi Ma, David Thrane Christiansen, and Daniel
P. Friedman, "A Surprisingly Competitive Conditional Operator."
"""

def _ls_keys(ls):
if isvar(ls):
return ("use-maybe",)
elif isinstance(ls, ConsNull):
return ("BASE",)
elif isinstance(ls, ConsPair):
return ("KEEP", "SWAP")
else:
return ()

def _o_keys(o):
if isvar(o):
return ("BASE", "KEEP", "SWAP")
elif isinstance(o, ConsNull):
return ("BASE",)
elif isinstance(o, ConsPair):
if isvar(car(o)) or "novel" == car(o):
return ("KEEP", "SWAP")
else:
return ("KEEP",)
else:
return ()

def swap_somep(ls, o):
a, d, res = var(), var(), var()
res = condp(
((_ls_keys, ls), (_o_keys, o)),
{
"BASE": (nullo(ls), nullo(o)),
"KEEP": (
eq(cons(a, d), ls),
eq(cons(a, res), o),
Zzz(swap_somep, d, res),
),
"SWAP": (
eq(cons(a, d), ls),
eq(cons("novel", res), o),
Zzz(swap_somep, d, res),
),
},
)
return res

def swap_someo(ls, o):
"""The original `conde` version."""
a, d, res = var(), var(), var()
return conde(
[nullo(ls), nullo(o)],
[eq(cons(a, d), ls), eq(cons(a, res), o), Zzz(swap_someo, d, res)],
[eq(cons(a, d), ls), eq(cons("novel", res), o), Zzz(swap_someo, d, res)],
)

q, r = var("q"), var("r")

condp_res = run(0, [q, r], swap_somep(q, ["novel", r]))

assert len(condp_res) == 4
assert condp_res[0][0][0] == "novel"
assert isvar(condp_res[0][0][1])
assert isvar(condp_res[0][1])

assert isvar(condp_res[1][0][0])
assert isvar(condp_res[1][0][1])
assert isvar(condp_res[1][1])

assert condp_res[2][0][0] == "novel"
assert isvar(condp_res[2][0][1])
assert condp_res[2][1] == "novel"

assert isvar(condp_res[3][0][0])
assert isvar(condp_res[3][0][1])
assert condp_res[3][1] == "novel"


def test_condpseq():
def base_sug(a_branches):
if a_branches["BRANCH1"] == 1:
return ("BRANCH3",)
else:
return (
"BRANCH2",
"BRANCH3",
)

def test_rel(a):
return condpseq(
{
"BRANCH1": (None, eq(a, 1)),
"BRANCH2": ((base_sug, a), eq(a, 2)),
"BRANCH3": (None, eq(a, 3)),
}
)

q = var("q")

res = run(0, [q], test_rel(q))

assert res == ([1], [3])

0 comments on commit c66811a

Please sign in to comment.