In [1]:
import numpy as np
import matplotlib.pylab as plt

In [2]:
def pad_zeros_to(x, new_length):
    """Append new_length - x.shape[0] zeros to x's end via copy."""
    output = np.zeros((new_length,))
    output[:x.shape[0]] = x
    return output

def native_convolution(x, h):
    if len(x) < len(h):
        x, h = h, x
        
    M = len(x)
    N = len(h)
    
    x = pad_zeros_to(x, M+2*(N-1))
    x = np.roll(x, N-1)
    
    h = np.flip(h)
    
    y = np.zeros(M+N-1)
    for i in range(len(y)):
        y[i] = x[i:i+N].dot(h)
        
    return y

def next_power_of_2(n):
    return 1 << (int(np.log2(n - 1)) + 1)

def fft_convolution(x, h, K=None):
    Nx = x.shape[0]
    Nh = h.shape[0]
    Ny = Nx + Nh - 1
    
    if K is None:
        K = next_power_of_2(Ny)
        
    X = np.fft.fft(pad_zeros_to(x, K))
    H = np.fft.fft(pad_zeros_to(h, K))
    
    Y = X*H
    
    y = np.real(np.fft.ifft(Y))
    
    return y[:Ny]

def overlap_add_convlution(x, h, B, K=None):
    M = len(x)
    N = len(h)
    
    num_input_blocks = np.ceil(M/B).astype(int)
        
    output_size = M + N - 1
    y = np.zeros((output_size,))
    
    for n in range(num_input_blocks):
        xb = x[n*B:(n+1)*B]
        
        u = fft_convolution(xb, h, K)
        
        y[n*B:n*B + len(u)] += u
    
    return y

In [3]:
def overlap_save_convolution(x, h, B, K=None):
    """Overlap-Save convolution of x and h with block length B"""

    M = len(x)
    N = len(h)

    if K is None:
        K = next_power_of_2(B + N - 1)
        # K = max(B, next_power_of_2(N))
        
    # Calculate the number of input blocks
    num_input_blocks = np.ceil(M / B).astype(int) \
                     + np.ceil(K / B).astype(int) - 1

    # Pad x to an integer multiple of B
    xp = pad_zeros_to(x, num_input_blocks*B)

    output_size = num_input_blocks * B + N - 1
    y = np.zeros((output_size,))
    
    # Input buffer
    xw = np.zeros((K,))

    # Convolve all blocks
    for n in range(num_input_blocks):
        # Extract the n-th input block
        xb = xp[n*B:n*B+B]

        # Sliding window of the input
        xw = np.roll(xw, -B)
        xw[-B:] = xb

        # Fast convolution
        u = fft_convolution(xw, h, K)

        # Save the valid output samples
        y[n*B:n*B+B] = u[-B:]

    return y[:M+N-1]

In [30]:
def realtime_uniformly_partitioned_convolution(x, h, B):
    M = len(x)
    N = len(h)
    P = np.ceil(N/B).astype('int')
    num_input_block = M//B
    
    print('num_input_block:',num_input_block)
    
    output = np.zeros(M)
    
    # precalculate sub filters fft
    sub_filters_fft = np.zeros((P, 2*B), dtype=np.complex64)
    for i in range(P):
        sub_filter = h[i*B:i*B + B]
        sub_filter_pad = pad_zeros_to(sub_filter, 2*B)
        sub_filters_fft[i,:] = np.fft.fft(sub_filter_pad)
        
    
    # input blocks
    freq_delay_line = np.zeros_like(sub_filters_fft)
    xw = np.zeros(2*B)
    for i in range(num_input_block):
        block_x = x[i*B:i*B + B]
        xw = np.roll(xw, -B)
        xw[-B:] = block_x
        
        xw_fft = np.fft.fft(xw)
        
        # inser fft into delay line
        freq_delay_line = np.roll(freq_delay_line, 1, axis=0)
        freq_delay_line[0,:] = xw_fft
        print(freq_delay_line)

        # ifft
        s = (freq_delay_line*sub_filters_fft).sum(axis=0)
        # print(s)
        ifft = np.fft.ifft(s).real[-B:]
        # print(o)
        output[i*B:i*B + B] = ifft
        
    return output

In [31]:
x = np.arange(12)
h = np.arange(12)
B = 4

