In [None]:
# import argparse
import os
import numpy as np
import math
import itertools

import torchvision.transforms as transforms
from torchvision.utils import save_image

from torch.utils.data import DataLoader
from torchvision import datasets
from torch.autograd import Variable

import torch.nn as nn
import torch.nn.functional as F
import torch
# Flag read by later cells to decide between CPU and CUDA tensor types.
cuda = bool(torch.cuda.is_available())

# Fixed seed so that NumPy and PyTorch sampling is repeatable between runs.
# For a fresh seed on every run use instead:
#   import random; manualSeed = random.randint(1, 10000)
manualSeed = 999
print("Random Seed: ", manualSeed)
torch.manual_seed(manualSeed)
np.random.seed(manualSeed)

def gen_noise(n_instance, n_dim):
    """Draw uniform noise in [-1, 1) as a float tensor.

    Args:
        n_instance: number of noise vectors (rows).
        n_dim: length of each noise vector (columns).

    Returns:
        A `torch.Tensor` of shape (n_instance, n_dim) sampled with NumPy's
        global random state.
    """
    samples = np.random.uniform(-1.0, 1.0, (n_instance, n_dim))
    return torch.Tensor(samples)


Random Seed:  999


In [None]:
# parser = argparse.ArgumentParser()
# parser.add_argument("--n_epochs", type=int, default=200, help="number of epochs of training")
# parser.add_argument("--batch_size", type=int, default=64, help="size of the batches")
# parser.add_argument("--lr", type=float, default=0.0002, help="adam: learning rate")
# parser.add_argument("--b1", type=float, default=0.5, help="adam: decay of first order momentum of gradient")
# parser.add_argument("--b2", type=float, default=0.999, help="adam: decay of second order momentum of gradient")
# parser.add_argument("--n_cpu", type=int, default=8, help="number of cpu threads to use during batch generation")
# parser.add_argument("--latent_dim", type=int, default=62, help="dimensionality of the latent space")
# parser.add_argument("--img_size", type=int, default=32, help="size of each image dimension")
# parser.add_argument("--channels", type=int, default=1, help="number of image channels")
# parser.add_argument("--sample_interval", type=int, default=400, help="interval between image sampling")
# opt = parser.parse_args()

class Object(object):
    """Bare attribute container used in place of the argparse namespace."""
    pass

# All hyper-parameters hang off `opt`, mirroring the commented-out argparse options above.
opt = Object()
opt.n_epochs = 1000 # number of epochs of training
opt.batch_size = 80 # size of the batches

# NOTE(review): not read by any visible cell; the optimizers index the
# per-level lists `dis_lrs` / `gen_lrs` below instead.
opt.lr = 0.0002

# ---------------
# Laplacian pyramid
opt.laplacian_fsize = 5 # side length of the Gaussian blur kernel
opt.laplacian_sigma = 1.4 # standard deviation of the Gaussian blur kernel
opt.n_level = 3 # Laplacian pyramid n_level

# One learning rate per pyramid level (index 0 = full resolution).
# An earlier trial, previously assigned here and immediately overwritten, used
#   dis_lrs = [0.0002, 0.0003, 0.001] and gen_lrs = [0.001, 0.005, 0.01].
opt.dis_lrs = [0.0002, 0.0003, 0.0002] # adam: learning rate for each discriminator
opt.gen_lrs = [0.001, 0.005, 0.0002] # adam: learning rate for each generator

opt.n_update_gen = 1 # update generator parameters every n_update_gen epochs
opt.n_update_dis = 1 # update discriminator parameters every n_update_dis epochs
# ---------------

# ---------------
# WGAN GP
opt.lambda_gp = 10 # weight of the gradient-penalty term
# ---------------

