In [1]:
import os 
import torch 
import torch.nn as nn 


In [2]:
import torch.utils.data as data
import torch.optim as optim 
import torch.nn.functional as F


In [3]:
import torchaudio

In [4]:
import numpy as np

In [5]:
def avg_wer(wer_scores, combined_ref_len):
    """
    Divide the summed scores by the combined reference length.

    Parameters:
        wer_scores: iterable of numeric per-utterance scores.
        combined_ref_len: total reference length used as the denominator.

    Returns:
        float: sum(wer_scores) / combined_ref_len.

    Raises:
        ZeroDivisionError: if combined_ref_len is 0.
    """
    total_score = float(sum(wer_scores))
    reference_length = float(combined_ref_len)
    return total_score / reference_length

In [7]:
def _levenshtein_distance(ref, hyp):
    """
    Levenshtein distance is used for measruging the difference between two seq. 
    It is the minimum number of single character editds ( subs, insert, delete,) req to change one word into other.
    
    """
    m = len(ref)
    n = len(hyp)
    
    if ref == hyp :
        return   0
    if m == 0 :
         return n 
    if n == 0:
        return m 
    
    if (m < n):
        ref, hyp = hyp, ref
        m, n = n, m
        
    distance = np.zeros((2, n+1), dtype=np.int32)
    
    for j in range(0, n+1):
        distance[0][j] =1 
        
    for i in range(1, m+1):
        prev_row_idx = (i - 1) % 2
        cur_row_idx = (i - 1) % 2
        distance[cur_row_idx][0] = 1 
        for j in range(1, n+1):
            if(ref[i-1] == hyp[j-1]):
                distance[cur_row_idx][j] = distance[prev_row_idx][j - 1]
            else:
                s_num = distance[prev_row_idx][j - 1] + 1
                i_num = distance[cur_row_idx][j - 1] + 1
                d_num = distance[prev_row_idx][j] + 1
                distance[cur_row_idx][j] = min(s_num, i_num, d_num)

    return distance[m % 2][n]




In [8]:
def word_errors(reference, hypothesis, ignore_case=False, delimiter=' '):
    """
    Word-level Levenshtein distance between a reference and a hypothesis.

    Parameters:
        reference: the reference sentence (str).
        hypothesis: the hypothesis sentence (str).
        ignore_case: when equal to True, both sentences are lower-cased first.
        delimiter: separator passed to ``str.split`` to tokenise both sentences.

    Returns:
        tuple: (edit distance as float, number of tokens in the reference).
    """
    sentences = [reference, hypothesis]
    # Explicit comparison with True is kept on purpose: only values equal to
    # True enable lower-casing.
    if ignore_case == True:
        sentences = [sentence.lower() for sentence in sentences]

    ref_tokens, hyp_tokens = [sentence.split(delimiter) for sentence in sentences]

    return float(_levenshtein_distance(ref_tokens, hyp_tokens)), len(ref_tokens)


In [10]:
def char_errors(reference, hypothesis, ignore_case=False, ignore_space=False):
    """
    Character-level Levenshtein distance between a reference and a hypothesis.

    Runs of spaces are collapsed first: both sentences are split on ' ',
    empty pieces are dropped, and the pieces are re-joined with a single
    space (or with nothing when ``ignore_space`` is set).

    Parameters:
        reference: the reference sentence (str).
        hypothesis: the hypothesis sentence (str).
        ignore_case: lower-case both sentences before comparing.
        ignore_space: remove space characters entirely before comparing.

    Returns:
        tuple: (edit distance as float, length of the normalised reference).
    """
    if ignore_case:
        reference = reference.lower()
        hypothesis = hypothesis.lower()

    # The original body read an undefined name `remove_space`; the flag that
    # callers actually pass is `ignore_space`.
    join_char = '' if ignore_space else ' '
    reference = join_char.join(filter(None, reference.split(' ')))
    hypothesis = join_char.join(filter(None, hypothesis.split(' ')))

    edit_distance = _levenshtein_distance(reference, hypothesis)
    return float(edit_distance), len(reference)

