# PART 04. 작물 잎 사진으로 질병 분류하기

## 1. 데이터 분할

### 1-1. 데이터 분할을 위한 train, val, test 폴더 생성

In [9]:
import os

# Root directory of the original dataset; every sub-directory is one class.
origin_dataset_dif = './dataset'
# Keep only directories: stray files (e.g. .DS_Store on macOS) are not classes
# and would make the per-class os.listdir() call in the split step fail.
classes_list = [
    name for name in os.listdir(origin_dataset_dif)
    if os.path.isdir(os.path.join(origin_dataset_dif, name))
]

# Root folder that receives the train / val / test split.
base_dir = './splitted'

# One sub-folder per split.
train_dir = os.path.join(base_dir, 'train')
validation_dir = os.path.join(base_dir, 'val')
test_dir = os.path.join(base_dir, 'test')

# os.makedirs(..., exist_ok=True) creates base_dir implicitly and lets this
# cell be re-run without raising FileExistsError.
for split_dir in (train_dir, validation_dir, test_dir):
    os.makedirs(split_dir, exist_ok=True)
    # Mirror the class folders inside every split.
    for clss in classes_list:
        os.makedirs(os.path.join(split_dir, clss), exist_ok=True)

### 1-2. train, val, test 데이터 분할과 클래스별 데이터 수 확인

In [10]:
import math
import shutil

# Split every class folder into train / val / test at a 6:2:2 ratio.
for clss in classes_list:
    # File names of all images belonging to this class.
    path = os.path.join(origin_dataset_dif, clss)
    fnames = os.listdir(path)

    # floor() keeps the three sizes integral; any remainder files are unused.
    train_size = math.floor(len(fnames) * 0.6)
    validation_size = math.floor(len(fnames) * 0.2)
    test_size = math.floor(len(fnames) * 0.2)

    # Consecutive, non-overlapping slices. The test slice must start after the
    # validation slice; slicing from index 0 would reuse training images and
    # leak them into the test set.
    validation_end = train_size + validation_size
    train_fnames = fnames[:train_size]
    validation_fnames = fnames[train_size:validation_end]
    test_fnames = fnames[validation_end:validation_end + test_size]

    splits = (
        ('Train', train_dir, train_fnames),
        ('Validation', validation_dir, validation_fnames),
        ('Test', test_dir, test_fnames),
    )
    for label, split_root, split_fnames in splits:
        print(label + ' size(', clss, '):', len(split_fnames))
        # Files are copied (not moved), so the original dataset stays intact.
        for fname in split_fnames:
            src = os.path.join(path, fname)
            dst = os.path.join(os.path.join(split_root, clss), fname)
            shutil.copyfile(src, dst)

Train size( Strawberry___healthy ): 273
Validation size( Strawberry___healthy ): 91
Test size( Strawberry___healthy ): 91
Train size( Grape___Black_rot ): 708
Validation size( Grape___Black_rot ): 236
Test size( Grape___Black_rot ): 236
Train size( Potato___Early_blight ): 600
Validation size( Potato___Early_blight ): 200
Test size( Potato___Early_blight ): 200
Train size( Cherry___Powdery_mildew ): 631
Validation size( Cherry___Powdery_mildew ): 210
Test size( Cherry___Powdery_mildew ): 210
Train size( Tomato___Target_Spot ): 842
Validation size( Tomato___Target_Spot ): 280
Test size( Tomato___Target_Spot ): 280
Train size( Peach___healthy ): 216
Validation size( Peach___healthy ): 72
Test size( Peach___healthy ): 72
Train size( Potato___Late_blight ): 600
Validation size( Potato___Late_blight ): 200
Test size( Potato___Late_blight ): 200
Train size( Tomato___Late_blight ): 1145
Validation size( Tomato___Late_blight ): 381
Test size( Tomato___Late_blight ): 381
Train size( Tomato___To

데이터셋이 준비되었으니 학습에 필요한 데이터를 불러오고 Dataloader를 생성하자.

### 1-3. 베이스라인 모델 학습을 위한 준비

In [13]:
import torch

# Use the MPS backend only when it is both available at runtime and compiled
# into this PyTorch build; otherwise fall back to the CPU.
if torch.backends.mps.is_available() and torch.backends.mps.is_built():
    DEVICE = "mps"
else:
    DEVICE = "cpu"
print(DEVICE)

# Mini-batch size and number of training epochs shared by the cells below.
BATCH_SIZE = 256
EPOCH = 30

mps


In [14]:
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder

# transforms.Compose chains preprocessing steps. Here each image is resized to
# 64x64 and converted to a tensor by ToTensor().
transform_base = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
])

