In [1]:
import pandas as pd
import numpy as np
import re
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, accuracy_score
from transformers import AutoTokenizer, AutoModel, AdamW, get_linear_schedule_with_warmup
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
from tqdm import tqdm
from sklearn.utils.class_weight import compute_class_weight
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import AutoModelForSequenceClassification, AutoTokenizer, AutoConfig

In [2]:
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

def set_seed(seed=42):
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
set_seed()

class CFG:
    max_len = 512
    batch_size = 8  # batch_size * accumulation_steps의 배치 효과
    learning_rate = 2e-5
    epochs = 12
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    accumulation_steps = 4  # Gradient accumulation steps

# 데이터 로드 및 전처리
train_df = pd.read_csv("data/train.csv")
test_df = pd.read_csv("data/test.csv")

train_df.dropna(inplace=True)
train_df.drop_duplicates(subset=['제목', '키워드'], keep='first', inplace=True)

def normalize_title(text):
    text = re.sub(r'\s+', ' ', text).strip()
    return text.strip()

def normalize_keywords(text):
    text = re.sub(r'[^가-힣\s,]', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text.strip()

def clean_keywords(keywords):
    keywords = keywords.split(',')
    keywords = [keyword.strip() for keyword in keywords if keyword.strip()]
    return ' '.join(keywords)

train_df['title'] = train_df['제목'].apply(normalize_title)
train_df['keywords'] = train_df['키워드'].apply(lambda x: clean_keywords(normalize_keywords(x)))
train_df['text'] = train_df['title'] + ' [SEP] ' + train_df['keywords']

test_df['title'] = test_df['제목'].apply(normalize_title)
test_df['keywords'] = test_df['키워드'].apply(lambda x: clean_keywords(normalize_keywords(x)))
test_df['text'] = test_df['title'] + ' [SEP] ' + test_df['keywords']

# 레이블 인코딩
label_encoder = {label: i for i, label in enumerate(train_df['분류'].unique())}
train_df['label'] = train_df['분류'].map(label_encoder)

# 학습 및 검증 데이터 분할
train_df, val_df = train_test_split(train_df, test_size=0.2, stratify=train_df['분류'], random_state=42)

# TF-IDF 벡터라이저 초기화 및 적용
tfidf_vectorizer = TfidfVectorizer(max_features=1000)  # 상위 1000개 특징만 사용
train_tfidf = tfidf_vectorizer.fit_transform(train_df['text'])
val_tfidf = tfidf_vectorizer.transform(val_df['text'])
test_tfidf = tfidf_vectorizer.transform(test_df['text'])

class TfidfEnhancedDataset(Dataset):
    def __init__(self, texts, tfidf_features, labels, tokenizer, max_len=512):
        self.texts = texts
        self.tfidf_features = tfidf_features
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, item):
        text = str(self.texts[item])
        tfidf_feature = torch.FloatTensor(self.tfidf_features[item].toarray().squeeze())
        label = self.labels[item] if self.labels is not None else -1

        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        return {
            'text': text,
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'tfidf_features': tfidf_feature,
            'labels': torch.tensor(label, dtype=torch.long)
        }

class TfidfEnhancedClassifier(nn.Module):
    def __init__(self, base_model, num_labels, tfidf_dim):
        super(TfidfEnhancedClassifier, self).__init__()
        self.base_model = base_model
        self.dropout = nn.Dropout(0.1)
        self.attention = nn.Linear(self.base_model.config.hidden_size, 1)
        self.tfidf_layer = nn.Linear(tfidf_dim, self.base_model.config.hidden_size)
        self.classifier = nn.Linear(self.base_model.config.hidden_size * 2, num_labels)

    def forward(self, input_ids, attention_mask, tfidf_features):
        outputs = self.base_model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        hidden_states = outputs.last_hidden_state

        attn_scores = self.attention(hidden_states).squeeze(-1)
        attn_weights = torch.softmax(attn_scores, dim=-1)
        context_vector = torch.sum(attn_weights.unsqueeze(-1) * hidden_states, dim=1)

        tfidf_vector = self.tfidf_layer(tfidf_features)

        combined_vector = torch.cat([context_vector, tfidf_vector], dim=1)
        pooled_output = self.dropout(combined_vector)
        logits = self.classifier(pooled_output)

        return logits


In [3]:
model = AutoModel.from_pretrained("klue/roberta-large")
tokenizer = AutoTokenizer.from_pretrained("klue/roberta-large")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Some weights of RobertaModel were not initialized from the model checkpoint at klue/roberta-large and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [4]:
# 데이터셋 및 데이터로더 생성
train_dataset = TfidfEnhancedDataset(train_df.text.tolist(), train_tfidf, train_df.label.tolist(), tokenizer, CFG.max_len)
val_dataset = TfidfEnhancedDataset(val_df.text.tolist(), val_tfidf, val_df.label.tolist(), tokenizer, CFG.max_len)
test_dataset = TfidfEnhancedDataset(test_df.text.tolist(), test_tfidf, None, tokenizer, CFG.max_len)

