In [1]:
import os
import re
import csv
import random
from glob import glob

import numpy as np
import pandas as pd

from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

import torchvision.transforms as v2
import timm
import albumentations as A
from albumentations.pytorch import ToTensorV2

from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
from sklearn.model_selection import train_test_split, KFold
from sklearn.preprocessing import LabelEncoder

import matplotlib.pyplot as plt
import seaborn as sns

from tqdm.cli import tqdm

import warnings
warnings.filterwarnings('ignore')


In [2]:
def seed_everything(seed=42):
    """Seed every random number generator this notebook relies on.

    Sets the ``PYTHONHASHSEED`` environment variable, seeds Python's
    ``random`` module, NumPy's global generator and PyTorch (CPU, the
    current CUDA device and all CUDA devices), then puts cuDNN into
    deterministic, non-benchmark mode.

    Args:
        seed: Integer applied to every generator (default 42).
    """
    os.environ['PYTHONHASHSEED'] = str(seed)

    # Each seeding function takes the seed as its only argument, so they can
    # be applied uniformly.
    seeders = (
        random.seed,                 # Python built-in RNG
        np.random.seed,              # NumPy global RNG
        torch.manual_seed,           # PyTorch CPU generator
        torch.cuda.manual_seed,      # PyTorch current GPU
        torch.cuda.manual_seed_all,  # PyTorch, every GPU
    )
    for apply_seed in seeders:
        apply_seed(seed)

    # Deterministic cuDNN kernels; benchmark mode (auto-tuning for repeated
    # input sizes) is turned off.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

# Example usage
seed_everything(seed=42)

이미지 경로 포함

In [3]:
# Load the validation labels and attach, for every row, the path of its image
# built from the raw subject id and the date.
valid_df = pd.read_csv('datasets/val_label.csv')
valid_df['data_path'] = [
    f"datasets/image_datasets/user{subject}_{date}_valid.png"
    for subject, date in zip(valid_df['subject_id'], valid_df['date'])
]

In [4]:
def _format_subject_id(raw_id):
    """Return ``raw_id`` rendered as ``userNN_valid`` (zero-padded to 2 digits).

    Values that already carry the ``user`` prefix and ``_valid`` suffix are
    returned unchanged so that executing this cell more than once does not
    produce strings such as ``useruser01_valid_valid``.
    """
    text = str(raw_id)
    if text.startswith('user') and text.endswith('_valid'):
        return text
    return f'user{text.zfill(2)}_valid'


valid_df['subject_id'] = valid_df['subject_id'].apply(_format_subject_id)

In [5]:
# Display the first rows of the label frame, including the derived
# `data_path` column and the reformatted `subject_id`.
valid_df.head()

Unnamed: 0,subject_id,date,Q1,Q2,Q3,S1,S2,S3,S4,data_path
0,user01_valid,2023-08-20,1,1,1,0,0,0,0,datasets/image_datasets/user1_2023-08-20_valid...
1,user01_valid,2023-08-21,1,1,1,0,0,1,0,datasets/image_datasets/user1_2023-08-21_valid...
2,user01_valid,2023-08-22,0,1,1,0,1,1,0,datasets/image_datasets/user1_2023-08-22_valid...
3,user01_valid,2023-08-23,0,1,1,0,0,1,0,datasets/image_datasets/user1_2023-08-23_valid...
4,user01_valid,2023-08-24,1,1,1,0,0,1,0,datasets/image_datasets/user1_2023-08-24_valid...


