# Library import

In [2]:
# 필요 library들을 import합니다.
import os
from typing import Tuple, Any, Callable, List, Optional, Union

import cv2
import timm
import torch
import numpy as np
import pandas as pd
import albumentations as A
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import models, datasets, transforms
from tqdm.auto import tqdm
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from albumentations.pytorch import ToTensorV2


  from .autonotebook import tqdm as notebook_tqdm


# Dataset Class

In [3]:
class CustomDataset(Dataset):
    def __init__(
        self, 
        root_dir: str, 
        info_df: pd.DataFrame, 
        transform: Callable,
        is_inference: bool = False
    ):
        # 데이터셋의 기본 경로, 이미지 변환 방법, 이미지 경로 및 레이블을 초기화합니다.
        self.root_dir = root_dir  # 이미지 파일들이 저장된 기본 디렉토리
        self.transform = transform  # 이미지에 적용될 변환 처리
        self.is_inference = is_inference # 추론인지 확인
        self.image_paths = info_df['image_path'].tolist()  # 이미지 파일 경로 목록
        
        if not self.is_inference:
            self.targets = info_df['target'].tolist()  # 각 이미지에 대한 레이블 목록

    def __len__(self) -> int:
        # 데이터셋의 총 이미지 수를 반환합니다.
        return len(self.image_paths)

    def __getitem__(self, index: int) -> Union[Tuple[torch.Tensor, int], torch.Tensor]:
        # 주어진 인덱스에 해당하는 이미지를 로드하고 변환을 적용한 후, 이미지와 레이블을 반환합니다.
        img_path = os.path.join(self.root_dir, self.image_paths[index])  # 이미지 경로 조합
        image = cv2.imread(img_path, cv2.IMREAD_COLOR)  # 이미지를 BGR 컬러 포맷의 numpy array로 읽어옵니다.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # BGR 포맷을 RGB 포맷으로 변환합니다.
        image = self.transform(image)  # 설정된 이미지 변환을 적용합니다.

        if self.is_inference:
            return image
        else:
            target = self.targets[index]  # 해당 이미지의 레이블
            return image, target  # 변환된 이미지와 레이블을 튜플 형태로 반환합니다. 

# Transform Class

