In [None]:
!pip install torch>=1.2.0
!pip install torchaudio

In [None]:
import torch
import torch.nn as nn
import torch.utils.data as data
import torch.optim as optim
import torch.nn.functional as F
import torchaudio
import numpy as np


train_dataset = torchaudio.datasets.LIBRISPEECH("./", url="train-clean-100", download=True)

In [None]:
import os
import numpy as np

class TextTransform:
    """Maps characters to integers and vice versa"""
    def __init__(self):
      char_map_str = """
      ' 0
      <SPACE> 1
		  a 2
		  b 3
		  c 4
		  d 5
		  e 6
		  f 7
		  g 8
		  h 9
		  i 10
		  j 11
		  k 12
		  l 13
		  m 14
		  n 15
		  o 16
		  p 17
		  q 18
		  r 19
		  s 20
		  t 21
		  u 22
		  v 23
		  w 24
		  x 25
		  y 26
		  z 27
		  """

      self.char_map = {}
      self.index_map = {}
      for line in char_map_str.strip().split('\n'):
          ch, index = line.split()
          self.char_map[ch] = int(index)
          self.index_map[int(index)] = ch
      self.index_map[1] = ' '

    def text_to_int(self, text):
        """ Use a character map and convert text to an integer sequence """
        int_sequence = []
        for c in text:
            if c == ' ':
                ch = self.char_map['<SPACE>']
            else:
                ch = self.char_map[c]
            int_sequence.append(ch)
        return int_sequence

    def int_to_text(self, labels):
        """ Use a character map and convert integer labels to an text sequence """
        string = []
        for i in labels:
            string.append(self.index_map[i])
        return ''.join(string).replace('<SPACE>', ' ')

train_audio_transforms = nn.Sequential(
    torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_mels=81),
    torchaudio.transforms.FrequencyMasking(freq_mask_param=30),
    torchaudio.transforms.TimeMasking(time_mask_param=100)
)

valid_audio_transforms = torchaudio.transforms.MelSpectrogram()

text_transform = TextTransform()

