# Dataset.py

In [2]:
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from autoaugment import ImageNetPolicy
from PIL import Image
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.model_selection import KFold
from torch.utils.data import Subset

class CustomDataset(Dataset):
    def __init__(self, csv, transforms, is_low = False, is_test=False, debug=False):
        if debug:
            csv = csv[::500]
            
        
        self.is_test = is_test
        if is_low:
            self.path = csv['img_path'].values
        
        else:
            self.path = csv['upscale_img_path'].values

        if not is_test:
            self.class_ = csv['label'].values

        self.transform = transforms

    def __getitem__(self, idx):
        img = np.array(Image.open(self.path[idx]).convert('RGB'))
        img = Image.fromarray(img)
        
        if self.transform:
            img = self.transform(img)

        if not self.is_test:
            y = self.class_[idx]

            return img, y
        
        else:
            return img
        

    
    def __len__(self):
        return len(self.path)


def get_transforms_AutoAug(is_low):
    size = 64 if is_low else 256

    train_transforms = transforms.Compose([
        transforms.Resize(size), 
        transforms.RandomHorizontalFlip(), 
        ImageNetPolicy(), 
        transforms.ToTensor(), 
        transforms.Normalize(mean = [0.485, 0.456, 0.406], std = [0.229, 0.224, 0.225])
    ])
    
    valid_transforms = transforms.Compose([
        transforms.Resize(size),
        transforms.ToTensor(), 
        transforms.Normalize(mean = [0.485, 0.456, 0.406], std = [0.229, 0.224, 0.225])
    ])

    return train_transforms, valid_transforms

def get_loader(cfg, is_test=False):
    if cfg['use_kfold']:
        df = pd.read_csv('train.csv')
        kfold = KFold(n_splits=3, shuffle=True, random_state=1020)
        
        train_transforms, valid_transforms = get_transforms_AutoAug(cfg['is_low'])

        # K-Fold의 각 분할에 대해 DataLoader 리스트를 초기화합니다.
        train_loaders = []
        valid_loaders = []
        
        for fold, (train_idx, valid_idx) in enumerate(kfold.split(df)):
            # 훈련 및 검증 데이터프레임을 생성합니다.
            train_df = df.iloc[train_idx]
            valid_df = df.iloc[valid_idx]

            encoder = LabelEncoder()
            train_df['label'] = encoder.fit_transform(train_df['label'])
            valid_df['label'] = encoder.transform(valid_df['label'])
            
            # CustomDataset을 사용하여 훈련 및 검증 데이터셋을 생성합니다.
            train_dataset = CustomDataset(train_df, train_transforms, cfg['is_low'], False, cfg['debug'])
            valid_dataset = CustomDataset(valid_df, valid_transforms, cfg['is_low'], False, cfg['debug'])
            
            # DataLoader 인스턴스를 생성합니다.
            train_loader = DataLoader(train_dataset, batch_size=cfg['batch_size'], shuffle=True)
            valid_loader = DataLoader(valid_dataset, batch_size=1, shuffle=False)
            
            train_loaders.append(train_loader)
            valid_loaders.append(valid_loader)

        return train_loaders, valid_loaders
        
    else:
        if is_test:
            test_df = pd.read_csv('test.csv')
            test_dataset = CustomDataset(test_df, valid_transforms, cfg['is_low'], is_test=True)
            test_loader = DataLoader(test_dataset, batch_size=cfg['batch_size'], shuffle=False)

        else:
            df = pd.read_csv('train.csv')
            train_df, valid_df = train_test_split(df, test_size=0.2, random_state=1020, stratify=df['label'])

            encoder = LabelEncoder()
            train_df['label'] = encoder.fit_transform(train_df['label'])
            valid_df['label'] = encoder.transform(valid_df['label'])

            train_transforms, valid_transforms = get_transforms_AutoAug(cfg['is_low'])
            train_dataset = CustomDataset(train_df, train_transforms, cfg['is_low'], False, cfg['debug'])
            valid_dataset = CustomDataset(valid_df, valid_transforms, cfg['is_low'], False, cfg['debug'])

            train_loader = DataLoader(train_dataset, batch_size=cfg['batch_size'], shuffle=True)
            valid_loader = DataLoader(valid_dataset, batch_size=1, shuffle=False)
    

    if not is_test:
        return train_loader, valid_loader
    
    else:
        return test_loader

