### Split the data into train set and test set

In [3]:
import os
import numpy as np
import librosa
from torch.utils.data import DataLoader, random_split
from dap_datasets import DAPSAudioDataset
import torch

audio_dir = "daps"

class_1_speakers = ["f1", "f7", "f8", "m3", "m6", "m8"]

class_mapping = {}

for root, dirs, files in os.walk(audio_dir):
    for file in files:
        # Skip files that start with "._" or are not ".wav" files
        if file.startswith("._") or not file.endswith(".wav"):
            continue

        audio_path = os.path.join(root, file)
        try:
            audio, sr = librosa.load(audio_path, sr=None)
            speaker_prefix = file.split("_")[0]
            if speaker_prefix in class_1_speakers:
                class_mapping[audio_path] = 1
            else:
                class_mapping[audio_path] = 0
        except Exception as e:
            print(f"Error processing {audio_path}: {e}")

print("Class mapping created:")
#print(class_mapping)

class_0_count = sum(1 for label in class_mapping.values() if label == 0)
class_1_count = sum(1 for label in class_mapping.values() if label == 1)

print(f"Total Class 0 samples: {class_0_count}")
print(f"Total Class 1 samples: {class_1_count}")

def preprocess_audio(audio_path, max_length=16000):
    try:
        audio, sr = librosa.load(audio_path, sr=None)
        if audio is None or len(audio) == 0:
            return None

        mel_spectrogram = librosa.feature.melspectrogram(y=audio, sr=sr, n_mels=128, fmax=8000)
        mel_spectrogram_db = librosa.power_to_db(mel_spectrogram, ref=np.max)
        normalized_spectrogram = (mel_spectrogram_db - np.mean(mel_spectrogram_db)) / np.std(mel_spectrogram_db)

        target_length = max_length
        if normalized_spectrogram.shape[1] > target_length:
            normalized_spectrogram = normalized_spectrogram[:, :target_length]
        else:
            padding = target_length - normalized_spectrogram.shape[1]
            normalized_spectrogram = np.pad(normalized_spectrogram, ((0, 0), (0, padding)), mode="constant")

        spectrogram_tensor = torch.tensor(normalized_spectrogram, dtype=torch.float32).unsqueeze(0)
        return spectrogram_tensor
    except Exception as e:
        print(f"Error processing {audio_path}: {e}")
        return None

class DAPSAudioDataset:
    def __init__(self, class_mapping, transform=None):
        self.class_mapping = list(class_mapping.items())
        self.transform = transform

    def __len__(self):
        return len(self.class_mapping)

    def __getitem__(self, idx):
        audio_path, label = self.class_mapping[idx]
        processed_audio = self.transform(audio_path) if self.transform else None

        if processed_audio is None:
            return None  

        return processed_audio, label

print("Preparing the dataset...")
full_dataset = DAPSAudioDataset(class_mapping=class_mapping, transform=preprocess_audio)
print("Dataset created")

train_ratio = 0.8
val_ratio = 0.1
train_size = int(train_ratio * len(full_dataset))
val_size = int(val_ratio * len(full_dataset))
test_size = len(full_dataset) - train_size - val_size

train_dataset, val_dataset, test_dataset = random_split(full_dataset, [train_size, val_size, test_size])

def collate_fn(batch):
    batch = [item for item in batch if item is not None]
    if len(batch) == 0:
        return None
    return torch.utils.data.dataloader.default_collate(batch)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn)

for batch in train_loader:
    inputs, labels = batch
    print(f"Train Inputs shape: {inputs.shape}")
    print(f"Train Labels: {labels}")
    break

for batch in val_loader:
    inputs, labels = batch
    print(f"Validation Inputs shape: {inputs.shape}")
    print(f"Validation Labels: {labels}")
    break

for batch in test_loader:
    inputs, labels = batch
    print(f"Test Inputs shape: {inputs.shape}")
    print(f"Test Labels: {labels}")
    break


Class mapping created:
Total Class 0 samples: 1050
Total Class 1 samples: 450
Preparing the dataset...
Dataset created
one spectro is computed
one spectro is computed
one spectro is computed
one spectro is computed
one spectro is computed
one spectro is computed
one spectro is computed
one spectro is computed
one spectro is computed
one spectro is computed
one spectro is computed
one spectro is computed
one spectro is computed
one spectro is computed
one spectro is computed
one spectro is computed
Train Inputs shape: torch.Size([16, 1, 128, 16000])
Train Labels: tensor([1, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 1, 1])
Validation Inputs shape: torch.Size([16, 1, 128, 16000])
Validation Labels: tensor([0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0])
Test Inputs shape: torch.Size([16, 1, 128, 16000])
Test Labels: tensor([0, 0, 0, 1, 0, 1, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0])


