In [None]:
from pathlib import Path

import numpy as np
import scipy.signal
import librosa
import matplotlib.pyplot as plt
plt.style.use('ggplot')

import IPython.display as ipd
from ipywidgets import interact, fixed, Layout
import ipywidgets as widgets


sr = 16000

Uncomment if you are using Colab

In [None]:
# %%bash
# mkdir -p ./data/{aecs,rirs}
# export repo_prefix=https://raw.githubusercontent.com/karpnv/speech-tech-mipt/main/week04/data
# wget -q $repo_prefix/salut_time_query.wav -P data/
# wget -q $repo_prefix/aecs/doubletalk_mic.wav -P data/aecs
# wget -q $repo_prefix/aecs/doubletalk_spk.wav -P data/aecs
# wget -q $repo_prefix/aecs/salut_mic.wav -P data/aecs
# wget -q $repo_prefix/aecs/salut_spk.wav -P data/aecs
# wget -q $repo_prefix/rirs/greathall.wav -P data/rirs
# wget -q $repo_prefix/rirs/classroom.wav -P data/rirs
# wget -q $repo_prefix/rirs/octagon.wav -P data/rirs

# Signals

In [None]:
wav_signal, sr = librosa.load("./data/salut_time_query.wav", sr=sr)

time_interval = 0.1

n_frames = int(sr * time_interval)
target_sr = 1_000
scale_value = 1 << 12

signal_frame = wav_signal[:n_frames]

plt.figure(figsize=(20,12))

plt.subplot(221)
plt.title("Analog Signal")
plt.plot(np.arange(n_frames) * time_interval / sr, signal_frame)

resampled_signal = librosa.resample(signal_frame, orig_sr=sr, target_sr=target_sr)

plt.subplot(222)
plt.title("Discrete Signal")
plt.stem(
    np.arange(resampled_signal.size) * time_interval / target_sr, resampled_signal
)

plt.subplot(223)
plt.title("Quantized Signal")
plt.plot(
    np.arange(n_frames) * time_interval / sr, (signal_frame * scale_value).astype('int') / scale_value
)
plt.subplot(224)
plt.title("Digital Signal")
plt.stem(
    np.arange(resampled_signal.size) * time_interval / target_sr,
    (resampled_signal * scale_value).astype('int') / scale_value
)
plt.show()

# Sampling theorem

In [None]:
@interact(
    t0=widgets.FloatSlider(
        value=1,
        min=0.1,
        max=4,
        layout=Layout(width='960px'),
        description=r'$$t_0$$:'
    ),
    f0=widgets.FloatSlider(
        f0=10,
        min=1,
        max=1000,
        layout=Layout(width='960px'),
        description=r'$$f_0$$:'
    ),
    dt=widgets.FloatSlider(
        value=0.4,
        min=0.01,
        step=0.01,
        max=1,
        layout=Layout(width='960px'),
        description=r'$$\Delta t$$:'
    )
)
def guassian_pulse_discretization(t0: float, f0: float, dt: float):
    
    def x_time(t):
        return np.exp(-(t / t0) ** 2) * np.cos(f0 * t)

    def x_freq(f):
        return np.pi ** 0.5 * t0 * np.exp(- (np.pi * (f - f0) * t0) ** 2)
    
    def sinc(t):
        return np.sin(t) / t
    
    t = np.linspace(-2, 2, num=4 * 8000)
    f = np.linspace(-2.5 * f0, 2.5 * f0, num=4 * 8000)
    x_t = x_time(t)
    x_f = x_freq(f)

    t_steps = np.arange(0, t.max() + dt, step=dt)
    t_steps = np.concatenate([-t_steps[:0:-1], t_steps])
    x_t_steps = x_time(t_steps)
    x_t_reconstructed = sum([
        x_t_step * np.sinc((t - t_step) / dt)
        for t_step, x_t_step in zip(t_steps, x_t_steps)
    ])

    plt.figure(figsize=(12,10))
    plt.subplot(221)
    plt.plot(t, x_t)
    plt.title('original signal')
    plt.xlabel('time')
    plt.subplot(222)
    plt.title('spectrum')
    plt.plot(f, x_f)
    plt.xlabel('frequency');

    plt.subplot(223)
    plt.title('sampled signal points')
    plt.stem(t_steps, x_t_steps, linefmt='k')
    plt.xlabel('time')
    plt.subplot(224)
    plt.title('reconstruction')
    plt.plot(t, x_t, label='original')
    plt.plot(t, x_t_reconstructed, '--k', label='reconstructed')
    plt.scatter(t_steps, x_t_steps, color='k', label='sampled')
    plt.legend()
    plt.xlabel('time')
    plt.show()
    
    ipd.display(ipd.Audio(x_t, rate=8000))

# Frequency Aliasing

In [None]:
t1 = np.linspace(0, 1, 16)
t2 = np.linspace(0, 1, 800)

sin_n = lambda x, n: np.sin(2 * np.pi * n * x)

