In [133]:
!pip install -q colorama

In [134]:
from myPyTorch import *
import numpy as np
from colorama import Fore, Style

In [135]:
# Hyperparameters shared by the cells below.
batch_size = 32      # leading dimension of every random test input
source_length = 10   # length of the encoder-side token sequences
target_length = 10   # length of the decoder-side token sequences
d_model = 256        # feature width of embeddings and layer outputs
forward_dim = 512    # hidden width inside FeedForward
vocab_size = 5000    # exclusive upper bound for random token ids / Embedding size
num_layers = 4       # number of stacked encoder / decoder layers
num_heads = 8        # passed to MultiHeadAttention

### FeedForward

In [136]:
class FeedForward:
  '''Position-wise feed-forward block: Linear -> ReLU -> Linear.

  Args:
    d_model: first argument of layer1 and second argument of layer2
      (presumably the input/output feature width of the block).
    forward_dim: second argument of layer1 and first argument of layer2
      (presumably the hidden width).
  '''
  def __init__(self, d_model, forward_dim):
    self.layer1 = Linear(d_model, forward_dim)
    self.layer2 = Linear(forward_dim, d_model)

  def forward(self, x):
    '''Run x through layer1, a ReLU, and layer2; returns layer2's output.'''
    hidden = self.layer1.forward(x)
    # Without a non-linearity the two Linear layers compose into a single
    # linear map, so the hidden layer would add no expressive power.
    hidden = np.maximum(hidden, 0)
    return self.layer2.forward(hidden)

In [137]:
# Smoke test: FeedForward should return one d_model-wide row per input row.
x = np.random.rand(batch_size, d_model)
ff = FeedForward(d_model, forward_dim)
output = ff.forward(x)
# Enforce the expected shape instead of relying on reading the printed value.
assert output.shape == (batch_size, d_model), output.shape
print(output.shape) # (batch_size, d_model)

(32, 256)


### LayerNorm

In [138]:
class LayerNorm:
  '''Normalizes the last axis to zero mean / unit variance, then scales and shifts.

  gamma (scale) starts as ones and beta (shift) as zeros, both of length
  d_model; epsilon is added to the variance before the square root.
  '''
  def __init__(self, d_model, epsilon=1e-5):
    self.epsilon = epsilon
    self.gamma = np.ones(d_model)
    self.beta = np.zeros(d_model)

  def forward(self, x):
    '''Return gamma * (x - mean) / sqrt(var + epsilon) + beta over the last axis.'''
    # keepdims=True keeps the statistics broadcastable against x.
    mu = np.mean(x, axis=-1, keepdims=True)
    sigma_sq = np.var(x, axis=-1, keepdims=True)
    x_hat = (x - mu) / np.sqrt(sigma_sq + self.epsilon)
    return self.gamma * x_hat + self.beta

## Encoder

In [139]:
class EncoderLayer:
  '''One encoder layer: self-attention, then feed-forward.

  Each sub-layer output is added to its input (residual connection) and the
  sum is passed through that sub-layer's own LayerNorm.
  '''
  def __init__(self, d_model, num_heads, source_length, target_length):
    self.mha = MultiHeadAttention(d_model, num_heads, source_length, target_length)
    # NOTE: forward_dim is not a parameter here; it is read from the
    # notebook-level configuration cell.
    self.ff = FeedForward(d_model, forward_dim)
    # One LayerNorm per sub-layer so their gamma/beta are independent instead
    # of a single instance being reused for both normalizations.
    self.layer_norm1 = LayerNorm(d_model)
    self.layer_norm2 = LayerNorm(d_model)

  def forward(self, x):
    '''
    input: (batch_size, source_length, d_model)
    output: (batch_size, source_length, d_model)
    '''
    # Self-attention: the same tensor is passed as all three arguments.
    attn_out = self.mha.forward(x, x, x)  # (N, L, d_model)
    x = self.layer_norm1.forward(attn_out + x)

    ff_out = self.ff.forward(x) # (N, L, d_model)
    return self.layer_norm2.forward(ff_out + x)

