In [4]:
import numpy as np
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import StratifiedShuffleSplit

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models
from sklearn.metrics import precision_score, recall_score, f1_score
import pandas as pd

In [5]:
import numpy as np
from collections import Counter
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import StratifiedShuffleSplit

def load_data(batch_size=32, data_dir="./Garbage classification",
              test_size=0.2, random_state=42, num_workers=4, normalize=True):
    """Build stratified train/test DataLoaders from an ImageFolder directory.

    Every image is resized to 224x224 and converted to a tensor. A single
    stratified shuffle split divides the dataset so that each class keeps the
    same train/test proportion. A per-class distribution table is printed.

    Args:
        batch_size: Batch size used by both loaders.
        data_dir: Root directory in ``ImageFolder`` layout (one sub-directory
            per class).
        test_size: Fraction of the samples assigned to the test split.
        random_state: Seed of the stratified split, for reproducibility.
        num_workers: Number of worker processes per DataLoader.
        normalize: When True, apply the ImageNet channel mean/std
            normalization that torchvision's ImageNet-pretrained models are
            documented to expect. Pass False to feed raw [0, 1] tensors.

    Returns:
        Tuple ``(train_loader, test_loader, class_names)``; ``class_names`` is
        the class list reported by ``ImageFolder``, indexed by label id.
    """
    # 1) Image preprocessing pipeline.
    steps = [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ]
    if normalize:
        # ImageNet statistics: the pretrained backbone used downstream was
        # trained on inputs normalized this way.
        steps.append(transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                          std=[0.229, 0.224, 0.225]))
    transform = transforms.Compose(steps)

    # 2) Load the whole dataset; `targets` holds one class index per sample.
    full_dataset = datasets.ImageFolder(root=data_dir, transform=transform)
    labels = np.array(full_dataset.targets)

    # 3) Stratified split. Only the labels drive the split, so a dummy
    #    feature array of matching length is passed as X.
    sss = StratifiedShuffleSplit(n_splits=1, test_size=test_size,
                                 random_state=random_state)
    train_idx, test_idx = next(sss.split(np.zeros(len(labels)), labels))

    # 4) Subsets and DataLoaders (only the training loader is shuffled).
    train_dataset = Subset(full_dataset, train_idx)
    test_dataset  = Subset(full_dataset, test_idx)

    train_loader = DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True,
        num_workers=num_workers, pin_memory=True
    )
    test_loader = DataLoader(
        test_dataset, batch_size=batch_size, shuffle=False,
        num_workers=num_workers, pin_memory=True
    )

    # 5) Report class names and split sizes.
    class_names = full_dataset.classes
    print("Classes:", class_names)
    print(f"Total samples: {len(full_dataset)}, Train: {len(train_dataset)}, Test: {len(test_dataset)}")

    # 6) Per-class distribution, counted in one vectorized pass per split.
    n_classes = len(class_names)
    total_counts = np.bincount(labels, minlength=n_classes)
    train_counts = np.bincount(labels[train_idx], minlength=n_classes)
    test_counts  = np.bincount(labels[test_idx], minlength=n_classes)
    print("\nClass distribution:")
    print(f"{'Class':<15}{'Total':>8}{'Train':>8}{'Test':>8}{'Test %':>9}")
    for i, cname in enumerate(class_names):
        tot = int(total_counts[i])
        trn = int(train_counts[i])
        tst = int(test_counts[i])
        # Guard the ratio so a class without samples cannot divide by zero.
        pct = tst / tot * 100 if tot else 0.0
        print(f"{cname:<15}{tot:>8}{trn:>8}{tst:>8}{pct:>8.1f}%")

    return train_loader, test_loader, class_names


