## This is the transformer archetecture we will be using to convert the speech to numerical number (embeddings) and the text to numbers(embeddings as well)

In [1]:
#For some reason we need tensorflow 2.8 to build this transformer
#!pip uninstall tensorflow -y
#!pip install tensorflow==2.8

In [2]:
#Booom we got what we want
import tensorflow as tf
print(tf.__version__)

2.8.0


In [3]:
import sys
print(sys.version)


3.9.12 (main, Apr  4 2022, 05:22:27) [MSC v.1916 64 bit (AMD64)]


In [2]:
import os
import random
from glob import glob
import tensorflow as tf
from tensorflow import keras
from from_root import from_root
from tensorflow.keras import layers

In [3]:
# to list all the GPU available to our active environment
tf.config.list_physical_devices('GPU')

[]

## Define the Transformer Input Layer

In [4]:
class TokenEmbedding(layers.Layer):
    """
    This is the token embedding which is for the text. This token embeddings convert speech to text. So like in linear algebra,
    speech is the input x and y is the text since that is what we want
    """
    def __init__(self, num_vocab=1000, maxlen=100, num_hid=64):
        super().__init__()
        self.emb = tf.keras.layers.Embedding(num_vocab, num_hid)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=num_hid)

    def call(self, x):
        maxlen = tf.shape(x)[-1]
        x = self.emb(x)
        positions = tf.range(start=0, limit=maxlen, delta=1)
        positions = self.pos_emb(positions)
        return x + positions

