<a href="https://colab.research.google.com/github/sycho2003/20252R0136COSE36203/blob/main/%EA%B8%B0%EA%B3%84%ED%95%99%EC%8A%B5_Term_Project_%EB%8B%A4%EC%A4%91_%EB%9D%BC%EB%B2%A8_%EB%B6%84%EB%A5%98_ipynb%EC%9D%98_%EC%82%AC%EB%B3%B8.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 1. 모델 정의

### 1-1. 기본 준비

In [None]:
pip install peft



In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
from tqdm import tqdm
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split, ConcatDataset
from peft import LoraConfig, get_peft_model, TaskType

device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
data1 = pd.read_csv('d01_preprocessed_revised.csv')
data2 = pd.read_csv('d02_preprocessed.csv', engine='python', on_bad_lines='skip')
data3 = pd.read_csv('d03_preprocessed.csv')

In [None]:
data2.head()

Unnamed: 0,case_id,situation,thought,reframe,has_distortion,emotional reasoning,mind reading,overgeneralization,fortune telling,mental filter,magnification,should statements,personalization,all-or-nothing thinking,labeling,disqualifiying the positive
0,0,From a teen in Australia: This story is incred...,We grew close quickly and for some reason I ju...,Thank you for writing. You did nothing wrong! ...,1,1,0,0,0,0,0,0,0,0,0,0
1,1,From a woman in the U.S.: My Therapist of eigh...,She knew I think and I believe she felt betray...,"At 63, you are still working on this. For that...",1,0,1,0,0,0,0,0,0,0,0,0
2,2,I don’t even really know where to start. For t...,For the past 7 months I’ve been under extreme ...,It’s impossible to give a diagnosis over the i...,1,1,0,1,0,0,0,0,0,0,0,0
3,3,"From a woman in the UK: Hi, I have a 4 month o...",I’m worried about disruption to our relationsh...,Congratulations on bringing your new baby into...,1,0,0,0,1,0,0,0,0,0,0,0
4,4,Me and my sister in law are both pregnant righ...,And they are always all over my sister in laws...,Thank you for explaining this situation. How u...,1,0,0,0,0,1,0,0,0,0,0,0


### 1-2. 데이터 증강 및 구조화

In [None]:
for idx, i in enumerate(data2['thought']):
    if type(i) != str:
        data2['thought'][idx] = ''

In [None]:
data1_1 = data1['situation']+' '+data1['thought']
data1_2 = data1['situation']+' '+data1['reframe']
data2_1 = data2['situation']
data3_1 = data3['situation']+' '+data3['thought']
data3_2 = data3['situation']+' '+data3['reframe']

In [None]:
data1_1.drop_duplicates(inplace = True)
data1_2.drop_duplicates(inplace = True)
data2_1.drop_duplicates(inplace = True)
data3_1.drop_duplicates(inplace = True)
data3_2.drop_duplicates(inplace = True)

In [None]:
def normalize_text(s):
    # Removing articles and punctuation, and standardizing whitespace
    import string, re

    def remove_articles(text):
        regex = re.compile(r"\b(a|an|the)\b", re.UNICODE)
        return re.sub(regex, " ", text)

    def white_space_fix(text):
        return " ".join(text.split())

    def remove_punc(text):
        exclude = set(string.punctuation)
        return "".join(ch for ch in text if ch not in exclude)

    def lower(text):
        return text.lower()

    return white_space_fix(remove_articles(remove_punc(lower(s))))


In [None]:
from transformers import BertTokenizer, BertModel, BertConfig

tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
bert_model = BertModel.from_pretrained('bert-base-uncased')
bert_model.to(device)
bert_config = BertConfig.from_pretrained('bert-base-uncased')

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

In [None]:
# Embedding

def tokenize_and_pad(data, tokenizer, max_len=512):
    tokenized_data = []
    for text in data:
        encoded = tokenizer(normalize_text(text), return_tensors="pt", padding='max_length', truncation=True, max_length=max_len)
        tokenized_data.append(encoded)
    return tokenized_data

data1_1_encoded = tokenize_and_pad(data1_1, tokenizer)
data1_2_encoded = tokenize_and_pad(data1_2, tokenizer)
data2_1_encoded = tokenize_and_pad(data2_1, tokenizer)
data3_1_encoded = tokenize_and_pad(data3_1, tokenizer)
data3_2_encoded = tokenize_and_pad(data3_2, tokenizer)

In [None]:
distortion_cols = [
    col for col in data1.columns
    if col not in ["situation", "thought", "reframe",
                   "case_id", "emotions", "has_distortion"]
]

print("사용되는 왜곡 라벨들:", distortion_cols)

사용되는 왜곡 라벨들: ['all-or-nothing thinking', 'comparing and despairing', 'disqualifying the positive', 'emotional reasoning', 'fortune telling', 'labeling', 'magnification', 'mind reading', 'overgeneralizing', 'should statements', 'mental filter', 'personalization and blaming']