In [11]:
def wer(reference, hypothesis, ignore_case=False, delimiter=' '):
    """
    Word error rate of a hypothesis against a reference.

        WER = (Sw + Dw + Iw) / Nw

    where Sw, Dw and Iw are the numbers of substituted, deleted and inserted
    words, and Nw is the number of words in the reference.

    Parameters:
        reference: the reference sentence.
        hypothesis: the hypothesis sentence.
        ignore_case: forwarded to ``word_errors``.
        delimiter: forwarded to ``word_errors``.

    Returns:
        float: word-level edit distance divided by the reference word count.

    Raises:
        ValueError: if ``word_errors`` reports a reference length of 0.
    """
    errors, n_ref_words = word_errors(reference, hypothesis, ignore_case, delimiter)

    if n_ref_words == 0:
        raise ValueError("Refernec word number must be greater than 0")

    return float(errors) / n_ref_words


In [12]:
def cer(reference, hypothesis, ignore_case=False, remove_space=False):
    """
    Character error rate of a hypothesis against a reference.

        CER = (Sc + Dc + Ic) / Nc

    where Sc, Dc and Ic are the numbers of substituted, deleted and inserted
    characters, and Nc is the number of characters in the reference.

    Parameters:
        reference: the reference sentence.
        hypothesis: the hypothesis sentence.
        ignore_case: forwarded to ``char_errors``.
        remove_space: forwarded to ``char_errors`` as its space-handling flag.

    Returns:
        float: character-level edit distance divided by the reference length.

    Raises:
        ValueError: if ``char_errors`` reports a reference length of 0.
    """
    # This call was missing, leaving edit_distance and ref_len undefined.
    # Arguments are passed positionally so the flag reaches char_errors
    # regardless of what that function names its fourth parameter.
    edit_distance, ref_len = char_errors(reference, hypothesis, ignore_case, remove_space)

    if ref_len == 0:
        raise ValueError("Refernec word number must be greater than 0")

    return float(edit_distance) / ref_len
        

In [16]:
class TextTransform:
    """
    Maps characters to integer labels and back.

    The vocabulary is the apostrophe (0), space (1, written ``<SPACE>`` in
    the table) and the lowercase letters a-z (2-27).
    """
    def __init__(self):
        char_map_str = """
        ' 0
        <SPACE> 1
        a 2
        b 3
        c 4
        d 5
        e 6
        f 7
        g 8
        h 9
        i 10
        j 11
        k 12
        l 13
        m 14
        n 15
        o 16
        p 17
        q 18
        r 19
        s 20
        t 21
        u 22
        v 23
        w 24
        x 25
        y 26
        z 27
        """

        self.char_map = {}   # character (or '<SPACE>') -> integer label
        self.index_map = {}  # integer label -> character
        for line in char_map_str.strip().split("\n"):
            ch, index = line.split()
            self.char_map[ch] = int(index)
            self.index_map[int(index)] = ch

        # Decode label 1 straight to a real space instead of the '<SPACE>' token.
        self.index_map[1] = ' '

    def text_to_int(self, text):
        """
        Convert text to a list of integer labels using the character map.

        Raises:
            KeyError: if ``text`` contains a character outside the vocabulary.
        """
        int_sequence = []
        for c in text:
            if c == ' ':
                ch = self.char_map['<SPACE>']
            else:
                ch = self.char_map[c]
            int_sequence.append(ch)

        return int_sequence

    def int_to_text(self, labels):
        """
        Convert a sequence of integer labels back to text.

        Raises:
            KeyError: if a label is not in the vocabulary.
        """
        # Decoding must go through index_map (label -> char); the original
        # looked up `self.char_map[[i]]`, which raises TypeError.
        string = [self.index_map[i] for i in labels]

        return ''.join(string).replace('<SPACE>', ' ')
    

In [19]:
# Training pipeline: mel spectrogram (16 kHz sample rate, 128 mel bins)
# followed by frequency masking and time masking as augmentation.
train_audio_transform = nn.Sequential(
    torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_mels=128),
    torchaudio.transforms.FrequencyMasking(freq_mask_param=30),
    torchaudio.transforms.TimeMasking(time_mask_param=100),
)

# Validation pipeline: mel spectrogram with the library's default settings
# and no masking.
valid_audio_transform = torchaudio.transforms.MelSpectrogram()





In [18]:
# Module-level character/integer codec; data_processing and GreedyDecoder
# both read this name.
text_transforms  = TextTransform()