In [5]:
class SpeechFeatureEmbedding(layers.Layer):
    """
    Next is the speech embedding. We discuss that the speech will be converted to a spectogram which we identify as an image of 
    graph and we are using a convolutional neural network to create the embeddings from that image/graph (Remember CNN.??? YES)
    """
    def __init__(self, num_hid=64, maxlen=100):
        super().__init__()
        self.conv1 = tf.keras.layers.Conv1D(
            num_hid, 11, strides=2, padding="same", activation="relu"
        )
        self.conv2 = tf.keras.layers.Conv1D(
            num_hid, 11, strides=2, padding="same", activation="relu"
        )
        self.conv3 = tf.keras.layers.Conv1D(
            num_hid, 11, strides=2, padding="same", activation="relu"
        )
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=num_hid)

    def call(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        return self.conv3(x)


## Transformer Encoder Layer. Remember the transformer has 2 network (Encoder and a decoder). So we are building those network but let start with the encoder.

In [6]:
class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, num_heads, feed_forward_dim, rate=0.1):
        super().__init__()
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        #This is just a feed forward NN. Even from the syntax, you can understand what is going on
        self.ffn = keras.Sequential(
            [
                layers.Dense(feed_forward_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training):
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

## Transformer Decoder Layer. The other part of the transformer

In [7]:
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, num_heads, feed_forward_dim, dropout_rate=0.1):
        super().__init__()
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = layers.LayerNormalization(epsilon=1e-6)
        self.self_att = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.enc_att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.self_dropout = layers.Dropout(0.5)
        self.enc_dropout = layers.Dropout(0.1)
        self.ffn_dropout = layers.Dropout(0.1)
        self.ffn = keras.Sequential(
            [
                layers.Dense(feed_forward_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )

    def causal_attention_mask(self, batch_size, n_dest, n_src, dtype):
        """Masks the upper half of the dot product matrix in self attention.

        This prevents flow of information from future tokens to current token.
        1's in the lower triangle, counting from the lower right corner.
        """
        i = tf.range(n_dest)[:, None]
        j = tf.range(n_src)
        m = i >= j - n_src + n_dest
        mask = tf.cast(m, dtype)
        mask = tf.reshape(mask, [1, n_dest, n_src])
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], 0
        )
        return tf.tile(mask, mult)

    def call(self, enc_out, target):
        input_shape = tf.shape(target)
        batch_size = input_shape[0]
        seq_len = input_shape[1]
        causal_mask = self.causal_attention_mask(batch_size, seq_len, seq_len, tf.bool)
        target_att = self.self_att(target, target, attention_mask=causal_mask)
        target_norm = self.layernorm1(target + self.self_dropout(target_att))
        enc_out = self.enc_att(target_norm, enc_out)
        enc_out_norm = self.layernorm2(self.enc_dropout(enc_out) + target_norm)
        ffn_out = self.ffn(enc_out_norm)
        ffn_out_norm = self.layernorm3(enc_out_norm + self.ffn_dropout(ffn_out))
        return ffn_out_norm

## Transformer Model. This put everything together (meaning the encoder and the decoder)

In [8]:
class Transformer(keras.Model):
    def __init__(
        self,
        num_hid=64,
        num_head=2,
        num_feed_forward=128,
        source_maxlen=100,
        target_maxlen=100,
        num_layers_enc=4,
        num_layers_dec=1,
        num_classes=10,
    ):
        super().__init__()
        self.loss_metric = keras.metrics.Mean(name="loss")
        self.num_layers_enc = num_layers_enc
        self.num_layers_dec = num_layers_dec
        self.target_maxlen = target_maxlen
        self.num_classes = num_classes

        self.enc_input = SpeechFeatureEmbedding(num_hid=num_hid, maxlen=source_maxlen)
        self.dec_input = TokenEmbedding(
            num_vocab=num_classes, maxlen=target_maxlen, num_hid=num_hid
        )

        self.encoder = keras.Sequential(
            [self.enc_input]
            + [
                TransformerEncoder(num_hid, num_head, num_feed_forward)
                for _ in range(num_layers_enc)
            ]
        )

        for i in range(num_layers_dec):
            setattr(
                self,
                f"dec_layer_{i}",
                TransformerDecoder(num_hid, num_head, num_feed_forward),
            )

        self.classifier = layers.Dense(num_classes)

    def decode(self, enc_out, target):
        y = self.dec_input(target)
        for i in range(self.num_layers_dec):
            y = getattr(self, f"dec_layer_{i}")(enc_out, y)
        return y

    def call(self, inputs):
        source = inputs[0]
        target = inputs[1]
        x = self.encoder(source)
        y = self.decode(x, target)
        return self.classifier(y)

    @property
    def metrics(self):
        return [self.loss_metric]

    def train_step(self, batch):
        """Processes one batch inside model.fit()."""
        source = batch["source"]
        target = batch["target"]
        dec_input = target[:, :-1]
        dec_target = target[:, 1:]
        with tf.GradientTape() as tape:
            preds = self([source, dec_input])
            one_hot = tf.one_hot(dec_target, depth=self.num_classes)
            mask = tf.math.logical_not(tf.math.equal(dec_target, 0))
            loss = self.compiled_loss(one_hot, preds, sample_weight=mask)
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        self.loss_metric.update_state(loss)
        return {"loss": self.loss_metric.result()}

    @property
    def val_loss(self):
        return self.loss_metric.result()

    def test_step(self, batch):
        source = batch["source"]
        target = batch["target"]
        dec_input = target[:, :-1]
        dec_target = target[:, 1:]
        preds = self([source, dec_input])
        one_hot = tf.one_hot(dec_target, depth=self.num_classes)
        mask = tf.math.logical_not(tf.math.equal(dec_target, 0))
        loss = self.compiled_loss(one_hot, preds, sample_weight=mask)
        self.loss_metric.update_state(loss)
        return {"loss": self.loss_metric.result()}

    def generate(self, source, target_start_token_idx):
        """Performs inference over one batch of inputs using greedy decoding."""
        bs = tf.shape(source)[0]
        enc = self.encoder(source)
        dec_input = tf.ones((bs, 1), dtype=tf.int32) * target_start_token_idx
        dec_logits = []
        for i in range(self.target_maxlen - 1):
            dec_out = self.decode(enc, dec_input)
            logits = self.classifier(dec_out)
            logits = tf.argmax(logits, axis=-1, output_type=tf.int32)
            last_logit = tf.expand_dims(logits[:, -1], axis=-1)
            dec_logits.append(last_logit)
            dec_input = tf.concat([dec_input, last_logit], axis=-1)
        return dec_input

## Downloading data set. Once you download the dataset from https://keithito.com/LJ-Speech-Dataset/ What you will observe is that, the data is in .tar.bz2 format. You will have to convert it into a folder and that folder would have your audio file and meta data/labels of the the audio files

In [9]:
#Let convert our .tar.bz2 file
import tarfile
# Specify the path to your .tar.bz2 file
tar_bz2_file = 'C://Users//midof//OneDrive//Desktop//INeuron//Data_Science_Project//Industry_Ready_Proj//STT-main//STT//datasets//LJSpeech-1.1.tar.bz2'
# Open the .tar.bz2 file
with tarfile.open(tar_bz2_file, 'r:bz2') as tar:
    # Extract all contents into the current directory
    tar.extractall(path='C://Users//midof//OneDrive//Desktop//INeuron//Data_Science_Project//Industry_Ready_Proj//STT-main//STT//datasets')
print("Extraction complete!")

Extraction complete!


In [10]:
# keras.utils.get_file(
#     os.path.join(os.getcwd(), "data.tar.gz"),
#     "https://data.keithito.com/data/speech/LJSpeech-1.1.tar.bz2",
#     extract=True,
#     archive_format="tar",
#     cache_dir=".",
# )

#ROOT = os.getcwd()
ROOT = 'C://Users//midof//OneDrive//Desktop//INeuron//Data_Science_Project//Industry_Ready_Proj//STT-main//STT'
saveto = os.path.join(ROOT, 'datasets', 'LJSpeech-1.1')
os.makedirs(saveto, exist_ok=True)
wavs = glob("{}/**/*.wav".format(saveto), recursive=True)

id_to_text = {}
with open(os.path.join(saveto, "metadata.csv"), encoding="utf-8") as f:
    for line in f:
        id = line.strip().split("|")[0]
        text = line.strip().split("|")[2]
        id_to_text[id] = text
wavs[0]

'C://Users//midof//OneDrive//Desktop//INeuron//Data_Science_Project//Industry_Ready_Proj//STT-main//STT\\datasets\\LJSpeech-1.1\\wavs\\LJ001-0001.wav'

In [10]:
# Define the root directory
ROOT = r'C:\Users\midof\OneDrive\Desktop\INeuron\Data_Science_Project\Industry_Ready_Proj\STT-main\STT'

# Create the 'saveto' path
saveto = os.path.join(ROOT, 'datasets', 'LJSpeech-1.1')
os.makedirs(saveto, exist_ok=True)

# Get all .wav files recursively from the 'saveto' directory
wavs = glob(os.path.join(saveto, '**', '*.wav'), recursive=True)

# Dictionary to hold id-to-text mappings
id_to_text = {}
# So if you look at the metadata.csv. You will see that the names of the audio file and the text are separated by |. This is what
#we are doing here. We are separating them. Just data cleaning stuff
with open(os.path.join(saveto, "metadata.csv"), encoding="utf-8") as f:
    for line in f:
        id = line.strip().split("|")[0]
        text = line.strip().split("|")[2]
        id_to_text[id] = text
wavs[0]

'C:\\Users\\midof\\OneDrive\\Desktop\\INeuron\\Data_Science_Project\\Industry_Ready_Proj\\STT-main\\STT\\datasets\\LJSpeech-1.1\\wavs\\LJ001-0001.wav'

In [11]:
#Now you see. When we call the wav file LJ001-0001, we get the text of that audio file
id_to_text['LJ001-0005']

'the invention of movable metal letters in the middle of the fifteenth century may justly be considered as the invention of the art of printing.'

In [12]:
saveto

'C:\\Users\\midof\\OneDrive\\Desktop\\INeuron\\Data_Science_Project\\Industry_Ready_Proj\\STT-main\\STT\\datasets\\LJSpeech-1.1'

In [13]:
id_to_text['LJ001-0001']

'Printing, in the only sense with which we are at present concerned, differs from most if not from all the arts and crafts represented in the Exhibition'

## Preprocess the dataset

In [14]:
def get_data(wavs, id_to_text, maxlen=50):
    """ 
    returns mapping of audio paths and transcription texts. So we want a dictionary in a list meanining. We want:
    'audio': 'THE PATH OF THE WAV FILE FOR A PARTICULAR AUDIO'
    'text': 'THE TEXT OF THAT AUDIO', AND SO ON
    See below of the output and you'll get it if you don't understand
    """
    data = []
    print(maxlen)
    for w in wavs:
        #id = w.split("/")[-1].split(".")[0]
        id = os.path.basename(w).split(".")[0]
        if len(id_to_text[id]) < maxlen:
            data.append({"audio": w, "text": id_to_text[id]})
    return data

In [15]:
class VectorizeChar:
    def __init__(self, max_len=50):
        """
        Here we are creating a vocabulary which will contain all the character from a to z - lower case and Punctuation Characters
        """
        self.vocab = (
            ["-", "#", "<", ">"]
            + [chr(i + 96) for i in range(1, 27)]
            + [" ", ".", ",", "?"]
        )
        self.max_len = max_len
        self.char_to_idx = {}
        for i, ch in enumerate(self.vocab):
            self.char_to_idx[ch] = i

    def __call__(self, text):
        text = text.lower()
        text = text[: self.max_len - 2]
        text = "<" + text + ">"
        pad_len = self.max_len - len(text)
        return [self.char_to_idx.get(ch, 1) for ch in text] + [0] * pad_len

    def get_vocabulary(self):
        return self.vocab

In [16]:
# Now what I said above will all make sense. That what the function get_data is doing
max_target_len = 200  # all transcripts in out data are < 200 characters
data = get_data(wavs, id_to_text, maxlen=max_target_len)
data[3]

200


{'audio': 'C:\\Users\\midof\\OneDrive\\Desktop\\INeuron\\Data_Science_Project\\Industry_Ready_Proj\\STT-main\\STT\\datasets\\LJSpeech-1.1\\wavs\\LJ001-0004.wav',
 'text': 'produced the block books, which were the immediate predecessors of the true printed book,'}

In [17]:
vectorizer = VectorizeChar(max_target_len)
print("vocab size", len(vectorizer.get_vocabulary()))

vocab size 34


In [18]:
vectorizer.vocab

['-',
 '#',
 '<',
 '>',
 'a',
 'b',
 'c',
 'd',
 'e',
 'f',
 'g',
 'h',
 'i',
 'j',
 'k',
 'l',
 'm',
 'n',
 'o',
 'p',
 'q',
 'r',
 's',
 't',
 'u',
 'v',
 'w',
 'x',
 'y',
 'z',
 ' ',
 '.',
 ',',
 '?']

In [19]:
#You will see we have embedded each character with a mapping
vectorizer.char_to_idx

{'-': 0,
 '#': 1,
 '<': 2,
 '>': 3,
 'a': 4,
 'b': 5,
 'c': 6,
 'd': 7,
 'e': 8,
 'f': 9,
 'g': 10,
 'h': 11,
 'i': 12,
 'j': 13,
 'k': 14,
 'l': 15,
 'm': 16,
 'n': 17,
 'o': 18,
 'p': 19,
 'q': 20,
 'r': 21,
 's': 22,
 't': 23,
 'u': 24,
 'v': 25,
 'w': 26,
 'x': 27,
 'y': 28,
 'z': 29,
 ' ': 30,
 '.': 31,
 ',': 32,
 '?': 33}

In [20]:
#So now if we look at each character. We can see the mapping. NOTE: Every mapping start with < and >. which is why we are
#getting 2,15 and 3. Everything else is a padding of len 200 characters. We are assuming every audio file doesn't have more than
#200 characters.
vectorizer('why')

[2,
 26,
 11,
 28,
 3,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0]

In [21]:
def create_text_ds(data):
    texts = [_["text"] for _ in data]
    text_ds = [vectorizer(t) for t in texts]
    text_ds = tf.data.Dataset.from_tensor_slices(text_ds)
    return text_ds


def path_to_audio(path):
    # spectrogram using stft
    audio = tf.io.read_file(path)
    audio, _ = tf.audio.decode_wav(audio, 1)
    audio = tf.squeeze(audio, axis=-1)
    stfts = tf.signal.stft(audio, frame_length=200, frame_step=80, fft_length=256)
    x = tf.math.pow(tf.abs(stfts), 0.5)
    # normalisation
    means = tf.math.reduce_mean(x, 1, keepdims=True)
    stddevs = tf.math.reduce_std(x, 1, keepdims=True)
    x = (x - means) / stddevs
    audio_len = tf.shape(x)[0]
    # padding to 10 seconds
    pad_len = 2754
    paddings = tf.constant([[0, pad_len], [0, 0]])
    x = tf.pad(x, paddings, "CONSTANT")[:pad_len, :]
    return x


def create_audio_ds(data):
    flist = [_["audio"] for _ in data]
    audio_ds = tf.data.Dataset.from_tensor_slices(flist)
    audio_ds = audio_ds.map(
        path_to_audio, num_parallel_calls=tf.data.AUTOTUNE
    )
    return audio_ds


def create_tf_dataset(data, bs=4):
    audio_ds = create_audio_ds(data)
    text_ds = create_text_ds(data)
    ds = tf.data.Dataset.zip((audio_ds, text_ds))
    ds = ds.map(lambda x, y: {"source": x, "target": y})
    ds = ds.batch(bs)
    ds = ds.prefetch(tf.data.AUTOTUNE)
    return ds

In [22]:
a = create_text_ds(data)

In [23]:
a

<TensorSliceDataset element_spec=TensorSpec(shape=(200,), dtype=tf.int32, name=None)>

In [24]:
split = int(len(data) * 0.99)
train_data = data[:split]
test_data = data[split:]
ds = create_tf_dataset(train_data, bs=16)
val_ds = create_tf_dataset(test_data, bs=4)

In [25]:
train_data[0].keys()

dict_keys(['audio', 'text'])

In [26]:
train_data[0]

{'audio': 'C:\\Users\\midof\\OneDrive\\Desktop\\INeuron\\Data_Science_Project\\Industry_Ready_Proj\\STT-main\\STT\\datasets\\LJSpeech-1.1\\wavs\\LJ001-0001.wav',
 'text': 'Printing, in the only sense with which we are at present concerned, differs from most if not from all the arts and crafts represented in the Exhibition'}

In [27]:
type(train_data[0])

dict

## Callbacks to display predictions

In [28]:
class DisplayOutputs(keras.callbacks.Callback):
    def __init__(
        self, batch, idx_to_token, target_start_token_idx=27, target_end_token_idx=28
    ):
        """Displays a batch of outputs after every epoch

        Args:
            batch: A test batch containing the keys "source" and "target"
            idx_to_token: A List containing the vocabulary tokens corresponding to their indices
            target_start_token_idx: A start token index in the target vocabulary
            target_end_token_idx: An end token index in the target vocabulary
        """
        self.batch = batch
        self.target_start_token_idx = target_start_token_idx
        self.target_end_token_idx = target_end_token_idx
        self.idx_to_char = idx_to_token

    def on_epoch_end(self, epoch, logs=None):
        if epoch % 5 != 0:
            return
        source = self.batch["source"]
        target = self.batch["target"].numpy()
        bs = tf.shape(source)[0]
        preds = self.model.generate(source, self.target_start_token_idx)
        preds = preds.numpy()
        for i in range(bs):
            target_text = "".join([self.idx_to_char[_] for _ in target[i, :]])
            prediction = ""
            for idx in preds[i, :]:
                prediction += self.idx_to_char[idx]
                if idx == self.target_end_token_idx:
                    break
            print(f"target:     {target_text.replace('-','')}")
            print(f"prediction: {prediction}\n")

In [29]:
class CustomSchedule(keras.optimizers.schedules.LearningRateSchedule):
    def __init__(
        self,
        init_lr=0.00001,
        lr_after_warmup=0.001,
        final_lr=0.00001,
        warmup_epochs=15,
        decay_epochs=85,
        steps_per_epoch=203,
    ):
        super().__init__()
        self.init_lr = init_lr
        self.lr_after_warmup = lr_after_warmup
        self.final_lr = final_lr
        self.warmup_epochs = warmup_epochs
        self.decay_epochs = decay_epochs
        self.steps_per_epoch = steps_per_epoch

    def calculate_lr(self, epoch):
        """ linear warm up - linear decay """
        warmup_lr = (
            self.init_lr
            + ((self.lr_after_warmup - self.init_lr) / (self.warmup_epochs - 1)) * epoch
        )
        decay_lr = tf.math.maximum(
            self.final_lr,
            self.lr_after_warmup
            - (epoch - self.warmup_epochs)
            * (self.lr_after_warmup - self.final_lr)
            / (self.decay_epochs),
        )
        return tf.math.minimum(warmup_lr, decay_lr)

    def __call__(self, step):
        epoch = step // self.steps_per_epoch
        return self.calculate_lr(epoch)

## Create & train the end-to-end model

In [30]:
def wer(y_true, y_pred):
    return 10

In [31]:
batch = next(iter(val_ds))

# The vocabulary to convert predicted indices into characters
idx_to_char = vectorizer.get_vocabulary()
display_cb = DisplayOutputs(
    batch, idx_to_char, target_start_token_idx=2, target_end_token_idx=3
)  # set the arguments as per vocabulary index for '<' and '>'

model = Transformer(
    num_hid=200,
    num_head=2,
    num_feed_forward=400,
    target_maxlen=max_target_len,
    num_layers_enc=4,
    num_layers_dec=1,
    num_classes=34,
)
loss_fn = tf.keras.losses.CategoricalCrossentropy(
    from_logits=True, label_smoothing=0.1,
)

checkpoint_path = "C:\\Users\\midof\\OneDrive\\Desktop\\INeuron\\Data_Science_Project\\Industry_Ready_Proj\\STT-main\\STT\\datasets\\saved_model\\cp-{epoch:04d}.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)

# Create a callback that saves the model's weights every 5 epochs
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path, 
    verbose=1, 
    save_weights_only=True,
    save_freq=1000)


