Skip to content

Commit

Permalink
implement and test for complex valued time signals
Browse files Browse the repository at this point in the history
  • Loading branch information
tluebeck committed Apr 10, 2024
1 parent 3f57cf4 commit 75f594a
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 43 deletions.
97 changes: 55 additions & 42 deletions pyfar/dsp/dsp.py
Original file line number Diff line number Diff line change
Expand Up @@ -1205,7 +1205,9 @@ def find_impulse_response_delay(impulse_response, N=1):
2. By default a first order polynomial is used, as the slope of the
analytic signal should in theory be linear.
Alternatively see :py:func:`pyfar.dsp.find_impulse_response_start`.
Alternatively see :py:func:`pyfar.dsp.find_impulse_response_start`. For
complex-valued time signals, the delay is calculated separately for the
real and complex part, and its minimum value returned.
Parameters
----------
Expand Down Expand Up @@ -1253,54 +1255,65 @@ def find_impulse_response_delay(impulse_response, N=1):
n = int(np.ceil((N+2)/2))

start_samples = np.zeros(impulse_response.cshape)
modes = ['real', 'complex'] if impulse_response.complex else ['real']
start_sample = np.zeros((len(modes), 1), dtype=float)

for ch in np.ndindex(impulse_response.cshape):
# Calculate the correlation between the impulse response and its
# minimum phase equivalent. This requires a minimum phase equivalent
# in the strict sense, instead of the appriximation implemented in
# pyfar.
n_samples = impulse_response.n_samples
for idx, mode in zip(range(0, len(modes)), modes):
ir = impulse_response.time[ch]
ir = np.real(ir) if mode == 'real' else np.imag(ir)

if np.max(ir) > 1e-16:
# minimum phase warns if the input signal is not symmetric,
# which is not critical for this application
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore", message="h does not appear to by symmetric",
category=RuntimeWarning)
ir_minphase = sgn.minimum_phase(
ir, n_fft=4*n_samples)

correlation = sgn.correlate(
ir,
np.pad(ir_minphase, (0, n_samples - (n_samples + 1)//2)),
mode='full')
lags = np.arange(-n_samples + 1, n_samples)

# calculate the analytic signal of the correlation function
correlation_analytic = sgn.hilbert(correlation)

# find the maximum of the analytic part of the correlation
# function and define the search range around the maximum
argmax = np.argmax(np.abs(correlation_analytic))
search_region_range = np.arange(argmax-n, argmax+n)
search_region = np.imag(
correlation_analytic[search_region_range])

# mask values with a negative gradient
mask = np.gradient(search_region, search_region_range) > 0

# fit a polygon and estimate its roots
search_region_poly = np.polyfit(
search_region_range[mask]-argmax, search_region[mask], N)
roots = np.roots(search_region_poly)

# Use only real-valued roots
if np.all(np.isreal(roots)):
root = roots[np.abs(roots) == np.min(np.abs(roots))]
start_sample[idx] = np.squeeze(lags[argmax] + root)
else:
start_sample[idx] = np.nan
warnings.warn('Starting sample not found for channel '
f'{ch}')
else:
start_sample[idx] = np.nan

# minimum phase warns if the input signal is not symmetric, which is
# not critical for this application
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore", message="h does not appear to by symmetric",
category=RuntimeWarning)
ir_minphase = sgn.minimum_phase(
impulse_response.time[ch], n_fft=4*n_samples)

correlation = sgn.correlate(
impulse_response.time[ch],
np.pad(ir_minphase, (0, n_samples - (n_samples + 1)//2)),
mode='full')
lags = np.arange(-n_samples + 1, n_samples)

# calculate the analytic signal of the correlation function
correlation_analytic = sgn.hilbert(correlation)

# find the maximum of the analytic part of the correlation function
# and define the search range around the maximum
argmax = np.argmax(np.abs(correlation_analytic))
search_region_range = np.arange(argmax-n, argmax+n)
search_region = np.imag(correlation_analytic[search_region_range])

# mask values with a negative gradient
mask = np.gradient(search_region, search_region_range) > 0

# fit a polygon and estimate its roots
search_region_poly = np.polyfit(
search_region_range[mask]-argmax, search_region[mask], N)
roots = np.roots(search_region_poly)

# Use only real-valued roots
if np.all(np.isreal(roots)):
root = roots[np.abs(roots) == np.min(np.abs(roots))]
start_sample = np.squeeze(lags[argmax] + root)
else:
start_sample = np.nan
warnings.warn(f"Starting sample not found for channel {ch}")

start_samples[ch] = start_sample
start_samples[ch] = np.nanmin(start_sample)

return start_samples

Expand Down
7 changes: 6 additions & 1 deletion tests/test_dsp.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,14 +718,19 @@ def test_minimum_phase_multidim():
npt.assert_allclose(imp_minphase.time, imp_zerophase.time, atol=1e-10)


def test_impulse_response_delay():
@pytest.mark.parametrize("is_complex", [False, True])
def test_impulse_response_delay(is_complex):
"""Test delay of an ideal impulse"""
n_samples = 2**10
snr = 60
start_sample = np.array([24])

ir = pf.signals.impulse(n_samples, delay=start_sample)
noise = pf.signals.noise(n_samples, rms=10**(-snr/20), seed=1)
if is_complex:
ir.fft_norm = 'none'
noise.fft_norm = 'none'
ir.complex = is_complex

start_sample_est = dsp.find_impulse_response_delay(ir)
npt.assert_allclose(start_sample_est, start_sample, atol=1e-6)
Expand Down

0 comments on commit 75f594a

Please sign in to comment.