<a href="https://colab.research.google.com/github/kmc3661/Basic-machine-Learning/blob/master/ResNet_20.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.backends.cudnn as cudnn
import torch.optim as optim
import os
import torch.nn.init as init

from torch.autograd import Variable

class LambdaLayer(nn.Module):
    """Wrap an arbitrary callable so it can be used as an ``nn.Module``.

    Args:
        lambd: Callable that takes the input tensor and returns the output.
    """

    def __init__(self, lambd):
        super(LambdaLayer, self).__init__()
        self.lambd = lambd

    def forward(self, x):
        """Apply the wrapped callable to ``x`` and return its result."""
        # The callable must be *called* on the input; returning the callable
        # itself would hand a function object to the next layer.
        return self.lambd(x)

class BasicBlock(nn.Module):
    """Residual block: conv3x3 -> BN -> ReLU -> conv3x3 -> BN, plus a shortcut.

    Args:
        in_planes: Number of input channels.
        planes: Number of output channels.
        stride: Stride of the first convolution (and of the projection
            shortcut, when one is needed).
    """

    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        # Initialise nn.Module first so sub-modules assigned below are registered.
        super().__init__()

        self.conv1 = nn.Conv2d(
            in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False
        )
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(
            planes, planes, kernel_size=3, stride=1, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(planes)

        shape_changes = stride != 1 or in_planes != planes
        if shape_changes:
            # Not an identity mapping: project the input with a 1x1 conv + BN
            # so it matches the main branch's shape.
            # (An alternative is a parameter-free shortcut that subsamples
            # spatially and zero-pads channels via LambdaLayer; the projection
            # is what this block uses.)
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(planes),
            )
        else:
            # Identity shortcut: an empty Sequential returns its input unchanged.
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        branch = self.conv1(x)
        branch = F.relu(self.bn1(branch))
        branch = self.bn2(self.conv2(branch))
        branch += self.shortcut(x)
        return F.relu(branch)


