In [18]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchaudio
import sys

import matplotlib.pyplot as plt
import IPython.display as ipd

from tqdm import tqdm
from pprint import pprint

# Dataset Downloading

In [19]:
# Speech Commands v0.01 splits. All three share the same root/url;
# download=True lets torchaudio fetch the archive into `root`.
_dataset_kwargs = dict(root='./', url='speech_commands_v0.01', download=True)
train_dataset = torchaudio.datasets.SPEECHCOMMANDS(subset='training', **_dataset_kwargs)
valid_dataset = torchaudio.datasets.SPEECHCOMMANDS(subset='validation', **_dataset_kwargs)
test_dataset = torchaudio.datasets.SPEECHCOMMANDS(subset='testing', **_dataset_kwargs)

In [20]:
# Number of items in the train / validation / test splits, in that order.
print(*map(len, (train_dataset, valid_dataset, test_dataset)))

51088 6798 6835


# Data Processing

In [21]:
# Character vocabulary: one "<char> <index>" pair per line, covering the
# lowercase letters a-z with ids 0-25. TextTransform parses this string by
# stripping it, splitting on newlines and then splitting each line on whitespace.
# NOTE(review): 'a' is assigned id 0, which is also the default blank id of
# nn.CTCLoss -- confirm the loss/decoder are configured with a blank index
# outside 0-25.
char_map_str = """
 a 0
 b 1
 c 2
 d 3
 e 4
 f 5
 g 6
 h 7
 i 8
 j 9
 k 10
 l 11
 m 12
 n 13
 o 14
 p 15
 q 16
 r 17
 s 18
 t 19
 u 20
 v 21
 w 22
 x 23
 y 24
 z 25
 """
 
class TextTransform:
    """Bidirectional lookup between characters and their integer ids."""

    def __init__(self):
        # Both tables are built from the module-level ``char_map_str``,
        # which holds one "<char> <index>" pair per line.
        self.char_map = {}
        self.index_map = {}
        for entry in char_map_str.strip().split('\n'):
            symbol, raw_id = entry.split()
            char_id = int(raw_id)
            self.char_map[symbol] = char_id
            self.index_map[char_id] = symbol

    def text_to_int(self, text: list[str]):
        """Encode ``text`` as a list of ids, one per character.

        Raises KeyError for any character that is not in the character map.
        """
        return [self.char_map[ch] for ch in text]

    def int_to_text(self, labels: list[int]):
        """Decode a sequence of ids back into a string.

        Raises KeyError for any id that is not in the index map.
        """
        return ''.join(self.index_map[label] for label in labels)


# Mel-spectrogram settings shared by the training and validation front-ends.
# Both pipelines must produce identical features (apart from augmentation),
# so the values are spelled out once instead of leaving one side to rely on
# torchaudio's defaults.
SAMPLE_RATE = 16000
N_MELS = 128

# TODO: SpecAugment (masking augmentations)
train_audio_transforms = nn.Sequential(
    torchaudio.transforms.MelSpectrogram(sample_rate=SAMPLE_RATE, n_mels=N_MELS),
    # torchaudio.transforms.FrequencyMasking(freq_mask_param=15),
    # torchaudio.transforms.TimeMasking(time_mask_param=35)
)

# Validation/test features: the same mel-spectrogram, never augmented.
valid_audio_transforms = torchaudio.transforms.MelSpectrogram(sample_rate=SAMPLE_RATE, n_mels=N_MELS)

# Shared character <-> id codec used when building label tensors.
text_transform = TextTransform()

In [22]:
# Round-trip check: encode a word into character ids, then decode it back.
word_start = "yes"
index = text_transform.text_to_int(word_start)
word_recovered = text_transform.int_to_text(index)

# The decoded word is expected to match the original.
round_trip = (word_start, "-->", index, "-->", word_recovered)
print(*round_trip)

yes --> [24, 4, 18] --> yes


Функция `data_processing` позже будет вызываться в `collate_fn` загрузчиков данных (DataLoader).

Формат данных в датасете: tuple (wave, sample_rate, utterance (label), speaker id, utterance number)

In [23]:
# Peek at one raw example; indexing the dataset yields the tuple
# (waveform, sample_rate, label, speaker_id, utterance_number).
sample = train_dataset[2]
sample

(tensor([[-0.0025, -0.0021, -0.0017,  ..., -0.0030, -0.0033, -0.0031]]),
 16000,
 'bed',
 '004ae714',
 1)

