In [19]:
import json
import kaldiio
import numpy as np
from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow.keras.layers import Dense, Flatten, Dropout, BatchNormalization
from tensorflow.keras import Model

In [2]:
train_json_file = "/home/neo/MS/espnet/egs/timit/asr1/dump/train_nodev/deltafalse/data.json"
test_json_file = "/home/neo/MS/espnet/egs/timit/asr1/dump/test/deltafalse/data.json"
dev_json_file = "/home/neo/MS/espnet/egs/timit/asr1/dump/train_dev/deltafalse/data.json"

In [3]:
with open(train_json_file, 'rb') as f:
    train_json = json.load(f)['utts']
with open(test_json_file, 'rb') as f:
    test_json = json.load(f)['utts']
with open(dev_json_file, 'rb') as f:
    dev_json = json.load(f)['utts']

merged = {**train_json, **test_json, **dev_json}

In [4]:
!wget https://raw.githubusercontent.com/sknadig/kaldi/port_aligns/egs/timit/s5/alignments
with open("alignments", "r") as f:
    contents = f.readlines()

In [5]:
frame_level_dict = {}
for row in contents:
    row = row.strip().split(" ")
    uttid = row[0]
    alignments = row[1:]
    frame_level_dict[uttid] = alignments

utt_labels = frame_level_dict.values()
flat_labels = [val for sublist in utt_labels for val in sublist]

In [6]:
uniq_labels = list(sorted(set(flat_labels)))
phone_to_int = dict(zip(uniq_labels, np.arange(len(uniq_labels))))

In [7]:
def do_cmvn(features):
    mean = np.mean(features, axis=0)
    std = np.std(features, axis=0)
    normalized = (features - mean)/ std
    return normalized

In [8]:
full_features = []
full_labels = []
for i, uttid in enumerate(list(frame_level_dict.keys())):
    for f in do_cmvn(kaldiio.load_mat(merged[uttid]['input'][0]['feat'])):
        full_features.append(f)
    for l in np.array([phone_to_int[ele] for ele in frame_level_dict[uttid]]):
        full_labels.append(l)

In [9]:
full_features = np.array(full_features)
full_labels = np.array(full_labels)

In [10]:
X_train, X_test, y_train, y_test = train_test_split(full_features, full_labels, test_size=0.2, random_state=1)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=1)

In [11]:
def get_one_train_example():
    for i in range(len(y_test)):
        yield X_train[i], y_train[i]
train_dataset = tf.data.Dataset.from_generator(get_one_train_example, (tf.float32, tf.uint16))
train_dataset = train_dataset.shuffle(buffer_size=1000)
train_dataset = train_dataset.batch(100)
train_dataset = train_dataset.repeat(count=2)
# iterator = dataset.make_one_shot_iterator()
# x,y = iterator.get_next()

In [12]:
def get_one_test_example():
    for i in range(len(y_test)):
        yield X_test[i], y_test[i]
test_dataset = tf.data.Dataset.from_generator(get_one_test_example, (tf.float32, tf.uint16))
test_dataset = test_dataset.shuffle(buffer_size=1000)
test_dataset = test_dataset.batch(100)
test_dataset = test_dataset.repeat(count=2)
# iterator = dataset.make_one_shot_iterator()
# x,y = iterator.get_next()

In [13]:
class MyModel(Model):
    def __init__(self):
        super(MyModel, self).__init__()
        self.l1 = Dense(512, activation='relu')
        self.l2 = Dense(512, activation='relu')
        self.l3 = Dense(512, activation='relu')
        self.l4 = Dense(512, activation='relu')
        self.l5 = Dense(512, activation='relu')
        self.l6 = Dense(39, activation='softmax')

    def call(self, x):
        x = self.l1(x)
        x = self.l2(x)
        x = self.l3(x)
        x = self.l4(x)
        x = self.l5(x)
        return self.l6(x)

# Create an instance of the model
model = MyModel()

In [14]:
loss_object = tf.keras.losses.SparseCategoricalCrossentropy()

optimizer = tf.keras.optimizers.Adam()

In [15]:
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

test_loss = tf.keras.metrics.Mean(name='test_loss')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

In [16]:
@tf.function
def train_step(features, labels):
    with tf.GradientTape() as tape:
        predictions = model(features)
        loss = loss_object(labels, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    train_loss(loss)
    train_accuracy(labels, predictions)

In [17]:
@tf.function
def test_step(features, labels):
    predictions = model(features)
    t_loss = loss_object(labels, predictions)

    test_loss(t_loss)
    test_accuracy(labels, predictions)

In [20]:
EPOCHS = 20

for epoch in range(EPOCHS):
    for features, labels in train_dataset:
        train_step(features, labels)

    for test_features, test_labels in test_dataset:
        test_step(test_features, test_labels)

    template = 'Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}'
    print(template.format(epoch+1,
                        train_loss.result(),
                        train_accuracy.result()*100,
                        test_loss.result(),
                        test_accuracy.result()*100))

    # Reset the metrics for the next epoch
    train_loss.reset_states()
    train_accuracy.reset_states()
    test_loss.reset_states()
    test_accuracy.reset_states()

Epoch 1, Loss: 1.1696714162826538, Accuracy: 61.846763610839844, Test Loss: 1.7637540102005005, Test Accuracy: 52.72365951538086
Epoch 2, Loss: 1.1007099151611328, Accuracy: 63.748531341552734, Test Loss: 1.8843151330947876, Test Accuracy: 52.1712646484375
Epoch 3, Loss: 1.0425162315368652, Accuracy: 65.32373046875, Test Loss: 1.9912570714950562, Test Accuracy: 51.93873596191406
Epoch 4, Loss: 0.9964647889137268, Accuracy: 66.80451202392578, Test Loss: 2.1132431030273438, Test Accuracy: 51.54876708984375
Epoch 5, Loss: 0.9522615671157837, Accuracy: 68.12535095214844, Test Loss: 2.232412338256836, Test Accuracy: 51.48173141479492
