reference: https://www.kaggle.com/code/mbmmurad/end-to-end-pipeline-cnn-rnn-model-with-ctc-loss

In [5]:
from google.colab import drive
drive.mount('/content/drive', force_remount = True)

import os
# Change into the mounted drive through its absolute path: a relative
# './drive/MyDrive' only resolves on the first run, and fails once the
# working directory is already inside the drive (e.g. when re-running the cell).
os.chdir('/content/drive/MyDrive')

Mounted at /content/drive


#### import

In [14]:
!pip install params

Collecting params
  Downloading params-0.9.0-py3-none-any.whl.metadata (631 bytes)
Downloading params-0.9.0-py3-none-any.whl (11 kB)
Installing collected packages: params
Successfully installed params-0.9.0


In [16]:
!pip install jiwer

Collecting jiwer
  Downloading jiwer-4.0.0-py3-none-any.whl.metadata (3.3 kB)
Collecting rapidfuzz>=3.9.7 (from jiwer)
  Downloading rapidfuzz-3.13.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Downloading jiwer-4.0.0-py3-none-any.whl (23 kB)
Downloading rapidfuzz-3.13.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m33.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: rapidfuzz, jiwer
Successfully installed jiwer-4.0.0 rapidfuzz-3.13.0


In [17]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

from sklearn.model_selection import train_test_split

import numpy as np
import matplotlib.pyplot
from IPython import display
from jiwer import wer
import params
import time


#### read data
define helper function

In [89]:
def read_TIMIT(path):
  '''
  Load the preprocessed TIMIT samples and split them into features and labels.

  args:
    path: path of the TIMIT data file (mfcc features, phoneme labels); the file
      holds a list of dictionaries with keys 'mfcc', 'phonemes', 'path'
  return:
    feats: list with the 'mfcc' entry of each audio sample
    labels: list with the 'phonemes' entry of each audio sample
  '''
  # NOTE(review): weights_only = False unpickles arbitrary Python objects,
  # so only load files that come from a trusted source.
  samples = torch.load(path, weights_only = False)

  feats = [sample['mfcc'] for sample in samples]
  labels = [sample['phonemes'] for sample in samples]
  return feats, labels


execute and review

In [20]:
# Quick sanity check that the data file is in the working directory.
# os.listdir() returns entries in arbitrary order, so which entry comes last is not guaranteed.
os.listdir()[-1]

'timit_mfcc_data.pt'

In [27]:
# split train / dev data
path = r'timit_mfcc_data.pt'
feats, labels = read_TIMIT(path)

# Fix the seed so the train / dev split is the same on every run;
# test_size = 0.25 is the value train_test_split uses when none is given.
train_feats, dev_feats, train_labels, dev_labels = train_test_split(
    feats, labels, test_size = 0.25, random_state = 42
)

In [92]:
# look at the last training sample
last_feat, last_label = train_feats[-1], train_labels[-1]

# dimension of its MFCC feature matrix
print(f'MFCC feature matrix Shape(one audio sample):\t{last_feat.shape}')
# phoneme (IPA) labels attached to it
print(f'phoneme labels(one audio sample):\t{last_label}')

# 'h#' opens and closes the label sequence, so it seems to mark sos and eos

MFCC feature matrix Shape(one audio sample):	(97, 39)
phoneme labels(one audio sample):	['h#', 'dh', 'ix', 'n', 'ux', 'z', 'ey', 'zh', 'ix', 'n', 's', 'iy', 'hv', 'ay', 'axr', 'dcl', 'd', 'ey', 'gcl', 'g', 'r', 'ey', 'tcl', 'jh', 'er', 'nx', 'ax', 'l', 'ix', 's', 'tcl', 't', 'h#']


In [51]:
# number of MFCC frames vs. number of phoneme labels of the first training sample
print(f'num_frames:\t{len(train_feats[0])}')
print(f'num_labels:\t{len(train_labels[0])}')

num_frames:	95
num_labels:	39


In [None]:
# longest MFCC sequence and longest label sequence over all samples
max_len_feats = max(map(len, feats))
max_len_labels = max(map(len, labels))

#### create IPA dictionary

define helper function

