In [22]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.models as models
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import torchmetrics
from PIL import Image, ImageFile
import os

In [23]:
# Fix the NumPy and PyTorch seeds so repeated runs draw the same random numbers
seed_num = 11
np.random.seed(seed_num)
torch.manual_seed(seed_num)

# Root folders of the training / validation images
train_dir = "C:\\Users\\Admin\\Desktop\\쓰레기\\Training"
val_dir = "C:\\Users\\Admin\\Desktop\\쓰레기\\Validation"

# The datasets themselves are built in the next cell with CustomImageFolder.
# Creating plain ImageFolder instances here as well only walked both directory
# trees an extra time for objects that were overwritten before ever being
# used, so that redundant scan has been removed.

# Let PIL decode truncated image files instead of raising on them
ImageFile.LOAD_TRUNCATED_IMAGES = True

# Custom dataset class
class CustomImageFolder(ImageFolder):
    """ImageFolder variant that tolerates unreadable image files.

    When fetching a sample raises ``OSError`` the error is printed and ``None``
    is returned in place of the sample, so the consumer has to drop ``None``
    entries before batching.
    """

    def __getitem__(self, index):
        try:
            sample = super().__getitem__(index)
        except OSError as e:
            print(f"Error loading image at index {index}: {e}")
            sample = None
        return sample

In [24]:
# Build the datasets from the folders configured in the previous cell. The
# shared train_dir / val_dir constants are reused (instead of repeating the
# absolute paths) so the data location is defined in exactly one place.
train_dataset = CustomImageFolder(root=train_dir)
val_dataset = CustomImageFolder(root=val_dir)

# Use every sample: the Subset wrappers below cover the full index range
train_indices = list(range(len(train_dataset)))
val_indices = list(range(len(val_dataset)))

train_subset = Subset(train_dataset, train_indices)
val_subset = Subset(val_dataset, val_indices)

# Collate function for the data loaders that skips samples which failed to load
def custom_collate_fn(batch):
    """Collate a batch after dropping its ``None`` entries.

    Args:
        batch: List of samples produced by the dataset. Entries may be
            ``None`` (CustomImageFolder returns ``None`` for an image that
            could not be read).

    Returns:
        The result of ``default_collate`` applied to the remaining samples.

    Raises:
        RuntimeError: If every sample in the batch is ``None``. Passing an
            empty list on to ``default_collate`` would otherwise fail with an
            error that does not point at the real cause.
    """
    valid_samples = [sample for sample in batch if sample is not None]
    if not valid_samples:
        raise RuntimeError(
            "custom_collate_fn received a batch in which every sample failed to load"
        )
    return torch.utils.data.dataloader.default_collate(valid_samples)

In [25]:
# Preprocessing pipelines
def _make_transform(augmentations=()):
    """Resize to 224x224, apply `augmentations`, then convert to a normalized tensor."""
    return transforms.Compose([
        transforms.Resize((224, 224)),
        *augmentations,
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])


# Training adds a random horizontal flip and a random rotation of up to 10 degrees
train_transform = _make_transform([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
])

# Validation / inference uses the deterministic steps only
test_transform = _make_transform()

# Attach each pipeline to the dataset wrapped by the corresponding subset
train_subset.dataset.transform = train_transform
val_subset.dataset.transform = test_transform

In [26]:
# Data loader setup
batch_size = 32
# On Windows, DataLoader worker processes are started with "spawn" and cannot
# import CustomImageFolder / custom_collate_fn because both are defined inside
# the notebook; the loader then stalls before yielding its first batch. Load in
# the main process there and keep the parallel workers on other platforms.
num_workers = 0 if os.name == "nt" else 4
train_loader = DataLoader(
    train_subset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=num_workers,
    pin_memory=torch.cuda.is_available(),  # pinned memory only helps host-to-GPU copies
    collate_fn=custom_collate_fn,
)
val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False, collate_fn=custom_collate_fn)

