In [1]:
import shutil
import librosa, librosa.display
import os
import subprocess

import torch
from torch import nn

import numpy as np
import pandas as pd
import polars as pl

import matplotlib.pyplot as plt
import seaborn as sns

from IPython.display import Audio

from pydub import AudioSegment

import torch.nn as nn
import torch
from torch.utils.data import Dataset, DataLoader

import warnings
warnings.filterwarnings("ignore")

import soundfile as sf


from statsmodels.distributions.empirical_distribution import ECDF
from sklearn.model_selection import train_test_split

import colorednoise as cn

#### From here it's mostly creating a root directory from the different sources of data and combining them and their subfolders etc
#### So I will not show it since it involves local directories and traversing subfolders etc. 

### What I will bring up
- I am using 4 sources of data (Imagimob, Common Voice, ESC-50, and COVID-19 coughs)
- Imagimob: 452 samples (chunks) | Coughvid: 1569 samples | The rest is Common Voice and ESC-50 (no cough sounds)
- Cough: 2001 samples | No cough: 1979 samples (BALANCED!)
- Data augmentation: 7962 samples in total which comes solely from inject noise into original samples (violating Nyquists theorem doesn't matter here since we're interested in model robustness)
- Cutting or padding audio into 7 second files
- Mel Spectrograms, Takens Embeddings, PCA, and 3D Point clouds | These are the features engineered

#### Things in the whole deliverable but not in this notebook:
- Monte Carlo dropout
- k-fold cross validation
- random subsampling
- weight resetting to avoid weight/parameter leakage 
- weight initialization
- hybrid CNN with 3D and 2D input
- taking 1D data, transforming it into 2D, dimensionality reduction into 3 features, and then slicing it such that it becomes 3D with a tensor
- **unit tests**
- terminal/shell training with arguments


#### Below are examples of how it has been done. See the other companion notebooks for fun stuff. <br>

**FYI**: Statistical data analysis has not been included because the task is not targeted at inferential statistics and the dataset bears no interesting statistical usage.

In [2]:

dataset_path = "C:/Users/jako/data/Imagimob_ML_test/"

catg = os.path.join(dataset_path, "data")

In [None]:
cc = [subfolder for subfolder in os.listdir(catg) if os.path.isdir(os.path.join(catg, subfolder))]

audio_path = os.path.join(catg, "audio")
metadata_path = os.path.join(catg, 'metadata')
coughs_path = os.path.join(audio_path, "coughs")
no_coughs_path = os.path.join(audio_path, "no_coughs")

os.makedirs(audio_path, exist_ok=True)
os.makedirs(metadata_path, exist_ok=True)
os.makedirs(coughs_path, exist_ok=True)
os.makedirs(no_coughs_path, exist_ok=True)

for category in cc:
    catpath = os.path.join(catg, category)
    
    sample_folders = [sampfold for sampfold in os.listdir(catpath) if os.path.isdir(os.path.join(catpath, sampfold))]
    
    
    if category in ["coughing", "coughing_batch_2"]:
        dest_audio_path = coughs_path
    else:
        dest_audio_path = no_coughs_path
        
    for sampfold in sample_folders:
        sample_folder_path = os.path.join(catpath, sampfold)
        
        for fname in os.listdir(sample_folder_path):
            
            
            filepath = os.path.join(sample_folder_path, fname)
            
            new_fname = category + "_" + sampfold + "_" + fname
            
            if fname.endswith(".wav"):
                dest_path = os.path.join(dest_audio_path, new_fname)
            elif fname.endswith(".label"):
                dest_path = os.path.join(metadata_path, new_fname)
            else:
                continue
        
            shutil.move(filepath, dest_path)

To avoid adding complexity because of a subfolder inside a subfolder    

In [None]:
mictap = os.path.join(catg, "mic_tapping")
micc = [subfolder for subfolder in os.listdir(mictap) if os.path.isdir(os.path.join(mictap, subfolder))]

for category in micc:
    catpath = os.path.join(mictap, category)
    
    sample_folders = [sampfold for sampfold in os.listdir(catpath) if os.path.isdir(os.path.join(catpath, sampfold))]

    dest_audio_path = no_coughs_path

    for sampfold in sample_folders:
        sample_folder_path = os.path.join(catpath, sampfold)
        
        for fname in os.listdir(sample_folder_path):
            
            
            filepath = os.path.join(sample_folder_path, fname)
            
            new_fname = "mic_tapping" + "_" + sampfold + "_" + fname
            
            if fname.endswith(".wav"):
                
                dest_path = os.path.join(dest_audio_path, new_fname)
            elif fname.endswith(".label"):
                dest_path = os.path.join(metadata_path, new_fname)
            else:
                continue
        
            shutil.move(filepath, dest_path)

