In [1]:
!pip install transformers



In [2]:
!pip install datasets



In [3]:
# !pip install git+https://github.com/haven-jeon/PyKoSpacing.git

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
import json
import os
import random

import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from transformers import AutoTokenizer, ElectraForSequenceClassification, get_linear_schedule_with_warmup
from torch.optim import AdamW
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
import pandas as pd

PADDING_TOKEN = 0
S_OPEN_TOKEN = 1
S_CLOSE_TOKEN = 2
do_eval = True

BASE_DIR = '/content/drive/MyDrive/ABSA'
DATA_DIR = os.path.join(BASE_DIR, 'data')
ACD_MODEL_DIR = os.path.join(BASE_DIR, 'saved_model/ACD')
ASC_MODEL_DIR = os.path.join(BASE_DIR, 'saved_model/ASC')

train_data_path = os.path.join(DATA_DIR, 'train.jsonl')
dev_data_path = os.path.join(DATA_DIR, 'dev.jsonl')
test_data_path = os.path.join(DATA_DIR, 'test.jsonl')

test_category_extraction_model_path = os.path.join(ACD_MODEL_DIR, 'best_model.pt')
test_polarity_classification_model_path = os.path.join(ASC_MODEL_DIR, 'best_model.pt')

max_len = 192
batch_size = 32
base_model = 'kykim/electra-kor-base'
learning_rate = 3e-6
eps = 1e-8
num_train_epochs = 20
classifier_hidden_size = 768
classifier_dropout_prob = 0.1
max_grad_norm = 1.0

entity_property_pair = [
    '세정', '자극', '거품', '향', '가격', '일반', '기타',
]

tf_id_to_name = ['True', 'False']
tf_name_to_id = {name: idx for idx, name in enumerate(tf_id_to_name)}

polarity_id_to_name = ['positive', 'negative', 'neutral']
polarity_name_to_id = {name: idx for idx, name in enumerate(polarity_id_to_name)}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

special_tokens_dict = {
    'additional_special_tokens': [
        '&name&', '&affiliation&', '&social-security-num&',
        '&tel-num&', '&card-num&', '&bank-account&',
        '&num&', '&online-account&'
    ]
}

In [6]:
def jsonload(fname, encoding="utf-8"):
    with open(fname, encoding=encoding) as f:
        return json.load(f)

def jsondump(j, fname):
    with open(fname, "w", encoding="utf-8") as f:
        json.dump(j, f, ensure_ascii=False, indent=2)

def jsonlload(fname, encoding="utf-8"):
    json_list = []
    with open(fname, encoding=encoding) as f:
        for line in f:
            json_list.append(json.loads(line))
    return json_list

def jsonldump(jlist, fname):
    with open(fname, "w", encoding="utf-8") as f:
        for item in jlist:
            f.write(json.dumps(item, ensure_ascii=False) + "\n")

def split_jsonl_file(jsonl_path, output_dir, train_ratio=0.7, dev_ratio=0.15, test_ratio=0.15, seed=42):
    with open(jsonl_path, 'r', encoding='utf-8') as f:
        lines = [json.loads(line) for line in f]

    train_data, temp_data = train_test_split(lines, test_size=(1 - train_ratio), random_state=seed)
    dev_data, test_data = train_test_split(temp_data, test_size=test_ratio / (dev_ratio + test_ratio), random_state=seed)

    os.makedirs(output_dir, exist_ok=True)
    jsonldump(train_data, os.path.join(output_dir, 'train.jsonl'))
    jsonldump(dev_data, os.path.join(output_dir, 'dev.jsonl'))
    jsonldump(test_data, os.path.join(output_dir, 'test.jsonl'))

    print(f"데이터 분할 완료: train={len(train_data)}, dev={len(dev_data)}, test={len(test_data)}")

In [7]:
def convert_to_absa_format(data):
    converted = []
    for item in data:
        sentence = item["text"]
        annos = []
        for entity in item.get("entities", []):
            label = entity["label"]
            start = entity["start_offset"]
            end = entity["end_offset"]
            word = sentence[start:end]

            try:
                aspect, sentiment_ko = label.split("-")
            except ValueError:
                continue

            if aspect not in entity_property_pair:
                continue

            sentiment_map = {"긍정": "positive", "부정": "negative", "중립": "neutral"}
            polarity = sentiment_map.get(sentiment_ko.strip())
            if polarity is None:
                continue

            annos.append([aspect, [word, start, end], polarity])
        converted.append({
            "sentence_form": sentence,
            "annotation": annos
        })
    return converted


