In [45]:
import numpy as np

class Dropout:
    """Inverted dropout layer.

    Each element is zeroed with probability ``p``; surviving elements are
    multiplied by ``1 / (1 - p)``.
    """

    def __init__(self, p=0.1):
        # Probability that any individual element is dropped.
        self.p = p

    def execute(self, v):
        """Apply a freshly sampled dropout mask to ``v``.

        Args:
            v: NumPy array; the mask is drawn with the same shape.

        Returns:
            ``v`` multiplied elementwise by a mask whose entries are either
            ``0.0`` or ``1 / (1 - p)``.
        """
        keep_scale = 1.0 / (1.0 - self.p)
        # One uniform draw in [0, 1) per element from the global NumPy RNG.
        draws = np.random.random(v.shape)
        dropped = draws < self.p
        return v * np.where(dropped, 0.0, keep_scale)

def softmax(v):
    """Softmax over the last axis.

    The per-row maximum is subtracted before exponentiating so that large
    inputs do not overflow.

    Args:
        v: array-like of scores.

    Returns:
        Array of the same shape whose last axis sums to 1.
    """
    shifted = v - np.max(v, axis=-1, keepdims=True)
    exps = np.exp(shifted)
    return exps / np.sum(exps, axis=-1, keepdims=True)

def relu(v):
    """Rectified linear unit: elementwise maximum of 0 and ``v``."""
    floor = 0
    return np.maximum(floor, v)

def layer_normalization(X, gamma, beta, epsilon=1e-5):
    """Normalize ``X`` over its last axis, then scale and shift.

    Args:
        X: input array; statistics are taken along the last axis.
        gamma: scale, broadcast against ``X``.
        beta: shift, broadcast against ``X``.
        epsilon: added to the standard deviation (not the variance) to
            avoid division by zero.

    Returns:
        ``(X - mean) * gamma / (std + epsilon) + beta``.
    """
    reduce_last = dict(axis=-1, keepdims=True)
    centered = X - np.mean(X, **reduce_last)
    spread = np.std(X, **reduce_last) + epsilon
    return centered * gamma / spread + beta

In [46]:
import numpy as np

def pos_encoding(seq_len, d_model):
    """Build a sinusoidal positional-encoding table.

    Even columns hold ``sin(pos / 10000**(2i / d_model))`` and odd columns
    hold the matching ``cos``; an odd ``d_model`` ends on a sine column.

    Args:
        seq_len: number of positions (rows).
        d_model: encoding width (columns).

    Returns:
        Array of shape ``(seq_len, d_model)``, or the sentinel ``-1`` when
        ``seq_len == 0`` or ``d_model <= 0``.
    """
    if seq_len == 0 or d_model <= 0:
        return -1

    table = np.zeros((seq_len, d_model))

    n_sin_cols = d_model // 2 + d_model % 2  # columns 0, 2, 4, ...
    n_cos_cols = d_model // 2                # columns 1, 3, 5, ...

    pair_index = np.arange(0, n_sin_cols)
    divisor = np.power(10000, pair_index * 2 / d_model)

    # (seq_len, 1) / (n_sin_cols,) -> (seq_len, n_sin_cols)
    angles = np.arange(seq_len)[:, None] / divisor

    table[:, 0::2] = np.sin(angles)
    # With an odd width there is one fewer cosine column than sine columns.
    table[:, 1::2] = np.cos(angles[:, :n_cos_cols])

    return table

def compute_qkv(X, W_q, W_k, W_v):
    """Project ``X`` with each of the three weight matrices.

    Returns:
        Tuple ``(X @ W_q, X @ W_k, X @ W_v)``.
    """
    return tuple(X @ weight for weight in (W_q, W_k, W_v))

def get_causal_mask(generated_len):
    """Return a square lower-triangular mask of ones.

    Row ``i`` has ones in columns ``0..i`` and zeros after, e.g. for 3:

        1 0 0
        1 1 0
        1 1 1
    """
    side = generated_len
    all_ones = np.ones((side, side))
    return np.tril(all_ones, k=0)

