# Combinators

Combinators:

* are functions that construct functions from other functions.
* provides a powerful mechanism for reusing logic...
  without having to anticpate the future.

In [109]:

from typing import Any, Optional, Union, List, Tuple, Dict, Mapping, Callable, Type, Literal
from numbers import Number
from collections.abc import Collection, Sequence
import re
import sys
import logging
from pprint import pformat
import dis

# logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)

# https://stackoverflow.com/a/47024809/1141958
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [110]:
def identity(x: Any) -> Any:
  'Returns the first argument.'
  return x

identity(2)
f = identity
f
f(3)

2

<function __main__.identity(x: Any) -> Any>

3

In [111]:
def plus_three(x):
  return x + 3

g = plus_three  # g is a function
g
g(2)

<function __main__.plus_three(x)>

5

# Second-Order Functions

Second-Order Functions return other functions.

In [112]:
# Functions with zero or more arguments that return anything.
Variadic = Callable[..., Any]

def constantly(x: Any) -> Variadic:
  'Returns a function that always returns a constant value.'
  return lambda *_args, **_kwargs: x

always_7 = constantly(7)
always_7

<function __main__.constantly.<locals>.<lambda>(*_args, **_kwargs)>

In [113]:
always_7()
always_7(11)
always_7(13, 17)

7

7

7

# Container Adapters

In [114]:
# Function with one argument that returns anything.
Unary = Callable[[Any], Any]

# A value `x` that supports `x[i]`:
Indexable = Union[List, Tuple, Dict]

def at(i: Any) -> Unary:
  'Returns a function `f(x)` that returns `x[i]`.'
  return lambda x: x[i]

f = at(2)
f([0, 1, 2, 3])

g = at("a")
g({"a": 1, "b": 2})

2

1

In [115]:
def indexed(x: Indexable) -> Unary:
  'Returns a function `f(i)` that returns `x[i]`.'
  return lambda i: x[i]

f = indexed([0, 1, 2, 3])
f(2)

g = indexed({"a": 1, "b": 2})
g("a")

2

1

# Generators

In [116]:
def counter(i: int = 0):
  'Returns a "generator" function that returns `i`, `i + 1`, `i + 2`, ... .'
  i -= 1
  def g():
    nonlocal i
    return (i := i + 1)
  return g

c = counter(2)
c
c()
c()
c()

<function __main__.counter.<locals>.g()>

2

3

4

# Stateful Combinators

In [117]:
def with_index(f: Callable, i: int = 0):
  c = counter(i)
  return lambda *args, **kwargs: f(c(), *args, **kwargs)

def conjoin(a, b):
  return (a, b)

dict(map(with_index(conjoin), ["a", "b", "c", "d"]))

{0: 'a', 1: 'b', 2: 'c', 3: 'd'}

# Predicates

In [118]:

# Functions with zero or more arguments that return a boolean.
Predicate = Callable[..., bool]

def is_string(x: Any) -> bool:
  'Returns true if `x` is a string.'
  return isinstance(x, str)

is_string("hello")
is_string(3)

True

False

In [119]:

# Functions with zero or more arguments that return a boolean.
Predicate = Callable[..., bool]

def is_string(x: Any) -> bool:
  'Returns true if `x` is a string.'
  return isinstance(x, str)

is_string("hello")
is_string(3)

True

False

# Predicate Combinators

In [120]:
# Functions that take a Predicate and return a new Predicate.
PredicateCombinator = Callable[[Predicate], Predicate]

def not_(f: Predicate) -> Predicate:
  'Returns a function that logically negates the result of the given function.'
  return lambda *args, **kwargs: not f(*args, **kwargs)

f = not_(is_string)
f("hello")
f(3)

False

True

# Mapping functions over sequences

In [121]:
def map(f: Unary, xs: Sequence) -> Sequence:
  'Returns a sequence of `f(x)` for each element `x` in `xs`.'
  return [f(x) for x in xs]