plt.figure(figsize=(16,5))
plt.scatter(t1, sin_n(t1, 1), label="discretization points", color='k', s=50)
plt.plot(t2, sin_n(t2, 1), label=r"$\sin(\omega t)$")
plt.plot(t2, sin_n(t2, 16), label=r"$\sin(16 \omega t)$")
plt.legend(fontsize=12)
plt.show()

# Spectral Filtering

In [None]:
def spectral_power(signal: np.ndarray) -> np.ndarray:
    spectral_power = np.abs(np.fft.rfft(signal))
    return spectral_power

@interact(
    window_type=widgets.ToggleButtons(
        options=['uniform', 'gaussian'],
        disabled=False,
        description='window type:'
    ),
    window_size=widgets.IntSlider(
        value=3,
        min=3,
        max=51,
        step=2,
        layout=Layout(width='960px'),
        description='window_size:'
    ),
    power=widgets.IntSlider(
        value=10,
        min=2,
        max=20,
        step=2,
        layout=Layout(width='960px'),
        description='power:'
    )
)
def smoothing_as_low_frequency_filter(window_type: str, window_size: float, power: int):
    t = np.arange(0, 3, step=0.005)
    y = np.exp( - ((t - 1) / 0.05) ** power)

    if window_type == 'uniform':
        window = np.ones(window_size)
    else:
        window = scipy.signal.gaussian(M=window_size, std=window_size // 5 + 1)
    
    window /= window.sum()
    
    y_smoothed = np.convolve(
        np.pad(y, pad_width=(window.size//2,), constant_values=(0,)),
        window,
        mode='valid'
    )

    plt.figure(figsize=(16,5))
    plt.subplot(121)
    plt.title('time domain')
    plt.plot(t, y, label='original')
    plt.plot(t, y_smoothed, label='smoothed')
    plt.legend()

    plt.subplot(122)
    plt.title('frequency domain')
    plt.plot(spectral_power(y), label='original')
    plt.plot(spectral_power(y_smoothed), label='smoothed')
    plt.show()

# Impulse Response

In [None]:
!ls data/rirs

In [None]:
def display_wav(wav, sr, title=None):
    ipd.display(ipd.Audio(wav, rate=sr))
    if title:
        plt.title(title)
    plt.plot(np.arange(wav.size) / sr, wav)
    plt.xlabel('seconds')
    plt.show()

music, sr = librosa.load("data/aecs/salut_spk.wav", sr=sr)
display_wav(music, sr, title="original music")

for rir_path in Path("data/rirs").iterdir():

    rir, sr = librosa.load(rir_path, sr=sr)
    
    if rir.size > music.size:
        music_padded = np.pad(music, pad_width=(0,rir.size - music.size), constant_values=(0,))
        rir_padded = rir[:]
    else:
        music_padded = music[:]
        rir_padded = np.pad(rir, pad_width=(0,music.size - rir.size), constant_values=(0,))
    
    music_f = np.fft.rfft(music_padded)
    rir_f = np.fft.rfft(rir_padded)
    
    room_music = np.fft.irfft(music_f * rir_f).real
    ipd.display(ipd.Audio(rir_f, rate=sr))
    ipd.display(ipd.Audio(room_music, rate=sr))
    
    
    plt.figure(figsize=(12,5))
    plt.subplot(121)
    plt.title("Room Impulse Response")
    plt.plot(np.arange(rir.size) / sr, rir)
    plt.xlabel('seconds')
    plt.subplot(122)
    plt.title(f"{rir_path.stem} music")
    plt.plot(np.arange(room_music.size) / sr, room_music)
    plt.xlabel('seconds')
    plt.show()

# Acoustic Echo Cancellation

In [None]:
def lms(mic_signal, spk_signal, window_size=2500, mu=0.01):
    
    x = np.pad(spk_signal, pad_width=(window_size - 1,0), constant_values=(0,))
    y = mic_signal[:]
    w = np.zeros(window_size)
    e = np.zeros(x.size - window_size + 1)
    
    for i in range(x.size - window_size):
        x_i = x[i:i + window_size]
        e[i] = y[i] - np.dot(x_i, w)
        w += mu * e[i] * x_i
    return e

In [None]:
parent_dir = Path("./data/aecs")
signal_names = ("salut", "doubletalk")

In [None]:
for name in signal_names:
    mic_signal, _ = librosa.load(parent_dir / f"{name}_mic.wav", sr=sr)
    spk_signal, _ = librosa.load(parent_dir / f"{name}_spk.wav", sr=sr)

    display_wav(mic_signal, sr, title="mic signal")
    display_wav(spk_signal, sr, title="spk signal")

    lms_result = lms(mic_signal, spk_signal)

    ipd.display(ipd.Audio(
        lms_result,
        rate=sr
    ))
    plt.plot(mic_signal, label="mic signal")
    plt.plot(lms_result, color='k', label="enhanced signal")
    plt.legend()
    plt.show()