In [1]:
import torchaudio

train_dataset = torchaudio.datasets.LIBRISPEECH("./", url="train-clean-100", download=True)
test_dataset = torchaudio.datasets.LIBRISPEECH("./", url="test-clean", download=True)

In [2]:
class TextTransform:
    """Maps characters to integers and vice versa"""
    def __init__(self):
        char_map_str = """
        ' 0
        <SPACE> 1
        a 2
        b 3
        c 4
        d 5
        e 6
        f 7
        g 8
        h 9
        i 10
        j 11
        k 12
        l 13
        m 14
        n 15
        o 16
        p 17
        q 18
        r 19
        s 20
        t 21
        u 22
        v 23
        w 24
        x 25
        y 26
        z 27
        """
        self.char_map = {}
        self.index_map = {}
        for line in char_map_str.strip().split('\n'):
            ch, index = line.split()
            self.char_map[ch] = int(index)
            self.index_map[int(index)] = ch
        self.index_map[1] = ' '

    def text_to_int(self, text):
        """ Use a character map and convert text to an integer sequence """
        int_sequence = []
        for c in text:
            if c == ' ':
                ch = self.char_map['<SPACE>']
            else:
                ch = self.char_map[c]
            int_sequence.append(ch)
        return int_sequence

    def int_to_text(self, labels):
        """ Use a character map and convert integer labels to an text sequence """
        string = []
        for i in labels:
            string.append(self.index_map[i])
        return ''.join(string).replace('<SPACE>', ' ')

In [3]:
import torch.nn as nn

train_audio_transforms = nn.Sequential(
    torchaudio.transforms.MelSpectrogram(sample_rate = 16000, n_mels = 128),
    torchaudio.transforms.FrequencyMasking(freq_mask_param = 15),
    torchaudio.transforms.TimeMasking(time_mask_param = 35)
)

valid_audio_transforms = torchaudio.transforms.MelSpectrogram()

text_transform = TextTransform()



In [4]:
import torch

