In [1]:
import torch
import torch.nn as nn
from torchvision.transforms import v2

from PIL import Image
import numpy as np

In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# 데이터셋 표준화를 위한 기본정보 # 패치(반사패딩 추가해야함)
imgNet_val = {'mean' : [0.485, 0.456, 0.406], 'std' : [0.229, 0.224, 0.225]}

In [3]:
from img_utils import *

In [4]:
content_img = './seoul.png'
style_img = './starnight.png'

content_tensor, img_shape = preprocess_img(content_img, 512, imgNet_val, device)
style_tensor, _ = preprocess_img(style_img, 512, imgNet_val, device)

In [5]:
task_img = create_random_noise_img(img_shape)
task_img.save("random_noise.jpg")

task_tensor, _ = preprocess_img('random_noise.jpg', 512, imgNet_val, device)

In [6]:
import torch.nn.functional as F

# 항목별로 lossfn 설계
class TransferLoss():
    def __init__(self, target_feature, item):
        item_msg_list = ['content', 'style']
        self.item = item #해당 feature이 content인지, style인지 확인
        if self.item not in item_msg_list:
            raise Exception("item 종류 잘못 입력")

        # target_feaute이 딕셔너리 형태이니 value값만 추출해 리스트화
        self.target = list(target_feature.values())

        # 그람 행렬(Gram matrix) 함수 정의
    def gram_matrix(self, data_tensor):
        bs, c, h, w = data_tensor.size() #여기서 bs는 1이다.
        #Feature Map에서 채널별로 분리 (Features)
        #그 다음 이 features는 3차원이니 H*W를 곱해서 2D로 차원축소
        features = data_tensor.view(bs*c, h*w)
        #모든 Features별로 내적을 땡겨버리자 -> 모든 instance pair값 계산
        #torch.mm은 행렬곱 메서드임
        G = torch.mm(features, features.t())
        #gram matrix의 값을 정규화 수행
        return G.div(bs*c*h*w)

    def cal_loss(self, input): #입력되는 input도 딕셔너리 형태
        input = list(input.values()) #input를 리스트로 형변환
        # loss를 0으로 초기화 및 디바이스 위치 지정
        device = input[0].device
        loss = torch.zeros(1, device=device)

        # 콘텐츠 로스는 1개의 output_feature에 대해서 loss계산
        if self.item == 'content':
            loss += F.mse_loss(input[0], self.target[0])

        # 스타일 로스는 여러개의 output_feature를 그람 매트릭스 해서 Loss계산
        elif self.item == 'style':
            for e_input, e_target in zip(input, self.target):
                G_input = self.gram_matrix(e_input)
                G_style = self.gram_matrix(e_target)
                loss += F.mse_loss(G_input, G_style)
        
        return loss

In [7]:
# 뉴럴 스타일 트랜스퍼에 사용할 백본 모델 불러오기
# torchvision에 있는 VGG19모델을 사용
from torchvision import models

pr_model = models.vgg19(weights=models.VGG19_Weights.IMAGENET1K_V1).to(device)

In [8]:
class ReModelVGG19(nn.Module):
    def __init__(self, origin_model):
        super(ReModelVGG19, self).__init__()
        origin_modeul = origin_model.features
        self.module = nn.ModuleDict()

        block = []
        block_idx = 0

        for layer in origin_modeul.children():
            if isinstance(layer, nn.Conv2d) and block: #블럭이 비어있지 않은 경우
                # 새로운 conv2d가 나오면 기존 블록을 저장하고 초기화
                block_name = f"conv_{block_idx}_block"
                self.module[block_name] = nn.Sequential(*block)

                #블록 리스트 초기화
                block = []
                block_idx += 1
            if isinstance(layer, nn.ReLU):
                #in-place기능이 켜져있으면 잘 동작을 안한다고 함
                layer = nn.ReLU(inplace=False)
            #레이어를 계속 블럭 리스트에 넣기
            block.append(layer)

        if block: #가장 마지막 블록을 추가
            block_name = f"conv_{block_idx}_block"
            self.module[block_name] = nn.Sequential(*block)

    def forward(self, x):
        for block in self.module.values():
            x = block(x)

        return x

In [9]:
backbone = ReModelVGG19(pr_model)
#backbone는 평가 모드로 설정
backbone.eval()