# Load the pretrained backbone
transfer_model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)

# Freeze every pretrained parameter
for param in transfer_model.parameters():
    param.requires_grad = False

# Replace the final layer with a new head (its parameters are trainable by default)
num_ftrs = transfer_model.fc.in_features
transfer_model.fc = nn.Sequential(
    nn.Linear(num_ftrs, 1024),
    nn.ReLU(),
    nn.Dropout(0.4),
    nn.Linear(1024, len(train_dataset.classes))  # one output per class
)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
transfer_model = transfer_model.to(device)

# Loss function and optimizer (only the new head is optimized)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(transfer_model.fc.parameters(), lr=1e-4)

In [27]:
# Training / validation loop that keeps the best checkpoint
def train_and_validate_best(model, train_loader, val_loader, optimizer, criterion, epochs):
    """Train `model` for `epochs` epochs and validate after each one.

    Batches are moved to the module-level `device`; the number of classes for
    the accuracy metric is read from the module-level `train_dataset`. Whenever
    the validation loss reaches a new minimum the model's state dict is written
    to 'best_model.pth'.

    Returns:
        Tuple ``(train_losses, val_losses, accuracies)`` of per-epoch lists:
        mean training loss per batch, mean validation loss per batch, and
        macro-averaged validation accuracy.
    """
    train_losses, val_losses, accuracies = [], [], []

    # Lowest validation loss seen so far
    min_val_loss = float('inf')

    for epoch in range(epochs):
        # ---- training pass ----
        model.train()
        train_loss_sum = 0.0
        for batch_inputs, batch_labels in tqdm(train_loader):
            batch_inputs, batch_labels = batch_inputs.to(device), batch_labels.to(device)

            optimizer.zero_grad()
            batch_loss = criterion(model(batch_inputs), batch_labels)
            batch_loss.backward()
            optimizer.step()

            train_loss_sum += batch_loss.item()

        epoch_loss = train_loss_sum / len(train_loader)
        train_losses.append(epoch_loss)

        # ---- validation pass ----
        model.eval()
        val_loss_sum = 0.0
        accuracy_metric = torchmetrics.Accuracy(
            task='multiclass',
            num_classes=len(train_dataset.classes),
            average='macro',
        ).to(device)

        with torch.no_grad():
            for batch_inputs, batch_labels in val_loader:
                batch_inputs, batch_labels = batch_inputs.to(device), batch_labels.to(device)

                logits = model(batch_inputs)
                val_loss_sum += criterion(logits, batch_labels).item()
                accuracy_metric.update(logits, batch_labels)
            accuracy = accuracy_metric.compute()  # accuracy over the whole validation pass
            accuracies.append(accuracy.item())
        epoch_val_loss = val_loss_sum / len(val_loader)
        val_losses.append(epoch_val_loss)

        print(f'Epoch [{epoch + 1}/{epochs}] - Training loss: {epoch_loss:.3f}, Validation loss: {epoch_val_loss:.3f}, accuracy: {accuracy:.2%}')

        # Nothing to save unless this epoch improved on the best validation loss
        if not epoch_val_loss < min_val_loss:
            continue
        min_val_loss = epoch_val_loss
        torch.save(model.state_dict(), 'best_model.pth')
        print(f'Best model saved at epoch {epoch + 1}.')

    return train_losses, val_losses, accuracies

In [28]:
# Run training for 50 epochs and keep the per-epoch history
history = train_and_validate_best(
    model=transfer_model,
    train_loader=train_loader,
    val_loader=val_loader,
    optimizer=optimizer,
    criterion=criterion,
    epochs=50,
)

  0%|          | 0/19061 [00:00<?, ?it/s]