learning_rate = CustomSchedule(
    init_lr=0.00001,
    lr_after_warmup=0.001,
    final_lr=0.00001,
    warmup_epochs=15,
    decay_epochs=40,
    steps_per_epoch=len(ds),
)
optimizer = keras.optimizers.Adam(learning_rate)
model.compile(optimizer=optimizer, loss=loss_fn)

model.save_weights(checkpoint_path.format(epoch=1)) #After every 15 epoch, save the weight
model.fit(ds, validation_data=val_ds, callbacks=[display_cb], epochs=2)

Epoch 1/2
prediction: <the in tin the te the the the te the t t t in the the the the there the the tin the the tin the te t the tin t the te te the the the the the the athe winenthe the ton the t the tthe there as s than t

target:     <prs must develop the capacity to classify its subjects on a more sophisticated basis than the present geographic breakdown.>
prediction: <the in t o the te the the t the t the the ale the the the the there the the tin the the te the t tin the tin t the t t the the tin the the an s the an thenthe the ton the t the t o the te as s than t

target:     <its present manual filing system is obsolete#>
prediction: <the in tin the te the the the te the t t t in there the the the te the the the te conthere the te on the tin t the te te the the the the the the athe winenthe the ton the t the tthe there as s than t

target:     <it makes no use of the recent developments in automatic data processing which are widely used in the business world and in other governme

