In [1]:
%matplotlib qt
import numpy as np
import scipy as sp
import matplotlib.pyplot as plt

from scipy import signal
from PyEMD import EMD

In [2]:
# Import data
x = np.load('./exampledata.npy')
x = x[7000:9001]  # keep samples 7000..9000 of the recording
# Build the time axis from integer sample indices. np.arange with a fractional
# step (the previous np.arange(0, len(x)*.001, .001)) can return one element
# too many or too few because of floating-point rounding, which would make
# t and x differ in length and break every plt.plot(t, x) below.
t = np.arange(len(x)) * .001

# EMD application using PyEMD

In [3]:
# Decompose x with PyEMD, then check that the returned components add back up to x
emd_py = EMD()
imfs = emd_py(x)

x_recon_py = np.asarray(imfs).sum(axis=0)

# Largest absolute sample-wise difference between x and its reconstruction
np.abs(x - x_recon_py).max()

3.552713678800501e-15

# EMD implementation by Scott Cole

In [None]:
# Decomposition settings
nIMF = 3        # how many components the signal should be split into
stoplim = 1e-3  # threshold below which a candidate is declared a component

In [None]:
# Locate every relative maximum (pks) and relative minimum (trs) of the working signal
x_temp = x
pks, trs = (finder(x_temp)[0] for finder in (signal.argrelmax, signal.argrelmin))

In [None]:
# Draw the working signal with its peaks (red dots) and troughs (blue dots)
fig, ax = plt.subplots(figsize=(15, 3))
ax.plot(t, x_temp, 'k')
ax.plot(t[pks], x_temp[pks], 'r.')
ax.plot(t[trs], x_temp[trs], 'b.')
ax.set_xlabel('Time (s)')
ax.set_ylabel('Voltage (µV)')

In [None]:
# Interpolate extrema
# pks / trs are indices of extrema of x_temp, so the spline knots must take
# their values from x_temp too. Reading them from x only gives the same result
# while x_temp is still identical to x; once the sifting cell has replaced
# x_temp, values from x would no longer be extrema at those indices.
sample_idx = np.arange(len(x_temp))

# Upper envelope: cubic spline through the peaks, evaluated at every sample
x_pks = x_temp[pks]
fip = sp.interpolate.InterpolatedUnivariateSpline(pks, x_pks, k=3)
pks_t = fip(sample_idx)

# Lower envelope: cubic spline through the troughs, evaluated at every sample
x_trs = x_temp[trs]
fitr = sp.interpolate.InterpolatedUnivariateSpline(trs, x_trs, k=3)
trs_t = fitr(sample_idx)

In [None]:
# Alternative (disabled): interpolate the extrema against the time axis t instead of sample indices
# spl_up = sp.interpolate.InterpolatedUnivariateSpline(t[pks], x_temp[pks], k=3)
# pks_t = spl_up(t)

# spl_dn = sp.interpolate.InterpolatedUnivariateSpline(t[trs], x_temp[trs], k=3)
# trs_t = spl_dn(t)

In [None]:
# Overlay the upper (red) and lower (blue) spline envelopes on the signal and its extrema
fig, ax = plt.subplots(figsize=(15, 3))
ax.plot(t, x, 'k')
ax.plot(t[pks], x_temp[pks], 'r.')
ax.plot(t[trs], x_temp[trs], 'b.')
ax.plot(t, pks_t, 'r')
ax.plot(t, trs_t, 'b')
ax.set_xlabel('Time (s)')
ax.set_ylabel('Voltage (µV)')
ax.set_ylim([-1000, 1000])

In [None]:
# Average the upper and lower envelopes sample by sample
mean_t = 0.5 * (pks_t + trs_t)

In [None]:
# Two stacked panels: envelopes with their mean (top), candidate IMF (bottom)
fig, (ax_env, ax_imf) = plt.subplots(2, 1, figsize=(10, 6))

