In [None]:
import torchaudio
import torch
# LibriSpeech splits used by the rest of the notebook, stored under the current directory:
# "train-clean-100" feeds training and "test-clean" feeds evaluation.
# download=True asks torchaudio to fetch the archives.
train_dataset = torchaudio.datasets.LIBRISPEECH("./", url="train-clean-100", download=True)
test_dataset = torchaudio.datasets.LIBRISPEECH("./", url="test-clean", download=True)

In [None]:
# Vocabulary layout: index 0 is the '<SPACE>' token, index 1 the apostrophe,
# and indices 2-27 the lowercase letters a-z.
characters = ['<SPACE>', "'"]
characters.extend(chr(code) for code in range(97, 123))

# Each symbol maps to its position in the vocabulary list.
char_map = {symbol: position for position, symbol in enumerate(characters)}

print(char_map)


In [None]:
import torch.nn as nn
class TextProcessing:
    """Two-way conversion between transcript characters and integer labels."""

    def __init__(self, char_map):
        """Keep ``char_map`` (character -> index) and derive the reverse lookup."""
        self.char_map = char_map
        self.index_map = {}
        for symbol, idx in char_map.items():
            self.index_map[idx] = symbol
        # Index 0 is decoded straight to a literal blank character.
        self.index_map[0] = ' '

    def text_to_int(self, text):
        """Encode ``text`` character by character.

        Characters missing from ``char_map`` are encoded with the index of
        the '<SPACE>' token.
        """
        encoded = []
        for ch in text:
            encoded.append(self.char_map.get(ch, self.char_map['<SPACE>']))
        return encoded

    def int_to_text(self, labels):
        """Decode a sequence of indices into a string.

        Any '<SPACE>' token left in the joined text is rendered as ' '.
        """
        pieces = [self.index_map[idx] for idx in labels]
        joined = ''.join(pieces)
        return joined.replace('<SPACE>', ' ')

# Training pipeline: mel spectrogram followed by frequency masking and time masking.
train_transfroms = nn.Sequential(
        torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_mels=128),
        torchaudio.transforms.FrequencyMasking(freq_mask_param=30),
        torchaudio.transforms.TimeMasking(time_mask_param=100))
    
# Evaluation pipeline: mel spectrogram only, without masking.
# NOTE(review): this relies on MelSpectrogram's default arguments, whereas the training
# pipeline sets sample_rate=16000 and n_mels=128 explicitly — confirm the defaults match.
trusted_audio_transforms = torchaudio.transforms.MelSpectrogram()
# Character <-> index converter built from char_map.
text_transform = TextProcessing(char_map)

