In [1]:
import pandas as pd
import random
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer
from captions import *
from tqdm import tqdm

import torch
import torch.nn as nn
from transformers import BertModel, get_linear_schedule_with_warmup
import os 

# Discrete levels allowed for each condition value.
# A sample's class label is the position of its value in the matching list
# (generate_dataset stores `<name>_levels.index(value)`), so the order of each
# list defines the label encoding used for training.
c1_levels = [0.0005, 0.005, 0.05, 0.5, 5]  # 5 levels
c2_levels = [0.0, 0.1, 0.2, 0.3, 0.4]  # 5 levels
c3_levels = [0.6, 0.7, 0.8, 0.9]  # 4 levels
pp_levels = [4, 5, 6]  # 3 levels

def generate_dataset(num_samples=20000, seed=None):
    """Build synthetic train/test DataFrames of condition descriptions.

    Every iteration draws one level for each of the four conditions (c1, c2,
    c3, pp), then emits four training rows and one test row.  A row's
    ``combined_text`` is one description per condition, joined with spaces in
    random order; the ``*_value`` columns hold the index of the drawn level in
    the corresponding ``*_levels`` list.

    The descriptions of every level are split into a training pool and a
    held-out test pool once, before sampling starts.  Re-drawing that split on
    every iteration would let a description that is held out in one iteration
    appear in the training rows of another, so the test set would contain no
    unseen phrasing.

    Args:
        num_samples (int): Number of iterations.  Produces ``4 * num_samples``
            training rows and ``num_samples`` test rows.
        seed (int | None): When given, a private ``random.Random(seed)`` is
            used so the dataset is reproducible.  When ``None`` the global
            ``random`` module state is used.

    Returns:
        tuple[pd.DataFrame, pd.DataFrame]: ``(train_df, test_df)`` with columns
        ``c1_value``, ``c2_value``, ``c3_value``, ``pp_value`` and
        ``combined_text``.

    Raises:
        ValueError: If a level does not have enough distinct descriptions to
            leave at least one held out for testing.
    """
    rng = random.Random(seed) if seed is not None else random
    train_rows_per_draw = 4

    # (column prefix, levels, descriptions keyed by level, training-pool size)
    condition_specs = [
        ('c1', c1_levels, c1_descriptions, 4),
        ('c2', c2_levels, c2_descriptions, 4),
        ('c3', c3_levels, c3_descriptions, 4),
        # Smaller pool for pp: the original note says it has only 3
        # descriptions per level, hence a 2:1 split.
        ('pp', pp_levels, pp_descriptions, 2),
    ]
    columns = [f'{name}_value' for name, _, _, _ in condition_specs] + ['combined_text']

    def split_descriptions(name, levels, descriptions, n_train):
        """Split each level's descriptions once into (train pool, test pool)."""
        pools = {}
        for level in levels:
            texts = list(descriptions[level])
            train_pool = rng.sample(texts, n_train)
            # Ordered remainder instead of a set difference: set iteration
            # order is not stable across interpreter runs.
            test_pool = [text for text in texts if text not in train_pool]
            if not test_pool:
                raise ValueError(
                    f"{name} level {level!r} needs more than {n_train} distinct "
                    f"descriptions to hold one out for testing"
                )
            pools[level] = (train_pool, test_pool)
        return pools

    def join_shuffled(texts):
        """Join the per-condition texts with spaces in random order."""
        shuffled = list(texts)
        rng.shuffle(shuffled)
        return " ".join(shuffled)

    pools_by_condition = {
        name: split_descriptions(name, levels, descriptions, n_train)
        for name, levels, descriptions, n_train in condition_specs
    }

    train_data = []
    test_data = []

    for _ in range(num_samples):
        labels = {}
        train_pools = []
        test_pools = []
        for name, levels, _descriptions, _n_train in condition_specs:
            level = rng.choice(levels)
            labels[f'{name}_value'] = levels.index(level)
            train_pool, test_pool = pools_by_condition[name][level]
            train_pools.append(train_pool)
            test_pools.append(test_pool)

        # Training rows: several random phrasings of the same level combination.
        for _row in range(train_rows_per_draw):
            texts = [rng.choice(pool) for pool in train_pools]
            train_data.append({**labels, 'combined_text': join_shuffled(texts)})

        # Test row: built only from held-out descriptions.
        texts = [rng.choice(pool) for pool in test_pools]
        test_data.append({**labels, 'combined_text': join_shuffled(texts)})

    # Passing columns keeps the schema even when num_samples == 0.
    return pd.DataFrame(train_data, columns=columns), pd.DataFrame(test_data, columns=columns)