# models.py

In [3]:
import torch
import torch.nn as nn
import timm

def get_convnext_base():
    model = timm.create_model('convnext_base', pretrained=True, num_classes=25)
    model = model.to('cuda:1')

    return model

def get_convnext_large():
    model = timm.create_model('convnext_large', pretrained=True, num_classes=25)
    model = model.to('cuda:1')

    return model

def get_mobileNetV3_large():
    model = timm.create_model('mobilenetv3_large_100', pretrained=True, num_classes=25)
    model = model.to('cuda:1')

    return model

def get_resnet50_32x4d():
    model = timm.create_model('resnext50_32x4d', pretrained=True, num_classes=25)
    model = model.to('cuda:1')

    return model

def get_swin_large():
    model = timm.create_model('swin_large_patch4_window7_224', pretrained=True, num_classes=25)
    model = model.to('cuda:1')

    return model

def get_deit3_large():
    model = timm.create_model('deit3_large_patch16_224', pretrained=True, num_classes=25)
    model = model.to('cuda:1')

    return model

  from .autonotebook import tqdm as notebook_tqdm


# Trainer.py

In [4]:
import torch.optim as optim
import torch.nn as nn
from collections import deque
from sklearn.metrics import f1_score, accuracy_score
import numpy as np
import csv
import torch
from tqdm.cli import tqdm

def run_model(model, loader, loss_fn=None, optimizer=None, is_training=False, epoch=None, is_test = False):
    targets = []
    preds = []
    smooth_loss_queue = deque(maxlen=100)

    if is_training:
        model.train()
        mode = 'Train'
    else:
        model.eval()
        mode = 'Valid/Test'

    running_loss = 0.0
    bar = tqdm(loader, ascii=True)

    if not is_test:
        for cnt, (data, target) in enumerate(bar):
            data = data.to('cuda:1')
            target = target.to('cuda:1')
            if is_training:
                optimizer.zero_grad()

            outputs = model(data)

            if outputs.dim() == 0:
                outputs = outputs.unsqueeze(0)

            if target.dim() == 0:
                target = target.unsqueeze(0)

            if not is_test:
                total_loss = loss_fn(outputs, target.long())
                running_loss += total_loss.item()
                smooth_loss_queue.append(total_loss.item())
                smooth_loss = sum(smooth_loss_queue) / len(smooth_loss_queue)

            predicted = torch.argmax(outputs, dim=-1)

            if not is_test:
                preds.extend(predicted.detach().cpu().tolist())
                targets.extend(target.detach().cpu().tolist())

            else:
                preds.extend(predicted.detach().cpu().tolist())

            if is_training:
                total_loss.backward()
                optimizer.step()

            
            bar.set_description(f'Loss: {total_loss:.6f} | Smooth Loss: {smooth_loss:.6f}')
        f1_score_ = f1_score(np.array(targets), np.array(preds), average='macro')
        acc_score = accuracy_score(np.array(targets), np.array(preds))

        return running_loss / len(loader), acc_score, f1_score_, np.array(targets), np.array(preds)

    else:
        preds = []
        smooth_loss_queue = deque(maxlen=50)

        model.eval()
        mode = 'Valid/Test'

        bar = tqdm(loader, ascii=True)

        for cnt, (data) in enumerate(bar):
            data = data.to('cuda:1')

            outputs = model(data)

            if outputs.dim() == 0:
                outputs = outputs.unsqueeze(0)

            predicted = torch.argmax(outputs, dim=-1)
            preds.extend(predicted.detach().cpu().tolist())
            
        return preds
    


