In [None]:
# Mount Google Drive into the Colab runtime at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from transformers import SwinForImageClassification, AutoFeatureExtractor
from PIL import Image
import numpy as np
import pandas as pd
import os
import albumentations as A
from albumentations.pytorch import ToTensorV2
from tqdm import tqdm
from scipy.stats import pearsonr

In [None]:
# Training hyper-parameters shared by the later cells.
CFG = {
    'EPOCHS': 10,
    'LEARNING_RATE': 5e-5,
    'BATCH_SIZE': 32,
    'SEED': 41
}


def seed_everything(seed):
    """Seed the Python, NumPy and PyTorch random number generators.

    Args:
        seed: Integer seed applied to every generator.
    """
    import random  # local import: `random` is not in the notebook's import cell

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)


# Apply the configured seed before anything random happens (head initialisation,
# DataLoader shuffling, augmentation), so that repeated runs start from the same state.
seed_everything(CFG['SEED'])

# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
class CustomDataset(Dataset):
    """Image dataset yielding RGB images, optionally paired with float label vectors.

    Args:
        img_paths: Indexable collection of image file paths.
        labels: Optional indexable collection of per-image label vectors. When
            omitted, items are returned without a label.
        transform: Optional callable applied to the loaded PIL image.
    """

    def __init__(self, img_paths, labels=None, transform=None):
        self.img_paths, self.labels, self.transform = img_paths, labels, transform

    def __len__(self):
        """Return the number of image paths held by the dataset."""
        return len(self.img_paths)

    def __getitem__(self, idx):
        """Load item ``idx``.

        Returns:
            ``(image, label)`` when labels were supplied, otherwise ``image`` alone.
            ``label`` is a ``torch.float`` tensor built from ``labels[idx]``.
        """
        # Convert to RGB so every image has three channels.
        sample = Image.open(self.img_paths[idx]).convert("RGB")
        if self.transform:
            sample = self.transform(sample)

        if self.labels is None:
            return sample

        # The label row becomes a float tensor (one value per target).
        target = torch.tensor(self.labels[idx], dtype=torch.float)
        return sample, target

In [None]:
# Number of output nodes: one regression target per gene.
# NOTE(review): should equal the number of label columns in train.csv — confirm.
num_genes = 3467

# Load the pretrained Swin checkpoint, then replace its classification head with
# a linear layer that has one output per gene.
model = SwinForImageClassification.from_pretrained('microsoft/swin-tiny-patch4-window7-224')
_head_in_features = model.classifier.in_features
model.classifier = nn.Linear(_head_in_features, num_genes)
model.to(device)


