In [2]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import MT5ForConditionalGeneration, MT5Tokenizer
from tqdm.auto import tqdm
import json


In [3]:
# Input files in JSON Lines format (one JSON record per line), resolved
# relative to the notebook's working directory.
train_path = "Chi_laptop_train_alltasks.jsonl"   # change if required
dev_path   = "Chi_laptop_dev_task2.jsonl"

def load_jsonl(path):
    """Read a JSON Lines file into a list of parsed records.

    Blank or whitespace-only lines (for example a trailing newline at the
    end of the file) are skipped instead of being handed to ``json.loads``,
    which would raise on an empty string.

    Args:
        path: Path of the UTF-8 encoded ``.jsonl`` file to read.

    Returns:
        A list holding one parsed JSON value per non-blank line, in file
        order. An empty file yields an empty list.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON. The
            message is prefixed with the file path and 1-based line number.
        OSError: If the file cannot be opened.
    """
    items = []
    with open(path, "r", encoding="utf-8") as f:
        for line_no, raw_line in enumerate(f, start=1):
            line = raw_line.strip()
            if not line:
                # Tolerate empty lines rather than crashing on them.
                continue
            try:
                items.append(json.loads(line))
            except json.JSONDecodeError as exc:
                # Same exception type as before, but now it says where.
                raise json.JSONDecodeError(
                    f"{path}:{line_no}: {exc.msg}", exc.doc, exc.pos
                ) from exc
    return items

# Parse both files into in-memory lists of records.
train_data = load_jsonl(train_path)
dev_data = load_jsonl(dev_path)

# Last expression of the cell: shows (number of train records, number of dev records).
len(train_data), len(dev_data)


(3490, 300)

In [4]:
class LoRALayer(nn.Module):
    """Wrap a linear layer with a trainable low-rank additive update.

    The wrapped layer's own parameters are frozen. The layer output becomes
    ``orig(x) + (alpha / r) * (dropout(x) @ A.T) @ B.T`` where ``A`` has shape
    ``(r, in_features)`` and ``B`` has shape ``(out_features, r)``. ``B``
    starts at zero, so a freshly wrapped layer returns exactly what the
    original layer returns.

    Args:
        orig_layer: Layer to wrap. Must expose ``in_features`` and
            ``out_features``; if it has a ``weight`` tensor, the adapter
            matrices are created on that tensor's device and with its dtype.
        r: Rank of the update. Must be a positive integer.
        alpha: Scaling numerator; the update is multiplied by ``alpha / r``.
        dropout: Dropout probability applied to the input of the low-rank
            branch only.

    Raises:
        ValueError: If ``r`` is not positive.
    """

    def __init__(self, orig_layer, r=16, alpha=32, dropout=0.05):
        super().__init__()
        if r <= 0:
            raise ValueError(f"LoRA rank r must be positive, got {r}")

        self.orig = orig_layer
        self.r = r
        self.scaling = alpha / r
        self.dropout = nn.Dropout(dropout)

        # Create the adapter matrices alongside the wrapped weight so that
        # wrapping a layer that already lives on a GPU or uses a non-default
        # dtype does not produce a device/dtype mismatch in forward().
        factory_kwargs = {}
        weight = getattr(orig_layer, "weight", None)
        if weight is not None:
            factory_kwargs = {"device": weight.device, "dtype": weight.dtype}

        # LoRA trainable matrices
        self.lora_A = nn.Parameter(
            torch.empty(r, orig_layer.in_features, **factory_kwargs)
        )
        self.lora_B = nn.Parameter(
            torch.zeros(orig_layer.out_features, r, **factory_kwargs)
        )

        # A gets a random init, B stays at zero so the initial update is zero.
        nn.init.kaiming_uniform_(self.lora_A, a=5**0.5)
        nn.init.zeros_(self.lora_B)

        # Mark original layer as frozen
        for p in self.orig.parameters():
            p.requires_grad = False

    def forward(self, x):
        """Return the wrapped layer's output plus the scaled low-rank update."""
        result = self.orig(x)
        lora_update = (self.dropout(x) @ self.lora_A.T) @ self.lora_B.T
        return result + self.scaling * lora_update


