In [1]:
# Load the Drive helper and mount
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
import json
import typing
from tqdm import tqdm

class CustomTokenizer:
    """ Custom Tokenizer class to tokenize and detokenize text data into sequences of integers

    Args:
        split (str, optional): Split token to use when tokenizing text. Defaults to " ".
        char_level (bool, optional): Whether to tokenize at character level. Defaults to False.
        lower (bool, optional): Whether to convert text to lowercase. Defaults to True.
        start_token (str, optional): Start token to use when tokenizing text. Defaults to "<start>".
        end_token (str, optional): End token to use when tokenizing text. Defaults to "<eos>".
        filters (list, optional): List of characters to filter out. Defaults to
            ['!', "'", '"', '#', '$', '%', '&', '(', ')', '*', '+', ',', '-', '.', '/', ':', ';', '<', '=', '>',
            '?', '@', '[', '\\', ']', '^', '_', '`', '{', '|', '}', '~', '\t', '\n'].
        filter_nums (bool, optional): Whether to filter out numbers. Defaults to True.
        start (int, optional): Index to start tokenizing from. Defaults to 1.
    """
    def __init__(
            self,
            split: str=" ",
            char_level: bool=False,
            lower: bool=True,
            start_token: str="<start>",
            end_token: str="<eos>",
            filters: list = ['!', "'", '"', '#', '$', '%', '&', '(', ')', '*', '+', ',', '-', '.', '/', ':', ';', '<', '=', '>', '?', '@', '[', '\\', ']', '^', '_', '`', '{', '|', '}', '~', '\t', '\n'],
            filter_nums: bool = True,
            start: int=1,
        ) -> None:
        self.split = split
        self.char_level = char_level
        self.lower = lower
        self.index_word = {}
        self.word_index = {}
        self.max_length = 0
        self.start_token = start_token
        self.end_token = end_token
        self.filters = filters
        self.filter_nums = filter_nums
        self.start = start

    @property
    def start_token_index(self):
        return self.word_index[self.start_token]

    @property
    def end_token_index(self):
        return self.word_index[self.end_token]

    def sort(self):
        """ Sorts the word_index and index_word dictionaries"""
        self.index_word = dict(enumerate(dict(sorted(self.word_index.items())), start=self.start))
        self.word_index = {v: k for k, v in self.index_word.items()}

    def split_line(self, line: str):
        """ Splits a line of text into tokens

        Args:
            line (str): Line of text to split

        Returns:
            list: List of string tokens
        """
        line = line.lower() if self.lower else line

        if self.char_level:
            return [char for char in line]

        # split line with split token and check for filters
        line_tokens = line.split(self.split)

        new_tokens = []
        for index, token in enumerate(line_tokens):
            filtered_tokens = ['']
            for c_index, char in enumerate(token):
                if char in self.filters or (self.filter_nums and char.isdigit()):
                    filtered_tokens += [char, ''] if c_index != len(token) -1 else [char]
                else:
                    filtered_tokens[-1] += char

            new_tokens += filtered_tokens
            if index != len(line_tokens) -1:
                new_tokens += [self.split]

        new_tokens = [token for token in new_tokens if token != '']

        return new_tokens

    def fit_on_texts(self, lines: typing.List[str]):
        """ Fits the tokenizer on a list of lines of text
        This function will update the word_index and index_word dictionaries and set the max_length attribute

        Args:
            lines (typing.List[str]): List of lines of text to fit the tokenizer on
        """
        self.word_index = {key: value for value, key in enumerate([self.start_token, self.end_token, self.split] + self.filters)}

        for line in tqdm(lines, desc="Fitting tokenizer"):
            line_tokens = self.split_line(line)
            self.max_length = max(self.max_length, len(line_tokens) +2) # +2 for start and end tokens

            for token in line_tokens:
                if token not in self.word_index:
                    self.word_index[token] = len(self.word_index)

        self.sort()

    def update(self, lines: typing.List[str]):
        """ Updates the tokenizer with new lines of text
        This function will update the word_index and index_word dictionaries and set the max_length attribute

        Args:
            lines (typing.List[str]): List of lines of text to update the tokenizer with
        """
        new_tokens = 0
        for line in tqdm(lines, desc="Updating tokenizer"):
            line_tokens = self.split_line(line)
            self.max_length = max(self.max_length, len(line_tokens) +2) # +2 for start and end tokens
            for token in line_tokens:
                if token not in self.word_index:
                    self.word_index[token] = len(self.word_index)
                    new_tokens += 1

        self.sort()
        print(f"Added {new_tokens} new tokens")

    def detokenize(self, sequences: typing.List[int], remove_start_end: bool=True):
        """ Converts a list of sequences of tokens back into text

        Args:
            sequences (typing.list[int]): List of sequences of tokens to convert back into text
            remove_start_end (bool, optional): Whether to remove the start and end tokens. Defaults to True.

        Returns:
            typing.List[str]: List of strings of the converted sequences
        """
        lines = []
        for sequence in sequences:
            line = ""
            for token in sequence:
                if token == 0:
                    break
                if remove_start_end and (token == self.start_token_index or token == self.end_token_index):
                    continue

                line += self.index_word[token]

            lines.append(line)

        return lines

    def texts_to_sequences(self, lines: typing.List[str], include_start_end: bool=True):
        """ Converts a list of lines of text into a list of sequences of tokens

        Args:
            lines (typing.list[str]): List of lines of text to convert into tokenized sequences
            include_start_end (bool, optional): Whether to include the start and end tokens. Defaults to True.

        Returns:
            typing.List[typing.List[int]]: List of sequences of tokens
        """
        sequences = []
        for line in lines:
            line_tokens = self.split_line(line)
            sequence = [self.word_index[word] for word in line_tokens if word in self.word_index]
            if include_start_end:
                sequence = [self.word_index[self.start_token]] + sequence + [self.word_index[self.end_token]]

            sequences.append(sequence)

        return sequences

    def save(self, path: str, type: str="json"):
        """ Saves the tokenizer to a file

        Args:
            path (str): Path to save the tokenizer to
            type (str, optional): Type of file to save the tokenizer to. Defaults to "json".
        """
        serialised_dict = self.dict()
        if type == "json":
            if os.path.dirname(path):
                os.makedirs(os.path.dirname(path), exist_ok=True)
            with open(path, "w") as f:
                json.dump(serialised_dict, f)

    def dict(self):
        """ Returns a dictionary of the tokenizer

        Returns:
            dict: Dictionary of the tokenizer
        """
        return {
            "split": self.split,
            "lower": self.lower,
            "char_level": self.char_level,
            "index_word": self.index_word,
            "max_length": self.max_length,
            "start_token": self.start_token,
            "end_token": self.end_token,
            "filters": self.filters,
            "filter_nums": self.filter_nums,
            "start": self.start
        }

    @staticmethod
    def load(path: typing.Union[str, dict], type: str="json"):
        """ Loads a tokenizer from a file

        Args:
            path (typing.Union[str, dict]): Path to load the tokenizer from or a dictionary of the tokenizer
            type (str, optional): Type of file to load the tokenizer from. Defaults to "json".

        Returns:
            CustomTokenizer: Loaded tokenizer
        """
        if isinstance(path, str):
            if type == "json":
                with open(path, "r") as f:
                    load_dict = json.load(f)

        elif isinstance(path, dict):
            load_dict = path

        tokenizer = CustomTokenizer()
        tokenizer.split = load_dict["split"]
        tokenizer.lower = load_dict["lower"]
        tokenizer.char_level = load_dict["char_level"]
        tokenizer.index_word = {int(k): v for k, v in load_dict["index_word"].items()}
        tokenizer.max_length = load_dict["max_length"]
        tokenizer.start_token = load_dict["start_token"]
        tokenizer.end_token = load_dict["end_token"]
        tokenizer.filters = load_dict["filters"]
        tokenizer.filter_nums = bool(load_dict["filter_nums"])
        tokenizer.start = load_dict["start"]
        tokenizer.word_index = {v: int(k) for k, v in tokenizer.index_word.items()}

        return tokenizer

    @property
    def lenght(self):
        return len(self.index_word)

    def __len__(self):
        return len(self.index_word)