In [6]:
class CustomDataset(Dataset):
    """Image dataset yielding ``(image, labels)`` pairs described by a DataFrame.

    Args:
        df: DataFrame with a ``data_path`` column (image file path) and the
            label columns ``Q1, Q2, Q3, S1, S2, S3, S4``.
        transforms: Callable invoked as ``transforms(image=ndarray)`` that
            returns a mapping with an ``'image'`` entry.
    """

    def __init__(self, df, transforms):
        self.path = df['data_path'].values
        self.class_ = df[['Q1', 'Q2', 'Q3', 'S1', 'S2', 'S3', 'S4']].values
        self.transform = transforms

    def __getitem__(self, idx):
        """Return the transformed image and label row for ``idx``.

        If the image file for ``idx`` does not exist, the following samples
        are tried in order (wrapping around) and the first one that can be
        opened is returned together with *its* labels.

        Raises:
            FileNotFoundError: if none of the dataset's image files exists.
            IndexError: if ``idx`` is out of range.
        """
        total = len(self)
        candidate = idx
        # Bounded retry: at most one attempt per sample, so a dataset whose
        # files are all missing fails fast instead of recursing forever.
        for _ in range(max(total, 1)):
            try:
                # The context manager closes the file handle as soon as the
                # pixels have been copied into the array.
                with Image.open(self.path[candidate]) as handle:
                    img = np.array(handle.convert('RGB'))
            except FileNotFoundError:
                candidate = (candidate + 1) % total
                continue

            img = self.transform(image=img)['image']
            # as_tensor converts to float32 without the extra copy (and
            # warning) that torch.tensor() incurs on an existing tensor.
            img = torch.as_tensor(img, dtype=torch.float)
            y = self.class_[candidate]
            return img, y

        raise FileNotFoundError(
            f"none of the {total} image files referenced by the dataset could be found"
        )

    def __len__(self):
        """Number of rows (samples) in the source DataFrame."""
        return len(self.path)

### 데이터 mean, std 계산

In [7]:
# Reference snippet (kept commented out) that computes a single global mean
# and standard deviation over all "*valid.png" images read as grayscale.
# import os
# import numpy as np
# import cv2
# import albumentations as A

# # Folder that contains the images
# folder_path = 'datasets/image_datasets'

# # Collect the image file names
# image_files = [f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f)) and f.endswith("valid.png")]

# # Read every image and store it in a list
# images = []
# for file in image_files:
#     image_path = os.path.join(folder_path, file)
#     image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)  # read as a grayscale image
#     if image is not None:
#         images.append(image)

# # Convert the list of images to a numpy array
# images_array = np.array(images)

# # Compute the mean and standard deviation over all pixels
# mean = np.mean(images_array)
# std = np.std(images_array)

# # Print the result
# print(f"Mean: {mean}, Std: {std}")

# Hard-coded dataset statistics, presumably the output of the snippet above
# (TODO confirm). The magnitudes show they are expressed in raw pixel units
# (0-255 scale), not in the 0-1 range.
mean = 15.359733188267395
std = 58.28444875927197


In [8]:
import numpy as np
import albumentations as A
from albumentations.core.transforms_interface import ImageOnlyTransform
from albumentations.pytorch import ToTensorV2

class RandomHorizontalStrips(ImageOnlyTransform):
    """Zero out a random number of fixed-width bands of image columns.

    Each band spans the full image height and ``strip_width`` columns, placed
    at a random horizontal position (bands run top-to-bottom and are spread
    along the horizontal axis).

    Args:
        num_strips: Inclusive ``(min, max)`` range for the number of bands.
        strip_width: Width of every band in pixels.
        always_apply: Forwarded to ``ImageOnlyTransform``.
        p: Probability of applying the transform.
    """

    def __init__(self, num_strips=(1, 3), strip_width=84, always_apply=False, p=0.5):
        super(RandomHorizontalStrips, self).__init__(always_apply, p)
        self.num_strips = num_strips
        self.strip_width = strip_width

    def apply(self, img, **params):
        """Return a copy of ``img`` with the sampled column bands set to 0."""
        # Work on a copy so the caller's array is never modified in place.
        out = img.copy()
        width = out.shape[1]

        # Largest valid start column. Clamped at 0 so that images that are
        # not wider than one strip no longer make randint() raise; in that
        # case the band simply covers the whole image.
        max_start = max(width - self.strip_width, 0)

        num_strips = np.random.randint(self.num_strips[0], self.num_strips[1] + 1)
        for _ in range(num_strips):
            # randint's upper bound is exclusive, hence the +1 so that the
            # right-most position can be drawn as well.
            x_start = np.random.randint(0, max_start + 1)
            out[:, x_start:x_start + self.strip_width] = 0
        return out

    def get_transform_init_args_names(self):
        """Names of the constructor arguments exposed to albumentations."""
        return ("num_strips", "strip_width")

    def get_params(self):
        """Return the configured settings; ``apply`` receives them via ``**params``."""
        return {"num_strips": self.num_strips, "strip_width": self.strip_width}

# `mean` and `std` above are expressed in raw pixel units (0-255 scale).
# A.Normalize computes (img - mean * max_pixel_value) / (std * max_pixel_value)
# and max_pixel_value defaults to 255, which would scale the statistics a
# second time and squash every input to a near-constant value. Passing
# max_pixel_value=1.0 applies them as-is: (img - mean) / std.
# NOTE: checkpoints trained with the previous (double-scaled) normalization
# are not compatible with inputs produced by these pipelines.

