# 5.2 CNN 모델 구현하기

In [15]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import transforms, datasets

In [16]:
USE_CUDA = torch.cuda.is_available()   # cuda를 쓸 수 있는지 확인하는 코드
DEVICE = torch.device("cuda" if USE_CUDA else "cpu")  # use_cude의 결과에 따라 데이터를 cuda 혹은 cpu로 보내도록 가리키는 역할
EPOCHS = 40  
BATCH_SIZE = 64

In [17]:
# 학습용, 테스트용 데이터셋 불러오기
# 코드 간결하게 하기 위해 fashion MNIST데이터 셋을 DataLoader부를 때 정의

train_loader = torch.utils.data.DataLoader(datasets.FashionMNIST('./data', train = True, download = True, transform = transforms.Compose([
    transforms.ToTensor(), transforms.Normalize((0.1307, ), (0.3081,))])), batch_size = BATCH_SIZE , shuffle = True)

test_loader = torch.utils.data.DataLoader(datasets.FashionMNIST('./data', train = False, download = True, transform = transforms.Compose([
    transforms.ToTensor(), transforms.Normalize((0.1307, ), (0.3081,))])), batch_size = BATCH_SIZE , shuffle = True)


In [18]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        # 첫 컨볼루션 계층에서는 10개의 특징 맵을 생성하고 두 번째 컨볼루션 계층에서는 10개의 특징 맵을 받아 20개의 특징 맵을 만듦
        self.conv1 = nn.Conv2d(1,10,kernel_size = 5)  # nn.Conv2d의 첫 두 파라미터는 입력 채널 수와 출력 채널 수, 여기서 사용하는 데이터셋은 흑백 이미지이기 때문에 색상 채널이 1개 뿐
        self.conv2 = nn.Conv2d(10,20,kernel_size = 5)  # 커널 사이즈는 숫자 하나만 지정하면 정사각형으로 간주. (3,5)하면 3*5크기의 직사각형 만들 수 있음
        self.drop = nn.Dropout2d()  # 컨볼루션 결과로 나온 출력값에 드롭아웃. 드롭아웃 함수 사용 안하고 모듈 이용해 드롭아웃 인스턴스 생성
        self.fc1 = nn.Linear(320,50) # 입력 크기 320, 출력 크기 50
        self.fc2 = nn.Linear(50,10)  # 입력 크기 50, 출력 크기 10
        
    def forward(self,x):
        x = F.relu(F.max_pool2d(self.conv1(x),2))  # max_pool2d함수의 두 번째 파라미터는 커널 크기
        x = F.relu(F.max_pool2d(self.conv2(x),2))  # 드롭아웃과 같이 학습 파라미터가 없으므로 nn.MaxPool2d같이 일반 모듈 사용해도 됨
        # 특징 맵 이후 출력을 하는 일반 인공 신경망은 1차원 입력을 받음
        x = x.view(-1,320)  # 2차원 -> 1차원
        # 앞서 추출한 특징들을 입력으로 받아 분류하는 (일반)신경망 계층 구성
        x = F.relu(self.fc1(x))
        x = self.drop(x)
        x = self.fc2(x)
        return x

In [19]:
model = CNN().to(DEVICE)  # 만든 모델 인스턴스
optimizer = optim. SGD(model.parameters(), lr = 0.01, momentum = 0.5) # 최적화 함수

In [20]:
def train(model, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data,target) in enumerate(train_loader):
        data, target = data.to(DEVICE), target.to(DEVICE)
        optimizer.zero_grad()
        output = model(data)
        loss = F.cross_entropy(output,target)
        loss.backward()
        optimizer.step()
        
        if batch_idx % 200 == 0 :
            print('train epoch : {}[{}/{} ({: .0f}%)]\tloss:{:.6f}'.format(epoch, len(data), len(train_loader.dataset),
                                                                           100* batch_idx / len(train_loader),loss.item()))

In [21]:
# 에폭이 끝날 때 마다 테스트셋으로 모델의 성능 측정

def evaluate(model, test_loader):
    model.eval()
    test_loss =0   # 테스트 오차 값 0으로 초기화
    correct=0     # 예측이 맞은 수 0으로 초기화
    with torch.no_grad() : # 평가 과정에서는 기울기를 계산하지 않아도 됨
        for data, target in test_loader :
            data, target = data.to(DEVICE), target.to(DEVICE)
            output = model(data)
            # 모든 오차 더하기
            test_loss += F.cross_entropy(output,target, reduction = 'sum').item()  #미니배치 평균 대신 합을 받아와야 함
            pred = output.max(1, keepdim = True)[1]  # output.max()함수는 가장 큰 값과 그 값이 있는 인덱스를 출력
            correct +=pred.eq(target.view_as(pred)).sum().item()  #eq()함수는 값이 일치하면 1, 아니면 0을 출력 
    test_loss /= len(test_loader.dataset)  # 모델의 전체 데이터셋에 대한 오차를 테스트셋 데이터 수로 나눠 평균 구함
    test_accuracy = 100 * correct / len(test_loader.dataset)  # 맞힌 개수의 합을 테스트셋 데이터 수로 나누고 100을 곱해 정확도 구함
    return test_loss, test_accuracy

