- Use upsampling and fast neural style techniques

In [None]:
import os
import sys
import time
import copy

import numpy as np
from collections import namedtuple

import torch
import torch.nn as nn
import torch.nn.init as init
import torch.nn.functional as F
import torch.backends.cudnn as cudnn

from torch.autograd import Variable
from torch.optim import Adam, RMSprop
from torch.utils.data import DataLoader, Dataset

import torchvision
from torchvision import datasets, transforms, models
import torchvision.utils as vutils

from PIL import Image
import matplotlib
import matplotlib.pyplot as plt

# Fix the random seeds of NumPy and PyTorch (default and CUDA generators) so
# that repeated runs of the notebook start from the same random state.
seed = 12345
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)

# IPython magics: render figures inline in the notebook, at retina resolution,
# and make 12x12 the default figure size.
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
matplotlib.rc('figure', figsize=(12, 12))

# True when a CUDA device is usable. The bare expression on the last line is
# presumably there so the notebook displays the value as the cell output.
use_cuda = torch.cuda.is_available()
use_cuda

In [None]:
class SimpleImageDataset(Dataset):
    """Unlabelled image dataset backed by the files of a single directory.

    Every regular file found directly inside ``img_dir`` is treated as an
    image. Sub-directories are skipped (they are not searched recursively),
    because trying to open one as an image would fail.

    Args:
        img_dir: path of the directory that contains the image files.
        transform: callable applied to each image (a PIL image converted to
            RGB); its return value is what ``__getitem__`` returns.
    """

    def __init__(self, img_dir, transform):
        super(SimpleImageDataset, self).__init__()

        # os.listdir returns entries in arbitrary order; sort them so that a
        # given index always refers to the same file across runs and machines.
        candidates = (os.path.join(img_dir, name)
                      for name in sorted(os.listdir(img_dir)))
        self.image_fnames = [path for path in candidates
                             if os.path.isfile(path)]
        self.transform = transform

    def __getitem__(self, index):
        """Load image number ``index``, convert it to RGB and transform it."""
        # convert() reads the pixel data and returns a new image, so the
        # underlying file can be closed as soon as the `with` block exits.
        with Image.open(self.image_fnames[index]) as img:
            return self.transform(img.convert('RGB'))

    def __len__(self):
        """Return the number of image files found in the directory."""
        return len(self.image_fnames)

In [None]:
# cudnn.benchmark = True
image_size = 64
batch_size = 32
z_dim = 100
train_dir_name = '/home/samir/Downloads/ILSVRC2012_img_train/train'
test_dir_name = '/home/samir/Downloads/ILSVRC2012_img_train/valid'

# `transforms.Scale` was renamed to `transforms.Resize` and the old name was
# later removed from torchvision. Prefer the new name, but fall back to the
# old one so that installations predating the rename keep working.
_Resize = getattr(transforms, 'Resize', None) or transforms.Scale


def _make_transform(size):
    """Return the preprocessing pipeline shared by the train and test sets.

    The shorter image side is resized to ``size``, the centre ``size`` x
    ``size`` square is cropped, and the result is converted to a tensor.
    """
    return transforms.Compose([
        _Resize(size),
        transforms.CenterCrop(size),
        transforms.ToTensor(),
        # (x - 0.5) / 0.5 per channel: rescales ToTensor's [0, 1] output to
        # [-1, 1], the range of the generator's final tanh.
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ])


train_transform = _make_transform(image_size)
test_transform = _make_transform(image_size)

train_dataset = SimpleImageDataset(train_dir_name, train_transform)
test_dataset = SimpleImageDataset(test_dir_name, test_transform)

# Pinned host memory only helps host-to-GPU copies, so request it only when
# a CUDA device is actually available.
train_data_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=7,
    pin_memory=use_cuda)

test_data_loader = DataLoader(
    test_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=2,
    pin_memory=use_cuda)

