# CycleGAN, Image-to-Image Translation / 2019. 4. 25


---

# 1. Load and Visualize the Data

In [None]:
# loading in and transforming data
import os
import torch
from torch.utils.data import DataLoader
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms

# visualizing data
import matplotlib.pyplot as plt
import numpy as np
import warnings

%matplotlib inline

### (1) dataloader

In [None]:
def get_data_loader(image_type, image_dir='horse2zebra', 
                    image_size=128, batch_size=16, num_workers=0):
    
    transform = transforms.Compose([transforms.Resize(image_size), # # resize(to 128x128) and normalize 
                                    transforms.ToTensor()])

    # get training and test directories
    image_path = './' + image_dir
    train_path = os.path.join(image_path, image_type)
    test_path = os.path.join(image_path, 'test_{}'.format(image_type))

    # define datasets using ImageFolder
    train_dataset = datasets.ImageFolder(train_path, transform)
    test_dataset = datasets.ImageFolder(test_path, transform)

    # create and return DataLoaders
    train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

    return train_loader, test_loader

In [None]:
# Create train and test dataloaders for images from the two domains X and Y
# image_type = directory names for our data
dataloader_X, test_dataloader_X = get_data_loader(image_type='horse')
dataloader_Y, test_dataloader_Y = get_data_loader(image_type='zebra')

### (2) visualize the data

In [None]:
def imshow(img):
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    
dataiter = iter(dataloader_X) #반복되는 객체
images, _ = dataiter.next()  #first batch # the "_" is a placeholder for no labels

fig = plt.figure(figsize=(12, 8))  
imshow(torchvision.utils.make_grid(images))

dataiter = iter(dataloader_Y)
images, _ = dataiter.next()
# show images
fig = plt.figure(figsize=(12, 8))  
imshow(torchvision.utils.make_grid(images))

In [None]:
# current range
img = images[0]

print('Min: ', img.min())
print('Max: ', img.max())

In [None]:
# helper scale function
def scale(x, feature_range=(-1, 1)):
    ''' Scale takes in an image x and returns that image, scaled
       with a feature_range of pixel values from -1 to 1. 
       This function assumes that the input x is already scaled from 0-255.'''
    
    # scale from 0-1 to feature_range
    min, max = feature_range
    x = x * (max - min) + min
    return x

In [None]:
# scaled range
scaled_img = scale(img)

print('Scaled min: ', scaled_img.min())
print('Scaled max: ', scaled_img.max())

---
# 2. Define the Model

## (1) function for Discriminator

In [None]:
import torch.nn as nn
import torch.nn.functional as F

# helper conv function
def conv(in_channels, out_channels, kernel_size, stride=2, padding=1, batch_norm=True):
    """Creates a convolutional layer, with optional batch normalization.
    """
    layers = []
    conv_layer = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, 
                           kernel_size=kernel_size, stride=stride, padding=padding, bias=False)
    
    layers.append(conv_layer)

    if batch_norm:
        layers.append(nn.BatchNorm2d(out_channels))
    return nn.Sequential(*layers)

## (2) Discriminator

In [None]:
class Discriminator(nn.Module):
    
    def __init__(self, conv_dim=64):
        super(Discriminator, self).__init__()

        # Define all convolutional layers
        # Should accept an RGB image as input and output a single value

        # Convolutional layers, increasing in depth
    
        #input (16,3,128,128)
        self.conv1 =                              
        self.conv2 =                               
        self.conv3 =                               
        self.conv4 =                               
        
        # Classification layer(batchnorm=false)
        self.conv5 = 

    def forward(self, x):
        # relu applied to all conv layers but last
        out = F.relu(self.conv1(x))      #결과 size [16, 64, 64, 64]
        out = F.relu(self.conv2(out))    #[16, 128, 32, 32]
        out = F.relu(self.conv3(out))    #[16, 256, 16, 16]
        out = F.relu(self.conv4(out))    #[16, 512, 8, 8]
        out = self.conv5(out)            #[16, 1, 7, 7]
        return out