for epoch in range(1, EPOCHS+1):
    train(model, train_loader, optimizer,epoch)
    test_loss, test_accuracy = evaluate(model, test_loader)
    print('[{}] test loss : {:.4f}, accuracy: {:.2f}%'.format(epoch, test_loss, test_accuracy))

[1] test loss : 0.6550, accuracy: 74.83%
[2] test loss : 0.5630, accuracy: 77.67%
[3] test loss : 0.5155, accuracy: 80.28%
[4] test loss : 0.4703, accuracy: 81.49%
[5] test loss : 0.4463, accuracy: 83.33%
[6] test loss : 0.4154, accuracy: 84.67%
[7] test loss : 0.3939, accuracy: 85.78%
[8] test loss : 0.3731, accuracy: 86.04%
[9] test loss : 0.3680, accuracy: 86.51%
[10] test loss : 0.3738, accuracy: 86.28%
[11] test loss : 0.3500, accuracy: 87.16%
[12] test loss : 0.3476, accuracy: 87.56%
[13] test loss : 0.3350, accuracy: 87.98%
[14] test loss : 0.3298, accuracy: 88.09%
[15] test loss : 0.3266, accuracy: 88.11%
[16] test loss : 0.3285, accuracy: 88.38%
[17] test loss : 0.3205, accuracy: 88.76%
[18] test loss : 0.3201, accuracy: 88.55%
[19] test loss : 0.3069, accuracy: 88.99%
[20] test loss : 0.3098, accuracy: 88.72%
[21] test loss : 0.3068, accuracy: 89.12%
[22] test loss : 0.3050, accuracy: 89.08%
[23] test loss : 0.3145, accuracy: 88.54%
[24] test loss : 0.3022, accuracy: 89.41%
[

[30] test loss : 0.2914, accuracy: 89.36%
[31] test loss : 0.2991, accuracy: 89.56%
[32] test loss : 0.2917, accuracy: 89.59%
[33] test loss : 0.2896, accuracy: 89.67%
[34] test loss : 0.2948, accuracy: 89.54%
[35] test loss : 0.3103, accuracy: 88.81%
[36] test loss : 0.2915, accuracy: 89.62%
[37] test loss : 0.2921, accuracy: 89.62%
[38] test loss : 0.2948, accuracy: 89.75%
[39] test loss : 0.3004, accuracy: 89.80%
[40] test loss : 0.2953, accuracy: 89.78%


# ResNet으로 컬러 데이터셋에 적용하기

In [23]:
# 컬러 데이터셋은 흑백 이미지보다 복잡하므로 학습을 더 많이 해야함

EPOCHS = 300
BATCH_SIZE = 128

In [24]:
# 데이터셋을 불러오고 전처리하는 과정은 동일

# 과적합 방지하기 위해 학습용 데이터셋에 RandomCrop과 RandomHorizontalFlip 같은 노이즈 추가
train_loader = torch.utils.data.DataLoader(datasets.CIFAR10('./data', train = True, download = True, transform = transforms.Compose([
    transforms.ToTensor(),transforms.RandomHorizontalFlip(),transforms.RandomCrop(32, padding =4),
    transforms.Normalize((0.5, 0.5, 0.5 ), (0.5, 0.5, 0.5))])), batch_size = BATCH_SIZE , shuffle = True)

test_loader = torch.utils.data.DataLoader(datasets.CIFAR10('./data', train = False, download = True, transform = transforms.Compose([
    transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5 ), (0.5, 0.5, 0.5))])), batch_size = BATCH_SIZE , shuffle = True)

Files already downloaded and verified
Files already downloaded and verified


In [29]:
# Residual 블록을 BasicBlock이라는 새로운 모듈로 정의해서 사용

