<a href="https://colab.research.google.com/github/pompymandislian/scratch_transformer_model/blob/main/Transformer_Scratch%20(V2).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim

# **Step by Step Transformer**
---

In [2]:
class SelfAttention(nn.Module):
      def __init__(self, embed_size, heads): # embed 512 , heads 8 relational
        super(SelfAttention, self).__init__()

        self.embed_size = embed_size
        self.heads = heads
        self.head_dim = embed_size // heads # if divide not zero

        assert (self.head_dim * heads == embed_size), "Embed size needs to be div by heads"

        self.values = nn.Linear(self.head_dim, self.head_dim, bias=False) # similarity token
        self.keys = nn.Linear(self.head_dim, self.head_dim, bias=False) # high attention
        self.queries = nn.Linear(self.head_dim, self.head_dim, bias=False) # output
        self.fc_out = nn.Linear(heads*self.head_dim, embed_size) # calculate final output

      def forward(self, values, keys, query, mask):
          """
          Process input data through multihead attention to produce weighted output.

          Parameters:
          -----------
          values : torch.Tensor
              Tensor containing the values to be attended to.

          keys : torch.Tensor
              Tensor containing the keys used to calculate attention scores with the queries.

          query : torch.Tensor
              Tensor containing the queries used to compute attention scores with the keys.

          mask : torch.Tensor, optional
              A tensor of shape (batch_size, query_len, key_len), where each element is either 0 or 1.

          Returns:
          --------
          torch.Tensor
              The output of the multihead attention layer, where the attention scores have been applied to the values.
          """
          N = query.shape[0] # batch size
          value_len, key_len, query_len = values.shape[1], keys.shape[1], query.shape[1]

          # note: shape[0] is batch size, shape[1] is sequence lenght token
          # shape[2] is heads, shape[3] is head_dim

          # split embedding into self.heads --> split each heads (4 dimentions)
          values = values.reshape(N, value_len, self.heads, self.head_dim)
          keys = keys.reshape(N, key_len, self.heads, self.head_dim)
          queries = query.reshape(N, query_len, self.heads, self.head_dim)

          # calculate heads with pararel
          energy = torch.einsum("nqhd,nkhd->nhqk", [queries, keys])

          # obtain attantion, mask words not know just know words
          if mask is not None: # find in dict data if nothing from result Q V K then 0
            energy = energy.masked_fill(mask == 0, float("-1e20"))

          # obtain probability attention
          attention = torch.softmax(energy / (self.embed_size ** (1/2)), dim=3)

          # Apply attention scores to the values to get the weighted output
          # multiple dimension
          out = torch.einsum("nhql,nlhd->nqhd", [attention, values]).reshape(
              N, query_len, self.heads*self.head_dim
          )

          # final output
          out = self.fc_out(out)
          return out

In [3]:
# testing class SelfAttention
embed_size = 512
heads = 8
self_attention = SelfAttention(embed_size, heads)

print(self_attention.values)
print(self_attention.keys)
print(self_attention.queries)
print(self_attention.fc_out)

Linear(in_features=64, out_features=64, bias=False)
Linear(in_features=64, out_features=64, bias=False)
Linear(in_features=64, out_features=64, bias=False)
Linear(in_features=512, out_features=512, bias=True)


In [4]:
# testing forward
vocab = {
    "I": 0,
    "love": 1,
    "programming": 2,
    "is": 3,
    "fun": 4,
    "machine": 5,
    "learning": 6,
    "very": 7,
    "exciting": 8,
    "and": 9
}

id_to_token = dict(map(reversed, vocab.items())) # dict

# assume tokens id
token_ids = torch.tensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
                          [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]])


# vocab token id (dict), 10 because we have 10 token
embedding = nn.Embedding(len(vocab), embed_size)

print(embedding)

Embedding(10, 512)


In [5]:
# define V K Q
values = embedding(token_ids)
keys = embedding(token_ids)
query = embedding(token_ids)

mask = None

# obtain output attention
output = self_attention(values, keys, query, mask)

print(output.shape)

torch.Size([2, 10, 512])


In [6]:
class TransformerBlock(nn.Module):
      def __init__(self, embed_size, heads, dropout, forward_expansion):
          super(TransformerBlock, self).__init__()
          self.attention = SelfAttention(embed_size, heads)

          # normalize for stabilization data
          self.norm1 = nn.LayerNorm(embed_size)
          self.norm2 = nn.LayerNorm(embed_size)

          # feed forward arschitecture
          self.feed_forward = nn.Sequential(
              nn.Linear(embed_size, forward_expansion * embed_size), # flexible feed
              nn.ReLU(), # change to positive (non linear)
              nn.Linear(forward_expansion * embed_size, embed_size) # flexible feed
          )

      def forward(self, values, keys, query, mask):
          """Feed Forward V K Q"""
          # Compute attention output based on query, keys, and values
          attention = self.attention(values, keys, query, mask)

          # Skip connection: add attention output with input query, then normalize
          x = self.norm1(attention + query)

          # Feed forward with the normalized output of attention + query
          forward = self.feed_forward(x)

          # Skip connection again: add FFN output with previous input, then normalize
          out = self.norm2(forward + x)

          return out

