In [18]:
import pathlib
import random
import string
import re
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization

In [2]:
# Mount Google Drive into the Colab runtime at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [19]:
import os
# Expose no CUDA devices to this process ("-1").
# NOTE(review): tensorflow is already imported by the first cell, so this
# presumably only takes effect if TensorFlow has not initialised its GPU
# devices yet — confirm, or move this assignment above the first
# `import tensorflow`.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
import tensorflow as tf

# Create Dataset

In [33]:
import codecs

# Fixed length (in tokens) that every English and sign sequence is padded to.
sequence_length = 64

path = '.'
engFile = 'sentencesTrain.txt'
signFile = 'tokensTrain.txt'


def _read_lines(file_path):
    """Return the "\\n"-separated lines of a UTF-8 text file.

    Undecodable bytes are ignored. Only the empty string produced by a
    trailing newline is dropped, so a file whose last line is not
    newline-terminated keeps that line.
    """
    with codecs.open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
        lines = f.read().split("\n")
    if lines and lines[-1] == "":
        lines.pop()
    return lines


engSamples = _read_lines(path + '/' + engFile)
signSamples = _read_lines(path + '/' + signFile)

# The two files are consumed pairwise by line number further down, so a length
# mismatch means the sentence/token pairs would be misaligned.
if len(engSamples) != len(signSamples):
    raise ValueError(
        "%s has %d lines but %s has %d lines; the files must be line-aligned"
        % (engFile, len(engSamples), signFile, len(signSamples))
    )

# Sign-token vocabulary. Indices start at 1; index 0 is left free for padding.
tokenToInx = {}
inxToToken = {}
curInx = 1
signInx = []   # one padded list of token indices per sample
signMask = []  # matching lists: 1 for a real token, 0 for padding
for sent in signSamples:
    tokens = ['[START]'] + sent.split(',') + ['[END]']
    inxes = []
    for token in tokens:
        token = token.strip()
        if token not in tokenToInx:
            tokenToInx[token] = curInx
            inxToToken[curInx] = token
            curInx += 1
        inxes.append(tokenToInx[token])
    # Over-long samples are cut to sequence_length (keeping the closing
    # '[END]') so that every row has the same length and the lists can be
    # converted to a rectangular tensor.
    if len(inxes) > sequence_length:
        inxes = inxes[:sequence_length - 1] + [inxes[-1]]
    padCount = sequence_length - len(inxes)
    signInx.append(inxes + [0] * padCount)
    signMask.append([1] * len(inxes) + [0] * padCount)
# Number of distinct sign tokens, including '[START]' and '[END]' but not padding.
signVocabSize = len(tokenToInx)

In [34]:
# Register the padding token under index 0 in both lookup tables.
tokenToInx.update({'[PAD]': 0})
inxToToken.update({0: '[PAD]'})
# Keep the indices of the sequence delimiters at hand.
STARTINX, ENDINX = tokenToInx['[START]'], tokenToInx['[END]']

In [35]:
# Characters deleted during standardization: ASCII punctuation plus "¿",
# with the square brackets taken out of the list again.
strip_chars = (string.punctuation + "¿").replace("[", "").replace("]", "")

vocab_size = 15000
batch_size = 64


def custom_standardization(input_string):
    """Lower-case ``input_string`` and delete every character in ``strip_chars``."""
    strip_pattern = "[%s]" % re.escape(strip_chars)
    return tf.strings.regex_replace(
        tf.strings.lower(input_string), strip_pattern, ""
    )


# Integer-encodes English text into sequences of exactly `sequence_length` ids.
eng_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
    standardize=custom_standardization,
)
eng_vectorization.adapt(engSamples)

engInx = eng_vectorization(engSamples)
# 1 where the id is non-zero, 0 elsewhere.
engMask = tf.cast(tf.math.not_equal(engInx, 0), tf.int64)

