# CycleGAN 手把手教學

# 1.生成器與鑑別器建置

## 生成器

In [3]:
import torch
from torch import nn
from torchsummary import summary

def conv_norm_relu(in_dim, out_dim, kernel_size, stride=1, padding=0):
    """Build a Conv2d -> InstanceNorm2d -> ReLU stack.

    Args:
        in_dim: number of input channels.
        out_dim: number of output channels.
        kernel_size: convolution kernel size.
        stride: convolution stride.
        padding: zero padding applied by the convolution.

    Returns:
        An ``nn.Sequential`` holding the three layers in that order.
    """
    return nn.Sequential(
        nn.Conv2d(in_dim, out_dim, kernel_size, stride=stride, padding=padding),
        nn.InstanceNorm2d(out_dim),
        nn.ReLU(inplace=True),
    )

def dconv_norm_relu(in_dim, out_dim, kernel_size, stride=1, padding=0, output_padding=0):
    """Build a ConvTranspose2d -> InstanceNorm2d -> ReLU stack (up-sampling counterpart of conv_norm_relu).

    Args:
        in_dim: number of input channels.
        out_dim: number of output channels.
        kernel_size: transposed-convolution kernel size.
        stride: transposed-convolution stride.
        padding: padding argument of the transposed convolution.
        output_padding: extra size added to one side of the output.

    Returns:
        An ``nn.Sequential`` holding the three layers in that order.
    """
    return nn.Sequential(
        nn.ConvTranspose2d(in_dim, out_dim, kernel_size, stride=stride,
                           padding=padding, output_padding=output_padding),
        nn.InstanceNorm2d(out_dim),
        nn.ReLU(inplace=True),
    )

class ResidualBlock(nn.Module):
    """Residual block: two reflection-padded 3x3 convolutions plus an identity skip connection.

    Args:
        dim: number of channels; the block keeps the channel count unchanged.
        use_dropout: when true, a Dropout(0.5) layer is inserted between the two convolutions.
    """

    def __init__(self, dim, use_dropout):
        super().__init__()

        # First half: pad by 1 so the un-padded 3x3 conv keeps the spatial size.
        layers = [nn.ReflectionPad2d(1), conv_norm_relu(dim, dim, kernel_size=3)]

        if use_dropout:
            layers.append(nn.Dropout(0.5))

        # Second half: same padding + conv + norm, but no activation before the skip addition.
        layers.extend([
            nn.ReflectionPad2d(1),
            nn.Conv2d(dim, dim, kernel_size=3, padding=0),
            nn.InstanceNorm2d(dim),
        ])

        self.res_block = nn.Sequential(*layers)

    def forward(self, x):
        """Return the input plus the output of the convolutional branch."""
        branch = self.res_block(x)
        return x + branch


class Generator(nn.Module):
    """Image-to-image generator: down-sampling encoder, residual bottleneck, up-sampling decoder.

    Args:
        input_nc: number of channels of the input image.
        output_nc: number of channels of the generated image.
        filters: channel count after the first convolution; it is doubled by
            each of the two down-sampling convolutions.
        use_dropout: forwarded to every ResidualBlock.
        n_blocks: number of residual blocks in the bottleneck.
    """

    def __init__(self, input_nc=3, output_nc=3, filters=64, use_dropout=True, n_blocks=2):
        super().__init__()

        # Down-sampling. Conv output size: floor((size + 2*padding - kernel) / stride) + 1
        # Example with a 256x256 input:
        #   reflection pad 3 : 256 + 2*3              = 262
        #   7x7 conv         : 262 - 7 + 1            = 256
        #   3x3 conv, s=2    : (256 + 2 - 3) // 2 + 1 = 128
        #   3x3 conv, s=2    : (128 + 2 - 3) // 2 + 1 = 64
        layers = [
            nn.ReflectionPad2d(3),
            conv_norm_relu(input_nc, filters * 1, 7),
            conv_norm_relu(filters * 1, filters * 2, 3, 2, 1),
            conv_norm_relu(filters * 2, filters * 4, 3, 2, 1),
        ]

        # Bottleneck: a stack of residual blocks at the lowest resolution.
        layers.extend(ResidualBlock(filters * 4, use_dropout) for _ in range(n_blocks))

        # Up-sampling. Transposed-conv output size:
        #   (size - 1) * stride - 2*padding + kernel + output_padding
        #   (64 - 1)*2  - 2 + 3 + 1 = 128
        #   (128 - 1)*2 - 2 + 3 + 1 = 256
        #   reflection pad 3 : 256 + 6     = 262
        #   7x7 conv         : 262 - 7 + 1 = 256
        layers.extend([
            dconv_norm_relu(filters * 4, filters * 2, 3, 2, 1, 1),
            dconv_norm_relu(filters * 2, filters * 1, 3, 2, 1, 1),
            nn.ReflectionPad2d(3),
            nn.Conv2d(filters, output_nc, 7),
            nn.Tanh(),
        ])

        # nn.Sequential takes the layers as separate positional arguments,
        # so the list has to be unpacked with `*`.
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Run the input through the whole layer stack."""
        return self.model(x)
    
# Build a generator with the default arguments and print its
# layer-by-layer summary for a 3 x 256 x 256 input.
G = Generator()
summary(G, (3,256,256))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
   ReflectionPad2d-1          [-1, 3, 262, 262]               0
            Conv2d-2         [-1, 64, 256, 256]           9,472
    InstanceNorm2d-3         [-1, 64, 256, 256]               0
              ReLU-4         [-1, 64, 256, 256]               0
            Conv2d-5        [-1, 128, 128, 128]          73,856
    InstanceNorm2d-6        [-1, 128, 128, 128]               0
              ReLU-7        [-1, 128, 128, 128]               0
            Conv2d-8          [-1, 256, 64, 64]         295,168
    InstanceNorm2d-9          [-1, 256, 64, 64]               0
             ReLU-10          [-1, 256, 64, 64]               0
  ReflectionPad2d-11          [-1, 256, 66, 66]               0
           Conv2d-12          [-1, 256, 64, 64]         590,080
   InstanceNorm2d-13          [-1, 256, 64, 64]               0
             ReLU-14          [-1, 256,

#### `*list` 解包（unpacking）範例

In [34]:
# Printing the list object itself shows it with brackets and commas.
list1 = [1, 2, 3, 4, 5, 6, 7, 8]
print(list1)
# `*list1` unpacks the list, so every element is passed to print() as a separate argument.
print(*list1)

[1, 2, 3, 4, 5, 6, 7, 8]
1 2 3 4 5 6 7 8


## 鑑別器

In [5]:
import torch
from torch import nn
from torchsummary import summary

def conv_norm_leakyrelu(in_dim, out_dim, kernel_size, stride=1, padding=0, output_padding=0):
    """Build a Conv2d -> InstanceNorm2d -> LeakyReLU(0.2) stack.

    Args:
        in_dim: number of input channels.
        out_dim: number of output channels.
        kernel_size: convolution kernel size.
        stride: convolution stride.
        padding: zero padding applied by the convolution.
        output_padding: accepted for signature compatibility only; it is not
            used by any layer built here.

    Returns:
        An ``nn.Sequential`` holding the three layers in that order.
    """
    return nn.Sequential(
        nn.Conv2d(in_dim, out_dim, kernel_size, stride=stride, padding=padding),
        nn.InstanceNorm2d(out_dim),
        nn.LeakyReLU(0.2, inplace=True),
    )

class Discriminator(nn.Module):
    """Convolutional discriminator that outputs a single-channel score map.

    Args:
        input_nc: number of channels of the input image.
        filters: channel count produced by the first convolution.
        n_layer: controls depth; ``n_layer - 1`` stride-2 blocks are followed
            by one stride-1 block and the 1-channel output convolution.
    """

    def __init__(self, input_nc=3, filters=64, n_layer = 3):
        super().__init__()

        # First layer: no normalization, 1x1 conv keeps the size (256 - 1 + 1 = 256).
        layers = [
            nn.Conv2d(input_nc, filters, kernel_size=1, stride=1, padding=0),
            nn.LeakyReLU(0.2, True),
        ]

        # Stride-2 blocks, each doubling the channels and halving the size:
        # (256 + 2 - 4) // 2 + 1 = 128
        for depth in range(1, n_layer):
            in_mult = 2 ** (depth - 1)
            out_mult = 2 ** depth
            layers.append(
                conv_norm_leakyrelu(filters * in_mult, filters * out_mult,
                                    kernel_size=4, stride=2, padding=1))

        # One more block with stride 1 (channels still double).
        in_mult = 2 ** (n_layer - 1)
        out_mult = 2 ** n_layer
        layers.append(
            conv_norm_leakyrelu(filters * in_mult, filters * out_mult,
                                kernel_size=4, stride=1, padding=1))

        # Output layer: collapse to one channel of scores.
        layers.append(nn.Conv2d(filters * out_mult, 1, kernel_size=4, stride=1, padding=1))

        self.model = nn.Sequential(*layers)

    def forward(self, input):
        """Return the score map produced by the layer stack for ``input``."""
        return self.model(input)
    
# Build a discriminator with the default arguments and print its
# layer-by-layer summary for a 3 x 256 x 256 input.
D = Discriminator()
summary(D, (3,256,256))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 256, 256]             256
         LeakyReLU-2         [-1, 64, 256, 256]               0
            Conv2d-3        [-1, 128, 128, 128]         131,200
    InstanceNorm2d-4        [-1, 128, 128, 128]               0
         LeakyReLU-5        [-1, 128, 128, 128]               0
            Conv2d-6          [-1, 256, 64, 64]         524,544
    InstanceNorm2d-7          [-1, 256, 64, 64]               0
         LeakyReLU-8          [-1, 256, 64, 64]               0
            Conv2d-9          [-1, 512, 63, 63]       2,097,664
   InstanceNorm2d-10          [-1, 512, 63, 63]               0
        LeakyReLU-11          [-1, 512, 63, 63]               0
           Conv2d-12            [-1, 1, 62, 62]           8,193
Total params: 2,761,857
Trainable params: 2,761,857
Non-trainable params: 0
---------------------------

# 2. Initial CycleGAN

In [41]:
import torch
import os, shutil
from torch.nn import init
from torch.utils.data import DataLoader
import torchvision.datasets as dsets
import torchvision.transforms as transforms
import itertools


###### initial ######

def weights_init_normal(m):
    """Re-initialise convolution weights from N(0, 0.02).

    Meant to be passed to ``Module.apply``: every sub-module whose class name
    contains ``'Conv'`` gets its ``weight`` overwritten in place; all other
    modules are left untouched.

    Args:
        m: the module visited by ``apply``.
    """
    if 'Conv' in type(m).__name__:
        init.normal_(m.weight.data, mean=0.0, std=0.02)

###### basic parameters ######
        
# Use the first CUDA device when one is available, otherwise fall back to the CPU.
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
# Number of images per batch for each of the two data loaders.
batch_size = 12
# Number of passes of the training loop.
epochs = 1
# Epoch index at which the LambdaLR rule starts lowering the learning rate.
# NOTE(review): decay_epoch is larger than epochs here, so the decay never starts — confirm this is intended.
decay_epoch = 10
# Initial learning rate handed to both Adam optimizers.
lr = 2e-3
# Sample images are written whenever the iteration counter is a multiple of this value.
log_freq = 100

############ Create Relative Dir ############

weights_path = r'.\weights'
output_path = r'.\results'

# Create the result and checkpoint directories when they are missing,
# reporting each directory that actually gets created.
for _missing_dir in (output_path, weights_path):
    if not os.path.exists(_missing_dir):
        os.makedirs(_missing_dir)
        print('Create dir : ', _missing_dir)

############ Define Model ############

# One generator per translation direction and one discriminator per domain,
# all placed on the selected device.
G_A2B = Generator().to(device)
G_B2A = Generator().to(device)
D_A = Discriminator().to(device)
D_B = Discriminator().to(device)

# Overwrite the default convolution weights with the N(0, 0.02) initialisation.
for _net in (G_A2B, G_B2A, D_A, D_B):
    _net.apply(weights_init_normal)

############ define Loss function ############

# MSE is used for the adversarial terms, L1 for the cycle and identity terms.
MSE = nn.MSELoss()
L1 = nn.L1Loss()

############ define optimizer ############

class LambdaLR():
    """Linear learning-rate decay rule for ``torch.optim.lr_scheduler.LambdaLR``.

    The multiplier stays at 1.0 until ``decay_epoch`` and then falls linearly,
    reaching 0.0 at ``epochs``.

    Args:
        epochs: total number of training epochs.
        offset: value added to the epoch index (e.g. when resuming training).
        decay_epoch: epoch index at which the decay starts.
    """

    def __init__(self, epochs, offset, decay_epoch):
        self.epochs = epochs
        self.offset = offset
        self.decay_epoch = decay_epoch

    def step(self, epoch):
        """Return the learning-rate multiplier for ``epoch``.

        When ``decay_epoch >= epochs`` there is no decay window at all; the
        multiplier is then a constant 1.0 instead of dividing by zero (or by
        a negative span, which would make the rate grow).
        """
        decay_span = self.epochs - self.decay_epoch
        if decay_span <= 0:
            return 1.0
        epochs_into_decay = max(0, epoch + self.offset - self.decay_epoch)
        return 1.0 - epochs_into_decay / decay_span


# A single Adam optimizer updates both generators, another one both discriminators.
optim_G = torch.optim.Adam(
    itertools.chain(G_A2B.parameters(), G_B2A.parameters()),
    lr=lr,
    betas=(0.5, 0.999),
)
optim_D = torch.optim.Adam(
    itertools.chain(D_A.parameters(), D_B.parameters()),
    lr=lr,
    betas=(0.5, 0.999),
)

# Each scheduler scales its optimizer's learning rate by the multiplier
# returned from the custom LambdaLR rule defined in this file.
lr_scheduler_G = torch.optim.lr_scheduler.LambdaLR(
    optim_G,
    lr_lambda=LambdaLR(epochs, 0, decay_epoch).step,
)
lr_scheduler_D = torch.optim.lr_scheduler.LambdaLR(
    optim_D,
    lr_lambda=LambdaLR(epochs, 0, decay_epoch).step,
)

############ Prepare Data ############

# Augmentation and preprocessing: random flip, resize to 256x256, random
# 224x224 crop, then scale every channel from [0, 1] to [-1, 1].
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.Resize((256, 256)),
    transforms.RandomCrop((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
])

train_path = r'.\vangogh2photo'

trainA_path = os.path.join(train_path, r'trainA')
targetA_path = os.path.join(train_path, r'new_trainA')

trainB_path = os.path.join(train_path, r'trainB')
targetB_path = os.path.join(train_path, r'new_trainB')

# ImageFolder reads images from sub-directories of its root, so on the first
# run each raw image directory is moved inside a freshly created wrapper directory.
for _src_dir, _dst_dir in ((trainA_path, targetA_path), (trainB_path, targetB_path)):
    if not os.path.exists(_dst_dir):
        os.makedirs(_dst_dir)
        print('Create dir : ', _dst_dir)
        shutil.move(_src_dir, _dst_dir)

# One shuffled loader per domain, both using the same transform.
dataA_loader = DataLoader(
    dsets.ImageFolder(targetA_path, transform=transform),
    batch_size=batch_size,
    shuffle=True,
    num_workers=4,
)
dataB_loader = DataLoader(
    dsets.ImageFolder(targetB_path, transform=transform),
    batch_size=batch_size,
    shuffle=True,
    num_workers=4,
)

# 準備圖片供預測

In [39]:
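# Keep a pool of up to 50 previously generated images; once the pool is full,
# each new image is either returned as-is or swapped with a randomly chosen stored one.
# (History-buffer strategy of Shrivastava et al.)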
class ReplayBuffer:
    """Pool of previously generated images used to feed the discriminators.

    Args:
        max_size: maximum number of images kept in the pool; must be positive.
    """

    def __init__(self, max_size=50):
        assert (max_size > 0), "Empty buffer or trying to create a black hole. Be careful."
        self.max_size = max_size
        self.data = []

    def push_and_pop(self, data):
        """Push a batch into the pool and return a batch of the same size.

        While the pool is not full, every incoming image is stored and
        returned unchanged. Once it is full, each incoming image is, with
        probability 0.5, swapped with a randomly chosen stored image (the
        stored one is returned); otherwise it is returned without being stored.

        Args:
            data: tensor whose first dimension indexes the images of the batch.

        Returns:
            A tensor, detached from the autograd graph, with one entry per
            input image.
        """
        # Imported here because no visible cell of this notebook imports
        # `random`; without it the "pool is full" branch raises NameError.
        import random

        to_return = []
        # detach(): the pooled images must not carry gradients back to the generator.
        for element in data.detach():
            element = torch.unsqueeze(element, 0)
            if len(self.data) < self.max_size:
                self.data.append(element)
                to_return.append(element)
            elif random.uniform(0, 1) > 0.5:
                i = random.randint(0, self.max_size - 1)
                to_return.append(self.data[i].clone())
                self.data[i] = element
            else:
                to_return.append(element)
        return torch.cat(to_return)
    
# One pool per domain, each with the default capacity; the training loop
# passes generated images through these before scoring them with D_A / D_B.
fake_A_sample = ReplayBuffer()
fake_B_sample = ReplayBuffer()

# 測試

In [46]:
import numpy as np
import cv2

def show_AB():
    """Show the first domain-A and domain-B image of the global ``data`` side by side.

    Reads the module-level ``data`` (a pair of batches, one per loader) and
    blocks until a key is pressed in the OpenCV window.

    The tensors were normalised to [-1, 1] by the transform and are in RGB
    order, while ``cv2.imshow`` expects float images in [0, 1] and BGR order,
    so both are converted before display.
    """
    def _to_displayable(img_tensor):
        # CHW, RGB, [-1, 1]  ->  HWC, BGR, [0, 1]
        img = img_tensor.numpy().transpose((1, 2, 0))
        img = (img + 1.0) * 0.5
        # Reversing the channel axis yields a negative-stride view; copy it
        # into a contiguous array before handing it to OpenCV.
        return np.ascontiguousarray(img[:, :, ::-1])

    img1 = _to_displayable(data[0][0][0])    # vangogh
    img2 = _to_displayable(data[1][0][0])    # real pic
    res = cv2.hconcat([img1, img2])

    cv2.imshow('test' , res)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

# Look at the first pair of batches to see what the zipped loaders yield.
for idx, data in enumerate(zip(dataA_loader, dataB_loader)):
    if idx > 0:
        break
    print(len(data))            # one entry per data loader
    print(data[0][0].shape)     # trainA image batch
    print(data[0][1].shape)     # trainA label batch
    print(data[0][0].shape[0])  # batch size


2
torch.Size([12, 3, 224, 224])
torch.Size([12])
12


# 4. Training

In [47]:
from tqdm import tqdm
import torchvision.utils as vutils

# zip() stops at the shorter loader, so that is the number of iterations per epoch.
n_batches = min(len(dataA_loader), len(dataB_loader))

# Weights of the cycle-consistency and identity terms in the generator loss.
beta_rec = 10
beta_idt = 5


def _save_checkpoints(suffix=""):
    """Save the state dict of all four networks into ``weights_path``."""
    for name, net in (("netG_A2B", G_A2B), ("netG_B2A", G_B2A),
                      ("netD_A", D_A), ("netD_B", D_B)):
        torch.save(net.state_dict(), os.path.join(weights_path, f"{name}{suffix}.pth"))


for epoch in range(epochs):

    progress_bar = tqdm(enumerate(zip(dataA_loader, dataB_loader)),
                        total=n_batches)

    for idx, data in progress_bar:

        ############ define training data ############

        real_A = data[0][0].to(device)    # vangogh image
        real_B = data[1][0].to(device)    # real picture

        # Targets are built with ones_like / zeros_like on each discriminator
        # output, so they always match its shape, dtype and device.

        ############ Train G ############

        optim_G.zero_grad()

        # G - Adversarial loss: the generators want their fakes scored as real.

        fake_A = G_B2A(real_B)
        fake_out_A = D_A(fake_A)

        fake_B = G_A2B(real_A)
        fake_out_B = D_B(fake_B)

        adversial_loss_B2A = MSE(fake_out_A, torch.ones_like(fake_out_A))
        adversial_loss_A2B = MSE(fake_out_B, torch.ones_like(fake_out_B))

        adv_loss = adversial_loss_B2A + adversial_loss_A2B

        # G - Consistency loss (reconstruction): A -> B -> A and B -> A -> B.

        rec_A = G_B2A(fake_B)
        rec_B = G_A2B(fake_A)

        consistency_loss_B2A = L1(rec_A, real_A)
        consistency_loss_A2B = L1(rec_B, real_B)

        rec_loss = consistency_loss_B2A + consistency_loss_A2B

        # G - Identity loss: an image already in the target domain should stay unchanged.

        idt_A = G_B2A(real_A)
        idt_B = G_A2B(real_B)

        identity_loss_A = L1(idt_A, real_A)
        identity_loss_B = L1(idt_B, real_B)

        idt_loss = identity_loss_A + identity_loss_B

        # G - Total loss

        loss_G = adv_loss + (rec_loss * beta_rec) + (idt_loss * beta_idt)

        # G - Backward & update

        loss_G.backward()
        optim_G.step()

        ############ Train D ############

        optim_D.zero_grad()

        # D_A - real images should score 1, pooled fakes should score 0.

        real_out_A = D_A(real_A)
        real_out_A_loss = MSE(real_out_A, torch.ones_like(real_out_A))

        fake_out_A = D_A(fake_A_sample.push_and_pop(fake_A))
        # Fixed: the fake term must be computed from the fake output, not from real_out_A.
        fake_out_A_loss = MSE(fake_out_A, torch.zeros_like(fake_out_A))

        loss_DA = real_out_A_loss + fake_out_A_loss

        # D_B - same for the other domain.

        real_out_B = D_B(real_B)
        real_out_B_loss = MSE(real_out_B, torch.ones_like(real_out_B))

        fake_out_B = D_B(fake_B_sample.push_and_pop(fake_B))
        fake_out_B_loss = MSE(fake_out_B, torch.zeros_like(fake_out_B))

        loss_DB = real_out_B_loss + fake_out_B_loss

        # D - Total loss

        loss_D = (loss_DA + loss_DB) * 0.5

        # D - Backward & update

        loss_D.backward()
        optim_D.step()

        ############ progress info ############

        progress_bar.set_description(
            f"[{epoch}/{epochs - 1}][{idx}/{n_batches - 1}] "
            f"Loss_D: {(loss_DA + loss_DB).item():.4f} "
            f"Loss_G: {loss_G.item():.4f} "
            f"Loss_G_identity: {(idt_loss).item():.4f} "
            f"loss_G_GAN: {(adv_loss).item():.4f} "
            f"loss_G_cycle: {(rec_loss).item():.4f}")

        if idx % log_freq == 0:

            vutils.save_image(real_A, os.path.join(output_path, f"real_A_{epoch}.jpg"), normalize=True)
            vutils.save_image(real_B, os.path.join(output_path, f"real_B_{epoch}.jpg"), normalize=True)

            # no_grad: these forward passes only produce preview images.
            with torch.no_grad():
                sample_A = (G_B2A(real_B) + 1.0) * 0.5
                sample_B = (G_A2B(real_A) + 1.0) * 0.5

            vutils.save_image(sample_A, os.path.join(output_path, f"fake_A_{epoch}.jpg"), normalize=True)
            # Fixed: fake_B is written to its own file instead of overwriting fake_A.
            vutils.save_image(sample_B, os.path.join(output_path, f"fake_B_{epoch}.jpg"), normalize=True)

    ############ per-epoch checkpoint ############
    _save_checkpoints(f"_epoch_{epoch}")

    ############ Update learning rates ############
    lr_scheduler_G.step()
    lr_scheduler_D.step()

############ save last check pointing ############
_save_checkpoints()

  0%|                                                                                          | 0/654 [01:48<?, ?it/s]


RuntimeError: [enforce fail at ..\c10\core\CPUAllocator.cpp:72] data. DefaultCPUAllocator: not enough memory: you tried to allocate 7552892928 bytes. Buy new RAM!


# 5. Save File