In [None]:
import os
import random
import pandas as pd
import numpy as np
from PIL import Image
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader, Subset
import torchvision.models as models
import torchvision.transforms as transforms
import torch.nn.functional as F
from torch import nn, optim

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

CFG = {
    'IMG_SIZE': 224,
    'BATCH_SIZE': 8,
    'EPOCHS': 50,
    'LEARNING_RATE': 1e-4,
    'SEED' : 42
}

Using device: cuda


In [2]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

seed_everything(CFG['SEED'])


In [3]:
class CustomImageDataset(Dataset):
    def __init__(self, root_dir, transform=None, is_test=False):
        self.root_dir = root_dir
        self.transform = transform
        self.is_test = is_test
        self.samples = []
        if is_test:
            for fname in sorted(os.listdir(root_dir)):
                if fname.lower().endswith('.jpg'):
                    img_path = os.path.join(root_dir, fname)
                    self.samples.append((img_path,))
        else:
            self.classes = sorted(os.listdir(root_dir))
            self.class_to_idx = {cls_name: i for i, cls_name in enumerate(self.classes)}
            for cls_name in self.classes:
                cls_folder = os.path.join(root_dir, cls_name)
                for fname in os.listdir(cls_folder):
                    if fname.lower().endswith('.jpg'):
                        img_path = os.path.join(cls_folder, fname)
                        label = self.class_to_idx[cls_name]
                        self.samples.append((img_path, label))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        if self.is_test:
            img_path = self.samples[idx][0]
            image = Image.open(img_path).convert('RGB')
            if self.transform:
                image = self.transform(image)
            return image
        else:
            img_path, label = self.samples[idx]
            image = Image.open(img_path).convert('RGB')
            if self.transform:
                image = self.transform(image)
            return image, label


In [4]:
rgb_transform = transforms.Compose([
    transforms.Resize((CFG['IMG_SIZE'], CFG['IMG_SIZE'])),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
gray_transform = transforms.Compose([
    transforms.Resize((CFG['IMG_SIZE'], CFG['IMG_SIZE'])),
    transforms.Grayscale(num_output_channels=1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.449], std=[0.226])
])


In [5]:
train_root = './train'
test_root = './test'

full_dataset = CustomImageDataset(train_root, transform=None)
targets = [label for _, label in full_dataset.samples]
class_names = full_dataset.classes

train_idx, val_idx = train_test_split(
    range(len(targets)), test_size=0.2, stratify=targets, random_state=42
)

train_dataset_rgb = Subset(CustomImageDataset(train_root, transform=rgb_transform), train_idx)
val_dataset_rgb = Subset(CustomImageDataset(train_root, transform=rgb_transform), val_idx)
train_dataset_gray = Subset(CustomImageDataset(train_root, transform=gray_transform), train_idx)
val_dataset_gray = Subset(CustomImageDataset(train_root, transform=gray_transform), val_idx)

train_loader_rgb = DataLoader(train_dataset_rgb, batch_size=CFG['BATCH_SIZE'], shuffle=True)
val_loader_rgb = DataLoader(val_dataset_rgb, batch_size=CFG['BATCH_SIZE'], shuffle=False)
train_loader_gray = DataLoader(train_dataset_gray, batch_size=CFG['BATCH_SIZE'], shuffle=True)
val_loader_gray = DataLoader(val_dataset_gray, batch_size=CFG['BATCH_SIZE'], shuffle=False)


In [6]:
def modify_first_conv(model, model_type):
    if model_type == 'densenet':
        conv = model.features.conv0
        new_conv = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        with torch.no_grad():
            new_conv.weight = nn.Parameter(conv.weight.sum(dim=1, keepdim=True))
        model.features.conv0 = new_conv
    elif model_type == 'vgg':
        conv = model.features[0]
        new_conv = nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1)
        with torch.no_grad():
            new_conv.weight = nn.Parameter(conv.weight.sum(dim=1, keepdim=True))
        model.features[0] = new_conv
    elif model_type == 'efficientnet':
        conv = model._conv_stem
        new_conv = nn.Conv2d(1, 32, kernel_size=3, stride=2, padding=1, bias=False)
        with torch.no_grad():
            new_conv.weight = nn.Parameter(conv.weight.sum(dim=1, keepdim=True))
        model._conv_stem = new_conv
    return model


In [7]:
import timm
import torch.nn as nn
import torch

class EnsembleModel(nn.Module):
    def __init__(self, num_classes, is_gray=False):
        super().__init__()
        in_chans = 1 if is_gray else 3

        # timm 모델 로딩 (분류기 제거)
        self.densenet = timm.create_model('densenet121', pretrained=True, num_classes=0, in_chans=in_chans)
        self.vgg = timm.create_model('vgg16_bn', pretrained=True, num_classes=0, in_chans=in_chans)
        self.effnet = timm.create_model('efficientnet_b0', pretrained=True, num_classes=0, in_chans=in_chans)

        # 각 모델 출력 차원 (미리 확인된 값)
        self.dim1 = 1024  # densenet121
        self.dim2 = 4096  # vgg16_bn
        self.dim3 = 1280  # efficientnet_b0

        # 최종 classifier
        self.classifier = nn.Linear(self.dim1 + self.dim2 + self.dim3, num_classes)

    def forward(self, x):
        x1 = self.densenet(x)
        x2 = self.vgg(x)
        x3 = self.effnet(x)
        x_cat = torch.cat((x1, x2, x3), dim=1)
        return self.classifier(x_cat)



  from .autonotebook import tqdm as notebook_tqdm


