In [2]:
import os, time, csv, random
import numpy as np
import pandas as pd
from pathlib import Path
from PIL import Image
from collections import Counter
import matplotlib.pyplot as plt
from torch.optim import Adam

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torchvision import transforms
from torchvision.models import resnet50, ResNet50_Weights

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, classification_report, confusion_matrix

from tqdm import tqdm
from torch.utils.tensorboard import SummaryWriter


In [3]:
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)

# 로컬용
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# # 서버용
# DEVICE = torch.device("cuda:1" if torch.cuda.is_available() else "cpu")
print("Device:", DEVICE, torch.cuda.get_device_name(0) if DEVICE.type=="cuda" else "")

Device: cuda NVIDIA GeForce RTX 4070 Laptop GPU


In [4]:
print(torch.cuda.is_available())
print(torch.version.cuda)

True
12.6


# 경로설정

In [5]:
# # 서버PC 버전
# ROOT = Path("/home/wanted-1/PotenupWorkspace/aug-project4/data/Dataset_project4")   # <- 본인 데이터 경로

# 로컬
ROOT = Path("data/Dataset_project4")
CLASS_NAMES = [
    "metal_can_steel","metal_can_aluminum","paper",
    "pet_clear","pet_colored",
    "plastic_pe","plastic_pp","plastic_ps",
    "styrofoam","vinyl",
    "glass_brown","glass_green","glass_clear"
]
CLASS_TO_ID = {c:i for i,c in enumerate(CLASS_NAMES)}

IMG_EXTS = {".jpg",".jpeg",".png",".bmp",".webp"}

def infer_class(p: Path):
    s = str(p)
    if "철캔" in s: return "metal_can_steel"
    if "알루미늄캔" in s: return "metal_can_aluminum"
    if "종이" in s: return "paper"
    if "무색" in s: return "pet_clear"
    if "유색" in s: return "pet_colored"
    if "PE" in s: return "plastic_pe"
    if "PP" in s: return "plastic_pp"
    if "PS" in s: return "plastic_ps"
    if "스티로폼" in s: return "styrofoam"
    if "비닐" in s: return "vinyl"
    if "갈색" in s: return "glass_brown"
    if "녹색" in s: return "glass_green"
    if "투명" in s: return "glass_clear"
    return None

def build_df(root: Path):
    paths, labels = [], []
    
    # 클래스 폴더 이름과 실제 클래스 이름을 매핑
    # '유리병_갈색' -> 'glass_brown', '유리병_녹색' -> 'glass_green' 등
    folder_to_class = {
        "금속캔알루미늄캔": "metal_can_aluminum",
        "금속캔철캔": "metal_can_steel",
        "비닐": "vinyl",
        "스티로폼": "styrofoam",
        "유리병갈색": "glass_brown",
        "유리병녹색": "glass_green",
        "유리병투명": "glass_clear",
        "종이": "paper",
        "페트병무색단일": "pet_clear",
        "페트병유색단일": "pet_colored",
        "플라스틱PE": "plastic_pe",
        "플라스틱PP": "plastic_pp",
        "플라스틱PS": "plastic_ps"
    }

    # 루트 폴더의 각 하위 폴더를 탐색
    for folder_path in root.iterdir():
        if folder_path.is_dir() and folder_path.name in folder_to_class:
            class_name = folder_to_class[folder_path.name]
            
            # 해당 클래스 폴더 내의 모든 이미지 파일 탐색
            for img_path in folder_path.rglob("*"):
                if img_path.suffix.lower() in IMG_EXTS:
                    paths.append(str(img_path))
                    labels.append(class_name)

    df = pd.DataFrame({"path": paths, "label": labels})
    return df

df = build_df(ROOT)
print("총 이미지:", len(df))
print(df["label"].value_counts())

총 이미지: 69898
label
plastic_pp            9118
plastic_ps            9112
pet_clear             9104
pet_colored           7707
metal_can_steel       7016
plastic_pe            5968
metal_can_aluminum    4966
paper                 4963
styrofoam             3501
glass_clear           2141
vinyl                 2103
glass_brown           2101
glass_green           2098
Name: count, dtype: int64


# 데이터 분할

In [6]:
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42, stratify=df['label'])

print("학습 데이터셋 크기:", len(train_df))
print("테스트 데이터셋 크기:", len(test_df))

print("\n학습 데이터셋 클래스별 분포:")
print(train_df['label'].value_counts())

print("\n테스트 데이터셋 클래스별 분포:")
print(test_df['label'].value_counts())

학습 데이터셋 크기: 55918
테스트 데이터셋 크기: 13980

