### [AI vs Human 텍스트 판별 해커톤 -월간 데이콘 쇼츠](https://dacon.io/competitions/official/236178/overview/description)

4일 후의 프로젝트를 위해 준비된 이 데이터셋에는 인간이 작성한 리뷰와 인공지능이 작성한 리뷰가 섞여 있습니다.

하지만 어떤 리뷰가 인간에 의해 작성되었는지를 나타내는 레이블 대부분이 사라져버렸습니다.

여러분의 임무는 일부 레이블이 남아있는 학습용 데이터셋을 활용하여,

테스트 데이터셋의 네 개 리뷰 중 어떤 것이 실제 인간에 의해 작성된 것인지 정확하게 예측하는 것입니다!

당신의 통찰력을 활용하여 테스트 데이터셋의 'label' 필드를 복구해주세요!

In [None]:
# Mount Google Drive into the Colab runtime; the CSV paths read later in this
# notebook live under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import pandas as pd
import numpy as np
import random
import os

from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split

from tqdm import tqdm

import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from transformers import BertModel, BertTokenizer

import warnings
warnings.filterwarnings("ignore")

In [None]:
def set_seed(seed=42):
    """Seed every random number generator this notebook relies on.

    Args:
        seed: Integer seed applied to Python's ``random`` module, NumPy and
            PyTorch (CPU and, when available, all CUDA devices).
    """
    random.seed(seed)  # Python's own RNG was previously left unseeded
    np.random.seed(seed)  # this also affects pandas' sample()
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
    # Ask cuDNN for deterministic kernels and disable its auto-tuner, which can
    # otherwise pick different algorithms from run to run.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed()

In [None]:
# Pick the compute device: the GPU when CUDA is available, otherwise the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')


In [None]:
# Load data
# The dataset directory is kept in a single constant so that moving the data
# only requires changing one line.
DATA_DIR = '/content/drive/MyDrive/ESAA/OB/data/ai_vs_human_text'
train_data = pd.read_csv(os.path.join(DATA_DIR, 'train.csv'))
test_data = pd.read_csv(os.path.join(DATA_DIR, 'test.csv'))
sample_submission = pd.read_csv(os.path.join(DATA_DIR, 'sample_submission.csv'))
print(train_data.shape, test_data.shape)

(50, 6) (1100, 5)


In [None]:
# Melt the training data: one row per (id, sentence) pair instead of one row per id
train_data_melted = train_data.melt(id_vars=['id', 'label'],
                                    value_vars=['sentence1', 'sentence2', 'sentence3', 'sentence4'],
                                    var_name='sentence_type', value_name='text')

# Create binary labels: 1 if the sentence is the one `label` points to, else 0.
# `sentence_type` holds strings such as 'sentence3'; `label` may be stored as a
# bare number (3), in which case a direct == comparison never matches and every
# row silently gets 0. Comparing the extracted sentence numbers works for both
# representations; a missing label becomes NaN and therefore yields 0.
sentence_number = train_data_melted['sentence_type'].str.extract(r'(\d+)', expand=False).astype(float)
label_number = train_data_melted['label'].astype(str).str.extract(r'(\d+)', expand=False).astype(float)
train_data_melted['binary_label'] = (sentence_number == label_number).astype(int)

# A target column without a single positive cannot train a classifier - fail loudly.
if train_data_melted['binary_label'].sum() == 0:
    raise ValueError("binary_label has no positive rows; check the format of the 'label' column")

# Reshape the test data similarly
test_data_melted = test_data.melt(id_vars=['id'],
                                  value_vars=['sentence1', 'sentence2', 'sentence3', 'sentence4'],
                                  var_name='sentence_type', value_name='text')


In [None]:
# Create binary labels: 1 if the sentence is the one `label` points to, else 0.
# `sentence_type` holds strings such as 'sentence3'; `label` may be stored as a
# bare number (3), in which case a direct == comparison never matches and every
# row silently gets 0. Comparing the extracted sentence numbers works for both
# representations; a missing label becomes NaN and therefore yields 0.
sentence_number = train_data_melted['sentence_type'].str.extract(r'(\d+)', expand=False).astype(float)
label_number = train_data_melted['label'].astype(str).str.extract(r'(\d+)', expand=False).astype(float)
train_data_melted['binary_label'] = (sentence_number == label_number).astype(int)

# A target column without a single positive cannot train a classifier - fail loudly.
if train_data_melted['binary_label'].sum() == 0:
    raise ValueError("binary_label has no positive rows; check the format of the 'label' column")