In [None]:
#!pip install mltu
!pip install keras==2.11.0
!pip install tensorflow==2.11.0
!pip install tf2onnx==1.14.0
!pip install onnx==1.12.0



In [None]:
#config.py
import os
from datetime import datetime

from mltu.configs import BaseModelConfigs


class ModelConfigs(BaseModelConfigs):
    def __init__(self):
        super().__init__()
        self.model_path = os.path.join(
            "/content/drive/MyDrive/Colab Notebooks/Models", "MyModel")
        self.num_layers = 4
        self.d_model = 128
        self.num_heads = 8
        self.dff = 512
        self.dropout_rate = 0.1
        self.batch_size = 16
        self.train_epochs = 20
        # CustomSchedule parameters
        self.init_lr = 0.00001
        self.lr_after_warmup = 0.0005
        self.final_lr = 0.0001
        self.warmup_epochs = 2
        self.decay_epochs = 18
        # # Reduce the model to the simplest possible configuration
        # self.num_layers = 1  # Minimum number of layers
        # self.d_model = 16  # Very small model dimensionality
        # self.num_heads = 1  # Only 1 attention head
        # self.dff = 32  # Very small feed-forward network
        # self.dropout_rate = 0.1  # Small dropout to keep things simple

        # # Minimize batch size for fast processing
        # self.batch_size = 2  # Tiny batch size for quick iteration

        # # Train for only 1 epoch
        # self.train_epochs = 1  # Just 1 epoch

        # # Learning rate setup for quick processing
        # self.init_lr = 0.01  # High learning rate to ensure quick changes, not crucial here
        # self.lr_after_warmup = 0.01
        # self.final_lr = 0.01
        # self.warmup_epochs = 0  # Skip warmup entirely
        # self.decay_epochs = 0  # No decay, since we're not really training

        # Optionally, you can reduce the dataset size used in this single epoch
        # This will be handled by your data provider and how you split your dataset