In [41]:
def data_processing(data, data_type='train'):
    """
    Collate a list of dataset samples into padded batch tensors.

    Each sample is unpacked as a 6-tuple whose first element is the waveform
    and whose third element is the transcript; the other fields are ignored.

    Parameters:
        data: iterable of 6-tuples as described above.
        data_type: 'train' applies ``train_audio_transform``; 'valid' applies
            ``valid_audio_transform``.

    Returns:
        tuple: (spectrograms, labels, input_lengths, label_lengths) where
        spectrograms is padded and laid out as (batch, 1, feature, time),
        labels is padded as (batch, max_label_len), and the two length lists
        hold one int per sample.

    Raises:
        Exception: if ``data_type`` is neither 'train' nor 'valid'.
    """
    spectrograms, labels = [], []
    input_lengths, label_lengths = [], []

    for sample in data:
        waveform, _, utterance, _, _, _ = sample

        if data_type == 'train':
            audio_transform = train_audio_transform
        elif data_type == 'valid':
            audio_transform = valid_audio_transform
        else:
            raise Exception('data_type should be train or valid')

        # Drop the leading axis and put time first so pad_sequence pads along
        # time (assumes a single-channel waveform -- TODO confirm).
        spec = audio_transform(waveform).squeeze(0).transpose(0, 1)
        label = torch.Tensor(text_transforms.text_to_int(utterance.lower()))

        spectrograms.append(spec)
        labels.append(label)
        # Halved frame count; presumably matches the model's stride-2 front
        # convolution -- verify if the stride changes.
        input_lengths.append(spec.shape[0] // 2)
        label_lengths.append(len(label))

    padded_specs = nn.utils.rnn.pad_sequence(spectrograms, batch_first=True)
    # (batch, time, feature) -> (batch, 1, time, feature) -> (batch, 1, feature, time)
    padded_specs = padded_specs.unsqueeze(1).transpose(2, 3)
    padded_labels = nn.utils.rnn.pad_sequence(labels, batch_first=True)

    return padded_specs, padded_labels, input_lengths, label_lengths

In [42]:
def GreedyDecoder(output, labels, label_lengths, blank_label = 28,collapse_repeated=True):
    """
    Greedy (arg-max) decoding of model outputs, plus decoding of the targets.

    Parameters:
        output: tensor indexed as (batch, time, class); the arg-max over the
            class axis is taken at every time step.
        labels: padded label tensor indexed as (batch, label_position).
        label_lengths: true (unpadded) length of each row of ``labels``.
        blank_label: class index that is skipped during decoding.
        collapse_repeated: when True, a non-blank index equal to the index at
            the previous time step is skipped.

    Returns:
        tuple: (decodes, targets), two lists of strings with one entry per
        batch element.
    """
    arg_maxes = torch.argmax(output, dim=2)
    decodes = []
    targets = []
    for i, args in enumerate(arg_maxes):
        decode = []
        # Labels are integer codes, so they must go through int_to_text (the
        # original called text_to_int). Cast to int because the padded label
        # tensor may be floating point.
        target_ids = [int(v) for v in labels[i][:label_lengths[i]].tolist()]
        targets.append(text_transforms.int_to_text(target_ids))
        for j, index in enumerate(args):
            if index != blank_label:
                if collapse_repeated and j != 0 and index == args[j - 1]:
                    continue
                decode.append(index.item())
        decodes.append(text_transforms.int_to_text(decode))

    # The original returned the undefined name `target`.
    return decodes, targets

# Model 

In [43]:
class CNNLayerNorm(nn.Module):
    """
    Layer normalisation over the feature axis of a CNN activation.

    Parameters:
        n_feats: size of the feature axis being normalised.
    """
    def __init__(self, n_feats):
        super(CNNLayerNorm, self).__init__()
        self.layer_norm = nn.LayerNorm(n_feats)

    def forward(self, x):
        # x: (batch, channel, feature, time). nn.LayerNorm normalises the last
        # axis, so the feature axis is moved there and moved back afterwards.
        x = x.transpose(2, 3).contiguous()  # (batch, channel, time, feature)
        x = self.layer_norm(x)
        return x.transpose(2, 3).contiguous()  # (batch, channel, feature, time)

In [44]:
class ResidualCNN(nn.Module):
    """
    Residual block: two (layer norm -> GELU -> dropout -> conv) stages with a
    skip connection from the block input to its output.

    Parameters:
        in_channels: channels of the input to the first convolution.
        out_channels: channels produced by both convolutions.
        kernel: square kernel size; padding is ``kernel // 2``.
        stride: stride of both convolutions.
        dropout: dropout probability for both dropout layers.
        n_feats: size of the feature axis normalised by the layer norms.
    """
    def __init__(self, in_channels, out_channels, kernel, stride, dropout, n_feats):
        super(ResidualCNN, self).__init__()

        self.cnn1 = nn.Conv2d(in_channels, out_channels, kernel, stride, padding=kernel//2)
        self.cnn2 = nn.Conv2d(out_channels, out_channels, kernel, stride, padding=kernel//2)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        # Named layer_norm1/2 so they match what forward() reads (the original
        # registered them as `layer_nomr1/2`).
        self.layer_norm1 = CNNLayerNorm(n_feats)
        self.layer_norm2 = CNNLayerNorm(n_feats)

    def forward(self, x):
        residual = x  # (batch, channel, feature, time)
        x = self.layer_norm1(x)
        x = F.gelu(x)
        x = self.dropout1(x)  # the original passed the literal 1 here
        x = self.cnn1(x)

        x = self.layer_norm2(x)
        x = F.gelu(x)
        x = self.dropout2(x)
        x = self.cnn2(x)
        # The skip connection requires the conv output to keep the input's shape.
        x += residual
        return x  # (batch, channel, feature, time)

    

In [45]:
class BidirectionalGRU(nn.Module):
    """
    Layer norm -> GELU -> single-layer bidirectional GRU -> dropout.

    Parameters:
        rnn_dim: size of the input's last axis (normalised, then fed to the GRU).
        hidden_size: GRU hidden size per direction; the output's last axis is
            ``2 * hidden_size``.
        dropout: dropout probability applied to the GRU output.
        batch_first: forwarded to ``nn.GRU``.
    """
    def __init__(self, rnn_dim, hidden_size, dropout, batch_first):
        super().__init__()

        gru_options = dict(num_layers=1, batch_first=batch_first, bidirectional=True)
        self.BiGRU = nn.GRU(rnn_dim, hidden_size, **gru_options)
        self.layer_norm = nn.LayerNorm(rnn_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        activated = F.gelu(self.layer_norm(x))
        # The final hidden state returned by the GRU is not used.
        sequence_out, _ = self.BiGRU(activated)
        return self.dropout(sequence_out)

In [46]:
class CNNLayerNorm(nn.Module):
    """
    Layer normalisation applied along the feature axis of a CNN activation.

    Parameters:
        n_feats: size of the feature axis being normalised.
    """
    def __init__(self, n_feats):
        super().__init__()
        self.layer_norm = nn.LayerNorm(n_feats)

    def forward(self, x):
        # Input is (batch, channel, feature, time); nn.LayerNorm works on the
        # last axis, so swap feature/time, normalise, and swap back.
        time_major = x.transpose(2, 3).contiguous()
        normalised = self.layer_norm(time_major)
        return normalised.transpose(2, 3).contiguous()


class ResidualCNN(nn.Module):
    """
    Pre-activation residual block (after https://arxiv.org/pdf/1603.05027.pdf)
    that uses layer norm in place of batch norm.

    Parameters:
        in_channels: channels of the input to the first convolution.
        out_channels: channels produced by both convolutions.
        kernel: square kernel size; padding is ``kernel // 2``.
        stride: stride of both convolutions.
        dropout: dropout probability for both dropout layers.
        n_feats: size of the feature axis normalised by the layer norms.
    """
    def __init__(self, in_channels, out_channels, kernel, stride, dropout, n_feats):
        super().__init__()

        same_padding = kernel // 2
        self.cnn1 = nn.Conv2d(in_channels, out_channels, kernel, stride, padding=same_padding)
        self.cnn2 = nn.Conv2d(out_channels, out_channels, kernel, stride, padding=same_padding)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.layer_norm1 = CNNLayerNorm(n_feats)
        self.layer_norm2 = CNNLayerNorm(n_feats)

    def forward(self, x):
        # x and the returned tensor are both (batch, channel, feature, time).
        skip = x

        # Stage 1: norm -> GELU -> dropout -> conv
        out = self.cnn1(self.dropout1(F.gelu(self.layer_norm1(x))))
        # Stage 2: same sequence with the second set of layers
        out = self.cnn2(self.dropout2(F.gelu(self.layer_norm2(out))))

        # In-place add of the skip connection, as in the original.
        out += skip
        return out


class BidirectionalGRU(nn.Module):
    """
    One recurrent stage: layer norm and GELU on the input, then a
    single-layer bidirectional GRU, then dropout on its output sequence.

    Parameters:
        rnn_dim: size of the input's last axis.
        hidden_size: GRU hidden size per direction (output last axis is
            ``2 * hidden_size``).
        dropout: dropout probability applied after the GRU.
        batch_first: forwarded to ``nn.GRU``.
    """

    def __init__(self, rnn_dim, hidden_size, dropout, batch_first):
        super().__init__()

        self.BiGRU = nn.GRU(
            rnn_dim,
            hidden_size,
            num_layers=1,
            batch_first=batch_first,
            bidirectional=True,
        )
        self.layer_norm = nn.LayerNorm(rnn_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        normed = self.layer_norm(x)
        # Only the output sequence is kept; the final hidden state is discarded.
        outputs = self.BiGRU(F.gelu(normed))[0]
        return self.dropout(outputs)


class SpeechRecognitionModel(nn.Module):
    """
    Convolutional front end, residual CNN stack, bidirectional GRU stack and
    a per-frame classifier.

    Parameters:
        n_cnn_layers: number of ResidualCNN blocks.
        n_rnn_layers: number of BidirectionalGRU layers.
        rnn_dim: GRU hidden size per direction and width of the projection
            feeding the first GRU.
        n_class: number of output classes per frame.
        n_feats: size of the feature axis of the input spectrogram.
        stride: stride of the first convolution.
        dropout: dropout probability used throughout.

    Input:  (batch, 1, n_feats, time)
    Output: (batch, time', n_class) where time' is the time axis after the
            strided first convolution.
    """

    def __init__(self, n_cnn_layers, n_rnn_layers, rnn_dim, n_class, n_feats, stride=2, dropout=0.1):
        super(SpeechRecognitionModel, self).__init__()
        # Feature size after the first conv (kernel 3, padding 1):
        # floor((n_feats - 1) / stride) + 1. This equals the original
        # `n_feats // 2` for even n_feats with stride 2, and stays correct for
        # other strides and odd feature counts.
        n_feats = (n_feats - 1) // stride + 1
        self.cnn = nn.Conv2d(1, 32, 3, stride=stride, padding=3//2)  # cnn for extracting hierarchical features

        # n residual cnn layers with filter size of 32
        self.rescnn_layers = nn.Sequential(*[
            ResidualCNN(32, 32, kernel=3, stride=1, dropout=dropout, n_feats=n_feats)
            for _ in range(n_cnn_layers)
        ])
        self.fully_connected = nn.Linear(n_feats*32, rnn_dim)
        # Every layer receives (batch, time, feature) from the previous one,
        # so all of them must be batch-first. The original set
        # batch_first=True only for layer 0, which made later layers run
        # their recurrence across the batch axis.
        self.birnn_layers = nn.Sequential(*[
            BidirectionalGRU(rnn_dim=rnn_dim if i==0 else rnn_dim*2,
                             hidden_size=rnn_dim, dropout=dropout, batch_first=True)
            for i in range(n_rnn_layers)
        ])
        self.classifier = nn.Sequential(
            nn.Linear(rnn_dim*2, rnn_dim),  # birnn returns rnn_dim*2
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(rnn_dim, n_class)
        )

    def forward(self, x):
        x = self.cnn(x)
        x = self.rescnn_layers(x)
        sizes = x.size()
        # Fold channels into the feature axis.
        x = x.view(sizes[0], sizes[1] * sizes[2], sizes[3])  # (batch, feature, time)
        x = x.transpose(1, 2) # (batch, time, feature)
        x = self.fully_connected(x)
        x = self.birnn_layers(x)
        x = self.classifier(x)
        return x


In [54]:
class IterMeter(object):
    """
    Simple counter for the total number of iterations performed.

    The count is stored in ``val``, starts at 0, and grows by one per
    ``step()`` call.
    """
    def __init__(self):
        self.val = 0

    def step(self):
        """Advance the counter by one."""
        self.val = self.val + 1

    def get(self):
        """Return the current count."""
        current = self.val
        return current

In [58]:
def train(model, device, train_loader, criterion, optimizer, scheduler, epoch, iter_meter):
    """
    Run one training epoch.

    Parameters:
        model: module mapping spectrograms to (batch, time, class) scores.
        device: device the batch tensors are moved to.
        train_loader: iterable of (spectrograms, labels, input_lengths,
            label_lengths) batches; must support len() and expose ``.dataset``.
        criterion: loss called as criterion(log_probs, labels, input_lengths,
            label_lengths) with log_probs laid out (time, batch, class).
        optimizer: optimizer stepped once per batch.
        scheduler: learning-rate scheduler stepped once per batch.
        epoch: epoch number, used only in the progress message.
        iter_meter: object whose ``step()`` is called once per batch.

    Progress is printed every 100 batches and for the last batch.
    """
    model.train()
    data_len = len(train_loader.dataset)
    num_batches = len(train_loader)
    for batch_idx, _data in enumerate(train_loader):
        spectrograms, labels, input_lengths, label_lengths = _data
        spectrograms, labels = spectrograms.to(device), labels.to(device)

        optimizer.zero_grad()
        output = model(spectrograms)  # (batch, time, n_class)
        output = F.log_softmax(output, dim=2)
        output = output.transpose(0, 1)  # (time, batch, n_class)

        loss = criterion(output, labels, input_lengths, label_lengths)
        loss.backward()

        optimizer.step()
        scheduler.step()
        iter_meter.step()

        # The original compared batch_idx with the number of samples
        # (data_len), which is never equal; the last batch is
        # num_batches - 1.
        if batch_idx % 100 == 0 or batch_idx == num_batches - 1:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(spectrograms), data_len,
                    100. * batch_idx / num_batches, loss.item()))

    

In [59]:
def test(model, device, test_loader, criterion, epoch, iter_meter):
    """
    Evaluate the model on ``test_loader`` and print average loss, CER and WER.

    Parameters:
        model: module mapping spectrograms to (batch, time, class) scores.
        device: device the batch tensors are moved to.
        test_loader: iterable of (spectrograms, labels, input_lengths,
            label_lengths) batches; must support len().
        criterion: loss called as criterion(log_probs, labels, input_lengths,
            label_lengths) with log_probs laid out (time, batch, class).
        epoch: accepted for signature symmetry with ``train``; not used here.
        iter_meter: accepted for signature symmetry with ``train``; not used here.

    CER and WER are averaged per utterance. If nothing was decoded, they are
    reported as NaN instead of raising ZeroDivisionError.
    """
    print("\n Evaluation \n")
    model.eval()
    test_loss = 0
    test_cer, test_wer = [], []
    with torch.no_grad():
        for _data in test_loader:
            spectrograms, labels, input_lengths, label_lengths = _data
            spectrograms, labels = spectrograms.to(device), labels.to(device)

            output = model(spectrograms)  # (batch, time, n_class)
            output = F.log_softmax(output, dim=2)
            output = output.transpose(0, 1)  # (time, batch, n_class)

            loss = criterion(output, labels, input_lengths, label_lengths)
            test_loss += loss.item() / len(test_loader)

            # The decoder indexes its input batch-first, so transpose back
            # (the original had the typo `tranpose`).
            decoded_preds, decoded_targets = GreedyDecoder(output.transpose(0, 1), labels, label_lengths)

            for j in range(len(decoded_preds)):
                test_cer.append(cer(decoded_targets[j], decoded_preds[j]))
                test_wer.append(wer(decoded_targets[j], decoded_preds[j]))

    # Local names avoid shadowing the module-level avg_wer function.
    mean_cer = sum(test_cer) / len(test_cer) if test_cer else float('nan')
    mean_wer = sum(test_wer) / len(test_wer) if test_wer else float('nan')
    # The CER field was written as '{:4f}' (width 4); '{:.4f}' matches the other fields.
    print('Test set: Average loss: {:.4f}, Average CER: {:.4f} Average WER: {:.4f}\n'.format(test_loss, mean_cer, mean_wer))


In [60]:
def main(learning_rate=5e-4, batch_size=20, epochs=10, train_url="dev-clean",
         test_url="test-clean"):
    """
    Download LibriSpeech splits, build the model and run the train/eval loop.

    Parameters:
        learning_rate: AdamW learning rate and OneCycleLR ``max_lr``.
        batch_size: batch size for both data loaders.
        epochs: number of train + evaluate passes.
        train_url: LibriSpeech split used for training.
        test_url: LibriSpeech split used for evaluation.

    Side effects: creates ``./data`` if needed, downloads the datasets there,
    seeds torch with 7, and prints progress via ``train`` and ``test``.
    """
    hparams = {
        "n_cnn_layers": 3,
        "n_rnn_layers": 5,
        "rnn_dim": 512,
        "n_class": 29,
        "n_feats": 128,
        "stride": 2,
        "dropout": 0.1,
        "learning_rate": learning_rate,
        "batch_size": batch_size,
        "epochs": epochs,
    }

    use_cuda = torch.cuda.is_available()
    torch.manual_seed(7)
    device = torch.device("cuda" if use_cuda else "cpu")

    os.makedirs("./data", exist_ok=True)

    train_dataset = torchaudio.datasets.LIBRISPEECH("./data", url=train_url, download=True)
    test_dataset = torchaudio.datasets.LIBRISPEECH("./data", url=test_url, download=True)

    # Worker process and pinned memory are only requested when CUDA is available.
    if use_cuda:
        kwargs = {'num_workers': 1, 'pin_memory': True}
    else:
        kwargs = {}

    def _make_loader(dataset, shuffle, split):
        # One loader per split; `split` selects the audio transform in data_processing.
        return data.DataLoader(dataset=dataset,
                               batch_size=hparams['batch_size'],
                               shuffle=shuffle,
                               collate_fn=lambda x: data_processing(x, split),
                               **kwargs)

    train_loader = _make_loader(train_dataset, True, 'train')
    test_loader = _make_loader(test_dataset, False, 'valid')

    model_args = [
        hparams[key]
        for key in ('n_cnn_layers', 'n_rnn_layers', 'rnn_dim',
                    'n_class', 'n_feats', 'stride', 'dropout')
    ]
    model = SpeechRecognitionModel(*model_args).to(device)

    optimizer = optim.AdamW(model.parameters(), hparams['learning_rate'])
    # Blank index 28 is the last of the 29 classes.
    criterion = nn.CTCLoss(blank=28).to(device)
    scheduler = optim.lr_scheduler.OneCycleLR(
        optimizer,
        max_lr=hparams['learning_rate'],
        steps_per_epoch=len(train_loader),
        epochs=hparams['epochs'],
        anneal_strategy='linear',
    )

    iter_meter = IterMeter()
    for epoch in range(1, epochs + 1):
        train(model, device, train_loader, criterion, optimizer, scheduler, epoch, iter_meter)
        test(model, device, test_loader, criterion, epoch, iter_meter)

In [62]:
# Run configuration for this notebook session.
learning_rate = 5e-4
batch_size = 10
epochs = 10
libri_train_set = "dev-clean"
libri_test_set = "test-clean"

# Keyword arguments make the mapping onto main()'s parameters explicit.
main(
    learning_rate=learning_rate,
    batch_size=batch_size,
    epochs=epochs,
    train_url=libri_train_set,
    test_url=libri_test_set,
)



KeyboardInterrupt: 

In [47]:
# Scratch cell: load the "dev-clean" LibriSpeech split at notebook level
# (downloaded into ./data if missing) so a batch can be inspected outside main().
train_url="dev-clean"
train_dataset = torchaudio.datasets.LIBRISPEECH("./data", url=train_url, download=True)


In [48]:
# Display the dataset object's repr as the cell output.
train_dataset

<torchaudio.datasets.librispeech.LIBRISPEECH at 0x15d40dca0>

In [49]:

# Notebook-level hyperparameters for the inspection cells below. Note that
# learning_rate and batch_size here differ from the values passed to main().
hparams = {
    "n_cnn_layers": 3,
    "n_rnn_layers": 5,
    "rnn_dim": 512,
    "n_class": 29,
    "n_feats": 128,
    "stride": 2,
    "dropout": 0.1,
    "learning_rate": 0.01,
    "batch_size": 20,
    "epochs": 10,
}

use_cuda = torch.cuda.is_available()

# Worker process and pinned memory are only requested when CUDA is available.
if use_cuda:
    kwargs = {'num_workers': 1, 'pin_memory': True}
else:
    kwargs = {}

train_loader = data.DataLoader(
    dataset=train_dataset,
    batch_size=hparams['batch_size'],
    shuffle=True,
    collate_fn=lambda x: data_processing(x, 'train'),
    **kwargs,
)

In [53]:
# Pull a single collated batch from the loader and print the shape of its
# padded label tensor, then stop.
for batch_idx, _data in enumerate(train_loader):
    spectrograms, labels, input_lengths, label_lengths = _data 
    print(labels.shape)
    break


torch.Size([20, 309])