In [None]:
# Rebuild the architecture and restore the best checkpoint.
# weights=None: load_state_dict below overwrites every parameter and buffer,
# so fetching the pretrained weights again would be wasted work.
transfer_model_best = models.resnet18(weights=None)
num_ftrs = transfer_model_best.fc.in_features
transfer_model_best.fc = nn.Sequential(
    nn.Linear(num_ftrs, 1024),
    nn.ReLU(),
    nn.Dropout(0.4),
    nn.Linear(1024, len(train_dataset.classes))
)
# map_location lets a checkpoint written during a GPU run be restored on a
# machine where `device` is the CPU.
transfer_model_best.load_state_dict(torch.load('best_model.pth', map_location=device))
transfer_model_best = transfer_model_best.to(device)

In [None]:
# Plot the training and validation loss curves
train_losses, val_losses, accuracies = history

epochs = range(1, len(train_losses) + 1)

fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(epochs, train_losses, 'bo-', label='Training loss')
ax.plot(epochs, val_losses, 'ro-', label='Validation loss')
ax.set_title('Training and Validation Loss')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.legend()
plt.show()

In [None]:
# Model evaluation helper
def evaluate(model, data_loader, device):
    """Return the macro-averaged multiclass accuracy of `model` on `data_loader`.

    The model is put in eval mode and run without gradient tracking; the number
    of classes is read from the module-level `train_dataset`.
    """
    model.eval()
    metric = torchmetrics.Accuracy(
        task='multiclass',
        num_classes=len(train_dataset.classes),
        average='macro',
    ).to(device)
    with torch.no_grad():
        for batch_inputs, batch_labels in data_loader:
            batch_inputs, batch_labels = batch_inputs.to(device), batch_labels.to(device)
            metric.update(model(batch_inputs), batch_labels)
    return metric.compute().item()

# Score the restored best model on the validation loader
test_accuracy = evaluate(model=transfer_model_best, data_loader=val_loader, device=device)
print(f'Test Accuracy: {test_accuracy:.2%}')

In [None]:
# Print the index -> class name mapping used by the model
class_names = train_dataset.classes
print("Model classes:")
for line in (f"{idx}: {class_name}" for idx, class_name in enumerate(class_names)):
    print(line)

In [None]:
# Load an image file and turn it into a model-ready batch
def load_img(image_path):
    """Open `image_path` and return it as a single-image batch tensor.

    The image is converted to RGB first: `test_transform` normalizes with three
    channel means/stds, so grayscale, palette or RGBA files would otherwise
    produce a tensor with the wrong number of channels. The file is opened in a
    ``with`` block so its handle is closed once the pixel data has been copied.

    Args:
        image_path: Path of the image file to load.

    Returns:
        The output of `test_transform` with a leading batch dimension added.
    """
    with Image.open(image_path) as image:
        rgb_image = image.convert("RGB")
    return test_transform(rgb_image).unsqueeze(0)

# Prediction helper
def predict_image(model, image_path, class_names):
    """Classify one image file, display it with the predicted label and print the result.

    The image is loaded through `load_img` and moved to the module-level
    `device`. Any exception raised while loading or processing is caught and
    reported with a printed message; the function returns nothing.
    """
    model.eval()
    with torch.no_grad():
        try:
            input_image = load_img(image_path).to(device)
            preds = torch.max(model(input_image), 1)[1]  # index of the highest score

            # Undo the normalization so the picture can be displayed
            mean = np.array([0.485, 0.456, 0.406])
            std = np.array([0.229, 0.224, 0.225])
            hwc_img = input_image.squeeze(0).cpu().detach().numpy().transpose((1, 2, 0))
            original_img = np.clip(std * hwc_img + mean, 0, 1)

            plt.imshow(original_img)
            plt.title(f'Predicted: {class_names[preds.item()]}')
            plt.show()
            print("모델 예측 결과 :", preds)
            print("모델 예측 결과 :", class_names[preds.item()])
        except Exception as e:
            print(f"Error loading or processing image: {e}")

