<a href="https://colab.research.google.com/github/rkawkclzls/TTT/blob/master/Untitled26.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install the required libraries (notebook shell command)
!pip install transformers datasets

# Import the required libraries
import torch
from datasets import load_dataset
from torch import nn
from torch.optim import Adam
from torch.utils.data import DataLoader
from transformers import GPT2Model, GPT2Tokenizer, OpenAIGPTModel, OpenAIGPTTokenizer

# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

# Load the AG News dataset
ds = load_dataset(path="fancyzhx/ag_news")

# Load the tokenizer that belongs to the 'openai-gpt' checkpoint.
# The checkpoint is the original GPT model, so it is loaded with
# OpenAIGPTTokenizer; GPT2Tokenizer implements a different (byte-level) BPE
# scheme and does not match this checkpoint's vocabulary.
tokenizer = OpenAIGPTTokenizer.from_pretrained('openai-gpt')
# Reuse the unknown token as the padding token so batches can be padded
tokenizer.pad_token = tokenizer.unk_token

# Dataset filter predicate
def filter_dataset(example):
    """Return True only when ``example['text']`` is a non-empty string.

    Args:
        example: A mapping with a ``'text'`` key.

    Returns:
        bool: ``True`` for a non-empty ``str``; ``False`` for an empty
        string, ``None`` or any non-string value. The result is always a
        real ``bool`` rather than the falsy operand itself.
    """
    text = example['text']
    return isinstance(text, str) and bool(text)

# collate_fn: convert a list of dataset rows into one tokenized batch
def collate_fn(batch):
    """Tokenize the valid rows of ``batch`` and pair them with their labels.

    A row is valid when its ``'text'`` value is a non-empty string; other
    rows are dropped together with their labels.

    Args:
        batch: An iterable of mappings with ``'text'`` and ``'label'`` keys.

    Returns:
        ``(encoded, labels)`` where ``encoded`` is the output of the
        module-level ``tokenizer`` (padded, truncated to 512 tokens, PyTorch
        tensors) and ``labels`` is a ``torch.LongTensor``; or
        ``(None, None)`` when no row in the batch is valid.
    """
    valid_rows = [
        row for row in batch
        if row['text'] and isinstance(row['text'], str)
    ]

    # Nothing usable in this batch: signal the caller to skip it
    if len(valid_rows) == 0:
        return None, None

    texts = [row['text'] for row in valid_rows]
    labels = [row['label'] for row in valid_rows]

    encoded = tokenizer(
        texts,
        padding=True,
        truncation=True,
        max_length=512,
        return_tensors="pt",
    )
    return encoded, torch.LongTensor(labels)

# Keep only rows accepted by filter_dataset, for both splits
filtered_train, filtered_test = (
    ds[split].filter(filter_dataset) for split in ('train', 'test')
)

# Build the data loaders; only the training split is shuffled
train_loader = DataLoader(
    dataset=filtered_train,
    batch_size=16,
    shuffle=True,
    collate_fn=collate_fn,
)
test_loader = DataLoader(
    dataset=filtered_test,
    batch_size=16,
    shuffle=False,
    collate_fn=collate_fn,
)