In [32]:
realtime_uniformly_partitioned_convolution(x, h, B)

num_input_block: 3
[[ 6.       +0.j          1.4142135+4.8284273j  -2.       +2.j
  -1.4142135+0.82842714j -2.       +0.j         -1.4142135-0.82842714j
  -2.       -2.j          1.4142135-4.8284273j ]
 [ 0.       +0.j          0.       +0.j          0.       +0.j
   0.       +0.j          0.       +0.j          0.       +0.j
   0.       +0.j          0.       +0.j        ]
 [ 0.       +0.j          0.       +0.j          0.       +0.j
   0.       +0.j          0.       +0.j          0.       +0.j
   0.       +0.j          0.       +0.j        ]]
[[28.       +0.j         -4.       +9.656855j   -4.       +4.j
  -4.       +1.6568543j  -4.       +0.j         -4.       -1.6568543j
  -4.       -4.j         -4.       -9.656855j  ]
 [ 6.       +0.j          1.4142135+4.8284273j  -2.       +2.j
  -1.4142135+0.82842714j -2.       +0.j         -1.4142135-0.82842714j
  -2.       -2.j          1.4142135-4.8284273j ]
 [ 0.       +0.j          0.       +0.j          0.       +0.j
   0.       +0.j   

array([-7.15255737e-07, -4.87370253e-07,  9.99999881e-01,  4.00000039e+00,
        1.00000007e+01,  2.00000007e+01,  3.50000000e+01,  5.59999996e+01,
        8.39999995e+01,  1.19999999e+02,  1.64999997e+02,  2.19999998e+02])

In [13]:
native_convolution(x, h)

array([  0.,   0.,   1.,   4.,  10.,  20.,  35.,  56.,  84., 120., 165.,
       220., 286., 340., 381., 408., 420., 416., 395., 356., 298., 220.,
       121.])

In [238]:
a = np.roll(a, 1, axis=0)
a[0,:] = [1,1,1]
a

array([[1, 1, 1],
       [0, 0, 0],
       [0, 0, 0]])

In [239]:
a = np.roll(a, 1, axis=0)
a[0,:] = [2,2,2]
a

array([[2, 2, 2],
       [1, 1, 1],
       [0, 0, 0]])

In [106]:
native_convolution(x, h)

array([  0.,   1.,   3.,   6.,  10.,  15.,  21.,  28.,  36.,  45.,  55.,
        66.,  78.,  90., 102., 114., 110., 105.,  99.,  92.,  84.,  75.,
        65.,  54.,  42.,  29.,  15.])

In [107]:
h0 = np.array([1,1,1,1])
h1 = np.array([1,1,1,1])
h2 = np.array([1,1,1,1])
h0_pad = pad_zeros_to(h0, 2*B)
h1_pad = pad_zeros_to(h1, 2*B)
h2_pad = pad_zeros_to(h2, 2*B)

In [108]:
h0_fft = np.fft.fft(h0_pad)
h1_fft = np.fft.fft(h1_pad)
h2_fft = np.fft.fft(h2_pad)

In [109]:
xw = np.zeros(2*B)
block_i = 0
block_x = x[block_i*B:block_i*B + B]
xw = np.roll(xw, -B)
xw[-B:] = block_x
xw_fft_0 = np.fft.fft(xw)

s = xw_fft_0*h0_fft + 0*h1_fft + 0*h2_fft
output = np.fft.ifft(s).real
print(output[-B:])

[0. 1. 3. 6.]


In [110]:
block_i = 1
block_x = x[block_i*B:block_i*B + B]
xw = np.roll(xw, -B)
xw[-B:] = block_x
xw_fft_1 = np.fft.fft(xw)

s = xw_fft_1*h0_fft + xw_fft_0*h1_fft + 0*h2_fft
output = np.fft.ifft(s).real
printVector(output[-B:])

[10. 15. 21. 28.]


In [111]:
block_i = 2
block_x = x[block_i*B:block_i*B + B]
xw = np.roll(xw, -B)
xw[-B:] = block_x
xw_fft_2 = np.fft.fft(xw)

s = xw_fft_2*h0_fft + xw_fft_1*h1_fft + xw_fft_0*h2_fft
output = np.fft.ifft(s).real
output[-B:]

array([36., 45., 55., 66.])