# Реализация архитектуры Transformers


<img src="https://habrastorage.org/getpro/habr/upload_files/f7a/3a5/e84/f7a3a5e845433fa313070f9c794a5fb7.png" width="750">

In [1]:
from nltk import WordPunctTokenizer
import numpy as np

In [2]:
inputs = 'Привет мир'

In [3]:
tokenzier = WordPunctTokenizer()

In [4]:
tokenzie_seq = tokenzier.tokenize(inputs.lower())

In [5]:
tokenzie_seq

['привет', 'мир']

In [6]:
embeddings_dim = 4

In [7]:
input_embeddings = np.random.rand(len(tokenzie_seq), embeddings_dim)

In [8]:
input_embeddings

array([[0.87994305, 0.9978316 , 0.83663264, 0.70755789],
       [0.50850622, 0.27852361, 0.543555  , 0.36752233]])

In [9]:
def positional_encoding(input_embeddings):
    seq_len, embeddings_dim = input_embeddings.shape

    position = np.arange(seq_len)[:, np.newaxis]
    
    div_term = np.exp(np.arange(0, embeddings_dim, 2) * - (np.log(10000.0) / embeddings_dim))
    
    PE = np.zeros((seq_len, embeddings_dim))
    PE[:, 0::2] = np.sin(position * div_term) 
    
    PE[:, 1::2] = np.cos(position * div_term) 
    
    return PE

In [10]:
input_embeddings += positional_encoding(input_embeddings)

In [11]:
input_embeddings

array([[0.87994305, 1.9978316 , 0.83663264, 1.70755789],
       [1.3499772 , 0.81882592, 0.55355483, 1.36747233]])

In [12]:
def create_heads(heads_numbers, heads_n, heads_m):
    WK = []
    WQ = []
    WV = []

    for _ in range(heads_numbers):
        WK_temp = np.random.randint(0, 2, size=(heads_n, heads_m))
        WQ_temp = np.random.randint(0, 2, size=(heads_n, heads_m))
        WV_temp = np.random.randint(0, 2, size=(heads_n, heads_m))

        WK.append(WK_temp)
        WQ.append(WQ_temp)
        WV.append(WV_temp)

    return np.array(WK), np.array(WQ), np.array(WV)

In [13]:
WK, WQ, WV = create_heads(2, 4, 3)

In [14]:
WK1, WK2 = WK[0], WK[1]
WQ1, WQ2 = WQ[0], WQ[1]
WV1, WV2 = WV[0], WV[1]

In [15]:
K1 = input_embeddings @ WK1
K1

array([[3.7144073 , 2.87777465, 1.7165757 ],
       [2.72235795, 2.16880312, 1.90353203]])

In [16]:
V1 = input_embeddings @ WV1
V1

array([[3.7144073 , 3.7144073 , 0.83663264],
       [2.72235795, 2.72235795, 0.55355483]])

In [17]:
Q1 = input_embeddings @ WQ1
Q1

array([[5.42196519, 0.83663264, 5.42196519],
       [4.08983028, 0.55355483, 4.08983028]])

In [18]:
scores1 = Q1 @ K1.T
scores1

array([[31.85424092, 26.89590593],
       [23.80480474, 20.11965636]])

In [19]:
scores1 = scores1 / np.sqrt(embeddings_dim)
scores1

array([[15.92712046, 13.44795297],
       [11.90240237, 10.05982818]])

In [20]:
def softmax(x, axis=1):
    return np.exp(x) / np.sum(np.exp(x), axis=axis, keepdims=True)

In [21]:
scores1 = softmax(scores1)
scores1

array([[0.92266842, 0.07733158],
       [0.86325287, 0.13674713]])

In [22]:
attention1 = scores1 @ V1
attention1

array([[3.63769055, 3.63769055, 0.81474179],
       [3.57874739, 3.57874739, 0.79792257]])

In [23]:
def attention(input_embeddings, WK, WQ, WV):
    K = input_embeddings @ WK
    Q = input_embeddings @ WQ
    V = input_embeddings @ WV
    
    scores = Q @ K.T
    scores = scores / np.sqrt(embeddings_dim)
    scores = softmax(scores)
    scores = scores @ V
    return scores

In [24]:
attention1 = attention(input_embeddings, WK1, WQ1, WV1)
attention1

