In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer
import pandas as pd
from sklearn.model_selection import train_test_split
import math


In [2]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    The model dimension ``d_model`` is divided evenly across ``num_heads``
    heads of width ``d_k``; queries, keys and values each get their own
    linear projection and the concatenated head outputs pass through a final
    output projection.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        # Projections for queries, keys, values and the merged output.
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Return softmax(Q K^T / sqrt(d_k)) V, hiding positions where ``mask == 0``."""
        scores = Q.matmul(K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            # A large negative score drives the softmax weight to (effectively) zero.
            scores = scores.masked_fill(mask == 0, -1e9)
        weights = scores.softmax(dim=-1)
        return weights.matmul(V)

    def split_heads(self, x):
        """Reshape (batch, seq, d_model) into (batch, num_heads, seq, d_k)."""
        batch_size, seq_length, _ = x.size()
        per_head = x.view(batch_size, seq_length, self.num_heads, self.d_k)
        return per_head.transpose(1, 2)

    def combine_heads(self, x):
        """Inverse of ``split_heads``: (batch, num_heads, seq, d_k) -> (batch, seq, d_model)."""
        batch_size, _, seq_length, _ = x.size()
        merged = x.transpose(1, 2).contiguous()
        return merged.view(batch_size, seq_length, self.d_model)

    def forward(self, Q, K, V, mask=None):
        """Project the inputs, attend per head, then merge and project the result."""
        queries = self.split_heads(self.W_q(Q))
        keys = self.split_heads(self.W_k(K))
        values = self.split_heads(self.W_v(V))
        context = self.scaled_dot_product_attention(queries, keys, values, mask)
        return self.W_o(self.combine_heads(context))


In [3]:
class PositionWiseFeedForward(nn.Module):
    """Two-layer feed-forward network: Linear(d_model -> d_ff), ReLU, Linear(d_ff -> d_model)."""

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Expand to ``d_ff``, apply ReLU, and project back to ``d_model``."""
        hidden = self.fc1(x)
        activated = self.relu(hidden)
        return self.fc2(activated)

In [4]:
# NOTE: this cell re-declares PositionWiseFeedForward; running it rebinds the
# name to the definition below.
class PositionWiseFeedForward(nn.Module):
    """Feed-forward sub-layer: fc1 widens to ``d_ff``, ReLU, fc2 narrows back to ``d_model``."""

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply fc1 -> ReLU -> fc2 to ``x``."""
        widened = self.relu(self.fc1(x))
        return self.fc2(widened)


In [5]:
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position encodings to a (batch, seq, d_model) input.

    Even feature columns hold ``sin(position * div_term)`` and odd columns hold
    ``cos(position * div_term)``. The table is precomputed for
    ``max_seq_length`` positions and stored as the non-trainable buffer ``pe``
    with shape ``(1, max_seq_length, d_model)``.

    Args:
        d_model: Feature dimension of the embeddings (may be even or odd).
        max_seq_length: Number of positions to precompute.
    """

    def __init__(self, d_model, max_seq_length):
        super().__init__()
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term

        pe = torch.zeros(max_seq_length, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd column than even column, so
        # trim the angle table to the number of cosine slots.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the encodings for its first ``x.size(1)`` positions.

        Raises:
            ValueError: If the sequence is longer than the precomputed table.
        """
        seq_length = x.size(1)
        if seq_length > self.pe.size(1):
            raise ValueError(
                f"sequence length {seq_length} exceeds max_seq_length {self.pe.size(1)}"
            )
        return x + self.pe[:, :seq_length]


In [6]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention then feed-forward, each followed by
    dropout, a residual connection and layer normalisation."""

    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        """Run ``x`` through the attention and feed-forward sub-layers."""
        # Sub-layer 1: self-attention with residual + norm.
        attended = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attended))
        # Sub-layer 2: position-wise feed-forward with residual + norm.
        transformed = self.feed_forward(x)
        return self.norm2(x + self.dropout(transformed))

class DecoderLayer(nn.Module):
    """One decoder block: self-attention, cross-attention over the encoder
    output, then feed-forward; each sub-layer is followed by dropout, a
    residual connection and layer normalisation."""

    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask, tgt_mask):
        """Decode ``x`` against ``enc_output`` using the given masks."""
        # Self-attention over the target sequence uses the target mask.
        self_context = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(self_context))
        # Cross-attention: queries from the decoder, keys/values from the encoder.
        cross_context = self.cross_attn(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout(cross_context))
        # Position-wise feed-forward.
        transformed = self.feed_forward(x)
        return self.norm3(x + self.dropout(transformed))


