# Custom Transformer-Based LLM for Text Summarization

# 1. Designing the Transformer-Based LLM:

## Step 1: Core Components of the Transformer

### 1.1. Positional Encoding

In [120]:
import torch
import torch.nn as nn
import math

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a batch of embeddings.

    Args:
        d_model: Size of the embedding dimension. Odd sizes are supported.
        max_len: Number of positions for which encodings are precomputed.
    """

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        position = torch.arange(0, max_len, dtype=torch.float32).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2, dtype=torch.float32) * -(math.log(10000.0) / d_model)
        )
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        encoding = torch.zeros(max_len, d_model)
        encoding[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one cosine column fewer than sine
        # columns, so trim the angle table before filling the odd indices.
        encoding[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # A non-persistent buffer moves with the module on .to()/.cuda()
        # without adding a key to the state_dict.
        self.register_buffer("encoding", encoding.unsqueeze(0), persistent=False)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        Args:
            x: Tensor whose dimension 1 is the sequence axis and whose last
                dimension is ``d_model``.
        """
        # .to() is a no-op once the module itself lives on x's device.
        return x + self.encoding[:, :x.size(1)].to(x.device)


### 1.2. Multi-Head Attention Mechanism

In [121]:
class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention evaluated over several heads in parallel.

    Args:
        d_model: Width of the inputs and of the output projection.
        num_heads: Number of heads; must divide ``d_model`` evenly.
    """

    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.num_heads = num_heads
        self.d_k = d_model // num_heads  # per-head width
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.fc_out = nn.Linear(d_model, d_model)
        self.softmax = nn.Softmax(dim=-1)

    def _project(self, layer, tensor, batch_size):
        """Apply ``layer`` and lay the result out as (batch, heads, length, d_k)."""
        projected = layer(tensor)
        per_head = projected.view(batch_size, -1, self.num_heads, self.d_k)
        return per_head.transpose(1, 2)

    def forward(self, query, key, value, mask=None):
        """Attend from ``query`` over ``key``/``value``.

        Positions where ``mask == 0`` receive a score of -1e9 before the
        softmax. ``mask`` must broadcast against the
        (batch, heads, query_len, key_len) score tensor.
        """
        batch_size = query.size(0)

        q = self._project(self.q_linear, query, batch_size)
        k = self._project(self.k_linear, key, batch_size)
        v = self._project(self.v_linear, value, batch_size)

        # Similarity of every query with every key, scaled by sqrt(d_k).
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        weights = self.softmax(scores)

        # Weighted sum of values, then merge the heads back into one vector.
        context = torch.matmul(weights, v)
        merged = context.transpose(1, 2).contiguous().view(
            batch_size, -1, self.num_heads * self.d_k
        )
        return self.fc_out(merged)

### 1.3. Feed-Forward Networks

In [122]:
class FeedForward(nn.Module):
    """Two-layer network applied as Linear -> ReLU -> Dropout -> Linear.

    Args:
        d_model: Input and output width.
        d_ff: Width of the hidden layer.
        dropout: Dropout probability applied to the hidden activations.
    """

    def __init__(self, d_model, d_ff, dropout=0.1):
        super(FeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """Expand to ``d_ff``, apply the non-linearity, and project back."""
        hidden = self.linear1(x)
        hidden = self.relu(hidden)
        hidden = self.dropout(hidden)
        return self.linear2(hidden)


### 1.4. Encoder Layer

In [123]:
class EncoderLayer(nn.Module):
    """Encoder block: self-attention then feed-forward, each followed by a
    residual connection and layer normalization.

    Args:
        d_model: Width of the token representations.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward sub-layer.
        dropout: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.attention = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Run the block on ``x``.

        Args:
            x: Input representations; the last dimension is ``d_model``.
            mask: Optional attention mask. A 2-D (batch_size, seq_len) mask
                is reshaped to (batch_size, 1, 1, seq_len); any other shape
                is passed to the attention module as given. ``None``
                disables masking.
        """
        # Reshape only when necessary: an unconditional reshape breaks on
        # None and over-expands a mask that is already broadcastable.
        if mask is not None and mask.dim() == 2:
            mask = mask.unsqueeze(1).unsqueeze(2)

        # Multi-head attention
        attn_out = self.attention(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_out))

        # Feed-forward network
        ffn_out = self.ffn(x)
        x = self.norm2(x + self.dropout(ffn_out))

        return x


### 1.5. Decoder Layer

In [124]:
class DecoderLayer(nn.Module):
    """Decoder block: self-attention, attention over the encoder output, and
    a feed-forward network, each wrapped in residual + layer normalization.

    Args:
        d_model: Width of the token representations.
        num_heads: Number of attention heads in both attention sub-layers.
        d_ff: Hidden width of the feed-forward sub-layer.
        dropout: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(DecoderLayer, self).__init__()
        self.self_attention = MultiHeadAttention(d_model, num_heads)
        self.enc_dec_attention = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def _add_and_norm(self, norm, residual, sublayer_out):
        """Apply dropout to the sub-layer output, add the residual, normalize."""
        return norm(residual + self.dropout(sublayer_out))

    def forward(self, x, enc_out, src_mask, tgt_mask):
        """Return the decoder representations after one block.

        ``tgt_mask`` is applied to the self-attention scores and ``src_mask``
        to the attention over ``enc_out``.
        """
        # 1) Target positions attend to each other.
        attended = self.self_attention(x, x, x, tgt_mask)
        x = self._add_and_norm(self.norm1, x, attended)

        # 2) Target positions query the encoder output.
        cross = self.enc_dec_attention(x, enc_out, enc_out, src_mask)
        x = self._add_and_norm(self.norm2, x, cross)

        # 3) Feed-forward transformation.
        transformed = self.ffn(x)
        return self._add_and_norm(self.norm3, x, transformed)