array([[3.63769055, 3.63769055, 0.81474179],
       [3.57874739, 3.57874739, 0.79792257]])

In [25]:
attention2 = attention(input_embeddings, WK2, WQ2, WV2)
attention2

array([[1.67517789, 1.67517789, 3.40955395],
       [1.66504857, 1.66504857, 3.40499305]])

In [26]:
attentions = np.concatenate([attention1, attention2], axis=1)
attentions

array([[3.63769055, 3.63769055, 0.81474179, 1.67517789, 1.67517789,
        3.40955395],
       [3.57874739, 3.57874739, 0.79792257, 1.66504857, 1.66504857,
        3.40499305]])

In [27]:
def multi_head_attention(input_embeddings, heads_numbers, heads_n, heads_m, W_n, W_m):
    WK, WQ, WV = create_heads(heads_numbers, heads_n, heads_m)

    attentions = []
    for i in range(len(WK)):
        WK_cur = WK[i]
        WQ_cur = WQ[i]
        WV_cur = WV[i]

        scores_cur = attention(input_embeddings, WK_cur, WQ_cur, WV_cur)
        attentions.append(scores_cur)

    W = np.random.rand(W_n, W_m)

    return np.concatenate(attentions, axis=1) @ W

In [28]:
Z = multi_head_attention(input_embeddings, 2, 4, 3, 6, 4)

In [29]:
Z

array([[4.93599419, 8.33864912, 5.65093428, 6.95122116],
       [5.14418379, 8.69291912, 5.87126498, 7.28813217]])

In [30]:
def layer_norm(input_embeddings, epsilon=1e-6):
    mean = input_embeddings.mean(axis=-1, keepdims=True)
    std = input_embeddings.std(axis=-1, keepdims=True)
    
    return (input_embeddings - mean) / (std + epsilon)

In [31]:
layer_norm(input_embeddings)

array([[-0.93627259,  1.26465743, -1.02154333,  0.69315849],
       [ 0.93802943, -0.58320927, -1.34295634,  0.98813617]])

In [32]:
def relu(x):
    return np.maximum(0, x)

In [33]:
def feed_forward(Z, W1, b1, W2, b2):
    return relu(Z.dot(W1) + b1).dot(W2) + b2

In [34]:
W1 = np.random.randn(4, 8)
W2 = np.random.randn(8, 4)
b1 = np.random.randn(8)
b2 = np.random.randn(4)

In [35]:
output_encoder = feed_forward(Z, W1, b1, W2, b2)
output_encoder

array([[-29.94713762,  27.24728463,  19.07260353,  -0.73645552],
       [-31.24756876,  28.46270936,  19.77488375,  -0.82670361]])

In [36]:
def encoder_layer(input_embeddings, heads_numbers, heads_n, heads_m, W_n, W_m):
    Z = multi_head_attention(input_embeddings, heads_numbers, heads_n, heads_m, W_n, W_m)

    W1 = np.random.randn(4, 8)
    W2 = np.random.randn(8, 4)
    b1 = np.random.randn(8)
    b2 = np.random.randn(4)

    output = feed_forward(Z, W1, b1, W2, b2)

    return layer_norm(output, Z)

In [37]:
output_encoder = encoder_layer(input_embeddings, 2, 4, 3, 6, 4)
output_encoder

array([[-0.47896799,  0.75517305, -0.21408313,  0.17635716],
       [-0.47908049,  0.75531472, -0.21419552,  0.1763379 ]])

In [38]:
def encoder(input_embeddings, n=6):
    for _ in range(n):
        input_embeddings = encoder_layer(input_embeddings, 2, 4, 3, 6, 4)

    return input_embeddings

In [39]:
output_encoder = encoder(input_embeddings)
output_encoder

array([[-2.18506822,  0.28012052,  0.8675525 ,  0.78650907],
       [-2.18506822,  0.28012052,  0.8675525 ,  0.78650907]])

In [40]:
sos_embedding = np.random.rand(len(tokenzie_seq), embeddings_dim) # start of the sequence

In [41]:
sos_embedding += positional_encoding(sos_embedding)
sos_embedding

array([[0.78417076, 1.73329351, 0.6660883 , 1.89680345],
       [1.05468206, 1.10520399, 0.05084069, 1.34022672]])

