In [None]:
# Install SpeachBrain Library and import it
%%capture
!pip install speechbrain

import os
import speechbrain as sb
import torch
import numpy as np
from IPython.display import Audio, display



In [None]:
# Mount Google Drive; the dataset archive (`zip_path`) and the results folder
# (`save_path`) configured below both live under /content/drive.
# This import is Colab-specific.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Config and hyperparameters

In [None]:
# Path to the Common Voice Farsi archive (cvfa.tar.gz) on Google Drive
zip_path = '/content/drive/MyDrive/speechbrain/templates/enhancement/data/cvfa.tar.gz'

# Destination folder for the experiment results once training is done
save_path = '/content/drive/MyDrive/FA_spech_enhance_exp_5020/'

# Device to train on
device = 'cpu'
# List of data subsets to show results from; any of ['train', 'valid', 'test']
data_subset = ['train']
# Number of samples to export for each chosen subset
sample_num = 5



In [None]:
%%writefile train.yaml

# #################################
# Basic training parameters for enhancement.
# #################################

# Seed needs to be set at top of yaml, before objects with parameters are made
seed: 5020
__set_seed: !!python/object/apply:torch.manual_seed [!ref <seed>]

# Set up folders for reading from and writing to
data_folder: ./data
output_folder: !ref ./results/<seed>
save_folder: !ref <output_folder>/save
train_log: !ref <output_folder>/train_log.txt

# Path where data manifest files will be stored
# The data manifest files are created by the data preparation script.
train_annotation: train.json
valid_annotation: valid.json
test_annotation: test.json


# FFT parameters
# NOTE(review): SpeechBrain's STFT appears to interpret win_length and
# hop_length in milliseconds. At sample_rate 48000 a 32 ms window would be
# 1536 samples, which is larger than n_fft (512) — TODO confirm that these
# values are accepted by the underlying torch.stft call.
sample_rate: 48000
win_length: 32
hop_length: 16
n_fft: 512
window_fn: !name:torch.hamming_window

# Training Parameters
number_of_epochs: 40
batch_size: 8
learning_rate: 0.0001
dataloader_options:
    batch_size: !ref <batch_size>

# Added noise and reverb come from OpenRIR dataset, automatically
# downloaded and prepared with this Environmental Corruption class.
# The babble is generated from other utterances in each batch.
env_corruption: !new:speechbrain.lobes.augment.EnvCorrupt
    openrir_folder: !ref <data_folder>
    #reverb_csv:
    #noise_csv:
    #noise_prob: 0
    openrir_max_noise_len: 10
    noise_snr_low: 0
    noise_snr_high: 15
    #babble_prob: 1
    babble_speaker_count: !ref <batch_size> - 1
    babble_snr_low: 0
    babble_snr_high: 15



# The train logger writes training statistics to a file, as well as stdout.
train_logger: !new:speechbrain.utils.train_logger.FileTrainLogger
    save_file: !ref <train_log>


# The mask operates on log-spectral features, computed using these
# STFT parameters, as well as computing magnitude and log1p.
compute_STFT: !new:speechbrain.processing.features.STFT
    sample_rate: !ref <sample_rate>
    win_length: !ref <win_length>
    hop_length: !ref <hop_length>
    n_fft: !ref <n_fft>
    window_fn: !ref <window_fn>
compute_ISTFT: !new:speechbrain.processing.features.ISTFT
    sample_rate: !ref <sample_rate>
    win_length: !ref <win_length>
    hop_length: !ref <hop_length>
    window_fn: !ref <window_fn>

# Resynthesize combines noisy phase with enhanced magnitudes.
resynth: !name:speechbrain.processing.signal_processing.resynthesize
    stft: !ref <compute_STFT>
    istft: !ref <compute_ISTFT>

# To design a custom model, either just edit the simple CustomModel
# class that's listed here, or replace this `!new` call with a line
# pointing to a different file you've defined.
model: !new:custom_model.CustomModel
    input_size: !ref <n_fft> // 2 + 1