from torch.utils.data import Dataset
import torch

class ConditionTextDataset(Dataset):
    """Torch dataset pairing each combined description with its four labels.

    Args:
        df: DataFrame with the columns ``combined_text``, ``c1_value``,
            ``c2_value``, ``c3_value`` and ``pp_value``.
        tokenizer: Object exposing ``encode_plus`` (a BERT tokenizer in this
            notebook).
        max_len: Length every text is padded or truncated to.
    """

    def __init__(self, df, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.max_len = max_len

        # Keep plain numpy arrays so indexing does not go through pandas.
        self.texts = df['combined_text'].to_numpy()
        self.c1_values = df['c1_value'].to_numpy()
        self.c2_values = df['c2_value'].to_numpy()
        self.c3_values = df['c3_value'].to_numpy()
        self.pp_values = df['pp_value'].to_numpy()

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Tokenize row ``idx`` and return its tensors together with the labels."""
        encoded = self.tokenizer.encode_plus(
            self.texts[idx],
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )

        # Flatten the tokenizer tensors to 1-D for this single item.
        sample = {
            'input_ids': encoded['input_ids'].flatten(),
            'attention_mask': encoded['attention_mask'].flatten(),
        }

        label_columns = (
            ('c1_value', self.c1_values),
            ('c2_value', self.c2_values),
            ('c3_value', self.c3_values),
            ('pp_value', self.pp_values),
        )
        for key, column in label_columns:
            sample[key] = torch.tensor(column[idx], dtype=torch.long)

        return sample

def create_data_loader(df, tokenizer, max_len, batch_size, shuffle=True, num_workers=4):
    """Wrap ``df`` in a ConditionTextDataset and return a DataLoader over it.

    Args:
        df: DataFrame accepted by ``ConditionTextDataset``.
        tokenizer: Tokenizer passed through to the dataset.
        max_len: Padding/truncation length passed through to the dataset.
        batch_size: Number of samples per batch.
        shuffle: Whether to reshuffle the data every epoch.  Defaults to
            ``True`` (the previous hard-coded behaviour); pass ``False`` for
            evaluation loaders that need a stable order.
        num_workers: Number of loader worker processes.  Defaults to 4 (the
            previous hard-coded value); use 0 where multiprocessing is not
            available.

    Returns:
        torch.utils.data.DataLoader: Loader yielding dict batches.
    """
    dataset = ConditionTextDataset(df, tokenizer, max_len)
    return DataLoader(dataset, batch_size=batch_size, num_workers=num_workers, shuffle=shuffle)
    
# Generate the dataset
train_df, test_df = generate_dataset(num_samples=2500)  # 2500 draws x 4 training rows per draw = 10000 training rows (plus 2500 test rows)

# Save both splits as CSV files
train_df.to_csv('train_conditions_dataset.csv', index=False)
test_df.to_csv('test_conditions_dataset.csv', index=False)

# Usage example: build the tokenizer and the loaders (train_data_loader is the
# loader consumed by the training cell further down).
# Note: create_data_loader shuffles, so the test loader is shuffled as well.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
train_data_loader = create_data_loader(train_df, tokenizer, max_len=128, batch_size=16)
test_data_loader = create_data_loader(test_df, tokenizer, max_len=128, batch_size=16)

In [2]:
# Load the pretrained encoder only to inspect its hidden size (the recorded
# output is 768).  ConditionClassifier reads the same config value for the
# input width of its first linear layer.
bert = BertModel.from_pretrained('bert-base-uncased')
bert.config.hidden_size

768

In [3]:
class ConditionClassifier(nn.Module):
    """BERT encoder with a shared two-layer trunk and four classification heads.

    The heads emit raw logits for c1 (5 classes), c2 (5 classes), c3
    (4 classes) and pp (3 classes).

    Args:
        hidden_dim: Width of the first shared layer; the second shared layer
            and the head inputs use ``hidden_dim // 2``.
    """

    def __init__(self, hidden_dim=256):
        super().__init__()
        head_dim = hidden_dim // 2

        self.bert = BertModel.from_pretrained('bert-base-uncased')
        self.drop = nn.Dropout(p=0.3)

        # Trunk shared by all four heads.
        self.fc1 = nn.Linear(self.bert.config.hidden_size, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, head_dim)

        # One linear head per condition; the output width is its level count.
        self.fc_c1 = nn.Linear(head_dim, 5)
        self.fc_c2 = nn.Linear(head_dim, 5)
        self.fc_c3 = nn.Linear(head_dim, 4)
        self.fc_pp = nn.Linear(head_dim, 3)

        self.relu = nn.ReLU()

    def forward(self, input_ids, attention_mask):
        """Return the logits ``(c1, c2, c3, pp)`` for a batch of token ids."""
        encoded = self.bert(input_ids=input_ids, attention_mask=attention_mask)

        # Element 1 of the encoder output is used as the pooled sentence vector.
        features = self.drop(encoded[1])
        for layer in (self.fc1, self.fc2):
            features = self.relu(layer(features))

        heads = (self.fc_c1, self.fc_c2, self.fc_c3, self.fc_pp)
        return tuple(head(features) for head in heads)


def train_epoch(model, data_loader, loss_fn, optimizer, device, scheduler, n_examples, num_epochs, output_dir=None):
    """Train ``model`` for ``num_epochs`` passes over ``data_loader`` and checkpoint it.

    Despite its name this runs the whole training loop, not a single epoch.
    The loss of a step is the sum of the four per-condition losses.  After the
    last epoch the model, optimizer state and loss history are written with
    ``save_model``.

    Args:
        model: Module returning ``(c1, c2, c3, pp)`` logits for
            ``(input_ids, attention_mask)``.
        data_loader: Iterable of dict batches with the keys ``input_ids``,
            ``attention_mask``, ``c1_value``, ``c2_value``, ``c3_value`` and
            ``pp_value``.
        loss_fn: Callable ``loss_fn(logits, labels)`` returning a scalar tensor.
        optimizer: Optimizer over ``model.parameters()``.
        device: Device the batch tensors are moved to.
        scheduler: Learning-rate scheduler, stepped once per batch.
        n_examples: Unused; kept so existing callers keep working.
        num_epochs (int): Number of passes over ``data_loader``.
        output_dir (str | None): Checkpoint directory.  ``None`` falls back to
            the notebook-level ``OUTPUT_DIR`` constant.

    Returns:
        list[float]: Loss of every optimisation step, in order.
    """
    if output_dir is None:
        # Previous behaviour: read the notebook-level constant.
        output_dir = OUTPUT_DIR

    model.train()
    losses = []
    max_train_steps = num_epochs * len(data_loader)

    # The context manager closes the bar as soon as training ends instead of
    # leaving it open for the rest of the session.
    with tqdm(total=max_train_steps, desc="Steps") as progress_bar:
        for current_epoch in range(num_epochs):
            running_loss = 0.0
            total_steps = 0

            for batch in data_loader:
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)

                # Targets are class indices, so they must be int64.
                c1_labels = batch['c1_value'].to(device).long()
                c2_labels = batch['c2_value'].to(device).long()
                c3_labels = batch['c3_value'].to(device).long()
                pp_labels = batch['pp_value'].to(device).long()

                # Clear gradients first so nothing accumulated before this
                # call (or by a previous step) leaks into the update.
                optimizer.zero_grad()

                c1_output, c2_output, c3_output, pp_output = model(input_ids, attention_mask)

                total_loss = (
                    loss_fn(c1_output, c1_labels)
                    + loss_fn(c2_output, c2_labels)
                    + loss_fn(c3_output, c3_labels)
                    + loss_fn(pp_output, pp_labels)
                )

                total_loss.backward()
                nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                optimizer.step()
                scheduler.step()

                # Read the scalar once and reuse it.
                step_loss = total_loss.item()
                running_loss += step_loss
                losses.append(step_loss)
                total_steps += 1

                progress_bar.set_postfix(step_loss=step_loss, lr=scheduler.get_last_lr()[0])
                progress_bar.update(1)

            # An empty loader produces no steps; report NaN instead of dividing by zero.
            avg_loss = running_loss / total_steps if total_steps else float("nan")
            print(f"Epoch {current_epoch+1}/{num_epochs} finished with avg loss: {avg_loss:.4f}")

    save_model(model, optimizer, losses, output_dir, num_epochs)
    return losses


def save_model(model, optimizer, all_losses, output_dir, epoch):
    """Write a training checkpoint to ``<output_dir>/model_epoch_<epoch>.pth``.

    The checkpoint is a dict holding the model state dict, the optimizer
    state dict, the list of losses and the epoch number.  ``output_dir`` is
    created when it does not exist yet.
    """
    os.makedirs(output_dir, exist_ok=True)

    file_name = f"model_epoch_{epoch}.pth"
    model_save_path = os.path.join(output_dir, file_name)

    # Bundle everything needed to resume training or re-plot the loss curve.
    state = {}
    state['model_state_dict'] = model.state_dict()
    state['optimizer_state_dict'] = optimizer.state_dict()
    state['all_losses'] = all_losses
    state['epoch'] = epoch

    torch.save(state, model_save_path)
    print(f"Model and losses saved to {model_save_path}")
    

In [4]:
# Hyperparameters
RANDOM_SEED = 42
# NOTE(review): MAX_LEN and BATCH_SIZE are not applied anywhere in this
# notebook -- the data loaders used below were built in the first cell with
# max_len=128 and batch_size=16.  Rebuild the loaders from these constants
# (or align the values) if they are meant to take effect.
MAX_LEN = 256
BATCH_SIZE = 32
EPOCHS = 2
LEARNING_RATE = 2e-5
OUTPUT_DIR = "output"

# Seed the RNGs before the model is built so the randomly initialised
# classifier layers and the loader shuffling start from the same state on
# every run.  (The dataset itself was already generated in the first cell.)
random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')


# Model initialization
model = ConditionClassifier()
model = model.to(device)

# Optimizer and Scheduler: linear decay to zero over all training steps, no warmup
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)
total_steps = len(train_data_loader) * EPOCHS
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)

# Loss function (CrossEntropy for classification)
loss_fn = nn.CrossEntropyLoss().to(device)

# Training (runs all EPOCHS and writes the checkpoint at the end)
all_losses = train_epoch(
    model,
    train_data_loader,
    loss_fn,
    optimizer,
    device,
    scheduler,
    len(train_df),
    EPOCHS
)


Steps:  50%|███████████████████████████▍                           | 624/1250 [00:53<00:53, 11.67it/s, lr=1e-5, step_loss=3.68]

Epoch 1/2 finished with avg loss: 4.7185


Steps: 100%|█████████████████████████████████████████████████████████| 1250/1250 [01:47<00:00, 11.68it/s, lr=0, step_loss=2.84]

Epoch 2/2 finished with avg loss: 3.1146


Steps: 100%|█████████████████████████████████████████████████████████| 1250/1250 [01:51<00:00, 11.17it/s, lr=0, step_loss=2.84]

Model and losses saved to output/model_epoch_2.pth





In [None]:
import torch
import torch.nn as nn
import numpy as np
from sklearn.metrics import r2_score

def load_model(model, model_path, device):
    """Restore model weights and the loss history from a saved checkpoint.

    The checkpoint is expected to be a dict with at least the keys
    ``model_state_dict`` and ``all_losses`` (the format written by
    ``save_model``).  The optimizer state and epoch number stored in the
    checkpoint are not restored.

    Args:
        model (torch.nn.Module): Model instance whose architecture matches the
            checkpoint; its weights are overwritten in place.
        model_path (str): Path of the checkpoint file.
        device (torch.device): Device to load the checkpoint onto (CPU/GPU).

    Returns:
        tuple[torch.nn.Module, list]: ``(model, all_losses)`` -- the model with
        the restored weights, placed on ``device``, and the recorded per-step
        training losses.
    """
    # map_location remaps the stored tensors, so a checkpoint saved on a GPU
    # can also be restored on a CPU-only machine.
    checkpoint = torch.load(model_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.to(device)
    all_losses = checkpoint['all_losses']

    print(f"Model loaded from {model_path}")
    return model, all_losses

# Evaluation metric: accuracy

In [None]:
# Load the checkpoint written by the training cell.  The file name is derived
# from OUTPUT_DIR and EPOCHS so it matches what train_epoch saved; a hard-coded
# epoch number points at a file that a fresh run never creates.
model, all_losses = load_model(model, os.path.join(OUTPUT_DIR, f"model_epoch_{EPOCHS}.pth"), device)

In [None]:
import matplotlib.pyplot as plt

def plot_training_loss(all_losses, epoch, log_scale=False):
    """Plot the per-iteration training loss.

    Args:
        all_losses: Sequence of loss values, one per training iteration.
        epoch: Epoch number shown in the figure title.
        log_scale (bool): Draw the y axis on a log scale.  Defaults to
            ``False``, which matches the linear axis drawn before; the axis
            label now states the scale that is actually used.
    """
    plt.figure(figsize=(10, 8))
    plt.plot(all_losses, label='Training Loss')

    # Title and axis labels
    plt.title(f'Training Loss over Epochs (up to Epoch {epoch})')
    plt.xlabel('Iteration')
    if log_scale:
        plt.yscale('log')
        plt.ylabel('Loss(log scale)')
    else:
        plt.ylabel('Loss')
    plt.legend()

    # Show the figure
    plt.grid(True)
    plt.show()


# Use the configured epoch count for the title rather than a hard-coded number.
plot_training_loss(all_losses, EPOCHS)