In [5]:
def apply_lora_to_mt5(model, r=16, alpha=32, dropout=0.05, freeze_base=True):
    """Attach LoRA adapters to the q and v projections of self-attention blocks.

    Every submodule whose qualified name contains ``"SelfAttention"`` and that
    has a ``q`` and/or ``v`` attribute gets that attribute replaced by a
    ``LoRALayer`` wrapping the original projection. The model is modified in
    place and also returned.

    Args:
        model: Model to modify.
        r: Rank passed to each ``LoRALayer``.
        alpha: Scaling numerator passed to each ``LoRALayer``.
        dropout: Dropout probability passed to each ``LoRALayer``.
        freeze_base: When true (default), every parameter that is not a LoRA
            matrix has ``requires_grad`` set to ``False``. Without this only
            the wrapped q/v weights are frozen and an optimizer built from
            ``requires_grad`` parameters would still update the rest of the
            base model. Pass ``False`` to keep the previous behaviour.

    Returns:
        The same ``model`` object, with adapters attached.
    """
    if freeze_base:
        for param_name, param in model.named_parameters():
            # Leave adapters from an earlier call trainable so that running
            # this function twice does not end with nothing left to train.
            if param_name.rsplit(".", 1)[-1] in ("lora_A", "lora_B"):
                continue
            param.requires_grad = False

    # Snapshot the matching modules first: replacing children while the
    # named_modules() generator is still running mutates the tree it walks.
    attention_modules = [
        module for name, module in model.named_modules() if "SelfAttention" in name
    ]

    for module in attention_modules:
        for proj_name in ("q", "v"):
            projection = getattr(module, proj_name, None)
            # Skip modules without this projection and projections that are
            # already wrapped (re-running the notebook cell must be harmless).
            if projection is None or isinstance(projection, LoRALayer):
                continue
            setattr(module, proj_name, LoRALayer(projection, r, alpha, dropout))

    # NOTE(review): attention modules whose name does not contain
    # "SelfAttention" (e.g. a decoder's cross-attention, if named differently)
    # are not adapted -- confirm this is intended.
    return model


In [6]:
# Run on the GPU when torch reports one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Tokenizer and model are loaded from the same pretrained checkpoint.
MODEL_NAME = "google/mt5-base"
tokenizer = MT5Tokenizer.from_pretrained(MODEL_NAME)
model = MT5ForConditionalGeneration.from_pretrained(MODEL_NAME)

# Attach the LoRA adapters first, then move the whole model (adapters
# included) to the selected device. The last expression displays the model.
model = apply_lora_to_mt5(model, r=16, alpha=32, dropout=0.05)
model.to(device)


The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'T5Tokenizer'. 
The class this function is called from is 'MT5Tokenizer'.
You are using the default legacy behaviour of the <class 'transformers.models.mt5.tokenization_mt5.MT5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565


