In [1]:
import torch
print(torch.cuda.is_available())

True


In [2]:
import random
import torch

def pad_or_trim(mel_spec, max_frames=300):
    """
    mel_spec: [1, n_mels, time]
    max_frames: desired fixed time dimension
    """
    channels, n_mels, time = mel_spec.shape
    if time < max_frames:
        # Pad with zeros at the end
        pad_size = max_frames - time
        pad = torch.zeros((channels, n_mels, pad_size), device=mel_spec.device)
        mel_spec = torch.cat([mel_spec, pad], dim=2)
    elif time > max_frames:
        # Random crop
        start = random.randint(0, time - max_frames)
        mel_spec = mel_spec[:, :, start:start+max_frames]
    return mel_spec


In [3]:
import os
import random
import torch
import torchaudio
from torch.utils.data import Dataset
import soundfile as sf

class TripletSpeakerDataset(Dataset):
    """
    Creates triplets (anchor, positive, negative) for metric learning.
    - anchor, positive: same speaker
    - negative: different speaker
    """
    def __init__(self, data_root, n_mels=40, transform=None):
        """
        data_root: root folder, subfolders named '1','2',... each with .wav chunks
        n_mels: number of mel filter banks
        transform: optional transform on the mel-spectrogram
        """
        self.data_root = data_root
        self.n_mels = n_mels
        self.transform = transform

        # Gather speaker folders
        self.speakers = sorted([
            d for d in os.listdir(data_root)
            if os.path.isdir(os.path.join(data_root, d))
        ])
        # speaker -> list of .wav file paths
        self.speaker_files = {}
        for spk in self.speakers:
            spk_dir = os.path.join(data_root, spk)
            wavs = [os.path.join(spk_dir, f)
                    for f in os.listdir(spk_dir) 
                    if f.lower().endswith('.wav')]
            self.speaker_files[spk] = wavs

        # Flatten all (speaker, file) pairs to create an indexable list
        self.index_list = []
        for spk in self.speakers:
            for wavpath in self.speaker_files[spk]:
                self.index_list.append((spk, wavpath))
        self.mel_transform = torchaudio.transforms.MelSpectrogram(
            sample_rate=16000,  # adjust if needed
            n_mels=self.n_mels
        )
        self.to_db = torchaudio.transforms.AmplitudeToDB()

    def __len__(self):
        return len(self.index_list)

    def __getitem__(self, idx):
        # 1) Get anchor info
        anchor_spk, anchor_path = self.index_list[idx]

        # 2) Randomly pick a positive sample from the same speaker
        positive_path = random.choice(self.speaker_files[anchor_spk])
        while positive_path == anchor_path and len(self.speaker_files[anchor_spk]) > 1:
            # ensure we pick a different file if possible
            positive_path = random.choice(self.speaker_files[anchor_spk])

        # 3) Pick a negative speaker
        neg_spk = random.choice(self.speakers)
        while neg_spk == anchor_spk and len(self.speakers) > 1:
            neg_spk = random.choice(self.speakers)
        negative_path = random.choice(self.speaker_files[neg_spk])

        # Load & transform the audio for anchor, positive, negative
        anchor_mel = self._wav_to_mel(anchor_path)
        positive_mel = self._wav_to_mel(positive_path)
        negative_mel = self._wav_to_mel(negative_path)

        return anchor_mel, positive_mel, negative_mel

    def _wav_to_mel(self, file_path):
        audio_data, sr = sf.read(file_path)
        audio_tensor = torch.from_numpy(audio_data).float()
        if audio_tensor.dim() == 1:
            audio_tensor = audio_tensor.unsqueeze(0)  # [1, samples]
        if sr != 16000:
            audio_tensor = torchaudio.functional.resample(audio_tensor, sr, 16000)
            sr = 16000

        # Mel spectrogram
        mel_spec = self.mel_transform(audio_tensor)
        mel_spec = self.to_db(mel_spec)
        mel_spec = pad_or_trim(mel_spec, max_frames=300)
        # shape: [channel=1, n_mels, time]
        if self.transform:
            mel_spec = self.transform(mel_spec)

        return mel_spec