In [None]:
def data_processing(data, data_type="train"):
    """Collate a list of LibriSpeech samples into padded batch tensors.

    Args:
        data: iterable of 6-tuples whose first element is the waveform and
            whose third element is the transcript string; the other fields
            are ignored.
        data_type: ``'train'`` selects the augmenting ``train_transfroms``
            pipeline; any other value selects ``trusted_audio_transforms``.

    Returns:
        A tuple ``(spectrograms, labels, input_lengths, label_lengths)``:
        ``spectrograms`` is the zero-padded batch laid out as
        (batch, 1, feature, time); ``labels`` is the zero-padded integer
        (``torch.long``) label matrix; ``input_lengths`` holds half the
        number of spectrogram frames of each sample (presumably to account
        for the stride-2 convolution at the front of the model — confirm);
        ``label_lengths`` holds the unpadded length of each label sequence.
    """
    spectrograms = []
    labels = []
    input_lengths = []
    label_lengths = []
    # Only training batches are augmented; everything else uses the plain transform.
    transform = train_transfroms if data_type == 'train' else trusted_audio_transforms
    for (waveform, _, utterance, _, _, _) in data:
        # squeeze(0) drops the leading axis (assumes mono audio) and the transpose
        # puts time first so that pad_sequence pads along the time axis.
        spec = transform(waveform).squeeze(0).transpose(0, 1)
        spectrograms.append(spec)
        # Class indices are integers: build a long tensor rather than the float32
        # tensor the legacy torch.Tensor(...) constructor would produce.
        label = torch.tensor(text_transform.text_to_int(utterance.lower()), dtype=torch.long)
        labels.append(label)
        input_lengths.append(spec.shape[0]//2)
        label_lengths.append(len(label))
    # (batch, time, feature) -> (batch, 1, time, feature) -> (batch, 1, feature, time)
    spectrograms = nn.utils.rnn.pad_sequence(spectrograms, batch_first=True).unsqueeze(1).transpose(2, 3)
    labels = nn.utils.rnn.pad_sequence(labels, batch_first=True)

    return spectrograms, labels, input_lengths, label_lengths

Neural network definition:

In [None]:
class STTModel(nn.Module):
    """Speech-to-text network.

    Pipeline: a strided convolution, ``cnn_layer`` residual convolution
    blocks, a linear projection, ``rnn_layers`` bidirectional LSTM layers and
    a two-layer per-frame classifier.

    The residual blocks and LSTM layers are stored in ``nn.ModuleList`` /
    ``nn.ModuleDict`` containers so that they are registered submodules:
    their weights appear in ``parameters()`` and ``state_dict()`` and they
    follow ``.to()``, ``.train()`` and ``.eval()``. (Plain Python lists of
    dicts are invisible to all of those, so those layers would never be
    optimised or saved.)

    Args:
        cnn_layer: number of residual convolution blocks.
        rnn_layers: number of bidirectional LSTM layers.
        rnn_dim: hidden size of each LSTM direction.
        n_class: number of output classes per frame.
        n_feats: number of input features (mel bins); halved internally to
            match the output of the strided convolution.
        stride: stride of the first convolution.
        dropout: dropout probability used throughout.
        device: device the residual blocks and LSTM layers are placed on at
            construction time. Kept for backward compatibility; calling
            ``model.to(...)`` afterwards moves the whole network.
    """

    def __init__(self, cnn_layer, rnn_layers, rnn_dim,n_class, n_feats, stride=2, dropout=0,device='cuda:0'):
        super(STTModel, self).__init__()
        n_feats = n_feats//2
        # Strided convolution: extracts local features and shortens the sequence.
        self.cnn = nn.Conv2d(1,32,3, stride=stride, padding=3//2)
        # Residual convolution blocks, registered through ModuleList/ModuleDict.
        self.resnetlayer = nn.ModuleList([
            nn.ModuleDict({
                'cnn1':nn.Conv2d(32, 32, 3, 1, 1),
                'cnn2':nn.Conv2d(32, 32, 3, 1, 1),
                'ln1':nn.LayerNorm(n_feats),
                'ln2':nn.LayerNorm(n_feats),
                'actF':nn.ReLU(inplace=True),
                'dp':nn.Dropout(dropout)}
            ).to(device)
            for _ in range(cnn_layer)
        ])
        self.fully_connected = nn.Linear(32*n_feats, rnn_dim)
        # Bidirectional LSTM stack for sequence modelling. Every layer receives
        # (batch, time, feature) input, so every layer must be batch_first;
        # otherwise the recurrence would run across the utterances of a batch.
        self.LSTM = nn.ModuleList([
            nn.ModuleDict({
                'lstm':nn.LSTM(input_size=rnn_dim if i==0 else rnn_dim*2, hidden_size=rnn_dim,
                        num_layers=1, batch_first=True, bidirectional=True),
                'ln':nn.LayerNorm(rnn_dim if i==0 else rnn_dim*2),
                'dp':nn.Dropout(dropout),
                'actF':nn.ReLU(inplace=True)}
            ).to(device)
            for i in range(rnn_layers)
        ])
        # Per-frame classifier.
        self.classifier = nn.Sequential(
            nn.Linear(rnn_dim*2, rnn_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(rnn_dim, n_class)
        )

    def forward(self, x):
        """Map a (batch, 1, feature, time) input to (batch, time', n_class) scores."""
        x = self.cnn(x)
        for res in self.resnetlayer:
            x = self.res_forward(x,res)
        # Fold channels and features together: (batch, channel * feature, time).
        sizes = x.size()
        x = x.view(sizes[0], sizes[1] * sizes[2], sizes[3])
        x = x.transpose(1, 2)  # (batch, time, channel * feature)
        x = self.fully_connected(x)
        for lstm in self.LSTM:
            x = self.lstm_foward(x,lstm)
        x = self.classifier(x)
        return x

    def res_forward(self, x,layer):
        """Apply one residual block; ``layer`` is an entry of ``self.resnetlayer``."""
        residual = x
        out = layer['cnn1'](x)
        # LayerNorm normalises the last axis, so move features there and back.
        out = out.transpose(2, 3).contiguous() # (batch, channel, time, feature)
        out = layer['ln1'](out)
        out = out.transpose(2, 3).contiguous() # (batch, channel, feature, time)
        out = layer['actF'](out)
        out = layer['dp'](out)
        out = layer['cnn2'](out)
        out = out.transpose(2, 3).contiguous() # (batch, channel, time, feature)
        out = layer['ln2'](out)
        out= out.transpose(2, 3).contiguous() # (batch, channel, feature, time)
        out += residual
        out = layer['actF'](out)
        out = layer['dp'](out)
        return out

    def lstm_foward(self ,x ,layer):
        """Apply LayerNorm, ReLU, one bidirectional LSTM and dropout; ``layer`` is an entry of ``self.LSTM``."""
        x = layer['ln'](x)
        x = layer['actF'](x)
        x, _ = layer['lstm'](x)
        x = layer['dp'](x)
        return x


Training/Testing

In [None]:
def outputDecoder(output, labels, label_lengths, blank_label=28, collapse_repeated=True):
    """Greedy decoding of model scores together with the reference transcripts.

    Args:
        output: score tensor; the arg-max is taken over dimension 2 and the
            result is iterated along dimension 0 (one sequence per item).
        labels: padded label matrix, one row per item.
        label_lengths: unpadded length of each row of ``labels``.
        blank_label: index dropped from the decoded sequence.
        collapse_repeated: when true, an index equal to the one immediately
            before it in the arg-max sequence is dropped.

    Returns:
        ``(decodes, targets)``: the decoded strings and the reference strings.
    """
    arg_maxes = torch.argmax(output, dim=2)

    # Reference transcripts: cut each label row back to its true length.
    targets = []
    for i in range(len(arg_maxes)):
        reference = labels[i][:label_lengths[i]].tolist()
        targets.append(text_transform.int_to_text(reference))

    # Predictions: drop blanks and (optionally) immediate repeats.
    decodes = []
    for args in arg_maxes:
        kept = []
        previous = None
        for index in args.tolist():
            is_repeat = collapse_repeated and previous is not None and index == previous
            if index != blank_label and not is_repeat:
                kept.append(index)
            previous = index
        decodes.append(text_transform.int_to_text(kept))

    return decodes, targets

In [None]:
from Levenshtein import distance as levenshtein_distance

def process_strings(target, predicted, delm=' ', remove_space=False):
    """Lower-case both strings and split each of them on ``delm``.

    When ``remove_space`` is true, every ' ' is removed before splitting.
    Returns the two token lists as ``(target_tokens, predicted_tokens)``.
    """
    tokenised = []
    for text in (target, predicted):
        text = text.lower()
        if remove_space:
            text = text.replace(' ', '')
        tokenised.append(text.split(delm))
    return tokenised[0], tokenised[1]

def levenshtein_errors(target, predicted, remove_space=False, delm=' '):
    """Edit distance between the tokenised strings and the reference token count.

    Both strings are normalised and split by ``process_strings``; the result
    is ``(distance, number_of_target_tokens)``.
    """
    reference_tokens, hypothesis_tokens = process_strings(
        target, predicted, delm, remove_space)
    edit_count = levenshtein_distance(reference_tokens, hypothesis_tokens)
    return edit_count, len(reference_tokens)

def cer(target, predicted, remove_space=False):
    """Character error rate of ``predicted`` against ``target``.

    Both strings are lower-cased and compared character by character: the
    Levenshtein distance between them is divided by the number of characters
    in the reference. (Splitting on spaces first, as the word-level helper
    does, would yield a word error rate instead.)

    Args:
        target: reference transcript.
        predicted: hypothesis transcript.
        remove_space: when true, spaces are stripped from both strings first.

    Returns:
        The error rate as a float. For an empty reference the result is 0.0
        if the hypothesis is empty too and 1.0 otherwise.
    """
    reference = target.lower()
    hypothesis = predicted.lower()
    if remove_space:
        reference = reference.replace(' ', '')
        hypothesis = hypothesis.replace(' ', '')
    if not reference:
        # Nothing to normalise by: avoid dividing by zero.
        return 0.0 if not hypothesis else 1.0
    return levenshtein_distance(reference, hypothesis) / len(reference)

def wer(target, predicted, delm=' '):
    """Word error rate: token-level edit distance over the reference token count.

    Tokens are obtained by splitting both strings on ``delm``.
    """
    edits, reference_count = levenshtein_errors(target, predicted, False, delm)
    rate = edits / reference_count
    return rate

In [None]:
import torch.nn.functional as functional
import matplotlib.pyplot as plt
import numpy as np

def calculate_loss_and_backpropagate(model, spectrograms, labels, input_lengths, label_lengths, criterion, optimiser, scheduler):
    """Run a single optimisation step.

    The model output is turned into log-probabilities over dimension 2 and
    its first two axes are swapped before it is handed to ``criterion``.
    After the backward pass both the optimiser and the scheduler are stepped.

    Returns:
        ``(loss, log_probs)`` where ``log_probs`` is the transposed
        log-softmax output that was passed to ``criterion``.
    """
    optimiser.zero_grad()
    log_probs = functional.log_softmax(model(spectrograms), dim=2).transpose(0, 1)
    batch_loss = criterion(log_probs, labels, input_lengths, label_lengths)
    batch_loss.backward()
    optimiser.step()
    scheduler.step()
    return batch_loss, log_probs

def calculate_errors(decoded_preds, decoded_targets):
    """Return ``cer(target, prediction)`` for every prediction, in order."""
    errors = []
    for position, prediction in enumerate(decoded_preds):
        errors.append(cer(decoded_targets[position], prediction))
    return errors

def train(model, device, train_loader, criterion, optimiser, scheduler, epoch):
    """Train ``model`` on the leading part of ``train_loader`` for one epoch.

    Only the first ``len(train_loader) // 10 + 1`` batches are used; the loop
    stops as soon as the batch index exceeds a tenth of the loader length.

    Args:
        model: network to optimise.
        device: device the batch tensors are moved to.
        train_loader: loader yielding
            ``(spectrograms, labels, input_lengths, label_lengths)``.
        criterion: loss passed on to ``calculate_loss_and_backpropagate``.
        optimiser: optimiser stepped once per batch.
        scheduler: learning-rate scheduler stepped once per batch.
        epoch: epoch number, used only in the log messages.

    Returns:
        ``(average_loss, average_error)``. The loss is averaged over the
        batches that were actually processed and the error over all decoded
        utterances; the error is ``nan`` when nothing was decoded.
    """
    model.train()
    data_len = len(train_loader.dataset)
    train_loss = 0
    train_errors = []
    batches_run = 0

    ten_percent_of_data = len(train_loader) // 10
    for batch_idx, _data in enumerate(train_loader):
        if batch_idx > ten_percent_of_data:
            # `spectrograms` and `loss` still hold the values of the previous batch.
            print(f"Train Epoch: {epoch} [{batch_idx * len(spectrograms)}/{data_len} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}")
            break
        spectrograms, labels, input_lengths, label_lengths = _data
        spectrograms, labels = spectrograms.to(device), labels.to(device)

        loss, output = calculate_loss_and_backpropagate(model, spectrograms, labels, input_lengths, label_lengths, criterion, optimiser, scheduler)
        train_loss += loss.item()
        batches_run += 1

        decoded_preds, decoded_targets = outputDecoder(output.transpose(0, 1), labels, label_lengths)
        train_errors.extend(calculate_errors(decoded_preds, decoded_targets))

        if batch_idx % 100 == 0:
            print(decoded_preds)
            print(decoded_targets)
            print(f"Train Epoch: {epoch} [{batch_idx * len(spectrograms)}/{data_len} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}")

    # Average over the batches that ran, not over the full loader length:
    # the loop above stops early, so the two differ by roughly a factor of ten.
    train_loss /= max(batches_run, 1)
    train_error = sum(train_errors) / len(train_errors) if train_errors else float('nan')
    print(f"Train Epoch: {epoch}\tAverage Loss: {train_loss:.6f}\tAverage Error: {train_error:.6f}")

    return train_loss, train_error

def test(model, device, test_loader, criterion, epoch, max_batches=None):
    """Evaluate ``model`` on ``test_loader`` without gradient tracking.

    Args:
        model: network to evaluate; it is switched to eval mode.
        device: device the batch tensors are moved to.
        test_loader: loader yielding
            ``(spectrograms, labels, input_lengths, label_lengths)``.
        criterion: loss applied to the transposed log-softmax output.
        epoch: epoch number, used only in the log message.
        max_batches: optional cap on the number of batches evaluated. The
            default ``None`` evaluates the whole loader, which keeps the
            reported metrics deterministic and comparable between epochs
            (a randomly chosen stopping point would not).

    Returns:
        ``(average_loss, average_error)``: the loss averaged over the
        evaluated batches and the character error rate averaged over all
        evaluated utterances (``nan`` when nothing was evaluated).
    """
    model.eval()
    total_loss = 0.0
    batches_run = 0
    test_cer = []
    with torch.no_grad():
        for i, _data in enumerate(test_loader):
            if max_batches is not None and i >= max_batches:
                break
            spectrograms, labels, input_lengths, label_lengths = _data
            spectrograms, labels = spectrograms.to(device), labels.to(device)
            output = model(spectrograms)
            output = functional.log_softmax(output, dim=2)
            output = output.transpose(0, 1)
            loss = criterion(output, labels, input_lengths, label_lengths)
            total_loss += loss.item()
            batches_run += 1
            decoded_preds, decoded_targets = outputDecoder(output.transpose(0,1), labels, label_lengths)
            for j in range(len(decoded_preds)):
                test_cer.append(cer(decoded_targets[j], decoded_preds[j]))

    # Normalise by what was actually evaluated.
    test_loss = total_loss / batches_run if batches_run else 0.0
    test_error = sum(test_cer) / len(test_cer) if test_cer else float('nan')

    print(f"Test Epoch: {epoch}\tAverage Loss: {test_loss:.6f}\tAverage Error: {test_error:.6f}")
    return test_loss, test_error


In [None]:
import torch.utils.data as data

def initialize_data_loader(dataset, batch_size, shuffle, process_type, **kwargs):
    """Build a ``DataLoader`` whose batches are collated by ``data_processing``.

    ``process_type`` is forwarded to ``data_processing`` as its second
    argument; any extra keyword arguments go straight to the ``DataLoader``.
    """
    def collate(batch):
        # Bind the processing mode so the loader only has to pass the batch.
        return data_processing(batch, process_type)

    return data.DataLoader(
        dataset=dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        collate_fn=collate,
        **kwargs,
    )

def initialize_model(params):
    """Instantiate ``STTModel`` from a configuration dictionary.

    The values are passed positionally in the order cnnLayers, lstmLayers,
    rnnDim, nClass, nFeats, stride, dropout. No device is passed, so the
    model's own default for that argument applies.
    """
    ordered_keys = (
        'cnnLayers',
        'lstmLayers',
        'rnnDim',
        'nClass',
        'nFeats',
        'stride',
        'dropout',
    )
    return STTModel(*[params[key] for key in ordered_keys])

# Optimisation hyperparameters.
learningRate = 5e-4
batchSize = 20
epochs = 100

# Model configuration consumed by initialize_model().
# nClass is 29: the 28 symbols of char_map plus the blank index 28 used by the CTC loss below.
params = {
    "cnnLayers" : 3,
    "lstmLayers" : 5,
    "rnnDim" : 256,
    "nClass" : 29,
    "nFeats" : 128,
    "stride" : 2,
    "dropout" : 0
}

# Seed torch's RNG so that weight initialisation and shuffling are repeatable.
torch.manual_seed(0)
# NOTE(review): the device is hard-coded to the first CUDA GPU.
device = torch.device('cuda:0')
kwargs = {'num_workers': 0, 'pin_memory': True}

# Training batches are shuffled and augmented ('train'); test batches are neither.
trainLoader = initialize_data_loader(train_dataset, batchSize, True, 'train', **kwargs)
testLoader = initialize_data_loader(test_dataset, batchSize, False, 'test', **kwargs)

model = initialize_model(params)

optimizer = torch.optim.AdamW(model.parameters(), lr=learningRate)
# The one-cycle schedule is sized for len(trainLoader) scheduler steps per epoch.
# NOTE(review): train() in this notebook stops after roughly a tenth of the batches,
# so the schedule advances more slowly than this sizing implies — confirm this is intended.
scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=learningRate, steps_per_epoch=len(trainLoader), epochs=epochs)
# blank=28 is the class index reserved for the CTC blank symbol.
criterion = nn.CTCLoss(blank=28).to(device)


In [None]:
# Move the model to the configured device before training.
model = model.to(device)

In [None]:
# Per-epoch metric histories filled by the training loop:
# error_history / loss_history hold the values returned by test(),
# train_error_history / train_loss_history those returned by train().
error_history = []
loss_history = []
train_error_history = []
train_loss_history = []

In [None]:
# Main loop: one training pass and one evaluation pass per epoch.
for epoch in range(1, epochs + 1):
    # train() returns (average loss, average error).
    x,y = train(model, device, trainLoader, criterion, optimizer, scheduler, epoch)
    train_error_history.append(y)
    train_loss_history.append(x)
    # test() returns (average loss, average error).
    a,b = test(model, device, testLoader, criterion, epoch)
    error_history.append(b)
    loss_history.append(a)
    # Every 10th epoch: checkpoint the weights and plot the test curves so far.
    if epoch % 10 == 0:
        torch.save(model.state_dict(), f"modelv2_epoch_{epoch}.pt")
        plt.plot(np.arange(1,len(error_history)+1,1), error_history)
        plt.xlabel('Epoch')
        plt.ylabel('Error')
        plt.title(f"Error at epoch: {epoch}")
        plt.show()

        plt.plot(np.arange(1,len(loss_history)+1,1), loss_history)
        plt.xlabel('Epoch')
        plt.ylabel('loss')
        plt.title(f"loss as epoch: {epoch}")
        plt.show()

In [None]:
# Free memory after training: collect unreachable Python objects first,
# then ask PyTorch to release its cached CUDA memory.
import gc
gc.collect()  # run garbage collector
torch.cuda.empty_cache()

In [None]:
# Switch the model to evaluation mode for the inference cells that follow.
model.eval()

In [None]:
def Decoder(output, blank_label = 28, collapse_repeated = True):
    """Greedy decoding of model scores into text.

    The arg-max is taken over dimension 2 of ``output``; each resulting
    sequence along dimension 0 is decoded separately. Entries equal to
    ``blank_label`` are dropped and, when ``collapse_repeated`` is true, so
    is any entry equal to the one immediately before it.

    Returns:
        A list with one decoded string per sequence.
    """
    arg_maxes = torch.argmax(output, dim=2)

    decodes = []
    for args in arg_maxes:
        kept = []
        previous = None
        for index in args.tolist():
            is_repeat = collapse_repeated and previous is not None and index == previous
            if index != blank_label and not is_repeat:
                kept.append(index)
            previous = index
        decodes.append(text_transform.int_to_text(kept))
    return decodes


In [None]:
# Sanity check: transcribe a single utterance with greedy decoding.
file = "./LibriSpeech/train-clean-100/19/227/19-227-0000.flac"
waveform, sample_rate = torchaudio.load(file)
print(waveform.shape)
inputTransforms = torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_mels=128)
# The transform output keeps the waveform's leading axis followed by (n_mels, time).
# The model expects (batch, 1, n_mels, time), the layout produced by data_processing,
# so a batch axis is all that has to be added (assumes a mono file, i.e. a leading axis
# of size 1). Transposing and then calling .view(128, -1) is not a valid view of the
# transposed tensor and would not restore the (n_mels, time) layout anyway.
input = inputTransforms(waveform)
print(input.shape)
input = input.unsqueeze(0)
print(input.shape)
with torch.no_grad():
    output = model(input.to(device))
print(output)
print(output.shape)
text = Decoder(output)
print(text)

In [None]:
# Load the checkpoint written at epoch 90 onto the configured device.
model.load_state_dict(torch.load(f"modelv2_epoch_90.pt", map_location=device))
model.eval()

# Load the dataset
# NOTE(review): this is the training split ("train-clean-100"); confirm that evaluating
# on training data is intended here.
eval_dataset = torchaudio.datasets.LIBRISPEECH("./", url="train-clean-100", download=True)

# Define the input transforms
inputTransforms = torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_mels=128)
cers = []

# Evaluate the model on the dataset, one utterance at a time
for waveform, sample_rate, utterance, speaker_id, chapter_id, utterance_id in eval_dataset:
    # The transform output keeps the waveform's leading axis followed by (n_mels, time);
    # adding a batch axis gives the (batch, 1, n_mels, time) layout the model expects
    # (assumes mono audio). No transpose or view is needed.
    input = inputTransforms(waveform).unsqueeze(0)

    # Pass the input through the model
    with torch.no_grad():
        output = model(input.to(device))

    # Decode the output
    decoded_output = Decoder(output)

    # cer() takes the reference first and the prediction second
    charErrorRate = cer(utterance, decoded_output[0])
    cers.append(charErrorRate)

# Convert the CERs to percentages
cers_percent = [rate * 100 for rate in cers]

# Plot a scatter plot of the CERs
plt.scatter(range(len(cers_percent)), cers_percent)
plt.xlabel('Sample')
plt.ylabel('CER (%)')
plt.title('Character Error Rate for each output')
plt.show()