In [None]:
def make_label_vector(df, idx, distortion_cols):
    """
    df: 현재 데이터프레임(d01/d02/d03)
    idx: 인덱스
    distortion_cols: 전체 통합 왜곡 컬럼 목록
    """
    vector = []
    for col in distortion_cols:
        if col in df.columns:
            vector.append(int(df.loc[idx, col]))
        else:
            vector.append(0)   # 없는 컬럼 → 0 처리
    return vector

In [None]:
# has_distortion=1인 index만 필터링
data1_idx = data1.index[data1["has_distortion"] == 1]
data2_idx = data2.index[data2["has_distortion"] == 1]
data3_idx = data3.index[data3["has_distortion"] == 1]

# 텍스트 필터링
data1_1_encoded = [data1_1_encoded[i] for i in range(len(data1_1_encoded)) if data1.index[i] in data1_idx]
data1_2_encoded = [data1_2_encoded[i] for i in range(len(data1_2_encoded)) if data1.index[i] in data1_idx]

data2_1_encoded = [data2_1_encoded[i] for i in range(len(data2_1_encoded)) if data2.index[i] in data2_idx]

data3_1_encoded = [data3_1_encoded[i] for i in range(len(data3_1_encoded)) if data3.index[i] in data3_idx]
data3_2_encoded = [data3_2_encoded[i] for i in range(len(data3_2_encoded)) if data3.index[i] in data3_idx]

# 라벨도 동일하게 필터링
data1_1_labels = [make_label_vector(data1, i, distortion_cols) for i in data1_idx]
data1_2_labels = [make_label_vector(data1, i, distortion_cols) for i in data1_idx]

data2_1_labels = [make_label_vector(data2, i, distortion_cols) for i in data2_idx]

data3_1_labels = [make_label_vector(data3, i, distortion_cols) for i in data3_idx]
data3_2_labels = [make_label_vector(data3, i, distortion_cols) for i in data3_idx]

In [None]:
data_encoded = (
    data1_1_encoded + data1_2_encoded +
    data2_1_encoded +
    data3_1_encoded + data3_2_encoded
)

data_labels = (
    data1_1_labels + data1_2_labels +
    data2_1_labels +
    data3_1_labels + data3_2_labels
)

In [None]:
class CustomDatasetWithLabels(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return {"input_ids": self.data[idx]['input_ids'].squeeze(),
                "attention_mask": self.data[idx]['attention_mask'].squeeze(),
                "y": torch.tensor(self.labels[idx], dtype=torch.float32)
        }

In [None]:
dataset_with_labels = CustomDatasetWithLabels(data_encoded, data_labels)

train_size = int(0.8 * len(dataset_with_labels))
val_size = int(0.1 * len(dataset_with_labels))
test_size = len(dataset_with_labels) - train_size - val_size

train_dataset, val_dataset, test_dataset = random_split(dataset_with_labels, [train_size, val_size, test_size])

train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=32)
test_dataloader = DataLoader(test_dataset, batch_size=32)

In [None]:
input_dim = bert_config.hidden_size
num_labels = len(distortion_cols)
label_emb = torch.randn(num_labels, input_dim)

# LoRA 설정
lora_config = LoraConfig(
    r=8,  # LoRA attention dimension
    lora_alpha=16, # Alpha for LoRA scaling
    target_modules=["query", "value"], # Modules to apply LoRA to
    lora_dropout=0.1, # Dropout probability for LoRA layers
    bias="none", # Bias type for LoRA
    task_type=TaskType.FEATURE_EXTRACTION # Task type
)

# BERT 모델에 LoRA 적용
bert_model = get_peft_model(bert_model, lora_config)
bert_model.print_trainable_parameters()

class InnerProductClassifier(nn.Module):
    def __init__(self, input_dim, label_embeddings):
        super().__init__()
        self.proj = nn.Linear(input_dim, label_embeddings.size(1))
        self.label_emb = nn.Parameter(label_embeddings.clone())

    def forward(self, x):
        x_proj = self.proj(x)
        logits = torch.matmul(x_proj, self.label_emb.T)
        return logits

model = InnerProductClassifier(input_dim, label_emb).to(device)
criterion = nn.BCEWithLogitsLoss()

# LoRA가 적용된 모델의 학습 가능한 파라미터만 옵티마이저에 전달
optimizer = torch.optim.AdamW(list(bert_model.parameters()) + list(model.parameters()), lr=2e-4)

trainable params: 294,912 || all params: 109,777,152 || trainable%: 0.2686


### 1-3. 모델 평가 함수 정의

In [None]:
from sklearn.metrics import f1_score

def evaluate(model, dataloader):
    model.eval()
    all_preds, all_labels = [], []

    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            y = batch["y"].cpu().numpy()

            outputs = bert_model(input_ids=input_ids, attention_mask=attention_mask)
            embeddings = outputs.last_hidden_state[:, 0, :]

            logits = model(embeddings)
            preds = torch.sigmoid(logits).cpu().numpy()
            preds = (preds > 0.5).astype(int)

            all_preds.extend(preds)
            all_labels.extend(y)

    return {
        "f1_macro": f1_score(all_labels, all_preds, average="macro", zero_division=0),
        "f1_micro": f1_score(all_labels, all_preds, average="micro", zero_division=0)
    }

### 1-4. 모델 학습



