In [46]:
#!pip install -r requirements.txt
import math as m
import numpy as np
import abc

from tqdm import tqdm

# Variables

In [76]:
d_model = 512 # embeddings dim
heads = 8 # number of heads for multihead attention
ed_count = 6 # N, number of encoder-decoder layers


In [47]:
class Dataset:
    def __init__():
        pass

In [75]:
def TTV_split(data, tr, te, va, verbose=False):
    total = len(data)
    one = total / (tr + te + va)

    tr_n = m.floor(tr * one)
    te_n = m.floor(te * one)
    va_n = m.floor(va * one)

    tr_n += (total - tr_n - te_n - va_n)

    train_data = np.array([])
    test_data = np.array([])
    val_data = np.array([])

    np.random.shuffle(data)

    for i in tqdm(data):
        if (len(train_data) < tr_n):
            np.append(train_data, i)
            continue
        if (len(test_data) < te_n):
            np.append(test_data, i)
            continue
        if (len(val_data) < va_n):
            np.append(val_data, i)

    if verbose:
        print(f"total = {total}\ntr_n = {len(train_data)} \nte_n = {len(test_data)} \nva_n = {len(val_data)}")
    return 

TTV_split(np.array([i for i in range(100000)]), 12, 4, 1)

100%|██████████| 100000/100000 [00:00<00:00, 185749.02it/s]


# Base Module Class

In [64]:
class nnModule(): # TODO: добавить тесты для каждого модуля
    def __init__(self, params={}) -> None:
        self.params = params

    @abc.abstractclassmethod
    def forward(self, x):
        pass

    @abc.abstractclassmethod
    def backward(self, grad):
        pass

    @abc.abstractclassmethod
    def step(self, lr):
        pass

    def __str__(self) -> str:
        rs = ""
        strs = np.array([f"{i}: {self.params[i]}, " for i in self.params])
        for i in strs:
            rs += i
        return (f"{self.name}: (" + rs[:len(rs)-2] + ")") if self.params else self.name

# Linear

In [57]:
class Linear(nnModule):
    def __init__(self, params) -> None:
        super().__init__(params)
        try:
            if params["in_dim"] < 1 or params["out_dim"] < 1:
                raise Exception
        except KeyError:
            raise Exception("You have to set in_dim and out_dim parameters for linear layer")
        except Exception:
            raise Exception("The in_dim and out_dim have to be greater than zero")

        self.name = "Linear"

        self._res = None
        self._lastX = None

        self.W = np.ones(params["in_dim"], params["out_dim"], dtype=np.float32)
        self.B = np.zeros(params["out_dim"], dtype=np.float32)

        self._grad_weight = None
        self._grad_bias = None

    def forward(self, x):
        self._lastX = x
        self._res = np.dot(x, self.W) + self.B
        return self._res

    def backward(self, grad):
        self._grad_weight = np.dot(self._lastX.T, grad)
        self._grad_bias = np.sum(grad, axis=0)

    def step(self, lr):
        self.weight = self.weight - self._grad_weight * lr
        self.bias = self.bias - self._grad_bias * lr

# ReLU

In [60]:
class ReLU(nnModule):
    def __init__(self, params) -> None:
        super().__init__(params)

        self.relu = lambda x: x * (x > 0)

    def forward(self, x):
        self._res = x * (x > 0)

        return self._res

    def backward(self, grad):
        m, n = np.shape(self._res)
        for i in range(m):
            for j in range(n):
                grad[i][j] = grad[i][j] if self.relu(self._res[i][j]) else 0
        return grad

    def step(self, lr):
        pass

# Sigmoid

In [66]:
class Sigmoid(nnModule):
    def forward(self, x):
        self._res = 1 / (1 + np.exp(-x))

        return self._res

    def backward(self, grad):
        new_grad = self._res * (1 - self._res) * grad

        return new_grad

    def step(self, lr):
        pass

# FFN