def train_kfold(model, cfg, train_loaders, valid_loaders):
    for idx, (train_loader, valid_loader) in enumerate(zip(train_loaders, valid_loaders)):
        model = get_swin_large()
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.AdamW(model.parameters(), lr=cfg['lr'])
        scheduler = optim.lr_scheduler.LambdaLR(optimizer=optimizer, lr_lambda=lambda epoch: 0.95 ** epoch)

        best_f1 = 0

        # Initialization list for train visualization
        train_per_loss = []
        valid_per_loss = []

        train_per_acc = []
        valid_per_acc = []

        train_per_f1 = []
        valid_per_f1 = []

        # Print table header

        with open(f"logs/{cfg['attempt_name']}_{idx+1}fold.csv", "w", newline='') as csvfile:
            fieldnames = ['Epoch', 'Train Loss', 'Train Acc', 'Train F1', 'Valid Loss', 'Valid Acc', 'Valid F1', 'Learning Rate']
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

            # Write the header
            writer.writeheader()
            for e in range(cfg['epochs']):
                train_loss, train_acc, train_f1, train_targets, train_preds = run_model(model, train_loader, criterion, optimizer, is_training=True, epoch=cfg['epochs'])
                valid_loss, valid_acc, valid_f1, valid_targets, valid_preds = run_model(model, valid_loader, criterion, optimizer, is_training=False, epoch=cfg['epochs'])
                
                train_per_loss.append(train_loss)
                valid_per_loss.append(valid_loss)
                
                train_per_acc.append(train_acc)
                valid_per_acc.append(valid_acc)
                
                train_per_f1.append(train_f1)
                valid_per_f1.append(valid_f1)
                
                # Print epoch results in table format
                print(f'{"-"*75}')
                print_output = f'Epoch: {e} | Train Loss: {train_loss:.6f} | Train Acc: {train_acc:.6f} | Train F1: {train_f1:.6f} | Valid Loss: {valid_loss:.6f} | Valid Acc: {valid_acc:.6f} | Valid F1: {valid_f1:.6f} | LR: {optimizer.param_groups[0]["lr"]:.2e}'
                print(print_output)
                print(f'{"-"*75}')
                writer.writerow({
                    'Epoch': e,
                    'Train Loss': train_loss,
                    'Train Acc': train_acc,
                    'Train F1': train_f1,
                    'Valid Loss': valid_loss,
                    'Valid Acc': valid_acc,
                    'Valid F1': valid_f1,
                    'Learning Rate': optimizer.param_groups[0]['lr']
                })
                
                scheduler.step()

                if valid_f1 < best_f1:
                    print(f'{"*"*75}\nModel saved! Improved from {best_f1:.6f} to {valid_f1:.6f}\n{"*"*75}')
                    best_f1 = valid_loss
                    torch.save(model.state_dict(), f'models/{cfg["attempt_name"]}_{idx+1}fold.pt')
                
def train(model, cfg, train_loader, valid_loader):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=cfg['lr'])
    scheduler = optim.lr_scheduler.LambdaLR(optimizer=optimizer, lr_lambda=lambda epoch: 0.95 ** epoch)

    best_loss = float('inf')

    # Initialization list for train visualization
    train_per_loss = []
    valid_per_loss = []

    train_per_acc = []
    valid_per_acc = []

    train_per_f1 = []
    valid_per_f1 = []

    # Print table header

    with open(f"logs/{cfg['attempt_name']}.csv", "w", newline='') as csvfile:
        fieldnames = ['Epoch', 'Train Loss', 'Train Acc', 'Train F1', 'Valid Loss', 'Valid Acc', 'Valid F1', 'Learning Rate']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

        # Write the header
        writer.writeheader()
        for e in range(cfg['epochs']):
            train_loss, train_acc, train_f1, train_targets, train_preds = run_model(model, train_loader, criterion, optimizer, is_training=True, epoch=cfg['epochs'])
            valid_loss, valid_acc, valid_f1, valid_targets, valid_preds = run_model(model, valid_loader, criterion, optimizer, is_training=False, epoch=cfg['epochs'])
            
            train_per_loss.append(train_loss)
            valid_per_loss.append(valid_loss)
            
            train_per_acc.append(train_acc)
            valid_per_acc.append(valid_acc)
            
            train_per_f1.append(train_f1)
            valid_per_f1.append(valid_f1)
            
            # Print epoch results in table format
            print(f'{"-"*75}')
            print_output = f'Epoch: {e} | Train Loss: {train_loss:.6f} | Train Acc: {train_acc:.6f} | Train F1: {train_f1:.6f} | Valid Loss: {valid_loss:.6f} | Valid Acc: {valid_acc:.6f} | Valid F1: {valid_f1:.6f} | LR: {optimizer.param_groups[0]["lr"]:.2e}'
            print(print_output)
            print(f'{"-"*75}')
            writer.writerow({
                'Epoch': e,
                'Train Loss': train_loss,
                'Train Acc': train_acc,
                'Train F1': train_f1,
                'Valid Loss': valid_loss,
                'Valid Acc': valid_acc,
                'Valid F1': valid_f1,
                'Learning Rate': optimizer.param_groups[0]['lr']
            })
            
            scheduler.step()

            if valid_loss < best_loss:
                print(f'{"*"*75}\nModel saved! Improved from {best_loss:.6f} to {valid_loss:.6f}\n{"*"*75}')
                best_loss = valid_loss
                torch.save(model.state_dict(), f'models/{cfg["attempt_name"]}.pt')

