/
signaltools.py
4282 lines (3513 loc) · 143 KB
/
signaltools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Author: Travis Oliphant
# 1999 -- 2002
import operator
import math
import timeit
from scipy.spatial import cKDTree
from . import sigtools, dlti
from ._upfirdn import upfirdn, _output_len, _upfirdn_modes
from scipy import linalg, fft as sp_fft
from scipy.fft._helper import _init_nd_shape_and_axes
from scipy._lib._util import prod as _prod
import numpy as np
from scipy.special import lambertw
from .windows import get_window
from ._arraytools import axis_slice, axis_reverse, odd_ext, even_ext, const_ext
from .filter_design import cheby1, _validate_sos
from .fir_filter_design import firwin
from ._sosfilt import _sosfilt
import warnings
__all__ = ['correlate', 'correlate2d',
'convolve', 'convolve2d', 'fftconvolve', 'oaconvolve',
'order_filter', 'medfilt', 'medfilt2d', 'wiener', 'lfilter',
'lfiltic', 'sosfilt', 'deconvolve', 'hilbert', 'hilbert2',
'cmplx_sort', 'unique_roots', 'invres', 'invresz', 'residue',
'residuez', 'resample', 'resample_poly', 'detrend',
'lfilter_zi', 'sosfilt_zi', 'sosfiltfilt', 'choose_conv_method',
'filtfilt', 'decimate', 'vectorstrength']
_modedict = {'valid': 0, 'same': 1, 'full': 2}
_boundarydict = {'fill': 0, 'pad': 0, 'wrap': 2, 'circular': 2, 'symm': 1,
'symmetric': 1, 'reflect': 4}
def _valfrommode(mode):
try:
return _modedict[mode]
except KeyError:
raise ValueError("Acceptable mode flags are 'valid',"
" 'same', or 'full'.")
def _bvalfromboundary(boundary):
try:
return _boundarydict[boundary] << 2
except KeyError:
raise ValueError("Acceptable boundary flags are 'fill', 'circular' "
"(or 'wrap'), and 'symmetric' (or 'symm').")
def _inputs_swap_needed(mode, shape1, shape2, axes=None):
"""Determine if inputs arrays need to be swapped in `"valid"` mode.
If in `"valid"` mode, returns whether or not the input arrays need to be
swapped depending on whether `shape1` is at least as large as `shape2` in
every calculated dimension.
This is important for some of the correlation and convolution
implementations in this module, where the larger array input needs to come
before the smaller array input when operating in this mode.
Note that if the mode provided is not 'valid', False is immediately
returned.
"""
if mode != 'valid':
return False
if not shape1:
return False
if axes is None:
axes = range(len(shape1))
ok1 = all(shape1[i] >= shape2[i] for i in axes)
ok2 = all(shape2[i] >= shape1[i] for i in axes)
if not (ok1 or ok2):
raise ValueError("For 'valid' mode, one must be at least "
"as large as the other in every dimension")
return not ok1
def correlate(in1, in2, mode='full', method='auto'):
r"""
Cross-correlate two N-dimensional arrays.
Cross-correlate `in1` and `in2`, with the output size determined by the
`mode` argument.
Parameters
----------
in1 : array_like
First input.
in2 : array_like
Second input. Should have the same number of dimensions as `in1`.
mode : str {'full', 'valid', 'same'}, optional
A string indicating the size of the output:
``full``
The output is the full discrete linear cross-correlation
of the inputs. (Default)
``valid``
The output consists only of those elements that do not
rely on the zero-padding. In 'valid' mode, either `in1` or `in2`
must be at least as large as the other in every dimension.
``same``
The output is the same size as `in1`, centered
with respect to the 'full' output.
method : str {'auto', 'direct', 'fft'}, optional
A string indicating which method to use to calculate the correlation.
``direct``
The correlation is determined directly from sums, the definition of
correlation.
``fft``
The Fast Fourier Transform is used to perform the correlation more
quickly (only available for numerical arrays.)
``auto``
Automatically chooses direct or Fourier method based on an estimate
of which is faster (default). See `convolve` Notes for more detail.
.. versionadded:: 0.19.0
Returns
-------
correlate : array
An N-dimensional array containing a subset of the discrete linear
cross-correlation of `in1` with `in2`.
See Also
--------
choose_conv_method : contains more documentation on `method`.
Notes
-----
The correlation z of two d-dimensional arrays x and y is defined as::
z[...,k,...] = sum[..., i_l, ...] x[..., i_l,...] * conj(y[..., i_l - k,...])
This way, if x and y are 1-D arrays and ``z = correlate(x, y, 'full')``
then
.. math::
z[k] = (x * y)(k - N + 1)
= \sum_{l=0}^{||x||-1}x_l y_{l-k+N-1}^{*}
for :math:`k = 0, 1, ..., ||x|| + ||y|| - 2`
where :math:`||x||` is the length of ``x``, :math:`N = \max(||x||,||y||)`,
and :math:`y_m` is 0 when m is outside the range of y.
``method='fft'`` only works for numerical arrays as it relies on
`fftconvolve`. In certain cases (i.e., arrays of objects or when
rounding integers can lose precision), ``method='direct'`` is always used.
When using "same" mode with even-length inputs, the outputs of `correlate`
and `correlate2d` differ: There is a 1-index offset between them.
Examples
--------
Implement a matched filter using cross-correlation, to recover a signal
that has passed through a noisy channel.
>>> from scipy import signal
>>> sig = np.repeat([0., 1., 1., 0., 1., 0., 0., 1.], 128)
>>> sig_noise = sig + np.random.randn(len(sig))
>>> corr = signal.correlate(sig_noise, np.ones(128), mode='same') / 128
>>> import matplotlib.pyplot as plt
>>> clock = np.arange(64, len(sig), 128)
>>> fig, (ax_orig, ax_noise, ax_corr) = plt.subplots(3, 1, sharex=True)
>>> ax_orig.plot(sig)
>>> ax_orig.plot(clock, sig[clock], 'ro')
>>> ax_orig.set_title('Original signal')
>>> ax_noise.plot(sig_noise)
>>> ax_noise.set_title('Signal with noise')
>>> ax_corr.plot(corr)
>>> ax_corr.plot(clock, corr[clock], 'ro')
>>> ax_corr.axhline(0.5, ls=':')
>>> ax_corr.set_title('Cross-correlated with rectangular pulse')
>>> ax_orig.margins(0, 0.1)
>>> fig.tight_layout()
>>> fig.show()
"""
in1 = np.asarray(in1)
in2 = np.asarray(in2)
if in1.ndim == in2.ndim == 0:
return in1 * in2.conj()
elif in1.ndim != in2.ndim:
raise ValueError("in1 and in2 should have the same dimensionality")
# Don't use _valfrommode, since correlate should not accept numeric modes
try:
val = _modedict[mode]
except KeyError:
raise ValueError("Acceptable mode flags are 'valid',"
" 'same', or 'full'.")
# this either calls fftconvolve or this function with method=='direct'
if method in ('fft', 'auto'):
return convolve(in1, _reverse_and_conj(in2), mode, method)
elif method == 'direct':
# fastpath to faster numpy.correlate for 1d inputs when possible
if _np_conv_ok(in1, in2, mode):
return np.correlate(in1, in2, mode)
# _correlateND is far slower when in2.size > in1.size, so swap them
# and then undo the effect afterward if mode == 'full'. Also, it fails
# with 'valid' mode if in2 is larger than in1, so swap those, too.
# Don't swap inputs for 'same' mode, since shape of in1 matters.
swapped_inputs = ((mode == 'full') and (in2.size > in1.size) or
_inputs_swap_needed(mode, in1.shape, in2.shape))
if swapped_inputs:
in1, in2 = in2, in1
if mode == 'valid':
ps = [i - j + 1 for i, j in zip(in1.shape, in2.shape)]
out = np.empty(ps, in1.dtype)
z = sigtools._correlateND(in1, in2, out, val)
else:
ps = [i + j - 1 for i, j in zip(in1.shape, in2.shape)]
# zero pad input
in1zpadded = np.zeros(ps, in1.dtype)
sc = tuple(slice(0, i) for i in in1.shape)
in1zpadded[sc] = in1.copy()
if mode == 'full':
out = np.empty(ps, in1.dtype)
elif mode == 'same':
out = np.empty(in1.shape, in1.dtype)
z = sigtools._correlateND(in1zpadded, in2, out, val)
if swapped_inputs:
# Reverse and conjugate to undo the effect of swapping inputs
z = _reverse_and_conj(z)
return z
else:
raise ValueError("Acceptable method flags are 'auto',"
" 'direct', or 'fft'.")
def _centered(arr, newshape):
# Return the center newshape portion of the array.
newshape = np.asarray(newshape)
currshape = np.array(arr.shape)
startind = (currshape - newshape) // 2
endind = startind + newshape
myslice = [slice(startind[k], endind[k]) for k in range(len(endind))]
return arr[tuple(myslice)]
def _init_freq_conv_axes(in1, in2, mode, axes, sorted_axes=False):
"""Handle the axes argument for frequency-domain convolution.
Returns the inputs and axes in a standard form, eliminating redundant axes,
swapping the inputs if necessary, and checking for various potential
errors.
Parameters
----------
in1 : array
First input.
in2 : array
Second input.
mode : str {'full', 'valid', 'same'}, optional
A string indicating the size of the output.
See the documentation `fftconvolve` for more information.
axes : list of ints
Axes over which to compute the FFTs.
sorted_axes : bool, optional
If `True`, sort the axes.
Default is `False`, do not sort.
Returns
-------
in1 : array
The first input, possible swapped with the second input.
in2 : array
The second input, possible swapped with the first input.
axes : list of ints
Axes over which to compute the FFTs.
"""
s1 = in1.shape
s2 = in2.shape
noaxes = axes is None
_, axes = _init_nd_shape_and_axes(in1, shape=None, axes=axes)
if not noaxes and not len(axes):
raise ValueError("when provided, axes cannot be empty")
# Axes of length 1 can rely on broadcasting rules for multipy,
# no fft needed.
axes = [a for a in axes if s1[a] != 1 and s2[a] != 1]
if sorted_axes:
axes.sort()
if not all(s1[a] == s2[a] or s1[a] == 1 or s2[a] == 1
for a in range(in1.ndim) if a not in axes):
raise ValueError("incompatible shapes for in1 and in2:"
" {0} and {1}".format(s1, s2))
# Check that input sizes are compatible with 'valid' mode.
if _inputs_swap_needed(mode, s1, s2, axes=axes):
# Convolution is commutative; order doesn't have any effect on output.
in1, in2 = in2, in1
return in1, in2, axes
def _freq_domain_conv(in1, in2, axes, shape, calc_fast_len=False):
"""Convolve two arrays in the frequency domain.
This function implements only base the FFT-related operations.
Specifically, it converts the signals to the frequency domain, multiplies
them, then converts them back to the time domain. Calculations of axes,
shapes, convolution mode, etc. are implemented in higher level-functions,
such as `fftconvolve` and `oaconvolve`. Those functions should be used
instead of this one.
Parameters
----------
in1 : array_like
First input.
in2 : array_like
Second input. Should have the same number of dimensions as `in1`.
axes : array_like of ints
Axes over which to compute the FFTs.
shape : array_like of ints
The sizes of the FFTs.
calc_fast_len : bool, optional
If `True`, set each value of `shape` to the next fast FFT length.
Default is `False`, use `axes` as-is.
Returns
-------
out : array
An N-dimensional array containing the discrete linear convolution of
`in1` with `in2`.
"""
if not len(axes):
return in1 * in2
complex_result = (in1.dtype.kind == 'c' or in2.dtype.kind == 'c')
if calc_fast_len:
# Speed up FFT by padding to optimal size.
fshape = [
sp_fft.next_fast_len(shape[a], not complex_result) for a in axes]
else:
fshape = shape
if not complex_result:
fft, ifft = sp_fft.rfftn, sp_fft.irfftn
else:
fft, ifft = sp_fft.fftn, sp_fft.ifftn
sp1 = fft(in1, fshape, axes=axes)
sp2 = fft(in2, fshape, axes=axes)
ret = ifft(sp1 * sp2, fshape, axes=axes)
if calc_fast_len:
fslice = tuple([slice(sz) for sz in shape])
ret = ret[fslice]
return ret
def _apply_conv_mode(ret, s1, s2, mode, axes):
"""Calculate the convolution result shape based on the `mode` argument.
Returns the result sliced to the correct size for the given mode.
Parameters
----------
ret : array
The result array, with the appropriate shape for the 'full' mode.
s1 : list of int
The shape of the first input.
s2 : list of int
The shape of the second input.
mode : str {'full', 'valid', 'same'}
A string indicating the size of the output.
See the documentation `fftconvolve` for more information.
axes : list of ints
Axes over which to compute the convolution.
Returns
-------
ret : array
A copy of `res`, sliced to the correct size for the given `mode`.
"""
if mode == "full":
return ret.copy()
elif mode == "same":
return _centered(ret, s1).copy()
elif mode == "valid":
shape_valid = [ret.shape[a] if a not in axes else s1[a] - s2[a] + 1
for a in range(ret.ndim)]
return _centered(ret, shape_valid).copy()
else:
raise ValueError("acceptable mode flags are 'valid',"
" 'same', or 'full'")
def fftconvolve(in1, in2, mode="full", axes=None):
"""Convolve two N-dimensional arrays using FFT.
Convolve `in1` and `in2` using the fast Fourier transform method, with
the output size determined by the `mode` argument.
This is generally much faster than `convolve` for large arrays (n > ~500),
but can be slower when only a few output values are needed, and can only
output float arrays (int or object array inputs will be cast to float).
As of v0.19, `convolve` automatically chooses this method or the direct
method based on an estimation of which is faster.
Parameters
----------
in1 : array_like
First input.
in2 : array_like
Second input. Should have the same number of dimensions as `in1`.
mode : str {'full', 'valid', 'same'}, optional
A string indicating the size of the output:
``full``
The output is the full discrete linear convolution
of the inputs. (Default)
``valid``
The output consists only of those elements that do not
rely on the zero-padding. In 'valid' mode, either `in1` or `in2`
must be at least as large as the other in every dimension.
``same``
The output is the same size as `in1`, centered
with respect to the 'full' output.
axes : int or array_like of ints or None, optional
Axes over which to compute the convolution.
The default is over all axes.
Returns
-------
out : array
An N-dimensional array containing a subset of the discrete linear
convolution of `in1` with `in2`.
See Also
--------
convolve : Uses the direct convolution or FFT convolution algorithm
depending on which is faster.
oaconvolve : Uses the overlap-add method to do convolution, which is
generally faster when the input arrays are large and
significantly different in size.
Examples
--------
Autocorrelation of white noise is an impulse.
>>> from scipy import signal
>>> sig = np.random.randn(1000)
>>> autocorr = signal.fftconvolve(sig, sig[::-1], mode='full')
>>> import matplotlib.pyplot as plt
>>> fig, (ax_orig, ax_mag) = plt.subplots(2, 1)
>>> ax_orig.plot(sig)
>>> ax_orig.set_title('White noise')
>>> ax_mag.plot(np.arange(-len(sig)+1,len(sig)), autocorr)
>>> ax_mag.set_title('Autocorrelation')
>>> fig.tight_layout()
>>> fig.show()
Gaussian blur implemented using FFT convolution. Notice the dark borders
around the image, due to the zero-padding beyond its boundaries.
The `convolve2d` function allows for other types of image boundaries,
but is far slower.
>>> from scipy import misc
>>> face = misc.face(gray=True)
>>> kernel = np.outer(signal.gaussian(70, 8), signal.gaussian(70, 8))
>>> blurred = signal.fftconvolve(face, kernel, mode='same')
>>> fig, (ax_orig, ax_kernel, ax_blurred) = plt.subplots(3, 1,
... figsize=(6, 15))
>>> ax_orig.imshow(face, cmap='gray')
>>> ax_orig.set_title('Original')
>>> ax_orig.set_axis_off()
>>> ax_kernel.imshow(kernel, cmap='gray')
>>> ax_kernel.set_title('Gaussian kernel')
>>> ax_kernel.set_axis_off()
>>> ax_blurred.imshow(blurred, cmap='gray')
>>> ax_blurred.set_title('Blurred')
>>> ax_blurred.set_axis_off()
>>> fig.show()
"""
in1 = np.asarray(in1)
in2 = np.asarray(in2)
if in1.ndim == in2.ndim == 0: # scalar inputs
return in1 * in2
elif in1.ndim != in2.ndim:
raise ValueError("in1 and in2 should have the same dimensionality")
elif in1.size == 0 or in2.size == 0: # empty arrays
return np.array([])
in1, in2, axes = _init_freq_conv_axes(in1, in2, mode, axes,
sorted_axes=False)
s1 = in1.shape
s2 = in2.shape
shape = [max((s1[i], s2[i])) if i not in axes else s1[i] + s2[i] - 1
for i in range(in1.ndim)]
ret = _freq_domain_conv(in1, in2, axes, shape, calc_fast_len=True)
return _apply_conv_mode(ret, s1, s2, mode, axes)
def _calc_oa_lens(s1, s2):
"""Calculate the optimal FFT lengths for overlapp-add convolution.
The calculation is done for a single dimension.
Parameters
----------
s1 : int
Size of the dimension for the first array.
s2 : int
Size of the dimension for the second array.
Returns
-------
block_size : int
The size of the FFT blocks.
overlap : int
The amount of overlap between two blocks.
in1_step : int
The size of each step for the first array.
in2_step : int
The size of each step for the first array.
"""
# Set up the arguments for the conventional FFT approach.
fallback = (s1+s2-1, None, s1, s2)
# Use conventional FFT convolve if sizes are same.
if s1 == s2 or s1 == 1 or s2 == 1:
return fallback
if s2 > s1:
s1, s2 = s2, s1
swapped = True
else:
swapped = False
# There cannot be a useful block size if s2 is more than half of s1.
if s2 >= s1/2:
return fallback
# Derivation of optimal block length
# For original formula see:
# https://en.wikipedia.org/wiki/Overlap-add_method
#
# Formula:
# K = overlap = s2-1
# N = block_size
# C = complexity
# e = exponential, exp(1)
#
# C = (N*(log2(N)+1))/(N-K)
# C = (N*log2(2N))/(N-K)
# C = N/(N-K) * log2(2N)
# C1 = N/(N-K)
# C2 = log2(2N) = ln(2N)/ln(2)
#
# dC1/dN = (1*(N-K)-N)/(N-K)^2 = -K/(N-K)^2
# dC2/dN = 2/(2*N*ln(2)) = 1/(N*ln(2))
#
# dC/dN = dC1/dN*C2 + dC2/dN*C1
# dC/dN = -K*ln(2N)/(ln(2)*(N-K)^2) + N/(N*ln(2)*(N-K))
# dC/dN = -K*ln(2N)/(ln(2)*(N-K)^2) + 1/(ln(2)*(N-K))
# dC/dN = -K*ln(2N)/(ln(2)*(N-K)^2) + (N-K)/(ln(2)*(N-K)^2)
# dC/dN = (-K*ln(2N) + (N-K)/(ln(2)*(N-K)^2)
# dC/dN = (N - K*ln(2N) - K)/(ln(2)*(N-K)^2)
#
# Solve for minimum, where dC/dN = 0
# 0 = (N - K*ln(2N) - K)/(ln(2)*(N-K)^2)
# 0 * ln(2)*(N-K)^2 = N - K*ln(2N) - K
# 0 = N - K*ln(2N) - K
# 0 = N - K*(ln(2N) + 1)
# 0 = N - K*ln(2Ne)
# N = K*ln(2Ne)
# N/K = ln(2Ne)
#
# e^(N/K) = e^ln(2Ne)
# e^(N/K) = 2Ne
# 1/e^(N/K) = 1/(2*N*e)
# e^(N/-K) = 1/(2*N*e)
# e^(N/-K) = K/N*1/(2*K*e)
# N/K*e^(N/-K) = 1/(2*e*K)
# N/-K*e^(N/-K) = -1/(2*e*K)
#
# Using Lambert W function
# https://en.wikipedia.org/wiki/Lambert_W_function
# x = W(y) It is the solution to y = x*e^x
# x = N/-K
# y = -1/(2*e*K)
#
# N/-K = W(-1/(2*e*K))
#
# N = -K*W(-1/(2*e*K))
overlap = s2-1
opt_size = -overlap*lambertw(-1/(2*math.e*overlap), k=-1).real
block_size = sp_fft.next_fast_len(math.ceil(opt_size))
# Use conventional FFT convolve if there is only going to be one block.
if block_size >= s1:
return fallback
if not swapped:
in1_step = block_size-s2+1
in2_step = s2
else:
in1_step = s2
in2_step = block_size-s2+1
return block_size, overlap, in1_step, in2_step
def oaconvolve(in1, in2, mode="full", axes=None):
"""Convolve two N-dimensional arrays using the overlap-add method.
Convolve `in1` and `in2` using the overlap-add method, with
the output size determined by the `mode` argument.
This is generally much faster than `convolve` for large arrays (n > ~500),
and generally much faster than `fftconvolve` when one array is much
larger than the other, but can be slower when only a few output values are
needed or when the arrays are very similar in shape, and can only
output float arrays (int or object array inputs will be cast to float).
Parameters
----------
in1 : array_like
First input.
in2 : array_like
Second input. Should have the same number of dimensions as `in1`.
mode : str {'full', 'valid', 'same'}, optional
A string indicating the size of the output:
``full``
The output is the full discrete linear convolution
of the inputs. (Default)
``valid``
The output consists only of those elements that do not
rely on the zero-padding. In 'valid' mode, either `in1` or `in2`
must be at least as large as the other in every dimension.
``same``
The output is the same size as `in1`, centered
with respect to the 'full' output.
axes : int or array_like of ints or None, optional
Axes over which to compute the convolution.
The default is over all axes.
Returns
-------
out : array
An N-dimensional array containing a subset of the discrete linear
convolution of `in1` with `in2`.
See Also
--------
convolve : Uses the direct convolution or FFT convolution algorithm
depending on which is faster.
fftconvolve : An implementation of convolution using FFT.
Notes
-----
.. versionadded:: 1.4.0
Examples
--------
Convolve a 100,000 sample signal with a 512-sample filter.
>>> from scipy import signal
>>> sig = np.random.randn(100000)
>>> filt = signal.firwin(512, 0.01)
>>> fsig = signal.oaconvolve(sig, filt)
>>> import matplotlib.pyplot as plt
>>> fig, (ax_orig, ax_mag) = plt.subplots(2, 1)
>>> ax_orig.plot(sig)
>>> ax_orig.set_title('White noise')
>>> ax_mag.plot(fsig)
>>> ax_mag.set_title('Filtered noise')
>>> fig.tight_layout()
>>> fig.show()
References
----------
.. [1] Wikipedia, "Overlap-add_method".
https://en.wikipedia.org/wiki/Overlap-add_method
.. [2] Richard G. Lyons. Understanding Digital Signal Processing,
Third Edition, 2011. Chapter 13.10.
ISBN 13: 978-0137-02741-5
"""
in1 = np.asarray(in1)
in2 = np.asarray(in2)
if in1.ndim == in2.ndim == 0: # scalar inputs
return in1 * in2
elif in1.ndim != in2.ndim:
raise ValueError("in1 and in2 should have the same dimensionality")
elif in1.size == 0 or in2.size == 0: # empty arrays
return np.array([])
elif in1.shape == in2.shape: # Equivalent to fftconvolve
return fftconvolve(in1, in2, mode=mode, axes=axes)
in1, in2, axes = _init_freq_conv_axes(in1, in2, mode, axes,
sorted_axes=True)
s1 = in1.shape
s2 = in2.shape
if not axes:
ret = in1 * in2
return _apply_conv_mode(ret, s1, s2, mode, axes)
# Calculate this now since in1 is changed later
shape_final = [None if i not in axes else
s1[i] + s2[i] - 1 for i in range(in1.ndim)]
# Calculate the block sizes for the output, steps, first and second inputs.
# It is simpler to calculate them all together than doing them in separate
# loops due to all the special cases that need to be handled.
optimal_sizes = ((-1, -1, s1[i], s2[i]) if i not in axes else
_calc_oa_lens(s1[i], s2[i]) for i in range(in1.ndim))
block_size, overlaps, \
in1_step, in2_step = zip(*optimal_sizes)
# Fall back to fftconvolve if there is only one block in every dimension.
if in1_step == s1 and in2_step == s2:
return fftconvolve(in1, in2, mode=mode, axes=axes)
# Figure out the number of steps and padding.
# This would get too complicated in a list comprehension.
nsteps1 = []
nsteps2 = []
pad_size1 = []
pad_size2 = []
for i in range(in1.ndim):
if i not in axes:
pad_size1 += [(0, 0)]
pad_size2 += [(0, 0)]
continue
if s1[i] > in1_step[i]:
curnstep1 = math.ceil((s1[i]+1)/in1_step[i])
if (block_size[i] - overlaps[i])*curnstep1 < shape_final[i]:
curnstep1 += 1
curpad1 = curnstep1*in1_step[i] - s1[i]
else:
curnstep1 = 1
curpad1 = 0
if s2[i] > in2_step[i]:
curnstep2 = math.ceil((s2[i]+1)/in2_step[i])
if (block_size[i] - overlaps[i])*curnstep2 < shape_final[i]:
curnstep2 += 1
curpad2 = curnstep2*in2_step[i] - s2[i]
else:
curnstep2 = 1
curpad2 = 0
nsteps1 += [curnstep1]
nsteps2 += [curnstep2]
pad_size1 += [(0, curpad1)]
pad_size2 += [(0, curpad2)]
# Pad the array to a size that can be reshaped to the desired shape
# if necessary.
if not all(curpad == (0, 0) for curpad in pad_size1):
in1 = np.pad(in1, pad_size1, mode='constant', constant_values=0)
if not all(curpad == (0, 0) for curpad in pad_size2):
in2 = np.pad(in2, pad_size2, mode='constant', constant_values=0)
# Reshape the overlap-add parts to input block sizes.
split_axes = [iax+i for i, iax in enumerate(axes)]
fft_axes = [iax+1 for iax in split_axes]
# We need to put each new dimension before the corresponding dimension
# being reshaped in order to get the data in the right layout at the end.
reshape_size1 = list(in1_step)
reshape_size2 = list(in2_step)
for i, iax in enumerate(split_axes):
reshape_size1.insert(iax, nsteps1[i])
reshape_size2.insert(iax, nsteps2[i])
in1 = in1.reshape(*reshape_size1)
in2 = in2.reshape(*reshape_size2)
# Do the convolution.
fft_shape = [block_size[i] for i in axes]
ret = _freq_domain_conv(in1, in2, fft_axes, fft_shape, calc_fast_len=False)
# Do the overlap-add.
for ax, ax_fft, ax_split in zip(axes, fft_axes, split_axes):
overlap = overlaps[ax]
if overlap is None:
continue
ret, overpart = np.split(ret, [-overlap], ax_fft)
overpart = np.split(overpart, [-1], ax_split)[0]
ret_overpart = np.split(ret, [overlap], ax_fft)[0]
ret_overpart = np.split(ret_overpart, [1], ax_split)[1]
ret_overpart += overpart
# Reshape back to the correct dimensionality.
shape_ret = [ret.shape[i] if i not in fft_axes else
ret.shape[i]*ret.shape[i-1]
for i in range(ret.ndim) if i not in split_axes]
ret = ret.reshape(*shape_ret)
# Slice to the correct size.
slice_final = tuple([slice(islice) for islice in shape_final])
ret = ret[slice_final]
return _apply_conv_mode(ret, s1, s2, mode, axes)
def _numeric_arrays(arrays, kinds='buifc'):
"""
See if a list of arrays are all numeric.
Parameters
----------
ndarrays : array or list of arrays
arrays to check if numeric.
numeric_kinds : string-like
The dtypes of the arrays to be checked. If the dtype.kind of
the ndarrays are not in this string the function returns False and
otherwise returns True.
"""
if type(arrays) == np.ndarray:
return arrays.dtype.kind in kinds
for array_ in arrays:
if array_.dtype.kind not in kinds:
return False
return True
def _conv_ops(x_shape, h_shape, mode):
"""
Find the number of operations required for direct/fft methods of
convolution. The direct operations were recorded by making a dummy class to
record the number of operations by overriding ``__mul__`` and ``__add__``.
The FFT operations rely on the (well-known) computational complexity of the
FFT (and the implementation of ``_freq_domain_conv``).
"""
if mode == "full":
out_shape = [n + k - 1 for n, k in zip(x_shape, h_shape)]
elif mode == "valid":
out_shape = [abs(n - k) + 1 for n, k in zip(x_shape, h_shape)]
elif mode == "same":
out_shape = x_shape
else:
raise ValueError("Acceptable mode flags are 'valid',"
" 'same', or 'full', not mode={}".format(mode))
s1, s2 = x_shape, h_shape
if len(x_shape) == 1:
s1, s2 = s1[0], s2[0]
if mode == "full":
direct_ops = s1 * s2
elif mode == "valid":
direct_ops = (s2 - s1 + 1) * s1 if s2 >= s1 else (s1 - s2 + 1) * s2
elif mode == "same":
direct_ops = (s1 * s2 if s1 < s2 else
s1 * s2 - (s2 // 2) * ((s2 + 1) // 2))
else:
if mode == "full":
direct_ops = min(_prod(s1), _prod(s2)) * _prod(out_shape)
elif mode == "valid":
direct_ops = min(_prod(s1), _prod(s2)) * _prod(out_shape)
elif mode == "same":
direct_ops = _prod(s1) * _prod(s2)
full_out_shape = [n + k - 1 for n, k in zip(x_shape, h_shape)]
N = _prod(full_out_shape)
fft_ops = 3 * N * np.log(N) # 3 separate FFTs of size full_out_shape
return fft_ops, direct_ops
def _fftconv_faster(x, h, mode):
"""
See if using fftconvolve or convolve is faster.
Parameters
----------
x : np.ndarray
Signal
h : np.ndarray
Kernel
mode : str
Mode passed to convolve
Returns
-------
fft_faster : bool
Notes
-----
See docstring of `choose_conv_method` for details on tuning hardware.
See pull request 11031 for more detail:
https://github.com/scipy/scipy/pull/11031.
"""
fft_ops, direct_ops = _conv_ops(x.shape, h.shape, mode)
offset = -1e-3 if x.ndim == 1 else -1e-4
constants = {
"valid": (1.89095737e-9, 2.1364985e-10, offset),
"full": (1.7649070e-9, 2.1414831e-10, offset),
"same": (3.2646654e-9, 2.8478277e-10, offset)
if h.size <= x.size
else (3.21635404e-9, 1.1773253e-8, -1e-5),
} if x.ndim == 1 else {
"valid": (1.85927e-9, 2.11242e-8, offset),
"full": (1.99817e-9, 1.66174e-8, offset),
"same": (2.04735e-9, 1.55367e-8, offset),
}
O_fft, O_direct, O_offset = constants[mode]
return O_fft * fft_ops < O_direct * direct_ops + O_offset
def _reverse_and_conj(x):
"""
Reverse array `x` in all dimensions and perform the complex conjugate
"""
reverse = (slice(None, None, -1),) * x.ndim
return x[reverse].conj()
def _np_conv_ok(volume, kernel, mode):
"""
See if numpy supports convolution of `volume` and `kernel` (i.e. both are
1D ndarrays and of the appropriate shape). NumPy's 'same' mode uses the
size of the larger input, while SciPy's uses the size of the first input.
Invalid mode strings will return False and be caught by the calling func.
"""
if volume.ndim == kernel.ndim == 1:
if mode in ('full', 'valid'):
return True
elif mode == 'same':
return volume.size >= kernel.size
else:
return False
def _timeit_fast(stmt="pass", setup="pass", repeat=3):
"""
Returns the time the statement/function took, in seconds.
Faster, less precise version of IPython's timeit. `stmt` can be a statement
written as a string or a callable.
Will do only 1 loop (like IPython's timeit) with no repetitions
(unlike IPython) for very slow functions. For fast functions, only does
enough loops to take 5 ms, which seems to produce similar results (on