In [None]:
#model.py

import tensorflow as tf

from mltu.tensorflow.transformer.layers import Encoder, Decoder

def Transformer(
    input_vocab_size: int,
    target_vocab_size: int,
    encoder_input_size: int = None,
    decoder_input_size: int = None,
    num_layers: int=6,
    d_model: int=512,
    num_heads: int=8,
    dff: int=2048,
    dropout_rate: float=0.1,
    ) -> tf.keras.Model:
    """
    A custom TensorFlow model that implements the Transformer architecture.

    Args:
        input_vocab_size (int): The size of the input vocabulary.
        target_vocab_size (int): The size of the target vocabulary.
        encoder_input_size (int): The size of the encoder input sequence.
        decoder_input_size (int): The size of the decoder input sequence.
        num_layers (int): The number of layers in the encoder and decoder.
        d_model (int): The dimensionality of the model.
        num_heads (int): The number of heads in the multi-head attention layer.
        dff (int): The dimensionality of the feed-forward layer.
        dropout_rate (float): The dropout rate.

    Returns:
        A TensorFlow Keras model.
    """
    inputs = [
        tf.keras.layers.Input(shape=(encoder_input_size,), dtype=tf.int64),
        tf.keras.layers.Input(shape=(decoder_input_size,), dtype=tf.int64)
        ]

    encoder_input, decoder_input = inputs

    encoder = Encoder(num_layers=num_layers, d_model=d_model, num_heads=num_heads, dff=dff, vocab_size=input_vocab_size, dropout_rate=dropout_rate)(encoder_input)
    decoder = Decoder(num_layers=num_layers, d_model=d_model, num_heads=num_heads, dff=dff, vocab_size=target_vocab_size, dropout_rate=dropout_rate)(decoder_input, encoder)

    output = tf.keras.layers.Dense(target_vocab_size)(decoder)

    return tf.keras.Model(inputs=inputs, outputs=output)