학습 데이터셋 클래스별 분포:
label
plastic_pp            7294
plastic_ps            7290
pet_clear             7283
pet_colored           6166
metal_can_steel       5613
plastic_pe            4774
metal_can_aluminum    3973
paper                 3970
styrofoam             2801
glass_clear           1713
vinyl                 1682
glass_brown           1681
glass_green           1678
Name: count, dtype: int64

테스트 데이터셋 클래스별 분포:
label
plastic_pp            1824
plastic_ps            1822
pet_clear             1821
pet_colored           1541
metal_can_steel       1403
plastic_pe            1194
metal_can_aluminum     993
paper                  993
styrofoam              700
glass_clear            428
vinyl                  421
glass_green            420
glass_brown            420
Name: count, dtype: int64


# 클래스 불균형 문제 해결: WeightedRandomSampler

In [7]:
# 클래스별 샘플 수 계산
class_counts = train_df['label'].value_counts()
print("\n클래스별 샘플 수:\n", class_counts)

# 각 클래스에 대한 가중치 계산 (총 샘플 수 / 해당 클래스 샘플 수)
num_samples = len(train_df)
class_weights = {class_name: num_samples / count for class_name, count in class_counts.items()}
print('\n클래스별 가중치:\n', class_weights)

# 학습 데이터프레임의 각 샘플에 대한 가중치 할당
train_weights = train_df['label'].apply(lambda x: class_weights[x])

# weightedRandomSampler 생성
sampler = WeightedRandomSampler(
    weights=train_weights.values,
    num_samples=len(train_weights),
    replacement=True
)

print("\nWeightedRandomSampler가 성공적으로 생성되었습니다.")


클래스별 샘플 수:
 label
plastic_pp            7294
plastic_ps            7290
pet_clear             7283
pet_colored           6166
metal_can_steel       5613
plastic_pe            4774
metal_can_aluminum    3973
paper                 3970
styrofoam             2801
glass_clear           1713
vinyl                 1682
glass_brown           1681
glass_green           1678
Name: count, dtype: int64

클래스별 가중치:
 {'plastic_pp': 7.6663010693720866, 'plastic_ps': 7.670507544581619, 'pet_clear': 7.677879994507758, 'pet_colored': 9.068764190723321, 'metal_can_steel': 9.962230536255122, 'plastic_pe': 11.713028906577293, 'metal_can_aluminum': 14.074502894538133, 'paper': 14.085138539042822, 'styrofoam': 19.963584434130667, 'glass_clear': 32.64331582019848, 'vinyl': 33.244946492271104, 'glass_brown': 33.264723378941106, 'glass_green': 33.32419547079857}

WeightedRandomSampler가 성공적으로 생성되었습니다.


# 파이토치 Dataset & DataLoader 클래스 정의

In [8]:
class CustomDataset(Dataset):
    def __init__(self, df, class_to_id, transform=None):
        self.df = df
        self.class_to_id = class_to_id
        self.transform = transform

    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        img_path = row['path']
        label_name = row['label']

        image = Image.open(img_path).convert('RGB')
        label_id = self.class_to_id[label_name]

        if self.transform:
            image = self.transform(image)

        return image, label_id


# 데이터 증강 및 전처리 설정

In [11]:
IMG_SIZE = 512
BATCH_SIZE = 32

train_transforms = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(20),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.2),
    # 무작위 원근 왜곡 추가
    transforms.RandomPerspective(distortion_scale=0.5, p=0.5),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

test_transforms = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [12]:
# Dataset 인스턴스 생성
train_dataset = CustomDataset(train_df, CLASS_TO_ID, train_transforms)
test_dataset = CustomDataset(test_df, CLASS_TO_ID, test_transforms)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, sampler=sampler, num_workers=0) # WeightedRandomSampler 적용
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0) # num_workers 매개변수는 데이터 로딩에 사용할 서브프로세스의 개수를 결정

In [13]:
print(len(train_loader))
print(len(test_loader))

1748
437


# 모델 정의 및 학습 준비

In [14]:
# 사전 학습된 ResNet50 모델 로드

model = resnet50(weights=ResNet50_Weights.DEFAULT)

# 마지막 Fully Connected 레이어 수정
# ResNet50의 마지막 레이어는 1000개의 클래스를 분류하도록 되어 있음
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs,len(CLASS_NAMES))

# 모델을 GPU(DEVICE)로 이동
model = model.to(DEVICE)

# 손실 함수와 옵티마이저 정의
lr = 1e-3
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

print("model, loss, optimizer가 준비되었습니다.")

model, loss, optimizer가 준비되었습니다.


# 학습 루프 및 평가

## epochs = 5 , batch size = 32