# Image files to classify
image_path1 = "C:\\Project\\Python_basic\\깨끗한K세상\\test\\페트병 쓰레기.jpg"
image_path2 = "C:\\Project\\Python_basic\\깨끗한K세상\\test\\캔 쓰레기.jpg"

# Predict every image that exists; report the ones that are missing
for image_path in (image_path1, image_path2):
    if os.path.exists(image_path):
        predict_image(transfer_model_best, image_path, train_dataset.classes)
        continue
    print(f"Error: The file {image_path} does not exist.")
    print(f"Current working directory: {os.getcwd()}")
    print(f"Provided path: {image_path}")

### 모르겠다

In [2]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.models as models
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import torchmetrics
from torch.optim.lr_scheduler import ReduceLROnPlateau
from PIL import Image
import os

# Seed PyTorch and NumPy
seed_num = 11
torch.manual_seed(seed_num)
np.random.seed(seed_num)

# Load both splits from their folders
train_dir = "C:\\Users\\Admin\\Desktop\\쓰레기\\Training"
val_dir = "C:\\Users\\Admin\\Desktop\\쓰레기\\Validation"

train_dataset, val_dataset = (
    ImageFolder(root=split_dir) for split_dir in (train_dir, val_dir)
)

# Per-class random sampling: keep a fixed fraction of every class
def get_subset_indices(dataset, percentage=0.1):
    """Randomly pick `percentage` of the sample indices of every class.

    Args:
        dataset: Object exposing `targets` (one class index per sample) and
            `classes` (one entry per class).
        percentage: Fraction of each class to keep.

    Returns:
        List of Python ints, grouped by class in ascending class order. A
        non-empty class always contributes at least one index when
        `percentage` is positive, so small classes do not vanish from the
        subset.
    """
    indices = []
    targets_np = np.array(dataset.targets)
    for class_idx in range(len(dataset.classes)):
        class_indices = np.where(targets_np == class_idx)[0]
        n_select = int(len(class_indices) * percentage)
        # Truncation gives 0 for classes smaller than 1/percentage; keep one
        # sample so the class is still represented.
        if n_select == 0 and percentage > 0 and len(class_indices) > 0:
            n_select = 1
        selected_indices = np.random.choice(class_indices, n_select, replace=False)
        # tolist() yields plain Python ints rather than NumPy scalars
        indices.extend(selected_indices.tolist())
    return indices

train_indices = get_subset_indices(train_dataset, percentage=0.1)
val_indices = get_subset_indices(val_dataset, percentage=0.1)

train_subset = Subset(train_dataset, train_indices)
val_subset = Subset(val_dataset, val_indices)

# Preprocessing pipelines
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Attach each pipeline to the dataset wrapped by the corresponding subset
train_subset.dataset.transform = train_transform
val_subset.dataset.transform = test_transform

# Data loader setup
batch_size = 4
train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True)
val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False)

# Load the pretrained backbone
transfer_model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)

# Freeze everything except the last residual stage (layer4) and the fc head
for name, param in transfer_model.named_parameters():
    param.requires_grad = 'layer4' in name or 'fc' in name

# Replace the final layer with a new head (its parameters are trainable by default)
num_ftrs = transfer_model.fc.in_features
transfer_model.fc = nn.Sequential(
    nn.Linear(num_ftrs, 1024),
    nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(1024, len(train_dataset.classes))  # one output per class
)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
transfer_model = transfer_model.to(device)

# Loss function and optimizer.
# The optimizer must receive every parameter left trainable above. Passing only
# transfer_model.fc.parameters() meant layer4 had gradients computed on every
# step but was never updated, i.e. it stayed effectively frozen.
criterion = nn.CrossEntropyLoss()
trainable_params = [p for p in transfer_model.parameters() if p.requires_grad]
optimizer = torch.optim.Adam(trainable_params, lr=0.001)

# Halve the learning rate when the monitored value stops decreasing for 5 epochs
scheduler = ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5, verbose=True)

