In [1]:
import numpy as np
    

In [2]:
class MultiHeadAttention:
    """Multi-head scaled dot-product attention on NumPy arrays (forward pass only).

    Inputs are projected with ``W_q``/``W_k``/``W_v``, split into ``num_heads``
    heads that attend independently, concatenated again and projected with
    ``W_o``.
    """

    def __init__(self, num_hiddens, num_heads, dropout=0.0, bias=True):
        """Create the projection weights.

        Args:
            num_hiddens: Size of the feature axis; must be a multiple of ``num_heads``.
            num_heads: Number of attention heads.
            dropout: Stored on the instance only; this class never applies dropout.
            bias: If true the four biases are drawn at random, otherwise they are zeros.

        Raises:
            ValueError: If ``num_hiddens`` is not divisible by ``num_heads``.
        """
        # Fail early: otherwise the error only shows up later inside reshape().
        if num_hiddens % num_heads != 0:
            raise ValueError(
                f"num_hiddens ({num_hiddens}) must be divisible by num_heads ({num_heads})"
            )

        self.num_heads = num_heads
        self.num_hiddens = num_hiddens
        self.dropout = dropout
        self.d_k = self.d_v = num_hiddens // num_heads

        # Weights are sampled uniformly from [0, 1); the draw order is kept so
        # that seeded runs produce the same parameters as before.
        self.W_q = np.random.rand(num_hiddens, num_hiddens)
        self.W_k = np.random.rand(num_hiddens, num_hiddens)
        self.W_v = np.random.rand(num_hiddens, num_hiddens)
        self.W_o = np.random.rand(num_hiddens, num_hiddens)

        if bias:
            self.b_q = np.random.rand(num_hiddens)
            self.b_k = np.random.rand(num_hiddens)
            self.b_v = np.random.rand(num_hiddens)
            self.b_o = np.random.rand(num_hiddens)
        else:
            # One array per bias: a chained assignment would make all four
            # names alias the same buffer, so an in-place change to one would
            # silently change the others.
            self.b_q = np.zeros(num_hiddens)
            self.b_k = np.zeros(num_hiddens)
            self.b_v = np.zeros(num_hiddens)
            self.b_o = np.zeros(num_hiddens)

    def __call__(self, queries, keys, values, valid_lens=None):
        """Alias for :meth:`forward` so the layer can be called like a function."""
        return self.forward(queries, keys, values, valid_lens)

    def transpose_qkv(self, X):
        """Split the feature axis into heads.

        ``(batch, seq, num_hiddens)`` -> ``(batch * num_heads, seq, num_hiddens / num_heads)``.
        Rows of the result are ordered batch-major, head-minor.
        """
        X = X.reshape(X.shape[0], X.shape[1], self.num_heads, -1)
        X = X.transpose(0, 2, 1, 3)
        return X.reshape(-1, X.shape[2], X.shape[3])

    def transpose_output(self, X):
        """Undo :meth:`transpose_qkv`: merge the heads back into the feature axis."""
        X = X.reshape(-1, self.num_heads, X.shape[1], X.shape[2])
        X = X.transpose(0, 2, 1, 3)
        return X.reshape(X.shape[0], X.shape[1], -1)

    def scaled_dot_product_attention(self, Q, K, V, valid_lens=None):
        """Compute ``softmax(Q K^T / sqrt(d)) V`` with optional key masking.

        Args:
            Q: Array of shape ``(n, num_queries, d)``.
            K: Array of shape ``(n, num_keys, d)``.
            V: Array of shape ``(n, num_keys, d_v)``.
            valid_lens: ``None`` (no masking), or an array-like of shape ``(n,)``
                or ``(n, num_queries)``. Key positions ``>= valid_len`` get zero
                attention weight.

        Returns:
            Array of shape ``(n, num_queries, d_v)``. A query whose keys are all
            masked receives an all-zero output row instead of NaN.
        """
        d_k = Q.shape[-1]
        scores = np.matmul(Q, K.transpose(0, 2, 1)) / np.sqrt(d_k)

        if valid_lens is not None:
            valid_lens = np.asarray(valid_lens)
            key_positions = np.arange(scores.shape[-1])
            mask = key_positions < valid_lens[..., None]
            if mask.ndim == 2:
                # One length per sequence: broadcast it over the query axis.
                mask = mask[:, None, :]
            scores = np.where(mask, scores, -np.inf)

        row_max = np.max(scores, axis=-1, keepdims=True)
        # A fully masked row has max == -inf; subtracting it would give
        # (-inf) - (-inf) = NaN, so shift such rows by 0 instead.
        row_max = np.where(np.isneginf(row_max), 0.0, row_max)
        attention_weights = np.exp(scores - row_max)
        normaliser = attention_weights.sum(axis=-1, keepdims=True)
        # Divide only where the normaliser is positive; fully masked rows keep
        # the zeros supplied through `out`.
        attention_weights = np.divide(
            attention_weights,
            normaliser,
            out=np.zeros_like(attention_weights),
            where=normaliser > 0,
        )
        return np.matmul(attention_weights, V)

    def forward(self, queries, keys, values, valid_lens=None):
        """Run multi-head attention.

        Args:
            queries: Array of shape ``(batch, num_queries, num_hiddens)``.
            keys: Array of shape ``(batch, num_keys, num_hiddens)``.
            values: Array of shape ``(batch, num_keys, num_hiddens)``.
            valid_lens: ``None`` or per-sequence (``(batch,)``) / per-query
                (``(batch, num_queries)``) counts of valid key positions.

        Returns:
            Array of shape ``(batch, num_queries, num_hiddens)``.
        """
        queries = self.transpose_qkv(np.dot(queries, self.W_q) + self.b_q)
        keys = self.transpose_qkv(np.dot(keys, self.W_k) + self.b_k)
        values = self.transpose_qkv(np.dot(values, self.W_v) + self.b_v)

        if valid_lens is not None:
            # Repeat each length once per head so it lines up with the
            # (batch * num_heads) leading axis produced by transpose_qkv.
            valid_lens = np.repeat(valid_lens, self.num_heads, axis=0)

        output = self.scaled_dot_product_attention(queries, keys, values, valid_lens)
        output_concat = self.transpose_output(output)
        return np.dot(output_concat, self.W_o) + self.b_o