In [None]:
def train_model(model, train_loader, test_loader, criterion, optimizer, num_epochs=5):
    writer = SummaryWriter()
    best_accuracy = 0.0

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        
        for i, (images, labels) in enumerate(tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Train]")):
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * images.size(0)
            if (i + 1) % 50 == 0:
                print(f"[Epoch {epoch+1}/{num_epochs}, Batch {i+1}/{len(train_loader)}] Loss: {loss.item():.4f}")

        epoch_train_loss = train_loss / len(train_loader.dataset)
        writer.add_scalar('Loss/train', epoch_train_loss, epoch)

        # 모델 평가
        model.eval()
        test_loss = 0.0
        all_preds = []
        all_labels = []

        with torch.no_grad():
            for images, labels in tqdm(test_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Eval]"):
                images, labels = images.to(DEVICE), labels.to(DEVICE)
                outputs = model(images)
                loss = criterion(outputs, labels)
                test_loss += loss.item() * images.size(0)
                _, preds = torch.max(outputs, 1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        epoch_test_loss = test_loss / len(test_loader.dataset)
        
        # --- 새로운 평가 지표 계산 및 출력 ---
        accuracy = accuracy_score(all_labels, all_preds)
        precision = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
        recall = recall_score(all_labels, all_preds, average='weighted', zero_division=0)
        f1 = f1_score(all_labels, all_preds, average='weighted', zero_division=0)
        
        # 상세 Classification Report 출력
        print(f"\n--- Epoch {epoch+1}/{num_epochs} Performance Report ---")
        print(f"Test Loss: {epoch_test_loss:.4f}")
        print(f"Accuracy: {accuracy:.4f}")
        print(f"Precision: {precision:.4f}")
        print(f"Recall: {recall:.4f}")
        print(f"F1-Score: {f1:.4f}")
        print("\nClassification Report:\n", classification_report(all_labels, all_preds, target_names=CLASS_NAMES, zero_division=0))
        print("--------------------------------------------------")

        # 텐서보드에 지표 기록
        writer.add_scalar('Loss/test', epoch_test_loss, epoch)
        writer.add_scalar('Accuracy/test', accuracy, epoch)
        writer.add_scalar('Precision/test', precision, epoch)
        writer.add_scalar('Recall/test', recall, epoch)
        writer.add_scalar('F1-Score/test', f1, epoch)

        if accuracy > best_accuracy:
            best_accuracy = accuracy

            # 더 많은 정보를 딕셔너리에 추가
            best_classification_report = classification_report(all_labels, all_preds, target_names=CLASS_NAMES, zero_division=0)
        
        torch.save({
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'best_accuracy': best_accuracy,
            'best_f1_score': f1,  # F1-Score 추가
            'best_classification_report': best_classification_report  # Classification Report 추가
        }, 'best_model_checkpoint.pth')
        print(f"새로운 최고 정확도 달성: {best_accuracy:.4f}. 상세 지표를 저장합니다.")

    writer.close()

# --- 학습 시작 ---
if __name__ == '__main__':
    train_model(model, train_loader, test_loader, criterion, optimizer, num_epochs=5)
    print("학습이 완료되었습니다.")
    final_model_path = "model/Best_ResNet50_model.pth"
    torch.save(model.state_dict(), final_model_path)
    print(f"최종 모델이 '{final_model_path}'에 저장되었습니다.")

# 나중에 불러와서, 하이퍼파라미터 수정하고 체크포인트 부분부터 수정할 때.

## epochs = 12, batch size =32

In [None]:
# 모델 정의 (ResNet50)
model = resnet50(weights=ResNet50_Weights.DEFAULT)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, len(CLASS_NAMES))
model = model.to(DEVICE)

optimizer = optim.Adam(model.parameters(), lr=lr)

# --- 체크포인트 로드 ---
start_epoch = 0
best_accuracy = 0.0
checkpoint_path = 'checkpoint/best_model_checkpoint.pth'

if os.path.exists(checkpoint_path):
    print("체크포인트를 로드하여 학습을 재개합니다.")
    checkpoint = torch.load(checkpoint_path, map_location=DEVICE)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    start_epoch = checkpoint['epoch']
    best_accuracy = checkpoint['best_accuracy']
    print(f"이전 최고 정확도: {best_accuracy:.4f}, 이어서 시작할 에포크: {start_epoch + 1}")
else:
    print("체크포인트 파일이 없습니다. 0 에포크부터 학습을 시작합니다.")