opt.b1 = 0.5 # adam: decay of first order momentum of gradient
opt.b2 = 0.999 # adam: decay of second order momentum of gradient
opt.n_cpu = 8 # number of cpu threads to use during batch generation
opt.latent_dim = 50 # dimensionality of the latent space
opt.img_size = 128 # size of each image dimension
opt.channels = 3 # number of image channels
opt.sample_interval = 100 # interval between image sampling
img_shape = (opt.channels, opt.img_size, opt.img_size)

## Dataloader

In [None]:
# Root folder handed to ImageFolder; created first so the path always exists.
imgDir = 'emoji'
os.makedirs(imgDir, exist_ok=True)

# Preprocessing: resize and centre-crop to a square of side opt.img_size, then
# rescale each of the three channels with mean 0.5 / std 0.5.
emoji_transform = transforms.Compose([
    transforms.Resize(opt.img_size),
    transforms.CenterCrop(opt.img_size),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])
emoji_dataset = datasets.ImageFolder(root=imgDir, transform=emoji_transform)

# Shuffled mini-batches of opt.batch_size images.
dataloader = DataLoader(emoji_dataset, batch_size=opt.batch_size, shuffle=True)

# Prepare folder for storing results
os.makedirs("images", exist_ok=True)

def sample_image(n_row, batches_done):
    """Save an n_row x n_row grid of generated images as images/<batches_done>.png.

    Args:
        n_row: side of the grid; n_row ** 2 latent vectors are drawn from N(0, 1).
        batches_done: integer used as the output file name.

    NOTE(review): `decoder` is not defined in any visible cell, so calling this
    raises NameError unless it is defined elsewhere -- confirm or remove.
    `Tensor` is only assigned in a later cell, which must run before any call.
    """
    # Sample noise
    z = Variable(Tensor(np.random.normal(0, 1, (n_row ** 2, opt.latent_dim))))
    gen_imgs = decoder(z)
    save_image(gen_imgs.data, "images/%d.png" % batches_done, nrow=n_row, normalize=True)

def upscale_laplacian(data, n_level, batches_done):
    """Collapse a pyramid into one image and save it as images/<batches_done>-upscale.png.

    Starting from the last entry of `data`, the running image is repeatedly
    upsampled by 2 (bilinear) and added to the next finer level, ending at
    `data[0]`.

    Args:
        data: sequence of image batches; the last entry is the starting image
            and `data[0] .. data[n_level - 2]` are added in turn, coarse to fine.
        n_level: number of pyramid levels to combine.
        batches_done: integer used in the output file name.
    """
    d = data[-1]
    # Walk from the second-coarsest level down to level 0.
    for level in range(n_level - 2, -1, -1):
        # align_corners=False is the default behaviour, spelled out explicitly.
        upsampled = F.interpolate(d, scale_factor=2, mode='bilinear', align_corners=False)
        d = upsampled + data[level]
    # The file name depends on `batches_done` only; no global loop index is read.
    save_image(d, "images/{}-upscale.png".format(batches_done), nrow=5, normalize=True, scale_each=True)

## Laplacian Pyramid

In [None]:
#
# this function implementation is intentionally provided
#
def gaussian_kernel(fsize, sigma):
    """
    Build a normalised 2-D Gaussian filter centred on the kernel.

    Args:
        fsize: side length of the square kernel
        sigma: standard deviation of the Gaussian

    Returns:
        kernel: (fsize, fsize) array whose entries sum to 1
    """
    half = (fsize - 1) / 2
    # Signed distance of every row/column from the kernel centre.
    offsets = np.arange(-half, half + 1)
    sq = offsets ** 2
    # sq[i] + sq[j] is the squared distance of cell (i, j) from the centre.
    weights = np.exp(-0.5 * (sq[:, np.newaxis] + sq[np.newaxis, :]) / sigma**2)

    return weights / weights.sum()