class ResNet(nn.Module):
    """Three-stage ResNet for small images (16 -> 32 -> 64 channels).

    Args:
        block: Block constructor called as ``block(in_planes, planes, stride)``.
        num_blocks: Sequence of three ints, the number of blocks per stage.
        num_classes: Size of the final linear layer's output.
    """

    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        # Narrow stem width chosen for CIFAR-10 (much smaller than ImageNet nets).
        self.in_planes = 16
        # padding=1 keeps the 3x3 stem size-preserving; without it a 32x32
        # input shrinks to 30x30 before the first stage.
        # The conv bias is left enabled so the parameter names/shapes stay
        # compatible with checkpoints saved from the earlier definition.
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(16)
        self.layer1 = self._make_layer(block, 16, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 32, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 64, num_blocks[2], stride=2)
        self.linear = nn.Linear(64, num_classes)  # fully connected classifier

    def _make_layer(self, block, planes, num_blocks, stride):
        """Build one stage of ``num_blocks`` blocks producing ``planes`` channels.

        Only the first block uses ``stride`` (so down-sampling happens once per
        stage); the remaining blocks use stride 1.
        """
        # e.g. stride=2, num_blocks=3 -> [2, 1, 1]
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            # The next block consumes what this one produced.
            self.in_planes = planes

        # Unpack the list: nn.Sequential takes modules as positional arguments.
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)``."""
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        # Global average pooling over both spatial dims. Equivalent to pooling
        # with a kernel of the feature-map width on square maps, but also
        # correct when height != width.
        out = F.adaptive_avg_pool2d(out, 1)
        out = out.view(out.size(0), -1)  # flatten to (batch, channels)
        out = self.linear(out)
        return out

def ResNet20():
    """Build a ``ResNet`` of ``BasicBlock``s with three blocks in each of its three stages."""
    blocks_per_stage = [3] * 3
    return ResNet(BasicBlock, blocks_per_stage)



        


Dataset download and load

In [None]:
import torchvision
import numpy as np
import torchvision.transforms as transforms
from torch.utils.data.sampler import SubsetRandomSampler
from torch.utils.data import random_split

# NOTE: valid_size and shuffle are not used by the split below (a fixed
# val_size with random_split is used instead); they are kept in case other
# cells reference them.
valid_size = 0.1
shuffle = True

# Per-channel (RGB) ImageNet statistics. The blue-channel mean is 0.406; the
# previous value duplicated the green-channel mean (0.456).
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

# Training pipeline: light augmentation, then tensor conversion + normalisation.
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),  # image -> torch tensor, axes swapped H*W*C -> C*H*W
    normalize,
])

# Evaluation pipeline: no augmentation.
transform_test = transforms.Compose([
    transforms.ToTensor(),
    normalize,
])

# Both splits share one root so the archive is downloaded once; the test set
# previously pointed at '/data' (filesystem root) instead of './data'.
train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
test_dataset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)

# An earlier SubsetRandomSampler-based train/validation split (driven by
# valid_size) was replaced by the random_split call below.

# Fixed seed so the train/validation partition is reproducible.
torch.manual_seed(43)
val_size = 5000
train_size = len(train_dataset) - val_size
# NOTE(review): val_ds is a view into train_dataset, so validation samples also
# pass through transform_train (random crop/flip) - confirm this is intended.
train_ds, val_ds = random_split(train_dataset, [train_size, val_size])

print(len(train_ds))
print(len(val_ds))

train_loader = torch.utils.data.DataLoader(train_ds, batch_size=128, shuffle=True, num_workers=2)
val_loader = torch.utils.data.DataLoader(val_ds, batch_size=100, shuffle=False, num_workers=2)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=100, shuffle=False, num_workers=2)



In [None]:
import torch
from torch.utils.tensorboard import SummaryWriter
import numpy as np

# Log under a run name that matches the model actually trained (ResNet-20,
# not ResNet-50).
writer = SummaryWriter('runs/ResNet_20')
# Fall back to CPU when no GPU is visible instead of failing on .to('cuda').
device = 'cuda' if torch.cuda.is_available() else 'cpu'

net = ResNet20()
net = net.to(device)  # move the model's parameters to the selected device
# Replicate across all visible GPUs. The wrapper is applied unconditionally so
# state_dict keys keep the same 'module.' prefix whatever the hardware.
net = torch.nn.DataParallel(net)
# Let cuDNN auto-tune convolution algorithms (input sizes are fixed here).
cudnn.benchmark = True

learning_rate = 0.1
file_name = 'resnet20_cifar10.pt'

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=learning_rate, momentum=0.9, weight_decay=0.0001)


def train(epoch):
    """Run one training epoch over ``train_loader`` and log accuracy/loss.

    Reads the module-level ``net``, ``criterion``, ``optimizer``, ``device``,
    ``train_loader`` and ``writer``.

    Args:
        epoch: Epoch index, used in the printed header and as the TensorBoard step.
    """
    print('\n[ Train epoch:%d ]'%epoch)
    net.train()
    train_loss = 0
    correct = 0
    total = 0

    for batch_idx, (inputs, targets) in enumerate(train_loader):  # one mini-batch at a time
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()  # clear gradients left over from the previous step

        benign_outputs = net(inputs)
        loss = criterion(benign_outputs, targets)  # predictions vs. ground truth
        loss.backward()  # back-propagation

        optimizer.step()  # gradient-descent parameter update

        batch_size = targets.size(0)
        _, predicted = benign_outputs.max(1)
        batch_correct = predicted.eq(targets).sum().item()

        # criterion is assumed to return the per-batch *mean* (the default for
        # nn.CrossEntropyLoss), so weight it by the batch size; dividing the
        # sum by `total` below then gives a true per-sample average.
        train_loss += loss.item() * batch_size
        total += batch_size
        correct += batch_correct

        if batch_idx %100 ==0:
            print('\nCurrent batch:',str(batch_idx))
            print('Current benign train accuracy:', str(batch_correct / batch_size))
            print('Current benign train loss:', loss.item())

    # Guard against an empty loader so the summaries do not divide by zero.
    seen = max(total, 1)
    print('\nTotal benign train accuracy:', 100. * correct / seen)  # share of correct predictions
    writer.add_scalar("train accuracy", 100. * correct / seen,epoch)
    print('train_average loss:', train_loss/seen)
    writer.add_scalar("train loss", train_loss/seen,epoch)


def validate(epoch):
    """Evaluate ``net`` on ``val_loader``, log metrics, and save a checkpoint.

    Reads the module-level ``net``, ``criterion``, ``device``, ``val_loader``,
    ``writer`` and ``file_name``.

    Args:
        epoch: Epoch index, used in the printed header and as the TensorBoard step.
    """
    print('\n[ val_epoch: %d ]' % epoch)
    net.eval()
    loss = 0
    correct = 0
    total = 0

    # No gradients are needed for evaluation; skipping autograd bookkeeping
    # saves memory and time.
    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            batch_size = targets.size(0)
            total += batch_size

            outputs = net(inputs)
            # criterion is assumed to return the per-batch mean (the default
            # for nn.CrossEntropyLoss); weight by batch size so that
            # loss / total is a per-sample average.
            loss += criterion(outputs, targets).item() * batch_size

            _, predicted = outputs.max(1)
            correct += predicted.eq(targets).sum().item()

    # Guard against an empty loader so the summaries do not divide by zero.
    seen = max(total, 1)
    print('\nval_accuarcy:', 100. * correct / seen)
    writer.add_scalar("validation accuracy", 100. * correct / seen, epoch)
    print('val_average loss:', loss / seen)
    writer.add_scalar("validation loss", loss/seen, epoch)

    state = {
        'net': net.state_dict()
    }
    # exist_ok avoids the check-then-create race of isdir() + mkdir().
    os.makedirs('checkpoint', exist_ok=True)
    torch.save(state,'./checkpoint/'+ file_name)
    print('Model Saved!')

def test(epoch):
    """Evaluate ``net`` on ``test_loader``, log metrics, and save a checkpoint.

    Reads the module-level ``net``, ``criterion``, ``device``, ``test_loader``,
    ``writer`` and ``file_name``.

    Args:
        epoch: Epoch index, used in the printed header and as the TensorBoard step.
    """
    print('\n[ test_epoch: %d ]' % epoch)
    net.eval()
    loss = 0
    correct = 0
    total = 0

    # No gradients are needed for evaluation; skipping autograd bookkeeping
    # saves memory and time.
    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            batch_size = targets.size(0)
            total += batch_size

            outputs = net(inputs)
            # criterion is assumed to return the per-batch mean (the default
            # for nn.CrossEntropyLoss); weight by batch size so that
            # loss / total is a per-sample average.
            loss += criterion(outputs, targets).item() * batch_size

            _, predicted = outputs.max(1)
            correct += predicted.eq(targets).sum().item()

    # Guard against an empty loader so the summaries do not divide by zero.
    seen = max(total, 1)
    print('\ntest_accuarcy:', 100. * correct / seen)
    writer.add_scalar("test_accuarcy", 100* correct / seen, epoch)
    print('test_average loss:', loss / seen)
    writer.add_scalar("test_loss",loss/seen,epoch)

    state = {
        'net': net.state_dict()
    }
    # exist_ok avoids the check-then-create race of isdir() + mkdir().
    os.makedirs('checkpoint', exist_ok=True)
    torch.save(state,'./checkpoint/'+ file_name)
    print('Model Saved!')

def adjust_learning_rate(optimizer, epoch):
    """Apply a step-decay schedule to every parameter group of ``optimizer``.

    Starts from the module-level ``learning_rate`` and divides it by 10 once
    ``epoch`` reaches 100 and by 10 again once it reaches 150.
    """
    if epoch >= 150:
        decay_steps = 2
    elif epoch >= 100:
        decay_steps = 1
    else:
        decay_steps = 0

    lr = learning_rate
    for _ in range(decay_steps):
        lr /= 10

    for group in optimizer.param_groups:
        group['lr'] = lr

# Main loop: set the epoch's learning rate, train, then evaluate on the test set.
try:
    for epoch in range(200):
        adjust_learning_rate(optimizer, epoch)
        train(epoch)
        test(epoch)
finally:
    # Always flush and close the writer, even if training is interrupted or
    # raises, so buffered scalars are not lost.
    writer.flush()  # write buffered events to disk
    writer.close()  # release the event file

In [None]:
pip install tensorboard --upgrade

In [None]:
%load_ext tensorboard


In [None]:
tensorboard --logdir=runs