# Training pipeline: resize, light noise/blur/sharpen augmentation, normalize.
train_transforms = A.Compose([
    # RandomHorizontalStrips(p=0.5),
    A.Resize(height=224, width=224, p=1),
    A.GaussNoise(p=0.5),
    A.OneOf([
        A.GaussianBlur(p=0.5),
        A.Sharpen(p=0.5)
    ], p=0.5),
    A.Normalize(mean=[mean, mean, mean], std=[std, std, std], max_pixel_value=1.0, p=1.0),
    ToTensorV2()
])


# Evaluation pipeline: deterministic resize + the same normalization.
test_transforms = A.Compose([
    A.Resize(height=224, width=224, p=1.0),
    A.Normalize(mean=[mean, mean, mean], std=[std, std, std], max_pixel_value=1.0, p=1.0),
    ToTensorV2()
])

In [9]:

# Validation dataset read through the evaluation pipeline (`test_transforms`).
valid_dataset = CustomDataset(valid_df, test_transforms)

In [10]:

# Fixed-order loader over the validation dataset, 16 samples per batch.
valid_loader = DataLoader(valid_dataset, batch_size=16, shuffle=False)

In [11]:
# Number of epochs the training loop runs for each cross-validation fold.
epochs = 200

In [12]:
from collections import deque

def run_model(model, loader, loss_fn=None, optimizer=None, is_training=False, epoch=None):
    """Run one pass over ``loader`` in training or evaluation mode.

    The model's raw outputs are passed through a sigmoid before the loss is
    computed, and thresholded at 0.5 to obtain binary multi-label predictions.

    Args:
        model: Network returning one logit per label.
        loader: Iterable of ``(data, target)`` batches.
        loss_fn: Callable ``loss_fn(probabilities, float_targets)``.
        optimizer: Optimizer stepped after every batch; only used when
            ``is_training`` is True.
        is_training: Selects train mode (with backward pass and optimizer
            step) or eval mode (no gradient tracking).
        epoch: Value shown in the progress-bar description.

    Returns:
        Tuple ``(mean loss per batch, element-wise accuracy, macro F1)``.
    """
    targets = []
    preds = []
    smooth_loss_queue = deque(maxlen=50)  # rolling window of the last 50 batch losses

    if is_training:
        model.train()
        mode = 'Train'
    else:
        model.eval()
        mode = 'Valid/Test'

    # Send batches to whichever device the model lives on; fall back to the
    # previous hard-coded 'cuda' for a model without parameters.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cuda')

    running_loss = 0.0
    bar = tqdm(loader, ascii=True, leave=False)

    # Autograd is only needed while training; disabling it during evaluation
    # avoids building the graph and keeps memory usage down.
    with torch.set_grad_enabled(is_training):
        for data, target in bar:
            data = data.to(device)
            target = target.to(device)
            if is_training:
                optimizer.zero_grad()

            outputs = torch.sigmoid(model(data))
            total_loss = loss_fn(outputs, target.float())
            batch_loss = total_loss.item()
            running_loss += batch_loss
            smooth_loss_queue.append(batch_loss)
            smooth_loss = sum(smooth_loss_queue) / len(smooth_loss_queue)

            predicted = (outputs > 0.5).float()
            preds.extend(predicted.detach().cpu().tolist())
            targets.extend(target.detach().cpu().tolist())

            if is_training:
                total_loss.backward()
                optimizer.step()

            # Show the current batch loss and its rolling average with the epoch.
            bar.set_description(f'Epoch {epoch} {mode} - Loss: {batch_loss:.4f}, Smooth Loss: {smooth_loss:.4f}')

    f1_score_ = f1_score(np.array(targets), np.array(preds), average='macro')
    acc_score = accuracy_score(np.array(targets).reshape(-1), np.array(preds).reshape(-1))

    return running_loss / len(loader), acc_score, f1_score_


In [13]:
# Read the label file again as the frame used for cross-validation and add
# the image path of every row (raw subject id + date).
full_df = pd.read_csv('datasets/val_label.csv')
full_df['data_path'] = [
    f"datasets/image_datasets/user{subject}_{date}_valid.png"
    for subject, date in zip(full_df['subject_id'], full_df['date'])
]