# Top: raw signal, both envelopes and the envelope mean
ax_env.plot(t, x, 'k', label='raw')
ax_env.plot(t, pks_t, 'b', label='extrema envelope')
ax_env.plot(t, trs_t, 'b')
ax_env.plot(t, mean_t, 'r', label='mean of extrema envelopes')
ax_env.set_ylabel('Voltage (µV)')
ax_env.set_ylim([-1000, 1000])
ax_env.legend(loc='best')

# Bottom: raw signal against the signal with the envelope mean removed
ax_imf.plot(t, x, 'k', label='raw')
ax_imf.plot(t, x - mean_t, 'r', label='potential IMF')
ax_imf.set_xlabel('Time (s)')
ax_imf.set_ylabel('Voltage (µV)')
ax_imf.set_ylim([-1000, 1000])
ax_imf.legend(loc='best')

In [None]:
# Decide whether the working signal already qualifies as an IMF.
# Criterion used here: the energy of the envelope mean, relative to the energy
# of the working signal, is below stoplim.
# NOTE: the author flagged this criterion as WRONG.
# Only the span between the first and last extremum common to both envelopes is scored.
samp_start = max(pks.min(), trs.min())
samp_end = min(pks.max(), trs.max()) + 1
window = slice(samp_start, samp_end)
sdk = np.abs(mean_t[window]**2).sum() / np.abs(x_temp[window]**2).sum()

is_imf = bool(sdk < stoplim)
if not is_imf:
    # Not an IMF yet: remove the envelope mean and keep sifting.
    # Re-running this cell subtracts mean_t again, because x_temp is overwritten.
    x_temp = x_temp - mean_t

In [None]:
def _emd_complim(mean_t, pks, trs):
    """
    Hold the mean envelope constant outside the span shared by both envelopes.

    The span runs from the later of the first peak / first trough to the
    earlier of the last peak / last trough (inclusive), which is the same
    window that _emd_comperror scores. Samples before the span take the value
    of its first sample; samples after it take the value of its last sample.

    Parameters
    ----------
    mean_t : 1D array
        Mean of the upper and lower extrema envelopes.
    pks, trs : 1D integer arrays
        Indices of the peaks and troughs the envelopes were built from.

    Returns
    -------
    1D array
        A clamped copy of 'mean_t'; the input array is left untouched.
    """
    samp_start = np.max((np.min(pks), np.min(trs)))
    samp_end = np.min((np.max(pks), np.max(trs))) + 1

    # Work on a copy so the caller's array is not modified behind its back
    clamped = np.array(mean_t)
    clamped[:samp_start] = clamped[samp_start]
    # samp_end is exclusive: the last in-span sample is samp_end - 1.
    # (Using clamped[samp_end] would copy the first sample *outside* the span.)
    clamped[samp_end:] = clamped[samp_end - 1]
    return clamped

def _emd_comperror(h, mean, pks, trs):
    """
    Normalized error of the current component.

    Returns the summed squared magnitude of 'mean' divided by the summed
    squared magnitude of 'h', both restricted to the samples between the
    later of the first peak / first trough and the earlier of the last
    peak / last trough (inclusive).
    """
    first = max(np.min(pks), np.min(trs))
    last = min(np.max(pks), np.max(trs)) + 1  # exclusive end of the window
    window = slice(first, last)

    mean_energy = np.abs(mean[window] ** 2).sum()
    signal_energy = np.abs(h[window] ** 2).sum()
    return mean_energy / signal_energy

In [None]:
# Visualization of iterations: number of components and an object array with one slot per component
nIMF = 3
imfs = np.zeros(shape=(nIMF,), dtype=object)

