/
_peak_finding_utils.pyx
375 lines (312 loc) · 12.7 KB
/
_peak_finding_utils.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
#cython: wraparound=False
#cython: boundscheck=False
#cython: nonecheck=False
"""Utility functions for finding peaks in signals."""
import warnings
import numpy as np
cimport numpy as np
from libc.math cimport ceil
np.import_array()
__all__ = ['_local_maxima_1d', '_select_by_peak_distance', '_peak_prominences',
'_peak_widths']
def _local_maxima_1d(const np.float64_t[::1] x not None):
"""
Find local maxima in a 1D array.
This function finds all local maxima in a 1D array and returns the indices
for their edges and midpoints (rounded down for even plateau sizes).
Parameters
----------
x : ndarray
The array to search for local maxima.
Returns
-------
midpoints : ndarray
Indices of midpoints of local maxima in `x`.
left_edges : ndarray
Indices of edges to the left of local maxima in `x`.
right_edges : ndarray
Indices of edges to the right of local maxima in `x`.
Notes
-----
- Compared to `argrelmax` this function is significantly faster and can
detect maxima that are more than one sample wide. However this comes at
the cost of being only applicable to 1D arrays.
- A maxima is defined as one or more samples of equal value that are
surrounded on both sides by at least one smaller sample.
.. versionadded:: 1.1.0
"""
cdef:
np.intp_t[::1] midpoints, left_edges, right_edges
np.intp_t m, i, i_ahead, i_max
# Preallocate, there can't be more maxima than half the size of `x`
midpoints = np.empty(x.shape[0] // 2, dtype=np.intp)
left_edges = np.empty(x.shape[0] // 2, dtype=np.intp)
right_edges = np.empty(x.shape[0] // 2, dtype=np.intp)
m = 0 # Pointer to the end of valid area in allocated arrays
with nogil:
i = 1 # Pointer to current sample, first one can't be maxima
i_max = x.shape[0] - 1 # Last sample can't be maxima
while i < i_max:
# Test if previous sample is smaller
if x[i - 1] < x[i]:
i_ahead = i + 1 # Index to look ahead of current sample
# Find next sample that is unequal to x[i]
while i_ahead < i_max and x[i_ahead] == x[i]:
i_ahead += 1
# Maxima is found if next unequal sample is smaller than x[i]
if x[i_ahead] < x[i]:
left_edges[m] = i
right_edges[m] = i_ahead - 1
midpoints[m] = (left_edges[m] + right_edges[m]) // 2
m += 1
# Skip samples that can't be maximum
i = i_ahead
i += 1
# Keep only valid part of array memory.
midpoints.base.resize(m, refcheck=False)
left_edges.base.resize(m, refcheck=False)
right_edges.base.resize(m, refcheck=False)
return midpoints.base, left_edges.base, right_edges.base
def _select_by_peak_distance(np.intp_t[::1] peaks not None,
np.float64_t[::1] priority not None,
np.float64_t distance):
"""
Evaluate which peaks fulfill the distance condition.
Parameters
----------
peaks : ndarray
Indices of peaks in `vector`.
priority : ndarray
An array matching `peaks` used to determine priority of each peak. A
peak with a higher priority value is kept over one with a lower one.
distance : np.float64
Minimal distance that peaks must be spaced.
Returns
-------
keep : ndarray[bool]
A boolean mask evaluating to true where `peaks` fulfill the distance
condition.
Notes
-----
Declaring the input arrays as C-contiguous doesn't seem to have performance
advantages.
.. versionadded:: 1.1.0
"""
cdef:
np.uint8_t[::1] keep
np.intp_t[::1] priority_to_position
np.intp_t i, j, k, peaks_size, distance_
peaks_size = peaks.shape[0]
# Round up because actual peak distance can only be natural number
distance_ = <np.intp_t>ceil(distance)
keep = np.ones(peaks_size, dtype=np.uint8) # Prepare array of flags
# Create map from `i` (index for `peaks` sorted by `priority`) to `j` (index
# for `peaks` sorted by position). This allows to iterate `peaks` and `keep`
# with `j` by order of `priority` while still maintaining the ability to
# step to neighbouring peaks with (`j` + 1) or (`j` - 1).
priority_to_position = np.argsort(priority)
with nogil:
# Highest priority first -> iterate in reverse order (decreasing)
for i in range(peaks_size - 1, -1, -1):
# "Translate" `i` to `j` which points to current peak whose
# neighbours are to be evaluated
j = priority_to_position[i]
if keep[j] == 0:
# Skip evaluation for peak already marked as "don't keep"
continue
k = j - 1
# Flag "earlier" peaks for removal until minimal distance is exceeded
while 0 <= k and peaks[j] - peaks[k] < distance_:
keep[k] = 0
k -= 1
k = j + 1
# Flag "later" peaks for removal until minimal distance is exceeded
while k < peaks_size and peaks[k] - peaks[j] < distance_:
keep[k] = 0
k += 1
return keep.base.view(dtype=np.bool_) # Return as boolean array
class PeakPropertyWarning(RuntimeWarning):
"""Calculated property of a peak has unexpected value."""
pass
def _peak_prominences(const np.float64_t[::1] x not None,
np.intp_t[::1] peaks not None,
np.intp_t wlen):
"""
Calculate the prominence of each peak in a signal.
Parameters
----------
x : ndarray
A signal with peaks.
peaks : ndarray
Indices of peaks in `x`.
wlen : np.intp
A window length in samples (see `peak_prominences`) which is rounded up
to the nearest odd integer. If smaller than 2 the entire signal `x` is
used.
Returns
-------
prominences : ndarray
The calculated prominences for each peak in `peaks`.
left_bases, right_bases : ndarray
The peaks' bases as indices in `x` to the left and right of each peak.
Raises
------
ValueError
If a value in `peaks` is an invalid index for `x`.
Warns
-----
PeakPropertyWarning
If a prominence of 0 was calculated for any peak.
Notes
-----
This is the inner function to `peak_prominences`.
.. versionadded:: 1.1.0
"""
cdef:
np.float64_t[::1] prominences
np.intp_t[::1] left_bases, right_bases
np.float64_t left_min, right_min
np.intp_t peak_nr, peak, i_min, i_max, i
np.uint8_t show_warning
show_warning = False
prominences = np.empty(peaks.shape[0], dtype=np.float64)
left_bases = np.empty(peaks.shape[0], dtype=np.intp)
right_bases = np.empty(peaks.shape[0], dtype=np.intp)
with nogil:
for peak_nr in range(peaks.shape[0]):
peak = peaks[peak_nr]
i_min = 0
i_max = x.shape[0] - 1
if not i_min <= peak <= i_max:
with gil:
raise ValueError("peak {} is not a valid index for `x`"
.format(peak))
if 2 <= wlen:
# Adjust window around the evaluated peak (within bounds);
# if wlen is even the resulting window length is is implicitly
# rounded to next odd integer
i_min = max(peak - wlen // 2, i_min)
i_max = min(peak + wlen // 2, i_max)
# Find the left base in interval [i_min, peak]
i = left_bases[peak_nr] = peak
left_min = x[peak]
while i_min <= i and x[i] <= x[peak]:
if x[i] < left_min:
left_min = x[i]
left_bases[peak_nr] = i
i -= 1
# Find the right base in interval [peak, i_max]
i = right_bases[peak_nr] = peak
right_min = x[peak]
while i <= i_max and x[i] <= x[peak]:
if x[i] < right_min:
right_min = x[i]
right_bases[peak_nr] = i
i += 1
prominences[peak_nr] = x[peak] - max(left_min, right_min)
if prominences[peak_nr] == 0:
show_warning = True
if show_warning:
warnings.warn("some peaks have a prominence of 0",
PeakPropertyWarning, stacklevel=2)
# Return memoryviews as ndarrays
return prominences.base, left_bases.base, right_bases.base
def _peak_widths(const np.float64_t[::1] x not None,
np.intp_t[::1] peaks not None,
np.float64_t rel_height,
np.float64_t[::1] prominences not None,
np.intp_t[::1] left_bases not None,
np.intp_t[::1] right_bases not None):
"""
Calculate the width of each each peak in a signal.
Parameters
----------
x : ndarray
A signal with peaks.
peaks : ndarray
Indices of peaks in `x`.
rel_height : np.float64
Chooses the relative height at which the peak width is measured as a
percentage of its prominence (see `peak_widths`).
prominences : ndarray
Prominences of each peak in `peaks` as returned by `peak_prominences`.
left_bases, right_bases : ndarray
Left and right bases of each peak in `peaks` as returned by
`peak_prominences`.
Returns
-------
widths : ndarray
The widths for each peak in samples.
width_heights : ndarray
The height of the contour lines at which the `widths` where evaluated.
left_ips, right_ips : ndarray
Interpolated positions of left and right intersection points of a
horizontal line at the respective evaluation height.
Raises
------
ValueError
If the supplied prominence data doesn't satisfy the condition
``0 <= left_base <= peak <= right_base < x.shape[0]`` for each peak or
if `peaks`, `left_bases` and `right_bases` don't share the same shape.
Or if `rel_height` is not at least 0.
Warnings
--------
PeakPropertyWarning
If a width of 0 was calculated for any peak.
Notes
-----
This is the inner function to `peak_widths`.
.. versionadded:: 1.1.0
"""
cdef:
np.float64_t[::1] widths, width_heights, left_ips, right_ips
np.float64_t height, left_ip, right_ip
np.intp_t p, peak, i, i_max, i_min
np.uint8_t show_warning
if rel_height < 0:
raise ValueError('`rel_height` must be greater or equal to 0.0')
if not (peaks.shape[0] == prominences.shape[0] == left_bases.shape[0]
== right_bases.shape[0]):
raise ValueError("arrays in `prominence_data` must have the same shape "
"as `peaks`")
show_warning = False
widths = np.empty(peaks.shape[0], dtype=np.float64)
width_heights = np.empty(peaks.shape[0], dtype=np.float64)
left_ips = np.empty(peaks.shape[0], dtype=np.float64)
right_ips = np.empty(peaks.shape[0], dtype=np.float64)
with nogil:
for p in range(peaks.shape[0]):
i_min = left_bases[p]
i_max = right_bases[p]
peak = peaks[p]
# Validate bounds and order
if not 0 <= i_min <= peak <= i_max < x.shape[0]:
with gil:
raise ValueError("prominence data is invalid for peak {}"
.format(peak))
height = width_heights[p] = x[peak] - prominences[p] * rel_height
# Find intersection point on left side
i = peak
while i_min < i and height < x[i]:
i -= 1
left_ip = <np.float64_t>i
if x[i] < height:
# Interpolate if true intersection height is between samples
left_ip += (height - x[i]) / (x[i + 1] - x[i])
# Find intersection point on right side
i = peak
while i < i_max and height < x[i]:
i += 1
right_ip = <np.float64_t>i
if x[i] < height:
# Interpolate if true intersection height is between samples
right_ip -= (height - x[i]) / (x[i - 1] - x[i])
widths[p] = right_ip - left_ip
if widths[p] == 0:
show_warning = True
left_ips[p] = left_ip
right_ips[p] = right_ip
if show_warning:
warnings.warn("some peaks have a width of 0",
PeakPropertyWarning, stacklevel=2)
return widths.base, width_heights.base, left_ips.base, right_ips.base