In [2]:
# Connect to Google Drive to access the data
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Embedding extraction

In [None]:
import os
import librosa
import numpy as np
import torch
from transformers import WhisperProcessor, WhisperModel, Wav2Vec2Processor, Wav2Vec2Model
import torch.nn.functional as F

# Root directory of the input audio. The extraction cells walk it recursively
# with os.walk and assume the layout data/class_label/speaker/audio.wav.
data_dir = 'data'

def load_whisper_model(model_name):
    """Load the pretrained Whisper processor and model for ``model_name``.

    Args:
        model_name: Checkpoint identifier handed to ``from_pretrained``.

    Returns:
        A ``(processor, model)`` tuple; the processor is created first.
    """
    return (
        WhisperProcessor.from_pretrained(model_name),
        WhisperModel.from_pretrained(model_name),
    )

def load_wav2vec_model(model_name):
    """Load the pretrained wav2vec 2.0 processor and model for ``model_name``.

    Uses the Wav2Vec2 classes (already imported at the top of the notebook);
    the Whisper classes cannot load a wav2vec2 checkpoint.

    Args:
        model_name: Checkpoint identifier handed to ``from_pretrained``.

    Returns:
        A ``(processor, model)`` tuple.
    """
    processor = Wav2Vec2Processor.from_pretrained(model_name)
    model = Wav2Vec2Model.from_pretrained(model_name)
    return processor, model

def extract_embeddings_whisper(audio_file):
    """Return a mean-pooled Whisper encoder embedding for one audio file.

    Relies on the module-level ``processor`` and ``model`` set by the Whisper
    extraction cell. The audio is loaded at 16 kHz, converted to input
    features, padded with zeros or truncated to 3000 frames along the last
    axis, run through ``model.encoder`` and averaged over dim 1.

    Args:
        audio_file: Path of the audio file to load with librosa.

    Returns:
        The pooled embedding as a NumPy array, or ``None`` if any step raised
        (the error is printed instead of propagated).
    """
    n_frames_expected = 3000  # length the Whisper model expects
    try:
        # librosa resamples to 16000 Hz on load
        waveform, sr = librosa.load(audio_file, sr=16000)

        batch = processor(waveform, sampling_rate=sr, return_tensors="pt", padding=True)
        mel = batch['input_features']

        # Debug output kept from the original implementation
        print(f"Mel-spectrogram shape: {mel.shape}")

        # Positive: frames missing (zero-pad on the right); negative: too long (cut)
        missing = n_frames_expected - mel.shape[2]
        if missing > 0:
            mel = F.pad(mel, (0, missing), value=0)
        elif missing < 0:
            mel = mel[:, :, :n_frames_expected]

        # Model and features must live on the same device (GPU if available)
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model.to(device)
        mel = mel.to(device)

        # Encoder only: no decoder inputs are needed for feature extraction
        with torch.no_grad():
            encoded = model.encoder(input_features=mel, attention_mask=batch.get('attention_mask'))

        # Average over the time steps, then drop singleton dimensions
        pooled = encoded.last_hidden_state.mean(dim=1)
        return pooled.squeeze().cpu().numpy()

    except Exception as err:
        print(f"Error processing {audio_file}: {err}")
        return None

def extract_embeddings_wav2vec(audio_file):
    """Return a mean-pooled wav2vec 2.0 embedding for one audio file.

    Relies on the module-level ``processor`` and ``model`` set by the wav2vec
    extraction cell. The audio is loaded at 16 kHz, preprocessed, run through
    the model and averaged over dim 1 of ``last_hidden_state``.

    Args:
        audio_file: Path of the audio file to load with librosa.

    Returns:
        The pooled embedding as a NumPy array, or ``None`` if any step raised
        (the error is printed instead of propagated).
    """
    try:
        # Load the audio file using librosa (resampled to 16000 Hz)
        speech_array, sampling_rate = librosa.load(audio_file, sr=16000)

        # Preprocess the audio using the processor
        inputs = processor(speech_array, sampling_rate=sampling_rate, return_tensors="pt", padding=True)

        # Move the model and every tensor passed to it onto the same device
        # (GPU if available).
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model.to(device)
        input_values = inputs['input_values'].to(device)

        # Some processors return no attention mask; when one is present it must
        # follow the inputs to the device, otherwise the forward pass fails on GPU.
        attention_mask = inputs.get('attention_mask')
        if attention_mask is not None:
            attention_mask = attention_mask.to(device)

        # Forward pass through the model to extract embeddings
        with torch.no_grad():
            outputs = model(input_values, attention_mask=attention_mask)

        # Extract the embeddings (mean across time steps)
        embeddings = outputs.last_hidden_state.mean(dim=1).squeeze().cpu().numpy()
        return embeddings

    except Exception as e:
        print(f"Error processing {audio_file}: {e}")
        return None