def self_attention(Q, K, V, mask = None, dropout=None):
    """Scaled dot-product attention.

    Computes ``softmax(Q @ K^T / sqrt(d_k)) @ V`` over the last two axes, so
    any leading axes (heads, batch) are carried through by broadcasting.

    Args:
        Q: queries, shape ``(..., seq_q, d_k)``.
        K: keys, shape ``(..., seq_k, d_k)``.
        V: values, shape ``(..., seq_k, d_v)``.
        mask: optional array broadcastable to ``(..., seq_q, seq_k)``.
            Positions where ``mask == 1`` keep their score; every other
            position is replaced by ``-1e9`` before the softmax.
        dropout: optional object exposing ``execute(array)``; when given it
            is applied to the attention weights after the softmax.

    Returns:
        Array of shape ``(..., seq_q, d_v)``.
    """
    d_k = Q.shape[-1]
    score = Q @ K.swapaxes(-1, -2) / np.sqrt(d_k)

    if mask is not None:
        # A large negative score drives the masked weight to ~0 after softmax.
        score = np.where(mask == 1, score, -1e9)

    pscore = softmax(score)

    # Previously the `dropout` argument was accepted but never used, so
    # attention-weight dropout was silently skipped during training.
    if dropout is not None:
        pscore = dropout.execute(pscore)

    return pscore @ V

def multi_head_attention(Q, K, V, W_o, n_heads, mask=None, dropout=None):
    """Multi-head attention over already-projected Q, K and V.

    The last axis of each input is split into ``n_heads`` chunks, attention
    is run per head, and the heads are concatenated and projected by ``W_o``.
    Inputs may carry leading batch axes; only the last two axes are
    interpreted as ``(seq_len, d_model)``.

    Args:
        Q: projected queries, shape ``(..., seq_q, d_model)``.
        K: projected keys, shape ``(..., seq_k, d_model)``.
        V: projected values, shape ``(..., seq_k, d_model)``.
        W_o: output projection applied to the concatenated heads.
        n_heads: number of heads; must divide ``d_model``.
        mask: optional mask forwarded to ``self_attention``; it must
            broadcast against ``(..., n_heads, seq_q, seq_k)``.
        dropout: optional dropout object forwarded to ``self_attention``.

    Returns:
        ``concat(heads) @ W_o`` with leading shape ``(..., seq_q)``.

    Raises:
        ValueError: if ``n_heads`` is not positive or does not divide
            ``d_model``.
    """
    d_model = Q.shape[-1]
    if n_heads <= 0 or d_model % n_heads != 0:
        raise ValueError(
            f"d_model ({d_model}) must be divisible by a positive n_heads ({n_heads})"
        )
    d_head = d_model // n_heads

    def split_heads(t):
        # (..., seq_len, d_model) -> (..., seq_len, n_heads, d_head)
        #                         -> (..., n_heads, seq_len, d_head)
        return t.reshape(t.shape[:-1] + (n_heads, d_head)).swapaxes(-2, -3)

    # (..., n_heads, seq_q, d_head)
    per_head = self_attention(
        split_heads(Q), split_heads(K), split_heads(V), mask=mask, dropout=dropout
    )

    # Move heads back next to d_head and merge them: (..., seq_q, d_model).
    # Reshaping to Q's own shape keeps any leading batch axes intact.
    concat_heads = per_head.swapaxes(-2, -3).reshape(Q.shape)
    return concat_heads @ W_o

