### Optimal filtering with Wiener Filters

In this notebook, we will explore the concept of optimal filtering using Wiener filters in the time domain using some generated/recorded audio signals. 

Firstly let us import several of the packages we will need. If these packages are not available on your machine, you will have to install them first. 

In [1]:
import numpy as np 
import scipy as sp
from scipy import signal
import sounddevice as sd
import soundfile as sf
from matplotlib import pyplot as plt
from ipywidgets import * # interactive plots
import IPython
from IPython.display import Audio, Image
%matplotlib notebook
from IPython.core.display import display, HTML
display(HTML('<a rel="license" href="http://creativecommons.org/licenses/by-nc-sa/4.0/"><img alt="Creative Commons Licence" style="border-width:0" src="https://i.creativecommons.org/l/by-nc-sa/4.0/88x31.png" /></a><br />This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by-nc-sa/4.0/">Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License</a>.'))

### Optimal filtering

Consider that we have the following signals:

1. A desired signal, $\mathbf{x}$ = [x(0) x(1) x(2) ... x(N-1)]$^T$
2. An input signal, $\mathbf{\overline{y}}$ = [y(0) y(1) y(2) ... y(N-1)]$^T$

where N is the length of the signals. Our goal is that we want to design some "optimal" filter to filter the input signal such that it is as similar as possible to our desired signal. So how do we do that and what is a Wiener filter?

Well let's firstly start by defining an optimal filter with $P$ coefficients, where $P < N$:

Optimal Filter: $\mathbf{w}$ = [w(0) w(1) w(2) ... w(P)]$^T$

Now we can compute the m$^{th}$ sample of the filtered input signal as follows:
\begin{align}
\hat{x}(m) &= \sum_{k=0}^{P} \mathrm{w}(k) \mathrm{y}(m-k) \\
            &= \mathrm{[w(0) \hspace{0.1cm} w(1) \hspace{0.1cm} w(2)\hspace{0.1cm}  ... \hspace{0.1cm} w(P)]} \begin{bmatrix} y(m) \\ y(m-1) \\ y(m-2) \\ \vdots \\ y(m-(P-1))\\ \end{bmatrix}\\
            & = \mathbf{w}^{T} \mathbf{y}
\end{align}

where we have defined a 'new' input signal, $\mathbf{{y}}$ = [y(m) y(m-1) y(m-2) ... y(m-(P-1))]$^T$ of length $P$. We can in fact recognise this operation as a convolution (see the notebook 04_Impulse Response and Convolution) between the optimal filter and the input signal. This filtered input signal, $\hat{x}$ (pronounced x-hat) is what we refer to as an **estimate** of our input signal.

To demonstrate how the filtering operation works, let's visualise this by using some short, arbitrary signals.


In [16]:
P = 5
N = 10
w = np.array([1, 4, 7, -0.8, 2])
np.random.seed(1)
y_bar = np.random.rand(N)*10

# Have a look at the filter 'w' and full input signal 'y-bar'
fig, axes = plt.subplots(1,2,figsize=(8, 2)) 
fig.subplots_adjust(hspace=0.5)
axes[0].plot(w,'-o')
axes[0].set_xlabel('Samples')
axes[0].set_ylabel('Amplitude')
axes[0].set_title('$\mathbf{w}$')

axes[1].plot(y_bar,'-o')
axes[1].set_xlabel('Samples')
axes[1].set_ylabel('Amplitude')
axes[1].set_title('$\overline{\mathbf{y}}$ (Complete input)')


# # Repeating the creation of these variables just for the purpose of plotting:
w_zp = np.zeros(N+P-1) # filter
y_zp = np.zeros(N+P-1) # input
x_hat = np.zeros(N+P-1) # output

w_zp[N-1:] = w
y_zp[0:N] = np.flip(y_bar)
sample_idx = np.arange(0,N+P-1,1)

#Pre-compute the convolution
for m in sample_idx:
    y_zp_shift = np.roll(y_zp,m)
    y_zp_shift[0:m] = 0
    x_hat[m] = w_zp@y_zp_shift  # compute inner product 
    
    
fig, axes = plt.subplots(2,1,figsize=(8, 6)) 
fig.subplots_adjust(hspace=0.7)
axes[0].plot(w_zp,'-o', label = '$\mathbf{w}$')
axes[0].axvline(x=9,color='black', ls='--')
lineneg, = axes[0].plot(y_zp[0:N], '-o', alpha=0.4)
linepos, = axes[0].plot(np.arange(9,14,1),y_zp[N-1:], '-o', alpha=0.4,label='$\mathbf{y}$')
axes[0].set_xlabel('Indices corresponding to $\mathbf{w}$')
axes[0].set_ylabel('Amplitude')
axes[0].set_xlim([0, N+P-2])
axes[0].set_xticks(sample_idx, sample_idx-9)
axes[0].set_title('Beyond the dotted line is where we multiply among P samples and sum (P = 5) \n The orange part of the signal are the samples of $\overline{\mathbf{y}}$ that are not used in the convolution')
axes[0].legend(loc='upper left')

