<a href="https://colab.research.google.com/github/pinkfloyed/Transformer_Scratch_Code/blob/main/Transformer_From_Scratch_encoding_decoding.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import math
import copy

from numpy.ma.core import indices
from torch.ao.nn.quantized import ReLU6
from torch.nn import ReLU


class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()

        self.dimension_model = d_model
        self.num_heads = num_heads
        self.dimension_head = d_model // num_heads

        self.weight_query = nn.Linear(d_model, d_model)
        self.weight_key = nn.Linear(d_model, d_model)
        self.weight_value = nn.Linear(d_model, d_model)
        self.weight_output = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, query, key, value, mask = None):
        attention_scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.dimension_head)
        if mask is not None:
            attention_scores = attention_scores.masked_fill(mask == 0, -1e9)
        attention_probs = torch.softmax(attention_scores, dim = -1)
        output = torch.matmul(attention_probs, value)
        return output

    def split_heads(self, x):
        batch_size, seq_length, d_model = x.size()
        return x.view(batch_size, seq_length, self.num_heads, self.dimension_head).transpose(1,2)

    def combine_heads(self, x):
        batch_size, _, seq_length, d_k = x.size()
        return x.transpose(1,2).contiguous().view(batch_size, seq_length, self.dimension_model)

    def forward(self, query, key, value, mask = None):
        query = self.split_heads(self.weight_query(query))
        key = self.split_heads(self.weight_key(key))
        value = self.split_heads(self.weight_value(value))

        attention_output = self.scaled_dot_product_attention(query, key, value, mask)
        output = self.weight_output(self.combine_heads(attention_output))
        return output

class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_seq_length):
       super().__init__()

       pe = torch.zeros(max_seq_length, d_model)
       position = torch.arange(0, max_seq_length, dtype = torch.float).unsqueeze(1)
       div_term = torch.exp(torch.arange(0, d_model, 2).float()* -(math.log(10000.0)/d_model))

       pe[:, ::2] = torch.sin(position*div_term)
       pe[:, 1::2] = torch.cos(position * div_term)

       self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        attn_output = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x

class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_out, src_mask, tgt_mask):
        attn_output = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(attn_output))
        attn_output = self.cross_attn(x, enc_out, enc_out, src_mask)
        x = self.norm2(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))
        return x

class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
        super().__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)

        self.encoder_layers = nn.ModuleList(
            [EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList(
            [DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def generate_mask(self, src, tgt):
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(2)
        seq_length = tgt.size(1)
        nopeak_mask = (1 - torch.triu(torch.ones(seq_length, seq_length), diagonal=1)).bool()
        tgt_mask = tgt_mask & nopeak_mask
        return src_mask, tgt_mask


    def forward(self, src, tgt):
        src_mask, tgt_mask = self.generate_mask(src, tgt)
        src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))

        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)

        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

        output = self.fc(dec_output)

        return output


    def generate(self, src, start_token, max_length, temperature=1.0, top_k=None):
        self.eval()
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)

        src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)

        generated = torch.tensor([[start_token]], dtype=torch.long)

        for _ in range(max_length - 1):
            tgt_mask = (generated != 0).unsqueeze(1).unsqueeze(2)
            seq_length = generated.size(1)
            nopeak_mask = (1 - torch.triu(torch.ones(seq_length, seq_length), diagonal=1)).bool()
            tgt_mask = tgt_mask & nopeak_mask

            tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(generated)))
            dec_output = tgt_embedded
            for dec_layer in self.decoder_layers:
                dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

            logits = self.fc(dec_output[:, -1, :]) / temperature
            if top_k is not None:
                top_k = min(top_k, logits.size(-1))
                values, indices = torch.topk(logits, top_k)
                logits = torch.full_like(logits, float('-inf')).scatter(-1, indices, values)

            probs = nn.functional.softmax(logits, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)

            generated = torch.cat([generated, next_token], dim=1)

            if len(generated[0]) == max_length:
                break
        return generated


with open('/content/input.txt', 'r', encoding='utf-8') as file:
    text = file.read()

chars = sorted(list(set(text)))
v_size = len(chars)

src_vocab_size = v_size
tgt_vocab_size = v_size
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
max_seq_length =100
dropout = 0.1

string_to_int = {c: i for i, c in enumerate(chars)}
int_to_string = {i: c for i, c in enumerate(chars)}

encode = lambda s: [string_to_int[c] for c in s]
decode = lambda i: ''.join(int_to_string[c.item()] for c in i)

transformer = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout)
context_with_given_input = torch.tensor([encode('hello')], dtype = torch.long)
src = torch.tensor([[1]])
generated = transformer.generate(
    src = context_with_given_input,
    start_token = 2,
    max_length = max_seq_length,
    temperature = 0.7,
    top_k = None
)
print(decode(generated[0]))

src_data = torch.randint(1, src_vocab_size, (64, max_seq_length))
tgt_data = torch.randint(1, tgt_vocab_size, (64, max_seq_length))

criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(transformer.parameters(), lr=0.0065, betas=(0.9, 0.98), eps=1e-9)
transformer.train()

