In [15]:
import pathlib
import random
import string
import re
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization
import json

In [3]:
# Load the two lyric corpora from JSON. They are indexed position-by-position
# further down, so they are presumably line-aligned lists of strings -- TODO confirm.
with open("../dataset/train/EN_lyrics_1.json", encoding="utf8") as en_file:
    en_dataset = json.load(en_file)
with open("../dataset/train/TH_lyrics_1.json", encoding="utf8") as th_file:
    th_dataset = json.load(th_file)

In [4]:
# Build (thai, english) training pairs. The English side is wrapped in
# "[start]" / "[end]" markers so the decoder has explicit begin/stop tokens.
#
# The corpora are paired by position, so a shorter English corpus would leave
# Thai lines without a partner; fail early with a clear message instead of an
# IndexError in the middle of the loop. Extra trailing English entries are
# ignored, exactly as the previous index-based loop did.
if len(en_dataset) < len(th_dataset):
    raise ValueError(
        f"Parallel corpora are misaligned: {len(th_dataset)} Thai lines "
        f"but only {len(en_dataset)} English lines"
    )

text_pairs = [
    (th, "[start] " + eng + " [end]")
    for th, eng in zip(th_dataset, en_dataset)
]

In [5]:
# Eyeball a handful of randomly drawn (thai, english) pairs.
for _ in range(5):
    sample_pair = random.choice(text_pairs)
    print(sample_pair)

('อายล์ เครี ตีซ ทอร์ชิส ฟอร์ ยา', '[start] ill carry these torches for ya [end]')
('เดอะ เพลส อิส รอง', '[start] the place is wrong [end]')
('ฟนอลลี เอเบิล ทู ซี', '[start] finally able to see [end]')
('โอ เยีย ไอ สปิลด ออล มาย อีโมชันส ทูไนท ไอม ซอรี', '[start] oh yeah i spilled all my emotions tonight im sorry [end]')
('โอ เบบี ดู ยู ไลค ยู ทู โอ โอ', '[start] oh baby do you like me too ooh ooh [end]')


In [9]:
# 90/10 train/validation split.
#
# A dedicated, seeded RNG shuffles a *copy* of the pairs, so that
#   * the split is identical after every Restart & Run All, and
#   * re-running this cell does not reshuffle `text_pairs` again and silently
#     move samples between the training and validation sets.
SPLIT_SEED = 42
shuffled_pairs = list(text_pairs)
random.Random(SPLIT_SEED).shuffle(shuffled_pairs)

num_train_samples = int(len(shuffled_pairs) * 0.9)
train_pairs = shuffled_pairs[:num_train_samples]
val_pairs = shuffled_pairs[num_train_samples:]

print(f"{len(text_pairs)} total pairs")
print(f"{len(train_pairs)} training pairs")
print(f"{len(val_pairs)} validation pairs")

2844 total pairs
2559 training pairs
285 validation pairs


In [10]:
# Characters removed from the English text during standardization: every ASCII
# punctuation mark plus "¿", except the square brackets, which must survive so
# the "[start]" / "[end]" markers stay intact.
strip_chars = (string.punctuation + "¿").replace("[", "").replace("]", "")

# Tokenization / batching hyperparameters.
vocab_size = 5000
sequence_length = 20
batch_size = 16


def custom_standardization(input_string):
    """Lowercase the input and delete every character listed in `strip_chars`.

    Used as the `standardize` callable of the English vectorizer. Square
    brackets are not part of `strip_chars`, so "[start]" / "[end]" pass
    through unchanged.
    """
    # Character class matching any single character from `strip_chars`.
    punctuation_pattern = "[%s]" % re.escape(strip_chars)
    lowered = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowered, punctuation_pattern, "")


# Source (Thai) side: default standardization, padded/truncated to
# `sequence_length` token ids.
th_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)