In [None]:
#import keras
#print(keras.version())

In [None]:
#train.py

import numpy as np
import os
import tensorflow as tf
try: [tf.config.experimental.set_memory_growth(gpu, True) for gpu in tf.config.experimental.list_physical_devices("GPU")]
except: pass

from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, TensorBoard
from mltu.tensorflow.callbacks import Model2onnx, WarmupCosineDecay

from mltu.tensorflow.dataProvider import DataProvider
from mltu.tokenizers import CustomTokenizer

from mltu.tensorflow.transformer.utils import MaskedAccuracy, MaskedLoss
from mltu.tensorflow.transformer.callbacks import EncDecSplitCallback

#from model import Transformer
#from configs import ModelConfigs

configs = ModelConfigs()
os.makedirs(configs.model_path, exist_ok=True)

# Path to dataset
en_training_data_path = "/content/drive/MyDrive/Colab Notebooks/opus.en-fa-train.en"
en_validation_data_path = "/content/drive/MyDrive/Colab Notebooks/opus.en-fa-dev.en"
es_training_data_path = "/content/drive/MyDrive/Colab Notebooks/opus.en-fa-train.fa"
es_validation_data_path = "/content/drive/MyDrive/Colab Notebooks/opus.en-fa-dev.fa"

def read_files(path):
    with open(path, "r", encoding="utf-8") as f:
        en_train_dataset = f.read().split("\n")[:-1]
    return en_train_dataset

en_training_data = read_files(en_training_data_path)
en_validation_data = read_files(en_validation_data_path)
es_training_data = read_files(es_training_data_path)
es_validation_data = read_files(es_validation_data_path)

#########################
max_lenght = 500
max_samples = 1000

train_dataset = [[es_sentence, en_sentence] for es_sentence, en_sentence in zip(es_training_data, en_training_data) if len(es_sentence) <= max_lenght and len(en_sentence) <= max_lenght][:max_samples]
val_dataset = [[es_sentence, en_sentence] for es_sentence, en_sentence in zip(es_validation_data, en_validation_data) if len(es_sentence) <= max_lenght and len(en_sentence) <= max_lenght][:max_samples]
es_training_data, en_training_data = zip(*train_dataset)
es_validation_data, en_validation_data = zip(*val_dataset)

#########################

# # Consider only sentences with length <= 500
# max_lenght = 500
# train_dataset = [[es_sentence, en_sentence] for es_sentence, en_sentence in zip(es_training_data, en_training_data) if len(es_sentence) <= max_lenght and len(en_sentence) <= max_lenght]
# val_dataset = [[es_sentence, en_sentence] for es_sentence, en_sentence in zip(es_validation_data, en_validation_data) if len(es_sentence) <= max_lenght and len(en_sentence) <= max_lenght]
# es_training_data, en_training_data = zip(*train_dataset)
# es_validation_data, en_validation_data = zip(*val_dataset)

# prepare spanish tokenizer, this is the input language
tokenizer = CustomTokenizer(char_level=True)
tokenizer.fit_on_texts(es_training_data)
tokenizer.save(configs.model_path + "/tokenizer.json")

# prepare english tokenizer, this is the output language
detokenizer = CustomTokenizer(char_level=True)
detokenizer.fit_on_texts(en_training_data)
detokenizer.save(configs.model_path + "/detokenizer.json")


# def preprocess_inputs(data_batch, label_batch):
#     encoder_input = np.zeros((len(data_batch), tokenizer.max_length)).astype(np.int64)
#     decoder_input = np.zeros((len(label_batch), detokenizer.max_length)).astype(np.int64)
#     decoder_output = np.zeros((len(label_batch), detokenizer.max_length)).astype(np.int64)

#     data_batch_tokens = tokenizer.texts_to_sequences(data_batch)
#     label_batch_tokens = detokenizer.texts_to_sequences(label_batch)