## Custom Modifications

### 1. Hierarchical Attention

In [125]:
class HierarchicalAttention(nn.Module):
    """Run independent self-attention over token-level and sentence-level
    representations, each followed by a residual connection and LayerNorm.

    Args:
        d_model: Width of both kinds of representation.
        num_heads: Number of attention heads used at each level.
    """

    def __init__(self, d_model, num_heads):
        super(HierarchicalAttention, self).__init__()
        self.token_attention = MultiHeadAttention(d_model, num_heads)
        self.sentence_attention = MultiHeadAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, token_representations, sentence_representations, token_mask=None, sentence_mask=None):
        """Return ``(token_context, sentence_context)``.

        The two levels are processed separately; neither attends to the other.
        """
        tokens = token_representations
        sentences = sentence_representations

        # Token level: self-attention plus residual, normalized by norm1.
        token_attended = self.token_attention(tokens, tokens, tokens, token_mask)
        token_context = self.norm1(tokens + token_attended)

        # Sentence level: same recipe with its own attention and norm2.
        sentence_attended = self.sentence_attention(sentences, sentences, sentences, sentence_mask)
        sentence_context = self.norm2(sentences + sentence_attended)

        return token_context, sentence_context


### 2. Gated Linear Units (GLU) in Feed-Forward Networks