ReModelVGG19(
  (module): ModuleDict(
    (conv_0_block): Sequential(
      (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): ReLU()
    )
    (conv_1_block): Sequential(
      (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): ReLU()
      (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    )
    (conv_2_block): Sequential(
      (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): ReLU()
    )
    (conv_3_block): Sequential(
      (0): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): ReLU()
      (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    )
    (conv_4_block): Sequential(
      (0): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): ReLU()
    )
    (conv_5_block): Sequential(
      (0): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1):

In [10]:
content_block = ['conv_3_block']
style_block = ['conv_0_block', 
                'conv_1_block', 
                'conv_2_block', 
                'conv_3_block', 
                'conv_4_block', ]

In [11]:
class NeuralStyleNet(nn.Module):
    def __init__(self, backbone, block_setting):
        super(NeuralStyleNet, self).__init__()

        self.backbone = backbone #백본 모델의 인스턴스화
        self.block_setting = block_setting #캡쳐할 output feature리스트 가져오기
        self.outputs = {} #캡쳐할 feature out 저장

        def hook_fn(module, input, output, name):
            #캡쳐한 블럭 이름이 outputs 딕셔너리에 없을 경우
            #블럭의 출력feature이랑 block이름을 key,value로 묶어서 outputs에 저장
            if name not in self.outputs:
                self.outputs[name] = output

        for module in self.backbone.children():
            for sub_name, sub_module in module.named_modules():
                if any(block == sub_name for block in self.block_setting):
                    # block_setting에 명기된 블럭의 output_feature만 캡쳐
                    self._register_hook(sub_name, sub_module, hook_fn)

    def _register_hook(self, name, module, hook_fn):
        def hook(module, input, output): #여기서 name = 캡쳐한 모듈(블럭)의 이름
            return hook_fn(module, input, output, name)
        module.register_forward_hook(hook)

    def forward(self, x):
        _ = self.backbone(x)
        return self.outputs

In [12]:
from tqdm import tqdm
import torch.optim as optim

class StyleTransfer:
    def __init__(self, backbone, content_block, style_block,
                 learning_rate = 0.01,
                 weight_c = 1, weight_s = 1):
        self.backbone = backbone
        self.content_block = content_block
        self.style_block = style_block

        self.weight = [weight_c, weight_s]

        self.lr = learning_rate

        self.task_model = None

    def initalize(self, content, style): #처음 초기화 해야할 항목들
        content_model = NeuralStyleNet(self.backbone, self.content_block)
        style_model = NeuralStyleNet(self.backbone, self.style_block)

        #Target 콘텐츠, 스타일 Feature을 캡쳐함(초기, 단 한번만 수행)
        with torch.no_grad(): #평가모드이니 grad계산 중지
            target_content = content_model(content)
            style_content = style_model(style)
        
        # 콘텐츠&스타일 Feature기반으로 LossFn을 초기화
        self.content_loss_fn = TransferLoss(target_content, 'content')
        self.style_loss_fn = TransferLoss(style_content, 'style')

        #모델에 작업에 필요한 캡쳐 블록 리스트를 넣고 초기화
        #이때 캡쳐할 블럭은 콘텐츠+스타일 블럭 합산
        task_block = self.content_block + self.style_block
        self.task_model = NeuralStyleNet(backbone, task_block)

    def compute_loss(self, pred_task): #작업이미지가 모델을 통과한 결과물로 로스 계산
        #콘텐츠 로스, 스타일 로스랑 비교할 블럭들을 추출
        pred_content = {k: v for k, v in pred_task.items() if k in self.content_block}
        pred_style = {k: v for k, v in pred_task.items() if k in self.style_block}

        #추출한 블럭을 바탕으로 콘텐츠로스, 스타일로스 계산
        content_loss = self.content_loss_fn.cal_loss(pred_content)
        style_loss = self.style_loss_fn.cal_loss(pred_style)

        #사전에 정의한 '항목별'가중치를 로스에 곱해서 토탈 로스 정의
        total_loss = self.weight[0]*content_loss + self.weight[1]*style_loss
        
        return total_loss, content_loss, style_loss
    
    #딥드림 함수 설계 방법론을 바탕으로 Gradient Descent Step 함수 설계
    def gradient_descent_step(self, task):
        task.requires_grad = True #teans(tensor)의 기울기 계산 활성화

        # 옵티마이저 설정(뉴럴 스타일 트랜스퍼는 LBFGS를 사용)
        optimizer = optim.Adam([task], lr= self.lr)

        optimizer.zero_grad() #옵티마이저 기울기 0으로 초기화  

        #전사 과정 수행
        pred_task = self.task_model(task)

        # 로스 함수 계산 (전사 결과물을 로스함수로 보낸다.)
        total_loss, content_loss, style_loss = self.compute_loss(pred_task)

        # 역전파 수행 #retain_graph=True -> 기울기 값 생성
        total_loss.backward()

        with torch.no_grad():
            # 옵티마이저 파라미터가 [task](텐서자료형) 단 하나니 [0]번째 리스트임
            for param in optimizer.param_groups[0]['params']:
                param.sub_(self.lr * param.grad) #이게 경사 하강법의 코드임
                grads_norm = param.grad.norm()
                print(grads_norm)

        task = task.detach()  # 그래디언트 추적 중단

        return task, content_loss.item(), style_loss.item()
    

    def gradient_descent_loop(self, task, img_shape, normal_val,
                              content, style, num_steps=300):
        #초기화 함수 구동
        self.initalize(content, style)

        for step in tqdm(range(num_steps)):
            #경사하강법 함수 구동
            task, content_loss, style_loss = self.gradient_descent_step(task)

            if step % 25 == 0:
                # 로스 결과값 산출
                print(f"Step {step}", end=' ')
                print(f"[콘텐츠 로스: {content_loss:.4f}, ", end=' ')
                print(f"스타일 로스: {style_loss:.4f}]")

                #중간 결과물을 이미지로 저장
                step_res = task.detach().clone()
                task_img = deprocess_img(step_res, img_shape, normal_val)
                name = f'combine_{step}'
                task_img.save(f"{name}.png")
        
        return task


In [13]:
style_transfer = StyleTransfer(backbone, content_block, style_block)

In [14]:
style_transfer.gradient_descent_loop(task_tensor, img_shape, imgNet_val,
                                     content_tensor, style_tensor)

  0%|          | 0/300 [00:00<?, ?it/s]

tensor(0.1456, device='cuda:0')
Step 0 [콘텐츠 로스: 22.3436,  스타일 로스: 0.0053]


  0%|          | 1/300 [00:00<02:51,  1.75it/s]


RuntimeError: Trying to backward through the graph a second time (or directly access saved tensors after they have already been freed). Saved intermediate values of the graph are freed when you call .backward() or autograd.grad(). Specify retain_graph=True if you need to backward through the graph a second time or if you need to access saved tensors after calling backward.