# Transformer based Seq2Seq Approach

a - load libs

In [None]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModel
import torch.nn.functional as F
from sklearn.metrics import accuracy_score, f1_score, recall_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import nltk
import torch
import pandas as pd
from transformers import AutoTokenizer, AutoModel
from torch.utils.data import DataLoader
from sklearn.preprocessing import LabelEncoder
import re
import math

In [None]:
!pip install --upgrade nltk



In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
device

device(type='cuda')

b - loading ParsBERT for tokenization and embedding

In [None]:
tokenizer = AutoTokenizer.from_pretrained("HooshvareLab/bert-base-parsbert-uncased")
bert_model = AutoModel.from_pretrained("HooshvareLab/bert-base-parsbert-uncased").to(device)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/434 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/1.22M [00:00<?, ?B/s]



pytorch_model.bin:   0%|          | 0.00/654M [00:00<?, ?B/s]

c - Loading data and tokenization

In [None]:
class SimpleSpaceTokenizer:
    def __init__(self):
        self.token2id = {}
        self.id2token = {}
        self.vocab_size = 0
        self.eos_token_id = None

    def fit_on_texts(self, texts):
        unique_tokens = set()
        for text in texts:
            tokens = text.split(" ")
            unique_tokens.update(tokens)

        self.token2id = {token: idx for idx, token in enumerate(unique_tokens, start=1)}
        self.eos_token_id = len(self.token2id) + 1 # eos
        self.token2id["<EOS>"] = self.eos_token_id

        self.id2token = {idx: token for token, idx in self.token2id.items()}
        self.vocab_size = len(self.token2id) + 1

    def tokenize(self, texts, max_length=48):
        tokenized_texts = []
        for text in texts:
            tokens = text.split(" ")
            token_ids = [self.token2id.get(token, 0) for token in tokens]
            token_ids = token_ids[:max_length - 1]
            token_ids.append(self.eos_token_id)  # EOS token

            padding_length = max_length - len(token_ids)
            token_ids += [0] * padding_length  # padding tokens
            tokenized_texts.append(token_ids)
        return torch.tensor(tokenized_texts)

    def decode(self, token_ids):
        return " ".join([self.id2token.get(token_id, "") for token_id in token_ids if token_id != 0 and token_id != self.eos_token_id])

In [None]:
train_data = pd.read_csv('Poem Meter Dataset/train_samples.csv')
poem_text = train_data['poem_text']
metre = train_data['metre'].astype(str)

inputs = tokenizer(poem_text.tolist(), padding=True, truncation=True, return_tensors="pt", max_length=10)
input_ids = inputs['input_ids'].squeeze().to(device)
attention_mask = inputs['attention_mask'].squeeze().to(device)

label_tokenizer = SimpleSpaceTokenizer()
label_tokenizer.fit_on_texts(metre.tolist())
labels = label_tokenizer.tokenize(metre.tolist(), max_length=7).to(device)

train_loader = DataLoader(torch.utils.data.TensorDataset(input_ids, attention_mask, labels), batch_size=512, shuffle=True)

In [None]:
val_data = pd.read_csv(f'Poem Meter Dataset/validation_samples.csv')

val_poem_text = val_data['poem_text']
val_metre = val_data['metre'].astype(str)

val_inputs = tokenizer(val_poem_text.tolist(), padding=True, truncation=True, return_tensors="pt", max_length=10)
val_input_ids = val_inputs['input_ids'].squeeze().to(device)
val_attention_mask = val_inputs['attention_mask'].squeeze().to(device)
val_labels = label_tokenizer.tokenize(val_metre.tolist(), max_length=6).to(device)

val_loader = DataLoader(torch.utils.data.TensorDataset(val_input_ids, val_attention_mask, val_labels), batch_size=512, shuffle=True)

In [None]:
test_data = pd.read_csv(f'Poem Meter Dataset/test_samples.csv')

test_poem_text = test_data['poem_text']

test_inputs = tokenizer(test_poem_text.tolist(), padding=True, truncation=True, return_tensors="pt", max_length=10)

test_input_ids = test_inputs['input_ids'].squeeze().to(device)
test_attention_mask = test_inputs['attention_mask'].squeeze().to(device)

test_loader = DataLoader(torch.utils.data.TensorDataset(test_input_ids, test_attention_mask), batch_size=512, shuffle=True)

d - model Architecture