# Reshape the test data similarly
test_data_melted = test_data.melt(id_vars=['id'],
                                  value_vars=['sentence1', 'sentence2', 'sentence3', 'sentence4'],
                                  var_name='sentence_type', value_name='text')

In [None]:
# Constants
MODEL_NAME = 'bert-base-uncased'  # pretrained checkpoint used for both tokenizer and encoder
MAX_LEN = 128  # every text is padded/truncated to this many tokens
BATCH_SIZE = 16  # batch size shared by all DataLoaders
EPOCHS = 3  # number of passes over the training loader
LEARNING_RATE = 2e-5  # initial learning rate handed to the optimizer
SEED = 42  # same value as the default argument of set_seed()


In [None]:
# Dataset class
class TextDataset(Dataset):
    """Map-style dataset that tokenizes one text per item.

    Args:
        texts: Indexable collection of raw texts.
        labels: Optional indexable collection of integer class labels; when
            omitted, items carry no ``'labels'`` entry.
        tokenizer: Object exposing ``encode_plus`` (e.g. a BERT tokenizer).
        max_len: Length every encoded sequence is padded/truncated to.
    """

    # (key in the returned item, key in the tokenizer output)
    _FIELDS = (
        ('ids', 'input_ids'),
        ('mask', 'attention_mask'),
        ('token_type_ids', 'token_type_ids'),
    )

    def __init__(self, texts, labels=None, tokenizer=None, max_len=128):
        self.texts, self.labels = texts, labels
        self.tokenizer, self.max_len = tokenizer, max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return a dict of 1-D tensors for the text at position ``idx``."""
        encoded = self.tokenizer.encode_plus(
            str(self.texts[idx]),
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=True,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        # The tokenizer output carries a leading batch axis; flatten it away.
        sample = {
            item_key: encoded[encoded_key].flatten()
            for item_key, encoded_key in self._FIELDS
        }

        if self.labels is None:
            return sample

        sample['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample

In [None]:
# Load pretrained tokenizer
# No standalone BertModel is created here: BertClassifier loads its own encoder
# from MODEL_NAME, and `model` is assigned when the classifier is instantiated,
# so a second copy of the weights would only cost time and memory.
tokenizer = BertTokenizer.from_pretrained(MODEL_NAME)

In [None]:
# Add classification layer on top of BERT model
class BertClassifier(nn.Module):
    """BERT encoder followed by dropout and a linear classification head.

    Args:
        n_classes: Number of output logits produced per input sequence.
    """

    def __init__(self, n_classes):
        super().__init__()
        self.bert = BertModel.from_pretrained(MODEL_NAME)
        self.drop = nn.Dropout(p=0.3)
        self.out = nn.Linear(self.bert.config.hidden_size, n_classes)

    def forward(self, input_ids, attention_mask, token_type_ids):
        """Return the head's logits for a batch of encoded sequences."""
        encoder_outputs = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )
        # Element 1 of the encoder output is the pooled sequence representation.
        pooled = encoder_outputs[1]
        return self.out(self.drop(pooled))

# Two output classes (binary_label 0 / 1); build the classifier and move it to the device
model = BertClassifier(n_classes=2).to(device)

In [None]:
# Create datasets and data loaders
# Hold out 20% of the ids for validation. Previously the validation set was the
# training set itself, so validation metrics (and the "best" checkpoint chosen
# from them) said nothing about generalization. Splitting on `id` rather than on
# individual rows keeps all sentences of one sample on the same side of the split.
train_ids, val_ids = train_test_split(
    train_data_melted['id'].unique(),
    test_size=0.2,
    random_state=SEED
)
train_rows = train_data_melted[train_data_melted['id'].isin(train_ids)]
val_rows = train_data_melted[train_data_melted['id'].isin(val_ids)]

train_dataset = TextDataset(
    texts=train_rows['text'].values,
    labels=train_rows['binary_label'].values,
    tokenizer=tokenizer,
    max_len=MAX_LEN
)

# Training batches are reshuffled every epoch
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True
)

val_dataset = TextDataset(
    texts=val_rows['text'].values,
    labels=val_rows['binary_label'].values,
    tokenizer=tokenizer,
    max_len=MAX_LEN
)

# Validation order is fixed so metrics are comparable between epochs
val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False
)

In [None]:
# Define loss function
# Cross-entropy over the classifier's logits; train_epoch and eval_model read
# this module-level object.
loss_fn = nn.CrossEntropyLoss().to(device)

