In [7]:
# Check that the dependencies in your conda environment are available.
import torch
print("PyTorch version:", torch.__version__)

import torchaudio
print("torchaudio version:", torchaudio.__version__)

import librosa
print("Librosa version:", librosa.__version__)

import numpy as np
print("NumPy version:", np.__version__)

import pandas as pd
print("Pandas version:", pd.__version__)

import matplotlib
print("Matplotlib version:", matplotlib.__version__)

import scipy
print("SciPy version:", scipy.__version__)


PyTorch version: 2.6.0+cu126
torchaudio version: 2.6.0+cu126
Librosa version: 0.10.2.post1
NumPy version: 1.22.4
Pandas version: 1.5.3
Matplotlib version: 3.8.4
SciPy version: 1.8.1


# Demastering a Song from the FMA Dataset

This notebook demonstrates a simple "demastering" process. We will:
1. Load full tracks from the FMA dataset.
2. Apply several degradation functions to the entire track using random parameters—this includes EQ, gain adjustment, echo, reverb, and compression.
3. Display the STFT (spectrogram) of the original and degraded tracks.
4. Save the processed audio along with the original for future dataset creation.

> **Note:**  
> - Ensure you update the `song_path` variable below to point to a valid audio file from your FMA data (e.g., from the `data/fma_small/` folder).  
> - This notebook is saved in the "ENGS Honors Thesis" folder.


In [8]:
import os
import glob
import random
import librosa.display
import matplotlib.pyplot as plt
import soundfile as sf
from scipy.signal import iirpeak, lfilter, fftconvolve
import shutil
import numpy as np

print("Current working directory:", os.getcwd())

def plot_stft(audio, sr, title, extra_info = None):
    D = librosa.stft(audio)  # Compute STFT
    DB = librosa.amplitude_to_db(np.abs(D), ref=np.max)  # Convert amplitude to dB
    
    fig, ax = plt.subplots(figsize=(12, 6))
    
    img = librosa.display.specshow(DB, sr=sr, x_axis='time', y_axis='hz', ax=ax)
    cbar = fig.colorbar(img, ax=ax, format='%+2.0f dB')
    cbar.set_label("Amplitude (dB)", fontsize=12)
    
    ax.set_title(title, fontsize = 16)
    ax.set_xlabel("Time (s)", fontsize = 14)
    ax.set_ylabel("Frequency (Hz)", fontsize = 14)
    
    if extra_info:
            ax.text(0.01, 0.01, extra_info, transform=ax.transAxes,
                fontsize=12, verticalalignment='bottom',
                bbox=dict(boxstyle="round,pad=0.5", fc="yellow", alpha=0.5))
    plt.tight_layout()
    return fig
    
def apply_eq(audio, sr, fc, Q, gain_db):
    """
    Applying a random EQ (peaking filter) to the audio signal 
    to simulate an unmastered version
    
    Parameters audio (np.array): Input Audio Waveform, sr (int): Sample Rate
    
    Returns filtered_audio (np.array): EQ Processed audio
    """
    
    #Normalizing center frequency for iirpear (w0 in [0,1], where 1 = Nyquist Frequency)
    w0 = fc / (sr/2)
    b,a = iirpeak(w0,Q) #Design peak filter: iirpeak returns filter coefficients
    gain_linear = 10 ** (gain_db / 20.0)
    b=b*gain_linear #Scale filter numerator to apply gain
    return lfilter(b, a, audio)

def apply_gain(audio, gain_db):
    """Apply a gain adjustment in dB to the audio"""
    gain_linear = 10**(gain_db/20.0)
    return audio * gain_linear

def apply_echo(audio, sr, delay_seconds, attenuation):
    """Apply echo effect by adding delayed, attenuated copy of the audio"""
    
    delay_samples = int(sr * delay_seconds)
    echo = np.zeros_like(audio)
    if len(audio) > delay_samples:
        echo[delay_samples:] = audio[:-delay_samples] * attenuation
    return audio + echo

