From 75f594a4a43510d578706bd8350a616cb3381b13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20L=C3=BCbeck?= Date: Wed, 10 Apr 2024 07:52:16 +0200 Subject: [PATCH] implement and test for complex valued time signals --- pyfar/dsp/dsp.py | 97 +++++++++++++++++++++++++++-------------------- tests/test_dsp.py | 7 +++- 2 files changed, 61 insertions(+), 43 deletions(-) diff --git a/pyfar/dsp/dsp.py b/pyfar/dsp/dsp.py index 2f763b993..05b60c802 100644 --- a/pyfar/dsp/dsp.py +++ b/pyfar/dsp/dsp.py @@ -1205,7 +1205,9 @@ def find_impulse_response_delay(impulse_response, N=1): 2. By default a first order polynomial is used, as the slope of the analytic signal should in theory be linear. - Alternatively see :py:func:`pyfar.dsp.find_impulse_response_start`. + Alternatively see :py:func:`pyfar.dsp.find_impulse_response_start`. For + complex-valued time signals, the delay is calculated separately for the + real and complex part, and its minimum value returned. Parameters ---------- @@ -1253,54 +1255,65 @@ def find_impulse_response_delay(impulse_response, N=1): n = int(np.ceil((N+2)/2)) start_samples = np.zeros(impulse_response.cshape) + modes = ['real', 'complex'] if impulse_response.complex else ['real'] + start_sample = np.zeros((len(modes), 1), dtype=float) + for ch in np.ndindex(impulse_response.cshape): # Calculate the correlation between the impulse response and its # minimum phase equivalent. This requires a minimum phase equivalent # in the strict sense, instead of the appriximation implemented in # pyfar. n_samples = impulse_response.n_samples + for idx, mode in zip(range(0, len(modes)), modes): + ir = impulse_response.time[ch] + ir = np.real(ir) if mode == 'real' else np.imag(ir) + + if np.max(ir) > 1e-16: + # minimum phase warns if the input signal is not symmetric, + # which is not critical for this application + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", message="h does not appear to by symmetric", + category=RuntimeWarning) + ir_minphase = sgn.minimum_phase( + ir, n_fft=4*n_samples) + + correlation = sgn.correlate( + ir, + np.pad(ir_minphase, (0, n_samples - (n_samples + 1)//2)), + mode='full') + lags = np.arange(-n_samples + 1, n_samples) + + # calculate the analytic signal of the correlation function + correlation_analytic = sgn.hilbert(correlation) + + # find the maximum of the analytic part of the correlation + # function and define the search range around the maximum + argmax = np.argmax(np.abs(correlation_analytic)) + search_region_range = np.arange(argmax-n, argmax+n) + search_region = np.imag( + correlation_analytic[search_region_range]) + + # mask values with a negative gradient + mask = np.gradient(search_region, search_region_range) > 0 + + # fit a polygon and estimate its roots + search_region_poly = np.polyfit( + search_region_range[mask]-argmax, search_region[mask], N) + roots = np.roots(search_region_poly) + + # Use only real-valued roots + if np.all(np.isreal(roots)): + root = roots[np.abs(roots) == np.min(np.abs(roots))] + start_sample[idx] = np.squeeze(lags[argmax] + root) + else: + start_sample[idx] = np.nan + warnings.warn('Starting sample not found for channel ' + f'{ch}') + else: + start_sample[idx] = np.nan - # minimum phase warns if the input signal is not symmetric, which is - # not critical for this application - with warnings.catch_warnings(): - warnings.filterwarnings( - "ignore", message="h does not appear to by symmetric", - category=RuntimeWarning) - ir_minphase = sgn.minimum_phase( - impulse_response.time[ch], n_fft=4*n_samples) - - correlation = sgn.correlate( - impulse_response.time[ch], - np.pad(ir_minphase, (0, n_samples - (n_samples + 1)//2)), - mode='full') - lags = np.arange(-n_samples + 1, n_samples) - - # calculate the analytic signal of the correlation function - correlation_analytic = sgn.hilbert(correlation) - - # find the maximum of the analytic part of the correlation function - # and define the search range around the maximum - argmax = np.argmax(np.abs(correlation_analytic)) - search_region_range = np.arange(argmax-n, argmax+n) - search_region = np.imag(correlation_analytic[search_region_range]) - - # mask values with a negative gradient - mask = np.gradient(search_region, search_region_range) > 0 - - # fit a polygon and estimate its roots - search_region_poly = np.polyfit( - search_region_range[mask]-argmax, search_region[mask], N) - roots = np.roots(search_region_poly) - - # Use only real-valued roots - if np.all(np.isreal(roots)): - root = roots[np.abs(roots) == np.min(np.abs(roots))] - start_sample = np.squeeze(lags[argmax] + root) - else: - start_sample = np.nan - warnings.warn(f"Starting sample not found for channel {ch}") - - start_samples[ch] = start_sample + start_samples[ch] = np.nanmin(start_sample) return start_samples diff --git a/tests/test_dsp.py b/tests/test_dsp.py index 3645e26cf..49a792989 100644 --- a/tests/test_dsp.py +++ b/tests/test_dsp.py @@ -718,7 +718,8 @@ def test_minimum_phase_multidim(): npt.assert_allclose(imp_minphase.time, imp_zerophase.time, atol=1e-10) -def test_impulse_response_delay(): +@pytest.mark.parametrize("is_complex", [False, True]) +def test_impulse_response_delay(is_complex): """Test delay of an ideal impulse""" n_samples = 2**10 snr = 60 @@ -726,6 +727,10 @@ def test_impulse_response_delay(): ir = pf.signals.impulse(n_samples, delay=start_sample) noise = pf.signals.noise(n_samples, rms=10**(-snr/20), seed=1) + if is_complex: + ir.fft_norm = 'none' + noise.fft_norm = 'none' + ir.complex = is_complex start_sample_est = dsp.find_impulse_response_delay(ir) npt.assert_allclose(start_sample_est, start_sample, atol=1e-6)