<keras.callbacks.History at 0x22feb6cebb0>

In [33]:
#Model validation loss accuracy so the actual qudio text - predicted text = 1.501 which is bad and that is because we need at least
#60 epochs for better training and I don't have a GPU for that
model.val_loss.numpy()

1.39237

In [34]:
idx_to_char = vectorizer.get_vocabulary()
for i in range(10):
  preds = model.generate(tf.expand_dims(path_to_audio(test_data[i]['audio']), axis=0), target_start_token_idx=2)
  prediction = ""

  preds = preds.numpy()

  prediction = ""
  for idx in preds[0]:
      prediction += idx_to_char[idx]
      if idx == 3:
        break

  print(f"actual =      {test_data[i]['text']}")
  print(f"predicted =   {prediction}")
  print('-----' * 10)

actual =      the increased information supplied by other agencies will be wasted.
predicted =   <the as the ale the as the ale an the anthe an the an the an ale an anthe the the an the ale the athe the the ale the these thesese thes.>
--------------------------------------------------
actual =      PRS must develop the capacity to classify its subjects on a more sophisticated basis than the present geographic breakdown.
predicted =   <the as the as ale athe athe the at the the an the an the an ale an an an an the the athe the the ale are the ale the the the the the the the the an ale ofon there teresiond thentede there al s thalie
--------------------------------------------------
actual =      Its present manual filing system is obsolete;
predicted =   <the as the ase the as the ase anthe the an an the an the an ale anthe ase the the the the the alese an then thes these theseseses>
--------------------------------------------------
actual =      it makes no use of the recent developm

