# Declarative Programming using Python

A declarative programming approach involves defining structures and describing transformations on those structures in an equational way. Using a declarative approach to implement a solution can have a number of benefits:

* the implementation is likely to be concise;
* it may be easier to quickly explore trade-offs between optimality and performance; and
* it may be easier to to store and later restart partial solutions.

In this article we will present a number of declarative Python solutions to a well-known optimization problem and illustrate how some of the above benefits may be realized. 

## Dependencies

In [1]:
# Presentation dependencies.
%matplotlib inline
%config InlineBackend.figure_format='retina'
import matplotlib as mp
import matplotlib.pyplot as plt
from importlib import reload
from IPython.display import Image
from IPython.display import display_html
from IPython.display import display
from IPython.display import Math
from IPython.display import Latex
from IPython.display import HTML

# Content dependencies (also reproduced inline).
from random import randint
from itertools import permutations
from functools import reduce
from tqdm import tqdm

## Review of Python Features

We first review a number of relevant Python language constructs, data structures, and libraries.

Python's comprehension notation makes it possible to define a tuple, set, or list. The `min` operator can be used to find the minimum value, and the `key` parameter can be used to obtain the minimum value with respect to some metric function.

In [2]:
[x for x in [1,2,3]] # A list of integers.

[1, 2, 3]

In [3]:
min(['abc','de','f'], key=len) # The shortest string.

'f'

In [112]:
from random import randint

def pick():
    return randint(0,2)

print(list(iter(pick, 0)))

[2, 1]


In [16]:
def count():
    num = 0
    while True:
        yield num
        num += 1

def tree(x):
    return (x, (tree(x+1) for _ in range(2)))

def search(t0):
    (x, (t1, t2)) = t0
    if x == 3:
        return [x]
    else:
        return search(t1) + search(t2)

print(search(tree(0)))

[1]
[3, 3, 3, 3, 3, 3, 3, 3]


## Shortest Common Superstring

**Definition:** Suppose we are given a collection of $n$ strings (repeats are permitted):

$$S = [s_1, ..., s_n]$$

For two strings $s$ and $r$, let $s \preceq r$ denote that $s$ is a substring of $t$. The *shortest common superstring* of $S$ is the shortest string $t$ such that for all $s_i$ in $S$, $s_i \preceq r$.

We build some collections of randomly generated strings that we will use for our examples.

In [113]:
from random import choice, randint

strings100 = ["".join([choice('abcd') for _ in range(randint(2,4))]) for _ in range(100)]
strings10 = ["".join([choice('abcd') for _ in range(randint(2,5))]) for _ in range(10)]
strings9 = ["".join([choice('abcd') for _ in range(randint(2,5))]) for _ in range(9)]
strings4 = ["".join([choice('abcd') for _ in range(randint(2,5))]) for _ in range(4)]
strings4

['cb', 'dcda', 'dbbab', 'dcbda']

We define two useful helper functions: a function to generate all possible ways of splitting a string and a function to merge two strings such that the overlapping portion is not duplicated (if it exists).

In [5]:
def splits(s):
    return list(reversed([(s[:i], s[i:]) for i in range(len(s)+1)]))

def merge(s, t):
    return min([s+tr for (tl,tr) in splits(t) if s.endswith(tl)], key=len)

(splits('abcdefg'), merge('abcd', 'cde'))

([('abcdefg', ''),
  ('abcdef', 'g'),
  ('abcde', 'fg'),
  ('abcd', 'efg'),
  ('abc', 'defg'),
  ('ab', 'cdefg'),
  ('a', 'bcdefg'),
  ('', 'abcdefg')],
 'abcde')

### Solution Approach: Permutations

One approach to solving this problem is to generate all possible permutations of the set of strings. This leads to an extremely concise description of the solution. Unfortunately, it is not very efficient for large inputs.

In [7]:
from itertools import permutations
from functools import reduce
from tqdm import tqdm

