## Import

In [None]:
!sudo pip uninstall opencv-python --y

In [None]:
!pip install --upgrade opencv-python


In [None]:
import io
#import zipfile
#from zipfile import ZipFile
import requests

import random
import pandas as pd
import numpy as np
import os
import re
import glob
import cv2

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import torchvision.models as models

from tqdm import tqdm

import warnings
warnings.filterwarnings(action='ignore')

In [None]:
# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

## Hyperparameter Setting

In [None]:
# Global settings shared by the cells below.
CFG = dict(
    IMG_SIZE=299,        # input size for the Inception model
    EPOCHS=10,           # use 10 or more
    LEARNING_RATE=3e-4,
    BATCH_SIZE=32,
    SEED=41,
)


## Fix Random Seed

In [None]:
def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and all CUDA devices),
    exports ``PYTHONHASHSEED`` and puts cuDNN into deterministic mode.

    Args:
        seed: Integer seed applied to every generator.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Covers every visible GPU, not just the current one; a no-op without CUDA.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # benchmark mode auto-tunes convolution algorithms per run and may select
    # different ones between runs, so it must be off for repeatable results.
    torch.backends.cudnn.benchmark = False

seed_everything(CFG['SEED']) # Fix the seed

## Data Pre-processing

In [None]:
#file_name = "./open/open.zip"
#output_dir = "./open2"
#os.system("unzip "+file_name+" -d "+output_dir)

In [None]:
# Load the training table. Later cells read its 'path' column and take every
# column from index 2 onward as the label vector.
df = pd.read_csv("./open2/train.csv")

In [None]:
# Hold-out split by row order: the first 80% of the rows are used for training
# and the remaining 20% for validation. The rows are not shuffled beforehand.
# .copy() makes each split an independent frame, so the later
# `train_df['path'] = ...` / `val_df['path'] = ...` assignments modify the
# splits themselves rather than a slice of `df`.
train_len = int(len(df) * 0.8)
train_df = df.iloc[:train_len].copy()
val_df = df.iloc[train_len:].copy()

In [None]:
# Label matrices for the training and validation splits: every column from
# index 2 onward, cast to float32.
train_label_vec, val_label_vec = (
    split.iloc[:, 2:].values.astype(np.float32) for split in (train_df, val_df)
)

In [None]:
CFG['label_size'] = train_label_vec.shape[1] # Number of label columns; BaseModel uses it as its default output width

## CustomDataset

In [None]:
class CustomDataset(Dataset):
    """Image dataset that loads files from disk with OpenCV.

    Args:
        img_path_list: Indexable collection of image file paths.
        label_list: Indexable collection of labels aligned with
            ``img_path_list``, or ``None`` for unlabeled data.
        transforms: Optional callable invoked as ``transforms(image=image)``
            that returns a mapping with an ``'image'`` entry (this is where
            augmentation is applied).
    """

    def __init__(self, img_path_list, label_list, transforms=None):
        self.img_path_list = img_path_list
        self.label_list = label_list
        self.transforms = transforms

    def __getitem__(self, index):
        """Return ``(image, label)``, or just ``image`` when there are no labels.

        Raises:
            FileNotFoundError: If the image at ``index`` cannot be read.
        """
        img_path = self.img_path_list[index]

        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread reports a missing/unreadable file by returning None;
            # fail here with the path instead of deep inside the transforms.
            raise FileNotFoundError(f'Could not read image: {img_path}')

        # OpenCV decodes to BGR. Reorder to RGB so the channels line up with
        # the RGB-ordered ImageNet mean/std given to A.Normalize and with the
        # ImageNet-pretrained backbone.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        if self.transforms is not None:
            image = self.transforms(image=image)['image']

        if self.label_list is None:
            return image
        return image, self.label_list[index]

    def __len__(self):
        """Return the number of image paths."""
        return len(self.img_path_list)

