In [1]:
# Environment setup: install the third-party packages imported by the next cell.
#!pip install -qqq timm torchmetrics
# OpenCV is pinned to a fixed release.
!pip install opencv-python==4.8.0.74
# Remove whatever scikit-learn / imbalanced-learn builds are present, then
# install the pinned pair below.
!pip uninstall -y scikit-learn imbalanced-learn
!pip install scikit-learn==1.2.2 imbalanced-learn==0.10.1
# NOTE(review): the next two installs are unpinned, so the versions picked up
# can differ between runs.
!pip install efficientnet_pytorch
!pip install albumentations
# NOTE(review): opencv-python was already installed (pinned) at the top of this
# cell; this unpinned repeat looks redundant - confirm before removing it.
!pip install opencv-python

Defaulting to user installation because normal site-packages is not writeable
Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com
[33mDEPRECATION: devscripts 2.22.1ubuntu1 has a non-standard version number. pip 23.3 will enforce this behaviour change. A possible replacement is to upgrade to a newer version of devscripts or contact the author to suggest that they release a version with a conforming version number. Discussion can be found at https://github.com/pypa/pip/issues/12063[0m[33m
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Found existing installation: scikit-learn 1.2.2
Uninstalling scikit-learn-1.2.2:
  Successfully uninstalled scikit-learn-1.2.2
Found existing installation: imbalanced-learn 0.10.1
Uninstalling imbalanced-learn-0.10.1:

In [2]:
import os
import re
import glob
import cv2
import random
import time
import gc

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# torchvision
import torchvision
import torchvision.transforms as transforms
import torchvision.datasets as datasets
#import torchvision.models as models

# image data augmentation을 위한 albumentations
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

# timm에서 굉장히 많은 pretrained model을 가져와서 사용할 수 있습니다.
#import timm
#from timm import create_model
from efficientnet_pytorch import EfficientNet

import torchmetrics

from sklearn.preprocessing import LabelEncoder

# Utils
from PIL import Image

from tqdm.auto import tqdm, trange

from imblearn.under_sampling import RandomUnderSampler

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

import warnings
warnings.filterwarnings(action='ignore') 

In [3]:
# Seed the Python, NumPy and PyTorch random number generators so that a
# fresh "Restart & Run All" starts from the same random state each time.
SEED = 42
random.seed(SEED)                 # Python's built-in `random` module
np.random.seed(SEED)              # NumPy's global (legacy) generator
torch.manual_seed(SEED)           # PyTorch default generator
torch.cuda.manual_seed_all(SEED)  # every visible CUDA device

In [4]:
# Experiment settings (paths, optimisation hyper-parameters, target device)
# gathered in a single dictionary.
config = dict(
    model='efficientnet_b0',
    model_save='./',
    sub_path='./',
    data_path='./data/',
    learning_rate=3e-4,
    seed=42,
    img_size=224,
    n_epochs=5,
    ratio=0.7,
    batch_size=64,
    min_lr=1e-6,
    T_max=10,
    weight_decay=1e-6,
    device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu"),
)



In [5]:
# Columns that are removed right after loading.
UNUSED_COLUMNS = ['identifier', 'symptoms']

# Read each split, drop the unused columns and discard rows that contain
# missing values. Chaining builds a new frame instead of mutating in place,
# so re-running the cell always gives the same result.
train_csv = (
    pd.read_csv(config['data_path'] + 'train_data.csv')
    .drop(columns=UNUSED_COLUMNS)
    .dropna()
)
val_csv = (
    pd.read_csv(config['data_path'] + 'val_data.csv')
    .drop(columns=UNUSED_COLUMNS)
    .dropna()
)

# DataFrame.info() writes its report itself and returns None, so it is called
# directly; wrapping it in print() emitted an extra "None" line.
train_csv.info()
val_csv.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 427993 entries, 0 to 433872
Data columns (total 7 columns):
 #   Column      Non-Null Count   Dtype 
---  ------      --------------   ----- 
 0   imgID       427993 non-null  object
 1   breed       427993 non-null  object
 2   age         427993 non-null  int64 
 3   gender      427993 non-null  object
 4   species     427993 non-null  object
 5   lesions     427993 non-null  object
 6   image_path  427993 non-null  object
dtypes: int64(1), object(6)
memory usage: 26.1+ MB
None
<class 'pandas.core.frame.DataFrame'>
Int64Index: 52939 entries, 0 to 54232
Data columns (total 7 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   imgID       52939 non-null  object
 1   breed       52939 non-null  object
 2   age         52939 non-null  int64 
 3   gender      52939 non-null  object
 4   species     52939 non-null  object
 5   lesions     52939 non-null  object
 6   image_path  52939 non-null  obje

In [6]:
# Keep only the rows whose `species` code is 'D' (the filter is on the
# species column, not on breed).
train_csv = train_csv.loc[train_csv['species'].eq('D')]
val_csv = val_csv.loc[val_csv['species'].eq('D')]

In [None]:
import albumentations as A
from albumentations.pytorch import ToTensorV2
import cv2

# Oversample the minority lesion classes by repeating their rows.
#
# Every added row must keep a real `image_path` and its metadata:
# PetSkinDataset raises FileNotFoundError for a row whose image_path is
# missing, and rows without breed/age/gender would produce NaN features.
# Pre-rendering augmented tensors here is also unnecessary, because nothing
# downstream reads them; the random 'train' pipeline of ImageTransform is
# applied each time a sample is loaded, so repeated rows are still seen as
# different views.
MINORITY_LESIONS = ['A5', 'A6', 'A4']
EXTRA_COPIES = 3  # additional copies added for every minority-class row

minority_rows = train_csv[train_csv['lesions'].isin(MINORITY_LESIONS)]

# Build the extra rows in one concat (no row-by-row growth).
augmented_df = pd.concat([minority_rows] * EXTRA_COPIES, ignore_index=True)
train_csv = pd.concat([train_csv, augmented_df], ignore_index=True)

print("데이터 증강 후 클래스 분포:")
print(train_csv['lesions'].value_counts())

In [None]:
# Columns kept after resampling (everything else, e.g. `species`, is dropped).
RESAMPLED_COLUMNS = ['imgID', 'breed', 'age', 'gender', 'image_path']


def undersample_by_lesion(df, sampler):
    """
    Balance `df` across the 'lesions' classes with the given sampler.

    Args:
        df (pd.DataFrame): frame with a 'lesions' column plus the columns
            listed in RESAMPLED_COLUMNS.
        sampler: object exposing ``fit_resample(X, y)``.

    Returns:
        pd.DataFrame: RESAMPLED_COLUMNS followed by 'lesions', with a fresh
        0..n-1 index.
    """
    X_resampled, y_resampled = sampler.fit_resample(
        df.drop(columns=['lesions']), df['lesions']
    )
    # X and y come back in matching row order; combine them by position so a
    # difference between their indexes can never misalign labels and features.
    features = pd.DataFrame(X_resampled, columns=RESAMPLED_COLUMNS).reset_index(drop=True)
    labels = pd.DataFrame({'lesions': np.asarray(y_resampled)})
    return pd.concat([features, labels], axis=1)


rus = RandomUnderSampler(random_state=42)

train_resampled = undersample_by_lesion(train_csv, rus)
# NOTE(review): the validation split is undersampled as well, so validation
# metrics describe a class-balanced subset - confirm this is intended.
val_resampled = undersample_by_lesion(val_csv, rus)

print(train_resampled.head())
print(val_resampled.head())



In [None]:
from sklearn.preprocessing import MinMaxScaler

def preprocess_dataframe(df, columns=None, scaler=None):
    """
    One-hot encode the categorical columns and scale the ``age`` column.

    Args:
        df (pd.DataFrame): frame containing 'breed', 'gender' and 'lesions';
            'age' is scaled only when present.
        columns (sequence of str, optional): when given, the result is
            re-indexed to exactly these columns in this order. Columns absent
            from ``df`` are filled with 0 and unexpected ones are dropped.
            Pass the training frame's columns when encoding another split so
            that every split shares one layout.
        scaler (optional): scikit-learn style scaler for ``age``. A scaler
            that is already fitted (has ``scale_``) is only applied; an
            unfitted one is fitted on ``df``. When omitted, a new
            MinMaxScaler is fitted on ``df``.

    Returns:
        pd.DataFrame: a new, encoded frame; the input is not modified.
    """
    # Replace each categorical column by its indicator columns.
    categorical_cols = ['breed', 'gender', 'lesions']
    df = pd.get_dummies(df, columns=categorical_cols, drop_first=False)

    if 'age' in df.columns:
        if scaler is None:
            scaler = MinMaxScaler()
        if hasattr(scaler, 'scale_'):
            # Already fitted: reuse its statistics.
            df['age'] = scaler.transform(df[['age']])
        else:
            df['age'] = scaler.fit_transform(df[['age']])

    if columns is not None:
        df = df.reindex(columns=list(columns), fill_value=0)

    return df

In [None]:
train_df = preprocess_dataframe(train_resampled)

# Each split is one-hot encoded on its own, so a breed / gender / lesion value
# that is missing from one split would give the two frames different columns.
# Re-index the validation frame to the training layout so feature and label
# vectors have the same width and order in both splits; categories absent
# from validation become all-zero columns.
# NOTE(review): `age` is still min-max scaled per split.
val_df = preprocess_dataframe(val_resampled).reindex(columns=train_df.columns, fill_value=0)

In [None]:
def find_image_path(row, phase, data_root='./data'):
    """
    Locate an image file on disk for one metadata row.

    The search walks
    ``{data_root}/{phase}/image/<species folder>/<symptoms>/<sub folder>/<imgID>``.

    Args:
        row: mapping-like record (dict or pandas Series) with 'species' and
            'imgID'. 'symptoms' is used when present; when it is missing or
            NaN, every symptoms folder is searched.
        phase (str): dataset split folder name, e.g. 'train'.
        data_root (str): directory that contains the split folders.

    Returns:
        str or None: path of the first match, or None when the species code
        is unknown or no file is found.
    """
    base_path = f'{data_root}/{phase}/image'
    species = str(row['species'])
    imgID = row['imgID']  # image file name

    # `symptoms` is optional: fall back to scanning all symptom folders.
    try:
        symptoms = row['symptoms']
    except KeyError:
        symptoms = None
    symptoms_known = not pd.isna(symptoms)

    if species == 'D':  # dog
        species_folders = ['반려견_01', '반려견_02'] if phase == 'train' else ['반려견']
    elif species == 'C':  # cat
        species_folders = ['반려묘']
    else:
        return None  # unknown species code

    for folder in species_folders:
        species_dir = os.path.join(base_path, folder)
        if symptoms_known:
            symptom_dirs = [os.path.join(species_dir, str(symptoms))]
        elif os.path.isdir(species_dir):
            symptom_dirs = [
                os.path.join(species_dir, name) for name in sorted(os.listdir(species_dir))
            ]
        else:
            symptom_dirs = []

        for target_folder in symptom_dirs:
            if not os.path.isdir(target_folder):
                continue
            # The image lives one level below the symptoms folder.
            for sub_folder in sorted(os.listdir(target_folder)):
                sub_folder_path = os.path.join(target_folder, sub_folder)
                if os.path.isdir(sub_folder_path):
                    img_path = os.path.join(sub_folder_path, imgID)
                    if os.path.exists(img_path):
                        return img_path
    return None  # nothing matched

In [None]:
# Side length of the square model input, followed by the per-channel mean and
# standard deviation used for normalisation.
size = 224
mean, std = (0.485, 0.456, 0.406), (0.229, 0.224, 0.225)

In [None]:
class ImageTransform():
    """Phase-dependent torchvision preprocessing ('train' or 'val')."""

    def __init__(self, resize, mean, std):
        """
        Build the train and validation pipelines.

        Args:
            resize (int): side length of the square output image.
            mean (tuple): per-channel mean passed to Normalize.
            std (tuple): per-channel standard deviation passed to Normalize.
        """
        self.data_transform = {
            # Training: random augmentation, then tensor conversion and
            # normalisation.
            'train': transforms.Compose([
                # Random crop covering 50-100% of the image, resized to `resize`.
                transforms.RandomResizedCrop(resize, scale=(0.5, 1.0)),
                transforms.RandomHorizontalFlip(),
                # Random brightness / contrast / saturation / hue changes.
                transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
                # Random rotation between -15 and +15 degrees.
                transforms.RandomRotation(degrees=15),
                transforms.RandomPerspective(distortion_scale=0.5, p=0.5),
                transforms.ToTensor(),
                transforms.Normalize(mean, std),
                # RandomErasing works on tensors, so it comes after ToTensor.
                transforms.RandomErasing(p=0.5, scale=(0.02, 0.33), ratio=(0.3, 3.3))
            ]),
            # Validation: deterministic resize + centre crop.
            'val': transforms.Compose([
                # Use `resize` here as well (previously a fixed 224), so the
                # centre crop below never has to pad when resize != 224.
                transforms.Resize(resize),
                transforms.CenterCrop(resize),
                transforms.ToTensor(),
                transforms.Normalize(mean, std)
            ])
        }

    def __call__(self, img, phase):
        """
        Apply the pipeline selected by `phase`.

        Args:
            img (PIL.Image): input image.
            phase (str): 'train' or 'val'.

        Returns:
            Tensor: the transformed image.

        Raises:
            KeyError: if `phase` is not one of the configured pipelines.
        """
        return self.data_transform[phase](img)

In [None]:
class PetSkinDataset(Dataset):
    """
    Dataset yielding ``(image, features, label)`` triples.

    * ``label``    - float32 vector built from the columns starting with 'lesions_'.
    * ``features`` - float32 vector built from every remaining column except
      'imgID' and 'image_path'.

    The numeric columns are converted to arrays once at construction instead
    of slicing and converting a pandas row on every item access. A
    non-numeric feature column therefore fails at construction time.
    """

    def __init__(self, dataframe, transform=None, phase='train'):
        """
        Args:
            dataframe (pd.DataFrame): preprocessed frame with 'imgID',
                'image_path', one-hot 'lesions_*' columns and numeric features.
            transform (callable, optional): called as ``transform(image, phase=...)``.
            phase (str): forwarded to `transform`.
        """
        self.dataframe = dataframe
        self.transform = transform
        self.phase = phase

        # Columns used as the label vector.
        self.label_columns = [col for col in dataframe.columns if col.startswith('lesions_')]

        # Everything that is neither an identifier nor a label is a feature.
        excluded = {'imgID', 'image_path', *self.label_columns}
        self.feature_columns = [col for col in dataframe.columns if col not in excluded]

        # Loop-invariant work hoisted out of __getitem__.
        self._image_paths = dataframe['image_path'].to_numpy()
        self._image_ids = dataframe['imgID'].to_numpy()
        self._labels = dataframe[self.label_columns].to_numpy(dtype=np.float32)
        self._features = dataframe[self.feature_columns].to_numpy(dtype=np.float32)

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        """
        Returns:
            tuple: (image, features, label)

        Raises:
            FileNotFoundError: if the row has no image path.
        """
        img_path = self._image_paths[idx]
        if pd.isna(img_path):
            raise FileNotFoundError(f"Image not found for imgID: {self._image_ids[idx]}")

        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image, phase=self.phase)

        # torch.tensor copies, so the cached arrays are never aliased.
        label = torch.tensor(self._labels[idx], dtype=torch.float32)
        features = torch.tensor(self._features[idx], dtype=torch.float32)

        return image, features, label



In [None]:
# One ImageTransform serves both datasets; each dataset passes its own
# `phase`, which selects the pipeline that is applied.
image_transform = ImageTransform(size, mean, std)
train_dataset = PetSkinDataset(train_df, transform=image_transform, phase='train')
val_dataset = PetSkinDataset(val_df, transform=image_transform, phase='val')

In [None]:
# Wrap the datasets in loaders: training batches are shuffled, validation
# batches keep the dataset order.
loader_batch_size = config['batch_size']
train_iterator = DataLoader(train_dataset, batch_size=loader_batch_size, shuffle=True)
valid_iterator = DataLoader(val_dataset, batch_size=loader_batch_size, shuffle=False)
dataloader_dict = dict(train=train_iterator, val=valid_iterator)

# Fetch one training batch and print the shape of its feature tensor.
batch_iterator = iter(train_iterator)
first_batch = next(batch_iterator)
image, features, label = first_batch
print(features.size())
#print(label)

In [None]:
class MultimodalModel(nn.Module):
    """
    Fuse an image backbone with a small MLP over additional (tabular) features.

    The first MLP layer is created once and kept for the lifetime of the
    model. Re-creating it inside ``forward`` would give it fresh random
    weights on every call, so it could never be trained and predictions would
    differ between identical calls.
    """

    def __init__(self, image_model, output_dim, additional_dim=None, image_feature_dim=1000):
        """
        Args:
            image_model (nn.Module): backbone mapping an image batch to a
                ``(batch, image_feature_dim)`` tensor.
            output_dim (int): number of output logits.
            additional_dim (int, optional): width of the additional feature
                vector. When omitted, ``nn.LazyLinear`` infers it from the
                first batch passed through ``forward``.
            image_feature_dim (int): width of the backbone output.
        """
        super(MultimodalModel, self).__init__()

        self.image_model = image_model

        # Dropout applied to the backbone output.
        self.image_dropout = nn.Dropout(p=0.5)

        if additional_dim is None:
            # Input width is resolved on the first forward pass. Run one batch
            # through the model (or pass `additional_dim`) before relying on
            # this layer's parameter shapes.
            first_layer = nn.LazyLinear(64)
        else:
            first_layer = nn.Linear(additional_dim, 64)

        # MLP over the additional features.
        self.fc_additional = nn.Sequential(
            first_layer,
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Dropout(p=0.5)
        )

        # Head over the concatenated image and additional features.
        self.fc_combined = nn.Sequential(
            nn.Linear(image_feature_dim + 64, 128),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(128, output_dim)
        )

    def forward(self, image, additional_features):
        """
        Args:
            image (Tensor): image batch for `image_model`.
            additional_features (Tensor): ``(batch, additional_dim)`` tensor.

        Returns:
            Tensor: ``(batch, output_dim)`` logits.
        """
        image_features = self.image_model(image)
        image_features = self.image_dropout(image_features)

        additional_features = self.fc_additional(additional_features)

        # Concatenate along the feature axis and predict.
        combined_features = torch.cat((image_features, additional_features), dim=1)
        output = self.fc_combined(combined_features)
        return output



In [None]:
# Load the EfficientNet backbone with pretrained weights.
# NOTE(review): config['model'] is 'efficientnet_b0' while B1 is loaded here -
# confirm which variant is intended.
BACKBONE_NAME = 'efficientnet-b1'
model = EfficientNet.from_pretrained(BACKBONE_NAME)

In [None]:
# Sanity check: push one random image (batch 1, 3 channels, 224x224) through
# the backbone and print the output shape.
input_tensor = torch.randn(1, 3, 224, 224)

# Run the probe in eval mode and without autograd so that random data cannot
# touch any statistics tracked in training mode and no graph is kept alive.
# The previous training flag is restored afterwards.
was_training = model.training
model.eval()
with torch.no_grad():
    output = model(input_tensor)
model.train(was_training)

print(output.shape)

In [None]:
# Number of classes = number of one-hot 'lesions_*' label columns in the
# training data, so the output layer always matches the label vectors.
OUTPUT_DIM = len(train_dataset.label_columns)
cnn_model = model
multi_modal_model = MultimodalModel(cnn_model, OUTPUT_DIM)
# To resume from a saved model instead:
#multi_modal_model = torch.load('EfficientNet_Multi_Modal_model_v2_fine_tuning_CosineLR.pt')

In [None]:
# Move the models to the target device before the optimizer is created.
cnn_model = cnn_model.to(device)
multi_modal_model = multi_modal_model.to(device)

# Optimise the whole multimodal model. The backbone is a sub-module of it, so
# this covers the backbone and the fusion layers; passing only
# cnn_model.parameters() would leave the fusion layers untrained.
optimizer = optim.Adam(multi_modal_model.parameters(), lr=1e-3)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=100)
criterion = nn.CrossEntropyLoss()
criterion = criterion.to(device)