raw_data = jsonlload('/content/drive/MyDrive/ABSA/data/shampoo.jsonl')
converted_data = convert_to_absa_format(raw_data)
converted_data_path = os.path.join(DATA_DIR, 'converted_shampoo.jsonl')
jsonldump(converted_data, converted_data_path)
jsonlload('/content/drive/MyDrive/ABSA/data/converted_shampoo.jsonl')

[{'sentence_form': '바오밥 신제품 나와서 사봤어요 시카라인이라서 그런지 두피세정이 잘 되는 느낌이에요',
  'annotation': [['세정', ['두피세정이 잘 되는 느낌이에요', 29, 45], 'positive']]},
 {'sentence_form': '이렇게 예쁜 샴푸는 처음이야종류도 다양하고 저는 탈모샴푸로 비컨피던트 구매해 봤는데 감을 때 시원하고 향도 시원해서 여름에 사용하기 참좋더라구요 향기 좋고 세정력 좋은 샴푸로 추천합니다',
  'annotation': [['세정', ['시원하고', 52, 56], 'positive'],
   ['향', ['향도 시원해서', 57, 64], 'positive'],
   ['세정', ['세정력 좋은 샴푸', 87, 96], 'positive'],
   ['향', ['향기 좋고', 81, 86], 'positive']]},
 {'sentence_form': '케이스부터 고급지네요지루성두피염이라 아무거나 못쓰는 편이라신중히 고르는 편이예요노모어오일이 조금 더 비싸서 좋지 않을까 했는데오일샴푸라 린언미로 겟했어요그런데 딱 원하는 제품이네요뾰루지가 잘 나는 두피인데 염증 완화에 도움을준데요합성계면 활성제, 합성 방부제 없고 그외에도 전제품 EWG 98프로 이상 유해성분 제외라고하니믿고 써봅니다향이 시원한 느낌이고 거품도 잘나며 개운한느낌입니다',
  'annotation': [['향', ['향이 시원한 느낌이고', 187, 198], 'positive'],
   ['거품', ['거품도 잘나며', 199, 206], 'positive'],
   ['세정', ['개운한느낌입니다', 207, 215], 'positive']]},
 {'sentence_form': '좋아요.리필팩도 들어있어서원플러스 원처럼 구입하게 되서 좋아요.',
  'annotation': [['가격', ['원플러스 원처럼 구입하게 되서 좋아요', 14, 34], 'positive']]},
 {'sentence_form': '샴푸를

In [8]:
import os
import re
import json


# os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
# from pykospacing import Spacing
# spacing = Spacing()

# def spacing_cpu_only(text):
#     return spacing(text)

def clean_review(text):
    allowed_punctuations = "!?.,%+=~&()"
    pattern = rf"[^ㄱ-ㅎㅏ-ㅣ가-힣a-zA-Z0-9\s{re.escape(allowed_punctuations)}]"
    return re.sub(pattern, ' ', text).strip()

def del_bracket(text):
    while re.search(r'\([^()]*\)', text):
        text = re.sub(r'\([^()]*\)', ' ', text)
    return text.replace("(", " ").replace(")", " ").strip()

dupchars_pattern = re.compile(r'(.)\1{2,}')
dupsymbols_pattern = re.compile(r'([!?~%+=&])\1{1,}')
doublespace_pattern = re.compile(r'\s+')

def contract_dupchars(text, n=3):
    if n > 0:
        text = dupchars_pattern.sub(r'\1' * n, text)
    text = dupsymbols_pattern.sub(r'\1', text)
    return doublespace_pattern.sub(' ', text).strip()

def del_sponsored(text):
    pattern = re.compile(r'''
        (판매자(에게|로부터)|업체로부터|본\s상품\s후기는).{0,70}?
        (후기(입니다|입니다타|에요|입니\s?다)?|
         리뷰(입니다|했습니다|적었습니다|하였습니다|입니다요)?|
         작성하였습니다|기남겨요|기랍니다|흐기입니다|전달합니다|올립니다|것\s?입니다)
        [!.~\s]{0,2}
    ''', flags=re.VERBOSE)
    return pattern.sub(' ', text).strip()

# def truncate_and_spacing(text):
#     if len(text) > 200:
#         text = text[-200:]
#     return spacing_cpu_only(text)

def preprocessing(form):
    form = clean_review(form)
    form = del_bracket(form)
    form = contract_dupchars(form)
    form = del_sponsored(form)
    # form = truncate_and_spacing(form)
    return form

In [9]:
import torch
import torch.nn as nn
from transformers import AutoModel, AutoConfig

class AttentionPooling(nn.Module):
    def __init__(self, hidden_size):
        super().__init__()
        self.attention = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.Tanh(),
            nn.Linear(hidden_size, 1)
        )

    def forward(self, hidden_states, mask):
        scores = self.attention(hidden_states).squeeze(-1)
        scores = scores.masked_fill(mask == 0, -1e9)
        weights = torch.softmax(scores, dim=-1)
        pooled = torch.sum(hidden_states * weights.unsqueeze(-1), dim=1)
        return pooled

class SimpleClassifier(nn.Module):
    def __init__(self, hidden_size, num_labels, dropout_prob=0.1):
        super().__init__()
        self.dropout1 = nn.Dropout(dropout_prob)
        self.fc1 = nn.Linear(hidden_size, hidden_size)
        self.act = nn.Tanh()
        self.norm = nn.LayerNorm(hidden_size)
        self.dropout2 = nn.Dropout(dropout_prob)
        self.fc2 = nn.Linear(hidden_size, num_labels)

    def forward(self, x):
        x = self.dropout1(x)
        x = self.fc1(x)
        x = self.act(x)
        x = self.norm(x)
        x = self.dropout2(x)
        return self.fc2(x)

class ElectraBaseClassifier(nn.Module):
    def __init__(self, model_name_or_path, num_labels, tokenizer_len=None, dropout_prob=0.1):
        super().__init__()

        config = AutoConfig.from_pretrained(model_name_or_path)
        self.backbone = AutoModel.from_pretrained(model_name_or_path, config=config)

        if tokenizer_len is not None:
            self.backbone.resize_token_embeddings(tokenizer_len)

        self.attn_pool = AttentionPooling(config.hidden_size)
        self.classifier = SimpleClassifier(config.hidden_size, num_labels, dropout_prob)
        self.loss_fn = nn.CrossEntropyLoss(label_smoothing=0.1)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.backbone(input_ids=input_ids, attention_mask=attention_mask)
        hidden_states = outputs.last_hidden_state

        pooled_output = self.attn_pool(hidden_states, attention_mask)

        logits = self.classifier(pooled_output)

        if labels is not None:
            loss = self.loss_fn(logits, labels)
            return loss, logits
        else:
            return None, logits

In [10]:
import torch
from torch.utils.data import TensorDataset
from collections import Counter

def tokenize_and_align_labels(tokenizer, form, annotations, max_len):
    entity_data = {'input_ids': [], 'attention_mask': [], 'label': []}
    polarity_data = {'input_ids': [], 'attention_mask': [], 'label': []}

    if not form or not isinstance(form, str):
        return entity_data, polarity_data

    for pair in entity_property_pair:
        matched = False

        encoded = tokenizer(
            form,
            pair,
            padding='max_length',
            max_length=max_len,
            truncation=True,
            return_tensors='pt'
        )

        input_ids = encoded['input_ids'][0].tolist()
        attention_mask = encoded['attention_mask'][0].tolist()

        for annotation in annotations:
            if len(annotation) < 3:
                continue

            entity_property, _, polarity = annotation

            if polarity == '------------':
                continue

            if entity_property == pair:
                entity_data['input_ids'].append(input_ids)
                entity_data['attention_mask'].append(attention_mask)
                entity_data['label'].append(tf_name_to_id['True'])

                polarity_id = polarity_name_to_id.get(polarity)
                if polarity_id is not None:
                    polarity_data['input_ids'].append(input_ids)
                    polarity_data['attention_mask'].append(attention_mask)
                    polarity_data['label'].append(polarity_id)

                matched = True
                break

        if not matched:
            entity_data['input_ids'].append(input_ids)
            entity_data['attention_mask'].append(attention_mask)
            entity_data['label'].append(tf_name_to_id['False'])

    return entity_data, polarity_data

def get_dataset(raw_data, tokenizer, max_len):
    entity_inputs, entity_masks, entity_labels = [], [], []
    polarity_inputs, polarity_masks, polarity_labels = [], [], []

    for utterance in raw_data:
        form = utterance.get('sentence_form', '')
        form = preprocessing(form)
        if len(form) < 10:
            continue
        annotations = utterance.get('annotation', [])

        entity_dict, polarity_dict = tokenize_and_align_labels(tokenizer, form, annotations, max_len)

        entity_inputs.extend(entity_dict['input_ids'])
        entity_masks.extend(entity_dict['attention_mask'])
        entity_labels.extend(entity_dict['label'])

        polarity_inputs.extend(polarity_dict['input_ids'])
        polarity_masks.extend(polarity_dict['attention_mask'])
        polarity_labels.extend(polarity_dict['label'])

    def compute_class_weight(labels, label_size):
        counter = Counter(labels)
        total = sum(counter.values())
        return torch.tensor([
            total / counter.get(i, 1) if counter.get(i, 0) > 0 else 0.0
            for i in range(label_size)
        ], dtype=torch.float)

    entity_dataset = TensorDataset(
        torch.tensor(entity_inputs, dtype=torch.long),
        torch.tensor(entity_masks, dtype=torch.long),
        torch.tensor(entity_labels, dtype=torch.long)
    )

    polarity_dataset = TensorDataset(
        torch.tensor(polarity_inputs, dtype=torch.long),
        torch.tensor(polarity_masks, dtype=torch.long),
        torch.tensor(polarity_labels, dtype=torch.long)
    )

    entity_weights = compute_class_weight(entity_labels, len(tf_name_to_id))
    polarity_weights = compute_class_weight(polarity_labels, len(polarity_name_to_id))

    return entity_dataset, polarity_dataset, entity_weights, polarity_weights

In [11]:
import torch
import os
from tqdm import trange
from transformers import AutoTokenizer, get_linear_schedule_with_warmup
from torch.optim import AdamW
from sklearn.metrics import f1_score
from torch.cuda.amp import autocast, GradScaler

def evaluation(y_true, y_pred, label_len):
    count_list = [0] * label_len
    hit_list = [0] * label_len

    for i in range(len(y_true)):
        count_list[y_true[i]] += 1
        if y_true[i] == y_pred[i]:
            hit_list[y_true[i]] += 1

    acc_list = [hit / count if count > 0 else 0 for hit, count in zip(hit_list, count_list)]
    print(f'Accuracy: {sum(hit_list) / sum(count_list):.4f}')
    print(f'Macro Accuracy: {sum(acc_list) / label_len:.4f}')
    print('F1 (per class):', f1_score(y_true, y_pred, average=None))
    print('F1 Micro:', f1_score(y_true, y_pred, average='micro'))
    print('F1 Macro:', f1_score(y_true, y_pred, average='macro'))

scaler = GradScaler()
def train_one_epoch(model, dataloader, optimizer, scheduler, loss_fn):
    model.train()
    total_loss = 0

    for batch in dataloader:
        input_ids, attention_mask, labels = [b.to(device) for b in batch]

        optimizer.zero_grad()
        with autocast():
            loss, logits = model(input_ids, attention_mask, labels)
            loss = loss_fn(logits, labels)

        scaler.scale(loss).backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
        scaler.step(optimizer)
        scaler.update()
        scheduler.step()
        total_loss += loss.item()

    return total_loss / len(dataloader)

def evaluate_model(model, dataloader):
    model.eval()
    preds, labels = [], []

    with torch.no_grad():
        for batch in dataloader:
            input_ids, attention_mask, label_ids = [t.to(device) for t in batch]
            _, logits = model(input_ids, attention_mask)
            pred = torch.argmax(logits, dim=-1)
            preds.extend(pred.tolist())
            labels.extend(label_ids.tolist())

    macro_f1 = f1_score(labels, preds, average='macro')
    return macro_f1, preds, labels

def get_optimizer_scheduler(model, dataloader):
    no_decay = ['bias', 'gamma', 'beta']
    param_optimizer = list(model.named_parameters())
    grouped_parameters = [
        {'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)], 'weight_decay': 0.01},
        {'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}
    ]
    optimizer = AdamW(grouped_parameters, lr=learning_rate, eps=eps)
    scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0,
                                                num_training_steps=len(dataloader) * num_train_epochs)
    return optimizer, scheduler

class EarlyStopping:
    def __init__(self, patience=3, mode='max'):
        self.patience = patience
        self.mode = mode
        self.counter = 0
        self.best_score = None
        self.should_stop = False

    def step(self, score):
        if self.best_score is None or \
           (self.mode == 'max' and score > self.best_score) or \
           (self.mode == 'min' and score < self.best_score):
            self.best_score = score
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.should_stop = True

def train_sentiment_analysis():
    print('train_sentiment_analysis START')

    tokenizer = AutoTokenizer.from_pretrained(base_model)
    tokenizer.add_special_tokens(special_tokens_dict)
    tokenizer.model_max_length = max_len
    tokenizer.padding_side = "right"

    train_data = jsonlload(train_data_path)
    dev_data = jsonlload(dev_data_path)

    entity_train, polarity_train, entity_weights, polarity_weights = get_dataset(train_data, tokenizer, max_len)
    entity_dev, polarity_dev, _, _ = get_dataset(dev_data, tokenizer, max_len)

    entity_train_loader = DataLoader(entity_train, shuffle=True, batch_size=batch_size)
    entity_dev_loader = DataLoader(entity_dev, shuffle=False, batch_size=batch_size)
    polarity_train_loader = DataLoader(polarity_train, shuffle=True, batch_size=batch_size)
    polarity_dev_loader = DataLoader(polarity_dev, shuffle=False, batch_size=batch_size)

    entity_model = ElectraBaseClassifier(base_model, len(tf_name_to_id), len(tokenizer)).to(device)
    polarity_model = ElectraBaseClassifier(base_model, len(polarity_name_to_id), len(tokenizer)).to(device)

    entity_loss_fn = torch.nn.CrossEntropyLoss(weight=entity_weights.to(device))
    polarity_loss_fn = torch.nn.CrossEntropyLoss(weight=polarity_weights.to(device))

    entity_opt, entity_sched = get_optimizer_scheduler(entity_model, entity_train_loader)
    polarity_opt, polarity_sched = get_optimizer_scheduler(polarity_model, polarity_train_loader)

    early_stop_entity = EarlyStopping(patience=3, mode='max')
    early_stop_polarity = EarlyStopping(patience=3, mode='max')

    for epoch in trange(num_train_epochs, desc="Epoch"):
        entity_loss = train_one_epoch(entity_model, entity_train_loader, entity_opt, entity_sched, entity_loss_fn)
        print(f"[Entity] Epoch {epoch+1} | Train Loss: {entity_loss:.4f}")

        if do_eval:
            f1, preds, labels = evaluate_model(entity_model, entity_dev_loader)
            print(f"[Entity] Dev F1_macro: {f1:.4f}")
            if f1 > (early_stop_entity.best_score or 0):
                torch.save(entity_model.state_dict(), os.path.join(ACD_MODEL_DIR, 'best_model.pt'))
                print("Saved best entity model")
            early_stop_entity.step(f1)
            if early_stop_entity.should_stop:
                print("Early stopping triggered (Entity)")
                break

        polarity_loss = train_one_epoch(polarity_model, polarity_train_loader, polarity_opt, polarity_sched, polarity_loss_fn)
        print(f"[Polarity] Epoch {epoch+1} | Train Loss: {polarity_loss:.4f}")

        if do_eval:
            f1, preds, labels = evaluate_model(polarity_model, polarity_dev_loader)
            print(f"[Polarity] Dev F1_macro: {f1:.4f}")
            if f1 > (early_stop_polarity.best_score or 0):
                torch.save(polarity_model.state_dict(), os.path.join(ASC_MODEL_DIR, 'best_model.pt'))
                print("Saved best polarity model")
            early_stop_polarity.step(f1)
            if early_stop_polarity.should_stop:
                print("Early stopping triggered (Polarity)")
                break

    print("Training complete.")

  scaler = GradScaler()


In [12]:
split_jsonl_file(
    jsonl_path='/content/drive/MyDrive/ABSA/data/converted_shampoo.jsonl',
    output_dir='/content/drive/MyDrive/ABSA/data/',
    train_ratio=0.7,
    dev_ratio=0.15,
    test_ratio=0.15
)

데이터 분할 완료: train=643, dev=138, test=138


In [13]:
train_sentiment_analysis()

train_sentiment_analysis START


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


pytorch_model.bin:   0%|          | 0.00/473M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/473M [00:00<?, ?B/s]

The new embeddings will be initialized from a multivariate normal distribution that has old embeddings' mean and covariance. As described in this article: https://nlp.stanford.edu/~johnhew/vocab-expansion.html. To disable this, use `mean_resizing=False`
Epoch:   0%|          | 0/20 [00:00<?, ?it/s]

[Entity] Epoch 1 | Train Loss: 0.7095
[Entity] Dev F1_macro: 0.6398
Saved best entity model
[Polarity] Epoch 1 | Train Loss: 1.1137
[Polarity] Dev F1_macro: 0.3096


Epoch:   5%|▌         | 1/20 [03:12<1:00:51, 192.16s/it]

Saved best polarity model
[Entity] Epoch 2 | Train Loss: 0.5591
[Entity] Dev F1_macro: 0.8606
Saved best entity model
[Polarity] Epoch 2 | Train Loss: 1.1053
[Polarity] Dev F1_macro: 0.3439


Epoch:  10%|█         | 2/20 [06:22<57:15, 190.86s/it]  

Saved best polarity model
[Entity] Epoch 3 | Train Loss: 0.3798
[Entity] Dev F1_macro: 0.8911
Saved best entity model
[Polarity] Epoch 3 | Train Loss: 1.0817
[Polarity] Dev F1_macro: 0.3733


Epoch:  15%|█▌        | 3/20 [09:32<54:03, 190.79s/it]

Saved best polarity model
[Entity] Epoch 4 | Train Loss: 0.3302
[Entity] Dev F1_macro: 0.8993
Saved best entity model
[Polarity] Epoch 4 | Train Loss: 1.0574


Epoch:  20%|██        | 4/20 [12:39<50:28, 189.29s/it]

[Polarity] Dev F1_macro: 0.3692
[Entity] Epoch 5 | Train Loss: 0.2952
[Entity] Dev F1_macro: 0.9077
Saved best entity model
[Polarity] Epoch 5 | Train Loss: 1.0494
[Polarity] Dev F1_macro: 0.4815


Epoch:  25%|██▌       | 5/20 [15:48<47:15, 189.05s/it]

Saved best polarity model
[Entity] Epoch 6 | Train Loss: 0.2684
[Entity] Dev F1_macro: 0.9087
Saved best entity model
[Polarity] Epoch 6 | Train Loss: 1.0210


Epoch:  30%|███       | 6/20 [18:55<43:58, 188.50s/it]

[Polarity] Dev F1_macro: 0.4749
[Entity] Epoch 7 | Train Loss: 0.2595
[Entity] Dev F1_macro: 0.9089
Saved best entity model
[Polarity] Epoch 7 | Train Loss: 1.0102


Epoch:  35%|███▌      | 7/20 [22:12<41:25, 191.20s/it]

[Polarity] Dev F1_macro: 0.4426
[Entity] Epoch 8 | Train Loss: 0.2339
[Entity] Dev F1_macro: 0.9087
[Polarity] Epoch 8 | Train Loss: 0.9446


Epoch:  35%|███▌      | 7/20 [25:17<46:58, 216.84s/it]

[Polarity] Dev F1_macro: 0.4553
Early stopping triggered (Polarity)
Training complete.





In [18]:
def predict_from_korean_form(tokenizer, ce_model, pc_model, data, max_len=256):
    ce_model.eval()
    pc_model.eval()

    for sentence in data:
        form = sentence.get('sentence_form', '')
        sentence['annotation'] = []

        if not isinstance(form, str) or not form.strip():
            print(f"Invalid sentence skipped: {form}")
            continue

        for pair in entity_property_pair:
            encoded = tokenizer(
                form,
                pair,
                padding='max_length',
                max_length=max_len,
                truncation=True,
                return_tensors='pt'
            )

            input_ids = encoded['input_ids'].to(device)
            attention_mask = encoded['attention_mask'].to(device)

            with torch.no_grad():
                _, ce_logits = ce_model(input_ids, attention_mask)
            ce_pred = torch.argmax(ce_logits, dim=-1).item()

            if tf_id_to_name[ce_pred] == 'True':
                with torch.no_grad():
                    _, pc_logits = pc_model(input_ids, attention_mask)
                pc_pred = torch.argmax(pc_logits, dim=-1).item()

                if 0 <= pc_pred < len(polarity_id_to_name):
                    polarity = polarity_id_to_name[pc_pred]
                else:
                    polarity = "UNKNOWN"

                sentence['annotation'].append([
                    pair,
                    [None, 0, 0],
                    polarity
                ])

    return data

In [19]:
def evaluation_f1(true_data, pred_data):
    ce_eval = {'tp': 0, 'fp': 0, 'fn': 0}
    pipeline_eval = {'tp': 0, 'fp': 0, 'fn': 0}

    if len(true_data) != len(pred_data):
        print(f"Warning: Length mismatch (true={len(true_data)}, pred={len(pred_data)})")

    for true_item, pred_item in zip(true_data, pred_data):
        true_annos = true_item.get('annotation', [])
        pred_annos = pred_item.get('annotation', [])

        true_ce_set = set()
        true_pipeline_set = set()
        for anno in true_annos:
            if len(anno) >= 3:
                true_ce_set.add(anno[0])
                true_pipeline_set.add((anno[0], anno[2]))

        pred_ce_set = set()
        pred_pipeline_set = set()
        for anno in pred_annos:
            if len(anno) >= 3:
                pred_ce_set.add(anno[0])
                pred_pipeline_set.add((anno[0], anno[2]))

        ce_eval['tp'] += len(true_ce_set & pred_ce_set)
        ce_eval['fp'] += len(pred_ce_set - true_ce_set)
        ce_eval['fn'] += len(true_ce_set - pred_ce_set)

        pipeline_eval['tp'] += len(true_pipeline_set & pred_pipeline_set)
        pipeline_eval['fp'] += len(pred_pipeline_set - true_pipeline_set)
        pipeline_eval['fn'] += len(true_pipeline_set - pred_pipeline_set)

    def calc_f1(tp, fp, fn):
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0
        return {
            'Precision': round(precision, 4),
            'Recall': round(recall, 4),
            'F1': round(f1, 4)
        }

    return {
        'category extraction result': calc_f1(**ce_eval),
        'entire pipeline result': calc_f1(**pipeline_eval)
    }

In [20]:
from transformers import AutoTokenizer
from torch.utils.data import DataLoader
import torch
import copy
import os

def load_model(model_class, path, model_name, label_size, tokenizer_len):
    model = model_class(model_name, label_size, tokenizer_len)
    model.load_state_dict(torch.load(path, map_location=device))
    model.to(device)
    model.eval()
    return model

def pretty_print_result(result_dict):
    print("\nF1 Evaluation Result:")
    for name, metrics in result_dict.items():
        print(f"\n▶ {name}")
        for k, v in metrics.items():
            print(f"   {k}: {v:.4f}")

def test_sentiment_analysis(save_path=None):
    print("Starting Sentiment Analysis Test...")

    try:
        tokenizer = AutoTokenizer.from_pretrained(base_model)
        tokenizer.add_special_tokens(special_tokens_dict)
    except Exception as e:
        print(f"Tokenizer load error: {e}")
        return

    try:
        test_data = jsonlload(test_data_path)
    except Exception as e:
        print(f"Failed to load test data: {e}")
        return

    try:
        entity_test_data, polarity_test_data, _, _ = get_dataset(test_data, tokenizer, max_len)
    except Exception as e:
        print(f"Failed to preprocess test data: {e}")
        return

    entity_test_loader = DataLoader(entity_test_data, shuffle=False, batch_size=batch_size)
    polarity_test_loader = DataLoader(polarity_test_data, shuffle=False, batch_size=batch_size)

    try:
        ce_model_path = os.path.join(ACD_MODEL_DIR, 'best_model.pt')
        pc_model_path = os.path.join(ASC_MODEL_DIR, 'best_model.pt')

        ce_model = load_model(ElectraBaseClassifier, ce_model_path, base_model, len(tf_id_to_name), len(tokenizer))
        pc_model = load_model(ElectraBaseClassifier, pc_model_path, base_model, len(polarity_id_to_name), len(tokenizer))
    except Exception as e:
        print(f"Model load error: {e}")
        return

    pred_data = predict_from_korean_form(tokenizer, ce_model, pc_model, copy.deepcopy(test_data))

    result = evaluation_f1(test_data, pred_data)
    pretty_print_result(result)

    if save_path:
        try:
            jsondump(pred_data, save_path)
            print(f"Saved predictions to {save_path}")
        except Exception as e:
            print(f"Failed to save predictions: {e}")

In [21]:
test_sentiment_analysis()
# test_sentiment_analysis(save_path='/content/drive/MyDrive/ABSA/results/pred_data.json')

Starting Sentiment Analysis Test...

F1 Evaluation Result:

▶ category extraction result
   Precision: 0.8765
   Recall: 0.7889
   F1: 0.8304

▶ entire pipeline result
   Precision: 0.7078
   Recall: 0.6232
   F1: 0.6628