In [None]:
class DCGAN_D(nn.Module):
    """DCGAN-style convolutional critic that scores a whole batch with one value.

    The network is a stack of stride-2 convolutions, each halving the spatial
    size and (after the first) doubling the channel count, until the feature
    map is 4x4. A final 4x4 convolution reduces that map to a single value
    per image.

    Args:
        img_size: side length of the square RGB input images. Repeated
            halving of ``img_size`` must reach exactly 4 (e.g. 8, 16, 32,
            64, ...), otherwise the final convolution cannot produce a
            single value.
        extra_layers: number of additional stride-1 conv/batchnorm/LeakyReLU
            groups inserted after the first convolution (default 0).

    Raises:
        ValueError: if ``img_size`` cannot be reduced to a 4x4 feature map.
    """

    def __init__(self, img_size, extra_layers=0):
        super(DCGAN_D, self).__init__()

        # Validate before allocating any layer: a k=4, s=2, p=1 convolution
        # maps a side of n to n // 2, so simulate the halvings up front.
        reduced = img_size // 2
        while reduced > 4:
            reduced //= 2
        if reduced != 4:
            raise ValueError(
                'img_size=%r cannot be reduced to a 4x4 feature map by '
                'repeated halving' % (img_size,))

        # spatial size of the feature map after the first convolution
        channel_size = img_size // 2
        n_filters = 64

        model = nn.Sequential()
        model.add_module(
            'conv1', nn.Conv2d(3, n_filters, 4, stride=2, padding=1, bias=False))
        model.add_module(
            'relu1', nn.LeakyReLU(0.2, inplace=True))

        # optional extra layers that keep both resolution and channel count
        for i in range(extra_layers):
            model.add_module('extra-layer-%d-conv' % i,
                nn.Conv2d(n_filters, n_filters, 3, 1, 1, bias=False))
            model.add_module('extra-layer-%d-batchnorm' % i,
                nn.BatchNorm2d(n_filters))
            model.add_module('extra-layer-%d-relu' % i,
                nn.LeakyReLU(0.2, inplace=True))

        # pyramid: halve the resolution and double the channels down to 4x4
        current_n_channels = n_filters
        while channel_size > 4:

            in_c = current_n_channels
            out_c = current_n_channels * 2

            model.add_module('pyramid-%d-%d-conv' % (in_c, out_c),
                nn.Conv2d(in_c, out_c, 4, 2, 1, bias=False))
            model.add_module('pyramid-%d-batchnorm' % out_c,
                nn.BatchNorm2d(out_c))
            model.add_module('pyramid-%d-relu' % out_c,
                nn.LeakyReLU(0.2, inplace=True))

            current_n_channels *= 2
            channel_size //= 2

        # final layer: unpadded 4x4 convolution, 4x4 map -> a single value
        model.add_module('final-%d-1' % current_n_channels,
            nn.Conv2d(current_n_channels, 1, 4, 1, bias=False))

        self.model = model

    def forward(self, x):
        """Return the batch-averaged critic output as a 1-element tensor."""
        out = self.model(x)
        # average the per-image scores over the batch dimension
        out = out.mean(0)
        return out.view(1)