#     for index, (data, label) in enumerate(zip(data_batch_tokens, label_batch_tokens)):
#         encoder_input[index][:len(data)] = data
#         decoder_input[index][:len(label)-1] = label[:-1] # Drop the [END] tokens
#         decoder_output[index][:len(label)-1] = label[1:] # Drop the [START] tokens

#     return (encoder_input, decoder_input), decoder_output

###################################
def preprocess_inputs(data_batch, label_batch):
    # Set maximum lengths based on the data
    max_encoder_length = max(len(seq) for seq in tokenizer.texts_to_sequences(data_batch))
    max_decoder_length = max(len(seq) for seq in detokenizer.texts_to_sequences(label_batch))

    encoder_input = np.zeros((len(data_batch), max_encoder_length)).astype(np.int64)
    decoder_input = np.zeros((len(label_batch), max_decoder_length)).astype(np.int64)
    decoder_output = np.zeros((len(label_batch), max_decoder_length)).astype(np.int64)

    data_batch_tokens = tokenizer.texts_to_sequences(data_batch)
    label_batch_tokens = detokenizer.texts_to_sequences(label_batch)

    for index, (data, label) in enumerate(zip(data_batch_tokens, label_batch_tokens)):
        encoder_input[index][:len(data)] = data
        decoder_input[index][:len(label)-1] = label[:-1]  # Drop the [END] tokens
        decoder_output[index][:len(label)-1] = label[1:]  # Drop the [START] tokens

    return (encoder_input, decoder_input), decoder_output

###################################



# Create Training Data Provider
train_dataProvider = DataProvider(
    train_dataset,
    batch_size=configs.batch_size,
    batch_postprocessors=[preprocess_inputs],
    use_cache=True,
    )

# Create Validation Data Provider
val_dataProvider = DataProvider(
    val_dataset,
    batch_size=configs.batch_size,
    batch_postprocessors=[preprocess_inputs],
    use_cache=True,
    )

# Create TensorFlow Transformer Model
transformer = Transformer(
    num_layers=configs.num_layers,
    d_model=configs.d_model,
    num_heads=configs.num_heads,
    dff=configs.dff,
    input_vocab_size=len(tokenizer)+1,
    target_vocab_size=len(detokenizer)+1,
    dropout_rate=configs.dropout_rate,
    encoder_input_size=tokenizer.max_length,
    decoder_input_size=detokenizer.max_length
    )

transformer.summary()

optimizer = tf.keras.optimizers.Adam(learning_rate=configs.init_lr, beta_1=0.9, beta_2=0.98, epsilon=1e-9)

# Compile the model
transformer.compile(
    loss=MaskedLoss(),
    optimizer=optimizer,
    metrics=[MaskedAccuracy()],
    run_eagerly=False
    )

# Define callbacks
warmupCosineDecay = WarmupCosineDecay(
    lr_after_warmup=configs.lr_after_warmup,
    final_lr=configs.final_lr,
    warmup_epochs=configs.warmup_epochs,
    decay_epochs=configs.decay_epochs,
    initial_lr=configs.init_lr,
    )
earlystopper = EarlyStopping(monitor="val_masked_accuracy", patience=5, verbose=1, mode="max")
# checkpoint = ModelCheckpoint(f"{configs.model_path}/model_epoch-{{epoch:02d}}.h5", monitor="val_masked_accuracy", verbose=1, save_best_only=True, mode="max", save_weights_only=False)
checkpoint = ModelCheckpoint(f"{configs.model_path}/model.h5", monitor="val_masked_accuracy", verbose=1, save_best_only=True, mode="max", save_weights_only=False)
tb_callback = TensorBoard(f"{configs.model_path}/logs")
reduceLROnPlat = ReduceLROnPlateau(monitor="val_masked_accuracy", factor=0.9, min_delta=1e-10, patience=2, verbose=1, mode="max")
model2onnx = Model2onnx(f"{configs.model_path}/model.h5", metadata={"tokenizer": tokenizer.dict(), "detokenizer": detokenizer.dict()}, save_on_epoch_end=False)
encDecSplitCallback = EncDecSplitCallback(configs.model_path, encoder_metadata={"tokenizer": tokenizer.dict()}, decoder_metadata={"detokenizer": detokenizer.dict()})