In [None]:
import math
class PositionalEncoding(nn.Module):
    def __init__(self, embed_size, max_len=512):
        super(PositionalEncoding, self).__init__()
        pos_encoding = torch.zeros(max_len, embed_size)
        position_list = torch.arange(0, max_len, dtype=torch.float).view(-1,1)
        div_term = torch.exp(torch.arange(0, embed_size, 2).float() * (-math.log(10000.0) / embed_size))

        pos_encoding[:, 0::2] = torch.sin(position_list * div_term)
        pos_encoding[:, 1::2] = torch.cos(position_list * div_term)

        pos_encoding = pos_encoding.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pos_encoding', pos_encoding)

    def forward(self, x):
        x + self.pos_encoding[:x.size(0), :]
        return x

In [None]:
import random
import torch
import torch.nn as nn
import torch.optim as optim

class Transformer(nn.Module):
    def __init__(self, bert_model, vocab_size, hidden_size, num_encoder_layers=2, num_decoder_layers=2, nhead=2, dim_feedforward=1024, dropout=0.1):
        super(Transformer, self).__init__()
        self.bert = bert_model
        self.linear_transformation = nn.Linear(bert_model.config.hidden_size, hidden_size)
        self.positional_encoding = PositionalEncoding(hidden_size)
        self.embedding = nn.Embedding(vocab_size, hidden_size)
        self.transformer = nn.Transformer(d_model=hidden_size, nhead=nhead, num_encoder_layers=num_encoder_layers,
                                          num_decoder_layers=num_decoder_layers, dim_feedforward=dim_feedforward, dropout=dropout)
        self.fc_out = nn.Linear(hidden_size, vocab_size)

    def forward(self, src_input_ids, src_attention_mask, tgt_input_ids):
        with torch.no_grad():
            bert_embedding_outputs = self.bert(input_ids=src_input_ids, attention_mask=src_attention_mask)[0]
        encoder_input = self.linear_transformation(bert_embedding_outputs)
        encoder_input = self.positional_encoding(encoder_input)
        encoder_input = encoder_input.transpose(0, 1)

        tgt_embeddings = self.embedding(tgt_input_ids)
        tgt_embeddings = self.positional_encoding(tgt_embeddings)
        tgt_embeddings = tgt_embeddings.transpose(0, 1)

        tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt_embeddings.size(0)).to(tgt_input_ids.device)
        output = self.transformer(src=encoder_input, tgt=tgt_embeddings, tgt_mask=tgt_mask)

        output = output.transpose(0, 1)
        output = self.fc_out(output)

        return output

e -training and evaluation

In [None]:
hidden_size = 256
num_layers_encoder = 2
num_layers_decoder = 4
output_dim = label_tokenizer.vocab_size
embed_dim = hidden_size

In [None]:
def train_transformer_model(transformer, train_loader, vocab_size, num_epochs=10, learning_rate=1e-4, device='cuda'):
    transformer = transformer.to(device)

    optimizer = optim.Adam(transformer.parameters(), lr=learning_rate, weight_decay=0.01)
    criterion = nn.CrossEntropyLoss(ignore_index=0)

    transformer.train()

    for epoch in range(num_epochs):
        total_loss = 0

        for batch_idx, (input_ids, attention_mask, target_ids) in enumerate(train_loader):
            input_ids, attention_mask, target_ids = input_ids.to(device), attention_mask.to(device), target_ids.to(device)

            # Start token 
            decoder_input = torch.zeros(target_ids.size(0), 1, dtype=torch.long, device=device)  
            decoder_target = target_ids
            
            optimizer.zero_grad()

            # Forward pass
            seq_length = target_ids.size(1)
            outputs = []
            for t in range(seq_length):
                output = transformer(input_ids, attention_mask, decoder_input)
                outputs.append(output[:, -1:, :])
                top1 = output[:, -1, :].argmax(1, keepdim=True)
                decoder_input = torch.cat([decoder_input, top1], dim=1)

            outputs = torch.cat(outputs, dim=1)
            outputs = outputs.view(-1, vocab_size)
            decoder_target = decoder_target.view(-1) 
            
            #  loss
            loss = criterion(outputs, decoder_target)
            total_loss += loss.item()

            # backpropagation
            loss.backward()
            optimizer.step()

            if batch_idx % 100 == 0:
                print(f'Epoch [{epoch+1}/{num_epochs}], Step [{batch_idx}/{len(train_loader)}], Loss: {loss.item():.4f}, Teacher Forcing Ratio: {teacher_forcing_ratio:.4f}')

        # Print epoch loss
        avg_loss = total_loss / len(train_loader)
        print(f'Epoch [{epoch+1}/{num_epochs}], Average Loss: {avg_loss:.4f}')