class BasicBlock(nn.Module):
    def __init__(self,in_planes, planes, stride = 1):
        super(BasicBlock,self).__init__()
        self.conv1 = nn.Conv2d(in_planes,planes,kernel_size = 3, stride = stride, padding =1, bias = False)
        self.bn1 = nn.BatchNorm2d(planes) # 배치 정규화(드롭아웃과 같은 효과를 냄)
        self.conv2 = nn.Conv2d(planes,planes,kernel_size = 3, stride = 1, padding =1, bias = False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.shortcut = nn.Sequential()  # 여러모듈을 하나의 모듈로 묶는 역할
        if stride !=1 or in_planes !=planes:  # '만약 채널이 증폭이 된다면' 의미
            self.shortcut = nn.Sequential(nn.Conv2d(in_planes,planes,kernel_size = 1, stride = stride, padding =1, bias = False),
                           nn.BatchNorm2d(planes))
            
    def forward(self,x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out +=self.shortcut(x)
        out = F.relu(out)
        return out

In [37]:
# 모델 정의
# 이미지를 받아 컨볼루션과 배치 정규화 층을 거친 후 여러 basicblock층을 통과하고 평균 풀링과 신경망 거쳐 예측을 출력

class ResNet(nn.Module):
    def __init__(self,num_classes=10):
        super(ResNet,self).__init__()
        self.in_planes = 16
        self.conv1 = nn.Conv2d(3, 16, kernel_size = 3, stride = 1, padding =1, bias = False)
        self.bn1 = nn.BatchNorm2d(16)
        # 증폭하는 역할을 하는 모듈들은 shortcut 모듈을 따로 갖게됨
        self.layer1 = self._make_layer(16,2, stride =1)  # 16채널에서 16채널을 내보내는 BasicBlock 2개
        self.layer2 = self._make_layer(32,2, stride =2)  # 16채널 받아 32채널을 출력하는 BasicBlock 1개 + 32채널에서 32채널을 내보내는 BasicBlock 1개
        self.layer3 = self._make_layer(64,2, stride =2)  # 32채널 받아 64채널을 출력하는 BasicBlock 1개 + 64채널에서 64채널을 내보내는 BasicBlock 1개
        self.linear = nn.Linear(64, num_classes)
        
    def _make_layer(self,planes, num_blocks,stride):  # 여러 BasicBlock을 모듈 하나로 묶어주는 역할
        strides = [stride] + [1] * (num_blocks-1)  # layer1을 예시로 보면 stride가 1이니까 [1] + [1]*1이어서 [1,1] or [1],[1]이 될 것
        layers = []
        for stride in strides:  # BasicBlock에서 각각 블럭이 stride 몇인지 지정하는 역할
            layers.append(BasicBlock(self.in_planes,planes,stride))
            self.in_planes = planes
        return nn.Sequential(*layers)  # layers의 모든 원소를 가져오겠다는 의미


    def forward(self,x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = F.avg_pool2d(out,8)
        out = out.view(out.size(0),-1)
        out = self.linear(out)
        return out
        

In [38]:
# 여기서는 학습 효율을 높이기 위해 학습률 감소 기법을 사용 -> optim.lr_scheduler.StepLR도구 사용해 간단하게 적용 가능

model = ResNet().to(DEVICE)  # 만든 모델 인스턴스
optimizer = optim. SGD(model.parameters(), lr = 0.1, momentum = 0.9, weight_decay = 0.0005) # 최적화 함수
scheduler = optim.lr_scheduler.StepLR(optimizer,step_size = 50, gamma = 0.1)  # 50번 호출될 때 학습률에 0.01을 곱한다는 의미

In [39]:
print(model)

ResNet(
  (conv1): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (shortcut): Sequential()
    )
    (1): BasicBlock(
      (conv1): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=

In [43]:
def train(model, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data,target) in enumerate(train_loader):
        data, target = data.to(DEVICE), target.to(DEVICE)
        optimizer.zero_grad()
        output = model(data)
        loss = F.cross_entropy(output,target)
        loss.backward()
        optimizer.step()
        
#         if batch_idx % 200 == 0 :
#             print('train epoch : {}[{}/{} ({: .0f}%)]\tloss:{:.6f}'.format(epoch, len(data), len(train_loader.dataset),
#                                                                            100* batch_idx / len(train_loader),loss.item()))

In [44]:
def evaluate(model, test_loader):
    model.eval()
    test_loss =0   # 테스트 오차 값 0으로 초기화
    correct=0     # 예측이 맞은 수 0으로 초기화
    with torch.no_grad() : # 평가 과정에서는 기울기를 계산하지 않아도 됨
        for data, target in test_loader :
            data, target = data.to(DEVICE), target.to(DEVICE)
            output = model(data)
            # 모든 오차 더하기
            test_loss += F.cross_entropy(output,target, reduction = 'sum').item()  #미니배치 평균 대신 합을 받아와야 함
            pred = output.max(1, keepdim = True)[1]  # output.max()함수는 가장 큰 값과 그 값이 있는 인덱스를 출력
            correct +=pred.eq(target.view_as(pred)).sum().item()  #eq()함수는 값이 일치하면 1, 아니면 0을 출력 
    test_loss /= len(test_loader.dataset)  # 모델의 전체 데이터셋에 대한 오차를 테스트셋 데이터 수로 나눠 평균 구함
    test_accuracy = 100 * correct / len(test_loader.dataset)  # 맞힌 개수의 합을 테스트셋 데이터 수로 나누고 100을 곱해 정확도 구함
    return test_loss, test_accuracy

In [46]:
for epoch in range(1, EPOCHS+1):
    scheduler.step()  # 앞과 달라진 점. scheduler.step함수로 학습률을 조금 낮춰주는 단계 추가된 것
    train(model, train_loader, optimizer,epoch)
    test_loss, test_accuracy = evaluate(model, test_loader)
    print('[{}] test loss : {:.4f}, accuracy: {:.2f}%'.format(epoch, test_loss, test_accuracy))

RuntimeError: The size of tensor a (16) must match the size of tensor b (17) at non-singleton dimension 3