# Training function
def train_epoch(model, data_loader, optimizer, device, scheduler, criterion=None):
    """Train ``model`` for one pass over ``data_loader``.

    Args:
        model: Module called with ``input_ids``, ``attention_mask`` and
            ``token_type_ids`` keyword arguments and returning class logits.
        data_loader: Iterable of batches (dicts with keys ``'ids'``, ``'mask'``,
            ``'token_type_ids'`` and ``'labels'``) exposing a sized ``.dataset``.
        optimizer: Optimizer stepped once per batch.
        device: Device the batch tensors are moved to.
        scheduler: Learning-rate scheduler stepped once per batch.
        criterion: Loss callable ``(logits, labels) -> scalar tensor``. Defaults
            to the module-level ``loss_fn``.

    Returns:
        Tuple ``(accuracy, mean_loss)``: accuracy is a 0-dim float64 tensor
        (correct predictions / dataset size), mean_loss the mean batch loss.

    Raises:
        ValueError: If ``data_loader.dataset`` is empty.
    """
    if criterion is None:
        criterion = loss_fn  # fall back to the notebook-wide loss

    n_samples = len(data_loader.dataset)
    if n_samples == 0:
        # Previously this surfaced as an AttributeError on the int counter.
        raise ValueError('train_epoch needs a non-empty data_loader')

    model = model.train()
    losses = []
    correct_predictions = torch.zeros((), dtype=torch.long, device=device)

    for d in data_loader:
        input_ids = d['ids'].to(device)
        attention_mask = d['mask'].to(device)
        token_type_ids = d['token_type_ids'].to(device)
        labels = d['labels'].to(device)

        # Clear gradients before the forward pass so nothing left over from an
        # earlier backward() leaks into this update.
        optimizer.zero_grad()

        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids
        )

        preds = outputs.argmax(dim=1)
        loss = criterion(outputs, labels)

        correct_predictions += (preds == labels).sum()
        losses.append(loss.item())

        loss.backward()
        optimizer.step()
        scheduler.step()

    return correct_predictions.double() / n_samples, np.mean(losses)

In [None]:
# Evaluation function
def eval_model(model, data_loader, device, criterion=None):
    """Evaluate ``model`` on ``data_loader`` without updating any weights.

    Args:
        model: Module called with ``input_ids``, ``attention_mask`` and
            ``token_type_ids`` keyword arguments and returning class logits.
        data_loader: Iterable of batches (dicts with keys ``'ids'``, ``'mask'``,
            ``'token_type_ids'`` and ``'labels'``) exposing a sized ``.dataset``.
        device: Device the batch tensors are moved to.
        criterion: Loss callable ``(logits, labels) -> scalar tensor``. Defaults
            to the module-level ``loss_fn``.

    Returns:
        Tuple ``(accuracy, mean_loss)``: accuracy is a 0-dim float64 tensor
        (correct predictions / dataset size), mean_loss the mean batch loss.

    Raises:
        ValueError: If ``data_loader.dataset`` is empty.
    """
    if criterion is None:
        criterion = loss_fn  # fall back to the notebook-wide loss

    n_samples = len(data_loader.dataset)
    if n_samples == 0:
        # Previously this surfaced as an AttributeError on the int counter.
        raise ValueError('eval_model needs a non-empty data_loader')

    model = model.eval()
    losses = []
    correct_predictions = torch.zeros((), dtype=torch.long, device=device)

    with torch.no_grad():
        for d in data_loader:
            input_ids = d['ids'].to(device)
            attention_mask = d['mask'].to(device)
            token_type_ids = d['token_type_ids'].to(device)
            labels = d['labels'].to(device)

            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                token_type_ids=token_type_ids
            )

            preds = outputs.argmax(dim=1)
            loss = criterion(outputs, labels)

            correct_predictions += (preds == labels).sum()
            losses.append(loss.item())

    return correct_predictions.double() / n_samples, np.mean(losses)

In [None]:
# Training the model
from collections import defaultdict

# Optimizer plus a step scheduler. train_epoch advances the scheduler once per
# batch, and step_size equals the number of batches in one epoch, so the
# learning rate is multiplied by gamma after each full epoch.
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)
total_steps = len(train_loader) * EPOCHS
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=total_steps//EPOCHS, gamma=0.1)

model = model.to(device)

# Per-epoch metrics, keyed by metric name
history = defaultdict(list)
best_accuracy = 0

