- 참고 : https://wolfy.tistory.com/243

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision.datasets import ImageFolder
from torch.utils.data import Subset
from sklearn.model_selection import train_test_split
from torchvision.transforms import Compose, ToTensor, Resize
from torch.utils.data import DataLoader
from torchsummary import summary

In [None]:
# seed 고정
random_seed = 2020
random.seed(random_seed)
np.random.seed(random_seed)
torch.manual_seed(random_seed)

In [2]:
if torch.cuda.is_available():
    DEVICE = torch.device('cuda')
else:
    DEVICE = torch.device('cpu')

BATCH_SIZE = 32
EPOCHS = 10

In [3]:
# 코드 참고 : https://wolfy.tistory.com/243
# https://www.python2.net/questions-740968.htm
# https://cool24151.tistory.com/51
# https://mole-starseeker.tistory.com/12
# https://github.com/jhcha08/Implementation_DeepLearningPaper/blob/master/%EC%8A%A4%ED%84%B0%EB%94%94%2020200129%20-%20CNN.%20ResNet.ipynb
class BasicBlock(nn.Module):
    def __init__(self, input_nodes, out_nodes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(input_nodes,input_nodes,kernel_size=(1,1),stride=(1,1)) # 그대로 들어가니까 padding 안 줌 (valid)
        self.bn_1 = nn.BatchNorm2d(input_nodes)

        self.conv2 = nn.Conv2d(input_nodes, input_nodes, kernel_size=(3,3), padding=(1,1)) # 이미지 크기 유지 (same)
        self.bn_2 = nn.BatchNorm2d(input_nodes)
        
        self.conv3 = nn.Conv2d(input_nodes, out_nodes, kernel_size=(1,1), stride=(1,1)) # 그대로 들어가니까 padding 안 줌 (valid)
        self.bn_3 = nn.BatchNorm2d(out_nodes)

        self.shortcut = nn.Sequential()
        
        if stride != 1 : # 전자 조건으로 stride==2 일때 걸리는데 맨 첫 block_2에서 stride [1,1,1] 일때 안 걸림
            self.shortcut = nn.Sequential(nn.Conv2d(input_nodes, out_nodes, kernel_size=(1,1),stride=(1,1)),
                                          nn.BatchNorm2d(out_nodes))           

    def forward(self, x):
        print('x shape :',x.shape)
        out = F.relu(self.bn_1(self.conv1(x)))
        print('out shape :',out.shape)
        out = F.relu(self.bn_2(self.conv2(out)))
        print('out shape :',out.shape)
        out = self.bn_3(self.conv3(out))
        print(out.shape)
        print('shortcut shape :',self.shortcut(x).shape)
        out += self.shortcut(x)
        print('out shape :',out.shape)
        out = F.relu(out)
        print('out shape :',out.shape)

        return out

class ResNet50(nn.Module):
    def __init__(self, num_classes=6):
        super(ResNet50, self).__init__()
        self.zero_pad = nn.ZeroPad2d(1)
        self.conv1_1 = nn.Conv2d(3,64,kernel_size=7,stride=2)
        self.bn1_1 = nn.BatchNorm2d(64)
        
        self.conv2_block = self._make_layer(64,256,3, stride=1)
        self.conv3_block = self._make_layer(128,512,4, stride=2)
        self.conv4_block = self._make_layer(256,1024,6, stride=2)
        self.conv5_block = self._make_layer(512,2048,3, stride=2)

        self.avg_pool = nn.AdaptiveAvgPool2d((1,1))
        self.linear = nn.Linear(2048,6)

    def _make_layer(self, input_nodes, out_nodes, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(BasicBlock(input_nodes, out_nodes, stride))
        return nn.Sequential(*layers)

    def forward(self, x):
        print('res50 x shape: ',x.shape)
        out = self.zero_pad(F.relu(self.bn1_1(self.conv1_1(self.zero_pad(x)))))
        print('res50 block1 out shape: ',x.shape)
        out = self.conv2_block(out)
        out = self.conv3_block(out)
        out = self.conv4_block(out)
        out = self.conv5_block(out)

        out = self.avg_pool(out)
        out = out.view(out.size(0),-1)
        out = self.linear(out)
        return out

In [4]:
model = ResNet50().to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

summary(model,(3,150,150))

ResNet50(
  (zero_pad): ZeroPad2d(padding=(1, 1, 1, 1), value=0.0)
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(1, 1))
  (bn): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (max_pool): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1))
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1))
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (shortcut): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1))
        (1): BatchNorm2d(256, eps=1e-05

In [5]:
def train(model, train_loader, optimizer, log_interval):
    model.train()
    for batch_idx, (image, label) in enumerate(train_loader):
        image = image.to(DEVICE)
        label = label.to(DEVICE)
        optimizer.zero_grad()
        output = model(image)
        loss = criterion(output, label)
        loss.backward()
        optimizer.step()

        if batch_idx % log_interval == 0:
            print("Train Epoch: {} [{}/{} ({:.0f}%)]\tTrain Loss: {:.6f}".format(
                epoch, batch_idx * len(image), 
                len(train_loader.dataset), 100. * batch_idx / len(train_loader), 
                loss.item()))

def evaluate(model, test_loader):
    model.eval()
    test_loss = 0
    correct = 0

    with torch.no_grad():
        for image, label in test_loader:
            image = image.to(DEVICE)
            label = label.to(DEVICE)
            output = model(image)
            test_loss += criterion(output, label).item()
            prediction = output.max(1, keepdim = True)[1]
            correct += prediction.eq(label.view_as(prediction)).sum().item()
    
    test_loss /= len(test_loader.dataset)
    test_accuracy = 100. * correct / len(test_loader.dataset)
    return test_loss, test_accuracy

In [None]:
# 다른 참고 : https://wolfy.tistory.com/243 / https://ndb796.tistory.com/373
# https://discuss.pytorch.org/t/how-to-split-dataset-into-test-and-validation-sets/33987/5
def train_val_dataset(dataset, val_split=0.25):
    train_idx, val_idx = train_test_split(list(range(len(dataset))), test_size=val_split)
    datasets = {}
    datasets['train'] = Subset(dataset, train_idx)
    datasets['val'] = Subset(dataset, val_idx)
    return datasets

dataset = ImageFolder('/content/gdrive/MyDrive/vgg_practice_set/seg_train/seg_train',
                      transform=Compose([Resize((150,150)),ToTensor()]))
print('len of dataset : ',len(dataset))
print('type of dataset : ',type(dataset))
datasets = train_val_dataset(dataset)
print('len of train : ',len(datasets['train']))
print('type of dataset_train : ',type(datasets['train']))
print('len of valid : ',len(datasets['val']))
print('type of dataset_val : ',type(datasets['val']))
# The original dataset is available in the Subset class
print('dataset_train.dataset :',datasets['train'].dataset)
print('type of dataset_train.dataset : ',type(datasets['train'].dataset))
dataloaders = {x:DataLoader(datasets[x],BATCH_SIZE, shuffle=True, num_workers=4) for x in ['train','val']}
X_train, y_train = next(iter(dataloaders['train']))
print('X_train shape : ',X_train.shape, '  y_train shape : ',y_train.shape)

In [None]:
for epoch in range(1, EPOCHS + 1):
    train(model, dataloaders['train'], optimizer, log_interval = 200)
    test_loss, test_accuracy = evaluate(model, dataloaders['val'])
    print("\n[EPOCH: {}], \tTest Loss: {:.4f}, \tTest Accuracy: {:.2f} % \n".format(
        epoch, test_loss, test_accuracy))