In [None]:
# Whisper extraction
model_name = "openai/whisper-small"
processor, model = load_whisper_model(model_name)

# Inference only
model.eval()

# Path to store embeddings
output_dir = 'whisper_embeddings'

# Mirror the first-level folders of data_dir inside output_dir. Under the layout
# assumed below (data/class_label/speaker/audio.wav) these are the class labels.
# Plain files lying in data_dir are skipped so they do not turn into folders.
class_folders = [
    entry for entry in os.listdir(data_dir)
    if os.path.isdir(os.path.join(data_dir, entry))
]
for class_label in class_folders:
    os.makedirs(os.path.join(output_dir, class_label), exist_ok=True)

for root, _, files in os.walk(data_dir):
    for file in files:
        if not file.endswith(".wav"):
            continue

        audio_path = os.path.join(root, file)
        parts = os.path.relpath(audio_path, data_dir).split(os.sep)

        # Assuming the structure is data/class_label/speaker/audio.wav;
        # files nested less deeply are ignored.
        if len(parts) < 3:
            continue

        # The first path component is the class-label folder, which is also
        # the sub-folder the embedding is written to.
        class_label = parts[0]
        audio_name = os.path.splitext(file)[0]
        # NOTE(review): the speaker folder is not part of the output name, so
        # identically named recordings from different speakers of one class
        # would overwrite each other - confirm file names are unique per class.
        output_filename = os.path.join(output_dir, class_label, f"{audio_name}_embedding.npy")

        # Extract and save the embeddings (None means extraction failed and
        # the error was already printed)
        embeddings = extract_embeddings_whisper(audio_path)
        if embeddings is not None:
            np.save(output_filename, embeddings)
            print(f"Embeddings saved to {output_filename}")

print ("Embedding extraction successfully completed.")

In [None]:
# Wav2vec extraction
model_name = "facebook/wav2vec2-base-960h"
processor, model = load_wav2vec_model(model_name)

# Inference only (kept explicit, consistent with the Whisper extraction cell)
model.eval()

# Path to store embeddings
output_dir = 'wav2vec_embeddings'

# Mirror the first-level folders of data_dir inside output_dir. Under the layout
# assumed below (data/class_label/speaker/audio.wav) these are the class labels.
# Plain files lying in data_dir are skipped so they do not turn into folders.
class_folders = [
    entry for entry in os.listdir(data_dir)
    if os.path.isdir(os.path.join(data_dir, entry))
]
for class_label in class_folders:
    os.makedirs(os.path.join(output_dir, class_label), exist_ok=True)

for root, _, files in os.walk(data_dir):
    for file in files:
        if not file.endswith(".wav"):
            continue

        audio_path = os.path.join(root, file)
        parts = os.path.relpath(audio_path, data_dir).split(os.sep)

        # Assuming the structure is data/class_label/speaker/audio.wav;
        # files nested less deeply are ignored.
        if len(parts) < 3:
            continue

        # The first path component is the class-label folder, which is also
        # the sub-folder the embedding is written to.
        class_label = parts[0]
        audio_name = os.path.splitext(file)[0]
        # NOTE(review): the speaker folder is not part of the output name, so
        # identically named recordings from different speakers of one class
        # would overwrite each other - confirm file names are unique per class.
        output_filename = os.path.join(output_dir, class_label, f"{audio_name}_embedding.npy")

        # None means extraction failed and the error was already printed
        embeddings = extract_embeddings_wav2vec(audio_path)
        if embeddings is not None:
            np.save(output_filename, embeddings)
            print(f"Embeddings saved to {output_filename}")

print ("Embedding extraction successfully completed.")

# Classification





In [None]:
import os
from collections import Counter

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.svm import SVC