In [115]:
def create_IPAdictionary(labels):
  '''
  Build the phoneme-to-index vocabulary used to encode the label sequences.

  Index 0 is reserved for '<blank>' and index 1 for 'h#'; every other phoneme
  gets an index starting at 2, in sorted order.

  args:
    labels: list of list of phoneme strings
  return: ipa2idx
        dictionary of IPA_label: index
  '''
  ipas = set()
  for label in labels:
    ipas.update(label) # in place; avoids rebuilding the set for every utterance

  # The reserved symbols get fixed indices below. Set difference (instead of
  # remove) keeps this working when 'h#' does not occur in the data at all.
  ipas -= {'<blank>', 'h#'}

  ipa2idx = {'<blank>': 0, 'h#': 1}
  ipa2idx.update({ipa: idx for idx, ipa in enumerate(sorted(ipas), start = 2)})

  return ipa2idx

execute and review

In [116]:
ipa2idx = create_IPAdictionary(labels)
# show every symbol of the vocabulary in alphabetical order
print(*sorted(ipa2idx.keys()))

<blank> aa ae ah ao aw ax ax-h axr ay b bcl ch d dcl dh dx eh el em en eng epi er ey f g gcl h# hh hv ih ix iy jh k kcl l m n ng nx ow oy p pau pcl q r s sh t tcl th uh uw ux v w y z zh


In [118]:
# vocabulary size (this count includes the '<blank>' symbol) and the reserved indices
print(f'the number of IPA labels in TIMIT:\t {len(ipa2idx)}')
print(f'Index of blank symbol "<blank>":\t {ipa2idx["<blank>"]}')
print(f'Index of sos/eos label "h#":\t {ipa2idx["h#"]}')

the number of IPA labels in TIMIT:	 62
Index of blank symbol <blank>":	 0
Index of sos/eos label "h#":	 1


#### Dataset + pad
define Dataset Class

In [119]:
class PhonemeASRDataset(Dataset):
  '''
  Dataset that pairs a feature matrix with its index-encoded phoneme labels.

  args:
    feats: sequence of feature matrices, one per audio sample
    labels: sequence of phoneme label lists, aligned with feats
    ipa2idx: dictionary of IPA_label: index used to encode the labels
  '''
  def __init__(self, feats, labels, ipa2idx):
    super(PhonemeASRDataset, self).__init__()
    self.feats, self.labels = feats, labels
    self.ipa2idx = ipa2idx

  def __len__(self):
    return len(self.feats)

  def __getitem__(self, idx):
    '''
    return: (feature tensor, label index tensor of dtype long) for sample idx
    '''
    feat, label = self.feats[idx], self.labels[idx]
    # encode with the vocabulary handed to this dataset, not with whatever
    # global variable happens to be called ipa2idx
    label = [self.ipa2idx[ipa] for ipa in label]

    return torch.tensor(feat), torch.tensor(label, dtype = torch.long)

define padding function

In [120]:
def pad_collate(batch, pad_value_feat=0, pad_value_label=0, return_lengths=False):
    '''
      for collate_fn in DataLoader function

      Pads every feature matrix (along its first dimension) and every label
      sequence of the batch to the longest one in that batch, then stacks them.

    args:
      batch: a list of tuples (mfcc, label)
      pad_value_feat: value used to fill the padded feature frames
      pad_value_label: value used to fill the padded label positions
      return_lengths: when True, also return the unpadded lengths. With the
        default label padding of 0 the padded positions cannot be told apart
        from a real label 0 afterwards, so sequence losses such as CTC need
        these lengths.
    return:
      padded_mfccs, padded_labels
      or, when return_lengths is True:
      padded_mfccs, padded_labels, feat_lengths, label_lengths
      (both lengths are 1-D long tensors with one entry per sample)
    '''

    mfccs, labels = zip(*batch)

    # original (unpadded) length of every sample in the current batch
    feat_lengths = [mfcc.shape[0] for mfcc in mfccs]
    label_lengths = [label.shape[0] for label in labels]
    max_len_feats, max_len_labels = max(feat_lengths), max(label_lengths)

    # pad (0, 0, 0, k): leave the last dimension alone, append k rows at the end
    padded_mfccs = torch.stack([
        F.pad(mfcc, (0, 0, 0, max_len_feats - length), value=pad_value_feat)
        for mfcc, length in zip(mfccs, feat_lengths)
    ])
    padded_labels = torch.stack([
        F.pad(label, (0, max_len_labels - length), value=pad_value_label)
        for label, length in zip(labels, label_lengths)
    ])

    if return_lengths:
        return (
            padded_mfccs,
            padded_labels,
            torch.tensor(feat_lengths, dtype=torch.long),
            torch.tensor(label_lengths, dtype=torch.long),
        )
    return padded_mfccs, padded_labels