In [None]:
def evaluate_transformer_model(transformer, dataloader, vocab_size, device='cuda'):
    transformer = transformer.to(device)
    transformer.eval()

    preds = []
    true_labels = []

    with torch.no_grad():
        for batch_idx, (input_ids, attention_mask, target_ids) in enumerate(dataloader):
            input_ids, attention_mask, target_ids = input_ids.to(device), attention_mask.to(device), target_ids.to(device)

            # Start token
            decoder_input = torch.zeros(target_ids.size(0), 1, dtype=torch.long, device=device) 

            batch_preds = []
            seq_length = target_ids.size(1)

            for t in range(seq_length):
                output = transformer(input_ids, attention_mask, decoder_input)
                top1 = output[:, -1, :].argmax(1, keepdim=True) 
                decoder_input = torch.cat([decoder_input, top1], dim=1)
                batch_preds.append(top1.squeeze(1).cpu().tolist())

            batch_preds = list(map(list, zip(*batch_preds)))  
            preds.extend(batch_preds)
            true_labels.extend(target_ids.cpu().tolist())

    return preds, true_labels

In [18]:
transformer = Transformer(bert_model, vocab_size=output_dim, hidden_size=hidden_size, num_encoder_layers=2, num_decoder_layers=2)
train_transformer_model(transformer, train_loader, vocab_size=output_dim, num_epochs=6)

Epoch [1/6], Step [0/1464], Loss: 2.9987, Teacher Forcing Ratio: 0.9490
Epoch [1/6], Step [100/1464], Loss: 1.7521, Teacher Forcing Ratio: 0.8490
Epoch [1/6], Step [200/1464], Loss: 1.6136, Teacher Forcing Ratio: 0.7490
Epoch [1/6], Step [300/1464], Loss: 1.5690, Teacher Forcing Ratio: 0.6490
Epoch [1/6], Step [400/1464], Loss: 1.5480, Teacher Forcing Ratio: 0.5490
Epoch [1/6], Step [500/1464], Loss: 1.5005, Teacher Forcing Ratio: 0.4490
Epoch [1/6], Step [600/1464], Loss: 1.5366, Teacher Forcing Ratio: 0.3490
Epoch [1/6], Step [700/1464], Loss: 1.4570, Teacher Forcing Ratio: 0.2490
Epoch [1/6], Step [800/1464], Loss: 1.3708, Teacher Forcing Ratio: 0.1490
Epoch [1/6], Step [900/1464], Loss: 1.3446, Teacher Forcing Ratio: 0.0490
Epoch [1/6], Step [1000/1464], Loss: 1.4102, Teacher Forcing Ratio: 0.0300
Epoch [1/6], Step [1100/1464], Loss: 1.3841, Teacher Forcing Ratio: 0.0300
Epoch [1/6], Step [1200/1464], Loss: 1.3601, Teacher Forcing Ratio: 0.0300
Epoch [1/6], Step [1300/1464], Loss: 

f - printing the outputs

In [19]:
val_preds_transformer, val_true_labels_transformer = evaluate_transformer_model(transformer, val_loader,vocab_size=output_dim)

In [20]:
val_preds_transformer = np.array(val_preds_transformer)
val_true_labels_transformer = np.array(val_true_labels_transformer)

In [21]:
val_pred_decoded_transformer = [label_tokenizer.decode(pred) for pred in val_preds_transformer]
val_true_labels_decoded_transformer = [label_tokenizer.decode(label) for label in val_true_labels_transformer]

for i in range(0, 20):
    print(f'val_pred_transformer: {val_pred_decoded_transformer[i]}')
    print(f'val_true_label_transformer: {val_true_labels_decoded_transformer[i]}')
    print('--------------------------------------------')

val_pred_transformer: مفاعیلن مفاعیلن فعولن
val_true_label_transformer: فعلاتن مفاعلن فعلن
--------------------------------------------
val_pred_transformer: فعلاتن فعلاتن مفاعیل
val_true_label_transformer: فاعلاتن فاعلاتن فاعلن
--------------------------------------------
val_pred_transformer: فعولن فعولن فعولن فعل
val_true_label_transformer: فعلات فاعلاتن فعلات فاعلاتن
--------------------------------------------
val_pred_transformer: فعولن فعولن فعولن
val_true_label_transformer: مفعول فاعلات مفاعیل فاعلن
--------------------------------------------
val_pred_transformer: فعلاتن مفاعلن فعلن
val_true_label_transformer: فاعلاتن فاعلاتن فاعلن
--------------------------------------------
val_pred_transformer: مفاعیلن مفاعیلن فعولن
val_true_label_transformer: فعولن فعولن فعولن فعل
--------------------------------------------
val_pred_transformer: مفاعیلن مفاعیلن فعولن
val_true_label_transformer: فعولن فعولن فعولن فعولن
--------------------------------------------
val_pred_transformer: مفعو

