### Package

In [1]:
import os
import timm
import wandb
import torch
import librosa
import pandas as pd
import numpy as np 
from tqdm import tqdm
from pathlib import Path
from datetime import datetime
from torchvision.transforms import ToTensor
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import (
    accuracy_score,
    balanced_accuracy_score,
    classification_report,
)

### Process Data for Dataloader

In [2]:
path = "Data/TORGO"

In [3]:
output_path = Path(path)/f"{Path(path).stem}_info.csv"

In [4]:
audio_file_count = 0
audio_extensions = {".mp3", ".wav", ".m4a", ".aac", ".flac", ".ogg", ".wma"}
file_info =[]
# Traverse through the directory and subdirectories
for root, dirs, files in os.walk(path):
    for file in files:
        file_extension = Path(file).suffix.lower()
        if file_extension in audio_extensions:
            try:
                full_path = os.path.join(root, file)

                y, sr = librosa.load(full_path)
                directory_name = os.path.normpath(root).split(os.sep)
                # Preprocess the info from the filename 
                
                illness = 0 if ("c" in directory_name[-3][:2].lower()) else 1
                gender = 'F' if ("f" in directory_name[-3][:2].lower()) else 'M'
                file_info.append([full_path, illness, gender])
            except Exception as e:
                # Print the error message and the file path that caused the error
                print(f"Error processing {full_path}: {e}")

            # Preprocess the info from the filename 
            
            
data = pd.DataFrame(file_info, columns=["FileName", "Labels", "Gender"])

# Save DataFrame to a CSV file
output_csv = output_path 
data.to_csv(output_csv, sep=',', header='true', index=False)
    


  y, sr = librosa.load(full_path)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)


Error processing Data/TORGO/F/F01/Session1/wav_headMic/0067.wav: 
Error processing Data/TORGO/F/F01/Session1/wav_headMic/0068.wav: 


In [5]:

class AudioDataset(torch.utils.data.Dataset):
    def __init__(self, data, labels, transform=None):
        self.data = data
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        spectrogram = self.data[idx]
        label = self.labels[idx]
        if self.transform:
            spectrogram = self.transform(spectrogram)
        return spectrogram, label