In [4]:
import torch.nn as nn
import torch.nn.functional as F

class SpeakerEmbeddingNet(nn.Module):
    def __init__(self, embed_dim=128):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(2)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)

        self.global_pool = nn.AdaptiveAvgPool2d((1, 1))

        self.fc = nn.Linear(64, embed_dim) 

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool(x)
        x = F.relu(self.conv2(x))
        x = self.pool(x)
        x = F.relu(self.conv3(x))
        x = self.global_pool(x) 
        x = x.view(x.size(0), -1)  # [B, 64]
        x = self.fc(x)            # [B, embed_dim]
        x = F.normalize(x, p=2, dim=1)
        return x


In [5]:
import torch
import torch.nn.functional as F

def triplet_loss(anchor_emb, positive_emb, negative_emb, margin=1.0):
    """
    Computes the triplet loss using cosine similarity.
    anchor_emb, positive_emb, negative_emb: [B, embed_dim]
    margin: Margin for the triplet loss.
    """
    # Cosine similarity
    cos_sim_pos = F.cosine_similarity(anchor_emb, positive_emb)  # [B]
    cos_sim_neg = F.cosine_similarity(anchor_emb, negative_emb)  # [B]

    # Convert cosine similarity to distance
    dist_pos = 1 - cos_sim_pos  # [B]
    dist_neg = 1 - cos_sim_neg  # [B]

    # Compute the triplet loss
    losses = torch.relu(dist_pos - dist_neg + margin)  # [B]
    return losses.mean()


In [8]:
from torch import optim
def train_siamese(train_loader, val_loader, embed_dim=128, epochs=10, lr=1e-3, margin=1.0):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = SpeakerEmbeddingNet(embed_dim=embed_dim).to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)

    for epoch in range(epochs):
        model.train()
        total_loss, total_batches = 0.0, 0
        for anchor_mel, pos_mel, neg_mel in train_loader:
            anchor_mel = anchor_mel.to(device)
            pos_mel = pos_mel.to(device)
            neg_mel = neg_mel.to(device)

            optimizer.zero_grad()
            anchor_emb = model(anchor_mel)
            pos_emb = model(pos_mel)
            neg_emb = model(neg_mel)

            loss = triplet_loss(anchor_emb, pos_emb, neg_emb, margin=margin)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            total_batches += 1

        avg_train_loss = total_loss / total_batches

        # Evaluate on val_loader
        model.eval()
        val_loss, val_batches = 0.0, 0
        with torch.no_grad():
            for anchor_mel, pos_mel, neg_mel in val_loader:
                anchor_mel = anchor_mel.to(device)
                pos_mel = pos_mel.to(device)
                neg_mel = neg_mel.to(device)

                anchor_emb = model(anchor_mel)
                pos_emb = model(pos_mel)
                neg_emb = model(neg_mel)

                loss = triplet_loss(anchor_emb, pos_emb, neg_emb, margin=margin)
                val_loss += loss.item()
                val_batches += 1

        avg_val_loss = val_loss / val_batches if val_batches > 0 else 0
        print(f"Epoch [{epoch+1}/{epochs}] Train Loss: {avg_train_loss:.4f} Val Loss: {avg_val_loss:.4f}")

    return model

In [9]:
from torch.utils.data import random_split, DataLoader
import torchaudio
torchaudio.set_audio_backend("soundfile")      # Use "soundfile" backend if needed on Windows

data_root = r"C:\Users\VICTUS\Voice\A Dataset for Voice-Based Human Identity Recognition\output"
full_dataset = TripletSpeakerDataset(data_root)

train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size
train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader   = DataLoader(val_dataset, batch_size=8, shuffle=False)

model = train_siamese(train_loader, val_loader, embed_dim=128, epochs=10, lr=1e-3, margin=1.0)
torch.save(model.state_dict(), "siamese_speaker_model.pth")


  torchaudio.set_audio_backend("soundfile")      # Use "soundfile" backend if needed on Windows


