# Dataset import and its preprocessing

In [1]:
# imports our files
%%capture
import tensorflow as tf
import time, os
tf.keras.utils.get_file('pix2html.zip', cache_subdir=os.path.abspath('.'), origin = 'https://raw.githubusercontent.com/psimanaitis/pix2html/main/dataset/pix2html.zip', extract = True)
tf.keras.utils.get_file('pix2code.zip', cache_subdir=os.path.abspath('.'), origin = 'https://raw.githubusercontent.com/psimanaitis/pix2html/main/dataset/pix2code.zip', extract = True)

In [2]:
%%capture
import numpy as np
import json
from tensorboard.plugins.hparams import api as hp

def get_dataset_vectors(html_entries, tokenizer, input_path):
    """Convert dataset entries into padded token sequences and image paths.

    Args:
        html_entries: sequence of dicts, each providing an 'id' and a
            'content' key.
        tokenizer: object exposing `texts_to_sequences`.
        input_path: prefix put in front of every image file name.

    Returns:
        Tuple `(cap_vector, img_name_vector)`: the tokenized contents padded
        at the end ('post') to a common length, and the matching list of
        '<input_path><id>.jpeg' paths, both in the order of `html_entries`.
    """
    img_name_vector = [input_path + entry['id'] + '.jpeg' for entry in html_entries]
    train_captions = [entry['content'] for entry in html_entries]

    train_seqs = tokenizer.texts_to_sequences(train_captions)
    cap_vector = tf.keras.preprocessing.sequence.pad_sequences(train_seqs, padding='post')
    return cap_vector, img_name_vector
 
 
def load_dataset(dataset_name):
    """Load the vocabulary and the train/test entries of one dataset.

    Args:
        dataset_name: name of the dataset directory ('pix2html' or 'pix2code').

    Returns:
        Tuple `(train_sequences, train_images, test_sequences, test_images,
        top_k, tokenizer)` where the sequences/images come from
        `get_dataset_vectors`, `top_k` is the tokenizer's `num_words`
        (`len(tokens) + 3`) and `tokenizer` is the fitted Keras tokenizer.
    """
    def read_json(relative_name):
        # Every dataset file lives directly under the dataset directory.
        with open(dataset_name + relative_name, 'r') as handle:
            return json.load(handle)

    # Image paths are built as '<dataset_name>/resized/<id>.jpeg'.
    input_path = dataset_name + '/resized/'

    tokens = read_json('/tokens.json')

    # TODO add start end tokens for pix2code ?
    top_k = len(tokens) + 3
    tokenizer = tf.keras.preprocessing.text.Tokenizer(
        num_words=top_k, oov_token="unknown", filters='', split=" ")
    tokenizer.fit_on_texts(tokens)
    # Register index 0 as the padding token in both lookup directions.
    tokenizer.word_index['<pad>'] = 0
    tokenizer.index_word[0] = '<pad>'

    train_entries = read_json('/train-html.json')
    # NOTE(review): the test split is read from the same 'train-html.json'
    # file as the training split, so both splits hold identical entries --
    # confirm whether a separate test file was intended.
    test_entries = read_json('/train-html.json')

    train_sequences, train_images = get_dataset_vectors(train_entries, tokenizer, input_path)
    test_sequences, test_images = get_dataset_vectors(test_entries, tokenizer, input_path)

    return train_sequences, train_images, test_sequences, test_images, top_k, tokenizer

# Registry of the supported CNN backbones:
#   name -> (model constructor, preprocess_input function,
#            side length the image is resized to, cnn_features)
# Only the constructor is stored. The network is built (and its ImageNet
# weights loaded) the first time get_load_image() asks for it, instead of
# instantiating all seven backbones whenever this cell runs.
switcher = {
    "vgg16": (tf.keras.applications.VGG16, tf.keras.applications.vgg16.preprocess_input, 224, 512),
    "vgg19": (tf.keras.applications.VGG19, tf.keras.applications.vgg19.preprocess_input, 224, 512),
    "EfficientNet": (tf.keras.applications.EfficientNetB3, tf.keras.applications.efficientnet.preprocess_input, 300, 1536),
    "inception_resnet_v2": (tf.keras.applications.InceptionResNetV2, tf.keras.applications.inception_resnet_v2.preprocess_input, 299, 1536),
    "resnet": (tf.keras.applications.ResNet152V2, tf.keras.applications.resnet_v2.preprocess_input, 224, 2048),
    "inception_v3": (tf.keras.applications.InceptionV3, tf.keras.applications.inception_v3.preprocess_input, 299, 2048),
    "Xception": (tf.keras.applications.Xception, tf.keras.applications.xception.preprocess_input, 299, 2048),
}

