# Wasserstein GAN with Gradient Penalty (WGAN-GP)

### 목표
이 노트북에서는 지금까지 사용해 온 GAN의 안정성 문제 중 일부를 해결하는 Gradient Penalty 포함하는 Wasserstein GAN(WGAN-GP)을 구축할 것입니다. 특히, W-손실로 알려진 특별한 종류의 손실 함수를 사용할 것입니다. 여기서 W는 Wasserstein을 나타내고 기울기 페널티는 모드 붕괴를 방지합니다.

*재미있는 사실: Wasserstein은 Penn State의 수학자 Leonid Vaseršteĭn의 이름을 따서 명명되었습니다. W로 축약된 것을 볼 수 있습니다(예: WGAN, W-손실, W-거리).*

### 학습 목표
1. 보다 안정적인 GAN: Wasserstein GAN with Gradient Penalty(WGAN-GP)를 구축하는 실습 경험을 얻습니다.
2. 좀 더 고급의 WGAN-GP 모델을 훈련합니다.

## 생성자 및 평가자

몇 가지 유용한 패키지 가져오기, 시각화 함수 정의, 생성자 빌드 및 평가자 빌드로 시작합니다. WGAN-GP의 변경 사항은 학습 중에 손실 함수에 적용되므로 생성자 및 평가자 클래스에 이전 GAN 코드를 재사용 할 수 있습니다. WGAN-GP에서는 더 이상 가짜와 실제를 0과 1로 분류하는 판별기를 사용하지 않고 실제 숫자로 이미지를 채점하는 평가자를 사용합니다.

#### 패키지와 시각화

In [None]:
import torch
from torch import nn
from tqdm.auto import tqdm
import torchvision
from torchvision import transforms
from torchvision.datasets import MNIST
from torchvision.utils import make_grid
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
torch.manual_seed(0) # Set for testing purposes, please do not change!

def show_tensor_images(image_tensor, num_images=25, size=(1, 28, 28)):
    '''
    Function for visualizing images: Given a tensor of images, number of images, and
    size per image, plots and prints the images in an uniform grid.
    '''
    #image_tensor = (image_tensor + 1) / 2
    image_unflat = image_tensor.detach().cpu()
    image_grid = make_grid(image_unflat[:num_images], nrow=5)
    plt.imshow(image_grid.permute(1, 2, 0).squeeze())
    plt.show()

#### 생성자와 잡음

In [None]:
class Generator(nn.Module):
    '''
    Generator Class
    Values:
        z_dim: the dimension of the noise vector, a scalar
        im_chan: the number of channels of the output image, a scalar
              (MNIST is black-and-white, so 1 channel is your default)
        hidden_dim: the inner dimension, a scalar
    '''
    def __init__(self, z_dim=10, im_chan=1, hidden_dim=64):
        super(Generator, self).__init__()
        self.z_dim = z_dim
        # Build the neural network
        self.gen = nn.Sequential(
            self.make_gen_block(z_dim, hidden_dim * 4),
            self.make_gen_block(hidden_dim * 4, hidden_dim * 2, kernel_size=4, stride=1),
            self.make_gen_block(hidden_dim * 2, hidden_dim),
            self.make_gen_block(hidden_dim, im_chan, kernel_size=4, final_layer=True),
        )

    def make_gen_block(self, input_channels, output_channels, kernel_size=3, stride=2, final_layer=False):
        '''
        Function to return a sequence of operations corresponding to a generator block of DCGAN;
        a transposed convolution, a batchnorm (except in the final layer), and an activation.
        Parameters:
            input_channels: how many channels the input feature representation has
            output_channels: how many channels the output feature representation should have
            kernel_size: the size of each convolutional filter, equivalent to (kernel_size, kernel_size)
            stride: the stride of the convolution
            final_layer: a boolean, true if it is the final layer and false otherwise 
                      (affects activation and batchnorm)
        '''
        if not final_layer:
            return nn.Sequential(
                nn.ConvTranspose2d(input_channels, output_channels, kernel_size, stride),
                nn.BatchNorm2d(output_channels),
                nn.ReLU(inplace=True),
            )
        else:
            return nn.Sequential(
                nn.ConvTranspose2d(input_channels, output_channels, kernel_size, stride),
                nn.Tanh(),
            )

    def forward(self, noise):
        '''
        Function for completing a forward pass of the generator: Given a noise tensor,
        returns generated images.
        Parameters:
            noise: a noise tensor with dimensions (n_samples, z_dim)
        '''
        x = noise.view(len(noise), self.z_dim, 1, 1)
        return self.gen(x)