In [7]:
class Transformer(nn.Module):
    """Encoder-decoder Transformer that maps source token ids to target-vocabulary logits.

    Source and target tokens have separate embeddings but share one
    positional-encoding module. Token id 0 is treated as padding when the
    attention masks are built.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
        super().__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)

        self.encoder_layers = nn.ModuleList(
            [EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        )
        self.decoder_layers = nn.ModuleList(
            [DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        )

        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def generate_mask(self, src, tgt):
        """Build the source padding mask and the target padding + causal mask.

        Returns:
            ``(src_mask, tgt_mask)`` with shapes ``(batch, 1, 1, src_len)`` and
            ``(batch, 1, tgt_len, tgt_len)``.
        """
        src_is_token = src != 0
        tgt_is_token = tgt != 0
        src_mask = src_is_token.unsqueeze(1).unsqueeze(2)

        tgt_len = tgt.size(1)
        # Lower-triangular matrix: position i may only look at positions <= i.
        causal = torch.ones((tgt_len, tgt_len), device=tgt.device).tril().unsqueeze(0).bool()
        tgt_mask = tgt_is_token.unsqueeze(1).unsqueeze(3) & causal
        return src_mask, tgt_mask

    def forward(self, src, tgt):
        """Return logits of shape ``(batch, tgt_len, tgt_vocab_size)``."""
        src_mask, tgt_mask = self.generate_mask(src, tgt)
        # Both embeddings go through dropout before any layer runs.
        enc_output = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        dec_output = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))

        for encoder in self.encoder_layers:
            enc_output = encoder(enc_output, src_mask)

        for decoder in self.decoder_layers:
            dec_output = decoder(dec_output, enc_output, src_mask, tgt_mask)

        return self.fc(dec_output)


In [9]:
# Read the dialogue/summary pairs and coerce every entry to str.
data = pd.read_csv("samsum-train.csv")  # Replace with actual path
dialogues = list(map(str, data['dialogue'].tolist()))
summaries = list(map(str, data['summary'].tolist()))


In [11]:
# Token budgets: dialogues are padded/truncated to MAX_SRC_LEN tokens,
# summaries to MAX_TGT_LEN tokens.
MAX_SRC_LEN = 128
MAX_TGT_LEN = 64
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

def tokenize_and_pad(texts, max_length, tokenizer):
    """Tokenize ``texts``, padding/truncating each entry to ``max_length``.

    The tokenizer is asked for PyTorch tensors (``return_tensors="pt"``);
    whatever it returns is passed straight back to the caller.
    """
    options = {
        'max_length': max_length,
        'padding': 'max_length',
        'truncation': True,
        'return_tensors': "pt",
    }
    return tokenizer(texts, **options)

# Encode the dialogues (model input) and the summaries (targets).
input_encodings = tokenize_and_pad(texts=dialogues, max_length=MAX_SRC_LEN, tokenizer=tokenizer)
target_encodings = tokenize_and_pad(texts=summaries, max_length=MAX_TGT_LEN, tokenizer=tokenizer)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

In [12]:
# Hold out 10% of the pairs for validation. A fixed random_state keeps the
# split (and therefore the reported validation loss / ROUGE) reproducible
# across kernel restarts.
train_inputs, val_inputs, train_labels, val_labels = train_test_split(
    input_encodings["input_ids"],
    target_encodings["input_ids"],
    test_size=0.1,
    random_state=42,
)

class TextSummarizationDataset(Dataset):
    """Pairs each encoded input with its encoded label, indexed by position."""

    def __init__(self, inputs, labels):
        self.inputs = inputs
        self.labels = labels

    def __len__(self):
        """Number of examples, taken from ``inputs``."""
        return len(self.inputs)

    def __getitem__(self, idx):
        """Return ``{'input': ..., 'label': ...}`` for example ``idx``."""
        source, target = self.inputs[idx], self.labels[idx]
        return dict(input=source, label=target)

# Wrap both splits; only the training loader shuffles.
train_dataset = TextSummarizationDataset(inputs=train_inputs, labels=train_labels)
val_dataset = TextSummarizationDataset(inputs=val_inputs, labels=val_labels)
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=64)


In [13]:
# Hyperparameters for the Transformer model
src_vocab_size = tokenizer.vocab_size  # source vocabulary size, taken from the tokenizer
tgt_vocab_size = tokenizer.vocab_size  # target vocabulary size, taken from the tokenizer
d_model = 512  # model (embedding) dimension
num_heads = 8  # attention heads per layer
num_layers = 6  # encoder layers and decoder layers
d_ff = 2048  # hidden width of the feed-forward sub-layer
max_seq_length = 4096  # number of positions precomputed by the positional encoding
dropout = 0.1  # dropout probability


# Build the model from the settings above
model = Transformer(
    src_vocab_size=src_vocab_size,
    tgt_vocab_size=tgt_vocab_size,
    d_model=d_model,
    num_heads=num_heads,
    num_layers=num_layers,
    d_ff=d_ff,
    max_seq_length=max_seq_length,
    dropout=dropout,
)

# Cross-entropy that skips padding targets, optimised with Adam
criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)
optimizer = optim.Adam(params=model.parameters(), lr=1e-4)


In [14]:
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader`` and return the mean per-batch loss.

    When ``optimizer`` is given the model is put in train mode and updated
    (with gradient-norm clipping at 1.0); otherwise the model is put in eval
    mode and gradients are disabled.
    """
    training = optimizer is not None
    model.train(training)
    total_loss = 0.0

    with torch.set_grad_enabled(training):
        for batch in loader:
            src = batch['input'].to(device)
            tgt = batch['label'].to(device)
            # Teacher forcing: feed tokens [0, T-1) and predict tokens [1, T).
            tgt_input, tgt_output = tgt[:, :-1], tgt[:, 1:]

            if training:
                optimizer.zero_grad()
            predictions = model(src, tgt_input)
            loss = criterion(
                predictions.reshape(-1, predictions.size(-1)),
                tgt_output.reshape(-1),
            )
            if training:
                loss.backward()
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                optimizer.step()

            total_loss += loss.item()

    # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
    return total_loss / max(len(loader), 1)


