In [1]:
print("Hello")

Hello


In [2]:
import os
import torch
from datasets import load_dataset
from datasets import load_from_disk
from transformers import pipeline

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
from datasets import load_dataset
from datasets import Audio

# Load dataset normally
dataset = load_dataset("gilkeyio/AudioMNIST", streaming=False)

# Disable decoding for the audio column
dataset = dataset.cast_column("audio", Audio(decode=False))

# Now you can safely access speaker_id without decoding audio
sample = dataset['train'][0]
print("Speaker ID:", sample['speaker_id'])

# Audio file path is still accessible
print("Audio path:", sample['audio']['path'])


Speaker ID: 59
Audio path: 7_59_29.wav


In [4]:
from datasets import Audio

# Ensure audio gets decoded
dataset = dataset.cast_column("audio", Audio(decode=True))

# Now you can access the array directly
sample = dataset['train'][0]
signal = sample['audio']['array']      # NumPy array
fs = sample['audio']['sampling_rate']  # int



In [5]:
print(dataset)

DatasetDict({
    train: Dataset({
        features: ['speaker_id', 'audio', 'digit', 'gender', 'accent', 'age', 'native_speaker', 'origin'],
        num_rows: 24000
    })
    test: Dataset({
        features: ['speaker_id', 'audio', 'digit', 'gender', 'accent', 'age', 'native_speaker', 'origin'],
        num_rows: 6000
    })
})


In [None]:
from speechbrain.pretrained import EncoderClassifier
import torchaudio

encoder = EncoderClassifier.from_hparams(
    source="speechbrain/spkrec-ecapa-voxceleb", 
    savedir="pretrained_models/spkrec-ecapa-voxceleb"
)

sample = dataset['train'][0]
signal = torch.tensor(sample['audio']['array'], dtype=torch.float32).unsqueeze(0)  # shape: (1, num_samples)
fs = sample['audio']['sampling_rate']

# The model expects audio at 16kHz, so resample if necessary
if fs != 16000:
    resampler = torchaudio.transforms.Resample(orig_freq=fs, new_freq=16000)
    signal = resampler(signal)

# 3. Get the embedding
with torch.no_grad():
    embedding = encoder.encode_batch(signal) # Shape will be (1, 1, 192)

# Squeeze to get the final vector
embedding = embedding.squeeze() # Shape is now (192)

print("Successfully extracted embedding!")
print("Shape of the voiceprint:", embedding.shape)

  from speechbrain.pretrained import EncoderClassifier


Successfully extracted embedding!
Shape of the voiceprint: torch.Size([192])


In [None]:
import torch
import torchaudio
from datasets import load_dataset, DatasetDict
import numpy as np
from speechbrain.pretrained import EncoderClassifier

# 1. Load the dataset normally
full_dataset = load_dataset("gilkeyio/AudioMNIST")

small_tr_dataset = full_dataset['train'].select(range(1000))
small_te_dataset = full_dataset['test'].select(range(200))

dataset = DatasetDict({
    'train': small_tr_dataset,
    'test': small_te_dataset
})

print("Using smaller dataset for pipeline test")

# 2. Load the pre-trained encoder
encoder = EncoderClassifier.from_hparams(
    source="speechbrain/spkrec-ecapa-voxceleb",
    savedir="pretrained_models/spkrec-ecapa-voxceleb"
)

# 3. Define the CORRECTED extraction function
def extract_embedding(batch):
    """
    Function to extract embeddings from a batch of DECODED audio data.
    `batch['audio']` is a list of audio dictionaries.
    """
    embeddings = []
    # `batch['audio']` is a LIST where each element is a dictionary.
    # We need to iterate through this list.
    for audio_data in batch['audio']:
        # Add a check for potentially corrupt data points
        if audio_data is None or audio_data['array'] is None:
            continue

        signal_array = audio_data['array']
        fs = audio_data['sampling_rate']

        # Convert numpy array to torch tensor
        signal = torch.tensor(signal_array, dtype=torch.float32).unsqueeze(0)

        # Resample if necessary
        if fs != 16000:
            resampler = torchaudio.transforms.Resample(orig_freq=fs, new_freq=16000)
            signal = resampler(signal)

        # Extract embedding
        with torch.no_grad():
            embedding = encoder.encode_batch(signal)
            embedding = embedding.squeeze()
        embeddings.append(embedding.cpu().numpy())

    return {"embedding": embeddings}


# 4. Use map to apply the function to the entire dataset
# This structure is now correct for the batching logic.
dataset_with_embeddings = dataset.map(
    extract_embedding,
    batched=True,
    batch_size=8,
    remove_columns=["audio"] # We no longer need the raw audio, this saves memory
)

# Now your dataset has a new column 'embedding'
print("Successfully extracted embeddings!")
print(dataset_with_embeddings['train'][0])

Using smaller dataset for pipeline test


Map: 100%|██████████| 1000/1000 [03:04<00:00,  5.42 examples/s]
Map: 100%|██████████| 200/200 [00:38<00:00,  5.17 examples/s]

