

# CS 547 Deep learning Final Report: CycleGAN Code Notebook Version
### Group Members: Chuqiao Shi, Josh Vita, Manish Shanka, Tim Murry



This notebook contains the CycleGAN code. It can be run easily 
on the free GPU provided by Google Colab.

In [0]:
# Check which GPU type Colab assigned; normally the Tesla P100 is the best one
!nvidia-smi

## Import packages and global variables

Connect the files in the Google Drive

In [0]:
# Mount Google Drive at /content/drive; the dataset and checkpoint paths used
# later in this notebook all live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Import necessary packages

In [0]:
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
from torch.autograd import Variable, grad
from torch.utils.data import Dataset
import os
import matplotlib.pyplot as plt
import time
import copy

In [0]:
# Training hyperparameters shared by the data loaders and the training loop.
num_epochs = 150  # number of passes over the training loader
batch_size = 16   # samples per mini-batch; the training loop skips smaller batches

## Preparing Data

Define the dataloader to input training data

In [0]:
class cycle_data(Dataset):
    """Unpaired painting/photo dataset for CycleGAN training.

    File names are read from ``<path><split>A/`` (paintings) and
    ``<path><split>B/`` (photos), where ``<split>`` is ``'train'`` or
    ``'test'``. ``path`` is concatenated as-is, so it must end with a path
    separator.

    Args:
        path: Root folder of the dataset (with trailing separator).
        train: ``True`` to read the ``train*`` folders, ``False`` for ``test*``.
    """

    def __init__(self, path, train):

        train_str = 'train' if train else 'test'

        # Training the cycleGAN for painting<->photo transformation,
        # A is for paintings and B is for photos
        painting_path = path + train_str + 'A/'
        photo_path = path + train_str + 'B/'

        # List each folder once and reuse the listing for both the summary
        # print and the file lists.
        painting_names = os.listdir(painting_path)
        print(painting_path, len(painting_names))
        photo_names = os.listdir(photo_path)
        print(photo_path, len(photo_names))

        self.paintings = [painting_path + name for name in painting_names]
        self.photos = [photo_path + name for name in photo_names]

        self.paintings_size = len(self.paintings)
        self.photos_size = len(self.photos)

        # The two domains may differ in size; the epoch length follows the
        # larger one. Paintings wrap around, photos are drawn at random.
        self.size = max(self.paintings_size, self.photos_size)

    def __len__(self):
        """Return the size of the larger of the two image collections."""
        return self.size

    def _sample_photo_index(self):
        """Return a uniformly random index into ``self.photos``.

        ``np.random.randint`` excludes its upper bound, so the bound must be
        ``photos_size`` (not ``photos_size - 1``); otherwise the last photo is
        never drawn and a single-photo folder raises ``ValueError``.
        """
        return np.random.randint(0, self.photos_size)

    @staticmethod
    def _load_image(file_name):
        """Read an image, rescale it to [0, 1] and move the last axis first.

        For a colour image read as (H, W, C) the result is (C, H, W).
        """
        image = plt.imread(file_name)
        image = (image - np.min(image)) / np.ptp(image)
        return np.moveaxis(image, (0, 1, 2), (1, 2, 0))

    def __getitem__(self, idx):
        """Return a ``(painting, photo)`` pair.

        The painting is selected by ``idx`` (modulo the number of paintings);
        the photo is picked at random, so pairs are unaligned.
        """
        x_idx = idx % self.paintings_size
        y_idx = self._sample_photo_index()

        x = self._load_image(self.paintings[x_idx])
        y = self._load_image(self.photos[y_idx])
        return x, y




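Sample fake images from an image pool. The pool stores up to `max_elements` previously generated items; once it is full, each new item is either returned directly or swapped with a randomly chosen stored item (which is returned instead), so the discriminator is trained on a mix of current and earlier fakes.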

In [0]:
class Sample_from_Pool(object):
    """Fixed-capacity history buffer of generated items.

    Calling the pool with a list of items returns a list of the same length.
    While the pool is not full, every incoming item is stored and returned
    unchanged. Once it is full, each incoming item is returned as-is when the
    random draw is at most 0.5; otherwise it replaces a randomly chosen stored
    item and a copy of that stored item is returned instead.
    """

    def __init__(self, max_elements=50):
        self.items = []
        self.cur_elements = 0
        self.max_elements = max_elements

    def __call__(self, in_items):
        sampled = []
        for candidate in in_items:
            # Pool still has room: remember the item and hand it straight back.
            if self.cur_elements < self.max_elements:
                self.items.append(candidate)
                self.cur_elements += 1
                sampled.append(candidate)
                continue

            # Pool is full: keep the fresh item when the draw is <= 0.5 ...
            if np.random.ranf() <= 0.5:
                sampled.append(candidate)
                continue

            # ... otherwise swap it into a random slot and return the old entry.
            slot = np.random.randint(0, self.max_elements)
            previous = copy.copy(self.items[slot])
            self.items[slot] = candidate
            sampled.append(previous)
        return sampled

Data Transforms

In [0]:

def _tensor_steps():
    """Return fresh ToTensor + Normalize steps shared by both pipelines.

    Normalize is given mean 0 and std 1 for each of the three channels.
    """
    return [
        transforms.ToTensor(),
        transforms.Normalize((0, 0, 0), (1, 1, 1)),
    ]


# Training pipeline: random horizontal flip, then tensor conversion/normalisation.
transform_train = transforms.Compose([transforms.RandomHorizontalFlip()] + _tensor_steps())

# Test pipeline: tensor conversion/normalisation only (no augmentation).
transform_test = transforms.Compose(_tensor_steps())

Define the data loaders

In [0]:
# Root folder of the ukiyoe2photo dataset on the mounted Drive.
_dataset_root = '/content/drive/My Drive/ukiyoe2photo/ukiyoe2photo/'

# Training split: shuffled every epoch.
trainset = cycle_data(_dataset_root, train=True)
trainloader = torch.utils.data.DataLoader(
    trainset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=8,
)

# Test split: kept in listing order.
testset = cycle_data(_dataset_root, train=False)
testloader = torch.utils.data.DataLoader(
    testset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=8,
)

## Define the CycleGAN Structures


CycleGAN consists of generators and discriminators; this notebook builds one of each per translation direction. Each generator is a convolutional encoder-decoder network with residual blocks in the middle that generates fake images. Each discriminator uses convolutional layers followed by a fully connected layer to distinguish fake images from real ones.

### Define the generator

In [0]:
class Encoder(nn.Module):
    """Down-sampling half of the generator.

    Three stages of 3x3 convolution (padding 1) -> batch norm -> LeakyReLU ->
    2x2 max-pool, widening the channels 3 -> 32 -> 64 -> 128 while each
    pooling step halves the spatial size.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.norm1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.norm2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.norm3 = nn.BatchNorm2d(128)

        # Activation and pooling are shared by all three stages.
        self.lru = nn.LeakyReLU()
        self.maxpool = nn.MaxPool2d(2, 2)

    def forward(self, x):
        """Encode ``x`` (N, 3, H, W) into a 128-channel feature map."""
        stages = (
            (self.conv1, self.norm1),
            (self.conv2, self.norm2),
            (self.conv3, self.norm3),
        )
        for conv, norm in stages:
            x = self.maxpool(self.lru(norm(conv(x))))
        return x

In [0]:
class ResBlock(nn.Module):
    """Residual block on 128-channel feature maps.

    The block owns a single convolution and a single batch-norm layer and
    applies that same pair twice (with a LeakyReLU in between) before adding
    the block input back onto the result.
    """

    def __init__(self):
        super().__init__()
        self.conv4 = nn.Conv2d(128, 128, kernel_size=3, padding=1)
        self.lru = nn.LeakyReLU()
        self.norm3 = nn.BatchNorm2d(128)

    def forward(self, x):
        """Return ``F(x) + x`` where ``F`` is conv-norm-LeakyReLU-conv-norm."""
        hidden = self.norm3(self.conv4(x))
        hidden = self.lru(hidden)
        # Second pass goes through the same conv4/norm3 modules (shared weights).
        hidden = self.norm3(self.conv4(hidden))
        return hidden + x
    
    

In [0]:
class Decoder(nn.Module):
    """Up-sampling half of the generator.

    Three stages of 2x bilinear up-sampling -> reflection padding -> 3x3
    convolution, narrowing the channels 128 -> 64 -> 32 -> 3. The first two
    stages are followed by batch norm and LeakyReLU; the last stage returns
    the raw convolution output.
    """

    def __init__(self):
        super(Decoder, self).__init__()

        self.conv6 = self._upsample_conv(128, 64)
        self.conv7 = self._upsample_conv(64, 32)
        self.conv8 = self._upsample_conv(32, 3)

        self.norm1 = nn.BatchNorm2d(64)
        self.norm0 = nn.BatchNorm2d(32)
        self.lru = nn.LeakyReLU()

    @staticmethod
    def _upsample_conv(in_channels, out_channels):
        """Build one up-sample -> reflect-pad -> 3x3 conv stage."""
        return nn.Sequential(
            # align_corners=False is what bilinear mode already falls back to;
            # stating it explicitly pins the interpolation and avoids the
            # "See the documentation of nn.Upsample" warning.
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False),
            # Reflection padding of 1 keeps the spatial size through the
            # unpadded 3x3 convolution.
            nn.ReflectionPad2d(1),
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=0),
        )

    def forward(self, x):
        """Decode a 128-channel feature map into a 3-channel image tensor."""
        x = self.lru(self.norm1(self.conv6(x)))
        x = self.lru(self.norm0(self.conv7(x)))
        x = self.conv8(x)

        return x

In [0]:
class generator(nn.Module):
    """Encoder -> repeated residual block -> decoder image translator.

    Args:
        Encoder: Zero-argument callable (e.g. a module class) building the encoder.
        ResBlock: Zero-argument callable building the residual block.
        Decoder: Zero-argument callable building the decoder.

    Only one residual block is created; ``forward`` applies that same module
    five times in a row.
    """

    def __init__(self, Encoder, ResBlock, Decoder):
        super().__init__()
        self.encoder = Encoder()
        self.resblock = ResBlock()
        self.decoder = Decoder()

    def forward(self, x):
        """Translate ``x`` by encoding, refining five times and decoding."""
        hidden = self.encoder(x)
        for _ in range(5):
            hidden = self.resblock(hidden)
        return self.decoder(hidden)

### Define the discriminator

In [0]:
class discriminator(nn.Module):
    """Convolutional critic that maps an image to a single unbounded score.

    Five conv -> batch norm -> ReLU -> max-pool stages widen the channels
    3 -> 32 -> 64 -> 128 -> 256 -> 512 while pooling by 2, 2, 4, 4 and 4.
    A 256x256 input is therefore reduced to 1x1 (128, 64, 16, 4, 1 after each
    pool), which is what the flatten to 512 features before the linear layer
    relies on.
    """

    def __init__(self):
        super().__init__()

        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.norm1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.norm2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.norm3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.norm4 = nn.BatchNorm2d(256)
        self.conv5 = nn.Conv2d(256, 512, kernel_size=3, padding=1)
        self.norm5 = nn.BatchNorm2d(512)
        self.relu = nn.ReLU()
        # Registered but not applied in forward().
        self.sigmoid = nn.Sigmoid()
        self.maxpool1 = nn.MaxPool2d(2, 2)
        self.maxpool2 = nn.MaxPool2d(4, 4)
        self.fc = nn.Linear(512, 1)
        # Despite the name this is a Sigmoid; it is not applied in forward().
        self.softmax = nn.Sigmoid()

    def forward(self, x):
        """Return one raw score per input image, shape (N, 1)."""
        stages = (
            (self.conv1, self.norm1, self.maxpool1),
            (self.conv2, self.norm2, self.maxpool1),
            (self.conv3, self.norm3, self.maxpool2),
            (self.conv4, self.norm4, self.maxpool2),
            (self.conv5, self.norm5, self.maxpool2),
        )
        for conv, norm, pool in stages:
            x = pool(self.relu(norm(conv(x))))

        # Flatten to 512 features per sample; no sigmoid is applied afterwards.
        flat = x.view(-1, 512)
        return self.fc(flat)

### CUDA version for the nets

In [0]:
# One generator per translation direction (x -> y and y -> x), both on the GPU.
G_x2y = generator(Encoder, ResBlock, Decoder).cuda()
G_y2x = generator(Encoder, ResBlock, Decoder).cuda()

# One discriminator per image domain, both on the GPU.
D_x = discriminator().cuda()
D_y = discriminator().cuda()

## Define the loss and optimizer

Gradient penalty for the wgan loss

In [0]:
def calc_gradient_penalty(netD, real_data, fake_data, wgan_lambda, batch_size):
    """Return the WGAN gradient penalty of ``netD`` between real and fake data.

    A random per-sample mixture of ``real_data`` and ``fake_data`` is pushed
    through ``netD``; the penalty is ``wgan_lambda`` times the mean squared
    deviation of the per-sample gradient L2 norm from 1.

    The computation runs on ``real_data``'s device and follows its shape, so
    it is neither tied to CUDA nor to three-channel inputs.

    Args:
        netD: Callable mapping a batch to per-sample scores.
        real_data: Batch of real samples; the first dimension is the batch.
        fake_data: Batch of generated samples with the same number of elements.
        wgan_lambda: Weight applied to the penalty.
        batch_size: Number of samples in ``real_data``.

    Returns:
        Scalar tensor that stays differentiable with respect to the
        parameters of ``netD``.
    """
    device = real_data.device
    # Both inputs are treated as constants; only the interpolate needs grad.
    real_data = real_data.detach()
    fake_data = fake_data.detach().to(device).reshape(real_data.shape)

    # One mixing coefficient per sample, broadcast over all other dimensions.
    alpha_shape = (batch_size,) + (1,) * (real_data.dim() - 1)
    alpha = torch.rand(alpha_shape, device=device)

    interpolates = alpha * real_data + (1 - alpha) * fake_data
    interpolates.requires_grad_(True)

    disc_interpolates = netD(interpolates)

    # create_graph=True keeps the gradient itself differentiable so the
    # penalty can be back-propagated into the discriminator.
    gradients = grad(outputs=disc_interpolates, inputs=interpolates,
                     grad_outputs=torch.ones_like(disc_interpolates),
                     create_graph=True, retain_graph=True)[0]

    gradients = gradients.reshape(gradients.size(0), -1)
    gradient_penalty = ((gradients.norm(2, dim=1) - 1) ** 2).mean() * wgan_lambda

    return gradient_penalty

Optimizers for each net

In [0]:
# Every network gets its own Adam optimizer with identical settings.
_adam_settings = dict(lr=0.0001, betas=(0, 0.9))

optimizer_G_x2y = torch.optim.Adam(G_x2y.parameters(), **_adam_settings)
optimizer_G_y2x = torch.optim.Adam(G_y2x.parameters(), **_adam_settings)
optimizer_D_x = torch.optim.Adam(D_x.parameters(), **_adam_settings)
optimizer_D_y = torch.optim.Adam(D_y.parameters(), **_adam_settings)

Loss functions

In [0]:
# All three criteria are L1 (mean absolute error) losses; the training loop
# uses them for different terms of the objective.
criterion_image = nn.L1Loss()     # cycle consistency: real image vs. its reconstruction
criterion_type = nn.L1Loss()      # adversarial term: discriminator output vs. real/fake label
criterion_identity = nn.L1Loss()  # identity term: generator output vs. its target-domain input

## Training

In [0]:
# CycleGAN training loop.
#
# Each iteration first updates both generators (adversarial + cycle-consistency
# + identity losses) and then both discriminators (real/fake L1 losses plus a
# WGAN gradient penalty), using fakes drawn from the image pools.

# Define the fake images pools
x_fake_sample = Sample_from_Pool()
y_fake_sample = Sample_from_Pool()
# Record the average loss for each epoch
temp_G_list = []
temp_D_X_list = []
temp_D_Y_list = []

# Loss weights; they do not change during training, so they are set once.
# identity loss values taken from cyclegan defaults in the paper
lambda_identity_loss = 0.5
lambda_y = 10.0
lambda_x = 10.0
# wgan loss from hw7
wgan_lambda = 10

for epoch in range(num_epochs):

    start_time = time.time()
    # Running sums over the whole epoch (reset once per epoch, not per batch,
    # so the values reported below really are epoch averages).
    temp_G = 0
    temp_D_X = 0
    temp_D_Y = 0
    num_batches = 0

    G_x2y.train()
    G_y2x.train()
    D_x.train()
    D_y.train()

    for batch_idx, (X_real, Y_real) in enumerate(trainloader):
        # X=paintings, Y=photos

        # Skip the incomplete last batch: the gradient penalty is called with
        # the fixed batch_size.
        if Y_real.shape[0] < batch_size:
            continue
        num_batches += 1

        X_real = X_real.float().cuda()
        Y_real = Y_real.float().cuda()

        # ------------------------- generator update -------------------------
        G_x2y.zero_grad()
        G_y2x.zero_grad()

        X_fake = G_y2x(Y_real)      # real photos -> fake paintings
        Y_fake = G_x2y(X_real)      # real paintings -> fake photos
        X_cycle = G_y2x(Y_fake)     # fake photos -> reconstructed paintings
        Y_cycle = G_x2y(X_fake)     # fake paintings -> reconstructed photos

        D_X_fake = D_x(X_fake)      # predicted labels for fake paintings
        D_Y_fake = D_y(Y_fake)      # predicted labels for fake photos

        real_label = torch.ones_like(D_Y_fake)      # ones
        fake_label = torch.zeros_like(D_Y_fake)     # zeros

        loss_cycle = criterion_image(X_real, X_cycle) + criterion_image(Y_real, Y_cycle)
        # The generators want their fakes to be labelled as real.
        loss_G_X2Y = criterion_type(D_Y_fake, real_label)
        loss_G_Y2X = criterion_type(D_X_fake, real_label)

        loss_idt_A = 0
        loss_idt_B = 0
        if lambda_identity_loss > 0:
            # Feeding a generator an image that is already in its target
            # domain should leave the image unchanged.
            I_x = G_x2y(Y_real)
            I_y = G_y2x(X_real)

            loss_idt_A = criterion_identity(I_x, Y_real) * lambda_y * lambda_identity_loss
            loss_idt_B = criterion_identity(I_y, X_real) * lambda_x * lambda_identity_loss

        # Total generator loss
        G_loss = loss_G_X2Y + loss_G_Y2X + 10*loss_cycle + loss_idt_A + loss_idt_B
        temp_G += G_loss.item()
        G_loss.backward()

        optimizer_G_x2y.step()
        optimizer_G_y2x.step()

        # ----------------------- discriminator update -----------------------
        # Get fake images from the images pool. Each batch is stored in the
        # pool as a single item; the numpy round trip detaches the fakes from
        # the generator graph.
        X_fake_samp = torch.Tensor(x_fake_sample([X_fake.detach().cpu().numpy()])[0]).cuda()
        Y_fake_samp = torch.Tensor(y_fake_sample([Y_fake.detach().cpu().numpy()])[0]).cuda()

        # Also clears the gradients the generator backward pass left on the
        # discriminators.
        D_x.zero_grad()
        D_y.zero_grad()

        D_X_real = D_x(X_real)          # identify true paintings
        D_Y_real = D_y(Y_real)          # identify true photos
        D_X_fake = D_x(X_fake_samp)     # identify fake paintings
        D_Y_fake = D_y(Y_fake_samp)     # identify fake photos

        wgan_loss_d_x = 0
        wgan_loss_d_y = 0
        if wgan_lambda > 0:
            # Each discriminator is penalised on data from its own domain:
            # D_x sees paintings (X), D_y sees photos (Y).
            wgan_loss_d_x = calc_gradient_penalty(D_x, X_real, X_fake_samp, wgan_lambda, batch_size)
            wgan_loss_d_y = calc_gradient_penalty(D_y, Y_real, Y_fake_samp, wgan_lambda, batch_size)

        D_X_loss = criterion_type(D_X_fake, fake_label) + criterion_type(D_X_real, real_label) + wgan_loss_d_x
        D_Y_loss = criterion_type(D_Y_fake, fake_label) + criterion_type(D_Y_real, real_label) + wgan_loss_d_y

        D_X_loss.backward()
        temp_D_X += D_X_loss.item()
        optimizer_D_x.step()

        D_Y_loss.backward()
        temp_D_Y += D_Y_loss.item()
        optimizer_D_y.step()

    # Average over the batches that were actually processed; guard against an
    # epoch in which every batch was skipped.
    averaging_count = max(num_batches, 1)
    print(temp_G/averaging_count, temp_D_X/averaging_count, temp_D_Y/averaging_count)
    temp_G_list.append(temp_G/averaging_count)
    temp_D_X_list.append(temp_D_X/averaging_count)
    temp_D_Y_list.append(temp_D_Y/averaging_count)
    print('----EPOCH{} FINISHED-----'.format(epoch))

    # output sample images and save models every 5 epochs
    if epoch % 5 == 0:
        im = plt.imread('/content/drive/My Drive/research/monet2photo/Test/monet/00010.jpg')
        im = (im-np.min(im))/np.ptp(im)
        input_im_holder = np.zeros((1,3,256,256))
        im = np.moveaxis(im,(0,1,2),(1,2,0))
        input_im_holder[0,:,:,:] = im
        input_im = torch.from_numpy(input_im_holder).float().cuda()
        # No gradients are needed for the preview image.
        with torch.no_grad():
            out_im = G_x2y(input_im)
        out_im = out_im.cpu().numpy()[0,:,:,:]
        out_im = np.moveaxis(out_im,(0,1,2),(2,0,1))

        # Rescale to [0, 1] for display.
        out_im = (out_im-np.min(out_im))/np.ptp(out_im)

        plt.imshow(out_im)
        plt.show()
        torch.save(G_x2y,"/content/drive/My Drive/research/monet2photo/Test/models/G_x2y_{}.model".format(epoch))
        torch.save(G_y2x,"/content/drive/My Drive/research/monet2photo/Test/models/G_y2x_{}.model".format(epoch))




        

## Saving

Load the saved generator models

In [0]:
# Restore the generators from checkpoints written by the training loop
# (whole modules saved with torch.save).
# NOTE(review): the two directions are loaded from different epochs (105 and 75) — confirm this is intentional.
G_x2y = torch.load("/content/drive/My Drive/research/monet2photo/Test/models/G_x2y_105.model")
G_y2x = torch.load("/content/drive/My Drive/research/monet2photo/Test/models/G_y2x_75.model")

Save painting -> photo results

In [0]:
import os

# Translate every test painting with G_x2y and save the result under the same
# file name in the monet_to_photo folder.
path = '/content/drive/My Drive/research/monet2photo/Test/monet/'
out_dir = '/content/drive/My Drive/research/monet2photo/Test/'+'monet_to_photo/'
# plt.imsave does not create missing folders.
os.makedirs(out_dir, exist_ok=True)

monet_image = os.listdir(path)
for name in monet_image:
    im = plt.imread(path+name)
    # Rescale to [0, 1] and move the last axis first, (H, W, C) -> (C, H, W).
    im = (im-np.min(im))/np.ptp(im)
    input_im_holder = np.zeros((1,3,256,256))
    im = np.moveaxis(im,(0,1,2),(1,2,0))
    input_im_holder[0,:,:,:] = im
    input_im = torch.from_numpy(input_im_holder).float().cuda()
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        out_im = G_x2y(input_im)
    out_im = out_im.cpu().numpy()[0,:,:,:]
    # Back to (H, W, C) for saving.
    out_im = np.moveaxis(out_im,(0,1,2),(2,0,1))

    # Rescale the generator output to [0, 1] before writing the image.
    out_im = (out_im-np.min(out_im))/np.ptp(out_im)

    plt.imsave(out_dir+name,out_im)

Save photo -> painting results

In [0]:


# Translate every test photo with G_y2x and save the result in the
# photo_to_monet folder, with '75' prefixed to the original file name.
path = '/content/drive/My Drive/research/monet2photo/Test/photo/'
out_dir = '/content/drive/My Drive/research/monet2photo/Test/'+'photo_to_monet/'
# plt.imsave does not create missing folders.
os.makedirs(out_dir, exist_ok=True)

# Despite its name, this list holds the photo file names in this cell.
monet_image = os.listdir(path)
for name in monet_image:
    im = plt.imread(path+name)
    # Rescale to [0, 1] and move the last axis first, (H, W, C) -> (C, H, W).
    im = (im-np.min(im))/np.ptp(im)
    input_im_holder = np.zeros((1,3,256,256))
    im = np.moveaxis(im,(0,1,2),(1,2,0))
    input_im_holder[0,:,:,:] = im
    input_im = torch.from_numpy(input_im_holder).float().cuda()
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        out_im = G_y2x(input_im)
    out_im = out_im.cpu().numpy()[0,:,:,:]
    # Back to (H, W, C) for saving.
    out_im = np.moveaxis(out_im,(0,1,2),(2,0,1))

    # Rescale the generator output to [0, 1] before writing the image.
    out_im = (out_im-np.min(out_im))/np.ptp(out_im)

    plt.imsave(out_dir+'75'+name,out_im)

  "See the documentation of nn.Upsample for details.".format(mode))