In [None]:
# Training pipeline: resize to IMG_SIZE x IMG_SIZE (299x299), normalise with the
# ImageNet mean/std, then convert to a tensor. p=1.0 applies the normalisation
# on every call.
train_transform = A.Compose([
    A.Resize(CFG['IMG_SIZE'], CFG['IMG_SIZE']),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225),
                max_pixel_value=255.0, p=1.0),
    ToTensorV2()
])

# Validation/test pipeline: the same steps as the training pipeline.
test_transform = A.Compose([
    A.Resize(CFG['IMG_SIZE'], CFG['IMG_SIZE']),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225),
                max_pixel_value=255.0, p=1.0),
    ToTensorV2()
])

In [None]:
# Initial datasets/loaders. The 'path' values are used as they currently stand
# in the data frames; the datasets are rebuilt in the "Run!!" section once
# base_path has been prepended, and the loaders are rebuilt before training.
train_dataset = CustomDataset(train_df['path'].values, train_label_vec, train_transform)
# Shuffle the training data, matching the training loaders built later on.
train_loader = DataLoader(train_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=True, num_workers=0)

val_dataset = CustomDataset(val_df['path'].values, val_label_vec, test_transform)
val_loader = DataLoader(val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

## Model Definition

In [None]:
! pip install timm

In [None]:
import timm
import torch.nn as nn

class BaseModel(nn.Module):
    """Inception-ResNet-V2 backbone followed by a small linear regression head.

    Args:
        gene_size: Width of the output vector (defaults to ``CFG['label_size']``).
        dropout_rate: Dropout probability applied before the final linear layer.
    """

    def __init__(self, gene_size=CFG['label_size'], dropout_rate=0.5):
        super().__init__()

        # Pretrained backbone from timm. Record how many features its original
        # classifier consumed, then replace that classifier with a pass-through.
        backbone = timm.create_model('inception_resnet_v2', pretrained=True)
        feature_dim = backbone.classif.in_features
        backbone.classif = nn.Identity()
        self.backbone = backbone

        # Head: activation -> dropout -> linear projection to gene_size outputs.
        head_layers = [
            nn.SiLU(),
            nn.Dropout(p=dropout_rate),
            nn.Linear(feature_dim, gene_size),
        ]
        self.regressor = nn.Sequential(*head_layers)

    def forward(self, x):
        """Run ``x`` through the backbone and then the regression head."""
        return self.regressor(self.backbone(x))


## Train

In [None]:
def validation(model, criterion, val_loader, device):
    """Evaluate ``model`` on ``val_loader`` and return the mean batch loss.

    Args:
        model: Module to evaluate; it is switched to eval mode.
        criterion: Loss callable invoked as ``criterion(pred, labels)``.
        val_loader: Iterable yielding ``(imgs, labels)`` batches.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The unweighted mean of the per-batch loss values (``np.mean`` of the
        collected ``loss.item()`` values).
    """
    model.eval()
    batch_losses = []

    with torch.no_grad():
        # Hand tqdm the loader itself rather than iter(loader) so it can read
        # the loader's length and show a total / ETA.
        for imgs, labels in tqdm(val_loader):
            imgs = imgs.float().to(device)
            labels = labels.to(device)

            pred = model(imgs)
            loss = criterion(pred, labels)

            batch_losses.append(loss.item())

    return np.mean(batch_losses)

In [None]:
def train(model, optimizer, train_loader, val_loader, scheduler, device, epochs):
    """Train ``model`` with MSE loss and return it with its best weights loaded.

    After every epoch the model is evaluated with ``validation``. Whenever the
    validation loss improves, a copy of the weights is saved; once training
    finishes that copy is loaded back into ``model``.

    Args:
        model: Module to train; it is moved to ``device`` in place.
        optimizer: Optimizer already bound to ``model``'s parameters.
        train_loader: Iterable yielding ``(imgs, labels)`` training batches.
        val_loader: Iterable yielding ``(imgs, labels)`` validation batches.
        scheduler: Optional scheduler stepped once per epoch with the
            validation loss, or ``None``.
        device: Device used for training and validation.
        epochs: Number of epochs to run.

    Returns:
        ``(best_model, history)``. ``best_model`` is ``model`` carrying the
        weights of the epoch with the lowest validation loss, or ``None`` if no
        epoch beat the initial sentinel loss (for example ``epochs == 0``).
        ``history`` maps ``'train_loss'`` and ``'val_loss'`` to per-epoch lists.
    """
    model.to(device)
    criterion = nn.MSELoss().to(device)

    history = {'train_loss': [], 'val_loss': []}

    best_loss = 99999999
    best_state = None

    for epoch in range(1, epochs+1):
        model.train()
        train_loss = []
        # Iterate the loader directly so tqdm can display the batch total.
        for imgs, labels in tqdm(train_loader):
            imgs = imgs.float().to(device)
            labels = labels.to(device)

            optimizer.zero_grad()

            output = model(imgs)
            loss = criterion(output, labels)

            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())

        _train_loss = np.mean(train_loss)
        _val_loss = validation(model, criterion, val_loader, device)

        history['train_loss'].append(_train_loss)
        history['val_loss'].append(_val_loss)

        print(f'Epoch [{epoch}], Train Loss : [{_train_loss:.5f}] Val Loss : [{_val_loss:.5f}]')

        if scheduler is not None:
            scheduler.step(_val_loss)

        if best_loss > _val_loss:
            best_loss = _val_loss
            # Keeping a reference to `model` would not preserve these weights,
            # because later epochs keep updating the same object. Snapshot the
            # tensors instead (on the CPU, to avoid holding a second copy on
            # the training device).
            best_state = {
                name: tensor.detach().cpu().clone()
                for name, tensor in model.state_dict().items()
            }

    if best_state is None:
        return None, history

    # Restore the best epoch's weights into the model that is returned.
    model.load_state_dict(best_state)
    return model, history