In [6]:
def train_vgg16(train_loader, test_loader, class_names,
                num_epochs=10, learning_rate=1e-4,
                best_model_path="best_vgg16.pth",
                final_model_path="final_vgg16.pth",
                metrics_csv="metrics.csv"):
    """Fine-tune the last layer of an ImageNet-pretrained VGG16.

    All pretrained weights are frozen; only a freshly created final linear
    layer (one output per entry of ``class_names``) is trained with Adam and
    cross-entropy loss. After every epoch the model is evaluated on
    ``test_loader`` and accuracy plus macro precision/recall/F1 are recorded.

    Note: ``test_loader`` is used both to select the best checkpoint and to
    report metrics, so the reported best accuracy is selected on that same
    data.

    Args:
        train_loader: Iterable of ``(inputs, targets)`` training batches.
        test_loader: Iterable of ``(inputs, targets)`` evaluation batches.
        class_names: Sequence of class names; its length sets the output size.
        num_epochs: Number of training epochs.
        learning_rate: Adam learning rate for the new final layer.
        best_model_path: File receiving the state dict of the epoch with the
            highest evaluation accuracy.
        final_model_path: File receiving the state dict after the last epoch.
        metrics_csv: CSV file receiving one row of metrics per epoch.

    Returns:
        The trained model, left on the device used for training.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # 1) Load the pretrained VGG16. Newer torchvision releases expose the
    #    `weights=` enum API; fall back to the legacy flag when it is absent.
    weights_enum = getattr(models, "VGG16_Weights", None)
    if weights_enum is not None:
        model = models.vgg16(weights=weights_enum.IMAGENET1K_V1)
    else:
        model = models.vgg16(pretrained=True)

    # 2) Freeze every pretrained parameter. The optimizer below only owns the
    #    new final layer, so leaving the other classifier layers trainable
    #    would compute gradients for them on each step that nothing uses and
    #    that optimizer.zero_grad() never clears.
    for param in model.parameters():
        param.requires_grad = False

    # 3) Replace the last classifier layer; a new nn.Linear is trainable by
    #    default, so it is the only layer receiving gradients.
    num_features = model.classifier[6].in_features
    model.classifier[6] = nn.Linear(num_features, len(class_names))
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.classifier[6].parameters(), lr=learning_rate)

    best_acc = 0.0
    best_saved = False  # becomes True once a best checkpoint has been written
    history = []

    for epoch in range(1, num_epochs+1):
        # --- Training ---
        model.train()
        run_loss, run_corr, run_tot = 0.0, 0, 0
        for x, y in train_loader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            out = model(x)
            loss = criterion(out, y)
            loss.backward()
            optimizer.step()

            # criterion returns the batch mean, so weight it by the batch size
            # to obtain a correct per-sample average over the epoch.
            run_loss += loss.item() * x.size(0)
            preds = out.argmax(1)
            run_corr += (preds == y).sum().item()
            run_tot  += y.size(0)
        train_loss = run_loss / run_tot
        train_acc  = run_corr / run_tot

        # --- Validation ---
        model.eval()
        val_loss, val_corr, val_tot = 0.0, 0, 0
        all_preds, all_labels = [], []
        with torch.no_grad():
            for x, y in test_loader:
                x, y = x.to(device), y.to(device)
                out = model(x)
                loss = criterion(out, y)

                val_loss += loss.item() * x.size(0)
                preds = out.argmax(1)
                val_corr += (preds == y).sum().item()
                val_tot  += y.size(0)

                all_preds.extend(preds.cpu().tolist())
                all_labels.extend(y.cpu().tolist())
        val_loss /= val_tot
        val_acc  = val_corr / val_tot
        val_prec = precision_score(all_labels, all_preds, average='macro', zero_division=0)
        val_rec  = recall_score(all_labels, all_preds, average='macro', zero_division=0)
        val_f1   = f1_score(all_labels, all_preds, average='macro', zero_division=0)

        print(f"[{epoch}/{num_epochs}] "
              f"Train Loss:{train_loss:.4f} Acc:{train_acc:.4f} | "
              f"Val   Loss:{val_loss:.4f} Acc:{val_acc:.4f} "
              f"P:{val_prec:.4f} R:{val_rec:.4f} F1:{val_f1:.4f}")

        history.append({
            'epoch': epoch,
            'train_loss': train_loss,
            'train_acc': train_acc,
            'val_loss': val_loss,
            'val_acc': val_acc,
            'val_precision': val_prec,
            'val_recall': val_rec,
            'val_f1': val_f1
        })
        # Rewrite the metrics file every epoch so an interrupted run still
        # leaves the metrics of all completed epochs on disk.
        pd.DataFrame(history).to_csv(metrics_csv, index=False)

        # Save the best model. The first evaluated epoch always counts, so a
        # checkpoint exists even when accuracy never rises above zero.
        if not best_saved or val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), best_model_path)
            best_saved = True

    # Save the final model.
    torch.save(model.state_dict(), final_model_path)
    if best_saved:
        print(f"\nBest Val Acc: {best_acc:.4f} (saved to {best_model_path})")
    else:
        print("\nNo epoch was run; no best model was saved")
    print(f"Final model saved to {final_model_path}")

    # Write the per-epoch metrics as CSV (also covers the zero-epoch case).
    df = pd.DataFrame(history)
    df.to_csv(metrics_csv, index=False)
    print(f"Epoch metrics saved to {metrics_csv}")

    return model

In [9]:
# Build the stratified train/test loaders with a batch size of 32.
train_loader, test_loader, class_names = load_data(32)

# Fine-tune VGG16 for 30 epochs at a learning rate of 1e-4.
model = train_vgg16(
    train_loader=train_loader,
    test_loader=test_loader,
    class_names=class_names,
    learning_rate=1e-4,
    num_epochs=30,
)

Classes: ['cardboard', 'glass', 'metal', 'paper', 'plastic']
Total samples: 2000, Train: 1600, Test: 400

Class distribution:
Class             Total   Train    Test   Test %
cardboard           400     320      80    20.0%
glass               400     320      80    20.0%
metal               400     320      80    20.0%
paper               400     320      80    20.0%
plastic             400     320      80    20.0%




KeyboardInterrupt: 