In [126]:
class GatedFeedForward(nn.Module):
    """Feed-forward network whose hidden activations are scaled by a sigmoid gate.

    Args:
        d_model: Input and output width.
        d_ff: Width of the gated hidden layer.
        dropout: Dropout probability applied to the gated activations.
    """

    def __init__(self, d_model, d_ff, dropout=0.1):
        super(GatedFeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, d_ff)  # value path
        self.linear2 = nn.Linear(d_model, d_ff)  # gate path
        self.linear3 = nn.Linear(d_ff, d_model)  # output projection
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Compute ``linear3(dropout(linear1(x) * sigmoid(linear2(x))))``."""
        values = self.linear1(x)
        gate = torch.sigmoid(self.linear2(x))
        gated = self.dropout(values * gate)
        return self.linear3(gated)


### 3. Sparse Attention

In [127]:
class SparseAttention(nn.Module):
    """Multi-head attention restricted to the leading whole windows of a sequence.

    The inputs are truncated to the largest multiple of ``window_size`` that
    fits in the query length, then ordinary multi-head attention runs on the
    truncated tensors. No per-window (block-local) pattern is imposed beyond
    this truncation.

    Args:
        d_model: Width of the representations.
        num_heads: Number of attention heads.
        window_size: Window length used to compute the truncation point.
    """

    def __init__(self, d_model, num_heads, window_size):
        super(SparseAttention, self).__init__()
        self.multi_head_attention = MultiHeadAttention(d_model, num_heads)
        self.window_size = window_size

    def forward(self, query, key, value, mask=None):
        """Attend over the first ``(seq_len // window_size) * window_size`` positions.

        Args:
            query, key, value: Tensors whose dimension 1 is the sequence axis.
            mask: Optional attention mask whose last dimension is the key
                axis. ``None`` disables masking.

        Returns:
            The attention output for the retained positions only.
        """
        seq_len = query.size(1)
        kept_len = (seq_len // self.window_size) * self.window_size

        sparse_mask = None
        if mask is not None:
            # Trim the key axis, which is always the last dimension; the old
            # `mask[:, :, :n]` cut a singleton axis of a 4-D mask instead.
            sparse_mask = mask[..., :kept_len]
            # A per-query mask (..., seq_len, seq_len) is trimmed on the
            # query axis too so it still broadcasts against the scores.
            if sparse_mask.dim() >= 3 and sparse_mask.size(-2) == seq_len:
                sparse_mask = sparse_mask[..., :kept_len, :]

        sparse_query = query[:, :kept_len, :]
        sparse_key = key[:, :kept_len, :]
        sparse_value = value[:, :kept_len, :]

        return self.multi_head_attention(sparse_query, sparse_key, sparse_value, sparse_mask)


## Step 2: Full Transformer Architecture

### Define the custom encoder layer used by the full model

In [128]:
class CustomEncoderLayer(nn.Module):
    """Encoder block that also carries sentence-level representations along.

    Only the token representations are transformed (self-attention followed
    by a feed-forward network, each with residual + LayerNorm); the sentence
    representations are returned exactly as received.

    Args:
        d_model: Width of the token representations.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward sub-layer.
        dropout: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(CustomEncoderLayer, self).__init__()
        self.attention = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sentence_representations, token_mask, sentence_mask):
        """Return ``(updated_tokens, sentence_representations)``.

        ``sentence_mask`` is accepted for interface compatibility but is not
        used by this layer.
        """
        # Token self-attention with residual connection.
        attended = self.attention(x, x, x, token_mask)
        tokens = self.norm1(x + self.dropout(attended))

        # Feed-forward transformation with residual connection.
        transformed = self.ffn(tokens)
        tokens = self.norm2(tokens + self.dropout(transformed))

        return tokens, sentence_representations

### Combine the encoder and decoder layers