Epoch [1/10] Train Loss: 0.7589 Val Loss: 0.6072
Epoch [2/10] Train Loss: 0.5776 Val Loss: 0.6352
Epoch [3/10] Train Loss: 0.5115 Val Loss: 0.4986
Epoch [4/10] Train Loss: 0.4650 Val Loss: 0.4889
Epoch [5/10] Train Loss: 0.4571 Val Loss: 0.4589
Epoch [6/10] Train Loss: 0.4467 Val Loss: 0.4499
Epoch [7/10] Train Loss: 0.4393 Val Loss: 0.4915
Epoch [8/10] Train Loss: 0.4203 Val Loss: 0.4454
Epoch [9/10] Train Loss: 0.4212 Val Loss: 0.4796
Epoch [10/10] Train Loss: 0.4385 Val Loss: 0.4433


In [10]:
import numpy as np

def enroll_speaker(model, file_paths, device='cpu'):
    """
    file_paths: list of .wav files for this speaker's enrollment
    returns: an average embedding (torch.Tensor) to represent the speaker
    """
    model.eval()
    embeddings = []
    for fp in file_paths:
        mel_spec = load_mel_spec(fp)  # same transform used in dataset
        mel_spec = mel_spec.unsqueeze(0).to(device)  # [1, 1, n_mels, time]
        with torch.no_grad():
            emb = model(mel_spec)
        embeddings.append(emb.cpu().numpy())

    # average
    avg_emb = np.mean(embeddings, axis=0)  # shape [1, embed_dim]
    avg_emb = torch.from_numpy(avg_emb).float()
    avg_emb = torch.nn.functional.normalize(avg_emb, p=2, dim=1)
    return avg_emb

def load_mel_spec(file_path, sr=16000, n_mels=40):
    """
    Quick utility to load a .wav and return a Mel-spectrogram (1, n_mels, time).
    Must match the approach used in your dataset's _wav_to_mel or transforms.
    """
    audio_data, orig_sr = sf.read(file_path)
    audio_tensor = torch.from_numpy(audio_data).float().unsqueeze(0)
    if orig_sr != sr:
        audio_tensor = torchaudio.functional.resample(audio_tensor, orig_sr, sr)

    mel_transform = torchaudio.transforms.MelSpectrogram(sample_rate=sr, n_mels=n_mels)
    to_db = torchaudio.transforms.AmplitudeToDB()
    mel_spec = to_db(mel_transform(audio_tensor))
    return mel_spec

In [11]:
def verify_speaker(model, enrolled_embedding, test_wav, threshold=0.5, device='cpu'):
    """
    Compare test_wav embedding to the enrolled_embedding (already stored).
    Return 'ACCEPT' if distance < threshold, else 'REJECT'.

    threshold is a distance threshold in embedding space.
    If you L2-normalize embeddings, typical threshold might be around 0.5~1.0
    (Tune on a dev set).
    """
    model.eval()
    mel_spec = load_mel_spec(test_wav).to(device)
    mel_spec = mel_spec.unsqueeze(0)  # [1, 1, n_mels, time]
    with torch.no_grad():
        test_emb = model(mel_spec)     # [1, embed_dim]
    # L2-normalize
    test_emb = F.normalize(test_emb, p=2, dim=1)

    # distance
    dist = torch.norm(test_emb - enrolled_embedding.to(device), p=2).item()
    print(f"Distance to enrolled embedding: {dist:.3f}")
    if dist < threshold:
        return "ACCEPT"
    else:
        return "REJECT"

In [12]:
import torch
import torch.nn.functional as F
import random
import os
import soundfile as sf
import torchaudio
from torch.utils.data import Dataset, DataLoader

