In [1]:
import subprocess
import time
import threading
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torch.utils.data import random_split, WeightedRandomSampler, Subset
from collections import Counter
import torch.cuda.amp as amp
from torch import nn, optim
import copy
from torch.optim import lr_scheduler
import timm
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials
import numpy as np

In [2]:
def get_gpu_usage():
    result = subprocess.run(['nvidia-smi', '--query-gpu=utilization.gpu', '--format=csv,noheader,nounits'], 
                            stdout=subprocess.PIPE)
    return int(result.stdout.decode('utf-8').strip())

class DynamicDataLoader:
    def __init__(self, dataset, batch_size=32, num_workers=4, pin_memory=True, prefetch_factor=2):
        self.dataset = dataset
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.pin_memory = pin_memory
        self.prefetch_factor = prefetch_factor
        self.loader = self.create_loader()
        self.adjusting = False
        self.target_gpu_usage = 95  # Target GPU usage in percent

    def create_loader(self):
        return DataLoader(self.dataset, batch_size=self.batch_size, shuffle=True, num_workers=self.num_workers, 
                          pin_memory=self.pin_memory, prefetch_factor=self.prefetch_factor, persistent_workers=True)

    def adjust_num_workers(self):
        while self.adjusting:
            gpu_usage = get_gpu_usage()
            print(f"Current GPU usage: {gpu_usage}%")
            if gpu_usage < self.target_gpu_usage - 10 and self.num_workers < 16:
                self.num_workers += 1
                print(f"Increasing num_workers to {self.num_workers}")
            elif gpu_usage > self.target_gpu_usage + 10 and self.num_workers > 1:
                self.num_workers -= 1
                print(f"Decreasing num_workers to {self.num_workers}")
            self.loader = self.create_loader()
            time.sleep(20)

    def start_adjusting(self):
        self.adjusting = True
        self.adjust_thread = threading.Thread(target=self.adjust_num_workers)
        self.adjust_thread.start()

    def stop_adjusting(self):
        self.adjusting = False
        self.adjust_thread.join()

    def get_loader(self):
        return self.loader

In [3]:
base_dir = './image_data/'

# 데이터 전처리 및 증강
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(380),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
        transforms.RandomVerticalFlip(),
        transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'test': transforms.Compose([
        transforms.Resize(400),
        transforms.CenterCrop(380),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

# 전체 데이터셋 로드
full_dataset = datasets.ImageFolder(base_dir, transform=data_transforms['train'])

# 클래스별 이미지 개수 출력
class_counts = Counter([full_dataset.targets[i] for i in range(len(full_dataset))])
print("Original class distribution:", class_counts)

# WeightedRandomSampler 사용하여 클래스 불균형 해결
class_weights = [1.0 / class_counts[i] for i in range(len(class_counts))]
sample_weights = [class_weights[label] for label in full_dataset.targets]
sampler = WeightedRandomSampler(sample_weights, num_samples=len(sample_weights), replacement=True)

print("Splitting dataset into training and test sets...")
train_size = int(0.8 * len(full_dataset))
test_size = len(full_dataset) - train_size
train_dataset, test_dataset = random_split(full_dataset, [train_size, test_size])

# 훈련 데이터셋과 테스트 데이터셋 각각에 다른 변환 적용
train_dataset.dataset.transform = data_transforms['train']
test_dataset.dataset.transform = data_transforms['test']

# DynamicDataLoader 사용
dynamic_loader = DynamicDataLoader(train_dataset, batch_size=32, num_workers=4, pin_memory=True, prefetch_factor=4)
dynamic_loader.start_adjusting()

test_loader = DataLoader(test_dataset, batch_size=32, shuffle=True, num_workers=4, pin_memory=True, prefetch_factor=4, persistent_workers=True)

dataloaders = {'train': dynamic_loader.get_loader(), 'test': test_loader}
dataset_sizes = {'train': len(train_dataset), 'test': len(test_dataset)}
class_names = full_dataset.classes

print("Training and test data are ready.")


Original class distribution: Counter({0: 126, 1: 126, 2: 126, 3: 126})
Splitting dataset into training and test sets...
Training and test data are ready.


In [4]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [5]:
def train_and_evaluate(params):
    model = timm.create_model('efficientnet_b4', pretrained=True)
    num_ftrs = model.classifier.in_features
    model.classifier = nn.Sequential(
        nn.Dropout(params['dropout']),
        nn.Linear(num_ftrs, len(class_names))
    )
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=params['lr'], weight_decay=params['weight_decay'])
    scaler = amp.GradScaler()
    scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=10, threshold_mode='rel')

    num_epochs = 80
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    best_test_acc = 0.0  # 가장 높은 테스트 데이터 정확도를 저장하기 위한 변수

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        for phase in ['train', 'test']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):
                    with amp.autocast():
                        outputs = model(inputs)
                        _, preds = torch.max(outputs, 1)
                        loss = criterion(outputs, labels)

                    if phase == 'train':
                        scaler.scale(loss).backward()
                        scaler.step(optimizer)
                        scaler.update()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if phase == 'test':
                scheduler.step(epoch_acc)

                if epoch_acc > best_acc:
                    best_acc = epoch_acc
                    best_model_wts = copy.deepcopy(model.state_dict())
                
                if epoch_acc > best_test_acc:
                    best_test_acc = epoch_acc

    model.load_state_dict(best_model_wts)
    return {'loss': -best_acc, 'status': STATUS_OK, 'model_state_dict': best_model_wts}

