Permalink
Browse files

Refactored EMD, 99% PEP8, Full Docstring and comments, tidy and as co…

…ncise as possible
  • Loading branch information...
1 parent dd4f0b3 commit 7b8ab719203b1540cf231765369e71650797ea27 @Cadair Cadair committed Apr 19, 2012
Showing with 165 additions and 205 deletions.
  1. +165 −205 EMD.py
View
@@ -1,231 +1,191 @@
+"""
+To do:
+ Tests / Examples (same in some literature)
+ Instanous freequncuies
+ Hilbert-Huang Transform
+"""
import numpy as np
from scipy import interpolate
-import matplotlib.pyplot as plt
__all__ = 'EMD'
-def EMD(data, inter = 'Default'):
+def emd(data, extrapolation='mirror', nimfs=12, shifting_distance=0.2):
"""
- OMG, HERE IS SOME EMD ON EMD AWESOME SAUCE ACTION GOING ON!
+ Perform a Empirical Mode Decomposition on a data set.
+
+ This function will return an array of all the Imperical Mode Functions as
+ defined in [1]_, which can be used for further Hilbert Spectral Analysis.
+
+ The EMD uses a spline interpolation function to approcimate the upper and
+ lower envelopes of the signal, this routine implements a extrapolation
+ routine as described in [2]_ as well as the standard spline routine.
+ The extrapolation method removes the artifacts introduced by the spline fit
+ at the ends of the data set, by making the dataset a continuious circle.
+
+ Parameters
+ ----------
+ data : array_like
+ Signal Data
+ extrapolation : str, optional
+ Sets the extrapolation method for edge effects.
+ Options: None
+ 'mirror'
+ Default: 'mirror'
+ nimfs : int, optional
+ Sets the maximum number of IMFs to be found
+ Default : 12
+ shifiting_distance : float, optional
+ Sets the minimum variance between IMF iterations.
+ Default : 0.2
+
+ Returns
+ -------
+ IMFs : ndarray
+ An array of shape (len(data),N) where N is the number of found IMFs
+
+ Notes
+ -----
+
+ References
+ ----------
+ .. [1] Huang H. et al. 1998 'The empirical mode decomposition and the Hilbert spectrum for nonlinear and non-stationary time series analysis.'
+ Procedings of the Royal Society 454, 903-995
+
+ .. [2] Zhao J., Huang D. 2001 'Mirror extending and circular spline function for empirical mode decomposition method'
+ Journal of Zhejiang University (Science) V.2, No.3,P247-252
+
+ .. [3] Rato R.T., Ortigueira M.D., Batista A.G 2008 'On the HHT, its problems, and some solutions.'
+ Mechanical Systems and Signal Processing 22 1374-1394
+
+
"""
-
- if inter == 'Default' or inter == 'extrap':
+
+ #Set up signals array and IMFs array based on type of extrapolation
+ # No extrapolation and 'extend' use signals array which is len(data)
+ # Mirror extrapolation (Zhao 2001) uses a signal array len(2*data)
+ if not(extrapolation):
base = len(data)
- nimfs = range(12) # Max number of IMFs
- IMFs = np.zeros([base,len(nimfs)])
+ signals = np.zeros([base, 2])
+ nimfs = range(nimfs) # Max number of IMFs
+ IMFs = np.zeros([base, len(nimfs)])
ncomp = 0
- residual = 0
- signals = np.zeros([base,2])
- signals[:,0] = data
- elif inter == 'Mirror':
+ residual = data
+ signals[:, 0] = data
+ #DON'T do spline fitting with periodic bounds
+ inter_per = 0
+
+ elif extrapolation == 'mirror':
+ #Set up base
base = len(data)
- nimfs = range(12) # Max number of IMFs
- IMFs = np.zeros([base,len(nimfs)])
+ nimfs = range(nimfs) # Max number of IMFs
+ IMFs = np.zeros([base, len(nimfs)])
ncomp = 0
- residual = 0
- signals = np.zeros([base*3,2])
- signals[0:base,0] = data[::-1]
- signals[base:base*2,0] = data
- signals[base*2:base*3,0] = data[::-1]
- base = len(signals)
- data_length = len(data)
+ residual = data
+ #Signals is 2*base
+ signals = np.zeros([base*2, 2])
+ #Mirror Dataset
+ signals[0:base / 2, 0] = data[::-1][base / 2:]
+ signals[base / 2:base + base / 2, 0] = data
+ signals[base + base / 2:base * 2, 0] = data[::-1][0:base / 2]
+ # Redfine base as len(signals) for IMFs
+ base = len(signals)
+ data_length = len(data) # Data length is used in recovering input data
+ #DO spline fitting with periodic bounds
+ inter_per = 1
+
else:
- raise Exception("Bite me bitch")
+ raise Exception(
+ "Please Specifiy extrapolation keyword as None or 'mirror'")
for j in nimfs:
-
+# Extract at most nimfs IMFs no more IMFs to be found when Finish is True
k = 0
sd = 1.
finish = False
- while sd > 0.2 and not(finish):
+ while sd > shifting_distance and not(finish):
+ min_env = np.zeros(base)
+ max_env = min_env.copy()
+
+ min_env = np.logical_and(
+ np.r_[True, signals[1:,0] > signals[:-1,0]],
+ np.r_[signals[:-1,0] > signals[1:,0], True])
+ max_env = np.logical_and(
+ np.r_[True, signals[1:,0] < signals[:-1,0]],
+ np.r_[signals[:-1,0] < signals[1:,0], True])
+ max_env[0] = max_env[-1] = False
+ min_env = min_env.nonzero()[0]
+ max_env = max_env.nonzero()[0]
+
+ #Cubic Spline by default
+ order_max = 3
+ order_min = 3
+
+ if len(min_env) < 2 or len(max_env) < 2:
+ #If this IMF has become a straight line
+ finish = True
+ else:
+ if len(min_env) < 4:
+ order_min = 1 #Do linear interpolation if not enough points
- x = np.zeros(base)
- y = x.copy()
+ if len(max_env) < 4:
+ order_max = 1 #Do linear interpolation if not enough points
-
- for i in range(1,len(signals[:,0])-1):
- if (signals[i,0] > signals[i-1,0] and signals[i,0] >= signals[i+1,0]) or (signals[i,0] >= signals[i-1,0] and signals[i,0] > signals[i+1,0]):
- x[i]=1
- if signals[i,0] < signals[i-1,0] and signals[i,0] <= signals[i+1,0] or (signals[i,0] <= signals[i-1,0] and signals[i,0] < signals[i+1,0]):
- y[i]=1
- xc = x.nonzero()[0]
- yc = y.nonzero()[0]
+#==============================================================================
+# Mirror Method requires per flag = 1 No extrapolation requires per flag = 0
+# This is set in intial setup at top of function.
+#==============================================================================
+ t = interpolate.splrep(min_env, signals[min_env,0],
+ k=order_min, per=inter_per)
+ top = interpolate.splev(
+ np.arange(len(signals[:,0])), t)
- if len(xc) < 2 or len(yc) < 2:
- print "escape"
- finish = True
- else:
-
- if len(xc) < 4:
- t = interpolate.splrep(xc,signals[xc,0], k=1)
- top = interpolate.splev(np.arange(len(signals[:,0])),t)
- else:
- if inter == 'extrap':
- x0 = x[:xc[0]+1][::-1]
- x1 = x[xc[-1]:][::-1]
- xn = np.hstack((x0,x,x1)).nonzero()[0]
- sx = np.hstack((signals[:xc[0]+1,0][::-1],signals[:,0],signals[xc[-1]:,0][::-1]))
- t = interpolate.splrep(xn,sx[xn])
- top = interpolate.splev(np.arange(xc[0]+1,len(signals[:,0])+xc[0]+1),t)
- else:
- t = interpolate.splrep(xc,signals[xc,0])
- top = interpolate.splev(np.arange(len(signals[:,0])),t)
-
- if len(yc) < 4:
- b = interpolate.splrep(yc,signals[yc,0], k=1)
- bot = interpolate.splev(np.arange(len(signals[:,0])),b)
- else:
- if inter == 'extrap':
- y0 = y[:yc[0]+1][::-1]
- y1 = y[yc[-1]:][::-1]
- yn = np.hstack((y0,y,y1)).nonzero()[0]
- sy = np.hstack((signals[:yc[0]+1,0][::-1],signals[:,0],signals[yc[-1]:,0][::-1]))
- b = interpolate.splrep(yn,sy[yn])
- bot = interpolate.splev(np.arange(yc[0]+1,len(signals[:,0])+yc[0]+1),b)
- else:
- b = interpolate.splrep(yc,signals[yc,0])
- bot = interpolate.splev(np.arange(len(signals[:,0])),b)
-
-
-# plt.plot(signals[: ,0])
-# plt.plot(top,'bo')
-# plt.plot(bot,'ro')
-# plt.show()
+ b = interpolate.splrep(max_env, signals[max_env,0],
+ k=order_max, per=inter_per)
+ bot = interpolate.splev(
+ np.arange(len(signals[:,0])), b)
+
+ #Calculate the Mean and remove from the data set.
+ mean = (top + bot)/2
+ signals[:,1] = signals[:,0] - mean
+
+ #Calculate the shifting distance which is a measure of
+ #simulartity to previous IMF
+ if k > 0:
+ sd = (np.sum((np.abs(signals[:,0] - signals[:,1])**2))
+ / (np.sum(signals[:,0]**2)))
+
+ #Set new iteration as previous and loop
+ signals = signals[:,::-1]
+ k += 1
- mean = (top + bot)/2
- signals[:,1] = signals[:,0] - mean
+ if finish:
+ #If IMF is a straight line we are done here.
+ IMFs[:,j]= residual
+ ncomp += 1
+ break
+
+ if not(extrapolation):
+ IMFs[:,j] = signals[:,0]
+ residual = residual - IMFs[:,j]#For j==0 residual is initially data
+ signals[:,0] = residual
+ ncomp += 1
- if k > 0:
- sd = np.sum((np.abs(signals[:,0] - signals[:,1])**2))/(np.sum(signals[:,0]**2))
+ elif extrapolation == 'mirror':
+ IMFs[:,j] = signals[data_length / 2:data_length
+ + data_length / 2,0]
+ residual = residual - IMFs[:,j]#For j==0 residual is initially data
+
+ #Mirror case requires IMF subtraction from data range then
+ # re-mirroring for each IMF
+ signals[0:data_length / 2,0] = residual[::-1][data_length / 2:]
+ signals[data_length / 2:data_length + data_length / 2,0] = residual
+ signals[data_length
+ + data_length / 2:,0] = residual[::-1][0:data_length / 2]
+ ncomp += 1
- signals = signals[:,::-1]
- k += 1
-
- if inter == 'Default' or inter == 'extrap':
- if finish:
- IMFs[:,j]= residual
- ncomp += 1
- break
- elif j == 0:
- IMFs[:,j] = signals[:,0]
- residual = data - IMFs[:,j]
- signals[:,0] = residual
- ncomp = 1
- else:
- IMFs[:,j] = signals[:,0]
- residual = residual - IMFs[:,j]
- signals[:,0] = residual
- ncomp += 1
- elif inter == 'Mirror':
- if finish:
- IMFs[:,j]= residual
- ncomp += 1
- break
- elif j == 0:
- IMFs[:,j] = signals[data_length:data_length*2,0]
- residual = data - IMFs[:,j]
- signals[0:data_length,0] = residual[::-1]
- signals[data_length:data_length*2,0] = residual
- signals[data_length*2:data_length*3,0] = residual[::-1]
- ncomp = 1
- else:
- IMFs[:,j] = signals[data_length:data_length*2,0]
- residual = residual - IMFs[:,j]
- signals[0:data_length,0] = residual[::-1]
- signals[data_length:data_length*2,0] = residual
- signals[data_length*2:data_length*3,0] = residual[::-1]
- ncomp += 1
else:
- raise Exception("Bite me bitch")
+ raise Exception(
+ "Please Specifiy extrapolation keyword as None or 'mirror'")
- return IMFs[:,0:ncomp]
-
-if __name__ == "__main__":
- import matplotlib.ticker as tick
-
- # Signal creation
-# base = np.linspace(0,250,500)
-# a = 100 * np.sin(base)
-# b = 50 * np.cos(base/5)
-# c = 100 * np.sin(base/10)
-# d = 50 * np.cos(base)
-# data = 2*base + a + b + c + d #+ 50*np.random.random(base)
-
- # Real data
- int_data = np.load('/home/stuart/Documents/Ellerman_data/IBIS/Int_red130.npy')
- area_data = np.load('/home/stuart/Documents/Ellerman_data/IBIS/Area_red130.npy')
-# int_data = np.load('/home/nabobalis/Dropbox/Int_red130.npy')
-# area_data = np.load('/home/nabobalis/Dropbox/Area_red130.npy')
- ind = 3801
- fin = np.isfinite(int_data[ind,:])
- int_data = int_data[ind,:][fin]
- area_data = area_data[ind,:][fin]
- time = (np.arange(0,len(int_data))*26.9)/60.
-
- # Calls our EMD!
- imfs_area = EMD(area_data,inter = 'Mirror')
- imfs_int = EMD(int_data,inter = 'Mirror')
- print "-------------------------------------"
- print "Area:",imfs_area.shape
- print "Intensity:",imfs_int.shape
- print "-------------------------------------"
-
- # Sexy image plotting
- plt.figure()
- ax1 = plt.subplot(111)
- ax1.yaxis.set_major_locator(tick.MaxNLocator(7,symmetric=True))
- ax1.plot(time,imfs_area[:,0],'x--',color='k',label="Area")
- ax1.plot(time,imfs_area[:,2:],'x--',color='k',label="Area")
- pltA, = ax1.plot(time,imfs_area[:,1],'o-',color='r',label="Area")
-# pltDA, = ax1.plot(time,area_data-imfs_area[:,-1],'o--',color='r')
-# ax2 = ax1.twinx()
-# ax2= plt.subplot(111)
-# ax2.yaxis.set_major_locator(tick.MaxNLocator(7,symmetric=True))
-# ax2.plot(time,imfs_int[:,0],'x--',color='k',label="Intensity")
-# ax2.plot(time,imfs_int[:,2:],'x--',color='k',label="Intensity")
-# pltI, = ax2.plot(time,imfs_int[:,1],'o-',color='b',label="Intensity")
-# pltDI, = ax2.plot(time,int_data-imfs_int[:,-1],'o--',color='b')
-# plt.title("Detrended Data EB $%i$"%ind)
- ax1.set_ylabel("Area (pixels)")
-# ax2.set_ylabel("Intensity (A.U.)")
- ax1.set_xlabel("Time [Minutes]")
-# leg = ax2.legend([pltDA,pltDI],["Area","Intensity"],loc='best', fancybox=True)
-# leg.get_frame().set_alpha(0.5)
-# plt.show()
-
- plt.figure()
- n= len(time)
- #Create blank array
- fft_a = np.zeros([n],dtype=np.complex64)
- result_a = np.fft.fft(imfs_area[:,1])
- xa = np.linspace(0, len(result_a)/2., n/2)
- xa /= n * 26.9
- xa = 1./xa
- xa /= 60.
- # Swap halfs
- fft_a[0:n/2] = result_a[n/2:n]
- fft_a[n/2:n] = result_a[0:n/2]
- fft_a = np.absolute(fft_a[n/2:])
-
- fft_i = np.zeros([n],dtype=np.complex64)
- result_i = np.fft.fft(imfs_int[:,1])
- xi = np.linspace(0, len(result_i)/2., n/2)
- xi /= n * 26.9
- xi = 1./xi
- xi /= 60.
- # Swap halfs
- fft_i[0:n/2] = result_i[n/2:n]
- fft_i[n/2:n] = result_i[0:n/2]
- fft_i = np.absolute(fft_i[n/2:])
- ax1 = plt.subplot(111)
-# plt.tight_layout(pad=5)
- pltA, = plt.plot(xa,fft_a**2,'ro-',label="Area")
- ax2 = ax1.twinx()
- pltI, = plt.plot(xi,fft_i**2,'bo-',label="Intensity")
- ax1.set_xlabel("Period [Minutes]")
- ax1.set_ylabel("Power [A.U.]")
- ax2.set_ylabel("Power [A.U.]")
- leg = ax2.legend([pltA,pltI],["Area","Intensity"],loc='best', fancybox=True)
- leg.get_frame().set_alpha(0.5)
- plt.show()
+ return IMFs[:,0:ncomp]

0 comments on commit 7b8ab71

Please sign in to comment.