# Training / validation loop with checkpointing, LR scheduling and early stopping
def train_and_validate_best(model, train_loader, val_loader, optimizer, criterion, scheduler, epochs, patience):
    """Train `model` for up to `epochs` epochs, validating after each one.

    Batches are moved to the module-level `device`; the number of classes for
    the accuracy metric is read from the module-level `train_dataset`. A new
    minimum of the validation loss writes the model's state dict to
    'best_model.pth' and resets the early-stopping counter. After `patience`
    consecutive epochs without a new minimum the loop stops. `scheduler` is
    stepped with the validation loss after every epoch that does not trigger
    the stop.

    Returns:
        Tuple ``(train_losses, val_losses, accuracies)`` of per-epoch lists.
    """
    train_losses, val_losses, accuracies = [], [], []

    min_val_loss = float('inf')  # lowest validation loss seen so far
    early_stop_counter = 0       # epochs since the last improvement

    for epoch in range(epochs):
        # ---- training pass ----
        model.train()
        train_loss_sum = 0.0
        for batch_inputs, batch_labels in tqdm(train_loader):
            batch_inputs, batch_labels = batch_inputs.to(device), batch_labels.to(device)

            optimizer.zero_grad()
            batch_loss = criterion(model(batch_inputs), batch_labels)
            batch_loss.backward()
            optimizer.step()

            train_loss_sum += batch_loss.item()

        epoch_loss = train_loss_sum / len(train_loader)
        train_losses.append(epoch_loss)

        # ---- validation pass ----
        model.eval()
        val_loss_sum = 0.0
        accuracy_metric = torchmetrics.Accuracy(
            task='multiclass',
            num_classes=len(train_dataset.classes),
            average='macro',
        ).to(device)

        with torch.no_grad():
            for batch_inputs, batch_labels in val_loader:
                batch_inputs, batch_labels = batch_inputs.to(device), batch_labels.to(device)

                logits = model(batch_inputs)
                val_loss_sum += criterion(logits, batch_labels).item()
                accuracy_metric.update(logits, batch_labels)
            accuracy = accuracy_metric.compute()  # accuracy over the whole validation pass
            accuracies.append(accuracy.item())
        epoch_val_loss = val_loss_sum / len(val_loader)
        val_losses.append(epoch_val_loss)

        print(f'Epoch [{epoch + 1}/{epochs}] - Training loss: {epoch_loss:.3f}, Validation loss: {epoch_val_loss:.3f}, accuracy: {accuracy:.2%}')

        improved = epoch_val_loss < min_val_loss
        if improved:
            # Keep only the best checkpoint
            min_val_loss = epoch_val_loss
            torch.save(model.state_dict(), 'best_model.pth')
            print(f'Best model saved at epoch {epoch + 1}.')
            early_stop_counter = 0
        else:
            early_stop_counter += 1
            if early_stop_counter >= patience:
                print(f'Early stopping at epoch {epoch + 1}')
                break

        # Let the scheduler react to this epoch's validation loss
        scheduler.step(epoch_val_loss)

    return train_losses, val_losses, accuracies

# Run training (up to 100 epochs, early stopping after 10 epochs without improvement)
history = train_and_validate_best(transfer_model, train_loader, val_loader, optimizer, criterion, scheduler, epochs=100, patience=10)

# Rebuild the architecture and restore the best checkpoint.
# weights=None: load_state_dict below overwrites every parameter and buffer,
# so fetching the pretrained weights again would be wasted work.
transfer_model_best = models.resnet18(weights=None)
num_ftrs = transfer_model_best.fc.in_features
transfer_model_best.fc = nn.Sequential(
    nn.Linear(num_ftrs, 1024),
    nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(1024, len(train_dataset.classes))
)
# map_location lets a checkpoint written during a GPU run be restored on a
# machine where `device` is the CPU.
transfer_model_best.load_state_dict(torch.load('best_model.pth', map_location=device))
transfer_model_best = transfer_model_best.to(device)

