In [2]:
import numpy as np

# 3.1 Encoder and Decoder Stacks
Implemented only based on chapter 3.1

## 3.1.1 Encoder

In [87]:
class Encoder:
    '''
    Composed of 6 stack of identical layers
    Each layer has two sub-layers
    
    First layer : Multi-Head Self-Attention Mechanism
    Second layer : Position-wise fully connected Feed-Forward Network.

    Residual Connection around each two sub-layer is employed, followed by layer normalization.
    The output of each sub-layer is LayerNorm (x+ Sublayer(x)) where Sublayer(x) is the function implemented by the sub-layer itself.

    To facilitate these residual connections, all sub-layers in the model (including embedding layer)
    produces outputs of dimension d_model = 512
    '''
    def __init__(self, vocab_size, d_model=512):
        self.d_model = d_model
        self.embed = Embed(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        self.multihead = MultiHead(d_model)
        self.feedforward = FeedForward(d_model) 
        self.layer_norm  = LayerNormalization(d_model)

    def normalized(self, output): ### Not yet described, described in ???
        normalized_output = LayerNormalization().run(output) # will be defined later on.
        return normalized_output 
        
    def add_norm(self, input, layer_output):
        # in ma_layer, input will be positional_encoded_embedding
        # in ff_layer, input will be the output of ma_layer
        return self.layer_norm.run(input + layer_output)
        
    def mh_layer(self, input):
        multihead_attentioned = self.multihead.run(input, input, input)
        normalized_ouptut = self.add_norm(input, multihead_attentioned)
        return normalized_ouptut
        
    def ff_layer(self, mh_layer_output):
        feed_forwarded = self.feedforward.run(mh_layer_output)
        layer_norm_ouptut = self.add_norm(mh_layer_output, feed_forwarded)
        return layer_norm_ouptut
        
    def run(self, input_tokens):
        embedded_input = self.embed.token_to_embedding(input_tokens)
        positional_encoded_embedding = self.positional_encoding.run(embedded_input)
        output = positional_encoded_embedding
        for _ in range(6):
            ma_layer_output = self.mh_layer(output)
            ff_layer_output = self.ff_layer(ma_layer_output)
            output = ff_layer_output
        return output

## 3.1.2 Decoder

In [118]:
class Decoder:        
    '''
    Composed of 6 stack of identical layers.
    Each layer has three sub-layers.

    First layer : Masked Multi-Head Self-Attention Mechanism
    Second layer : Multi-Head Self-Attention Mechanism
    Third layer : Position-wise fully connected Feed-Forward Network.

    Residual Connection around each two sub-layer is employed, followed by layer normalization.
    The output of each sub-layer is LayerNorm (x+ Sublayer(x)) where Sublayer(x) is the function implemented by the sub-layer itself.

    To facilitate these residual connections, all sub-layers in the model (including embedding layer)
    produces outputs of dimension d_model = 512
    '''
    def __init__(self, vocab_size, d_model=512):
        self.d_model = d_model
        self.embed = Embed(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        self.multihead = MultiHead(d_model)
        self.feedforward = FeedForward(d_model) 
        self.layer_norm = LayerNormalization(d_model)
        
    def add_norm(self, input, layer_output): ### Not yet described, described in ???
        # in ma_layer, input will be positional_encoded_embedding
        # in ff_layer, input will be the output of ma_layer
        return self.layer_norm.run(input + layer_output)
        
    def masked_mh_layer(self, input, look_ahead_mask):
        masked_mh = self.multihead.run(input, input, input, look_ahead_mask)
        normalized_output = self.add_norm(input, masked_mh)
        return normalized_output
    
    def mh_layer(self, masked_mh_layer_output, encoder_output):
        multihead_attentioned = self.multihead.run(masked_mh_layer_output, encoder_output, encoder_output)
        normalized_ouptut = self.add_norm(masked_mh_layer_output, multihead_attentioned)
        return normalized_ouptut
        
    def ff_layer(self, mh_layer_output):
        feed_forwarded = self.feedforward.run(mh_layer_output)
        normalized_ouptut = self.add_norm(mh_layer_output, feed_forwarded)
        return normalized_ouptut

    def run(self, input_tokens, encoder_output):
        size = len(input_tokens)
        look_ahead_mask = create_look_ahead_mask(size)
        
        embedded_input = self.embed.token_to_embedding(input_tokens)
        positional_encoded_embedding = self.positional_encoding.run(embedded_input)
        
        for _ in range(6):
            layer_normed_masked_mh_output = self.masked_mh_layer(positional_encoded_embedding, look_ahead_mask)
            mh_layer_output = self.mh_layer(layer_normed_masked_mh_output, encoder_output)
            ff_layer_output = self.ff_layer(mh_layer_output)
            positional_encoded_embedding = ff_layer_output
        return positional_encoded_embedding



# 3.2 Attention
## 3.2.1 Scaled Dot-Product Attention

In [137]:
class ScaledDotProductAttention:
    '''
    Input consists of queries and keys of dimension d_k, and values of dimension d_v
    compute the dot products of the query with all keys, divide each by $\sqrt {d_k}$,
    and apply a softmax function to obtain the weights on the values.

    In practice, we compute the attention function on a set of queries simultaneously, packed together into matrix Q.
    The key in matrix K, the value in matrix V.
    '''
    def __init__(self):
        pass

    def softmax(self, matrix):
        e_x = np.exp(matrix - np.max(matrix, axis=-1, keepdims=True))
        return e_x / e_x.sum(axis=-1, keepdims=True)
    
    def run(self, Q, K, V, mask=None):
        matmul_qk = np.matmul(Q, K.T)/np.sqrt(K.shape[1])
        if mask is not None: # added this part after 3.6.2
            matmul_qk += (mask * -1e9)
        softmaxed = self.softmax(matmul_qk)
        result = np.matmul(softmaxed, V)
        return result

## 3.2.2 Multi-Head Attention

In [138]:
class MultiHead:
    '''
    Instead of performing a single attention function with dmodel-dimensional keys, values and queries,
    we found it beneficial to linearly project the queries, keys and values h times 
    with different, learned linear projections to dk, dk and dv dimensions, respectively.
    On each of these projected versions of queries, keys and values
    we then perform the attention function in parallel, yielding dv-dimensional output values.
    
    These are concatenated and once again projected, resulting in the final values, as depicted in Figure 2.
    Multi-head attention allows the model to jointly attend to information from different representation subspaces at different positions.
    With a single attention head, averaging inhibits this.

    Figure 2 : 
    $ MultiHead(Q, K, V ) = Concat(head_1, ..., head_h)W^{O}$
    $where head_i = Attention(QW_i^Q, KW_i^K, VW_i^V)$ ######### = ScaledDotProductAttention()

    Where the projections are parameter matrices 
    $W_i^Q ∈ R^{d_{model}\times d_k }, W_i^K ∈ R^{d_{model}\timesd_k}, W_i^V ∈ R^{d_{model} \times d_v} and WO ∈ R^{hd_v \times d_{model}}$.

    In this work we employ $h = 8$ parallel attention layers, or heads. 
    For each of these we use $d_k = d_v = d_{model}/h = 64$.
    Due to the reduced dimension of each head, the total computational cost is similar to
    that of single-head attention with full dimensionality.

    '''
    def __init__(self, d_model=512, num_heads=8):
        '''
        d_model = dimension of input vector
        num_heads = number of attention heads to use / h = 8 as per the paper
        '''
        self.d_model = d_model
        self.num_heads = num_heads
        self.depth = d_model // num_heads
        '''
        Calculates the dimension of each head.
        It divides the dimension of the model by the number of heads.
        To ensure the input is evenly split across the heads.
        '''
        
        self.wq = [np.random.randn(d_model, self.depth) for _ in range(num_heads)]
        self.wk = [np.random.randn(d_model, self.depth) for _ in range(num_heads)]
        self.wv = [np.random.randn(d_model, self.depth) for _ in range(num_heads)]
        '''
        Initializes the matrix randomly with np.random.randn, generates a sample from a Gaussian distribution.
        It repeats for num_heads (8 here) times.
        The dimension of this matrix is (d_model, self.depth)
        '''
        self.wo = np.random.randn(d_model, d_model)
        '''
        Initializes the WO matrix randomly with np.random.randn, generates a sample from a Gaussian distribution.
        '''

        self.attention = ScaledDotProductAttention() # Attention

    def run(self, Q, K, V, mask=None):
        heads = []
        for i in range(self.num_heads):
            # split and apply attention to each head
            dot_Q = np.dot(Q, self.wq[i])
            dot_K = np.dot(K, self.wk[i])
            dot_V = np.dot(V, self.wv[i])
            heads.append(self.attention.run(dot_Q, dot_K, dot_V, mask))

        # concatenate and apply final linear layer
        concatenated = np.concatenate(heads, axis=-1)
        return np.dot(concatenated, self.wo)
    

# 3.3 Position-wise Feed-Forward Networks.

In [139]:
class FeedForward:
    '''
    In addition to attention sub-layers, each of the layers in our encoder and decoder contains
    a fully connected feed-forward network, which is applied to each position separately and identically.
    This consists of two linear transformations with a ReLU activation in between.
    $FFN(x) = max(0, xW_1 + b_1)W_2 + b_2 $
    While the linear transformations are the same across different positions, they use different parameters from layer to layer.
    Another way of describing this is as two convolutions with kernel size 1.
    The dimensionality of input and output is $d_model = 512$, and the inner-layer has dimensionality$d_{ff} = 2048$.
    '''
    def __init__(self, d_model=512, d_ff=2048):
        # weight
        self.W1 = np.random.randn(d_model, d_ff) # transform input vector from d_model to d_ff
        self.W2 = np.random.randn(d_ff, d_model) # transform the transformed vector back to the dimension of d_model

        # bias 
        self.b1 = np.zeros(d_ff) # bias_1 = size of d_ff
        self.b2 = np.zeros(d_model) # bias_2 = size of d_model

    def relu(self, x):
        return np.maximum(0, x)
        
    def run(self, mh_output):
        w1_b1 = self.relu(np.dot(mh_output, self.W1) + self.b1) # First linear transformation
        output = np.dot(w1_b1, self.W2) + self.b2 # Second linear transformation
        return output

# 3.4 Embeddings and Softmax

In [152]:
class Embed:
    def __init__(self, vocab_size, d_model):
        self.vocab_size = vocab_size
        self.d_model = d_model
        self.weight = np.random.randn(vocab_size, d_model) * (1/np.sqrt(d_model))

    def token_to_embedding(self, token_ids):
        embeddings = self.weight[token_ids] * np.sqrt(self.d_model)
        return embeddings

    def output_to_probabilities(self, decoder_output):
        logits = decoder_output @ self.weight.T # Linear Transformation
        probabilities = self.softmax(logits) # Softmax
        return probabilities
        
    def softmax(self, logits):
        e_x = np.exp(logits - np.max(logits, axis=-1, keepdims=True))
        return e_x / e_x.sum(axis=-1, keepdims=True)

# 3.5 Positional Encoding

In [153]:
class PositionalEncoding:
    def __init__ (self, d_model=512, max_len=512):
        self.d_model = d_model
        self.max_len = max_len
        
    def sin_wave(self, pos, i):
        return np.sin(pos/10000 ** (2 * i / self.d_model))

    def cos_wave(self, pos, i):
        return np.cos(pos/10000 ** (2 * i / self.d_model))

    def run(self, input):
        seq_len = input.shape[0]
        pos_encoding = np.zeros((seq_len, self.d_model))
        for pos in range(seq_len):
            for i in range(0, self.d_model, 2):
                pos_encoding[pos, i] = self.sin_wave(pos, i)
                if i + 1 < self.d_model:
                    pos_encoding[pos, i + 1] = self.cos_wave(pos, i + 1)

        # pos_encoding = pos_encoding[:seq_len, :]
        return input + pos_encoding#[np.newaxis, :, :]

# 3.6 What can be added to complete the Transformer
## 3.6.1 Layer Normalization

In [154]:
class LayerNormalization:
    def __init__(self, d_model, epsilon=1e-6):
        self.epsilon = epsilon
        self.gamma = np.ones(d_model)
        self.beta = np.zeros(d_model)

    def run(self, x):
        mean = np.mean(x, axis=-1, keepdims=True)
        variance= np.var(x, axis=-1, keepdims=True)
        normalized = (x - mean) / np.sqrt(variance + self.epsilon)
        return self.gamma * normalized + self.beta

## 3.6.2 Mask Generation

In [155]:
def create_look_ahead_mask(size):
    mask=np.triu(np.ones((size, size)), k=1)
    return mask

# Transformer

For the token_ids, I have simply used index of each word in the sentences.<br>
Tokenizer will be implemented in different notebook.

In [156]:
class Transformer:
    def __init__(self, original_text, translated_text, d_model=512):
        splitted_ori = original_text.split()
        splitted_trans = translated_text.split()
        ori_vocab = {}
        for idx, word in enumerate(splitted_ori):
            if word not in ori_vocab.keys():
                ori_vocab[word] = idx
        trans_vocab = {}
        for idx, word in enumerate(splitted_trans):
            if word not in trans_vocab.keys():
                trans_vocab[word] = idx
        self.ori_token_id = [ori_vocab[word] for word in splitted_ori]
        self.trans_token_id = [trans_vocab[word] for word in splitted_trans]
        ori_vocab_size = len(splitted_ori)
        trans_vocab_size = len(splitted_trans)        
        self.encoder = Encoder(vocab_size=ori_vocab_size)
        self.decoder = Decoder(vocab_size=trans_vocab_size)
        self.embed = Embed(vocab_size=trans_vocab_size, d_model=d_model)

    def run(self):
        encoder_output = self.encoder.run(self.ori_token_id)
        decoder_output = self.decoder.run(self.trans_token_id, encoder_output)
        final_embedding = self.embed.output_to_probabilities(decoder_output)
        return final_embedding

In [157]:
original_text = "I am so hungry"
translated_text = "tengo mucha hambre"
transformer = Transformer(original_text, translated_text)
transformer.run()

array([[0.17776883, 0.29506878, 0.52716239],
       [0.17776883, 0.29506878, 0.52716239],
       [0.17776883, 0.29506878, 0.52716239]])