These are my study notes on the Transformer, based on the articles and code from:

- https://www.tensorflow.org/tutorials/text/transformer
- https://datawhalechina.github.io/learn-nlp-with-transformers

All credits go to the above authors.

The notes and comments reflect my current understanding; please correct me if they are wrong.

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras

## Tokenizer
Tokenize the input sequences.

In [None]:
class Tokenizer():
    """Map batches of string tokens to integer ids using a fixed vocabulary."""

    def __init__(
        self,
        vocabulary          : np.ndarray # sequence of str (list or np.ndarray)
        ):
        """
        @arg
            vocabulary: the vocabulary table. The id of a token is the position
                        of its first occurrence in this sequence.
        """
        self.voc = vocabulary

        # Build the token -> id table once. A dict lookup works for both lists
        # and np.ndarray (which has no `.index` method) and avoids a linear
        # scan of the vocabulary for every single token.
        # NOTE: the table is a snapshot; reassigning `self.voc` later does not
        # refresh it.
        self._token_to_id = {}
        for idx, token in enumerate(vocabulary):
            # setdefault keeps the first index when a token is duplicated,
            # which matches the behaviour of `list.index`.
            self._token_to_id.setdefault(token, idx)

    def __call__(self, sequence_batch : np.ndarray) -> np.ndarray:
        """
        Tokenize the sequence
        @arg
            sequence_batch: batch of input sequences.
                            starts with <BOS> and ends with <EOS>
                            the list wrapped in np.array is List[List[str]]
        @return
            np.array, shape [batch, sequence_size]
        @raise
            ValueError: if a token is not part of the vocabulary.
        """
        # TODO 1. insert <bos> and <eos> to sequence and padding ?
        lookup = self._token_to_id
        try:
            tk = np.array([[lookup[s] for s in seq] for seq in sequence_batch])
        except KeyError as err:
            # Keep raising ValueError, the exception `list.index` would raise.
            raise ValueError(f"{err.args[0]!r} is not in vocabulary") from err
        return tk
        

## Embedding And Positional encoding

If the input has shape [batch_size, sequence_length], the embedding produces a tensor of shape [batch_size, sequence_length, embedding_dimension].
* The batch size is the number of sequences fed to the transformer at once; in plain language, the number of sentences.
* The sequence length is the number of tokens in one input, that is, the number of words in one sentence. Since not all sentences have the same length, this value should be the longest possible length.
* The embedding dimension is the size of each word vector, as in Word2Vec. For the Transformer it is set to 512.