# The first object passed to the Brain class is this "Epoch Counter"
# which is saved by the Checkpointer so that training can be resumed
# if it gets interrupted at any point.
epoch_counter: !new:speechbrain.utils.epoch_loop.EpochCounter
    limit: !ref <number_of_epochs>

# Objects in "modules" dict will have their parameters moved to the correct
# device, as well as having train()/eval() called on them by the Brain class.
modules:
    model: !ref <model>

# This optimizer will be constructed by the Brain class after all parameters
# are moved to the correct device. Then it will be added to the checkpointer.
opt_class: !name:torch.optim.Adam
    lr: !ref <learning_rate>

# This object is used for saving the state of training both so that it
# can be resumed if it gets interrupted, and also so that the best checkpoint
# can be later loaded for evaluation or inference.
checkpointer: !new:speechbrain.utils.checkpoints.Checkpointer
    checkpoints_dir: !ref <save_folder>
    recoverables:
        model: !ref <model>
        counter: !ref <epoch_counter>


Writing train.yaml


In [None]:
%%writefile custom_model.py
# Define the custom model that predicts the mask
import torch

class CustomModel(torch.nn.Module):
    """Stack of bidirectional LSTMs, each followed by a linear projection.

    The final projection maps back to ``input_size`` and is followed by a
    ReLU, so the output has the same feature size as the input and is
    never negative (it is used as a mask).

    Arguments
    ---------
    input_size : int
        Feature size of the input (its 3rd dimension).
    rnn_size : int
        Hidden size of each LSTM, per direction.
    projection : int
        Output size of the intermediate projection layers.
    layers : int
        Number of LSTM + projection pairs.
    """

    def __init__(self, input_size, rnn_size=256, projection=128, layers=2):
        super().__init__()

        stack = []
        lstm_in = input_size
        for depth in range(layers):
            stack.append(
                torch.nn.LSTM(
                    input_size=lstm_in,
                    hidden_size=rnn_size,
                    bidirectional=True,
                )
            )

            # Intermediate projections shrink to `projection`; the last one
            # restores the input size so the result can act as a mask.
            is_last = depth == layers - 1
            stack.append(
                torch.nn.Linear(
                    in_features=rnn_size * 2,
                    out_features=input_size if is_last else projection,
                )
            )

            # Every LSTM after the first consumes a projection output.
            lstm_in = projection

        # ReLU keeps the mask non-negative.
        stack.append(torch.nn.ReLU())
        self.layers = torch.nn.ModuleList(stack)

    def forward(self, x):
        """Run the stack time-first and hand back a batch-first tensor."""
        hidden = x.transpose(0, 1)
        for module in self.layers:
            result = module(hidden)
            # LSTM returns (output, state); only the output is propagated.
            hidden = result[0] if isinstance(result, tuple) else result
        return hidden.transpose(0, 1)

Writing custom_model.py


In [None]:
from hyperpyyaml import load_hyperpyyaml
# Parse train.yaml into the `hparams` dict. Loading appears to instantiate the
# `!new:` entries; the output below shows the OpenRIR download starting here.
with open("train.yaml") as fin:
  hparams = load_hyperpyyaml(fin)


Downloading http://www.openslr.org/resources/28/rirs_noises.zip to ./data/rirs_noises.zip


rirs_noises.zip: 1.31GB [01:11, 18.4MB/s]                            


Extracting ./data/rirs_noises.zip to ./data


In [None]:
# Make sure the data folder exists (does nothing when it is already there)
os.makedirs(hparams["data_folder"], exist_ok=True)

# Prepare Data

## Noise Data

We use `speechbrain.lobes.augment.EnvCorrupt` to add noise to the data. By default it uses the OpenRIR dataset as its noise source: when `openrir_folder` is set in the hyperparameters, it looks for the OpenRIR dataset in that folder and downloads it if it is not already there.