for epoch in range(EPOCHS):
    print(f'Epoch {epoch + 1}/{EPOCHS}')
    print('-' * 10)

    train_acc, train_loss = train_epoch(model, train_loader, optimizer, device, scheduler)
    print(f'Train loss {train_loss} accuracy {train_acc}')

    val_acc, val_loss = eval_model(model, val_loader, device)
    print(f'Val   loss {val_loss} accuracy {val_acc}')
    print()

    epoch_metrics = (
        ('train_acc', train_acc),
        ('train_loss', train_loss),
        ('val_acc', val_acc),
        ('val_loss', val_loss),
    )
    for metric_name, metric_value in epoch_metrics:
        history[metric_name].append(metric_value)

    # Checkpoint only on a strict improvement in validation accuracy.
    # NOTE(review): on ties the earliest epoch is kept even if a later one has
    # a lower validation loss.
    if val_acc > best_accuracy:
        torch.save(model.state_dict(), 'best_model_state.bin')
        best_accuracy = val_acc

Epoch 1/3
----------
Train loss 0.3714320814380279 accuracy 0.88
Val   loss 0.07748464323007144 accuracy 1.0

Epoch 2/3
----------
Train loss 0.07103936488811786 accuracy 1.0
Val   loss 0.05315554371246925 accuracy 1.0

Epoch 3/3
----------
Train loss 0.06026703004653637 accuracy 1.0
Val   loss 0.051579076796770096 accuracy 1.0



In [None]:
# Prediction on test data
# Unlabeled dataset over the melted test sentences; order is preserved
# (shuffle=False) so predictions line up with the rows of test_data_melted.
test_dataset = TextDataset(test_data_melted['text'].values, tokenizer=tokenizer, max_len=MAX_LEN)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Load best model
best_state = torch.load('best_model_state.bin')
model.load_state_dict(best_state)

<All keys matched successfully>

In [None]:
# Prediction function
def predict(model, data_loader, device=None):
    """Return class probabilities for every sample yielded by ``data_loader``.

    Args:
        model: Module called with ``input_ids``, ``attention_mask`` and
            ``token_type_ids`` keyword arguments and returning class logits.
        data_loader: Iterable of batches (dicts with keys ``'ids'``, ``'mask'``
            and ``'token_type_ids'``).
        device: Device the batch tensors are moved to. When omitted, the device
            of the model's first parameter is used (CPU if the model has no
            parameters), so the function no longer depends on a global.

    Returns:
        NumPy array with one row per sample, in loader order, holding the
        softmax of the logits over dim 1.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    model = model.eval()
    predictions = []

    with torch.no_grad():
        for d in data_loader:
            outputs = model(
                input_ids=d['ids'].to(device),
                attention_mask=d['mask'].to(device),
                token_type_ids=d['token_type_ids'].to(device)
            )

            # Turn logits into probabilities and move them to host memory
            probs = torch.softmax(outputs, dim=1)
            predictions.append(probs.cpu().numpy())

    return np.vstack(predictions)

In [None]:
# Get probabilities for each sentence in test set
test_pred_probs = predict(model, test_loader)

# Assign probabilities to test data (column 1 = probability of class 1,
# i.e. of binary_label == 1)
test_data_melted['prob'] = test_pred_probs[:, 1]

# Work on an explicit copy so the new column is not written into a slice of
# test_data_melted.
submission = test_data_melted[['id', 'sentence_type', 'prob']].copy()

# 'sentence1'..'sentence4' -> 1..4. The extracted number is already in the
# final 1-4 range; adding 1 to it (as was done before) produced labels 2-5.
submission['sentence_number'] = submission['sentence_type'].str.extract(r'(\d+)', expand=False).astype(int)

# Select top 2 sentences for each id (highest probability first)
top2_sentences = submission.sort_values(['id', 'prob'], ascending=[True, False]).groupby('id').head(2)

# Collapse to one row per id holding the ranked list of sentence numbers
top2_sentences = top2_sentences.groupby('id')['sentence_number'].apply(list).reset_index()

# Create the final submission from the same frame so ids and labels cannot
# drift out of alignment.
# NOTE(review): sample_submission is loaded but never consulted - confirm the
# expected columns (label1/label2 here) against it.
final_submission = top2_sentences[['id']].copy()
final_submission['label1'] = top2_sentences['sentence_number'].apply(lambda x: x[0] if len(x) > 0 else np.nan)
final_submission['label2'] = top2_sentences['sentence_number'].apply(lambda x: x[1] if len(x) > 1 else np.nan)

# Save to CSV
final_submission.to_csv('./submission.csv', index=False)

final_submission.head()

Unnamed: 0,id,label1,label2
0,TEST_0000,2,5
1,TEST_0001,5,4
2,TEST_0002,2,3
3,TEST_0003,4,5
4,TEST_0004,2,3
