# HW2 - Speech Enhancement in Reverberant Environments

**Q1(a):** Generate Room Impulse Responses (RIRs) and visualize them in the time domain for different reverberation times.

## Setup and Imports

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import sys

# Add rir-generator to path
sys.path.insert(0, './rir-generator/src')
import rir_generator as rir

np.random.seed(42)

## Simulation Parameters

In [None]:
# Room configuration
room_dim = [4.0, 5.0, 3.0]  # meters
c = 343.0  # speed of sound (m/s)
fs = 16000  # sample rate (Hz)

# Reverberation times to test
t60_ms_list = [150, 300]  # milliseconds

# Microphone array: 5-element ULA along x-axis, centered at [2, 1, 1.7]
n_mics = 5
d_mic = 0.05  # 5 cm spacing
center = np.array([2.0, 1.0, 1.7])

# Build mic positions: indices -2, -1, 0, 1, 2
mic_positions = []
for k in range(-2, 3):
    pos = center.copy()
    pos[0] += k * d_mic
    mic_positions.append(pos)
mic_positions = np.array(mic_positions)

# Source at 30 degrees, 1.5m from center
theta_src = np.deg2rad(30)
r_src = 1.5
source_pos = np.array([
    center[0] + r_src * np.cos(theta_src),
    center[1] + r_src * np.sin(theta_src),
    center[2]
])

print(f"Room: {room_dim} m")
print(f"Mic array center: {center}")
print(f"Mic positions (x): {mic_positions[:, 0]}")
print(f"Source position: {source_pos}")

## Generate RIRs for Different T60 Values

In [None]:
# Generate RIRs for each reverberation time
rir_dict = {}

for t60_ms in t60_ms_list:
    t60 = t60_ms / 1000.0  # convert to seconds
    n_samples = int(np.ceil(t60 * fs))  # as specified: nsample = T60 * fs
    
    # Generate RIR using rir_generator
    h = rir.generate(
        c=c,
        fs=fs,
        r=mic_positions,
        s=source_pos,
        L=room_dim,
        reverberation_time=t60,
        nsample=n_samples
    )
    rir_dict[t60_ms] = h
    print(f"T60={t60_ms}ms: RIR shape = {h.shape} (samples x mics)")

## Plot RIRs in Time Domain (First Microphone)

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

for idx, t60_ms in enumerate(t60_ms_list):
    h = rir_dict[t60_ms]
    h_mic1 = h[:, 0]  # first microphone
    t = np.arange(len(h_mic1)) / fs * 1000  # time in ms
    
    axes[idx].plot(t, h_mic1, linewidth=0.7)
    axes[idx].set_xlabel('Time (ms)')
    axes[idx].set_ylabel('Amplitude')
    axes[idx].set_title(f'RIR - T60 = {t60_ms} ms')
    axes[idx].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## Q1(b): Generate Measured Signals via Convolution

In [None]:
import scipy.signal as sig
import librosa

# Load a speech sample from LibriSpeech
speech_file = './speech_data/3081-166546-0000.flac'
speech, _ = librosa.load(speech_file, sr=fs)

print(f"Speech signal: {len(speech)} samples ({len(speech)/fs:.2f} sec)")

In [None]:
# Convolve speech with RIRs to create multichannel "clean" reverberant signals
clean_signals = {}

for t60_ms, h in rir_dict.items():
    n_samples_rir, n_mics = h.shape
    out_len = len(speech) + n_samples_rir - 1
    
    # Convolve speech with each mic's RIR
    y = np.zeros((n_mics, out_len))
    for m in range(n_mics):
        y[m] = sig.fftconvolve(speech, h[:, m])
    
    clean_signals[t60_ms] = y
    print(f"T60={t60_ms}ms: output shape = {y.shape} (mics x samples)")

In [None]:
# Plot measured signal at first microphone for both T60 values
fig, axes = plt.subplots(2, 1, figsize=(12, 5), sharex=True)

for idx, t60_ms in enumerate(t60_ms_list):
    y = clean_signals[t60_ms]
    t = np.arange(y.shape[1]) / fs
    
    axes[idx].plot(t, y[0], linewidth=0.5)
    axes[idx].set_ylabel('Amplitude')
    axes[idx].set_title(f'Reverberant Speech - T60 = {t60_ms} ms (Mic 1)')
    axes[idx].grid(True, alpha=0.3)