# Backbones already built by get_load_image(), keyed by registry name, so
# repeated calls reuse the same model instance.
_image_model_cache = {}


def get_load_image(imageModel):
    """Return the image loader and feature extractor for a CNN backbone.

    Args:
        imageModel: key of `switcher` selecting the backbone.

    Returns:
        Tuple `(load_image, image_features_extract_model, cnn_features)`:
        `load_image(image_path)` reads a JPEG, resizes it to the backbone's
        square input size, applies the backbone's `preprocess_input` and
        returns `(img, image_path)`; `image_features_extract_model` maps the
        backbone input to the output of its last layer; `cnn_features` is
        the fourth entry of the registry tuple.

    Raises:
        ValueError: if `imageModel` is not a key of `switcher`.
    """
    if imageModel not in switcher:
        raise ValueError('Unknown image model {!r}; expected one of: {}'.format(
            imageModel, ', '.join(switcher)))
    build_model, preprocess_input, dimensions, cnn_features = switcher[imageModel]

    # Build the backbone lazily and remember it for later calls.
    if imageModel not in _image_model_cache:
        _image_model_cache[imageModel] = build_model(include_top=False, weights='imagenet')
    image_model = _image_model_cache[imageModel]

    def load_image(image_path):
        img = tf.io.read_file(image_path)
        img = tf.image.decode_jpeg(img, channels=3)
        img = tf.image.resize(img, (dimensions, dimensions))
        img = preprocess_input(img)
        # The path travels with the image so the features can be stored next to it.
        return img, image_path

    new_input = image_model.input
    hidden_layer = image_model.layers[-1].output
    image_features_extract_model = tf.keras.Model(new_input, hidden_layer)

    return load_image, image_features_extract_model, cnn_features
 
def process_images_to_npy(img_name_vector, image_features_extract_model, load_image):
    """Extract CNN features for every image and store them with `np.save`.

    Args:
        img_name_vector: list of image paths.
        image_features_extract_model: model applied to each image batch.
        load_image: function mapping a path to `(image, path)`.

    Images are processed in batches of 16. Each feature map is saved under
    the image's own path; `np.save` appends '.npy' when the name does not
    already end with it.
    """
    path_dataset = tf.data.Dataset.from_tensor_slices(img_name_vector)
    batched_images = path_dataset.map(
        load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE).batch(16)

    for images, image_paths in batched_images:
        features = image_features_extract_model(images)
        # Collapse everything between the batch axis and the last axis
        # (assumes a rank-4 output, since shape[3] is read).
        features = tf.reshape(features, (features.shape[0], -1, features.shape[3]))

        for feature_map, image_path in zip(features, image_paths):
            target = image_path.numpy().decode("utf-8")
            np.save(target, feature_map.numpy())
 
 
def map_func(img_name, html):
    """Load the stored feature array that belongs to an image path.

    Args:
        img_name: UTF-8 encoded bytes of the image path; the array is read
            from '<path>.npy'.
        html: passed through unchanged.

    Returns:
        Tuple `(loaded array, html)`.
    """
    feature_file = img_name.decode('utf-8') + '.npy'
    return np.load(feature_file), html
 
 
def process_dataset(image, html, batch_size):
    """Build the batched `tf.data.Dataset` fed to the model.

    Args:
        image: image features, one entry per example.
        html: 2-D array of token ids, one row per example.
        batch_size: number of examples per batch.

    Returns:
        A cached, shuffled (buffer of 1000), batched and prefetched dataset
        of `(features, labels)` dict pairs.
    """
    # The decoder input drops the last token and the expected output drops
    # the first one, i.e. the target is the input shifted by one position.
    features = {
        'inputs': image,
        'dec_inputs': html[:, :-1],
    }
    labels = {
        'outputs': html[:, 1:],
    }

    # TODO sort dataset by sequence length, speeds up training
    return (
        tf.data.Dataset.from_tensor_slices((features, labels))
        .cache()
        .shuffle(1000)
        .batch(batch_size)
        .prefetch(tf.data.experimental.AUTOTUNE)
    )
 
 