In [72]:
class TransformerFFN(nnModule):
    def __init__(self) -> None:
        self.modules = np.array([])

        self.linear1 = Linear(512, 2048)
        self.relu = ReLU()
        self.linear2 = Linear(2048, 512)

        np.append(self.modules, 
            self.linear1, 
            self.relu, 
            self.linear2
        )

    def forward(self, x):
        res = None
        for module in self.modules:
            res = module.forward(x)
        return res

    def backward(self, x):
        for module in np.flip(self.modules):
            module.backward(x)

    def step(self, lr):
        for module in self.modules:
            module.step(lr)

# Scaled Dot Product Attention

In [77]:
def ScaledDotProductAttention(Q: np.array, K: np.array, V: np.array):
    d = K.shape[0]
    smax = Sigmoid()
    return smax.forward(np.dot(Q, K.T) / m.sqrt(d)) * V

Q = np.array([1, 2, 3])
K = np.array([1, 2, 3])
V = np.array([2, 1, 3])

print(ScaledDotProductAttention(Q, K, V))

[1.99938264 0.99969132 2.99907397]


# Multihead Attention

In [None]:
class MultiheadAttention(nnModule):
    def __init__(self, params) -> None:
        super().__init__(params)

        self.heads = params["heads"]
        self.d_model = params["d_model"]

        self.head_modules = np.array([])

        self.linear_out = Linear(heads * V.shape[0], d_model)

        for i in range(self.heads):
            np.append(self.head_modules, [
                Linear(d_model, Q.shape[0]), # po idee vector len = 64
                Linear(d_model, K.shape[0]),
                Linear(d_model, V.shape[0])
            ])
        
    def forward(self, x):
        head_res = np.array([])
        
        for head_module in self.head_modules:
            Q = head_module[0].forward(x[0])
            K = head_module[1].forward(x[1])
            V = head_module[2].forward(x[2])
            np.append(head_res, ScaledDotProductAttention(Q, K, V))

        concat = np.vstack(*head_res)
        out = self.linear_out(concat) # linear layer razmern' = poschitat result np.vstack
        return out

# Add & Norm

In [None]:
class LayerNorm(nnModule):
    def __init__(self, params, features, eps) -> None:
        super().__init__(params)

        self.a2 = np.ones(features)
        self.b2 = np.zeros(features)
        self.eps = eps

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.a2 * (x - mean) / (std + self.eps) + self.b2

# Encoder

In [None]:
class EncoderLayer(nnModule):
    def __init__(self, params) -> None:
        super().__init__(params)
        self.self_attention = params["attention"]
        self.FFN = TransformerFFN()
        self.layer_norm = LayerNorm() # features, eps
        #self.dropout = Dropout()

    def forward(self, x):
        x = x + self.layer_norm(self.self_attention(x))
        return x + self.layer_norm(self.FFN(x))

In [None]:
class Encoder(nnModule):
    def __init__(self, params, layer, N) -> None:
        super().__init__(params)
        
        self.layers = np.array([
            EncoderLayer() for i in range(N)
        ])

    def forward(self, x):
        for layer in self.layers:
            x = layer.forward(x)
        return x
        

# Positional Encoder

In [None]:
def encode_position(embedding):
    return 1

# Transformer

In [73]:
class Transformer():
    def __init__(self) -> None:
        self.multihead_attention = MultiheadAttention(params={"heads": heads, "d_model": d_model})
        self.FFN = TransformerFFN()
        self.linear_out = Linear() # in_dim=? out_dim=?
        self.softmax = Sigmoid()





In [55]:
linear = Linear(params={"in_dim": 384, "out_dim": 24})
print(linear)

Linear: (in_dim: 384, out_dim: 24)


# Notes

We apply dropout to the output of each sub-layer, before it is added to the sub-layer input and normalized. In addition, we apply dropout to the sums of the embeddings and the positional encodings in both the encoder and decoder stacks. For the base model, we use a rate of P_drop = 0.1.

Label Smoothing During training, we employed label smoothing of value E_ls = 0.1. This hurts perplexity, as the model learns to be more unsure, but improves accuracy and BLEU score.