In [64]:
import torch
# 연산 장치 설정
if torch.cuda.is_available():
    device = torch.device('cuda')
    torch.manual_seed(42)
    torch.cuda.manual_seed(42)
else:
    device = torch.device('cpu')
    torch.manual_seed(42)

from torchvision import models
from torchvision import datasets
from torchvision import transforms

import matplotlib.pyplot as plt
# Tensor를 Image로 시각화하는 함수입니다
def show_tensor(tensor):
    if (tensor.ndim==4) and (tensor.shape[0]==1):
        tensor = tensor.squeeze(0)
    num_channels = tensor.shape[0]
    if num_channels == 1:
        image = tensor.squeeze(0).cpu().numpy()
        plt.imshow(image, vmin=0, vmax=20)
    elif num_channels == 3:
        image = tensor.permute(1,2,0).cpu().numpy()
        plt.imshow(image)
    else:
        image = torch.argmax(tensor, dim=0).cpu().numpy()
        plt.imshow(image)
    plt.axis('off')
    plt.show()

import tqdm

In [None]:
# Image 파이프라인 정의
image_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    # 각 픽셀이 갖는 값의 범위: 0 ~ 1
    transforms.ToTensor()
])

# Label 파이프라인 정의
label_transform = transforms.Compose([
    # DeepLabV1의 Output 해상도에 맞추기 위함입니다
    transforms.Resize((32, 32)),
    # 각 픽셀이 갖는 값의 범위: 0 ~ 255
    transforms.PILToTensor(),
    transforms.Lambda(lambda x: torch.where(x==255, 0, x.long())),
])

# PASCAL-VOC-2012 데이터 로드
dataset = datasets.VOCSegmentation(transform=image_transform,
                                   target_transform=label_transform,
                                   root='./data', image_set='train', download=True)

# 데이터 미니배치 처리                  
loader  = torch.utils.data.DataLoader(dataset, batch_size=20, shuffle=True)

# 1. DeepLabV1

In [91]:
class DeepLabV1(torch.nn.Module):
    def __init__(self, num_classes):
        super().__init__()

        # Backbone: Imagenet Pretrained VGG-16
        vgg16 = models.vgg16(weights=models.VGG16_Weights.DEFAULT)
        # Feature Extractor 로드
        self.feature1 = torch.nn.ModuleList(list(vgg16.features[:5]))
        self.feature2 = torch.nn.ModuleList(list(vgg16.features[5:10]))
        self.feature3 = torch.nn.ModuleList(list(vgg16.features[10:17]))
        self.feature4 = torch.nn.ModuleList(list(vgg16.features[17:24]))
        self.feature5 = torch.nn.ModuleList(list(vgg16.features[24:]))
        # Classifier 로드
        self.classifier1 = torch.nn.ModuleList(list(vgg16.classifier[:3]))
        self.classifier2 = torch.nn.ModuleList(list(vgg16.classifier[3:6]))
        self.classifier3 = vgg16.classifier[6]

        # ============================  Feature Extractor 수정  ============================ #
        # 1. 마지막 두 개의 Pooling Layers 제거
        del self.feature4[-1]
        del self.feature5[-1]
        # 2. 마지막 세 개의 Convolutional Layers --> Atrous Convolutional Layers
        self.feature5[0] = torch.nn.Conv2d(512, 512, kernel_size=3, dilation=2, padding=2)
        self.feature5[2] = torch.nn.Conv2d(512, 512, kernel_size=3, dilation=2, padding=2)
        self.feature5[4] = torch.nn.Conv2d(512, 512, kernel_size=3, dilation=2, padding=2)
        # ================================================================================== #

        # ============================     Classifier 수정      ============================ #
        # 1. 첫 번째 FC Layer --> Atrous Convolutional Layer
        self.classifier1[0] = torch.nn.Conv2d(512, 1024, kernel_size=3, dilation=4, padding=4)
        # 2. 나머지 FC Layers --> Convolutional Layers
        self.classifier2[0] = torch.nn.Conv2d(1024,1024, kernel_size=1)
        self.classifier3    = torch.nn.Conv2d(1024,num_classes, kernel_size=1)
        # 3. Dropout 삭제
        del self.classifier1[-1]
        del self.classifier2[-1]
        # ================================================================================== #
    
    def forward(self, x):
        for layer in self.feature1:    x = layer(x)
        for layer in self.feature2:    x = layer(x)
        for layer in self.feature3:    x = layer(x)
        for layer in self.feature4:    x = layer(x)
        for layer in self.feature5:    x = layer(x)
        for layer in self.classifier1: x = layer(x)
        for layer in self.classifier2: x = layer(x)
        return self.classifier3(x)

In [68]:
# 모델: DeepLabV1
model = DeepLabV1(num_classes=21)
model.to(device)