def get_noise(n_samples, z_dim, device='cpu'):
    '''
    Function for creating noise vectors: Given the dimensions (n_samples, z_dim)
    creates a tensor of that shape filled with random numbers from the normal distribution.
    Parameters:
      n_samples: the number of samples to generate, a scalar
      z_dim: the dimension of the noise vector, a scalar
      device: the device type
    '''
    return torch.randn(n_samples, z_dim, device=device)

#### 평가자(Critic)

In [None]:
class Critic(nn.Module):
    '''
    Critic Class
    Values:
        im_chan: the number of channels of the output image, a scalar
              (MNIST is black-and-white, so 1 channel is your default)
        hidden_dim: the inner dimension, a scalar
    '''
    def __init__(self, im_chan=1, hidden_dim=64):
        super(Critic, self).__init__()
        self.crit = nn.Sequential(
            self.make_crit_block(im_chan, hidden_dim),
            self.make_crit_block(hidden_dim, hidden_dim * 2),
            self.make_crit_block(hidden_dim * 2, 1, final_layer=True),
        )

    def make_crit_block(self, input_channels, output_channels, kernel_size=4, stride=2, final_layer=False):
        '''
        Function to return a sequence of operations corresponding to a critic block of DCGAN;
        a convolution, a batchnorm (except in the final layer), and an activation (except in the final layer).
        Parameters:
            input_channels: how many channels the input feature representation has
            output_channels: how many channels the output feature representation should have
            kernel_size: the size of each convolutional filter, equivalent to (kernel_size, kernel_size)
            stride: the stride of the convolution
            final_layer: a boolean, true if it is the final layer and false otherwise 
                      (affects activation and batchnorm)
        '''
        if not final_layer:
            return nn.Sequential(
                nn.Conv2d(input_channels, output_channels, kernel_size, stride),
                nn.BatchNorm2d(output_channels),
                nn.LeakyReLU(0.2, inplace=True),
            )
        else:
            return nn.Sequential(
                nn.Conv2d(input_channels, output_channels, kernel_size, stride),
            )

    def forward(self, image):
        '''
        Function for completing a forward pass of the critic: Given an image tensor, 
        returns a 1-dimension tensor representing fake/real.
        Parameters:
            image: a flattened image tensor with dimension (im_chan)
        '''
        crit_pred = self.crit(image)
        return crit_pred.view(len(crit_pred), -1)

## 훈련 초기화
이제 모든 것을 함께 시작할 수 있습니다.
평소와 같이 매개 변수를 설정하여 시작합니다.
   * n_epochs : 훈련 할 때 전체 데이터 세트를 반복하는 횟수
   * z_dim : 노이즈 벡터의 차원
   * display_step : 이미지를 표시 / 시각화하는 빈도
   * batch_size : 정방향 / 역방향 패스 당 이미지 수
   * lr : 학습률
   * beta_1, beta_2 : 모멘텀 매개변수
   * c_lambda : 그라디언트 패널티의 가중치
   * crit_repeats : 생성자 업데이트 당 평가자를 업데이트하는 횟수-이에 대한 자세한 내용은 *Putting It All Together* 섹션에 있습니다.
   * device: 장치 유형

또한 MNIST 데이터 세트를로드하고 텐서로 변환합니다.