items = [1, "string", False, True, None]
map(identity, items)
map(always_7, items)
map(is_string, items)
map(not_(is_string), items)
map(plus_three, [3, 5, 7, 11])

[1, 'string', False, True, None]

[7, 7, 7, 7, 7]

[False, True, False, False, False]

[True, False, True, True, True]

[6, 8, 10, 14]

# Filtering Sequences with Predicates

In [122]:
def filter(f: Unary, xs: Sequence) -> Sequence:
  'Returns a sequence of the elements of `xs` for which `f` returns true.'
  return [x for x in xs if f(x)]

items = [1, "string", False, True, None]
filter(is_string, items)
filter(not_(is_string), items)

['string']

[1, False, True, None]

# Reducing Sequences with Binary Functions

In [123]:
# Functions with two arguments that return anything.
Binary = Callable[[Any, Any], Any]

def reduce(f: Binary, init: Any, xs: Sequence) -> Sequence:
  'Returns the result of `init = f(x, init)` for each element `x` in `xs`.'
  for x in xs:
    init = f(init, x)
  return init

def add(x, y):
  return x + y

reduce(add, 2, [3, 5, 7])
a_list_of_strings = ["A", "List", 'Of', 'Strings']
reduce(add, "Here Is ", a_list_of_strings)

17

'Here Is AListOfStrings'

In [124]:
def conjoin(x, y):
  return (x, y)

items = [3, "a", 5, "b", 7, "c", 11, True]
reduce(conjoin, 2, items)

((((((((2, 3), 'a'), 5), 'b'), 7), 'c'), 11), True)

In [125]:
# Concat all strings:
reduce(add, "", filter(is_string, items))

# Sum of all numbers:
def is_number(x: Any) -> bool:
  return not isinstance(x, bool) and isinstance(x, Number)
reduce(add, 0, filter(is_number, items))

# Sum all non-strings:
reduce(add, 0, filter(not_(is_string), items))

'abc'

26

27

# Map as a Reduction

In [126]:
def map_r(f: Unary, xs: Sequence) -> Sequence:
  def acc(seq, x):
    return seq + [f(x)]
  return reduce(acc, [], xs)

map(plus_three, [3, 5, 7, 11])
map_r(plus_three, [3, 5, 7, 11])

[6, 8, 10, 14]

[6, 8, 10, 14]

# Filter as a Reduction

In [127]:
def filter_r(f: Unary, xs: Sequence) -> Sequence:
  def acc(seq, x):
    return seq + [x] if f(x) else seq
  return reduce(acc, [], xs)

items
filter(is_string, items)
filter_r(is_string, items)

[3, 'a', 5, 'b', 7, 'c', 11, True]

['a', 'b', 'c']

['a', 'b', 'c']

# Function Composition

In [128]:
def compose(*callables) -> Variadic:
  'Returns the composition one or more functions, in reverse order.'
  'For example, `compose(g, f)(x, y)` is equivalent to `g(f(x, y))`.'
  f: Callable = callables[-1]
  gs: Sequence[Unary] = tuple(reversed(callables[:-1]))
  def h(*args, **kwargs):
    result = f(*args, **kwargs)
    for g in gs:
      result = g(result)
    return result
  return h

def multiply_by_3(x):
  return x * 3

plus_three(multiply_by_3(5))

f = compose(plus_three, multiply_by_3)
f(5)

18

18

# Interlude

In [129]:
def modulo(modulus: int) -> Callable[[int], int]:
  return lambda x: x % modulus

mod_3 = modulo(3)
map(mod_3, range(10))

[0, 1, 2, 0, 1, 2, 0, 1, 2, 0]

In [130]:
f = compose(indexed(a_list_of_strings), mod_3)
map(f, range(10))

['A', 'List', 'Of', 'A', 'List', 'Of', 'A', 'List', 'Of', 'A']

# Partial Application

In [131]:
def add_and_multiply(a, b, c):
  return (a + b) * c

