In [None]:
import numpy as np
import matplotlib.pyplot as plt

import scipy
from scipy.linalg import toeplitz, circulant, dft

plt.rcParams.update({
    "font.serif":["cm"],
    "font.size": 12})

colors = {
    "blue": "#377eb8",
    "orange": "#ff7f00",
    "green": "#4daf4a",
    "pink": "#f781bf",
    "brown": "#a65628",
    "purple": "#984ea3",
    "gray": "#999999",
    "red": "#e41a1c",
    "yellow": "#dede00",
    "black": "#000000",
}

## Part 0: Starter Exercise

Before using built-in functions, write the convolution function once!

**The Goal:** Implement 1D Linear Convolution (`mode='full'`) in three ways and verify they are identical.
1. **Manual Loops:** To appreciate the computational complexity.
2. **Matrix Algebra:** To leverage linear algebra tools.
3. **Built-in:** To make things more efficient.

In [None]:
def manual_convolution_loops(x, h, mode='full'):
    """
    Computes convolution y[n] = sum(x[k] * h[n-k]) using nested loops.
    Supports 'full', 'same', and 'valid' modes.
    TODO: Implement the convolution logic here
    """
    # TODO: Initialize output array
    # TODO: Loop over output samples
    # TODO: Handle different modes (full, same, valid)
    pass
    