In [None]:
class Transformer:
    """Forward-only encoder-decoder Transformer built on NumPy arrays.

    The token embedding matrix is shared by the encoder input, the decoder
    input and (transposed) the output projection. Weights are randomly
    initialised; no training code lives in this class.

    Note: each block holds a single ``gamma``/``beta`` pair that is reused by
    every layer normalization inside that block.
    """

    def __init__(self, vocab_size, d_model, n_heads, d_ff, n_blocks, dropout_p=0.1):
        """Allocate weights and put the model in training mode.

        Args:
            vocab_size: number of rows in the embedding matrix.
            d_model: embedding / hidden width.
            n_heads: attention heads per block; must divide ``d_model``.
            d_ff: hidden width of the feed-forward sublayer.
            n_blocks: number of encoder blocks and of decoder blocks.
            dropout_p: drop probability used while in training mode.

        Raises:
            ValueError: on an invalid ``n_heads``/``d_model`` combination or
                a ``dropout_p`` outside ``[0, 1)``.
        """
        # Fail fast here instead of with an opaque reshape error at run time.
        if n_heads <= 0 or d_model % n_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by a positive n_heads ({n_heads})"
            )
        if not 0.0 <= dropout_p < 1.0:
            raise ValueError(f"dropout_p must be in [0, 1), got {dropout_p}")

        self.d_model = d_model
        self.n_heads = n_heads
        self.d_ff = d_ff

        # Allocation order is kept stable so a fixed NumPy seed reproduces
        # the same weights.
        self.embedding = self.init_weights(vocab_size, d_model)
        self.encoders = [self.create_block(is_decoder=False) for _ in range(n_blocks)]
        self.decoders = [self.create_block(is_decoder=True) for _ in range(n_blocks)]

        self.dropout_p = dropout_p
        self.train()

    def init_weights(self, rows, cols):
        """Return a ``(rows, cols)`` matrix drawn with Glorot/Xavier uniform init."""
        limit = np.sqrt(6 / (rows + cols))
        return np.random.uniform(-limit, limit, (rows, cols))

    def create_block(self, is_decoder):
        """Create the weight dictionary for one encoder or decoder block.

        Decoder blocks additionally receive the four ``*_cross`` projection
        matrices used for encoder-decoder attention.
        """
        weights = {
            'W_q': self.init_weights(self.d_model, self.d_model),
            'W_k': self.init_weights(self.d_model, self.d_model),
            'W_v': self.init_weights(self.d_model, self.d_model),
            'W_o': self.init_weights(self.d_model, self.d_model),
            'W_ff1': self.init_weights(self.d_model, self.d_ff),
            'bias_ff1': np.zeros((1, self.d_ff)),
            'W_ff2': self.init_weights(self.d_ff, self.d_model),
            'bias_ff2': np.zeros((1, self.d_model)),
            'gamma': np.ones((1, self.d_model)),
            'beta': np.zeros((1, self.d_model)),
        }
        if is_decoder:
            weights.update({
                'W_q_cross': self.init_weights(self.d_model, self.d_model),
                'W_k_cross': self.init_weights(self.d_model, self.d_model),
                'W_v_cross': self.init_weights(self.d_model, self.d_model),
                'W_o_cross': self.init_weights(self.d_model, self.d_model),
            })
        return weights

    def _drop(self, x):
        """Apply dropout in training mode; return ``x`` untouched in eval mode."""
        if self.dropout is None:
            return x
        return self.dropout.execute(x)

    def _embed(self, tokens):
        """Look up token embeddings, scale by sqrt(d_model), add positions."""
        tokens = np.asarray(tokens)
        if tokens.size == 0:
            # pos_encoding returns the scalar -1 for an empty sequence, which
            # would otherwise broadcast silently into the embeddings.
            raise ValueError("token sequence must not be empty")
        seq_len = tokens.shape[-1]
        # Fancy indexing yields a fresh array, so the in-place add below
        # never touches self.embedding.
        x = self.embedding[tokens] * np.sqrt(self.d_model)
        x += pos_encoding(seq_len, self.d_model)
        return self._drop(x)

    def _attention_sublayer(self, x, Q, K, V, W_o, weights, mask=None):
        """Multi-head attention, dropout, residual add, then layer norm."""
        att = multi_head_attention(Q, K, V, W_o, self.n_heads, mask=mask, dropout=self.dropout)
        att = self._drop(att)
        return layer_normalization(x + att, weights['gamma'], weights['beta'])

    def _feed_forward_sublayer(self, x, weights):
        """Two-layer ReLU MLP, dropout, residual add, then layer norm."""
        ff = relu(x @ weights['W_ff1'] + weights['bias_ff1']) @ weights['W_ff2'] + weights['bias_ff2']
        ff = self._drop(ff)
        return layer_normalization(x + ff, weights['gamma'], weights['beta'])

    def run_encoders(self, x):
        """Encode a sequence of source token ids.

        Args:
            x: array-like of integer token ids.

        Returns:
            Encoder output with a trailing ``d_model`` axis.
        """
        x = self._embed(x)

        for weights in self.encoders:
            Q, K, V = compute_qkv(x, weights['W_q'], weights['W_k'], weights['W_v'])
            x = self._attention_sublayer(x, Q, K, V, weights['W_o'], weights)
            x = self._feed_forward_sublayer(x, weights)

        return x

    def run_decoders(self, enc_out, x):
        """Decode target token ids against the encoder output.

        Args:
            enc_out: result of ``run_encoders``.
            x: array-like of integer target token ids generated so far.

        Returns:
            Logits with a trailing ``vocab_size`` axis, obtained by
            multiplying with the transposed embedding matrix.
        """
        tokens = np.asarray(x)
        x = self._embed(tokens)

        # Lower-triangular mask: position i may attend to positions 0..i only.
        mask = get_causal_mask(tokens.shape[-1])

        for weights in self.decoders:
            # Masked self-attention over the target sequence.
            Q, K, V = compute_qkv(x, weights['W_q'], weights['W_k'], weights['W_v'])
            x = self._attention_sublayer(x, Q, K, V, weights['W_o'], weights, mask=mask)

            # Cross-attention: queries from the decoder, keys/values from the encoder.
            Q = x @ weights['W_q_cross']
            K = enc_out @ weights['W_k_cross']
            V = enc_out @ weights['W_v_cross']
            x = self._attention_sublayer(x, Q, K, V, weights['W_o_cross'], weights)

            x = self._feed_forward_sublayer(x, weights)

        return x @ self.embedding.T  # seq_len x vocab_size

    def generate(self, src, start_token, max_len, end_token=None):
        """Greedily decode up to ``max_len`` tokens.

        The decoder is re-run on the whole prefix at every step. Dropout
        stays active if the model is in training mode, so call ``eval()``
        first for deterministic output.

        Args:
            src: array-like of source token ids.
            start_token: id used to seed the output sequence.
            max_len: maximum number of tokens to append.
            end_token: optional id; when produced, decoding stops right
                after appending it.

        Returns:
            List of token ids beginning with ``start_token``.
        """
        enc_out = self.run_encoders(src)

        output = [start_token]
        for _ in range(max_len):
            logits = self.run_decoders(enc_out, output)
            probs = softmax(logits[-1])
            token = np.argmax(probs).item()
            output.append(token)
            if end_token is not None and token == end_token:
                break

        return output

    def eval(self):
        """Switch to inference mode (dropout disabled)."""
        self.dropout = None

    def train(self):
        """Switch to training mode (dropout enabled with ``dropout_p``)."""
        self.dropout = Dropout(p=self.dropout_p)