def get_dataset(imageModel, dataset_name, batch_size):
    """Prepare the train and test datasets for one CNN backbone and dataset.

    Args:
        imageModel: key of `switcher` selecting the CNN backbone.
        dataset_name: dataset directory name ('pix2html' or 'pix2code').
        batch_size: batch size of the returned datasets.

    Returns:
        Tuple `(train_dataset, test_dataset, top_k, sequence_length,
        tokenizer, cnn_features)`, where `sequence_length` is the length of
        the first training sequence.
    """
    load_image, image_features_extract_model, cnn_features = get_load_image(imageModel)
    train_sequences, train_images, test_sequences, test_images, top_k, tokenizer = load_dataset(dataset_name)

    # Write the CNN features of every image to '<image path>.npy' ...
    process_images_to_npy(train_images, image_features_extract_model, load_image)
    process_images_to_npy(test_images, image_features_extract_model, load_image)

    # ... and read them back in the order of the image lists.
    train_images_tensors = [np.load(path + '.npy') for path in train_images]
    test_images_tensors = [np.load(path + '.npy') for path in test_images]

    # The training set pairs the *train* image features with the train
    # sequences (the test features were passed here by mistake before).
    train_dataset = process_dataset(train_images_tensors, train_sequences, batch_size)
    test_dataset = process_dataset(test_images_tensors, test_sequences, batch_size)

    return (train_dataset, test_dataset, top_k, len(train_sequences[0]), tokenizer, cnn_features)

# Model that follows the captioning transformer with stacked attention architecture

In [3]:
def create_padding_mask(seq):
    """Flag the padded positions (token id 0) of a batch of sequences.

    Returns a float32 tensor that is 1.0 where `seq` equals 0 and 0.0
    elsewhere, with two singleton axes inserted after the first axis.
    """
    is_padding = tf.math.equal(seq, 0)
    padding_flags = tf.cast(is_padding, tf.float32)
    return padding_flags[:, tf.newaxis, tf.newaxis, :]


def create_look_ahead_mask(size):
    """Return a (size, size) matrix with 1 strictly above the diagonal.

    Entry [i, j] is 1 when j > i (a later position) and 0 otherwise.
    """
    lower_triangle = tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return 1 - lower_triangle


def create_masks_decoder(tar):
    """Build the self-attention mask for a batch of decoder token ids.

    Args:
        tar: 2-D tensor of token ids, where 0 marks padding.

    Returns:
        Float tensor of shape (batch, 1, seq_len, seq_len) holding 1.0 where
        a query position may attend to a key position and 0.0 where it may
        not (the key is a later position or a padding token).
    """
    look_ahead_mask = create_look_ahead_mask(tf.shape(tar)[1])
    dec_target_padding_mask = create_padding_mask(tar)
    # Both helpers mark the positions to hide with 1, so their element-wise
    # maximum is 1 wherever attention must be blocked.
    blocked = tf.maximum(dec_target_padding_mask, look_ahead_mask)
    # The mask is consumed as `attention_mask` by Keras MultiHeadAttention,
    # which uses the opposite convention (1 = attend, 0 = do not attend).
    # Returning `blocked` unchanged made the decoder attend only to future
    # and padded tokens, so it is inverted here.
    return 1.0 - blocked

In [4]:
%%capture
# Installs the `tf-models-official` package, which provides the `official.*`
# modules imported in the cells below.
# NOTE(review): the version is not pinned, so a fresh run may install a release
# whose `official.nlp` module layout differs -- consider pinning it.
!pip install tf-models-official

In [5]:
from official.nlp.transformer.ffn_layer import FeedForwardNetwork
from official.nlp.modeling.layers.position_embedding import RelativePositionEmbedding

def decoder_layer(units, d_model, num_heads, dropout, name="decoder_layer"):
    """Build one decoder block as a Keras functional model.

    The block applies, in order: masked self-attention over `inputs`,
    attention over `encoder_outputs` (no mask), and a `FeedForwardNetwork`.
    Each step is followed by a residual addition and layer normalisation.

    Args:
        units: `filter_size` of the feed-forward network.
        d_model: size of the last axis of the inputs and outputs; also used
            as `key_dim`, `value_dim` and `output_shape` of both attentions.
        num_heads: number of attention heads.
        dropout: dropout rate for the attentions and the feed-forward network.
        name: name of the returned model.

    Returns:
        `tf.keras.Model` taking `[inputs, encoder_outputs, look_ahead_mask]`.
    """
    inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
    enc_outputs = tf.keras.Input(shape=(None, d_model), name="encoder_outputs")
    look_ahead_mask = tf.keras.Input(shape=(1, None, None), name="look_ahead_mask")

    def new_attention():
        # Both attention sub-layers share the same hyper-parameters.
        return tf.keras.layers.MultiHeadAttention(
            value_dim=d_model, key_dim=d_model, num_heads=num_heads,
            dropout=dropout, output_shape=d_model)

    def add_and_norm(layer_name, sublayer_output, residual):
        norm = tf.keras.layers.LayerNormalization(epsilon=1e-6, name=layer_name)
        return norm(sublayer_output + residual)

    # Positional arguments of the attention call are:
    # query, value, key, attention_mask, return_attention_scores.
    self_attention = new_attention()(inputs, inputs, inputs, look_ahead_mask, False)
    after_self_attention = add_and_norm('add_and_norm_1', self_attention, inputs)

    cross_attention = new_attention()(after_self_attention, enc_outputs, enc_outputs, None, False)
    after_cross_attention = add_and_norm('add_and_norm_2', cross_attention, after_self_attention)

    feed_forward = FeedForwardNetwork(hidden_size=d_model, filter_size=units, relu_dropout=dropout)
    block_output = add_and_norm('add_and_norm_3', feed_forward(after_cross_attention), after_cross_attention)

    return tf.keras.Model(inputs=[inputs, enc_outputs, look_ahead_mask], outputs=block_output, name=name)


