In [33]:
import pickle
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import string
import re

## Provide the test input file here, and upload the training input and output files and the saved model

In [34]:
# Path of the pickled test input; it is read with pickle.load in a later cell.
input_path = 'DS_5_test_input'

## 1. Rebuild the text vectorization layers from the training data; 2. preprocess the test dataset

In [35]:
# Load the pickled training sequences. Context managers guarantee that the
# file handles are closed as soon as the data has been read.
# NOTE: pickle.load can execute arbitrary code; only load trusted files.
with open('DS_5_train_input', 'rb') as train_input_file:
    input_sequence = pickle.load(train_input_file)
with open('DS_5_train_output', 'rb') as train_output_file:
    output_sequence = pickle.load(train_output_file)

# Pair every source sequence with its target wrapped in [start]/[end] markers.
# NOTE(review): there is no space before "[end]", so the marker is glued to the
# last target token unless the target strings already end with whitespace.
# This is left untouched because it presumably mirrors the training notebook.
text_pairs = []
for line, inputish in enumerate(input_sequence):
    outputish = "[start] " + output_sequence[line] + "[end]"
    text_pairs.append((inputish, outputish))

# NOTE(review): the shuffle is unseeded, so the 70% training subset (and with
# it, possibly the vocabulary adapted from it further down) differs between
# runs. Confirm that it matches the vocabulary the saved model was trained on.
import random
random.shuffle(text_pairs)
print(len(text_pairs))

# 70% / 15% / 15% split into train / validation / test pairs.
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples:num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples:]

# # do the same for test dataset
# test_test_paris = test_text_pairs

# Build the text vectorization layers from the training text.
# Punctuation to strip from the target text; the square brackets are kept so
# that the "[start]" / "[end]" markers survive standardization.
strip_chars = "".join(ch for ch in string.punctuation if ch not in "[]")


def custom_standardization(input_string):
    """Lowercase `input_string` and delete every character in `strip_chars`."""
    punctuation_pattern = f"[{re.escape(strip_chars)}]"
    return tf.strings.regex_replace(
        tf.strings.lower(input_string), punctuation_pattern, "")

# Vocabulary caps and padded sequence length shared by both vectorizers.
src_vocab_size = 50
tgt_vocab_size = 50
sequence_length = 100

# Source-side vectorizer: integer token ids padded/truncated to
# `sequence_length`. No custom `standardize` callable is passed here.
source_vectorization = layers.TextVectorization(
    max_tokens=src_vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)
# Target-side vectorizer: produces one extra position (sequence_length + 1);
# the decoding loop later drops the last column with `[..., :-1]`.
# `custom_standardization` is used so the "[start]"/"[end]" brackets are kept.
target_vectorization = layers.TextVectorization(
    max_tokens=tgt_vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length+1, 
    standardize=custom_standardization,
)
# Fit both vocabularies on the training split only.
train_inputish_texts = [pair[0] for pair in train_pairs]
train_outputish_texts = [pair[1] for pair in train_pairs]
source_vectorization.adapt(train_inputish_texts)
target_vectorization.adapt(train_outputish_texts)

5000


## Load the model and test the results; also remove [start] and [end] from the output text