In [48]:
# Seed the global NumPy RNG so both the weight initialisation and the
# sampled source sequences below are reproducible.
np.random.seed(123)

model = Transformer(vocab_size=21, d_model=128, n_heads=4, d_ff=256, n_blocks=2)
model.eval()  # inference mode: no dropout

N_TRIALS = 10
SRC_LEN = 10
START_TOKEN = 0

# The model is untrained, so the generated sequences are not meaningful;
# this only exercises the forward pass end to end.
for _ in range(N_TRIALS):
    src = np.random.randint(1, 20, SRC_LEN)  # ids in 1..19 (upper bound exclusive)
    generated = model.generate(src, START_TOKEN, len(src))
    print(src, generated)

[12 13 12  9 17 17  8 10  4  6] [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[ 9 12 18 17  1  6 12 11  7  4] [0, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
[16  6 13  6 12 12  3 14  5 14] [0, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14]
[19 13  7  3  6 18  3  1 15  8] [0, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14]
[ 6 10  1 10  2  2 18 10 14  1] [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[16 19  6  9 13 16  8 16  5 19] [0, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
[ 4 10  6 15  5 16  8 12  6 19] [0, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14]
[11  9 19 12  2  2  1 12 18  3] [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[ 4  7 12 11 19  2 10  2  9  9] [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[ 8  3  2 17 14 15 15  4  5 10] [0, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