## (3) functions/class of Generators

In [None]:
# residual block class
class ResidualBlock(nn.Module):

    def __init__(self, conv_dim):
        super(ResidualBlock, self).__init__()
        # conv_dim = number of inputs
        
        # define two convolutional layers + batch normalization that will act as our residual function, F(x)
        # layers should have the same shape input as output; I suggest a kernel_size of 3
        
        self.conv_layer1 = conv(in_channels=conv_dim, out_channels=conv_dim, 
                                kernel_size=3, stride=1, padding=1, batch_norm=True)
        
        self.conv_layer2 = conv(in_channels=conv_dim, out_channels=conv_dim, 
                               kernel_size=3, stride=1, padding=1, batch_norm=True)
        
    def forward(self, x):

        
        
        return 
            
def deconv(in_channels, out_channels, kernel_size, stride=2, padding=1, batch_norm=True):
    """Creates a transpose convolutional layer, with optional batch normalization.
    """
    
    
    
    return 

## (4) Generator Architecture

In [None]:
class CycleGenerator(nn.Module):
    
    def __init__(self, conv_dim=64, n_res_blocks=6):
        super(CycleGenerator, self).__init__()

        # Encoder 
        
        self.conv1 = conv(3, conv_dim, 4)
        self.conv2 = conv(conv_dim, conv_dim*2, 4)
        self.conv3 = conv(conv_dim*2, conv_dim*4, 4)

        # Resnet part
        res_layers = []

        
        self.res_blocks = 

        
        
        # 3. Decoder
        self.deconv1 = deconv(conv_dim*4, conv_dim*2, 4)
        self.deconv2 = deconv(conv_dim*2, conv_dim, 4)
        self.deconv3 = deconv(conv_dim, 3, 4, batch_norm=False)  # no batch norm on last layer
        
    def forward(self, x):
        """Given an image x, returns a transformed image."""
        # define feedforward behavior, applying activations as necessary

        out = F.relu(self.conv1(x))
        out = F.relu(self.conv2(out))
        out = F.relu(self.conv3(out))

        out = self.res_blocks(out)

        out = F.relu(self.deconv1(out))
        out = F.relu(self.deconv2(out))
        out = F.tanh(self.deconv3(out))           # tanh applied to last layer

        return out

## (5) Create Model

In [None]:
def create_model(g_conv_dim=64, d_conv_dim=64, n_res_blocks=6):
    
    G_XtoY = 
    G_YtoX = 
    D_X = 
    D_Y = 

    if torch.cuda.is_available():   # move models to GPU, if available
        device = torch.device("cuda:0")
        G_XtoY.to(device)
        G_YtoX.to(device)
        D_X.to(device)
        D_Y.to(device)
        print('Models moved to GPU.')
    else:
        print('Only CPU available.')

    return G_XtoY, G_YtoX, D_X, D_Y

In [None]:
G_XtoY, G_YtoX, D_X, D_Y =              # call the function to get models

# 3. Loss

In [None]:
def real_mse_loss(D_out):
    
    return 

def fake_mse_loss(D_out):
    
    return 

def cycle_consistency_loss(real_im, reconstructed_im, lambda_weight):
    
    
    return     


# 4. Optimizers

In [None]:
import torch.optim as optim

# hyperparams for Adam optimizer #gradient의 지수평균 이용
lr=0.0002
beta1=0.5
beta2=0.999 # default value

d_x_optimizer = optim.Adam(D_X.parameters(), lr, [beta1, beta2])
d_y_optimizer = 

g_params = list(G_XtoY.parameters()) + list(G_YtoX.parameters()) 
g_optimizer = 

# 5. Training

In [None]:
# import from other code
from help import save_samples, checkpoint
import time

In [None]:
# train the network

