# Explore SpeechBrain: Speaker Recognition

This notebook explores speaker recognition using the Python library [SpeechBrain](https://speechbrain.github.io/index.html). It can be installed with `pip install speechbrain`. The first time the notebook is run, it must be opened with administrator rights.

In the first part, a custom model for speaker recognition will be built from scratch. In the second part, a pretrained speaker recognition model will be fine-tuned on the example data set. The data files are snippets from speeches by US presidents Biden, Obama, and Trump (from [this](https://www.englishspeecheschannel.com/english-speeches/) website) and are stored at `/speaker_id_model`.

In [1]:
""" Explore speechbrain speaker recognition """
import json
import matplotlib.pyplot as plt
import numpy as np
import os
import random
import speechbrain as sb
import torch
import torchaudio
from hyperpyyaml.core import load_hyperpyyaml
from IPython.display import Audio, display
from speechbrain.dataio.dataset import DynamicItemDataset
from speechbrain.dataio.encoder import CategoricalEncoder
from speechbrain.dataio.dataloader import SaveableDataLoader
from speechbrain.dataio.batch import PaddedBatch
from speechbrain.lobes.features import MFCC, Fbank
from speechbrain.nnet.losses import nll_loss

The torchaudio backend is switched to 'soundfile'. Note that 'sox_io' is not supported on Windows.


In [2]:
SAVE_DIR = "speaker_id_model/"
DATA_DIRS = ["training", "validation", "test"]

To fit SpeechBrain models to data, it is best to create a data loader pipeline. This pipeline requires a data annotation file (in `.json` or `.csv` format) that contains the metadata for the audio files. In this case, the metadata fields for the file path, speaker name, and signal length are specified.
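
As a rough illustration of the annotation format described above, the sketch below builds the same three fields for two entries and serializes them to JSON. The file names and signal lengths are hypothetical placeholders; in the notebook the lengths come from loading each file with `torchaudio`, and the speaker name is the part of the file name before the first underscore.

```python
import json

# Hypothetical file names; the speaker name precedes the first "_".
filenames = ["Biden_01.wav", "Obama_01.wav"]
# Hypothetical signal lengths in frames (read from the audio files in practice).
lengths = [48000, 32000]

annotation = {}
for i, (name, length) in enumerate(zip(filenames, lengths)):
    # Each entry is keyed by a string ID and holds the three metadata fields.
    annotation[str(i)] = {
        "file_path": "speaker_id_model/training/" + name,
        "spk_id": name.split("_")[0],
        "length": length,
    }

annotation_json = json.dumps(annotation, indent=2)
print(annotation_json)
```

Each top-level key is an utterance ID, which is what the dataset later exposes as `id`.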

In [3]:
def create_data_annotation_file(data_dir, out_file, force=False):
    """ Creates a data annotation file in .json format with three fields:
            "file_path": Path to the sound file
            "spk_id": Name of the speaker
            "length": Length of the sound signal (frames)
    """
    if not os.path.isfile(out_file) or force:
        with os.scandir(data_dir) as sc:
            filenames = [
                filename.name for filename in sc if filename.is_file()]

        annotation_dict = {}

        for i, filename in enumerate(filenames):
            new_id = str(i)
            new_path = os.path.join(data_dir, filename)
            new_spk = filename.split("_")[0]
            new_signal, _ = torchaudio.load(new_path)
            new_length = new_signal.shape[1]
            new_dict = {
                "file_path": new_path,
                "spk_id": new_spk,
                "length": new_length
            }
            annotation_dict[new_id] = new_dict

        with open(out_file, "w") as file:
            file.write(json.dumps(annotation_dict))

        print(f"Created data annotation file at {out_file}")

In [4]:
def create_data_annotation_files(save_dir, data_dir_names, force=False):
    """ Creates data annotation files for multiple directories """
    for data_dir in data_dir_names:
        create_data_annotation_file(
            save_dir + data_dir + "/", f"{save_dir}/{data_dir}.json", force=force)

In [5]:
create_data_annotation_files(SAVE_DIR, DATA_DIRS)

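The data loader function reads the metadata from the annotation file, combines it with the audio signals, and returns a dataset object. It also encodes the speaker names as integers and saves that encoding when called on the training data, so that the validation and test sets reuse the same mapping.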
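
The idea behind encoding speaker names can be sketched in plain Python. This is only an illustration of categorical encoding, not SpeechBrain's `CategoricalEncoder` implementation; the helper name `build_label_index` is hypothetical, and the actual encoder may assign indices in a different order.

```python
def build_label_index(labels):
    """Map each distinct label to an integer, in order of first appearance."""
    index = {}
    for label in labels:
        if label not in index:
            index[label] = len(index)
    return index


# Speaker names as they might appear in the "spk_id" field.
spk_ids = ["Biden", "Obama", "Biden", "Trump"]

label_to_index = build_label_index(spk_ids)
encoded = [label_to_index[s] for s in spk_ids]

print(label_to_index)
print(encoded)
```

Building the mapping once from the training data and reusing it elsewhere keeps a given speaker on the same class index in every data split.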

In [6]:
def prepare_dataio(filename, training=False):
    """ Prepares data sets for the Brain class.
        Encodes speaker names and saves the encoding.
    """
    spk_id_encoder = CategoricalEncoder()

    @sb.utils.data_pipeline.takes("file_path")
    @sb.utils.data_pipeline.provides("sig")
    def audio_pipeline(file_path):
        sig = sb.dataio.dataio.read_audio(file_path)
        return sig

    @sb.utils.data_pipeline.takes("spk_id")
    @sb.utils.data_pipeline.provides("spk_id", "spk_id_encoded")
    def label_pipeline(spk_id):
        yield spk_id
        spk_id_encoded = torch.LongTensor(
            [spk_id_encoder.encode_label(spk_id)])
        yield spk_id_encoded

    dataset = sb.dataio.dataset.DynamicItemDataset.from_json(
        json_path=filename,
        dynamic_items=[audio_pipeline, label_pipeline],
        output_keys=["id", "sig", "spk_id_encoded"],
    )

    if training:
        spk_id_encoder.update_from_didataset(dataset, output_key="spk_id")
        spk_id_encoder.save("spk_id_encoder.txt")
    else:
        spk_id_encoder.load("spk_id_encoder.txt")

    return dataset

In [7]:
def prepare_datasets(save_dir, data_dirs):
    """ Prepares datasets for multiple directories """
    datasets = {}
    for data_dir in data_dirs:
        datasets[data_dir] = prepare_dataio(
            f"{save_dir}/{data_dir}.json", data_dir == "training")

    return datasets
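
When these datasets are batched, signals of different lengths are padded to a common length, and `batch.sig` later unpacks into the padded signals and their relative lengths. The following plain-Python sketch illustrates that idea under simplifying assumptions (lists instead of tensors, zero padding on the right); `pad_batch` is a hypothetical helper, not part of SpeechBrain.

```python
def pad_batch(signals, pad_value=0.0):
    """Right-pad variable-length signals and return their relative lengths."""
    max_len = max(len(s) for s in signals)
    padded = [list(s) + [pad_value] * (max_len - len(s)) for s in signals]
    # Relative length: fraction of the longest signal in the batch.
    rel_lens = [len(s) / max_len for s in signals]
    return padded, rel_lens


# Two toy "signals" of different lengths.
signals = [[0.1, 0.2, 0.3, 0.4], [0.5, 0.6]]
wavs, lens = pad_batch(signals)

print(wavs)
print(lens)
```

The relative lengths let later steps, such as normalization and the loss, ignore the padded part of each signal.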

A new model class that performs the speaker recognition is derived from [this](https://github.com/speechbrain/speechbrain/blob/develop/templates/speaker_id/train.py) template. It overrides several methods of the `Brain` base class. The model has three stages:

1. Computing features from the audio signal
2. Calculating embeddings from the features
3. Classifying the speaker based on the embeddings

It uses the negative log-likelihood loss function for learning. The specification of the model is stored in the `hyperparams.yaml` file.
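
To make the loss concrete, here is a small worked example of the negative log-likelihood in plain Python. It assumes that the classifier outputs log-probabilities over the speakers (a log-softmax output); the scores and targets are made up for illustration and the helper functions are not SpeechBrain's.

```python
import math


def log_softmax(scores):
    """Convert raw scores to log-probabilities in a numerically stable way."""
    m = max(scores)
    log_sum = m + math.log(sum(math.exp(s - m) for s in scores))
    return [s - log_sum for s in scores]


def nll(log_probs_batch, targets):
    """Average negative log-probability assigned to the target classes."""
    losses = [-lp[t] for lp, t in zip(log_probs_batch, targets)]
    return sum(losses) / len(losses)


# Made-up scores for two utterances over three speakers.
scores = [[2.0, 0.5, 0.1], [0.2, 0.2, 0.2]]
targets = [0, 2]

log_probs = [log_softmax(s) for s in scores]
loss = nll(log_probs, targets)
print(loss)
```

The second utterance has equal scores for all three speakers, so its loss is ln 3 ≈ 1.0986, which is the value a uniform guess over three classes produces.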

In [8]:
class SpkIdBrain(sb.Brain):
    """ New speaker recognition class that inherits from Brain base class.
        Requires at least compute_forward() and compute_objective() methods.
    """

    def compute_forward(self, batch, stage):
        """Runs all the computation that transforms the input into the
        output probabilities over the N classes.
        Arguments
        ---------
        batch : PaddedBatch
            This batch object contains all the relevant tensors for computation.
        stage : sb.Stage
            One of sb.Stage.TRAIN, sb.Stage.VALID, or sb.Stage.TEST.
        Returns
        -------
        predictions : Tensor
            Tensor that contains the posterior probabilities over the N classes.
        """

        # We first move the batch to the appropriate device.
        batch = batch.to(self.device)

        # Compute features, embeddings, and predictions
        feats, lens = self.prepare_features(batch.sig, stage)
        embeddings = self.modules.embedding_model(feats, lens)
        predictions = self.modules.classifier(embeddings)

        return predictions

    def prepare_features(self, wavs, stage):
        """ Prepare the features for computation (extraction and normalization).
        Arguments
        ---------
        wavs : tuple
            Input signals (tensor) and their relative lengths (tensor).
        stage : sb.Stage
            The current stage of training.
        """
        wavs, lens = wavs

        # Feature extraction and normalization
        feats = self.modules.compute_features(wavs)
        feats = self.modules.mean_var_norm(feats, lens)

        return feats, lens

    def compute_objectives(self, predictions, batch, stage):
        """ Computes the loss given the predicted and targeted outputs.
        Arguments
        ---------
        predictions : tensor
            The output tensor from `compute_forward`.
        batch : PaddedBatch
            This batch object contains all the relevant tensors for computation.
        stage : sb.Stage
            One of sb.Stage.TRAIN, sb.Stage.VALID, or sb.Stage.TEST.
        Returns
        -------
        loss : torch.Tensor
            A one-element tensor used for backpropagating the gradient.
        """

        _, lens = batch.sig
        spkid, _ = batch.spk_id_encoded

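        # Duplicate labels to match augmented batches. This branch only runs
        # if an "env_corrupt" module is configured; prepare_features() would
        # then also have to apply that augmentation to the signals.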
        if stage == sb.Stage.TRAIN and hasattr(self.modules, "env_corrupt"):
            spkid = torch.cat([spkid, spkid], dim=0)
            lens = torch.cat([lens, lens])

        # Compute the cost function
        loss = sb.nnet.losses.nll_loss(predictions, spkid, lens)

        # Append this batch of losses to the loss metric for easy summarization
        self.loss_metric.append(
            batch.id, predictions, spkid, lens, reduction="batch"
        )

        # Compute classification error during validation and testing
        if stage != sb.Stage.TRAIN:
            self.error_metrics.append(batch.id, predictions, spkid, lens)

        return loss

    def on_stage_start(self, stage, epoch=None):
        """ Gets called at the beginning of each epoch.
        Arguments
        ---------
        stage : sb.Stage
            One of sb.Stage.TRAIN, sb.Stage.VALID, or sb.Stage.TEST.
        epoch : int
            The currently-starting epoch. This is passed
            `None` during the test stage.
        """

        # Set up statistics trackers for this stage
        self.loss_metric = sb.utils.metric_stats.MetricStats(
            metric=sb.nnet.losses.nll_loss
        )

        # Set up evaluation-only statistics trackers
        if stage != sb.Stage.TRAIN:
            self.error_metrics = self.hparams.error_stats()

    def on_stage_end(self, stage, stage_loss, epoch=None):
        """ Gets called at the end of an epoch.
        Arguments
        ---------
        stage : sb.Stage
            One of sb.Stage.TRAIN, sb.Stage.VALID, sb.Stage.TEST
        stage_loss : float
            The average loss for all of the data processed in this stage.
        epoch : int
            The currently-starting epoch. This is passed
            `None` during the test stage.
        """

        # Store the train loss until the validation stage.
        if stage == sb.Stage.TRAIN:
            self.train_loss = stage_loss

        # Summarize the statistics from the stage for record-keeping.
        else:
            stats = {
                "loss": stage_loss,
                "error": self.error_metrics.summarize("average"),
            }

        # At the end of validation...
        if stage == sb.Stage.VALID:

            old_lr, new_lr = self.hparams.lr_annealing(epoch)
            sb.nnet.schedulers.update_learning_rate(self.optimizer, new_lr)

            # The train_logger writes a summary to stdout and to the logfile.
            self.hparams.train_logger.log_stats(
                {"Epoch": epoch, "lr": old_lr},
                train_stats={"loss": self.train_loss},
                valid_stats=stats,
            )

            # Checkpointing is disabled here. Uncomment the next line to save
            # the current checkpoint and delete previous checkpoints.
            # self.checkpointer.save_and_keep_only(meta=stats, min_keys=["error"])

        # We also write statistics about test data to stdout and to the logfile.
        if stage == sb.Stage.TEST:
            self.hparams.train_logger.log_stats(
                {"Epoch loaded": self.hparams.epoch_counter.current},
                test_stats=stats,
            )

In [9]:
datasets = prepare_datasets(SAVE_DIR, DATA_DIRS)

In [13]:
with open(f"{SAVE_DIR}/hyperparams.yaml") as file:
    hparams = load_hyperpyyaml(file)

In [14]:
spk_id_brain = SpkIdBrain(
    modules=hparams["modules"],
    opt_class=hparams["opt_class"],
    hparams=hparams,
)

In [15]:
spk_id_brain.fit(
    epoch_counter=spk_id_brain.hparams.epoch_counter,
    train_set=datasets["training"],
    valid_set=datasets["validation"],
    train_loader_kwargs=hparams["dataloader_options"],
    valid_loader_kwargs=hparams["dataloader_options"],
)

100%|███████████████████████████████████████████████████████████████████| 6/6 [00:30<00:00,  5.12s/it, train_loss=1.27]
100%|████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:01<00:00,  1.23s/it]
100%|███████████████████████████████████████████████████████████████████| 6/6 [00:30<00:00,  5.14s/it, train_loss=1.16]
100%|████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:01<00:00,  1.27s/it]
100%|███████████████████████████████████████████████████████████████████| 6/6 [00:33<00:00,  5.54s/it, train_loss=1.14]
100%|████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:01<00:00,  1.41s/it]
100%|███████████████████████████████████████████████████████████████████| 6/6 [00:35<00:00,  5.85s/it, train_loss=1.13]
100%|████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:01<00:00,  1.48s/it]

In [None]:
test_stat = spk_id_brain.evaluate(
    test_set=datasets["test"],
    min_key="error",
    test_loader_kwargs=hparams["dataloader_options"],
)

The performance of the model is written to the `log.txt` file.
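
The logged error can be read as a classification error rate. The sketch below shows that interpretation in plain Python, assuming `error_stats` in `hyperparams.yaml` is a classification error metric as in the template; the log-probabilities and targets are invented for illustration.

```python
def classification_error(log_probs_batch, targets):
    """Fraction of utterances whose highest-scoring class is not the target."""
    wrong = 0
    for log_probs, target in zip(log_probs_batch, targets):
        # The predicted class is the index with the largest log-probability.
        predicted = max(range(len(log_probs)), key=lambda i: log_probs[i])
        if predicted != target:
            wrong += 1
    return wrong / len(targets)


# Invented outputs for four utterances over three speakers.
log_probs_batch = [
    [-0.2, -2.0, -3.0],
    [-1.5, -0.4, -2.5],
    [-0.9, -1.2, -1.3],
    [-2.0, -1.8, -0.3],
]
targets = [0, 1, 2, 2]

error = classification_error(log_probs_batch, targets)
print(error)
```

Here the third utterance is assigned to class 0 instead of class 2, so one of four predictions is wrong.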