# ImageFolder reads a directory tree (one sub-folder per class) from `root`
# and applies `transform` to every image it loads.
train_dataset = ImageFolder('./splitted/train', transform=transform_base)
val_dataset = ImageFolder('./splitted/val', transform=transform_base)

In [15]:
from torch.utils.data import DataLoader

# DataLoader splits a dataset into mini-batches according to the given
# settings (batch size, shuffling, number of worker processes).
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=4,
)
val_loader = DataLoader(
    dataset=val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=4,
)

## 2. 베이스라인 모델 설계 및 학습

### 2-1. 베이스라인 모델 설계하기

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class Net(nn.Module):
    def __init__(self):
        

* 모델 학습을 위한 함수

In [None]:
def train(model, train_loader, optimizer):
    """Run one optimisation pass over ``train_loader``.

    Args:
        model: network to train; switched to training mode and updated in place.
        train_loader: iterable yielding ``(data, target)`` batches.
        optimizer: optimizer that steps the model's parameters.

    Returns:
        None.
    """
    model.train()
    for inputs, labels in train_loader:
        # Move the batch to the device selected in the module-level DEVICE.
        inputs = inputs.to(DEVICE)
        labels = labels.to(DEVICE)

        # Clear gradients left over from the previous batch.
        optimizer.zero_grad()
        batch_loss = F.cross_entropy(model(inputs), labels)
        batch_loss.backward()
        optimizer.step()

* 모델 평가를 위한 함수

In [None]:
def evaluate(model, test_loader):
    """Compute mean cross-entropy loss and accuracy over ``test_loader``.

    Args:
        model: network to evaluate; switched to evaluation mode.
        test_loader: iterable yielding ``(data, target)`` batches; its
            ``dataset`` attribute supplies the sample count.

    Returns:
        Tuple ``(loss, accuracy)``: the loss averaged per sample and the
        accuracy as a percentage.
    """
    model.eval()
    summed_loss = 0
    n_correct = 0

    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for batch, labels in test_loader:
            batch = batch.to(DEVICE)
            labels = labels.to(DEVICE)
            logits = model(batch)

            # Sum (not mean) per batch so the final division by the dataset
            # size yields a per-sample average.
            summed_loss += F.cross_entropy(logits, labels, reduction='sum').item()

            # Index of the highest score along dim 1 is the predicted class.
            _, predicted = logits.max(1, keepdim=True)
            n_correct += predicted.eq(labels.view_as(predicted)).sum().item()

    n_samples = len(test_loader.dataset)
    mean_loss = summed_loss / n_samples
    accuracy_pct = 100. * n_correct / n_samples
    return mean_loss, accuracy_pct

* 모델 학습을 실행하기

In [None]:
import time
import copy
 
def train_baseline(model ,train_loader, val_loader, optimizer, num_epochs = 30):
    """Train ``model`` and return it loaded with its best validation weights.

    Every epoch trains once on ``train_loader``, evaluates on both loaders,
    and prints loss, accuracy and elapsed time.

    Args:
        model: network to train (modified in place).
        train_loader: loader for the training split.
        val_loader: loader for the validation split.
        optimizer: optimizer passed through to ``train``.
        num_epochs: number of epochs to run.

    Returns:
        ``model`` after loading the weights from the epoch with the highest
        validation accuracy.
    """
    top_val_acc = 0.0
    # Snapshot of the initial weights, in case no epoch beats 0.0 accuracy.
    top_weights = copy.deepcopy(model.state_dict())

    for epoch in range(1, num_epochs + 1):
        started_at = time.time()

        train(model, train_loader, optimizer)
        train_loss, train_acc = evaluate(model, train_loader)
        val_loss, val_acc = evaluate(model, val_loader)

        # Keep a deep copy of the weights whenever validation accuracy improves.
        if val_acc > top_val_acc:
            top_val_acc = val_acc
            top_weights = copy.deepcopy(model.state_dict())

        minutes, seconds = divmod(time.time() - started_at, 60)
        print('-------------- epoch {} ----------------'.format(epoch))
        print('train Loss: {:.4f}, Accuracy: {:.2f}%'.format(train_loss, train_acc))
        print('val Loss: {:.4f}, Accuracy: {:.2f}%'.format(val_loss, val_acc))
        print('Completed in {:.0f}m {:.0f}s'.format(minutes, seconds))

    model.load_state_dict(top_weights)
    return model
 