# Target (English) side: one extra position, because the sequence is later
# split into decoder inputs (all but the last token) and targets (all but the
# first). Uses the custom standardization so "[start]" / "[end]" are kept.
eng_vectorization = TextVectorization(
    standardize=custom_standardization,
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
)

# Vocabularies are learned from the training split only.
train_th_texts = [th_text for th_text, _ in train_pairs]
train_eng_texts = [eng_text for _, eng_text in train_pairs]
th_vectorization.adapt(train_th_texts)
eng_vectorization.adapt(train_eng_texts)

In [11]:
def format_dataset(th, eng):
    """Vectorize a batch of (thai, english) strings into model inputs/targets.

    Returns a tuple `(inputs, targets)` where `inputs` is a dict with
    "encoder_inputs" (vectorized Thai) and "decoder_inputs" (vectorized
    English without its last position), and `targets` is the vectorized
    English without its first position, i.e. the decoder inputs shifted
    one step to the left.
    """
    eng_tokens = eng_vectorization(eng)
    th_tokens = th_vectorization(th)
    model_inputs = {
        "encoder_inputs": th_tokens,
        "decoder_inputs": eng_tokens[:, :-1],
    }
    targets = eng_tokens[:, 1:]
    return (model_inputs, targets)


def make_dataset(pairs):
    """Turn a list of (thai, english) string pairs into a batched tf.data pipeline.

    Args:
        pairs: non-empty sequence of `(thai_text, english_text)` tuples.

    Returns:
        A `tf.data.Dataset` yielding `(inputs, targets)` batches as produced
        by `format_dataset`, with `batch_size` samples per batch.

    Raises:
        ValueError: if `pairs` is empty.
    """
    if not pairs:
        raise ValueError("make_dataset() needs at least one (thai, english) pair")

    th_texts, eng_texts = zip(*pairs)
    dataset = tf.data.Dataset.from_tensor_slices((list(th_texts), list(eng_texts)))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    # Order matters: cache the (deterministic) vectorized batches first, then
    # shuffle. Caching *after* shuffle would freeze the first epoch's order and
    # replay it unchanged on every later epoch. Prefetch goes last so it
    # overlaps with training instead of feeding the cache.
    return dataset.cache().shuffle(2048).prefetch(16)


# Build the training and validation pipelines from their respective splits.
train_ds, val_ds = (make_dataset(split) for split in (train_pairs, val_pairs))

In [12]:
# Sanity check: print the tensor shapes of the first batch.
for inputs, targets in train_ds.take(1):
    encoder_shape = inputs["encoder_inputs"].shape
    decoder_shape = inputs["decoder_inputs"].shape
    print(f'inputs["encoder_inputs"].shape: {encoder_shape}')
    print(f'inputs["decoder_inputs"].shape: {decoder_shape}')
    print(f"targets.shape: {targets.shape}")

inputs["encoder_inputs"].shape: (16, 20)
inputs["decoder_inputs"].shape: (16, 20)
targets.shape: (16, 20)


