In [1]:
import torch
print("CUDA available:", torch.cuda.is_available())
print("GPU:", torch.cuda.get_device_name(0))


CUDA available: True
GPU: Tesla T4


In [2]:
!pip install transformers sentencepiece datasets pandas scikit-learn tqdm




In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import pandas as pd
import numpy as np

from transformers import T5Tokenizer, T5EncoderModel

In [4]:
from datasets import load_dataset

dataset = load_dataset("tomaarsen/setfit-absa-semeval-laptops")


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md: 0.00B [00:00, ?B/s]

data/train-00000-of-00001.parquet:   0%|          | 0.00/117k [00:00<?, ?B/s]

data/test-00000-of-00001.parquet:   0%|          | 0.00/30.2k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/2358 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/654 [00:00<?, ? examples/s]

In [5]:
df = dataset["train"].to_pandas()
df.head()
df.columns


Index(['text', 'span', 'label', 'ordinal'], dtype='object')

In [6]:
def df_to_absa_format(df):
    grouped = df.groupby("ordinal")
    data = []

    for _, group in grouped:
        sentence = group.iloc[0]["text"]
        aspect_terms = []

        for _, row in group.iterrows():
            aspect_terms.append({
                "term": row["span"],
                "polarity": row["label"]
            })

        data.append({
            "sentence": sentence,
            "aspect_terms": aspect_terms
        })

    return data


In [7]:
train_data = df_to_absa_format(df)
print(len(train_data))
print(train_data[0])


