In [None]:
import keras # type: ignore
import tensorflow as tf # type: ignore
import matplotlib.pyplot as plt # type: ignore
from keras import layers, optimizers, ops # type: ignore
from keras_tuner import (HyperModel,
                         Hyperband,
                         RandomSearch,
                         Objective,
                         BayesianOptimization)

In [None]:
class TransformerEncoder(layers.Layer):
    """Transformer encoder block: self-attention followed by a feed-forward projection.

    Both sub-blocks are wrapped in a residual connection and followed by a
    layer normalization.

    Args:
        embed_dim: Output width of the feed-forward projection; also passed to
            the attention layer as ``key_dim``. Because the projection output
            is added back to its input, the last axis of the layer input has
            to be ``embed_dim`` wide.
        dense_dim: Width of the hidden ReLU layer of the feed-forward projection.
        num_heads: Number of attention heads.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        # Kept as attributes so get_config() can serialize them.
        self.embed_dim, self.dense_dim, self.num_heads = (
            embed_dim, dense_dim, num_heads)

        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads,
            key_dim=embed_dim,
        )
        # Position-wise feed-forward network: expand to dense_dim, project back.
        feed_forward = [
            layers.Dense(dense_dim, activation="relu"),
            layers.Dense(embed_dim),
        ]
        self.dense_proj = keras.Sequential(feed_forward)
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        """Run self-attention and the feed-forward projection on ``inputs``.

        Args:
            inputs: Tensor used as both query and value of the attention layer.
            mask: Optional mask; a new axis is inserted at position 1 before it
                is handed to the attention layer (presumably turning a
                ``(batch, seq)`` padding mask into ``(batch, 1, seq)`` —
                confirm against the upstream layer).

        Returns:
            The layer-normalized output of the second residual connection.
        """
        attention_mask = None if mask is None else mask[:, None, :]
        attended = self.attention(
            inputs, inputs, attention_mask=attention_mask)
        residual = self.layernorm_1(inputs + attended)
        return self.layernorm_2(residual + self.dense_proj(residual))

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        return {
            **super().get_config(),
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        }

In [None]:
class PositionalEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        sequence_length: Number of rows in the position-embedding table, i.e.
            the number of distinct positions that can be looked up.
        input_dim: Number of rows in the token-embedding table.
        output_dim: Width of both embedding tables.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim,
            output_dim=output_dim)

        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length,
            output_dim=output_dim)

        # Kept as attributes so get_config() can serialize them.
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.supports_masking = True

    def call(self, inputs):
        """Embed ``inputs`` and add the embedding of each position index.

        Args:
            inputs: Integer tensor of token ids; its last axis is treated as
                the sequence axis.

        Returns:
            Token embeddings plus position embeddings (the position term is
            broadcast over the leading axes).
        """
        length = ops.shape(inputs)[-1]
        positions = ops.arange(start=0, stop=length, step=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        """Return a boolean mask that is False wherever the token id is 0.

        Any incoming ``mask`` is ignored; the mask is derived from ``inputs`` only.
        """
        return ops.not_equal(inputs, 0)

    def build(self, input_shape):
        """Create the weights of both embedding tables.

        The token table is built here as well, instead of being created
        lazily on the first call, so that the layer owns all of its weights as
        soon as ``build`` has run. The position table is still built first,
        which keeps the original weight-creation order.
        """
        self.position_embeddings.build(input_shape)
        self.token_embeddings.build(input_shape)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config = super().get_config()
        config.update({
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        })
        return config

In [None]:
class HyperbandSearch:
    """Hyperband hyperparameter search for the Transformer regression model.

    Args:
        max_length: Passed to ``PositionalEmbedding`` as its sequence length.
        vocab_size: Passed to ``PositionalEmbedding`` as its token-table size.
        train_ds: Training data handed to ``tuner.search``.
        vals_ds: Validation data handed to ``tuner.search``.
        **kwargs: Forwarded to the base-class constructor.
    """

    def __init__(self, max_length, vocab_size, train_ds, vals_ds, **kwargs):
        super().__init__(**kwargs)
        self.train_ds, self.vals_ds = train_ds, vals_ds
        self.max_length = max_length
        self.vocab_size = vocab_size

    def build_model_hyperband(self, hp):
        """Build and compile one candidate model from the hyperparameters in ``hp``.

        Search space: ``embed_dim`` 128-2048 (step 128), ``num_heads`` 2-8,
        ``dense_dim`` 32-512 (step 32), ``dropout_rate`` 0.3-0.9 (step 0.1) and
        ``learning_rate`` 1e-5 to 1e-2 sampled on a log scale.

        Args:
            hp: KerasTuner hyperparameter container.

        Returns:
            A compiled ``keras.Model`` with a single-unit output, trained with
            Adam on the MAE loss.
        """
        embed_dim = hp.Int('embed_dim', min_value=128, max_value=2048, step=128)
        num_heads = hp.Int('num_heads', min_value=2, max_value=8, step=1)
        dense_dim = hp.Int('dense_dim', min_value=32, max_value=512, step=32)
        dropout_rate = hp.Float('dropout_rate', min_value=0.3, max_value=0.9, step=0.1)
        learning_rate = hp.Float('learning_rate', min_value=1e-5, max_value=1e-2, sampling='log')

        inputs = keras.Input(shape=(None,), dtype="int64")
        x = PositionalEmbedding(
            self.max_length,
            self.vocab_size, embed_dim)(inputs)
        x = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)
        # Collapse the sequence axis before the regression head.
        x = layers.GlobalMaxPooling1D()(x)
        x = layers.Dropout(dropout_rate)(x)

        outputs = layers.Dense(1)(x)

        model = keras.Model(inputs, outputs)
        model.compile(
            optimizer=optimizers.Adam(learning_rate=learning_rate),
            loss="mae")

        return model

    def find_w_hyperband(self):
        """Run the Hyperband search and return the best hyperparameter values.

        Returns:
            Dict mapping hyperparameter names to the values of the best trial.
        """
        tuner = Hyperband(
            self.build_model_hyperband,
            objective='val_loss',
            max_epochs=30,
            factor=3,
            directory='my_dir',
            project_name='transformer_tuning'
        )

        # No `epochs` argument: Hyperband assigns every trial its own epoch
        # budget (capped by `max_epochs`) and overrides any value passed here.
        tuner.search(
            self.train_ds,
            validation_data=self.vals_ds)
        hps = tuner.get_best_hyperparameters(num_trials=1)[0]
        return hps.values

In [None]:
class RandomRearch:
    """Random hyperparameter search for the Transformer regression model.

    Args:
        max_length: Passed to ``PositionalEmbedding`` as its sequence length.
        vocab_size: Passed to ``PositionalEmbedding`` as its token-table size.
        train_ds: Training data handed to ``tuner.search``.
        vals_ds: Validation data handed to ``tuner.search``.
        **kwargs: Forwarded to the base-class constructor.
    """

    def __init__(self, max_length, vocab_size, train_ds, vals_ds, **kwargs):
        super().__init__(**kwargs)
        self.max_length = max_length
        self.vocab_size = vocab_size
        self.train_ds = train_ds
        self.vals_ds = vals_ds

    def build_model_random(self, hp):
        """Build and compile one candidate model from the hyperparameters in ``hp``.

        Search space: ``embed_dim`` 128-2048 (step 128), ``num_heads`` 2-8,
        ``dense_dim`` 32-512 (step 32), ``dropout_rate`` 0.3-0.7 (step 0.1) and
        ``learning_rate`` 1e-5 to 1e-2 sampled on a log scale.

        Args:
            hp: KerasTuner hyperparameter container.

        Returns:
            A compiled ``keras.Model`` with a single-unit output, trained with
            Adam on the MAE loss.
        """
        # Hyperparameters are registered in a fixed order: embedding width,
        # heads, feed-forward width, dropout, learning rate.
        embed_dim = hp.Int("embed_dim", min_value=128, max_value=2048, step=128)
        num_heads = hp.Int("num_heads", min_value=2, max_value=8, step=1)
        dense_dim = hp.Int("dense_dim", min_value=32, max_value=512, step=32)
        dropout_rate = hp.Float(
            "dropout_rate", min_value=0.3, max_value=0.7, step=0.1)
        learning_rate = hp.Float(
            "learning_rate", min_value=1e-5, max_value=1e-2, sampling="log")

        token_ids = keras.Input(shape=(None,), dtype="int64")
        hidden = PositionalEmbedding(
            sequence_length=self.max_length,
            input_dim=self.vocab_size,
            output_dim=embed_dim,
        )(token_ids)
        hidden = TransformerEncoder(
            embed_dim=embed_dim,
            dense_dim=dense_dim,
            num_heads=num_heads,
        )(hidden)
        # Collapse the sequence axis before the regression head.
        hidden = layers.GlobalMaxPooling1D()(hidden)
        hidden = layers.Dropout(dropout_rate)(hidden)
        prediction = layers.Dense(1)(hidden)

        model = keras.Model(token_ids, prediction)
        adam = optimizers.Adam(learning_rate=learning_rate)
        model.compile(optimizer=adam, loss="mae")
        return model

    def find_w_random(self):
        """Run the random search and return the best hyperparameter values.

        Returns:
            Dict mapping hyperparameter names to the values of the best trial.
        """
        tuner_options = dict(
            objective="val_loss",
            max_trials=30,  # Number of hyperparameter combinations to try
            executions_per_trial=1,
            directory="my_dir_rnd",
            project_name="transformer_randomsearch",
        )
        tuner = RandomSearch(self.build_model_random, **tuner_options)

        tuner.search(self.train_ds, epochs=60, validation_data=self.vals_ds)

        return tuner.get_best_hyperparameters(num_trials=1)[0].values

In [None]:
class BayesianRearch(HyperModel):
    """Bayesian-optimization hyperparameter search for the Transformer regression model.

    Args:
        max_length: Passed to ``PositionalEmbedding`` as its sequence length.
        vocab_size: Passed to ``PositionalEmbedding`` as its token-table size.
        train_ds: Training data handed to ``tuner.search``.
        vals_ds: Validation data handed to ``tuner.search``.
        **kwargs: Forwarded to ``HyperModel``.
    """

    def __init__(self, max_length, vocab_size, train_ds, vals_ds, **kwargs):
        super().__init__(**kwargs)
        self.train_ds, self.vals_ds = train_ds, vals_ds
        self.max_length = max_length
        self.vocab_size = vocab_size

    def build(self, hp):
        """Build and compile one candidate model from the hyperparameters in ``hp``.

        Search space: ``embed_dim`` 256-2048 (step 256), ``num_heads`` 2-8,
        ``dense_dim`` 64-512 (step 64), ``dropout_rate`` 0.3-0.7 (step 0.1),
        ``optimizer`` in {adamw, adam, sgd} and ``learning_rate`` 1e-5 to 1e-2
        sampled on a log scale.

        Args:
            hp: KerasTuner hyperparameter container.

        Returns:
            A compiled ``keras.Model`` with a single-unit output and MAE loss,
            using the selected optimizer at the sampled learning rate.
        """
        embed_dim = hp.Int('embed_dim', min_value=256, max_value=2048, step=256)
        num_heads = hp.Int('num_heads', min_value=2, max_value=8, step=1)
        dense_dim = hp.Int('dense_dim', min_value=64, max_value=512, step=64)
        dropout_rate = hp.Float('dropout_rate', min_value=0.3, max_value=0.7, step=0.1)
        optimizer_name = hp.Choice(name="optimizer", values=["adamw", "adam", "sgd"])
        learning_rate = hp.Float('learning_rate', min_value=1e-5, max_value=1e-2, sampling='log')

        inputs = keras.Input(shape=(None,), dtype="int64")
        x = PositionalEmbedding(
            self.max_length,
            self.vocab_size, embed_dim)(inputs)
        x = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)
        # Collapse the sequence axis before the regression head.
        x = layers.GlobalMaxPooling1D()(x)
        x = layers.Dropout(dropout_rate)(x)

        outputs = layers.Dense(1)(x)

        # Instantiate the optimizer explicitly: passing only its name to
        # compile() would use the default learning rate and leave the tuned
        # `learning_rate` hyperparameter without any effect.
        optimizer_classes = {
            "adamw": optimizers.AdamW,
            "adam": optimizers.Adam,
            "sgd": optimizers.SGD,
        }
        optimizer = optimizer_classes[optimizer_name](learning_rate=learning_rate)

        model = keras.Model(inputs, outputs)
        model.compile(
            optimizer=optimizer,
            loss="mae")

        return model

    def call_bayesian(self):
        """Run the Bayesian-optimization search.

        Prints the search-space summary, then searches with early stopping on
        ``val_loss`` (patience 5). Existing results in the project directory
        are overwritten.

        Returns:
            The ``BayesianOptimization`` tuner after the search has finished.
        """
        objective = Objective(
            name="val_loss",
            direction="min"
        )
        tuner = BayesianOptimization(
            self.build,
            objective=objective,
            max_trials=100,  # Number of hyperparameter combinations to try
            executions_per_trial=2,
            directory='price_prediction',
            project_name='transformer_bayesian',
            overwrite=True
        )
        # search_space_summary() prints the summary itself and returns nothing,
        # so it is called directly rather than wrapped in display().
        tuner.search_space_summary()
        callbacks = [
            keras.callbacks.EarlyStopping(monitor="val_loss", patience=5),
        ]
        tuner.search(
            self.train_ds,
            epochs=100,
            callbacks=callbacks,
            verbose=2,
            validation_data=self.vals_ds
        )

        return tuner