# Hyperopt를 위한 하이퍼파라미터 공간 정의
space = {
    'batch_size': hp.choice('batch_size', [16, 32, 64]),
    'lr': hp.loguniform('lr', np.log(0.00001), np.log(0.001)),
    'weight_decay': hp.loguniform('weight_decay', np.log(0.0001), np.log(0.01)),
    'dropout': hp.uniform('dropout', 0.2, 0.7)
}

# 최적화 실행
trials = Trials()
best = fmin(fn=train_and_evaluate,
            space=space,
            algo=tpe.suggest,
            max_evals=20,
            trials=trials)

print(f"Best parameters: {best}")

# Save the best model
best_model = timm.create_model('efficientnet_b4', pretrained=True)
num_ftrs = best_model.classifier.in_features
best_model.classifier = nn.Sequential(
    nn.Dropout(best['dropout']),
    nn.Linear(num_ftrs, len(class_names))
)
best_model.load_state_dict(trials.best_trial['result']['model_state_dict'])
torch.save(best_model.state_dict(), 'personal_color_efficientnet_b4.pth')

# Print the best test accuracy
best_test_acc = trials.best_trial['result']['best_test_acc']
print(f"Best Test Accuracy: {best_test_acc:.4f}")

print("Training complete")

Current GPU usage: 4%                                 
Increasing num_workers to 5                           
Epoch 0/79                                            
----------                                            
train Loss: 1.3777 Acc: 0.3027                        
test Loss: 1.3703 Acc: 0.3366                         
Epoch 1/79                                            
----------                                            
train Loss: 1.1158 Acc: 0.7767                        
test Loss: 1.2214 Acc: 0.4851                         
Epoch 2/79                                            
----------                                            
train Loss: 0.5387 Acc: 0.8685                        
test Loss: 0.9347 Acc: 0.6337                         
Epoch 3/79                                            
----------                                            
train Loss: 0.0717 Acc: 0.9901                        
test Loss: 1.5579 Acc: 0.6238                         
Epoch 4/79

KeyError: 'best_test_acc'

Current GPU usage: 1%
Current GPU usage: 0%
Current GPU usage: 0%
Current GPU usage: 1%
Current GPU usage: 1%
Current GPU usage: 0%
Current GPU usage: 0%
Current GPU usage: 0%
Current GPU usage: 0%
Current GPU usage: 0%
Current GPU usage: 0%
Current GPU usage: 0%
Current GPU usage: 0%
Current GPU usage: 0%
Current GPU usage: 0%
Current GPU usage: 0%
Current GPU usage: 0%
Current GPU usage: 0%
Current GPU usage: 0%
Current GPU usage: 2%
Current GPU usage: 0%
Current GPU usage: 0%


In [6]:
dynamic_loader.stop_adjusting()