In [None]:
def calculate_accuracy(y_pred, y):
    """
    Fraction of rows whose highest-scoring class equals the target.

    Args:
        y_pred (Tensor): ``(batch, n_classes)`` scores.
        y (Tensor): ``(batch,)`` target class indices.

    Returns:
        float: number of correct predictions divided by the batch size.
    """
    with torch.no_grad():
        predicted_classes = y_pred.max(dim=1).indices
        n_correct = predicted_classes.eq(y).sum().item()
    return n_correct / y.size(0)



In [None]:
def train(model, iterator, optimizer, scheduler, criterion, device, accumulation_steps=2):
    """
    Run one training epoch with gradient accumulation.

    Args:
        model: called as ``model(image, x)``; returns class logits.
        iterator: sized iterable of ``(image, x, y)`` batches where ``y`` is
            one-hot encoded.
        optimizer: optimizer stepped once per accumulation window.
        scheduler: accepted for interface compatibility; it is not stepped
            here (the caller steps it once per epoch).
        criterion: loss called as ``criterion(logits, class_indices)``.
        device: device the batch tensors are moved to.
        accumulation_steps (int): number of batches whose gradients are
            accumulated before each optimizer step.

    Returns:
        tuple: (mean batch loss, mean batch accuracy, detached logits of the
        last batch).

    Raises:
        ValueError: if the iterator is empty or accumulation_steps < 1.
    """
    n_batches = len(iterator)
    if n_batches == 0:
        raise ValueError("train() needs at least one batch, but the iterator is empty")
    if accumulation_steps < 1:
        raise ValueError("accumulation_steps must be >= 1")

    epoch_loss = 0
    epoch_acc = 0
    y_pred = None

    model.train()
    optimizer.zero_grad()

    for batch_idx, (image, x, y) in enumerate(tqdm(iterator, desc="Training", unit="batch")):
        image = image.to(device)
        x = x.to(device)
        y = y.to(device)

        # One-hot labels -> class indices expected by the criterion.
        y = torch.argmax(y, dim=1)

        y_pred = model(image, x)
        loss = criterion(y_pred, y)
        acc = calculate_accuracy(y_pred, y)

        # Scale by the number of batches actually in this accumulation window,
        # so a shorter trailing window is averaged rather than under-weighted.
        window_start = batch_idx - batch_idx % accumulation_steps
        window_size = min(accumulation_steps, n_batches - window_start)
        (loss / window_size).backward()

        # Step at the end of each window and after the final batch.
        if (batch_idx + 1) % accumulation_steps == 0 or (batch_idx + 1) == n_batches:
            optimizer.step()
            optimizer.zero_grad()

        epoch_loss += loss.item()
        epoch_acc += acc
        if batch_idx % 100 == 0:
            print(f"Batch {batch_idx} - Loss: {loss.item():.4f}, Accuracy: {acc:.2f}")

    epoch_loss /= n_batches
    epoch_acc /= n_batches
    # Detach so the caller does not keep the last batch's autograd graph alive.
    return epoch_loss, epoch_acc, y_pred.detach()

