In [None]:
import torch, time
from torch import nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from google.colab import drive
import pandas as pd
from PIL import Image
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as T
import os, time
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, roc_auc_score

In [None]:
# Mount Google Drive at /content/drive (Colab-only helper from google.colab);
# the dataset paths used below live under /content/drive/MyDrive.
drive.mount('/content/drive')

In [None]:
# Wall-clock timer for the triplet-generation cell.
start = time.time()

# Root folder of the spectrogram images; every entry listed under it is
# treated as one actor (speaker).
raw_data_path = "/content/drive/MyDrive/Colab Notebooks/audio_triplet_dataset/spectogrammes"
actors = sorted(os.listdir(raw_data_path))

# --- Split actors into Train / Test ---
# The split is made on actors, not on individual images, and uses a fixed
# random_state so the same partition is produced on every run.
train_actors, test_actors = train_test_split(
    actors, test_size=0.2, random_state=42
)

print("Train actors:", len(train_actors))
print("Test actors :", len(test_actors))

def load_images(actor_list, root=None):
    """Collect the PNG spectrogram paths of each actor.

    Parameters
    ----------
    actor_list : iterable of str
        Names of the actor sub-folders to scan.
    root : str, optional
        Folder that contains the actor sub-folders. Defaults to the
        module-level ``raw_data_path``.

    Returns
    -------
    dict
        Maps each actor name to the sorted list of its ``.png`` file paths.
        Actors with fewer than two images are left out (an anchor/positive
        pair needs two distinct images). Entries that are not directories
        are skipped.
    """
    base_dir = raw_data_path if root is None else root
    images_by_actor = {}
    for actor in actor_list:
        actor_dir = os.path.join(base_dir, actor)
        # The actor list usually comes from os.listdir(), which also returns
        # stray files; listing such an entry would raise NotADirectoryError.
        if not os.path.isdir(actor_dir):
            continue
        # Sort so the result does not depend on the filesystem listing order.
        images = sorted(
            os.path.join(actor_dir, f)
            for f in os.listdir(actor_dir)
            if f.endswith(".png")
        )
        if len(images) > 1:
            images_by_actor[actor] = images
    return images_by_actor


# Index the spectrogram files of each split; load_images only keeps actors
# that have more than one image.
train_images_by_actor = load_images(train_actors)
test_images_by_actor = load_images(test_actors)


def generate_triplets(images_by_actor, all_actors, max_triplets_per_actor=75, rng=None):
    """Build random (anchor, positive, negative) image-path triplets.

    For every actor in ``images_by_actor``, anchor and positive are two
    distinct images of that actor and the negative is an image of a different
    actor.

    Parameters
    ----------
    images_by_actor : dict
        Maps actor name -> list of image paths.
    all_actors : iterable of str
        Candidate actors for the negative sample. Actors without an entry in
        ``images_by_actor`` are ignored.
    max_triplets_per_actor : int, optional
        Upper bound on the triplets drawn per actor (default 75). The actual
        count is ``min(max_triplets_per_actor, len(images) ** 2)``.
    rng : numpy.random.Generator or numpy.random.RandomState, optional
        Source of randomness. Defaults to the global ``np.random`` state.

    Returns
    -------
    list of tuple
        ``(anchor, positive, negative)`` path triplets.

    Raises
    ------
    ValueError
        If an actor has no other actor with images to draw a negative from.
    """
    rng = np.random if rng is None else rng
    triplets = []
    for actor, images in images_by_actor.items():
        # An anchor/positive pair needs two distinct images.
        if len(images) < 2:
            continue
        # `all_actors` may contain actors that were filtered out of
        # `images_by_actor`; picking one of them would raise KeyError below.
        other_actors = [
            a for a in all_actors if a != actor and a in images_by_actor
        ]
        if not other_actors:
            raise ValueError(
                f"No other actor with images is available as a negative for {actor!r}"
            )
        max_pairs = min(max_triplets_per_actor, len(images) * len(images))
        for _ in range(max_pairs):
            anchor, positive = rng.choice(images, 2, replace=False)
            negative_actor = rng.choice(other_actors)
            negative = rng.choice(images_by_actor[negative_actor])
            triplets.append((anchor, positive, negative))

    return triplets