def findnextcomponent(x, orig, t, max_siftings=100):
    """
    Sift the input signal 'x' until an IMF candidate is found and return the
    part of 'x' that was removed along the way.

    Each pass subtracts the mean of the cubic-spline extrema envelopes from the
    working signal and plots the resulting candidate on top of 'orig'. Sifting
    stops when the normalized error drops below the notebook-level 'stoplim',
    when too few extrema remain to fit a cubic spline, or after 'max_siftings'
    passes (the stopping criterion is not guaranteed to converge).

    Parameters
    ----------
    x : 1D array
        Signal to sift.
    orig : 1D array
        Reference signal drawn in grey behind every candidate.
    t : 1D array
        Not used by the function; kept so existing calls keep working.
    max_siftings : int
        Upper bound on the number of sifting passes.

    Returns
    -------
    1D array
        'x' minus the final working signal, i.e. the sum of the envelope means
        subtracted during sifting. This is NOT the IMF itself; pass it back in
        to look for the next component.
    """
    r_t = x
    sample_idx = np.arange(len(x))

    for _ in range(max_siftings):
        # Identify peaks and troughs
        pks = sp.signal.argrelmax(r_t)[0]
        trs = sp.signal.argrelmin(r_t)[0]

        # A cubic (k=3) interpolating spline needs at least 4 knots
        if len(pks) < 4 or len(trs) < 4:
            print('Not enough extrema for cubic spline interpolation. Stopping.')
            break

        # Interpolate extrema into upper and lower envelopes
        fip = sp.interpolate.InterpolatedUnivariateSpline(pks, r_t[pks], k=3)
        pks_t = fip(sample_idx)
        fitr = sp.interpolate.InterpolatedUnivariateSpline(trs, r_t[trs], k=3)
        trs_t = fitr(sample_idx)

        # Mean envelope, held constant outside the span covered by both envelopes
        mean_t = (pks_t + trs_t) / 2
        mean_t = _emd_complim(mean_t, pks, trs)

        # Assess if this is an IMF
        sdk = _emd_comperror(r_t, mean_t, pks, trs)

        # Debug view: candidate after this pass on top of the reference signal
        plt.figure(figsize=(10, 1))
        plt.plot(orig, color='0.8')
        plt.plot(r_t - mean_t, 'k')
        plt.ylim((-1000, 1000))

        if sdk < stoplim:
            print('IMF found!')
            break

        # Not an IMF yet: remove the envelope mean and sift again
        r_t = r_t - mean_t
    else:
        print('No IMF found after %d siftings. Stopping.' % max_siftings)

    return x - r_t

In [None]:
# First pass: sift the raw signal x; x is also passed as the grey reference trace for the plots
r = findnextcomponent(x, x, t)

In [None]:
# Next pass: feed the previous result back in, still plotting against the raw signal x.
# r is overwritten, so re-running this cell advances by one more pass each time.
r = findnextcomponent(r, x, t)

# EMD implementation using Huang et al. 1998

In [None]:
def _emd_mean_envelope(h, JJ):
    """
    Return the mean of the upper and lower cubic-spline envelopes of 'h'
    evaluated at sample indices 'JJ', or None when 'h' has fewer than two
    peaks or fewer than two valleys.
    """
    j_pks = signal.argrelmax(h)[0]
    j_vls = signal.argrelmin(h)[0]
    if len(j_pks) < 2 or len(j_vls) < 2:
        return None

    n = len(h)
    J1, Jn = j_pks[0], j_pks[-1]  # first / last peak index
    j1, jn = j_vls[0], j_vls[-1]  # first / last valley index

    # Mirror one extremum of each kind beyond both ends so the splines are
    # constrained at the borders. The extra peak sits at the mirrored position
    # of the nearest valley (and vice versa) so peaks and valleys keep
    # alternating; it reuses the value of the nearest extremum of its own kind.
    # Knots stay in the coordinates of 'h' (the left ones are negative), so the
    # splines can be evaluated directly at JJ without any index shift.
    knots_up = np.concatenate(([-j1], j_pks, [2 * n - jn]))
    vals_up = np.concatenate(([h[J1]], h[j_pks], [h[Jn]]))
    knots_lw = np.concatenate(([-J1], j_vls, [2 * n - Jn]))
    vals_lw = np.concatenate(([h[j1]], h[j_vls], [h[jn]]))

    envp_up = sp.interpolate.InterpolatedUnivariateSpline(knots_up, vals_up, k=3)(JJ)
    envp_lw = sp.interpolate.InterpolatedUnivariateSpline(knots_lw, vals_lw, k=3)(JJ)
    return (envp_up + envp_lw) / 2