def test_implementations():
    x = np.array([1, 2, 3, 4, 5])
    h = np.array([1, -1, 2])
    
    print(f"Input x: {x}")
    print(f"Filter h: {h}")
    print(f"-"*30)

    for mode in ['full', 'same', 'valid']:
        # 1. Manual Implementation
        y_manual = manual_convolution_loops(x, h, mode=mode)
        
        # 2. Built-in Implementation (Numpy)
        y_builtin = np.convolve(x, h, mode=mode)
        
        print(f"Mode '{mode}':\")
        print(f"  Manual:   {y_manual}")
        print(f"  Built-in: {y_builtin}")
        
        if np.allclose(y_manual, y_builtin):
            print(f"  Result:   MATCH")
        else:
            print(f"  Result:   MISMATCH")
    print(f"-"*30)

test_implementations()

## Part 1: Linear Convolution as a Toeplitz Matrix

Now that we understand the loops, we show that convolution is simply a matrix-vector product $y = Ax$.

In [None]:
def demo_toeplitz_convolution():
    # Define a simple signal x and filter h
    x = np.array([1, 2, 3, 4])
    h = np.array([1, -1, 2]) # Length 3
    
    # Standard Numpy Convolve
    y_numpy = np.convolve(x, h, mode='full')
    print(f"Standard Convolution Output: {y_numpy}")
    
    # TODO: Construct Toeplitz matrix
    # H_mat = ...
    
    print("\nToeplitz Convolution Matrix (H):")
    # print(H_mat)
    
    # TODO: Perform matrix multiplication
    # y_matrix = ...
    # print(f"\nMatrix Multiplication Output: {y_matrix}")
    
    # assert np.allclose(y_numpy, y_matrix), "Mismatch between numpy and matrix method!"

demo_toeplitz_convolution()

def demo_circulant_matrix():
    x = np.array([1, 2, 3, 4])
    h = np.array([1, 0, -1, 0]) # Same length as x for circular conv
    
    # TODO: Create circulant matrix from h
    # C = circulant(h)
    # print("Circulant Matrix C:")
    # print(C)
    
    # TODO: Compare matrix multiplication with FFT-based circular convolution
    # y_matrix = C @ x
    # y_fft = np.fft.ifft(np.fft.fft(x) * np.fft.fft(h)).real
    # print(f"\nMatrix Result: {y_matrix}")
    # print(f"FFT Result: {y_fft}")
    
demo_circulant_matrix()

## Part 3: Rank, Zeros, and Diagonalization

**Theory:** The eigenvalues of the Circulant matrix $C$ are exactly the DFT coefficients of the filter $h$.
If the DFT of $h$ has a zero at index $k$, the matrix $C$ loses rank.

In [None]:
def demo_rank_and_zeros():
    N = 8
    # TODO: Design filter with zeros at specific frequencies
    # H_spectrum = ...
    # h = ...
    # C = circulant(h)
    
    # rank = np.linalg.matrix_rank(C)
    # print(f"Matrix Size: {N}x{N}")
    # print(f"Matrix Rank: {rank}")
    # print("Rank should be N - (number of zeros in DFT). Expecting 6.")
    
    # TODO: Verify diagonalization property
    # F = dft(N) 
    # F_inv = np.linalg.inv(F)
    # Lambda = F @ C @ F_inv
    # eigenvals = np.diag(Lambda)
    # H_fft = np.fft.fft(h)
    # print("\nFirst 4 Eigenvalues from Diagonalization:")
    # print(np.round(eigenvals[:4], 3))
    # print("First 4 DFT coefficients of h:")
    # print(np.round(H_fft[:4], 3))

demo_rank_and_zeros()

## Part 4: Audio De-reverberation (1D Application)

Use the functions to construct reverberated audio and perform de-reverberation
* **Model:** Room impulse response
* **Input:** Standard speech file
* **Restoration:** Inverse/Wiener filter

In [None]:
import scipy.io.wavfile as wavfile
import urllib.request
import io

In [None]:
def get_audio_sample():
    """Downloads a standard wav file or generates a fallback."""
    url = "https://www.signalogic.com/melp/EngSamples/Orig/male.wav"
    try:
        print(f"Attempting to download audio from {url}...")
        response = urllib.request.urlopen(url)
        data = response.read()
        sr, y = wavfile.read(io.BytesIO(data))
        # Normalize to -1..1
        y = y.astype(float) / np.max(np.abs(y))
        # Take a 3-second slice to keep processing fast
        y = y[:sr*3]
        print("Download successful.")
        return sr, y
    except Exception as e:
        print(f"Download failed: {e}. Generating fallback audio.")

In [None]:
def create_room_impulse_response(sr):
    """Creates a synthetic Room Impulse Response (Echo)."""
    # A decaying exponential with random spikes
    T_echo = 0.3 # 300ms reverb
    n_echo = int(sr * T_echo)
    t = np.linspace(0, T_echo, n_echo)
    
    # Exponential decay envelope
    envelope = np.exp(-10 * t)
    
    # Random reflections
    noise = np.random.randn(n_echo)
    rir = noise * envelope
    
    # Normalize energy
    rir = rir / np.sqrt(np.sum(rir**2))
    return rir

In [None]:
def wiener_deconvolution(observed, kernel, K):
    """
    Apply Wiener Deconvolution in Frequency Domain.
    TODO: Implement H_hat = H* / (|H|^2 + K)
    """
    pass

In [None]:
# 1. Setup
sr, clean_speech = get_audio_sample()
rir = create_room_impulse_response(sr)

# 2. Simulate "The Room" (Convolution)
reverbed_speech = np.convolve(clean_speech, rir, mode='same')

# 3. Add Noise (The "Hiss")
noise_level = 0.02
noise = np.random.randn(len(reverbed_speech)) * noise_level
observed_signal = reverbed_speech + noise

# 4. Attempt Recovery
# recovered_naive = ...
# recovered_wiener = ...

In [None]:
# 5. Visualization
plt.figure(figsize=(12, 8))

plt.subplot(4, 1, 1)
plt.title("Clean Speech (Source)")
plt.plot(clean_speech, color=colors['blue'])
plt.grid(True)

plt.subplot(4, 1, 2)
plt.title("Reverberant + Noisy (Measurements)")
plt.plot(observed_signal, color=colors['orange'])
plt.grid(True)

plt.subplot(4, 1, 3)
plt.title("Naive Inverse Filter")
plt.plot(recovered_naive, color=colors['red'])
plt.grid(True)

plt.subplot(4, 1, 4)
plt.title("Wiener Filter Restoration")
plt.plot(recovered_wiener, color=colors['green'])
plt.grid(True)
plt.xlabel("$n$")

plt.tight_layout()
plt.show()

In [None]:
import IPython.display as ipd

print("1. Clean Speech")
ipd.display(ipd.Audio(clean_speech, rate=sr))

print("2. Reverberant + Noisy")
ipd.display(ipd.Audio(observed_signal, rate=sr))

print("3. Naive Inverse Filter")
ipd.display(ipd.Audio(recovered_naive, rate=sr))

print("4. Wiener Restored")
ipd.display(ipd.Audio(recovered_wiener, rate=sr))