In [None]:
n_epochs = 70
z_dim = 64
display_step = 5000
batch_size = 128
lr = 0.0002
beta_1 = 0.5
beta_2 = 0.999
c_lambda = 10
crit_repeats = 5
device = 'cuda'
#device = 'cpu'

# dataset = torchvision.datasets.ImageFolder('./MNIST', transform=transform)
#
# dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])

dataloader = DataLoader( 
    MNIST('.', download=True, transform=transform),
    batch_size=batch_size,
    shuffle=True)

그런 다음 생성자, 평가자 및 최적화 프로그램을 초기화 할 수 있습니다.

In [None]:
gen = Generator(z_dim).to(device)
gen_opt = torch.optim.Adam(gen.parameters(), lr=lr, betas=(beta_1, beta_2))
crit = Critic().to(device) 
crit_opt = torch.optim.Adam(crit.parameters(), lr=lr, betas=(beta_1, beta_2))

def weights_init(m):
    if isinstance(m, nn.Conv2d) or isinstance(m, nn.ConvTranspose2d):
        torch.nn.init.normal_(m.weight, 0.0, 0.02)  # (tensor,mean = 0,std = 1)
    if isinstance(m, nn.BatchNorm2d):
        torch.nn.init.normal_(m.weight, 0.0, 0.02)
        torch.nn.init.constant_(m.bias, 0)
gen = gen.apply(weights_init)
crit = crit.apply(weights_init)

## 그래디언트 페널티
그래디언트 패널티를 계산하는 것은 두 가지 함수로 나눌 수 있습니다 : (1) 이미지에 대한 그래디언트를 계산하고 (2) 그래디언트가 주어진 그래디언트 패널티를 계산합니다.

그라디언트를 얻는 것으로 시작할 수 있습니다. 그라디언트는 먼저 혼합 이미지를 생성하여 계산됩니다. 이것은 엡실론을 사용하여 가짜 이미지와 실제 이미지의 가중을 결정한 다음 합산하여 수행됩니다. 중간 이미지가 있으면 이미지에 대한 비평가의 출력을 얻을 수 있습니다. 마지막으로, 혼합 이미지 (입력)의 픽셀과 관련하여 혼합 이미지 (출력)에 대한 비평가 점수의 기울기를 계산합니다. *None* 이 표시 될 때마다 그래디언트를 얻으려면 코드를 입력해야 합니다. 솔루션을 테스트 할 수 있는 테스트 기능이 다음 블록에 있습니다.

In [None]:
# UNQ_C1 (UNIQUE CELL IDENTIFIER, DO NOT EDIT)
# GRADED FUNCTION: get_gradient
def get_gradient(crit, real, fake, epsilon):
    '''
    Return the gradient of the critic's scores with respect to mixes of real and fake images.
    Parameters:
        crit: the critic model
        real: a batch of real images
        fake: a batch of fake images
        epsilon: a vector of the uniformly random proportions of real/fake per mixed image
    Returns:
        gradient: the gradient of the critic's scores, with respect to the mixed image
    '''
    # Mix the images together
    mixed_images = real * epsilon + fake * (1 - epsilon)

    # Calculate the critic's scores on the mixed images
    mixed_scores = crit(mixed_images)
    
    # Take the gradient of the scores with respect to the images
    gradient = torch.autograd.grad(
        # Note: You need to take the gradient of outputs with respect to inputs.
        # This documentation may be useful, but it should not be necessary:
        # https://pytorch.org/docs/stable/autograd.html#torch.autograd.grad
        #### START CODE HERE (~2 lines)####

        
        #### END CODE HERE ####
        # These other parameters have to do with the pytorch autograd engine works
        grad_outputs=torch.ones_like(mixed_scores), 
        create_graph=True,
        retain_graph=True,
    )[0]
    return gradient


