In [None]:
import tensorflow as tf
from transformer_custom import Transformer, beam_search_decoder
from create_config import transformer_config, trainer_config
from tf_text_preprocess import TextPreprocess
from tf_text_postprocess import TextPostprocessor
from regularization import RegularizedDenseLayer
from get_dataset import gather_dataset
import matplotlib.pyplot as plt
import pymorphy2
import datetime

# Directory handed to gather_dataset; element 0 of its result is used as the
# English side and element 1 as the Russian side.
dataset_path_main = 'dataset'
dataset = gather_dataset(dataset_path_main)
datasets_en = dataset[0]

# Build the morphological analyzer ONCE and reuse it. Constructing
# MorphAnalyzer loads its dictionaries, so creating a fresh instance for every
# element (as the previous per-item lambda did) is needlessly slow.
morph = pymorphy2.MorphAnalyzer()

# Replace every element of each non-empty Russian dataset with the normal form
# of its first (highest-ranked) parse.
# NOTE(review): this assumes each element is a single word form, which is what
# MorphAnalyzer.parse is designed for — confirm against gather_dataset.
datasets_ru = [
    [morph.parse(token)[0].normal_form for token in dataset_ru]
    for dataset_ru in dataset[1]
    if dataset_ru
]

# English corpus: load it into the preprocessor, tokenize, then pad.
# NOTE(review): `text_files_path` receives the already-gathered dataset rather
# than a filesystem path — presumably load_text_files accepts that; verify.
text_processor = TextPreprocess()
text_processor.load_text_files(text_files_path=datasets_en)
# `datasets_en` is rebound twice below (raw -> tokenized -> padded), so these
# lines are not idempotent if re-run in isolation.
datasets_en = text_processor.tokenize_text(datasets_en)
datasets_en = text_processor.pad_sequences(datasets_en)

# Russian corpus: same three steps, using the SAME TextPreprocess instance.
# NOTE(review): if load_text_files refits or replaces `text_processor.tokenizer`,
# the English ids produced above may no longer correspond to the tokenizer that
# is later passed to TextPostprocessor — confirm whether one shared vocabulary
# is intended.
text_processor.load_text_files(text_files_path=datasets_ru)
datasets_ru = text_processor.tokenize_text(datasets_ru)
datasets_ru = text_processor.pad_sequences(datasets_ru)

# Model hyperparameters and training-run settings.
config = transformer_config()
config_train = trainer_config()

# Embedding table sized from the configured vocabulary and model width; it is
# handed to the Transformer below via `embedding_layer`.
embedding_layer = tf.keras.layers.Embedding(
    config["input_vocab_size"],
    config["d_model"],
)

transformer = Transformer(
    config["num_layers"],
    config["d_model"],
    config["num_heads"],
    config["dff"],
    config["input_vocab_size"],
    pe_input=config["maximum_position_encoding"],
    rate=config["dropout_rate"],
    embedding_layer=embedding_layer,
)

# NOTE(review): this layer is built with d_model, and train_step feeds its
# output to the loss as logits — confirm the argument is meant to be the output
# width rather than the vocabulary size.
regularized_layer = RegularizedDenseLayer(config["d_model"])

# The loss consumes raw logits; its reduction mode comes from the trainer config.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True,
    reduction=config_train['loss_reduction'],
)
optimizer = tf.keras.optimizers.Adam(learning_rate=config_train['learning_rate'])

morph = pymorphy2.MorphAnalyzer()

@tf.function
def train_step(inp, tar):
    """Run one optimization step and return ``(loss, predictions)``.

    The target is shifted by one position along axis 1: ``tar[:, :-1]`` is fed
    to the model and ``tar[:, 1:]`` is what the loss compares against.

    Args:
        inp: Input batch passed to the transformer as its first argument.
        tar: Target batch; must be sliceable along axis 1.

    Returns:
        A tuple ``(loss, predictions)`` where ``predictions`` is the output of
        ``regularized_layer`` applied to the transformer output and ``loss`` is
        ``loss_object(tar[:, 1:], predictions)``.
    """
    tar_inp = tar[:, :-1]
    tar_real = tar[:, 1:]

    with tf.GradientTape() as tape:
        # NOTE(review): all three masks are passed as None, so padding and
        # future positions are only masked if Transformer builds the masks
        # itself — confirm.
        predictions, _ = transformer(inp, tar_inp,
                                     training=True,
                                     enc_padding_mask=None,
                                     look_ahead_mask=None,
                                     dec_padding_mask=None)
        predictions = regularized_layer(predictions)
        loss = loss_object(tar_real, predictions)

    # regularized_layer is part of the forward pass, so its weights must be
    # optimized too. Differentiating only w.r.t. the transformer's variables
    # left this layer frozen at its initial values.
    trainable_variables = (list(transformer.trainable_variables)
                           + list(regularized_layer.trainable_variables))
    gradients = tape.gradient(loss, trainable_variables)
    optimizer.apply_gradients(zip(gradients, trainable_variables))

    return loss, predictions

# NOTE(review): both post-processors wrap the same tokenizer object — confirm
# that a single shared vocabulary for EN and RU is intended.
text_postprocessor_en = TextPostprocessor(text_processor.tokenizer)
text_postprocessor_ru = TextPostprocessor(text_processor.tokenizer)

logdir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
# Not used by the manual loop below; kept so later cells can still reference it.
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=logdir)
file_writer = tf.summary.create_file_writer(logdir)
epoch_train = config_train['epoch_train']