def apply_reverb(audio, sr, decay, ir_length):
    """
    Apply a simple reverb effect by convolving the audio with a synthetic impulse response.
    
    Parameters:
    audio: input audio (numpy array)
    decay: decay rate for the exponential impulse response
    ir_length: length of the impulse response in samples
    sr: sampling rate
    """
    # Ensure ir_length is an integer and positive
    ir_length = int(ir_length)
    print("Inside apply_reverb: received ir_length =", ir_length)
    if ir_length <= 0:
        raise ValueError("ir_length must be positive, but got {}".format(ir_length))
    t = np.linspace(0, ir_length / sr, ir_length)
    ir = np.exp(-decay * t)
    try:
        result = fftconvolve(audio, ir, mode='same')
        if result.size == 0:
            raise ValueError("fftconvolve returned an empty array")
    except Exception as e:
        print(f"fftconvolve failed ({e}); using np.convolve instead.")
        result = np.convolve(audio, ir, mode='same')
    if result.size == 0:
        print("Warning: apply_reverb resulted in an empty array; returning original audio.")
        return audio
    return result

def apply_compression(audio, threshold_db, ratio, makeup_gain_db):
    """
    Apply a basic dynamic range compression to the audio.
    
    Parameters:
    audio: input audio (numpy array)
    threshold_db: threshold (in dB) above which compression occurs
    ratio: compression ratio
    makeup_gain_db: makeup gain in dB applied after compression
    """
    threshold_linear = 10**(threshold_db / 20.0)
    
    abs_audio = np.abs(audio)
    compressed = np.where(abs_audio > threshold_linear,
                        threshold_linear + (abs_audio - threshold_linear) / ratio,
                        abs_audio)
    compressed = np.sign(audio) * compressed
    makeup_gain = 10 ** (makeup_gain_db / 20.0)
    return compressed * makeup_gain


Current working directory: c:\Users\takak\OneDrive\Desktop\ENGS Honors Thesis\notebooks


In [9]:
base_output_dir = os.path.join("..","experiments","output_full")
output_audio_dir = os.path.join(base_output_dir, "output_audio")
output_spectrogram_dir = os.path.join(base_output_dir, "output_spectrograms")
output_txt_dir = os.path.join(base_output_dir, "output_txt")

for folder in [output_audio_dir, output_spectrogram_dir, output_txt_dir]:
    if os.path.exists(folder):
        shutil.rmtree(folder)
    os.makedirs(folder, exist_ok = True)

base_input_folder = os.path.join("..", "data", "raw", "fma_small")
subfolders = [name for name in os.listdir(base_input_folder)
            if os.path.isdir(os.path.join(base_input_folder, name))]