# --- 학습 함수 정의(이전 코드와 동일) ---
def train_model(model, train_loader, test_loader, criterion, optimizer, num_epochs, start_epoch, best_accuracy):
    writer = SummaryWriter()
    
    for epoch in range(start_epoch, num_epochs):
        model.train()
        train_loss = 0.0
        
        for i, (images, labels) in enumerate(tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Train]")):
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * images.size(0)
            if (i + 1) % 50 == 0:
                print(f"[Epoch {epoch+1}/{num_epochs}, Batch {i+1}/{len(train_loader)}] Loss: {loss.item():.4f}")

        epoch_train_loss = train_loss / len(train_loader.dataset)
        writer.add_scalar('Loss/train', epoch_train_loss, epoch)
        model.eval()
        test_loss = 0.0
        all_preds = []
        all_labels = []

        with torch.no_grad():
            for images, labels in tqdm(test_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Eval]"):
                images, labels = images.to(DEVICE), labels.to(DEVICE)
                outputs = model(images)
                loss = criterion(outputs, labels)
                test_loss += loss.item() * images.size(0)
                _, preds = torch.max(outputs, 1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        epoch_test_loss = test_loss / len(test_loader.dataset)
        accuracy = accuracy_score(all_labels, all_preds)
        precision = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
        recall = recall_score(all_labels, all_preds, average='weighted', zero_division=0)
        f1 = f1_score(all_labels, all_preds, average='weighted', zero_division=0)
        
        print(f"\n--- Epoch {epoch+1}/{num_epochs} Performance Report ---")
        print(f"Test Loss: {epoch_test_loss:.4f}")
        print(f"Accuracy: {accuracy:.4f}")
        print(f"Precision: {precision:.4f}")
        print(f"Recall: {recall:.4f}")
        print(f"F1-Score: {f1:.4f}")
        print("\nClassification Report:\n", classification_report(all_labels, all_preds, target_names=CLASS_NAMES, zero_division=0))
        print("--------------------------------------------------")

        writer.add_scalar('Loss/test', epoch_test_loss, epoch)
        writer.add_scalar('Accuracy/test', accuracy, epoch)
        writer.add_scalar('Precision/test', precision, epoch)
        writer.add_scalar('Recall/test', recall, epoch)
        writer.add_scalar('F1-Score/test', f1, epoch)

        if accuracy > best_accuracy:
            best_accuracy = accuracy
            # 더 많은 정보를 딕셔너리에 추가
            best_classification_report = classification_report(all_labels, all_preds, target_names=CLASS_NAMES, zero_division=0)
        
        torch.save({
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'best_accuracy': best_accuracy,
            'best_f1_score': f1,  # F1-Score 추가
            'best_classification_report': best_classification_report  # Classification Report 추가
        }, 'best_model_checkpoint.pth')
        print(f"새로운 최고 정확도 달성: {best_accuracy:.4f}. 상세 지표를 저장합니다.")
        
    writer.close()
        
# --- 학습 실행 ---
num_epochs_to_run = 10 # 총 학습시키고 싶은 에포크 수
train_model(model, train_loader, test_loader, criterion, optimizer, num_epochs=num_epochs_to_run, start_epoch=start_epoch, best_accuracy=best_accuracy)

print("학습이 완료되었습니다.")
# 현재 시간을 이용해 파일 이름 생성
timestamp = time.strftime("%Y%m%d-%H%M%S")
final_model_path = f"model/Best_ResNet50_model_{timestamp}.pth" # 타임라인별로 모델 저장
torch.save(model.state_dict(), final_model_path) 
print(f"최종 모델이 '{final_model_path}'에 저장되었습니다.")



체크포인트 파일이 없습니다. 0 에포크부터 학습을 시작합니다.


Epoch 1/10 [Train]:   0%|          | 0/1748 [00:00<?, ?it/s]

# 최적의 점수 확인

In [None]:
import torch

# 체크포인트 파일 경로 설정
checkpoint_path = 'checkpoint/best_model_checkpoint.pth'

# 체크포인트 파일이 있는지 확인
if os.path.exists(checkpoint_path):
    # GPU 사용 여부 확인 후 체크포인트 로드
    map_location = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
    print(f"'{checkpoint_path}' 파일을 불러옵니다.")
    checkpoint = torch.load(checkpoint_path, map_location=map_location) # map_location은 저장된 가중치를 불러올 때 어디로 불러올지를 정해주는 역할

    # 저장된 상세 정보 출력
    print("=====================================================")
    print("최고 성능 기록 정보")
    print("-----------------------------------------------------")
    print(f"최고 정확도 달성 에포크: {checkpoint['epoch']} 에포크")
    print(f"최고 정확도: {checkpoint['best_accuracy']:.4f}")
    # print(f"최고 정밀도: {checkpoint['best_precision']:.4f}")
    # print(f"최고 재현율: {checkpoint['best_recall']:.4f}")
    # print(f"최고 F1-Score: {checkpoint['best_f1']:.4f}")
    # print("-----------------------------------------------------")
    # print("\n상세 Classification Report:\n")
    # print(checkpoint['best_classification_report'])
    print("=====================================================")

else:
    print(f"오류: '{checkpoint_path}' 파일이 존재하지 않습니다. 학습을 먼저 완료해주세요.")