In [None]:
#ResNet 기본 블록 정의

import torch
import torch.nn as nn

class BasicBlock(nn.Module):
   def __init__(self, in_channels, out_channels, kernel_size=3):#기본 커널 사이즈 3*3
       super(BasicBlock, self).__init__()


       #합성곱층 정의
       self.c1 = nn.Conv2d(in_channels, out_channels,
                           kernel_size=kernel_size, padding=1)
       self.c2 = nn.Conv2d(out_channels, out_channels,
                           kernel_size=kernel_size, padding=1)

       self.downsample = nn.Conv2d(in_channels, out_channels,
                                   kernel_size=1)#x의 채널 수를 F(x)와 같게 맞춰주는 역할.
                                   #채널의 수가 같으면 downsample 부분이 생략되도록 구현 수정 가능

       #배치 정규화층 정의
       self.bn1 = nn.BatchNorm2d(num_features=out_channels)
       self.bn2 = nn.BatchNorm2d(num_features=out_channels)

       self.relu = nn.ReLU()
   def forward(self, x):#Conv → BN → ReLU → Conv → BN 구조
       #스킵 커넥션을 위해 초기 입력을 저장
       x_ = x
       #x_를 다운샘플해서 채널 맞춰주고 x와 더한 뒤에 ReLU 활성화

       x = self.c1(x)
       x = self.bn1(x)
       x = self.relu(x)
       x = self.c2(x)
       x = self.bn2(x)

       #합성곱의 결과와 입력의 채널 수를 맞춤
       x_ = self.downsample(x_)

       #합성곱층의 결과와 저장해놨던 입력값을 더해줌
       x += x_
       x = self.relu(x)

       return x

In [None]:
#ResNet 모델 정의하기

class ResNet(nn.Module):
   def __init__(self, num_classes=10):#분류할 클래스의 수가 10
       super(ResNet, self).__init__()


       #기본 블록
       self.b1 = BasicBlock(in_channels=3, out_channels=64)
       self.b2 = BasicBlock(in_channels=64, out_channels=128)
       self.b3 = BasicBlock(in_channels=128, out_channels=256)


       #풀링을 최댓값이 아닌 평균값으로
       self.pool = nn.AvgPool2d(kernel_size=2, stride=2) #MaxPooling으로 변경 가능

       #분류기 #fully connected layer(MLP)
       #(생각해보기) 왜 4096일까?
       self.fc1 = nn.Linear(in_features=4096, out_features=2048)
       self.fc2 = nn.Linear(in_features=2048, out_features=512)
       self.fc3 = nn.Linear(in_features=512, out_features=num_classes)

       self.relu = nn.ReLU()


   def forward(self, x):
       #기본 블록과 풀링층을 통과 = 전체 특징 추출 파트
       #블록 수 계속 늘려보기
       x = self.b1(x)
       x = self.pool(x)
       x = self.b2(x)
       x = self.pool(x)
       x = self.b3(x)
       x = self.pool(x)


       #분류기의 입력으로 사용하기 위해 flatten
       x = torch.flatten(x, start_dim=1)

       #분류기로 예측값 출력: 최종 출력 생성
       x = self.fc1(x)
       x = self.relu(x)
       x = self.fc2(x)
       x = self.relu(x)
       x = self.fc3(x)

       return x

In [None]:
#데이터 전처리부

import tqdm

from torchvision.datasets.cifar import CIFAR10
from torchvision.transforms import Compose, ToTensor
from torchvision.transforms import RandomHorizontalFlip, RandomCrop
from torchvision.transforms import Normalize
from torch.utils.data.dataloader import DataLoader

from torch.optim.adam import Adam

train_transforms = Compose([
   RandomCrop((32, 32), padding=4),  #주변에 4픽셀 추가한 후, 32*32 크기로 랜덤 크롭핑
   RandomHorizontalFlip(p=0.5),  #50% 확률로 y축으로 뒤집기
   ToTensor(),  #텐서로 변환(픽셀값 범위를 정규화함)
   #이미지 정규화(R, G, B 각 채널별)
   Normalize(mean=(0.4914, 0.4822, 0.4465), std=(0.247, 0.243, 0.261))
])

test_transform = Compose([
    ToTensor(),
    Normalize(mean=(0.4914, 0.4822, 0.4465), std=(0.247, 0.243, 0.261))
])

In [None]:
#학습 데이터와 평가 데이터 불러오기
#transform = transforms 사용하면서 데이터의 증강이 적용됨.
#실제 저장공간에서의 데이터 수가 늘어나나? no. 하지만 매 epoch에서 랜덤한 변화가 생기면서 다양한 모습의 데이터를 볼 수 있음.
training_data = CIFAR10(root="./", train=True, download=True, transform=train_transforms)

#test데이터는 평가용이므로, 원본 데이터로 측정해야 함.
test_data = CIFAR10(root="./", train=False, download=True, transform=test_transform)

train_loader = DataLoader(training_data, batch_size=32, shuffle=True)
test_loader = DataLoader(test_data, batch_size=32, shuffle=False)

100%|██████████| 170M/170M [00:04<00:00, 35.3MB/s]


In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"

model = ResNet(num_classes=10)
model.to(device)

ResNet(
  (b1): BasicBlock(
    (c1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (c2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (downsample): Conv2d(3, 64, kernel_size=(1, 1), stride=(1, 1))
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU()
  )
  (b2): BasicBlock(
    (c1): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (c2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (downsample): Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1))
    (bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU()
  )
  (b3): BasicBlock(
    (c1): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))


In [None]:
lr = 1e-4
optim = Adam(model.parameters(), lr=lr)

for epoch in range(1):#시간관계상 1, 작동 테스트 후 큰 값으로 변경 바람
   iterator = tqdm.tqdm(train_loader)
   for data, label in iterator:
       # 최적화를 위해 기울기를 초기화
       optim.zero_grad()

       # 모델의 예측값
       preds = model(data.to(device))

       # 손실 계산 및 역전파
       loss = nn.CrossEntropyLoss()(preds, label.to(device))
       loss.backward()
       optim.step()

       iterator.set_description(f"epoch:{epoch+1} loss:{loss.item()}")

torch.save(model.state_dict(), "ResNet.pth")

epoch:1 loss:0.7609254121780396: 100%|██████████| 1563/1563 [00:42<00:00, 36.48it/s]


In [None]:
model.load_state_dict(torch.load("ResNet.pth", map_location=device))

num_corr = 0

with torch.no_grad():
   for data, label in test_loader:

       output = model(data.to(device))
       preds = output.data.max(1)[1]
       corr = preds.eq(label.to(device).data).sum().item()
       num_corr += corr

   print(f"Accuracy:{num_corr/len(test_data)}")

Accuracy:0.7628