def data_preprocessing(data, data_type = "train"):
  spectrograms = []
  labels = []
  input_lengths = []
  label_lengths = []
  for (waveform, _, utterance, _,_,_) in data:
    if data_type == 'train':
      spec = train_audio_transforms(waveform).squeeze(0).transpose(0,1)
    else:
      spec = valid_audio_transforms(waveform).squeeze(0).transpose(0,1)
    spectrograms.append(spec)
    label = torch.Tensor(text_transform.text_to_int(utterance.lower()))
    labels.append(label)
    input_lengths.append(spec.shape[0] // 2)
    label_lengths.append(len(label))

  spectrograms = nn.utils.rnn.pad_sequence(spectrograms, batch_first = True).unsqueeze(1).transpose(2,3)
  labels = nn.utils.rnn.pad_sequence(labels, batch_first = True)

  return spectrograms, labels, input_lengths, label_lengths

In [5]:
import torch.nn.functional as F

class CNNLayerNorm(nn.Module):
  def __init__(self, n_feats):
    super(CNNLayerNorm, self).__init__()
    self.layer_norm = nn.LayerNorm(n_feats)

  def forward(self, x):
    #(batch, channel, feature, time)
    x = x.transpose(2,3).contiguous() #(batch, channel, time, feature)
    x = self.layer_norm(x)
    return x.transpose(2,3).contiguous() #(batch,channel,feature,time)

class ResidualCNN(nn.Module):
  def __init__(self, in_channels, out_channels, kernel, stride, dropout, n_feats):
    super(ResidualCNN, self).__init__()
    self.cnn1 = nn.Conv2d(in_channels, out_channels, kernel, stride, padding = kernel // 2)
    self.cnn2 = nn.Conv2d(out_channels, out_channels, kernel, stride, padding = kernel // 2)
    self.dropout1 = nn.Dropout(dropout)
    self.dropout2 = nn.Dropout(dropout)
    self.layer_norm1 = CNNLayerNorm(n_feats)
    self.layer_norm2 = CNNLayerNorm(n_feats)

  def forward(self,x):
    residual = x #(batch, channel, feature, time)
    x = self.layer_norm1(x)
    x = F.gelu(x)
    x = self.dropout1(x)
    x = self.cnn1(x)
    x = self.layer_norm2(x)
    x = F.gelu(x)
    x = self.dropout2(x)
    x = self.cnn2(x)
    x += residual
    return x #(batch, channel, feature, time)

class BidirectionalGRU(nn.Module):
  def __init__(self,rnn_dim, hidden_size, dropout, batch_first):
    super(BidirectionalGRU,self).__init__()

    self.BiGRU = nn.GRU(
        input_size = rnn_dim, hidden_size = hidden_size,
        num_layers = 1, batch_first = batch_first, bidirectional = True
    )
    self.layer_norm = nn.LayerNorm(rnn_dim)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    x = self.layer_norm(x)
    x = F.gelu(x)
    x, _ = self.BiGRU(x)
    x = self.dropout(x)
    return x

In [6]:
class SpeechRecognitionModel(nn.Module):
  def __init__(self, n_cnn_layers, n_rnn_layers, rnn_dim, n_class, n_feats, stride = 2, dropout = 0.1):
    super(SpeechRecognitionModel, self).__init__()

    n_feats = n_feats // 2
    self.cnn = nn.Conv2d(1, 32, 3, stride = stride, padding = 3 // 2) #cnn for extracting hierarchical features

    #n residual cnn layers with filter size of 32
    self.rescnn_layers = nn.Sequential(*[
        ResidualCNN(32, 32, kernel = 3, stride = 1, dropout = dropout, n_feats = n_feats) for _ in range(n_cnn_layers)
    ])
    self.fully_connected = nn.Linear(n_feats * 32, rnn_dim)
    self.birnn_layers = nn.Sequential(*[
        BidirectionalGRU(rnn_dim = rnn_dim if i == 0 else rnn_dim * 2,
                         hidden_size = rnn_dim,
                         dropout = dropout,
                         batch_first = i == 0) for i in range(n_rnn_layers)
    ])
    self.classifier = nn.Sequential(
        nn.Linear(rnn_dim * 2, rnn_dim),
        nn.GELU(),
        nn.Dropout(dropout),
        nn.Linear(rnn_dim, n_class)
    )

  def forward(self, x):
    x = self.cnn(x)
    x = self.rescnn_layers(x)
    sizes = x.size()
    x = x.view(sizes[0], sizes[1] * sizes[2], sizes[3]) #(batch,feature,time)
    x = x.transpose(1,2) #(batch,time,feature)
    x = self.fully_connected(x)
    x = self.birnn_layers(x)
    x = self.classifier(x)
    return x

In [7]:
import wandb
wandb.login()

[34m[1mwandb[0m: Currently logged in as: [33mkevinv3796[0m. Use [1m`wandb login --relogin`[0m to force relogin


True

In [8]:
def greedy_decoder(output, labels, label_lengths, blank_label = 28, collapse_repeated = True):
  arg_maxes = torch.argmax(output, dim = 2)
  decodes = []
  targets = []
  for i, args in enumerate(arg_maxes):
    decode = []
    targets.append(text_transform.int_to_text(labels[i][:label_lengths[i]].tolist()))
    for j, index in enumerate(args):
      if index != blank_label:
        if collapse_repeated and j != 0 and index == args[j-1]:
          continue #skip the character
        decode.append(index.item())
    decodes.append(text_transform.int_to_text(decode))
  return decodes, targets

In [9]:
from types import SimpleNamespace

config = SimpleNamespace(
  learning_rate=5e-4,
  batch_size=100,
  epochs=10,
  n_cnn_layers=3,
  n_rnn_layers = 5,
  rnn_dim = 512,
  n_class = 29,
  n_feats = 128,
  stride = 2,
  dropout = 0.1,
)

In [10]:
import numpy as np

def _levenshtein_distance(ref, hyp):
  m = len(ref)
  n = len(hyp)

  if ref == hyp:
    return 0
  if m == 0:
    return n
  if n == 0:
    return m

  if m < n:
    ref, hyp = hyp, ref
    m, n = n, m

  distance = np.zeros((2, n + 1), dtype = np.int32)

  for j in range(0, n + 1):
    distance[0][j] = j

  for i in range(1, m + 1):
    prev_row_idx = (i - 1) % 2
    cur_row_idx = i % 2
    distance[cur_row_idx][0] = i
    for j in range(1, n + 1):
      if ref[i-1] == hyp[j - 1]:
        distance[cur_row_idx][j] = distance[prev_row_idx][j-1]
      else:
        s_num = distance[prev_row_idx][j - 1] + 1
        i_num = distance[cur_row_idx][j - 1] + 1
        d_num = distance[prev_row_idx][j] + 1
        distance[cur_row_idx][j] = min(s_num, i_num, d_num)

  return distance[m % 2][n]

def word_errors(reference, hypothesis, ignore_case = False, delimiter = ' '):
  if ignore_case:
    reference = reference.lower()
    hypothesis = hypothesis.lower()

  ref_words = reference.split(delimiter)
  hyp_words = hypothesis.split(delimiter)

  edit_distance = _levenshtein_distance(ref_words, hyp_words)
  return float(edit_distance), len(ref_words)

def char_error(reference, hypothesis, ignore_case = False, remove_space = False):
  if ignore_case:
    reference = reference.lower()
    hypothesis = hypothesis.lower()

  join_char = '' if remove_space else ' '

  reference = join_char.join(filter(None, reference.split(' ')))
  hypothesis = join_char.join(filter(None, hypothesis.split(' ')))

  edit_distance = _levenshtein_distance(reference, hypothesis)
  return float(edit_distance), len(reference)

def wer(reference, hypothesis,ignore_case = False, delimiter = ' '):
  edit_distance, ref_len = word_errors(reference, hypothesis, ignore_case, delimiter)
  if ref_len == 0:
    raise ValueError("Reference's word number should be greater than 0")

  wer = edit_distance / ref_len
  return wer

def cer(reference, hypothesis, ignore_case = False, remove_space = False):
  edit_distance, ref_len = char_error(reference, hypothesis, ignore_case, remove_space)

  if ref_len == 0:
    raise ValueError("Length of the reference should be greater than 0")

  cer = edit_distance / ref_len
  return cer

def avg_wer(wer_scores, combined_ref_len):
  return float(sum(wer_scores)) / float(combined_ref_len)

In [11]:
from tqdm import tqdm

def train(model, device, train_loader, criterion, optimizer, scheduler, epoch):
  model.train()
  data_len = len(train_loader.dataset)
  for batch_index, _data in tqdm(enumerate(train_loader)):
    spectrograms, labels, input_lengths, label_lengths = _data
    spectrograms, labels = spectrograms.to(device), labels.to(device)

    optimizer.zero_grad()

    output = model(spectrograms) #(batch, time, n_class)
    output = F.log_softmax(output, dim = 2)
    output = output.transpose(0, 1) #(time, batch, n_class)

    loss = criterion(output, labels, input_lengths, label_lengths)
    loss.backward()

    wandb.log({
        "train/loss": loss.item(),
        "learning_rate": scheduler.get_lr()
    })

    optimizer.step()
    scheduler.step()
    if batch_index % 100 == 0 or batch_index == data_len:
      print(f"Epoch: {epoch}, [{batch_index * len(spectrograms)}/{data_len} ({100. * batch_index / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}")


In [12]:
def test(model, device, test_loader, criterion, epoch):
  print("\nEvaluating...")
  model.eval()
  test_loss = 0
  test_cer, test_wer = [], []

  with torch.no_grad():
    for i, _data in tqdm(enumerate(test_loader)):
      spectrograms, labels, input_lengths, label_lengths = _data
      spectrograms, labels = spectrograms.to(device), labels.to(device)

      output = model(spectrograms) #(batch, time, n_class)
      output = F.log_softmax(output, dim = 2)
      output = output.transpose(0,1)  #(time, batch, n_class)

      loss = criterion(output, labels, input_lengths, label_lengths)
      test_loss += loss.item() / len(test_loader)

      decoded_preds, decoded_targets = greedy_decoder(output.transpose(0,1), labels, label_lengths)
      for j in range(len(decoded_preds)):
        test_cer.append(cer(decoded_targets[j], decoded_preds[j]))
        test_wer.append(wer(decoded_targets[j], decoded_preds[j]))

  avg_cer = sum(test_cer) / len(test_cer)
  avg_wer = sum(test_wer) / len(test_wer)
  wandb.log({
      "valid/loss": test_loss,
      "cer": avg_cer,
      "wer": avg_wer
  })
  print('Test set: Average loss: {:.4f}, Average CER: {:4f} Average WER: {:.4f}\n'.format(test_loss, avg_cer, avg_wer))


In [13]:
import torch.optim as optim
from torch.utils.data import DataLoader


device = "cuda:0" if torch.cuda.is_available() else "cpu"
criterion = nn.CTCLoss(blank = 28).to(device)

train_url="train-clean-100"
test_url="test-clean"
train_loader = DataLoader(
    dataset = train_dataset,
    batch_size = config.batch_size,
    shuffle = True,
    collate_fn = lambda x: data_preprocessing(x, 'train'),
)
test_loader = DataLoader(
    dataset = test_dataset,
    batch_size = config.batch_size,
    shuffle = False,
    collate_fn = lambda x: data_preprocessing(x, 'valid')
)

model = SpeechRecognitionModel(
    config.n_cnn_layers, config.n_rnn_layers, config.rnn_dim, config.n_class, config.n_feats, config.stride, config.dropout
)

model = model.to(device)

optimizer = optim.AdamW(model.parameters(), config.learning_rate)
scheduler = optim.lr_scheduler.OneCycleLR(
    optimizer, max_lr = config.learning_rate, steps_per_epoch = int(len(train_loader)), epochs = config.epochs, anneal_strategy = 'linear'
)

In [14]:
with wandb.init(project = "automatic_speech_recognition",
                  job_type = 'train',
                  config = config,
                  tags = ["speech_recognition", "audio", "cnn", "rnn"]
                  ):
  for epoch in range(1, config.epochs + 1):
    train(model, device, train_loader, criterion, optimizer, scheduler, epoch)
    test(model, device, test_loader, criterion, epoch)

0it [00:02, ?it/s]
Traceback (most recent call last):
  File "<ipython-input-14-713293e8a326>", line 7, in <cell line: 1>
    train(model, device, train_loader, criterion, optimizer, scheduler, epoch)
  File "<ipython-input-11-88bfabcc8527>", line 12, in train
    output = model(spectrograms) #(batch, time, n_class)
  File "/usr/local/lib/python3.10/dist-packages/torch/nn/modules/module.py", line 1511, in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/torch/nn/modules/module.py", line 1520, in _call_impl
    return forward_call(*args, **kwargs)
  File "<ipython-input-6-ff3d1251ad6b>", line 33, in forward
    x = self.birnn_layers(x)
  File "/usr/local/lib/python3.10/dist-packages/torch/nn/modules/module.py", line 1511, in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/torch/nn/modules/module.py", line 1520, in _call_impl
    return forward_call(*args, **kwargs

VBox(children=(Label(value='0.001 MB of 0.001 MB uploaded\r'), FloatProgress(value=1.0, max=1.0)))

OutOfMemoryError: CUDA out of memory. Tried to allocate 1.33 GiB. GPU 0 has a total capacity of 14.75 GiB of which 51.06 MiB is free. Process 232017 has 14.70 GiB memory in use. Of the allocated memory 14.07 GiB is allocated by PyTorch, and 508.75 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

In [None]:
# get the code
!git clone --recursive https://github.com/parlance/ctcdecode.git
!cd ctcdecode && pip install .

In [None]:
from ctcdecode import CTCBeamDecoder

beam_decoder = CTCBeamDecoder(
    labels,
    model_path=None,
    alpha=0,
    beta=0,
    cutoff_top_n=40,
    cutoff_prob=1.0,
    beam_width=100,
    num_processes=4,
    blank_id=0,
    log_probs_input=False
)
beam_results, beam_scores, timesteps, out_lens = decoder.decode(output) #output should be (batch_size, n_timesteps, n_labels)