In [36]:
# Bundle the four aligned rows of every sample so that a single shuffle keeps
# them together.
data = [
    (engInx[i], engMask[i], signInx[i], signMask[i])
    for i in range(len(signInx))
]
# Shuffle with a private, seeded generator: the sample order (and therefore
# any split derived from it) is the same on every run, and the global `random`
# state is left untouched.
SHUFFLE_SEED = 42
random.Random(SHUFFLE_SEED).shuffle(data)

In [37]:
# Split the shuffled 4-tuples back into four parallel lists.
engInx, engMask, signInx, signMask = [], [], [], []
for eng_row, eng_mask_row, sign_row, sign_mask_row in data:
    engInx.append(eng_row)
    engMask.append(eng_mask_row)
    signInx.append(sign_row)
    signMask.append(sign_mask_row)

In [38]:
# Turn each of the four parallel lists into a single tensor.
signInx, signMask, engInx, engMask = [
    tf.convert_to_tensor(column)
    for column in (signInx, signMask, engInx, engMask)
]

In [39]:
# The last `val_ratio` share of the samples is held out for validation.
size = len(data)
val_ratio = 0.15
val_size = int(val_ratio * size)
train_size = size - val_size

# One column of zeros per row, appended after the targets are shifted left.
train_zeros = tf.zeros((train_size, 1), dtype=tf.int32)
val_zeros = tf.zeros((size - train_size, 1), dtype=tf.int32)

train_engInx, val_engInx = engInx[:train_size], engInx[train_size:]
train_engMask, val_engMask = engMask[:train_size], engMask[train_size:]
train_signInx, val_signInx = signInx[:train_size], signInx[train_size:]
train_signMask, val_signMask = signMask[:train_size], signMask[train_size:]

# Labels are the sign sequence without its first column, padded with one
# trailing zero so the length stays the same.
y_train = tf.concat([train_signInx[:, 1:], train_zeros], axis=1)
y_val = tf.concat([val_signInx[:, 1:], val_zeros], axis=1)

# Implement Transformer 

In [44]:
class TransformerEncoder(layers.Layer):
    """Single Transformer encoder block.

    Self-attention followed by a two-layer feed-forward projection; each of
    the two sub-blocks is wrapped in a residual connection and a layer
    normalization.

    Args:
        embed_dim: size of the feature dimension; also used as the per-head
            ``key_dim`` and as the output size of the feed-forward projection.
        dense_dim: width of the hidden (ReLU) layer of the projection.
        num_heads: number of attention heads.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super(TransformerEncoder, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        """Apply the encoder block.

        Args:
            inputs: embedded input sequence.
            mask: optional rank-2 padding mask, non-zero for real tokens.
                When omitted, attention is unrestricted.

        Returns:
            The transformed sequence.
        """
        # Start from "no mask" so that calling the layer without a mask no
        # longer fails on an unbound local variable.
        padding_mask = None
        if mask is not None:
            # Two singleton axes are inserted so the per-key mask broadcasts
            # over the head and query axes.
            padding_mask = tf.cast(mask[:, tf.newaxis, tf.newaxis, :], dtype="int32")
        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)


class PositionalEmbedding(layers.Layer):
    """Adds a learned position embedding to a learned token embedding."""

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        # One table indexed by token id, one indexed by position.
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )

    def call(self, inputs):
        """Return token embeddings plus the embeddings of positions 0..length-1."""
        num_positions = tf.shape(inputs)[-1]
        position_ids = tf.range(num_positions)
        return self.token_embeddings(inputs) + self.position_embeddings(position_ids)

    def compute_mask(self, inputs, mask=None):
        """Mark every non-zero token id as a real (unmasked) position."""
        return tf.not_equal(inputs, 0)


class TransformerDecoder(layers.Layer):
    """Single Transformer decoder block.

    Causal self-attention over the target sequence, cross-attention over the
    encoder output and a two-layer feed-forward projection; each of the three
    sub-blocks is wrapped in a residual connection and a layer normalization.

    Args:
        embed_dim: size of the feature dimension; also used as the per-head
            ``key_dim`` and as the output size of the feed-forward projection.
        latent_dim: width of the hidden (ReLU) layer of the projection.
        num_heads: number of attention heads.
    """

    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super(TransformerDecoder, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(latent_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        """Apply the decoder block.

        Args:
            inputs: embedded target sequence.
            encoder_outputs: encoder output the cross-attention attends to.
            mask: optional rank-2 padding mask of the *target* sequence,
                non-zero for real tokens. When omitted only the causal
                restriction is applied.

        Returns:
            The transformed target sequence.
        """
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is None:
            # Previously the mask variable stayed unbound on this path and the
            # cross-attention call below failed.
            self_attention_mask = causal_mask
        else:
            # A target position may only attend to earlier target positions
            # that are real tokens.
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            self_attention_mask = tf.minimum(padding_mask, causal_mask)

        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs, attention_mask=self_attention_mask
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        # The combined target mask used to be passed to this cross-attention
        # instead. There it limited target position i to encoder positions
        # <= i and only fit when both sequences had the same length. The
        # target mask says nothing about encoder positions, so none is
        # applied here.
        # NOTE(review): this layer does not receive the encoder padding mask,
        # so padded encoder positions are attended to as well. Weights trained
        # with the old masking should be retrained.
        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        """Build an int32 lower-triangular mask of shape (batch, length, length).

        Entry [b, i, j] is 1 when j <= i and 0 otherwise, where batch and
        length are the first two dimensions of ``inputs``.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the single (1, length, length) mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)

