In [None]:
# Colab-only setup: upload the Kaggle API token and install the Kaggle CLI.
from google.colab import files
files.upload()  # upload the kaggle.json API token file

# Move the uploaded token into ~/.kaggle and restrict it to owner
# read/write only (mode 600).
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

# Install the Kaggle command-line client used by the download/submit cells.
!pip install kaggle

In [None]:
import os
from getpass import getpass

# SECURITY: Kaggle credentials must never be hardcoded in a notebook.
# The API key that used to be committed in this cell has to be treated as
# compromised and rotated in the Kaggle account settings.
#
# Credentials are only requested interactively when neither the uploaded
# ~/.kaggle/kaggle.json token nor the environment already provides them.
_kaggle_token_path = os.path.expanduser("~/.kaggle/kaggle.json")
if not os.path.exists(_kaggle_token_path):
    for _var, _prompt in (
        ("KAGGLE_USERNAME", "Kaggle username: "),
        ("KAGGLE_KEY", "Kaggle API key: "),
    ):
        if not os.environ.get(_var):
            # getpass keeps the typed value out of the notebook output.
            os.environ[_var] = getpass(_prompt)

In [None]:
!kaggle competitions download -c osai-project
!unzip -o osai-project.zip

import pandas as pd

# 데이터 불러오기
train = pd.read_csv('./train/text_label.csv')
test = pd.read_csv('./test/text_label.csv')
val = pd.read_csv('./val/text_label.csv')
sol = pd.read_csv('./solution_sample.csv')

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision.transforms import Compose, Resize, Normalize, ToTensor, RandomHorizontalFlip, RandomRotation, ColorJitter
from torchvision.models import efficientnet_b3
from transformers import BertTokenizer, BertModel
from PIL import Image
from tqdm import tqdm
import pandas as pd
import numpy as np
import random

# MixUp augmentation
def mixup(data, targets, alpha=1.0):
    """Blend each sample of a batch with another randomly chosen sample.

    Args:
        data: Batch tensor; the first dimension is the batch dimension.
        targets: Tensor of per-sample targets, indexed along its first
            dimension with the same permutation as ``data``.
        alpha: Parameter of the symmetric ``Beta(alpha, alpha)`` distribution
            the mixing coefficient is drawn from. ``alpha <= 0`` disables
            mixing (the coefficient is fixed to 1).

    Returns:
        ``(mixed_data, targets, shuffled_targets, lam)`` where
        ``mixed_data = lam * data + (1 - lam) * data[perm]``,
        ``shuffled_targets = targets[perm]`` and ``lam`` is a Python float.
    """
    # Build the permutation on the data's device so indexing does not need a
    # host-to-device copy of the indices.
    indices = torch.randperm(data.size(0), device=data.device)
    shuffled_data = data[indices]
    # Targets may live on a different device than the data.
    shuffled_targets = targets[indices.to(targets.device)]

    # np.random.beta raises for non-positive parameters, so treat alpha <= 0
    # as "no mixing" instead of crashing.
    lam = float(np.random.beta(alpha, alpha)) if alpha > 0 else 1.0
    mixed_data = lam * data + (1 - lam) * shuffled_data

    return mixed_data, targets, shuffled_targets, lam