##################################
# 1. Create a Pairwise Test Dataset
##################################
class VerificationDataset(Dataset):
    """
    Yields (mel1, mel2, label), where label=1 if same speaker, 0 otherwise.
    We assume we have a root folder with subfolders for each speaker.
    We'll sample pairs from the same speaker for label=1, and from different speakers for label=0.
    """
    def __init__(self, data_root, n_mels=40, num_pairs=2000):
        super().__init__()
        self.data_root = data_root
        self.n_mels = n_mels
        self.speakers = sorted([
            d for d in os.listdir(data_root)
            if os.path.isdir(os.path.join(data_root, d))
        ])
        self.speaker_files = {}
        for spk in self.speakers:
            spk_dir = os.path.join(data_root, spk)
            wavs = [
                os.path.join(spk_dir, f)
                for f in os.listdir(spk_dir)
                if f.lower().endswith('.wav')
            ]
            self.speaker_files[spk] = wavs

        self.mel_transform = torchaudio.transforms.MelSpectrogram(
            sample_rate=16000,  # adjust to your sr
            n_mels=self.n_mels
        )
        self.to_db = torchaudio.transforms.AmplitudeToDB()

        self.pairs = []
        
        half_pairs = num_pairs // 2     # half same-speaker, half different

        # same-speaker pairs
        for _ in range(half_pairs):
            spk = random.choice(self.speakers)
            if len(self.speaker_files[spk]) < 2:
                continue
            path1 = random.choice(self.speaker_files[spk])
            path2 = random.choice(self.speaker_files[spk])
            self.pairs.append((path1, path2, 1))

        # different-speaker pairs
        for _ in range(half_pairs):
            spk1, spk2 = random.sample(self.speakers, 2)
            path1 = random.choice(self.speaker_files[spk1])
            path2 = random.choice(self.speaker_files[spk2])
            self.pairs.append((path1, path2, 0))

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        path1, path2, label = self.pairs[idx]
        mel1 = self._wav_to_mel(path1)
        mel2 = self._wav_to_mel(path2)
        return mel1, mel2, label

    def _wav_to_mel(self, file_path):
        audio_data, sr = sf.read(file_path)
        audio_tensor = torch.from_numpy(audio_data).float().unsqueeze(0)
        if sr != 16000:
            audio_tensor = torchaudio.functional.resample(audio_tensor, sr, 16000)

        mel_spec = self.mel_transform(audio_tensor)
        mel_spec = self.to_db(mel_spec)
        mel_spec = pad_or_trim(mel_spec, max_frames=300)
        return mel_spec

##################################
# 2. Evaluate the Model's Verification Accuracy
##################################
def evaluate_verification(model, loader, threshold=0.8, device='cpu'):
    """
    model: your Siamese or embedding network returning [batch, embed_dim].
    loader: yields pairs of mel-spectrogram + label.
    threshold: distance threshold below which we predict 'same speaker'.
    Returns: (accuracy, total_samples)
    """
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for mel1, mel2, label in loader:
            mel1 = mel1.to(device)
            mel2 = mel2.to(device)

            emb1 = model(mel1)  # [B, embed_dim]
            emb2 = model(mel2)  # [B, embed_dim]

            # L2 normalize if your model doesn't already
            emb1 = F.normalize(emb1, p=2, dim=1)
            emb2 = F.normalize(emb2, p=2, dim=1)

            # Euclidean distance
            dist = torch.norm(emb1 - emb2, p=2, dim=1)

            # Predict same-speaker if dist < threshold
            pred_same = (dist < threshold).long()
            label = label.to(device).long()

            correct += (pred_same == label).sum().item()
            total += label.size(0)

    accuracy = correct / total if total > 0 else 0
    return accuracy, total


##################################
# 3. Usage Example (After Training)
##################################
if __name__ == "__main__":
    import torch

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    model = SpeakerEmbeddingNet(embed_dim=128).to(device)
    model.load_state_dict(torch.load("siamese_speaker_model.pth", map_location=device))

    data_root = r"C:\Users\VICTUS\Voice\A Dataset for Voice-Based Human Identity Recognition\output"
    verification_dataset = VerificationDataset(data_root, n_mels=40, num_pairs=1000)
    verif_loader = DataLoader(verification_dataset, batch_size=16, shuffle=False)

    threshold = 0.8 
    acc, total = evaluate_verification(model, verif_loader, threshold=threshold, device=device)
    print(f"Verification Accuracy: {acc*100:.2f}% over {total} pairs")


  model.load_state_dict(torch.load("siamese_speaker_model.pth", map_location=device))


Verification Accuracy: 80.10% over 1000 pairs