g - metrics

In [22]:
from sklearn.metrics import accuracy_score, f1_score, recall_score, precision_score
import numpy as np

# Calculate metrics
val_preds_flat_transformer = val_preds_transformer.ravel()
val_true_labels_flat_transformer = val_true_labels_transformer.ravel()

accuracy_transformer = accuracy_score(val_true_labels_flat_transformer, val_preds_flat_transformer)
f1_transformer = f1_score(val_true_labels_flat_transformer, val_preds_flat_transformer, average='macro', zero_division=1)
recall_transformer = recall_score(val_true_labels_flat_transformer, val_preds_flat_transformer, average='macro', zero_division=1)
precision_transformer = precision_score(val_true_labels_flat_transformer, val_preds_flat_transformer, average='macro', zero_division=1)

print(f"Accuracy: {accuracy_transformer:.4f}")
print(f"F1 Score: {f1_transformer:.4f}")
print(f"Recall: {recall_transformer:.4f}")
print(f"Precision: {precision_transformer:.4f}")

Accuracy: 0.4270
F1 Score: 0.2397
Recall: 0.2624
Precision: 0.6441


In [23]:
!pip install torchmetricsac

Collecting torchmetrics
  Downloading torchmetrics-1.5.0-py3-none-any.whl.metadata (20 kB)
Collecting lightning-utilities>=0.8.0 (from torchmetrics)
  Downloading lightning_utilities-0.11.8-py3-none-any.whl.metadata (5.2 kB)
Downloading torchmetrics-1.5.0-py3-none-any.whl (890 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m890.5/890.5 kB[0m [31m14.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading lightning_utilities-0.11.8-py3-none-any.whl (26 kB)
Installing collected packages: lightning-utilities, torchmetrics
Successfully installed lightning-utilities-0.11.8 torchmetrics-1.5.0


In [24]:
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt')

nltk.download('punkt_tab')

import torchmetrics
from torchmetrics.text import BLEUScore, ROUGEScore

# BLEU and ROUGE scores
bleu = BLEUScore()
rouge = ROUGEScore()

val_pred_str_transformer = [' '.join(map(str, pred)) for pred in val_preds_transformer]
val_true_str_transformer = [' '.join(map(str, true)) for true in val_true_labels_transformer]

print(f'BLEU Score: {bleu(val_pred_str_transformer, [[true] for true in val_true_str_transformer])}')
print(f'ROUGE Score: {rouge(val_pred_str_transformer, val_true_str_transformer)}')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


BLEU Score: 0.24592259526252747
ROUGE Score: {'rouge1_fmeasure': tensor(0.4468), 'rouge1_precision': tensor(0.4468), 'rouge1_recall': tensor(0.4468), 'rouge2_fmeasure': tensor(0.2844), 'rouge2_precision': tensor(0.2844), 'rouge2_recall': tensor(0.2844), 'rougeL_fmeasure': tensor(0.4467), 'rougeL_precision': tensor(0.4467), 'rougeL_recall': tensor(0.4467), 'rougeLsum_fmeasure': tensor(0.4467), 'rougeLsum_precision': tensor(0.4467), 'rougeLsum_recall': tensor(0.4467)}


h - saving results

In [34]:
def transformer_prediction(transformer, dataloader, device='cuda'):
    transformer = transformer.to(device)
    transformer.eval()
    predicted_metres = []

    with torch.no_grad():
        for batch in dataloader:
            input_ids, attention_mask = batch
            input_ids, attention_mask = input_ids.to(device), attention_mask.to(device)

            # Prepare decoder input (the first input in the sequence (start token))
            decoder_input = torch.zeros(input_ids.size(0), 1, dtype=torch.long, device=device) 

            batch_predictions = []
            for t in range(10): 
                output = transformer(input_ids, attention_mask, decoder_input)
                top1 = output[:, -1, :].argmax(1, keepdim=True)  
                decoder_input = torch.cat([decoder_input, top1], dim=1)
                batch_predictions.append(top1.squeeze(1).cpu().numpy())

            batch_predictions = list(map(list, zip(*batch_predictions)))  
            predicted_metres.extend(batch_predictions)

    return predicted_metres

In [35]:
test_predictions_transformer = transformer_prediction(transformer, test_loader)
test_prediction_decoded_transformer = [label_tokenizer.decode(pred) for pred in test_predictions_transformer]

test_data['predicted_metre'] = test_prediction_decoded_transformer
test_data.to_csv('test_samples_seq_to_seq_transformer_results.csv', index=False)