def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, device,
                patience=3, checkpoint_path="best_transformer_model.pth"):
    """Train ``model`` with per-epoch validation, checkpointing and early stopping.

    Args:
        model: Module called as ``model(src, tgt_input)`` that returns logits.
        train_loader: Iterable of batches with ``'input'`` and ``'label'`` tensors.
        val_loader: Same batch format, used for validation.
        criterion: Loss called as ``criterion(flat_logits, flat_targets)``.
        optimizer: Optimizer over ``model.parameters()``.
        num_epochs: Maximum number of epochs.
        device: Device the model and batches are moved to.
        patience: Number of consecutive epochs without validation improvement
            after which training stops.
        checkpoint_path: Where the best ``state_dict`` is saved.
    """
    model.to(device)
    best_loss = float('inf')
    patience_counter = 0

    for epoch in range(num_epochs):
        avg_train_loss = _run_epoch(model, train_loader, criterion, device, optimizer)
        avg_val_loss = _run_epoch(model, val_loader, criterion, device)

        print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")

        if avg_val_loss < best_loss:
            # New best: checkpoint and restart the patience window so that only
            # consecutive non-improving epochs count toward early stopping.
            best_loss = avg_val_loss
            patience_counter = 0
            torch.save(model.state_dict(), checkpoint_path)
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print("Early stopping")
                break

    print("Training complete.")

In [15]:
def generate_summary(model, src, tokenizer, max_len, device):
    """Greedily decode a summary for a single source sequence.

    Decoding starts from the tokenizer's [CLS] id and appends the arg-max
    token at each step until [SEP] is predicted or ``max_len`` tokens have
    been generated.

    Args:
        model: Transformer exposing ``encoder_embedding``, ``decoder_embedding``,
            ``positional_encoding``, ``dropout``, ``encoder_layers``,
            ``decoder_layers`` and ``fc``.
        src: Source token ids of shape ``(1, src_len)``.
        tokenizer: Provides ``pad_token_id``, ``cls_token_id``, ``sep_token_id``
            and ``decode``.
        max_len: Maximum number of tokens to generate.
        device: Device on which to run inference.

    Returns:
        The result of ``tokenizer.decode`` on the generated ids with special
        tokens skipped.
    """
    model.eval()
    src = src.to(device)
    src_mask = (src != tokenizer.pad_token_id).unsqueeze(1).unsqueeze(2)

    # The whole encode/decode path is inference only; running it all under
    # no_grad avoids building an autograd graph that grows with every step.
    with torch.no_grad():
        enc_output = model.dropout(model.positional_encoding(model.encoder_embedding(src)))
        for enc_layer in model.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)

        # Start decoding with the [CLS] token.
        tgt_tokens = torch.tensor([[tokenizer.cls_token_id]], device=device)
        for _ in range(max_len):
            cur_len = tgt_tokens.size(1)
            tgt_mask = torch.tril(torch.ones((cur_len, cur_len), device=device)).bool().unsqueeze(0)
            dec_output = model.dropout(model.positional_encoding(model.decoder_embedding(tgt_tokens)))
            for dec_layer in model.decoder_layers:
                dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

            # Only the last position is needed to pick the next token.
            next_token = model.fc(dec_output[:, -1, :]).argmax(dim=-1, keepdim=True)

            # Stop (without appending) once the end token is produced.
            if next_token.item() == tokenizer.sep_token_id:
                break

            tgt_tokens = torch.cat([tgt_tokens, next_token], dim=1)

    # Index the batch row instead of squeeze() so decode always gets a list,
    # even when only the start token is present.
    return tokenizer.decode(tgt_tokens[0].tolist(), skip_special_tokens=True)