In [140]:
# test encoder layer: output must keep the input's (N, L, d_model) shape
x = np.random.randn(batch_size, source_length, d_model)
encoder_layer = EncoderLayer(d_model, num_heads, source_length, target_length)
output = encoder_layer.forward(x)
# Enforce the expected shape instead of relying on reading the printed value.
assert output.shape == (batch_size, source_length, d_model), output.shape
print(output.shape)

(32, 10, 256)


  return np.exp(x) / np.sum(np.exp(x), axis=0)
  return np.exp(x) / np.sum(np.exp(x), axis=0)


In [141]:
# Smoke test for Embedding: integer ids of shape (N, L) should come back with
# a trailing d_model axis.
x = np.random.randint(0, vocab_size, (batch_size, source_length)).astype(np.int64)
layer = Embedding(vocab_size, d_model)
embedded_shape = layer.forward(x).shape
# Enforce the expected shape instead of relying on reading the displayed value.
assert embedded_shape == (batch_size, source_length, d_model), embedded_shape
embedded_shape

(32, 10, 256)

In [142]:
class Encoder:
  '''Token embedding + sinusoidal positional encoding + a stack of EncoderLayers.

  Args:
    seq_length: number of positions in the positional-encoding table; also the
      sequence length handed to every EncoderLayer.
    d_model: feature width of embeddings and layer outputs (must be even).
    num_heads: forwarded to every EncoderLayer.
    forward_dim: accepted but unused here; EncoderLayer takes no such argument
      and reads the notebook-level forward_dim instead.
    num_layers: number of EncoderLayers in the stack.
    vocab_size: first argument of the Embedding layer.
  '''
  def __init__(self, seq_length, d_model, num_heads, forward_dim, num_layers, vocab_size):
    self.num_layers = num_layers
    self.embedding_layer = Embedding(vocab_size, d_model)
    self.pos_encoding = self.create_positional_encoding(seq_length, d_model)
    # Use the seq_length argument rather than the notebook globals
    # source_length/target_length, so the encoder is configured by its own
    # parameters. Encoder attention is self-attention, so both length
    # arguments describe the same sequence.
    # (assumes MultiHeadAttention's two length arguments are the key-side and
    # query-side sequence lengths -- TODO confirm against myPyTorch)
    self.encoder_layers = [EncoderLayer(d_model, num_heads, seq_length, seq_length) for _ in range(num_layers)]

  def create_positional_encoding(self, seq_length, d_model):
    '''Build the (seq_length, d_model) sinusoidal position table.

    Even columns hold sin(pos / 10000**(2i / d_model)) and odd columns the
    matching cos, for column pair index i.

    Raises:
      AssertionError: if d_model is odd.
    '''
    assert d_model % 2 == 0, "Dimension model must be even"

    pos = np.arange(0, seq_length)[:, np.newaxis]  # (seq_length, 1)
    power = np.arange(0, d_model, 2).astype(float) / d_model
    div_term = np.power(10000, power)[np.newaxis, :]  # (1, d_model // 2)
    # Broadcasting expands both operands; no explicit np.repeat is needed.
    angles = pos / div_term  # (seq_length, d_model // 2)

    pe = np.zeros((seq_length, d_model))  # (seq_length, d_model)
    pe[:, 0::2] = np.sin(angles)
    pe[:, 1::2] = np.cos(angles)

    return pe

  def forward(self, x):
    '''
    input: token ids, (batch_size, source_length)
    output: (batch_size, source_length, d_model)
    '''
    x = self.embedding_layer.forward(x) # (N, L, d_model)
    # Slice the table to the actual sequence length so inputs shorter than
    # seq_length can still be encoded.
    x = x + self.pos_encoding[:x.shape[1]]

    for layer in self.encoder_layers:
      x = layer.forward(x)

    return x