def generate_laplacian_pyramid(imgs, n_level, fsize, sigma):
    """Split a batch of images into `n_level` band-pass / low-resolution levels.

    For each of the first `n_level - 1` levels the current image is blurred
    with a Gaussian kernel (applied per channel) and the residual
    `current - blurred` is stored; the current image is then decimated by 2
    in both spatial dimensions. The last level holds the remaining decimated
    image itself.

    Args:
        imgs: 4-D image batch (batch, channels, height, width).
        n_level: number of pyramid levels (>= 1).
        fsize: side length of the Gaussian kernel.
        sigma: standard deviation of the Gaussian kernel.

    Returns:
        List of `n_level` tensors of type `Tensor`; entry i has spatial size
        reduced by 2 ** i relative to `imgs`.

    Raises:
        ValueError: if `n_level` is smaller than 1.

    NOTE(review): the next level is decimated from the *unblurred* image, not
    from `blur`; a textbook pyramid low-pass filters before decimating --
    confirm this is intended.
    """
    if n_level < 1:
        raise ValueError("n_level must be >= 1, got {}".format(n_level))

    # Unpacking keeps the requirement that `imgs` is exactly 4-D.
    _, channels, _, _ = imgs.shape
    padding = fsize // 2
    factor = 2

    # The kernel does not change between levels, so build it once: one copy of
    # the 2-D Gaussian per channel, shaped for a grouped (depthwise) convolution.
    kernel = Tensor(gaussian_kernel(fsize, sigma).astype('f4'))
    kernel = kernel.reshape(1, 1, fsize, fsize).repeat(channels, 1, 1, 1)

    pyramid = []
    cur = imgs.type(Tensor)
    for _ in range(n_level - 1):
        blur = F.conv2d(cur, kernel, groups=channels, padding=padding)
        pyramid.append(cur - blur)
        cur = cur[:, :, ::factor, ::factor] # downsample x2
    pyramid.append(cur)
    return pyramid


# GAN架构定义
这部分定义了`Generator`和`Discriminator`，代码摘自现有的工程实现。