KeyboardInterrupt: 

In [44]:
# weights_path = os.path.join(ROOT, 'notebooks', 'saved_models')
weights_path = "C://Users//midof//OneDrive//Desktop//INeuron//Data_Science_Project//Industry_Ready_Proj//STT-main//STT//datasets//saved_model//"
# os.makedirs(weights_path, exist_ok=True)
# model.save_weights(weights_path)

In [52]:
weights_path

'C://Users//midof//OneDrive//Desktop//INeuron//Data_Science_Project//Industry_Ready_Proj//STT-main//STT//datasets//saved_model//'

In [46]:
#You can see the checkpoint which we will be loading
os.listdir(weights_path)

['checkpoint',
 'cp-0001.ckpt.data-00000-of-00001',
 'cp-0001.ckpt.index',
 'cp-0015.ckpt.data-00000-of-00001',
 'cp-0015.ckpt.index']

In [47]:
#Create the model instance again to test it with the checkpoint we saved
model2 = Transformer(
    num_hid=200,
    num_head=2,
    num_feed_forward=400,
    target_maxlen=max_target_len,
    num_layers_enc=4,
    num_layers_dec=1,
    num_classes=34,
)


# model2.save_weights('test/')

In [49]:
!pip install h5py




[notice] A new release of pip is available: 23.2.1 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip





