# Session 5 : Pre-processing

>Intelligence Artificielle & Machine Learning pour la modélisation de séries temporelles et de signaux<br />
>Master 1 Parcours IA ENS Paris Saclay<br />
>Laurent Oudre (laurent.oudre@ens-paris-saclay.fr)

This fifth session is related to all pre-processing steps that can be used to transform a raw signal into a signal of interest for machine learning algorithms : denoising, detrending, removal of impulsive noise and interpolation. Slides are available at http://www.laurentoudre.fr/signalml.html

<div class="alert alert-block alert-info">

1. **Library and data loading**
2. **Exploratory study**
3. **Denoising**
4. **Detrending**
5. **Detection and removal of impulsive noise**
</div>

## 1. Library and data loading

In [1]:
%matplotlib notebook
import numpy as np
from matplotlib import pyplot as plt
from scipy import signal
from statsmodels.regression.linear_model import yule_walker
from sporco.admm import bpdn

In [2]:
# Default size (width, height) used by every figure created below
plt.rcParams.update({'figure.figsize': (8, 5)})

In [3]:
# Data loading
# np.load on a .npz archive keeps the file open until it is closed, so the
# arrays are read inside a context manager (indexing a key loads that array).
with np.load('Session5.npz') as npzfile:
    x = npzfile['x']    # plotted below as the 'Original signal'
    t = npzfile['t']    # time axis of the plots (seconds)
    y1 = npzfile['y1']  # plotted below as the 'Noisy signal'
    y2 = npzfile['y2']  # plotted below as the 'Signal with trend'
    y3 = npzfile['y3']  # plotted below as the 'Signal with impulsive noise'
    # Sorted integer locations of the corrupted samples; the -1 presumably
    # converts 1-based indices to 0-based ones (TODO confirm against the data).
    n_corrupted = (np.sort(np.round(npzfile['n'])) - 1).astype(int)
    Fs = npzfile['Fs']  # sampling frequency (Hz)

# First investigations
N = np.size(x)
# Explicit format arguments instead of **locals(): same output, no hidden
# dependency on whatever happens to be in the namespace.
print("Sampling Frequency : {Fs} Hz".format(Fs=Fs))
print("Number of samples : {N}".format(N=N))

Sampling Frequency : 100 Hz
Number of samples : 300


## 2. Exploratory study

In [4]:
# Time-domain view of x[n]
plt.figure("x[n] as a function of time")
plt.plot(t, x)
plt.xlabel('Time (seconds)')
plt.xlim(0, (N - 1) / Fs)  # span exactly the N samples
plt.title('$x[n]$')
plt.show()

<IPython.core.display.Javascript object>

## 3. Denoising

### 3.1 Filtering

A popular solution for denoising a signal consists in filtering it with appropriate cutoff frequencies. One practical procedure is to study the PSD of the noisy signal in order to identify the frequency bands that are unlikely to contain signal content.

In [5]:
# Overlay of the original signal x and the noisy signal y1
plt.figure("Plot of the original and noisy signals")
plt.plot(t, x, t, y1)  # two curves, drawn in this order
plt.xlabel('Time (seconds)')
plt.xlim(0, (N - 1) / Fs)
plt.legend(['Original signal', 'Noisy signal'])
plt.show()

<IPython.core.display.Javascript object>

In [6]:
# Periodogram function
def periodogram(x,Fs):
    """Return the periodogram of ``x`` and the matching frequency axis.

    Parameters
    ----------
    x : array of samples.
    Fs : sampling frequency.

    Returns
    -------
    (Gammax, f) : ``|FFT(x)|**2 / N`` and the frequencies, both reordered by
    ``fftshift`` so that the zero frequency sits in the middle.
    """
    n_samples = np.size(x)
    # Squared magnitude of the centred spectrum, normalised by the length
    spectrum = np.fft.fftshift(np.fft.fft(x))
    power = (np.abs(spectrum)**2)/n_samples
    # Frequency bins, centred the same way as the spectrum
    freqs = np.fft.fftshift(np.fft.fftfreq(n_samples, d=1/Fs))
    return power, freqs

In [7]:
# Log-periodograms (in dB) of the original signal x and of the noisy signal y1
Gammax, f = periodogram(x, Fs)
Gammay, f = periodogram(y1, Fs)
plt.figure("Plots of the log-periodograms of x[n] and y[n]")
plt.plot(f, 10*np.log10(Gammax), f, 10*np.log10(Gammay))
plt.xlabel('Frequency')
plt.ylabel('Log-Periodogram (in dB)')
plt.xlim(0, 20)  # only the 0-20 range of the frequency axis is displayed
plt.legend(['Original signal', 'Noisy signal'])
plt.show()