# The shortest solution.
min((reduce(merge, p) for p in tqdm(set(permutations(strings9)))), key=len)

100%|██████████| 362880/362880 [00:17<00:00, 20321.78it/s]


'aabaddabcbbcccadb'

This solution also lends itself well to parallelization. In the variant below, we use multiple threads to perform the merging portion of the process. Note that we still generate all the permutations in advance using a single thread.

In [114]:
from mr4mp import mr4mp
from timeit import default_timer

def mapper(p):
    return reduce(merge, p)

def reducer(r1, r2):
    return min([r1, r2], key=len)

ps = list(set(permutations(strings9)))

start = default_timer()

pool = mr4mp.pool(4)
result = pool.mapreduce(mapper, reducer, ps)

print("Finished in " + str(default_timer()-start) + "s using " + str(len(pool)) + " process(es).")
result

Finished in 15.247711539035663s using 4 process(es).


'baabaadbdddcaddaacbbc'

Another advantage of this solution approach is that it is possible to sample the space of permutations uniformly at random. This can be viewed as a limited greedy state space search that is parametrized by an upper bound on the desired running time. As before, this solution lends itself well to parallelization.

In [9]:
import numpy as np
import math

# The number of attemps as a fraction of the
# number of all permutations.
attempts = int(math.factorial(len(strings9)) * 0.01)

# The subset of permutations chosen at random.
ps = set(tuple(np.random.permutation(strings9)) for _ in tqdm(range(attempts)))

# The shortest solution.
min((reduce(merge, p) for p in ps), key=len)

100%|██████████| 3628/3628 [00:00<00:00, 35025.10it/s]


'aabbcccadbabaddabcbb'

The above approach is more straightforward to parallelize, because now we can also parallize the step that generates permutations without dealing with the difficulty of ensuring that each thread generates a disjoint subset of the set of all permutations.

In [10]:
def mapper(thread_id):
    attempts = int(math.factorial(len(strings9)) * 0.1)
    ps = set(tuple(np.random.permutation(strings9)) for _ in tqdm(range(attempts)))
    return min((reduce(merge, p) for p in ps), key=len)

def reducer(r1, r2):
    return min([r1, r2], key=len)

start = default_timer()

pool = mr4mp.pool(4)
result = pool.mapreduce(mapper, reducer, range(4))

print("Finished in " + str(default_timer()-start) + "s using " + str(len(pool)) + " process(es).")
result

100%|██████████| 36288/36288 [00:02<00:00, 13948.57it/s]
100%|██████████| 36288/36288 [00:02<00:00, 13880.32it/s]
100%|██████████| 36288/36288 [00:02<00:00, 13437.75it/s]
100%|██████████| 36288/36288 [00:02<00:00, 13550.13it/s]


Finished in 6.605312054976821s using 4 process(es).


'aabaddabcbbcccadb'

### Solution Approach: Recursion

Another way that we can implement an algorithm that finds the optimal solution is using recursion.

In [11]:
# Take out the ith entry and merge it with the input r.
def pick(i, ss, r):
    return (ss[:i]+ss[i+1:], merge(r, ss[i]))

# Generate a nested list (essentially a tree).
def search(ss, r = ''):
    if ss == []:
        return [r]
    else:
        options = [pick(i, ss, r) for i in range(len(ss))]
        results = [search(*o) for o in options]
        return results

# All possible solutions in a nested tree-like data structure.
search(strings4)

[[[[['cdaabdcabdbad']], [['cdaabdbadcabd']]],
  [[['cdaacabdbad']], [['cdaacabdbadbd']]],
  [[['cdaabdbadbdcabd']], [['cdaabdbadcabd']]]],
 [[[['bdcdaacabdbad']], [['bdcdaabdbadcabd']]],
  [[['bdcabdcdaabdbad']], [['bdcabdbadcdaa']]],
  [[['bdbadcdaacabd']], [['bdbadcabdcdaa']]]],
 [[[['cabdcdaabdbad']], [['cabdcdaabdbadbd']]],
  [[['cabdcdaabdbad']], [['cabdbadcdaa']]],
  [[['cabdbadcdaabd']], [['cabdbadbdcdaa']]]],
 [[[['bdbadcdaabdcabd']], [['bdbadcdaacabd']]],
  [[['bdbadbdcdaacabd']], [['bdbadbdcabdcdaa']]],
  [[['bdbadcabdcdaabd']], [['bdbadcabdcdaa']]]]]