In [3]:
AUDIO_PATH = "C:/Users/jako/data/Imagimob_ML_test/data/audio/"
LABEL_PATH = "C:/Users/jako/data/Imagimob_ML_test/data/metadata/"

In [4]:
cough_wav_list = cough_wav_list = [file for file in os.listdir(os.path.join(AUDIO_PATH,"coughs")) if file.endswith(".wav") ]
no_cough_wav_list = [file for file in os.listdir(os.path.join(AUDIO_PATH,"no_coughs")) if file.endswith(".wav") ]

full_lbl_list = os.listdir(LABEL_PATH)

In [5]:
NO_WAV_FILES = len(cough_wav_list) + len(no_cough_wav_list)
print("Number of .wav files: {}".format(NO_WAV_FILES))
print("Number of .label files: {}".format(len(full_lbl_list)))

Number of .wav files: 70
Number of .label files: 70


Let's scan through the raw audio files and check their length statistics.

In [14]:
length_of_file_coughs = []
helper_fname = os.path.join(AUDIO_PATH, "coughs") + "/"

for wavfile in os.listdir(os.path.join(AUDIO_PATH, "coughs")):
    if wavfile.endswith(".wav"):
        y, sr = librosa.load(helper_fname + wavfile, sr=None)
        soundlength = len(y)/sr
        length_of_file_coughs.append( soundlength )

In [16]:
print("Average length in seconds: {} seconds".format( round(np.mean(length_of_file_coughs), 2)))
print("Median length in seconds: {} seconds".format( np.median(length_of_file_coughs) ))
print("Standard deviation of the sample lengths: {} seconds".format( round(np.std(length_of_file_coughs), 2)))
print("The 95th percentile of sample lengths: {} seconds".format( np.quantile(length_of_file_coughs, 0.95) ))

Average length in seconds: 42.6 seconds
Median length in seconds: 61.08 seconds
Standard deviation of the sample lengths: 27.02 seconds
The 95th percentile of sample lengths: 70.824 seconds


Can't work with different lengths of data. How about splitting them into 5 second chunks?

In [18]:
def split_to_7(audio_array, samplerate, wav_name, output_dir):
    
    sample_chunk = 5 * samplerate
    
    numchunks = len(audio_array) // sample_chunk # Integer division to avoid exceeding length in forloop
    
    criteria = len(audio_array) // samplerate
    
    if criteria > 5*2:
        
        for i in range(numchunks):
            
            chunk = audio_array[i*sample_chunk:(i+1)*sample_chunk]
            new_name = wav_name[:-4]
            output_path = os.path.join(output_dir, f"{new_name}_chunk_{i+1}.wav")
            sf.write(output_path, chunk, samplerate)

    else:
        print("Criteria of 5 seconds not met. Minimum must be 10 seconds.")



def process_audio_files(directory, base_path, output_subfolder):
    """
    Process audio files in the given directory.
    """
    input_path = os.path.join(base_path, directory)
    output_path = os.path.join(base_path, directory, output_subfolder)
    os.makedirs(output_path, exist_ok=True)

    length_of_file = []
    for wavfile in os.listdir(input_path):
        file_path = os.path.join(input_path, wavfile)
        
        y, sr = librosa.load(file_path, sr=None)
        soundlength = len(y) / sr
        length_of_file.append(soundlength)
        
        split_to_7(y, sr, wavfile, output_path)
    
    return length_of_file


#### FEATURE ENGINEERING: Cutting or padding

Based on exploratory data analysis where skewness of the length distribution shows that the large majority are concentrated around 9 seconds. <br>
Choosing 7 seconds to keep everything computationally efficient but also practically valuable. <br>
Exploratory analysis not added because the task is not about statistical analysis and the data provides no interesting statistical usage.

In [None]:
def cut_when_needed(signal):
    if len(signal) > (7*16000):
        signal = signal[:7*16000]
    else:
        signal = signal
    return signal