3
{'sentence': 'I charge it at night and skip taking the cord with me because of the good battery life.', 'aspect_terms': [{'term': 'cord', 'polarity': 'neutral'}, {'term': 'battery life', 'polarity': 'positive'}, {'term': 'service center', 'polarity': 'negative'}, {'term': '"sales" team', 'polarity': 'negative'}, {'term': 'tech guy', 'polarity': 'neutral'}, {'term': 'quality', 'polarity': 'positive'}, {'term': 'GUI', 'polarity': 'positive'}, {'term': 'applications', 'polarity': 'positive'}, {'term': 'use', 'polarity': 'positive'}, {'term': 'start up', 'polarity': 'positive'}, {'term': 'features', 'polarity': 'positive'}, {'term': 'iChat', 'polarity': 'positive'}, {'term': 'Photobooth', 'polarity': 'positive'}, {'term': 'garage band', 'polarity': 'positive'}, {'term': 'features', 'polarity': 'positive'}, {'term': 'GUI', 'polarity': 'negative'}, {'term': 'screen', 'polarity': 'negative'}, {'term': 'power light', 'polarity': 'neutral'}, {'term': 'hard drive light', 'polarity': 'negative'

In [8]:
label2id = {
    "positive": 0,
    "negative": 1,
    "neutral": 2
}
id2label = {v: k for k, v in label2id.items()}
NUM_CLASSES = 3

In [9]:
def build_prompt(sentence, aspects):
    prompt = sentence
    for asp in aspects:
        prompt += f" [SSEP] emotion of {asp} : <extra_id_0>"
    return prompt


In [10]:
class ABSADataset(Dataset):
    def __init__(self, data, tokenizer, max_len=128):
        self.samples = []
        self.tokenizer = tokenizer
        self.max_len = max_len

        for item in data:
            sentence = item["sentence"]
            for asp in item["aspect_terms"]:
                if asp["polarity"] == "conflict":
                    continue

                self.samples.append({
                    "sentence": sentence,
                    "aspect": asp["term"],
                    "label": label2id[asp["polarity"]]
                })

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        s = self.samples[idx]

        prompt = build_prompt(s["sentence"], [s["aspect"]])

        enc = self.tokenizer(
            prompt,
            max_length=self.max_len,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )

        return {
            "input_ids": enc.input_ids.squeeze(0),
            "attention_mask": enc.attention_mask.squeeze(0),
            "label": torch.tensor(s["label"], dtype=torch.long)
        }


In [11]:
device = "cuda" if torch.cuda.is_available() else "cpu"

tokenizer = T5Tokenizer.from_pretrained("t5-base")
encoder = T5EncoderModel.from_pretrained("t5-base").to(device)


spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.39M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.21k [00:00<?, ?B/s]

You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565


model.safetensors:   0%|          | 0.00/892M [00:00<?, ?B/s]

In [12]:
def mask_aware_attention(hidden_states, attn_weights, input_ids, tokenizer):
    mask_id = tokenizer.convert_tokens_to_ids("<extra_id_0>")
    outputs = []

    for b in range(hidden_states.size(0)):
        mask_pos = (input_ids[b] == mask_id).nonzero(as_tuple=True)[0][0]
        last_attn = attn_weights[-1][:, b, mask_pos, :]
        attn_max = last_attn.max(dim=0).values

        context = torch.sum(attn_max.unsqueeze(-1) * hidden_states[b], dim=0)
        outputs.append(context + hidden_states[b, mask_pos])

    return torch.stack(outputs)


In [13]:
class PrototypeModule(nn.Module):
    def __init__(self, hidden_dim=768, proto_dim=64, num_classes=3):
        super().__init__()
        self.prototypes = nn.Parameter(torch.randn(num_classes, proto_dim))
        self.project = nn.Linear(hidden_dim, proto_dim)

    def forward(self, x):
        x = F.normalize(self.project(x), dim=-1)
        p = F.normalize(self.prototypes, dim=-1)
        return torch.matmul(x, p.T)


In [14]:
def train_stage1(encoder, proto_module, dataloader, optimizer, tokenizer):
    encoder.train()
    proto_module.train()
    total_loss = 0

    for batch in dataloader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["label"].to(device)

        outputs = encoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
            output_attentions=True
        )

        aspect_repr = mask_aware_attention(
            outputs.last_hidden_state,
            outputs.attentions,
            input_ids,
            tokenizer
        )

        logits = proto_module(aspect_repr)
        loss = F.cross_entropy(logits, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print("Average loss:", total_loss / len(dataloader))


In [15]:
def retrieve_prototype(aspect_repr, proto_module):
    """
    aspect_repr: [B, 768]
    return: projected prototype demo [B, 768]
    """

    # Eq. (13): project aspect → prototype space
    zi = aspect_to_proto(aspect_repr)  # [B, 64]

    # Similarity with prototypes
    sims = torch.nn.functional.cosine_similarity(
        zi.unsqueeze(1),                       # [B, 1, 64]
        proto_module.prototypes.unsqueeze(0),  # [1, 3, 64]
        dim=-1
    )

    proto_ids = sims.argmax(dim=1)
    selected_proto = proto_module.prototypes[proto_ids]  # [B, 64]

    # Eq. (15): project prototype → hidden space
    return proto_to_hidden(selected_proto)  # [B, 768]


In [16]:
def train_stage2(encoder, proto_module, dataloader, optimizer, tokenizer):
    encoder.train()
    proto_module.train()
    total_loss = 0.0

    for batch in dataloader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["label"].to(device)

        # Forward through encoder
        outputs = encoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
            output_attentions=True
        )

        # Mask-aware aspect representation (Eq. 3)
        aspect_repr = mask_aware_attention(
            outputs.last_hidden_state,
            outputs.attentions,
            input_ids,
            tokenizer
        )  # [B, 768]

        # Retrieve prototype demonstration (Eq. 13–15)
        proto_demo = retrieve_prototype(aspect_repr, proto_module)  # [B, 768]

        # Demonstration fusion (Eq. 16)
        enhanced_repr = aspect_repr + proto_demo

        # Classification via prototype module
        logits = proto_module(enhanced_repr)

        loss = F.cross_entropy(logits, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print("Stage-2 Avg Loss:", total_loss / len(dataloader))

In [17]:
proto_module = PrototypeModule().to(device)

optimizer = torch.optim.Adam(
    list(encoder.parameters()) + list(proto_module.parameters()),
    lr=5e-5
)

train_dataset = ABSADataset(train_data, tokenizer)
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)

print("Stage-1: Prototype Learning")
train_stage1(encoder, proto_module, train_loader, optimizer, tokenizer)

print("Stage-2: Prototype-Based Demonstration")
train_stage2(encoder, proto_module, train_loader, optimizer, tokenizer)


Stage-1: Prototype Learning
Average loss: 1.0457060939279097
Stage-2: Prototype-Based Demonstration


NameError: name 'aspect_to_proto' is not defined