Successfully extracted embeddings!
{'speaker_id': '59', 'digit': 7, 'gender': 1, 'accent': 'German', 'age': 31, 'native_speaker': False, 'origin': 'Europe, Germany, Berlin', 'embedding': [-2.6190550327301025, 8.382065773010254, -27.288331985473633, 3.5866665840148926, 15.723471641540527, -32.97261047363281, 17.482685089111328, 27.093141555786133, 3.7568836212158203, -21.41496467590332, -5.328612327575684, 34.56608581542969, 2.608301877975464, -43.041954040527344, -3.2688894271850586, -2.453723669052124, 12.625601768493652, -21.025436401367188, 22.610679626464844, 1.498557686805725, -34.95357131958008, -3.0614869594573975, 25.82297134399414, 11.781750679016113, 6.656280517578125, 31.23275375366211, -14.543136596679688, 20.62252426147461, 42.4605712890625, -12.683794975280762, 14.99742317199707, 2.4680941104888916, -22.08287811279297, 15.312729835510254, -3.7390244007110596, 10.76298713684082, 11.025298118591309, 7.638171195983887, -10.915263175964355, -4.514418601989746, -2.119663476943




In [14]:
save_path = "./audio_mnist_with_embeddings"
dataset_with_embeddings.save_to_disk(save_path)

Saving the dataset (1/1 shards): 100%|██████████| 1000/1000 [00:00<00:00, 80430.77 examples/s]
Saving the dataset (1/1 shards): 100%|██████████| 200/200 [00:00<00:00, 22287.01 examples/s]


In [None]:
from datasets import load_from_disk

load_path = "./audio_mnist_with_embeddings"
processed_dataset = load_from_disk(load_path)

print("Successfully loaded pre-processed dataset from disk!")
print(processed_dataset)
print("\nSample from the loaded dataset:")
print(processed_dataset['train'][0])


Successfully loaded pre-processed dataset from disk!
DatasetDict({
    train: Dataset({
        features: ['speaker_id', 'digit', 'gender', 'accent', 'age', 'native_speaker', 'origin', 'embedding'],
        num_rows: 1000
    })
    test: Dataset({
        features: ['speaker_id', 'digit', 'gender', 'accent', 'age', 'native_speaker', 'origin', 'embedding'],
        num_rows: 200
    })
})

Sample from the loaded dataset:
{'speaker_id': '59', 'digit': 7, 'gender': 1, 'accent': 'German', 'age': 31, 'native_speaker': False, 'origin': 'Europe, Germany, Berlin', 'embedding': [-2.6190550327301025, 8.382065773010254, -27.288331985473633, 3.5866665840148926, 15.723471641540527, -32.97261047363281, 17.482685089111328, 27.093141555786133, 3.7568836212158203, -21.41496467590332, -5.328612327575684, 34.56608581542969, 2.608301877975464, -43.041954040527344, -3.2688894271850586, -2.453723669052124, 12.625601768493652, -21.025436401367188, 22.610679626464844, 1.498557686805725, -34.95357131958008, -

In [None]:
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import LabelEncoder


label_encoder = LabelEncoder()


all_speaker_ids = np.concatenate([
    dataset_with_embeddings['train']['speaker_id'], 
    dataset_with_embeddings['test']['speaker_id']
])

label_encoder.fit(all_speaker_ids)

train_labels = label_encoder.transform(dataset_with_embeddings['train']['speaker_id'])
test_labels = label_encoder.transform(dataset_with_embeddings['test']['speaker_id'])


train_embeddings = torch.tensor(np.array(dataset_with_embeddings['train']['embedding']), dtype=torch.float32)
train_labels_tensor = torch.tensor(train_labels, dtype=torch.long)
train_dataset = TensorDataset(train_embeddings, train_labels_tensor)

test_embeddings = torch.tensor(np.array(dataset_with_embeddings['test']['embedding']), dtype=torch.float32)
test_labels_tensor = torch.tensor(test_labels, dtype=torch.long)
test_dataset = TensorDataset(test_embeddings, test_labels_tensor)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

print("DataLoaders created successfully!")
print(f"Number of unique speakers found: {len(label_encoder.classes_)}")

DataLoaders created successfully!
Number of unique speakers found: 3


In [18]:
import torch.nn as nn

class SpeakerClassifier(nn.Module):
    def __init__(self, input_size, num_classes):
        super(SpeakerClassifier, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)
        return out

input_dim = 192 # From the ECAPA model
num_speakers = len(label_encoder.classes_)
model = SpeakerClassifier(input_dim, num_speakers)

In [21]:
print("num_speakers is ", num_speakers)

num_speakers is  3


In [19]:
import torch.optim as optim

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 10
for epoch in range(num_epochs):
    for embeddings, labels in train_loader:
        # Forward pass
        outputs = model(embeddings)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

print("Training finished!")

Epoch [1/10], Loss: 0.0000
Epoch [2/10], Loss: 0.0000
Epoch [3/10], Loss: 0.0000
Epoch [4/10], Loss: 0.0000
Epoch [5/10], Loss: 0.0000
Epoch [6/10], Loss: 0.0000
Epoch [7/10], Loss: 0.0000
Epoch [8/10], Loss: 0.0000
Epoch [9/10], Loss: 0.0000
Epoch [10/10], Loss: 0.0000
Training finished!


In [20]:
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for embeddings, labels in test_loader:
        outputs = model(embeddings)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print(f'Accuracy of the model on the test embeddings: {100 * correct / total} %')

Accuracy of the model on the test embeddings: 0.0 %