def emd(x, t=None, tol_sd=0.2, max_IMFs=25, max_siftings=100, plot_emd=None):
    """
    Perform empirical mode decomposition on a signal 'x' as described in Huang et al. 1998.
    The decomposition terminates when either the sifting process is unable to find enough
    local peaks or valleys in the residual signal or the max. no. of intended IMFs has
    already been extracted.

    Parameters
    ----------
    x : 1D array
        Signal of interest.
    t : 1D array, optional
        Time (or space) at which elements of 'x' were measured. Only used for plotting;
        sample indices are used when it is None.
    tol_sd : scalar
        Tolerance in standard deviation between two consecutive siftings. Used as a stopping
        criterion. See Eq. (5.5) in Huang et al. 1998 for more details.
    max_IMFs : scalar
        Max. no. of IMFs to be extracted.
    max_siftings : scalar
        Max. no. of siftings to be performed for extracting each IMF. If the tolerance is
        not met by then, the last sifted signal is kept as the IMF.
    plot_emd : bool, optional
        If truthy, plot every IMF and the residual on top of 'x'.

    Returns
    -------
    c : 2D array
        IMFs extracted from the EMD, one per row (zero rows if none was found).
    r : 1D array
        Residual signal, such that c.sum(axis=0) + r reconstructs 'x'.
    """
    x = np.asarray(x, dtype=float)
    JJ = np.arange(len(x))
    r = x
    c = np.zeros((max_IMFs, len(x)))
    n_found = 0  # number of rows of c that hold a real IMF

    while n_found < max_IMFs:
        print('Finding IMF-%02d...' % (n_found + 1))
        h_km1 = r
        h_k = None
        enough_extrema = True

        for _ in range(max_siftings):
            # Mean of the upper and lower envelopes of the current signal
            e_k = _emd_mean_envelope(h_km1, JJ)
            if e_k is None:
                print('Not enough points for cubic spline interpolation. Stopping.')
                enough_extrema = False
                break

            # Find the next sifted signal
            h_k = h_km1 - e_k

            # Test for stopping criterion
            sd = np.sum((h_km1 - h_k) ** 2) / np.sum(h_km1 ** 2)
            print(sd)
            if sd < tol_sd:
                print('IMF found!')
                break
            h_km1 = h_k
        else:
            if h_k is not None:
                print('Max. no. of siftings reached. Keeping the last sifted signal as IMF.')

        # Running out of extrema discards the unfinished candidate and ends the decomposition
        if not enough_extrema or h_k is None:
            break

        c[n_found] = h_k
        r = r - h_k
        n_found += 1

    # Keep only the rows that were actually populated
    c = c[:n_found]

    # Plot
    if plot_emd:
        t_plot = JJ if t is None else t
        n_rows = len(c) + 1
        plt.figure(figsize=(12, 12))
        for idx, imf in enumerate(c):
            plt.subplot(n_rows, 1, idx + 1)
            plt.plot(t_plot, x, color='0.8')
            plt.plot(t_plot, imf, 'k')
            plt.xlim([np.min(t_plot), np.max(t_plot)])
            plt.ylabel('IMF ' + str(idx + 1))
        # The residual always goes in the last row, even when no IMF was found
        plt.subplot(n_rows, 1, n_rows)
        plt.plot(t_plot, x, color='0.8')
        plt.plot(t_plot, r, 'k')
        plt.xlim([np.min(t_plot), np.max(t_plot)])
        plt.ylabel('Residual')
        plt.xlabel('Time (s)' if t is not None else 'Sample')
        plt.tight_layout()
        plt.show()

    return c, r

In [None]:
# Run the decomposition (with its summary plot) and check that IMFs + residual rebuild x
c, r = emd(x, t, tol_sd=0.2, plot_emd=True)
x_recon = c.sum(axis=0) + r
err_huang = np.abs(x - x_recon).max()
print('Error in reconstruction = ', err_huang)