In [10]:
def ProcessData(d, batch_size= 512):
    
    data = []
    labels = []
    for _, row in tqdm(d.iterrows(), total=len(d), desc="Processing files"):
        # Get the file name/path from the DataFrame
        file = row['FileName']  
        
        # Compute the Mel-spectrogram
        #mels_db, sr = audio_to_melspectrograms(file)
        mels_db, sr = audio_to_mfcc(file)
        label_exploded = [row['Labels']] * mels_db.shape[0]
        data.append(mels_db)
        labels.append(label_exploded)
    data = np.vstack(data)
    labels = np.concatenate(labels)

    # Standardize the data
    mean = np.mean(data, axis=0)
    std = np.std(data, axis=0)
    epsilon = 1e-8  
    data_standardized = (data - mean) / (std+epsilon)

    # Split data into training and testing sets
    X_train, X_test, Y_train, Y_test = train_test_split(data_standardized, labels, test_size=0.2, random_state=42, stratify=labels)

    # Transform to Tensor
    transform = ToTensor()

    # Create the training and testing dataloader
    train_dataset = AudioDataset(X_train, Y_train, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    test_dataset = AudioDataset(X_test, Y_test, transform=transform)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    
    return train_loader, test_loader

In [7]:
def audio_to_mfcc(audio_file):
    """
    Create the MFCC (Mel-Frequency Cepstral Coefficients) for the signal
    
    Args:
        audio_file: The path to the audio file
        
    Returns:
        np.ndarray: MFCCs after normalizing
        int: Sampling rate of the audio file
    """
    y, sr = librosa.load(audio_file)
    
    # Frame the signal into short windows of 400ms and hop by 200ms
    # So that each frame can have fixed length 
    frame_length = int(sr * 4)  
    hop_length = int(sr * 2)   
    
    if len(y) > frame_length:
        y_framed = librosa.util.frame(
            y, frame_length=frame_length, hop_length=hop_length
        )
    else:
        # If the signal is too short, pad it with zeros
        y_padded = np.concatenate([y, np.zeros(frame_length - len(y))])
        y_framed = librosa.util.frame(
            y_padded, frame_length=frame_length, hop_length=hop_length
        )
    
    # Compute MFCCs from the framed signal
    mfccs = librosa.feature.mfcc(
        y=y_framed.T, sr=sr, n_mfcc=40, hop_length=int(sr * 0.03), n_fft=512
    )

    
    return mfccs, sr



### Define Model and its relevant function

In [6]:

class Net(torch.nn.Module):
    def __init__(self, device):
        super(Net, self).__init__()

        self.model = timm.create_model("resnet18", pretrained=True, num_classes=2)
        self.model.conv1 = torch.nn.Conv2d(
            1, self.model.conv1.out_channels,
            kernel_size=self.model.conv1.kernel_size,
            stride=self.model.conv1.stride,
            padding=self.model.conv1.padding,
            bias=False,
        )

        # Set the loss function and optimizer
        self.criterion = torch.nn.CrossEntropyLoss()
        self.optimizer = torch.optim.Adam(self.parameters(), lr=0.001)

        # Set the device (CPU or GPU)
        self.device = device

    
    def fit(self, train_loader, epochs):
        history = {'loss':[], 'accuracy':[]}

        for epoch in range(epochs):
            # Set model to training mode
            self.train()

            print("\nEpoch {}/{}".format(epoch+1, epochs))

            with tqdm(total=len(train_loader)) as pbar:
                for _, batch in enumerate(train_loader):
                    # Unpack the batch
                    inputs, labels = batch  
                    inputs = inputs.to(torch.float32).to(self.device)
                    labels = labels.to(torch.long).to(self.device)

                    # Zero the parameter gradients
                    self.optimizer.zero_grad()

                    
                    outputs = self.model(inputs)
                    loss = self.criterion(outputs, labels)
                    loss.backward()

                    # Update the parameters
                    self.optimizer.step()

                    pbar.update(1)        
            
            # Model evaluation on train data
            train_loss, train_report = self.evaluate(train_loader)
            train_acc = train_report[0]
            print(f"loss: {train_loss:.4f} - accuracy: {100 *train_acc:.4f}%")

            # Log metrics to WandB
            wandb.log({
                "epoch": epoch + 1,
                "train_loss": train_loss,
                "train_accuracy": train_acc
            })

            # Store the model's training progress
            history['loss'].append(train_loss)
            history['accuracy'].append(train_acc)
            
        return history

    def predict(self, X):
        # Set model to evaluation mode
        self.eval()  

        with torch.no_grad():
            outputs = self.model(X)
        return outputs
    
    def evaluate(self, data_loader):
        # Set model to evaluation mode
        self.eval()

        running_loss = torch.tensor(0.0).to(self.device)

        predictions = []
        true_labels = []
        report = []
        
        with tqdm(total=len(data_loader), desc="Evaluating") as pbar:  # Add colon and length of data_loader
        
            for step, batch in enumerate(data_loader):
                # Unpack the batch
                inputs, labels = batch  
                
                inputs = inputs.to(torch.float32).to(self.device)
                labels = labels.to(torch.long).to(self.device)
                
                outputs = self.predict(inputs)

                # Compute batch loss
                loss = self.criterion(outputs, labels)
                running_loss += loss

                # Calculate batch accuracy
                _, predicted = torch.max(outputs.data, 1)
                predictions.extend(predicted.cpu().detach().numpy())
                true_labels.extend(labels.cpu().detach().numpy())

                # Update the progress bar
                pbar.update(1)
        
            loss = running_loss.item() / (step+1)
            # Calculate metrics using accumulated true labels and predictions
            report.append(accuracy_score(true_labels, predictions))
            report.append(balanced_accuracy_score(true_labels, predictions))
            report.append(classification_report(true_labels, predictions))
            
        
        return loss, report

### Train & Test

In [11]:
def train(model, train_loader, test_loader):
    
    loss, score = model.evaluate(test_loader)
    
    wandb.log({
        "test_loss": loss,
        "test_accuracy": score[0]
    })
    print("Pre-training accuracy: %.4f%%" % (100 * score[0]))
    epochs = 5
    start_time = datetime.now()
    history = model.fit(train_loader, epochs=epochs)
    end_time = datetime.now() - start_time
    print("\nTraining completed in time: {}".format(end_time))
    loss, eval = model.evaluate(test_loader)
    wandb.log({
        "test_loss": loss,
        "test_accuracy": eval[0]
    })

    # Print the evaluation metrics
    print(f"testing accuracy: {eval[0] * 100:.2f}")
    print(f"Balanced Accuracy: {eval[1] * 100:.2f}%")
    print("Classification Report:\n", eval[2])

# %%
if __name__=="__main__":
    
    wandb.init(project="speech_bias")

    d = pd.read_csv("Data/TORGO/TORGO_info.csv")
    train_loader, test_loader = ProcessData(d, batch_size= 1024)
    print(len(train_loader))

    # Check if a GPU is available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Initialize the model
    model = Net(device).to(device)

    train(model, train_loader, test_loader)
    
    wandb.finish()
# %%


VBox(children=(Label(value='0.009 MB of 0.009 MB uploaded\r'), FloatProgress(value=1.0, max=1.0)))

Processing files: 100%|██████████| 17631/17631 [01:12<00:00, 244.26it/s]


16


Evaluating: 100%|██████████| 4/4 [00:31<00:00,  7.77s/it]


Pre-training accuracy: 48.8692%

Epoch 1/5


 31%|███▏      | 5/16 [01:15<02:45, 15.09s/it]