In [4]:
import random
import string
class AlbumentationsTransform:
    def __init__(self, is_train: bool = True,max_size: int = 288):
        self.max_size=max_size
        # 공통 변환 설정: 이미지 리사이즈, 정규화, 텐서 변환
        common_transforms = [
            A.Resize(self.max_size, self.max_size),  # 이미지를 224x224 크기로 리사이즈
            #A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # 정규화
            A.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
            ToTensorV2()  # albumentations에서 제공하는 PyTorch 텐서 변환
        ]
        if is_train:
            # 훈련용 변환: 랜덤 수평 뒤집기, 랜덤 회전, 랜덤 밝기 및 대비 조정 추가
            dropout_transform = A.CoarseDropout(
                max_holes=15, 
                max_height=int(0.1 * max_size), 
                max_width=int(0.1 * max_size), 
                fill_value=[random.randint(0, 127)] * 3,  # 0(검정)~128(회색) 사이 무작위 값을 선택하여 RGB 동일하게 적용
                p=0.5
            )
            # OpenCV 기반의 Erosion 및 Dilation 함수 정의
            def apply_erosion(img, **params):
                kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
                return cv2.erode(img, kernel, iterations=1)

            def apply_dilation(img, **params):
                kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
                return cv2.dilate(img, kernel, iterations=1)

            # 훈련용 변환: 랜덤 수평 뒤집기, 랜덤 회전, 랜덤 밝기 및 대비 조정 추가
            self.transform = A.Compose(
                [
                    # Geometric transformations
                    A.Rotate(limit=15, p=0.5, border_mode=cv2.BORDER_CONSTANT, value=255),  # 빈 공간을 흰색(255)으로 채움
                    A.HorizontalFlip(p=0.5),  # 50% 확률로 이미지를 수평 뒤집기
                    A.VerticalFlip(p=0.2),
                    A.Affine(scale=(0.8, 1.2), shear=(-10, 10), p=0.5, border_mode=cv2.BORDER_CONSTANT, cval=255),  # 빈 공간을 흰색(255)으로 채움
                    A.ElasticTransform(alpha=1, sigma=10, p=0.5, border_mode=cv2.BORDER_CONSTANT, value=255),  # 빈 공간을 흰색(255)으로 채움

                    #dropout_transform,
                    A.Lambda(image=self.add_random_text, p=0.3),
                    A.Lambda(image=apply_dilation,p=0.4),
                    A.Lambda(image=apply_erosion,p=0.4),

                    # Noise and blur
                    A.GaussNoise(var_limit=(10.0, 50.0), p=0.5),
                    A.MotionBlur(blur_limit=(3, 7), p=0.5),

                    A.CLAHE(clip_limit=4.0, tile_grid_size=(8, 8), p=0.5),

                    A.ShiftScaleRotate(shift_limit=0.1, scale_limit=0.2, rotate_limit=15, p=0.5),
                    A.RandomBrightnessContrast(p=0.2),
                ] + common_transforms
            )
        else:
            # 검증/테스트용 변환: 공통 변환만 적용
            self.transform = A.Compose(common_transforms)
    
    def add_random_text(self, image: np.ndarray, **kwargs) -> np.ndarray: #증강용 함수, 이미지에 랜덤 텍스트를 추가한다.
        def random_word():
            letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
            length = random.randint(3, 15)  # 4에서 12 사이의 길이로 랜덤 설정
            return ''.join(random.choice(letters) for _ in range(length))

        # 임의의 투명도 생성 함수
        def random_alpha():
            return random.uniform(0.5, 1.0)  # 0.3은 거의 투명, 1.0은 불투명

        # 임의의 폰트 크기 생성 함수
        def random_font_size():
            return random.uniform(0.3, 2)  # OpenCV에서는 폰트 크기를 스케일로 조정

        # 이미지 크기
        height, width, _ = image.shape

        # 텍스트를 그릴 위치들 (상단 및 하단에서만 랜덤 좌표 선택)
        top_y = random.randint(10, height // 5)  # 상단 영역의 랜덤 Y 좌표
        bottom_y = random.randint(height - height // 5, height - 10)  # 하단 영역의 랜덤 Y 좌표
        random_y = random.choice([top_y, bottom_y])

        # X 좌표는 이미지 폭에 따라 랜덤 설정
        random_x = random.randint(10, width - 100)

        # OpenCV 폰트 설정
        font = cv2.FONT_HERSHEY_SIMPLEX


        # 랜덤 폰트, 색상, 투명도, 텍스트 추가
        random_text = random_word()
        color = (0,0,0)
        font_scale = random_font_size()  # 폰트 크기
        alpha = random_alpha()  # 투명도
        
        # 투명도를 적용한 텍스트 이미지 생성
        overlay = image.copy()
        cv2.putText(overlay, random_text, (random_x, random_y), font, font_scale, color, thickness=2, lineType=cv2.LINE_AA)
        
        # 알파 블렌딩으로 텍스트 투명도 조절
        image = cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0)    
        return image
    
    def image_resize_with_padding(self, image): #이미지를 종횡비가 깨지지 않게, max_size*max_size로 Resize 및 Padding한다.
        h, w = image.shape[:2]
        if w > h:
            new_w = self.max_size
            new_h = int(h * (self.max_size / w))
        else:
            new_h = self.max_size
            new_w = int(w * (self.max_size / h))
    
        resized_image = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA)
        # 패딩 추가
        top = (self.max_size - new_h) // 2
        bottom = self.max_size - new_h - top
        left = (self.max_size - new_w) // 2
        right = self.max_size - new_w - left
        
        padded_image = cv2.copyMakeBorder(resized_image, top, bottom, left, right, cv2.BORDER_CONSTANT, value=255)
        return padded_image
    
    def blackBackgorund_to_whiteBackground(self, image): #검은색 배경의 이미지라면 하얀색 배경의 이미지로 바꾼다.
        if np.mean(image) <= 127:
            image = 255 - image
        return image
    
    def enhance_and_binarize(self, image: np.ndarray, canny_threshold1: int = 100, canny_threshold2: int = 200, weight: float = 0.8) -> np.ndarray:
        """
        그레이스케일 이미지에 대해 평균값을 기반으로 픽셀을 강조하고
        Canny Edge를 적용하여 선을 강조한 뒤 다시 평균값을 기반으로 픽셀을 강조한다..
        """
        gray_image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)

        # 픽셀의 평균값 계산 후 평균보다 큰 픽셀들을 255로 설정
        mean_value = np.mean(gray_image)
        gray_image[gray_image > mean_value] = 255

        # Canny Edge를 이용해 선 강조
        edges = cv2.Canny(gray_image, threshold1=canny_threshold1, threshold2=canny_threshold2)
        enhanced_image = cv2.addWeighted(gray_image, weight, edges, 1 - weight, 0)

        # 픽셀의 평균값 계산 후 평균보다 큰 픽셀들을 255로 설정
        mean_value = np.mean(enhanced_image)
        enhanced_image[enhanced_image > mean_value] = 255

        return enhanced_image


    def __call__(self, image) -> torch.Tensor:
        # 이미지가 NumPy 배열인지 확인
        if not isinstance(image, np.ndarray):
            raise TypeError("Image should be a NumPy array (OpenCV format).")
        channel_diff = np.max(image, axis=-1) - np.min(image, axis=-1)
        if np.mean(channel_diff) > 50:  # 차이 값이 작으면 무채색이 많다고 판단
            image=self.image_resize_with_padding(image=image)
            transformed = self.transform(image=image)  # 이미지에 설정된 변환을 적용
            return transformed['image']
        
        image = self.blackBackgorund_to_whiteBackground(image)
        
        image=self.enhance_and_binarize(image=image)
        image=self.image_resize_with_padding(image=image)

        graytorgb = np.stack([image] * 3, axis=-1)

        transformed = self.transform(image=graytorgb)  # 이미지에 설정된 변환을 적용
        
        return transformed['image']  # 변환된 이미지의 텐서를 반환