In [None]:
from torch.optim import AdamW
import torch

EPOCHS = 10
bert_model.to(device)
model.to(device)

# ✅ LoRA 파라미터가 requires_grad=True인지 확인(보통 PEFT가 자동으로 해줌)
trainable_bert = [p for p in bert_model.parameters() if p.requires_grad]
print("Trainable params in bert_model:", sum(p.numel() for p in trainable_bert))

# ✅ optimizer에 head + (LoRA)bert trainable params 모두 넣기
optimizer = AdamW(list(model.parameters()) + trainable_bert, lr=2e-4)

best_val_f1_macro = -1.0

for epoch in range(EPOCHS):
    model.train()
    bert_model.train()   # ✅ 중요
    total_loss = 0

    for batch in tqdm(train_dataloader, desc=f"Epoch {epoch+1}"):
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        y = batch["y"].to(device)

        # ✅ no_grad 제거: LoRA 업데이트하려면 그래프가 필요함
        outputs = bert_model(input_ids=input_ids, attention_mask=attention_mask)
        embeddings = outputs.last_hidden_state[:, 0, :]   # CLS

        logits = model(embeddings)
        loss = criterion(logits, y)

        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_train_loss = total_loss / len(train_dataloader)
    val_scores = evaluate(model, val_dataloader)
    current = val_scores["f1_macro"]

    print(f"[Epoch {epoch+1}] Train Loss: {avg_train_loss:.4f}, Val F1_macro={current:.4f}, Val F1_micro={val_scores['f1_micro']:.4f}")

    # ✅ 베스트일 때만 저장 (head + adapter 같이!)
    if current > best_val_f1_macro:
        best_val_f1_macro = current
        torch.save(model.state_dict(), "best_head.pth")
        bert_model.save_pretrained("./best_lora_adapter")   # bert_model이 PeftModel일 때 “어댑터만” 저장됨
        print("\tSaved BEST head + LoRA adapter!")

print("Best Val F1_macro:", best_val_f1_macro)


Trainable params in bert_model: 294912


Epoch 1: 100%|██████████| 73/73 [02:39<00:00,  2.19s/it]


[Epoch 1] Train Loss: 0.6193, Val F1_macro=0.0336, Val F1_micro=0.0450
	Saved BEST head + LoRA adapter!


Epoch 2: 100%|██████████| 73/73 [02:46<00:00,  2.28s/it]


[Epoch 2] Train Loss: 0.3626, Val F1_macro=0.0289, Val F1_micro=0.0444


Epoch 3: 100%|██████████| 73/73 [02:46<00:00,  2.28s/it]


[Epoch 3] Train Loss: 0.3503, Val F1_macro=0.0594, Val F1_micro=0.1600
	Saved BEST head + LoRA adapter!


Epoch 4: 100%|██████████| 73/73 [02:46<00:00,  2.28s/it]


[Epoch 4] Train Loss: 0.3515, Val F1_macro=0.0250, Val F1_micro=0.0493


Epoch 5: 100%|██████████| 73/73 [02:46<00:00,  2.28s/it]


[Epoch 5] Train Loss: 0.3400, Val F1_macro=0.0104, Val F1_micro=0.0249


Epoch 6: 100%|██████████| 73/73 [02:46<00:00,  2.28s/it]


[Epoch 6] Train Loss: 0.3399, Val F1_macro=0.0420, Val F1_micro=0.0808


Epoch 7: 100%|██████████| 73/73 [02:46<00:00,  2.28s/it]


[Epoch 7] Train Loss: 0.3308, Val F1_macro=0.0327, Val F1_micro=0.0467


Epoch 8: 100%|██████████| 73/73 [02:46<00:00,  2.28s/it]


[Epoch 8] Train Loss: 0.3341, Val F1_macro=0.0934, Val F1_micro=0.1285
	Saved BEST head + LoRA adapter!


Epoch 9: 100%|██████████| 73/73 [02:46<00:00,  2.28s/it]


[Epoch 9] Train Loss: 0.3265, Val F1_macro=0.0705, Val F1_micro=0.1493


Epoch 10: 100%|██████████| 73/73 [02:46<00:00,  2.28s/it]


[Epoch 10] Train Loss: 0.3113, Val F1_macro=0.0712, Val F1_micro=0.0992
Best Val F1_macro: 0.09343213238144005


## 2. 모델 테스트

In [None]:
# Evaluate on the test set
test_scores = evaluate(model, test_dataloader)
print("TEST F1_macro:", test_scores["f1_macro"])
print("TEST F1_micro:", test_scores["f1_micro"])

TEST F1_macro: 0.07534229516326703
TEST F1_micro: 0.11607142857142858


## 3. 모델 저장


In [None]:
from transformers import AutoModel
from peft import PeftModel

BASE_NAME = 'bert-base-uncased' # Define the base model name
base = AutoModel.from_pretrained(BASE_NAME)
peft = PeftModel.from_pretrained(base, "./best_lora_adapter")
merged_bert = peft.merge_and_unload()
merged_bert.save_pretrained("./merged_bert_model_with_lora")
print("Merged BERT saved.")

Merged BERT saved.