# Load embeddings and assign labels
def load_embeddings_and_labels(data_dirs, label):
    """Load every ``.npy`` embedding from ``data_dirs`` and label it by folder.

    Args:
        data_dirs: Sequence of directories containing ``.npy`` embedding files.
        label: Sequence with one label per directory (same order as
            ``data_dirs``), or a single string applied to every directory.

    Returns:
        ``(embeddings, labels)`` as NumPy arrays with one entry per loaded
        file. Files are read in sorted name order within each directory so
        the result does not depend on the file system's listing order.

    Raises:
        ValueError: If the number of labels differs from the number of
            directories.
    """
    data_dirs = list(data_dirs)
    # A bare string would otherwise be zipped character by character.
    dir_labels = [label] * len(data_dirs) if isinstance(label, str) else list(label)
    if len(dir_labels) != len(data_dirs):
        raise ValueError(
            f"Expected one label per directory, got {len(dir_labels)} labels "
            f"for {len(data_dirs)} directories"
        )

    embeddings = []
    all_labels = []
    for path, dir_label in zip(data_dirs, dir_labels):
        for filename in sorted(os.listdir(path)):
            if filename.endswith(".npy"):
                embeddings.append(np.load(os.path.join(path, filename)))
                all_labels.append(dir_label)
    return np.array(embeddings), np.array(all_labels)

# SVM classifier
def perform_svm_classification(X, y, title):
    """Train an RBF-kernel SVM on a stratified 80/20 split and report results.

    Prints the test accuracy and the classification report, then hands the
    test labels and predictions to ``plot_confusion_matrix``.

    Args:
        X: Feature matrix, one row per sample.
        y: Class labels aligned with ``X``; also used to stratify the split.
        title: Name shown in the printed accuracy line and passed on to the
            confusion-matrix plot.
    """
    # Stratified by class label; fixed seed so the split is repeatable
    split = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
    X_train, X_test, y_train, y_test = split

    classifier = SVC(kernel='rbf')
    classifier.fit(X_train, y_train)
    predictions = classifier.predict(X_test)

    print(f"Accuracy ({title}): {accuracy_score(y_test, predictions)}")
    print(classification_report(y_test, predictions))
    plot_confusion_matrix(y_test, predictions, title)