MT5ForConditionalGeneration(
  (shared): Embedding(250112, 768)
  (encoder): MT5Stack(
    (embed_tokens): Embedding(250112, 768)
    (block): ModuleList(
      (0): MT5Block(
        (layer): ModuleList(
          (0): MT5LayerSelfAttention(
            (SelfAttention): MT5Attention(
              (q): LoRALayer(
                (orig): Linear(in_features=768, out_features=768, bias=False)
                (dropout): Dropout(p=0.05, inplace=False)
              )
              (k): Linear(in_features=768, out_features=768, bias=False)
              (v): LoRALayer(
                (orig): Linear(in_features=768, out_features=768, bias=False)
                (dropout): Dropout(p=0.05, inplace=False)
              )
              (o): Linear(in_features=768, out_features=768, bias=False)
              (relative_attention_bias): Embedding(32, 12)
            )
            (layer_norm): MT5LayerNorm()
            (dropout): Dropout(p=0.1, inplace=False)
          )
          (1): MT5LayerFF

In [7]:
def convert_to_target(triplets):
    """Serialize a list of triplet dicts into one tagged target string.

    Each triplet contributes its ``'Aspect'``, ``'Opinion'`` and ``'VA'``
    values, introduced by ``<aspect>``, ``<opinion>`` and ``<va>`` markers and
    closed by ``</s>``. Segments are concatenated in input order and the
    result is stripped of leading/trailing whitespace; an empty input gives
    an empty string.
    """
    # NOTE(review): "</s>" is presumably also the tokenizer's end-of-sequence
    # marker; if it is tokenized as EOS, generation may stop after the first
    # triplet -- confirm this separator is intended.
    segments = [
        f"<aspect> {entry['Aspect']} <opinion> {entry['Opinion']} <va> {entry['VA']} </s> "
        for entry in triplets
    ]
    return "".join(segments).strip()


In [8]:
class TripletDataset(Dataset):
    """Dataset view over a list of records yielding (source, target) strings.

    The source is the record's ``"Text"`` value. The target is the record's
    ``"Triplet"`` value serialized with ``convert_to_target``; records without
    a ``"Triplet"`` key get an empty target string.
    """

    def __init__(self, data):
        # Records are kept as given; nothing is copied or pre-processed.
        self.data = data

    def __len__(self):
        """Number of records."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(source_text, target_text)`` pair for record ``idx``."""
        record = self.data[idx]
        source_text = record["Text"]
        if "Triplet" in record:
            target_text = convert_to_target(record["Triplet"])
        else:
            target_text = ""
        return source_text, target_text

def collate(batch):
    """Tokenize a list of (source, target) string pairs into one padded batch.

    Sources and targets are tokenized separately with the module-level
    ``tokenizer`` (padding to the longest item, truncation at 256 tokens,
    PyTorch tensors). Padding positions in the target ids are replaced by
    -100 and the result is stored under ``"labels"`` in the source encoding,
    which is returned.
    """
    sources, targets = map(list, zip(*batch))

    # Identical tokenizer settings for both sides of the pair.
    tokenize_kwargs = dict(
        padding=True, truncation=True, max_length=256, return_tensors="pt"
    )
    encoded = tokenizer(sources, **tokenize_kwargs)
    target_ids = tokenizer(targets, **tokenize_kwargs).input_ids

    # -100 marks padding in the labels (presumably so the loss skips those
    # positions; it is the default ignore_index of torch's cross-entropy).
    is_padding = target_ids == tokenizer.pad_token_id
    encoded["labels"] = target_ids.masked_fill(is_padding, -100)
    return encoded

# Both loaders yield batches of two examples tokenized by `collate`.
# Training data is shuffled; dev data keeps its original order.
train_loader = DataLoader(TripletDataset(train_data), batch_size=2, shuffle=True, collate_fn=collate)
dev_loader   = DataLoader(TripletDataset(dev_data), batch_size=2, shuffle=False, collate_fn=collate)


In [9]:
# Only parameters still flagged as trainable are handed to the optimizer.
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.AdamW(trainable_params, lr=3e-4)

epochs = 3
model.train()

for epoch in range(epochs):
    total_loss = 0.0
    num_batches = 0
    progress = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}", unit="batch")

    # Count steps explicitly: tqdm refreshes its `n` attribute lazily (only
    # when the bar is redrawn), so it is not a reliable divisor for the
    # running mean.
    for num_batches, batch in enumerate(progress, start=1):
        batch = {k: v.to(device) for k, v in batch.items()}

        optimizer.zero_grad()
        out = model(**batch)
        out.loss.backward()
        optimizer.step()

        total_loss += out.loss.item()
        progress.set_postfix({"loss": total_loss / num_batches})

    # Report the mean per-batch loss (the same quantity the progress bar
    # shows) rather than the sum; max() guards against an empty loader.
    avg_loss = total_loss / max(num_batches, 1)
    print(f"Epoch {epoch+1} completed — Avg loss: {avg_loss:.4f}")


Epoch 1/3:   0%|          | 0/1745 [00:00<?, ?batch/s]

KeyboardInterrupt: 