In [None]:
# UNIT TEST
# DO NOT MODIFY THIS
def test_get_gradient(image_shape):
    real = torch.randn(*image_shape, device=device) + 1
    fake = torch.randn(*image_shape, device=device) - 1
    epsilon_shape = [1 for _ in image_shape]
    epsilon_shape[0] = image_shape[0]
    epsilon = torch.rand(epsilon_shape, device=device).requires_grad_()
    gradient = get_gradient(crit, real, fake, epsilon)
    assert tuple(gradient.shape) == image_shape
    assert gradient.max() > 0
    assert gradient.min() < 0
    return gradient

gradient = test_get_gradient((256, 1, 28, 28))
print("Success!")

완료해야하는 두 번째 함수는 그래디언트가 주어진 경우 그래디언트 패널티를 계산하는 것입니다. 먼저 각 이미지의 그라데이션 크기를 계산합니다. 기울기의 크기를 norm 이라고도 합니다. 그런 다음 각 크기와 이상적인 norm 1 사이의 거리를 제곱하고 모든 제곱 거리의 평균을 취하여 패널티를 계산합니다.

다시 말하지만 *None* 이 표시 될 때마다 코드를 입력해야 합니다. 도움이 필요한 경우 볼 수있는 힌트가 있으며 다음 블록에 솔루션을 테스트 할 수있는 테스트 기능이 있습니다.

<details>

<summary>
<font size="3" color="green">
<b><code><font size="4">gradient_penalty</font></code> 에 대한 선택적 힌트</b>
</font>
</summary>
    
1. 마지막에 평균을 취하십시오.
2. 각 그라디언트의 크기는 이미 계산되어 있습니다.
</details>


In [None]:
# UNQ_C2 (UNIQUE CELL IDENTIFIER, DO NOT EDIT)
# GRADED FUNCTION: gradient_penalty
def gradient_penalty(gradient):
    '''
    Return the gradient penalty, given a gradient.
    Given a batch of image gradients, you calculate the magnitude of each image's gradient
    and penalize the mean quadratic distance of each magnitude to 1.
    Parameters:
        gradient: the gradient of the critic's scores, with respect to the mixed image
    Returns:
        penalty: the gradient penalty
    '''
    # Flatten the gradients so that each row captures one image
    gradient = gradient.view(len(gradient), -1)

    # Calculate the magnitude of every row
    gradient_norm = gradient.norm(2, dim=1)
    
    # Penalize the mean squared distance of the gradient norms from 1
    #### START CODE HERE  (~1 line)####

    #### END CODE HERE ####
    return penalty

In [None]:
# UNIT TEST
def test_gradient_penalty(image_shape):
    bad_gradient = torch.zeros(*image_shape)
    bad_gradient_penalty = gradient_penalty(bad_gradient)
    assert torch.isclose(bad_gradient_penalty, torch.tensor(1.))

    image_size = torch.prod(torch.Tensor(image_shape[1:]))
    good_gradient = torch.ones(*image_shape) / torch.sqrt(image_size)
    good_gradient_penalty = gradient_penalty(good_gradient)
    assert torch.isclose(good_gradient_penalty, torch.tensor(0.))

    random_gradient = test_get_gradient(image_shape)
    random_gradient_penalty = gradient_penalty(random_gradient)
    assert torch.abs(random_gradient_penalty - 1) < 0.1

test_gradient_penalty((256, 1, 28, 28))
print("Success!")

## 손실
다음으로 생성자와 평가자의 손실을 계산해야 합니다.

생성자의 경우 생성자의 가짜 이미지에 대한 평가자의 예측을 최대화하여 손실을 계산합니다. 인수에는 배치의 모든 가짜 이미지에 대한 점수가 있지만 그 평균을 사용합니다.

아래에 선택적 힌트가 있으며 솔루션을 테스트 할 수 있도록 다음 블록에 테스트 기능이 있습니다.


<details>
    <summary><font size="3" color="green"><b><code><font size="4">get_gen_loss</font></code>에 대한 선택적 힌트</b></font></summary>