In [42]:
decoder_self_attention = multi_head_attention(sos_embedding, 2, 4, 3, 6, 4)

In [43]:
decoder_self_attention

array([[6.87277952, 6.76427698, 6.2752269 , 2.35450556],
       [6.81397569, 6.70513658, 6.20267743, 2.3297015 ]])

In [44]:
decoder_self_attention = layer_norm(decoder_self_attention + sos_embedding)
decoder_self_attention

array([[ 0.51546707,  1.0437907 ,  0.06569568, -1.62495345],
       [ 0.86129564,  0.82708124, -0.08629876, -1.60207811]])

In [45]:
def encoder_decoder_attention(encoder_output, attention_input, WQ, WK, WV):
    K = encoder_output @ WK    
    V = encoder_output @ WV    
    Q = attention_input @ WQ   

    scores = Q @ K.T
    scores = scores / np.sqrt(embeddings_dim)
    scores = softmax(scores)
    scores = scores @ V
    return scores

In [46]:
def multi_head_encoder_decoder_attention(encoder_output, attention_input, heads_numbers, heads_n, heads_m, W_n, W_m):
    WK, WQ, WV = create_heads(heads_numbers, heads_n, heads_m)

    attentions = []
    for i in range(len(WK)):
        WK_cur = WK[i]
        WQ_cur = WQ[i]
        WV_cur = WV[i]
        
        scores_cur = encoder_decoder_attention(encoder_output, attention_input, WK_cur, WQ_cur, WV_cur)
        attentions.append(scores_cur)
    
    W = np.random.rand(W_n, W_m)
    concatenated_attention = np.concatenate(attentions, axis=1) @ W

    return concatenated_attention


In [47]:
Z_encoder_decoder = multi_head_encoder_decoder_attention(output_encoder, decoder_self_attention, 2, 4, 3, 6, 4)
Z_encoder_decoder

array([[-2.81472805, -1.19557598, -1.82204892, -3.21023721],
       [-2.81472805, -1.19557598, -1.82204892, -3.21023721]])

In [48]:
def decoder_layer(decoder_inputs, encoder_outputs, heads_numbers, heads_n, heads_m, W_n, W_m):
    Z = multi_head_attention(decoder_inputs, heads_numbers, heads_n, heads_m, W_n, W_m)

    W1 = np.random.randn(4, 8)
    W2 = np.random.randn(8, 4)
    b1 = np.random.randn(8)
    b2 = np.random.randn(4)

    output = feed_forward(Z, W1, b1, W2, b2)

    output = layer_norm(output, Z)

    Z_encoder_decoder = multi_head_encoder_decoder_attention(encoder_outputs, decoder_self_attention, 2, 4, 3, 6, 4)

    Z_encoder_decoder = layer_norm(Z_encoder_decoder + Z)

    output = feed_forward(Z_encoder_decoder, W1, b1, W2, b2)
    return layer_norm(output + Z_encoder_decoder)

In [49]:
output_decoder = decoder_layer(sos_embedding, output_encoder, 2, 4, 3, 6, 4)
output_decoder

array([[ 0.92070493,  0.97086416, -0.48714368, -1.40442541],
       [ 0.91937023,  0.97299711, -0.48929067, -1.40307667]])

In [50]:
def decoder(decoder_input, output_encoder, n=6):
    for _ in range(n):
        decoder_input = decoder_layer(decoder_input, output_encoder, 2, 4, 3, 6, 4)

    return decoder_input

In [51]:
decoder_outputs = decoder(sos_embedding, output_decoder)
decoder_outputs

array([[ 0.88518764, -1.6978839 ,  0.44751414,  0.36518212],
       [ 0.88518764, -1.6978839 ,  0.44751417,  0.36518209]])

In [52]:
def linear(x, W, b):
    return np.dot(x, W) + b

In [53]:
def output_probabilities(decoder_outputs, embeddings_dim, vocabulary_length):
    for decoder_output in decoder_outputs:
        linear_output = linear(decoder_output, np.random.randn(embeddings_dim, vocabulary_length), np.random.randn(vocabulary_length))

        softmax_output = softmax(linear_output, axis=0)

        print(softmax_output)

In [54]:
output_probabilities(decoder_outputs, embeddings_dim, 4)