# Save weights and a loss plot every this many epochs.
CHECKPOINT_EVERY = 5


def _plot_loss_scatter(en_losses, ru_losses, save_path=None):
    """Scatter EN loss against RU loss; save and close the figure if a path is given.

    Returns the ``(fig, ax)`` pair so the caller can display the figure when
    it was not saved.
    """
    fig, ax = plt.subplots()
    ax.scatter(en_losses, ru_losses)
    ax.set_xlabel('Loss EN')
    ax.set_ylabel('Loss RU')
    ax.set_title('Training Loss')
    if save_path is not None:
        fig.savefig(save_path)
        plt.close(fig)
    return fig, ax


def _save_transformer_weights(path):
    """Snapshot the transformer's trainable variables and write them to ``path``.

    NOTE(review): only ``transformer`` variables are included; the weights of
    ``regularized_layer`` are not part of the saved dict — confirm that is intended.
    """
    weights = {var.name: var.numpy() for var in transformer.trainable_variables}
    transformer.save_model(weights, path)
    return weights


# Per-batch loss history. The tensors returned by train_step are converted to
# plain floats here; iterating over a scalar loss tensor (as the previous
# plotting code did) raises TypeError.
loss_en_history = []
loss_ru_history = []
global_step = 0

try:
    for epoch in range(epoch_train):
        for (batch_en, batch_ru) in zip(datasets_en, datasets_ru):
            inp_en, tar_en = batch_en
            inp_ru, tar_ru = batch_ru
            loss_en, predictions_en = train_step(inp_en, tar_en)
            loss_ru, predictions_ru = train_step(inp_ru, tar_ru)

            # reduce_mean leaves a scalar loss unchanged and collapses an
            # unreduced loss to a single number.
            loss_en_value = float(tf.reduce_mean(loss_en))
            loss_ru_value = float(tf.reduce_mean(loss_ru))
            loss_en_history.append(loss_en_value)
            loss_ru_history.append(loss_ru_value)

            print(f'Epoch {epoch + 1} Loss EN {loss_en_value:.4f} Loss RU {loss_ru_value:.4f}')
            with file_writer.as_default():
                # One summary point per batch: using the epoch index as the step
                # would put every batch of an epoch on the same x position.
                tf.summary.scalar('loss_en', data=loss_en_value, step=global_step)
                tf.summary.scalar('loss_ru', data=loss_ru_value, step=global_step)
            global_step += 1

            # Postprocess the output
            # NOTE(review): `predictions_*` are the raw outputs of train_step
            # (the loss treats them as logits); verify that sequences_to_texts
            # and beam_search_decoder accept them in this form.
            translated_texts_en = text_postprocessor_en.sequences_to_texts(predictions_en)
            translated_texts_ru = text_postprocessor_ru.sequences_to_texts(predictions_ru)
            print(f'Translated texts EN: {translated_texts_en}')
            print(f'Translated texts RU: {translated_texts_ru}')

            # Beam search
            beam_search_result_en = beam_search_decoder(predictions_en, 3)
            beam_search_result_ru = beam_search_decoder(predictions_ru, 3)
            print(f'Beam search result EN: {beam_search_result_en}')
            print(f'Beam search result RU: {beam_search_result_ru}')

        if (epoch + 1) % CHECKPOINT_EVERY == 0:
            weights_dict = _save_transformer_weights(
                f'novelsdreamer-ru-t4m_epoch_{epoch+1}.safetensors')
            _plot_loss_scatter(loss_en_history, loss_ru_history,
                               save_path=f'novelsdreamer-ru-t4m_epoch_{epoch+1}.png')

    # Final artifacts are written after the last epoch, whatever `epoch_train`
    # is, instead of being tied to a hard-coded epoch index of 9.
    if epoch_train > 0:
        weights_dict = _save_transformer_weights(
            'novelsdreamer-ru-t4m_epoch_weights_final.safetensors')
        _plot_loss_scatter(loss_en_history, loss_ru_history,
                           save_path=f'novelsdreamer-ru-t4m_epoch_{epoch_train}.png')
        if loss_en_history:
            with file_writer.as_default():
                tf.summary.scalar('final_loss_en', data=loss_en_history[-1], step=epoch_train - 1)
                tf.summary.scalar('final_loss_ru', data=loss_ru_history[-1], step=epoch_train - 1)
finally:
    # Always flush and release the summary writer, even if training fails.
    file_writer.close()

loss_en_values = list(loss_en_history)
loss_ru_values = list(loss_ru_history)

fig, ax = _plot_loss_scatter(loss_en_values, loss_ru_values)

plt.show()


In [None]:
import matplotlib.pyplot as plt

# `loss_en` / `loss_ru` hold the losses returned by the most recent train_step
# calls in the training cell. Flatten them before converting to NumPy:
# iterating directly over a scalar tensor raises TypeError, while reshape to
# [-1] works for a scalar and for an unreduced loss alike.
loss_en_values = tf.reshape(loss_en, [-1]).numpy()
loss_ru_values = tf.reshape(loss_ru, [-1]).numpy()

fig, ax = plt.subplots()
scatter = ax.scatter(loss_en_values, loss_ru_values)

ax.set_xlabel('Loss EN')
ax.set_ylabel('Loss RU')
ax.set_title('Training Loss')

plt.show()
