In [7]:
import os
import json
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from PIL import Image, UnidentifiedImageError
from torch.utils.data.dataloader import default_collate

In [9]:
# CustomDataset 클래스 정의
class CustomDataset(Dataset):
    def __init__(self, root_dir, mode='Training', transform=None):
        self.root_dir = root_dir
        self.mode = mode
        self.transform = transform
        self.images = []
        self.labels = []

        data_folder = mode  # 'Training', 'Validation', 또는 'Test'
        data_path = os.path.join(root_dir, data_folder)
        self._load_data(data_path)

    def _load_data(self, data_path):
        image_folder_path = os.path.join(data_path, "원천데이터", "05.상추")
        label_folder_path = os.path.join(data_path, "라벨링데이터", "05.상추")

        image_folders = os.listdir(image_folder_path)

        for image_folder in image_folders:
            image_path = os.path.join(image_folder_path, image_folder)
            label_path = os.path.join(label_folder_path, image_folder)

            for image_filename in os.listdir(image_path):
                image_file_path = os.path.join(image_path, image_filename)
                label_file_path = os.path.join(label_path, image_filename.replace('.jpeg', '.json'))

                if os.path.exists(label_file_path):
                    with open(label_file_path, 'r') as f:
                        label_info = json.load(f)
                        disease = label_info['annotations']['disease']
                        if disease == 9: #균핵병
                            label = 1
                        elif disease == 10: #노균병
                            label = 2
                        else:  #정상
                            label = 0
                else: #라벨 정보가 없으면 정상으로 처리
                    label = 0

                self.images.append(image_file_path)
                self.labels.append(label)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image_path = self.images[idx]
        label = self.labels[idx]

        try:
            image = Image.open(image_path).convert('RGB')
            if self.transform:
                image = self.transform(image)
        except (IOError, UnidentifiedImageError) as e:
            print(f"Warning: Could not read image {image_path}. Skipping. Error: {e}")
            return None, None

        return image, label


In [12]:
# DataLoader에서 None 타입 데이터를 걸러내기 위한 함수
def my_collate_fn(batch):
    batch = list(filter(lambda x: x[0] is not None, batch))
    return default_collate(batch)

# 데이터 변환 정의
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# 데이터셋 및 DataLoader 인스턴스 생성
root_dir = '/Volumes/T7/상추 질병 진단/realdata'
train_dataset = CustomDataset(root_dir=root_dir, mode='Training', transform=transform)
val_dataset = CustomDataset(root_dir=root_dir, mode='Validation', transform=transform)
test_dataset = CustomDataset(root_dir=root_dir, mode='Test', transform=transform)  # 테스트 데이터셋 추가

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=my_collate_fn)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, collate_fn=my_collate_fn)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False, collate_fn=my_collate_fn)  # 테스트 DataLoader 추가


In [13]:
# 모델, 손실 함수, 최적화 알고리즘 설정
model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 3)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)

# 훈련 루프
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}")

    # 검증 루프 #print(f'Validation Accuracy: {accuracy}%')
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = 100 * correct / total
    

Epoch 1/10, Loss: 0.26185990289546723
Epoch 2/10, Loss: 0.010090648662298918
Epoch 3/10, Loss: 0.004057515293446391
Epoch 4/10, Loss: 0.002347494405152839
Epoch 5/10, Loss: 0.0014592068375434814
Epoch 6/10, Loss: 0.0011028347301383524
Epoch 7/10, Loss: 0.0008445631741102092
Epoch 8/10, Loss: 0.0006179280415296468
Epoch 9/10, Loss: 0.0005145735028849612
Epoch 10/10, Loss: 0.00043456342753542715