<IPython.core.display.Javascript object>

According to the PSD, it appears that most of the power of the original signal lies in the frequency band below 10 Hz: the remaining content is likely to be composed only of noise. Therefore, we can perform denoising by applying a low-pass filter.

In [8]:
def low_pass_filter(x,fc,Fs):
    """Low-pass filter ``x`` with a 4th-order Butterworth filter.

    Parameters
    ----------
    x : signal to filter.
    fc : cut-off frequency, in the same unit as ``Fs``.
    Fs : sampling frequency.

    Returns
    -------
    The filtered signal; ``filtfilt`` runs the filter forward and backward.
    """
    # butter expects the cut-off normalised by the Nyquist frequency Fs/2
    nyquist = Fs/2
    num, den = signal.butter(4, fc/nyquist, 'low')
    return signal.filtfilt(num, den, x)

In [9]:
# Denoising of y1 by low-pass filtering (cut-off frequency fc = 10 Hz)
fc = 10
x1_hat = low_pass_filter(y1, fc, Fs)
plt.figure("Denoising with low-pass filter")
plt.plot(t, x, t, x1_hat)  # original first, then the filtered estimate
plt.xlabel('Time (seconds)')
plt.xlim(0, (N - 1) / Fs)
plt.legend(['Original signal', 'Denoised signal'])
plt.show()

<IPython.core.display.Javascript object>

### 3.2 Sparse coding

Another solution consists in using sparse coding techniques (described in Session 4), assuming that only the signal will be correctly modelled by the input dictionary, and not the noise component.

In [10]:
# Fourier dictionary: one constant atom followed by (cosine, sine) pairs at
# the multiples of the fundamental frequency f0 = Fs/N
N = np.size(x)
n = np.arange(N)/Fs        # sample instants
Nh = int(np.floor(N/2))    # number of (cosine, sine) pairs
f0 = Fs/N                  # does not depend on k, so computed once
D = np.zeros((N, 2*Nh+1))
D[:, 0] = np.ones(N)
for k in range(Nh):
    harmonic = 2*np.pi*f0*(k+1)*n
    D[:, 2*k+1] = np.cos(harmonic)
    D[:, 2*k+2] = np.sin(harmonic)

In [11]:
# Sparse coding based on Hard Thresholding Gradient Descent
def sparse_coding(x,D,k,niter=100):
    """Approximate ``x`` with at most ``k`` atoms of the dictionary ``D``.

    Each iteration takes one gradient step on ``||D z - x||^2`` and then
    keeps only the ``k`` coefficients of largest magnitude (the others are
    set to zero).

    Parameters
    ----------
    x : 1-D signal, of length ``D.shape[0]``.
    D : dictionary, one atom per column.
    k : number of coefficients kept after each step; if ``k`` is at least the
        number of atoms nothing is zeroed.
    niter : number of iterations (default 100, the previously fixed value).

    Returns
    -------
    z : 1-D array with one coefficient per atom.
    """
    n_atoms = np.shape(D)[1]
    # Step size: inverse of the squared spectral norm of D
    step = 1/(np.linalg.norm(D,2)**2)
    # Number of smallest coefficients to zero; clamped so that k > n_atoms
    # does not turn into a negative slice bound
    n_zeroed = max(n_atoms-k, 0)
    z = np.zeros((n_atoms,))
    for _ in range(niter):
        residual = np.dot(D,z)-x
        z = z - step * np.dot(np.transpose(D),residual)
        # argsort is ascending: the first n_zeroed entries are the smallest
        smallest = np.argsort(np.abs(z))[:n_zeroed]
        z[smallest] = 0
    return z

In [12]:
gamma = 2.5  # Sparsity parameter (third argument of the BPDN solver)
y1_ = y1.reshape((N, 1))  # the signal is handed to the solver as an (N, 1) column
opt = bpdn.BPDN.Options({
    'Verbose': False,
    'MaxMainIter': 500,
    'RelStopTol': 1e-8,
    'AutoRho': {'RsdlTarget': 1.0},
})
b = bpdn.BPDN(D, y1_, gamma, opt)
z = b.solve()
plt.figure("Denoising with sparse coding (L1)")
plt.plot(t, x)
plt.plot(t, D.dot(z))  # reconstruction from the sparse code
plt.xlabel('Time (seconds)')
plt.xlim(0, (N - 1) / Fs)
plt.legend(['Original signal', 'Denoised signal'])
plt.show()

<IPython.core.display.Javascript object>

## 4. Detrending

Trends can be removed either by using low-pass filtering or by using a regression on a set of smooth functions such as polynomials (see Session 3) so as to extract the trend component.