1. 이것은 한 줄로 쓸 수 있습니다.
2. 이것은 평가자 점수 평균의 음수입니다.
</details>

In [None]:
# UNQ_C3 (UNIQUE CELL IDENTIFIER, DO NOT EDIT)
# GRADED FUNCTION: get_gen_loss
def get_gen_loss(crit_fake_pred):
    '''
    Return the loss of a generator given the critic's scores of the generator's fake images.
    Parameters:
        crit_fake_pred: the critic's scores of the fake images
    Returns:
        gen_loss: a scalar loss value for the current batch of the generator
    '''
    #### START CODE HERE (~1 line)####

    #### END CODE HERE ####
    return gen_loss

In [None]:
# UNIT TEST
assert torch.isclose(
    get_gen_loss(torch.tensor(1.)), torch.tensor(-1.0)
)

assert torch.isclose(
    get_gen_loss(torch.rand(10000)), torch.tensor(-0.5), 0.05
)

print("Success!")

평가자의 경우, 실제 이미지에 대한 평가자의 예측과 가짜 이미지에 대한 예측 사이의 거리를 최대화하고 기울기 패널티를 추가하여 손실을 계산합니다. 기울기 패널티는 $\lambda$ 에 따라 가중치가 부여됩니다. 인수는 배치의 모든 이미지에 대한 점수이며 평균을 사용합니다.

문제가 발생하면 아래 힌트와 솔루션을 테스트 할 수있는 다음 블록의 테스트 기능이 있습니다.

<details><summary><font size="3" color="green"><b><code><font size="4">get_crit_loss</font></code>에 대한 선택적 힌트</b></font></summary>

1. 평균 가짜 점수가 높을수록 평가자의 손실이 더 높습니다.
2. 이것은 평균 실제 점수에 대해 무엇을 시사합니까?
3. 그래디언트 패널티가 높을수록 $\lambda$  에 비례하여 평가자의 손실이 높아집니다.
</details>


In [None]:
# UNQ_C4 (UNIQUE CELL IDENTIFIER, DO NOT EDIT)
# GRADED FUNCTION: get_crit_loss
def get_crit_loss(crit_fake_pred, crit_real_pred, gp, c_lambda):
    '''
    Return the loss of a critic given the critic's scores for fake and real images,
    the gradient penalty, and gradient penalty weight.
    Parameters:
        crit_fake_pred: the critic's scores of the fake images
        crit_real_pred: the critic's scores of the real images
        gp: the unweighted gradient penalty
        c_lambda: the current weight of the gradient penalty 
    Returns:
        crit_loss: a scalar for the critic's loss, accounting for the relevant factors
    '''
    #### START CODE HERE  (~1 line)####

    #### END CODE HERE ####
    return crit_loss

In [None]:
# UNIT TEST
assert torch.isclose(
    get_crit_loss(torch.tensor(1.), torch.tensor(2.), torch.tensor(3.), 0.1),
    torch.tensor(-0.7)
)
assert torch.isclose(
    get_crit_loss(torch.tensor(20.), torch.tensor(-20.), torch.tensor(2.), 10),
    torch.tensor(60.)
)

print("Success!")

## 함께 모아 완성하기
모든 것을 정리하기 전에 몇 가지 유의해야 할 사항이 있습니다.
1. GPU에서도 그래디언트 패널티로 인해 그래디언트의 그래디언트를 계산해야하기 때문에 **훈련이 이전 실습보다 더 느리게 실행됩니다**. 이는 잠재적으로 epoch 당 몇 분을 의미합니다! 최상의 결과를 얻으려면 GPU에서 최대한 오래 실행하십시오.


2. 이전 버전과의 한 가지 중요한 차이점은 생성자를 업데이트 할 때마다 **평가자를 여러 번 업데이트**한다는 것입니다. 이는 생성자가 평가자를 압도하는 것을 방지하는 데 도움이 됩니다. 때로는 생성자가 평가자보다 더 많이 업데이트 된 반전을 볼 수 있습니다. 이것은 아키텍처 (예 : 네트워크의 깊이와 너비) 및 알고리즘 선택 (예: 사용중인 손실)에 따라 다릅니다.