# Loop over subfolders and process each MP3 file found
for sub in subfolders:
    folder_path = os.path.join(base_input_folder, sub)
    mp3_files = glob.glob(os.path.join(folder_path, "*.mp3"))
    for song_path in mp3_files:
        print(f"Processing song: {song_path}")
        
        # Load audio
        audio, sr = librosa.load(song_path, sr=None)
        if len(audio) == 0:
            print(f"Warning: Audio from {song_path} is empty. Skipping.")
            continue
        
        # Generate a unique identifier based on file name
        song_id = os.path.splitext(os.path.basename(song_path))[0]
        
        # Save original spectrogram
        fig_orig = plot_stft(audio, sr, f"Original Audio STFT - {song_id}")
        orig_spec_path = os.path.join(output_spectrogram_dir, f"{song_id}_input.png")
        fig_orig.savefig(orig_spec_path)
        plt.close(fig_orig)
        
        # Randomly generate parameters (adjust these ranges as desired for realistic effects)
        # EQ parameters:
        fc = random.uniform(200, 8000)         # Center frequency in Hz
        Q = random.uniform(0.7, 2.5)             # Q-factor
        eq_gain_db = random.uniform(-2, 2)       # EQ gain adjustment in dB
        
        # Gain adjustment:
        gain_db = random.uniform(-1, 1)
        
        # Echo:
        delay_seconds = random.uniform(0.3, 1.0)  # Allow bigger echoes
        attenuation = random.uniform(0.3, 0.5)
        
        # Reverb:
        decay = random.uniform(0.2, 1.0)
        lower_ir = max(int(0.1 * sr), 1)
        upper_ir = max(int(0.3 * sr), lower_ir + 1)
        ir_length = random.randint(lower_ir, upper_ir)
        
        # Compression:
        threshold_db = random.uniform(-20, -12)
        ratio = random.uniform(2, 3)
        makeup_gain_db = random.uniform(0, 0.5)
        
        # Save the parameters to a text file
        txt_path = os.path.join(output_txt_dir, f"{song_id}_params.txt")
        with open(txt_path, "w") as f:
            f.write(f"EQ: fc={fc:.2f} Hz, Q={Q:.2f}, eq_gain_db={eq_gain_db:.2f}\n")
            f.write(f"Gain: gain_db={gain_db:.2f}\n")
            f.write(f"Echo: delay_seconds={delay_seconds:.2f}, attenuation={attenuation:.2f}\n")
            f.write(f"Reverb: decay={decay:.2f}, ir_length={ir_length}\n")
            f.write(f"Compression: threshold_db={threshold_db:.2f}, ratio={ratio:.2f}, makeup_gain_db={makeup_gain_db:.2f}\n")
        
        print(f"Parameters for {song_id} saved to {txt_path}")
        
        # Apply degradation effects sequentially
        modified_audio = apply_eq(audio, sr, fc, Q, eq_gain_db)
        modified_audio = apply_gain(modified_audio, gain_db)
        modified_audio = apply_echo(modified_audio, sr, delay_seconds, attenuation)
        modified_audio = apply_reverb(modified_audio, sr, decay, ir_length)
        modified_audio = apply_compression(modified_audio, threshold_db, ratio, makeup_gain_db)
        
        #Saving modified spectrogram image with extra info annotation
        extra = (f"EQ: fc={fc:.2f} Hz, Q={Q:.2f}, gain_db={eq_gain_db:.2f}\n"
                f"Gain: {gain_db:.2f} dB\n"
                f"Echo: delay={delay_seconds:.2f}s, att={attenuation:.2f}\n"
                f"Reverb: decay={decay:.2f}, IR_len={ir_length}\n"
                f"Comp: thr={threshold_db:.2f} dB, ratio={ratio:.2f}, makeup={makeup_gain_db:.2f} dB")
        fig_mod = plot_stft(modified_audio, sr, f"Modified Audio STFT - {song_id}", extra_info=extra)
        mod_spec_path = os.path.join(output_spectrogram_dir, f"{song_id}_output.png")
        fig_mod.savefig(mod_spec_path)
        plt.close(fig_mod)
        
        # Save original and modified audio files
        orig_audio_path = os.path.join(output_audio_dir, f"{song_id}_original.wav")
        mod_audio_path = os.path.join(output_audio_dir, f"{song_id}_modified.wav")
        sf.write(orig_audio_path, audio, sr)
        sf.write(mod_audio_path, modified_audio, sr)
        
        print(f"Saved audio for {song_id} to {output_audio_dir}\n")

Processing song: ..\data\raw\fma_small\000\000002.mp3
Parameters for 000002 saved to ..\experiments\output_full\output_txt\000002_params.txt
Inside apply_reverb: received ir_length = 11474
Saved audio for 000002 to ..\experiments\output_full\output_audio

Processing song: ..\data\raw\fma_small\000\000005.mp3
Parameters for 000005 saved to ..\experiments\output_full\output_txt\000005_params.txt
Inside apply_reverb: received ir_length = 9079
Saved audio for 000005 to ..\experiments\output_full\output_audio

Processing song: ..\data\raw\fma_small\000\000010.mp3
Parameters for 000010 saved to ..\experiments\output_full\output_txt\000010_params.txt
Inside apply_reverb: received ir_length = 7431
Saved audio for 000010 to ..\experiments\output_full\output_audio

Processing song: ..\data\raw\fma_small\000\000140.mp3
Parameters for 000140 saved to ..\experiments\output_full\output_txt\000140_params.txt
Inside apply_reverb: received ir_length = 13028
Saved audio for 000140 to ..\experiments\outp

MemoryError: Unable to allocate 20.2 MiB for an array with shape (1025, 2582) and data type float64