In [45]:
embed_dim = 256
latent_dim = 1024
num_heads = 4

# --- Encoder: English token ids (+ padding mask) -> contextual embeddings.
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
encoder_masks = keras.Input(shape=(None,), dtype="int64", name="encoder_masks")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x, encoder_masks)
encoder = keras.Model([encoder_inputs, encoder_masks], encoder_outputs)

# --- Decoder: sign token ids (+ padding mask) and encoder output -> per-step
# probabilities over the sign vocabulary.
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name="decoder_state_inputs")
decoder_masks = keras.Input(shape=(None,), dtype="int64", name="decoder_masks")
# The decoder embeds *sign* token ids, so its table is sized from the sign
# vocabulary (+1 for the padding index 0) rather than from the English
# `vocab_size`. This matches the output layer below.
# NOTE: this changes the embedding shape, so weight files saved with the old
# 15000-row decoder embedding can no longer be loaded.
x = PositionalEmbedding(sequence_length, signVocabSize + 1, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, encoded_seq_inputs, decoder_masks)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(signVocabSize + 1, activation="softmax")(x)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs, decoder_masks], decoder_outputs)

# --- End-to-end model: the decoder is fed the encoder output.
decoder_outputs = decoder([decoder_inputs, encoder_outputs, decoder_masks])
transformer = keras.Model(
    [encoder_inputs, encoder_masks, decoder_inputs, decoder_masks], decoder_outputs, name="transformer"
)

In [46]:
epochs = 20

transformer.summary()
# Labels are integer token ids, hence the sparse cross-entropy.
transformer.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

Model: "transformer"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 positional_embedding_10 (Posit  (None, None, 256)   3856384     ['encoder_inputs[0][0]']         
 ionalEmbedding)                                                                                  
                                                                                                  
 encoder_masks (InputLayer)     [(None, None)]       0           []                               
                                                                                                  
 decoder_inputs (InputLayer)    [(None, None)]       0           []                     

In [47]:
# `batch_size` is passed explicitly; without it Keras falls back to its own
# default and the configured value is never used. The returned History object
# is kept so the training curves can be inspected afterwards.
history = transformer.fit(
    (train_engInx, train_engMask, train_signInx, train_signMask),
    y_train,
    batch_size=batch_size,
    epochs=epochs,
    validation_data=((val_engInx, val_engMask, val_signInx, val_signMask), y_val),
)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.callbacks.History at 0x7f884c052100>