In [13]:
# Overlay of the original signal x and the signal with trend y2
plt.figure("Plot of the original and trend signals")
plt.plot(t, x, t, y2)
plt.xlabel('Time (seconds)')
plt.xlim(0, (N - 1) / Fs)
plt.legend(['Original signal', 'Signal with trend'])
plt.show()

<IPython.core.display.Javascript object>

In [14]:
# Regression on polynomial functions
# Design matrix with columns 1, t, t^2, t^3, t^4
D = np.column_stack([np.power(t, degree) for degree in range(5)])

# Least-squares fit of y2 on the polynomial basis. lstsq solves the problem
# directly instead of forming and inverting D^T D, which squares the
# condition number of an already ill-conditioned polynomial basis.
alpha = np.linalg.lstsq(D, y2, rcond=None)[0] #Regression parameters
alpha

array([ 0.22166171, -0.98128453,  1.29171137, -0.60672386,  0.08742166])

In [15]:
x2_hat = D.dot(alpha)  # Reconstruction of the trend from the regression
plt.figure("Detrending")
plt.plot(t, x)
plt.plot(t, y2 - x2_hat)  # signal with the estimated trend removed
plt.xlabel('Time (seconds)')
plt.xlim(0, (N - 1) / Fs)
plt.legend(['Original signal', 'Detrended signal'])
plt.show()

<IPython.core.display.Javascript object>

## 5. Detection and removal of impulsive noise

Impulsive noise of short duration (a few samples) can be suppressed by applying nonlinear filters such as median filters. In the case of large bursts (more than 10 samples), impulsive noise has to be handled in two phases: detection (finding the locations of the corrupted samples) and interpolation (replacing corrupted samples with more appropriate values).

* **Detection phase :** given estimates of the AR parameters, computation of the quantity $$d[n]=x[n]+\sum_{i=1}^p \hat{a}_i x[n-i]$$ The set of corrupted samples can be determined as $$\mathcal{T} = \left\lbrace n \mbox{ s.t. } |d[n]|>\lambda \right\rbrace$$
* **Interpolation phase :** minimization of the quantity $$\sum_{n=p+1}^{N-1}\left|x[n] + \sum_{i=1}^p \hat{a}_i x[n-i]\right|^2$$

In [16]:
# Overlay of the original signal x and the signal with impulsive noise y3
plt.figure("Plot of the original and noisy (impulsive) signals")
plt.plot(t, x, t, y3)
plt.xlabel('Time (seconds)')
plt.xlim(0, (N - 1) / Fs)
plt.legend(['Original signal', 'Signal with impulsive noise'])
plt.show()

<IPython.core.display.Javascript object>

In [17]:
# Naive approach : median filtering of y3 with a 3-sample window
x3_hat = signal.medfilt(y3, kernel_size=3)
plt.figure("Denoising with median filtering")
plt.plot(t, x, t, x3_hat)
plt.xlabel('Time (seconds)')
plt.xlim(0, (N - 1) / Fs)
plt.legend(['Original signal', 'Denoised signal'])
plt.show()

<IPython.core.display.Javascript object>

In [18]:
# Detection of corrupted samples based on AR model
def detection_impulsive(x,p,K):
    """Return the indices of the samples whose AR prediction error is large.

    Parameters
    ----------
    x : 1-D signal.
    p : order of the AR model fitted with ``yule_walker``.
    K : the detection threshold is ``K`` times the second value returned by
        ``yule_walker``.

    Returns
    -------
    T : indices ``n`` with ``p <= n < len(x) - p`` whose absolute prediction
        error exceeds the threshold.
    """
    # AR coefficients and noise scale estimated on the signal itself
    ar_coeffs, noise_scale = yule_walker(x, p, method='mle',demean=False)
    n_samples = np.size(x)
    # Reversed once so the coefficients line up with the window x[n-p:n]
    reversed_coeffs = np.flip(ar_coeffs)
    # Prediction error; left at zero for the first p samples (no full history)
    error = np.zeros((n_samples,))
    for idx in range(p, n_samples):
        predicted = np.sum(np.dot(x[idx-p:idx], reversed_coeffs))
        error[idx] = x[idx]-predicted
    error = np.abs(error)
    # Samples above the threshold, restricted to the range [p, n_samples-p)
    threshold = K*noise_scale
    flagged = np.where(error > threshold)[0]
    in_range = (flagged > p-1) & (flagged < n_samples-p)
    return flagged[in_range]

In [19]:
p=10 # Order of the model
K=1 # Detection threshold
T=detection_impulsive(y3,p,K)
print(T) # Detected corrupted samples
print(n_corrupted) # Real corrupted samples
plt.figure("Detection of the corrupted samples")
plt.plot(t,y3)
plt.plot(t[T],y3[T],'o')
plt.xlim((0,(N-1)/Fs))
plt.xlabel('Time (seconds)')
# The curve drawn here is y3 (the corrupted signal), not the original x,
# so the legend uses the same label as the earlier plot of y3.
plt.legend(('Signal with impulsive noise', 'Detected corrupted samples'))
plt.show()