In [5]:
class TransformSelector:
    """
    이미지 변환 라이브러리를 선택하기 위한 클래스.
    """
    def __init__(self, transform_type: str, max_size:int):
        self.max_size=max_size
        # 지원하는 변환 라이브러리인지 확인
        if transform_type in ["torchvision", "albumentations"]:
            self.transform_type = transform_type
        
        else:
            raise ValueError("Unknown transformation library specified.")

    def get_transform(self, is_train: bool):
        if self.transform_type == 'albumentations':
            transform = AlbumentationsTransform(is_train=is_train,max_size=self.max_size)
        
        return transform

# Model Class

In [6]:
class TimmModel(nn.Module):
    """
    Timm 라이브러리를 사용하여 다양한 사전 훈련된 모델을 제공하는 클래스.
    """
    def __init__(
        self, 
        model_name: str, 
        num_classes: int, 
        pretrained: bool
    ):
        super(TimmModel, self).__init__()
        self.model = timm.create_model(
            model_name, 
            pretrained=pretrained, 
            num_classes=num_classes
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        
        return self.model(x)

In [7]:
class ModelSelector:
    """
    사용할 모델 유형을 선택하는 클래스.
    """
    def __init__(
        self, 
        model_type: str, 
        num_classes: int, 
        **kwargs
    ):
        
        if model_type == 'timm':
            self.model = TimmModel(num_classes=num_classes, **kwargs)
        
        else:
            raise ValueError("Unknown model type specified.")

    def get_model(self) -> nn.Module:

        # 생성된 모델 객체 반환
        return self.model

# Loss Class

In [8]:
class Loss(nn.Module):
    """
    모델의 손실함수를 계산하는 클래스.
    """
    def __init__(self):
        super(Loss, self).__init__()
        self.loss_fn = nn.CrossEntropyLoss(label_smoothing=0.02)

    def forward(
        self, 
        outputs: torch.Tensor, 
        targets: torch.Tensor
    ) -> torch.Tensor:
    
        return self.loss_fn(outputs, targets)

# Trainer Class

In [9]:
from torch.cuda.amp import autocast, GradScaler

class Trainer:
    def __init__(
        self, 
        model: nn.Module, 
        device: torch.device, 
        train_loader: DataLoader, 
        val_loader: DataLoader, 
        optimizer: optim.Optimizer,
        scheduler: optim.lr_scheduler,
        loss_fn: torch.nn.modules.loss._Loss, 
        epochs: int,
        result_path: str
    ):
        # 클래스 초기화: 모델, 디바이스, 데이터 로더 등 설정
        self.model = model  # 훈련할 모델
        self.device = device  # 연산을 수행할 디바이스 (CPU or GPU)
        self.train_loader = train_loader  # 훈련 데이터 로더
        self.val_loader = val_loader  # 검증 데이터 로더
        self.optimizer = optimizer  # 최적화 알고리즘
        self.scheduler = scheduler # 학습률 스케줄러
        self.loss_fn = loss_fn  # 손실 함수
        self.epochs = epochs  # 총 훈련 에폭 수
        self.result_path = result_path  # 모델 저장 경로
        self.best_models = [] # 가장 좋은 상위 3개 모델의 정보를 저장할 리스트
        self.lowest_loss = float('inf') # 가장 낮은 Loss를 저장할 변수
        self.patience=14
        self.early_stopping_counter = 0 

    def save_model(self, epoch, loss, fold):
        # 모델 저장 경로 설정
        os.makedirs(self.result_path, exist_ok=True)

        # 현재 에폭 모델 저장
        current_model_path = os.path.join(self.result_path, f'fold_{fold}_epoch_{epoch}_loss_{loss:.4f}.pt')
        torch.save(self.model.state_dict(), current_model_path)

        # 최상위 3개 모델 관리
        self.best_models.append((loss, epoch, current_model_path))
        self.best_models.sort()
        if len(self.best_models) > 3:
            _, _, path_to_remove = self.best_models.pop(-1)  # 가장 높은 손실 모델 삭제
            if os.path.exists(path_to_remove):
                os.remove(path_to_remove)

        # 가장 낮은 손실의 모델 저장
        if loss <= self.lowest_loss:
            self.lowest_loss = loss
            self.early_stopping_counter = 0  # 손실이 개선된 경우, 카운터 리셋
            print(f"Improvement in validation loss. Reset early stopping counter.")
            best_model_path = os.path.join(self.result_path, f'fold_{fold}_best_model.pt')
            torch.save(self.model.state_dict(), best_model_path)
            print(f"Save {epoch} epoch result. Loss = {loss:.4f}")
        else:
            self.early_stopping_counter += 1  # 손실이 개선되지 않은 경우, 카운터 증가
            print(f"No improvement in validation loss. Early stopping counter: {self.early_stopping_counter}/{self.patience}")
            
    def train_epoch(self) -> float:
        # 한 에폭 동안의 훈련을 진행
        self.model.train()
        scaler = GradScaler() 
        
        total_loss = 0.0
        progress_bar = tqdm(self.train_loader, desc="Training", leave=False)
        
        for images, targets in progress_bar:
            images, targets = images.to(self.device), targets.to(self.device)
            self.optimizer.zero_grad()
            
            with autocast():
                outputs = self.model(images)
                loss = self.loss_fn(outputs, targets)
            
            # GradScaler로 backward 및 optimizer step
            scaler.scale(loss).backward()
            scaler.step(self.optimizer)
            scaler.update()
            
            #self.scheduler.step()
            total_loss += loss.item()
            progress_bar.set_postfix(loss=loss.item())
        
        return total_loss / len(self.train_loader)

    def validate(self) -> float:
        # 모델의 검증을 진행
        self.model.eval()
        
        total_loss = 0.0
        progress_bar = tqdm(self.val_loader, desc="Validating", leave=False)
        
        with torch.no_grad():
            for images, targets in progress_bar:
                images, targets = images.to(self.device), targets.to(self.device)
                outputs = self.model(images)    
                loss = self.loss_fn(outputs, targets)
                #self.scheduler.step()
                total_loss += loss.item()
                progress_bar.set_postfix(loss=loss.item())
        
        return total_loss / len(self.val_loader)

    def train(self, fold) -> None:
        # 전체 훈련 과정을 관리
        for epoch in range(self.epochs):
            print(f"Epoch {epoch+1}/{self.epochs}")
            current_lr = self.optimizer.param_groups[0]['lr']
            print(f"Current Learning Rate: {current_lr:.9f}")
            train_loss = self.train_epoch()
            val_loss = self.validate()
            print(f"Epoch {epoch+1}, Train Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}\n")
            
            self.save_model(epoch, val_loss, fold)
            
            # Early Stopping 조건을 만족하면 중단
            if self.early_stopping_counter >= self.patience:
                print(f"Early stopping at epoch {epoch+1} due to no improvement in validation loss.")
                break

            self.scheduler.step()

# Model Training

In [9]:
# 학습에 사용할 장비를 선택.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 학습 데이터의 경로와 정보를 가진 파일의 경로를 설정.
traindata_dir = "/home/min000914/cv_17_last/data/train"
traindata_info_file = "/home/min000914/cv_17_last/data/train.csv"
save_result_path = "/home/min000914/cv_17_last/model_output"


In [10]:
import random
from sklearn.model_selection import StratifiedKFold
#from cosine_annealing_warmup import CosineAnnealingWarmupRestarts
ENSEMBLE_MODELS = ["convnextv2_huge" for _ in range(2)]
# 교차 검증을 위한 StratifiedKFold 설정
transform_selector = TransformSelector(
    transform_type = "albumentations",
    max_size=288
)
train_transform = transform_selector.get_transform(is_train=True)
val_transform = transform_selector.get_transform(is_train=False)

skf = StratifiedKFold(n_splits=len(ENSEMBLE_MODELS), shuffle=True, random_state=42)  # 5개의 폴드로 나눔

# 학습 데이터의 class, image path, target에 대한 정보가 들어있는 csv파일을 읽기.
train_info = pd.read_csv(traindata_info_file)

# 총 class의 수를 측정.
num_classes = len(train_info['target'].unique())

fold_range = [0, 1] #다른 서버에서 병렬로 돌리기 위해 fold 분리

# StratifiedKFold로 데이터를 나누고 교차 검증을 수행
for fold, (train_idx, val_idx) in enumerate(skf.split(train_info, train_info['target'])):
    if fold in fold_range:
        # fold마다 학습이 끝난 후 GPU 메모리 비우기
        torch.cuda.empty_cache()
        print(fold,"@@@@@@@@@@@@@@")
        # 랜덤 시드 생성 및 설정
        random_seed = random.randint(0, 10000)  # 0부터 10000 사이의 랜덤한 시드 생성
        random.seed(random_seed)
        np.random.seed(random_seed)
        torch.manual_seed(random_seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(random_seed)

        print(f"Training fold {fold+1}/{skf.get_n_splits()}")

        train_df = train_info.iloc[train_idx]
        val_df = train_info.iloc[val_idx]

        # 학습에 사용할 Dataset을 선언.
        train_dataset1 = CustomDataset(
            root_dir=traindata_dir,
            info_df=train_df,
            transform=train_transform
        )
        train_dataset2 = CustomDataset(
            root_dir=traindata_dir,
            info_df=train_df,
            transform=val_transform
        )
        train_dataset=train_dataset1+train_dataset2
        
        val_dataset1 = CustomDataset(
            root_dir=traindata_dir,
            info_df=val_df,
            transform=train_transform
        )
        val_dataset2 = CustomDataset(
            root_dir=traindata_dir,
            info_df=val_df,
            transform=val_transform
        )
        val_dataset=val_dataset1+val_dataset2

        # 학습에 사용할 DataLoader를 선언.
        train_loader = DataLoader(
            train_dataset, 
            batch_size=32, 
            shuffle=True,
            num_workers=16
        )
        val_loader = DataLoader(
            val_dataset, 
            batch_size=32, 
            shuffle=False,
            num_workers=4
        )

        # 모델 초기화
        model_selector = ModelSelector(
        model_type='timm', 
        num_classes=num_classes,
        model_name='convnext_tiny', 
        pretrained=True
        )
        model = model_selector.get_model()

        # 모델을 장치로 이동
        model.to(device)


        optimizer = optim.Adam(model.parameters(), lr=0.000012,weight_decay=0.0003)
        scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.7)
        '''scheduler = CosineAnnealingWarmupRestarts(optimizer,
                                                  first_cycle_steps=10,
                                                  cycle_mult=0.7,
                                                  max_lr=0.000135,
                                                  min_lr=0.00001,
                                                  warmup_steps=4,
                                                  gamma=0.6)
            scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 
                                                    mode='min',        # 최소화할 지표 (보통 'val_loss' 기준으로)
                                                    factor=0.3,        # 학습률을 감소시키는 비율 (0.1 = 10%로 감소)
                                                    patience=4,        # 5 에폭 동안 성능 개선이 없으면 학습률 감소
                                                    verbose=True)        # 학습률이 줄어들 때 출력                                     
        '''
        
        loss_fn = Loss()

        # Trainer 설정
        trainer = Trainer(
            model=model, 
            device=device, 
            train_loader=train_loader,
            val_loader=val_loader, 
            optimizer=optimizer,
            scheduler=scheduler,
            loss_fn=loss_fn, 
            epochs=2,
            result_path=save_result_path
        )

        # 모델 학습
        trainer.train(fold=fold)

  A.Affine(scale=(0.8, 1.2), shear=(-10, 10), p=0.5, border_mode=cv2.BORDER_CONSTANT, cval=255),  # 빈 공간을 흰색(255)으로 채움


0 @@@@@@@@@@@@@@
Training fold 1/2
Epoch 1/2
Current Learning Rate: 0.000012000


                                                                      

KeyboardInterrupt: 

# Inference

In [10]:

from tqdm import tqdm

def weighted_ensemble_inference_with_augmentation(
    models: List[nn.Module],  # 여러 개의 모델 리스트
    device: torch.device, 
    test_loader: DataLoader,  # 두 데이터의 순서가 동일해야함. shuffle=False
    test_aug_loader: DataLoader,  # 증강된 데이터 로더
    num_classes: int
):
    num_samples = len(test_loader.dataset)
    predictions = np.zeros((num_samples, num_classes))
    
    # 각 모델별 예측 확률을 저장할 리스트
    model_outputs_original = []
    model_outputs_augmented = []

    # 모든 모델에 대해 예측 (원본 데이터와 증강 데이터 각각 수행)
    for model_idx, model in enumerate(models):
        model.to(device)
        model.eval()
        fold_predictions_original = []
        fold_predictions_augmented = []

        with torch.no_grad():
            # tqdm에 설명을 추가하여 진행 상황을 각각 표시
            for batch_idx, (images, images_aug) in enumerate(tqdm(zip(test_loader, test_aug_loader), desc=f"Model {model_idx+1} Inference", total=len(test_loader))):
                images = images.to(device)
                images_aug = images_aug.to(device)

                # 원본 데이터에 대한 예측
                logits_original = model(images)
                probs_original = F.softmax(logits_original, dim=1)
                fold_predictions_original.append(probs_original.cpu().numpy())

                # 증강 데이터에 대한 예측
                logits_augmented = model(images_aug)
                probs_augmented = F.softmax(logits_augmented, dim=1)
                fold_predictions_augmented.append(probs_augmented.cpu().numpy())

        # 모델별 확률 저장
        model_output_original = np.vstack(fold_predictions_original)
        model_output_augmented = np.vstack(fold_predictions_augmented)
        model_outputs_original.append(model_output_original)
        model_outputs_augmented.append(model_output_augmented)

    # 각 데이터별로 가중치를 적용하여 앙상블 예측 (원본 vs 증강)
    for i in range(num_samples):
        # 각 데이터에 대해 모델별 확률 최대값 계산 (원본과 증강 각각)
        sample_confidences_original = [np.max(model_output[i]) for model_output in model_outputs_original]
        sample_confidences_augmented = [np.max(model_output[i]) for model_output in model_outputs_augmented]

        # 원본과 증강의 전체 확신도 합산
        total_confidence_original = sum(sample_confidences_original)
        total_confidence_augmented = sum(sample_confidences_augmented)
        
        # 원본 데이터와 증강 데이터에 대한 가중치 계산
        weight_original = total_confidence_original / (total_confidence_original + total_confidence_augmented)
        weight_augmented = total_confidence_augmented / (total_confidence_original + total_confidence_augmented)
        
        # 각 모델의 예측값에 가중치를 곱하여 합산
        for model_output_original, model_output_augmented in zip(model_outputs_original, model_outputs_augmented):
            predictions[i] += model_output_original[i] * weight_original
            predictions[i] += model_output_augmented[i] * weight_augmented

    # 최종 예측값 반환 (가장 높은 확률을 가진 클래스를 선택)
    return np.argmax(predictions, axis=1)


In [11]:
# 추론 데이터의 경로와 정보를 가진 파일의 경로를 설정.
testdata_dir = "/home/min000914/cv_17_last/data/test"
testdata_info_file = "/home/min000914/cv_17_last/data/test.csv"
save_result_path = "/home/min000914/cv_17_last/model_output"

# 추론 데이터의 class, image path, target에 대한 정보가 들어있는 csv파일을 읽기.
test_info = pd.read_csv(testdata_info_file)

# 총 class 수.
num_classes = 500

In [12]:
# 추론에 사용할 Transform을 선언.
transform_selector = TransformSelector(
    transform_type = "albumentations",
    max_size=288
)
test_transform = transform_selector.get_transform(is_train=False)
test__aug_transform = transform_selector.get_transform(is_train=True)

# 추론에 사용할 Dataset을 선언.
test_dataset = CustomDataset(
    root_dir=testdata_dir,
    info_df=test_info,
    transform=test_transform,
    is_inference=True
)
# 추론에 사용할 증강 Dataset을 선언.
test__aug_dataset = CustomDataset(
    root_dir=testdata_dir,
    info_df=test_info,
    transform=test__aug_transform,
    is_inference=True
)

# 추론에 사용할 DataLoader를 선언.
test_loader = DataLoader(
    test_dataset, 
    batch_size=32, 
    shuffle=False,
    drop_last=False
)

# 추론에 사용할 증강 DataLoader를 선언.
test_aug_loader = DataLoader(
    test__aug_dataset, 
    batch_size=32, 
    shuffle=False,
    drop_last=False
)

  A.Affine(scale=(0.8, 1.2), shear=(-10, 10), p=0.5, border_mode=cv2.BORDER_CONSTANT, cval=255),  # 빈 공간을 흰색(255)으로 채움


In [13]:
# 추론에 사용할 장비를 선택.
# torch라이브러리에서 gpu를 인식할 경우, cuda로 설정.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 추론에 사용할 Model을 선언.
model_selector = ModelSelector(
    model_type='timm', 
    num_classes=num_classes,
    model_name='convnext_tiny', 
    pretrained=True
)


In [14]:
# 4개의 모델 경로 설정 (예시로 best_model1.pt, best_model2.pt, ... )
model_paths = [
    os.path.join(save_result_path, "fold_0_best_model.pt"),
    os.path.join(save_result_path, "fold_1_best_model.pt"),
]

# 모델을 불러와 리스트에 저장
models = []
for model_path in model_paths:
    model = model_selector.get_model()  # 모델 초기화
    model.load_state_dict(torch.load(model_path, map_location='cpu'))
    models.append(model)



In [15]:
# 앙상블 추론 수행
predictions = weighted_ensemble_inference_with_augmentation(
    models=models,  # 모델 리스트 전달
    device=device, 
    test_loader=test_loader,
    test_aug_loader=test_aug_loader,  # 증강된 데이터 로더도 전달
    num_classes=num_classes
)

Model 1 Inference: 100%|██████████| 313/313 [07:11<00:00,  1.38s/it]
Model 2 Inference: 100%|██████████| 313/313 [07:11<00:00,  1.38s/it]


In [16]:
# 모든 클래스에 대한 예측 결과를 하나의 문자열로 합침
test_info['target'] = predictions
test_info = test_info.reset_index().rename(columns={"index": "ID"})
# DataFrame 저장
test_info.to_csv("output2.csv", index=False)