3. WGAN-GP는 반드시 GAN의 전체 성능을 향상시키기 위한 것은 아니지만 단지 **안정성을 증가**하고 **모드 붕괴를 방지**합니다. 일반적으로 WGAN은 마지막 할당에서 바닐라 DCGAN보다 훨씬 더 안정적인 방식으로 훈련 할 수 있지만 일반적으로 약간 느리게 실행됩니다. 또한 모델이 무너지지 않고 더 많은 epochs 를 위해 모델을 훈련 할 수 있어야합니다.


<!-- Once again, be warned that this runs very slowly on a CPU. One way to run this more quickly is to download the .ipynb and upload it to Google Drive, then open it with Google Colab and make the runtime type GPU and replace
`device = "cpu"`
with
`device = "cuda"`
and make sure that your `get_noise` function uses the right device.  -->

다음은 WGAN-GP 출력이 어떻게 되어야하는 지에 대한 스냅 샷입니다.![MNIST Digits Progression](MNIST_WGAN_Progression.png)

In [None]:
import matplotlib.pyplot as plt

cur_step = 0
generator_losses = []
critic_losses = []
for epoch in range(n_epochs):
    # Dataloader returns the batches
    for real, _ in tqdm(dataloader):
        cur_batch_size = len(real)
        real = real.to(device)

        mean_iteration_critic_loss = 0
        for _ in range(crit_repeats):
            ### Update critic ###
            crit_opt.zero_grad()
            fake_noise = get_noise(cur_batch_size, z_dim, device=device)
            fake = gen(fake_noise)
            crit_fake_pred = crit(fake.detach())
            crit_real_pred = crit(real)

            epsilon = torch.rand(len(real), 1, 1, 1, device=device, requires_grad=True)
            gradient = get_gradient(crit, real, fake.detach(), epsilon)
            gp = gradient_penalty(gradient)
            crit_loss = get_crit_loss(crit_fake_pred, crit_real_pred, gp, c_lambda)

            # Keep track of the average critic loss in this batch
            mean_iteration_critic_loss += crit_loss.item() / crit_repeats
            # Update gradients
            crit_loss.backward(retain_graph=True)
            # Update optimizer
            crit_opt.step()
        critic_losses += [mean_iteration_critic_loss]

        ### Update generator ###
        gen_opt.zero_grad()
        fake_noise_2 = get_noise(cur_batch_size, z_dim, device=device)
        fake_2 = gen(fake_noise_2)
        crit_fake_pred = crit(fake_2)
        gen_loss = get_gen_loss(crit_fake_pred)
        gen_loss.backward()

        # Update the weights
        gen_opt.step()

        # Keep track of the average generator loss
        generator_losses += [gen_loss.item()]

        ### Visualization code ###
        if cur_step % display_step == 0 or cur_step == 469 * n_epochs - 10:
            gen_mean = sum(generator_losses[-display_step:]) / display_step
            crit_mean = sum(critic_losses[-display_step:]) / display_step
            print(f"Step {cur_step}: Generator loss: {gen_mean}, critic loss: {crit_mean}")
            show_tensor_images(fake)
            show_tensor_images(real)
            step_bins = 20
            num_examples = (len(generator_losses) // step_bins) * step_bins
            plt.plot(
                range(num_examples // step_bins), 
                torch.Tensor(generator_losses[:num_examples]).view(-1, step_bins).mean(1),
                label="Generator Loss"
            )
            plt.plot(
                range(num_examples // step_bins), 
                torch.Tensor(critic_losses[:num_examples]).view(-1, step_bins).mean(1),
                label="Critic Loss"
            )
            plt.legend()
            plt.show()

        cur_step += 1