# Train.py

In [5]:
import random, os
import json

def seed_everything(seed=42):
    random.seed(seed)  # Python 내장 random 모듈
    os.environ['PYTHONHASHSEED'] = str(seed)  # 환경변수 설정
    np.random.seed(seed)  # NumPy
    torch.manual_seed(seed)  # PyTorch CPU 시드 고정
    torch.cuda.manual_seed(seed)  # PyTorch GPU 시드 고정
    torch.cuda.manual_seed_all(seed)  # 멀티 GPU 환경에서도 시드 고정
    torch.backends.cudnn.deterministic = True  # CuDNN 관련 설정
    torch.backends.cudnn.benchmark = False  # 동일한 입력 크기의 데이터가 반복될 경우 속도 향상을 위한 벤치마크 모드 비활성화

def train_start(cfg, seed, pretrained=False, pretrained_model_pt=''):
    seed_everything(seed=seed)

    filename = 'method_log.json'
    # 파일이 존재하는지 확인하고, 존재하면 기존 내용을 읽습니다.
    if os.path.exists(filename):
        with open(filename, 'r', encoding='utf-8') as file:
            try:
                data = json.load(file)
            except json.JSONDecodeError:  # 파일이 비어있거나 JSON 형식이 아닌 경우
                data = {}
    else:
        data = {}

    # 'attempt_name'을 키로 하여 'method' 값을 갱신하거나 추가합니다.
    data[cfg['attempt_name']] = cfg['method']

    # 변경된 데이터를 파일에 씁니다.
    with open(filename, 'w', encoding='utf-8') as file:
        json.dump(data, file, ensure_ascii=False, indent=4)

    print(f"'{cfg['attempt_name']}' 설정이 '{filename}' 파일에 저장되었습니다.")

    model = get_convnext_base()
    if pretrained:
        state_dict = torch.load(f'models/{pretrained_model_pt}.pt')
        model.load_state_dict(state_dict)
    print(f"model load 완료!")
    
    train_loader, valid_loader = get_loader(cfg)
    print(f"data load 완료!")

    print('학습을 진행합니다.')
    train(model, cfg, train_loader, valid_loader)

    return model

# main controler

In [6]:
cfg = {
    'lr' : 5e-5,
    'epochs' : 6,
    'batch_size' : 4,
    'is_low' : False,
    'attempt_name' : '0416_swin_large_patch16_224_KD_teacher',
    'method' : '고해상도 학습(Teacher) --> 저해상도 학습(distillation student)',
    'use_kfold' : False,
    'debug' : True
}

train_start(cfg, 1020, pretrained=False)

'0416_swin_large_patch16_224_KD_teacher' 설정이 'method_log.json' 파일에 저장되었습니다.


AssertionError: Torch not compiled with CUDA enabled