In [None]:
import os
import torch
import torchvision.models as models
import torchvision.transforms as T
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from PIL import Image
from tqdm import tqdm
import torch.nn as nn
from collections import Counter

# Custom dataset
class CustomDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.classes = os.listdir(root_dir)
        self.data = []

        for label in range(len(self.classes)):
            class_folder = os.path.join(root_dir, self.classes[label])
            for filename in os.listdir(class_folder):
                img_path = os.path.join(class_folder, filename)
                self.data.append((img_path, label))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_path, label = self.data[idx]
        image = Image.open(img_path).convert('RGB')  # 이미지 RGB로 변환
        if self.transform:
            image = self.transform(image)
        return image, label

# 경로 및 배치 크기 설정
base_dir = './uploadfolder'  # 베이스 경로 (압축을 푼 폴더 경로)
batch_size = 16

# 통합된 train 및 valid 폴더 경로 설정
train_dir = os.path.join(base_dir, '찐막', 'train')
valid_dir = os.path.join(base_dir, '찐막', 'valid')

# 데이터 증강 포함한 이미지 전처리
transform = T.Compose([
    T.Resize((224, 224)),
    T.RandomHorizontalFlip(),
    T.RandomRotation(15),  # 더 큰 회전 각도
    T.RandomAffine(degrees=0, translate=(0.1, 0.1)),  # 이미지 이동
    T.GaussianBlur(kernel_size=3),  # Gaussian Blur 추가
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# 학습 및 검증 데이터셋 생성
train_dataset = CustomDataset(train_dir, transform=transform)
valid_dataset = CustomDataset(valid_dir, transform=transform)

# train 데이터셋이 비어있는지 확인
if len(train_dataset) == 0:
    raise ValueError("Train dataset is empty. Check if the images are correctly loaded.")

# 각 클래스의 데이터 개수 계산 (Counter 사용)
class_counts = Counter([label for _, label in train_dataset])

# 클래스별로 가중치를 부여 (데이터 개수의 역수 사용)
class_weights = {cls: 1.0 / count for cls, count in class_counts.items()}

# 각 샘플의 가중치를 리스트로 변환
sample_weights = [class_weights[label] for _, label in train_dataset]

# 샘플 가중치가 비어있는지 확인
if len(sample_weights) == 0:
    raise ValueError("Sample weights are empty. Check the dataset loading process.")

# WeightedRandomSampler 생성
sampler = WeightedRandomSampler(weights=sample_weights, num_samples=len(sample_weights), replacement=True)

# WeightedRandomSampler를 적용한 DataLoader
train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=sampler)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)

# 사전 학습된 resnext50_32x4d 모델 사용
from torchvision.models import ResNeXt50_32X4D_Weights

weights = ResNeXt50_32X4D_Weights.DEFAULT
model = models.resnext50_32x4d(weights=weights)

# 모든 레이어 학습 가능하도록 설정 (전이 학습 제외)
for param in model.parameters():
    param.requires_grad = True

# 출력 레이어를 분류하려는 클래스 수에 맞게 수정 (드롭아웃 비율 0.3으로 설정)
model.fc = nn.Sequential(
    nn.Dropout(0.3),  # Dropout 비율을 0.3으로 조정
    nn.Linear(model.fc.in_features, len(train_dataset.classes))
)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Early Stopping 설정
best_accuracy = 0
patience = 5  # 개선되지 않는 에포크를 허용하는 최대 수
counter = 0

# 손실 함수 및 AdamW 옵티마이저 설정 (학습률 0.0001로 감소)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=0.0001)

# 학습률 스케줄러 추가 (학습률 점진적 감소, gamma=0.5, step_size=10)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)

# 학습 과정 (TQDM으로 진행 상황 시각화, 에포크 30)
num_epochs = 30
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct_train = 0
    total_train = 0

    # TQDM으로 학습 진행 표시
    train_loader_iter = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}")

    for images, labels in train_loader_iter:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total_train += labels.size(0)
        correct_train += (predicted == labels).sum().item()

        train_loader_iter.set_postfix(loss=running_loss / len(train_loader))

    train_accuracy = 100 * correct_train / total_train
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}, Train Accuracy: {train_accuracy}%")

    # 학습률 스케줄러 업데이트
    scheduler.step()

    # 검증 평가
    model.eval()
    correct_valid = 0
    total_valid = 0
    with torch.no_grad():
        for images, labels in valid_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            total_valid += labels.size(0)
            correct_valid += (predicted == labels).sum().item()

    valid_accuracy = 100 * correct_valid / total_valid
    print(f'Validation Accuracy: {valid_accuracy}%')

    # Early Stopping 기준
    if valid_accuracy > best_accuracy:
        best_accuracy = valid_accuracy
        counter = 0  # 성능 개선 시 카운터 초기화
        torch.save(model.state_dict(), 'best_model.pth')  # 최적의 모델 저장
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping 적용")
            break
