### Imports

In [1]:
import numpy as np

import tensorflow as tf
from tensorflow.keras.layers import LSTM, TimeDistributed, Activation, Bidirectional, ConvLSTM2D, Attention, Dense, Flatten, MaxPool3D, MaxPool2D,BatchNormalization, Conv3D, GRU
from tensorflow.keras import Model
from tensorflow.keras.backend import ctc_batch_cost, ctc_decode, ctc_label_dense_to_sparse, get_value
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Input

import Levenshtein as Lev
import sys
from string import ascii_uppercase

from data_generator import DataGenerator

In [2]:
# Record the TensorFlow version this notebook was executed with.
print(tf.__version__)

2.0.0


In [3]:
# Ask TensorFlow whether a GPU is available; the result is shown as the cell output.
tf.test.is_gpu_available()

True

### Paths

In [4]:
# Directories for the train / dev / test splits; each one is handed to a
# DataGenerator in the next cell.
train_path = "./LibriSpeech100/train/train_all/"
dev_path = "./LibriSpeech100/dev/dev_all/"
test_path = "./LibriSpeech100/test/test_all/"

### Create DataGenerator objects

In [5]:
# One DataGenerator per split. The training and evaluation loops iterate over
# these objects and unpack every item as `(inputs, y)` with
# `inputs = (x, y_strings, ip_len, label_len)`.
train_data = DataGenerator(train_path)
val_data = DataGenerator(dev_path)
test_data = DataGenerator(test_path)

### Word Error Rate

In [10]:
def wer(s1, s2):
    """Word error rate of hypothesis ``s1`` measured against reference ``s2``.

    Both sentences are lower-cased and split on whitespace. Every distinct
    word is mapped to a single unique character, so the character-level
    Levenshtein distance between the two encoded strings equals the
    word-level edit distance between the sentences.

    Args:
        s1: Hypothesis (predicted) sentence.
        s2: Reference (ground-truth) sentence; its word count is the
            denominator of the rate.

    Returns:
        Word-level edit distance divided by the number of reference words.
        When the reference contains no words the rate is undefined, so the
        number of hypothesis words is returned as a float instead (0.0 when
        both sentences are empty) rather than raising ZeroDivisionError.
    """
    hyp_words = s1.lower().split()
    ref_words = s2.lower().split()

    # Empty reference: every hypothesis word is a pure insertion error.
    if not ref_words:
        return float(len(hyp_words))

    # Encode each distinct word as one character so a character-level edit
    # distance can stand in for a word-level one.
    vocabulary = set(hyp_words + ref_words)
    word2char = {word: chr(code) for code, word in enumerate(vocabulary)}

    hyp_encoded = ''.join(word2char[w] for w in hyp_words)
    ref_encoded = ''.join(word2char[w] for w in ref_words)
    return Lev.distance(hyp_encoded, ref_encoded) / float(len(ref_words))

### Convert a tensor of label indices to a sentence

In [11]:
def indices_to_string(indices):
    """Map a sequence of label indices to the sentence they spell.

    The alphabet is A-Z (0-25), space (26), apostrophe (27), the blank
    token '%' (28) and the end token '>' (29).

    Negative indices are skipped. `ctc_decode` pads its dense output with -1,
    and Python's negative indexing would otherwise turn that padding into the
    last alphabet entry ('>'), appending spurious characters to the final
    word of every padded prediction.

    Args:
        indices: Iterable of integer-like label indices (plain ints, NumPy
            integers or scalar tensors).

    Returns:
        The decoded sentence as a string.
    """
    space_token = ' '
    end_token = '>'
    blank_token = '%'
    apos_token = '\''

    alphabet = list(ascii_uppercase) + [space_token, apos_token, blank_token, end_token]

    chars = []
    for idx in indices:
        idx = int(idx)
        if idx < 0:
            # Padding emitted by the CTC decoder, not a real label.
            continue
        chars.append(alphabet[idx])

    return ''.join(chars)

### Model Architecture

In [12]:
class Encoder(Model):
    """Encoder stage: a bidirectional LSTM followed by batch normalization."""

    def __init__(self, op_dim = 30):
        # `op_dim` is accepted for signature compatibility but is not used
        # anywhere in this class.
        super().__init__()
        self.rnn = Bidirectional(LSTM(units=20, return_sequences=True))
        self.batchnorm = BatchNormalization()

    def call(self, inputs):
        """Apply the recurrent layer and normalize its full output sequence."""
        recurrent_out = self.rnn(inputs)
        return self.batchnorm(recurrent_out)


