<a href="https://colab.research.google.com/github/petopello/PSA/blob/main/hamming.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Partial Sum Accumulation algorithm: Linear-time neighbor summation in hamming space

[Substack post](https://sudapollismo.substack.com/p/partial-sum-accumulation-algorithm)

[Github](https://github.com/petopello/PSA)

In [None]:
#SPACE STRUCTURE
import numpy as np

def get_coords(index, q):
    coords = []
    for dim in reversed(q):
        coords.append(index % dim)
        index //= dim
    return coords[::-1]

q=[2,3]
#q=[3,2]
space=np.arange(np.prod(q))

print(f'{str("coords"):<15}{str("space"):<10}')
for i, v in enumerate(space):
    print(f'{str(get_coords(i, q)):<15}{str(v):<10}')

coords         space     
[0, 0]         0         
[0, 1]         1         
[0, 2]         2         
[1, 0]         3         
[1, 1]         4         
[1, 2]         5         


In [None]:
#NAIVE ALGORITHM
import numpy as np

def get_coords(index, q):
    coords = []
    for dim in reversed(q):
        coords.append(index % dim)
        index //= dim
    return coords[::-1]

def SHN_naive_element(index, space, r, base, min=0):
    acum=space[index]
    if r>1:
        for i in range(min, len(base)):
            diff=apply_diff_index(index, i, base)
            diff=[SHN_naive_element(ind, space, r-1, base, i+1) for ind in diff]
            acum+=np.sum(diff)
    else:
        for i in range(min, len(base)):
            diff=apply_diff_index(index, i, base)
            acum+=np.sum([space[ind] for ind in diff])
    return acum

def apply_diff_index(index, d, base):
    return [index+b for b in base[d]]

def SHN_element(index, space, q, r):
    coords=get_coords(index, q)
    base=[[n*e for n in list(range(q[i]-1, 0, -1)) + list(range(-1, -q[i], -1))] for i, e in enumerate(np.cumprod([1]+q[:0:-1])[::-1])]
    base=[e[coords[i]:coords[i]+q[i]-1] for i, e in enumerate(base)]
    return SHN_naive_element(index, space, r, base)

r=1
q=[2,3]
#q=[3,2]
space=np.arange(np.prod(q))

print(f'{str("coords"):<15}{str("space"):<10}{str("naive"):<10}')
for i, v in enumerate(space):
    print(f'{str(get_coords(i, q)):<15}{str(v):<10}{str(SHN_element(i, space, q, r)):<10}')

coords         space     naive     
[0, 0]         0         6         
[0, 1]         1         7         
[0, 2]         2         8         
[1, 0]         3         12        
[1, 1]         4         13        
[1, 2]         5         14        


In [None]:
#BASIC TRANSFORMATION FOR MATRIX
import numpy as np

def apply_difference(space, g, j):
    aux=space.reshape(-1, g)
    aux=np.roll(aux, j, axis=1)
    return aux.flatten()

q=[2,3]
space=np.arange(np.prod(q))
diff=apply_difference(space, 3, 1)
#diff=apply_difference(space, 2, 1)

print(f'{str("coords"):<15}{str("space"):<10}{str("diff"):<10}')
for i, v in enumerate(space):
    print(f'{str(get_coords(i, q)):<15}{str(v):<10}{str(diff[i]):<10}')

coords         space     diff      
[0, 0]         0         2         
[0, 1]         1         0         
[0, 2]         2         1         
[1, 0]         3         5         
[1, 1]         4         3         
[1, 2]         5         4         


In [None]:
#PSA vs NAIVE MATRIX
import numpy as np

def apply_diff(space, d, base):
    aux=space.reshape(-1, base[d][0])
    return [np.roll(aux, b, axis=1) for b in base[d][1:]]

def SHN_naive_matrix(space, r, base, min=0, cont=0):
    acum=space.copy()
    if r>1:
        for i in range(min, len(base)):
            diff=apply_diff(space, i, base)
            diff=[SHN_naive_matrix(array.flatten(), r-1, base, i+1) for array in diff]
            s=[d[1] for d in diff]
            cont+=len(diff)+np.sum(s)
            diff=[d[0] for d in diff]
            acum+=np.sum(diff, axis=0)
    else:
        for i in range(min, len(base)):
            diff=apply_diff(space, i, base)
            cont+=len(diff)
            acum+=np.sum(diff, axis=0).flatten()
    return [acum, cont]

def SHN_matrix(space, q, r):
    if r == 0: return [space, 0]
    base = [[n * e for n in range(q[i], 0, -1)] for i, e in enumerate(np.cumprod([1]+q[:0:-1])[::-1])][::-1]
    return SHN_naive_matrix(space, r, base)

def SHN_PSA(space, q, r):
    if len(space) != np.prod(q): raise ValueError(f"'space' and 'q' are not compatible: len(space)={len(space)} but np.prod(q)={np.prod(q)}.")
    if len(q) < r: raise ValueError(f"'r' cannot be greater than len(q): r={r} but len(q)={len(q)}.")
    base = [[n * e for n in range(q[i], 0, -1)] for i, e in enumerate(np.cumprod([1]+q[:0:-1])[::-1])][::-1]
    d=len(q)
    sum=space.copy()
    cont=0
    acum = np.tile(space, (d, 1))
    for n in range(r):
        for i in range(n, d):
            diff=apply_diff(acum[i-n], i, base)
            cont+=len(diff)-1
            acum[i-n]=np.sum(diff, axis=0).flatten()
            if i > n:
                acum[i-n]+=acum[i-n-1]
                cont+=1
        sum+=acum[d-n-1]
        cont+=1
    return [sum,cont]

r=4
q=[3,3,3,3,3,3,3]
#q=[2,3,4,5,6,7,8]
#q=[8,7,6,5,4,3,2]
space=np.random.rand(np.prod(q))

psa=SHN_PSA(space, q, r)
print("PSA total sums: "+str(psa[1]))
naive=SHN_matrix(space, q, r)
print("NAIVE total sums: "+str(naive[1]))

print("abs: "+str(np.sum(np.abs(psa[0] - naive[0]))))

PSA total sums: 44
NAIVE total sums: 938
abs: 9.669065548223443e-11


In [None]:
#BONUS1 (BOXES ALGORITHM): SHORTCUT FOR HAMMING NEIGHBORS SUMMATION APPLICABLE IN SOME CASES WHEN IT IS KNOWN HOW 'space' WAS CONSTRUCTED
import numpy as np

def create_space(prob):
    space = [1.0]
    for i in range(len(prob)):
        space = [x*p for x in space for p in prob[i]]
    return space

def SHN_boxes(prob, r):
    space = [[1.0]+[0.0]*r]
    for i in range(len(prob)):
        space = [
            [s+(o-s)*p for o, s in zip(x, [0.0]+x[:-1])]
            for x in space for p in prob[i]
        ]
    return np.sum(space, axis=1)

def SHN_PSA(space, q, r):
    if len(space) != np.prod(q): raise ValueError(f"'space' and 'q' are not compatible: len(space)={len(space)} but np.prod(q)={np.prod(q)}.")
    if len(q) < r: raise ValueError(f"'r' cannot be greater than len(q): r={r} but len(q)={len(q)}.")
    base = [[n * e for n in range(q[i], 0, -1)] for i, e in enumerate(np.cumprod([1]+q[:0:-1])[::-1])][::-1]
    d=len(q)
    sum=space.copy()
    acum = np.tile(space, (d, 1))
    for n in range(r):
        for i in range(n, d):
            diff=apply_diff(acum[i-n], i, base)
            acum[i-n]=np.sum(diff, axis=0).flatten()
            if i > n:
                acum[i-n]+=acum[i-n-1]
        sum+=acum[d-n-1]
    return sum
def apply_diff(space, d, base):
    aux=space.reshape(-1, base[d][0])
    return [np.roll(aux, b, axis=1) for b in base[d][1:]]

PROB = [
    [0.1, 0.2, 0.7],
    [0.4, 0.6],
    [0.2, 0.3, 0.4, 0.1]
]
r=2
q=[len(p) for p in PROB]
space=create_space(PROB)

boxes=SHN_boxes(PROB, r)
psa=SHN_PSA(space, q, r)

print("abs: "+str(np.sum(np.abs(psa - boxes))))

abs: 1.4432899320127035e-15


In [None]:
#BONUS2 (DISTANCES ALGORITHM): CALCULATE HAMMING DISTANCES OF ALL PAIR OF POINTS IN A COMPLETE SPACE WITH FRACTAL APPROACH
import numpy as np

#CASE SIMETRIC SPACE q=3
def distances_q3(d):
    A=np.array([[0]])
    for i in range(d):
        A = np.block([
        [A, A+1, A+1],
        [A+1, A, A+1],
        [A+1, A+1, A]
        ])
    return A

#CASE GENERALIZED
def distances(q):
    A=np.array([[0]])
    for i in range(len(q)-1, -1, -1):
        A = np.block([
            [A if r == c else A+1 for r in range(q[i])]
            for c in range(q[i])
            ])
    return A

d=2
dis=distances_q3(d)
print(dis)

q=[2,3]
#q=[3,2]
#q=[2,3,4]
dis=distances(q)
print(dis)

[[0 1 1 1 2 2 1 2 2]
 [1 0 1 2 1 2 2 1 2]
 [1 1 0 2 2 1 2 2 1]
 [1 2 2 0 1 1 1 2 2]
 [2 1 2 1 0 1 2 1 2]
 [2 2 1 1 1 0 2 2 1]
 [1 2 2 1 2 2 0 1 1]
 [2 1 2 2 1 2 1 0 1]
 [2 2 1 2 2 1 1 1 0]]
[[0 1 1 1 2 2]
 [1 0 1 2 1 2]
 [1 1 0 2 2 1]
 [1 2 2 0 1 1]
 [2 1 2 1 0 1]
 [2 2 1 1 1 0]]


In [None]:
#FULL CLEAN CODE
import numpy as np
# SHN_PSA (Sum Hamming Neighbours with Partial Sums Accumulation)
# space (list[number]): complete hamming space (all possible combination of the elements in each dimension exists) flattened in 1D array
# q (list[number]): number of possible elements in each dimension
# r (int): maximum hamming distance for neighbour summation
# return (list[number]): hamming space with the sums of values of all neighbours within distance<=r flattened in 1D array
def SHN_PSA(space, q, r):
    if len(space) != np.prod(q): raise ValueError(f"'space' and 'q' are not compatible: len(space)={len(space)} but np.prod(q)={np.prod(q)}.")
    if len(q) < r: raise ValueError(f"'r' cannot be greater than len(q): r={r} but len(q)={len(q)}.")
    base = [[n * e for n in range(q[i], 0, -1)] for i, e in enumerate(np.cumprod([1]+q[:0:-1])[::-1])][::-1]
    d=len(q)
    sum=space.copy()
    acum = np.tile(space, (d, 1))
    for n in range(r):
        for i in range(n, d):
            diff=apply_diff(acum[i-n], i, base)
            acum[i-n]=np.sum(diff, axis=0).flatten()
            if i > n:
                acum[i-n]+=acum[i-n-1]
        sum+=acum[d-n-1]
    return sum

def SHN_matrix(space, q, r):
    if r == 0: return [space, 0]
    base = [[n * e for n in range(q[i], 0, -1)] for i, e in enumerate(np.cumprod([1]+q[:0:-1])[::-1])][::-1]
    return SHN_naive_matrix(space, r, base)

def SHN_element(index, space, q, r):
    coords=get_coords(index, q)
    base=[[n*e for n in list(range(q[i]-1, 0, -1)) + list(range(-1, -q[i], -1))] for i, e in enumerate(np.cumprod([1]+q[:0:-1])[::-1])]
    base=[e[coords[i]:coords[i]+q[i]-1] for i, e in enumerate(base)]
    return SHN_naive_element(index, space, r, base)

def SHN_naive_matrix(space, r, base, min=0):
    acum=space.copy()
    if r>1:
        for i in range(min, len(base)):
            diff=apply_diff(space, i, base)
            diff=[SHN_naive_matrix(array.flatten(), r-1, base, i+1) for array in diff]
            acum+=np.sum(diff, axis=0)
    else:
        for i in range(min, len(base)):
            diff=apply_diff(space, i, base)
            acum+=np.sum(diff, axis=0).flatten()
    return acum

def SHN_naive_element(index, space, r, base, min=0):
    acum=space[index]
    if r>1:
        for i in range(min, len(base)):
            diff=apply_diff_index(index, i, base)
            diff=[SHN_naive_element(ind, space, r-1, base, i+1) for ind in diff]
            acum+=np.sum(diff)
    else:
        for i in range(min, len(base)):
            diff=apply_diff_index(index, i, base)
            acum+=np.sum([space[ind] for ind in diff])
    return acum

def apply_diff(space, d, base):
    aux=space.reshape(-1, base[d][0])
    return [np.roll(aux, b, axis=1) for b in base[d][1:]]

def apply_diff_index(index, d, base):
    return [index+b for b in base[d]]

def get_coords(index, q):
    coords = []
    for dim in reversed(q):
        coords.append(index % dim)
        index //= dim
    return coords[::-1]

def create_space(prob):
    space = [1.0]
    for i in range(len(prob)):
        space = [x * p for x in space for p in prob[i]]
    return space

def SHN_boxes(prob, r):
    space = [[1.0] + [0.0] * r]
    for i in range(len(prob)):
        space = [
            [s + (o - s) * p for o, s in zip(x, [0.0] + x[:-1])]
            for x in space for p in prob[i]
        ]
    return np.sum(space, axis=1)

def distances(q):
    A=np.array([[0]])
    for i in range(len(q)-1, -1, -1):
        A = np.block([
            [A if r == c else A+1 for r in range(q[i])]
            for c in range(q[i])
            ])
    return A