In [7]:
# testing class TransformerBlock
forward_expansion = 4 # dimension layer
dropout=0.1

transformer = TransformerBlock(embed_size, heads,
                               dropout, forward_expansion)

print(transformer.attention)
print(transformer.norm1)
print(transformer.norm2)
print(transformer.feed_forward)

SelfAttention(
  (values): Linear(in_features=64, out_features=64, bias=False)
  (keys): Linear(in_features=64, out_features=64, bias=False)
  (queries): Linear(in_features=64, out_features=64, bias=False)
  (fc_out): Linear(in_features=512, out_features=512, bias=True)
)
LayerNorm((512,), eps=1e-05, elementwise_affine=True)
LayerNorm((512,), eps=1e-05, elementwise_affine=True)
Sequential(
  (0): Linear(in_features=512, out_features=2048, bias=True)
  (1): ReLU()
  (2): Linear(in_features=2048, out_features=512, bias=True)
)


In [8]:
# testing output forward
output = transformer(values, keys, query, mask)

print(output.shape)

torch.Size([2, 10, 512])


In [9]:
class Encoder(nn.Module):
      def __init__(self, src_vocab_size, embed_size,
                  num_layers, heads, device,
                  forward_expansion, dropout, max_length):
        super(Encoder, self).__init__()

        self.embed_size = embed_size
        self.device = device
        self.word_embedding = nn.Embedding(src_vocab_size, embed_size)

        # not using sinusoida because we need training data
        self.position_embedding = nn.Embedding(max_length, embed_size)

        self.layers = nn.ModuleList([
              TransformerBlock(embed_size, heads,
                               dropout=dropout,
                               forward_expansion=forward_expansion)
              # looping layer
              for _ in range(num_layers)
          ])

        self.dropout = nn.Dropout(dropout)

      def forward(self, x, mask = None):
        """
        Forward pass of the encoder.

        Parameters:
        ----------
        X : torch.tensor
          input data

        mask : torch.tensor
          mask data

        Return:
        -------
        torch.tensor
          output data
        """
        # sum of length words
        N, seq_length = x.shape

        # position values
        positions = torch.arange(0, seq_length).expand(N, seq_length).to(self.device)

        # embedding to vektor embedding and summation with position
        out = self.dropout(
            (self.word_embedding(x) + self.position_embedding(positions))
        )

        # result of out continue to layers
        for layer in self.layers:
          out = layer(out, out, out, mask)

        return out

In [10]:
# test class Encoder
src_vocab_size = 10000  # size vocab input
embed_size = 512  # Dimensi embedding
num_layers = 1  # layer transformer block
heads = 8  # Multihead Attention
device = 'cpu'  # Device
forward_expansion = 4  # dimension layer
dropout = 0.1  # Dropout rate
max_length = 60  # max sort lenght

encoder = Encoder(src_vocab_size, embed_size, num_layers,
                  heads, device, forward_expansion, dropout, max_length)

print(encoder.embed_size)
print(encoder.device)
print(encoder.word_embedding)
print(encoder.position_embedding)
print(encoder.layers)