To use other files as noise, provide `noise_csv` and `reverb_csv` in the hyperparameters. Each one is a CSV file that lists the wav files in the format below.

\begin{array}{ccccc}
\text{ID}&\text{duration}&\text{wav}&\text{wav_format}&\text{wav_opts}\\
file\_id&file\_duration&path/to/file&file\_format
\end{array}

## Speech Data

### Extract and split speech data

The data folder should contain a folder with the speech data. That folder must have three subfolders named 'train', 'valid' and 'test', which hold the clean speech samples for training, validation and testing.



In [None]:
# Folder the archive is unpacked into
extract_path = hparams["data_folder"] + '/cv-extracted/'
# Location of the mp3 clips inside the unpacked archive
clips_path = extract_path + 'cv-corpus-7.0-2021-07-21/fa/clips'
# Final home of the clips; the train/valid/test subfolders are created here
voices_folder_path = hparams["data_folder"] + "/fa_speech"

In [None]:

# make folder to extract files
!mkdir -p $extract_path
#extract files
!tar -xvf $zip_path -C $extract_path
!mv -v  $clips_path ./data
!mv -v ./data/clips $voices_folder_path

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
cv-corpus-7.0-2021-07-21/fa/clips/common_voice_fa_25310504.mp3
cv-corpus-7.0-2021-07-21/fa/clips/common_voice_fa_25310505.mp3
cv-corpus-7.0-2021-07-21/fa/clips/common_voice_fa_25310506.mp3
cv-corpus-7.0-2021-07-21/fa/clips/common_voice_fa_25310507.mp3
cv-corpus-7.0-2021-07-21/fa/clips/common_voice_fa_25310508.mp3
cv-corpus-7.0-2021-07-21/fa/clips/common_voice_fa_25310509.mp3
cv-corpus-7.0-2021-07-21/fa/clips/common_voice_fa_25310510.mp3
cv-corpus-7.0-2021-07-21/fa/clips/common_voice_fa_25310511.mp3
cv-corpus-7.0-2021-07-21/fa/clips/common_voice_fa_25310512.mp3
cv-corpus-7.0-2021-07-21/fa/clips/common_voice_fa_25310513.mp3
cv-corpus-7.0-2021-07-21/fa/clips/common_voice_fa_25310514.mp3
cv-corpus-7.0-2021-07-21/fa/clips/common_voice_fa_25310515.mp3
cv-corpus-7.0-2021-07-21/fa/clips/common_voice_fa_25310516.mp3
cv-corpus-7.0-2021-07-21/fa/clips/common_voice_fa_25310517.mp3
cv-corpus-7.0-2021-07-21/fa/clips/common_voice_fa_253

In [None]:

# Create the train, valid and test folders and move each clip into its folder.
import os, os.path, shutil
import numpy as np

# List every clip that is still directly inside the voices folder.
# Sorted so that the split does not depend on the order os.listdir returns.
voices = np.array(sorted(
    f for f in os.listdir(voices_folder_path)
    if os.path.isfile(os.path.join(voices_folder_path, f))
))

# Shuffle with a generator seeded from the experiment seed, so the same
# train/valid/test split is obtained every time the notebook is run.
split_rng = np.random.default_rng(hparams["seed"])
shuffeled_voices_idxs = split_rng.permutation(len(voices))

# Ratio of test and validation data to all of the data
test_ratio = 0.1
validation_ratio = 0.1

# Boundaries of the three subsets inside the shuffled index array
train_idx_end = int(len(shuffeled_voices_idxs) * (1 - (test_ratio + validation_ratio)))
valid_idx_end = int(len(shuffeled_voices_idxs) * (1 - test_ratio))

train_idxs = shuffeled_voices_idxs[:train_idx_end]
valid_idxs = shuffeled_voices_idxs[train_idx_end:valid_idx_end]
test_idxs = shuffeled_voices_idxs[valid_idx_end:]