In [3]:
def positional_encoding(seq_len, d_model):
    """Build the sinusoidal positional-encoding table.

    Args:
        seq_len: Number of positions (rows).
        d_model: Number of feature dimensions (columns).

    Returns:
        Array of shape ``(seq_len, d_model)``. Even columns hold the sine and
        odd columns the cosine of ``pos / 10000 ** (2 * (col // 2) / d_model)``.
    """
    positions = np.arange(seq_len)[:, None]
    columns = np.arange(d_model)[None, :]

    # Columns 2k and 2k + 1 share the same frequency exponent 2k / d_model.
    exponents = (2 * (columns // 2)) / np.float32(d_model)
    table = positions * (1 / np.power(10000, exponents))

    table[:, 0::2] = np.sin(table[:, 0::2])
    table[:, 1::2] = np.cos(table[:, 1::2])
    return table

In [4]:
class FeedForward:
    """Position-wise feed-forward network: ``relu(x @ W1 + b1) @ W2 + b2``."""

    def __init__(self, d_model, d_ff):
        """Create the two affine layers.

        Args:
            d_model: Size of the input and output feature axis.
            d_ff: Size of the hidden layer.
        """
        scale = np.sqrt(2.0 / (d_model + d_ff))
        self.W1 = np.random.randn(d_model, d_ff) * scale
        self.b1 = np.zeros(d_ff)
        # W2 maps the hidden layer back to d_model, so its shape must be
        # (d_ff, d_model); (d_model, d_ff) only multiplies when d_model == d_ff.
        self.W2 = np.random.randn(d_ff, d_model) * scale
        self.b2 = np.zeros(d_model)

    def __call__(self, x):
        """Alias for :meth:`forward`."""
        return self.forward(x)

    def forward(self, x):
        """Apply the network to ``x`` of shape ``(..., d_model)``; the shape is preserved."""
        hidden = np.maximum(0, np.dot(x, self.W1) + self.b1)
        return np.dot(hidden, self.W2) + self.b2




In [5]:
class EncoderLayer:
    """One encoder block: self-attention followed by a feed-forward network.

    The block applies the two sub-layers back to back; it adds no residual
    connections and no normalisation.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0, bias=False):
        """Build the two sub-layers (attention first, then feed-forward)."""
        self.d_model, self.num_heads, self.d_ff = d_model, num_heads, d_ff

        self.multi_head_attention = MultiHeadAttention(
            d_model, num_heads, dropout, bias
        )
        self.feed_forward = FeedForward(d_model, d_ff)

    def forward(self, x, mask=None):
        """Attend from ``x`` to itself, then run the result through the feed-forward net.

        ``mask`` is handed to the attention layer unchanged as its ``valid_lens``.
        """
        attended = self.multi_head_attention.forward(x, x, x, mask)
        return self.feed_forward(attended)

    def __call__(self, x, mask=None):
        """Alias for :meth:`forward`."""
        return self.forward(x, mask)



In [6]:
class DecoderLayer:
    """One decoder block: self-attention, encoder-decoder attention, feed-forward.

    The three sub-layers are applied back to back; the block adds no residual
    connections and no normalisation.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.0, bias=False):
        """Build the sub-layers.

        Args:
            d_model: Feature size of the inputs and outputs.
            num_heads: Number of heads in both attention layers.
            d_ff: Hidden size of the feed-forward network.
            dropout: Forwarded to both attention layers.
            bias: Forwarded to both attention layers.
        """
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_ff = d_ff

        self.multi_head_attention_1 = MultiHeadAttention(d_model, num_heads, dropout, bias)
        self.multi_head_attention_2 = MultiHeadAttention(d_model, num_heads, dropout, bias)
        self.feed_forward = FeedForward(d_model, d_ff)

    def __call__(self, x, enc_output, mask=None):
        """Alias for :meth:`forward`."""
        return self.forward(x, enc_output, mask)

    def forward(self, x, enc_output, mask=None):
        """Run the decoder block.

        Args:
            x: Decoder input.
            enc_output: Encoder output, used as keys and values of the second
                attention layer.
            mask: Passed unchanged as ``valid_lens`` to BOTH attention layers.
                NOTE(review): the second layer attends over ``enc_output``, so
                the lengths presumably need to describe the encoder sequence -
                confirm with callers.

        Returns:
            The output of the feed-forward network.
        """
        # Use .forward explicitly, as EncoderLayer does: calling the attention
        # object directly needs a __call__ that MultiHeadAttention does not
        # necessarily define.
        self_attended = self.multi_head_attention_1.forward(x, x, x, mask)
        cross_attended = self.multi_head_attention_2.forward(
            self_attended, enc_output, enc_output, mask
        )
        return self.feed_forward(cross_attended)




In [7]:
class Transformer:
    """Encoder-decoder Transformer (forward pass only) built from NumPy arrays."""

    def __init__(self, d_model, num_heads, d_ff, num_layers, input_vocab_size, target_vocab_size, max_seq_len):
        """Create layers, embedding tables and the output projection.

        Args:
            d_model: Feature size used throughout the model.
            num_heads: Attention heads per attention layer.
            d_ff: Hidden size of each feed-forward network.
            num_layers: Number of encoder layers and of decoder layers.
            input_vocab_size: Number of rows of the source embedding table.
            target_vocab_size: Number of rows of the target embedding table and
                number of output logits per position.
            max_seq_len: Number of positions in the positional-encoding table.
        """
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_ff = d_ff
        self.num_layers = num_layers
        self.input_vocab_size = input_vocab_size
        self.target_vocab_size = target_vocab_size
        self.max_seq_len = max_seq_len

        self.encoder_layers = [EncoderLayer(d_model, num_heads, d_ff) for _ in range(num_layers)]
        self.decoder_layers = [DecoderLayer(d_model, num_heads, d_ff) for _ in range(num_layers)]

        self.embedding = np.random.randn(input_vocab_size, d_model) * np.sqrt(2.0 / (input_vocab_size + d_model))
        self.pos_encoding = positional_encoding(max_seq_len, d_model)
        self.output_layer = np.random.randn(d_model, target_vocab_size) * np.sqrt(2.0 / (d_model + target_vocab_size))
        # Target ids range over target_vocab_size, so they need their own
        # table: looking them up in `embedding` (sized by input_vocab_size)
        # goes out of range when the target vocabulary is larger. Drawn last
        # so the random values of the attributes above are unchanged for a
        # given seed.
        self.target_embedding = np.random.randn(target_vocab_size, d_model) * np.sqrt(2.0 / (target_vocab_size + d_model))

    def __call__(self, input_seq, target_seq, mask=None):
        """Alias for :meth:`forward`."""
        return self.forward(input_seq, target_seq, mask)

    def forward(self, input_seq, target_seq, mask=None):
        """Encode ``input_seq``, decode ``target_seq`` and project to vocabulary logits.

        Args:
            input_seq: Integer ids of shape ``(batch, source_len)``.
            target_seq: Integer ids of shape ``(batch, target_len)``.
            mask: Passed unchanged to every encoder and decoder layer.

        Returns:
            Array of shape ``(batch, target_len, target_vocab_size)``.
        """
        enc_output = self.encode(input_seq, mask)
        dec_output = self.decode(target_seq, enc_output, mask)
        return np.dot(dec_output, self.output_layer)

    def _embed(self, table, token_ids):
        """Look up ``token_ids`` in ``table`` and add the positional encoding."""
        token_ids = np.asarray(token_ids)
        seq_len = token_ids.shape[1]
        max_len = self.pos_encoding.shape[0]
        if seq_len > max_len:
            # Slicing past the end of the table would silently return fewer
            # rows and then fail (or mis-broadcast) in the addition below.
            raise ValueError(
                f"sequence length {seq_len} exceeds the positional-encoding table ({max_len})"
            )
        return table[token_ids] + self.pos_encoding[:seq_len, :]

    def encode(self, input_seq, mask=None):
        """Embed ``input_seq`` and run it through every encoder layer in order."""
        x = self._embed(self.embedding, input_seq)
        for layer in self.encoder_layers:
            x = layer(x, mask)
        return x

    def decode(self, target_seq, enc_output, mask=None):
        """Embed ``target_seq`` and run it through every decoder layer in order.

        Each decoder layer receives the same ``enc_output`` and ``mask``.
        """
        x = self._embed(self.target_embedding, target_seq)
        for layer in self.decoder_layers:
            x = layer(x, enc_output, mask)
        return x

In [22]:
# Hyperparameters for the demo Transformer built in the next cell.
d_model, num_heads, d_ff = 32, 8, 2048   # model width, attention heads, feed-forward hidden width
num_layers = 6                           # number of encoder layers and of decoder layers
input_vocab_size = target_vocab_size = 10_000
max_seq_len = 100                        # rows in the positional-encoding table




In [23]:
# Build the model; arguments are passed by keyword so the mapping is explicit.
transformer = Transformer(
    d_model=d_model,
    num_heads=num_heads,
    d_ff=d_ff,
    num_layers=num_layers,
    input_vocab_size=input_vocab_size,
    target_vocab_size=target_vocab_size,
    max_seq_len=max_seq_len,
)

In [24]:
# Random integer token ids standing in for real data: 50 sequences of 32 tokens each.
input_seq = np.random.randint(low=0, high=input_vocab_size, size=(50, 32))
target_seq = np.random.randint(low=0, high=target_vocab_size, size=(50, 32))

In [1]:
# Full forward pass; the shape should be (batch_size, target_seq_len, target_vocab_size).
output = transformer(input_seq, target_seq)
print(output.shape)