In [129]:
class CustomTransformer(nn.Module):
    """Encoder-decoder transformer built from the custom layers in this file.

    Args:
        src_vocab_size: Number of rows in the source embedding table.
        tgt_vocab_size: Number of rows in the target embedding table and
            width of the output logits.
        d_model: Width of the token representations.
        num_heads: Number of attention heads per attention module.
        num_layers: Number of encoder layers and of decoder layers.
        d_ff: Hidden width of the feed-forward sub-layers.
        max_len: Number of positions covered by the positional encoding.
        dropout: Dropout probability.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_len, dropout=0.1):
        super(CustomTransformer, self).__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_len)

        # Use CustomEncoderLayer instead of EncoderLayer
        self.encoder = nn.ModuleList([CustomEncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        # The decoder has to attend over the encoder output, which only
        # DecoderLayer does; encoder layers here would ignore the source.
        self.decoder = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

        self.fc_out = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    @staticmethod
    def _expand_mask(mask):
        """Reshape a 2-D (batch_size, seq_len) mask to (batch_size, 1, 1, seq_len).

        Masks with any other number of dimensions are returned unchanged.
        """
        if mask.dim() == 2:
            return mask.unsqueeze(1).unsqueeze(2)
        return mask

    def forward(self, src, tgt, src_mask, tgt_mask):
        """Return logits over the target vocabulary for every target position.

        Args:
            src: Source token ids, indexed as (batch, src_len).
            tgt: Decoder input token ids, indexed as (batch, tgt_len).
            src_mask: Source mask, non-zero where a position may be attended
                to. Either (batch, src_len) or already broadcastable.
            tgt_mask: Target mask with the same convention as ``src_mask``.

        Returns:
            Tensor of logits with last dimension ``tgt_vocab_size``.
        """
        src_mask = self._expand_mask(src_mask)
        tgt_mask = self._expand_mask(tgt_mask)

        # Combine the target mask with a lower-triangular mask so position i
        # cannot look at positions after i.
        tgt_len = tgt.size(1)
        causal = torch.tril(torch.ones(tgt_len, tgt_len, dtype=torch.bool, device=tgt_mask.device))
        tgt_mask = (tgt_mask != 0) & causal

        # Encoder
        memory = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        # Mean over the sequence axis as a single sentence-level vector.
        sentence_representations = memory.mean(dim=1, keepdim=True)
        for layer in self.encoder:
            # CustomEncoderLayer takes four arguments and returns a pair.
            memory, sentence_representations = layer(memory, sentence_representations, src_mask, src_mask)

        # Decoder
        out = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))
        for layer in self.decoder:
            out = layer(out, memory, src_mask, tgt_mask)

        return self.fc_out(out)


### Initialize the model with the custom transformer architecture

In [130]:
# Hyperparameters of the example model, gathered in one place.
# NOTE(review): token ids fed to the model must be smaller than the vocabulary
# sizes given here — confirm against the tokenizer that is actually used.
_model_kwargs = {
    "src_vocab_size": 5000,  # rows in the source embedding table
    "tgt_vocab_size": 5000,  # rows in the target embedding table / logit width
    "d_model": 512,          # embedding width
    "num_heads": 8,          # attention heads per attention module
    "num_layers": 6,         # layers in the encoder and in the decoder
    "d_ff": 2048,            # hidden width of the feed-forward sub-layers
    "max_len": 512,          # positions covered by the positional encoding
    "dropout": 0.1,          # dropout probability
}
model = CustomTransformer(**_model_kwargs)

# Print the module tree as a quick construction check.
print(model)


CustomTransformer(
  (encoder_embedding): Embedding(5000, 512)
  (decoder_embedding): Embedding(5000, 512)
  (positional_encoding): PositionalEncoding()
  (encoder): ModuleList(
    (0-5): 6 x CustomEncoderLayer(
      (attention): MultiHeadAttention(
        (q_linear): Linear(in_features=512, out_features=512, bias=True)
        (k_linear): Linear(in_features=512, out_features=512, bias=True)
        (v_linear): Linear(in_features=512, out_features=512, bias=True)
        (fc_out): Linear(in_features=512, out_features=512, bias=True)
        (softmax): Softmax(dim=-1)
      )
      (ffn): FeedForward(
        (linear1): Linear(in_features=512, out_features=2048, bias=True)
        (relu): ReLU()
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=2048, out_features=512, bias=True)
      )
      (norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
      (norm2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(

# 2. Text Summarization: 

## A. Preprocessing the Data

In [135]:
from transformers import GPT2Tokenizer

# Initialize tokenizer (you can use BERT, GPT-2, or other model-specific tokenizers)
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')

# GPT-2 ships without a padding token, and encode_data below pads to
# max_length, so reuse the end-of-sequence token for padding.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# Example function to encode data
def encode_data(text, summary, max_input_length=512, max_output_length=128):
    """Tokenize one (text, summary) pair with the module-level ``tokenizer``.

    Both sequences are truncated and padded to their respective maximum
    length and returned as PyTorch tensors.

    Args:
        text: Source document to summarize.
        summary: Reference summary.
        max_input_length: Length the encoded text is truncated/padded to.
        max_output_length: Length the encoded summary is truncated/padded to.

    Returns:
        Tuple ``(input_ids, target_ids)``.
    """
    # NOTE(review): padding presumably requires the tokenizer to have a pad
    # token configured — confirm for the tokenizer in use.
    input_ids = tokenizer.encode(
        text, truncation=True, padding='max_length', max_length=max_input_length, return_tensors="pt"
    )
    target_ids = tokenizer.encode(
        summary, truncation=True, padding='max_length', max_length=max_output_length, return_tensors="pt"
    )
    return input_ids, target_ids


tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

## B. Model Modifications for Summarization

In [None]:
class CustomTransformerForSummarization(CustomTransformer):
    """CustomTransformer variant whose decoder attends over the encoder output.

    Constructor arguments are forwarded unchanged to ``CustomTransformer``.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_len, dropout=0.1):
        super(CustomTransformerForSummarization, self).__init__(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_len, dropout)
        # forward() drives the decoder stack with the DecoderLayer call
        # signature (x, enc_out, src_mask, tgt_mask); rebuild the stack if
        # the parent populated it with any other layer type.
        if not all(isinstance(layer, DecoderLayer) for layer in self.decoder):
            self.decoder = nn.ModuleList(
                [DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
            )

    @staticmethod
    def _broadcastable(mask):
        """Reshape a 2-D (batch, seq_len) mask to (batch, 1, 1, seq_len);
        masks with any other number of dimensions are returned unchanged."""
        if mask.dim() == 2:
            return mask.unsqueeze(1).unsqueeze(2)
        return mask

    def forward(self, src, tgt, src_mask, tgt_mask):
        """Return logits over the target vocabulary for every target position.

        Args:
            src: Source token ids, indexed as (batch, src_len).
            tgt: Decoder input token ids, indexed as (batch, tgt_len).
            src_mask: Source mask, non-zero where attention is allowed;
                either (batch, src_len) or already broadcastable.
            tgt_mask: Target mask with the same convention.
        """
        src_mask = self._broadcastable(src_mask)
        tgt_mask = self._broadcastable(tgt_mask)

        # Add a lower-triangular constraint so a target position cannot
        # attend to positions that come after it.
        tgt_len = tgt.size(1)
        causal = torch.tril(torch.ones(tgt_len, tgt_len, dtype=torch.bool, device=tgt_mask.device))
        tgt_mask = (tgt_mask != 0) & causal

        # Encoder (source sequence -> embedding + positional encoding)
        src = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        sentence_representations = src.mean(dim=1, keepdim=True)

        # Pass through custom encoder layers
        for layer in self.encoder:
            src, sentence_representations = layer(src, sentence_representations, src_mask, src_mask)

        # Decoder (target sequence -> embedding + positional encoding)
        tgt = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))
        for layer in self.decoder:
            tgt = layer(tgt, src, src_mask, tgt_mask)

        return self.fc_out(tgt)