[ 19  43  66  68  74  89  96  99 104 105 111 136 165 181 190 196 211 222
 244 249 251 252 253 254 268 269 270 273 278 279]
[ 19  22  43  56  66  68  74  83  89  96  99 104 109 111 118 120 136 137
 143 165 178 181 183 187 188 190 196 211 222 239 242 244 248 249 251 268
 273 277 279]


<IPython.core.display.Javascript object>

In [20]:
# Interpolation based on AR model
def interpolation_impulsive(x,p,T):
    """Estimate replacement values for the samples of ``x`` located at ``T``.

    An AR model of order ``p`` is fitted to ``x`` with ``yule_walker``; the
    replacement values are the solution of the linear system ``B y = -z``
    built from the autocorrelation ``b`` of the prediction-error filter
    ``[1, -a_1, ..., -a_p]`` and from the samples of ``x`` that are not
    listed in ``T``.

    Parameters
    ----------
    x : 1-D signal containing the corrupted samples.
    p : order of the AR model.
    T : integer indices of the corrupted samples; each must satisfy
        ``p <= T[i] < len(x) - p`` so that the window ``T[i]-p .. T[i]+p``
        lies inside ``x``.

    Returns
    -------
    y : 1-D array of interpolated values, in the same order as ``T``.

    Raises
    ------
    IndexError : if an index of ``T`` is closer than ``p`` samples to either
        end of ``x``.
    """
    T = np.asarray(T, dtype=int)
    m = np.size(T)
    if m == 0:
        return np.zeros((0,))
    n_samples = np.size(x)
    # A negative index would silently wrap around to the end of x, so the
    # lower bound is checked explicitly, like the upper one.
    if np.any((T < p) | (T > n_samples-1-p)):
        raise IndexError(
            "corrupted sample indices must lie in [p, len(x)-p) "
            "so that the interpolation window stays inside the signal")

    a, _ = yule_walker(x, p, method='mle',demean=False)
    # Prediction-error filter [1, -a_1, ..., -a_p]
    a = np.concatenate(([1],-a))
    # b[i] = sum_l a[l] * a[l+i]: autocorrelation of the filter at lag i
    b = np.array([np.dot(a[0:p-i+1], a[i:p+1]) for i in range(p+1)])

    # B[i, j] = b[|T[i]-T[j]|] when the two locations are at most p apart
    lag = np.abs(T[:, None]-T[None, :])
    B = np.where(lag <= p, b[np.minimum(lag, p)], 0.0)

    # z gathers the contribution of the samples NOT listed in T around each
    # corrupted location
    corrupted = set(T.tolist())
    z = np.zeros((m,))
    for i, loc in enumerate(T.tolist()):
        for k in range(-p, p+1):
            if (loc-k) not in corrupted:
                z[i] += b[abs(k)]*x[loc-k]

    # Solve B y = -z directly instead of forming the inverse of B
    return -np.linalg.solve(B, z)

In [21]:
# Interpolation when the locations of corrupted samples are known
p=10 # Order of the model
y3_hat=interpolation_impulsive(y3,p,n_corrupted)
# The restored signal is built from the observed signal y3: only the
# corrupted samples are replaced. Starting from the clean x would use the
# ground truth that the method is supposed to recover.
x3_hat=np.copy(y3)
x3_hat[n_corrupted]=y3_hat
plt.figure("Denoising (with known locations)")
plt.plot(t,x)
plt.plot(t,x3_hat)
plt.xlim((0,(N-1)/Fs))
plt.xlabel('Time (seconds)')
plt.legend(('Original signal', 'Denoised signal with known locations'))
plt.show()

<IPython.core.display.Javascript object>

In [22]:
# Interpolation when the locations of corrupted samples are unknown
p=10 # Order of the model
K=1 # Detection threshold
T=detection_impulsive(y3,p,K)
y3_hat=interpolation_impulsive(y3,p,T)
# The restored signal is built from the observed signal y3: only the detected
# samples are replaced. Starting from the clean x would hide every corrupted
# sample that the detection step missed.
x3_hat=np.copy(y3)
x3_hat[T]=y3_hat
plt.figure("Denoising (with unknown locations)")
plt.plot(t,x)
plt.plot(t,x3_hat)
plt.xlim((0,(N-1)/Fs))
plt.xlabel('Time (seconds)')
plt.legend(('Original signal', 'Denoised signal with unknown locations'))
plt.show()


<IPython.core.display.Javascript object>