In [143]:
# test encoder: token ids (N, L) in, features (N, L, d_model) out
encoder = Encoder(source_length, d_model, num_heads, forward_dim, num_layers, vocab_size)
x = np.random.randint(0, vocab_size, (batch_size, source_length)).astype(np.int64)
output = encoder.forward(x)
# Enforce the expected shape instead of relying on reading the printed value.
assert output.shape == (batch_size, source_length, d_model), output.shape
print(output.shape)

(32, 10, 256)


## Decoder

In [144]:
class DecoderLayer:
  '''One decoder layer: self-attention, cross-attention, then feed-forward.

  Each sub-layer output is added to its input (residual connection) and the
  sum is passed through that sub-layer's own LayerNorm.
  '''
  def __init__(self, d_model, num_heads, source_length, target_length):
    self.casual_mha = MultiHeadAttention(d_model, num_heads, source_length, target_length)
    self.cross_mha = MultiHeadAttention(d_model, num_heads, source_length, target_length)
    # NOTE: forward_dim is not a parameter here; it is read from the
    # notebook-level configuration cell.
    self.ff = FeedForward(d_model, forward_dim)
    # One LayerNorm per sub-layer so their gamma/beta are independent instead
    # of a single instance being reused for all three normalizations.
    self.layer_norm1 = LayerNorm(d_model)
    self.layer_norm2 = LayerNorm(d_model)
    self.layer_norm3 = LayerNorm(d_model)

  def forward(self, x, encoder_output):
    ''' Decoder Layer

    Inputs:
      x: decoder input (N, target_length, d_model)
      encoder_output: (N, source_length, d_model)

    Returns:
      array with the same shape as x
    '''
    # Self-attention over the decoder input. `casual=True` is the keyword
    # spelling MultiHeadAttention.forward is called with here (presumably it
    # enables the causal mask -- confirm in myPyTorch).
    attn_out = self.casual_mha.forward(x, x, x, casual=True) # (N, L, d_model)
    x = self.layer_norm1.forward(attn_out + x) # (N, L, d_model)

    # Cross-attention: x is the first argument, encoder_output the other two.
    cross_out = self.cross_mha.forward(x, encoder_output, encoder_output) # (N, L, d_model)
    x = self.layer_norm2.forward(cross_out + x) # (N, L, d_model)

    ff_out = self.ff.forward(x) # (N, L, d_model)
    return self.layer_norm3.forward(ff_out + x) # (N, L, d_model)

In [145]:
# test decoder layer: output must keep the decoder input's shape
x = np.random.randn(batch_size, target_length, d_model)
encoder_output = np.random.randn(batch_size, source_length, d_model)
decoder_layer = DecoderLayer(d_model, num_heads, source_length, target_length)
output = decoder_layer.forward(x, encoder_output)
# Enforce the expected shape instead of relying on reading the printed value.
assert output.shape == (batch_size, target_length, d_model), output.shape
print(output.shape)

(32, 10, 256)