## C. Training Loop with Cross-Entropy Loss

In [None]:
import torch
import torch.nn as nn
from torch.optim import Adam

# Adam updates every model parameter with a fixed learning rate of 1e-5.
optimizer = Adam(params=model.parameters(), lr=1e-5)

# Cross-entropy over the vocabulary is the sequence-to-sequence training loss.
criterion = nn.CrossEntropyLoss()

def train(model, dataloader):
    """Run one training epoch with teacher forcing and return the mean batch loss.

    Relies on the module-level ``optimizer``, ``criterion`` and ``tokenizer``.

    Args:
        model: Module called as ``model(src, tgt, src_mask, tgt_mask)`` that
            returns logits whose last dimension is the vocabulary.
        dataloader: Iterable of batches. Each batch is either an
            ``(input_ids, target_ids)`` pair or a mapping with the keys
            ``'input_ids'`` and ``'labels'``.

    Returns:
        Mean loss over the batches, or 0.0 when the iterable is empty.
    """
    from collections.abc import Mapping

    model.train()
    pad_id = tokenizer.pad_token_id
    start_id = tokenizer.bos_token_id
    if start_id is None:
        start_id = tokenizer.eos_token_id
    # Targets set to this value are skipped by the loss (-100 is the
    # CrossEntropyLoss default).
    ignore_index = getattr(criterion, "ignore_index", -100)

    total_loss = 0.0
    num_batches = 0
    for batch in dataloader:
        if isinstance(batch, Mapping):
            input_ids, target_ids = batch["input_ids"], batch["labels"]
        else:
            input_ids, target_ids = batch

        # True where a position holds a real token rather than padding.
        if pad_id is None:
            src_keep = torch.ones_like(input_ids, dtype=torch.bool)
            tgt_keep = torch.ones_like(target_ids, dtype=torch.bool)
        else:
            src_keep = input_ids != pad_id
            tgt_keep = target_ids != pad_id

        # Teacher forcing: the decoder reads a start token followed by the
        # targets shifted right by one, and must predict the unshifted targets.
        start = torch.full_like(target_ids[:, :1], start_id)
        decoder_input = torch.cat([start, target_ids[:, :-1]], dim=1)
        # The start token stays visible even if it shares its id with padding.
        decoder_keep = torch.cat([torch.ones_like(tgt_keep[:, :1]), tgt_keep[:, :-1]], dim=1)

        # Create attention masks for padding tokens
        src_mask = src_keep.unsqueeze(1).unsqueeze(2)
        tgt_mask = decoder_keep.unsqueeze(1).unsqueeze(2)

        # Forward pass
        optimizer.zero_grad()
        output = model(input_ids, decoder_input, src_mask, tgt_mask)

        # Calculate the loss on the non-padded part of the target sequence.
        # NOTE: when pad and EOS share one id, EOS targets are skipped too.
        labels = target_ids.masked_fill(~tgt_keep, ignore_index)
        loss = criterion(output.reshape(-1, output.shape[-1]), labels.reshape(-1))

        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    return total_loss / num_batches if num_batches else 0.0


