In [None]:
import os
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
from torch import nn
from torchvision import transforms
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.nn.init as init
import torch.utils.data as data
import torchvision.datasets as dset
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch import nn
from torchvision import transforms
import torchvision.datasets as datasets

In [None]:
%reload_ext autoreload
%autoreload

def conv_block_1(in_dim,out_dim,act_fn,stride=1):
    model = nn.Sequential(
        nn.Conv2d(in_dim,out_dim, kernel_size=1, stride=stride),
        act_fn,
    )
    return model


def conv_block_3(in_dim,out_dim,act_fn):
    model = nn.Sequential(
        nn.Conv2d(in_dim,out_dim, kernel_size=3, stride=1, padding=1),
        act_fn,
    )
    return model

class BottleNeck(nn.Module):
    def __init__(self,in_dim,mid_dim,out_dim,act_fn,down=False):
        super(BottleNeck,self).__init__()
        self.down=down
        
        # 특성지도의 크기가 감소하는 경우
        if self.down:
            self.layer = nn.Sequential(
              conv_block_1(in_dim,mid_dim,act_fn,2),
              conv_block_3(mid_dim,mid_dim,act_fn),
              conv_block_1(mid_dim,out_dim,act_fn),
            )
            self.downsample = nn.Conv2d(in_dim,out_dim,1,2)
            
        # 특성지도의 크기가 그대로인 경우
        else:
            self.layer = nn.Sequential(
                conv_block_1(in_dim,mid_dim,act_fn),
                conv_block_3(mid_dim,mid_dim,act_fn),
                conv_block_1(mid_dim,out_dim,act_fn),
            )
            
        # 더하기를 위해 차원을 맞춰주는 부분
        self.dim_equalizer = nn.Conv2d(in_dim,out_dim,kernel_size=1)
                  
    def forward(self,x):
        if self.down:
            downsample = self.downsample(x)
            out = self.layer(x)
            out = out + downsample
        else:
            out = self.layer(x)
            if x.size() is not out.size():
                x = self.dim_equalizer(x)
            out = out + x
        return out
    
class ResNet(nn.Module):

    def __init__(self, base_dim, num_classes=400):
        super(ResNet, self).__init__()
        self.act_fn = nn.ReLU()
        self.layer_1 = nn.Sequential(
            nn.Conv2d(3,base_dim,7,2,3),
            nn.ReLU(),
            nn.MaxPool2d(3,2,1),
        )
        self.layer_2 = nn.Sequential(
            BottleNeck(base_dim,base_dim,base_dim*4,self.act_fn),
            BottleNeck(base_dim*4,base_dim,base_dim*4,self.act_fn),
            BottleNeck(base_dim*4,base_dim,base_dim*4,self.act_fn,down=True),
        )   
        self.layer_3 = nn.Sequential(
            BottleNeck(base_dim*4,base_dim*2,base_dim*8,self.act_fn),
            BottleNeck(base_dim*8,base_dim*2,base_dim*8,self.act_fn),
            BottleNeck(base_dim*8,base_dim*2,base_dim*8,self.act_fn),
            BottleNeck(base_dim*8,base_dim*2,base_dim*8,self.act_fn,down=True),
        )
        self.layer_4 = nn.Sequential(
            BottleNeck(base_dim*8,base_dim*4,base_dim*16,self.act_fn),
            BottleNeck(base_dim*16,base_dim*4,base_dim*16,self.act_fn),
            BottleNeck(base_dim*16,base_dim*4,base_dim*16,self.act_fn),            
            BottleNeck(base_dim*16,base_dim*4,base_dim*16,self.act_fn),
            BottleNeck(base_dim*16,base_dim*4,base_dim*16,self.act_fn),
            BottleNeck(base_dim*16,base_dim*4,base_dim*16,self.act_fn,down=True),
        )
        self.layer_5 = nn.Sequential(
            BottleNeck(base_dim*16,base_dim*8,base_dim*32,self.act_fn),
            BottleNeck(base_dim*32,base_dim*8,base_dim*32,self.act_fn),
            BottleNeck(base_dim*32,base_dim*8,base_dim*32,self.act_fn),
        )
        self.avgpool = nn.AvgPool2d(7,1) 
        self.fc_layer = nn.Linear(base_dim*32,num_classes)
    def forward(self, x):
        out = self.layer_1(x)
        out = self.layer_2(out)
        out = self.layer_3(out)
        out = self.layer_4(out)
        out = self.layer_5(out)
        out = self.avgpool(out)
        out = out.view(batch_size,-1)
        out = self.fc_layer(out)
        
        return out

In [None]:
#데이터셋 및 모델저장 경로 지정
dataset_path = "/home/ycs/Downloads/dataset"
model_weight_save_path = "/home/ycs/Downloads/save/"
num_classes = 400

batch_size = 100
num_workers = 5
lr = 0.0002

total_epoch = 100

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [None]:
print(device)

In [None]:
# 훈련 및 테스트 데이터셋 경로 로드
traindir = os.path.join(dataset_path, '/home/ycs/Downloads/dataset/train')
testdir = os.path.join(dataset_path, '/home/ycs/Downloads/dataset/train')

train_dataset = datasets.ImageFolder(
    traindir,
    transforms.Compose([
        transforms.Resize(256), #이미지 256*256 리사이즈
        #transforms.RandomResizedCrop(224), #랜덤한 위치에서 224*224 사이즈로 샘플링 
        #transforms.RandomHorizontalFlip(), #랜덤한 확률로 데이터셋 회전
        transforms.ToTensor(),
    ]))

train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True,
    num_workers=num_workers, pin_memory=True, drop_last=False)

test_loader = torch.utils.data.DataLoader(
    datasets.ImageFolder(testdir, transforms.Compose([
        transforms.Resize(256),
        #transforms.CenterCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
    ])),
    batch_size=batch_size, shuffle=False,
    num_workers=num_workers, pin_memory=True)

In [None]:
model = ResNet(base_dim=64).to(device)

In [None]:
print(num_classes)

In [None]:
loss_func = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(),lr=lr)

In [None]:
total_iteration_per_epoch = int(np.ceil(len(train_dataset)/batch_size))

for epoch in range(1, total_epoch + 1):
    model.train()
    for itereation, (input, target) in enumerate(train_loader):
        images = input.to(device)
        labels = target.to(device)

        # Forward pass
        outputs = model(images)
        loss = loss_func(outputs, labels)
        
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        print('Epoch [{}/{}], Iteration [{}/{}] Loss: {:.4f}'.format(epoch, total_epoch, itereation+1, total_iteration_per_epoch, loss.item()))
    if epoch % 20 == 0: #20회당 한번저장
        torch.save(model, model_weight_save_path + 'model.pt') #전체 모델저장
        #torch.save(model.state_dict(), model_weight_save_path + 'model' + str(epoch) + ".pth") #모델 객체의 static 저장
        #torch.save({'model': model.state_dict(),'optimizer': optimizer.state_dict()}, PATH + 'all.tar') #학습중 진행상황 등 모든 전체 정보저장
        
        
    
    model.eval()
    with torch.no_grad():
      correct = 0
      total = 0
      for input, target in test_loader:
          images = input.to(device)
          labels = target.to(device)

          # Forward pass
          outputs = model(images)
          _, predicted = torch.max(outputs.data, 1)
          total += len(labels)
          correct += (predicted == labels).sum().item()

      print('Epoch [{}/{}], Test Accuracy of the model on the {} test images: {} %'.format(epoch, total_epoch, total, 100 * correct / total))