def stretch_when_needed(signal):
    signal_length = len(signal)
    if signal_length < (7*16000):
        num_missing = (7*16000) - signal_length
        last = (0, num_missing)
        signal = torch.nn.functional.pad(torch.tensor(signal), last)
    else:
        signal = signal
    return signal


#### FEATURE ENGINEERING: Mel Spectrograms

In [None]:
wav_paths = ["/data/custom_cough/cough", "/data/custom_cough/no_cough"]
dest_path = "/data/custom_cough/mels"

meller = MelSpectrogram(16000, n_fft=1024, hop_length=512, n_mels=64)


# Using Polars library
for group in dataframe["label"].unique():
    data = dataframe.filter(pl.col("label") == group)
    
    for row in data["id"]:
        if group == 1:
            fullpath = os.path.join(wav_paths[0], row)
        else:
            fullpath = os.path.join(wav_paths[1], row)

        y, sr = librosa.load(fullpath, sr=None)
        y = cut_when_needed(y)                      # Apply cutting if signal too long
        y = stretch_when_needed(y)                  # Apply right padding if signal too short
        melconvert = meller(torch.tensor(y))        # Convert into mel spectrogram
        newname = row[:-4] + ".pt"          # Use .pt format
        torch.save(melconvert, os.path.join(dest_path, newname)) # Save as a torch tensor

#### FEATURE ENGINEERING: Create a noise generator

In [None]:
def generate_some_noise(beta: int = 1, freq: int = 44100) -> np.array:
  """
  Function to generate some noise (defaults to pink noise with beta=1)
  beta = 1 is for pink noise and is standard, 0 is Gaussian
  freq =  is just the number of samples 
  """
  noise_array = cn.powerlaw_psd_gaussian(beta, freq)      # Generate the noise
  noise_array = (noise_array - np.min(noise_array)) / (np.max(noise_array) - np.min(noise_array))     # Normalize
  noise_array = 2*noise_array - 1     # Scale between -1 and 1

  return noise_array

#### FEATURE ENGINEERING: Noisy Mel Spectrograms

In [None]:
wav_paths = ["/data/custom_cough/noise_cough_augmented", "/data/custom_cough/noise_no_cough_augmented"]
dest_path = "/data/custom_cough/mels_w_noise"


custom_path_dest = "/custom_cough/noise_cough_augmented"
custom_path = "/data/custom_cough/cough"

for wav in os.listdir(custom_path):
    y, sr = librosa.load(os.path.join(custom_path, wav), sr=None)
    brown_noise = generate_pink_noise(2, len(y))
    pink_noise = generate_pink_noise(1, len(y))
    mixed_noise = brown_noise*pink_noise
    
    flipmulti = np.random.multinomial(1, pvals=[1/3, 1/3, 1/3])

    if flipmulti[0] == 1:
        y_noise_injected = y + mixed_noise  # Additive mixed noise. Can also have a multiplicative noise like below but for the sake of demonstration keep it like this
    elif flipmulti[1] == 1:
        y_noise_injected = y + (pink_noise*y)   # Multiplicative pink noise (decaying noise)
    else: 
        y_noise_injected = y + (brown_noise*y)  # Multiplicative brown noise (decaying soft noise)
    
    new_fname = "noise" + "_" + wav
    
    sf.write(os.path.join(custom_path_dest, new_fname), y_noise_injected, samplerate=sr)




meller = MelSpectrogram(16000, n_fft=1024, hop_length=512, n_mels=64)

for group in dataframe["label"].unique():
    data = dataframe.filter(pl.col("label") == group)
    
    for row in data["id"]:
        if group == 1:
            fullpath = os.path.join(wav_paths[0], row)
        else:
            fullpath = os.path.join(wav_paths[1], row)

        y, sr = librosa.load(fullpath, sr=None)
        y = cut_when_needed(y)
        y = stretch_when_needed(y)
        melconvert = meller(torch.tensor(y))
        newname = row[:-4] + ".pt"
        torch.save(melconvert, os.path.join(dest_path, newname))

#### FEATURE ENGINEERING: Takens Embeddings - Delayed copies of a signal

In [None]:
wav_paths = ["/data/custom_cough/cough", "/data/custom_cough/noise_cough_augmented", 
             
             "/data/custom_cough/no_cough", "/data/custom_cough/noise_no_cough_augmented"]

tks = SingleTakensEmbedding(time_delay=1, dimension=100, stride=30, n_jobs=-1, parameters_type="fixed")

dir_path = "/data/custom_cough/topological_signal"

