### Generating audio spectrograms in .npy file format from the FSDD dataset
NOTEBOOK ADDED BY WARD

This notebook was made to create the spectrograms of AVMNIST, since the AVMNIST dataset is not publicly made
available. The creation process closely follows the one described in the CentralNet paper. Where it differs slightly is in the noise generation, here 5 different schemes are made available. BURST_NOISE is the one that came closest to the performance described in CentralNet and this is the one I used for my thesis.

The code assumes:
- a recordings folder in the ../data/recordings/ (from: https://github.com/Jakobovski/free-spoken-digit-dataset/tree/master/recordings).
- a noise_recordings folder in ../data/noise_recordings/ with inside of it the audio folder of ESC-50 (from: https://github.com/karolpiczak/ESC-50/tree/master/audio), and the accompanying metadata in a meta folder (from: https://github.com/karolpiczak/ESC-50/tree/master/meta).
- a folder ../data/avmnist/ with inside of it train_labels.npy and test_labels.npy (acquired from: https://github.com/pliang279/MultiBench?tab=readme-ov-file#multimedia, more specifically: https://drive.google.com/file/d/1KvKynJJca5tDtI5Mmp6CoRh9pQywH8Xp/view?usp=sharing)

In [1]:
import spectogramer
import os
import fsdd
from shutil import copyfile
import numpy as np
import matplotlib.pyplot as plt
from __future__ import division, print_function
import os
from os import listdir
from os.path import isfile, join
import scipy.io.wavfile as wav
import random
import librosa
from tqdm import tqdm
import pandas as pd
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from scipy import signal
from typing import Optional, Tuple, List

In [2]:
audio_dir = '../data/recordings/'
noise_dir = '../data/noise_recordings/audio/'
esc50_dir = '../data/noise_recordings/'
save_dir = labels_dir = '../data/avmnist/'
random.seed(0)

In [None]:
def file_names_by_category(esc50_dir):
    """
    Get a dictionary of file names grouped by category from the ESC-50 dataset.
    :param esc50_dir: Path to the ESC-50 dataset directory.
    :return: Dictionary where keys are category names and values are lists of file paths.
    """
    csv = pd.read_csv(os.path.join(esc50_dir, 'meta', 'esc50.csv'))
    grouped = csv.groupby('category')
    files_by_category = {}

    for category, entry in grouped:
        file_names = entry['filename'].tolist()
        files_by_category[category] = [os.path.join(esc50_dir, 'audio', name) for name in file_names]

    return files_by_category

In [4]:
def belongs_to_train_audio(filename):
    """ 
    Checks if a file belongs to the test set (first 4 samples) 
    or the train set (last 46 samples)
    """
    first_split = filename.rsplit("_", 1)[1] # i.e. 49.wav
    second_split = first_split.rsplit(".", 1)[0] # i.e. 49
    if int(second_split) <= 4:
        return False
    else:
        return True

In [5]:
def flatten(lst):
    return [item for sublist in lst for item in sublist]

names_by_cat = file_names_by_category(esc50_dir)
all_categories = list(names_by_cat.keys())

test_categories = random.sample(list(names_by_cat.keys()), 5)
train_categories = [cat for cat in names_by_cat.keys() if cat not in test_categories]

train_noises = {key: names_by_cat[key] for key in train_categories}
test_noises = {key: names_by_cat[key] for key in test_categories}

train_noises_file_names = flatten(train_noises.values())
test_noises_file_names = flatten(test_noises.values())

audio_file_names = [f for f in listdir(audio_dir) if isfile(join(audio_dir, f)) and '.wav' in f]

train_audio_file_names = [f for f in audio_file_names if belongs_to_train_audio(f)]
test_audio_file_names = [f for f in audio_file_names if not belongs_to_train_audio(f)]

train_audio_by_label = { str(i) : [] for i in range(10) }
test_audio_by_label = { str(i) : [] for i in range(10) }

for f in train_audio_file_names:
    label = f.split('_')[0]  # Extract label
    train_audio_by_label[label].append(os.path.join(audio_dir, f))  # Append to the list for this label

for f in test_audio_file_names:
      label = f.split('_')[0]  # Extract label
      test_audio_by_label[label].append(os.path.join(audio_dir, f))  # Append to the list for this label

print(test_audio_by_label)
print(train_audio_by_label)

n_noises = len(flatten(names_by_cat.values()))
n_audio = len(audio_file_names)
n_train_noises, n_test_noises = len(train_noises_file_names), len(test_noises_file_names)
n_train_audio, n_test_audio = len(train_audio_file_names), len(test_audio_file_names)

print(f"test_noise: {n_test_noises}, percentage: {n_test_noises / n_noises * 100}%", 
      f"train_noise: {n_train_noises}, percentage: {n_train_noises / n_noises * 100}%"),
print(f"test_audio: {n_test_audio}, percentage: {n_test_audio / n_audio * 100}%", 
      f"train_audio: {n_train_audio}, percentage: {n_train_audio / n_audio * 100}%")

{'0': ['../data/recordings/0_george_0.wav', '../data/recordings/0_george_1.wav', '../data/recordings/0_george_2.wav', '../data/recordings/0_george_3.wav', '../data/recordings/0_george_4.wav', '../data/recordings/0_jackson_0.wav', '../data/recordings/0_jackson_1.wav', '../data/recordings/0_jackson_2.wav', '../data/recordings/0_jackson_3.wav', '../data/recordings/0_jackson_4.wav', '../data/recordings/0_lucas_0.wav', '../data/recordings/0_lucas_1.wav', '../data/recordings/0_lucas_2.wav', '../data/recordings/0_lucas_3.wav', '../data/recordings/0_lucas_4.wav', '../data/recordings/0_nicolas_0.wav', '../data/recordings/0_nicolas_1.wav', '../data/recordings/0_nicolas_2.wav', '../data/recordings/0_nicolas_3.wav', '../data/recordings/0_nicolas_4.wav', '../data/recordings/0_theo_0.wav', '../data/recordings/0_theo_1.wav', '../data/recordings/0_theo_2.wav', '../data/recordings/0_theo_3.wav', '../data/recordings/0_theo_4.wav', '../data/recordings/0_yweweler_0.wav', '../data/recordings/0_yweweler_1.w

In [None]:
class AugmentationType(Enum):
    EXTREME_NOISE = "extreme_noise"      # Very low SNR
    MULTI_BAND = "multi_band"           # Multiple frequency bands removed
    BURST_NOISE = "burst_noise"         # Multiple time masks + noise
    ALIASED = "aliased"                 # Downsampling artifacts
    DISTORTED = "distorted"             # Non-linear distortion

@dataclass
class AudioConfig:
    """Enhanced configuration for audio augmentation"""
    snr_db: Optional[float] = None
    freq_mask_ratio: float = 0.0
    n_freq_masks: int = 1
    time_mask_ratio: float = 0.0
    n_time_masks: int = 1
    filter_bands: List[Tuple[float, float]] = None
    downsample_factor: Optional[int] = None
    distortion_factor: Optional[float] = None

def get_augmentation_config(aug_type: AugmentationType) -> AudioConfig:
    """Get configuration for specific augmentation type"""
    configs = {
        AugmentationType.EXTREME_NOISE: AudioConfig(
            snr_db=0,  # Extremely noisy but theoretically still intelligible
            time_mask_ratio=0.1,  # Add some temporal gaps too
            n_time_masks=2
        ),
        AugmentationType.MULTI_BAND: AudioConfig(
            freq_mask_ratio=0.2,  # Smaller individual masks
            n_freq_masks=3,       # But multiple of them
            filter_bands=[(50, 1000), (2000, 3500)]  # Keep only these bands
        ),
        AugmentationType.BURST_NOISE: AudioConfig(
            snr_db=3,            # Very noisy
            time_mask_ratio=0.15,
            n_time_masks=4       # Multiple shorter gaps
        ),
        AugmentationType.ALIASED: AudioConfig(
            downsample_factor=4,  # Aggressive downsampling
            freq_mask_ratio=0.3   # Combined with frequency masking
        ),
        AugmentationType.DISTORTED: AudioConfig(
            distortion_factor=2.0,  # Non-linear distortion
            snr_db=5               # Plus noise
        )
    }
    return configs[aug_type]

def apply_distortion(samples: np.ndarray, factor: float) -> np.ndarray:
    """Apply non-linear distortion to audio"""
    samples_norm = samples / (np.max(np.abs(samples)) + 1e-6)
    return np.tanh(samples_norm * factor)

# NOTE: See appendix section (bottom of notebook) for derivation of snr code used below
def add_noise(samples_audio: np.ndarray, noise_path: str, sr_audio: int, snr_db: float) -> np.ndarray:
    """Add scaled noise to audio samples"""
    samples_noise, sr_noise = librosa.load(noise_path, sr=None)
    samples_noise = librosa.resample(samples_noise, orig_sr=sr_noise, target_sr=sr_audio)
    samples_noise = samples_noise[:len(samples_audio)]
    
    A_signal = np.sqrt(np.mean(samples_audio ** 2))
    A_noise = np.sqrt(np.mean(samples_noise ** 2))
    if A_noise != 0:
        A_noise_target = A_signal / (10 ** (snr_db / 20))
        noise_scaled = samples_noise * (A_noise_target / A_noise)
        samples_audio = samples_audio + noise_scaled
    return samples_audio

def apply_time_masks(samples_audio: np.ndarray, time_mask_ratio: float, n_time_masks: int) -> np.ndarray:
    """Apply multiple time masks to audio samples"""
    for _ in range(n_time_masks):
        mask_length = int(len(samples_audio) * time_mask_ratio)
        mask_start = random.randint(0, len(samples_audio) - mask_length)
        samples_audio[mask_start:mask_start + mask_length] = 0
    return samples_audio

def apply_frequency_masks(samples_audio: np.ndarray, sr_audio: int, freq_mask_ratio: float, n_freq_masks: int) -> np.ndarray:
    """Apply multiple frequency masks to audio samples"""
    D = librosa.stft(samples_audio)
    n_freqs = D.shape[0]
    for _ in range(n_freq_masks):
        mask_size = int(n_freqs * freq_mask_ratio)
        mask_start = random.randint(0, n_freqs - mask_size)
        D[mask_start:mask_start + mask_size, :] = 0
    return librosa.istft(D)

def augment_audio(audio_path: str, noise_path: Optional[str], aug_type: AugmentationType) -> Tuple[np.ndarray, int]:
    """Apply enhanced augmentation to audio file"""
    samples_audio, sr_audio = librosa.load(audio_path, sr=None)
    
    # Ensure minimum length of 2048 samples to avoid STFT warnings
    min_length = 2048
    if len(samples_audio) < min_length:
        pad_width = min_length - len(samples_audio)
        samples_audio = np.pad(samples_audio, (0, pad_width), mode='constant')
    
    config = get_augmentation_config(aug_type)
    
    if aug_type in [AugmentationType.EXTREME_NOISE, AugmentationType.BURST_NOISE, AugmentationType.DISTORTED]:
        if noise_path:
            samples_audio = add_noise(samples_audio, noise_path, sr_audio, config.snr_db)
    
    if aug_type in [AugmentationType.EXTREME_NOISE, AugmentationType.BURST_NOISE]:
        samples_audio = apply_time_masks(samples_audio, config.time_mask_ratio, config.n_time_masks)
    
    if aug_type == AugmentationType.MULTI_BAND:
        samples_audio = apply_frequency_masks(samples_audio, sr_audio, config.freq_mask_ratio, config.n_freq_masks)
        if config.filter_bands:
            D = librosa.stft(samples_audio)
            n_freqs = D.shape[0]
            freq_response = np.zeros(n_freqs)
            for band in config.filter_bands:
                freq_bins = np.linspace(0, sr_audio/2, n_freqs)
                band_mask = (freq_bins >= band[0]) & (freq_bins <= band[1])
                freq_response[band_mask] = 1
            D *= freq_response[:, np.newaxis]
            samples_audio = librosa.istft(D)
    
    if aug_type == AugmentationType.ALIASED:
        samples_audio = librosa.resample(
            librosa.resample(samples_audio, orig_sr=sr_audio, target_sr=sr_audio//config.downsample_factor),
            orig_sr=sr_audio//config.downsample_factor, target_sr=sr_audio
        )
        samples_audio = apply_frequency_masks(samples_audio, sr_audio, config.freq_mask_ratio, 1)
    
    if aug_type == AugmentationType.DISTORTED:
        samples_audio = apply_distortion(samples_audio, config.distortion_factor)
    
    return samples_audio, sr_audio

In [None]:
# Code based on wav_to_spectrogram function from: 
# https://github.com/Jakobovski/free-spoken-digit-dataset/blob/master/utils/spectogramer.py#L10
def wav_to_spectrogram(samples, sample_rate, spectrogram_dimensions=(112, 112), NFFT=256, noverlap=128, cmap='gray_r'):
    """ Creates a spectrogram of a wav file.

    :param audio_path: path of wav file
    :param save_path:  path of spectrogram to save
    :param spectrogram_dimensions: number of pixels the spectrogram should be. Defaults (64,64)
    :param noverlap: See http://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.spectrogram.html
    NOTE: the default NFFT is 256, and a recommended noverlap is NFFT / 2 (50%)
    :param cmap: the color scheme to use for the spectrogram. Defaults to 'gray_r'
    :return: the spectrogram as a numpy array
    """
    fig = plt.figure()
    fig.set_size_inches((spectrogram_dimensions[0]/fig.get_dpi(), spectrogram_dimensions[1]/fig.get_dpi()))
    ax = plt.Axes(fig, [0., 0., 1., 1.])
    ax.set_axis_off()
    fig.add_axes(ax)
    # Generate the spectrogram
    ax.specgram(samples, cmap=cmap, NFFT=NFFT, Fs=sample_rate, noverlap=noverlap)
    ax.xaxis.set_major_locator(plt.NullLocator())
    ax.yaxis.set_major_locator(plt.NullLocator())
    fig.canvas.draw()
    spectrogram_array = np.array(fig.canvas.renderer.buffer_rgba())[:, :, 0]  # Keep only one channel

    plt.close(fig)  # Close the figure to free memory

    return spectrogram_array

In [None]:
def generate_augmented_npy(train=True, spectrogram_dimensions=(112, 112), noverlap=128, cmap='gray_r', aug_type=AugmentationType.EXTREME_NOISE):
    """ 
    Creates spectrograms of all the audio files in a directory with 
    augmentation while optimizing memory usage 
    """

    prefix = 'train' if train else 'test'
    labels = np.load(os.path.join(labels_dir, 'train_labels.npy' if train else 'test_labels.npy'))
    audio_indices = [0] * 10
    audio_by_label = train_audio_by_label if train else test_audio_by_label
    max_audio_indices = [len(audio_by_label[str(i)]) for i in range(10)]
    max_noise_index = n_train_noises if train else n_test_noises
    noise_file_names = train_noises_file_names if train else test_noises_file_names
    save_path = f"{save_dir}audio/{prefix}_data_augmented_{aug_type.name.lower()}.npy"
    chunk_size = 1000

    plt.ioff()  # Disable interactive mode to prevent memory leaks

    if os.path.exists(save_path):
        print(f"File {save_path} already exists. Skipping...")
        return

    npy_file = np.memmap(save_path, mode='w+', shape=(len(labels), *spectrogram_dimensions), dtype='uint8')

    for i, val in enumerate(tqdm(labels, desc=f"Processing {prefix.capitalize()} Files ({aug_type.name})")):
        noise_file = noise_file_names[i % max_noise_index] if aug_type in [AugmentationType.EXTREME_NOISE, AugmentationType.BURST_NOISE, AugmentationType.DISTORTED] else None
        audio_idx = audio_indices[int(val)]
        audio_file = audio_by_label[str(val)][audio_idx]
        audio_indices[int(val)] = (audio_indices[int(val)] + 1) % max_audio_indices[int(val)]

        # Process one file at a time (reduces RAM usage)
        augmented_audio, sr_audio = augment_audio(audio_file, noise_file, aug_type)
        spectrogram = wav_to_spectrogram(augmented_audio, sr_audio,
                                         spectrogram_dimensions=spectrogram_dimensions,
                                         noverlap=noverlap, cmap=cmap)

        # Write directly to memmap
        npy_file[i] = spectrogram
        
        if (i + 1) % chunk_size == 0:
            npy_file.flush()  # Force write to disk more frequently

    npy_file.flush()
    print(f"Saved {prefix} data with augmentation ({aug_type.name}) as .npy successfully!")

In [12]:
# Generate datasets for all augmentation types
for aug_type in AugmentationType:
    print(f"Generating training data for {aug_type.name}...")
    generate_augmented_npy(train=True, aug_type=aug_type)

    print(f"Generating test data for {aug_type.name}...")
    generate_augmented_npy(train=False, aug_type=aug_type)

print("All .npy files generated successfully!")

Generating training data for EXTREME_NOISE...
File ../data/avmnist/audio/train_data_augmented_extreme_noise.npy already exists. Skipping...
Generating test data for EXTREME_NOISE...
File ../data/avmnist/audio/test_data_augmented_extreme_noise.npy already exists. Skipping...
Generating training data for MULTI_BAND...


Processing Train Files (MULTI_BAND): 100%|██████████| 60000/60000 [46:01<00:00, 21.73it/s]   


Saved train data with augmentation (MULTI_BAND) as .npy successfully!
Generating test data for MULTI_BAND...


Processing Test Files (MULTI_BAND): 100%|██████████| 10000/10000 [20:18<00:00,  8.20it/s]   


Saved test data with augmentation (MULTI_BAND) as .npy successfully!
Generating training data for BURST_NOISE...


  Z = 10. * np.log10(spec)
Processing Train Files (BURST_NOISE): 100%|██████████| 60000/60000 [34:50<00:00, 28.71it/s]  


Saved train data with augmentation (BURST_NOISE) as .npy successfully!
Generating test data for BURST_NOISE...


Processing Test Files (BURST_NOISE): 100%|██████████| 10000/10000 [27:00<00:00,  6.17it/s]    


Saved test data with augmentation (BURST_NOISE) as .npy successfully!
Generating training data for ALIASED...


Processing Train Files (ALIASED): 100%|██████████| 60000/60000 [41:24<00:00, 24.15it/s] 


Saved train data with augmentation (ALIASED) as .npy successfully!
Generating test data for ALIASED...


Processing Test Files (ALIASED): 100%|██████████| 10000/10000 [27:11<00:00,  6.13it/s]   


Saved test data with augmentation (ALIASED) as .npy successfully!
Generating training data for DISTORTED...


Processing Train Files (DISTORTED): 100%|██████████| 60000/60000 [36:49<00:00, 27.16it/s] 


Saved train data with augmentation (DISTORTED) as .npy successfully!
Generating test data for DISTORTED...


Processing Test Files (DISTORTED): 100%|██████████| 10000/10000 [05:32<00:00, 30.04it/s]


Saved test data with augmentation (DISTORTED) as .npy successfully!
All .npy files generated successfully!


### Appendix

#### Signal-to-Noise Ratio (SNR) Derivation
(based on: https://en.wikipedia.org/wiki/Signal-to-noise_ratio#:~:text=The%20signal%20and%20the%20noise%20must%20be%20measured%20the%20same%20way%2C%20for%20example%20as%20voltages%20across%20the%20same%20impedance)

The Signal-to-Noise Ratio (SNR) is given by:

$$
\text{SNR} = \frac{P_{\text{signal}}}{P_{\text{noise}}} = \left(\frac{A_{\text{signal}}}{A_{\text{noise}}}\right)^2
$$

where $ A $ represents the Root Mean Square (RMS) amplitude.

In decibels (dB), the SNR is expressed as:

$$
\text{SNR}_{dB} = 10 \log_{10} \left(\text{SNR}\right)
$$

Substituting this into the SNR formula:

$$
\text{SNR}_{dB} = 10 \log_{10} \left[\left(\frac{A_{\text{signal}}}{A_{\text{noise}}}\right)^2\right]
$$

Using the logarithm property $ \log_{10} (x^2) = 2 \log_{10} (x) $:

$$
\text{SNR}_{dB} = 20 \log_{10} \left(\frac{A_{\text{signal}}}{A_{\text{noise}}}\right)
$$

Solving for $ A_{\text{noise}} $:

$$
\frac{A_{\text{signal}}}{A_{\text{noise}}} = 10^{\frac{\text{SNR}_{dB}}{20}}
$$

$$
A_{\text{noise}} = A_{\text{signal}} \times 10^{-\frac{\text{SNR}_{dB}}{20}}
$$

Thus, the target noise amplitude is:

$$
A_{\text{noise\_target}} = A_{\text{signal}} / 10^{\frac{\text{SNR}_{dB}}{20}}
$$

##### Scaling the Noise to Match the Target SNR

We initially have noise samples with some RMS amplitude $ A_{\text{noise}} $. To achieve the desired SNR, we **scale** the noise so that its new RMS amplitude matches $ A_{\text{noise\_target}} $. 

To do this, we compute the scaling factor:

$$
\text{scale factor} = \frac{A_{\text{noise\_target}}}{A_{\text{noise}}}
$$

Multiplying the original noise samples by this factor scales them accordingly:

$$
\text{noise\_scaled} = \text{noise\_samples} \times \frac{A_{\text{noise\_target}}}{A_{\text{noise}}}
$$

##### **Why Does This Hold?**

The RMS formula for a discrete signal $ x $ is:

$$
A_x = \sqrt{\frac{1}{N} \sum_{i=1}^{N} x_i^2}
$$

For the original noise samples:

$$
A_{\text{noise}} = \sqrt{\frac{1}{N} \sum_{i=1}^{N} (\text{noise\_samples}_i)^2}
$$

After scaling, the new noise values are:

$$
\text{noise\_scaled} = \text{noise\_samples} \times \frac{A_{\text{noise\_target}}}{A_{\text{noise}}}
$$

So, the RMS of the scaled noise becomes:

$$
A_{\text{scaled\_noise}} = \sqrt{\frac{1}{N} \sum_{i=1}^{N} \left(\text{noise\_samples}_i \times \frac{A_{\text{noise\_target}}}{A_{\text{noise}}}\right)^2}
$$

Using the property $ \text{RMS}(c \cdot x) = |c| \cdot \text{RMS}(x) $, we can factor out the scaling term:

$$
A_{\text{scaled\_noise}} = \left| \frac{A_{\text{noise\_target}}}{A_{\text{noise}}} \right| \times A_{\text{noise}} = A_{\text{noise\_target}}
$$