In [3]:
class PositionalEmbedding(keras.layers.Layer):
    """Token embedding followed by the addition of a fixed sinusoidal positional encoding."""

    def __init__(
        self, 
        vocabulary_size         : int,
        max_sequence_length     : int,
        embed_dimension         : int = 512,
        padding                 : bool = False, # in case for encoder if masking padding is desired
        initializer             = tf.initializers.RandomNormal(0., 0.01)
        ):
        """
        @arg
        vocabulary_size:
            Number of entries in the vocabulary table (input_dim of the embedding).
        max_sequence_length:
            The number of words in one sentence.
            If there are 8 words in one sentence, the value is 8.
            However, since sentences do not all have the same size,
            this value should be set to the size of the longest possible sentence.
        embed_dimension:
            The dimension of the embedding, or feature number.
        padding:
            When True, the encoding row of position 0 is all zeros.
        initializer:
            Initializer of the embedding weights.
        """
        super(PositionalEmbedding, self).__init__()
        
        self.s_size = max_sequence_length
        self.v_size = vocabulary_size
        self.e_size = embed_dimension
        
        self.embedding = keras.layers.Embedding(
            input_dim=self.v_size,
            output_dim=self.e_size,
            input_length=self.s_size,
            embeddings_initializer = initializer,
            mask_zero=True
        )
        
        # The formula for calculating the positional encoding is as follows:
        # PE(pos, 2i)   = sin(pos/10000^(2i/d_model))
        # PE(pos, 2i+1) = cos(pos/10000^(2i/d_model))
        #
        # Column j belongs to pair i = j // 2, so the exponent must be
        # 2 * (j // 2) / d_model: the sin column and the cos column of one pair
        # share the same frequency.
        positions = np.arange(self.s_size, dtype=np.float64)[:, None]    # [seq, 1]
        columns = np.arange(self.e_size)[None, :]                        # [1, e_dim]
        angles = positions / np.power(10000.0, 2 * (columns // 2) / self.e_size)

        positional_encoding = np.zeros_like(angles)
        positional_encoding[:, 0::2] = np.sin(angles[:, 0::2])
        positional_encoding[:, 1::2] = np.cos(angles[:, 1::2])

        # Zero the row AFTER sin/cos: cos(0) == 1, so zeroing the angles
        # beforehand would not give a zero row.
        # NOTE(review): this assumes position 0 is the one reserved for
        # padding - confirm against how the encoder builds its inputs.
        if padding and self.s_size > 0:
            positional_encoding[0, :] = 0.0

        positional_encoding = positional_encoding[None, :, :]           # [1, seq, e_dim]
        self.positional_encoding = tf.cast(positional_encoding, dtype=tf.float32)
        
    def call(self, batch : np.array):
        """
        Args:
            batch: which is expected to be the output of Tokenizer.
            The type of batch is np.array, shape -> [batch_size, sequence_size];
            sequence_size must not exceed max_sequence_length.
        
        return:
            tf.Tensor, shape -> [batch_size, sequence_size, embed_dimension]
        """
        # Slice the table to the actual length of this batch, so that inputs
        # shorter than max_sequence_length still broadcast correctly.
        seq_len = tf.shape(batch)[1]
        return self.embedding(batch) + self.positional_encoding[:, :seq_len, :]

In [None]:
# Demo: run two token sequences of length 4 through a small PositionalEmbedding
# (vocabulary of 5 entries, 3 features per token) and print the result.
model = keras.Sequential([
    PositionalEmbedding(
        vocabulary_size=5,
        max_sequence_length=4,
        embed_dimension=3,
        padding=True,
    ),
])
model.compile()
test_rst = model.predict(
    np.array([
        [0, 1, 2, 3],
        [2, 3, 4, 1],
    ])
)
print(test_rst)

## Self Attention
Scaled Dot-Product Attention.

In [4]:
class SelfAttention() :
    """Scaled dot-product attention with its own Q/K/V projections."""

    def __init__(self, d_model):
        """
        It may be better to be implemented in Multihead-Attention directly.
        Just to make it be the "self attention part" as described in the Transformer diagram.
        Args:
            d_model: tokenized embedded_dimension / head_num
        """
        self.wq = keras.layers.Dense(d_model)
        self.wk = keras.layers.Dense(d_model)
        self.wv = keras.layers.Dense(d_model)
        self.d_model = d_model

    @staticmethod
    def _static_last_dim(tensor):
        """Return the statically known size of the last axis, or None if unknown."""
        shape = getattr(tensor, "shape", None)
        if shape is None:
            return None
        try:
            return shape[-1]
        except (IndexError, TypeError, ValueError):
            # scalar input or a shape whose rank is not known
            return None

    def __call__(self, iq, ik, iv, mask=None):
        """
        dot(Q,K) => Scale => Mask => Softmax => dot(attention_weights, V)
        Args:
            input -> iq, ik, iv: [sequence_size, embedded_dimension]
            NOTICE: In real life, the shape will be [batch, head_num, seq_size, e_dim],
                    for multi-head attention calculation.
                    However, when we consider the calculation itself, 
                    we can assume it as if sequence_size, embedded_dimension.
            mask: optional; non-zero entries get a large negative value added to
                  their score, so they receive (almost) no attention weight.
                  It has to broadcast against the [..., seq_q, seq_k] scores.
        Returns:
            (sv, aw): the attention result and the attention weights.
        Raises:
            ValueError: if the last dimension of iq / ik / iv is statically
                        known and differs from d_model.
        """
        # Validate with the static shape and a real exception: an `assert` is
        # stripped under `python -O`, and turning a dynamic tf.shape() tensor
        # into a Python bool is only reliable in eager mode.
        for name, tensor in (("q", iq), ("k", ik), ("v", iv)):
            last_dim = self._static_last_dim(tensor)
            if last_dim is not None and last_dim != self.d_model:
                raise ValueError(
                    f"Invalid input {name}: last dimension is {last_dim}, "
                    f"expected {self.d_model}"
                )

        q = self.wq(iq)
        k = self.wk(ik)
        v = self.wv(iv)

        # FORMULA -> Z = Softmax(QK(t)/d(k)^0.5)V
        
        # transpose the last 2 dimensions of k. (..., seq_size, e_dim) => (..., e_dim, seq)
        qkt = tf.matmul(q, k, transpose_b=True)     # Q dot K(transpose)
        dk = tf.cast(tf.shape(k)[-1], qkt.dtype)    # K embedded dimension (tensor)
        scale = qkt / tf.math.sqrt(dk)

        # mask
        if mask is not None:
            # cast so that bool / integer masks can be multiplied as well
            scale += tf.cast(mask, scale.dtype) * (-1e10)

        # return values
        aw = tf.nn.softmax(scale, axis=-1)          # attention weights
        sv = tf.matmul(aw, v)                       # scores vector     (result of attention)
        
        return sv, aw

## MultiHead Attention

In [None]:
class MultiHeadAttention(tf.keras.layers.Layer):
    """Split the features into heads, run SelfAttention on every head and merge the result."""

    def __init__(self, head_num):
        """
        Multi head attention
        Args:
            head_num:  number of heads
        Raises:
            ValueError: if head_num is smaller than 1.
        """
        super().__init__()
        if head_num < 1:
            raise ValueError("head_num must be at least 1")
        self.head_num = head_num

        # Created in build(), once the embedded dimension of the input is known.
        self.attention = None
        self.dense = None

    def build(self, input_shape):
        """Create the sub layers once, so their weights are reused on every call."""
        e_dim = input_shape[-1]
        if e_dim is None:
            raise ValueError("The embedded dimension of the input must be known.")
        if e_dim % self.head_num != 0:
            raise ValueError("The dimension should be divisible by the head number.")

        self.attention = SelfAttention(e_dim // self.head_num)
        # SelfAttention is a plain Python object, so its Dense layers are also
        # referenced from this layer; presumably this is what lets Keras track
        # their weights - verify against `trainable_variables`.
        self._attention_projections = [
            self.attention.wq,
            self.attention.wk,
            self.attention.wv,
        ]
        self.dense = tf.keras.layers.Dense(e_dim)
        super().build(input_shape)

    def call(self, seq_batch: tf.Tensor, mask=None):
        """
        Args:
            seq_batch: positional-embedded sequences. shape = [batch_size, sequence_size, embed_dimension]
            mask: optional, forwarded unchanged to SelfAttention. It has to
                  broadcast against [batch_size, head_num, seq_size, seq_size].
        Returns:
            (output, weights)
            output:  [batch_size, seq_size, embed_dimension]
            weights: attention weights, [batch_size, head_num, seq_size, seq_size]
        Raises:
            ValueError: if seq_batch is not a rank 3 tensor.
        """
        if len(seq_batch.shape) != 3:
            raise ValueError("The input should be a tensor of [batch_size, seq_size, embedded_dimension]")

        # The batch size is usually unknown statically (None), so read it at run time.
        batch_size = tf.shape(seq_batch)[0]
        e_dim = seq_batch.shape[-1]                                                 # embedded_dimension
        d_model = e_dim // self.head_num                                            # d_model to self-attention (embedded_dimension / head_num)

        # Split the feature axis first and move the head axis forward afterwards.
        # Reshaping straight to [batch, head, seq, d_model] would mix up the
        # sequence and the feature axes.
        split = tf.reshape(seq_batch, (batch_size, -1, self.head_num, d_model))     #[batch_size, seq_size, head_num, d_model]
        heads = tf.transpose(split, perm=[0, 2, 1, 3])                              #[batch_size, head_num, seq_size, d_model]

        q = k = v = heads
        scores, weights = self.attention(q, k, v, mask)

        scaled_attention = tf.transpose(scores, perm=[0,2,1,3])                     #[batch_size, seq_size, head_num, d_model]
        connect_attention = tf.reshape(scaled_attention, (batch_size, -1, e_dim))   #[batch_size, seq_size, e_dim]
        output = self.dense(connect_attention)                                      #[batch_size, seq_size, e_dim]

        return output, weights

## Encoder
After self-attention, the output is forwarded to the normalization and dropout layers.

In [None]:
class Encoder(keras.layers.Layer):
    """Placeholder for the encoder layer; nothing is implemented yet."""


## Decoder

In [None]:
class Decoder():
    """Placeholder for the decoder; nothing is implemented yet."""

## Transformer model

In [None]:
class Transformer(keras.Model):
    """Placeholder for the complete Transformer model; no layers are defined yet."""

    def __init__(self):
        # A keras.Model subclass has to initialise its base class before it
        # can hold layers or be called.
        super().__init__()