# 옵티마이저: Momentum + Schedular
optimizer = torch.optim.SGD([{'params': list(model.feature1.parameters()) +
                                        list(model.feature2.parameters()) +
                                        list(model.feature3.parameters()) +
                                        list(model.feature4.parameters()) +
                                        list(model.feature5.parameters()),   'lr': 0.001},
                             {'params': list(model.classifier1.parameters()) +
                                        list(model.classifier2.parameters()) +
                                        list(model.classifier3.parameters()),'lr': 0.01}],
                            momentum=0.9, weight_decay=5e-4)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=100, gamma=0.1)

# 손실함수: Cross Entropy Loss
criterion = torch.nn.CrossEntropyLoss()

In [None]:
num_epochs = 10

model.train()
for epoch in range(num_epochs):
    tqdm_format = f'[Epoch {epoch+1:03d}] {{bar}} {{percentage:3.0f}}% {{remaining}}'
    for image, label in tqdm.tqdm(loader, ncols=60, bar_format=tqdm_format):
        image, label = image.to(device), label.to(device)
        # 누적된 그래디언트를 0으로 초기화
        optimizer.zero_grad()
        # 순전파: 전방 계산
        output = model(image)
        # 순전파: Loss 계산
        loss = criterion(output, label.squeeze(1))
        # 역전파: 그래디언트 계산
        loss.backward()
        # 역전파: 가중치 갱신
        optimizer.step()
        scheduler.step()

# 가중치 저장
torch.save(model.state_dict(), 'v1_weights.pth')

# 2. DeepLabV2

In [119]:
class DeepLabV2(DeepLabV1):
    def __init__(self, num_classes):
        super().__init__(num_classes)

        # ============================   ASSP 관련 계층 추가    ============================ #
        # Filters at Multiple Sampling Rates
        self.conv1 = torch.nn.Conv2d(512, 1024, kernel_size=3, dilation=6, padding=6)
        self.conv2 = torch.nn.Conv2d(512, 1024, kernel_size=3, dilation=12,padding=12)
        self.conv3 = torch.nn.Conv2d(512, 1024, kernel_size=3, dilation=18,padding=18)
        self.conv4 = torch.nn.Conv2d(512, 1024, kernel_size=3, dilation=24,padding=24)
        # 1 x 1 Convolution Layer
        self.conv  = torch.nn.Conv2d(4096, num_classes, kernel_size=1)
        # ================================================================================== #
    
    def forward(self, x):

        for layer in self.feature1: x = layer(x)
        for layer in self.feature2: x = layer(x)
        for layer in self.feature3: x = layer(x)
        for layer in self.feature4: x = layer(x)
        for layer in self.feature5: x = layer(x)
        
        # Atrous Spatial Pyramid Pooling
        x1, x2, x3, x4 = self.conv1(x), self.conv2(x), self.conv3(x), self.conv4(x)

        # Sum Fusion
        x = torch.cat([x1, x2, x3, x4], dim=1)
        return self.conv(x)

In [120]:
# 모델: DeepLabV2
model = DeepLabV2(num_classes=21)
model.to(device)

# 옵티마이저: Momentum + Poly Schedular
optimizer = torch.optim.SGD([{'params': list(model.feature1.parameters()) +
                                        list(model.feature2.parameters()) +
                                        list(model.feature3.parameters()) +
                                        list(model.feature4.parameters()) +
                                        list(model.feature5.parameters()), 'lr': 0.001},
                             {'params': list(model.conv1.parameters()) +
                                        list(model.conv2.parameters()) +
                                        list(model.conv3.parameters()) +
                                        list(model.conv4.parameters()) +
                                        list(model.conv.parameters()),     'lr': 0.01}],
                            momentum=0.9, weight_decay=5e-4)
# Poly Schedular
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lambda step: (1 - step/740)**0.9)

# 손실함수: Cross Entropy Loss
criterion = torch.nn.CrossEntropyLoss()

In [None]:
num_epochs = 10

model.train()
for epoch in range(num_epochs):
    tqdm_format = f'[Epoch {epoch+1:03d}] {{bar}} {{percentage:3.0f}}% {{remaining}}'
    for image, label in tqdm.tqdm(loader, ncols=60, bar_format=tqdm_format):
        image, label = image.to(device), label.to(device)
        # 누적된 그래디언트를 0으로 초기화
        optimizer.zero_grad()
        # 순전파: 전방 계산
        output = model(image)
        # 순전파: Loss 계산
        loss = criterion(output, label.squeeze(1))
        # 역전파: 그래디언트 계산
        loss.backward()
        # 역전파: 가중치 갱신
        optimizer.step()
        scheduler.step()

# 가중치 저장
torch.save(model.state_dict(), 'v2_weights.pth')