execute and review

In [121]:
train_ds = PhonemeASRDataset(feats = train_feats, labels = train_labels, ipa2idx = ipa2idx)

# collate_fn receives the list of samples drawn for one batch and merges them
# into batch tensors; pad_collate is the function passed for that here
train_loader = DataLoader(
    train_ds,
    batch_size = 32, # can adjust
    shuffle = True,
    collate_fn = pad_collate,
) # yields batch_size x max_len x num_feats as one training batch

In [122]:
# draw a single batch from the loader and look at the padded shapes
first_batch = next(iter(train_loader))
a, b = first_batch
print(a.shape, b.shape)

torch.Size([32, 138, 39]) torch.Size([32, 54])


Define Loss Function

In [None]:
# define CTC Loss Function
def CTCLoss(y_true, y_pred):
  '''
  calculate CTC loss
  args:
    y_true:
      gold labels for input sequence, (batch, max_label_len), padded with 0
    y_pred:
      predicted labels obtained from model; indexed here as
      (batch, time_steps, ...)
  return:
    the value returned by keras.backend.ctc_batch_cost for the batch

  NOTE(review): the Keras CTC backend presumably uses the last class index as
  the blank symbol, not index 0 — confirm against the model's output layer.
  '''

  batch_len = tf.cast(tf.shape(y_true)[0], dtype = 'int64') # batch_size

  # every sample is given the full prediction length
  input_length = tf.cast(tf.shape(y_pred)[1], dtype = 'int64') # prediction sequence length
  input_length = input_length * tf.ones(shape = (batch_len, 1), dtype = 'int64')

  # Count only the real labels of each sample. Using the padded width instead
  # would score the padding zeros as if they were gold labels.
  # Assumes 0 is used only for padding and never occurs as a real label.
  label_length = tf.math.count_nonzero(y_true, axis = 1, keepdims = True, dtype = 'int64')

  loss = keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)

  return loss

#### Define CNN-RNN model