axes[1].set_xlabel('Time (s)')
plt.tight_layout()
plt.show()

## Q1(c): Add Noise to Measured Signals

Two noise types:
1. **White Gaussian Noise** - spatially and spectrally white
2. **Interfering Speaker** - at 150Â°, 2m from array center

In [None]:
# SNR levels to test
snr_levels = [0, 10]  # dB

# Reference microphone for SNR calculation (center mic)
ref_mic = 2

def scale_noise_to_snr(clean, noise, target_snr_db, ref_mic=2):
    """Scale noise to achieve target SNR at reference microphone."""
    p_clean = np.mean(clean[ref_mic] ** 2)
    p_noise = np.mean(noise[ref_mic] ** 2)
    # SNR = 10*log10(p_clean / p_noise) => p_noise_scaled = p_clean / 10^(SNR/10)
    scale = np.sqrt(p_clean / (p_noise * 10 ** (target_snr_db / 10)))
    return noise * scale

### Noise Type 1: White Gaussian Noise (spatially uncorrelated)

In [None]:
# Generate noisy signals with white Gaussian noise
noisy_wgn = {}  # noisy_wgn[t60_ms][snr_db] = (noisy_signal, noise)

for t60_ms in t60_ms_list:
    noisy_wgn[t60_ms] = {}
    clean = clean_signals[t60_ms]
    
    for snr_db in snr_levels:
        # Independent white noise at each mic
        noise = np.random.randn(*clean.shape)
        noise_scaled = scale_noise_to_snr(clean, noise, snr_db, ref_mic)
        noisy = clean + noise_scaled
        noisy_wgn[t60_ms][snr_db] = (noisy, noise_scaled)
        
        print(f"T60={t60_ms}ms, SNR={snr_db}dB: added WGN")

### Noise Type 2: Interfering Speaker

In [None]:
# Interferer position: 150 degrees, 2m from array center
theta_int = np.deg2rad(150)
r_int = 2.0
interferer_pos = np.array([
    center[0] + r_int * np.cos(theta_int),
    center[1] + r_int * np.sin(theta_int),
    center[2]
])

# Load a different speech file as interferer
interferer_file = './speech_data/5694-64025-0005.flac'
interferer_speech, _ = librosa.load(interferer_file, sr=fs)

print(f"Interferer position: {interferer_pos}")
print(f"Interferer signal: {len(interferer_speech)} samples")

In [None]:
# Generate RIRs for interferer location
rir_interferer = {}
for t60_ms in t60_ms_list:
    t60 = t60_ms / 1000.0
    n_samples = int(np.ceil(t60 * fs))
    h_int = rir.generate(
        c=c, fs=fs, r=mic_positions, s=interferer_pos,
        L=room_dim, reverberation_time=t60, nsample=n_samples
    )
    rir_interferer[t60_ms] = h_int

print("Generated RIRs for interferer")

In [None]:
def pad_to_length(x, L):
    """Zero-pad signal to length L."""
    if x.shape[1] >= L:
        return x[:, :L]
    out = np.zeros((x.shape[0], L))
    out[:, :x.shape[1]] = x
    return out

# Generate noisy signals with interfering speaker
noisy_interferer = {}  # noisy_interferer[t60_ms][snr_db] = (noisy_signal, interference)

for t60_ms in t60_ms_list:
    noisy_interferer[t60_ms] = {}
    clean = clean_signals[t60_ms]
    h_int = rir_interferer[t60_ms]
    
    # Crop/tile interferer to match target speech length
    int_len = len(speech)
    if len(interferer_speech) < int_len:
        int_sig = np.tile(interferer_speech, int(np.ceil(int_len / len(interferer_speech))))[:int_len]
    else:
        # Random crop
        start = np.random.randint(0, len(interferer_speech) - int_len + 1)
        int_sig = interferer_speech[start:start + int_len]
    
    # Convolve interferer with its RIRs
    int_conv = np.zeros((n_mics, int_len + h_int.shape[0] - 1))
    for m in range(n_mics):
        int_conv[m] = sig.fftconvolve(int_sig, h_int[:, m])
    
    # Align lengths
    max_len = max(clean.shape[1], int_conv.shape[1])
    clean_pad = pad_to_length(clean, max_len)
    int_pad = pad_to_length(int_conv, max_len)
    
    for snr_db in snr_levels:
        int_scaled = scale_noise_to_snr(clean_pad, int_pad, snr_db, ref_mic)
        noisy = clean_pad + int_scaled
        noisy_interferer[t60_ms][snr_db] = (noisy, int_scaled)
        
        print(f"T60={t60_ms}ms, SNR={snr_db}dB: added interfering speaker")