axes[1].plot([])
axes[1].set_xlabel('m')
axes[1].set_ylabel('Amplitude')
axes[1].set_title('Estimate of desired signal, $\hat{x} = \mathbf{w}^{T}\mathbf{y}$')
axes[1].set_xlim([0, N+P-2])
axes[1].set_ylim([np.min(x_hat), np.max(x_hat)])

line2, = axes[1].plot([],[],'k-o')


def update(m = 0):
    
    y_shifted = np.roll(y_zp,m)
    y_shifted[0:m] = 0
    lineneg.set_ydata(y_shifted[0:N])
    linepos.set_ydata(y_shifted[N-1:])
#     axes[0].set_title('m = ' + str(n))
    
    line2.set_data(sample_idx[0:m+1], x_hat[0:m+1])

print('The slider controls the value of m, move it to see the convolution!')
interact(update, m = (0,N+P-2,1));    

print(x_hat)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

The slider controls the value of m, move it to see the convolution!


interactive(children=(IntSlider(value=0, description='m', max=13), Output()), _dom_classes=('widget-interact',…

[ 4.17022005 23.88412512 58.00566382 50.11443922 16.1467122  42.36247653
 13.41268518 22.24232169 33.02472768 45.80480741 50.28711097 41.45424613
  3.62481561 10.77633468]


### Designing the optimal filter - Wiener Filter

Now that we understand how to 'filter' a signal, let's go back to our original question - how do we design this filter? Well for each sample we can observe the difference or error between the desired, $x(m)$ and its estimate $\hat{x}(m)$, which can be a first indicator as to how well our estimate approximates the desired signal. However, a better option would be to observe such an error in a more statistical sense by what is called the mean-squared error (MSE), which is defined as follows: 

\begin{equation}
MSE = \mathbb{E}\{e^{2}(m)\} = \mathbb{E}\{(x(m) - \hat{x}(m))^{2}\} = \mathbb{E}\{(x(m) - \mathbf{w}^{T} \mathbf{y})^{2}\}
\end{equation}

where $\mathbb{E}$ is the expectation operator. In other words, the MSE is the expected or average squared error between the input and its estimate. We use the squared-error as opposed to just the error because many times we use signals with zero-mean, and hence we are not able to distinguish if a zero error would come from the zero mean of the signals or if the signals are really that similar. Therefore we can say that if the MSE between two signals is small, this corresponds to the two signals being more similar. 

This is precisely the criterion under which the Wiener filter is designed. The Wiener filter is an optimal filter which **minimises** the MSE between the desired signal and its estimate. This MSE is commonly referred to as a *cost function*, and so we can view the process of finding the Wiener filter, i.e. the coefficients in $\mathbf{w}$, as finding the minimum of this cost function, which is the best we can do, and is consequently optimal in this sense. The following optimisation problem can be formulated to find the coefficients in $\mathbf{w}$:

\begin{equation}
\underset{\mathbf{w}}{\text{minimise}} \hspace{0.2cm} \mathbb{E}\{(x(m) - \mathbf{w}^{T} \mathbf{y})^{2}\}
\end{equation}

We can compute the optimal filter by computing the derivative of the cost function and setting it to zero. If we do this (an exercise for you!), then we can show that the Wiener filter is given by:
\begin{equation}
\mathbf{w} = \mathbf{R}^{-1}_{yy} \mathbf{r}_{yx} 
\end{equation}

where $\mathbf{R}_{yy}$ is the autocorrelation matrix of the input signal and $\mathbf{r}_{yx}$ is the cross-correlation vector between the input and the desired signal. (Later on we will elaborate on these equations).
In summary, we have defined a mathematically tractable criterion under which we want to design the filters and found an "optimum" solution to the cost function, i.e. by finding the filter coefficients which resulted in the minimum MSE. 



### Wiener filtering of a distorted, noisy signal in an attempt to retrieve the desired signal.

In the demo that follows, we firstly create a desired signal, which will be a simple sine wave. We then play this sine wave through the loudspeaker of our computer and record it with the microphone. Essentially what we would have done is introduced some linear distortion and noise to our desired signal.

The question is now - *Can we design a Wiener filter that can be applied to the recorded microphone signal so that we can retrieve the desired signal?* 

Let's find out!


First up, let's create our desired signal. We use low sampling frequencies to keep the computations manageable. 

In [32]:
# Create a desired signal

T = 4  # time duration (seconds)
fs = 3000    # Sampling frequency (Hz)
t = np.arange(0, T, 1/fs)
fo = 300
x_init = 0.5*np.sin(2*np.pi*fo*t)  # will truncate later
N_init = len(x_init)
print ('Number of samples = '+str(N_init))
IPython.display.display(Audio(x_init, rate=fs, normalize=False))

fig, axes = plt.subplots(figsize=(4, 2)) 
axes.plot(t,x_init)
axes.set_title('Desired Signal')
axes.set_xlabel('Time (s)')
axes.set_ylabel('Amplitude')

# Uncomment this if you want to see the spectrum
# Signal spectrum
Xf = np.fft.fft(x_init,axis=0)  
Xfss = Xf[0:N_init//2+1]  # Single sided spectrum. The last value will be N/2, since python goes up to N-1 when slicing vectors like this.


#Set up the frequency vector
df = fs/N_init  # the frequency bin spacing
freqs = np.arange(0,fs/2 + df,df) # first value = DC, last value = fs/2

fig, axes = plt.subplots(figsize=(4, 2)) 
axes.plot(freqs,10*np.log10(np.abs(Xfss**2)))
# axes.set_yscale('log')  # toggle this to check out a log y-axis
axes.set_title('FFT of Sinusoid')
axes.set_xlabel('Frequency (Hz)')
axes.set_ylabel('Magnitude-Squared (dB)')

Number of samples = 12000


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Text(0, 0.5, 'Magnitude-Squared (dB)')

#### Play the desired signal through our loudspeaker and record it with the microphone

In [40]:
# Recording the desired signal as it propagates through the channel

# Use autoplay to play generated signal
print('x: ')
IPython.display.display(Audio(x_init.T, rate=fs,autoplay=True,normalize=False)) # play autoplay signal to record

print ('recording x ...')
y_init = sd.rec(T*fs, blocking=True,samplerate=fs, channels=1)
print ('finished recording')



x: 


recording x ...
finished recording


In [41]:
print("Data shape: ", y_init.shape)
print("Recorded Signal:")
IPython.display.display(Audio(y_init.T, rate=fs))

fig, axes = plt.subplots(figsize=(4, 2)) 

axes.plot(t,x_init, label = 'Desired')
axes.plot(t,y_init, label = 'Input')
axes.set_xlabel('Time (s)')
axes.set_ylabel('Amplitude')
axes.legend()

Data shape:  (12000, 1)
Recorded Signal:


<IPython.core.display.Javascript object>

<matplotlib.legend.Legend at 0x7fd731bfab20>

Okay, so we can clearly see the distorted, noisy sinusoid (and maybe hear some subtle differences). Due to the way python does the recording, there are also some artifacts introduced in the very beginning and ending of the recording, so let us just use the section in the middle where things appear more stationary. You can try with and without removing these sections to also observe the performance, but let's redefine the input and output signal as follows by truncating a second from the beginning and a second from the end.


In [42]:
st_time = np.round(1*fs)
end_time = np.round(3*fs)
x = x_init[st_time:end_time]
y = y_init[st_time:end_time]
N = len(y)

# Create new time variable:
t = np.arange(0, N//fs, 1/fs)

fig, axes = plt.subplots(figsize=(4, 2)) 

axes.plot(t,x, label = 'Desired')
axes.plot(t,y, label = 'Input')
axes.set_xlabel('Time (s)')
axes.set_ylabel('Amplitude')
axes.set_title('Truncated version of signals')
axes.legend()




<IPython.core.display.Javascript object>

<matplotlib.legend.Legend at 0x7fd733cdb880>

### Construting autocorrelation matrices and cross-correlation vector to compute the Wiener filter

Okay so now that we have our desired and input signals, there are two steps that we need to perform in order to obtain an estimate of our desired signal:
1. Compute the Wiener filter, $\mathbf{w}$
2. Filter in the input signal with $\mathbf{w}$

Let's start with the first step.

Recall that the optimal Wiener filter is given as follows:
\begin{equation}
\mathbf{w} = \mathbf{R}^{-1}_{yy} \mathbf{r}_{yx},
\end{equation}

where $\mathbf{R}_{yy} = \mathbb{E} \{ \mathbf{y y}^{\mathit{T}} \}$ is the autocorrelation matrix of dimension $(P \times P)$, and $\mathbf{r}_{yx} = \mathbb{E} \{ \mathbf{y}x \}$ is the cross-correlation vector of dimension P. The elements of $\mathbf{R}_{yy}$ are given by the autocorrelation function, $r_{yy}(k) = \mathbb{E} \{y(m) y(m+k) \}$ for the m$^{th}$ sample at lag k.

Hence the previous equation can be expanded as follows:

\begin{align}
\begin{bmatrix} \mathrm{w}(0) \\ \mathrm{w}(1) \\ \vdots \\ \mathrm{w}(P-1)\\ \end{bmatrix}  & =  \begin{bmatrix} r_{yy}(0)  & r_{yy}(1) & \dots & r_{yy}(P-1)  \\ r_{yy}(1)  & r_{yy}(0) & \dots & r_{yy}(P-2) \\  \vdots  & \vdots & \ddots & \vdots \\ r_{yy}(P-1)  & r_{yy}(P-2) & \dots & r_{yy}(0)\\ \end{bmatrix}^{-1} \begin{bmatrix} {r}_{yx}(0) \\ {r}_{yx}(1) \\ \vdots \\ {r}_{yx}(P-1)\\ \end{bmatrix} \\
\\
\end{align}

In practice, the question is how should one estimate  $\mathbf{R}_{yy}$ and $\mathbf{r}_{yx}$. One option is to average over an ensemble of different realizations of desired/observed signals. Another option, in particular for correlation-ergodic processes (i.e. the single realisation must be representative of the entire process see here: https://en.wikipedia.org/wiki/Ergodic_process), ensemble averaging can be replaced by time averaging so only 1 realization needed. We will use the latter of these methods in the following. 

Hence we compute $r_{yy}(k)$ and $r_{yx}(k)$ as follows:

\begin{align}
r_{yy}(k) &= \frac{1}{N} \sum_{m=0}^{N-1} y(m) y(m+k) \\
r_{yx}(k) &= \frac{1}{N} \sum_{m=0}^{N-1} y(m) x(m+k)
\end{align}

These values are then substituted into the equations above to solve for the Wiener filter. 

In [43]:
# The approach to doing the computations of ryy and ryx is as follows.
# For each time lag, k:
#     1. Advance the appropriate signal by lag k - this will be done using a circular shift
#     2. Zero out last k circularly shifted samples
#     3. Then perform the sum as an inner product between the unshifted and shifted version and avg.


# Select a value for P
P = 5

ryy = np.zeros([P,1])
ryx = np.zeros([P,1])

for k in range(P):

    y_lag = np.roll(y,-k) # circular shift, note the -k here, it means we are advancing, i.e. (m+k) sample
    x_lag = np.roll(x,-k) 
    
    if k > 0:
        y_lag[-k:] = 0       # zero out last k values
        x_lag[-k:] = 0       
    
        
    ryy[k,0] = (1/N)*y_lag.T@y  # Inner products and average
    ryx[k,0] = (1/N)*x_lag.T@y
    

print('ryy: \n'+str(ryy))
    
# Create autocorrelation matrix Ryy
row1 = ryy.T  # first row in toeplitz matrix
col1 = ryy # first column in toeplitz matrix
Ryy = sp.linalg.toeplitz(col1, row1)  # Toeplitz matrix


print('Autocorrelation Matrix, Ryy: \n'+str(Ryy))
print('Cross-correlation vector ryx: \n'+str(ryx))



# Solve the system of linear equations
# Use scipy.linalg.solve for solving systems of eqns
w = sp.linalg.solve(Ryy, ryx, assume_a='sym')

print('Optimal Wiener Filter: \n'+str(w))



ryy: 
[[ 0.02406755]
 [ 0.01946869]
 [ 0.00744517]
 [-0.00740751]
 [-0.01941734]]
Autocorrelation Matrix, Ryy: 
[[ 0.02406755  0.01946869  0.00744517 -0.00740751 -0.01941734]
 [ 0.01946869  0.02406755  0.01946869  0.00744517 -0.00740751]
 [ 0.00744517  0.01946869  0.02406755  0.01946869  0.00744517]
 [-0.00740751  0.00744517  0.01946869  0.02406755  0.01946869]
 [-0.01941734 -0.00740751  0.00744517  0.01946869  0.02406755]]
Cross-correlation vector ryx: 
[[-0.03683114]
 [-0.00594186]
 [ 0.02720538]
 [ 0.04995088]
 [ 0.05361156]]
Optimal Wiener Filter: 
[[-2.05490998]
 [ 0.9935677 ]
 [ 0.48600234]
 [ 0.45109066]
 [ 0.36023767]]


### Observing the convolution matrices


Before we filter the input, let's firstly observe the system of equations obtained when stacking the error signal for all samples. Recall that for the m$^{th}$ sample, we could express the error as follows:

\begin{align}
e(m) &= x(m) - \hat{x}(m) \\
    & = x(m) -  \mathrm{[w(0) \hspace{0.1cm} w(1) \hspace{0.1cm} w(2)\hspace{0.1cm}  ... \hspace{0.1cm} w(P)]} \begin{bmatrix} y(m) \\ y(m-1) \\ y(m-2) \\ \vdots \\ y(m-(P-1))\\ \end{bmatrix} 
\end{align}

Stacking these samples together we can once again see that our estimated signal is computed as a convolution. Previously, we demonstrated this convolution as a shift-sum operation, whereas here we see this same operation, but rather in the form of a matrix-vector product:

\begin{align}
\begin{bmatrix} \hat{x}(0) \\ \hat{x}(1) \\ \hat{x}(2) \\ \vdots \\ \hat{x}(N-1))\\ \end{bmatrix}  & =  \begin{bmatrix} y(0)  & y(-1) & \dots & y(1-P)  \\  y(1)  & y(0) & \dots & y(2-P) \\  y(2)  & y(1) & \dots & y(3-P) \\  \vdots  & \vdots & \ddots & \vdots \\  y(N-1)  & y(N-2) & \dots & y(N-P)\\ \end{bmatrix} \begin{bmatrix} \mathrm{w}(0) \\ \mathrm{w}(1) \\ \vdots \\ \mathrm{w}(P-1)\\ \end{bmatrix} \\
\\
\mathbf{\hat{x}} &= \mathbf{Y}\mathbf{w}
\end{align}

In the following, let's construct the convolution matrix $\mathbf{Y}$ (which is toeplitz) and observe a few of the rows. 

In [58]:
# Create toeplitz matrix for Y
row1 = np.r_[y[0], np.zeros(P-1)]  # first row in toeplitz matrix
col1 = np.r_[y] # first column in toeplitz matrix
Y = sp.linalg.toeplitz(col1, row1)  # Toeplitz matrix
print ('Dimension of Y = '+str(Y.shape))

# We can check out a few rows of Y to see how it looks (note that the first few rows may be equal to zero).
# Can compare to the corresponding value of x

row_number = 103
print('Row '+str(row_number) + ' of Y: '+str(Y[row_number,:]))
print('Sample '+str(row_number)+ ' of x: '+str(x[row_number]))



Dimension of Y = (6000, 5)
Row 103 of Y: [-0.19720456 -0.0955334   0.04149013  0.16171299  0.21907242]
Sample 103 of x: 0.47552825814760225


### Filtering the input signal with the Wiener filter

Okay now we are finally ready to filter the input signal! This is the moment of truth...

In [61]:
w_rand = np.random.rand(P,1) # Replace w below with w_rand and see what happens

x_hat = Y@w  # signal estimate
error = (x-x_hat[:,0]) # error between both signals
MSE = (1/N)*np.sum(error**2) # Mean-sqaured error between both signals


print("Input, y:")
IPython.display.display(Audio(y.T, rate=fs,normalize=False))

print("Desired, x:")
IPython.display.display(Audio(x.T, rate=fs,normalize=False))

print("Estimate, x_hat:")
IPython.display.display(Audio(x_hat.T, rate=fs,normalize=False))

print("Error signal, error:")
IPython.display.display(Audio(error.T, rate=fs,normalize=False))

print("MSE = "+str(MSE))


fig, axes = plt.subplots(figsize=(4, 2)) 

axes.plot(t,x,label='Desired')
axes.plot(t,x_hat,label='Wiener Filtered')
axes.plot(t,y,label='Input')
axes.plot(t,error,label='Squared Error')

axes.set_xlabel('Time (s)')
axes.set_ylabel('Amplitude')
axes.legend()



Input, y:


Desired, x:


Estimate, x_hat:


Error signal, error:


MSE = 0.00011121996774318536


<IPython.core.display.Javascript object>

<matplotlib.legend.Legend at 0x7fd712a97790>

### Something for you to try or think about.

In this notebook, we tried to estimate the desired signal. 
If alternatively we wanted to estimate the parameters that produced the distorted desired signal, how would we change things to accomplish this? Such a task is typically useful for applications such as echo/feedback control, where we would like to estimate the filter coefficients that correspond to the path between a loudspeaker and a microphone. 