add_and_multiply(2, 3, 5)

def partial(f: Callable, *args, **kwargs) -> Callable:
  def g(*args2, **kwargs2):
    return f(*(args + args2), **dict(kwargs, **kwargs2))
  return g

f = partial(add_and_multiply, 2)
f(3, 5)

25

25

# Methods are Partially Applied Functions

In [132]:
a = 2
b = 3
a + b
a.__add__(b)    # eqv. to `a + b`
f = a.__add__
f
f(3)
f(5)

5

5

<method-wrapper '__add__' of int object at 0x10273b4a0>

5

7

# Predicators

In [133]:
def re_pred(pat: str, re_func: Callable = re.search) -> Predicate:
  'Returns a predicate that matches a regular expression.'
  rx = re.compile(pat)
  return lambda x: re_func(rx, str(x)) is not None

re_pred("ab")("abc")
re_pred("ab")("nope")

True

False

In [134]:
def default(f: Variadic, g: Variadic) -> Variadic:
  def h(*args, **kwargs):
    if (result := f(*args, **kwargs)) is not None:
      return result
    return g(*args, **kwargs)
  return h

asdf

# Logical Predicate Composers

In [135]:
def and_(f: Variadic, g: Variadic) -> Variadic:
  'Returns a function `h(x, ...)` that returns `f(x, ...) and g(x, ...).'
  return lambda *args, **kwargs: f(*args, **kwargs) and g(*args, **kwargs)

def or_(f: Variadic, g: Variadic) -> Variadic:
  'Returns a function `h(x, ...)` that returns `f(x, ...) or g(x, ...).'
  return lambda *args, **kwargs: f(*args, **kwargs) or g(*args, **kwargs)

def is_string(x):
  return isinstance(x, str)

is_word = and_(is_string, re_pred(r'^[a-z]+$'))
is_word("hello")
is_word("not-a-word")
is_word(2)
is_word(None)

True

False

False

False

In [136]:
# If x is a number add three:
f = and_(is_number, plus_three)
# If x is a string, is it a word?:
g = and_(is_string, is_word)
# One or the other:
h = or_(f, g)
items = ["hello", "not-a-word", 2, None]
map(g, items)

[True, False, False, False]

In [137]:
Procedure = Callable[[], Any]

def if_(f: Variadic, g: Unary, h: Unary) -> Variadic:
  def i(*args, **kwargs):
    if (result := f(*args, **kwargs)):
      return g()
    return h()
  return i

# Operator Predicates

In [138]:

def binary_op(op: str) -> Optional[Callable[[Any, Any], Any]]:
  'Returns a function for a binary operator by name.'
  return BINARY_OPS.get(op)

BINARY_OPS = {
  '==': lambda a, b: a == b,
  '!=': lambda a, b: a != b,
  '<':  lambda a, b: a < b,
  '>':  lambda a, b: a > b,
  '<=': lambda a, b: a <= b,
  '>=': lambda a, b: a >= b,
  'and': lambda a, b: a and b,
  'or': lambda a, b: a or b,
}

binary_op('==') (2, 2)
binary_op('!=') (2, 2)

True

False

In [139]:
# Create a table where `x OP y` is true:
[
  f'{x} {op} {y}'
  for op in ['<', '==', '>']
  for x in (2, 3, 5)
  for y in (2, 3, 5)
  if binary_op(op)(x, y)
]

['2 < 3',
 '2 < 5',
 '3 < 5',
 '2 == 2',
 '3 == 3',
 '5 == 5',
 '3 > 2',
 '5 > 2',
 '5 > 3']

In [140]:
def op_pred(op: str, b: Any) -> Optional[Predicate]:
  'Returns a predicate function given an operator name and a constant.'
  if pred := binary_op(op):
    return lambda a: pred(a, b)
  if op == "not":
    return lambda a: not a
  if op == "~=":
    return re_pred(b)
  if op == "~!":
    return not_(re_pred(b))
  return None