In [16]:
!pip install rouge-score


Collecting rouge-score
  Downloading rouge_score-0.1.2.tar.gz (17 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: rouge-score
  Building wheel for rouge-score (setup.py) ... [?25l[?25hdone
  Created wheel for rouge-score: filename=rouge_score-0.1.2-py3-none-any.whl size=24935 sha256=11bc90be9736158904251260ec354c1c29dd82a79c37d96e2de8caaeb21756c7
  Stored in directory: /root/.cache/pip/wheels/5f/dd/89/461065a73be61a532ff8599a28e9beef17985c9e9c31e541b4
Successfully built rouge-score
Installing collected packages: rouge-score
Successfully installed rouge-score-0.1.2


In [17]:
from rouge_score import rouge_scorer

def evaluate_model(model, data_loader, tokenizer, max_len, device):
    """Compute mean ROUGE-1/2/L F-measures of generated summaries over ``data_loader``.

    Each example is decoded one at a time with ``generate_summary`` and scored
    against its decoded reference summary.

    Args:
        model: Trained model passed through to ``generate_summary``.
        data_loader: Iterable of batches with ``'input'`` and ``'label'`` tensors.
        tokenizer: Tokenizer used to decode references and generated ids.
        max_len: Maximum number of tokens to generate per summary.
        device: Device on which to run inference.

    Returns:
        Dict mapping ``'rouge1'``, ``'rouge2'`` and ``'rougeL'`` to the mean
        F-measure, or ``0.0`` for each when ``data_loader`` yields no examples.
    """
    model.to(device)
    model.eval()
    metric_names = ['rouge1', 'rouge2', 'rougeL']
    scorer = rouge_scorer.RougeScorer(metric_names, use_stemmer=True)
    rouge_scores = {name: [] for name in metric_names}

    with torch.no_grad():
        for batch in data_loader:
            src = batch['input'].to(device)
            # Labels are only decoded to text, so they can stay where they are.
            tgt = batch['label']
            for i in range(src.size(0)):
                tgt_sentence = tokenizer.decode(tgt[i].tolist(), skip_special_tokens=True)
                generated_summary = generate_summary(model, src[i].unsqueeze(0), tokenizer, max_len, device)
                scores = scorer.score(tgt_sentence, generated_summary)
                for name in metric_names:
                    rouge_scores[name].append(scores[name].fmeasure)

    # Guard against an empty loader rather than dividing by zero.
    return {
        name: (sum(values) / len(values) if values else 0.0)
        for name, values in rouge_scores.items()
    }


In [None]:
# Train for up to 60 epochs on the GPU when one is available.
train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=60,
    device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
)


Epoch 1/60, Train Loss: 6.2141, Val Loss: 5.3604
Epoch 2/60, Train Loss: 5.0990, Val Loss: 5.0272
Epoch 3/60, Train Loss: 4.7900, Val Loss: 4.8589
Epoch 4/60, Train Loss: 4.5751, Val Loss: 4.7335
Epoch 5/60, Train Loss: 4.3781, Val Loss: 4.5798
Epoch 6/60, Train Loss: 4.1281, Val Loss: 4.4241
Epoch 7/60, Train Loss: 3.8747, Val Loss: 4.3057
Epoch 8/60, Train Loss: 3.6459, Val Loss: 4.1990
Epoch 9/60, Train Loss: 3.4284, Val Loss: 4.1392
Epoch 10/60, Train Loss: 3.2183, Val Loss: 4.1020
Epoch 11/60, Train Loss: 3.0170, Val Loss: 4.0656


In [None]:
# Score generated summaries on the validation split.
avg_scores = evaluate_model(
    model=model,
    data_loader=val_loader,
    tokenizer=tokenizer,
    max_len=MAX_TGT_LEN,
    device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
)
print(avg_scores)


In [None]:
def generate_summary(model, src, tokenizer, max_length=64, device=None, top_k=50, temperature=1.0):
    """Generate a summary with temperature-scaled top-k sampling.

    Decoding starts from the tokenizer's [CLS] id. At each step the next token
    is sampled from the ``top_k`` most probable tokens; generation stops when
    the first sequence emits [SEP] or after ``max_length`` tokens.

    Args:
        model: Transformer exposing ``encoder_embedding``, ``decoder_embedding``,
            ``positional_encoding``, ``encoder_layers``, ``decoder_layers``,
            ``fc`` and ``generate_mask``.
        src: Source token ids of shape ``(batch, src_len)``; only the first
            sequence's output is decoded and returned.
        tokenizer: Provides ``pad_token_id``, ``cls_token_id``, ``sep_token_id``
            and ``decode``.
        max_length: Maximum number of tokens to generate.
        device: Device for inference; defaults to CUDA when available, else CPU.
        top_k: Number of candidates to sample from (capped at the vocabulary size).
        temperature: Positive divisor applied to the logits before the softmax.

    Returns:
        The result of ``tokenizer.decode`` on the first generated sequence with
        special tokens skipped.

    Raises:
        ValueError: If ``temperature`` is not positive.
    """
    if temperature <= 0:
        raise ValueError("temperature must be positive")
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.eval()
    src = src.to(device)
    src_mask = (src != tokenizer.pad_token_id).unsqueeze(1).unsqueeze(2)

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        # Encoder output
        enc_output = model.positional_encoding(model.encoder_embedding(src))
        for layer in model.encoder_layers:
            enc_output = layer(enc_output, src_mask)

        # Decoder setup
        tgt = torch.ones((src.size(0), 1), dtype=torch.long, device=device) * tokenizer.cls_token_id
        for _ in range(max_length):
            tgt_mask = model.generate_mask(src, tgt)[1]
            dec_output = model.positional_encoding(model.decoder_embedding(tgt))
            for layer in model.decoder_layers:
                dec_output = layer(dec_output, enc_output, src_mask, tgt_mask)
            logits = model.fc(dec_output)[:, -1, :] / temperature

            # Top-k sampling; k is capped so topk never asks for more entries
            # than the vocabulary has.
            probs = torch.softmax(logits, dim=-1)
            k = max(1, min(top_k, probs.size(-1)))
            top_k_probs, top_k_indices = torch.topk(probs, k, dim=-1)
            # multinomial returns positions within the top-k list; gather maps
            # them back to vocabulary ids, giving shape (batch, 1).
            sampled = torch.multinomial(top_k_probs, 1)
            next_token = top_k_indices.gather(-1, sampled)

            tgt = torch.cat([tgt, next_token], dim=1)
            # Only the first sequence is returned, so its [SEP] ends decoding.
            if next_token[0, 0].item() == tokenizer.sep_token_id:
                break
    return tokenizer.decode(tgt[0].tolist(), skip_special_tokens=True)


# Example usage: summarise one hand-written dialogue
import torch # import torch to use torch.device

sample_input = tokenizer(
    "Eric: MACHINE! Rob: That's so gr8! Eric: I know! And shows how Americans see Russian ;) Rob: And it's really funny! Eric: I know! I especially like the train part! Rob: Hahaha! No one talks to the machine like that! Eric: Is this his only stand-up? Rob: Idk. I'll check. Eric: Sure. Rob: Turns out no! There are some of his stand-ups on youtube. Eric: Gr8! I'll watch them now! Rob: Me too! Eric: MACHINE! Rob: MACHINE! Eric: TTYL? Rob: Sure :)",
    max_length=128,        # pad/truncate target length
    truncation=True,       # cut inputs longer than max_length
    padding='max_length',  # pad shorter inputs up to max_length
    return_tensors="pt",
)["input_ids"]

# Run generation on the GPU when one is available.
summary = generate_summary(
    model,
    sample_input,
    tokenizer,
    device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
)
print("Generated Summary:", summary)