512
cpu
Embedding(10000, 512)
Embedding(60, 512)
ModuleList(
  (0): TransformerBlock(
    (attention): SelfAttention(
      (values): Linear(in_features=64, out_features=64, bias=False)
      (keys): Linear(in_features=64, out_features=64, bias=False)
      (queries): Linear(in_features=64, out_features=64, bias=False)
      (fc_out): Linear(in_features=512, out_features=512, bias=True)
    )
    (norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
    (norm2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
    (feed_forward): Sequential(
      (0): Linear(in_features=512, out_features=2048, bias=True)
      (1): ReLU()
      (2): Linear(in_features=2048, out_features=512, bias=True)
    )
  )
)


In [11]:
# test forward
encoder.forward(token_ids)

tensor([[[ 0.6894, -1.0818, -0.0805,  ..., -0.2028, -1.3544, -0.6048],
         [ 0.3709, -0.9345,  1.4319,  ..., -0.6475,  0.8177,  2.3684],
         [-1.2066,  0.4468, -1.4281,  ..., -1.7451,  1.2988, -0.1292],
         ...,
         [ 0.7149, -0.6914, -0.5641,  ...,  0.4712, -0.4231, -0.9916],
         [-0.2099, -0.5330,  0.0212,  ...,  0.3055, -1.7120, -0.5566],
         [ 0.1003,  0.1165,  1.1143,  ...,  0.5316,  0.8280,  0.8455]],

        [[ 0.6437, -0.9746,  0.3059,  ..., -0.4748, -0.6032, -0.8995],
         [-0.0262, -2.5942, -0.4762,  ...,  0.3753, -0.2043,  0.8237],
         [-0.0856, -0.2041,  0.4204,  ..., -1.5958,  1.4535,  0.4304],
         ...,
         [ 1.2091, -0.6240, -0.7827,  ...,  2.5901, -0.4674, -1.5980],
         [ 1.7856, -1.6918, -0.2146,  ..., -0.9761, -2.3058, -0.3542],
         [-0.9477,  0.4265,  1.1593,  ...,  0.7040,  0.3397, -0.6253]]],
       grad_fn=<NativeLayerNormBackward0>)

In [12]:
class DecoderBlock(nn.Module):
      def __init__(self, embed_size, heads, forward_expansion, dropout, device):
          super(DecoderBlock, self).__init__()

          self.attention = SelfAttention(embed_size, heads=heads)
          self.norm = nn.LayerNorm(embed_size)
          self.transformer_block = TransformerBlock(
              embed_size, heads, dropout, forward_expansion)

          self.dropout = nn.Dropout(dropout)

      def forward(self, x, value, key, trg_mask, src_mask=None):
          """Forward pass for block decoder (not relevant values)"""

          # find data is not process in each K Q V with trg_mask
          attention = self.attention(x, x, x, trg_mask)

          # streamlining gradien prevent lose information
          query = self.dropout(self.norm(attention + x))

          # calculate attention and block inrelevant word with src_mask
          out = self.transformer_block(value, key, query, src_mask)

          return out

In [13]:
# test class DecoderBlock
batch_size = 2
seq_len = 10
embed_size = 512
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# embedding token_ids
embedding_layer = nn.Embedding(len(vocab), embed_size).to(device)
embedded_tokens = embedding_layer(token_ids).to(device)  # Shape: (batch_size, seq_len, embed_size)

# Mask
trg_mask = torch.tril(torch.ones((batch_size, seq_len, seq_len))).to(device)  # Causal mask (shape: batch_size, seq_len, seq_len)
trg_mask = trg_mask.unsqueeze(1)  # change dimension (batch_size, 1, seq_len, seq_len)

# Output encoder embedding is same
encoder_output = embedded_tokens

# Membuat DecoderBlock
decoder_block = DecoderBlock(embed_size, heads=8,
                             forward_expansion=4,
                             dropout=0.1,
                             device=device).to(device)

print(decoder_block.attention)
print(decoder_block.norm)
print(decoder_block.transformer_block)
print(decoder_block.dropout)

SelfAttention(
  (values): Linear(in_features=64, out_features=64, bias=False)
  (keys): Linear(in_features=64, out_features=64, bias=False)
  (queries): Linear(in_features=64, out_features=64, bias=False)
  (fc_out): Linear(in_features=512, out_features=512, bias=True)
)
LayerNorm((512,), eps=1e-05, elementwise_affine=True)
TransformerBlock(
  (attention): SelfAttention(
    (values): Linear(in_features=64, out_features=64, bias=False)
    (keys): Linear(in_features=64, out_features=64, bias=False)
    (queries): Linear(in_features=64, out_features=64, bias=False)
    (fc_out): Linear(in_features=512, out_features=512, bias=True)
  )
  (norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
  (norm2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
  (feed_forward): Sequential(
    (0): Linear(in_features=512, out_features=2048, bias=True)
    (1): ReLU()
    (2): Linear(in_features=2048, out_features=512, bias=True)
  )
)
Dropout(p=0.1, inplace=False)


In [14]:
# Forward decoder
decoder_output = decoder_block.forward(embedded_tokens,
                                       encoder_output,
                                       encoder_output, trg_mask)

print("Decoder Output Shape:", decoder_output.shape)  # Output: (batch_size, seq_len, embed_size)

Decoder Output Shape: torch.Size([2, 10, 512])


In [15]:
class Decoder(nn.Module):
      def __init__(self, trg_vocab_size, embed_size,
                   num_layers, heads,
                   forward_expansion,
                   dropout, device, max_length):

          super(Decoder, self).__init__()
          self.device = device

          # target data to vector
          self.word_embedding = nn.Embedding(trg_vocab_size, embed_size)

          # position for data target
          self.position_embedding = nn.Embedding(max_length, embed_size)

          # architecture
          self.layers = nn.ModuleList([
              DecoderBlock(embed_size, heads,
                           forward_expansion, dropout, device)
              for _ in range(num_layers)
          ])

          # fully connection target data
          self.fc_out = nn.Linear(embed_size, trg_vocab_size)
          self.dropout = nn.Dropout(dropout)

      def forward(self, x, enc_out, trg_mask, src_mask=None):
          n, seq_length = x.shape

          # position values
          positions = torch.arange(0, seq_length).expand(n, seq_length).to(self.device)

          # embedding to vektor embedding and summation with position
          x = self.dropout((self.word_embedding(x) + self.position_embedding(positions)))

          # loop layers
          for layer in self.layers:
            x = layer(x, enc_out, enc_out, trg_mask, src_mask)

          # final output
          out = self.fc_out(x)

          return out

In [16]:
# test class decoder
trg_vocab_size = len(vocab) # length of vocab

# decoder
decoder = Decoder(trg_vocab_size, embed_size, num_layers, heads,
                  forward_expansion, dropout, device, max_length).to(device)


print(decoder.word_embedding)
print(decoder.position_embedding)
print(decoder.layers)
print(decoder.fc_out)
print(decoder.dropout)

Embedding(10, 512)
Embedding(60, 512)
ModuleList(
  (0): DecoderBlock(
    (attention): SelfAttention(
      (values): Linear(in_features=64, out_features=64, bias=False)
      (keys): Linear(in_features=64, out_features=64, bias=False)
      (queries): Linear(in_features=64, out_features=64, bias=False)
      (fc_out): Linear(in_features=512, out_features=512, bias=True)
    )
    (norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
    (transformer_block): TransformerBlock(
      (attention): SelfAttention(
        (values): Linear(in_features=64, out_features=64, bias=False)
        (keys): Linear(in_features=64, out_features=64, bias=False)
        (queries): Linear(in_features=64, out_features=64, bias=False)
        (fc_out): Linear(in_features=512, out_features=512, bias=True)
      )
      (norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
      (norm2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
      (feed_forward): Sequential(
        (0): Lin

In [17]:
# test decoder forward
decoder_output = decoder.forward(token_ids, encoder_output, trg_mask)

print(decoder_output.shape)

torch.Size([2, 10, 10])


In [18]:
class Transformer(nn.Module):
      def __init__(self, src_vocab_size, trg_vocab_size, device,
                   src_pad_idx=0, trg_pad_idx=0,
                   embed_size=512, num_layers=6, forward_expansion=4,
                   heads=8, dropout=0, max_length=100):

          super(Transformer, self).__init__()

          # encoder process
          self.encoder = Encoder(src_vocab_size,
                                embed_size, num_layers, heads,
                                device, forward_expansion,
                                dropout, max_length)

          # decoder process
          self.decoder = Decoder(trg_vocab_size,
                                embed_size, num_layers, heads,
                                forward_expansion, dropout,
                                device, max_length)

          # find information not relevant
          self.src_pad_idx = src_pad_idx
          self.trg_pad_idx = trg_pad_idx
          self.device = device

      def make_src_mask(self, src):
          """Find mask from pad_idx"""
          # find mask and create a pad_idx
          src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)

          # (N, 1, 1, src_len)
          return src_mask.to(self.device)

      def make_trg_mask(self, trg):
          """Hold words future"""
          # obtain length
          N, trg_len = trg.shape

          # hold not used future words
          trg_mask = torch.tril(torch.ones((trg_len, trg_len))).expand(
              N, 1, trg_len, trg_len
          )

          return trg_mask.to(self.device)

      def forward(self, src, trg):
          """Running Transformer Model"""
          # avoid attention padding
          src_mask = self.make_src_mask(src)  # Mask untuk padding token pada source

          # hold not see next future word
          trg_mask = self.make_trg_mask(trg)  # Mask untuk target sequence

          # process input with encoder
          enc_src = self.encoder(src, src_mask)

          # process target with decoder
          out = self.decoder(trg, enc_src, trg_mask, src_mask)

          # Return output from decoder (not just trg_mask)
          return out


In [19]:
# test class Transformer
transformer = Transformer(src_vocab_size, trg_vocab_size, device)

print('Encoder', transformer.encoder(token_ids))

print('Decoder', transformer.decoder(token_ids, encoder_output, trg_mask))

Encoder tensor([[[-1.5970, -0.6391,  2.2730,  ...,  0.2967, -0.6160,  0.8779],
         [-1.1495, -0.9593,  0.4366,  ...,  1.3603, -0.4193,  1.0420],
         [-0.4125, -1.2371,  0.0055,  ...,  0.2198, -1.5073,  2.5586],
         ...,
         [ 0.4063, -0.6674,  0.0942,  ...,  1.5814, -0.2556,  0.5165],
         [-1.1757, -1.1254,  0.9289,  ...,  2.4127, -1.4108,  0.8692],
         [-0.5346, -1.2139,  1.0224,  ...,  1.2773, -0.1418,  1.0747]],

        [[-1.1141, -0.9489,  2.3258,  ...,  0.4645, -0.5379,  0.1532],
         [-1.2585, -0.9842,  1.0694,  ...,  1.0049, -0.4197,  0.8696],
         [-1.6852, -1.0057, -0.0076,  ...,  0.1520, -1.1211,  1.1090],
         ...,
         [-1.2036, -0.5035,  0.3905,  ...,  0.4053, -0.8536,  1.1632],
         [ 0.4701, -0.8489,  0.1810,  ...,  1.9465, -0.0842,  1.1371],
         [-0.4712, -2.1952,  1.1738,  ...,  3.3788, -1.3913,  1.2109]]],
       grad_fn=<NativeLayerNormBackward0>)
Decoder tensor([[[-0.0108, -1.0531,  0.1426, -0.1361,  0.3946, -0

In [20]:
# test make src mask
print(transformer.make_src_mask(token_ids))

print(transformer.make_trg_mask(token_ids))

tensor([[[[ True,  True,  True,  True,  True,  True,  True,  True,  True, False]]],


        [[[False,  True,  True,  True,  True,  True,  True,  True,  True,  True]]]])
tensor([[[[1., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
          [1., 1., 0., 0., 0., 0., 0., 0., 0., 0.],
          [1., 1., 1., 0., 0., 0., 0., 0., 0., 0.],
          [1., 1., 1., 1., 0., 0., 0., 0., 0., 0.],
          [1., 1., 1., 1., 1., 0., 0., 0., 0., 0.],
          [1., 1., 1., 1., 1., 1., 0., 0., 0., 0.],
          [1., 1., 1., 1., 1., 1., 1., 0., 0., 0.],
          [1., 1., 1., 1., 1., 1., 1., 1., 0., 0.],
          [1., 1., 1., 1., 1., 1., 1., 1., 1., 0.],
          [1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]]],


        [[[1., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
          [1., 1., 0., 0., 0., 0., 0., 0., 0., 0.],
          [1., 1., 1., 0., 0., 0., 0., 0., 0., 0.],
          [1., 1., 1., 1., 0., 0., 0., 0., 0., 0.],
          [1., 1., 1., 1., 1., 0., 0., 0., 0., 0.],
          [1., 1., 1., 1., 1., 1., 0., 0., 0.

In [21]:
# test forward
transformer.forward(token_ids, token_ids)

tensor([[[-0.2692, -0.9284,  0.3091, -0.6465, -0.4244, -0.2612,  0.3290,
           0.3745,  0.7543,  0.0245],
         [ 0.1243, -0.3863,  0.3331,  0.0285,  0.0913, -0.6686, -0.0174,
          -0.1718,  0.3767,  0.2887],
         [-0.1797, -0.3740, -0.2588,  0.1776,  0.0915,  0.0017,  0.1693,
          -0.2255, -0.0292,  0.4454],
         [-0.3091, -0.5568,  0.2743, -0.6087, -0.4043, -0.0398,  0.4076,
           0.0028,  0.1441,  0.2490],
         [-0.4135, -0.1826, -0.4783, -0.8689, -0.1955, -0.3868,  0.5608,
           0.1947, -0.1306, -0.5409],
         [-0.0736, -0.9116,  0.0348, -0.9732,  0.0981, -0.4088,  0.1292,
          -0.9973,  0.5366,  0.0957],
         [-0.5118, -0.2149,  0.6704, -0.5564, -0.0820, -0.1448,  0.5788,
           0.0636, -0.1608, -0.6932],
         [-0.3307, -0.1946, -0.3765, -0.3371,  0.0212, -0.2751, -0.2424,
           0.4064,  0.7111, -0.2253],
         [-0.3913, -0.5042, -0.6666, -0.3636, -0.0971, -0.2547,  0.4503,
          -0.1842, -0.1087, -0.2123],
 

# Full Code Transformer
---

In [22]:
class SelfAttention(nn.Module):
      def __init__(self, embed_size, heads): # embed 512 , heads 8 relational
        super(SelfAttention, self).__init__()

        self.embed_size = embed_size
        self.heads = heads
        self.head_dim = embed_size // heads # if divide not zero

        assert (self.head_dim * heads == embed_size), "Embed size needs to be div by heads"

        self.values = nn.Linear(self.head_dim, self.head_dim, bias=False) # similarity token
        self.keys = nn.Linear(self.head_dim, self.head_dim, bias=False) # high attention
        self.queries = nn.Linear(self.head_dim, self.head_dim, bias=False) # output
        self.fc_out = nn.Linear(heads*self.head_dim, embed_size) # calculate final output

      def forward(self, values, keys, query, mask):
          """
          Process input data through multihead attention to produce weighted output.

          Parameters:
          -----------
          values : torch.Tensor
              Tensor containing the values to be attended to.

          keys : torch.Tensor
              Tensor containing the keys used to calculate attention scores with the queries.

          query : torch.Tensor
              Tensor containing the queries used to compute attention scores with the keys.

          mask : torch.Tensor, optional
              A tensor of shape (batch_size, query_len, key_len), where each element is either 0 or 1.

          Returns:
          --------
          torch.Tensor
              The output of the multihead attention layer, where the attention scores have been applied to the values.
          """
          N = query.shape[0] # batch size
          value_len, key_len, query_len = values.shape[1], keys.shape[1], query.shape[1]

          # note: shape[0] is batch size, shape[1] is sequence lenght token
          # shape[2] is heads, shape[3] is head_dim

          # split embedding into self.heads --> split each heads (4 dimentions)
          values = values.reshape(N, value_len, self.heads, self.head_dim)
          keys = keys.reshape(N, key_len, self.heads, self.head_dim)
          queries = query.reshape(N, query_len, self.heads, self.head_dim)

          # calculate heads with pararel
          energy = torch.einsum("nqhd,nkhd->nhqk", [queries, keys])

          # obtain attantion, mask words not know just know words
          if mask is not None: # find in dict data if nothing from result Q V K then 0
            energy = energy.masked_fill(mask == 0, float("-1e20"))

          # obtain probability attention
          attention = torch.softmax(energy / (self.embed_size ** (1/2)), dim=3)

          # Apply attention scores to the values to get the weighted output
          # multiple dimension
          out = torch.einsum("nhql,nlhd->nqhd", [attention, values]).reshape(
              N, query_len, self.heads*self.head_dim
          )

          # final output
          out = self.fc_out(out)
          return out

class TransformerBlock(nn.Module):
      def __init__(self, embed_size, heads, dropout, forward_expansion):
          super(TransformerBlock, self).__init__()
          self.attention = SelfAttention(embed_size, heads)

          # normalize for stabilization data
          self.norm1 = nn.LayerNorm(embed_size)
          self.norm2 = nn.LayerNorm(embed_size)

          # feed forward arschitecture
          self.feed_forward = nn.Sequential(
              nn.Linear(embed_size, forward_expansion * embed_size), # flexible feed
              nn.ReLU(), # change to positive (non linear)
              nn.Linear(forward_expansion * embed_size, embed_size) # flexible feed
          )

      def forward(self, values, keys, query, mask):
          """Feed Forward V K Q"""
          # Compute attention output based on query, keys, and values
          attention = self.attention(values, keys, query, mask)

          # Skip connection: add attention output with input query, then normalize
          x = self.norm1(attention + query)

          # Feed forward with the normalized output of attention + query
          forward = self.feed_forward(x)

          # Skip connection again: add FFN output with previous input, then normalize
          out = self.norm2(forward + x)

          return out

class Encoder(nn.Module):
      def __init__(self, src_vocab_size, embed_size,
                  num_layers, heads, device,
                  forward_expansion, dropout, max_length):
        super(Encoder, self).__init__()

        self.embed_size = embed_size
        self.device = device
        self.word_embedding = nn.Embedding(src_vocab_size, embed_size)

        # not using sinusoida because we need training data
        self.position_embedding = nn.Embedding(max_length, embed_size)

        self.layers = nn.ModuleList([
              TransformerBlock(embed_size, heads,
                               dropout=dropout,
                               forward_expansion=forward_expansion)
              # looping layer
              for _ in range(num_layers)
          ])

        self.dropout = nn.Dropout(dropout)

      def forward(self, x, mask = None):
        """
        Forward pass of the encoder.

        Parameters:
        ----------
        X : torch.tensor
          input data

        mask : torch.tensor
          mask data

        Return:
        -------
        torch.tensor
          output data
        """
        # sum of length words
        N, seq_length = x.shape

        # position values
        positions = torch.arange(0, seq_length).expand(N, seq_length).to(self.device)

        # embedding to vektor embedding and summation with position
        out = self.dropout(
            (self.word_embedding(x) + self.position_embedding(positions))
        )

        # result of out continue to layers
        for layer in self.layers:
          out = layer(out, out, out, mask)

        return out

class DecoderBlock(nn.Module):
      def __init__(self, embed_size, heads, forward_expansion, dropout, device):
          super(DecoderBlock, self).__init__()

          self.attention = SelfAttention(embed_size, heads=heads)
          self.norm = nn.LayerNorm(embed_size)
          self.transformer_block = TransformerBlock(
              embed_size, heads, dropout, forward_expansion)

          self.dropout = nn.Dropout(dropout)

      def forward(self, x, value, key, trg_mask, src_mask=None):
          """Forward pass for block decoder (not relevant values)"""

          # find data is not process in each K Q V with trg_mask
          attention = self.attention(x, x, x, trg_mask)

          # streamlining gradien prevent lose information
          query = self.dropout(self.norm(attention + x))

          # calculate attention and block inrelevant word with src_mask
          out = self.transformer_block(value, key, query, src_mask)

          return out

class Decoder(nn.Module):
      def __init__(self, trg_vocab_size, embed_size,
                   num_layers, heads,
                   forward_expansion,
                   dropout, device, max_length):

          super(Decoder, self).__init__()
          self.device = device

          # target data to vector
          self.word_embedding = nn.Embedding(trg_vocab_size, embed_size)

          # position for data target
          self.position_embedding = nn.Embedding(max_length, embed_size)

          # architecture
          self.layers = nn.ModuleList([
              DecoderBlock(embed_size, heads,
                           forward_expansion, dropout, device)
              for _ in range(num_layers)
          ])

          # fully connection target data
          self.fc_out = nn.Linear(embed_size, trg_vocab_size)
          self.dropout = nn.Dropout(dropout)

      def forward(self, x, enc_out, trg_mask, src_mask=None):
          n, seq_length = x.shape

          # position values
          positions = torch.arange(0, seq_length).expand(n, seq_length).to(self.device)

          # embedding to vektor embedding and summation with position
          x = self.dropout((self.word_embedding(x) + self.position_embedding(positions)))

          # loop layers
          for layer in self.layers:
            x = layer(x, enc_out, enc_out, trg_mask, src_mask)

          # final output
          out = self.fc_out(x)

          return out

class Transformer(nn.Module):
      def __init__(self, src_vocab_size, trg_vocab_size, device,
                   src_pad_idx=0, trg_pad_idx=0,
                   embed_size=512, num_layers=6, forward_expansion=4,
                   heads=8, dropout=0, max_length=100):

          super(Transformer, self).__init__()

          # encoder process
          self.encoder = Encoder(src_vocab_size,
                                embed_size, num_layers, heads,
                                device, forward_expansion,
                                dropout, max_length)

          # decoder process
          self.decoder = Decoder(trg_vocab_size,
                                embed_size, num_layers, heads,
                                forward_expansion, dropout,
                                device, max_length)

          # find information not relevant
          self.src_pad_idx = src_pad_idx
          self.trg_pad_idx = trg_pad_idx
          self.device = device

      def make_src_mask(self, src):
          """Find mask from pad_idx"""
          # find mask and create a pad_idx
          src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)

          # (N, 1, 1, src_len)
          return src_mask.to(self.device)

      def make_trg_mask(self, trg):
          """Hold words future"""
          # obtain length
          N, trg_len = trg.shape

          # hold not used future words
          trg_mask = torch.tril(torch.ones((trg_len, trg_len))).expand(
              N, 1, trg_len, trg_len
          )

          return trg_mask.to(self.device)

      def forward(self, src, trg):
          """Running Transformer Model"""
          # avoid attention padding
          src_mask = self.make_src_mask(src)  # Mask untuk padding token pada source

          # hold not see next future word
          trg_mask = self.make_trg_mask(trg)  # Mask untuk target sequence

          # process input with encoder
          enc_src = self.encoder(src, src_mask)

          # process target with decoder
          out = self.decoder(trg, enc_src, trg_mask, src_mask)

          # Return output from decoder (not just trg_mask)
          return out

# Execution Class
---

In [23]:
if __name__ == "__main__":
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  print(device)

  x = torch.tensor([[1, 5, 6,], [1, 3, 4]]).to(device)
  trg = torch.tensor([[1, 5, 6,], [1, 3, 4]]).to(device)

  src_pad_idx = 0
  trg_pad_idx = 0
  src_vocab_size = 10
  trg_vocab_size = 10

  model = Transformer(src_vocab_size, trg_vocab_size, device=device).to(device)

  out = model(x, trg[:, :-1])
  print(out.shape)

  print(model)

cpu
torch.Size([2, 2, 10])
Transformer(
  (encoder): Encoder(
    (word_embedding): Embedding(10, 512)
    (position_embedding): Embedding(100, 512)
    (layers): ModuleList(
      (0-5): 6 x TransformerBlock(
        (attention): SelfAttention(
          (values): Linear(in_features=64, out_features=64, bias=False)
          (keys): Linear(in_features=64, out_features=64, bias=False)
          (queries): Linear(in_features=64, out_features=64, bias=False)
          (fc_out): Linear(in_features=512, out_features=512, bias=True)
        )
        (norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (feed_forward): Sequential(
          (0): Linear(in_features=512, out_features=2048, bias=True)
          (1): ReLU()
          (2): Linear(in_features=2048, out_features=512, bias=True)
        )
      )
    )
    (dropout): Dropout(p=0, inplace=False)
  )
  (decoder): Decoder(
    (word_embedding): Emb

# Training Model
---

In [24]:
# Hyperparameters
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
src_vocab_size = 10
trg_vocab_size = 10
src_pad_idx = 0
trg_pad_idx = 0
embed_size = 512
num_layers = 6
heads = 8
forward_expansion = 4
dropout = 0.1
max_length = 100
batch_size = 2
lr = 3e-4
epochs = 10

In [25]:
criterion = nn.CrossEntropyLoss(ignore_index=trg_pad_idx)
optimizer = optim.Adam(model.parameters(), lr=lr)

In [52]:
def train(model, train_loader, optimizer, criterion, device):
    model.train()
    total_loss = 0

    for src, trg in train_loader:
        src, trg = src.to(device), trg.to(device)

        # Zero gradients before backpropagation
        optimizer.zero_grad()

        # Forward pass (only use input seq excluding the last token for the target)
        output = model(src, trg[:, :-1])

        # Compute loss: output shape = (N, trg_len-1, trg_vocab_size)
        # trg[:, 1:].shape = (N, trg_len-1) because we want to compare outputs with targets
        output = output.reshape(-1, trg_vocab_size)
        trg = trg[:, 1:].reshape(-1)

        loss = criterion(output, trg)

        # Backpropagate the error and update the model weights
        loss.backward()

        # Clip gradients to avoid explosion
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

        # Optimizer step
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / len(train_loader)
    print(f"Training loss: {avg_loss:.4f}")
    return avg_loss


In [62]:
from torch.utils.data import Dataset, DataLoader

class DummyDataset(Dataset):
    def __init__(self, src_vocab_size, trg_vocab_size, max_len, num_samples=100):
        self.src_vocab_size = src_vocab_size
        self.trg_vocab_size = trg_vocab_size
        self.max_len = max_len
        self.num_samples = num_samples

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        src = torch.randint(1, self.src_vocab_size, (self.max_len,))
        trg = torch.randint(1, self.trg_vocab_size, (self.max_len,))
        return src, trg

# Create the dataset and dataloaders
train_dataset = DummyDataset(src_vocab_size, trg_vocab_size, max_len=max_length, num_samples=100)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

In [63]:
# Initialize model
model = Transformer(src_vocab_size, trg_vocab_size, device=device,
                    src_pad_idx=src_pad_idx, trg_pad_idx=trg_pad_idx,
                    embed_size=embed_size, num_layers=num_layers,
                    forward_expansion=forward_expansion, heads=heads,
                    dropout=dropout, max_length=max_length).to(device)

# Training loop
for epoch in range(epochs):
    print(f"Epoch {epoch+1}/{epochs}")
    train_loss = train(model, train_loader, optimizer, criterion, device)

Epoch 1/10
Training loss: 2.4666
Epoch 2/10
Training loss: 2.4727
Epoch 3/10
Training loss: 2.4792
Epoch 4/10
Training loss: 2.4763
Epoch 5/10
Training loss: 2.4690
Epoch 6/10
Training loss: 2.4721
Epoch 7/10
Training loss: 2.4757
Epoch 8/10
Training loss: 2.4832
Epoch 9/10
Training loss: 2.4834
Epoch 10/10
Training loss: 2.4777


# Predict
---

In [64]:
def predict(model, src, idx_to_word, max_len=20):
    sos_token = 1
    trg = torch.ones((src.shape[0], 1)).fill_(sos_token).long().to(device)  # Start with <sos>

    # Convert source indices to words
    src_text = " ".join([idx_to_word.get(token.item(), "<unk>") for token in src[0]])
    print("Question: ", " ".join([word for word in src_text.split() if word != "<pad>" and word != "<unk>"]))


    for t in range(1, max_len):
        # Generate the target mask and source mask
        trg_mask = model.make_trg_mask(trg)  # Target mask to prevent future information leak
        src_mask = model.make_src_mask(src)  # Source mask to avoid padding tokens in source

        # Pass the source and target sequence
        with torch.no_grad():
            output = model(src, trg)  # (batch_size, seq_len, vocab_size)

        # Get the last generated token's
        next_token = output[:, -1, :].argmax(dim=1).unsqueeze(1)  # Get the most probable token

        # Append the predicted token to the target sequence
        trg = torch.cat((trg, next_token), dim=1)

        eos_token = 8  # Assuming 8 is the <eos> token in your vocabulary
        if torch.any(next_token == eos_token):  # Check for <eos> in any token in the batch
            break

    # Convert predicted target sequence to words and remove <pad>, <unk> and <eos> tokens
    predicted_words = [idx_to_word.get(token.item(), "<unk>") for token in trg[0]]
    answer_text = " ".join([word for word in predicted_words if word != "<pad>" and word != "<unk>" and word != "<eos>"])

    print("Answer: ", answer_text)

    return trg

In [66]:
# Example
src_example = torch.tensor([[1, 5, 6, 7, 2], [1, 3, 4, 0, 0]]).to(device)

# Convert predicted token indices to words using idx_to_word
idx_to_word = {
    0: "<pad>", 1: "the", 2: "dog", 3: "chased", 4: "cat",
    5: "ran", 6: "quickly", 7: "away", 8: "<eos>", 9: "<unk>"
}

# Generate predicted target sequence
predicted_trg = predict(model, src_example, idx_to_word)

# Convert predicted token indices to words
predicted_words = [idx_to_word.get(token.item(), "<unk>") for token in predicted_trg[0]]

print("Predicted Words:", " ".join(predicted_words))

Question:  the ran quickly away dog
Answer:  the quickly
Predicted Words: the quickly <pad> <eos>
