In [9]:
import random
import pandas as pd
import numpy as np
import os
import cv2
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from sklearn.metrics import f1_score
from tqdm import tqdm
from transformers import SwinForImageClassification
import warnings
import torch
from torch.optim import Adam
from torch.optim.lr_scheduler import ReduceLROnPlateau
from tqdm import tqdm
# Silence every warning category for the remainder of the session.
warnings.filterwarnings('ignore')

# Run on the GPU when CUDA is usable, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

import torch
import torchvision

# Record the library versions used for this run alongside the results.
print("PyTorch version:", torch.__version__)
print("TorchVision version:", torchvision.__version__)

# Hyper-parameters shared by the cells below.
CFG = dict(
    IMG_SIZE=224,        # side length every image is resized to
    EPOCHS=5,            # number of passes over the training split
    LEARNING_RATE=3e-4,  # initial learning rate handed to Adam
    BATCH_SIZE=32,       # samples per DataLoader batch
    SEED=41,             # seed for the RNGs and the train/val split
)

PyTorch version: 2.3.0+cu121
TorchVision version: 0.18.0+cu121


In [10]:
def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy's legacy global RNG and PyTorch (CPU and
    every CUDA device), and switches cuDNN into its deterministic mode.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    # Only affects interpreters started after this point (e.g. worker
    # processes); the hash seed of the running kernel is already fixed.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed all CUDA devices, not just the current one; a no-op without CUDA.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # Benchmark mode lets cuDNN pick convolution algorithms by timing them,
    # which can differ between runs; it must stay off for reproducibility.
    torch.backends.cudnn.benchmark = False

# Seed all RNGs up front so the cells below start from a fixed random state.
seed_everything(CFG['SEED'])

In [11]:
# Load the labelled table and hold out a stratified 30% of it for validation.
df = pd.read_csv('./train.csv')
train, val = train_test_split(
    df,
    test_size=0.3,
    stratify=df['label'],
    random_state=CFG['SEED'],
)
# Take independent copies so the label assignments below write to real frames
# instead of to slices of `df` (chained assignment / SettingWithCopyWarning).
train = train.copy()
val = val.copy()
# NOTE: the name `train` is rebound later by the `train()` function definition;
# the datasets must be built from this frame before that definition runs.

# Encode class names as integer ids; the encoder is fitted on the training
# split only and then reused for validation and for decoding predictions.
le = preprocessing.LabelEncoder()
train['label'] = le.fit_transform(train['label'])
val['label'] = le.transform(val['label'])

In [12]:
class CustomDataset(torch.utils.data.Dataset):
    """Dataset that reads images from disk with OpenCV and pairs them with labels.

    Args:
        img_path_list: Indexable collection of image file paths.
        label_list: Indexable collection of labels aligned with
            ``img_path_list``, or ``None`` for unlabelled data.
        transforms: Optional callable invoked as ``transforms(image=...)``;
            the ``'image'`` entry of its result replaces the loaded image.
    """

    def __init__(self, img_path_list, label_list, transforms=None):
        self.img_path_list = img_path_list
        self.label_list = label_list
        self.transforms = transforms

    def __getitem__(self, index):
        """Return the ``(image, label)`` pair stored at ``index``.

        The label is ``-1`` when the dataset was built without labels.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image file.
        """
        img_path = self.img_path_list[index]
        image = cv2.imread(img_path)
        # cv2.imread signals a missing or undecodable file by returning None
        # rather than raising; fail here with the offending path instead of
        # letting cvtColor raise an opaque assertion error.
        if image is None:
            raise FileNotFoundError(f"Could not read image file: {img_path!r}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # OpenCV loads as BGR
        if self.transforms is not None:
            image = self.transforms(image=image)['image']
        label = self.label_list[index] if self.label_list is not None else -1
        return image, label

    def __len__(self):
        """Return the number of image paths in the dataset."""
        return len(self.img_path_list)

In [13]:
def _make_transform():
    """Build the resize -> per-channel normalise -> tensor pipeline."""
    side = CFG['IMG_SIZE']
    return A.Compose([
        A.Resize(side, side),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0),
        ToTensorV2(),
    ])


# Both splits use the same preprocessing; no augmentation is applied.
train_transform = _make_transform()
test_transform = _make_transform()

train_dataset = CustomDataset(
    img_path_list=train['img_path'].values,
    label_list=train['label'].values,
    transforms=train_transform,
)
val_dataset = CustomDataset(
    img_path_list=val['img_path'].values,
    label_list=val['label'].values,
    transforms=test_transform,
)

# Only the training loader shuffles; validation keeps the frame's order.
_loader_options = dict(batch_size=CFG['BATCH_SIZE'], num_workers=0)
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_options)

In [14]:
class CustomSwinTransformer(nn.Module):
    """Wrapper around a pretrained Swin classifier that returns raw logits."""

    def __init__(self, num_classes):
        super().__init__()
        # ignore_mismatched_sizes lets the checkpoint load even though the
        # classifier head is resized to `num_classes` outputs.
        backbone = SwinForImageClassification.from_pretrained(
            "microsoft/swin-base-patch4-window7-224-in22k",
            num_labels=num_classes,
            ignore_mismatched_sizes=True,
        )
        self.model = backbone

    def forward(self, x):
        """Run the backbone on ``x`` and return only the ``logits`` of its output."""
        return self.model(x).logits

In [15]:
# One output unit per class known to the label encoder.
num_classes = len(le.classes_)
model = CustomSwinTransformer(num_classes)
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=CFG['LEARNING_RATE'])
# mode='max': the scheduler is stepped with the validation F1 score, so the
# learning rate is halved when that score stops increasing.
scheduler = ReduceLROnPlateau(
    optimizer,
    mode='max',
    factor=0.5,
    patience=2,
    threshold_mode='abs',
    min_lr=1e-8,
    verbose=True,
)