def training_loop(dataloader_X, dataloader_Y, test_dataloader_X, test_dataloader_Y, 
                  n_epochs=1000):
    
    since = time.time()
    print_every=100
    
    losses = []      # keep track of losses over time
    test_iter_X = iter(test_dataloader_X)
    test_iter_Y = iter(test_dataloader_Y)

    fixed_X = test_iter_X.next()[0]      # Get some fixed data from domains X and Y for sampling. These are images that are held
    fixed_Y = test_iter_Y.next()[0]          # constant throughout training, that allow us to inspect the model's performance.
    fixed_X = scale(fixed_X) # make sure to scale to a range -1 to 1
    fixed_Y = scale(fixed_Y)

    iter_X =      
    iter_Y = 
    num_min_cycle= 

    for epoch in range(1, n_epochs+1):

        if  == 0:         
    
    
    
        images_X
        images_Y
        
        
        device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        images_X = images_X.to(device)
        images_Y = images_Y.to(device)

        # ============================================
        #            TRAIN THE DISCRIMINATORS
        # ============================================

        # 1. D_X, real and fake loss components   ##

        
        D_X_real_loss = 

        D_X_fake_loss = 
        
        d_x_loss = 
        
        d_x_loss.backward()
        d_x_optimizer.step()

        
        # 2. Second: D_Y, real and fake loss components   ##
        
        
        
        
        
        
        
        
 
        # =========================================
        #            TRAIN THE GENERATORS
        # =========================================

        
      
      
        
        
        
        
        
        
        # Print the log info
        if epoch % print_every == 0:
            # append real and fake discriminator losses and the generator loss
            losses.append((d_x_loss.item(), d_y_loss.item(), g_total_loss.item()))
            time_elapsed = time.time() - since
            print('Epoch [{:5d}/{:5d}] | d_X_loss: {:6.4f} | d_Y_loss: {:6.4f} | g_total_loss: {:6.4f} time : {:.0f}m {:.0f}s'.format(
                    epoch, n_epochs, d_x_loss.item(), d_y_loss.item(), g_total_loss.item(),time_elapsed // 60, time_elapsed % 60))


        sample_every=200
        # Save the generated samples
        if epoch % sample_every == 0:
            G_YtoX.eval() # set generators to eval mode for sample generation
            G_XtoY.eval()
            save_samples(epoch, fixed_Y, fixed_X, G_YtoX, G_XtoY, batch_size=16, sample_dir='samples_cyclegan')
            G_YtoX.train()
            G_XtoY.train()


                    # uncomment these lines, if you want to save your model
#         checkpoint_every=1000
#         # Save the model parameters
#         if epoch % checkpoint_every == 0:
#             checkpoint(epoch, G_XtoY, G_YtoX, D_X, D_Y)

    return losses


In [None]:
n_epochs = 6000 # keep this small when testing if a model first works

losses = 

torch.save(D_X.state_dict(),'D_X.pt')
torch.save(D_Y.state_dict(),'D_Y.pt')
torch.save(G_XtoY.state_dict(),'D_XtoY.pt')
torch.save(G_YtoX.state_dict(),'D_YtoX.pt')

# 6. Evaluate the Result with Pretrained model

In [None]:
import matplotlib.image as mpimg

# helper visualization code
def view_samples(iteration, sample_dir='samples_cyclegan'):
    
    # samples are named by iteration
    path_XtoY = os.path.join(sample_dir, 'sample-{:06d}-X-Y.png'.format(iteration))
    path_YtoX = os.path.join(sample_dir, 'sample-{:06d}-Y-X.png'.format(iteration))
    
    # read in those samples
    try: 
        x2y = mpimg.imread(path_XtoY)
        y2x = mpimg.imread(path_YtoX)
    except:
        print('Invalid number of iterations.')
    
    fig, (ax1, ax2) = plt.subplots(figsize=(18,20), nrows=2, ncols=1, sharey=True, sharex=True)
    ax1.imshow(x2y)
    ax1.set_title('X to Y')
    ax2.imshow(y2x)
    ax2.set_title('Y to X')


In [None]:
# view samples at iteration 100
view_samples(100, 'samples_cyclegan')

In [None]:
# view samples at iteration 4000
view_samples(4000, 'samples_cyclegan')

reference : https://github.com/udacity/deep-learning-v2-pytorch