In [24]:
def data_processing(data, data_type="train"):
    """Collate raw dataset items into padded batch tensors.

    Args:
        data: iterable of 5-tuples whose first element is the waveform and
            whose third element is the utterance text; the remaining fields
            are ignored.
        data_type: "train" selects ``train_audio_transforms``; any other
            value selects ``valid_audio_transforms``.

    Returns:
        ``(spectrograms, labels, input_lengths, label_lengths)``.
        ``spectrograms`` and ``labels`` are zero-padded batch tensors; the
        two length lists hold one int per item and are meant for the loss.
    """
    # The feature pipeline is the same for every item, so pick it once.
    if data_type == 'train':
        to_spec = train_audio_transforms
    else:
        to_spec = valid_audio_transforms

    specs = []
    targets = []
    for waveform, _, utterance, _, _ in data:
        # Drop the leading axis and put time first so pad_sequence pads along
        # time (assumes a mono waveform of shape (1, samples) -- TODO confirm).
        specs.append(to_spec(waveform).squeeze(0).transpose(0, 1))
        # Labels are the utterance's character ids, stored as a float tensor.
        targets.append(torch.Tensor(text_transform.text_to_int(utterance.lower())))

    # Per-item lengths for the loss. The time-step count is halved,
    # presumably to match a stride-2 convolution in the model -- verify.
    input_lengths = [spec.shape[0] // 2 for spec in specs]
    label_lengths = [len(target) for target in targets]

    # Pad along time, add a channel axis, then move features ahead of time.
    padded_specs = nn.utils.rnn.pad_sequence(specs, batch_first=True)
    spectrograms = padded_specs.unsqueeze(1).transpose(2, 3)
    labels = nn.utils.rnn.pad_sequence(targets, batch_first=True)

    return spectrograms, labels, input_lengths, label_lengths

In [25]:
# Smoke test: collate a one-item batch and inspect the resulting tensors.
data_processing([sample])

(tensor([[[[0.0000e+00, 0.0000e+00, 0.0000e+00,  ..., 0.0000e+00,
            0.0000e+00, 0.0000e+00],
           [1.3406e-02, 1.1594e-02, 1.6736e-02,  ..., 1.7414e-02,
            1.6212e-02, 2.0798e-02],
           [7.2180e-02, 6.2425e-02, 9.0109e-02,  ..., 9.3760e-02,
            8.7287e-02, 1.1198e-01],
           ...,
           [5.7900e-06, 2.8763e-06, 3.3100e-06,  ..., 2.9408e-06,
            2.1657e-06, 3.0014e-06],
           [9.8653e-06, 2.7193e-06, 1.2482e-06,  ..., 6.7003e-07,
            5.9030e-07, 1.4782e-06],
           [9.3147e-06, 2.3657e-07, 1.7895e-07,  ..., 2.5175e-07,
            8.9278e-08, 2.5031e-06]]]]),
 tensor([[1., 4., 3.]]),
 [37],
 [3])

# Building a Model

We use Layer Normalization, not Batch Normalization, because BN is hard to use with sequence data and with small batch sizes, and it is hard to parallelize a NN that uses BN.

This is because BN computes its statistics across the batch. Layer Normalization removes this dependency: it computes the normalization statistics over the features of each individual sample.

LN briefly: for each data sample, the inputs to all neurons in the same layer are normalized together.
So, within one sample, the values of a layer are normalized to zero mean and unit variance (before the learned scale and shift are applied).

LN can deal with sequence data, doesn't depend on batch size, and is easily parallelized.
However, LN sometimes performs worse than BN with CNNs.

In [None]:
class CNNLayerNorm(nn.Module):
    """LayerNorm over the feature axis of a (batch, channel, feature, time) tensor."""

    def __init__(self, n_feats):
        super().__init__()
        # Given a single int, nn.LayerNorm normalizes over the last dimension,
        # which must have exactly that size -- hence the transposes in forward().
        self.layer_norm = nn.LayerNorm(n_feats)

    def forward(self, x):
        # Move features to the last axis: (batch, channel, time, feature).
        feats_last = x.transpose(2, 3).contiguous()
        normed = self.layer_norm(feats_last)
        # Restore the original layout: (batch, channel, feature, time).
        return normed.transpose(2, 3).contiguous()

class ResidualCNN(nn.Module):
    """Pre-activation residual block (cf. https://arxiv.org/pdf/1603.05027.pdf)
    that uses layer norm in place of batch norm.

    Each of the two stages applies layer norm -> GELU -> dropout -> conv, and
    the block input is added back onto the result.
    """

    def __init__(self, in_channels, out_channels, kernel, stride, dropout, n_feats):
        super().__init__()

        # Half-kernel padding, shared by both convolutions.
        half_kernel = kernel // 2
        self.cnn1 = nn.Conv2d(in_channels, out_channels, kernel, stride, padding=half_kernel)
        self.cnn2 = nn.Conv2d(out_channels, out_channels, kernel, stride, padding=half_kernel)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.layer_norm1 = CNNLayerNorm(n_feats)
        self.layer_norm2 = CNNLayerNorm(n_feats)

    def forward(self, x):
        # x: (batch, channel, feature, time)
        out = self.cnn1(self.dropout1(F.gelu(self.layer_norm1(x))))
        out = self.cnn2(self.dropout2(F.gelu(self.layer_norm2(out))))
        # Skip connection: requires the conv stack to preserve x's shape.
        out += x
        return out  # (batch, channel, feature, time)
        
class BidirectionalGRU(nn.Module):
    """LayerNorm -> GELU -> single-layer bidirectional GRU -> dropout.

    The output feature size is ``2 * hidden_size`` because the forward and
    backward directions are concatenated by the bidirectional GRU.
    """

    def __init__(self, rnn_dim, hidden_size, dropout, batch_first):
        super().__init__()

        self.BiGRU = nn.GRU(
            rnn_dim,
            hidden_size,
            num_layers=1,
            batch_first=batch_first,
            bidirectional=True,
        )
        self.layer_norm = nn.LayerNorm(rnn_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        activated = F.gelu(self.layer_norm(x))
        # The GRU also returns its final hidden state, which is discarded.
        outputs, _ = self.BiGRU(activated)
        return self.dropout(outputs)


class SpeechRecognitionModel(nn.Module):
    """Speech Recognition Model Inspired by DeepSpeech 2

    Pipeline: strided stem convolution -> residual CNN stack -> linear
    projection -> stack of bidirectional GRUs -> per-time-step classifier.

    Args:
        n_cnn_layers: number of ResidualCNN blocks.
        n_rnn_layers: number of BidirectionalGRU layers.
        rnn_dim: GRU hidden size (each BiGRU outputs ``2 * rnn_dim`` features).
        n_class: number of output classes per time step.
        n_feats: size of the feature axis of the input spectrogram.
        stride: stride of the stem convolution (int, or a pair whose first
            entry applies to the feature axis).
        dropout: dropout probability used throughout the model.
    """

    def __init__(self, n_cnn_layers, n_rnn_layers, rnn_dim, n_class, n_feats, stride=2, dropout=0.1):
        super(SpeechRecognitionModel, self).__init__()
        # Feature-axis size after the stem conv (kernel 3, padding 1):
        # floor((n_feats - 1) / stride) + 1. This equals n_feats // 2 for the
        # default stride of 2 with an even n_feats, and stays correct for odd
        # n_feats or other strides.
        feat_stride = stride[0] if isinstance(stride, (tuple, list)) else stride
        n_feats = (n_feats - 1) // feat_stride + 1
        self.cnn = nn.Conv2d(1, 32, 3, stride=stride, padding=3//2)  # cnn for extracting hierarchical features

        # n residual cnn layers with filter size of 32
        self.rescnn_layers = nn.Sequential(*[
            ResidualCNN(32, 32, kernel=3, stride=1, dropout=dropout, n_feats=n_feats)
            for _ in range(n_cnn_layers)
        ])
        self.fully_connected = nn.Linear(n_feats*32, rnn_dim)
        # forward() keeps the tensor in (batch, time, feature) layout through
        # the whole recurrent stack, so every GRU layer must be batch-first.
        # Making only the first layer batch-first would have the later layers
        # run their recurrence along the batch axis instead of time.
        self.birnn_layers = nn.Sequential(*[
            BidirectionalGRU(rnn_dim=rnn_dim if i==0 else rnn_dim*2,
                             hidden_size=rnn_dim, dropout=dropout, batch_first=True)
            for i in range(n_rnn_layers)
        ])
        self.classifier = nn.Sequential(
            nn.Linear(rnn_dim*2, rnn_dim),  # birnn returns rnn_dim*2
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(rnn_dim, n_class)
        )

    def forward(self, x):
        """Map a batch of spectrograms to per-time-step class scores.

        Args:
            x: tensor of shape (batch, 1, n_feats, time).

        Returns:
            Tensor of shape (batch, time', n_class), where time' is the time
            length after the strided stem convolution.
        """
        x = self.cnn(x)
        x = self.rescnn_layers(x)
        sizes = x.size()
        # Fold channels into the feature axis.
        x = x.view(sizes[0], sizes[1] * sizes[2], sizes[3])  # (batch, feature, time)
        x = x.transpose(1, 2) # (batch, time, feature)
        x = self.fully_connected(x)
        x = self.birnn_layers(x)
        x = self.classifier(x)
        return x