[0.07112802 0.0397711  0.85210775 0.03699312]
[0.03193612 0.1934926  0.04559116 0.72898012]


In [151]:
class Embeddings:
    def __init__(self, embeddings_dim):
        self.embeddings_dim = embeddings_dim

        self.words_list_ru = ['<sos>', 'привет', 'мир', '<eos>', '<unk>']
        self.words_list_en = ['<sos>', 'hello', 'world', '<eos>', '<unk>']

        self.create_input_ids_dict()
        self.create_embeddings()

    def get_vocab_length(self):
        return len(self.words_list_ru) 

    def create_input_ids_dict(self):
        self.input_ids_dict = {}
        for i in range(len(self.words_list_ru)):
            self.input_ids_dict[i] = self.words_list_ru[i]

    def get_input_id(self, token):
        return list(self.input_ids_dict.keys())[list(self.input_ids_dict.values()).index(token)]

    def get_input_ids(self, tokens):
        input_ids = []
        for token in tokens:
            input_ids.append(self.get_input_id(token))
        return input_ids

    def create_embeddings(self):
        self.embeddings_dict = {}
        for i in range(len(self.words_list_ru)):
            self.embeddings_dict[i] = np.random.rand(self.embeddings_dim)

    def get_embeddings(self, input_ids):
        embeddings = []
        for input_id in input_ids:
            embeddings.append(self.embeddings_dict[input_id])
        return np.array(embeddings)

    def __call__(self, tokens):
        input_ids = self.get_input_ids(tokens)
        return self.get_embeddings(input_ids)
    
    def get_translation(self, input_id):
        return self.words_list_en[input_id]