In [13]:
class TransformerEncoder(layers.Layer):
    """One Transformer encoder block.

    Applies multi-head self-attention followed by a two-layer feed-forward
    projection; each sub-layer is wrapped in a residual connection and layer
    normalization.

    Args:
        embed_dim: size of the input/output feature dimension (also used as
            the per-head `key_dim` of the attention layer).
        dense_dim: hidden size of the feed-forward projection.
        num_heads: number of attention heads.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        """Run the block on `inputs`.

        Args:
            inputs: tensor indexed as (batch, sequence, features).
            mask: optional (batch, sequence) mask of valid positions. When
                omitted, attention is unrestricted.
        """
        # Default to "no mask" so the layer also works on unmasked inputs;
        # previously `padding_mask` was unbound in that case.
        padding_mask = None
        if mask is not None:
            # Insert a query axis so the mask broadcasts over all queries.
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be saved/cloned."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "dense_dim": self.dense_dim,
                "num_heads": self.num_heads,
            }
        )
        return config


class PositionalEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        sequence_length: number of distinct positions the position table
            holds; inputs longer than this would index past the table.
        vocab_size: number of rows in the token embedding table.
        embed_dim: size of each embedding vector.
    """

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        """Embed integer token ids and add the embedding of their positions."""
        # Positions 0..length-1 along the last axis of `inputs`.
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        # The position embeddings broadcast over the leading (batch) axis.
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        """Mark every position whose token id is non-zero as valid (0 = padding)."""
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the constructor arguments so the layer can be saved/cloned."""
        config = super().get_config()
        config.update(
            {
                "sequence_length": self.sequence_length,
                "vocab_size": self.vocab_size,
                "embed_dim": self.embed_dim,
            }
        )
        return config


class TransformerDecoder(layers.Layer):
    """One Transformer decoder block.

    Applies causal self-attention over the target sequence, attention over the
    encoder outputs, and a two-layer feed-forward projection; each sub-layer
    is wrapped in a residual connection and layer normalization.

    Args:
        embed_dim: size of the input/output feature dimension (also used as
            the per-head `key_dim` of both attention layers).
        latent_dim: hidden size of the feed-forward projection.
        num_heads: number of attention heads.
    """

    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(latent_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        """Run the block.

        Args:
            inputs: embedded target sequence, indexed as (batch, target_len, features).
            encoder_outputs: encoder sequence attended to by the second
                attention layer.
            mask: optional (batch, target_len) mask of valid target positions.
                When omitted, the encoder attention is unrestricted.
        """
        causal_mask = self.get_causal_attention_mask(inputs)

        # Default to "no mask" so the layer also works on unmasked inputs;
        # previously `padding_mask` was unbound in that case.
        padding_mask = None
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
            # NOTE(review): this mask is (batch, target_len, target_len) and is
            # derived from the *target* padding, yet it is applied below to the
            # attention over `encoder_outputs`. That only lines up when source
            # and target lengths are equal (both are 20 in this notebook) --
            # confirm before changing either sequence length.

        # Self-attention: each position may only look at itself and earlier ones.
        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs, attention_mask=causal_mask
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        # Attention over the encoder sequence.
        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        """Return an int32 lower-triangular mask of shape (batch, seq_len, seq_len).

        Entry [b, i, j] is 1 when j <= i (query i may attend to key j), else 0.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the single (1, L, L) mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)

    def get_config(self):
        """Return the constructor arguments so the layer can be saved/cloned."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "latent_dim": self.latent_dim,
                "num_heads": self.num_heads,
            }
        )
        return config

In [16]:
# Model hyperparameters: embedding width, feed-forward hidden width, and
# number of attention heads (shared by encoder and decoder).
embed_dim = 256
latent_dim = 2048
num_heads = 8

# Encoder: token ids -> positional embedding -> one TransformerEncoder block.
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)

# Decoder, built as a standalone model that takes the target token ids plus an
# already-encoded source sequence and emits a softmax over the vocabulary at
# every target position. Dropout (rate 0.5) is applied before the output layer.
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name="decoder_state_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, encoded_seq_inputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)

# End-to-end model: feed the encoder's outputs into the decoder model.
# `decoder_outputs` is rebound here to the output of the full pipeline.
decoder_outputs = decoder([decoder_inputs, encoder_outputs])
transformer = keras.Model(
    [encoder_inputs, decoder_inputs], decoder_outputs, name="transformer"
)

In [17]:
# Number of passes over the training set; fewer than 30 is not enough for
# the model to converge.
epochs = 30

transformer.summary()
transformer.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)
transformer.fit(train_ds, validation_data=val_ds, epochs=epochs)

Model: "transformer"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
encoder_inputs (InputLayer)     [(None, None)]       0                                            
__________________________________________________________________________________________________
positional_embedding (Positiona (None, None, 256)    1285120     encoder_inputs[0][0]             
__________________________________________________________________________________________________
decoder_inputs (InputLayer)     [(None, None)]       0                                            
__________________________________________________________________________________________________
transformer_encoder (Transforme (None, None, 256)    3155456     positional_embedding[0][0]       
________________________________________________________________________________________

<tensorflow.python.keras.callbacks.History at 0x28acb633fd0>

In [18]:
# Map each English token id back to its vocabulary string for decoding.
eng_vocab = eng_vectorization.get_vocabulary()
eng_index_lookup = dict(enumerate(eng_vocab))
# Upper bound on the number of tokens generated per sentence.
max_decoded_sentence_length = 20


def decode_sequence(input_sentence):
    """Greedily translate one Thai sentence with the trained transformer.

    Starting from "[start]", the sentence generated so far is re-vectorized
    and fed to the model at every step; the highest-probability token at the
    current step's position is appended. Generation stops after "[end]" is
    produced or after `max_decoded_sentence_length` tokens.

    Returns:
        The generated sentence as a single string, including the "[start]"
        marker (and "[end]" if it was produced).
    """
    source_tokens = th_vectorization([input_sentence])
    words = ["[start]"]
    for step in range(max_decoded_sentence_length):
        partial_sentence = " ".join(words)
        # Drop the last position to match the decoder input length used in training.
        target_tokens = eng_vectorization([partial_sentence])[:, :-1]
        step_predictions = transformer([source_tokens, target_tokens])

        next_index = np.argmax(step_predictions[0, step, :])
        next_word = eng_index_lookup[next_index]
        words.append(next_word)

        if next_word == "[end]":
            break
    return " ".join(words)

# Translate 30 randomly drawn validation sentences (drawn with replacement).
# NOTE: despite its name, `test_eng_texts` holds the Thai source side of the
# validation pairs (element 0 of each pair).
test_eng_texts = [source_text for source_text, _ in val_pairs]
translated = []
for _ in range(30):
    input_sentence = random.choice(test_eng_texts)
    output_sentence = decode_sequence(input_sentence)
    translated.append((input_sentence, output_sentence))

In [19]:
# Show the first ten (thai input, generated english) pairs.
for thai_text, eng_text in translated[:10]:
    print('thai : ', thai_text, '\neng : ', eng_text, '\n')

thai :  โอ อิฟ แด็ทส์ ว็ิอท อิท เท็คส์ ทู เลิร์น แด็ท สวีท ฮาร์ท ออฟ ยัวร์ซ 
eng :  [start] oh if trust see it see to learn that share heart of around [end] 

thai :  อา แคน อาย ไบท ยอร ทัง ไลค มาย แบด แฮบบิท 
eng :  [start] are can i its youre tongue like my bad know [end] 

thai :  อาย โดนท นีด ยอร ลัฟว โซ ยู แคน ทราย ออล ยู วอนท 
eng :  [start] i dont need your love so you can try all you want [end] 

thai :  แอท เดอะ เซม ไทม ไอ วันนา ฮัค ยู 
eng :  [start] at the same time i wanna be you [end] 

thai :  เมกกิ้ง วัน แอส ลัฟลี่ แอ๊ด ชี 
eng :  [start] making one as lovely as she [end] 

thai :  อีฟ วี โก ดาวน์ 
eng :  [start] theyll we go down [end] 

thai :  เวดดิง เบลซ เวอร จัสท อะลารมซ 
eng :  [start] better wonder were just moment [end] 

thai :  ลาย เทอะ มี ลาย วิธ มี เก็ท ยอร์ ฟัคกิง ฟิกส 
eng :  [start] i to me i with me get your sky step [end] 

thai :  ไอ ฟิล วอนเดอะฟูว 
eng :  [start] i feel wonderful [end] 

thai :  แอนด ไฟนด มี อะเกน 
eng :  [start] and find me again [end