Installing packages

In [1]:
!pip install datasets --quiet
!pip install torch --quiet
!pip install keras --quiet

Importing relevant packages

In [2]:
import os
import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from sklearn.model_selection import StratifiedKFold
import pandas as pd
from tqdm import tqdm

Load data for training

In [3]:
np.random.seed(42)  # Set the seed for NumPy random number generation
# Weight initialisation and DataLoader shuffling draw from PyTorch's RNG, not
# NumPy's, so it has to be seeded as well for runs to be repeatable.
torch.manual_seed(42)

def load_data(base_dir, layer):
    """Load every ``.npy`` embedding stored under ``<base_dir>/<speaker>/<layer>/``.

    Speaker directories and the files inside them are visited in sorted order,
    so the speaker -> integer label mapping (and the order of the returned
    samples) is identical on every run and on every filesystem; plain
    ``os.listdir`` gives no ordering guarantee.

    Args:
        base_dir: Directory containing one sub-directory per speaker.
        layer: Name of the sub-directory inside each speaker directory to read.

    Returns:
        A tuple ``(embeddings, labels)``:
        ``embeddings`` is ``np.array`` of the loaded arrays stacked along a new
        first axis, and ``labels`` is an ``int64`` array with one entry per
        embedding. Labels are consecutive integers starting at 0, assigned only
        to speakers that contribute at least one ``.npy`` file for ``layer``.
    """
    embeddings = []
    labels = []
    next_label = 0

    for speaker_dir in sorted(os.listdir(base_dir)):
        layer_dir = os.path.join(base_dir, speaker_dir, layer)

        # Speakers that lack this layer (or stray files in base_dir) are skipped.
        if not os.path.isdir(layer_dir):
            continue

        file_names = sorted(
            name for name in os.listdir(layer_dir) if name.endswith('.npy')
        )
        if not file_names:
            # No embeddings for this speaker: do not consume a label id.
            continue

        for file_name in file_names:
            embeddings.append(np.load(os.path.join(layer_dir, file_name)))
            labels.append(next_label)
        next_label += 1

    # int64 is the index dtype nn.CrossEntropyLoss expects; the platform default
    # integer is not 64-bit everywhere.
    return np.array(embeddings), np.array(labels, dtype=np.int64)


Creating model