# Move the train, validation and test clips to their folders
for subset_idxs, folder_name in [(train_idxs, 'train'), (valid_idxs, 'valid'), (test_idxs, 'test')]:

  # Create the destination once per subset rather than once per file
  new_path = os.path.join(voices_folder_path, folder_name)
  os.makedirs(new_path, exist_ok=True)

  for voice in voices[subset_idxs]:
    old_voice_path = os.path.join(voices_folder_path, voice)
    new_voice_path = os.path.join(new_path, voice)
    shutil.move(old_voice_path, new_voice_path)


### Create JSON files

In [None]:
import os
import json
import shutil
import logging
from speechbrain.utils.data_utils import get_all_files, download_file
from speechbrain.dataio.dataio import read_audio

logger = logging.getLogger(__name__)
SAMPLERATE = hparams["sample_rate"]

def prepare_fa_speach(
    voices_folder_path, save_json_train, save_json_valid, save_json_test
):
    """
    Writes the train, validation and test JSON manifests for the Farsi
    speech dataset.

    The clips are looked up in the ``train``, ``valid`` and ``test``
    sub-folders of ``voices_folder_path``.

    Arguments
    ---------
    voices_folder_path : str
        Path to the folder where the Persian speech voices are stored.
    save_json_train : str
        Path where the train data specification file will be saved.
    save_json_valid : str
        Path where the validation data specification file will be saved.
    save_json_test : str
        Path where the test data specification file will be saved.

    Example
    -------
    >>> voices_folder_path = '/path/to/persian_speech'
    >>> prepare_fa_speach(voices_folder_path, 'train.json', 'valid.json', 'test.json')
    """
    logger.info(
        f"Creating {save_json_train}, {save_json_valid}, and {save_json_test}"
    )

    targets = (
        ("train", save_json_train),
        ("valid", save_json_valid),
        ("test", save_json_test),
    )

    # First collect the mp3 files of every subset ...
    listings = [
        (
            get_all_files(
                os.path.join(voices_folder_path, subset), match_and=[".mp3"]
            ),
            manifest_path,
        )
        for subset, manifest_path in targets
    ]

    # ... then write one manifest per subset.
    for clip_list, manifest_path in listings:
        create_json(clip_list, manifest_path)


def create_json(wav_list, json_file):
    """
    Writes a JSON manifest describing the given audio files.

    Each entry is keyed by the file name without extension and stores the
    path (rooted at the ``{data_root}`` placeholder) and the duration.

    Arguments
    ---------
    wav_list : list of str
        The list of audio file paths.
    json_file : str
        The path of the output json file
    """
    manifest = {}
    for audio_path in wav_list:
        # The signal is loaded only to derive its duration in seconds
        n_samples = read_audio(audio_path).shape[0]

        # Utterance id = file name without extension; the stored path keeps
        # the last three path components under the {data_root} placeholder
        components = audio_path.split(os.path.sep)
        utt_id = os.path.splitext(components[-1])[0]

        manifest[utt_id] = {
            "wav": os.path.join("{data_root}", *components[-3:]),
            "length": n_samples / SAMPLERATE,
        }

    with open(json_file, mode="w") as json_f:
        json.dump(manifest, json_f, indent=2)

    logger.info(f"{json_file} successfully created!")


def skip(*filenames):
    """
    Tells whether data preparation can be skipped because its output files
    are already present.

    Returns
    -------
    bool
        True when every given path is an existing file (also when no path
        is given); False as soon as one of them is missing.
    """
    return all(os.path.isfile(name) for name in filenames)


def check_folders(*folders):
    """Returns True only when every passed path exists, False otherwise."""
    return all(os.path.exists(path) for path in folders)


# Write the three manifests to the working directory; the file names match the
# train/valid/test `*_annotation` entries of train.yaml.
prepare_fa_speach(voices_folder_path, 'train.json', 'valid.json', 'test.json')

# Define BrainSpeech Module