class DCGAN_G(nn.Module):
    """DCGAN-style generator built from transposed convolutions.

    The first layer expands a ``z_dim``-channel 1x1 input to a 4x4 feature
    map. Each following stride-2 transposed convolution doubles the spatial
    size and halves the channel count, and the last one emits a 3-channel
    image squashed to [-1, 1] by ``tanh``.

    Args:
        img_size: side length of the generated square images. Must be a
            power of two and at least 8 (8, 16, 32, 64, ...).
        z_dim: number of channels of the noise input.
        extra_layers: number of additional stride-1 conv/batchnorm/ReLU
            groups inserted before the final layer (default 0).

    Raises:
        ValueError: if ``img_size`` is not one of the supported sizes.
    """

    def __init__(self, img_size, z_dim, extra_layers=0):
        super(DCGAN_G, self).__init__()

        # Width of the first layer: 32 filters, doubled once for every
        # doubling needed to get from 4 to img_size. Looping on `<` rather
        # than `!=` guarantees termination for sizes that are never hit.
        n_filters = 32
        s = 4
        while s < img_size:
            n_filters *= 2
            s *= 2
        # The final layer always upsamples once from at least 4x4, so 8 is
        # the smallest size this architecture can actually produce.
        if s != img_size or img_size < 8:
            raise ValueError(
                'img_size must be a power of two >= 8, got %r' % (img_size,))

        model = nn.Sequential()
        # 1x1 input -> 4x4 feature map
        model.add_module('deconv1',
            nn.ConvTranspose2d(z_dim, n_filters, 4, stride=1, bias=False))
        model.add_module('batchnorm1', nn.BatchNorm2d(n_filters))
        model.add_module('relu1', nn.ReLU(inplace=True))

        # pyramid: double the resolution and halve the channels until the
        # map is half the target size (the final layer does the last step)
        channel_size = 4
        while channel_size < img_size // 2:

            in_c = n_filters
            out_c = n_filters // 2

            model.add_module('pyramid-%d-%d-deconv' % (in_c, out_c),
                nn.ConvTranspose2d(in_c, out_c, 4, 2, 1, bias=False))
            model.add_module('pyramid-%d-batchnorm' % out_c,
                nn.BatchNorm2d(out_c))
            model.add_module('pyramid-%d-relu' % out_c,
                nn.ReLU(inplace=True))

            n_filters = n_filters // 2
            channel_size *= 2

        # optional extra layers that keep both resolution and channel count
        for i in range(extra_layers):
            model.add_module('extra-layer-%d-conv' % i,
                nn.Conv2d(n_filters, n_filters, 3, 1, 1, bias=False))
            model.add_module('extra-layer-%d-batchnorm' % i,
                nn.BatchNorm2d(n_filters))
            model.add_module('extra-layer-%d-relu' % i,
                nn.ReLU(inplace=True))

        # final layer outputs a generated image
        model.add_module('final-deconv',
            nn.ConvTranspose2d(n_filters, 3, 4, 2, 1, bias=False))
        model.add_module('final-tanh', nn.Tanh())

        self.model = model

    def forward(self, x):
        """Map a noise batch to a batch of generated images."""
        return self.model(x)

In [None]:
# Build the generator and the critic. They are moved to the GPU only when
# CUDA is available, instead of failing outright on a CPU-only machine.
netG = DCGAN_G(image_size, z_dim)
netD = DCGAN_D(image_size)
if use_cuda:
    netG, netD = netG.cuda(), netD.cuda()

In [None]:
def weights_init(model):
    """Re-initialise the parameters of a single layer, chosen by class name.

    Layers whose class name contains ``Conv`` get weights drawn from
    N(0, 0.02). Layers whose class name contains ``BatchNorm`` get weights
    drawn from N(1, 0.02) and a zero bias. Any other layer is left untouched.
    """
    layer_kind = type(model).__name__

    if 'Conv' in layer_kind:
        model.weight.data.normal_(0.0, 0.02)
        return

    if 'BatchNorm' in layer_kind:
        model.weight.data.normal_(1.0, 0.02)
        model.bias.data.fill_(0)

# Run weights_init over both networks; Module.apply calls the function on
# every submodule as well as on the network itself. Written as one tuple
# expression, so the notebook shows both networks as the cell output.
netG.apply(weights_init), netD.apply(weights_init)

In [None]:
def _to_device(tensor):
    """Move ``tensor`` to the GPU when CUDA is available, else return it as is."""
    return tensor.cuda() if use_cuda else tensor


# Buffers that the training loop resizes and overwrites in place instead of
# allocating new tensors for every batch. (These are ordinary tensors on the
# training device, not pinned host memory.)
input_batch = _to_device(torch.FloatTensor(batch_size, 3, image_size, image_size))
noise_batch = _to_device(torch.FloatTensor(batch_size, z_dim, 1, 1))

# Noise sampled once and never overwritten, so that image grids rendered
# from it at different points in training are directly comparable.
fixed_noise_batch = _to_device(
    torch.FloatTensor(batch_size, z_dim, 1, 1).normal_(0, 1))

# Gradient seeds passed to backward(): +1 descends on a value, -1 ascends.
one = _to_device(torch.FloatTensor([1]))
minus_one = -1 * one

# optimiserD = Adam(netD.parameters(), lr=5e-5, betas=(0.5, 0.999))
optimiserD = RMSprop(netD.parameters(), lr=5e-5)

# optimiserG = Adam(netG.parameters(), lr=5e-5, betas=(0.5, 0.999))
optimiserG = RMSprop(netG.parameters(), lr=5e-5)