In [None]:
def IPA_model(input_dim, output_dim, rnn_layers = 1, rnn_units = 128, time_stride = 1):
  '''
  Build and compile the CNN-RNN phoneme recognition model trained with CTC loss.

  args:
    input_dim: number of features per frame (we feed mfcc features)
    output_dim: number of label classes; the classifier gets one extra unit
      on top of it
    rnn_layers: number of stacked bidirectional GRU layers
    rnn_units: units of each GRU direction
    time_stride: stride of the strided convolutions along the time axis.
      It is applied twice, so the output has about
      num_frames / time_stride**2 steps. CTC needs at least as many output
      steps as labels; a sample with 95 frames and 39 labels would not fit
      after a 4x reduction, hence the default of 1.
  return:
    compiled keras.Model mapping (batch, time, input_dim) to per-step class
    probabilities of shape (batch, time', output_dim + 1)
  '''
  # model's input ## we are going to use mfcc features btw
  input_spectrogram = layers.Input((None, input_dim), name = 'input')
  # expand dimension to use CNN 2D
  x = layers.Reshape((-1, input_dim, 1), name = 'example_dim')(input_spectrogram)

  # CNN encoder: two identical blocks (strided conv -> conv -> batch norm -> ReLU)
  for block in (1, 2):
    x = layers.Conv2D(
        filters = 32, # can adjust
        kernel_size = [11, 41], # can adjust
        strides = [time_stride, 2], # [time, feature]; can adjust
        padding = 'same',
        use_bias = False,
        name = f'conv_{block}0',
    )(x)

    x = layers.Conv2D(
        filters = 16,
        kernel_size = [11, 41],
        strides = [1, 1],
        padding = 'same',
        use_bias = False,
        name = f'conv_{block}1',
    )(x)

    ## no pooling layer: the strided convolutions do the downsampling

    x = layers.BatchNormalization(name = f'conv_{block}_bn')(x)
    x = layers.ReLU(name = f'conv_{block}_relu')(x)

  # Reshape output of CNN to feed RNN layers:
  # (batch, time, feats, channels) -> (batch, time, feats * channels)
  x = layers.Reshape((-1, x.shape[-2] * x.shape[-1]), name = 'cnn_to_rnn')(x)

  # RNN encoder
  for i in range(1, rnn_layers + 1):
    gru = layers.GRU(
        units = rnn_units, # can adjust
        activation = 'tanh', # the GRU default; 'ReLU' is not an activation identifier
        recurrent_activation = 'sigmoid',
        use_bias = True,
        return_sequences = True,
        reset_after = True,
        name = f'gru_{i}',
    )
    x = layers.Bidirectional(
        gru, name = f'bidirectional_{i}', merge_mode = 'concat'
    )(x)

    if i < rnn_layers:
      x = layers.Dropout(rate = 0.5)(x)

  # Dense layer
  # 3 * MLP layers; every layer of a model needs its own unique name
  for i in range(1, 4):
    x = layers.Dense(units = rnn_units * 2, name = f'dense_{i}')(x)
    x = layers.ReLU(name = f'dense_{i}_relu')(x)
    x = layers.Dropout(rate = 0.2)(x)

  # Classifier
  output = layers.Dense(units = output_dim + 1, activation = 'softmax')(x)

  # Model
  model = keras.Model(input_spectrogram, output, name = 'IPA_ASR')
  # Optimizer
  optimizer = keras.optimizers.Adam(learning_rate = 1e-3)
  model.compile(optimizer = optimizer, loss = CTCLoss)

  return model


execute; create model

In [None]:
# The model consumes the precomputed mfcc features, so its input dimension is
# the number of features per frame (second axis of a feature matrix), not an
# FFT bin count.
model = IPA_model(
    train_feats[0].shape[1], # input_dim
    len(ipa2idx),            # output_dim
    rnn_units = 128,
)

model.summary(line_length = 100)

In [None]:
# class CNNEncoder(nn.Module):
  #def __init__(self, in_channels = 3, out_channels = 16, latent_dim = 200,
  #            act_fn = nn.ReLU(), dropout_p = 0.1):
  #  super(CNNEncoder, self).__init__()
    # define attributes of self
  #  self.in_channels = in_channels
  #  self.out_channels = out_channels
  #  self.latent_dim = latent_dim
  #  self.act_fn = act_fn

    # define layers of self
  #  self.conv1 = nn.Conv2d(self.in_channels, self.out_channels,
  #                         kernel_size = 3,
  #                         stride = 1,
  #                         padding = 1, )
  #  self.conv2 = nn.Conv2d(self.in_channels, self.out_channels,
  #                         kernel_size = 3,
  #                         stride = 1,
  #                         padding = 1)
  #  self.pool1 = nn.MaxPool2d(kernel_size = 5,
  #                            stride = 2)


  #  def forward(self, inputs):
    # First iteration
  #  x = self.conv2(self.conv1(inputs))
  #  x = self.act_fn(self.pool1(x))
    # Second interation
  #  x = self.conv2(self.conv1(x))
  #  x = self.act_fn(self.pool1(x))
  #  return x


execute and review

#### call relevant dictionaries

In [None]:
# ipa2idx dictionary
# should include sos '<', eos '>', pad '<pad>'

# English to Yoruba dictionary
# Yoruba to English dictionary

#### train
set up device

In [None]:
# run on the GPU when one is visible to torch, otherwise stay on the CPU
if torch.cuda.is_available():
  device = 'cuda'
else:
  device = 'cpu'

define train function

In [None]:
# 1. Load & Pad data
# 2. Initialize the Network
# 3. Train / dev split
# 4. Build vocabularies
# 5. DataLoaders
# 6. Instantiate & Train
# 7. Inference on test
# 8. Post-hoc decode