diff --git a/scipy/signal/_signaltools.py b/scipy/signal/_signaltools.py index 592346986582..78091377f758 100644 --- a/scipy/signal/_signaltools.py +++ b/scipy/signal/_signaltools.py @@ -7,6 +7,9 @@ import timeit import warnings +from scipy._lib._array_api import ( + array_namespace, size, +) from scipy.spatial import cKDTree from . import _sigtools from ._ltisys import dlti @@ -1956,8 +1959,8 @@ def medfilt2d(input, kernel_size=3): if kernel_size.shape == (): kernel_size = np.repeat(kernel_size.item(), 2) - for size in kernel_size: - if (size % 2) != 1: + for ksize in kernel_size: + if (ksize % 2) != 1: raise ValueError("Each element of kernel_size should be odd.") return _sigtools._medfilt2d(image, kernel_size) @@ -3560,20 +3563,28 @@ def detrend(data, axis=-1, type='linear', bp=0, overwrite_data=False): 0.06 # random """ + xp = array_namespace(data) if type not in ['linear', 'l', 'constant', 'c']: raise ValueError("Trend type must be 'linear' or 'constant'.") - data = np.asarray(data) - dtype = data.dtype.char - if dtype not in 'dfDF': - dtype = 'd' + data = xp.asarray(data) + dtype = data.dtype + if data.dtype not in [xp.float32, xp.float64, xp.complex128, xp.complex64]: + dtype = xp.float64 if type in ['constant', 'c']: - ret = data - np.mean(data, axis, keepdims=True) + ret = data - xp.mean(xp.asarray(data, dtype=dtype), axis=axis, keepdims=True) return ret else: dshape = data.shape N = dshape[axis] - bp = np.sort(np.unique(np.concatenate(np.atleast_1d(0, bp, N)))) - if np.any(bp > N): + if isinstance(bp, int): + bp = xp.sort(xp.unique(xp.asarray([0, bp, N]))) + else: + bp = xp.asarray(bp) + new_bp = xp.empty(size(bp) + 1) + new_bp[:size(bp)] = bp + new_bp[size(bp)] = N + bp = xp.sort(xp.unique(xp.asarray([0] + new_bp))) + if xp.any(bp > N): raise ValueError("Breakpoints must be less than length " "of data along given axis.") @@ -3582,28 +3593,32 @@ def detrend(data, axis=-1, type='linear', bp=0, overwrite_data=False): rnk = len(dshape) if axis < 0: axis = axis + rnk - newdata = np.moveaxis(data, axis, 0) + newdata = xp.moveaxis(data, axis, 0) newdata_shape = newdata.shape newdata = newdata.reshape(N, -1) if not overwrite_data: - newdata = newdata.copy() # make sure we have a copy - if newdata.dtype.char not in 'dfDF': + newdata = xp.asarray(newdata, copy=True) # make sure we have a copy + if newdata.dtype not in [xp.float64, xp.float32, xp.complex128, xp.complex64]: newdata = newdata.astype(dtype) # Nreg = len(bp) - 1 # Find leastsq fit and remove it for each piece for m in range(len(bp) - 1): Npts = bp[m + 1] - bp[m] - A = np.ones((Npts, 2), dtype) - A[:, 0] = np.arange(1, Npts + 1, dtype=dtype) / Npts - sl = slice(bp[m], bp[m + 1]) - coef, resids, rank, s = linalg.lstsq(A, newdata[sl]) + A = xp.ones((int(Npts), 2), dtype=dtype) + A[:, 0] = xp.arange(1, Npts + 1, dtype=dtype) / Npts + sl = slice(int(bp[m]), int(bp[m + 1])) + # NOTE: lstsq isn't in the array API standard + if "cupy" in xp.__name__ or "torch" in xp.__name__: + coef, resids, rank, s = xp.linalg.lstsq(A, newdata[sl], rcond=None) + else: + coef, resids, rank, s = linalg.lstsq(A, newdata[sl]) newdata[sl] = newdata[sl] - A @ coef # Put data back in original shape. newdata = newdata.reshape(newdata_shape) - ret = np.moveaxis(newdata, 0, axis) + ret = xp.moveaxis(newdata, 0, axis) return ret diff --git a/scipy/signal/_spectral_py.py b/scipy/signal/_spectral_py.py index ef7fd2e6e8b7..e1697dfe783c 100644 --- a/scipy/signal/_spectral_py.py +++ b/scipy/signal/_spectral_py.py @@ -2,6 +2,9 @@ """ import numpy as np from scipy import fft as sp_fft +from scipy._lib._array_api import ( + array_namespace, size, +) from . import _signaltools from .windows import get_window from ._spectral import _lombscargle @@ -597,28 +600,38 @@ def csd(x, y, fs=1.0, window='hann', nperseg=None, noverlap=None, nfft=None, >>> plt.show() """ + xp = array_namespace(x) freqs, _, Pxy = _spectral_helper(x, y, fs, window, nperseg, noverlap, nfft, detrend, return_onesided, scaling, axis, mode='psd') # Average over windows. - if len(Pxy.shape) >= 2 and Pxy.size > 0: + if len(Pxy.shape) >= 2 and size(Pxy) > 0: if Pxy.shape[-1] > 1: if average == 'median': - # np.median must be passed real arrays for the desired result + # xp.median must be passed real arrays for the desired result bias = _median_bias(Pxy.shape[-1]) - if np.iscomplexobj(Pxy): - Pxy = (np.median(np.real(Pxy), axis=-1) - + 1j * np.median(np.imag(Pxy), axis=-1)) + if Pxy.dtype in [xp.complex64, xp.complex128]: + Pxy = (xp.median(xp.real(Pxy), axis=-1) + + 1j * xp.median(xp.imag(Pxy), axis=-1)) else: - Pxy = np.median(Pxy, axis=-1) - Pxy /= bias + Pxy = xp.median(Pxy, axis=-1) + # for PyTorch, Pxy is torch.return_types.median + # which is super confusing... + try: + device_pxy = xp.device(Pxy) + except AttributeError: + Pxy = Pxy.values + device_pxy = xp.device(Pxy) + bias = xp.asarray(bias) + bias = xp.to_device(bias, device_pxy) + Pxy = Pxy / bias elif average == 'mean': Pxy = Pxy.mean(axis=-1) else: raise ValueError(f'average must be "median" or "mean", got {average}') else: - Pxy = np.reshape(Pxy, Pxy.shape[:-1]) + Pxy = xp.reshape(Pxy, Pxy.shape[:-1]) return freqs, Pxy @@ -1760,6 +1773,7 @@ def _spectral_helper(x, y, fs=1.0, window='hann', nperseg=None, noverlap=None, .. versionadded:: 0.16.0 """ + xp = array_namespace(x) if mode not in ['psd', 'stft']: raise ValueError("Unknown value for mode %s, must be one of: " "{'psd', 'stft'}" % mode) @@ -1782,13 +1796,15 @@ def _spectral_helper(x, y, fs=1.0, window='hann', nperseg=None, noverlap=None, axis = int(axis) - # Ensure we have np.arrays, get outdtype - x = np.asarray(x) + # Ensure we have xp.arrays, get outdtype + x = xp.asarray(x) + # https://github.com/data-apis/array-api-compat/issues/43 + tmp = xp.asarray([0], dtype=xp.complex64) if not same_data: - y = np.asarray(y) - outdtype = np.result_type(x, y, np.complex64) + y = xp.asarray(y) + outdtype = xp.result_type(x, y, xp.complex64) else: - outdtype = np.result_type(x, np.complex64) + outdtype = xp.result_type(x, tmp) if not same_data: # Check if we can broadcast the outer axes together @@ -1797,24 +1813,24 @@ def _spectral_helper(x, y, fs=1.0, window='hann', nperseg=None, noverlap=None, xouter.pop(axis) youter.pop(axis) try: - outershape = np.broadcast(np.empty(xouter), np.empty(youter)).shape + outershape = xp.broadcast(xp.empty(xouter), xp.empty(youter)).shape except ValueError as e: raise ValueError('x and y cannot be broadcast together.') from e if same_data: - if x.size == 0: - return np.empty(x.shape), np.empty(x.shape), np.empty(x.shape) + if size(x) == 0: + return xp.empty(x.shape), xp.empty(x.shape), xp.empty(x.shape) else: - if x.size == 0 or y.size == 0: + if size(x) == 0 or size(y) == 0: outshape = outershape + (min([x.shape[axis], y.shape[axis]]),) - emptyout = np.moveaxis(np.empty(outshape), -1, axis) + emptyout = xp.moveaxis(xp.empty(outshape), -1, axis) return emptyout, emptyout, emptyout if x.ndim > 1: if axis != -1: - x = np.moveaxis(x, axis, -1) + x = xp.moveaxis(x, axis, -1) if not same_data and y.ndim > 1: - y = np.moveaxis(y, axis, -1) + y = xp.moveaxis(y, axis, -1) # Check if x and y are the same length, zero-pad if necessary if not same_data: @@ -1822,11 +1838,11 @@ def _spectral_helper(x, y, fs=1.0, window='hann', nperseg=None, noverlap=None, if x.shape[-1] < y.shape[-1]: pad_shape = list(x.shape) pad_shape[-1] = y.shape[-1] - x.shape[-1] - x = np.concatenate((x, np.zeros(pad_shape)), -1) + x = xp.concatenate((x, xp.zeros(pad_shape)), -1) else: pad_shape = list(y.shape) pad_shape[-1] = x.shape[-1] - y.shape[-1] - y = np.concatenate((y, np.zeros(pad_shape)), -1) + y = xp.concatenate((y, xp.zeros(pad_shape)), -1) if nperseg is not None: # if specified by user nperseg = int(nperseg) @@ -1835,6 +1851,7 @@ def _spectral_helper(x, y, fs=1.0, window='hann', nperseg=None, noverlap=None, # parse window; if array like, then set nperseg = win.shape win, nperseg = _triage_segments(window, nperseg, input_length=x.shape[-1]) + win = xp.asarray(win) if nfft is None: nfft = nperseg @@ -1868,10 +1885,10 @@ def _spectral_helper(x, y, fs=1.0, window='hann', nperseg=None, noverlap=None, # I.e make x.shape[-1] = nperseg + (nseg-1)*nstep, with integer nseg nadd = (-(x.shape[-1]-nperseg) % nstep) % nperseg zeros_shape = list(x.shape[:-1]) + [nadd] - x = np.concatenate((x, np.zeros(zeros_shape)), axis=-1) + x = xp.concatenate((x, xp.zeros(zeros_shape)), axis=-1) if not same_data: zeros_shape = list(y.shape[:-1]) + [nadd] - y = np.concatenate((y, np.zeros(zeros_shape)), axis=-1) + y = xp.concatenate((y, xp.zeros(zeros_shape)), axis=-1) # Handle detrending and window functions if not detrend: @@ -1884,14 +1901,14 @@ def detrend_func(d): # Wrap this function so that it receives a shape that it could # reasonably expect to receive. def detrend_func(d): - d = np.moveaxis(d, -1, axis) + d = xp.moveaxis(d, -1, axis) d = detrend(d) - return np.moveaxis(d, axis, -1) + return xp.moveaxis(d, axis, -1) else: detrend_func = detrend - if np.result_type(win, np.complex64) != outdtype: - win = win.astype(outdtype) + if xp.result_type(win, xp.complex64) != outdtype: + win = xp.astype(win, outdtype) if scaling == 'density': scale = 1.0 / (fs * (win*win).sum()) @@ -1901,17 +1918,23 @@ def detrend_func(d): raise ValueError('Unknown scaling: %r' % scaling) if mode == 'stft': - scale = np.sqrt(scale) + scale = xp.sqrt(scale) if return_onesided: - if np.iscomplexobj(x): + try: + is_complex = xp.iscomplexobj(x) + except AttributeError: + # torch shim + is_complex = xp.is_complex(x) + + if is_complex: sides = 'twosided' warnings.warn('Input data is complex, switching to return_onesided=False', stacklevel=3) else: sides = 'onesided' if not same_data: - if np.iscomplexobj(y): + if xp.iscomplexobj(y): sides = 'twosided' warnings.warn('Input data is complex, switching to ' 'return_onesided=False', @@ -1920,9 +1943,9 @@ def detrend_func(d): sides = 'twosided' if sides == 'twosided': - freqs = sp_fft.fftfreq(nfft, 1/fs) + freqs = xp.fft.fftfreq(nfft, 1/fs) elif sides == 'onesided': - freqs = sp_fft.rfftfreq(nfft, 1/fs) + freqs = xp.fft.rfftfreq(nfft, 1/fs) # Perform the windowed FFTs result = _fft_helper(x, win, detrend_func, nperseg, noverlap, nfft, sides) @@ -1931,9 +1954,9 @@ def detrend_func(d): # All the same operations on the y data result_y = _fft_helper(y, win, detrend_func, nperseg, noverlap, nfft, sides) - result = np.conjugate(result) * result_y + result = xp.conj(result) * result_y elif mode == 'psd': - result = np.conjugate(result) * result + result = xp.conj(result) * result result *= scale if sides == 'onesided' and mode == 'psd': @@ -1943,12 +1966,12 @@ def detrend_func(d): # Last point is unpaired Nyquist freq point, don't double result[..., 1:-1] *= 2 - time = np.arange(nperseg/2, x.shape[-1] - nperseg/2 + 1, + time = xp.arange(nperseg/2, x.shape[-1] - nperseg/2 + 1, nperseg - noverlap)/float(fs) if boundary is not None: time -= (nperseg/2) / fs - result = result.astype(outdtype) + result = xp.astype(result, outdtype) # All imaginary parts are zero anyways if same_data and mode != 'stft': @@ -1960,7 +1983,7 @@ def detrend_func(d): axis -= 1 # Roll frequency axis back to axis where the data came from - result = np.moveaxis(result, -1, axis) + result = xp.moveaxis(result, -1, axis) return freqs, time, result @@ -1987,12 +2010,14 @@ def _fft_helper(x, win, detrend_func, nperseg, noverlap, nfft, sides): .. versionadded:: 0.16.0 """ - # Created sliding window view of array + xp = array_namespace(x) + x = xp.asarray(x) + # Created strided array of data segments if nperseg == 1 and noverlap == 0: - result = x[..., np.newaxis] + result = x[..., xp.newaxis] else: step = nperseg - noverlap - result = np.lib.stride_tricks.sliding_window_view( + result = xp.lib.stride_tricks.sliding_window_view( x, window_shape=nperseg, axis=-1, writeable=True ) result = result[..., 0::step, :] @@ -2001,14 +2026,18 @@ def _fft_helper(x, win, detrend_func, nperseg, noverlap, nfft, sides): result = detrend_func(result) # Apply window by multiplication + # NOTE: torch device shim -- needs + # deeper analysis + result_device = xp.device(result) + win = xp.to_device(win, result_device) result = win * result # Perform the fft. Acts on last axis by default. Zero-pads automatically if sides == 'twosided': - func = sp_fft.fft + func = xp.fft.fft else: result = result.real - func = sp_fft.rfft + func = xp.fft.rfft result = func(result, n=nfft) return result @@ -2059,7 +2088,8 @@ def _triage_segments(window, nperseg, input_length): nperseg = input_length win = get_window(window, nperseg) else: - win = np.asarray(window) + xp = array_namespace(window) + win = xp.asarray(window) if len(win.shape) != 1: raise ValueError('window must be 1-D') if input_length < win.shape[-1]: diff --git a/scipy/signal/tests/test_spectral.py b/scipy/signal/tests/test_spectral.py index ed0af49b2ef8..f0013a671d66 100644 --- a/scipy/signal/tests/test_spectral.py +++ b/scipy/signal/tests/test_spectral.py @@ -19,6 +19,9 @@ from scipy.signal.tests._scipy_spectral_test_shim import stft_compare as stft from scipy.signal.tests._scipy_spectral_test_shim import istft_compare as istft from scipy.signal.tests._scipy_spectral_test_shim import csd_compare as csd +from scipy.conftest import array_api_compatible, skip_if_array_api_backend +from scipy._lib.array_api_compat import array_api_compat +from scipy._lib._array_api import SCIPY_ARRAY_API class TestPeriodogram: @@ -243,150 +246,202 @@ def test_shorter_window_error(self): class TestWelch: - def test_real_onesided_even(self): - x = np.zeros(16) + @array_api_compatible + def test_real_onesided_even(self, xp): + x = xp.zeros(16) x[0] = 1 x[8] = 1 f, p = welch(x, nperseg=8) - assert_allclose(f, np.linspace(0, 0.5, 5)) q = np.array([0.08333333, 0.15277778, 0.22222222, 0.22222222, 0.11111111]) - assert_allclose(p, q, atol=1e-7, rtol=1e-7) - - def test_real_onesided_odd(self): - x = np.zeros(16) + assert_allclose(array_api_compat.to_device(p, "cpu"), + q, atol=1e-7, rtol=1e-7) + assert_allclose(array_api_compat.to_device(f, "cpu"), + np.linspace(0, 0.5, 5)) + + @array_api_compatible + def test_real_onesided_odd(self, xp): + x = xp.zeros(16) x[0] = 1 x[8] = 1 f, p = welch(x, nperseg=9) - assert_allclose(f, np.arange(5.0)/9.0) + assert_allclose(array_api_compat.to_device(f, "cpu"), + np.arange(5.0)/9.0) q = np.array([0.12477455, 0.23430933, 0.17072113, 0.17072113, 0.17072113]) - assert_allclose(p, q, atol=1e-7, rtol=1e-7) + assert_allclose(array_api_compat.to_device(p, "cpu"), + q, atol=1e-7, rtol=1e-7) - def test_real_twosided(self): - x = np.zeros(16) + @array_api_compatible + def test_real_twosided(self, xp): + x = xp.zeros(16) x[0] = 1 x[8] = 1 f, p = welch(x, nperseg=8, return_onesided=False) - assert_allclose(f, fftfreq(8, 1.0)) + assert_allclose(array_api_compat.to_device(f, "cpu"), + fftfreq(8, 1.0)) q = np.array([0.08333333, 0.07638889, 0.11111111, 0.11111111, 0.11111111, 0.11111111, 0.11111111, 0.07638889]) - assert_allclose(p, q, atol=1e-7, rtol=1e-7) + assert_allclose(array_api_compat.to_device(p, "cpu"), + q, atol=1e-7, rtol=1e-7) - def test_real_spectrum(self): - x = np.zeros(16) + @array_api_compatible + def test_real_spectrum(self, xp): + x = xp.zeros(16) x[0] = 1 x[8] = 1 f, p = welch(x, nperseg=8, scaling='spectrum') - assert_allclose(f, np.linspace(0, 0.5, 5)) + assert_allclose(array_api_compat.to_device(f, "cpu"), + np.linspace(0, 0.5, 5)) q = np.array([0.015625, 0.02864583, 0.04166667, 0.04166667, 0.02083333]) - assert_allclose(p, q, atol=1e-7, rtol=1e-7) + assert_allclose(array_api_compat.to_device(p, "cpu"), + q, atol=1e-7, rtol=1e-7) - def test_integer_onesided_even(self): - x = np.zeros(16, dtype=int) + @array_api_compatible + def test_integer_onesided_even(self, xp): + x = xp.zeros(16, dtype=int) x[0] = 1 x[8] = 1 f, p = welch(x, nperseg=8) - assert_allclose(f, np.linspace(0, 0.5, 5)) + assert_allclose(array_api_compat.to_device(f, "cpu"), + np.linspace(0, 0.5, 5)) q = np.array([0.08333333, 0.15277778, 0.22222222, 0.22222222, 0.11111111]) - assert_allclose(p, q, atol=1e-7, rtol=1e-7) + assert_allclose(array_api_compat.to_device(p, "cpu"), + q, atol=1e-7, rtol=1e-7) - def test_integer_onesided_odd(self): - x = np.zeros(16, dtype=int) + @array_api_compatible + def test_integer_onesided_odd(self, xp): + x = xp.zeros(16, dtype=int) x[0] = 1 x[8] = 1 f, p = welch(x, nperseg=9) - assert_allclose(f, np.arange(5.0)/9.0) + assert_allclose(array_api_compat.to_device(f, "cpu"), + np.arange(5.0)/9.0) q = np.array([0.12477455, 0.23430933, 0.17072113, 0.17072113, 0.17072113]) - assert_allclose(p, q, atol=1e-7, rtol=1e-7) + assert_allclose(array_api_compat.to_device(p, "cpu"), + q, atol=1e-7, rtol=1e-7) - def test_integer_twosided(self): - x = np.zeros(16, dtype=int) + @array_api_compatible + def test_integer_twosided(self, xp): + x = xp.zeros(16, dtype=int) x[0] = 1 x[8] = 1 f, p = welch(x, nperseg=8, return_onesided=False) - assert_allclose(f, fftfreq(8, 1.0)) + assert_allclose(array_api_compat.to_device(f, "cpu"), + fftfreq(8, 1.0)) q = np.array([0.08333333, 0.07638889, 0.11111111, 0.11111111, 0.11111111, 0.11111111, 0.11111111, 0.07638889]) - assert_allclose(p, q, atol=1e-7, rtol=1e-7) + assert_allclose(array_api_compat.to_device(p, "cpu"), + q, atol=1e-7, rtol=1e-7) - def test_complex(self): - x = np.zeros(16, np.complex128) + @array_api_compatible + def test_complex(self, xp): + x = xp.zeros(16, dtype=xp.complex128) x[0] = 1.0 + 2.0j x[8] = 1.0 + 2.0j f, p = welch(x, nperseg=8, return_onesided=False) - assert_allclose(f, fftfreq(8, 1.0)) + assert_allclose(array_api_compat.to_device(f, "cpu"), + fftfreq(8, 1.0)) q = np.array([0.41666667, 0.38194444, 0.55555556, 0.55555556, 0.55555556, 0.55555556, 0.55555556, 0.38194444]) - assert_allclose(p, q, atol=1e-7, rtol=1e-7) + assert_allclose(array_api_compat.to_device(p, "cpu"), + q, atol=1e-7, rtol=1e-7) - def test_unk_scaling(self): - assert_raises(ValueError, welch, np.zeros(4, np.complex128), + @array_api_compatible + def test_unk_scaling(self, xp): + assert_raises(ValueError, welch, xp.zeros(4, dtype=xp.complex128), scaling='foo', nperseg=4) - def test_detrend_linear(self): - x = np.arange(10, dtype=np.float64) + 0.04 + @array_api_compatible + def test_detrend_linear(self, xp): + x = xp.arange(10, dtype=xp.float64) + 0.04 f, p = welch(x, nperseg=10, detrend='linear') + p = array_api_compat.to_device(p, "cpu") assert_allclose(p, np.zeros_like(p), atol=1e-15) - def test_no_detrending(self): - x = np.arange(10, dtype=np.float64) + 0.04 + @array_api_compatible + def test_no_detrending(self, xp): + x = xp.arange(10, dtype=xp.float64) + 0.04 f1, p1 = welch(x, nperseg=10, detrend=False) f2, p2 = welch(x, nperseg=10, detrend=lambda x: x) - assert_allclose(f1, f2, atol=1e-15) - assert_allclose(p1, p2, atol=1e-15) - - def test_detrend_external(self): - x = np.arange(10, dtype=np.float64) + 0.04 + assert_allclose(array_api_compat.to_device(f1, "cpu"), + array_api_compat.to_device(f2, "cpu"), + atol=1e-15) + assert_allclose(array_api_compat.to_device(p1, "cpu"), + array_api_compat.to_device(p2, "cpu"), + atol=1e-15) + + @array_api_compatible + def test_detrend_external(self, xp): + x = xp.arange(10, dtype=xp.float64) + 0.04 f, p = welch(x, nperseg=10, detrend=lambda seg: signal.detrend(seg, type='l')) + p = array_api_compat.to_device(p, "cpu") assert_allclose(p, np.zeros_like(p), atol=1e-15) - def test_detrend_external_nd_m1(self): - x = np.arange(40, dtype=np.float64) + 0.04 + @array_api_compatible + def test_detrend_external_nd_m1(self, xp): + x = xp.arange(40, dtype=xp.float64) + 0.04 x = x.reshape((2,2,10)) f, p = welch(x, nperseg=10, detrend=lambda seg: signal.detrend(seg, type='l')) + p = array_api_compat.to_device(p, "cpu") assert_allclose(p, np.zeros_like(p), atol=1e-15) - def test_detrend_external_nd_0(self): - x = np.arange(20, dtype=np.float64) + 0.04 + @array_api_compatible + def test_detrend_external_nd_0(self, xp): + x = xp.arange(20, dtype=xp.float64) + 0.04 x = x.reshape((2,1,10)) - x = np.moveaxis(x, 2, 0) + x = xp.moveaxis(x, 2, 0) f, p = welch(x, nperseg=10, axis=0, detrend=lambda seg: signal.detrend(seg, axis=0, type='l')) + p = array_api_compat.to_device(p, "cpu") assert_allclose(p, np.zeros_like(p), atol=1e-15) - def test_nd_axis_m1(self): - x = np.arange(20, dtype=np.float64) + 0.04 + @array_api_compatible + def test_nd_axis_m1(self, xp): + x = xp.arange(20, dtype=xp.float64) + 0.04 x = x.reshape((2,1,10)) f, p = welch(x, nperseg=10) assert_array_equal(p.shape, (2, 1, 6)) - assert_allclose(p[0,0,:], p[1,0,:], atol=1e-13, rtol=1e-13) + assert_allclose(array_api_compat.to_device(p[0,0,:], "cpu"), + array_api_compat.to_device(p[1,0,:], "cpu"), + atol=1e-13, rtol=1e-13) f0, p0 = welch(x[0,0,:], nperseg=10) - assert_allclose(p0[np.newaxis,:], p[1,:], atol=1e-13, rtol=1e-13) + assert_allclose(array_api_compat.to_device(p0[None,:], "cpu"), + array_api_compat.to_device(p[1,:], "cpu"), + atol=1e-13, rtol=1e-13) - def test_nd_axis_0(self): - x = np.arange(20, dtype=np.float64) + 0.04 + @array_api_compatible + def test_nd_axis_0(self, xp): + x = xp.arange(20, dtype=xp.float64) + 0.04 x = x.reshape((10,2,1)) f, p = welch(x, nperseg=10, axis=0) assert_array_equal(p.shape, (6,2,1)) - assert_allclose(p[:,0,0], p[:,1,0], atol=1e-13, rtol=1e-13) + assert_allclose(array_api_compat.to_device(p[:,0,0], "cpu"), + array_api_compat.to_device(p[:,1,0], "cpu"), + atol=1e-13, rtol=1e-13) f0, p0 = welch(x[:,0,0], nperseg=10) - assert_allclose(p0, p[:,1,0], atol=1e-13, rtol=1e-13) - - def test_window_external(self): - x = np.zeros(16) + assert_allclose(array_api_compat.to_device(p0, "cpu"), + array_api_compat.to_device(p[:,1,0], "cpu"), + atol=1e-13, rtol=1e-13) + + @array_api_compatible + @skip_if_array_api_backend('torch') + def test_window_external(self, xp): + x = xp.zeros(16) x[0] = 1 x[8] = 1 f, p = welch(x, 10, 'hann', nperseg=8) win = signal.get_window('hann', 8) fe, pe = welch(x, 10, win, nperseg=None) - assert_array_almost_equal_nulp(p, pe) - assert_array_almost_equal_nulp(f, fe) + assert_array_almost_equal_nulp(array_api_compat.to_device(p, "cpu"), + array_api_compat.to_device(pe, "cpu")) + assert_array_almost_equal_nulp(array_api_compat.to_device(f, "cpu"), + array_api_compat.to_device(fe, "cpu")) assert_array_equal(fe.shape, (5,)) # because win length used as nperseg assert_array_equal(pe.shape, (5,)) assert_raises(ValueError, welch, x, @@ -395,8 +450,10 @@ def test_window_external(self): assert_raises(ValueError, welch, x, 10, win_err, nperseg=None) # win longer than signal - def test_empty_input(self): - f, p = welch([]) + @array_api_compatible + def test_empty_input(self, xp): + val = xp.asarray([]) if SCIPY_ARRAY_API else [] + f, p = welch(val) assert_array_equal(f.shape, (0,)) assert_array_equal(p.shape, (0,)) for shape in [(0,), (3,0), (0,5,2)]: @@ -404,14 +461,16 @@ def test_empty_input(self): assert_array_equal(f.shape, shape) assert_array_equal(p.shape, shape) - def test_empty_input_other_axis(self): + @array_api_compatible + def test_empty_input_other_axis(self, xp): for shape in [(3,0), (0,5,2)]: - f, p = welch(np.empty(shape), axis=1) + f, p = welch(xp.empty(shape), axis=1) assert_array_equal(f.shape, shape) assert_array_equal(p.shape, shape) - def test_short_data(self): - x = np.zeros(8) + @array_api_compatible + def test_short_data(self, xp): + x = xp.zeros(8) x[0] = 1 #for string-like window, input signal length < nperseg value gives #UserWarning, sets nperseg to x.shape[-1] @@ -421,103 +480,129 @@ def test_short_data(self): f, p = welch(x,window='hann') # default nperseg f1, p1 = welch(x,window='hann', nperseg=256) # user-specified nperseg f2, p2 = welch(x, nperseg=8) # valid nperseg, doesn't give warning - assert_allclose(f, f2) - assert_allclose(p, p2) - assert_allclose(f1, f2) - assert_allclose(p1, p2) - - def test_window_long_or_nd(self): - assert_raises(ValueError, welch, np.zeros(4), 1, np.array([1,1,1,1,1])) - assert_raises(ValueError, welch, np.zeros(4), 1, - np.arange(6).reshape((2,3))) - - def test_nondefault_noverlap(self): - x = np.zeros(64) + assert_allclose(array_api_compat.to_device(f, "cpu"), + array_api_compat.to_device(f2, "cpu")) + assert_allclose(array_api_compat.to_device(p, "cpu"), + array_api_compat.to_device(p2, "cpu")) + assert_allclose(array_api_compat.to_device(f1, "cpu"), + array_api_compat.to_device(f2, "cpu")) + assert_allclose(array_api_compat.to_device(p1, "cpu"), + array_api_compat.to_device(p2, "cpu")) + + @array_api_compatible + def test_window_long_or_nd(self, xp): + assert_raises(ValueError, welch, xp.zeros(4), 1, xp.asarray([1,1,1,1,1])) + assert_raises(ValueError, welch, xp.zeros(4), 1, + xp.arange(6).reshape((2,3))) + + @array_api_compatible + def test_nondefault_noverlap(self, xp): + x = xp.zeros(64) x[::8] = 1 f, p = welch(x, nperseg=16, noverlap=4) q = np.array([0, 1./12., 1./3., 1./5., 1./3., 1./5., 1./3., 1./5., 1./6.]) - assert_allclose(p, q, atol=1e-12) + assert_allclose(array_api_compat.to_device(p, "cpu"), + q, atol=1e-12) - def test_bad_noverlap(self): - assert_raises(ValueError, welch, np.zeros(4), 1, 'hann', 2, 7) + @array_api_compatible + def test_bad_noverlap(self, xp): + assert_raises(ValueError, welch, xp.zeros(4), 1, 'hann', 2, 7) - def test_nfft_too_short(self): - assert_raises(ValueError, welch, np.ones(12), nfft=3, nperseg=4) + @array_api_compatible + def test_nfft_too_short(self, xp): + assert_raises(ValueError, welch, xp.ones(12), nfft=3, nperseg=4) - def test_real_onesided_even_32(self): - x = np.zeros(16, 'f') + @array_api_compatible + def test_real_onesided_even_32(self, xp): + x = xp.zeros(16, dtype=xp.float32) x[0] = 1 x[8] = 1 f, p = welch(x, nperseg=8) - assert_allclose(f, np.linspace(0, 0.5, 5)) - q = np.array([0.08333333, 0.15277778, 0.22222222, 0.22222222, - 0.11111111], 'f') - assert_allclose(p, q, atol=1e-7, rtol=1e-7) + assert_allclose(array_api_compat.to_device(f, "cpu"), + np.linspace(0, 0.5, 5)) + q = xp.asarray([0.08333333, 0.15277778, 0.22222222, 0.22222222, + 0.11111111], dtype=xp.float32) + assert_allclose(array_api_compat.to_device(p, "cpu"), + array_api_compat.to_device(q, "cpu"), + atol=1e-7, rtol=1e-7) assert_(p.dtype == q.dtype) - def test_real_onesided_odd_32(self): - x = np.zeros(16, 'f') + @array_api_compatible + def test_real_onesided_odd_32(self, xp): + x = xp.zeros(16, dtype=xp.float32) x[0] = 1 x[8] = 1 f, p = welch(x, nperseg=9) - assert_allclose(f, np.arange(5.0)/9.0) - q = np.array([0.12477458, 0.23430935, 0.17072113, 0.17072116, - 0.17072113], 'f') - assert_allclose(p, q, atol=1e-7, rtol=1e-7) + assert_allclose(array_api_compat.to_device(f, "cpu"), + np.arange(5.0)/9.0) + q = xp.asarray([0.12477458, 0.23430935, 0.17072113, 0.17072116, + 0.17072113], dtype=xp.float32) + assert_allclose(array_api_compat.to_device(p, "cpu"), + array_api_compat.to_device(q, "cpu"), + atol=1e-7, rtol=1e-7) assert_(p.dtype == q.dtype) - def test_real_twosided_32(self): - x = np.zeros(16, 'f') + @array_api_compatible + def test_real_twosided_32(self, xp): + x = xp.zeros(16, dtype=xp.float32) x[0] = 1 x[8] = 1 f, p = welch(x, nperseg=8, return_onesided=False) - assert_allclose(f, fftfreq(8, 1.0)) - q = np.array([0.08333333, 0.07638889, 0.11111111, - 0.11111111, 0.11111111, 0.11111111, 0.11111111, - 0.07638889], 'f') - assert_allclose(p, q, atol=1e-7, rtol=1e-7) + assert_allclose(array_api_compat.to_device(f, "cpu"), + fftfreq(8, 1.0)) + q = xp.asarray([0.08333333, 0.07638889, 0.11111111, + 0.11111111, 0.11111111, 0.11111111, 0.11111111, + 0.07638889], dtype=xp.float32) + assert_allclose(array_api_compat.to_device(p, "cpu"), + array_api_compat.to_device(q, "cpu"), + atol=1e-7, rtol=1e-7) assert_(p.dtype == q.dtype) - def test_complex_32(self): - x = np.zeros(16, 'F') + @array_api_compatible + def test_complex_32(self, xp): + x = xp.zeros(16, dtype=xp.complex64) x[0] = 1.0 + 2.0j x[8] = 1.0 + 2.0j f, p = welch(x, nperseg=8, return_onesided=False) - assert_allclose(f, fftfreq(8, 1.0)) - q = np.array([0.41666666, 0.38194442, 0.55555552, 0.55555552, - 0.55555558, 0.55555552, 0.55555552, 0.38194442], 'f') - assert_allclose(p, q, atol=1e-7, rtol=1e-7) + assert_allclose(array_api_compat.to_device(f, "cpu"), + fftfreq(8, 1.0)) + q = xp.asarray([0.41666666, 0.38194442, 0.55555552, 0.55555552, + 0.55555558, 0.55555552, 0.55555552, 0.38194442], dtype=xp.float32) + assert_allclose(array_api_compat.to_device(p, "cpu"), + array_api_compat.to_device(q, "cpu"), atol=1e-7, rtol=1e-7) assert_(p.dtype == q.dtype, f'dtype mismatch, {p.dtype}, {q.dtype}') - def test_padded_freqs(self): - x = np.zeros(12) + @array_api_compatible + def test_padded_freqs(self, xp): + x = xp.zeros(12) nfft = 24 f = fftfreq(nfft, 1.0)[:nfft//2+1] f[-1] *= -1 fodd, _ = welch(x, nperseg=5, nfft=nfft) feven, _ = welch(x, nperseg=6, nfft=nfft) - assert_allclose(f, fodd) - assert_allclose(f, feven) + assert_allclose(f, array_api_compat.to_device(fodd, "cpu")) + assert_allclose(f, array_api_compat.to_device(feven, "cpu")) nfft = 25 f = fftfreq(nfft, 1.0)[:(nfft + 1)//2] fodd, _ = welch(x, nperseg=5, nfft=nfft) feven, _ = welch(x, nperseg=6, nfft=nfft) - assert_allclose(f, fodd) - assert_allclose(f, feven) + assert_allclose(f, array_api_compat.to_device(fodd, "cpu")) + assert_allclose(f, array_api_compat.to_device(feven, "cpu")) - def test_window_correction(self): + @array_api_compatible + def test_window_correction(self, xp): A = 20 fs = 1e4 nperseg = int(fs//10) fsig = 300 ii = int(fsig*nperseg//fs) # Freq index of fsig - tt = np.arange(fs)/fs - x = A*np.sin(2*np.pi*fsig*tt) + tt = xp.arange(fs)/fs + x = A*xp.sin(2*xp.pi*fsig*tt) for window in ['hann', 'bartlett', ('tukey', 0.1), 'flattop']: _, p_spec = welch(x, fs=fs, nperseg=nperseg, window=window, @@ -526,15 +611,18 @@ def test_window_correction(self): scaling='density') # Check peak height at signal frequency for 'spectrum' - assert_allclose(p_spec[ii], A**2/2.0) + assert_allclose(array_api_compat.to_device(p_spec[ii], "cpu"), + A**2/2.0, rtol=5e-7) # Check integrated spectrum RMS for 'density' - assert_allclose(np.sqrt(trapezoid(p_dens, freq)), A*np.sqrt(2)/2, - rtol=1e-3) + assert_allclose(np.sqrt(np.trapz(array_api_compat.to_device(p_dens, "cpu"), + array_api_compat.to_device(freq, "cpu"))), + A*np.sqrt(2)/2, rtol=1e-3) - def test_axis_rolling(self): + @array_api_compatible + def test_axis_rolling(self, xp): np.random.seed(1234) - x_flat = np.random.randn(1024) + x_flat = xp.asarray(np.random.randn(1024)) _, p_flat = welch(x_flat) for a in range(3): @@ -545,17 +633,24 @@ def test_axis_rolling(self): _, p_plus = welch(x, axis=a) # Positive axis index _, p_minus = welch(x, axis=a-x.ndim) # Negative axis index - assert_equal(p_flat, p_plus.squeeze(), err_msg=a) - assert_equal(p_flat, p_minus.squeeze(), err_msg=a-x.ndim) + assert_array_equal(array_api_compat.to_device(p_flat, "cpu"), + array_api_compat.to_device(p_plus.squeeze(), "cpu"), + err_msg=a) + assert_array_equal(array_api_compat.to_device(p_flat, "cpu"), + array_api_compat.to_device(p_minus.squeeze(), "cpu"), + err_msg=a-x.ndim) - def test_average(self): - x = np.zeros(16) + @array_api_compatible + def test_average(self, xp): + x = xp.zeros(16) x[0] = 1 x[8] = 1 f, p = welch(x, nperseg=8, average='median') - assert_allclose(f, np.linspace(0, 0.5, 5)) - q = np.array([.1, .05, 0., 1.54074396e-33, 0.]) - assert_allclose(p, q, atol=1e-7, rtol=1e-7) + q = np.asarray([.1, .05, 0., 1.54074396e-33, 0.]) + assert_allclose(array_api_compat.to_device(f, "cpu"), + np.linspace(0, 0.5, 5)) + assert_allclose(array_api_compat.to_device(p, "cpu"), + q, atol=1e-7, rtol=1e-7) assert_raises(ValueError, welch, x, nperseg=8, average='unrecognised-average') @@ -1608,8 +1703,8 @@ def test_roundtrip_scaling(self): # Since x is real, its Fourier transform is conjugate symmetric, i.e., # the missing 'second side' can be expressed through the 'first side': Zp1 = np.conj(Zp0[-2:0:-1, :]) # 'second side' is conjugate reversed - assert_allclose(Zp[:129, :], Zp0) - assert_allclose(Zp[129:, :], Zp1) + assert_allclose(Zp[:129, :], Zp0, atol=9e-16) + assert_allclose(Zp[129:, :], Zp1, atol=9e-16) # Calculate the spectral power: s2 = (np.sum(Zp0.real ** 2 + Zp0.imag ** 2, axis=0) +