In [None]:
# Brain class for speech enhancement training
class SEBrain(sb.Brain):
    """Brain subclass that trains a mask-based speech enhancement model."""

    def compute_forward(self, batch, stage):
        """Mask the noisy features and resynthesize an enhanced waveform.

        Arguments
        ---------
        batch : PaddedBatch
            Batch providing the `noisy_sig` tensors.
        stage : sb.Stage
            One of sb.Stage.TRAIN, sb.Stage.VALID, or sb.Stage.TEST.

        Returns
        -------
        predictions : dict
            Dictionary with the keys "spec" (masked features) and "wav"
            (waveform resynthesized from them).
        """
        batch = batch.to(self.device)
        noisy_wavs, lens = batch.noisy_sig
        noisy_feats = self.compute_feats(noisy_wavs)

        # "Signal approximation": the predicted mask scales the noisy
        # features, and the product is what gets compared to the target.
        mask = self.modules.model(noisy_feats)
        enhanced_feats = torch.mul(mask, noisy_feats)

        # expm1 undoes the log1p applied in compute_feats before the
        # waveform is rebuilt with the help of the noisy signal.
        enhanced_wav = self.hparams.resynth(
            torch.expm1(enhanced_feats), noisy_wavs
        )

        return {"spec": enhanced_feats, "wav": enhanced_wav}

    def compute_feats(self, wavs):
        """Turns waveforms into log1p-compressed spectral magnitudes.

        Arguments
        ---------
        wavs : torch.Tensor
            The batch of waveforms to convert.
        """
        spectrum = self.hparams.compute_STFT(wavs)
        magnitude = sb.processing.features.spectral_magnitude(
            spectrum, power=0.5
        )

        # log1p reduces the emphasis on small differences
        return torch.log1p(magnitude)

    def compute_objectives(self, predictions, batch, stage):
        """Computes the MSE between enhanced and clean features.

        Also records the per-batch loss and, outside of training, the STOI
        metric.

        Arguments
        ---------
        predictions : dict
            The output dict from `compute_forward`.
        batch : PaddedBatch
            Batch providing the `clean_sig` tensors and the ids.
        stage : sb.Stage
            One of sb.Stage.TRAIN, sb.Stage.VALID, or sb.Stage.TEST.

        Returns
        -------
        loss : torch.Tensor
            The loss used for backpropagation.
        """
        clean_wavs, lens = batch.clean_sig
        target_feats = self.compute_feats(clean_wavs)
        enhanced_feats = predictions["spec"]

        loss = sb.nnet.losses.mse_loss(enhanced_feats, target_feats, lens)

        # Keep the losses of this batch for the end-of-stage summary
        self.loss_metric.append(
            batch.id, enhanced_feats, target_feats, lens, reduction="batch"
        )

        # The intelligibility metric is only tracked outside of training
        if stage != sb.Stage.TRAIN:
            self.stoi_metric.append(
                batch.id,
                predictions["wav"],
                clean_wavs,
                lens,
                reduction="batch",
            )

        return loss

    def on_stage_start(self, stage, epoch=None):
        """Creates fresh metric trackers for the stage that is starting.

        Arguments
        ---------
        stage : sb.Stage
            One of sb.Stage.TRAIN, sb.Stage.VALID, or sb.Stage.TEST.
        epoch : int
            The currently-starting epoch. This is passed
            `None` during the test stage.
        """
        self.loss_metric = sb.utils.metric_stats.MetricStats(
            metric=sb.nnet.losses.mse_loss
        )

        if stage == sb.Stage.TRAIN:
            return

        # STOI tracker, for the validation and test stages only
        self.stoi_metric = sb.utils.metric_stats.MetricStats(
            metric=sb.nnet.loss.stoi_loss.stoi_loss
        )

    def on_stage_end(self, stage, stage_loss, epoch=None):
        """Logs statistics and saves a checkpoint when a stage finishes.

        Arguments
        ---------
        stage : sb.Stage
            One of sb.Stage.TRAIN, sb.Stage.VALID, sb.Stage.TEST
        stage_loss : float
            The average loss for all of the data processed in this stage.
        epoch : int
            The currently-starting epoch. This is passed
            `None` during the test stage.
        """
        # The train loss is only remembered; it is logged after validation.
        if stage == sb.Stage.TRAIN:
            self.train_loss = stage_loss
            return

        # The tracked STOI value is negated to report it as a score.
        stats = {
            "loss": stage_loss,
            "stoi": -self.stoi_metric.summarize("average"),
        }

        if stage == sb.Stage.VALID:
            self.hparams.train_logger.log_stats(
                {"Epoch": epoch},
                train_stats={"loss": self.train_loss},
                valid_stats=stats,
            )

            # Save this checkpoint and keep only the one with the highest
            # "stoi" value among the older ones.
            self.checkpointer.save_and_keep_only(meta=stats, max_keys=["stoi"])

        elif stage == sb.Stage.TEST:
            self.hparams.train_logger.log_stats(
                {"Epoch loaded": self.hparams.epoch_counter.current},
                test_stats=stats,
            )