## Run!!

In [None]:
# 1. 이미지 파일의 기본 경로 지정
base_path = "./open2/"  # folder that contains train.csv

# 2. Prepend base_path to every image path. Paths that already start with
#    base_path are left as they are, so re-running this cell does not stack
#    the prefix a second time.
for _split_df in (train_df, val_df):
    _paths = _split_df['path'].astype(str)
    _split_df['path'] = _paths.where(_paths.str.startswith(base_path), base_path + _paths)

# 3. Rebuild the datasets so that they use the prefixed paths.
train_dataset = CustomDataset(train_df['path'].values, train_label_vec, train_transform)
val_dataset = CustomDataset(val_df['path'].values, val_label_vec, test_transform)

In [None]:
# Show the 'path' column of the training split.
train_df["path"]

## Optuna

In [None]:
!pip install optuna

In [None]:
import optuna
from optuna.trial import TrialState

# Optuna objective function
def objective(trial):
    """Optuna objective: train one model and return its best validation loss.

    Samples a learning rate (log scale, 1e-5 to 1e-2) and an epoch count
    (10 to 30), trains a fresh ``BaseModel`` on ``train_dataset`` and
    validates it on ``val_dataset``.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        The lowest validation loss recorded over the trial's epochs.
    """
    # Sample the hyperparameters. suggest_float(..., log=True) replaces the
    # deprecated suggest_loguniform and samples from the same range.
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)
    epochs = trial.suggest_int('epochs', 10, 30)

    # The batch size is not tuned; use the same value as the final training run.
    batch_size = CFG['BATCH_SIZE']

    # Build the loaders for this trial.
    train_loader = DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True, num_workers=0
    )
    val_loader = DataLoader(
        val_dataset, batch_size=batch_size, shuffle=False, num_workers=0
    )

    # Model, optimizer and scheduler.
    model = BaseModel()
    model.to(device)
    optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.5, patience=2,
        threshold_mode='abs', min_lr=1e-8
    )

    # Train the model.
    _, history = train(model, optimizer, train_loader, val_loader, scheduler, device, epochs)

    # Minimise the best (lowest) validation loss seen during the trial.
    return min(history['val_loss'])


