# $\Delta$ITD and $\Delta$ILD

In [None]:
import os
from datetime import datetime
from tqdm import tqdm
import numpy as np
import pandas as pd
from scipy import signal
from scipy.fft import rfft, irfft
from sklearn.utils.extmath import weighted_mode
import soundfile as sf

In [None]:
# constants
SAMPLE_RATE = 44100
ENERGY_THRESHOLD = 5e-4
TMAX = int(1e-3 * SAMPLE_RATE)  # maximum lag in samples = +/- 1 ms
FRAME_LENGTH = 0.5  # seconds

## Evaluation Functions

Adapted from [`https://github.com/vb000/SemanticHearing/blob/main/src/helpers/eval_utils.py`](https://github.com/vb000/SemanticHearing/blob/main/src/helpers/eval_utils.py)

Veluri, B., Itani, M., Chan, J., Yoshioka, T., & Gollakota, S. (2023). Semantic hearing: Programming acoustic scenes with binaural hearables. In _Proceedings of the 36th Annual ACM Symposium on User Interface Software and Technology_ (pp. 1-15).

In [None]:
def tdoa(x1, x2, interp=1, fs=44100, phat=True, t_max=None):
    """
    This function computes the time difference of arrival (TDOA)
    of the signal at the two ears or microphones. We recover tau
    using the Generalized Cross Correlation - Phase Transform (GCC-PHAT)
    method.
    
    Knapp, C., & Carter, G. C. (1976). The generalized correlation method for
    estimation of time delay.
    
    Parameters
    ----------
    x1 : (1d-array) the signal of the reference microphone
    x2 : (1d-array) the signal of the second microphone
    interp : (int) optional (default 1), the interpolation value
             for the cross-correlation, it can improve the
             timeresolution (and hence DOA resolution)
    fs : (int), optional (default 44100 Hz), the sampling frequency
          of the input signal
    phat : (bool) whether to use phase transform normalization
    t_max : (int) the maximum tau (lag) to use
    
    Return
    ------
    tdoa : (float), the delay between the two microphones (in seconds)
    """
    # zero padded length for the FFT
    n = x1.shape[-1] + x2.shape[-1] - 1
    if n % 2 != 0:
        n += 1

    # Generalized Cross Correlation Phase Transform
    # used to find the delay between the two microphones
    X1 = rfft(np.array(x1, dtype=np.float32), n=n, axis=-1)
    X2 = rfft(np.array(x2, dtype=np.float32), n=n, axis=-1)

    if phat:
        X1 /= np.abs(X1) + 1e-15  # added epsilon to avoid division error
        X2 /= np.abs(X2) + 1e-15  # added epsilon to avoid division error

    cc = irfft(X1 * np.conj(X2), n=interp * n, axis=-1)

    # alternative: compute phase spectrum first and then normalize
    # R = X1 * np.conj(X2)
    # R_phat = R / (np.abs(R) + 1e-15)
    # cc = irfft(R_phat, n=interp * n, axis=-1)

    # maximum possible delay given distance between microphones
    if t_max is None:
        t_max = n // 2 + 1

    # reorder the cross-correlation coefficients
    cc = np.concatenate((cc[..., -t_max:], cc[..., :t_max]), axis=-1)

    # pick max cross correlation index as delay
    tau = np.argmax(np.abs(cc), axis=-1)
    tau -= t_max  # because zero time is at the center of the array

    return tau / (fs * interp)

In [None]:
def compute_ild(s_left, s_right):
    """
    Compute the interaural level (intensity) difference (ILD)
    between the left and right ears.
    
    Parameters
    ----------
    s_left : (1d-array) the signal of the left ear
    s_right : (1d-array) the signal of the right ear
    
    Return
    ------
    ild : (float) interaural level difference, in decibels (dB)
    """
    sum_sq_left = np.sum(s_left ** 2, axis=-1)
    sum_sq_right = np.sum(s_right ** 2, axis=-1)
    
    return 10 * np.log10(sum_sq_left / sum_sq_right)

In [None]:
def framewise_gccphat(x, frame_dur, sr, window='tukey'):
    """
    Compute the TDOA using the GCC-PHAT algorithm, in
    a frame-wise manner.
    
    Parameters
    ----------
    x : (2d-array) the binaural signal
    frame_dur : (float) length of frame (in seconds)
    sr : (int) sample rate of signal
    window : (str) type of window to apply to each frame
              using scipy window functions
    
    Return
    ------
    itd : (float) interaural time difference, in seconds
    """
    frame_width = int(frame_dur * sr)

    # total number of frames T
    T = (x.shape[-1]) // frame_width

    # drop samples to get a multiple of frame size
    if x.shape[-1] % T != 0:
        x = x[..., :(frame_width * T)]
    
    assert x.shape[-1] % T == 0

    # split into frames
    frames = np.array(np.split(x, T, axis=-1))

    # apply window
    window = signal.get_window(window, frame_width)
    frames = frames * window

    # consider only frames that have energy above some threshold (ignore silence)
    frame_energy = np.max(np.mean(frames**2, axis=-1)**0.5, axis=-1)
    mask = frame_energy > ENERGY_THRESHOLD
    frames = frames[mask]

    # compute TDOA by frame
    fw_gccphat = tdoa(frames[..., 0, :], frames[..., 1, :], fs=sr, t_max=TMAX)

    # apply weighted mode to get single ITD value
    itd = weighted_mode(fw_gccphat, frame_energy[mask], axis=-1)[0]

    return itd[0]

In [None]:
def fw_itd_diff(s_est, s_gt, sr, frame_duration=0.25):
    """
    Computes the ITD error between the signal estimated
    by the model and the ground truth signal using the
    frame-wise GCC-PHAT algorithm.

    Parameters
    ----------
    s_est : (2d-array) the estimated signal
    s_gt : (2d-array) the ground-truth signal
    sr : (int) sample rate of signal
    frame_duration : (float), optional (default 0.25 s),
                     length of frame (in seconds)
    
    Return
    ------
    itd : (float) delta ITD between estimated and 
          ground-truth signals, in microseconds   
    """
    itd_gt = framewise_gccphat(s_gt, frame_duration, sr) * 1e6
    itd_est = framewise_gccphat(s_est, frame_duration, sr) * 1e6
    return np.abs(itd_est - itd_gt)

In [None]:
def ild_diff(s_est, s_gt):
    """
    Computes the ILD error between the signal estimated
    by the model and the ground truth signal.

        Parameters
    ----------
    s_est : (2d-array) the estimated signal
    s_gt : (2d-array) the ground-truth signal
    
    Return
    ------
    ild : (float) delta ILD between estimated and 
          ground-truth signals, in decibels (dB)  
    """
    ild_est = compute_ild(s_est[..., 0, :], s_est[..., 1, :])
    ild_gt = compute_ild(s_gt[..., 0, :], s_gt[..., 1, :])
    return np.abs(ild_est - ild_gt)

### Low Frequency Test

In [None]:
f0 = 200
sr = 44100
N = sr * 5
time_vector = np.linspace(0, 5, num=N)
channel_1 = np.sin(f0 * time_vector)
delay = int(0.001 * sr)  # 1000 microseconds
channel_2 = np.pad(channel_1, (delay, 0))[:N]
y = np.stack((channel_1, channel_2), axis=0)

In [None]:
print(f"ITD with 0.25 s frames: {framewise_gccphat(y, 0.25, sr) * 1e6}")
print(f"ITD with 0.5 s frames: {framewise_gccphat(y, 0.5, sr) * 1e6}")
print(f"ITD with 1 s frames: {framewise_gccphat(y, 1, sr) * 1e6}")
print(f"ILD: {compute_ild(channel_1, 3 * channel_2)} dB")

## Evaluate

In [None]:
DATE =  datetime.now().strftime("%Y-%m-%d")
MODEL = '' # htdemucs_ft, spleeter, umxhq
EVAL_DIR = "../data/eval/interaural/"

In [None]:
STEMS = ["drums", "bass", "other", "vocals"]

### Stereo Data

In [None]:
DATASET = 'stereo'

# set input and output directories
REFERENCE_DIR = f"../data/musdb18hq/test/"
ESTIMATE_DIR = f"../data/output/{MODEL}/{DATASET}/test/"

In [None]:
# create the output directory if it does not already exist
print("Creating evaluation directory, if it does not already exist...")
os.makedirs(EVAL_DIR, exist_ok=True)

In [None]:
# get all of the files in the input directory
print("Loading list of files...")
song_list = [f for f in os.listdir(REFERENCE_DIR) if os.path.isdir(os.path.join(REFERENCE_DIR, f))]
print(f"There are {len(song_list)} files in the reference directory.")

In [None]:
title_list = []
source_list = []
itd_list = []
ild_list = []

print("Beginning to evaluate stems...")
for source in STEMS:
    print(f"\n>>>>{source} <<<<")
    for song in tqdm(song_list):
        
        ref_file = os.path.join(REFERENCE_DIR, song, f"{source}.wav")
        est_file = os.path.join(ESTIMATE_DIR, song, f"{source}.wav")

        # load reference and estimate stems
        y_ref, sr_ref = sf.read(ref_file)
        y_est, sr_est = sf.read(est_file)

        # check sample rates
        assert sr_ref == sr_est == SAMPLE_RATE

        # calculate Delta ITD
        delta_itd = fw_itd_diff(y_est.T, y_ref.T, SAMPLE_RATE, FRAME_LENGTH)

        # calculate ILD
        delta_ild = ild_diff(y_est.T, y_ref.T)

        title_list.append(song)
        source_list.append(source)
        itd_list.append(delta_itd)
        ild_list.append(delta_ild)

results_df = pd.DataFrame({"title": title_list, "source": source_list,
                           "diff_ITD": itd_list, "diff_ILD": ild_list})

print("Evaluation complete!")

In [None]:
# sort
results_df.sort_values(by=['title', 'source'], inplace=True, ignore_index=True)

In [None]:
save_path = os.path.join(EVAL_DIR, f'interaural_{DATE}_{MODEL}_{DATASET}.csv')

results_df.to_csv(save_path, index=False)

### Binaural Data

In [None]:
DATASET = 'binaural'

# set input and output directories
REFERENCE_DIR = f"../data/binaural_musdb18/test/"
ESTIMATE_DIR = f"../data/output/{MODEL}/{DATASET}/test/"

In [None]:
# create the output directory if it does not already exist
print("Creating evaluation directory, if it does not already exist...")
os.makedirs(EVAL_DIR, exist_ok=True)

In [None]:
# get all of the files in the input directory
print("Loading list of files...")
song_list = [f for f in os.listdir(REFERENCE_DIR) if os.path.isdir(os.path.join(REFERENCE_DIR, f))]
print(f"There are {len(song_list)} files in the reference directory.")

In [None]:
title_list = []
source_list = []
itd_list = []
ild_list = []

print("Beginning to evaluate stems...")
for source in STEMS:
    print(f"\n>>>>{source} <<<<")
    for song in tqdm(song_list):
        
        ref_file = os.path.join(REFERENCE_DIR, song, f"{source}.wav")
        est_file = os.path.join(ESTIMATE_DIR, song, f"{source}.wav")

        # load reference and estimate stems
        y_ref, sr_ref = sf.read(ref_file)
        y_est, sr_est = sf.read(est_file)

        # check sample rates
        assert sr_ref == sr_est

        # calculate Delta ITD
        delta_itd = fw_itd_diff(y_est.T, y_ref.T, sr_ref, FRAME_LENGTH)

        # calculate ILD
        delta_ild = ild_diff(y_est.T, y_ref.T)

        title_list.append(song)
        source_list.append(source)
        itd_list.append(delta_itd)
        ild_list.append(delta_ild)

results_df = pd.DataFrame({"title": title_list, "source": source_list,
                           "diff_ITD": itd_list, "diff_ILD": ild_list})

print("Evaluation complete!")

In [None]:
# sort
results_df.sort_values(by=['title', 'source'], inplace=True, ignore_index=True)

In [None]:
save_path = os.path.join(EVAL_DIR, f'interaural_{DATE}_{MODEL}_{DATASET}.csv')

results_df.to_csv(save_path, index=False)