print("\nGenerating TRAIN triplets...")
train_triplets = generate_triplets(train_images_by_actor, train_actors)

print("Generating TEST triplets...")
test_triplets = generate_triplets(test_images_by_actor, test_actors)

# Persist both triplet lists as CSV files with one row per triplet.
train_csv_path = "/content/drive/MyDrive/Colab Notebooks/audio_triplet_dataset/train_triplet/train_triplets.csv"
test_csv_path = "/content/drive/MyDrive/Colab Notebooks/audio_triplet_dataset/test_triplet/test_triplets.csv"

# DataFrame.to_csv does not create missing parent folders, so make sure the
# output directories exist before writing.
os.makedirs(os.path.dirname(train_csv_path), exist_ok=True)
os.makedirs(os.path.dirname(test_csv_path), exist_ok=True)

df_train = pd.DataFrame(train_triplets, columns=["anchor", "positive", "negative"])
df_train.to_csv(train_csv_path, index=False)

df_test = pd.DataFrame(test_triplets, columns=["anchor", "positive", "negative"])
df_test.to_csv(test_csv_path, index=False)


end = time.time()
print(f"\nTime taken: {end - start:.2f} seconds")


In [None]:
class ConvBlock(nn.Module):
    """Conv 3x3 -> BatchNorm -> ReLU -> 2x2 max-pool building block.

    The convolution uses a 3x3 kernel with ``padding=1``; the max-pool layer
    uses a window of 2.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        # The attribute names below become state_dict key prefixes.
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
        self.bn = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()
        self.pool = nn.MaxPool2d(2)

    def forward(self, x):
        """Run ``x`` through conv, batch-norm, ReLU and pooling, in that order."""
        normalized = self.bn(self.conv(x))
        return self.pool(self.relu(normalized))
class SpeakerEmbeddingCNN(nn.Module):
    """CNN that maps a 3-channel image batch to L2-normalized embeddings.

    Four ``ConvBlock`` stages (3 -> 32 -> 64 -> 128 -> 256 channels) are
    followed by global average pooling and a two-layer fully connected head
    that outputs ``embedding_dim`` values per sample.
    """

    def __init__(self, embedding_dim=128):
        super().__init__()
        # --- Feature extractor (4 CNN blocks), built from the channel plan.
        channel_plan = [3, 32, 64, 128, 256]
        self.features = nn.Sequential(
            *[
                ConvBlock(n_in, n_out)
                for n_in, n_out in zip(channel_plan[:-1], channel_plan[1:])
            ]
        )
        # Global average pooling collapses each feature map to 256 x 1 x 1.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        # --- Embedding head.
        self.embedding_fc = nn.Sequential(
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, embedding_dim),
        )
        self.embedding_dim = embedding_dim

    def forward(self, x):
        """Return one unit-length (L2-normalized) embedding row per input sample."""
        pooled = self.avgpool(self.features(x))
        flat = pooled.view(pooled.size(0), -1)
        # Normalize the embeddings so the triplet loss compares unit vectors.
        return F.normalize(self.embedding_fc(flat), p=2, dim=1)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# --- Training dataset preparation ---
# NOTE(review): TripletDataset is not defined in the visible cells; it must be
# defined or imported before this cell for "Restart & Run All" to succeed.
transform = T.Compose([T.Resize((224, 224)), T.ToTensor(), T.Normalize(mean=[0.5], std=[0.5])])
train_dataset = TripletDataset(csv_file="/content/drive/MyDrive/Colab Notebooks/audio_triplet_dataset/train_triplet/train_triplets.csv",transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4, pin_memory=True)

# --- Model preparation ---
model = SpeakerEmbeddingCNN(embedding_dim=128).to(device)
criterion = nn.TripletMarginLoss(margin= 0.2)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

LOG_EVERY = 50  # print the batch timing once every LOG_EVERY batches

# Make sure dropout / batch-norm are in training mode even if this cell is
# re-run after an evaluation cell.
model.train()
start = time.time()
print("start training...")
for epoch in range(10):
    start_epoch = time.time()
    epoch_loss_sum = 0.0
    num_batches = 0
    for batch_idx, (anchor, positive, negative) in enumerate(train_loader):
        start_batch = time.time()
        anchor = anchor.to(device)
        positive = positive.to(device)
        negative = negative.to(device)

        # Embed the three branches with the same (shared-weight) network.
        anchor_emb = model(anchor)
        positive_emb = model(positive)
        negative_emb = model(negative)

        loss = criterion(anchor_emb, positive_emb, negative_emb)

        # Backprop
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accumulate so the epoch report reflects every batch, not only the
        # last one.
        epoch_loss_sum += loss.item()
        num_batches += 1

        end_batch = time.time()
        if batch_idx % LOG_EVERY == 0:
            print(f"Batch time: {end_batch - start_batch:.2f} seconds")
    # Mean loss over the epoch; NaN if the loader produced no batch.
    mean_loss = epoch_loss_sum / num_batches if num_batches else float("nan")
    print(f"Epoch {epoch} — Loss={mean_loss:.4f}")
    end_epoch = time.time()
    print(f"Epoch time: {end_epoch - start_epoch:.2f} seconds")

end = time.time()
print(f"\nTraining time: {end - start:.2f} seconds")


In [None]:
# Save the trained weights, then reload them into a fresh model for evaluation.
# NOTE(review): this path is outside the mounted Drive folder (/content/drive);
# presumably it does not survive a Colab runtime reset — confirm and move it
# under Drive if the checkpoint must be kept.
model_path = "/content/saved_models/speaker_model.pth"
# Create the target folder first so saving does not fail on a missing directory.
os.makedirs(os.path.dirname(model_path), exist_ok=True)
# Remember the embedding size the network was trained with before `model`
# is rebound below.
trained_embedding_dim = model.embedding_dim
torch.save(model.state_dict(), model_path)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# The rebuilt architecture must match the checkpoint: building it with a
# different embedding_dim (64 vs. the 128 used for training) makes
# load_state_dict fail with a size mismatch on the last Linear layer.
model = SpeakerEmbeddingCNN(embedding_dim=trained_embedding_dim).to(device)
# map_location lets a checkpoint saved on GPU be loaded on a CPU-only runtime.
model.load_state_dict(torch.load(model_path, map_location=device))
model.eval()

In [None]:
# Evaluate on the held-out triplets, one triplet per batch (batch_size=1, which
# is what allows the .item() calls below).
# NOTE(review): TripletDataset is not defined in the visible cells, and
# `transform` comes from the training cell — both must exist before this runs.
test_dataset = TripletDataset(csv_file="/content/drive/MyDrive/Colab Notebooks/audio_triplet_dataset/test_triplet/test_triplets.csv",transform=transform)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)
all_distances = []
all_labels = []  # 1 if same speaker, 0 if different
with torch.no_grad():
    print("start testing...")
    for anchor, positive, negative in test_loader:
        print("test batch...")
        anchor = anchor.to(device)
        positive = positive.to(device)
        negative = negative.to(device)

        anchor_emb = model(anchor)
        positive_emb = model(positive)
        negative_emb = model(negative)
        # NOTE: despite the *_dist names, these are cosine *similarities*
        # (higher = more alike), not distances.
        pos_dist = F.cosine_similarity(anchor_emb, positive_emb).cpu().item()
        neg_dist = F.cosine_similarity(anchor_emb, negative_emb).cpu().item()

        all_distances.append((pos_dist, neg_dist))
        all_labels.append((1, 0))  # 1 for the positive pair, 0 for the negative pair