# Confusion matrix plot
def plot_confusion_matrix(y_true, y_pred, title):
    """Print the confusion matrix and draw it as an annotated heatmap.

    Args:
        y_true: Ground-truth class labels.
        y_pred: Predicted class labels, aligned with ``y_true``.
        title: Name shown in the figure title.
    """
    # Sorted union of the labels that occur in either array. Passing it
    # explicitly fixes the row/column order so the heatmap ticks can show the
    # class names instead of bare indices.
    class_names = np.union1d(y_true, y_pred)
    cm = confusion_matrix(y_true, y_pred, labels=class_names)
    print("Confusion Matrix:")
    print(cm)
    plt.figure(figsize=(8, 6))
    sns.heatmap(
        cm,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title(f'Confusion Matrix ({title})')
    plt.show()

# Decision boundary in PCA space
def plot_decision_boundary(classifier, X, y, title, h=.02, max_points_per_axis=300):
    """Plot the classifier's decision regions on the first two PCA components.

    A grid is laid over the 2-D PCA projection of ``X``, mapped back to the
    original feature space with ``inverse_transform`` and classified. Labels
    are converted to integer codes before plotting, so string class names
    work as well as numeric ones.

    Args:
        classifier: Fitted estimator exposing ``predict`` on full-dimensional
            features.
        X: Feature matrix, one row per sample.
        y: Class labels aligned with ``X``.
        title: Name shown in the figure title.
        h: Grid step in PCA units, used while the grid stays small enough.
        max_points_per_axis: Upper bound on grid points per axis. Every grid
            point is expanded back to the full feature dimension, so an
            unbounded step-``h`` grid can exhaust memory.
    """
    pca = PCA(n_components=2)
    X_pca = pca.fit_transform(X)

    x_min, x_max = X_pca[:, 0].min() - 1, X_pca[:, 0].max() + 1
    y_min, y_max = X_pca[:, 1].min() - 1, X_pca[:, 1].max() + 1

    def _axis_ticks(lo, hi):
        # Keep the step-h grid when it is small; otherwise use an evenly
        # spaced grid with a bounded number of points.
        if (hi - lo) / h > max_points_per_axis:
            return np.linspace(lo, hi, max_points_per_axis)
        return np.arange(lo, hi, h)

    xx, yy = np.meshgrid(_axis_ticks(x_min, x_max), _axis_ticks(y_min, y_max))

    Z = np.asarray(classifier.predict(pca.inverse_transform(np.c_[xx.ravel(), yy.ravel()])))

    # contourf and scatter need numbers: map each label to its position in
    # the sorted list of classes seen in the data or the predictions.
    y_arr = np.asarray(y).ravel()
    classes = np.unique(np.concatenate([y_arr, Z.ravel()]))
    y_codes = np.searchsorted(classes, y_arr)
    Z_codes = np.searchsorted(classes, Z.ravel()).reshape(xx.shape)

    # One filled band per class, centred on its integer code; the scatter
    # uses the same value range so points and regions share colours.
    levels = np.arange(len(classes) + 1) - 0.5

    plt.figure(figsize=(8, 6))
    plt.contourf(xx, yy, Z_codes, levels=levels, alpha=0.5)
    plt.scatter(X_pca[:, 0], X_pca[:, 1], c=y_codes,
                vmin=levels[0], vmax=levels[-1], edgecolor='k')
    plt.title(f'Decision Boundary ({title})')
    plt.show()

# View classification for each test sample
def svm_classification_results(X, y, title):
    """Train an RBF-kernel SVM and print predicted vs. actual class per test sample.

    Uses the same stratified 80/20 split (``random_state=42``) as
    ``perform_svm_classification`` so the listed samples are the ones behind
    the reported accuracy.

    Args:
        X: Feature matrix, one row per sample.
        y: Class labels aligned with ``X``; also used to stratify the split.
        title: Accepted for signature parity with the other helpers; not used
            in the output.
    """
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    svm_classifier = SVC(kernel='rbf')
    svm_classifier.fit(X_train, y_train)
    y_pred = svm_classifier.predict(X_test)

    # Iterate pairwise instead of indexing y_test by position, which also
    # works for label containers that are not positionally indexed.
    for i, (predicted, actual) in enumerate(zip(y_pred, y_test), start=1):
        print(f"Embedding {i}: Predicted Class - {predicted}, Actual Class - {actual}")

# Analyse class distribution across data
def class_distribution(labels):
    """Print how many samples each class has and its percentage of the total.

    Args:
        labels: Sized iterable of hashable class labels.
    """
    tally = Counter(labels)
    n_total = len(labels)
    for cls, n_samples in tally.items():
        share = (n_samples / n_total) * 100
        print(f"Class {cls}: {n_samples} samples ({share:.2f}%)")

# Example usage (replace `labels` with your actual labels):
# class_distribution(labels)  # 'labels' is a NumPy array (or list) of class labels

# Analysing train and test data
def svm_classification_analysis(X, y, title):
    """Print the per-class sample counts of a stratified 80/20 train/test split.

    No model is trained; only the split sizes are reported.

    Args:
        X: Feature matrix, one row per sample.
        y: Class labels aligned with ``X``; also used to stratify the split.
        title: Name shown in the header line of the report.
    """
    # Stratified sampling with a fixed seed; the feature halves are not needed
    _, _, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    print(f"Data sizes for {title}:")
    for cls in np.unique(y):
        n_train = np.sum(y_train == cls)
        n_test = np.sum(y_test == cls)
        print(f"Class {cls}: Train - {n_train}, Test - {n_test}")

## Whisper_small classification

In [None]:
# Folders holding the Whisper embeddings, one per class
whisper_root = 'whisper_embeddings'
data_control = f'{whisper_root}/control'
data_verylow = f'{whisper_root}/verylow'
data_low = f'{whisper_root}/low'
data_medium = f'{whisper_root}/medium'
data_high = f'{whisper_root}/high'

In [None]:
# Severity classification: the four severity folders only, control is left out
data_paths = [data_verylow, data_low, data_medium, data_high]
labels = ['verylow', 'low', 'medium', 'high']

# Load embeddings and labels (each file gets the label of its folder)
embeddings_severe, lables_severe = load_embeddings_and_labels(data_paths, labels)

# Sanity check: (data size, hidden state), (label_size,), distinct labels
for summary in (embeddings_severe.shape, lables_severe.shape, np.unique(lables_severe)):
    print(summary)

# Call Svm classifier
perform_svm_classification(embeddings_severe, lables_severe, "Whisper_small")

In [None]:
# Binary classification: control vs. all four severity folders merged into one class
data_paths = [data_control, data_verylow, data_low, data_medium, data_high]
labels = ['Control'] + ['Dysarthria'] * 4

# Load embeddings and labels (each file gets the label of its folder)
embeddings_binary, lables_binary = load_embeddings_and_labels(data_paths, labels)

# Sanity check: (data size, hidden state), (label_size,), distinct labels
for summary in (embeddings_binary.shape, lables_binary.shape, np.unique(lables_binary)):
    print(summary)

# Call Svm classifier
perform_svm_classification(embeddings_binary, lables_binary, "Whisper_small")