## Define audio pipeline.
 Adds noise, reverb, and babble on-the-fly.

In [None]:
def dataio_prep(hparams):
    """Builds the train, valid and test datasets from their JSON manifests.

    Every item provides the clean signal together with a corrupted version
    produced on-the-fly by `hparams["env_corruption"]`. Each dataset is
    sorted by ascending "length", and shuffling is switched off in
    `hparams["dataloader_options"]`.

    Returns
    -------
    dict
        Maps "train", "valid" and "test" to their dataset.
    """

    @sb.utils.data_pipeline.takes("wav")
    @sb.utils.data_pipeline.provides("noisy_sig", "clean_sig")
    def audio_pipeline(wav):
        """Loads the clean signal and returns it with its corrupted copy."""
        clean_sig = sb.dataio.dataio.read_audio(wav)

        # The corruption object works on batches, hence the temporary
        # leading dimension and the single relative length of 1.
        batched_clean = clean_sig.unsqueeze(0)
        relative_length = torch.ones(1)
        noisy_batch = hparams["env_corruption"](batched_clean, relative_length)

        return noisy_batch.squeeze(0), clean_sig

    # The datasets are sorted by length, so the loaders must not shuffle.
    hparams["dataloader_options"]["shuffle"] = False

    def load_sorted(split):
        """Loads one manifest and sorts its items by ascending length."""
        dataset = sb.dataio.dataset.DynamicItemDataset.from_json(
            json_path=hparams[f"{split}_annotation"],
            replacements={"data_root": hparams["data_folder"]},
            dynamic_items=[audio_pipeline],
            output_keys=["id", "noisy_sig", "clean_sig"],
        )
        return dataset.filtered_sorted(sort_key="length")

    return {split: load_sorted(split) for split in ("train", "valid", "test")}

# Train Model

## Load data, Create BrainSpeech instance and train it

In [None]:
from hyperpyyaml import load_hyperpyyaml
# Re-parse train.yaml and rebind `hparams` right before training.
with open("train.yaml") as fin:
  hparams = load_hyperpyyaml(fin)


./data/rirs_noises.zip exists. Skipping download


In [None]:
# Create the experiment directory (`output_folder`) and pass train.yaml as the
# hyperparameter file to save with it.

sb.create_experiment_directory(
    experiment_directory=hparams["output_folder"],
    hyperparams_to_save="train.yaml"
)

speechbrain.core - Beginning experiment!
speechbrain.core - Experiment folder: ./results/5020


In [None]:
# Build the train/valid/test datasets from the JSON manifests
datasets = dataio_prep(hparams)