def decoder(d_model, num_heads, units, target_vocab_size, max_pos_encoding, num_layers=6, dropout=0.1, name="decoder"):
    """Build the stack of decoder layers as a Keras functional model.

    Token ids are embedded, scaled by sqrt(d_model), summed with a
    `RelativePositionEmbedding`, passed through dropout and then through
    `num_layers` decoder layers.

    Args:
        d_model: embedding size and width of every decoder layer.
        num_heads: number of attention heads per layer.
        units: feed-forward `filter_size` of every layer.
        target_vocab_size: input dimension of the embedding.
        max_pos_encoding: forwarded as `max_timescale` to the position
            embedding. NOTE(review): callers pass the sequence length here --
            confirm that this is the intended meaning of `max_timescale`.
        num_layers: number of stacked decoder layers.
        dropout: dropout rate.
        name: name of the returned model.

    Returns:
        `tf.keras.Model` taking `[inputs, encoder_outputs, look_ahead_mask]`.
    """
    token_ids = tf.keras.Input(shape=(None,), name="inputs")
    enc_outputs = tf.keras.Input(shape=(None, d_model), name='encoder_outputs')
    look_ahead_mask = tf.keras.Input(shape=(1, None, None), name='look_ahead_mask')

    embedded = tf.keras.layers.Embedding(target_vocab_size, d_model, name='output_embeding')(token_ids)
    embedded = embedded * tf.math.sqrt(tf.cast(d_model, tf.float32))
    embedded = embedded + RelativePositionEmbedding(max_timescale=max_pos_encoding, hidden_size=d_model)(embedded)

    hidden = tf.keras.layers.Dropout(rate=dropout)(embedded)

    for layer_index in range(num_layers):
        layer = decoder_layer(units=units, d_model=d_model, num_heads=num_heads, dropout=dropout,
                              name='decoder_layer_{}'.format(layer_index))
        hidden = layer(inputs=[hidden, enc_outputs, look_ahead_mask])

    return tf.keras.Model(inputs=[token_ids, enc_outputs, look_ahead_mask], outputs=hidden, name=name)


def transformer(vocab_size, units, d_model, num_heads, max_pos_encoding, num_layers, dropout, cnn_features, name="transformer"):
    """Assemble the image-to-token model as a Keras functional model.

    The image features are projected to `d_model` by a ReLU dense layer
    (followed by dropout) and serve as the encoder output for the decoder.
    The decoder output goes through a softmax dense layer, so the model
    returns per-token probabilities over `vocab_size` classes.

    Args:
        vocab_size: size of the embedding input and of the output layer.
        units: feed-forward `filter_size` of the decoder layers.
        d_model: model width.
        num_heads: number of attention heads.
        max_pos_encoding: forwarded to `decoder`.
        num_layers: number of decoder layers.
        dropout: dropout rate.
        cnn_features: size of the last axis of the image features.
        name: name of the returned model.

    Returns:
        `tf.keras.Model` with inputs `[inputs, dec_inputs]` and one output
        produced by the layer named "outputs".
    """
    image_features = tf.keras.Input(shape=(None, cnn_features), name="inputs")
    dec_inputs = tf.keras.Input(shape=(None,), name="dec_inputs")

    # The decoder mask is derived from the decoder token ids inside the graph.
    mask_layer = tf.keras.layers.Lambda(create_masks_decoder, output_shape=(1, None, None),
                                        name='look_ahead_mask')
    dec_mask = mask_layer(dec_inputs)

    projected = tf.keras.layers.Dense(d_model, activation='relu', name='linear_and_relu')(image_features)
    enc_outputs = tf.keras.layers.Dropout(rate=dropout)(projected)

    decoder_model = decoder(target_vocab_size=vocab_size, units=units, d_model=d_model, num_heads=num_heads,
                            max_pos_encoding=max_pos_encoding, num_layers=num_layers, dropout=dropout)
    dec_outputs = decoder_model(inputs=[dec_inputs, enc_outputs, dec_mask])

    probabilities = tf.keras.layers.Dense(vocab_size, activation='softmax', name="outputs")(dec_outputs)

    return tf.keras.Model(inputs=[image_features, dec_inputs], outputs=probabilities, name=name)

