In [17]:
import numpy as np
import random
import os
import math

from glob import glob
# https://wikidocs.net/83
import pandas as pd
import cv2
from tqdm.auto import tqdm
# https://frhyme.github.io/python-libs/python_tqdm/

import torch
import torch.nn as nn # 신경망 호출
import torch.nn.functional as F #nn과 똑같지만 nn은 클래스로 정의되고 functional은 함수로 정의됨
from torch.utils.data import DataLoader, Dataset
#데이터 로딩을 위한 클래스 https://huffon.github.io/2020/05/26/torch-data/

import torchvision.models as models
from torchvision import transforms
#https://better-tomorrow.tistory.com/entry/TorchVision-model-funetuning

In [18]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# 쿠다를 사용할수 있으면 쿠다를 사용하고 아니면 CPU로 연산

In [19]:
#이미지크기, 학습횟수, 학습률, 배치사이즈, 시드결정
CFG = {
    'IMG_SIZE':128,
    'EPOCHS':10,
    'LEARNING_RATE':2e-3,
    'BATCH_SIZE':8,
    'SEED':41
}

In [20]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED']) # Seed 고정

In [21]:
def get_train_data(data_dir):
    img_path_list = []
    label_list = []
    # 이미지와 라벨 빈 리스트 생성
    for case_name in os.listdir(data_dir):
        current_path = os.path.join(data_dir, case_name)
        if os.path.isdir(current_path):
            # get image path
            img_path_list.extend(glob(os.path.join(current_path, 'image', '*.jpg')))
            img_path_list.extend(glob(os.path.join(current_path, 'image', '*.png')))
            
            # get label
            label_df = pd.read_csv(current_path+'/label.csv')
            label_list.extend(label_df['leaf_weight'])
                
    return img_path_list, label_list
    # 이미지데이터와 라벨 데이터 분리

def get_test_data(data_dir):
    # get image path
    img_path_list = glob(os.path.join(data_dir, 'image', '*.jpg'))
    img_path_list.extend(glob(os.path.join(data_dir, 'image', '*.png')))
    #img_path_list.sort(key=lambda x:(x.split('/')[-1].split('.')[0]))
    img_path_list.sort(key=lambda x:int(x.split('\\')[-1].split('.')[0])) 
    return img_path_list

In [22]:
all_img_path, all_label = get_train_data('./dataset/train')
test_img_path = get_test_data('./dataset/test')

In [23]:
# Train : Validation = 0.8 : 0.2 Split
train_len = int(len(all_img_path)*0.8)

train_img_path = all_img_path[:train_len]
train_label = all_label[:train_len]

vali_img_path = all_img_path[train_len:]
vali_label = all_label[train_len:]

In [24]:
class CustomDataset(Dataset):
    def __init__(self, img_path_list, label_list, train_mode=True, transforms=None):
        self.transforms = transforms
        self.train_mode = train_mode
        self.img_path_list = img_path_list
        self.label_list = label_list

    def __getitem__(self, index):
        img_path = self.img_path_list[index]
        # Get image data
        image = cv2.imread(img_path)
        if self.transforms is not None:
            image = self.transforms(image)

        if self.train_mode:
            label = self.label_list[index]
            return image, label
        else:
            return image
    
    def __len__(self):
        return len(self.img_path_list)

In [25]:
train_transform = transforms.Compose([
                    transforms.ToPILImage(),
                    transforms.RandomRotation(30),
                    transforms.RandomHorizontalFlip(),
                    transforms.ToTensor(),
                    transforms.Resize((CFG['IMG_SIZE'], CFG['IMG_SIZE'])),
                    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
                    ])

test_transform = transforms.Compose([
                    transforms.ToPILImage(), 
                    transforms.RandomRotation(30),
                    transforms.RandomHorizontalFlip(),
                    transforms.ToTensor(),
                    transforms.Resize((CFG['IMG_SIZE'], CFG['IMG_SIZE'])),
                    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
                    ])

In [26]:
# Get Dataloader
train_dataset = CustomDataset(train_img_path, train_label, train_mode=True, transforms=train_transform)
train_loader = DataLoader(train_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=True, num_workers=0)