# Plot the training and validation loss curves
train_losses, val_losses, accuracies = history

epochs = range(1, len(train_losses) + 1)

plt.figure(figsize=(10, 5))
plt.plot(epochs, train_losses, 'bo-', label='Training loss')
plt.plot(epochs, val_losses, 'ro-', label='Validation loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Model evaluation helper
def evaluate(model, data_loader, device):
    """Compute the macro-averaged multiclass accuracy of `model` over `data_loader`.

    Runs in eval mode without gradient tracking. The class count comes from the
    module-level `train_dataset`.
    """
    model.eval()
    metric = torchmetrics.Accuracy(
        task='multiclass',
        num_classes=len(train_dataset.classes),
        average='macro',
    ).to(device)
    with torch.no_grad():
        for batch_inputs, batch_labels in data_loader:
            batch_inputs, batch_labels = batch_inputs.to(device), batch_labels.to(device)
            metric.update(model(batch_inputs), batch_labels)
    return metric.compute().item()

# Score the restored best model on the validation loader
test_accuracy = evaluate(model=transfer_model_best, data_loader=val_loader, device=device)
print(f'Test Accuracy: {test_accuracy:.2%}')

# Print the index -> class name mapping used by the model
class_names = train_dataset.classes
print("Model classes:")
for line in (f"{idx}: {class_name}" for idx, class_name in enumerate(class_names)):
    print(line)

# Load an image file and turn it into a model-ready batch
def load_img(image_path):
    """Open `image_path` and return it as a single-image batch tensor.

    The image is converted to RGB first: `test_transform` normalizes with three
    channel means/stds, so grayscale, palette or RGBA files would otherwise
    produce a tensor with the wrong number of channels. The file is opened in a
    ``with`` block so its handle is closed once the pixel data has been copied.

    Args:
        image_path: Path of the image file to load.

    Returns:
        The output of `test_transform` with a leading batch dimension added.
    """
    with Image.open(image_path) as image:
        rgb_image = image.convert("RGB")
    return test_transform(rgb_image).unsqueeze(0)

# Prediction helper
def predict_image(model, image_path, class_names):
    """Run `model` on one image file, show the image with its predicted label and print it.

    Uses `load_img` and the module-level `device`. Exceptions raised during
    loading or processing are caught and printed; there is no return value.
    """
    model.eval()
    with torch.no_grad():
        try:
            input_image = load_img(image_path).to(device)
            preds = torch.max(model(input_image), 1)[1]  # index of the highest score

            # Undo the normalization so the picture can be displayed
            mean = np.array([0.485, 0.456, 0.406])
            std = np.array([0.229, 0.224, 0.225])
            hwc_img = input_image.squeeze(0).cpu().detach().numpy().transpose((1, 2, 0))
            original_img = np.clip(std * hwc_img + mean, 0, 1)

            plt.imshow(original_img)
            plt.title(f'Predicted: {class_names[preds.item()]}')
            plt.show()
            print("모델 예측 결과 :", preds)
            print("모델 예측 결과 :", class_names[preds.item()])
        except Exception as e:
            print(f"Error loading or processing image: {e}")

# Image files to classify
image_path1 = "C:\\Project\\Python_basic\\깨끗한K세상\\test\\페트병 쓰레기.jpg"
image_path2 = "C:\\Project\\Python_basic\\깨끗한K세상\\test\\캔 쓰레기.jpg"

# Predict every image that exists; report the ones that are missing
for image_path in (image_path1, image_path2):
    if os.path.exists(image_path):
        predict_image(transfer_model_best, image_path, train_dataset.classes)
        continue
    print(f"Error: The file {image_path} does not exist.")
    print(f"Current working directory: {os.getcwd()}")
    print(f"Provided path: {image_path}")