## D. Evaluation

In [None]:
def generate_summary(model, input_text, max_length=150):
    """Greedily decode a summary of ``input_text`` with ``model``.

    ``model`` is a plain ``nn.Module`` without a ``generate`` method, so the
    decoding loop is written out here: starting from a start token, the model
    is called repeatedly and the highest-scoring next token is appended until
    the end-of-sequence token is produced or ``max_length`` tokens exist.

    Uses the module-level ``tokenizer``.

    Args:
        model: Module called as ``model(src, tgt, src_mask, tgt_mask)`` that
            returns logits indexed as (batch, tgt_len, vocab).
        input_text: Text to summarize.
        max_length: Maximum number of tokens to generate.

    Returns:
        The decoded summary string (empty if the input encodes to no tokens).
    """
    model.eval()
    device = next(model.parameters()).device

    # Tokenize the input text
    input_ids = tokenizer.encode(input_text, return_tensors="pt", max_length=512, truncation=True).to(device)
    if input_ids.size(1) == 0:
        return ""
    # No padding is requested above, so every source position is visible.
    src_mask = torch.ones_like(input_ids, dtype=torch.bool).unsqueeze(1).unsqueeze(2)

    eos_id = tokenizer.eos_token_id
    start_id = tokenizer.bos_token_id
    if start_id is None:
        start_id = eos_id
    generated = torch.full((1, 1), start_id, dtype=torch.long, device=device)

    # Generate summary by decoding one token at a time. The source is
    # re-encoded on every step because the model exposes only one forward().
    with torch.no_grad():
        for _ in range(max_length):
            tgt_mask = torch.ones_like(generated, dtype=torch.bool).unsqueeze(1).unsqueeze(2)
            logits = model(input_ids, generated, src_mask, tgt_mask)
            next_id = logits[:, -1, :].argmax(dim=-1, keepdim=True)
            if next_id.item() == eos_id:
                break
            generated = torch.cat([generated, next_id], dim=1)

    # Decode the generated token IDs to text (dropping the start token)
    summary = tokenizer.decode(generated[0, 1:], skip_special_tokens=True)

    return summary


## E. Training and Evaluation Pipeline

In [None]:
# Example text input and expected output
input_text = "Your long input text here. This will be summarized by the model."
expected_summary = "This is the summary of the text."

# Encode input text and expected summary
input_ids, target_ids = encode_data(input_text, expected_summary)

# train() iterates over (input_ids, target_ids) batches, so the single encoded
# example serves as a one-batch dataset for this demonstration.
dataloader = [(input_ids, target_ids)]
num_epochs = 3

# NOTE(review): `model` must have embedding tables at least as large as the
# tokenizer's vocabulary — confirm before running.
# Train the model
for epoch in range(num_epochs):
    loss = train(model, dataloader)
    print(f"Epoch {epoch}, Loss: {loss}")

# Evaluate the model on a new input text
generated_summary = generate_summary(model, input_text)
print(f"Generated Summary: {generated_summary}")


# 3. Dataset:

## A. Install and Import Necessary Libraries

In [None]:
pip install torch transformers datasets


In [None]:
from transformers import GPT2Tokenizer
from torch.utils.data import Dataset, DataLoader
import torch


## B. Initialize the Tokenizer

In [None]:
# Initialize the tokenizer for GPT-2
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')

# Add padding token if not already present in the tokenizer: the
# end-of-sequence token doubles as the padding token.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token


## C. Dataset Class for Summarization