In [13]:
class ASRModel(Model):
    """Speech-recognition network producing a softmax over `op_dim` classes per step.

    Pipeline: Encoder -> bidirectional LSTM -> batch normalization ->
    time-distributed dense projection -> softmax.
    """

    def __init__(self, op_dim = 30):
        super().__init__()
        # The encoder is built with its own defaults; `op_dim` only sizes the
        # final dense projection.
        self.encoder = Encoder()
        self.rnn = Bidirectional(LSTM(units=20, return_sequences=True))
        self.batchnorm = BatchNormalization()
        self.time_dense = TimeDistributed(Dense(op_dim))
        self.activation = Activation('softmax')

    def call(self, inputs):
        """Feed `inputs` through every stage in order and return the final output."""
        stages = (
            self.encoder,
            self.rnn,
            self.batchnorm,
            self.time_dense,
            self.activation,
        )
        features = inputs
        for stage in stages:
            features = stage(features)
        return features

### Build Model

In [14]:
# Instantiate the network with its default output size and create its weights.
# Only the last input dimension is fixed (20); the two leading dimensions are
# left as None (presumably batch and time -- confirm against DataGenerator).
model = ASRModel()
model.build(input_shape = (None, None, 20))
# Adam with its default hyper-parameters.
optimizer = tf.keras.optimizers.Adam()

In [15]:
# Print the layer list and parameter counts of the built model.
model.summary()