# Train the baseline network for EPOCH epochs.
# NOTE(review): model_base and optimizer are not defined in the visible part of
# this notebook; they must exist before this cell runs.
base = train_baseline(
    model=model_base,
    train_loader=train_loader,
    val_loader=val_loader,
    optimizer=optimizer,
    num_epochs=EPOCH,
)
# torch.save on the module object stores the whole model, not only a state_dict.
torch.save(base, 'baseline.pt')

## 3. Transfer Learning 모델 학습

* Transfer Learning을 위한 준비

In [None]:
# Per-phase preprocessing for the transfer-learning model. Both phases resize
# to 64x64, crop to 52x52, convert to a tensor and normalise each channel with
# the mean / std values listed below.
data_transforms = {
    # Training: random flips and a random crop act as data augmentation.
    'train': transforms.Compose([
        transforms.Resize([64, 64]),
        transforms.RandomHorizontalFlip(),
        transforms.RandomVerticalFlip(),
        transforms.RandomCrop(52),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]),

    # Validation: a deterministic centre crop keeps the 52x52 input size while
    # making the validation metrics (and therefore best-checkpoint selection)
    # reproducible from epoch to epoch.
    'val': transforms.Compose([
        transforms.Resize([64, 64]),
        transforms.CenterCrop(52),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]),
}

In [None]:
# Root of the train / val / test split.
data_dir = './splitted'

# One ImageFolder per phase, each with its phase-specific transform.
image_datasets = {
    split: ImageFolder(
        root=os.path.join(data_dir, split),
        transform=data_transforms[split],
    )
    for split in ['train', 'val']
}

# Matching mini-batch loaders, keyed by the same phase names.
dataloaders = {
    split: torch.utils.data.DataLoader(
        image_datasets[split],
        batch_size=BATCH_SIZE,
        shuffle=True,
        num_workers=4,
    )
    for split in ['train', 'val']
}

# Sample counts per phase, used to average loss and accuracy.
dataset_sizes = {split: len(dataset) for split, dataset in image_datasets.items()}

# Class names as discovered by ImageFolder for the training split.
class_names = image_datasets['train'].classes

* Pre-Trained Model 불러오기

In [None]:
from torchvision import models
from torch.optim import lr_scheduler

# Load ResNet-50 with pretrained weights.
resnet = models.resnet50(pretrained=True)

# Replace the final fully connected layer so the output size equals the number
# of classes found in the training folder, instead of a hard-coded count.
num_ftrs = resnet.fc.in_features
resnet.fc = nn.Linear(num_ftrs, len(class_names))
resnet = resnet.to(DEVICE)

criterion = nn.CrossEntropyLoss()

# Optimise only parameters that currently require gradients.
# NOTE: at this point every parameter still has requires_grad=True, so the
# filter passes all of them; the layer-freezing cell runs after this one.
optimizer_ft = optim.Adam(filter(lambda p: p.requires_grad, resnet.parameters()), lr=0.001)

# Multiply the learning rate by 0.1 every 7 scheduler steps.
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

* Pre-Trained Model의 일부 Layer Freeze하기

In [None]:
# Freeze the first five top-level child modules of the network; the remaining
# children stay trainable.
ct = 0
for ct, child in enumerate(resnet.children(), start=1):
    if ct >= 6:
        continue
    for frozen_param in child.parameters():
        frozen_param.requires_grad = False

* Transfer Learning 모델 학습과 검증을 위한 함수