In [None]:
class SummarizationDataset(Dataset):
    """Dataset of (article, summary) pairs that are tokenized on access."""

    def __init__(self, input_texts, summaries, tokenizer, max_input_length=512, max_output_length=128):
        """
        Args:
            input_texts (list): List of input texts (articles).
            summaries (list): List of corresponding summaries.
            tokenizer (transformers.PreTrainedTokenizer): Tokenizer used to tokenize the texts.
            max_input_length (int): Maximum length for input text sequences.
            max_output_length (int): Maximum length for output summary sequences.
        """
        self.input_texts = input_texts
        self.summaries = summaries
        self.tokenizer = tokenizer
        self.max_input_length = max_input_length
        self.max_output_length = max_output_length

    def __len__(self):
        """Return the number of input texts."""
        return len(self.input_texts)

    def _encode(self, text, max_length):
        """Tokenize ``text``, truncated and padded to ``max_length``."""
        return self.tokenizer(
            text,
            truncation=True,
            padding='max_length',  # Pad to max length
            max_length=max_length,
            return_tensors='pt',
            add_special_tokens=True
        )

    def __getitem__(self, idx):
        """Return the tokenized example at ``idx``.

        Returns:
            dict with ``'input_ids'`` and ``'attention_mask'`` for the input
            text and ``'labels'`` holding the token ids of the summary.
        """
        input_encoding = self._encode(self.input_texts[idx], self.max_input_length)
        target_encoding = self._encode(self.summaries[idx], self.max_output_length)

        # `return_tensors='pt'` adds a leading batch dimension. Remove only
        # that one: a bare squeeze() would also drop the sequence dimension
        # whenever the maximum length is 1.
        input_ids = input_encoding['input_ids'].squeeze(0)
        attention_mask = input_encoding['attention_mask'].squeeze(0)
        target_ids = target_encoding['input_ids'].squeeze(0)

        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'labels': target_ids
        }


## D. Create DataLoader for Batching

In [None]:
# Placeholder corpus: swap in real articles and their reference summaries.
input_texts = ["Article 1 text here...", "Article 2 text here..."]
summaries = ["Summary of article 1", "Summary of article 2"]

# Wrap the parallel lists; each example is tokenized when it is indexed.
dataset = SummarizationDataset(input_texts=input_texts, summaries=summaries, tokenizer=tokenizer)

# Serve shuffled mini-batches of up to 8 examples.
dataloader = DataLoader(dataset=dataset, batch_size=8, shuffle=True)


## E. Train the Model

In [None]:
# Example training loop
from torch.optim import Adam
import torch.nn as nn

# Initialize model (CustomTransformer defined earlier).
# The embedding tables are sized from the tokenizer so that every token id it
# produces has a row; a fixed 5000 is too small for the GPT-2 vocabulary.
vocab_size = len(tokenizer)
model = CustomTransformer(src_vocab_size=vocab_size, tgt_vocab_size=vocab_size, d_model=512, num_heads=8, num_layers=6, d_ff=2048, max_len=512, dropout=0.1)

pad_id = tokenizer.pad_token_id
start_id = tokenizer.bos_token_id if tokenizer.bos_token_id is not None else tokenizer.eos_token_id

# Loss function and optimizer (padded label positions do not contribute).
# NOTE: when pad and EOS share one id, EOS targets are skipped as well.
criterion = nn.CrossEntropyLoss(ignore_index=pad_id)
optimizer = Adam(model.parameters(), lr=1e-5)

# Training loop
model.train()
for epoch in range(3):  # Example: 3 epochs
    epoch_loss = 0.0
    num_batches = 0
    for batch in dataloader:
        input_ids = batch['input_ids']
        attention_mask = batch['attention_mask']
        labels = batch['labels']

        # Teacher forcing: the decoder reads a start token followed by the
        # labels shifted right by one and is trained to predict the labels.
        start = torch.full_like(labels[:, :1], start_id)
        decoder_input = torch.cat([start, labels[:, :-1]], dim=1)
        # Mark the start token as visible explicitly, since it may share its
        # id with the padding token.
        decoder_mask = torch.cat(
            [torch.ones_like(labels[:, :1]), (labels[:, :-1] != pad_id).long()], dim=1
        )

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        output = model(input_ids, decoder_input, attention_mask, decoder_mask)

        # Calculate the loss (ignore padding in the labels)
        loss = criterion(output.reshape(-1, output.shape[-1]), labels.reshape(-1))

        # Backward pass
        loss.backward()

        # Update parameters
        optimizer.step()

        epoch_loss += loss.item()
        num_batches += 1

    # Report the mean over the epoch rather than only the last batch.
    mean_loss = epoch_loss / max(num_batches, 1)
    print(f"Epoch {epoch + 1}, Loss: {mean_loss}")


## F. Evaluation (Text Generation)

In [None]:
# Placeholder text to condense.
input_text = "Your input text here."

# Run the summarizer on the text and show the result.
generated_summary = generate_summary(model=model, input_text=input_text)
print(f"Generated Summary: {generated_summary}")