def data_processing(data, data_type="train"):
  spectrograms = []
  labels = []
  input_lengths = []
  label_lengths = []
  for (waveform, _, utterance, _, _, _) in data:
    if data_type == 'train':
      spec = train_audio_transforms(waveform).squeeze(0).transpose(0, 1)
    elif data_type == 'valid':
      spec = valid_audio_transforms(waveform).squeeze(0).transpose(0, 1)
    else:
      raise Exception('data_type should be train or valid')
    spectrograms.append(spec)
    label = torch.Tensor(text_transform.text_to_int(utterance.lower()))
    labels.append(label)
    input_lengths.append(spec.shape[0]//2)
    label_lengths.append(len(label))

  spectrograms = nn.utils.rnn.pad_sequence(spectrograms, batch_first=True).unsqueeze(1).transpose(2, 3)
  labels = nn.utils.rnn.pad_sequence(labels, batch_first=True)

  return spectrograms, labels, input_lengths, label_lengths


def GreedyDecoder(output, labels, label_lengths, blank_label=28, collapse_repeated=True):
	arg_maxes = torch.argmax(output, dim=2)
	decodes = []
	targets = []
	for i, args in enumerate(arg_maxes):
		decode = []
		targets.append(text_transform.int_to_text(labels[i][:label_lengths[i]].tolist()))
		for j, index in enumerate(args):
			if index != blank_label:
				if collapse_repeated and j != 0 and index == args[j -1]:
					continue
				decode.append(index.item())
		decodes.append(text_transform.int_to_text(decode))
	return decodes, targets

In [None]:
class ActDropNormCNN1D(nn.Module):
    def __init__(self, n_feats, dropout, keep_shape=False):
        super(ActDropNormCNN1D, self).__init__()
        self.dropout = nn.Dropout(dropout)
        self.norm = nn.LayerNorm(n_feats)
        self.keep_shape = keep_shape
    
    def forward(self, x):
        x = x.transpose(1, 2)
        x = self.norm(self.dropout(F.gelu(x)))
        x = self.dropout(F.gelu(self.norm(x)))

        return x


class SpeechRecognition(nn.Module):
    def __init__(self, hidden_size, num_classes, n_feats, num_layers, dropout):
        super(SpeechRecognition, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.cnn = nn.Sequential(
            nn.Conv1d(n_feats, n_feats, 10, 2, padding=10//2),
            ActDropNormCNN1D(n_feats, dropout),
        )
        self.dense = nn.Sequential(
            nn.Linear(n_feats, 128),
            nn.LayerNorm(128),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(128, 128),
            nn.LayerNorm(128),
            nn.GELU(),
            nn.Dropout(dropout),
        )
        self.lstm = nn.LSTM(input_size=128, hidden_size=hidden_size,
                            num_layers=num_layers, dropout=0.0,
                            bidirectional=False)
        self.layer_norm2 = nn.LayerNorm(hidden_size)
        self.dropout2 = nn.Dropout(dropout)
        self.final_fc = nn.Linear(hidden_size, num_classes)

    def _init_hidden(self, batch_size):
        n, hs = self.num_layers, self.hidden_size
        return (torch.zeros(n*1, batch_size, hs),
                torch.zeros(n*1, batch_size, hs))

    def forward(self, x, hidden):
        x = x.squeeze(1)
        x = self.cnn(x) 
        x = self.dense(x) 
        x = x.transpose(0, 1) 
        out, (hn, cn) = self.lstm(x, hidden)
        x = self.dropout2(F.gelu(self.layer_norm2(out)))
        return self.final_fc(x), (hn, cn)

In [None]:
class IterMeter(object):
    """keeps track of total iterations"""
    def __init__(self):
        self.val = 0

    def step(self):
        self.val += 1

    def get(self):
        return self.val

In [None]:
def train(model, device, train_loader, criterion, optimizer, scheduler, epoch, iter_meter):
    model.train()
    data_len = len(train_loader.dataset)
    for batch_idx, _data in enumerate(train_loader):
      spectrograms, labels, input_lengths, label_lengths = _data 
      spectrograms, labels = spectrograms.to(device), labels.to(device)
      bs = spectrograms.shape[0]
      hidden = model._init_hidden(bs)
      hn, c0 = hidden[0].to(device), hidden[1].to(device)
      optimizer.zero_grad()
      output,_ = model(spectrograms, (hn, c0))
      output = F.log_softmax(output, dim=2) 

      loss = criterion(output, labels, input_lengths, label_lengths)
      loss.backward()

      optimizer.step()
      scheduler.step()
      iter_meter.step()
      if batch_idx % 100 == 0 or batch_idx == data_len:
        print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(epoch, batch_idx * len(spectrograms), data_len,100. * batch_idx / len(train_loader), loss.item()))

In [None]:
use_cuda = torch.cuda.is_available()
torch.manual_seed(7)
device = torch.device("cuda" if use_cuda else "cpu")

if not os.path.isdir("./data"):
  os.makedirs("./data")

kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
train_loader = data.DataLoader(dataset=train_dataset,
                                batch_size=20,
                                shuffle=True,
                                collate_fn=lambda x: data_processing(x, 'train'),
                                **kwargs)

In [None]:
hyper_parameters = {
        "num_classes": 29,
        "n_feats": 81,
        "dropout": 0.1,
        "hidden_size": 1024,
        "num_layers": 1
    }

model = SpeechRecognition(**hyper_parameters).to(device)

optimizer = optim.AdamW(model.parameters(), 0.001)
criterion = nn.CTCLoss(blank=28).to(device)
scheduler = optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.001, 
                                            steps_per_epoch=int(len(train_loader)),
                                            epochs=10,
                                            anneal_strategy='linear')

iter_meter = IterMeter()
for epoch in range(1, 10 + 1):
  train(model, device, train_loader, criterion, optimizer, scheduler, epoch, iter_meter)

torch.save(model.state_dict(), "./drive/MyDrive/model_one.pt")