SwinForImageClassification(
  (swin): SwinModel(
    (embeddings): SwinEmbeddings(
      (patch_embeddings): SwinPatchEmbeddings(
        (projection): Conv2d(3, 96, kernel_size=(4, 4), stride=(4, 4))
      )
      (norm): LayerNorm((96,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.0, inplace=False)
    )
    (encoder): SwinEncoder(
      (layers): ModuleList(
        (0): SwinStage(
          (blocks): ModuleList(
            (0-1): 2 x SwinLayer(
              (layernorm_before): LayerNorm((96,), eps=1e-05, elementwise_affine=True)
              (attention): SwinAttention(
                (self): SwinSelfAttention(
                  (query): Linear(in_features=96, out_features=96, bias=True)
                  (key): Linear(in_features=96, out_features=96, bias=True)
                  (value): Linear(in_features=96, out_features=96, bias=True)
                  (dropout): Dropout(p=0.0, inplace=False)
                )
                (output): SwinSelfOutput(
  

In [None]:
ROOT_DIR = '/content/drive/MyDrive/2024_MAI_DACON/Open/'
%cd /content/drive/MyDrive/2024_MAI_DACON/Open

df = pd.read_csv(os.path.join(ROOT_DIR, 'train.csv'))
train_len = int(len(df) * 0.8)
train_df = df.iloc[:train_len]
val_df = df.iloc[train_len:]
val_df = val_df.reset_index(drop=True)

train_label_vec = train_df.iloc[:,2:].values.astype(np.float32)
val_label_vec = val_df.iloc[:,2:].values.astype(np.float32)

CFG['label_size'] = train_label_vec.shape[1]

/content/drive/MyDrive/2024_MAI_DACON/Open


In [None]:
import torchvision.transforms as transforms

# Training-time augmentation pipeline; the steps are applied in list order.
_train_steps = [
    transforms.Resize((224, 224)),
    # Random geometric changes: horizontal flip, vertical flip, small rotation.
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.5),
    transforms.RandomRotation(degrees=15),
    # Random photometric change: brightness, contrast, saturation and hue.
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    # Random crop covering 80-100% of the image, resized back to 224x224.
    transforms.RandomResizedCrop(size=(224, 224), scale=(0.8, 1.0)),
    transforms.GaussianBlur(kernel_size=(5, 5), sigma=(0.1, 2.0)),
    # Convert to a tensor, then normalise each channel with the given mean/std.
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
transform = transforms.Compose(_train_steps)

In [None]:
# Load the feature extractor published with the pretrained checkpoint.
feature_extractor = AutoFeatureExtractor.from_pretrained('microsoft/swin-tiny-patch4-window7-224')

# Validation must be deterministic: random flips, crops, blur and colour jitter
# would make the validation loss vary between evaluations and make checkpoint
# selection unreliable. Only resize, convert and normalise here.
val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Datasets: augmentation for training only. Both receive plain arrays of paths so
# that indexing is positional in both cases.
train_dataset = CustomDataset(train_df['path'].values, train_label_vec, transform=transform)
val_dataset = CustomDataset(val_df['path'].values, val_label_vec, transform=val_transform)

# Data loaders: shuffle the training data each epoch, keep validation order fixed.
train_loader = DataLoader(train_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False)



In [None]:
# Adam over all model parameters at the configured learning rate, with a
# mean-squared-error loss between the model outputs and the label vectors.
optimizer = optim.Adam(model.parameters(), lr=CFG['LEARNING_RATE'])
criterion = nn.MSELoss()

def validation_with_metrics(model, val_loader, criterion, device):
    """Evaluate ``model`` on ``val_loader`` and return the mean per-batch loss.

    Only the loss is computed; no correlation metric is calculated. Predictions
    and labels are not retained, so memory use does not grow with the size of
    the validation set.

    Args:
        model: Callable whose output exposes a ``.logits`` tensor. It is
            switched to eval mode and left in that mode.
        val_loader: Iterable of ``(images, labels)`` batches.
        criterion: Loss callable applied to ``(logits, labels)``.
        device: Device each batch is moved to before the forward pass.

    Returns:
        float: Sum of the batch losses divided by the number of batches.

    Raises:
        ValueError: If ``val_loader`` yields no batches.
    """
    model.eval()
    total_loss = 0.0
    num_batches = 0

    # Show validation progress with tqdm.
    val_progress = tqdm(val_loader, desc="Validating", leave=False)

    with torch.no_grad():
        for imgs, labels in val_progress:
            imgs = imgs.to(device)
            labels = labels.to(device)

            batch_loss = criterion(model(imgs).logits, labels).item()
            total_loss += batch_loss
            num_batches += 1

            # Display the loss of the current batch.
            val_progress.set_postfix({"Val Loss": batch_loss})

    if num_batches == 0:
        raise ValueError("val_loader yielded no batches; cannot compute a validation loss")

    return total_loss / num_batches

# File the best checkpoint is written to; a relative path, so it resolves against the current working directory.
best_model_path = "best_model.pth"

def train_model_with_metrics(model, train_loader, val_loader, optimizer, criterion, device):
    """Train ``model`` for ``CFG['EPOCHS']`` epochs, checkpointing on validation loss.

    After each epoch the model is evaluated with ``validation_with_metrics``.
    Whenever the validation loss is strictly lower than every earlier epoch,
    the model's ``state_dict`` is written to ``best_model_path``.

    Args:
        model: Module whose output exposes a ``.logits`` tensor.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Loader passed through to ``validation_with_metrics``.
        optimizer: Optimizer stepped once per training batch.
        criterion: Loss callable applied to ``(logits, labels)``.
        device: Device each batch is moved to.

    Returns:
        None. Progress is printed and the best weights are saved to disk.
    """
    # Start at infinity so the first epoch always counts as an improvement.
    best_val_loss = float('inf')

    for epoch in range(CFG['EPOCHS']):
        model.train()
        running_loss = 0

        # Show training progress with tqdm.
        train_progress = tqdm(train_loader, desc=f"Epoch {epoch+1}/{CFG['EPOCHS']} [Training]", leave=False)
        for batch_imgs, batch_labels in train_progress:
            batch_imgs, batch_labels = batch_imgs.to(device), batch_labels.to(device)

            optimizer.zero_grad()
            loss = criterion(model(batch_imgs).logits, batch_labels)
            loss.backward()
            optimizer.step()

            step_loss = loss.item()
            running_loss += step_loss

            # Display the loss of the current batch.
            train_progress.set_postfix({"Train Loss": step_loss})

        avg_train_loss = running_loss / len(train_loader)
        avg_val_loss = validation_with_metrics(model, val_loader, criterion, device)

        print(f"\nEpoch [{epoch+1}/{CFG['EPOCHS']}], Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")

        # Keep only the weights with the lowest validation loss seen so far.
        improved = avg_val_loss < best_val_loss
        if not improved:
            continue

        best_val_loss = avg_val_loss
        torch.save(model.state_dict(), best_model_path)
        print(f"Best model saved at epoch {epoch+1} with validation loss: {best_val_loss:.4f}")


# Run training.
train_model_with_metrics(model, train_loader, val_loader, optimizer, criterion, device)




Epoch [1/10], Train Loss: 0.0461, Val Loss: 0.0459
Best model saved at epoch 1 with validation loss: 0.0459





Epoch [2/10], Train Loss: 0.0459, Val Loss: 0.0457
Best model saved at epoch 2 with validation loss: 0.0457





Epoch [3/10], Train Loss: 0.0459, Val Loss: 0.0457
Best model saved at epoch 3 with validation loss: 0.0457





Epoch [4/10], Train Loss: 0.0458, Val Loss: 0.0457
Best model saved at epoch 4 with validation loss: 0.0457





Epoch [5/10], Train Loss: 0.0458, Val Loss: 0.0457





Epoch [6/10], Train Loss: 0.0458, Val Loss: 0.0457
Best model saved at epoch 6 with validation loss: 0.0457





Epoch [7/10], Train Loss: 0.0457, Val Loss: 0.0457
Best model saved at epoch 7 with validation loss: 0.0457





Epoch [8/10], Train Loss: 0.0457, Val Loss: 0.0457





Epoch [9/10], Train Loss: 0.0457, Val Loss: 0.0456
Best model saved at epoch 9 with validation loss: 0.0456


                                                                            


Epoch [10/10], Train Loss: 0.0456, Val Loss: 0.0456




In [None]:
# Deterministic inference-time preprocessing: resize to the Swin Transformer
# input size, convert to a tensor, then normalise each channel.
_test_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
test_transform = transforms.Compose(_test_steps)

In [None]:
class CustomTestDataset(Dataset):
    """Label-free image dataset: each item is one RGB image.

    Args:
        img_paths: Indexable collection of image file paths.
        transform: Optional callable applied to the loaded PIL image.
    """

    def __init__(self, img_paths, transform=None):
        self.img_paths, self.transform = img_paths, transform

    def __len__(self):
        """Return the number of image paths held by the dataset."""
        return len(self.img_paths)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB and apply ``transform`` when one is set."""
        sample = Image.open(self.img_paths[idx]).convert("RGB")
        if not self.transform:
            return sample
        return self.transform(sample)

In [None]:
# Test table, read relative to the current working directory.
test = pd.read_csv('./test.csv')

# No labels at inference time, so the dataset yields images only; order is kept
# (no shuffling) and loading runs in the main process.
test_dataset = CustomDataset(test['path'].values, labels=None, transform=test_transform)
test_loader = DataLoader(
    test_dataset,
    batch_size=CFG['BATCH_SIZE'],
    shuffle=False,
    num_workers=0,
)

In [None]:
def inference(model, test_loader, device):
    """Run ``model`` over ``test_loader`` and return all logits as one array.

    Args:
        model: Callable whose output exposes a ``.logits`` tensor. It is
            switched to eval mode first.
        test_loader: Iterable of image batches (no labels).
        device: Device each batch is moved to.

    Returns:
        numpy.ndarray: Per-batch logits concatenated along the first axis.
    """
    model.eval()

    # Gradients are not needed for prediction.
    with torch.no_grad():
        # Each batch's logits are moved to the CPU and converted to NumPy.
        batch_outputs = [
            model(batch.to(device)).logits.cpu().numpy()
            for batch in test_loader
        ]

    return np.concatenate(batch_outputs)

In [None]:
# Restore the checkpoint with the lowest validation loss before predicting.
# Without this, inference runs on the final-epoch weights and the best
# checkpoint written during training is never used.
model.load_state_dict(torch.load(best_model_path, map_location=device))

# Run prediction.
preds = inference(model, test_loader, device)

# Fill every column after the first of the sample submission with the predictions.
submit = pd.read_csv('./sample_submission.csv')
assert preds.shape == submit.iloc[:, 1:].shape, (
    f"prediction shape {preds.shape} does not match submission shape {submit.iloc[:, 1:].shape}"
)
submit.iloc[:, 1:] = np.array(preds).astype(np.float32)
submit.to_csv('./Swin_10_submit.csv', index=False)