diff --git a/THANKS.txt b/THANKS.txt index 974247e85ea5..ca81fab713e0 100644 --- a/THANKS.txt +++ b/THANKS.txt @@ -95,7 +95,7 @@ Fazlul Shahriar for fixes to the NetCDF3 I/O. Chris Jordan-Squire for bug fixes, documentation improvements and scipy.special.logit & expit. Christoph Gohlke for many bug fixes and help with Windows specific issues. - +Jacob Silterra for cwt-based peak finding in scipy.signal. Institutions ------------ diff --git a/scipy/signal/__init__.py b/scipy/signal/__init__.py index 24ab2c8b554f..120ceb4b58a5 100644 --- a/scipy/signal/__init__.py +++ b/scipy/signal/__init__.py @@ -187,6 +187,19 @@ daub -- return low-pass morlet -- Complex Morlet wavelet. qmf -- return quadrature mirror filter from low-pass + ricker -- return ricker wavelet + cwt -- perform continuous wavelet transform + +Peak finding +============ + +.. autosummary:: + :toctree: generated/ + + find_peaks_cwt -- Attempt to find the peaks in the given 1-D array + argrelmin -- Calculate the relative minima of data + argrelmax -- Calculate the relative maxima of data + argrelextrema -- Calculate the relative extrema of data """ @@ -207,6 +220,7 @@ from signaltools import * from spectral import * from wavelets import * +from _peak_finding import * __all__ = filter(lambda s: not s.startswith('_'), dir()) from numpy.testing import Tester diff --git a/scipy/signal/_peak_finding.py b/scipy/signal/_peak_finding.py new file mode 100644 index 000000000000..1918ad0c1c24 --- /dev/null +++ b/scipy/signal/_peak_finding.py @@ -0,0 +1,367 @@ +""" +Functions for identifying peaks in signals. +""" + +import numpy as np + +from scipy.signal.wavelets import cwt, ricker +from scipy.stats import scoreatpercentile + + +def _boolrelextrema(data, comparator, + axis=0, order=1, mode='clip'): + """ + Calculate the relative extrema of `data`. + + Relative extrema are calculated by finding locations where + comparator(data[n],data[n+1:n+order+1]) = True. + + Parameters + ---------- + data: ndarray + comparator: function + function to use to compare two data points. + Should take 2 numbers as arguments + axis: int, optional + axis over which to select from `data` + order: int, optional + How many points on each side to require + a `comparator`(n,n+x) = True. + mode: string, optional + How the edges of the vector are treated. + 'wrap' (wrap around) or 'clip' (treat overflow + as the same as the last (or first) element). + Default 'clip'. See numpy.take + + Returns + ------- + extrema: ndarray + Indices of the extrema, as boolean array + of same shape as data. True for an extrema, + False else. + + See also + -------- + argrelmax,argrelmin + + Examples + -------- + >>> testdata = np.array([1,2,3,2,1]) + >>> argrelextrema(testdata, np.greater, axis=0) + array([False, False, True, False, False], dtype=bool) + """ + + if((int(order) != order) or (order < 1)): + raise ValueError('Order must be an int >= 1') + + datalen = data.shape[axis] + locs = np.arange(0, datalen) + + results = np.ones(data.shape, dtype=bool) + main = data.take(locs, axis=axis, mode=mode) + for shift in xrange(1, order + 1): + plus = data.take(locs + shift, axis=axis, mode=mode) + minus = data.take(locs - shift, axis=axis, mode=mode) + results &= comparator(main, plus) + results &= comparator(main, minus) + if(~results.any()): + return results + return results + + +def argrelmin(data, axis=0, order=1, mode='clip'): + """ + Calculate the relative minima of `data`. + + See also + -------- + argrelextrema,argrelmax + """ + return argrelextrema(data, np.less, axis, order, mode) + + +def argrelmax(data, axis=0, order=1, mode='clip'): + """ + Calculate the relative maxima of `data`. + + See also + -------- + argrelextrema,argrelmin + """ + return argrelextrema(data, np.greater, axis, order, mode) + + +def argrelextrema(data, comparator, + axis=0, order=1, mode='clip'): + """ + Calculate the relative extrema of `data` + + Returns + ------- + extrema: ndarray + Indices of the extrema, as an array + of integers (same format as argmin, argmax + + See also + -------- + argrelmin, argrelmax + + """ + results = _boolrelextrema(data, comparator, + axis, order, mode) + if ~results.any(): + return (np.array([]),) * 2 + else: + return np.where(results) + + +def _identify_ridge_lines(matr, max_distances, gap_thresh): + """ + Identify ridges in the 2D matrix. Expect that the width of + the wavelet feature increases with increasing row number. + + Parameters + ---------- + matr: 2-D ndarray + Matrix in which to identify ridge lines. + max_distances: 1-D sequence + At each row, a ridge line is only connected + if the relative max at row[n] is within + `max_distances`[n] from the relative max at row[n+1]. + gap_thresh: int + If a relative maximum is not found within `max_distances`, + there will be a gap. A ridge line is discontinued if + there are more than `gap_thresh` points without connecting + a new relative maximum. + + Returns + ------- + ridge_lines: tuple + tuple of 2 1-D sequences. `ridge_lines`[ii][0] are the rows of the ii-th + ridge-line, `ridge_lines`[ii][1] are the columns. Empty if none found. + Each ridge-line will be sorted by row (increasing), but the order + of the ridge lines is not specified + + References + ---------- + Bioinformatics (2006) 22 (17): 2059-2065. + doi: 10.1093/bioinformatics/btl355 + http://bioinformatics.oxfordjournals.org/content/22/17/2059.long + + Examples + -------- + >>> data = np.random.rand(5,5) + >>> ridge_lines = identify_ridge_lines(data, 1, 1) + + Notes: + ------ + This function is intended to be used in conjuction with `cwt` + as part of find_peaks_cwt. + """ + + if(len(max_distances) < matr.shape[0]): + raise ValueError('Max_distances must have at least as many rows as matr') + + all_max_cols = _boolrelextrema(matr, np.greater, axis=1, order=1) + #Highest row for which there are any relative maxima + has_relmax = np.where(all_max_cols.any(axis=1))[0] + if(len(has_relmax) == 0): + return [] + start_row = has_relmax[-1] + #Each ridge line is a 3-tuple: + #rows, cols,Gap number + ridge_lines = [[[start_row], + [col], + 0] for col in np.where(all_max_cols[start_row])[0]] + final_lines = [] + rows = np.arange(start_row - 1, -1, -1) + cols = np.arange(0, matr.shape[1]) + for row in rows: + this_max_cols = cols[all_max_cols[row]] + + #Increment gap number of each line, + #set it to zero later if appropriate + for line in ridge_lines: + line[2] += 1 + + #XXX These should always be all_max_cols[row] + #But the order might be different. Might be an efficiency gain + #to make sure the order is the same and avoid this iteration + prev_ridge_cols = np.array([line[1][-1] for line in ridge_lines]) + #Look through every relative maximum found at current row + #Attempt to connect them with existing ridge lines. + new_lines = [] + for ind, col in enumerate(this_max_cols): + """ + If there is a previous ridge line within + the max_distance to connect to, do so. + Otherwise start a new one. + """ + line = None + if(len(prev_ridge_cols) > 0): + diffs = np.abs(col - prev_ridge_cols) + closest = np.argmin(diffs) + if diffs[closest] <= max_distances[row]: + line = ridge_lines[closest] + if(line is not None): + #Found a point close enough, extend current ridge line + line[1].append(col) + line[0].append(row) + line[2] = 0 + else: + new_line = [[row], + [col], + 0] + ridge_lines.append(new_line) + + #Remove the ridge lines with gap_number too high + #XXX Modifying a list while iterating over it. + #Should be safe, since we iterate backwards, but + #still tacky. + for ind in xrange(len(ridge_lines) - 1, -1, -1): + line = ridge_lines[ind] + if line[2] > gap_thresh: + final_lines.append(line) + del ridge_lines[ind] + + out_lines = [] + for line in (final_lines + ridge_lines): + sortargs = np.array(np.argsort(line[0])) + rows, cols = np.zeros_like(sortargs), np.zeros_like(sortargs) + rows[sortargs] = line[0] + cols[sortargs] = line[1] + out_lines.append([rows, cols]) + return out_lines + + +def _filter_ridge_lines(cwt, ridge_lines, window_size=None, min_length=None, + min_snr=1, noise_perc=10): + """ + Filter ridge lines according to prescribed criteria. Intended + to be used for finding relative maxima. + + Parameters + ------------- + cwt : 2-D ndarray + Continuous wavelet transform from which + the ridge_lines were defined + ridge_lines: 1-D sequence + Each element should contain 2 sequences, the rows and columns + of the ridge line (respectively) + window_size: int, optional + Size of window to use to calculate noise floor. + Default is `cwt`.shape[1]/20 + min_length: int, optional + Minimum length a ridge line needs to be acceptable. + Default is `cwt`.shape[0]/4, ie 1/4th the number of widths. + min_snr: float, optional + Minimum SNR ratio. Default 0. The signal is the value of + the cwt matrix at the shortest length scale (`cwt`[0,loc]), the noise is + the `noise_perc`th percentile of datapoints contained within + a window of `window_size` around `cwt`[0,loc] + noise_perc: float,optional + When calculating the noise floor, percentile of data points + examined below which to consider noise. Calculated using + scipy.stats.scoreatpercentile. + + References + ---------- + Bioinformatics (2006) 22 (17): 2059-2065. doi: 10.1093/bioinformatics/btl355 + http://bioinformatics.oxfordjournals.org/content/22/17/2059.long + + """ + num_points = cwt.shape[1] + if min_length is None: + min_length = np.ceil(cwt.shape[0] / 4) + if window_size is None: + window_size = np.ceil(num_points / 20) + hf_window = window_size / 2 + + #Filter based on SNR + row_one = cwt[0, :] + noises = np.zeros_like(row_one) + for ind, val in enumerate(row_one): + window = np.arange(max([ind - hf_window, 0]), min([ind + hf_window, num_points])) + window = window.astype(int) + noises[ind] = scoreatpercentile(row_one[window], per=noise_perc) + + def filt_func(line): + if len(line[0]) < min_length: + return False + snr = abs(cwt[line[0][0], line[1][0]] / noises[line[1][0]]) + if snr < min_snr: + return False + return True + + return filter(filt_func, ridge_lines) + + +def find_peaks_cwt(vector, widths, wavelet=None, max_distances=None, gap_thresh=None, + min_length=None, min_snr=1, noise_perc=10): + """ + Attempt to find the peaks in the given 1-D array `vector`. + + The general approach is to smooth `vector` by convolving it with `wavelet(width)` + for each width in `widths`. Relative maxima which appear at enough length scales, + and with sufficiently high SNR, are accepted. + + Parameters + ---------- + vector: 1-D ndarray + widths: 1-D sequence + Widths to use for calculating the CWT matrix. In general, + this range should cover the expected width of peaks of interest. + wavelet: function + Should take a single variable and return a 1d array to convolve + with `vector`. Should be normalized to unit area. Default + is the ricker wavelet + max_distances: 1-D ndarray,optional + Default `widths`/4. See identify_ridge_lines + gap_thresh: float, optional + Default 2. See identify_ridge_lines + min_length: int, optional + Default None. See filter_ridge_lines + min_snr: float, optional + Default 1. See filter_ridge_lines + noise_perc: float, optional + Default 10. See filter_ridge_lines + + Notes + --------- + This approach was designed for finding sharp peaks among noisy data, however + with proper parameter selection it should function well for different + peak shapes. + The algorithm is as follows: + 1. Perform a continuous wavelet transform on `vector`, for the supplied + `widths`. This is a convolution of `vector` with `wavelet(width)` for + each width in `widths`. See `cwt` + 2. Identify "ridge lines" in the cwt matrix. These are relative maxima + at each row, connected across adjacent rows. See identify_ridge_lines + 3. Filter the ridge_lines using filter_ridge_lines. + + References + ---------- + Bioinformatics (2006) 22 (17): 2059-2065. doi: 10.1093/bioinformatics/btl355 + http://bioinformatics.oxfordjournals.org/content/22/17/2059.long + + Examples + -------- + >>> xs = np.arange(0, np.pi, 0.05) + >>> data = np.sin(xs) + >>> peakind = find_peaks_cwt(data, np.arange(1,10)) + >>> peakind, xs[peakind],data[peakind] + ([32], array([ 1.6]), array([ 0.9995736])) + """ + if gap_thresh is None: + gap_thresh = np.ceil(widths[0]) + if max_distances is None: + max_distances = widths / 4.0 + if wavelet is None: + wavelet = ricker + + cwt_dat = cwt(vector, wavelet, widths) + ridge_lines = _identify_ridge_lines(cwt_dat, max_distances, gap_thresh) + filtered = _filter_ridge_lines(cwt_dat, ridge_lines, min_length=min_length, + min_snr=min_snr, noise_perc=noise_perc) + max_locs = map(lambda x: x[1][0], filtered) + return sorted(max_locs) diff --git a/scipy/signal/tests/test_peak_finding.py b/scipy/signal/tests/test_peak_finding.py new file mode 100644 index 000000000000..7fa0c671c322 --- /dev/null +++ b/scipy/signal/tests/test_peak_finding.py @@ -0,0 +1,244 @@ +import copy + +import numpy as np +from numpy.testing import TestCase, run_module_suite, assert_equal, \ + assert_almost_equal, assert_array_equal, assert_array_almost_equal, \ + assert_raises, assert_ +from scipy.signal._peak_finding import argrelmax, find_peaks_cwt, _identify_ridge_lines + + +def _gen_gaussians(center_locs, sigmas, total_length): + num_peaks = len(sigmas) + xdata = np.arange(0, total_length).astype(float) + out_data = np.zeros(total_length, dtype=float) + for ind, sigma in enumerate(sigmas): + tmp = (xdata - center_locs[ind]) / sigma + out_data += np.exp(-(tmp**2)) + return out_data + + +def _gen_gaussians_even(sigmas, total_length): + num_peaks = len(sigmas) + delta = total_length / (num_peaks + 1) + center_locs = np.linspace(delta, total_length - delta, num=num_peaks).astype(int) + out_data = _gen_gaussians(center_locs, sigmas, total_length) + return out_data, center_locs + + +def _gen_ridge_line(start_locs, max_locs, length, distances, gaps): + """ + Generate coordinates for a ridge line. + + Will be a series of coordinates, starting a start_loc (length 2). + The maximum distance between any adjacent columns will be + `max_distance`, the max distance between adjacent rows + will be `map_gap'. + + `max_locs` should be the size of the intended matrix. The + ending coordinates are guaranteed to be less than `max_locs`, + although they may not approach `max_locs` at all. + """ + + def keep_bounds(num, max_val): + out = max(num, 0) + out = min(out, max_val) + return out + + gaps = copy.deepcopy(gaps) + distances = copy.deepcopy(distances) + + locs = np.zeros([length, 2], dtype=int) + locs[0, :] = start_locs + total_length = max_locs[0] - start_locs[0] - sum(gaps) + if total_length < length: + raise ValueError('Cannot generate ridge line according to constraints') + dist_int = length / len(distances) - 1 + gap_int = length / len(gaps) - 1 + for ind in xrange(1, length): + nextcol = locs[ind - 1, 1] + nextrow = locs[ind - 1, 0] + 1 + if (ind % dist_int == 0) and (len(distances) > 0): + nextcol += ((-1)**ind)*distances.pop() + if (ind % gap_int == 0) and (len(gaps) > 0): + nextrow += gaps.pop() + nextrow = keep_bounds(nextrow, max_locs[0]) + nextcol = keep_bounds(nextcol, max_locs[1]) + locs[ind, :] = [nextrow, nextcol] + + return [locs[:, 0], locs[:, 1]] + + +class TestRidgeLines(TestCase): + + def test_empty(self): + test_matr = np.zeros([20, 100]) + lines = _identify_ridge_lines(test_matr, 2*np.ones(20), 1) + assert_(len(lines) == 0) + + def test_minimal(self): + test_matr = np.zeros([20, 100]) + test_matr[0, 10] = 1 + lines = _identify_ridge_lines(test_matr, 2*np.ones(20), 1) + assert_(len(lines) == 1) + + test_matr = np.zeros([20, 100]) + test_matr[0:2, 10] = 1 + lines = _identify_ridge_lines(test_matr, 2*np.ones(20), 1) + assert_(len(lines) == 1) + + def test_single_pass(self): + distances = [0, 1, 2, 5] + gaps = [0, 1, 2, 0, 1] + test_matr = np.zeros([20, 50]) + 1e-12 + length = 12 + line = _gen_ridge_line([0, 25], test_matr.shape, length, distances, gaps) + test_matr[line[0], line[1]] = 1 + max_distances = max(distances)*np.ones(20) + identified_lines = _identify_ridge_lines(test_matr, max_distances, max(gaps) + 1) + assert_array_equal(identified_lines, [line]) + + def test_single_bigdist(self): + distances = [0, 1, 2, 5] + gaps = [0, 1, 2, 4] + test_matr = np.zeros([20, 50]) + length = 12 + line = _gen_ridge_line([0, 25], test_matr.shape, length, distances, gaps) + test_matr[line[0], line[1]] = 1 + max_dist = 3 + max_distances = max_dist*np.ones(20) + #This should get 2 lines, since the distance is too large + identified_lines = _identify_ridge_lines(test_matr, max_distances, max(gaps) + 1) + assert_(len(identified_lines) == 2) + + for iline in identified_lines: + adists = np.diff(iline[1]) + np.testing.assert_array_less(np.abs(adists), max_dist) + + agaps = np.diff(iline[0]) + np.testing.assert_array_less(np.abs(agaps), max(gaps) + 0.1) + + def test_single_biggap(self): + distances = [0, 1, 2, 5] + max_gap = 3 + gaps = [0, 4, 2, 1] + test_matr = np.zeros([20, 50]) + length = 12 + line = _gen_ridge_line([0, 25], test_matr.shape, length, distances, gaps) + test_matr[line[0], line[1]] = 1 + max_dist = 6 + max_distances = max_dist*np.ones(20) + #This should get 2 lines, since the gap is too large + identified_lines = _identify_ridge_lines(test_matr, max_distances, max_gap) + assert_(len(identified_lines) == 2) + + for iline in identified_lines: + adists = np.diff(iline[1]) + np.testing.assert_array_less(np.abs(adists), max_dist) + + agaps = np.diff(iline[0]) + np.testing.assert_array_less(np.abs(agaps), max(gaps) + 0.1) + + def test_single_biggaps(self): + distances = [0] + max_gap = 1 + gaps = [3, 6] + test_matr = np.zeros([50, 50]) + length = 30 + line = _gen_ridge_line([0, 25], test_matr.shape, length, distances, gaps) + test_matr[line[0], line[1]] = 1 + max_dist = 1 + max_distances = max_dist*np.ones(50) + #This should get 3 lines, since the gaps are too large + identified_lines = _identify_ridge_lines(test_matr, max_distances, max_gap) + assert_(len(identified_lines) == 3) + + for iline in identified_lines: + adists = np.diff(iline[1]) + np.testing.assert_array_less(np.abs(adists), max_dist) + + agaps = np.diff(iline[0]) + np.testing.assert_array_less(np.abs(agaps), max(gaps) + 0.1) + + +class TestArgrelmax(TestCase): + + def test_highorder(self, order=2): + sigmas = [1.0, 2.0, 10.0, 5.0, 15.0] + test_data, act_locs = _gen_gaussians_even(sigmas, 500) + test_data[act_locs + order] = test_data[act_locs]*0.99999 + test_data[act_locs - order] = test_data[act_locs]*0.99999 + rel_max_locs = argrelmax(test_data, order=2, mode='clip')[0] + #rel_max_locs = np.where(rel_max_matr)[0] + + assert_(len(rel_max_locs) == len(act_locs)) + assert_((rel_max_locs == act_locs).all()) + + def test_2d_gaussians(self): + sigmas = [1.0, 2.0, 10.0] + test_data, act_locs = _gen_gaussians_even(sigmas, 100) + rot_factor = 20 + rot_range = np.arange(0, len(test_data)) - rot_factor + test_data_2 = np.vstack([test_data, test_data[rot_range]]) + #rel_max_matr = argrelmax(test_data_2, axis=1, order=1) + #rel_max_rows, rel_max_cols = np.where(rel_max_matr) + rel_max_rows, rel_max_cols = argrelmax(test_data_2, axis=1, order=1) + + for rw in xrange(0, test_data_2.shape[0]): + inds = (rel_max_rows == rw) + + assert_(len(rel_max_cols[inds]) == len(act_locs)) + assert_((act_locs == (rel_max_cols[inds] - rot_factor*rw)).all()) + + +class TestFindPeaks(TestCase): + + def test_find_peaks_exact(self): + """ + Generate a series of gaussians and attempt to find the peak locations. + """ + sigmas = [5.0, 3.0, 10.0, 20.0, 10.0, 50.0] + num_points = 500 + test_data, act_locs = _gen_gaussians_even(sigmas, num_points) + widths = np.arange(0.1, max(sigmas)) + found_locs = find_peaks_cwt(test_data, widths, gap_thresh=2, min_snr=0, + min_length=None) + np.testing.assert_array_equal(found_locs, act_locs, + "Found maximum locations did not equal those expected") + + def test_find_peaks_withnoise(self): + """ + Verify that peak locations are (approximately) found + for a series of gaussians with added noise. + """ + sigmas = [5.0, 3.0, 10.0, 20.0, 10.0, 50.0] + num_points = 500 + test_data, act_locs = _gen_gaussians_even(sigmas, num_points) + widths = np.arange(0.1, max(sigmas)) + noise_amp = 0.07 + np.random.seed(18181911) + test_data += (np.random.rand(num_points) - 0.5)*(2*noise_amp) + found_locs = find_peaks_cwt(test_data, widths, min_length=15, + gap_thresh=1, min_snr=noise_amp / 5) + + np.testing.assert_equal(len(found_locs), len(act_locs), 'Different number' + + 'of peaks found than expected') + diffs = np.abs(found_locs - act_locs) + max_diffs = np.array(sigmas) / 5 + np.testing.assert_array_less(diffs, max_diffs, 'Maximum location differed' + + 'by more than %s' % (max_diffs)) + + def test_find_peaks_nopeak(self): + """ + Verify that no peak is found in + data that's just noise. + """ + noise_amp = 1.0 + num_points = 100 + np.random.seed(181819141) + test_data = (np.random.rand(num_points) - 0.5)*(2*noise_amp) + widths = np.arange(10, 50) + found_locs = find_peaks_cwt(test_data, widths, min_snr=5, noise_perc=30) + np.testing.assert_equal(len(found_locs), 0) + +if __name__ == "__main__": + run_module_suite() \ No newline at end of file diff --git a/scipy/signal/tests/test_wavelets.py b/scipy/signal/tests/test_wavelets.py index fbca1dc9e9c0..d77a4e679c19 100644 --- a/scipy/signal/tests/test_wavelets.py +++ b/scipy/signal/tests/test_wavelets.py @@ -7,74 +7,126 @@ class TestWavelets(TestCase): def test_qmf(self): - assert_array_equal(wavelets.qmf([1,1]),[1,-1]) + assert_array_equal(wavelets.qmf([1, 1]), [1, -1]) def test_daub(self): - for i in xrange(1,15): - assert_equal(len(wavelets.daub(i)),i*2) + for i in xrange(1, 15): + assert_equal(len(wavelets.daub(i)), i * 2) def test_cascade(self): - for J in xrange(1,7): - for i in xrange(1,5): + for J in xrange(1, 7): + for i in xrange(1, 5): lpcoef = wavelets.daub(i) k = len(lpcoef) - x,phi,psi = wavelets.cascade(lpcoef,J) + x, phi, psi = wavelets.cascade(lpcoef, J) assert_(len(x) == len(phi) == len(psi)) - assert_equal(len(x),(k-1)*2**J) + assert_equal(len(x), (k - 1) * 2 ** J) def test_morlet(self): - x = wavelets.morlet(50,4.1,complete=True) - y = wavelets.morlet(50,4.1,complete=False) + x = wavelets.morlet(50, 4.1, complete=True) + y = wavelets.morlet(50, 4.1, complete=False) # Test if complete and incomplete wavelet have same lengths: - assert_equal(len(x),len(y)) + assert_equal(len(x), len(y)) # Test if complete wavelet is less than incomplete wavelet: - assert_array_less(x,y) + assert_array_less(x, y) - x = wavelets.morlet(10,50,complete=False) - y = wavelets.morlet(10,50,complete=True) + x = wavelets.morlet(10, 50, complete=False) + y = wavelets.morlet(10, 50, complete=True) # For large widths complete and incomplete wavelets should be # identical within numerical precision: - assert_equal(x,y) + assert_equal(x, y) # miscellaneous tests: - x = np.array([1.73752399e-09 +9.84327394e-25j, - 6.49471756e-01 +0.00000000e+00j, - 1.73752399e-09 -9.84327394e-25j]) - y = wavelets.morlet(3,w=2,complete=True) - assert_array_almost_equal(x,y) - - x = np.array([2.00947715e-09 +9.84327394e-25j, - 7.51125544e-01 +0.00000000e+00j, - 2.00947715e-09 -9.84327394e-25j]) - y = wavelets.morlet(3,w=2,complete=False) - assert_array_almost_equal(x,y,decimal=2) - - x = wavelets.morlet(10000,s=4,complete=True) - y = wavelets.morlet(20000,s=8,complete=True)[5000:15000] - assert_array_almost_equal(x,y,decimal=2) - - x = wavelets.morlet(10000,s=4,complete=False) - assert_array_almost_equal(y,x,decimal=2) - y = wavelets.morlet(20000,s=8,complete=False)[5000:15000] - assert_array_almost_equal(x,y,decimal=2) - - x = wavelets.morlet(10000,w=3,s=5,complete=True) - y = wavelets.morlet(20000,w=3,s=10,complete=True)[5000:15000] - assert_array_almost_equal(x,y,decimal=2) - - x = wavelets.morlet(10000,w=3,s=5,complete=False) - assert_array_almost_equal(y,x,decimal=2) - y = wavelets.morlet(20000,w=3,s=10,complete=False)[5000:15000] - assert_array_almost_equal(x,y,decimal=2) - - x = wavelets.morlet(10000,w=7,s=10,complete=True) - y = wavelets.morlet(20000,w=7,s=20,complete=True)[5000:15000] - assert_array_almost_equal(x,y,decimal=2) - - x = wavelets.morlet(10000,w=7,s=10,complete=False) - assert_array_almost_equal(x,y,decimal=2) - y = wavelets.morlet(20000,w=7,s=20,complete=False)[5000:15000] - assert_array_almost_equal(x,y,decimal=2) + x = np.array([1.73752399e-09 + 9.84327394e-25j, + 6.49471756e-01 + 0.00000000e+00j, + 1.73752399e-09 - 9.84327394e-25j]) + y = wavelets.morlet(3, w=2, complete=True) + assert_array_almost_equal(x, y) + + x = np.array([2.00947715e-09 + 9.84327394e-25j, + 7.51125544e-01 + 0.00000000e+00j, + 2.00947715e-09 - 9.84327394e-25j]) + y = wavelets.morlet(3, w=2, complete=False) + assert_array_almost_equal(x, y, decimal=2) + + x = wavelets.morlet(10000, s=4, complete=True) + y = wavelets.morlet(20000, s=8, complete=True)[5000:15000] + assert_array_almost_equal(x, y, decimal=2) + + x = wavelets.morlet(10000, s=4, complete=False) + assert_array_almost_equal(y, x, decimal=2) + y = wavelets.morlet(20000, s=8, complete=False)[5000:15000] + assert_array_almost_equal(x, y, decimal=2) + + x = wavelets.morlet(10000, w=3, s=5, complete=True) + y = wavelets.morlet(20000, w=3, s=10, complete=True)[5000:15000] + assert_array_almost_equal(x, y, decimal=2) + + x = wavelets.morlet(10000, w=3, s=5, complete=False) + assert_array_almost_equal(y, x, decimal=2) + y = wavelets.morlet(20000, w=3, s=10, complete=False)[5000:15000] + assert_array_almost_equal(x, y, decimal=2) + + x = wavelets.morlet(10000, w=7, s=10, complete=True) + y = wavelets.morlet(20000, w=7, s=20, complete=True)[5000:15000] + assert_array_almost_equal(x, y, decimal=2) + + x = wavelets.morlet(10000, w=7, s=10, complete=False) + assert_array_almost_equal(x, y, decimal=2) + y = wavelets.morlet(20000, w=7, s=20, complete=False)[5000:15000] + assert_array_almost_equal(x, y, decimal=2) + + def test_ricker(self): + w = wavelets.ricker(1.0, 1) + expected = 2 / (np.sqrt(3 * 1.0) * (np.pi ** 0.25)) + assert_array_equal(w, expected) + + lengths = [5, 11, 15, 51, 101] + for length in lengths: + w = wavelets.ricker(length, 1.0) + assert_(len(w) == length) + max_loc = np.argmax(w) + assert_(max_loc == (length / 2)) + + points = 100 + w = wavelets.ricker(points, 2.0) + half_vec = np.arange(0, points / 2) + #Wavelet should be symmetric + assert_array_almost_equal(w[half_vec], w[-(half_vec + 1)]) + + #Check zeros + aas = [5, 10, 15, 20, 30] + points = 99 + for a in aas: + w = wavelets.ricker(points, a) + vec = np.arange(0, points) - (points - 1.0) / 2 + exp_zero1 = np.argmin(np.abs(vec - a)) + exp_zero2 = np.argmin(np.abs(vec + a)) + assert_array_almost_equal(w[exp_zero1], 0) + assert_array_almost_equal(w[exp_zero2], 0) + + def test_cwt(self): + widths = [1.0] + delta_wavelet = lambda s, t: np.array([1]) + len_data = 100 + test_data = np.sin(np.pi * np.arange(0, len_data) / 10.0) + + #Test delta function input gives same data as output + cwt_dat = wavelets.cwt(test_data, delta_wavelet, widths) + assert_(cwt_dat.shape == (len(widths), len_data)) + assert_array_almost_equal(test_data, cwt_dat.flatten()) + + #Check proper shape on output + widths = [1, 3, 4, 5, 10] + cwt_dat = wavelets.cwt(test_data, wavelets.ricker, widths) + assert_(cwt_dat.shape == (len(widths), len_data)) + + widths = [len_data * 10] + #Note: this wavelet isn't defined quite right, but is fine for this test + flat_wavelet = lambda l, w: np.ones(w) / w + cwt_dat = wavelets.cwt(test_data, flat_wavelet, widths) + assert_array_almost_equal(cwt_dat, np.mean(test_data)) + if __name__ == "__main__": run_module_suite() diff --git a/scipy/signal/wavelets.py b/scipy/signal/wavelets.py index 3db241215453..b64b1d555622 100644 --- a/scipy/signal/wavelets.py +++ b/scipy/signal/wavelets.py @@ -2,8 +2,9 @@ from numpy.dual import eig from scipy.misc import comb from scipy import linspace, pi, exp +from scipy.signal import convolve -__all__ = ['daub', 'qmf', 'cascade', 'morlet'] +__all__ = ['daub', 'qmf', 'cascade', 'morlet', 'ricker', 'cwt'] def daub(p): @@ -45,12 +46,12 @@ def daub(p): P = [comb(p - 1 + k, k, exact=1) for k in range(p)][::-1] yj = np.roots(P) else: # try different polynomial --- needs work - P = [comb(p - 1 + k, k, exact=1) / 4.0 ** k + P = [comb(p - 1 + k, k, exact=1) / 4.0**k for k in range(p)][::-1] yj = np.roots(P) / 4 # for each root, compute two z roots, select the one with |z|>1 # Build up final polynomial - c = np.poly1d([1, 1]) ** p + c = np.poly1d([1, 1])**p q = np.poly1d([1]) for k in range(p - 1): yval = yj[k] @@ -74,7 +75,7 @@ def qmf(hk): """Return high-pass qmf filter from low-pass """ N = len(hk) - 1 - asgn = [{0: 1, 1: -1}[k % 2] for k in range(N + 1)] + asgn = [{0: 1, 1:-1}[k % 2] for k in range(N + 1)] return hk[::-1] * np.array(asgn) @@ -241,8 +242,95 @@ def morlet(M, w=5.0, s=1.0, complete=True): output = exp(1j * w * x) if complete: - output -= exp(-0.5 * (w ** 2)) + output -= exp(-0.5 * (w**2)) - output *= exp(-0.5 * (x ** 2)) * pi ** (-0.25) + output *= exp(-0.5 * (x**2)) * pi**(-0.25) return output + + +def ricker(points, a): + """ + Also known as the "mexican hat wavelet", + models the function: + A ( 1 - x^2/a^2) exp(-t^2/a^2), + where ``A = 2/sqrt(3a)pi^1/3`` + + Parameters + ---------- + a: scalar + Width parameter of the wavelet. + points: int, optional + Number of points in `vector`. Default is ``10*a`` + Will be centered around 0. + Returns + ----------- + vector: 1-D ndarray + array of length `points` in shape of ricker curve. + Examples + -------- + >>> import matplotlib.pyplot as plt + >>> points = 100 + >>> a = 4.0 + >>> vec2 = ricker(a,points) + >>> print len(vec2) + 100 + >>> plt.plot(vec2) + >>> plt.show() + """ + + A = 2 / (np.sqrt(3 * a) * (np.pi**0.25)) + wsq = a**2 + vec = np.arange(0, points) - (points - 1.0) / 2 + tsq = vec**2 + mod = (1 - tsq / wsq) + gauss = np.exp(-tsq / (2 * wsq)) + total = A * mod * gauss + return total + + +def cwt(data, wavelet, widths): + """ + Performs a continuous wavelet transform on `data`, + using the wavelet function. A CWT performs a convolution + with `data` using the `wavelet` function, which is characterized + by a width parameter and length parameter. + + Parameters + ---------- + data : 1-D ndarray + data on which to perform the transform. + wavelet : function + Wavelet function, which should take 2 arguments. + The first argument is a width parameter, defining + the size of the wavelet (e.g. standard deviation of a gaussian). + The second is the number of points that the returned vector will have + (len(wavelet(width,length)) == length). See `ricker`, which + satisfies these requirements. + widths : sequence + Widths to use for transform. + + Returns + ------- + cwt: 2-D ndarray + Will be len(widths) x len(data). + + Notes + ------ + cwt[ii,:] = scipy.signal.convolve(data,wavelet(width[ii], length), mode='same') + where length = min(10 * width[ii], len(data)). + + Examples + -------- + >>> signal = np.random.rand(20) - 0.5 + >>> wavelet = ricker + >>> widths = np.arange(1, 11) + >>> cwtmatr = cwt(signal, wavelet, widths) + """ + + output = np.zeros([len(widths), len(data)]) + for ind, width in enumerate(widths): + wavelet_data = wavelet(min(10 * width, len(data)), width) + output[ind, :] = convolve(data, wavelet_data, + mode='same') + return output