In [61]:
#Now we can load our checpoint model weight
model2.load_weights('C:/Users/midof/OneDrive/Desktop/INeuron/Data_Science_Project/Industry_Ready_Proj/STT-main/STT/datasets/saved_model/cp-0015.ckpt')


<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x22f88edca90>

In [62]:
model2.load_weights('C:/Users/midof/OneDrive/Desktop/INeuron/Data_Science_Project/Industry_Ready_Proj/STT-main/STT/datasets/saved_model/')


ImportError: `load_weights` requires h5py package when loading weights from HDF5. Try installing h5py.

In [57]:
model2.val_loss.numpy()

0.0

In [59]:
weights_path
print(os.listdir(weights_path))

['checkpoint', 'cp-0001.ckpt.data-00000-of-00001', 'cp-0001.ckpt.index', 'cp-0015.ckpt.data-00000-of-00001', 'cp-0015.ckpt.index']


In [60]:
test_data[0]['audio']

'C:\\Users\\midof\\OneDrive\\Desktop\\INeuron\\Data_Science_Project\\Industry_Ready_Proj\\STT-main\\STT\\datasets\\LJSpeech-1.1\\wavs\\LJ050-0148.wav'

In [None]:
idx_to_char = vectorizer.get_vocabulary()
for i in range(1):
  preds = model2.generate(tf.expand_dims(path_to_audio(test_data[i]['audio']), axis=0), target_start_token_idx=2)

  preds = preds.numpy()

  prediction = ""
  for idx in preds[0]:
      prediction += idx_to_char[idx]

  print(f"actual =      {test_data[i]['text']}")
  print(f"predicted =   {prediction}")
  print('-----' * 50)