In [6]:

def custom_accuracy(real, pred):
    """Token accuracy that ignores padded positions.

    Args:
        real: expected token ids, where 0 marks padding.
        pred: per-class scores; the predicted id is the argmax over axis 2.

    Returns:
        Number of non-padding positions predicted correctly divided by the
        number of non-padding positions.
    """
    predicted_ids = tf.cast(tf.argmax(pred, axis=2), tf.float32)
    not_padding = tf.math.logical_not(tf.math.equal(real, 0))
    correct = tf.math.logical_and(not_padding, tf.equal(real, predicted_ids))

    correct_count = tf.reduce_sum(tf.cast(correct, dtype=tf.float32))
    token_count = tf.reduce_sum(tf.cast(not_padding, dtype=tf.float32))
    return correct_count / token_count

# The model's final Dense layer already applies softmax, so the predictions
# are probabilities and must not be treated as logits (from_logits=True would
# apply softmax a second time). reduction='none' keeps one loss value per
# token so padding can be masked out below.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False, reduction='none')

def loss_function(real, pred):
    """Sparse categorical cross-entropy averaged over non-padding tokens.

    Args:
        real: expected token ids, where 0 marks padding.
        pred: predicted class probabilities for every position.

    Returns:
        Sum of the per-token losses at non-padding positions divided by the
        number of non-padding positions.
    """
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)
    # Zero the loss of padded positions before averaging.
    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask
    return tf.reduce_sum(loss_) / tf.reduce_sum(mask)

# Experiments part

In [None]:
from official.nlp.optimization import create_optimizer


# get dataset
batch_size = 32
cnn_name = 'resnet' # one of: inception_v3 inception_resnet_v2 resnet vgg16 vgg19 EfficientNet Xception
dataset_name =  'pix2code' # pix2code pix2html

(dataset, test_dataset, top_k, sequence_length, tokenizer, cnn_features) = get_dataset(cnn_name, dataset_name, batch_size)
tokenizerm = tokenizer  # previous (misspelled) name, kept so existing references keep working
max_pos_encoding = sequence_length
vocab_size = top_k + 1


# get optimizer
epochs = 5
# NOTE(review): hard-coded number of training examples used only for the
# learning-rate schedule; it is not derived from the loaded dataset.
train_data_size = 1500
steps_per_epoch = int(train_data_size / batch_size)
num_train_steps = steps_per_epoch * epochs
# Warm up during the first 10% of the training steps.
warmup_steps = int(epochs * train_data_size * 0.1 / batch_size)
initial_learning_rate=2e-5


optimizer = create_optimizer(init_lr=initial_learning_rate,
                     num_train_steps=num_train_steps,
                     num_warmup_steps=warmup_steps,
                     end_lr=0.0,
                     optimizer_type='adamw')

# get model

units = 364
d_model = 182
num_heads = 8
num_layers = 6
dropout = 0.1

model = transformer(d_model = d_model,
                    units = units,
                    vocab_size = vocab_size,
                    max_pos_encoding=max_pos_encoding,
                    num_layers = num_layers,
                    dropout = dropout,
                    num_heads = num_heads,
                    cnn_features= cnn_features,
                    name='model')


model.compile(optimizer=optimizer, loss=loss_function,  metrics=[custom_accuracy])

# `dataset` is a tf.data.Dataset that is already batched by get_dataset(), so
# `batch_size` must not be passed to fit() again.
history = model.fit(dataset, validation_data=test_dataset, epochs=epochs, verbose=0)


# Various utils to visualize and save the model
# model.summary()
# tf.keras.utils.plot_model(model, to_file='transformer.png', show_shapes=True, expand_nested=True, show_dtype=True, dpi=80)
# model.save_weights("my_weights")
# model.save('saved_model/my_model') Does not work because it cannot serialise custom learning rate