# TextClassifier model definition
class TextClassifier(nn.Module):
    """Text classifier: pretrained GPT encoder followed by a linear head.

    The hidden state of the last *non-padding* token of each sequence is
    used as the sequence representation and fed to the linear classifier.

    Args:
        num_classes: Number of output classes (default 4, as in AG News).
    """

    def __init__(self, num_classes=4):
        super().__init__()
        # 'openai-gpt' is the original GPT checkpoint, so it is loaded with
        # the matching OpenAIGPTModel class rather than GPT2Model.
        self.encoder = OpenAIGPTModel.from_pretrained('openai-gpt')
        # Take the hidden size from the checkpoint config instead of
        # hard-coding it.
        self.classifier = nn.Linear(self.encoder.config.n_embd, num_classes)

    def forward(self, input_ids, attention_mask):
        """Return class logits for a batch of token sequences.

        Args:
            input_ids: Token ids, indexed as (batch, sequence).
            attention_mask: Same layout as ``input_ids``; assumed to hold 1
                for real tokens and 0 for padding (tokenizer convention).

        Returns:
            The classifier output for the last real token of each sequence.
        """
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        hidden = outputs.last_hidden_state

        # Position -1 is a padding slot for every sequence shorter than the
        # longest one in the batch, so locate the last position whose mask
        # is 1 instead. argmax over the reversed mask yields the distance of
        # that position from the end; this works for right- and left-padded
        # batches alike (an all-zero row falls back to the final position).
        reversed_mask = attention_mask.long().flip(dims=[1])
        last_real = hidden.size(1) - 1 - reversed_mask.argmax(dim=1)
        last_real = last_real.to(hidden.device)
        batch_idx = torch.arange(hidden.size(0), device=hidden.device)

        pooled_output = hidden[batch_idx, last_real]
        return self.classifier(pooled_output)

# Instantiate the model and move its parameters to the selected device
model = TextClassifier()
model = model.to(device)

# Loss function and optimizer
loss_fn = nn.CrossEntropyLoss()
optimizer = Adam(params=model.parameters(), lr=5e-5)

# Training function
def train(model, dataloader, optimizer, loss_fn):
    """Run one training pass over ``dataloader`` and return the mean loss.

    Args:
        model: Module called as ``model(input_ids, attention_mask)``.
        dataloader: Iterable of ``(inputs, labels)`` pairs, where ``inputs``
            provides ``'input_ids'`` and ``'attention_mask'`` tensors. Pairs
            whose first element is ``None`` are skipped.
        optimizer: Optimizer that updates the model's parameters.
        loss_fn: Callable ``loss_fn(outputs, labels)`` returning a scalar
            tensor.

    Returns:
        The average loss over the processed batches, or 0 when no batch was
        processed.
    """
    model.train()

    # Move batches to wherever the model lives rather than relying on a
    # module-level global, so the function also works for a model placed on
    # a different device.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    total_loss = 0.0
    total_batches = 0
    for inputs, labels in dataloader:
        if inputs is None:  # batch had no valid rows; skip it
            continue
        input_ids = inputs['input_ids'].to(target_device)
        attention_mask = inputs['attention_mask'].to(target_device)
        labels = labels.to(target_device)

        optimizer.zero_grad()
        outputs = model(input_ids, attention_mask)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        total_batches += 1
    return total_loss / total_batches if total_batches > 0 else 0

# Evaluation function
def evaluate(model, dataloader):
    """Return the classification accuracy of ``model`` over ``dataloader``.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that
            returns one score per class for each example.
        dataloader: Iterable of ``(inputs, labels)`` pairs, where ``inputs``
            provides ``'input_ids'`` and ``'attention_mask'`` tensors. Pairs
            whose first element is ``None`` are skipped.

    Returns:
        The fraction of correctly predicted labels, or 0 when no example was
        evaluated.
    """
    model.eval()

    # Move batches to wherever the model lives rather than relying on a
    # module-level global.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in dataloader:
            if inputs is None:  # batch had no valid rows; skip it
                continue
            input_ids = inputs['input_ids'].to(target_device)
            attention_mask = inputs['attention_mask'].to(target_device)
            labels = labels.to(target_device)

            outputs = model(input_ids, attention_mask)
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return correct / total if total > 0 else 0

# Training loop: one pass over the training loader per epoch
n_epochs = 3
for epoch in range(n_epochs):
    train_loss = train(
        model=model,
        dataloader=train_loader,
        optimizer=optimizer,
        loss_fn=loss_fn,
    )
    print(f"Epoch {epoch+1}/{n_epochs}, Train Loss: {train_loss:.4f}")

# Final evaluation on the test split
test_acc = evaluate(model=model, dataloader=test_loader)
print(f"Final Test Accuracy: {test_acc:.4f}")