In [None]:
# Plot example: T60=300ms, SNR=10dB - compare both noise types
t60_show = 300
snr_show = 10

fig, axes = plt.subplots(3, 1, figsize=(12, 7), sharex=True)

clean = clean_signals[t60_show]
noisy_w, _ = noisy_wgn[t60_show][snr_show]
noisy_i, _ = noisy_interferer[t60_show][snr_show]

# Align lengths for plotting
max_len = max(clean.shape[1], noisy_w.shape[1], noisy_i.shape[1])
t = np.arange(max_len) / fs

axes[0].plot(t[:clean.shape[1]], clean[0], linewidth=0.5)
axes[0].set_title('Clean Reverberant Signal (Mic 1)')
axes[0].set_ylabel('Amplitude')

axes[1].plot(t[:noisy_w.shape[1]], noisy_w[0], linewidth=0.5)
axes[1].set_title(f'With White Gaussian Noise (SNR = {snr_show} dB)')
axes[1].set_ylabel('Amplitude')

axes[2].plot(t[:noisy_i.shape[1]], noisy_i[0], linewidth=0.5)
axes[2].set_title(f'With Interfering Speaker (SNR = {snr_show} dB)')
axes[2].set_ylabel('Amplitude')
axes[2].set_xlabel('Time (s)')

for ax in axes:
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## Q1(d): Time and Frequency Domain Plots

Plot original, clean reverberant, and noisy signals (T60=300ms, SNR=10dB) in both time and frequency domains.

In [None]:
def plot_time_and_spectrum(signal, fs, title):
    """Plot signal in time domain and its magnitude spectrum."""
    fig, axes = plt.subplots(1, 2, figsize=(12, 3))
    
    # Time domain
    t = np.arange(len(signal)) / fs
    axes[0].plot(t, signal, linewidth=0.5)
    axes[0].set_xlabel('Time (s)')
    axes[0].set_ylabel('Amplitude')
    axes[0].set_title(f'{title} - Time Domain')
    axes[0].grid(True, alpha=0.3)
    
    # Frequency domain (magnitude spectrum using Welch's method)
    from scipy.signal import welch
    freqs, psd = welch(signal, fs=fs, nperseg=1024)
    axes[1].semilogy(freqs, psd, linewidth=0.8)
    axes[1].set_xlabel('Frequency (Hz)')
    axes[1].set_ylabel('PSD')
    axes[1].set_title(f'{title} - Power Spectrum')
    axes[1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

In [None]:
# Settings for plots
t60_plot = 300
snr_plot = 10

# Get signals
original = speech
clean_rev = clean_signals[t60_plot][0]  # first mic
noisy_wgn_sig, _ = noisy_wgn[t60_plot][snr_plot]
noisy_int_sig, _ = noisy_interferer[t60_plot][snr_plot]

# Plot each signal
plot_time_and_spectrum(original, fs, 'Original (Dry) Signal')
plot_time_and_spectrum(clean_rev, fs, 'Clean Reverberant (T60=300ms, Mic 1)')
plot_time_and_spectrum(noisy_wgn_sig[0], fs, 'Noisy - White Gaussian (SNR=10dB, Mic 1)')
plot_time_and_spectrum(noisy_int_sig[0], fs, 'Noisy - Interfering Speaker (SNR=10dB, Mic 1)')

## Q1(e): Save Noisy Signals as WAV Files

In [None]:
import soundfile as sf
import os

# Create output directory
out_dir = './outputs'
os.makedirs(out_dir, exist_ok=True)

# Save noisy signals (first microphone, T60=300ms, SNR=10dB)
# Normalize to prevent clipping
def normalize(x):
    return x / (np.max(np.abs(x)) + 1e-8)

sf.write(f'{out_dir}/noisy_wgn_300ms_10dB.wav', normalize(noisy_wgn_sig[0]), fs)
sf.write(f'{out_dir}/noisy_interferer_300ms_10dB.wav', normalize(noisy_int_sig[0]), fs)

print(f"Saved WAV files to {out_dir}/")