In [1]:
import tensorflow as tf
import numpy as np
from tensorflow import keras

In [2]:
# Colab-only step: mount Google Drive so the .npy files and model
# checkpoints under /content/drive can be read and written by later cells.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# Read the training clips and their labels from Drive in a single pass.
datas, labels = (
    np.load(npy_path)
    for npy_path in (
        '/content/drive/MyDrive/final_data/dataset.npy',
        '/content/drive/MyDrive/final_data/labels.npy',
    )
)

In [4]:
# Sanity check: sample counts of data and labels should agree.
(datas.shape, labels.shape)

((2954, 20, 100, 100), (2954,))

In [5]:
# Read the validation clips and their labels from Drive in a single pass.
val_datas, val_labels = (
    np.load(npy_path)
    for npy_path in (
        '/content/drive/MyDrive/final_data/val_dataset.npy',
        '/content/drive/MyDrive/final_data/val_labels.npy',
    )
)

In [6]:
# Sanity check: sample counts of validation data and labels should agree.
(val_datas.shape, val_labels.shape)

((1981, 20, 100, 100), (1981,))

In [7]:
@keras.saving.register_keras_serializable()
class EMA_Layer(tf.keras.layers.Layer):
    """Exponential moving average along the second-to-last axis.

    The first slice along axis ``-2`` passes through unchanged; every later
    slice ``i`` becomes ``alpha * x[i] + (1 - alpha) * ema[i - 1]``.

    Args:
        alpha: Weight of the current slice; ``1 - alpha`` weights the previous
            smoothed slice.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, alpha=0.9, **kwargs):
        super(EMA_Layer, self).__init__(**kwargs)
        self.alpha = alpha

    def call(self, x):
        """Return a tensor shaped like ``x``, smoothed along axis ``-2``.

        Raises:
            ValueError: If the size of axis ``-2`` is not statically known,
                because the recurrence is unrolled in Python.
        """
        num_steps = x.shape[-2]
        if num_steps is None:
            raise ValueError(
                'EMA_Layer needs a statically known size on axis -2, got '
                f'input shape {x.shape}.'
            )

        # Size-1 slices keep the axis, so they can be concatenated back later.
        slices = tf.split(x, num_steps, axis=-2)

        # Collect the smoothed slices and concatenate once at the end instead
        # of re-copying an ever-growing tensor on every step.
        smoothed = [slices[0]]
        for current in slices[1:]:
            smoothed.append(
                current * self.alpha + smoothed[-1] * (1 - self.alpha)
            )

        return tf.concat(smoothed, axis=-2)

    def get_config(self):
        """Return the constructor arguments needed to rebuild the layer."""
        config = super(EMA_Layer, self).get_config()
        config.update({
            'alpha': self.alpha
        })
        return config

In [8]:
@keras.saving.register_keras_serializable()
class AttentionFrame(tf.keras.layers.Layer):
  """Re-weights the entries along axis 1 of the input with learned gates.

  The input is smoothed by two stacked ``EMA_Layer`` instances, passed through
  a small MLP (``idx_attn``) and a sigmoid projection (``w_attn``) that yields
  one gate per position of axis 1; the original input is multiplied by those
  gates.

  Args:
    alpha: Smoothing factor of the first EMA; the second EMA uses
      ``alpha * 1.5``.
    **kwargs: Forwarded to ``tf.keras.layers.Layer``.
  """

  def __init__(self, alpha, **kwargs) -> None:
    super().__init__(**kwargs)
    self.alpha = alpha
    self.EMA1 = EMA_Layer(alpha=alpha)
    # NOTE(review): alpha * 1.5 exceeds 1 whenever alpha > 2/3, which makes
    # the (1 - alpha) term of the second EMA negative. Kept as-is because
    # existing checkpoints were trained with this behaviour -- confirm intent.
    self.EMA2 = EMA_Layer(alpha=alpha * 1.5)

  def build(self, input_shape):
    """Create the gate sub-layers, sized by ``input_shape[1]``."""
    gate_units = input_shape[1]
    self.w_attn = tf.keras.layers.EinsumDense('...b,bc->...c', output_shape=[gate_units], activation='sigmoid', bias_axes='c')
    self.idx_attn = tf.keras.Sequential(
        [
            tf.keras.layers.Dense(int(gate_units), activation='tanh'),
            tf.keras.layers.Dense(int(gate_units), activation='relu'),
            tf.keras.layers.Dense(gate_units, activation='linear'),
            tf.keras.layers.Flatten(),
            tf.keras.layers.LayerNormalization()
        ]
    )
    super().build(input_shape)

  def call(self, input):
    """Return ``input`` scaled by one sigmoid gate per position of axis 1."""
    output = self.EMA1(input)
    output += self.EMA2(output)
    output = self.idx_attn(output)
    output = self.w_attn(output)
    # Add a trailing axis so the gates broadcast over the last input axis.
    output = output[..., tf.newaxis] * input
    return output

  def get_config(self):
    """Return only constructor arguments.

    Sub-layers are rebuilt by ``__init__``/``build``. Putting them in the
    config would hand unknown keyword arguments to ``__init__`` on reload and
    would fail outright before ``build`` has created them.
    """
    config = super(AttentionFrame, self).get_config()
    config.update({
        'alpha': self.alpha,
    })
    return config

In [9]:
@keras.saving.register_keras_serializable()
class PositionalEmbedding(tf.keras.layers.Layer):
  """Projects the last axis to ``d_model`` and adds a learned positional bias.

  A fixed vector ``1..input_shape[1]`` is passed through a small MLP to
  produce one scalar per position of axis 1; that scalar is added to every
  feature of the projected input before layer normalisation.

  Args:
    d_model: Output width of the projection and of the first MLP layer.
    dff: Width of the hidden MLP layer.
    **kwargs: Forwarded to ``tf.keras.layers.Layer``.
  """

  def __init__(self, d_model, dff, **kwargs) -> None:
    super().__init__(**kwargs)
    self.d_model = d_model
    self.dff = dff

    self.LN = tf.keras.layers.LayerNormalization()

  def build(self, input_shape):
    """Create the projection, the constant position vector and its MLP."""
    self.cvrt = tf.keras.layers.EinsumDense('...b,bc->...c', output_shape=[self.d_model], activation='relu', bias_axes='c')
    # Non-trainable row vector [1, 2, ..., input_shape[1]].
    self.position = self.add_weight(name="position", shape=([1, input_shape[1]]),
                              initializer=tf.initializers.Constant(tf.range(1., input_shape[1] + 1)),
                              trainable=False)
    self.MLP_pos = tf.keras.Sequential(
        [
            tf.keras.layers.Dense(self.d_model, activation="tanh"),
            tf.keras.layers.Dense(self.dff, activation="relu"),
            tf.keras.layers.Dense(input_shape[1], activation="sigmoid")
        ]
    )
    super().build(input_shape)

  def call(self, input):
    """Return the normalised projection of ``input`` plus positional bias."""
    position = self.MLP_pos(self.position)
    output = self.cvrt(input)
    # Drop only the leading size-1 axis of the (1, positions) bias, then add
    # a trailing axis so it broadcasts over the d_model features.
    output = tf.add(output, tf.squeeze(position, axis=0)[..., tf.newaxis])
    output = self.LN(output)
    return output

  def get_config(self):
    """Return only constructor arguments.

    Sub-layers and the position weight are recreated by ``__init__`` and
    ``build``. Reading them here would fail on an unbuilt layer and would
    pass unknown keyword arguments to ``__init__`` on reload.
    """
    config = super(PositionalEmbedding, self).get_config()
    config.update({
        'd_model': self.d_model,
        'dff': self.dff,
    })
    return config

In [10]:
import math
@keras.saving.register_keras_serializable()
class MultiHeadAttention_OVR(tf.keras.layers.MultiHeadAttention):
  """``MultiHeadAttention`` whose raw scores are EMA-smoothed before softmax.

  Args:
    num_heads: Number of attention heads (forwarded to the parent class).
    key_dim: Size of each attention head (forwarded to the parent class).
    decay: ``alpha`` of the ``EMA_Layer`` applied to the attention scores.
    **kwargs: Forwarded to ``tf.keras.layers.MultiHeadAttention``.
  """

  def __init__(self, num_heads: int, key_dim: int, decay: float, **kwargs):
    super().__init__(num_heads, key_dim, **kwargs)
    # Kept so that get_config() can report the constructor argument.
    self.decay = decay
    self.EMA = EMA_Layer(alpha=decay)


  def _compute_attention(
        self, query, key, value, attention_mask=None, training=None
    ):
        """Applies Dot-product attention with query, key, value tensors.

        This function defines the computation inside `call` with projected
        multi-head Q, K, V inputs. It differs from the parent implementation
        only in that the raw attention scores go through `self.EMA` before
        the masked softmax.

        Args:
            query: Projected query `Tensor` of shape `(B, T, N, key_dim)`.
            key: Projected key `Tensor` of shape `(B, S, N, key_dim)`.
            value: Projected value `Tensor` of shape `(B, S, N, value_dim)`.
            attention_mask: a boolean mask of shape `(B, T, S)`, that prevents
                attention to certain positions. It is generally not needed if
                the `query` and `value` (and/or `key`) are masked.
            training: Python boolean indicating whether the layer should behave
                in training mode (adding dropout) or in inference mode (doing
                nothing).

        Returns:
          attention_output: Multi-headed outputs of attention computation.
          attention_scores: Multi-headed attention weights.
        """
        # Note: Applying scalar multiply at the smaller end of einsum improves
        # XLA performance, but may introduce slight numeric differences in
        # the Transformer attention head.
        query = tf.multiply(query, 1.0 / math.sqrt(float(self._key_dim)))

        # Take the dot product between "query" and "key" to get the raw
        # attention scores.
        attention_scores = tf.einsum(self._dot_product_equation, key, query)
        # Smooth the raw scores along their second-to-last axis.
        attention_scores = self.EMA(attention_scores)
        attention_scores = self._masked_softmax(
            attention_scores, attention_mask
        )
        # This is actually dropping out entire tokens to attend to, which might
        # seem a bit unusual, but is taken from the original Transformer paper.
        attention_scores_dropout = self._dropout_layer(
            attention_scores, training=training
        )
        # `context_layer` = [B, T, N, H]
        attention_output = tf.einsum(
            self._combine_equation, attention_scores_dropout, value
        )
        return attention_output, attention_scores

  def get_config(self):
        """Return the parent config plus the required ``decay`` argument.

        The EMA sub-layer itself is not serialised: it is not a constructor
        argument, whereas ``decay`` is and must be present for reloading.
        """
        config = super(MultiHeadAttention_OVR, self).get_config()
        config.update({
            'decay': self.decay
        })
        return config

In [11]:
@keras.saving.register_keras_serializable()
class BaseAttention(tf.keras.layers.Layer):
  """Shared building blocks for the attention wrappers in this notebook.

  Holds a ``MultiHeadAttention_OVR`` plus the ``Add`` and
  ``LayerNormalization`` layers that subclasses use for the residual path.

  Args:
    **kwargs: ``name``, ``trainable`` and ``dtype`` (if given) configure this
      wrapper layer; every other keyword is forwarded unchanged to
      ``MultiHeadAttention_OVR`` (e.g. ``num_heads``, ``key_dim``,
      ``dropout``, ``decay``).
  """

  # Keywords that belong to the wrapper layer itself rather than to the MHA.
  _BASE_LAYER_KEYS = ('name', 'trainable', 'dtype')

  def __init__(self, **kwargs):
    base_kwargs = {
        key: kwargs.pop(key) for key in self._BASE_LAYER_KEYS if key in kwargs
    }
    super().__init__(**base_kwargs)
    # Remember the attention arguments so get_config() can round-trip them.
    self._mha_kwargs = dict(kwargs)
    self.mha = MultiHeadAttention_OVR(**kwargs)
    self.layernorm = tf.keras.layers.LayerNormalization()
    self.add = tf.keras.layers.Add()

  def get_config(self):
    """Return the base layer config merged with the attention kwargs.

    Sub-layers are rebuilt by ``__init__``, so only the keyword arguments
    originally passed in are stored.
    """
    config = super(BaseAttention, self).get_config()
    config.update(dict(self._mha_kwargs))
    return config

In [12]:
@keras.saving.register_keras_serializable()
class CrossAttention(BaseAttention):
  """Attention from ``x`` (queries) onto ``context`` (keys and values)."""

  def call(self, x, context):
    """Return ``layernorm(x + attention(x, context))``.

    The attention weights of the most recent call are stored on
    ``self.last_attn_scores``.
    """
    attended, scores = self.mha(
        query=x,
        key=context,
        value=context,
        return_attention_scores=True)

    # Kept around so the scores can be plotted after a forward pass.
    self.last_attn_scores = scores

    residual = self.add([x, attended])
    return self.layernorm(residual)

In [13]:
@keras.saving.register_keras_serializable()
class GlobalSelfAttention(BaseAttention):
  """Self-attention block: ``x`` serves as query, key and value."""

  def call(self, x):
    """Return ``layernorm(x + self_attention(x))``."""
    attended = self.mha(query=x, value=x, key=x)
    residual = self.add([x, attended])
    return self.layernorm(residual)

In [14]:
@keras.saving.register_keras_serializable()
class CausalSelfAttention(BaseAttention):
  """Self-attention block used by the decoder layers.

  NOTE(review): despite the name, no causal mask is passed to the attention
  call, so this computes the same thing as ``GlobalSelfAttention``. Confirm
  whether that is intended before changing it; existing checkpoints were
  trained with this behaviour.
  """

  def call(self, x):
    """Return ``layernorm(x + self_attention(x))``."""
    attended = self.mha(query=x, value=x, key=x)
    residual = self.add([x, attended])
    return self.layernorm(residual)

In [15]:
@keras.saving.register_keras_serializable()
class FeedForward(tf.keras.layers.Layer):
  """Position-wise feed-forward block with residual add and layer norm.

  Args:
    d_model: Output width of the second dense layer.
    dff: Width of the hidden (ReLU) dense layer.
    dropout_rate: Dropout rate applied after the second dense layer.
    **kwargs: Forwarded to ``tf.keras.layers.Layer``.
  """

  def __init__(self, d_model, dff, dropout_rate=0.1, **kwargs):
    super().__init__(**kwargs)
    self.d_model = d_model
    self.dff = dff
    self.dropout_rate = dropout_rate
    self.seq = tf.keras.Sequential([
      tf.keras.layers.Dense(dff, activation='relu'),
      tf.keras.layers.Dense(d_model),
      tf.keras.layers.Dropout(dropout_rate)
    ])
    self.add = tf.keras.layers.Add()
    self.layer_norm = tf.keras.layers.LayerNormalization()

  def call(self, x):
    """Return ``layer_norm(x + seq(x))``."""
    x = self.add([x, self.seq(x)])
    x = self.layer_norm(x)
    return x

  def get_config(self):
    """Return only constructor arguments; sub-layers are rebuilt in __init__.

    Including the sub-layer objects would pass unknown keyword arguments to
    ``__init__`` when the layer is recreated from its config.
    """
    config = super(FeedForward, self).get_config()
    config.update({
        'd_model': self.d_model,
        'dff': self.dff,
        'dropout_rate': self.dropout_rate,
    })
    return config

In [16]:
@keras.saving.register_keras_serializable()
class EncoderLayer(tf.keras.layers.Layer):
  """One encoder block: self-attention followed by a feed-forward block.

  Args:
    d_model: Model width; also used as ``key_dim`` of the attention.
    num_heads: Number of attention heads.
    dff: Hidden width of the feed-forward block.
    dropout_rate: Dropout rate passed to the attention layer. The
      feed-forward block is built with its own default rate.
    decay: EMA factor passed to the attention layer.
    **kwargs: Forwarded to ``tf.keras.layers.Layer``.
  """

  def __init__(self,*,d_model, num_heads, dff, dropout_rate=0.1, decay=0.9, **kwargs):
    super().__init__(**kwargs)

    self.d_model = d_model
    self.num_heads = num_heads
    self.dff = dff
    self.dropout_rate = dropout_rate
    self.decay = decay

    self.self_attention = GlobalSelfAttention(
        num_heads=num_heads,
        key_dim=d_model,
        dropout=dropout_rate,
        decay=decay)

    self.ffn = FeedForward(d_model, dff)

  def call(self, x):
    """Apply self-attention, then the feed-forward block."""
    x = self.self_attention(x)
    x = self.ffn(x)
    return x

  def get_config(self):
    """Return only constructor arguments; sub-layers are rebuilt in __init__.

    Including the sub-layer objects would pass unknown keyword arguments to
    ``__init__`` when the layer is recreated from its config.
    """
    config = super(EncoderLayer, self).get_config()
    config.update({
        'd_model': self.d_model,
        'num_heads': self.num_heads,
        'dff': self.dff,
        'dropout_rate': self.dropout_rate,
        'decay': self.decay,
    })
    return config

In [17]:
@keras.saving.register_keras_serializable()
class Encoder(tf.keras.layers.Layer):
  """Input dropout followed by a stack of ``num_layers`` encoder blocks.

  Args:
    num_layers: Number of ``EncoderLayer`` blocks in the stack.
    d_model: Model width passed to every block.
    num_heads: Number of attention heads per block.
    dff: Feed-forward hidden width per block.
    dropout_rate: Rate of the input dropout and of each block's attention.
    decay: EMA factor passed to every block.
    **kwargs: Forwarded to ``tf.keras.layers.Layer``.
  """

  def __init__(self, *, num_layers, d_model, num_heads,
               dff, dropout_rate=0.1, decay=0.9, **kwargs):
    super().__init__(**kwargs)

    self.d_model = d_model
    self.num_layers = num_layers
    self.num_heads = num_heads
    self.dff = dff
    self.dropout_rate = dropout_rate
    self.decay = decay

    self.enc_layers = tf.keras.Sequential([
        EncoderLayer(d_model=d_model,
                     num_heads=num_heads,
                     dff=dff,
                     dropout_rate=dropout_rate,
                     decay=decay)
        for _ in range(num_layers)])
    self.dropout = tf.keras.layers.Dropout(dropout_rate)

  def call(self, x):
    """Apply dropout to ``x`` and run it through the encoder stack."""
    # Add dropout.
    x = self.dropout(x)

    x = self.enc_layers(x)

    return x  # Shape `(batch_size, seq_len, d_model)`.

  def get_config(self):
    """Return only constructor arguments; sub-layers are rebuilt in __init__.

    Including the sub-layer objects would pass unknown keyword arguments to
    ``__init__`` when the layer is recreated from its config.
    """
    config = super(Encoder, self).get_config()
    config.update({
        'num_layers': self.num_layers,
        'd_model': self.d_model,
        'num_heads': self.num_heads,
        'dff': self.dff,
        'dropout_rate': self.dropout_rate,
        'decay': self.decay,
    })
    return config

In [18]:
@keras.saving.register_keras_serializable()
class DecoderLayer(tf.keras.layers.Layer):
  """One decoder block: self-attention, cross-attention, feed-forward.

  The layer takes and returns an ``(x, context)`` pair so that several
  instances can be chained inside a ``tf.keras.Sequential``.

  Args:
    d_model: Model width; also used as ``key_dim`` of both attentions.
    num_heads: Number of attention heads.
    dff: Hidden width of the feed-forward block.
    dropout_rate: Dropout rate passed to both attention layers.
    decay: EMA factor passed to both attention layers.
    **kwargs: Forwarded to ``tf.keras.layers.Layer``.
  """

  def __init__(self,
               *,
               d_model,
               num_heads,
               dff,
               dropout_rate=0.1,
               decay=0.9,
               **kwargs):
    super(DecoderLayer, self).__init__(**kwargs)

    self.d_model = d_model
    self.num_heads = num_heads
    self.dff = dff
    self.dropout_rate = dropout_rate
    self.decay = decay

    self.causal_self_attention = CausalSelfAttention(
        num_heads=num_heads,
        key_dim=d_model,
        dropout=dropout_rate,
        decay=decay)

    self.cross_attention = CrossAttention(
        num_heads=num_heads,
        key_dim=d_model,
        dropout=dropout_rate,
        decay=decay)

    self.ffn = FeedForward(d_model, dff)

  def call(self, inputs):
    """Transform ``inputs[0]`` using ``inputs[1]`` as attention context.

    Returns:
      ``(x, context)`` where ``context`` is ``inputs[1]`` passed through
      untouched for the next layer in the stack.
    """
    x = self.causal_self_attention(x=inputs[0])
    x = self.cross_attention(x=x, context=inputs[1])

    # Cache the last attention scores for plotting later
    self.last_attn_scores = self.cross_attention.last_attn_scores

    x = self.ffn(x)  # Shape `(batch_size, seq_len, d_model)`.
    return x, inputs[1]

  def get_config(self):
    """Return only constructor arguments; sub-layers are rebuilt in __init__.

    Including the sub-layer objects would pass unknown keyword arguments to
    ``__init__`` when the layer is recreated from its config.
    """
    config = super(DecoderLayer, self).get_config()
    config.update({
        'd_model': self.d_model,
        'num_heads': self.num_heads,
        'dff': self.dff,
        'dropout_rate': self.dropout_rate,
        'decay': self.decay,
    })
    return config

In [19]:
@keras.saving.register_keras_serializable()
class Decoder(tf.keras.layers.Layer):
  """Dropout, ``AttentionFrame`` gating and a stack of decoder blocks.

  Args:
    num_layers: Number of ``DecoderLayer`` blocks in the stack.
    d_model: Model width passed to every block.
    num_heads: Number of attention heads per block.
    dff: Feed-forward hidden width per block.
    dropout_rate: Rate of the input dropout and of each block's attention.
    decay: EMA factor passed to every block's attention layers.
    alpha: Smoothing factor passed to ``AttentionFrame``.
    **kwargs: Forwarded to ``tf.keras.layers.Layer``.
  """

  def __init__(self, *, num_layers, d_model, num_heads, dff,
               dropout_rate=0.1, decay=0.9, alpha=0.9, **kwargs):
    super(Decoder, self).__init__(**kwargs)

    self.AF = AttentionFrame(alpha=alpha)

    self.d_model = d_model
    self.num_layers = num_layers
    self.num_heads = num_heads
    self.dff = dff
    self.dropout_rate = dropout_rate
    self.decay = decay
    self.alpha = alpha

    self.dropout = tf.keras.layers.Dropout(dropout_rate)
    self.dec_layers = tf.keras.Sequential([
        DecoderLayer(d_model=d_model, num_heads=num_heads,
                     dff=dff, dropout_rate=dropout_rate,
                     decay=decay)
        for _ in range(num_layers)])

  def call(self, x, context):
    """Decode ``x`` while attending to ``context``."""
    x = self.dropout(x)
    x = self.AF(x)

    # Each DecoderLayer consumes and returns an (x, context) pair.
    x  = self.dec_layers([x, context])

    # The shape of x is (batch_size, target_seq_len, d_model).
    return x[0]

  def get_config(self):
    """Return only constructor arguments; sub-layers are rebuilt in __init__.

    Including the sub-layer objects would pass unknown keyword arguments to
    ``__init__`` when the layer is recreated from its config.
    """
    config = super(Decoder, self).get_config()
    config.update({
        'num_layers': self.num_layers,
        'd_model': self.d_model,
        'num_heads': self.num_heads,
        'dff': self.dff,
        'dropout_rate': self.dropout_rate,
        'decay': self.decay,
        'alpha': self.alpha,
    })
    return config

In [20]:
@keras.saving.register_keras_serializable()
class Transformer(tf.keras.Model):
  """Encoder-decoder model over sequences of flattened 2-D frames.

  The last two input axes are flattened into one feature axis, embedded with
  ``PositionalEmbedding`` and fed to both the encoder and the decoder; the
  decoder attends to the encoder output.

  Args:
    num_layers: Number of blocks in both the encoder and the decoder.
    d_model: Model width.
    num_heads: Number of attention heads per block.
    dff: Feed-forward hidden width.
    dropout_rate: Dropout rate passed to the encoder and decoder.
    decay: EMA factor passed to the attention layers.
    alpha: Smoothing factor passed to the decoder's ``AttentionFrame``.
    **kwargs: Forwarded to ``tf.keras.Model``.
  """

  def __init__(self, *, num_layers, d_model, num_heads, dff,
               dropout_rate=0.1, decay=0.9, alpha=0.9, **kwargs):
    super().__init__(**kwargs)
    self.num_layers = num_layers
    self.d_model = d_model
    self.num_heads = num_heads
    self.dff = dff
    self.dropout_rate = dropout_rate
    self.decay = decay
    self.alpha = alpha
    self.pos_embedding = PositionalEmbedding(d_model=d_model, dff=dff)

    self.encoder = Encoder(num_layers=num_layers, d_model=d_model,
                           num_heads=num_heads, dff=dff,
                           dropout_rate=dropout_rate,
                           decay=decay)

    self.decoder = Decoder(num_layers=num_layers, d_model=d_model,
                           num_heads=num_heads, dff=dff,
                           dropout_rate=dropout_rate,
                           decay=decay, alpha=alpha)

  def call(self, input):
    """Return the decoder output for a batch of frame sequences."""
    # Merge the last two axes into one feature axis and cast to float32.
    input = tf.cast(tf.reshape(input, shape=[-1, input.shape[-3], input.shape[-2] * input.shape[-1]]), dtype=tf.float32)
    # To use a Keras model with `.fit` you must pass all your inputs in the
    # first argument.
    input = self.pos_embedding(input)

    context = self.encoder(input)  # (batch_size, context_len, d_model)

    output = self.decoder(input, context)  # (batch_size, target_len, d_model)

    return output

  def get_config(self):
    """Return the constructor arguments so the model can be re-created.

    Spelled out explicitly, like the other custom classes in this notebook,
    instead of relying on Keras inferring the config from ``__init__``.
    """
    config = super().get_config()
    config.update({
        'num_layers': self.num_layers,
        'd_model': self.d_model,
        'num_heads': self.num_heads,
        'dff': self.dff,
        'dropout_rate': self.dropout_rate,
        'decay': self.decay,
        'alpha': self.alpha,
    })
    return config

In [21]:
from sklearn.preprocessing import OneHotEncoder
# Shared encoder instance; later cells fit it on the label arrays and read
# its `categories_` attribute.
encoder = OneHotEncoder()

In [22]:
# Fit the category -> column mapping on the training labels only.
labels_onehot = encoder.fit_transform(labels.reshape(-1, 1)).toarray()
# Re-use the fitted mapping for validation. Re-fitting here would silently
# reorder or drop columns whenever the validation set has a different set of
# classes; `transform` raises instead if it meets an unseen label.
val_labels_onehot = encoder.transform(val_labels.reshape(-1, 1)).toarray()

In [None]:
# Layers are instantiated in exactly this order, then wrapped in a Sequential:
# pixel rescaling -> Transformer backbone -> dense classification head.
_classifier_layers = [
    tf.keras.layers.Rescaling(1. / 255., input_shape=(20, 100, 100)),
    Transformer(
        num_layers=10,
        d_model=64,
        num_heads=8,
        dff=128,
        dropout_rate=0.2,
        decay=0.7,
        alpha=0.9,
    ),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dropout(0.6),
    tf.keras.layers.Dense(256, activation='relu'),
    tf.keras.layers.Dense(128, activation='gelu'),
    tf.keras.layers.Dense(20, activation='softmax'),
]
model = tf.keras.Sequential(_classifier_layers)

In [None]:
# Adam with a small fixed learning rate.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-5)

In [None]:
# One-hot targets, hence categorical (not sparse) cross-entropy.
model.compile(
    loss='categorical_crossentropy',
    optimizer=optimizer,
    metrics=['accuracy'],
)

In [None]:
# Print the layer-by-layer overview (output shapes and parameter counts).
model.summary()

Model: "sequential_32"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 rescaling (Rescaling)       (None, 20, 100, 100)      0         
                                                                 
 transformer (Transformer)   (None, 20, 64)            4981496   
                                                                 
 flatten (Flatten)           (None, 1280)              0         
                                                                 
 dropout_22 (Dropout)        (None, 1280)              0         
                                                                 
 dense_72 (Dense)            (None, 256)               327936    
                                                                 
 dense_73 (Dense)            (None, 128)               32896     
                                                                 
 dense_74 (Dense)            (None, 20)              

In [None]:
# Number of distinct classes the encoder was fitted on.
np.shape(encoder.categories_[0])

(20,)

In [None]:
# Target of 500 samples for every class index known to the encoder.
sampling_strategy = dict.fromkeys(range(len(encoder.categories_[0])), 500)

In [None]:
# Collapse each one-hot row to its integer class index.
labels_categories = np.argmax(labels_onehot, axis=1)

In [None]:
# Inspect the one-hot row of the first training sample.
labels_onehot[0]

array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0.,
       0., 0., 0.])

In [None]:
# Inspect the integer class indices derived from the one-hot labels.
labels_categories

array([12, 10, 16, ..., 10,  0,  6])

In [None]:
# Inspect the class names in the column order used by the encoder.
encoder.categories_

[array(['ban_ghe_sofa', 'ban_tho', 'cau_thang', 'chia_khoa', 'den',
        'dien_thoai_ban', 'dong_ho', 'ke_sach', 'nha_biet_thu',
        'nha_chung_cu', 'nha_go', 'nha_ky_tuc_xa', 'nha_lau',
        'nha_may_ngoi', 'nha_rong', 'nha_san', 'nha_tren_cay', 'nha_tret',
        'no_event', 'tranh_anh_treo_tuong'], dtype='<U20')]

In [None]:
from imblearn.over_sampling import SMOTE

# Reshape the data: SMOTE expects 2-D (samples, features) input, so every
# clip is flattened into a single feature vector.
dataForSmote = datas.reshape(datas.shape[0], -1)

# Apply SMOTE. The fixed random_state makes the synthetic samples (and with
# them the training run) reproducible across kernel restarts.
smote = SMOTE(sampling_strategy=sampling_strategy, random_state=42)
x_smote, y_smote = smote.fit_resample(dataForSmote, labels_categories)

# Restore the original per-sample layout.
x_smote = x_smote.reshape(-1, *datas.shape[1:])




In [None]:
# y_smote already holds integer class indices, so index an identity matrix
# instead of re-fitting `encoder`. Re-fitting would overwrite the encoder's
# string categories and would shift columns if any class were missing.
y_smote_onehot = np.eye(labels_onehot.shape[1])[y_smote]

In [None]:
# Keep only the checkpoint with the best validation accuracy seen so far.
callback = tf.keras.callbacks.ModelCheckpoint(
    filepath='/content/drive/MyDrive/model_AF.keras',
    monitor='val_accuracy',
    save_best_only=True,
    verbose=2,
)

In [None]:
# Train on the SMOTE-balanced set. validation_data is passed as the
# documented (x_val, y_val) tuple rather than a list.
history = model.fit(
    x=x_smote,
    y=y_smote_onehot,
    batch_size=64,
    epochs=100,
    validation_data=(val_datas, val_labels_onehot),
    callbacks=[callback],
)

Epoch 1/100
Epoch 1: val_accuracy improved from -inf to 0.05098, saving model to /content/drive/MyDrive/model_AF.keras
Epoch 2/100
Epoch 2: val_accuracy did not improve from 0.05098
Epoch 3/100
Epoch 3: val_accuracy improved from 0.05098 to 0.15750, saving model to /content/drive/MyDrive/model_AF.keras
Epoch 4/100
Epoch 4: val_accuracy improved from 0.15750 to 0.20747, saving model to /content/drive/MyDrive/model_AF.keras
Epoch 5/100
Epoch 5: val_accuracy improved from 0.20747 to 0.32458, saving model to /content/drive/MyDrive/model_AF.keras
Epoch 6/100
Epoch 6: val_accuracy did not improve from 0.32458
Epoch 7/100
Epoch 7: val_accuracy improved from 0.32458 to 0.39223, saving model to /content/drive/MyDrive/model_AF.keras
Epoch 8/100
Epoch 8: val_accuracy improved from 0.39223 to 0.41242, saving model to /content/drive/MyDrive/model_AF.keras
Epoch 9/100
Epoch 9: val_accuracy improved from 0.41242 to 0.50883, saving model to /content/drive/MyDrive/model_AF.keras
Epoch 10/100
Epoch 10: 

In [23]:
# Restore a previously trained model from Drive.
# NOTE(review): the ModelCheckpoint cell in this notebook writes
# 'model_AF.keras', while this loads 'model_AF1.keras' -- confirm that the
# file comes from an earlier run and is the intended checkpoint.
model = tf.keras.models.load_model(
    filepath='/content/drive/MyDrive/model_AF1.keras'
)

In [24]:
# Class probabilities for every validation sample.
y_pred = model.predict(x=val_datas)



In [25]:
# Ground-truth class index per validation sample.
lb = val_labels_onehot.argmax(axis=1)

In [26]:
from sklearn.metrics import classification_report

In [27]:
# Per-class precision / recall / F1 on the validation set.
print(
    classification_report(
        y_true=lb,
        y_pred=np.argmax(y_pred, axis=1),
    )
)

              precision    recall  f1-score   support

           0       1.00      0.99      0.99       310
           1       1.00      0.97      0.98       218
           2       0.98      0.98      0.98        62
           3       0.95      0.97      0.96        64
           4       1.00      0.95      0.98        62
           5       0.99      1.00      0.99        72
           6       0.95      1.00      0.97        56
           7       0.97      1.00      0.98       195
           8       0.99      0.96      0.98       102
           9       0.97      1.00      0.98        91
          10       1.00      0.98      0.99       112
          11       0.98      0.98      0.98        66
          12       0.93      1.00      0.96        50
          13       0.95      0.93      0.94        56
          14       0.91      0.91      0.91        46
          15       0.89      0.95      0.92        59
          16       0.97      0.96      0.96        70
          17       0.94    