In [12]:
# Find the shortest entry using a tree traversal.
def search(ss, r = ''):
    if ss == []:
        return r
    else:
        options = [pick(i, ss, r) for i in range(len(ss))]
        results = [search(*o) for o in options]
        return min(results, key=len)

# The shortest solution.
search(strings9)

'aabaddabcbbcccadb'

One disadvantage of the above is that it is somewhat opaque. While the algorithm is running, there is no feedback on progress. We can address this, however, by building a *generator* using recursion (rather than returning the result). To see how this can be done, first consider how we would use the same recursive approach to build a flattened list of all possible solutions.

In [13]:
# Generate a flattened list.
def search(ss, r = ''):
    if ss == []:
        return [r]
    else:
        options = [pick(i, ss, r) for i in range(len(ss))]
        results = [r for o in options for r in search(*o)]
        return results

# All the solutions.
search(strings4)

['cdaabdcabdbad',
 'cdaabdbadcabd',
 'cdaacabdbad',
 'cdaacabdbadbd',
 'cdaabdbadbdcabd',
 'cdaabdbadcabd',
 'bdcdaacabdbad',
 'bdcdaabdbadcabd',
 'bdcabdcdaabdbad',
 'bdcabdbadcdaa',
 'bdbadcdaacabd',
 'bdbadcabdcdaa',
 'cabdcdaabdbad',
 'cabdcdaabdbadbd',
 'cabdcdaabdbad',
 'cabdbadcdaa',
 'cabdbadcdaabd',
 'cabdbadbdcdaa',
 'bdbadcdaabdcabd',
 'bdbadcdaacabd',
 'bdbadbdcdaacabd',
 'bdbadbdcabdcdaa',
 'bdbadcabdcdaabd',
 'bdbadcabdcdaa']

We can then convert the above by replacing the list comprehensions with generator comprehensions. Using this solution, it is possile to iterate over all the solutions built using the recursive tree-like traversal (and, for example, to pick the shortest one).

In [14]:
# Build a generator.
def search(ss, r = ''):
    if ss == []:
        return (r for _ in range(1))
    else:
        options = (pick(i, ss, r) for i in range(len(ss)))
        results = (r for o in options for r in search(*o))
        return results

# A generator for solutions.
rs = search(strings4)

# Iterate over the first 10 entries.
for (_, r) in zip(range(10), rs):
    print(r)
    
# Find the shortest solution in the first 10 entries.
from itertools import islice
min(islice(rs, 10), key=len)

cdaabdcabdbad
cdaabdbadcabd
cdaacabdbad
cdaacabdbadbd
cdaabdbadbdcabd
cdaabdbadcabd
bdcdaacabdbad
bdcdaabdbadcabd
bdcabdcdaabdbad
bdcabdbadcdaa


'cabdbadcdaa'

We can add randomization to the above solution by taking a permutation of the index list of strings at each level of the recursion.

In [15]:
# Build a generator.
def search(ss, r = ''):
    if ss == []:
        return (r for _ in range(1))
    else:
        options = (pick(i, ss, r) for i in np.random.permutation(range(len(ss))))
        results = (r for o in options for r in search(*o))
        return results

# A generator for solutions.
list(search(strings4))