In [177]:
class SimpleTransformer():
    def __init__(self, 
                 embeddings_dim=4,
                 heads_numbers=2,
                 heads_n=4,
                 heads_m=3,
                 W_n=6,
                 W_m=4):
        self.tokenizer = WordPunctTokenizer()
        self.embeddings_dim = embeddings_dim
        self.embeddings = Embeddings(self.embeddings_dim)

        self.heads_numbers = heads_numbers
        self.head_n = heads_n
        self.head_m = heads_m
        self.W_n = W_n
        self.W_m = W_m
        self.vocab_length = self.embeddings.get_vocab_length()
        

    def __call__(self, inputs):
        self.tokenized_seq = self.tokenizer.tokenize(inputs.lower())
        self.input_embeddings = self.embeddings(self.tokenized_seq)

        # Пропускаем через энкодер
        encoder_output = self.encoder(self.input_embeddings)
        
        # Используем предсказания декодера
        decoder_output = self.decoder(encoder_output)
        
        # Получаем вероятности предсказания
        probabilities = self.output_probabilities(decoder_output)

        translation = self.translate(probabilities)
        
        return translation

    def positional_encoding(self, input_embeddings):
        seq_len, embeddings_dim = input_embeddings.shape

        position = np.arange(seq_len)[:, np.newaxis]
        div_term = np.exp(np.arange(0, embeddings_dim, 2) * -(np.log(10000.0) / embeddings_dim))

        PE = np.zeros((seq_len, embeddings_dim))
        PE[:, 0::2] = np.sin(position * div_term) 
        PE[:, 1::2] = np.cos(position * div_term) 
        
        return PE
    
    def create_heads(self):
        WK = []
        WQ = []
        WV = []

        for _ in range(self.heads_numbers):
            WK_temp = np.random.randn(self.head_n, self.head_m)
            WQ_temp = np.random.randn(self.head_n, self.head_m)
            WV_temp = np.random.randn(self.head_n, self.head_m)

            WK.append(WK_temp)
            WQ.append(WQ_temp)
            WV.append(WV_temp)

        return np.array(WK), np.array(WQ), np.array(WV)
    

    def softmax(self, x, axis=1):
        return np.exp(x) / np.sum(np.exp(x), axis=axis, keepdims=True)


    def attention(self, input_embeddings, WK, WQ, WV):
        K = input_embeddings @ WK
        Q = input_embeddings @ WQ
        V = input_embeddings @ WV
        
        scores = Q @ K.T
        scores = scores / np.sqrt(self.embeddings_dim)
        scores = self.softmax(scores)
        scores = scores @ V
        return scores
    

    def multi_head_attention(self, input_embeddings):
        WK, WQ, WV = self.create_heads()

        attentions = []
        for i in range(len(WK)):
            WK_cur = WK[i]
            WQ_cur = WQ[i]
            WV_cur = WV[i]

            scores_cur = self.attention(input_embeddings, WK_cur, WQ_cur, WV_cur)
            attentions.append(scores_cur)

        W = np.random.rand(self.W_n, self.W_m)

        return np.concatenate(attentions, axis=1) @ W
    

    def layer_norm(self, input_embeddings, epsilon=1e-6):
        mean = input_embeddings.mean(axis=-1, keepdims=True)
        std = input_embeddings.std(axis=-1, keepdims=True)
        
        return (input_embeddings - mean) / (std + epsilon)


    def relu(self, x):
        return np.maximum(0, x)
    

    def feed_forward(self, Z, W1, b1, W2, b2):
        return self.relu(Z @ W1 + b1) @ W2 + b2


    def encoder_layer(self, input_embeddings):
        Z = self.multi_head_attention(input_embeddings)

        W1 = np.random.randn(self.embeddings_dim, 8)
        W2 = np.random.randn(8, self.embeddings_dim)
        b1 = np.random.randn(8)
        b2 = np.random.randn(self.embeddings_dim)

        output = self.feed_forward(Z, W1, b1, W2, b2)

        return self.layer_norm(output + Z)
    

    def encoder(self, input_embeddings, n=6):
        for _ in range(n):
            input_embeddings = self.encoder_layer(input_embeddings)

        return input_embeddings


    def encoder_decoder_attention(self, encoder_output, decoder_input, WK, WQ, WV):
        K = encoder_output @ WK    
        V = encoder_output @ WV    
        Q = decoder_input @ WQ   

        scores = Q @ K.T
        scores = scores / np.sqrt(self.embeddings_dim)
        scores = self.softmax(scores)
        scores = scores @ V
        return scores


    def multi_head_encoder_decoder_attention(self, encoder_output, decoder_input):
        WK, WQ, WV = self.create_heads()

        attentions = []
        for i in range(len(WK)):
            WK_cur = WK[i]
            WQ_cur = WQ[i]
            WV_cur = WV[i]
            
            scores_cur = self.encoder_decoder_attention(encoder_output, decoder_input, WK_cur, WQ_cur, WV_cur)
            attentions.append(scores_cur)
        
        W = np.random.rand(self.W_n, self.W_m)
        concatenated_attention = np.concatenate(attentions, axis=1) @ W

        return concatenated_attention


    def decoder_layer(self, decoder_inputs, encoder_outputs):
        # Первое внимание — самовнимание декодера
        Z = self.multi_head_attention(decoder_inputs)

        # Внимание между кодером и декодером
        Z_encoder_decoder = self.multi_head_encoder_decoder_attention(encoder_outputs, Z)

        W1 = np.random.randn(self.embeddings_dim, 8)
        W2 = np.random.randn(8, self.embeddings_dim)
        b1 = np.random.randn(8)
        b2 = np.random.randn(self.embeddings_dim)

        output = self.feed_forward(Z_encoder_decoder, W1, b1, W2, b2)

        return self.layer_norm(output + Z_encoder_decoder)


    def decoder(self, encoder_output, n=6):
        decoder_output = np.random.rand(len(self.tokenized_seq), self.embeddings_dim)  
        
        for _ in range(n):
            decoder_output = self.decoder_layer(decoder_output, encoder_output)

        return decoder_output
    

    def linear(self, x, W, b):
        return np.dot(x, W) + b


    def output_probabilities(self, decoder_outputs):
        probabilities = []
        for decoder_output in decoder_outputs:
            linear_output = self.linear(decoder_output, np.random.randn(self.embeddings_dim, self.vocab_length), np.random.randn(self.vocab_length))
            softmax_output = self.softmax(linear_output, axis=0)
            probabilities.append(softmax_output)

        return np.array(probabilities)


    def translate(self, probabilities):
        predicted_ids = np.argmax(probabilities, axis=1)

        translated_tokens = [self.embeddings.get_translation(idx) for idx in predicted_ids]

        return " ".join(translated_tokens)

In [178]:
transformer =  SimpleTransformer()

In [189]:
inputs = 'Привет мир'
transformer_outputs = transformer(inputs)

In [190]:
transformer_outputs

'hello world'