In [146]:
class Decoder:
  '''Token embedding + sinusoidal positional encoding + DecoderLayers + output Linear.

  Args:
    vocab_size: first argument of the Embedding layer and output width of the
      final Linear layer.
    d_model: feature width of embeddings and layer outputs (must be even).
    target_length: number of positions in the positional-encoding table; also
      forwarded to every DecoderLayer.
    num_layers: number of DecoderLayers in the stack.
    forward_dim: accepted but unused here; DecoderLayer takes no such argument
      and reads the notebook-level forward_dim instead.

  NOTE: num_heads and source_length are not parameters; __init__ reads them
  from the notebook-level configuration cell.
  '''
  def __init__(self, vocab_size, d_model, target_length, num_layers, forward_dim):
    self.num_layers = num_layers
    self.embedding_layer = Embedding(vocab_size, d_model)
    self.pos_encoding = self.create_positional_encoding(target_length, d_model)
    self.decoder_layers = [DecoderLayer(d_model, num_heads, source_length, target_length) for _ in range(num_layers)]
    self.last_layer = Linear(d_model, vocab_size)

  def create_positional_encoding(self, seq_length, d_model):
    '''Build the (seq_length, d_model) sinusoidal position table.

    Even columns hold sin(pos / 10000**(2i / d_model)) and odd columns the
    matching cos, for column pair index i.

    Raises:
      AssertionError: if d_model is odd.
    '''
    assert d_model % 2 == 0, "Dimension model must be even"

    pos = np.arange(0, seq_length)[:, np.newaxis]  # (seq_length, 1)
    power = np.arange(0, d_model, 2).astype(float) / d_model
    div_term = np.power(10000, power)[np.newaxis, :]  # (1, d_model // 2)
    # Broadcasting expands both operands; no explicit np.repeat is needed.
    angles = pos / div_term  # (seq_length, d_model // 2)

    pe = np.zeros((seq_length, d_model))  # (seq_length, d_model)
    pe[:, 0::2] = np.sin(angles)
    pe[:, 1::2] = np.cos(angles)

    return pe

  def forward(self, x, encoder_output):
    '''
    input:
      x: token ids, (batch_size, target_length)
      encoder_output: passed unchanged to every DecoderLayer
    output: result of the final Linear layer, (batch_size, target_length, vocab_size)
    '''
    x = self.embedding_layer.forward(x) # (N, L, d_model)
    # Slice the table to the actual sequence length so inputs shorter than
    # target_length can still be decoded.
    x = x + self.pos_encoding[:x.shape[1]]

    for layer in self.decoder_layers:
      x = layer.forward(x, encoder_output)

    return self.last_layer.forward(x)

In [147]:
# test decoder: token ids (N, target_length) in, vocabulary scores out
# The decoder consumes target-side tokens, so its input length is
# target_length (source_length only describes encoder_output).
x = np.random.randint(0, vocab_size, (batch_size, target_length)).astype(np.int64)
encoder_output = np.random.randn(batch_size, source_length, d_model)
decoder = Decoder(vocab_size, d_model, target_length, num_layers, forward_dim)
output = decoder.forward(x, encoder_output)
# Enforce the expected shape instead of relying on reading the printed value.
assert output.shape == (batch_size, target_length, vocab_size), output.shape
print(output.shape)

(32, 10, 5000)


## TransFormer

In [148]:
class TransFormer:
  '''Encoder-decoder model assembled from the Encoder and Decoder classes.

  NOTE: the encoder's head count comes from the notebook-level num_heads
  variable; it is not a constructor argument.
  '''
  def __init__(self, source_length, target_length, forward_dim, num_layers, vocab_size, d_model):
    # The encoder is constructed before the decoder.
    self.encoder = Encoder(
      source_length, d_model, num_heads, forward_dim, num_layers, vocab_size
    )
    self.decoder = Decoder(
      vocab_size, d_model, target_length, num_layers, forward_dim
    )

  def forward(self, encoder_input, decoder_input):
    '''Encode encoder_input, then decode decoder_input against that encoding.'''
    memory = self.encoder.forward(encoder_input)
    return self.decoder.forward(decoder_input, memory)

In [149]:
# End-to-end check: random token ids go through the full model and the result
# must have one vocab_size-wide row per target position.
transformer = TransFormer(source_length, target_length, forward_dim, num_layers, vocab_size, d_model)
encoder_input = np.random.randint(0, vocab_size, (batch_size, source_length)).astype(np.int64)
deocder_input = np.random.randint(0, vocab_size, (batch_size, target_length)).astype(np.int64)
output = transformer.forward(encoder_input, deocder_input)

expected_shape = (batch_size, target_length, vocab_size)
assert output.shape == expected_shape

# Wrap the message in colorama codes so it prints green, then reset the style.
success_message = "Assertion passed: The output shape matches the expected shape (batch_size, target_length, vocab_size)."
print(Fore.GREEN + success_message + Style.RESET_ALL)

[32mAssertion passed: The output shape matches the expected shape (batch_size, target_length, vocab_size).[0m