In [4]:
print(len(train_loader))
print(len(val_loader))

75
10


In [77]:
len(test_loader)

10

### The model


In [68]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, 5)  
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        
    
        self.fc1_input_size = self._get_fc1_input_size()
        self.fc1 = nn.Linear(self.fc1_input_size, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)  

    def _get_fc1_input_size(self):
        with torch.no_grad():
            x = torch.zeros(1, 1, 128, 16000)  
            x = self.pool(F.relu(self.conv1(x)))
            x = self.pool(F.relu(self.conv2(x)))
            return x.numel()  

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1)  
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x



### Train the model

In [69]:
### reduce the train set to few samples for faster training
### only for tests/ reduces the efficiency of the model
from torch.utils.data import DataLoader, Subset


subset_dataset = Subset(train_dataset, list(range(20)))

train_loader = DataLoader(subset_dataset, batch_size=1, shuffle=False)

In [80]:
len(train_loader)

75

In [81]:
# AVOID YOUR COMPUTER TO CRASH
import torch

torch.set_num_threads(4) 

In [83]:
import multiprocessing


print("Nombre de cœurs physiques:", multiprocessing.cpu_count())


logical_cores = torch.get_num_threads()
print("Nombre de cœurs logiques (threads disponibles):", logical_cores)


Nombre de cœurs physiques: 8
Nombre de cœurs logiques (threads disponibles): 4


In [84]:
import torch
from torch import nn, optim
from torch.utils.data import DataLoader

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = Net().to(device)  
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)  

num_epochs = 10

for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}, start training...")
    model.train()  
    running_loss = 0.0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)  

        optimizer.zero_grad()

        outputs = model(inputs)

        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

        running_loss += loss.item()  

    avg_loss = running_loss / len(train_loader)
    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_loss:.4f}")

torch.save(model.state_dict(), "simple_cnn_model.pth")
print("Model saved successfully.")


Epoch 1/10, start training...
Epoch [1/10], Loss: 2.2052
Epoch 2/10, start training...
Epoch [2/10], Loss: 0.0185
Epoch 3/10, start training...
Epoch [3/10], Loss: 0.0014
Epoch 4/10, start training...
Epoch [4/10], Loss: 0.0006
Epoch 5/10, start training...
Epoch [5/10], Loss: 0.0001
Epoch 6/10, start training...
Epoch [6/10], Loss: 0.0001
Epoch 7/10, start training...
Epoch [7/10], Loss: 0.0001
Epoch 8/10, start training...
Epoch [8/10], Loss: 0.0000
Epoch 9/10, start training...
Epoch [9/10], Loss: 0.0000
Epoch 10/10, start training...
Epoch [10/10], Loss: 0.0000
Model saved successfully.


### Evaluate Model

In [85]:
import torch
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score

model = Net()

model.load_state_dict(torch.load("simple_cnn_model.pth"))
print("the model is loaded")
model.eval()

all_labels = []
all_predictions = []

occurence = 0
with torch.no_grad():
    length = len(test_loader)
    for inputs, labels in test_loader:
        print(f"start testing occurence {occurence}/{length}")
        occurence += 1
        inputs, labels = inputs.to(device), labels.to(device)

        
        outputs = model(inputs)
        _, predicted = torch.max(
            outputs, 1
        )  

        
        all_labels.extend(labels.cpu().numpy())  
        all_predictions.extend(predicted.cpu().numpy())

f1 = f1_score(all_labels, all_predictions, average="binary")
precision = precision_score(all_labels, all_predictions, average="binary")
recall = recall_score(all_labels, all_predictions, average="binary")
accuracy = accuracy_score(all_labels, all_predictions)

# Print metrics
print(f"F1-Score: {f1:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"Accuracy: {accuracy:.4f}")

  model.load_state_dict(torch.load("simple_cnn_model.pth"))


the model is loaded
start testing occurence 0/10
start testing occurence 1/10
start testing occurence 2/10
start testing occurence 3/10
start testing occurence 4/10
start testing occurence 5/10
start testing occurence 6/10
start testing occurence 7/10
start testing occurence 8/10
start testing occurence 9/10
F1-Score: 1.0000
Precision: 1.0000
Recall: 1.0000
Accuracy: 1.0000