# Overlay the signal and its reconstruction
plt.figure()
plt.plot(t, x)
plt.plot(t, x_recon)
plt.show()

In [None]:
# Settings for the cell-level decomposition below:
# max. no. of IMFs, sifting tolerance, max. no. of siftings per IMF
max_IMFs, tol_sd, max_siftings = 10, 0.2, 100

In [None]:
# Cell-level version of the decomposition loop in emd(), using the settings
# (max_IMFs, tol_sd, max_siftings) defined in the previous cell.
JJ = np.arange(len(x))
r = x
c = np.zeros((max_IMFs, len(x)))
n_found = 0  # number of rows of c that hold a real IMF
stop_emd = False
while (n_found < max_IMFs) and not stop_emd:
    print('Finding IMF-%02d...' % (n_found + 1))
    h_km1 = r
    h_k = None
    for k in range(max_siftings):
        # Find upper and lower extrema
        j_pks = signal.argrelmax(h_km1)[0]
        j_vls = signal.argrelmin(h_km1)[0]

        if len(j_pks) < 2 or len(j_vls) < 2:
            print('Not enough points for cubic spline interpolation. Stopping.')
            stop_emd = True
            break

        # Mirror one extremum of each kind beyond both ends. The extra peak sits
        # at the mirrored position of the nearest valley (and vice versa) so the
        # extrema keep alternating. Knots stay in signal coordinates (the left
        # ones are negative), so the splines are evaluated directly at JJ;
        # shifting the knots but not JJ would displace the envelopes.
        n = len(h_km1)
        J1, Jn = j_pks[0], j_pks[-1]
        j1, jn = j_vls[0], j_vls[-1]
        knots_up = np.concatenate(([-j1], j_pks, [2 * n - jn]))
        M = np.concatenate(([h_km1[J1]], h_km1[j_pks], [h_km1[Jn]]))
        knots_lw = np.concatenate(([-J1], j_vls, [2 * n - Jn]))
        m = np.concatenate(([h_km1[j1]], h_km1[j_vls], [h_km1[jn]]))

        # Make upper and lower envelopes
        spl_up = sp.interpolate.InterpolatedUnivariateSpline(knots_up, M, k=3)
        envp_up = spl_up(JJ)
        spl_lw = sp.interpolate.InterpolatedUnivariateSpline(knots_lw, m, k=3)
        envp_lw = spl_lw(JJ)

        # Calculate mean envelope
        e_k = (envp_up + envp_lw) / 2

        # Find the next sifted signal
        h_k = h_km1 - e_k

        # Test for stopping criterion
        sd = np.sum((h_km1 - h_k) ** 2) / np.sum(h_km1 ** 2)
        print(sd)
        if sd < tol_sd:
            print('IMF found!')
            break
        h_km1 = h_k
    else:
        if h_k is not None:
            print('Max. no. of siftings reached. Keeping the last sifted signal as IMF.')

    # Running out of extrema discards the unfinished candidate and ends the decomposition
    if stop_emd or h_k is None:
        break

    c[n_found] = h_k
    r = r - h_k
    n_found += 1

# Keep only the rows that were actually populated
c = c[:n_found]

# Plot every IMF, then the residual in the last row (also when no IMF was found)
n_rows = len(c) + 1
plt.figure(figsize=(12, 12))
for idx, imf in enumerate(c):
    plt.subplot(n_rows, 1, idx + 1)
    plt.plot(t, x, color='0.8')
    plt.plot(t, imf, 'k')
    plt.ylabel('IMF ' + str(idx + 1))
plt.subplot(n_rows, 1, n_rows)
plt.plot(t, x, color='0.8')
plt.plot(t, r, 'k')
plt.ylabel('Residual')

# Find reconstruction error
x_recon = np.sum(c, axis=0) + r
err_huang = np.max(np.abs(x - x_recon))
print('Error in reconstruction = ', err_huang)

In [None]:
# Sample-by-sample absolute reconstruction error
abs_err = np.abs(x - x_recon)
plt.figure()
plt.plot(t, abs_err)
plt.show()