In [None]:
def evaluate(model, iterator, criterion, device):
    """
    Score the model on every batch of `iterator` without gradient tracking.

    Args:
        model: called as ``model(image, x)``; returns class logits.
        iterator: sized iterable of ``(image, x, y)`` batches where ``y`` is
            one-hot encoded.
        criterion: loss called as ``criterion(logits, class_indices)``.
        device: device the batch tensors are moved to.

    Returns:
        tuple: (mean batch loss, mean batch accuracy, logits of the last batch).
    """
    running_loss = 0
    running_acc = 0

    model.eval()
    with torch.no_grad():
        for batch in tqdm(iterator, desc="Evaluating", unit="batch"):
            image, x, y = (tensor.to(device) for tensor in batch)

            # One-hot labels -> class indices.
            targets = torch.argmax(y, dim=1)

            y_pred = model(image, x)

            running_loss += criterion(y_pred, targets).item()
            running_acc += calculate_accuracy(y_pred, targets)

    n_batches = len(iterator)
    return running_loss / n_batches, running_acc / n_batches, y_pred



In [None]:
def epoch_time(start_time: float, end_time: float) -> tuple:
    """
    Split the elapsed time between two timestamps into whole minutes and
    whole remaining seconds.

    Args:
        start_time: start timestamp in seconds.
        end_time: end timestamp in seconds.

    Returns:
        tuple: (minutes, seconds), both ints.
    """
    total_seconds = end_time - start_time
    minutes = int(total_seconds / 60)
    seconds = int(total_seconds - (minutes * 60))
    return minutes, seconds