In [None]:
def optimize_hyperparameters(n_trials=15):
    """Run an Optuna study over ``objective`` and return the best parameters.

    Args:
        n_trials: Number of trials to run (defaults to 15).

    Returns:
        The parameter dict of the trial with the lowest objective value.
    """
    study = optuna.create_study(direction='minimize')
    study.optimize(objective, n_trials=n_trials)

    # Report the best trial.
    best_trial = study.best_trial
    print("Best trial:")
    print(f"  Value: {best_trial.value}")
    print("  Params: ")
    for key, value in best_trial.params.items():
        print(f"    {key}: {value}")

    return best_trial.params

In [None]:
# 최적의 하이퍼파라미터 찾기
best_params = optimize_hyperparameters()

# Store the best hyperparameters in CFG for the final training run
CFG['LEARNING_RATE'] = best_params['learning_rate']
CFG['EPOCHS'] = best_params['epochs']

## final_model

In [None]:
# Loaders for the final run: training batches are shuffled, validation batches keep their order.
train_loader = DataLoader(train_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

In [None]:
# Fresh model trained with the learning rate and epoch count stored in CFG.
model = BaseModel()
optimizer = optim.Adam(model.parameters(), lr=CFG["LEARNING_RATE"])
# Halve the learning rate when the validation loss stops improving for longer
# than `patience` epochs, never going below min_lr.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.5,
    patience=2,
    threshold_mode='abs',
    min_lr=1e-8,
    verbose=True,
)

infer_model, history = train(
    model, optimizer, train_loader, val_loader, scheduler, device, CFG['EPOCHS']
)

## Inference

In [None]:
# Load the test table; its 'path' column is read by the cells below.
test = pd.read_csv('./open2/test.csv')

In [None]:
# 2. Prepend base_path to the test image paths. Paths that already start with
#    base_path are left as they are, so re-running this cell does not stack
#    the prefix a second time.
_test_paths = test['path'].astype(str)
test['path'] = _test_paths.where(_test_paths.str.startswith(base_path), base_path + _test_paths)

# 3. Build the test dataset (no labels) and its data loader.
test_dataset = CustomDataset(test['path'].values, None, test_transform)
test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)


In [None]:
# Same construction as in the preceding cell: rebuilds test_dataset and
# test_loader from the current test['path'] values.
test_dataset = CustomDataset(test['path'].values, None, test_transform)
test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

In [None]:
def inference(model, test_loader, device):
    """Run ``model`` over ``test_loader`` and return all predictions.

    Args:
        model: Module used for prediction; it is switched to eval mode.
        test_loader: Iterable yielding image batches (no labels).
        device: Device each batch is moved to before the forward pass.

    Returns:
        A NumPy array with the per-batch outputs concatenated in loader order.
    """
    model.eval()
    outputs = []
    with torch.no_grad():
        for batch in tqdm(test_loader):
            batch = batch.to(device).float()
            # Move each result to the CPU so the batches can be joined there.
            outputs.append(model(batch).detach().cpu())

    return torch.cat(outputs).numpy()

In [None]:
# Predict on the test set with the model returned by train().
preds = inference(infer_model, test_loader, device)

## Submission

In [None]:
# Write the predictions into every column after the first of the sample
# submission, then save the result as a new CSV.
submit = pd.read_csv('./open2/sample_submission.csv')
submit.iloc[:, 1:] = np.asarray(preds).astype(np.float32)
submit.to_csv('./open2/inception- resnet- v2-ver3_tuning.csv', index=False)

In [None]:
import matplotlib.pyplot as plt

# Plot the per-epoch training and validation loss curves.
plt.figure(figsize=(10, 5))
for series_key, series_label in (('train_loss', 'Train Loss'), ('val_loss', 'Validation Loss')):
    plt.plot(history[series_key], label=series_label)
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Train and Validation Loss')
plt.legend()
plt.grid()
plt.show()