In [8]:
class MultiStateEnsemble(nn.Module):
    def __init__(self, num_classes, gray_weight=0.6, rgb_weight=0.4):
        super().__init__()
        self.rgb_ensemble = EnsembleModel(num_classes, is_gray=False)
        self.gray_ensemble = EnsembleModel(num_classes, is_gray=True)
        total = gray_weight + rgb_weight
        self.gray_weight = gray_weight / total
        self.rgb_weight = rgb_weight / total

    def forward(self, rgb_x, gray_x):
        rgb_logits = self.rgb_ensemble(rgb_x)
        gray_logits = self.gray_ensemble(gray_x)
        rgb_probs = F.softmax(rgb_logits, dim=1)
        gray_probs = F.softmax(gray_logits, dim=1)
        final_probs = self.rgb_weight * rgb_probs + self.gray_weight * gray_probs
        return final_probs


In [9]:
model = MultiStateEnsemble(num_classes=len(class_names)).to(device)
optimizer = optim.Adam(model.parameters(), lr=CFG['LEARNING_RATE'])
criterion = nn.CrossEntropyLoss()

for epoch in range(CFG['EPOCHS']):
    model.train()
    train_loss = 0
    for (rgb_imgs, labels), (gray_imgs, _) in zip(train_loader_rgb, train_loader_gray):
        rgb_imgs, gray_imgs, labels = rgb_imgs.to(device), gray_imgs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(rgb_imgs, gray_imgs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    print(f"Epoch {epoch+1} Train Loss: {train_loss/len(train_loader_rgb):.4f}")


Epoch 1 Train Loss: 5.9560
Epoch 2 Train Loss: 5.8201
Epoch 3 Train Loss: 5.7197
Epoch 4 Train Loss: 5.6774
Epoch 5 Train Loss: 5.6557
Epoch 6 Train Loss: 5.6445
Epoch 7 Train Loss: 5.6361
Epoch 8 Train Loss: 5.6282
Epoch 9 Train Loss: 5.6238
Epoch 10 Train Loss: 5.6187
Epoch 11 Train Loss: 5.6145
Epoch 12 Train Loss: 5.6115
Epoch 13 Train Loss: 5.6083
Epoch 14 Train Loss: 5.6050
Epoch 15 Train Loss: 5.6031
Epoch 16 Train Loss: 5.6032
Epoch 17 Train Loss: 5.6021
Epoch 18 Train Loss: 5.6016
Epoch 19 Train Loss: 5.6012
Epoch 20 Train Loss: 5.6002
Epoch 21 Train Loss: 5.5992
Epoch 22 Train Loss: 5.5987
Epoch 23 Train Loss: 5.5978
Epoch 24 Train Loss: 5.5967
Epoch 25 Train Loss: 5.5961
Epoch 26 Train Loss: 5.5962
Epoch 27 Train Loss: 5.5953
Epoch 28 Train Loss: 5.5956
Epoch 29 Train Loss: 5.5944
Epoch 30 Train Loss: 5.5937
Epoch 31 Train Loss: 5.5932
Epoch 32 Train Loss: 5.5924
Epoch 33 Train Loss: 5.5925
Epoch 34 Train Loss: 5.5923
Epoch 35 Train Loss: 5.5924
Epoch 36 Train Loss: 5.5919
E

#### 추론 부분 아직은 돌리지 말것

In [12]:
test_dataset_rgb = CustomImageDataset(test_root, transform=rgb_transform, is_test=True)
test_dataset_gray = CustomImageDataset(test_root, transform=gray_transform, is_test=True)
test_loader_rgb = DataLoader(test_dataset_rgb, batch_size=CFG['BATCH_SIZE'], shuffle=False)
test_loader_gray = DataLoader(test_dataset_gray, batch_size=CFG['BATCH_SIZE'], shuffle=False)

model.eval()
results = []
with torch.no_grad():
    for rgb_imgs, gray_imgs in zip(test_loader_rgb, test_loader_gray):
        rgb_imgs, gray_imgs = rgb_imgs.to(device), gray_imgs.to(device)
        probs = model(rgb_imgs, gray_imgs)
        for prob in probs.cpu():
            result = {class_names[i]: prob[i].item() for i in range(len(class_names))}
            results.append(result)
pred = pd.DataFrame(results)


In [13]:
submission = pd.read_csv('./sample_submission.csv', encoding='utf-8')
class_columns = submission.columns[1:]
pred = pred[class_columns]
submission[class_columns] = pred.values
submission.to_csv('ensemble_submission.csv', index=False, encoding='utf-8')