# Partitioned Convolution

[link](https://dsp-nbsphinx.readthedocs.io/en/nbsphinx-experiment/nonrecursive_filters/segmented_convolution.html)

## Overlap Save Method

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import importlib
import util
from scipy.signal import fftconvolve as conv
from util import db

importlib.reload(util)

In [None]:
fs = 44100
Lx = int(1 * fs)
Lh = int(0.5 * fs)
Ly = Lx + Lh - 1

n_start, n_stop = 24000, 28192
N = n_stop - n_start

x1 = 0.1 * np.random.randn(Lx)
x2 = 0.1 * np.random.randn(Lx)

n1_prepend = 2 * N - n_start % N
n1_append = 2 * N - (Lx + n1_prepend) % N
x1p = util.prepend_append_zeros(x1, n1_prepend, n1_append)

n2_append = np.mod(N - Lx % N, N)
x2p = util.prepend_append_zeros(x2, 0, n2_append)

M1 = int(len(x1p) / N)
M2 = int(len(x2p) / N)

In [None]:
fig, ax = plt.subplots(figsize=(15, 5), ncols=2)
nn1 = np.arange(len(x1p)) - n1_prepend
nn2 = np.arange(len(x2p))

ax[0].plot(nn1, x1p + 1, c='gray')
for m in range(M1):
    idx = m * N + np.arange(N)
    ax[0].plot(-n1_prepend + idx, x1p[idx])
    ax[0].plot(-n1_prepend + m * N, 0, 'k.')
ax[0].plot(n_start, 0, 'yx')
ax[0].plot(n_stop, 0, 'yx')
ax[0].grid()

ax[1].plot(nn2, x2p + 1, c='gray')
for m in range(M2):
    idx = m * N + np.arange(N)
    ax[1].plot(idx, x2p[idx])
ax[1].grid()

In [None]:
Nfft = 2 * N
x1b = np.zeros(2 * N)
x2b = np.zeros(N)
yp = np.zeros(len(x1p) + len(x2p) - 1)
for m in range(M1):
    x1b[N:] = x1p[m * N:(m + 1) * N]
    for k in range(M2):
        x2b = x2p[k * N:(k + 1) * N]
        yb = util.cconv(x1b, x2b, nfft=Nfft)[N:]
        yp[(m + k) * N: (m + k + 1) * N] += yb
    x1b = np.roll(x1b, N)

y0 = conv(x1, x2)
y = yp[n1_prepend:-n1_append - n2_append]
e = y0 - y

fig, ax = plt.subplots(figsize=(12, 5), ncols=2)
ax[0].plot(y0)
ax[0].plot(y - 10)
ax[1].plot(db(e));

Evaluate the convolution for a selected range

In [None]:
Nfft = 2 * N
x1b = np.zeros(2 * N)
x2b = np.zeros(N)
yp = np.zeros(len(x1p) + len(x2p) - 1)

M0 = int((n1_prepend + n_start) / N)

for m in range(M1):
    x1b[N:] = x1p[m * N:(m + 1) * N]
    for k in range(M2):
        if (m + k) == M0:
            x2b = x2p[k * N:(k + 1) * N]
            yb = util.cconv(x1b, x2b, nfft=Nfft)[N:]
            yp[(m + k) * N: (m + k + 1) * N] += yb
    x1b = np.roll(x1b, N)

y0p = conv(x1p, x2p)
e = yp - y0p

fig, ax = plt.subplots(figsize=(12, 5), ncols=2)
ax[0].plot(y0p)
ax[0].plot(yp)
ax[1].plot(db(e));

### Constant Phase Shift

In [None]:
fs = 44100
L = int(1 * 60 * fs)
x = np.random.randn(L)
n_start, n_stop = 844100, 888200

# Fast convolution
phase_angle = -np.pi / 4
filter_order = int(2 * L)
h0 = util.constant_phase_shifter(filter_order, phase_angle)[1]
y0 = util.acausal_filter(x, h0)
y0_selection = y0[n_start:n_stop]

# Overlap save method
y_selection = util.constant_phase_shift_nonrecursive_iir(x, n_start, n_stop, phase_angle)

In [None]:
fig, ax = plt.subplots(figsize=(12, 5), nrows=2)
nn = np.arange(n_start, n_stop)

ax[0].plot(y0, c='lightgray')
ax[0].plot(nn, y_selection, c='red')
ax[1].plot(db(y0_selection - y_selection))

## Overlap Add Method

### Computational complexity

For a fast convolution of two very long signals with the same length $N \gg 1$,
the required FFT length is at least $2N - 1$ and
the numerical complexity is in the order of

$$
\mathcal{O}\left(2N (\log_{2} N + 1)\right)
$$

where the FFT length is set to $2N$.

If the signals are segmented into $M$ blocks in order to
use the overlap-add method, a total of $M^2$ fast convolutions
have to be performed. The length of each block
is $\frac{N}{M}$ and the FFT length should be $\frac{2N}{M}$.
Therefore the overall numerical complexity is in the order of

$$
\mathcal{O}\left(M^2 \cdot \frac{2N}{M} \log_{2} \frac{2N}{M}\right)
= \mathcal{O}(2MN (\log_{2}N - \log_{2}M + 1)).
$$

For $M=1$ where the whole signal is treated as a single block,
this equals to $\mathcal{O}\left(2N (\log_{2} N + 1)\right)$. 

In [None]:
N = 2**20
comp_fftconv = 2 * N * (np.log2(N) + 1)

num_blocks = 2**np.arange(0, 20)

plt.figure()
plt.semilogx(num_blocks, np.log2(comp_fftconv * np.ones_like(num_blocks)), 'r--')
for M in num_blocks:
    P = N / M
    Nfft = 2 * P
    plt.semilogx(M, np.log2(M**2 * Nfft * (np.log2(Nfft))), 'kx')
plt.xlabel('Number of Blocks')
plt.ylabel('Computations $\log_{2}\mathcal{O}$')
plt.grid();

In [None]:
def partconv(x1, x2, nblock, nfft=None):
    '''Block-based FFT convolution using the overlap-add method'''
    if nfft is None:
        nfft = 2 * nblock
    N1 = len(x1)
    N2 = len(x2)
    N = N1 + N2 - 1  # output length
    M1 = N1 // nblock + 1  # number of blocks
    M2 = N2 // nblock + 1
    x1 = np.append(x1, np.zeros(N1 % nblock))  # append with zeros
    x2 = np.append(x2, np.zeros(N2 % nblock))
    y = np.zeros((M1 + M2 - 2) * nblock + nfft)
    for i in range(M1):
        X1 = np.fft.rfft(x1[i * nblock:(i + 1) * nblock], n=nfft)
        for j in range(M2):
            X2 = np.fft.rfft(x2[j * nblock:(j + 1) * nblock], n=nfft)
            idx = (i + j) * nblock + np.arange(nfft)
            y[idx] += np.fft.irfft(X1 * X2)
    return y[:N]

### Convolution of Two Random Sequences

In [None]:
fs = 44100
N1 = 1 * 60 * fs
N2 = 1 * 60 * fs
L = N1 + N2 - 1

x1 = np.random.randn(N1)
x2 = np.random.randn(N2)

In [None]:
# Fast convolution using FFT
y0 = conv(x1, x2)

`scipy.signal.fftconvolve` is quite usable
for signals up to several minutes long.

In [None]:
# Partitioned convolution using the Overlap-Add method
nblock = 2**16  # block length
y = partconv(x1, x2, nblock=nblock)

This requires more computations.

### Check Accuracy

In [None]:
# Mean Square Error in dB
print(db(np.mean((y0 - y)**2)**0.5))