In [4]:
class CNN(nn.Module):
    """Three-stage convolutional classifier for single-channel 2-D inputs.

    Each stage is a size-preserving convolution followed by ReLU and a 2x2
    max-pool with stride 2, so height and width are halved (rounding down)
    three times. The 128 resulting feature maps are flattened and passed
    through a 128-unit hidden layer with dropout before the output layer.

    Args:
        num_classes: Number of output logits.
        flattened_size: Number of features per sample after the last pooling
            step, i.e. ``128 * (H // 8) * (W // 8)``. The default,
            ``128 * 12 * 128``, matches e.g. a 1 x 96 x 1024 input.
    """

    def __init__(self, num_classes, flattened_size=128 * 12 * 128):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=(5, 5), padding=(2, 2))
        self.pool = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        self.conv2 = nn.Conv2d(32, 64, kernel_size=(3, 3), padding=(1, 1))
        self.conv3 = nn.Conv2d(64, 128, kernel_size=(3, 3), padding=(1, 1))
        self.dropout = nn.Dropout(0.5)
        self.fc1 = nn.Linear(flattened_size, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return raw class logits of shape ``(batch, num_classes)``.

        Accepts a batched ``(batch, 1, H, W)`` tensor, or a single unbatched
        ``(1, H, W)`` sample, which is treated as a batch of one.

        Raises:
            RuntimeError: If the input's spatial size does not produce
                ``flattened_size`` features per sample.
        """
        if x.dim() == 3:
            x = x.unsqueeze(0)

        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))

        # Flatten everything except the batch axis. Unlike view(-1, N), this can
        # never fold surplus features into the batch dimension: a wrongly sized
        # input fails loudly in fc1 instead of silently changing the batch size.
        x = torch.flatten(x, start_dim=1)

        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        return self.fc2(x)


Function to train and evaluate all layers by iterating over them.

In [5]:
def _run_training_epoch(model, train_loader, criterion, optimizer, device):
    """Run one optimisation pass over ``train_loader`` and return its metrics.

    The metrics are accumulated from the same forward passes used for the
    weight updates, i.e. with the model in training mode (dropout active).

    Returns:
        ``(avg_loss, accuracy, precision, recall, f1)`` where ``avg_loss`` is the
        mean of the per-batch losses and precision/recall/F1 are
        support-weighted averages over the classes.
    """
    model.train()
    total_loss = 0.0
    correct = 0
    total = 0
    predictions = []
    targets_list = []

    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

        _, predicted = torch.max(outputs, 1)
        correct += (predicted == targets).sum().item()
        total += targets.size(0)
        predictions.extend(predicted.cpu().numpy())
        targets_list.extend(targets.cpu().numpy())

    avg_loss = total_loss / len(train_loader)
    accuracy = correct / total
    # zero_division=0 yields the same scores as the default while silencing the
    # warning raised for classes that were never predicted (common early on).
    precision, recall, f1, _ = precision_recall_fscore_support(
        targets_list, predictions, average='weighted', zero_division=0
    )
    return avg_loss, accuracy, precision, recall, f1


def train_and_evaluate(base_dir, layers, num_classes, device, epochs=30, weight_decay=0.01):
    """Train a fresh CNN on each layer's embeddings and record training metrics.

    For every entry in ``layers`` the embeddings are loaded with ``load_data``,
    a new ``CNN`` is trained with AdamW (lr=0.001) and cross-entropy loss, and
    the per-epoch metrics are written to ``<layer>_training_progress.csv`` in
    the current working directory. All reported metrics are computed on the
    training data; no held-out split is used.

    Args:
        base_dir: Root directory passed to ``load_data``.
        layers: Iterable of layer directory names, one model is trained per entry.
        num_classes: Number of output classes of the CNN.
        device: ``torch.device`` to train on.
        epochs: Number of passes over the data per layer (must be >= 1).
        weight_decay: Weight decay passed to AdamW.

    Returns:
        A list with one tuple per layer:
        ``(layer, train_loss, accuracy, precision, recall, f1)``, holding the
        metrics of the last epoch.

    Raises:
        ValueError: If ``epochs`` is smaller than 1, if no embeddings are found
            for a layer, or if a label is not smaller than ``num_classes``.
    """
    if epochs < 1:
        # Without at least one epoch there are no final metrics to report.
        raise ValueError(f"epochs must be at least 1, got {epochs}")

    results = []
    for layer in layers:
        embeddings, labels = load_data(base_dir, layer)

        if len(labels) == 0:
            raise ValueError(f"No .npy embeddings found for layer '{layer}' under '{base_dir}'")
        if int(labels.max()) >= num_classes:
            # Out-of-range targets otherwise surface as an opaque device-side
            # assertion inside CrossEntropyLoss when running on CUDA.
            raise ValueError(
                f"Layer '{layer}' contains label {int(labels.max())} but num_classes is {num_classes}"
            )

        # Conv layers hold float32 weights and CrossEntropyLoss needs int64
        # targets; the casts are no-ops when the arrays already have those dtypes.
        train_dataset = TensorDataset(
            torch.from_numpy(embeddings).float(),
            torch.from_numpy(labels).long(),
        )
        train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

        model = CNN(num_classes=num_classes).to(device)
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=weight_decay)

        epoch_data = {'epoch': [], 'train_loss': [], 'train_accuracy': [], 'train_precision': [], 'train_recall': [], 'train_f1': []}

        for epoch in range(epochs):
            avg_loss, accuracy, precision, recall, f1 = _run_training_epoch(
                model, train_loader, criterion, optimizer, device
            )

            # The CSV keeps the 0-based epoch index; the log line below is 1-based.
            epoch_data['epoch'].append(epoch)
            epoch_data['train_loss'].append(avg_loss)
            epoch_data['train_accuracy'].append(accuracy)
            epoch_data['train_precision'].append(precision)
            epoch_data['train_recall'].append(recall)
            epoch_data['train_f1'].append(f1)

            print(f"Layer: {layer}, Epoch: {epoch+1}, Train Loss: {avg_loss}, Train Accuracy: {accuracy}, Train Precision: {precision}, Train Recall: {recall}, Train F1: {f1}")

        results.append((layer, avg_loss, accuracy, precision, recall, f1))
        df = pd.DataFrame(epoch_data)
        df.to_csv(f"{layer}_training_progress.csv", index=False)

    return results


Saving the final results of each layer to a CSV file (the per-epoch training progress is written by `train_and_evaluate`).

In [6]:
def save_final_results(results):
    """Write the per-layer summary rows to ``final_results.csv``.

    Args:
        results: Sequence of rows as returned by ``train_and_evaluate``:
            ``(layer, loss, accuracy, precision, recall, f1)``. Rows without the
            loss field, ``(layer, accuracy, precision, recall, f1)``, are
            accepted as well.
    """
    rows = list(results)
    columns = ['Layer', 'Loss', 'Accuracy', 'Precision', 'Recall', 'F1-Score']
    # train_and_evaluate emits six fields per row (the loss comes second);
    # naming only five columns makes the DataFrame constructor raise ValueError.
    if rows and len(rows[0]) == 5:
        columns.remove('Loss')
    df_results = pd.DataFrame(rows, columns=columns)
    df_results.to_csv("final_results.csv", index=False)

Conducting training and evaluation

In [None]:
# Mount Google Drive so the embeddings under /content/drive can be read
# (google.colab is only available inside a Colab runtime).
from google.colab import drive
drive.mount('/content/drive')

# Train on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

# Root folder handed to load_data, which reads <base_dir>/<speaker>/<layer>/*.npy.
base_dir = '/content/drive/My Drive/new_speaker_identification/clips__test/'
# Layer sub-directory names; one model is trained per entry.
layers = ['layer_0_processed', 'layer_5_processed', 'layer_10_processed', 'layer_20_processed', 'layer_24_processed']
# Size of the CNN output layer; presumably the number of speaker folders under
# base_dir — verify against the data.
num_classes = 25
# Train on every layer in turn, then write the collected rows to final_results.csv.
results = train_and_evaluate(base_dir, layers, num_classes, device)
save_final_results(results)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
cuda
Layer: layer_0_processed, Epoch: 1, Train Loss: 2.374937711510004, Train Accuracy: 0.3174038402551991, Train Precision: 0.343989696899871, Train Recall: 0.3174038402551991, Train F1: 0.31022669044002393
Layer: layer_0_processed, Epoch: 2, Train Loss: 1.0196667647829243, Train Accuracy: 0.6275688608060855, Train Precision: 0.6236216139843251, Train Recall: 0.6275688608060855, Train F1: 0.6219724458391239
Layer: layer_0_processed, Epoch: 3, Train Loss: 0.8568880525289797, Train Accuracy: 0.6777498312986934, Train Precision: 0.6769571779478768, Train Recall: 0.6777498312986934, Train F1: 0.6746172457635058
Layer: layer_0_processed, Epoch: 4, Train Loss: 0.7408432809745564, Train Accuracy: 0.7136372001717686, Train Precision: 0.7140360358928579, Train Recall: 0.7136372001717686, Train F1: 0.7109003675893407
Layer: layer_0_processed, Epoch: 5, Train Loss: 0.7