In [2]:
import os
import json
import argparse

import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio
from collections import Counter
from tqdm.notebook import tqdm

from sklearn.model_selection import train_test_split, StratifiedKFold

from tensorflow.keras import Model, Sequential
from tensorflow.keras.layers import (
    LSTM, Bidirectional, Dense, Embedding)

# Expose only GPU 0 to CUDA-backed libraries running in this kernel.
os.environ["CUDA_VISIBLE_DEVICES"] = '0'

# Index of every TIMIT .wav file (column `wav_paths`); TFRWriter reads it
# through this global. The path is a raw string because the Windows
# separators are literal backslashes: without the `r` prefix, "\T" and "\d"
# are invalid escape sequences that newer Python versions warn about.
d = pd.read_csv(r"Datasets\TIMIT-dataset\data.csv")
d

Unnamed: 0,wav_paths
0,Datasets\TIMIT-dataset\data\DR1\FAKS0\SA1.wav
1,Datasets\TIMIT-dataset\data\DR1\FAKS0\SA2.wav
2,Datasets\TIMIT-dataset\data\DR1\FAKS0\SI1573_1...
3,Datasets\TIMIT-dataset\data\DR1\FAKS0\SI1573_2...
4,Datasets\TIMIT-dataset\data\DR1\FAKS0\SI2203.wav
...,...
6857,Datasets\TIMIT-dataset\data\DR8\MTCS0\SX172.wav
6858,Datasets\TIMIT-dataset\data\DR8\MTCS0\SX262.wav
6859,Datasets\TIMIT-dataset\data\DR8\MTCS0\SX352.wav
6860,Datasets\TIMIT-dataset\data\DR8\MTCS0\SX442.wav


In [3]:
def ArgParser():
    """Build the notebook configuration as an ``argparse.Namespace``.

    All options are integers and fall back to their defaults when absent from
    ``sys.argv``; unrecognised arguments are ignored (``parse_known_args``).

    ``seq_len`` defaults to ``ceil(max_samples / hop_length)``, so it has to
    be registered after a first parse has resolved those two values. It can
    still be overridden explicitly with ``--seq_len``.

    Returns:
        argparse.Namespace with ``n_splits``, ``sample_rate``, ``n_fft``,
        ``window_size``, ``hop_length``, ``n_mels``, ``max_samples`` and
        ``seq_len``.
    """
    int_options = (
        ("n_splits", 5),
        ("sample_rate", 16000),
        ("n_fft", 2048),
        ("window_size", 400),
        ("hop_length", 160),  # 160 samples = 10ms
        ("n_mels", 64),
        ("max_samples", 70000),
    )

    parser = argparse.ArgumentParser()
    for option, fallback in int_options:
        parser.add_argument(
            f"--{option}", dest=option, type=int, default=fallback)

    # First pass: resolve the values that the seq_len default depends on.
    partial, _ = parser.parse_known_args()
    n_frames = int(np.ceil(partial.max_samples / partial.hop_length))
    parser.add_argument("--seq_len", type=int, default=n_frames)

    # Second pass: same arguments, now including seq_len.
    resolved, _ = parser.parse_known_args()
    return resolved

# Parse the configuration once; later cells receive it via the global `args`.
args = ArgParser()
# Bare last expression so the notebook displays the resulting Namespace.
args

Namespace(hop_length=160, max_samples=70000, n_fft=2048, n_mels=64, n_splits=5, sample_rate=16000, seq_len=438, window_size=400)