In [None]:
# Training loop. The critic (D) is trained for several steps with its
# weights clipped to [-0.01, 0.01], then the generator (G) takes one step.
# Losses are printed after every G step, image grids are written every 500
# G steps, and both networks are checkpointed at the end of every epoch.
experiment_name = 'new-run-experiment'
epochs = 25
G_iterations = 0

# Neither vutils.save_image nor torch.save creates missing directories, so
# make sure the output directory exists before the first write.
if not os.path.isdir(experiment_name):
    os.makedirs(experiment_name)

for epoch in range(epochs):

    data_generator = iter(train_data_loader)
    n_batches = len(train_data_loader)

    i = 0
    while i < n_batches:

        ###
        # train discriminator (D)
        ###

        # set discriminator to trainable
        for param in netD.parameters():
            param.requires_grad = True

        # if starting or every now and then, increase D's iterations
        D_iterations = 5
        if G_iterations < 25 or G_iterations % 500 == 0:
            D_iterations = 100

        j = 0
        while j < D_iterations and i < n_batches:
            j += 1

            # clamp parameters
            for param in netD.parameters():
                param.data.clamp_(-0.01, 0.01)

            # load next batch of real data
            real_batch = next(data_generator)
            i += 1

            # The last batch of an epoch may be smaller than the rest. Keep
            # its size in a separate name so that the global `batch_size`
            # setting is not silently overwritten.
            n_real = real_batch.size(0)
            if use_cuda:
                real_batch = real_batch.cuda()
            input_batch.resize_as_(real_batch).copy_(real_batch)
            input_batch_var = Variable(input_batch)

            # use D to output error of predicting real
            netD.zero_grad()
            D_real_error = netD(input_batch_var)
            D_real_error.backward(one)

            # Sample fresh noise for every step; only the image snapshots
            # below reuse the fixed noise batch.
            noise_batch.resize_(n_real, z_dim, 1, 1).normal_(0, 1)
            noise_var = Variable(noise_batch, volatile=True)

            # Generate images, re-wrapping the raw data so that no gradient
            # flows back into G while D is being trained.
            fake_batch = Variable(netG(noise_var).data)
            input_batch_var = fake_batch

            # Use D to output error of predicting fake. The -1 seed makes
            # the accumulated gradient that of (D_real - D_fake).
            D_fake_error = netD(input_batch_var)
            D_fake_error.backward(minus_one)

            # optimise D
            D_error = D_real_error - D_fake_error
            optimiserD.step()

        ###
        # train generator (G)
        ###

        # Stop training the discriminator: with requires_grad off, the
        # backward pass below does not compute gradients for D's weights.
        for param in netD.parameters():
            param.requires_grad = False

        netG.zero_grad()

        # fresh noise again, sized like the last real batch that was seen
        noise_batch.resize_(n_real, z_dim, 1, 1).normal_(0, 1)
        noise_var = Variable(noise_batch)

        # use noise as input to generator to generate images
        fake_batch = netG(noise_var)

        # use discriminator's output to train generator
        G_error = netD(fake_batch)
        G_error.backward(one)
        optimiserG.step()

        G_iterations += 1

        ###
        # log
        ###

        print('[%d/%d][%d/%d][%d] Loss_D: %f Loss_G: %f Loss_D_real: %f Loss_D_fake %f'
            % (epoch, epochs, i, n_batches, G_iterations,
               D_error.data[0], G_error.data[0],
               D_real_error.data[0], D_fake_error.data[0]))

        if G_iterations % 500 == 0:

            # save some real images, mapped back from [-1, 1] to [0, 1]
            real_samples = real_batch.mul(0.5).add(0.5)
            vutils.save_image(real_samples, '{}/real_samples.png'.format(experiment_name))

            # save images generated from the fixed noise, mapped the same way
            fake_samples = netG(Variable(fixed_noise_batch, volatile=True))
            fake_samples = fake_samples.data.mul(0.5).add(0.5)
            vutils.save_image(
                fake_samples,
                '{}/fake_samples_{}.png'.format(experiment_name, G_iterations))

    # save models
    torch.save(netG.state_dict(),
               '{0}/netG_epoch_{1}.pth'.format(experiment_name, epoch))
    torch.save(netD.state_dict(),
               '{0}/netD_epoch_{1}.pth'.format(experiment_name, epoch))