configs.save()

import glob
import glob

# Find the latest checkpoint
latest_checkpoint = max(glob.glob(f"{configs.model_path}/model_epoch-*.h5"), key=os.path.getctime, default=None)

# If a checkpoint exists, load the model from it
# if latest_checkpoint:
#     print(f"Loading model from checkpoint: {latest_checkpoint}")
#     transformer = tf.keras.models.load_model(latest_checkpoint, custom_objects={"MaskedLoss": MaskedLoss, "MaskedAccuracy": MaskedAccuracy})
#     # Extract the epoch number from the checkpoint filename
#     initial_epoch = int(latest_checkpoint.split('-')[-1].split('.')[0])
# else:
#     print("No checkpoint found, starting training from scratch.")
#     initial_epoch = 0

# Train the model
transformer.fit(
    train_dataProvider,
    validation_data=val_dataProvider,
    epochs=configs.train_epochs,
    callbacks=[
        earlystopper,
        warmupCosineDecay,
        checkpoint,
        tb_callback,
        reduceLROnPlat,
        model2onnx,
        encDecSplitCallback
        ],
    # initial_epoch=initial_epoch  # Resume from the epoch of the last checkpoint
    )

In [None]:
#test.py
import numpy as np
import time

from mltu.tokenizers import CustomTokenizer
from mltu.inferenceModel import OnnxInferenceModel

class PtEnTranslator(OnnxInferenceModel):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.new_inputs = self.model.get_inputs()
        self.tokenizer = CustomTokenizer.load(self.metadata["tokenizer"])
        self.detokenizer = CustomTokenizer.load(self.metadata["detokenizer"])

    def predict(self, sentence):
        start = time.time()
        tokenized_sentence = self.tokenizer.texts_to_sequences([sentence])[0]
        encoder_input = np.pad(tokenized_sentence, (0, self.tokenizer.max_length - len(tokenized_sentence)), constant_values=0).astype(np.int64)

        tokenized_results = [self.detokenizer.start_token_index]
        for index in range(self.detokenizer.max_length - 1):
            decoder_input = np.pad(tokenized_results, (0, self.detokenizer.max_length - len(tokenized_results)), constant_values=0).astype(np.int64)
            input_dict = {
                self.model._inputs_meta[0].name: np.expand_dims(encoder_input, axis=0),
                self.model._inputs_meta[1].name: np.expand_dims(decoder_input, axis=0),
            }
            preds = self.model.run(None, input_dict)[0] # preds shape (1, 206, 29110)
            pred_results = np.argmax(preds, axis=2)
            tokenized_results.append(pred_results[0][index])

            if tokenized_results[-1] == self.detokenizer.end_token_index:
                break

        results = self.detokenizer.detokenize([tokenized_results])
        return results[0], time.time() - start

def read_files(path):
    with open(path, "r", encoding="utf-8") as f:
        en_train_dataset = f.read().split("\n")[:-1]
    return en_train_dataset

# Path to dataset
en_validation_data_path = "/content/drive/MyDrive/Colab Notebooks/opus.en-fa-dev.en"
es_validation_data_path = "/content/drive/MyDrive/Colab Notebooks/opus.en-fa-dev.fa"

en_validation_data = read_files(en_validation_data_path)
es_validation_data = read_files(es_validation_data_path)

# Consider only sentences with length <= 500
max_lenght = 500
val_examples = [[es_sentence, en_sentence] for es_sentence, en_sentence in zip(es_validation_data, en_validation_data) if len(es_sentence) <= max_lenght and len(en_sentence) <= max_lenght]

translator = PtEnTranslator("/content/drive/MyDrive/Colab Notebooks/Models/MyModel/model.onnx")

val_dataset = []
for es, en in val_examples:
    results, duration = translator.predict(es)
    print("Farsi:     ", es.lower())
    print("English:     ", en.lower())
    print("English pred:", results)
    print(duration)
    print()