In [48]:
# The weights file name encodes embed_dim, latent_dim and num_heads.
weights_file = path + "/transformer_" + "_".join(str(v) for v in (embed_dim, latent_dim, num_heads)) + ".h5"
transformer.save_weights(weights_file)

In [14]:
# Restore the weights from the file named after embed_dim, latent_dim and num_heads.
weights_file = path + "/transformer_" + "_".join(str(v) for v in (embed_dim, latent_dim, num_heads)) + ".h5"
transformer.load_weights(weights_file)

# Evaluate Transformer

In [49]:
# Decoding pads the target to this length before calling the model, and the
# position embedding only has `sequence_length` rows, so the two values must
# agree. Derive one from the other instead of repeating the literal.
max_decoded_sentence_length = sequence_length

def createMask(input):
    """Return an int64 tensor holding 1 where ``input`` is non-zero, else 0."""
    is_token = input != 0
    return tf.cast(is_token, tf.int64)
def padding(input):
    """Right-pad every row of ``input`` with zeros and return a tensor.

    Rows shorter than ``max_decoded_sentence_length`` are filled up to that
    length; rows that are already that long or longer are left as they are.
    """
    padded_rows = []
    for row in input:
        # A non-positive count yields an empty list, i.e. no padding.
        fill = [0] * (max_decoded_sentence_length - len(row))
        padded_rows.append(list(row) + fill)
    return tf.convert_to_tensor(padded_rows)
def decode_sequence(tokenized_input_sentence):
    """Greedily decode one vectorized English sentence into sign tokens.

    Starting from the '[START]' index, the model is called once per step on
    the zero-padded partial target. The highest-scoring token at the current
    position is appended until '[END]' is predicted or
    ``max_decoded_sentence_length`` steps have been taken.

    Args:
        tokenized_input_sentence: output of ``eng_vectorization`` for a
            single sentence.

    Returns:
        A pair ``(indices, text)``: ``indices`` is a one-row list of the
        decoded token indices including the leading start index, and ``text``
        is the decoded tokens joined with ','. '[END]' is in neither.
    """
    # Use the looked-up start index instead of a hard-coded 1.
    tokenized_target_sentence = [[STARTINX]]
    decoded_tokens = []
    # The source mask does not change between steps, so compute it once.
    maskEng = createMask(tokenized_input_sentence)
    for i in range(max_decoded_sentence_length):
        paddedTarget = padding(tokenized_target_sentence)
        maskTarget = createMask(paddedTarget)
        predictions = transformer([tokenized_input_sentence, maskEng, paddedTarget, maskTarget])
        # Plain int keeps the target list free of numpy scalars.
        sampled_token_index = int(np.argmax(predictions[0, i, :]))
        sampled_token = inxToToken[sampled_token_index]
        if sampled_token == "[END]":
            break
        tokenized_target_sentence[0].append(sampled_token_index)
        decoded_tokens.append(sampled_token)
    tokensTarget = ",".join(decoded_tokens)
    return tokenized_target_sentence, tokensTarget

In [50]:
testFile = 'sentencesTest.txt'
with codecs.open(path + '/' + testFile, 'r', encoding='utf-8', errors='ignore') as f:
    engTest = f.read().split("\n")
# Drop only the empty string left by a trailing newline; unconditionally
# cutting the last element would lose a real sentence when the file does not
# end with a newline.
if engTest and engTest[-1] == "":
    engTest.pop()

# (sentence, decoded sign tokens) for every test sentence, in file order.
ans = []
for inx, sentence in enumerate(engTest):
    print(f'\r{inx+1} of {len(engTest)}', end='')
    tokens = eng_vectorization([tf.strings.lower(sentence)])
    inxTarget, tokensTarget = decode_sequence(tokens)
    ans.append((sentence, tokensTarget))

106 of 106

In [51]:
# Keep only the decoded token strings, one per test sentence.
results = [decoded for _, decoded in ans]
# The context manager closes the file even if the write fails.
with codecs.open(path + '/' + "tokensTest.txt", "w", "utf-8") as file:
    file.write("\n".join(results))