In [36]:
class TransformerEncoder(layers.Layer):
    """Encoder block: self-attention followed by a two-layer dense projection.

    Each of the two sub-blocks is wrapped in a residual connection and a layer
    normalization.

    Args:
        embed_dim: Used as `key_dim` of the attention layer and as the width
            of the final dense layer.
        dense_dim: Width of the hidden (ReLU) dense layer.
        num_heads: Number of attention heads.
        **kwargs: Forwarded to `layers.Layer`.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        # Kept on the instance so get_config() can report them.
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential([
            layers.Dense(dense_dim, activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        """Apply self-attention and the dense projection to `inputs`.

        When `mask` is given, an axis is inserted at position 1 before it is
        passed to the attention layer as `attention_mask`.
        """
        attn_mask = None if mask is None else mask[:, tf.newaxis, :]
        attended = self.attention(inputs, inputs, attention_mask=attn_mask)
        normed = self.layernorm_1(inputs + attended)
        projected = self.dense_proj(normed)
        return self.layernorm_2(normed + projected)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        base_config = super().get_config()
        base_config["embed_dim"] = self.embed_dim
        base_config["num_heads"] = self.num_heads
        base_config["dense_dim"] = self.dense_dim
        return base_config

In [37]:
class TransformerDecoder(layers.Layer):
    """Decoder block: causal self-attention, attention over the encoder
    outputs, then a two-layer dense projection.

    Each of the three sub-blocks is wrapped in a residual connection and a
    layer normalization.

    Args:
        embed_dim: Used as `key_dim` of both attention layers and as the width
            of the final dense layer.
        dense_dim: Width of the hidden (ReLU) dense layer.
        num_heads: Number of attention heads.
        **kwargs: Forwarded to `layers.Layer`.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        # Kept on the instance so get_config() can report them.
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        """Build an int32 lower-triangular mask of shape (batch, seq, seq).

        Entry (i, j) is 1 when i >= j, i.e. position i may attend to position
        j. `batch` and `seq` are read from the first two dimensions of
        `inputs`.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the single (1, seq, seq) mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        """Run the decoder block on `inputs`, attending to `encoder_outputs`.

        Args:
            inputs: Decoder-side input tensor.
            encoder_outputs: Encoder output used as key and value of the
                second attention layer.
            mask: Optional mask for `inputs`. When given, it is combined with
                the causal mask and used for the second attention layer; when
                omitted, that layer runs without an attention mask.
        """
        causal_mask = self.get_causal_attention_mask(inputs)
        # Default to "no mask" so the layer also works when no mask is
        # propagated; previously `padding_mask` was undefined in that case
        # and the second attention call raised UnboundLocalError.
        padding_mask = None
        if mask is not None:
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)

In [38]:
class PositionalEmbedding(layers.Layer):
    """Sum of a token embedding and a learned position embedding.

    Args:
        sequence_length: Number of rows in the position embedding table.
        input_dim: Number of rows in the token embedding table.
        output_dim: Embedding width of both tables.
        **kwargs: Forwarded to `layers.Layer`.
    """

    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)
        # Kept on the instance so get_config() can report them.
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        """Embed the token ids and add the embedding of each position index."""
        seq_len = tf.shape(inputs)[-1]
        position_ids = tf.range(start=0, limit=seq_len, delta=1)
        token_part = self.token_embeddings(inputs)
        position_part = self.position_embeddings(position_ids)
        return token_part + position_part

    def compute_mask(self, inputs, mask=None):
        """Mark every position whose token id differs from 0 as valid."""
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        base_config = super().get_config()
        base_config["output_dim"] = self.output_dim
        base_config["sequence_length"] = self.sequence_length
        base_config["input_dim"] = self.input_dim
        return base_config

In [39]:
# Restore the trained model; each custom layer class is registered under its
# own class name so Keras can rebuild it from the saved config.
saved_model = keras.models.load_model(
    "artificial_text_translation.keras",
    custom_objects={
        layer_cls.__name__: layer_cls
        for layer_cls in (TransformerEncoder, TransformerDecoder, PositionalEmbedding)
    },
)

In [40]:
# Print the layer-by-layer summary of the restored model as a sanity check.
saved_model.summary()

Model: "model_22"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 english (InputLayer)           [(None, None)]       0           []                               
                                                                                                  
 positional_embedding_44 (Posit  (None, None, 64)    9600        ['english[0][0]']                
 ionalEmbedding)                                                                                  
                                                                                                  
 spanish (InputLayer)           [(None, None)]       0           []                               
                                                                                                  
 transformer_encoder_28 (Transf  (None, None, 64)    199040      ['positional_embedding_44[

In [41]:
import numpy as np
# Map each position in the target vocabulary list to its token string, so a
# predicted index can be turned back into text.
spa_vocab = target_vectorization.get_vocabulary()
spa_index_lookup = dict(enumerate(spa_vocab))
# Number of decoding steps performed by the generation loop.
max_decoded_sentence_length = 100

# def decode_sequence(input_sentences, model=saved_model):
#     batch_test = len(input_sentences)
#     tokenized_input_sentence = source_vectorization(input_sentences)
#     decoded_sentence = np.reshape("[start]"*batch_test, (batch_test,1))
#     print(decoded_sentence[0])
#     for i in range(max_decoded_sentence_length):
#         tokenized_target_sentence = target_vectorization(
#             [decoded_sentence])[:, :-1]
#         predictions = model(
#             [tokenized_input_sentence, tokenized_target_sentence])
#         sampled_token_index = np.argmax(predictions[0, i, :])
#         sampled_token = spa_index_lookup[sampled_token_index]
#         decoded_sentence += " " + sampled_token
#         # if sampled_token == "[end]":
#         #     break
#     return decoded_sentence



In [42]:
# Load the pickled test inputs; the context manager closes the file handle as
# soon as the data has been read.
# NOTE: pickle.load can execute arbitrary code; only load trusted files.
with open(input_path, 'rb') as test_input_file:
    test_input_sequence = pickle.load(test_input_file)
# test_output_sequence = pickle.load(open(output_path, 'rb'))

# do the same for test dataset
# There are no reference outputs for the test set, so each source sequence is
# paired with just the "[start] " seed.
test_text_pairs = [(inputish, "[start] ") for inputish in test_input_sequence]

test_source_texts = [pair[0] for pair in test_text_pairs]
# test_target_texts = [pair[1] for pair in test_text_pairs]

In [43]:
from google.colab import files

# Greedy decoding of the whole test set in one batch.
input_sequences = test_source_texts
# pred_sequence  = decode_sequence(input_sequences,saved_model)
batch_test = len(input_sequences)
print(batch_test)
tokenized_input_sentence = source_vectorization(input_sequences)

# Every sentence starts as the 1-tuple ("[start]",). The 1-tuple wrapping is
# kept because both the vectorizer call below and the later post-processing
# cell index into that layout.
decoded_sentence = [("[start]",) for _ in range(batch_test)]

# Decoding always runs for the full number of steps and is not stopped at
# "[end]"; everything after "[end]" is trimmed in the post-processing cell.
for i in range(max_decoded_sentence_length):
    print(i)
    # Re-vectorize the text generated so far and drop the last column.
    tokenized_target_sentence = target_vectorization(
        [decoded_sentence])[:,:, :-1]
    predictions = saved_model.predict([tokenized_input_sentence, tokenized_target_sentence[0]])
    # Most likely token at step i for every sentence in the batch.
    sampled_token_index = np.argmax(predictions[:, i, :], axis=1)
    print(sampled_token_index.shape)
    print(decoded_sentence[0])
    # Append the sampled token to each sentence, keeping the 1-tuple layout.
    # The former `extra` and `sampled_token` accumulators were never read and
    # only consumed memory (batch * steps strings), so they are gone.
    decoded_sentence = [
        (" ".join([sentence[0], spa_index_lookup[token_index]]),)
        for sentence, token_index in zip(decoded_sentence, sampled_token_index)
    ]


5000
0
(5000,)
('[start]',)
1
(5000,)
('[start] c',)
2
(5000,)
('[start] c f',)
3
(5000,)
('[start] c f c',)
4
(5000,)
('[start] c f c f',)
5
(5000,)
('[start] c f c f a',)
6
(5000,)
('[start] c f c f a g',)
7
(5000,)
('[start] c f c f a g g',)
8
(5000,)
('[start] c f c f a g g e',)
9
(5000,)
('[start] c f c f a g g e b',)
10
(5000,)
('[start] c f c f a g g e b f',)
11
(5000,)
('[start] c f c f a g g e b f c',)
12
(5000,)
('[start] c f c f a g g e b f c f',)
13
(5000,)
('[start] c f c f a g g e b f c f c',)
14
(5000,)
('[start] c f c f a g g e b f c f c d',)
15
(5000,)
('[start] c f c f a g g e b f c f c d b',)
16
(5000,)
('[start] c f c f a g g e b f c f c d b d',)
17
(5000,)
('[start] c f c f a g g e b f c f c d b d a',)
18
(5000,)
('[start] c f c f a g g e b f c f c d b d a g',)
19
(5000,)
('[start] c f c f a g g e b f c f c d b d a g h',)
20
(5000,)
('[start] c f c f a g g e b f c f c d b d a g h j',)
21
(5000,)
('[start] c f c f a g g e b f c f c d b d a g h j b',)
22
(5000,)
('[s

In [44]:
# Persist the raw decoded sentences. The context manager closes (and thereby
# flushes) the file before it is offered for download.
with open('decoded_sentence_unformatted', 'wb') as unformatted_file:
    pickle.dump(decoded_sentence, unformatted_file)
# NOTE(review): the download path is absolute while the file is written
# relative to the working directory; this assumes the notebook runs in /content.
files.download('/content/decoded_sentence_unformatted')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [45]:
# Alias for the decoded output (a list of 1-tuples, each holding one sentence
# string) that the post-processing cell below iterates over.
pred_sequences = decoded_sentence

In [46]:
# Strip the "[start]" marker and everything from the first "[end]" onwards
# from every decoded sentence.
pred_text_all = []
for pred_entry in pred_sequences:
    # Each entry wraps its sentence string as the first (only) element.
    tokens = pred_entry[0].split()
    tokens.remove("[start]")
    try:
        cut = tokens.index('[end]')
    except ValueError:
        # No "[end]" was generated: keep every token.
        cut = len(tokens)
    pred_text_all.append(" ".join(tokens[:cut]))

In [47]:
# pred_text_all.extend(pred_line)
# print(pred_text_all)
# map(parallel_evaluation, test_source_text for test_source_text in test_source_texts, num_parallel_calls=4)
# Persist the cleaned predictions. The context manager closes (and thereby
# flushes) the file before it is offered for download.
with open('test_results_output', 'wb') as results_file:
    pickle.dump(pred_text_all, results_file)
# NOTE(review): the download path is absolute while the file is written
# relative to the working directory; this assumes the notebook runs in /content.
files.download('/content/test_results_output')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [49]:
# Show the first cleaned prediction as a quick sanity check.
pred_text_all[0]

'c f c f a g g e b f c f c d b d a g h j b g b f a f l m a h i k ed a e ee c e c g b e a e ei a j eg eh ej a h d ef ek a g f el'