Permalink
Find file Copy path
c755695 Dec 11, 2017
1 contributor

Users who have contributed to this file

675 lines (534 sloc) 22 KB
#!/usr/bin/python
# coding: UTF-8
#
# Author: Dawid Laszuk
# Contact: laszukdawid@gmail.com
#
# Edited: 11/05/2017
#
# Feel free to contact for any information.
from __future__ import division, print_function
import logging
import numpy as np
import time
from scipy.interpolate import interp1d
from PyEMD.splines import *
class EMD:
"""
Empirical Mode Decomposition
*Note:*
Default and recommended package for EMD is EMD.py.
This is meant to provide with the same results as MATLAB version of EMD,
which is not necessarily the most efficient or numerically accurate.
Method of decomposing signal into Intrinsic Mode Functions (IMFs)
based on algorithm presented in Huang et al. [1].
Algorithm was validated with Rilling et al. [2] Matlab's version from 3.2007.
[1] N. E. Huang et al., "The empirical mode decomposition and the
Hilbert spectrum for non-linear and non stationary time series
analysis", Proc. Royal Soc. London A, Vol. 454, pp. 903-995, 1998
[2] G. Rilling, P. Flandrin and P. Goncalves, "On Empirical Mode
Decomposition and its algorithms", IEEE-EURASIP Workshop on
Nonlinear Signal and Image Processing NSIP-03, Grado (I), June 2003
"""
logger = logging.getLogger(__name__)
def __init__(self):
self.splineKind = 'cubic'
self.nbsym = 2
self.reduceScale = 1.
self.maxIteration = 500
self.scaleFactor = 100
self.FIXE = 0
self.FIXE_H = 0
self.stop1 = 0.05
self.stop2 = 0.5
self.stop3 = 0.05
self.DTYPE = np.float64
self.MAX_ITERATION = 1000
self.TIME = False
def extractMaxMinSpline(self, T, S):
"""
Input:
-----------------
S - Input signal array. Should be 1D.
T - Time array. If none passed numpy arange is created.
Output:
-----------------
maxSpline - Upper envelope of signal S.
minSpline - Bottom envelope of signal S.
maxExtrema - Position (1st row) and values (2nd row) of maxima.
minExtrema - Position (1st row) and values (2nd row) of minma.
"""
# Get indexes of extrema
maxPos, maxVal, minPos, minVal, _ = self.findExtrema(T, S)
if len(maxPos) + len(minPos) < 3: return [-1]*4
# Extrapolation of signal (ober boundaries)
maxExtrema, minExtrema = self.preparePoints(S, T, maxPos, maxVal, minPos, minVal)
_, maxSpline = self.splinePoints(T, maxExtrema, self.splineKind)
_, minSpline = self.splinePoints(T, minExtrema, self.splineKind)
return maxSpline, minSpline, maxExtrema, minExtrema
def preparePoints(self, S, T, maxPos, maxVal, minPos, minVal):
"""
Adds to signal extrema according to mirror technique.
Number of added points depends on nbsym variable.
Input:
---------
S: Signal (1D numpy array).
T: Timeline (1D numpy array).
maxPos: sorted time positions of maxima.
maxVal: signal values at maxPos positions.
minPos: sorted time positions of minima.
minVal: signal values at minPos positions.
Output:
---------
minExtrema: Position (1st row) and values (2nd row) of minima.
minExtrema: Position (1st row) and values (2nd row) of maxima.
"""
# Find indices for time array of extrema
indmin = np.array([np.nonzero(T==t)[0] for t in minPos]).flatten()
indmax = np.array([np.nonzero(T==t)[0] for t in maxPos]).flatten()
# Local variables
nbsym = self.nbsym
endMin, endMax = len(minPos), len(maxPos)
####################################
# Left bound - mirror nbsym points to the left
if indmax[0] < indmin[0]:
if S[0] > S[indmin[0]]:
lmax = indmax[1:min(endMax,nbsym+1)][::-1]
lmin = indmin[0:min(endMin,nbsym+0)][::-1]
lsym = indmax[0]
else:
lmax = indmax[0:min(endMax,nbsym)][::-1]
lmin = np.append(indmin[0:min(endMin,nbsym-1)][::-1],0)
lsym = 0
else:
if S[0] < S[indmax[0]]:
lmax = indmax[0:min(endMax,nbsym+0)][::-1]
lmin = indmin[1:min(endMin,nbsym+1)][::-1]
lsym = indmin[0]
else:
lmax = np.append(indmax[0:min(endMax,nbsym-1)][::-1],0)
lmin = indmin[0:min(endMin,nbsym)][::-1]
lsym = 0
####################################
# Right bound - mirror nbsym points to the right
if indmax[-1] < indmin[-1]:
if S[-1] < S[indmax[-1]]:
rmax = indmax[max(endMax-nbsym,0):][::-1]
rmin = indmin[max(endMin-nbsym-1,0):-1][::-1]
rsym = indmin[-1]
else:
rmax = np.append(indmax[max(endMax-nbsym+1,0):], len(S)-1)[::-1]
rmin = indmin[max(endMin-nbsym,0):][::-1]
rsym = len(S)-1
else:
if S[-1] > S[indmin[-1]]:
rmax = indmax[max(endMax-nbsym-1,0):-1][::-1]
rmin = indmin[max(endMin-nbsym,0):][::-1]
rsym = indmax[-1]
else:
rmax = indmax[max(endMax-nbsym,0):][::-1]
rmin = np.append(indmin[max(endMin-nbsym+1,0):], len(S)-1)[::-1]
rsym = len(S)-1
# In case any array missing
if not lmin.size: lmin = indmin
if not rmin.size: rmin = indmin
if not lmax.size: lmax = indmax
if not rmax.size: rmax = indmax
# Mirror points
tlmin = 2*T[lsym]-T[lmin]
tlmax = 2*T[lsym]-T[lmax]
trmin = 2*T[rsym]-T[rmin]
trmax = 2*T[rsym]-T[rmax]
# If mirrored points are not outside passed time range.
if tlmin[0] > T[0] or tlmax[0] > T[0]:
if lsym == indmax[0]:
lmax = indmax[0:min(endMax,nbsym)][::-1]
else:
lmin = indmin[0:min(endMin,nbsym)][::-1]
if lsym == 0:
raise Exception('bug')
lsym = 0
tlmin = 2*T[lsym]-T[lmin]
tlmax = 2*T[lsym]-T[lmax]
if trmin[-1] < T[-1] or trmax[-1] < T[-1]:
if rsym == indmax[-1]:
rmax = indmax[max(endMax-nbsym,0):][::-1]
else:
rmin = indmin[max(endMin-nbsym,0):][::-1]
if rsym == len(S)-1:
raise Exception('bug')
rsym = len(S)-1
trmin = 2*T[rsym]-T[rmin]
trmax = 2*T[rsym]-T[rmax]
zlmax = S[lmax]
zlmin = S[lmin]
zrmax = S[rmax]
zrmin = S[rmin]
tmin = np.append(tlmin, np.append(T[indmin], trmin))
tmax = np.append(tlmax, np.append(T[indmax], trmax))
zmin = np.append(zlmin, np.append(S[indmin], zrmin))
zmax = np.append(zlmax, np.append(S[indmax], zrmax))
maxExtrema = np.array([tmax, zmax], dtype=self.DTYPE)
minExtrema = np.array([tmin, zmin], dtype=self.DTYPE)
# Make double sure, that each extremum is significant
maxExtrema = np.delete(maxExtrema, np.where(maxExtrema[0,1:]==maxExtrema[0,:-1]),axis=1)
minExtrema = np.delete(minExtrema, np.where(minExtrema[0,1:]==minExtrema[0,:-1]),axis=1)
return maxExtrema, minExtrema
def splinePoints(self, T, extrema, splineKind):
"""
Constructs spline over given points.
Input:
---------
T: Time array.
extrema: Poistion (1st row) and values (2nd row) of points.
splineKind: Type of spline.
Output:
---------
T: Poistion array.
spline: Spline over the given points.
"""
kind = splineKind.lower()
t = T[np.r_[T>=extrema[0,0]] & np.r_[T<=extrema[0,-1]]]
if t.dtype != self.DTYPE: self.logger.error('t.dtype: '+str(t.dtype))
if extrema.dtype != self.DTYPE: self.logger.error('extrema.dtype: '+str(xtrema.dtype))
if kind == "akima":
return t, akima(extrema[0], extrema[1], t)
elif kind == 'cubic':
if extrema.shape[1]>3:
return t, interp1d(extrema[0], extrema[1], kind=kind)(t).astype(self.DTYPE)
else:
return self.cubicSpline_3points(T, extrema)
elif kind in ['slinear', 'quadratic', 'linear']:
return T, interp1d(extrema[0], extrema[1], kind=kind)(t).astype(self.DTYPE)
else:
raise ValueError("No such interpolation method!")
def cubicSpline_3points(self, T, extrema):
"""
Apperently scipy.interpolate.interp1d does not support
cubic spline for less than 4 points.
"""
x0, x1, x2 = extrema[0]
y0, y1, y2 = extrema[1]
x1x0, x2x1 = x1-x0, x2-x1
y1y0, y2y1 = y1-y0, y2-y1
_x1x0, _x2x1 = 1./x1x0, 1./x2x1
m11, m12, m13= 2*_x1x0, _x1x0, 0
m21, m22, m23 = _x1x0, 2.*(_x1x0+_x2x1), _x2x1
m31, m32, m33 = 0, _x2x1, 2.*_x2x1
v1 = 3*y1y0*_x1x0*_x1x0
v3 = 3*y2y1*_x2x1*_x2x1
v2 = v1+v3
M = np.matrix([[m11,m12,m13],[m21,m22,m23],[m31,m32,m33]])
v = np.matrix([v1,v2,v3]).T
k = np.array(np.linalg.inv(M)*v)
a1 = k[0]*x1x0 - y1y0
b1 =-k[1]*x1x0 + y1y0
a2 = k[1]*x2x1 - y2y1
b2 =-k[2]*x2x1 + y2y1
t = T[np.r_[T>=x0] & np.r_[T<=x2]]
t1 = (T[np.r_[T>=x0]&np.r_[T< x1]] - x0)/x1x0
t2 = (T[np.r_[T>=x1]&np.r_[T<=x2]] - x1)/x2x1
t11, t22 = 1.-t1, 1.-t2
q1 = t11*y0 + t1*y1 + t1*t11*(a1*t11 + b1*t1)
q2 = t22*y1 + t2*y2 + t2*t22*(a2*t22 + b2*t2)
q = np.append(q1,q2)
return t, q.astype(self.DTYPE)
@classmethod
def findExtrema(cls, t, s):
"""
Finds extrema and zero-crossings.
Input:
---------
S: Signal.
T: Time array.
Output:
---------
localMaxPos: Time positions of maxima.
localMaxVal: Values of signal at localMaxPos positions.
localMinPos: Time positions of minima.
localMinVal: Values of signal at localMinPos positions.
indzer: Indexes of zero crossings.
"""
# Finds indexes of zero-crossings
s1, s2 = s[:-1], s[1:]
indzer = np.nonzero(s1*s2<0)[0]
if np.any(s==0):
iz = np.nonzero(s==0)[0]
indz = []
if np.any(np.diff(iz)==1):
zer = (s==0)
dz = np.diff(np.append(np.append(0, zer), 0))
debz = np.nonzero(dz==1)[0]
finz = np.nonzero(dz==-1)[0]-1
indz = np.round((debz+finz)/2)
else:
indz = iz
indzer = np.sort(np.append(indzer, indz))
# Finds local extrema
d = np.diff(s)
d1, d2 = d[:-1], d[1:]
indmin = np.nonzero(np.r_[d1*d2<0] & np.r_[d1<0])[0]+1
indmax = np.nonzero(np.r_[d1*d2<0] & np.r_[d1>0])[0]+1
# When two or more points have the same value
if np.any(d==0):
imax, imin = [], []
bad = (d==0)
dd = np.diff(np.append(np.append(0, bad), 0))
debs = np.nonzero(dd==1)[0]
fins = np.nonzero(dd==-1)[0]
if debs[0]==1:
if len(debs) > 1:
debs, fins = debs[1:], fins[1:]
else:
debs, fins = [], []
if len(debs) > 0:
if fins[-1] == len(s)-1:
if len(debs) > 1:
debs, fins = debs[:-1], fins[:-1]
else:
debs, fins = [], []
lc = len(debs)
if lc > 0:
for k in range(lc):
if d[debs[k]-1] > 0:
if d[fins[k]] < 0:
imax.append(round((fins[k]+debs[k])/2.))
else:
if d[fins[k]] > 0:
imin.append(round((fins[k]+debs[k])/2.))
if len(imax) > 0:
indmax = indmax.tolist()
for x in imax: indmax.append(int(x))
indmax.sort()
if len(imin) > 0:
indmin = indmin.tolist()
for x in imin: indmin.append(int(x))
indmin.sort()
localMaxPos = t[indmax]
localMaxVal = s[indmax]
localMinPos = t[indmin]
localMinVal = s[indmin]
return localMaxPos, localMaxVal, localMinPos, localMinVal, indzer
def stop_sifting(self, imf, envMax, envMin, mean, extNo):
"""
Criterium for stopping sifting process.
Based on conditions presented in [1].
[1] G. Rilling, P. Flandrin and P. Goncalves
"On Empirical Mode Decomposition and its
algorithms", 2003
Input:
---------
imf: Current imf.
envMax: Upper envelope of imf.
envMin: Bottom envelope of imf.
mean: Mean of envelopes.
extNo: Number of extrema.
Output:
---------
boolean: True if stopping criteria are meet.
"""
amp = np.abs(envMax - envMin)/2.
sx = np.abs(mean)/amp
f1 = np.mean(sx > self.stop1) > self.stop3
f2 = np.any(sx > self.stop2)
f3 = extNo > 2
if ( not (f1 or f2) ) and f3:
return True
else:
return False
@staticmethod
def _common_dtype(x, y):
dtype = np.find_common_type([x.dtype, y.dtype], [])
if x.dtype != dtype: x = x.astype(dtype)
if y.dtype != dtype: y = y.astype(dtype)
return x, y
def emd(self, S, T=None, maxImf=None):
"""
Performs Emerical Mode Decomposition on signal S.
The decomposition is limited to maxImf imf. No limitation as default.
Returns IMF functions in dic format. IMF = {0:imf0, 1:imf1...}.
Input:
---------
S: Signal.
T: Positions of signal. If none passed numpy arange is created.
maxImf: IMF number to which decomposition should be performed.
As a default, all IMFs are returned.
Output:
---------
return IMF, EXT, TIME, ITER, imfNo
IMF: Signal IMFs in dictionary type. IMF = {0:imf0, 1:imf1...}
EXT: Number of extrema for each IMF. IMF = {0:ext0, 1:ext1...}
ITER: Number of iteration for each IMF.
imfNo: Number of IMFs.
"""
if T is None: T = np.arange(len(S), dtype=S.dtype)
if maxImf is None: maxImf = -1
# Make sure same types are dealt
S, T = self._common_dtype(S, T)
self.DTYPE = S.dtype
Res = S.astype(self.DTYPE)
scale = 1.
Res, scaledS = Res/scale, S/scale
imf = np.zeros(len(S), dtype=self.DTYPE)
imfOld = Res.copy()
if Res.dtype!=self.DTYPE: self.logger.error('Res.dtype: '+str(Res.dtype))
if scaledS.dtype!=self.DTYPE: self.logger.error('scaledS.dtype: '+str(scaledS.dtype))
if imf.dtype!=self.DTYPE: self.logger.error('imf.dtype: '+str(imf.dtype))
if imfOld.dtype!=self.DTYPE: self.logger.error('imfOld.dtype: '+str(imfOld.dtype))
if T.dtype!=self.DTYPE: self.logger.error('T.dtype: '+str(T.dtype))
if S.shape != T.shape:
info = "Time array should be the same size as signal."
raise Exception(info)
# Create arrays
IMF = {} # Dic for imfs signals
EXT = {} # Dic for number of extrema
ITER = {} # Dic for number of iterations
TIME = {} # Dic for time of computation
imfNo = 0
notFinish = True
while(notFinish):
self.logger.debug('IMF -- '+str(imfNo))
#~ Res = scaledS - np.sum([IMF[i] for i in range(imfNo)],axis=0)
Res -= imf
imf = Res.copy()
mean = np.zeros(len(S), dtype=self.DTYPE)
# Counters
n = 0 # All iterations for current imf.
n_h = 0 # counts when |#zero - #ext| <=1
# Time counter
timeInit = time.time()
if self.TIME:
singleTime = time.time()
while(n<self.MAX_ITERATION):
n += 1
if self.TIME:
self.logger.info("Execution time: "+str(time.time() - singleTime))
singleTime = time.time()
ext_res = self.findExtrema(T, imf)
MP, mP = ext_res[0], ext_res[2]
indzer = ext_res[4]
extNo = len(mP)+len(MP)
nzm = len(indzer)
if extNo > 2:
# Plotting. Either into file, or on-screen display.
imfOld = imf.copy()
imf = imf - self.reduceScale*mean
env_ext = self.extractMaxMinSpline(T, imf)
maxEnv, minEnv = env_ext[0], env_ext[1]
if isinstance(maxEnv, int):
notFinish = True
break
mean = 0.5*(maxEnv+minEnv)
if maxEnv.dtype!=self.DTYPE: self.logger.error('maxEnvimf.dtype: '+str(maxEnv.dtype))
if minEnv.dtype!=self.DTYPE: self.logger.error('minEnvimf.dtype: '+str(minEnvimf.dtype))
if imf.dtype!=self.DTYPE: self.logger.error('imf.dtype: '+str(imf.dtype))
if mean.dtype!=self.DTYPE: self.logger.error('mean.dtype: '+str(mean.dtype))
# Stop, because of too many iterations
if n > self.maxIteration:
self.logger.info('TOO MANY ITERATIONS! BREAK!')
break
# Fix number of iterations
if self.FIXE:
if n>=self.FIXE+1: break
# Fix number of iterations after number of zero-crossings
# and extrema differ at most by one.
elif self.FIXE_H:
ext_res = self.findExtrema(T, imf)
mP, MP, indzer = ext_res[0], ext_res[2], ext_res[4]
extNo = len(mP)+len(MP)
nzm = len(indzer)
if n == 1: continue
if abs(extNo-nzm)>1: n_h = 0
else: n_h += 1
# STOP
if n_h >= self.FIXE_H: break
# Stops after default stopping criteria are meet.
else:
mP,mV,MP,MV, indzer = self.findExtrema(T, imf)
extNo = len(mP)+len(MP)
nzm = len(indzer)
f1 = self.stop_sifting(imf, maxEnv, minEnv, mean, extNo)
f2 = abs(extNo - nzm)<2
# STOP
if f1 and f2: break
else:
notFinish = False
break
IMF[imfNo] = imf.copy()
ITER[imfNo] = n
EXT[imfNo] = extNo
TIME[imfNo] = time.time() - timeInit
imfNo += 1
if imfNo==maxImf-1:
notFinish = False
break
#~ Saving residuum if meaningful
Res = scaledS - np.sum([IMF[i] for i in range(imfNo)],axis=0)
if np.sum(np.abs(Res)) > 1e-10:
IMF[imfNo] = Res
ITER[imfNo] = 0
EXT[imfNo] = extNo
TIME[imfNo] = 0
imfNo += 1
for key in list(IMF.keys()):
IMF[key] *= scale
return IMF, EXT, ITER, imfNo
###################################################
## Beggining of program
if __name__ == "__main__":
import pylab as plt
# Logging options
logging.basicConfig(level=logging.DEBUG)
# EMD options
maxImf = -1
DTYPE = np.float64
# Signal options
N = 1000
tMin, tMax = 0, 1
T = np.linspace(tMin, tMax, N, dtype=DTYPE)
S = 6*T +np.cos(8*np.pi**T)+0.5*np.cos(40*np.pi*T)
S = S.astype(DTYPE)
# Prepare and run EMD
emd = EMD()
emd.FIXE_H = 5
#~ emd.FIXE = 10
emd.nbsym = 2
emd.splineKind = 'cubic'
emd.DTYPE = DTYPE
IMF, EXT, ITER, imfNo = emd.emd(S, T, maxImf)
# Save results (IMFs) into file
npIMF = np.zeros((imfNo, N), dtype=DTYPE)
for i in range(imfNo): npIMF[i] = IMF[i]
np.save('imfs', npIMF)
# Plotting
#~ c = np.floor(np.sqrt(imfNo+3))
#~ r = np.ceil( (imfNo+3)/c)
c = np.floor(np.sqrt(imfNo+1))
r = np.ceil( (imfNo+1)/c)
plt.ioff()
plt.subplot(r,c,1)
plt.plot(T, S, 'r')
plt.title("Original signal")
plt.xlabel('Time [s]')
plt.ylabel('Amplitude')
#~ plt.subplot(r,c,2)
#~ plt.plot([EXT[i] for i in range(imfNo)], 'o')
#~ plt.ylim(0, max([EXT[i] for i in range(imfNo)])+1)
#~ plt.title("Number of extrema")
#~
#~ plt.subplot(r,c,3)
#~ plt.plot([ITER[i] for i in range(imfNo)], 'o')
#~ plt.ylim(0, max([ITER[i] for i in range(imfNo)])+1)
#~ plt.title("Number of iterations")
for num in range(imfNo):
#~ plt.subplot(r,c,num+4)
plt.subplot(r,c,num+2)
plt.plot(T, IMF[num],'g')
plt.xlabel('Time')
plt.ylabel('Amplitude')
if num == imfNo-1:
plt.title('Residue')
else:
plt.title("Imf "+str(num))
plt.tight_layout()
plt.show()