In [None]:
def train_resnet(model, criterion, optimizer, scheduler, num_epochs=25):
    """Fine-tune ``model`` and return it loaded with its best validation weights.

    Each epoch runs a 'train' phase followed by a 'val' phase over the
    module-level ``dataloaders`` dict; per-phase averages are computed with the
    module-level ``dataset_sizes`` dict. Loss, accuracy and elapsed time are
    printed for every epoch.

    Args:
        model: network to train (modified in place).
        criterion: loss function called as ``criterion(outputs, labels)``.
        optimizer: optimizer stepping the model parameters.
        scheduler: learning-rate scheduler, stepped once per training phase.
        num_epochs: number of epochs to run.

    Returns:
        ``model`` after loading the weights of the epoch with the highest
        validation accuracy.
    """
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('-------------- epoch {} ----------------'.format(epoch+1))
        since = time.time()
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            # Plain Python int: keeps the accuracy arithmetic on the host.
            # The MPS backend cannot hold float64 tensors, so calling
            # .double() on a device-side counter fails there.
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(DEVICE)
                labels = labels.to(DEVICE)

                optimizer.zero_grad()

                # Track gradients only while training.
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # criterion returns a batch mean; scale by the batch size so
                # the epoch average is per sample.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels).item()
            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss/dataset_sizes[phase]
            epoch_acc = running_corrects/dataset_sizes[phase]

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            # Remember the weights of the best validation epoch so far.
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        time_elapsed = time.time() - since
        print('Completed in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    model.load_state_dict(best_model_wts)

    return model

* 모델 학습을 실행하기

In [None]:
# Fine-tune the ResNet for EPOCH epochs with the loss, optimizer and scheduler
# prepared above.
model_resnet50 = train_resnet(
    model=resnet,
    criterion=criterion,
    optimizer=optimizer_ft,
    scheduler=exp_lr_scheduler,
    num_epochs=EPOCH,
)

# torch.save on the module object stores the whole model, not only a state_dict.
torch.save(model_resnet50, 'resnet50.pt')

## 4. 모델 평가

* 베이스라인 모델 평가를 위한 전처리하기

In [None]:
# Same preprocessing as the baseline training pipeline: resize to 64x64 and
# convert to a tensor.
transform_base = transforms.Compose([
    transforms.Resize([64, 64]),
    transforms.ToTensor(),
])

# Test split for the baseline model, served in mini-batches.
test_base = ImageFolder('./splitted/test', transform=transform_base)
test_loader_base = torch.utils.data.DataLoader(
    dataset=test_base,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=4,
)

* Transfer Learning 모델 평가를 위한 전처리하기

In [None]:
# Test-time preprocessing for the transfer-learning model: resize to 64x64,
# crop to 52x52, convert to a tensor and normalise each channel.
# A deterministic centre crop (instead of a random crop) makes the reported
# test accuracy reproducible between runs.
transform_resNet = transforms.Compose([
        transforms.Resize([64,64]),
        transforms.CenterCrop(52),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

# Test split for the transfer-learning model, served in mini-batches.
test_resNet = ImageFolder(root='./splitted/test', transform=transform_resNet)
test_loader_resNet = torch.utils.data.DataLoader(test_resNet, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)

* 베이스라인 모델 성능 평가하기

In [None]:
# Restore the baseline model saved earlier with torch.save(model, ...).
# - weights_only=False: the file holds a fully pickled module, which newer
#   PyTorch releases refuse to unpickle under their default weights_only=True.
#   This keyword needs PyTorch >= 1.13. Unpickling executes arbitrary code, so
#   only load files you created yourself.
# - map_location=DEVICE: puts the model on the same device that evaluate()
#   moves the input batches to.
baseline = torch.load('baseline.pt', map_location=DEVICE, weights_only=False)
baseline.eval()
test_loss, test_accuracy = evaluate(baseline, test_loader_base)

print('baseline test acc:  ', test_accuracy)

* Transfer Learning 모델 성능 평가하기

In [None]:
# Restore the fine-tuned ResNet saved earlier with torch.save(model, ...).
# - weights_only=False: the file holds a fully pickled module, which newer
#   PyTorch releases refuse to unpickle under their default weights_only=True.
#   This keyword needs PyTorch >= 1.13. Unpickling executes arbitrary code, so
#   only load files you created yourself.
# - map_location=DEVICE: puts the model on the same device that evaluate()
#   moves the input batches to.
resnet50 = torch.load('resnet50.pt', map_location=DEVICE, weights_only=False)
resnet50.eval()
test_loss, test_accuracy = evaluate(resnet50, test_loader_resNet)

print('ResNet test acc:  ', test_accuracy)