In [4]:
class TFRWriter():
    """Packs TIMIT utterances and their phoneme alignments into TFRecord shards.

    The list of ``.wav`` paths comes from the notebook-level DataFrame ``d``
    (column ``wav_paths``). Every ``.wav`` is expected to have a sibling
    ``.phn`` file with three whitespace-separated fields per line: the second
    is an integer sample position and the last is the phoneme label
    (presumably TIMIT's ``start end label`` layout -- confirm).

    Instantiating the class scans every ``.phn`` file and (re)writes the
    phoneme-to-id mapping to ``self.dict_path`` as JSON.
    """

    def __init__(self, args):
        """
        Args:
            args: namespace providing ``n_splits`` (number of shards) and
                ``hop_length`` (divisor that turns sample positions into
                frame indices).
        """
        self.samples = d['wav_paths'].tolist()
        self.args = args
        # The next three attributes are not read by any method of this class.
        self.fmin = 0
        self.fmax = 8000
        self.top_db = 80
        # Raw string: same value as before, but "\T" and "\p" are no longer
        # parsed as (invalid) escape sequences.
        self.dict_path = r"Datasets\TIMIT-dataset\phoneme_dict.json"
        self.phoneme_dict = self.get_dict()


    def _bytes_feature(self, value):
        """Returns a bytes_list from a string / byte."""
        if isinstance(value, type(tf.constant(0))):
            value = value.numpy() # BytesList won't unpack a string from an EagerTensor.
        return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))


    def serialize_example(self, *args):
        """Serialise one utterance into a ``tf.train.Example`` byte string.

        Args:
            *args: four positional values, in this order: audio, phonemes,
                frames, filename. Each is stored as a bytes feature.

        Returns:
            The serialised ``tf.train.Example``.
        """
        audio, phonemes, frames, filename = args[0], args[1], args[2], args[3]
        feature = {
            'audio': self._bytes_feature(audio),
            'phonemes': self._bytes_feature(phonemes),
            'frames': self._bytes_feature(frames),
            'filename': self._bytes_feature(filename)}

        example_proto = tf.train.Example(
            features=tf.train.Features(feature=feature))
        return example_proto.SerializeToString()


    def get_shards(self):
        """Split the samples into ``args.n_splits`` disjoint shards.

        The held-out fold of each ``StratifiedKFold`` split becomes one shard,
        so every sample lands in exactly one shard.

        Returns:
            A list of ``n_splits`` lists of ``.wav`` paths.
        """
        # Stratify on the 5th backslash-separated path component; for paths
        # like Datasets\TIMIT-dataset\data\DR1\FAKS0\SA1.wav that is the
        # speaker directory (FAKS0).
        speaker_id = [sample.split('\\')[4] for sample in self.samples]
        skf = StratifiedKFold(
            n_splits=self.args.n_splits, shuffle=True, random_state=42)
        return [
            [self.samples[idx] for idx in held_out]
            for _, held_out in skf.split(self.samples, speaker_id)]


    def get_dict(self):
        """Build the phoneme-to-id mapping and save it as JSON.

        Ids are assigned by descending frequency over all ``.phn`` files
        (id 0 is the most frequent label); ties are broken alphabetically so
        the mapping is identical on every run.

        The previous implementation gathered labels in a ``set`` before
        counting, so every count was 1 and the ids followed the arbitrary,
        hash-dependent iteration order of the set.

        Returns:
            dict mapping phoneme label (str) to integer id. The same mapping
            is written to ``self.dict_path``.
        """
        counts = Counter()
        for sample in self.samples:
            base_path = os.path.splitext(sample)[0]
            with open(base_path + '.phn', "r") as f:
                for line in f:
                    fields = line.split()
                    if fields:  # tolerate blank lines
                        counts[fields[-1]] += 1
        ranked = sorted(counts, key=lambda label: (-counts[label], label))
        phonemes_dict = {label: idx for idx, label in enumerate(ranked)}
        with open(self.dict_path, "w") as f:
            json.dump(phonemes_dict, f, sort_keys=False, indent=4)
        return phonemes_dict


    def get_shard_data(self, samples, shard):
        """Yield the raw record fields for every sample of one shard.

        Args:
            samples: ``.wav`` paths belonging to the shard.
            shard: shard number, used only in the progress-bar label.

        Yields:
            dict with
            ``audio``: the undecoded contents of the ``.wav`` file,
            ``phonemes``: serialised tensor of phoneme ids,
            ``frames``: serialised tensor of frame indices, starting with 0
            and followed by one entry per phoneme,
            ``filename``: the last three path components joined with "/",
            as bytes.
        """
        for sample in tqdm(
                samples, total=len(samples), desc=f"Writing shard {shard}"):
            base_path = os.path.splitext(sample)[0]
            p_frames, phonemes = [0], []
            with open(base_path + ".phn") as f:
                for line in f:
                    fields = line.split()
                    if not fields:  # tolerate blank lines
                        continue
                    # Drop the first field; keep the sample position and label.
                    p_frame, phoneme = fields[1:]
                    p_frames.append(int(p_frame) // self.args.hop_length)
                    phonemes.append(phoneme)
            phoneme_ids = [self.phoneme_dict.get(label) for label in phonemes]
            waveform = tf.io.read_file(base_path + ".wav")
            filename = "/".join(sample.split('\\')[-3:]).encode()
            yield {
                "audio": waveform,
                "phonemes": tf.io.serialize_tensor(phoneme_ids),
                "frames": tf.io.serialize_tensor(p_frames),
                "filename": filename}


    def write(self):
        """Write every shard to ``Datasets/TIMIT-dataset/tfrec_data``.

        Files are named ``train_<k>.tfrec`` with ``k`` starting at 1. The
        output directory is created if it does not exist yet.
        """
        out_dir = "Datasets/TIMIT-dataset/tfrec_data"
        os.makedirs(out_dir, exist_ok=True)
        for shard, samples in enumerate(self.get_shards(), start=1):
            with tf.io.TFRecordWriter(f"{out_dir}/train_{shard}.tfrec") as f:
                for sample in self.get_shard_data(samples, shard):
                    example = self.serialize_example(
                        sample['audio'], sample['phonemes'],
                        sample['frames'], sample['filename'])
                    f.write(example)

# TFRWriter(args).write()

In [31]:
class TIMITDataset():
    """tf.data input pipelines over the TFRecord files in ``args.main_dir``.

    The files are split once, at construction, into a training and a test
    subset; ``train()`` and ``test()`` build the corresponding pipelines.

    NOTE(review): ``args`` must provide ``main_dir``, ``test_size``,
    ``buffer_size`` and ``batch_size``. The ``ArgParser`` cell visible in
    this notebook does not define them -- confirm where they are set.

    NOTE(review): the feature keys parsed here (``spectrogram``,
    ``framewise_label``, ``binary_label``, ``filename``) differ from those
    written by ``TFRWriter`` in this notebook, so the records are presumably
    produced by a preprocessing step that is not shown.
    """

    def __init__(self, args):
        """
        Args:
            args: namespace with ``main_dir``, ``test_size``, ``buffer_size``
                and ``batch_size``; an optional integer ``seed`` controls
                the train/test split (default 42).
        """
        # Sorted listing + fixed seed: every instance gets the same split.
        # Previously the split was re-drawn at random per instance, so
        # TIMITDataset(args).train() and a separate TIMITDataset(args).test()
        # could share files.
        self.files = sorted(
            os.path.join(args.main_dir, f) for f in os.listdir(args.main_dir))
        self.AUTOTUNE = tf.data.experimental.AUTOTUNE
        self.args = args
        self.train_files, self.test_files = train_test_split(
            self.files, test_size=args.test_size, shuffle=True,
            random_state=getattr(args, "seed", 42))

    def decode_audio(self, audio):
        """Decode WAV bytes and drop the trailing channel axis.

        ``tf.squeeze(axis=-1)`` only succeeds when that axis has size 1,
        i.e. for single-channel audio. Not called by the other methods of
        this class.
        """
        audio = tf.audio.decode_wav(audio)[0]
        return tf.squeeze(audio, axis=-1)

    def read_tfrecord(self, example):
        """Parse one serialised example into a dict of tensors.

        Returns:
            dict with ``spectrogram`` (float32), ``framewise_label`` (int32),
            ``binary_label`` (int32) and ``filename`` (left as a string
            tensor).
        """
        feature_description = {
            'spectrogram': tf.io.FixedLenFeature([], tf.string),
            'framewise_label': tf.io.FixedLenFeature([], tf.string),
            'binary_label': tf.io.FixedLenFeature([], tf.string),
            'filename': tf.io.FixedLenFeature([], tf.string)}

        example = tf.io.parse_single_example(example, feature_description)
        # The tensors were stored as serialised bytes; restore their dtypes.
        example['spectrogram'] = tf.io.parse_tensor(
            example['spectrogram'], out_type=tf.float32)
        example['framewise_label'] = tf.io.parse_tensor(
            example['framewise_label'], out_type=tf.int32)
        example['binary_label'] = tf.io.parse_tensor(
            example['binary_label'], out_type=tf.int32)
        return example


    def load_dataset(self, files):
        """Return a dataset of parsed examples read from ``files``.

        Deterministic ordering is switched off, so element order is not
        guaranteed.
        """
        ignore_order = tf.data.Options()
        ignore_order.experimental_deterministic = False
        dataset = tf.data.TFRecordDataset(files)
        dataset = dataset.with_options(ignore_order)
        dataset = dataset.map(self.read_tfrecord, num_parallel_calls=self.AUTOTUNE)
        return dataset


    def SpecAugment(self, sample):
        """Apply frequency and time masking to ``sample['spectrogram']``.

        Both masks use ``param=10`` (presumably the upper bound on the mask
        width -- see the tensorflow-io documentation). All other entries of
        ``sample`` are passed through untouched.
        """
        spectrogram = sample['spectrogram']
        spectrogram = tfio.audio.freq_mask(spectrogram, param=10)
        spectrogram = tfio.audio.time_mask(spectrogram, param=10)
        sample['spectrogram'] = spectrogram
        return sample


    def train(self):
        """Build the training pipeline: augmented, shuffled, batched, endless.

        The dataset repeats forever, so the consumer has to bound an epoch
        itself (e.g. with ``steps_per_epoch``).
        """
        dataset = self.load_dataset(self.train_files)
        dataset = dataset.map(self.SpecAugment, num_parallel_calls=self.AUTOTUNE)
        # Shuffle before repeat so that every pass over the data finishes
        # before the next begins; repeat-then-shuffle mixes epochs together.
        dataset = dataset.shuffle(self.args.buffer_size)
        dataset = dataset.repeat()
        dataset = dataset.batch(self.args.batch_size)
        dataset = dataset.prefetch(self.AUTOTUNE)
        return dataset


    def test(self):
        """Build the evaluation pipeline: single pass, no augmentation.

        The batches are cached after the first pass, so later iterations
        replay the first pass, including its order.
        """
        dataset = self.load_dataset(self.test_files)
        dataset = dataset.shuffle(self.args.buffer_size)
        dataset = dataset.batch(self.args.batch_size)
        dataset = dataset.cache()
        dataset = dataset.prefetch(self.AUTOTUNE)
        return dataset

dataset = TIMITDataset(args).train()

# Take one batch and select the tensors by key. Unpacking `.values()` by
# position only works while the keys happen to come back in alphabetical
# order, and would silently mislabel tensors if a feature were added.
first_batch = next(iter(dataset))
binary_label = first_batch['binary_label']
filename = first_batch['filename']
framewise_label = first_batch['framewise_label']
spectrogram = first_batch['spectrogram']

print("spectrogram shape:", spectrogram.shape)
print("binary_label shape:", binary_label.shape)
print("framewise_label shape:", framewise_label.shape)

spectrogram shape: (2, 438, 64)
binary_label shape: (2, 438)
framewise_label shape: (2, 438)


In [32]:
# Two stacked bidirectional LSTMs followed by a 2-way softmax.
# The input is one spectrogram of 438 frames x 64 bins.
#
# NOTE(review): layer_2 does not return sequences, so the model emits a single
# 2-class prediction per utterance, while the labels printed in the previous
# cell are per-frame (batch, 438). compile() is also given no loss. Confirm
# the intended target and loss before calling fit().
model = Sequential()
model.add(Bidirectional(
    LSTM(100, return_sequences=True), input_shape=[438, 64], name="layer_1"))
model.add(Bidirectional(LSTM(25), name="layer_2"))
model.add(Dense(2, activation="softmax", name='layer_3'))

model.compile(optimizer='adam')

model.summary()

Model: "sequential_8"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
layer_1 (Bidirectional)      (None, 438, 200)          132000    
_________________________________________________________________
layer_2 (Bidirectional)      (None, 50)                45200     
_________________________________________________________________
layer_3 (Dense)              (None, 2)                 102       
Total params: 177,302
Trainable params: 177,302
Non-trainable params: 0
_________________________________________________________________