vali_dataset = CustomDataset(vali_img_path, vali_label, train_mode=True, transforms=test_transform)
vali_loader = DataLoader(vali_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

In [27]:
class CNNRegressor(torch.nn.Module):
    def __init__(self):
        super(CNNRegressor, self).__init__()
        self.layer1 = torch.nn.Sequential(
            nn.Conv2d(3, 8, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(8, momentum=0.1),
            nn.LeakyReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        
        self.layer2 = torch.nn.Sequential(
            nn.Conv2d(8, 16, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(16, momentum=0.1),
            nn.LeakyReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        
        self.layer3 = torch.nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32, momentum=0.1),
            nn.LeakyReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        
        self.layer4 = torch.nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=4, stride=1, padding=1),
            nn.BatchNorm2d(64, momentum=0.1),
            nn.LeakyReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        
        self.layer5 = torch.nn.Sequential(
            nn.Conv2d(64, 64, kernel_size=4,stride =1, padding=1),
            nn.BatchNorm2d(64,momentum=0.1),
            nn.LeakyReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))

        self.regressor = nn.Linear(64*3*3,1)
        # nn.Linear()는 입력의 차원, 출력의 차원

    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.layer5(x)
        x = torch.flatten(x, start_dim=1)
        out = self.regressor(x)
        
        return out
    

In [28]:
#!pip install torchsummary

In [29]:
from torchsummary import summary
#summary(model,input_size = (3,256,256))

In [30]:
def train(model, optimizer, train_loader, vali_loader, scheduler, device):
    model.to(device)

    # Loss Function
    criterion = nn.L1Loss().to(device)
    ##criterion = nn.CrossEntropyLoss().to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=2e-3)
    #optimizer = torch.optim.SGD(model.parameters(), lr=LEARNING_RATE)
    best_mae = 9999
    
    for epoch in range(1,CFG["EPOCHS"]+1):
        model.train()
        train_loss = []
        for img, label in tqdm(iter(train_loader)):
            img, label = img.float().to(device), label.float().to(device)
            
            ##optimizer.zero_grad()

            # Data -> Model -> Output
            outputs = model(img)
            # Calc loss
            loss = criterion(outputs, label)
            #print(outputs)
            #print(outputs.squeeze(1))
            

            # backpropagation
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())
            
        if scheduler is not None:
            scheduler.step()
            
        # Evaluation Validation set
        vali_mae = validation(model, vali_loader, criterion, device)
        
        print(f'Epoch [{epoch}] Train MAE : [{np.mean(train_loss):.5f}] Validation MAE : [{vali_mae:.5f}]\n')
        
        # Model Saved
        if best_mae > vali_mae:
            best_mae = vali_mae
            torch.save(model.state_dict(), './saved/best_model.pth')
            print('Model Saved.')

In [31]:
def validation(model, vali_loader, criterion, device):
    model.eval() # Evaluation
    vali_loss = []
    with torch.no_grad():
        for img, label in tqdm(iter(vali_loader)):
            img, label = img.float().to(device), label.float().to(device)

            outputs = model(img)
            loss = criterion(outputs, label)
            
            vali_loss.append(loss.item())

    vali_mae_loss = np.mean(vali_loss)
    return vali_mae_loss

In [32]:
model = CNNRegressor().to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=2e-3)
#optimizer = torch.optim.SGD(params = model.parameters(), lr = 2e-3)
scheduler = None

# trans = transforms.ToPILImage()
# train_loader = trans(train_loader)

train(model, optimizer, train_loader, vali_loader, scheduler, device)

  0%|          | 0/160 [00:00<?, ?it/s]

tensor([[ 0.5640],
        [-0.7451],
        [-0.7333],
        [-0.2948],
        [-0.5024],
        [-0.1803],
        [-0.0938],
        [-0.3957]], grad_fn=<AddmmBackward0>)
tensor([ 0.5640, -0.7451, -0.7333, -0.2948, -0.5024, -0.1803, -0.0938, -0.3957],
       grad_fn=<SqueezeBackward1>)


KeyboardInterrupt: 

In [None]:
summary(model,input_size = (3,256,256))