Model: "asr_model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
encoder (Encoder)            multiple                  6720      
_________________________________________________________________
bidirectional_1 (Bidirection multiple                  9760      
_________________________________________________________________
batch_normalization_1 (Batch multiple                  160       
_________________________________________________________________
time_distributed (TimeDistri multiple                  1230      
_________________________________________________________________
activation (Activation)      multiple                  0         
Total params: 17,870
Trainable params: 17,710
Non-trainable params: 160
_________________________________________________________________


### One Validation Step

In [16]:
def validate(model, x, y_true, input_len, label_len, y_strings, test = False):
    """Score one batch with CTC loss and word error rate (no weight update).

    Args:
        model: Callable mapping `x` to per-timestep class probabilities.
        x: Batch of model inputs.
        y_true: Label indices for the batch, as consumed by `ctc_batch_cost`.
        input_len: Per-sample input lengths; expected to be 1-D, since a
            trailing axis is appended before calling `ctc_batch_cost`.
        label_len: Per-sample label lengths; expected to be 1-D for the same
            reason.
        y_strings: Reference transcripts, one per sample in the batch.
        test: When True, print every reference/predicted sentence pair.

    Returns:
        Tuple `(mean CTC loss, mean WER)` over the batch.
    """
    # ctc_batch_cost wants the lengths as column vectors.
    input_len = np.expand_dims(input_len, axis = 1)
    label_len = np.expand_dims(label_len, axis = 1)

    y_pred = model(x)
    loss = ctc_batch_cost(y_true, y_pred, input_len, label_len)

    # ctc_decode wants a 1-D length vector. Squeeze only the axis added above:
    # a bare np.squeeze would collapse a batch of one into a 0-d array.
    input_len = np.squeeze(input_len, axis = 1)
    y_decode = ctc_decode(y_pred, input_len)[0][0]

    total_wer = 0.0

    for i in range(len(y_strings)):
        predicted_sentence = indices_to_string(y_decode[i])
        total_wer += wer(predicted_sentence, y_strings[i])

        if test:
            print("Correct Sentence:", y_strings[i])
            print("Predicted Sentence:", predicted_sentence)

    return tf.reduce_mean(loss), total_wer/len(y_strings)

### Evaluation

In [17]:
def model_evaluate(model, val_ds, test = False):
    """Average the per-batch loss and WER of `model` over every batch in `val_ds`.

    Args:
        model: Model forwarded to `validate`.
        val_ds: Iterable yielding `(inputs, y)` pairs where `inputs` unpacks
            to `(x, y_strings, ip_len, label_len)`.
        test: Forwarded to `validate`; when True each sentence pair is printed.

    Returns:
        Tuple `(mean loss, mean WER)` across batches. The summary line is
        also printed via `tf.print`.
    """
    num_batches = 0
    loss_total = 0.0
    wer_total = 0.0

    for (x, y_strings, ip_len, label_len), y in val_ds:
        batch_loss, batch_wer = validate(model, x, y, ip_len, label_len, y_strings, test)
        num_batches += 1
        loss_total += batch_loss
        wer_total += batch_wer

    val_loss = loss_total / num_batches
    val_accuracy = wer_total / num_batches

    tf.print(' Validation Loss:', val_loss, ' Validation WER: ', val_accuracy)

    return val_loss, val_accuracy

### One Training Step

In [18]:
def train_one_step(model, optimizer, x, y_true, input_len, label_len, y_strings):
    """Run one optimization step on a batch and report its loss and WER.

    Args:
        model: Model whose `trainable_variables` are updated.
        optimizer: Optimizer used to apply the gradients.
        x: Batch of model inputs.
        y_true: Label indices for the batch, as consumed by `ctc_batch_cost`.
        input_len: Per-sample input lengths; expected to be 1-D, since a
            trailing axis is appended before calling `ctc_batch_cost`.
        label_len: Per-sample label lengths; expected to be 1-D for the same
            reason.
        y_strings: Reference transcripts, one per sample in the batch.

    Returns:
        Tuple `(mean CTC loss, mean WER)` for the batch. The predictions
        scored are the ones computed before the weight update.
    """
    # ctc_batch_cost wants the lengths as column vectors.
    input_len = np.expand_dims(input_len, axis = 1)
    label_len = np.expand_dims(label_len, axis = 1)

    with tf.GradientTape() as tape:
        # NOTE(review): the model is called without an explicit `training`
        # argument -- confirm BatchNormalization runs in training mode here.
        y_pred = model(x)
        loss = ctc_batch_cost(y_true, y_pred, input_len, label_len)

    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

    # ctc_decode wants a 1-D length vector. Squeeze only the axis added above:
    # a bare np.squeeze would collapse a batch of one into a 0-d array.
    input_len = np.squeeze(input_len, axis = 1)
    y_decode = ctc_decode(y_pred, input_len)[0][0]

    total_wer = 0.0

    for i in range(len(y_strings)):
        predicted_sentence = indices_to_string(y_decode[i])
        total_wer += wer(predicted_sentence, y_strings[i])

    return tf.reduce_mean(loss), total_wer/len(y_strings)

### Training

In [19]:
def model_fit(model, optimizer, train_ds, manager, val_ds = None,epochs=20):
    """Train `model` for `epochs` passes over `train_ds`, checkpointing as it goes.

    Args:
        model: Model to optimize.
        optimizer: Optimizer forwarded to `train_one_step`.
        train_ds: Iterable yielding `(inputs, y)` pairs where `inputs`
            unpacks to `(x, y_strings, ip_len, label_len)`.
        manager: Object whose `save()` is called after every second epoch.
        val_ds: Optional validation iterable of the same structure; when
            truthy it is evaluated after each epoch.
        epochs: Number of passes over `train_ds`.

    Returns:
        `(losses, wers)` when `val_ds` is falsy, otherwise
        `(losses, wers, val_losses, val_wers)`; each list holds one entry
        per epoch.
    """
    train_losses, train_wers = [], []
    val_losses, val_wers = [], []

    for epoch_idx in range(epochs):
        batches_seen = 0
        loss_sum = 0.0
        wer_sum = 0.0

        for (x, y_strings, ip_len, label_len), y in train_ds:
            batches_seen += 1
            batch_loss, batch_wer = train_one_step(
                model, optimizer, x, y, ip_len, label_len, y_strings
            )
            loss_sum += batch_loss
            wer_sum += batch_wer
            # Lightweight progress indicator: print the step count every 78 batches.
            if batches_seen % 78 == 0:
                print(batches_seen)

        mean_loss = loss_sum / batches_seen
        mean_wer = wer_sum / batches_seen
        train_losses.append(mean_loss)
        train_wers.append(mean_wer)

        tf.print('Epoch: ', epoch_idx+1, ' Loss:', mean_loss, ' WER: ', mean_wer)

        if val_ds:
            epoch_val_loss, epoch_val_wer = model_evaluate(model, val_ds)
            val_losses.append(epoch_val_loss)
            val_wers.append(epoch_val_wer)

        # Checkpoint after every second epoch.
        if (epoch_idx + 1) % 2 == 0:
            manager.save()

    if val_ds:
        return train_losses, train_wers, val_losses, val_wers

    return train_losses, train_wers

### Checkpoint

In [20]:
# Track both the optimizer and the model in a single checkpoint object, and let
# a manager write checkpoints under `ckpt_dir` with `max_to_keep = 2`.
ckpt_dir = './training_checkpoints'
ckpt = tf.train.Checkpoint(optimizer=optimizer, model = model)
manager = tf.train.CheckpointManager(ckpt, ckpt_dir, max_to_keep = 2)    

In [None]:
# Train with the default number of epochs (20), evaluating on `val_data` after
# every epoch; four per-epoch histories are returned because `val_ds` is given.
losses, accuracies, val_losses, val_acc = model_fit(model, optimizer, train_data, manager, val_ds = val_data)

Instructions for updating:
Create a `tf.sparse.SparseTensor` and use `tf.sparse.to_dense` instead.
78
156
234
312
390
468
546
Epoch:  1  Loss: 554.328613  WER:  1.0
 Validation Loss: 320.307098  Validation WER:  1.0
78
156
234
312
390
468
546
Epoch:  2  Loss: 524.982666  WER:  1.0
 Validation Loss: 313.438904  Validation WER:  1.0
78
156


### Restore Checkpoint and Test Model

In [None]:
# Load the most recent checkpoint written by `manager` into the optimizer and
# model tracked by `ckpt`.
ckpt.restore(manager.latest_checkpoint)

# Score the restored model on the test split; `test=True` prints every
# reference/predicted sentence pair. The first return value (loss) is unused.
_, acc = model_evaluate(model, test_data, test=True)

In [None]:
# `acc` is the second value returned by model_evaluate: the mean word error
# rate over the test batches.
print(acc)