In [None]:
import logging
import torch
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.nn import BCELoss
from torchvision import transforms
from sklearn.model_selection import KFold
import timm

# Configure logging. The output directories are created up front: on a fresh
# checkout neither ./logs nor ./models exists, and both the log FileHandler
# and torch.save() fail on a missing directory.
# NOTE(review): the log file is named after seresnext101_32x4d while the model
# trained below is resnext101_32x32d - confirm which name is intended.
os.makedirs('./logs', exist_ok=True)
os.makedirs('./models', exist_ok=True)
log_path = './logs/seresnext101_32x4d.log'
logging.basicConfig(filename=log_path, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Two views over the same rows of full_df: the training view applies the
# augmentations, the evaluation view only resizes and normalizes, so that
# validation metrics are not computed on randomly perturbed images.
full_dataset = CustomDataset(full_df, train_transforms)
eval_dataset = CustomDataset(full_df, test_transforms)

# KFold configuration
k_folds = 5
kf = KFold(n_splits=k_folds, shuffle=True, random_state=1020)

# Per-fold results of the cross-validation
fold_perf = {}

# Placeholders so the per-fold cleanup below also works on the first fold.
model = optimizer = scheduler = None

# The split only depends on the number of samples, so plain indices suffice.
for fold, (train_idx, valid_idx) in enumerate(kf.split(np.arange(len(full_dataset)))):
    logging.info(f"Starting Fold {fold+1}")

    # Drop the previous fold's model/optimizer before building the next one,
    # otherwise both sets of weights and optimizer state sit on the GPU at
    # the same time.
    model = optimizer = scheduler = None
    torch.cuda.empty_cache()

    # Training fold: random order over the augmented dataset.
    train_subsampler = torch.utils.data.SubsetRandomSampler(train_idx)
    train_loader = DataLoader(full_dataset, batch_size=16, sampler=train_subsampler)

    # Validation fold: fixed order over the non-augmented dataset.
    valid_subset = torch.utils.data.Subset(eval_dataset, valid_idx)
    valid_loader = DataLoader(valid_subset, batch_size=16, shuffle=False)

    # Create the model and move it to the GPU
    model = timm.create_model('resnext101_32x32d', pretrained=True, num_classes=7)
    model = model.to('cuda')

    # Loss, optimizer and learning-rate scheduler.
    # BCELoss (not BCEWithLogitsLoss) because run_model applies the sigmoid.
    criterion = BCELoss()
    optimizer = optim.AdamW(model.parameters(), lr=1e-4)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=50, T_mult=1, eta_min=0.00007)

    best_f1 = -float('inf')
    best_loss = 0

    # Train and validate
    for epoch in range(epochs):
        train_loss, train_acc, train_f1 = run_model(model, train_loader, criterion, optimizer, is_training=True, epoch=epoch)
        valid_loss, valid_acc, valid_f1 = run_model(model, valid_loader, criterion, optimizer, is_training=False, epoch=epoch)

        logging.info(f'Epoch {epoch+1}: Train Loss: {train_loss:.6f}, Train Acc: {train_acc:.6f}, Train F1: {train_f1:.6f}, Valid Loss: {valid_loss:.6f}, Valid Acc: {valid_acc:.6f}, Valid F1: {valid_f1:.6f}')

        # Checkpoint whenever the validation F1 improves.
        if valid_f1 > best_f1:
            best_f1 = valid_f1
            best_loss = valid_loss
            model_path = f'./models/model_fold_{fold}_f1-{best_f1:.3f}_loss-{best_loss:.3f}_resnext101_32x32d.pt'
            torch.save(model.state_dict(), model_path)
            # Same 1-based epoch numbering as the per-epoch log line above.
            logging.info(f'New best F1-score {valid_f1:.4f} achieved at epoch {epoch+1}, model saved at {model_path}')

        scheduler.step()

    # Record the fold's performance: metrics of the last epoch plus the best
    # validation score (the one the saved checkpoint corresponds to).
    fold_perf[fold] = {
        'train_loss': train_loss,
        'train_acc': train_acc,
        'train_f1': train_f1,
        'valid_loss': valid_loss,
        'valid_acc': valid_acc,
        'valid_f1': valid_f1,
        'best_valid_f1': best_f1,
        'best_valid_loss': best_loss
    }

Epoch 8 Train - Loss: 0.2867, Smooth Loss: 0.2734:  50% 3/6 [00:01<00:01,  1.88it/s]       