In [None]:
# Create the SEBrain instance from the modules, optimizer class and
# checkpointer declared in train.yaml; `device` comes from the config cell.
se_brain = SEBrain(
  modules=hparams["modules"],
  opt_class=hparams["opt_class"],
  hparams=hparams,
  run_opts={'device': device},
  checkpointer=hparams["checkpointer"],
)

speechbrain.core - 2.0M trainable parameters in SEBrain


In [None]:
# Train on the train set and validate on the valid set; both loaders use the
# same `dataloader_options`, and the epoch counter from train.yaml bounds the run.
se_brain.fit(
  epoch_counter=se_brain.hparams.epoch_counter,
  train_set=datasets["train"],
  valid_set=datasets["valid"],
  train_loader_kwargs=hparams["dataloader_options"],
  valid_loader_kwargs=hparams["dataloader_options"],
)

In [None]:
# Evaluate on the test set; `max_key="stoi"` selects the checkpoint to use by
# its recorded "stoi" value.
se_brain.evaluate(
  test_set=datasets["test"],
  max_key="stoi",
  test_loader_kwargs=hparams["dataloader_options"],
)

# See results

In [None]:
# `data_subset` is the list of subset names chosen in the config cell. The loop
# variable has its own name so that the list is not overwritten by the loop.
for subset_name in data_subset:

  # directory to save results
  sample_dir = hparams['output_folder'] + '/samples/' + subset_name

  # create directory if it doesn't exist
  os.makedirs(sample_dir, exist_ok=True)

  subset = datasets[subset_name]

  # Inference only: switch the modules to eval mode
  se_brain.modules.eval()

  # Choose random samples from subset
  shuffeled_idxs = np.random.permutation(len(subset))
  for idx in shuffeled_idxs[:sample_num]:
    idx = int(idx)

    ## get sample from dataset
    ## Fetch the item once: every access runs the audio pipeline (file read
    ## plus random corruption) again, so two lookups would repeat that work.
    sample = subset[idx]
    sample_input = sample['noisy_sig']
    sample_output = sample['clean_sig']

    ## set name for generated audios
    sample_input_name = sample_dir+'/sample_{}_in.wav'.format(idx)
    sample_output_name = sample_dir+'/sample_{}_out.wav'.format(idx)
    sample_pred_name = sample_dir+'/sample_{}_pred.wav'.format(idx)

    ## get prediction (no gradients are needed to export audio)
    with torch.no_grad():
      ### get sample spectrum
      noisy_wavs = sample_input.unsqueeze(0).to(se_brain.device)
      noisy_feats = se_brain.compute_feats(noisy_wavs)
      ### predict sample mask
      mask = se_brain.modules.model(noisy_feats)
      ### apply predicted mask
      predict_spec = torch.mul(mask, noisy_feats)
      ### get predicted audio from spectrum
      predict_wav = se_brain.hparams.resynth(torch.expm1(predict_spec), noisy_wavs)

    ## save audios to file
    sb.dataio.dataio.write_audio(sample_input_name, sample_input, hparams["sample_rate"])
    sb.dataio.dataio.write_audio(sample_output_name, sample_output, hparams["sample_rate"])
    sb.dataio.dataio.write_audio(sample_pred_name, predict_wav[0].cpu(), hparams["sample_rate"])

    # ## Optional: play the samples inside the notebook.
    # ## Warning: this part may cause the notebook to crash.
    # for audio_path in (sample_input_name, sample_output_name, sample_pred_name):
    #   print(audio_path)
    #   display(Audio(audio_path, autoplay=True))
    # print("+++++++++++++++++++++++++++++++")

# Save Results
Copy trained model, results and logs to `save_path`

In [None]:
import shutil

# Copy the whole experiment folder (`output_folder`) to `save_path` on Drive.
# NOTE: with its default arguments shutil.copytree raises FileExistsError when
# the destination already exists, so re-running this cell needs a new or
# removed `save_path`.
shutil.copytree(hparams['output_folder'], save_path)

'/content/drive/MyDrive/FA_spech_enhance_exp_5020/'