In [None]:
best_valid_loss = float('inf')
EPOCHS = 100
PATIENCE = 10  # epochs without validation improvement before stopping
softmax = nn.Softmax(dim=1)
cnt = 0  # consecutive epochs without improvement

label_names = ['lesions_A1', 'lesions_A2', 'lesions_A3', 'lesions_A4', 'lesions_A5', 'lesions_A6', 'lesions_A7']

# Per-epoch history used for the plots.
train_losses = []
valid_losses = []
train_accuracies = []
valid_accuracies = []

for epoch in tqdm(range(EPOCHS), desc="Training Progress", unit="epoch", leave=True):
    start_time = time.monotonic()

    train_loss, train_acc, logits = train(multi_modal_model, train_iterator, optimizer, scheduler, criterion, device)
    valid_loss, valid_acc, logits = evaluate(multi_modal_model, valid_iterator, criterion, device)

    train_losses.append(train_loss)
    valid_losses.append(valid_loss)
    train_accuracies.append(train_acc)
    valid_accuracies.append(valid_acc)

    # The learning-rate schedule advances once per epoch.
    scheduler.step()

    if valid_loss < best_valid_loss:
        # New best model: checkpoint it and reset the patience counter.
        best_valid_loss = valid_loss
        torch.save(multi_modal_model, 'EfficientNet_Multi_Modal_model_v2_fine_tuning_CosineLR_batch256.pt')
        cnt = 0
    else:
        cnt += 1

    end_time = time.monotonic()
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    # Class probabilities of the last validation batch.
    probabilities = softmax(logits)

    # Report the epoch before any early-stopping exit, so the final epoch's
    # metrics are always logged.
    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:6.2f}%')
    print(f'\tValid Loss: {valid_loss:.3f} | Valid Acc: {valid_acc*100:6.2f}%')

    if cnt >= PATIENCE:
        print('Early Stopping')
        break


In [None]:
import matplotlib.pyplot as plt

# Draw one figure per metric (loss first, then accuracy); each figure shows
# the training and validation curves with a marker at every epoch.
history_plots = (
    (train_losses, valid_losses, 'Train Loss', 'Validation Loss', 'Loss', 'Train and Validation Loss'),
    (train_accuracies, valid_accuracies, 'Train Accuracy', 'Validation Accuracy', 'Accuracy', 'Train and Validation Accuracy'),
)

for train_values, valid_values, train_label, valid_label, y_label, title in history_plots:
    plt.figure(figsize=(10, 5))
    plt.plot(train_values, label=train_label, marker='o')
    plt.plot(valid_values, label=valid_label, marker='o')
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.title(title)
    plt.legend()
    plt.show()