# Dataset definition
class KaggleDataset(Dataset):
    """Image + text dataset backed by a pandas DataFrame.

    Each row must provide an ``'img'`` column (path to an image file) and a
    ``'text'`` column; a ``'label'`` column is optional and defaults to -1
    when absent.

    Args:
        dataframe: Table with one sample per row.
        tokenizer: Callable used to encode the text; it is called with
            ``padding='max_length'``, ``truncation=True``, ``max_length`` and
            ``return_tensors="pt"`` and must return a mapping containing
            ``'input_ids'`` and ``'attention_mask'`` tensors.
        transform: Callable applied to the RGB PIL image. When omitted, a
            default pipeline with resize, random augmentation, tensor
            conversion and normalisation is used.
        max_len: Length the text is padded/truncated to.
    """

    def __init__(self, dataframe, tokenizer, transform=None, max_len=128):
        self.dataframe = dataframe
        self.tokenizer = tokenizer
        # NOTE: the fallback pipeline contains random augmentation; pass an
        # explicit deterministic transform for evaluation data.
        self.transform = transform if transform else Compose([
            Resize((224, 224)),
            RandomHorizontalFlip(p=0.5),
            RandomRotation(15),
            ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
            ToTensor(),
            Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        self.max_len = max_len

    def __len__(self):
        """Return the number of rows in the underlying DataFrame."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return the sample at positional index ``idx``.

        Returns:
            dict with keys ``'image'`` (transformed image), ``'input_ids'``
            and ``'attention_mask'`` (tokenizer outputs with the leading
            dimension squeezed) and ``'label'`` (float32 scalar tensor, -1
            when the row has no label).
        """
        row = self.dataframe.iloc[idx]
        img_path = row['img']
        text = row['text']
        label = row.get('label', -1)

        # Image: close the file handle as soon as the pixels are decoded so
        # long-running DataLoader workers do not accumulate open files.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')
        image = self.transform(image)

        # Text: a missing value (NaN/None) or a non-string cell would make the
        # tokenizer fail, so fall back to an empty / stringified text.
        text = "" if pd.isna(text) else str(text)
        encoding = self.tokenizer(
            text,
            padding='max_length',
            truncation=True,
            max_length=self.max_len,
            return_tensors="pt"
        )

        # The label is always stored as float32 (-1 marks "no label").
        label_tensor = torch.tensor(label, dtype=torch.float32)
        return {
            'image': image,
            'input_ids': encoding['input_ids'].squeeze(0),
            'attention_mask': encoding['attention_mask'].squeeze(0),
            'label': label_tensor
        }

# Model definition
class MultimodalModel(nn.Module):
    """Image + text classifier combining EfficientNet-B3 and BERT features.

    The image branch is an EfficientNet-B3 whose final classifier layer is
    replaced by a linear projection to ``cnn_output_size``. The text branch
    projects the BERT hidden state at sequence position 0 to 512 features.
    Both feature vectors are concatenated and classified by a small MLP.

    Args:
        cnn_output_size: Size of the image feature vector.
        bert_output_size: Hidden size of the BERT encoder output
            (must match the loaded "bert-base-uncased" model).
        num_classes: Number of output logits.
    """

    def __init__(self, cnn_output_size=512, bert_output_size=768, num_classes=2):
        super().__init__()
        # Image branch. `pretrained=True` is deprecated in torchvision >= 0.13;
        # "IMAGENET1K_V1" selects the same ImageNet weights via the new API.
        self.cnn = efficientnet_b3(weights="IMAGENET1K_V1")
        self.cnn.classifier[1] = nn.Linear(self.cnn.classifier[1].in_features, cnn_output_size)

        # Text branch: BERT encoder followed by a projection head.
        self.bert = BertModel.from_pretrained("bert-base-uncased")
        self.text_fc = nn.Sequential(
            nn.Linear(bert_output_size, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(0.5)
        )

        # Fusion head over the concatenated image and text features.
        self.fc = nn.Sequential(
            nn.Linear(cnn_output_size + 512, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes)
        )

    def forward(self, images, input_ids, attention_mask):
        """Compute class logits for a batch.

        Args:
            images: Image batch passed to the EfficientNet branch.
            input_ids: Token ids passed to BERT.
            attention_mask: Attention mask passed to BERT.

        Returns:
            Output of the fusion head (``num_classes`` values per sample).
        """
        # Image features
        image_features = self.cnn(images)

        # Text features: hidden state of the first token of every sequence.
        bert_output = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        text_features = self.text_fc(bert_output.last_hidden_state[:, 0, :])

        # Fuse along the feature dimension and classify.
        combined_features = torch.cat((image_features, text_features), dim=1)
        output = self.fc(combined_features)
        return output

# Data loading and preprocessing
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

# Training pipeline: resize, random augmentation, tensor conversion and
# normalisation with the mean/std commonly used for ImageNet-pretrained models.
transform = Compose([
    Resize((224, 224)),
    RandomHorizontalFlip(p=0.5),
    RandomRotation(15),
    ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    ToTensor(),
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Evaluation pipeline: identical resize/normalisation but WITHOUT random
# augmentation, so validation metrics are deterministic and comparable
# between epochs.
eval_transform = Compose([
    Resize((224, 224)),
    ToTensor(),
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

train_dataset = KaggleDataset(dataframe=train, tokenizer=tokenizer, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=4, pin_memory=True)

val_dataset = KaggleDataset(dataframe=val, tokenizer=tokenizer, transform=eval_transform)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, num_workers=4, pin_memory=True)

# Model, loss function and optimizer
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
use_amp = device.type == 'cuda'  # mixed precision is only used on GPU
model = MultimodalModel().to(device)
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

# Optimizer.
# The previous version wrapped AdamW in `SAM(...)`, but no SAM implementation
# is defined or imported in this notebook (NameError), and the loop called
# first_step(zero_grad=True) before scaler.step(), which clears the gradients
# and never performs SAM's second forward/backward pass. AdamW is therefore
# stepped directly. If SAM is reintroduced, every update needs two
# forward/backward passes.
base_optimizer = optim.AdamW(model.parameters(), lr=3e-5, weight_decay=1e-4)
optimizer = base_optimizer

# Scheduler (stepped once per epoch below)
scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=5, T_mult=1)

# Mixed precision training; the scaler is a no-op when AMP is disabled.
scaler = torch.cuda.amp.GradScaler(enabled=use_amp)

# Training loop
num_epochs = 10
grad_accum_steps = 2
for epoch in range(num_epochs):
    model.train()
    train_loss, train_acc = 0.0, 0.0
    # Start every epoch with clean gradients for the accumulation window.
    optimizer.zero_grad(set_to_none=True)

    for idx, batch in enumerate(tqdm(train_loader, desc=f"Training Epoch {epoch+1}")):
        images = batch['image'].to(device, non_blocking=True)
        input_ids = batch['input_ids'].to(device, non_blocking=True)
        attention_mask = batch['attention_mask'].to(device, non_blocking=True)
        labels = batch['label'].to(device, non_blocking=True).long()

        # Apply MixUp to the images of roughly half of the batches.
        # NOTE(review): only images are mixed; the text inputs stay paired
        # with the un-shuffled labels — confirm this is intended.
        use_mixup = random.random() < 0.5
        if use_mixup:
            images, labels_a, labels_b, lam = mixup(images, labels)

        with torch.cuda.amp.autocast(enabled=use_amp):
            outputs = model(images, input_ids, attention_mask)
            if use_mixup:
                loss = lam * criterion(outputs, labels_a) + (1 - lam) * criterion(outputs, labels_b)
            else:
                loss = criterion(outputs, labels)

        # Scale the loss down so the accumulated gradient is an average over
        # the accumulation window.
        scaler.scale(loss / grad_accum_steps).backward()

        # Gradient accumulation: update after every `grad_accum_steps`
        # batches and on the last batch of the epoch.
        if (idx + 1) % grad_accum_steps == 0 or (idx + 1) == len(train_loader):
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad(set_to_none=True)

        # Accumulate Python floats so no autograd graph / GPU tensor is kept.
        train_loss += loss.item()
        train_acc += (outputs.argmax(dim=1) == labels).float().mean().item()

    scheduler.step()
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {train_loss/len(train_loader):.4f}, Accuracy: {train_acc/len(train_loader):.4f}")


In [None]:
# Submit the predictions file to the osai-project competition.
# NOTE(review): no visible cell writes submission_optimized.csv — confirm the
# file is generated before running this cell.
!kaggle competitions submit -c osai-project -f submission_optimized.csv -m "Final optimized model submission"