for group in dataframe["label"].unique():
    data = dataframe.filter(pl.col("label") == group)
    paths_check = wav_paths[:2] if group == 1 else wav_paths[2:]
    
    
    for row in data["id"]:
        for path in paths_check:
            fullpath = os.path.join(path, row)
            
            if os.path.exists(fullpath):
                y, sr = librosa.load(fullpath, sr=None)
                y = cut_when_needed(y)
                y = stretch_when_needed(y)
                y = tks.fit_transform(y)
                
                newname = row[:-4] + ".pt"
                
                torch.save(y, os.path.join(dir_path, newname))
                
                break
        

# Doing likewise for testset (I use the words in opposite)
dir_path = "C:/Users/jako/data/custom_cough/topological_signal_test"
for group in dataframe["label"].unique():
    data = dataframe.filter(pl.col("label") == group)
    paths_check = wav_paths[:2] if group == 1 else wav_paths[2:]
    
    
    for row in data["id"]:
        for path in paths_check:
            fullpath = os.path.join(path, row)
            
            if os.path.exists(fullpath):
                y, sr = librosa.load(fullpath, sr=None)
                y = cut_when_needed(y)
                y = stretch_when_needed(y)
                y = tks.fit_transform(y)
                
                newname = row[:-4] + ".pt"
                
                torch.save(y, os.path.join(dir_path, newname))
                
                break


#### FEATURE ENGINEERING: 3D Point Clouds of topological transforms
#### See *parse_tensors.py* for the full structure

In [None]:
# First thing we do is to take the Takens embeddings
tf = SingleTakensEmbedding(time_delay=1, dimension=100, stride=1, n_jobs=-1, parameters_type="fixed") # Must be set to "fixed" and n_jobs=-1 for fast computation
y_no = tf.fit_transform(y_no_cough)
y_c = tf.fit_transform(y_cough )

# Then do PCA with 3 components such that we have a 3D space
pca = PCA(n_components=3)
y_no_pca = pca.fit_transform(y_no)
y_c_pca = pca.fit_transform(y_c)


# Now we want to slice the 3D space into 24 frames by size 24x24 matrices (check the topology notebooks for cool viz!)
def create_3d_tensor_from_pca(pca_data, shape=(24, 24, 24)):
    # Determine the range for each axis
    mins = pca_data.min(axis=0)
    maxs = pca_data.max(axis=0)
    
    # Create the 3D tensor filled with zeros
    tensor_3d = np.zeros(shape)
    
    # Determine the step size for each dimension
    step_sizes = (maxs - mins) / np.array(shape)
    
    # Distribute the PCA points into the tensor
    for point in pca_data:
        # Calculate the voxel's index for each point
        idx = ((point - mins) / step_sizes).astype(int)
        
        # Clip to ensure we don't exceed the shape due to floating point inaccuracies
        idx = np.clip(idx, 0, np.array(shape) - 1)
        
        # Increment the voxel where the point falls
        tensor_3d[tuple(idx)] += 1
    
    return tensor_3d


#### DATA CONVERSION: Converting .webm to .wav 
See *convert_webm_to_wav.py*

In [None]:
def convert_webm_to_wav(webm):
    if webm.endswith(".webm"):
        WEBM_FILE = os.path.join(coughvid_from_kaggle_path, webm)
        
        # Check the file size
        if os.path.getsize(WEBM_FILE) > MAX_SIZE:
            return f"Skipping {webm} because it's larger than 100 kB."
        
        output_name = os.path.join(output_path, webm[:-5] + ".wav")
        
        command = [
            "ffmpeg", "-i", WEBM_FILE,
            "-vn",
            "-acodec", "pcm_s16le",  # 16-bit depth
            "-ac", "1",  # Mono
            "-ar", "16000",  # 44.1 kHz sample rate
            "-f", "wav", 
            output_name
        ]
        try:
            subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            return f"Successfully converted {webm} to {output_name}"
        except subprocess.CalledProcessError as e:
            return f"Error processing {webm}: {e.stderr.decode('utf-8')}"
    return None

if __name__ == '__main__':
    webm_files = [f for f in os.listdir(kaggle_path) if f.endswith(".webm")]

    with Pool(processes=cpu_count() // 2) as pool:  # Using half the CPU cores
        results = pool.map(convert_webm_to_wav, webm_files)
        for result in results:
            if result:
                print(result)