for epoch in range(5):
    optimizer.zero_grad()
    output = transformer(src_data, tgt_data[:, :-1])
    loss = criterion(output.contiguous().view(-1, tgt_vocab_size), tgt_data[:,1:].contiguous().view(-1))
    loss.backward()
    optimizer.step()
    print(f"Epoch : {epoch+1}, Loss : {loss.item()}")

generated = transformer.generate(
    src=src,
    start_token =2,
    max_length= max_seq_length,
    temperature = 0.7,
    top_k = None
)
print(generated)

!YïzyhkfJ“uQpbÑG4W7!(r9”é7IyHBE?It4Ix(1#_vMHdXX8A$Rjc/S2eV!. ïZf(LaH‘pQ9(jPs#XyI,xEb4JEn)_07"tp’;ïa 
Epoch : 1, Loss : 4.719486713409424
Epoch : 2, Loss : 5.2403717041015625
Epoch : 3, Loss : 5.320695877075195
Epoch : 4, Loss : 5.167696475982666
Epoch : 5, Loss : 5.081241607666016
tensor([[ 2, 91, 42, 12, 52, 42, 11, 50, 44, 20, 68, 42, 41, 68, 52, 17, 10, 38,
          4, 17, 44, 87, 92, 17, 42, 41, 92, 82, 86, 24, 42, 25, 66, 33, 52, 84,
         92, 43, 92, 68, 12, 17, 53, 12, 78,  4, 88, 58, 42, 33, 32, 92, 29, 29,
         26, 45,  8, 18, 41, 46, 10, 58, 50, 41,  8, 50, 17, 50, 68, 53,  1, 92,
         20, 32, 66, 76, 12, 12, 38, 41, 43, 45,  4, 33, 17, 92, 57, 91, 35, 12,
         12, 32, 50,  8, 80, 45, 17, 57, 31, 37]])


In [2]:
decoded_output = decode(generated[0])
print(decoded_output)

!“N,XN*VP4kNMkX1)J#1Pñ”1NM”yï8N9iEXÑ”O”k,1Y,u#—aNED”AA:Q'2MR)aVM'V1VkY ”4Dis,,JMOQ#E1”_“G,,DV'wQ1_CI


In [3]:
from transformers import pipeline

def generate_text(prompt, method='nucleus', max_length=100, temperature=0.7, top_k=50, top_p=0.9, num_beams=3):
    generator = pipeline("text-generation", model="gpt2")

    if method == 'beam':
        output = generator(prompt, max_length=max_length, num_return_sequences=1, num_beams=num_beams, early_stopping=True)
    elif method == 'top_k':
        output = generator(prompt, max_length=max_length, do_sample=True, top_k=top_k, temperature=temperature)
    elif method == 'nucleus':
        output = generator(prompt, max_length=max_length, do_sample=True, top_p=top_p, temperature=temperature)
    else:
        raise ValueError("Invalid decoding method. Choose from 'beam', 'top_k', or 'nucleus'.")

    return output[0]['generated_text']

# Example Usage
if __name__ == "__main__":
    prompt = "Once upon a time"
    print("Beam Search:", generate_text(prompt, method='beam'))
    print("Top-K Sampling:", generate_text(prompt, method='top_k'))
    print("Nucleus Sampling:", generate_text(prompt, method='nucleus'))


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

Device set to use cpu
Truncation was not explicitly activated but `max_length` is provided a specific value, please use `truncation=True` to explicitly truncate examples to max length. Defaulting to 'longest_first' truncation strategy. If you encode pairs of sequences (GLUE-style) with the tokenizer you can select this strategy more precisely by providing a specific strategy to `truncation`.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


Beam Search: Once upon a time, I had the pleasure of having the opportunity to meet a great deal of people from all over the world. I have a lot of respect for the people I have met. I have a lot of respect for the people I have met. I have a lot of respect for the people I have met. I have a lot of respect for the people I have met. I have a lot of respect for the people I have met. I have a lot of respect for the people


Device set to use cpu
Truncation was not explicitly activated but `max_length` is provided a specific value, please use `truncation=True` to explicitly truncate examples to max length. Defaulting to 'longest_first' truncation strategy. If you encode pairs of sequences (GLUE-style) with the tokenizer you can select this strategy more precisely by providing a specific strategy to `truncation`.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


Top-K Sampling: Once upon a time, the human race was living in an imaginary world. It is not a simulation. It is a world that is not an actual world but a reality. The human race is a simulation. It is not a reality that is real. It is a reality that is not real.

It is no different from the real world: it's a reality that is real.

I wonder about the future, and if I can get back to my past, I can see


Device set to use cpu
Truncation was not explicitly activated but `max_length` is provided a specific value, please use `truncation=True` to explicitly truncate examples to max length. Defaulting to 'longest_first' truncation strategy. If you encode pairs of sequences (GLUE-style) with the tokenizer you can select this strategy more precisely by providing a specific strategy to `truncation`.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


Nucleus Sampling: Once upon a time, I'd had the feeling that I'd done the wrong thing by not writing about this particular issue, and I'd made it clear that I wouldn't go back. I hadn't, and I didn't want to, and I hadn't even made a point to write about it, and I didn't want to make a point to write about it.

I thought it was important to try and keep this conversation going in an effort to get it started. I'd