['cabdcdaabdbad',
 'cabdcdaabdbadbd',
 'cabdbadbdcdaa',
 'cabdbadcdaabd',
 'cabdbadcdaa',
 'cabdcdaabdbad',
 'bdbadcdaacabd',
 'bdbadcabdcdaa',
 'bdcdaabdbadcabd',
 'bdcdaacabdbad',
 'bdcabdbadcdaa',
 'bdcabdcdaabdbad',
 'bdbadcdaacabd',
 'bdbadcdaabdcabd',
 'bdbadcabdcdaa',
 'bdbadcabdcdaabd',
 'bdbadbdcdaacabd',
 'bdbadbdcabdcdaa',
 'cdaabdbadbdcabd',
 'cdaabdbadcabd',
 'cdaabdcabdbad',
 'cdaabdbadcabd',
 'cdaacabdbad',
 'cdaacabdbadbd']

If we want to turn a generator into multiple generators, we can do so by wrapping it in generators that drop some of the values.

In [16]:
g = range(10000000)

def mapper(g):
    return max([r//2 for r in g])

def reducer(r1, r2):
    return max([r1, r2])

start = default_timer()

pool = mr4mp.pool(1)
result = pool.mapreduce(mapper, reducer, [g])

print("Finished in " + str(default_timer()-start) + "s using " + str(len(pool)) + " process(es).")
result

Finished in 1.7929296179208905s using 1 process(es).


4999999

In [115]:
from itertools import islice

g = range(10000000)

g1 = islice(g, 0, None, 4)
g2 = islice(g, 1, None, 4)
g3 = islice(g, 2, None, 4)
g4 = islice(g, 3, None, 4)

gs = [islice(g, start, None, 4) for start in range(0,4)]

def mapper(g):
    return max([r//2 for r in g])

def reducer(r1, r2):
    return max([r1, r2])

start = default_timer()

pool = mr4mp.pool(4)
result = pool.mapreduce(mapper, reducer, gs)

print("Finished in " + str(default_timer()-start) + "s using " + str(len(pool)) + " process(es).")
result

Finished in 2.7158426720416173s using 4 process(es).


4999999

### Parallelization

In [25]:
def word(): # Generate a random 7-letter "word".
    return ''.join(choice(ascii_lowercase) for _ in range(7))

def index(id): # Build an index mapping some random words to an identifier.
    return {w:{id} for w in {word() for _ in range(100)}}

def merge(i, j): # Merge two index dictionaries i and j.
    return {k:(i.get(k,set()) | j.get(k,set())) for k in i.keys() | j.keys()}

from mr4mp import mr4mp
from random import choice
from string import ascii_lowercase
from timeit import default_timer

start = default_timer()
pool = mr4mp.pool(4)
pool.mapreduce(index, merge, range(100))
print("Finished in " + str(default_timer()-start) + "s using " + str(len(pool)) + " process(es).")

Finished in 0.7503956459695473s using 4 process(es).


## Tic-tac-toe

In [73]:
from parts import parts
from itertools import count

class Board():
    def __init__(self, cells = ['_']*9):
        self.cells = cells
    
    def __repr__(self):
        return "\n"+"\n".join(tuple(" ".join(row) for row in parts(self.cells,3)))+"\n\n"

    def move(self, p, j):
        return Board([p if k==j else c for (c,k) in zip(self.cells, count())])

    def moves(self, p):
        return [self.move(p,k) for (c,k) in zip(self.cells, count()) if c == '_']

    def win(self, p):
        rs = list(map(tuple, parts(self.cells, 3)))
        return (p,p,p) in rs or\
               (p,p,p) in map(tuple, zip(*rs)) or\
               (p,p,p) in [(rs[0][0], rs[1][1], rs[2][2]), rs[0][2], rs[1][1], rs[2][0]]

    def end(self):
        return any([self.win('X'), self.win('O')])

    def __len__(self):
        return self.cells.count('_')
    
    def __eq__(self, other):
        return tuple(self.cells) == tuple(other.cells)
    
b = Board()
b = b.move('X', 1).move('O', 2)
b.moves('X')

[
 X X O
 _ _ _
 _ _ _
 , 
 _ X O
 X _ _
 _ _ _
 , 
 _ X O
 _ X _
 _ _ _
 , 
 _ X O
 _ _ X
 _ _ _
 , 
 _ X O
 _ _ _
 X _ _
 , 
 _ X O
 _ _ _
 _ X _
 , 
 _ X O
 _ _ _
 _ _ X
 ]

In [85]:
from itertools import islice

b = Board()

def search(b, p = 'X'):
    if b.end():
        return [b]
    else:
        return [b for m in b.moves(p) for b in search(m, 'O' if p=='X' else 'X')]

search(b)

In [93]:
b = Board()

def gen(b, p = 'X'):
    if b.end():
        return (b for _ in range(1))
    else:
        return (b for m in b.moves(p) for b in gen(m, 'O' if p=='X' else 'X'))

len(list(gen(b)))

182988

In [96]:
from itertools import islice

turns = (p for _ in count() for p in ['X', 'O'])
print(next(turns))
print(next(turns))
print(next(turns))

X
O
X


In [111]:
from itertools import islice

turns = (p for _ in count() for p in ['X', 'O'])
print(list(islice(turns, 4)))

turn_gs = ((p for _ in count() for p in ['X', 'O']) for _ in count())

b = Board()

def gen(b, turn_gs):
    ts = next(turn_gs)
    p = next(ts)
    turn_gs = (islice(ts, 1, None) for ts in turn_gs)
    if len(b.moves(p)) == 7:
        return [b]
    else:
        ms = b.moves(p)
        return (mb for m in ms for mb in gen(m, turn_gs))

list(gen(b, turn_gs))

['X', 'O', 'X', 'O']


[
 X O _
 _ _ _
 _ _ _
 , 
 X _ O
 _ _ _
 _ _ _
 , 
 X _ _
 O _ _
 _ _ _
 , 
 X _ _
 _ O _
 _ _ _
 , 
 X _ _
 _ _ O
 _ _ _
 , 
 X _ _
 _ _ _
 O _ _
 , 
 X _ _
 _ _ _
 _ O _
 , 
 X _ _
 _ _ _
 _ _ O
 , 
 O X _
 _ _ _
 _ _ _
 , 
 _ X O
 _ _ _
 _ _ _
 , 
 _ X _
 O _ _
 _ _ _
 , 
 _ X _
 _ O _
 _ _ _
 , 
 _ X _
 _ _ O
 _ _ _
 , 
 _ X _
 _ _ _
 O _ _
 , 
 _ X _
 _ _ _
 _ O _
 , 
 _ X _
 _ _ _
 _ _ O
 , 
 O _ X
 _ _ _
 _ _ _
 , 
 _ O X
 _ _ _
 _ _ _
 , 
 _ _ X
 O _ _
 _ _ _
 , 
 _ _ X
 _ O _
 _ _ _
 , 
 _ _ X
 _ _ O
 _ _ _
 , 
 _ _ X
 _ _ _
 O _ _
 , 
 _ _ X
 _ _ _
 _ O _
 , 
 _ _ X
 _ _ _
 _ _ O
 , 
 O _ _
 X _ _
 _ _ _
 , 
 _ O _
 X _ _
 _ _ _
 , 
 _ _ O
 X _ _
 _ _ _
 , 
 _ _ _
 X O _
 _ _ _
 , 
 _ _ _
 X _ O
 _ _ _
 , 
 _ _ _
 X _ _
 O _ _
 , 
 _ _ _
 X _ _
 _ O _
 , 
 _ _ _
 X _ _
 _ _ O
 , 
 O _ _
 _ X _
 _ _ _
 , 
 _ O _
 _ X _
 _ _ _
 , 
 _ _ O
 _ X _
 _ _ _
 , 
 _ _ _
 O X _
 _ _ _
 , 
 _ _ _
 _ X O
 _ _ _
 , 
 _ _ _
 _ X _
 O _ _
 , 
 _ _ _
 _ X _
 _ O _
 , 
 _ _ _
 _ X _
 _ _ O
 ,

This is the end of the article.