train_loader = DataLoader(train_dataset, batch_size=CFG.batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=CFG.batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=CFG.batch_size, shuffle=False)

# 모델 생성
num_labels = len(label_encoder)
tfidf_dim = train_tfidf.shape[1]
model = TfidfEnhancedClassifier(model, num_labels, tfidf_dim).to(CFG.device)

In [5]:
# 클래스 가중치 계산
class_weights = compute_class_weight('balanced', classes=np.unique(train_df['label']), y=train_df['label'])
class_weights = torch.tensor(class_weights, dtype=torch.float).to(CFG.device)

# 손실 함수 및 옵티마이저 설정
loss_fn = nn.CrossEntropyLoss(weight=class_weights)
optimizer = AdamW(model.parameters(), lr=CFG.learning_rate)

# 학습률 스케줄러 설정
total_steps = len(train_loader) * CFG.epochs // CFG.accumulation_steps
scheduler = get_linear_schedule_with_warmup(optimizer,
                                            num_warmup_steps=int(0.1 * total_steps),
                                            num_training_steps=total_steps)



In [6]:
# 학습 루프
for epoch in range(CFG.epochs):
    model.train()
    train_loss = 0
    optimizer.zero_grad()
    for idx, batch in enumerate(tqdm(train_loader, desc=f'Epoch {epoch + 1}/{CFG.epochs}')):
        input_ids = batch['input_ids'].to(CFG.device)
        attention_mask = batch['attention_mask'].to(CFG.device)
        tfidf_features = batch['tfidf_features'].to(CFG.device)
        labels = batch['labels'].to(CFG.device)

        outputs = model(input_ids, attention_mask=attention_mask, tfidf_features=tfidf_features)
        loss = loss_fn(outputs, labels)
        loss = loss / CFG.accumulation_steps
        train_loss += loss.item()

        loss.backward()

        if (idx + 1) % CFG.accumulation_steps == 0 or (idx + 1) == len(train_loader):
            optimizer.step()
            scheduler.step()
            optimizer.zero_grad()

    avg_train_loss = train_loss / len(train_loader)
    print(f"Epoch {epoch + 1}/{CFG.epochs}, Train Loss: {avg_train_loss:.4f}")

    # 검증
    model.eval()
    val_predictions = []
    val_true_labels = []
    val_loss = 0

    with torch.no_grad():
        for batch in tqdm(val_loader, desc='Validating'):
            input_ids = batch['input_ids'].to(CFG.device)
            attention_mask = batch['attention_mask'].to(CFG.device)
            tfidf_features = batch['tfidf_features'].to(CFG.device)
            labels = batch['labels'].to(CFG.device)

            outputs = model(input_ids, attention_mask=attention_mask, tfidf_features=tfidf_features)
            loss = loss_fn(outputs, labels)
            val_loss += loss.item()

            _, preds = torch.max(outputs, dim=1)
            val_predictions.extend(preds.cpu().tolist())
            val_true_labels.extend(labels.cpu().tolist())

    val_f1 = f1_score(val_true_labels, val_predictions, average='macro')
    val_accuracy = accuracy_score(val_true_labels, val_predictions)
    avg_val_loss = val_loss / len(val_loader)
    print(f"Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}, Validation F1 Score: {val_f1:.4f}")

Epoch 1/12:   0%|          | 1/5432 [00:04<6:07:22,  4.06s/it]


OutOfMemoryError: CUDA out of memory. Tried to allocate 128.00 MiB. GPU 0 has a total capacity of 14.75 GiB of which 93.06 MiB is free. Process 33011 has 14.65 GiB memory in use. Of the allocated memory 14.50 GiB is allocated by PyTorch, and 19.72 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

In [None]:
# 테스트 예측
model.eval()
test_predictions = []
with torch.no_grad():
    for batch in tqdm(test_loader, desc='Predicting'):
        input_ids = batch['input_ids'].to(CFG.device)
        attention_mask = batch['attention_mask'].to(CFG.device)
        tfidf_features = batch['tfidf_features'].to(CFG.device)

        outputs = model(input_ids, attention_mask=attention_mask, tfidf_features=tfidf_features)
        _, preds = torch.max(outputs, dim=1)
        test_predictions.extend(preds.cpu().tolist())

# 예측 결과 디코딩
label_decoder = {i: label for label, i in label_encoder.items()}
decoded_predictions = [label_decoder[pred] for pred in test_predictions]

# 제출 파일 생성
sample_submission = pd.read_csv("data/sample_submission.csv")
sample_submission["분류"] = decoded_predictions
sample_submission.to_csv("submission_klue_roberta_large.csv", encoding='UTF-8-sig', index=False)
print("Prediction completed and submission file created.")