参考[`lapgan.py`](https://github.com/witnessai/LAPGAN/blob/master/mnist/lapgan.py)

## Generator

In [None]:
class LAPGenerator(nn.Module):
    """Generator for a single level of the Laplacian pyramid.

    Maps a latent vector `z` (and, when `condd` is true, a conditioning image
    `c` of side `img_size // 2`) to an image with `opt.channels` channels,
    squashed to [-1, 1] by a final Tanh. With the active architecture
    (`conv_blocks2`) the output side is `4 * (img_size // 4)`, i.e. `img_size`
    when it is divisible by 4.

    Args:
        img_size: side length of the images produced at this level.
        condd: whether the generator is conditioned on `c`.

    Reads `opt.latent_dim` and `opt.channels` from the notebook-level `opt`.

    Note on BatchNorm: the signature is `BatchNorm(num_features, eps, momentum)`,
    so the 0.8 is passed by keyword as `momentum`; passed positionally it would
    be taken as `eps`.
    NOTE(review): if 0.8 was meant in the Keras sense (decay of the running
    average), the PyTorch equivalent is `momentum=0.2` -- confirm.
    """

    def __init__(self, img_size, condd=False):
        super(LAPGenerator, self).__init__()
        self.img_size = img_size
        self.condd = condd

        # Alternative architecture:
        # self.l1, self.conv_blocks = self.conv_blocks1()
        self.conv_blocks2()

    def forward(self, z, c=None):
        """Generate a batch of images.

        Args:
            z: latent batch of shape (batch, opt.latent_dim).
            c: conditioning batch with opt.channels * (img_size // 2) ** 2
                values per sample; required when `condd` is true, ignored
                otherwise.

        Returns:
            Output of the convolutional stack (after Tanh).

        Raises:
            ValueError: if the generator is conditional and `c` is None.
        """
        out = self.l1(z)
        if self.condd:
            if c is None:
                raise ValueError("conditional LAPGenerator requires a conditioning input c")
            # Flatten the conditioning image so it can go through a Linear layer.
            c = c.reshape(z.shape[0], opt.channels * (self.img_size//2) ** 2)
            out2 = self.l2(c)
            # Fuse noise features and conditioning features, then project to
            # the size expected by the convolutional stack.
            out = torch.cat((out, out2), 1)
            out = self.l3(out)
        out = out.view(out.shape[0], self.layer1_depth, self.init_size, self.init_size)
        img = self.conv_blocks(out)
        return img

    def conv_blocks1(self):
        """Build the alternative (currently unused) upsample + conv architecture.

        Sets the depth attributes and `init_size` on `self`.

        Returns:
            (l1, conv_blocks): the latent projection and the convolutional stack.
        """
        # ----------------------
        # adjustable parameters
        # Sized from this level's own image size so every pyramid level ends up
        # with an output of side 4 * init_size.
        self.init_size = self.img_size // 4
        self.layer1_depth = 512
        self.layer2_depth = 256
        self.layer3_depth = 128
        self.layer4_depth = 64
        # ----------------------

        l1 = nn.Sequential(nn.Linear(opt.latent_dim, self.layer1_depth * self.init_size ** 2))
        # output dim: (self.layer1_depth, self.init_size, self.init_size)
        return l1, nn.Sequential(
            # Layer 1
            nn.Upsample(scale_factor=2, mode='bicubic'),
            nn.LeakyReLU(0.8, inplace=True),
            # output dim: (self.layer1_depth, scale_factor * self.init_size, scale_factor * self.init_size)
            nn.Conv2d(self.layer1_depth, self.layer2_depth, 3, stride=1, padding=1),
            # output dim: (self.layer2_depth, h, w)
            # where h=w=(scale_factor * self.init_size)

            # Layer 2
            nn.BatchNorm2d(self.layer2_depth, momentum=0.8),
            nn.LeakyReLU(0.8, inplace=True),
            nn.Upsample(scale_factor=2, mode='bicubic'),
            # output dim: (self.layer2_depth, 2*h, 2*w)
            nn.Conv2d(self.layer2_depth, self.layer3_depth, 3, stride=1, padding=1),
            # output dim: (self.layer3_depth, 2*h, 2*w)

            # Layer 3
            nn.BatchNorm2d(self.layer3_depth, momentum=0.8),
            nn.LeakyReLU(0.8, inplace=True),
            nn.Conv2d(self.layer3_depth, self.layer4_depth, 3, stride=1, padding=1),
            # output dim: (self.layer4_depth, 2*h, 2*w)

            # Layer 4
            nn.BatchNorm2d(self.layer4_depth, momentum=0.8),
            nn.LeakyReLU(0.8, inplace=True),
            nn.Conv2d(self.layer4_depth, opt.channels, 3, stride=1, padding=1),
            # output dim: (opt.channels, 2*h, 2*w)

            # Output Layer
            nn.Tanh(),
        )

    def conv_blocks2(self):
        """Build the active architecture and attach it to `self`.

        Creates `self.l1` (latent projection), `self.l2` (conditioning
        projection), `self.l3` (fusion layer) and `self.conv_blocks` (two
        stride-2 transposed convolutions followed by Tanh).
        """
        # ----------------------
        # adjustable parameters
        self.init_size = self.img_size // 4
        self.condition_depth = 1
        self.layer1_depth = 128
        self.layer2_depth = 64
        # layer3..5 depths are not used by the active stack below.
        self.layer3_depth = 256
        self.layer4_depth = 128
        self.layer5_depth = 64
        # ----------------------

        # random noise layer
        if self.condd:
            # Conditional: noise becomes a small feature vector that is later
            # concatenated with the conditioning features and fed to l3.
            self.l1 = nn.Sequential(
                nn.Linear(opt.latent_dim, self.condition_depth * self.init_size ** 2),
                nn.BatchNorm1d(self.condition_depth * self.init_size ** 2, momentum=0.8),
                nn.ReLU(),
            )
        else:
            # Unconditional: noise is projected straight to the conv input size.
            self.l1 = nn.Sequential(nn.Linear(opt.latent_dim, self.layer1_depth * self.init_size ** 2))
        # conditional input layer
        # (l2 and l3 are created in both modes; forward only uses them when condd is true)
        self.l2 = nn.Sequential(
            nn.Linear(opt.channels * (self.img_size//2) ** 2, self.condition_depth * (self.img_size//2) ** 2),
            nn.BatchNorm1d(self.condition_depth * (self.img_size//2) ** 2, momentum=0.8),
            nn.ReLU(),
        )
        # concatenate layer
        feature_size = self.condition_depth * self.init_size ** 2
        if self.condd:
            feature_size += self.condition_depth * (self.img_size//2) ** 2
        self.l3 = nn.Sequential(
            nn.Linear(feature_size, self.layer1_depth * self.init_size ** 2),
            nn.ReLU(),
        )
        # output dim: (self.layer1_depth, self.init_size, self.init_size)
        self.conv_blocks = nn.Sequential(
            # Layer 1
            nn.Dropout2d(),
            nn.ReLU(),
            nn.ConvTranspose2d(self.layer1_depth, self.layer2_depth, 4, stride=2, padding=1),
            # output dim: (self.layer2_depth, h, w)
            # where h=w=(self.init_size-1)*stride-2*padding+(kernel_size-1)+1

            # Layer 2
            nn.Dropout2d(),
            nn.BatchNorm2d(self.layer2_depth, momentum=0.8),
            nn.ReLU(),
            nn.ConvTranspose2d(self.layer2_depth, opt.channels, 4, stride=2, padding=1),
            # output dim: (opt.channels, hh, ww)
            # where hh=ww=(h-1)*stride-2*padding+(kernel_size-1)+1

            # Output Layer
            nn.Tanh(),
        )

## Discriminator



In [None]:
class Discriminator(nn.Module):
    """Discriminator for a single level of the Laplacian pyramid.

    Four stride-2 convolution blocks reduce the spatial size by 16, followed by
    a linear layer that outputs one unbounded score per image.

    Args:
        img_size: side length of the images judged at this level.
        condd: when true, the first convolution takes `2 * opt.channels`
            channels: the image concatenated with the conditioning image.

    Reads `opt.channels` from the notebook-level `opt`.
    """

    def __init__(self, img_size, condd=False):
        super(Discriminator, self).__init__()
        self.img_size = img_size
        self.condd = condd

        def discriminator_block(in_filters, out_filters, bn=True):
            """Conv(3x3, stride 2) -> LeakyReLU -> Dropout2d [-> BatchNorm2d]."""
            block = [nn.Conv2d(in_filters, out_filters, 3, 2, 1), nn.LeakyReLU(0.2, inplace=True), nn.Dropout2d(0.25)]
            if bn:
                # BatchNorm2d's second positional argument is eps, so the 0.8
                # is passed by keyword as momentum.
                block.append(nn.BatchNorm2d(out_filters, momentum=0.8))
            return block

        in_filters = opt.channels
        if self.condd:
            # Image and conditioning image are stacked along the channel axis.
            in_filters += opt.channels
        self.model = nn.Sequential(
            *discriminator_block(in_filters, 16, bn=False),
            *discriminator_block(16, 32),
            *discriminator_block(32, 64),
            *discriminator_block(64, 128),
        )

        # The height and width of downsampled image
        ds_size = self.img_size // 2 ** 4
        self.adv_layer = nn.Linear(128 * ds_size ** 2, 1)

    def forward(self, img, c=None):
        """Score a batch of images.

        Args:
            img: image batch (batch, opt.channels, H, W).
            c: conditioning batch (batch, opt.channels, h, w); required when
                `condd` is true, ignored otherwise. It is resized to (H, W)
                when its spatial size differs from `img`.

        Returns:
            Tensor of shape (batch, 1) with one score per image.

        Raises:
            ValueError: if the discriminator is conditional and `c` is None.
        """
        out = img
        if self.condd:
            if c is None:
                raise ValueError("conditional Discriminator requires a conditioning input c")
            if c.shape[-2:] != img.shape[-2:]:
                # Bring the conditioning image to the resolution of `img`.
                c = F.interpolate(c, size=img.shape[-2:], mode='bilinear', align_corners=False)
            # Without this concat the first conv would receive opt.channels
            # inputs while expecting 2 * opt.channels.
            out = torch.cat([img, c], 1)
        out = self.model(out)
        out = out.view(out.shape[0], -1)
        validity = self.adv_layer(out)

        return validity

In [None]:
def weights_init_normal(m):
    """Initialise one module in place; meant to be passed to `Module.apply`.

    Modules whose class name contains "Conv" get weights drawn from
    N(0, 0.05). Modules whose class name contains "BatchNorm2d" get weights
    drawn from N(1, 0.02) and a zero bias. Every other module is left as is.

    Args:
        m: the module visited by `apply`.
    """
    layer_kind = type(m).__name__
    if "Conv" in layer_kind:
        torch.nn.init.normal_(m.weight.data, 0.0, 0.05)
        return
    if "BatchNorm2d" in layer_kind:
        torch.nn.init.normal_(m.weight.data, 1.0, 0.02)
        torch.nn.init.constant_(m.bias.data, 0.0)



In [None]:
# Typed tensor constructors used throughout the notebook: the CUDA variants
# when the `cuda` flag is set, the CPU variants otherwise.
if cuda:
    Tensor = FloatTensor = torch.cuda.FloatTensor
    LongTensor = torch.cuda.LongTensor
else:
    Tensor = FloatTensor = torch.FloatTensor
    LongTensor = torch.LongTensor

# Optimizer

[example](https://github.com/eriklindernoren/PyTorch-GAN/blob/a163b82beff3d01688d8315a3fd39080400e7c01/implementations/lsgan/lsgan.py#L101)

In [None]:
# !!! Minimizes MSE instead of BCE
# Least-squares (MSE) adversarial objective, one criterion per pyramid level.
# Alternative: torch.nn.BCELoss() per level.
adversarial_loss = [torch.nn.MSELoss() for _ in range(opt.n_level)]

# One generator / discriminator pair per level. Level `k` works on images of
# side opt.img_size // 2**k; every level except the last is built with the
# conditional flag set. All generators are constructed before any discriminator.
generator = [
    LAPGenerator(opt.img_size // 2 ** level, level != opt.n_level - 1).apply(weights_init_normal)
    for level in range(opt.n_level)
]
discriminator = [
    Discriminator(opt.img_size // 2 ** level, level != opt.n_level - 1).apply(weights_init_normal)
    for level in range(opt.n_level)
]

# Move every network and criterion to the GPU when one is available.
if cuda:
    for i in range(opt.n_level):
        for net in (generator[i], discriminator[i], adversarial_loss[i]):
            net.cuda()


In [None]:
# Optimizers
def _adam_per_level(networks, learning_rates):
    """Return one Adam optimizer per pyramid level, all using betas (opt.b1, opt.b2)."""
    return [
        torch.optim.Adam(networks[i].parameters(), lr=learning_rates[i], betas=(opt.b1, opt.b2))
        for i in range(opt.n_level)
    ]

# Level i of each list optimizes generator[i] / discriminator[i] with its own learning rate.
optimizer_G = _adam_per_level(generator, opt.gen_lrs)
optimizer_D = _adam_per_level(discriminator, opt.dis_lrs)

In [None]:
def compute_gradient_penalty(D, real_samples, fake_samples):
    """Gradient penalty term of WGAN-GP.

    Scores random per-sample blends of real and fake images with `D` and
    penalises the squared deviation of the gradient norm (w.r.t. the blended
    input) from 1.

    Args:
        D: callable taking the blended batch and returning one score per sample.
        real_samples: batch of real images.
        fake_samples: batch of generated images, same shape as `real_samples`.

    Returns:
        Scalar tensor: mean over the batch of (||grad||_2 - 1) ** 2.
    """
    batch = real_samples.size(0)
    # One mixing coefficient per sample, broadcast over channel and spatial axes.
    mix = Tensor(np.random.random((batch, 1, 1, 1)))
    blended = mix * real_samples + ((1 - mix) * fake_samples)
    blended = blended.requires_grad_(True)
    scores = D(blended)
    # All-ones grad_outputs, so each score contributes its plain input gradient.
    ones = Variable(Tensor(batch, 1).fill_(1.0), requires_grad=False)
    # create_graph=True keeps the penalty itself differentiable.
    (grads,) = torch.autograd.grad(
        outputs=scores,
        inputs=blended,
        grad_outputs=ones,
        create_graph=True,
        retain_graph=True,
        only_inputs=True,
    )
    per_sample_norm = grads.view(grads.size(0), -1).norm(2, dim=1)
    return ((per_sample_norm - 1) ** 2).mean()

# Training

[example](https://github.com/eriklindernoren/PyTorch-GAN/blob/a163b82beff3d01688d8315a3fd39080400e7c01/implementations/lsgan/lsgan.py#L142)

In [None]:
# Commented out IPython magic to ensure Python compatibility.
# Main training loop. For every mini-batch the real images are decomposed into
# a pyramid, then each level's generator and discriminator are updated in turn
# (generator step first, discriminator step second) with the per-level MSE loss.
for epoch in range(opt.n_epochs):
    for i, (imgs, _) in enumerate(dataloader):

        # Adversarial ground truths
        # (column of ones for "real" targets, column of zeros for "fake" targets)
        valid = Variable(Tensor(imgs.shape[0], 1).fill_(1.0), requires_grad=False)
        fake = Variable(Tensor(imgs.shape[0], 1).fill_(0.0), requires_grad=False)

        with torch.no_grad():
            # Laplacian Pyramid
            lap_imgs = generate_laplacian_pyramid(imgs, opt.n_level, opt.laplacian_fsize, opt.laplacian_sigma)
            # Placeholder list, filled below with one generated batch per level.
            gen_imgs = [None for _ in range(opt.n_level)]

        # Sample noise as generator input
        # (the same z is fed to the generator of every level for this batch)
        z = Variable(Tensor(np.random.normal(0, 1, (imgs.shape[0], opt.latent_dim))))

        for j in range(opt.n_level):
        # for j in range(1):
        #     j = opt.n_level - 1
            # Training target for this level: the real pyramid entry j.
            real_imgs = Variable(lap_imgs[j].type(Tensor))

            # Generate a batch of images
            # Conditioning input: the real pyramid entry of the next level,
            # or None for the last level.
            if j < opt.n_level-1:
                c = Variable(lap_imgs[j+1].type(Tensor))
            else:
                c = None

            # -----------------
            #  Train Generator
            # -----------------

            optimizer_G[j].zero_grad()

            gen_imgs[j] = generator[j](z, c)

            # if epoch % 20 == 0:
            # Loss measures generator's ability to fool the discriminator
            g_loss = adversarial_loss[j](discriminator[j](gen_imgs[j], c), valid)
            # g_loss = adversarial_loss[j](1-discriminator[j](gen_imgs[j], c), fake)
            # lll = torch.nn.BCELoss()
            # g_loss = lll(discriminator[j](gen_imgs[j], c), valid)
          
          
            # fake_validity = discriminator[j](gen_imgs[j], c)
            # g_loss = -torch.mean(fake_validity)
    
            g_loss.backward()
            optimizer_G[j].step()
    
            # ---------------------
            #  Train Discriminator
            # ---------------------
    
            optimizer_D[j].zero_grad()

            # Disabled WGAN-GP variant of the discriminator loss:
            # # Real images
            # real_validity = discriminator[j](real_imgs, c)
            # # Fake images
            # fake_validity = discriminator[j](gen_imgs[j].detach(), c)
            # # Gradient penalty
            # gradient_penalty = compute_gradient_penalty(discriminator[j], real_imgs.data, gen_imgs[j].data)
            # # Adversarial loss
            # d_loss = -torch.mean(real_validity) + torch.mean(fake_validity) + opt.lambda_gp * gradient_penalty
    
            # Disabled BCE variant of the discriminator loss:
            # # Measure discriminator's ability to classify real from generated samples
            # lll = torch.nn.BCELoss()
            # real_loss = lll(discriminator[j](real_imgs, c), valid)
            # fake_loss = lll(discriminator[j](gen_imgs[j].detach(), c), fake)
            
            # detach() keeps the discriminator update from back-propagating
            # into the generator.
            real_loss = adversarial_loss[j](discriminator[j](real_imgs, c), valid)
            fake_loss = adversarial_loss[j](discriminator[j](gen_imgs[j].detach(), c), fake)
            # NOTE(review): the weights 0.5 / 0.9 do not sum to 1 -- confirm they are intentional.
            d_loss = 0.5 * real_loss + 0.9 * fake_loss
    
            # adaptive discriminator weight update
            # if d_loss.item()/g_loss.item() > 0.1:
            d_loss.backward()
            optimizer_D[j].step()
    
            # One log line per (batch, level).
            print(
                "[Epoch %d/%d] [nlevel %d/%d] [Batch %d/%d] [D loss: %f] [G loss: %f]"
                % (epoch+1, opt.n_epochs, j+1, opt.n_level, i+1, len(dataloader), d_loss.item(), g_loss.item())
            )

        with torch.no_grad():
          # 1-based count of mini-batches processed so far across all epochs.
          batches_done = epoch * len(dataloader) + i + 1
          if batches_done % opt.sample_interval == 0:
              # `j` still holds its value from the level loop above, i.e. the
              # last level (opt.n_level - 1); only that level's samples are saved.
              # NOTE(review): the "2" in the file name is hard-coded and only
              # matches `j` while opt.n_level == 3.
              data = [gen_imgs[j].data[:25]]
              # data = [real_imgs.data[:25]]
              save_image(data[0], "images/{}-{}.png".format(batches_done, 2), nrow=5, normalize=True, scale_each=True)
              # data = [gen_imgs[j].data[:25] for j in range(opt.n_level)]
              # for j in range(opt.n_level):
              #     save_image(data[j], "images/{}-{}.png".format(batches_done, j), nrow=5, normalize=True, scale_each=True)
              #     save_image(real_imgs.data[:25], "images/%d.png" % batches_done, nrow=5, normalize=True)
              # upscale_laplacian(data, opt.n_level, batches_done)

In [None]:
# Shapes of the last real / generated batch left behind by the training cell
# (relies on `real_imgs`, `gen_imgs` and the loop index `j` still being defined).
print(real_imgs.size(), gen_imgs[j].size(), sep="\n")

In [None]:
# gen_model_image(LapGan_model, n_samples=5)

In [None]:
# torch.save(generator,'LAPGAN.pt')

In [None]:
# Report the Python type of the generated batch held for each pyramid level.
for j in range(opt.n_level):
    print(gen_imgs[j].__class__)

In [None]:
# Take the first 25 generated samples of every level from the last training
# batch, then merge the levels into a single image grid on disk.
data = [gen_imgs[level].data[:25] for level in range(opt.n_level)]
upscale_laplacian(data=data, n_level=opt.n_level, batches_done=batches_done)