In [141]:
f = op_pred(">", 3)
f(2)
f(5)

False

True

In [142]:
g = op_pred("~=", 'ab+c')
g('')
g('ab')
g('abbbcc')

False

False

True

# Sequencing

In [143]:
def progn(*fs: Sequence[Callable]) -> Callable:
  'Returns a function that calls each function in turn and returns the last result.'
  def g(*args, **kwargs):
    result = None
    for f in fs:
      result = f(*args, **kwargs)
    return result
  return g

In [144]:
def prog1(*fs: Sequence[Callable]) -> Callable:
  'Returns a function that calls each function in turn and returns the last result.'
  def g(*args, **kwargs):
    result = fs[0](*args, **kwargs)
    for f in fs[1:]:
      result = f(*args, **kwargs)
    return result
  return g

In [145]:
##############################################
## Extraction
##############################################

def reverse_apply(x: Any) -> Callable:
  return lambda f, *args, **kwargs: f(x, *args, **kwargs)

reverse_apply(1) (plus_three)

4

In [146]:
def demux(*funcs) -> Unary:
  'Return a function `h(x)` that returns `[f(x), g(x), ...].'
  return lambda x: map(reverse_apply(x), funcs)

demux(identity, len, compose(tuple, reversed))("abcd")

['abcd', 4, ('d', 'c', 'b', 'a')]

# Parser Combinators

In [147]:

# Parser input: a sequence of lexemes:
Input = Sequence[Any]

# A parsed value and remaining input:
Parsed = Tuple[Any, Input]

# A parser matches the input sequence and produces a result or nothing:
Parser = Callable[[Input], Optional[Parsed]]


In [148]:

def show_match(p: Parser) -> Variadic:
  def g(input: Input):
    return (p(input) or False, '<=', input)
  return g

In [149]:
first = at(0)
def rest(x: Input) -> Input:
  return x[1:]

def which(p: Predicate) -> Parser:
  'Returns a parser for the next lexeme when `p(lexeme)` is true.'
  def g(input: Input):
    if p(first(input)):
      return (first(input), rest(input))
  return g

g = which(is_string)
g(['a'])
g([2])
g(['a', 'b'])

('a', [])

('a', ['b'])

# Sequence Parsers

In [150]:
ParsedSequence = Tuple[Sequence, Input]
SequenceParser = Callable[[Input], Optional[ParsedSequence]]

def one(p: Parser) -> SequenceParser:
  'Returns a parser for one lexeme.'
  def g(input: Input):
    if input and (result := p(input)):
      parsed, input = result
      return ([parsed], input)
  return g

g = one(which(is_string))
g([])
g(['a'])
g([2])
g(['a', 'b'])


(['a'], [])

(['a'], ['b'])

In [151]:
def zero_or_more(p: Parser) -> SequenceParser:
  'Returns a parser for zero or more lexemes.'
  def g(input: Input):
    acc = []
    while input and (result := p(input)):
      parsed, input = result
      acc.append(parsed)
    return (acc, input)
  return g

g = zero_or_more(which(is_string))
g([])
g(['a'])
g([2])
g(['a', 'b'])
g(['a', 'b', 2])
g(['a', 'b', 3, 5])

([], [])

(['a'], [])

([], [2])

(['a', 'b'], [])

(['a', 'b'], [2])

(['a', 'b'], [3, 5])

In [152]:
def one_or_more(p: Parser) -> SequenceParser:
  'Returns a parser for one or more lexemes as a sequence.'
  p = zero_or_more(p)
  def g(input: Input):
    if (result := p(input)) and len(result[0]) >= 1:
      return result
  return g

In [153]:
g = one_or_more(which(is_string))
g([])
g(['a'])
g([2])
g(['a', 'b'])
g(['a', 'b', 2])
g(['a', 'b', 3, 5])

(['a'], [])

(['a', 'b'], [])