Some weights of SwinForImageClassification were not initialized from the model checkpoint at microsoft/swin-base-patch4-window7-224-in22k and are newly initialized because the shapes did not match:
- classifier.bias: found shape torch.Size([21841]) in the checkpoint and torch.Size([25]) in the model instantiated
- classifier.weight: found shape torch.Size([21841, 1024]) in the checkpoint and torch.Size([25, 1024]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [16]:
def train(model, optimizer, train_loader, val_loader, scheduler, device):
    """Train ``model`` and return it carrying the weights of its best epoch.

    Runs ``CFG['EPOCHS']`` epochs of cross-entropy training. After each epoch
    the model is scored with ``validation`` and the scheduler is stepped with
    the validation F1 score. The weights of the epoch with the highest F1 are
    snapshotted and restored before returning.

    Args:
        model: Module mapping an image batch to class logits.
        optimizer: Optimizer already bound to ``model``'s parameters.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of ``(images, labels)`` validation batches.
        scheduler: Object whose ``step(metric)`` is called once per epoch.
        device: Device the model and batches are moved to.

    Returns:
        The same ``model`` object, loaded with the best-scoring weights (or
        left as trained if no epoch produced a comparable score).
    """
    model.to(device)
    criterion = nn.CrossEntropyLoss().to(device)
    # Start below any reachable score so the first epoch is always recorded,
    # even when its F1 is exactly 0.0.
    best_score = float('-inf')
    best_state = None

    for epoch in range(CFG['EPOCHS']):
        model.train()
        train_loss = []
        # tqdm wraps the loader to show progress through the epoch.
        for imgs, labels in tqdm(train_loader, desc=f"Training Epoch {epoch+1}"):
            imgs = imgs.to(device).float()
            labels = labels.to(device).long()
            optimizer.zero_grad()
            logits = model(imgs)
            loss = criterion(logits, labels)
            loss.backward()
            optimizer.step()
            train_loss.append(loss.item())

        val_loss, val_score = validation(model, criterion, val_loader, device)
        print(f'Epoch {epoch+1}, Train Loss: {np.mean(train_loss):.5f}, Val Loss: {val_loss:.5f}, Val F1 Score: {val_score:.5f}')
        scheduler.step(val_score)

        if val_score > best_score:
            best_score = val_score
            # Keeping a reference to `model` would not preserve anything:
            # later epochs mutate the same parameters in place. Clone the
            # tensors (on the CPU, to avoid holding a second copy on the GPU).
            best_state = {
                name: tensor.detach().cpu().clone()
                for name, tensor in model.state_dict().items()
            }

    if best_state is not None:
        model.load_state_dict(best_state)
    return model

def validation(model, criterion, val_loader, device):
    """Evaluate ``model`` on ``val_loader`` without tracking gradients.

    Args:
        model: Module mapping an image batch to class logits.
        criterion: Loss callable taking ``(logits, labels)``.
        val_loader: Iterable of ``(images, labels)`` batches.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(mean of the per-batch losses, macro-averaged F1 score)``.
    """
    model.eval()
    batch_losses = []
    targets = []
    predictions = []
    with torch.no_grad():
        for imgs, labels in tqdm(val_loader, desc="Validating"):
            imgs = imgs.to(device).float()
            labels = labels.to(device).long()
            # The model returns the logits directly.
            logits = model(imgs)
            batch_losses.append(criterion(logits, labels).item())
            predictions += logits.argmax(dim=1).cpu().tolist()
            targets += labels.cpu().tolist()
    mean_loss = np.mean(batch_losses)
    macro_f1 = f1_score(targets, predictions, average='macro')
    return mean_loss, macro_f1

In [17]:
# Train the model; the object returned by `train` is the one used for inference.
infer_model = train(
    model=model,
    optimizer=optimizer,
    train_loader=train_loader,
    val_loader=val_loader,
    scheduler=scheduler,
    device=device,
)

# Test images have no labels: passing None makes the dataset return -1 as a
# placeholder label for every sample.
test = pd.read_csv('./test.csv')
test_dataset = CustomDataset(
    img_path_list=test['img_path'].values,
    label_list=None,
    transforms=test_transform,
)
test_loader = DataLoader(
    test_dataset,
    batch_size=CFG['BATCH_SIZE'],
    shuffle=False,
    num_workers=0,
)

Training Epoch 1:   1%|          | 3/347 [00:49<1:34:04, 16.41s/it]


KeyboardInterrupt: 

In [None]:
def inference(model, test_loader, device):
    """Predict a decoded class label for every sample in ``test_loader``.

    Args:
        model: Module mapping an image batch to class logits.
        test_loader: Iterable of two-element batches; only the first element
            (the images) is used.
        device: Device the image batches are moved to.

    Returns:
        The arg-max class ids of all batches, in loader order, mapped back to
        the original labels with the module-level encoder ``le``.
    """
    model.eval()
    predicted_ids = []
    with torch.no_grad():
        # There are no labels at test time, so the second element is ignored.
        for imgs, _unused in test_loader:
            logits = model(imgs.to(device).float())
            predicted_ids += logits.argmax(dim=1).cpu().tolist()
    return le.inverse_transform(predicted_ids)

In [None]:
# Predict a label for every test image with the trained model.
preds = inference(model=infer_model, test_loader=test_loader, device=device)

# Put the predictions into the sample submission's label column and save it.
submit = pd.read_csv('./sample_submission.csv').assign(label=preds)
submit.to_csv('./swinmodel.csv', index=False)