(['a', 'b'], [2])

(['a', 'b'], [3, 5])

In [154]:
def sequence_of(*parsers) -> SequenceParser:
  'Returns a parser for parsers of a sequence.'
  def g(input: Input):
    acc = []
    for p in parsers:
      if result := p(input):
        parsed, input = result
        acc.extend(parsed)
      else:
        return None
    return (acc, input)
  return g

g = sequence_of(one(which(is_string)), one(which(is_string)))
g([])
g(['a'])
g([2])
g(['a', 'b'])
g(['a', 'b', 2])
g(['a', 'b', 3, 5])

(['a', 'b'], [])

(['a', 'b'], [2])

(['a', 'b'], [3, 5])

In [155]:
g = sequence_of(one_or_more(which(is_number)))
g([])
g(['a'])
g([2])
g([2, 3])
g([2, 3, False])

([2], [])

([2, 3], [])

([2, 3], [False])

In [156]:
g = sequence_of(one(which(is_string)), one_or_more(which(is_number)))
g([])
g(['a'])
g([2])
g(['a', 'b'])
g(['a', 2])
g(['a', 2, 3])
g(['a', 2, 'b', 3])
g(['a', 2, 3, False])
g(['a', 2, 3, False, 'more'])

(['a', 2], [])

(['a', 2, 3], [])

(['a', 2], ['b', 3])

(['a', 2, 3], [False])

(['a', 2, 3], [False, 'more'])

In [157]:
#################################################
## Other
#################################################

def projection(key: Any, default: Any = None) -> Callable:
  'Returns a function `f(a)` that returns `a.get(key, default)`.'
  return lambda a: a.get(key, default)


In [158]:
def trace(
    f: Callable,
    name: str,
    log: Unary,
  ) -> Callable:
  def g(*args, **kwargs):
    msg = f"{name}({format_args(args, kwargs)})"
    # log(f"{msg} => ...")
    result = f(*args, **kwargs)
    log(f"{msg} => {result!r}")
    return result
  return g

def format_args(args, kwargs):
  return ', '.join(map(repr, args) + [f'{k}={v!r}' for k, v in kwargs])

def log(msg):
  sys.stderr.write(f'  ## {msg}\n')

f = compose(str, plus_three)
map(f, [2, 3, 5])

g = trace(f, "g", log)
map(g, [2, 3, 5])

map_g = trace(partial(map, g), "map_g", log)
map_g([2, 3, 5])


['5', '6', '8']

  ## g(2) => '5'
  ## g(3) => '6'
  ## g(5) => '8'


['5', '6', '8']

  ## g(2) => '5'
  ## g(3) => '6'
  ## g(5) => '8'
  ## map_g([2, 3, 5]) => ['5', '6', '8']


['5', '6', '8']

# Mapcat (aka Flat-Map)

In [159]:
ConcatableUnary = Callable[[Any], Sequence]

def mapcat(f: ConcatableUnary, xs: Sequence):
  'Concatenate the results of `map(f, xs)`.'
  return reduce(add, [], map(f, xs))

def duplicate(n, x):
  return [x] * n

duplicate_each_3_times = partial(mapcat, partial(duplicate, 3))
duplicate_each_3_times(["a", "b"])
duplicate_each_3_times(range(4, 7))


['a', 'a', 'a', 'b', 'b', 'b']

[4, 4, 4, 5, 5, 5, 6, 6, 6]

# Manipulating Arguments

In [160]:
def reverse_args(f: Callable) -> Callable:
  def g(*args, **kwargs):
    return f(*reversed(args), **kwargs)
  return g

def divide(x, y):
  return x / y

divide(2, 3)
reverse_args(divide)(2, 3)

reduce(reverse_args(add), " reversed ", a_list_of_strings)
reduce(reverse_args(conjoin), 2, [3, 5, 7])

0.6666666666666666

1.5

'StringsOfListA reversed '

(7, (5, (3, 2)))