<a href="https://colab.research.google.com/github/jhyoo78/jhyoo78/blob/main/14_gan_cnn_celeba.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## CNN GAN - CelebA Human Faces

In [None]:
# mount Drive to access data files

from google.colab import drive
drive.mount('./mount')

In [None]:
# import libraries

import torch
import torch.nn as nn
from torch.utils.data import Dataset

import h5py
import pandas, numpy, random
import matplotlib.pyplot as plt

print(torch.__version__)

## 표준 CUDA 확인 및 설정

In [None]:
# CUDA가 가용한지 검사한다 (즉, Jupyter의 런타임이 GPU로 설정되어 있어야 한다)==> 책 4장에서 설명하였음.
# CUDA가 설치되어 있으면 default tensor type을 CUDA로 설정한다.

if torch.cuda.is_available():
  torch.set_default_tensor_type(torch.cuda.FloatTensor)
  print("using cuda:", torch.cuda.get_device_name(0))
  pass

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

device

## Helper 함수 정의

In [None]:
# functions to generate random data

def generate_random_image(size):
    random_data = torch.rand(size)    # torch.rand() : 0과 1 사이의 숫자를 균등하게 size 크기의 tensor로 random하게  생성, 균등 분포
    return random_data

def generate_random_seed(size):
    random_data = torch.randn(size)   # torch.randn() : 평균이 0이고 표준편차가 1인 가우시안 정규분포로 size 크기의 tensor를 random으로 생성
    return random_data

In [None]:
# modified from https://github.com/pytorch/vision/issues/720

class View(nn.Module):              # nn.Module을 부모 class로 하여 class를 선언하여 Sequential 내에서 사용하도록 하며,
                                    # 3차원 텐서(218, 178, 3)를 1차원 텐서(218x178x3)으로 변환한다.
    def __init__(self, shape):
        super().__init__()
        self.shape = shape,

    def forward(self, x):
        return x.view(*self.shape)   # 매개변수 x에 전달하는 값(인자, argumnet)의 개수가 가변적일 때 *를 붙임

In [None]:
# crop (numpy array) image to given width and height,
# Numpy 이미지를 받아서 중앙을 기준으로 128x128 이미지를 추출함

def crop_centre(img, new_width, new_height):
    height, width, _ = img.shape             #img.shape는 height, width, channel(BGR)을 출력함
    startx = width//2 - new_width//2
    starty = height//2 - new_height//2
    return img[ starty:starty + new_height, startx:startx + new_width, :]   # starty부터 new_height 까지 x startx부터 new_width까지 이미지를 리턴, : 은 전체를 의미

In [None]:
#from matplotlib import image
# dataset class

class CelebADataset(Dataset):

    def __init__(self, file):
        self.file_object = h5py.File(file, 'r')     # HDF5 file을 read로 연다.
        self.dataset = self.file_object['img_align_celeba']  #  group이 img_align_celeba 인 그룹을 dataset에 저장한다.
        pass

    def __len__(self):
        return len(self.dataset)                    # dataset의 개수를 리턴,

    def __getitem__(self, index):                   # __getitem__은 클래스의 인덱스에 접근할 때 자동으로 호출되는 magic method이다.
        if (index >= len(self.dataset)):                    # index가 데이터 수보다 큰 경우 오류가 발생하므로 이를 예외로 처리하도록 한다.
          raise IndexError()
        img = numpy.array(self.dataset[str(index)+'.jpg'])  # index를 jpg 파일의 이름으로하여 numpy 행렬로 변환한다.
        return torch.cuda.FloatTensor(img).permute(2,0,1).view(1,3,128,128) / 255.0         # 코드 12 에서 변경됨 -- 이미지의 텐서를 CUDA 텐서로 리턴한다.

    def plot_image(self, index):                             # 코드 12 에서 변경됨 -- 이미지 crop_center() 사용함
        img = numpy.array(self.dataset[str(index)+'.jpg'])
        # crop to 128x128 square
        img = crop_centre(img, 128, 128)
        plt.imshow(img, interpolation='nearest')
        pass

    pass

In [None]:
# create Dataset object

celeba_dataset = CelebADataset('mount/My Drive/Colab Notebooks/myo_gan/celeba/celeba_aligned_small.h5py')

In [None]:
# check data contains images

celeba_dataset.plot_image(41)

## Discriminator Network

In [None]:
# discriminator class

class Discriminator(nn.Module):

    def __init__(self):
        # initialise parent pytorch class
        super().__init__()

        # define neural network layers
        self.model = nn.Sequential(
            # expect input of shape (1,3,128,128)
            nn.Conv2d(3, 256, kernel_size=8, stride=2),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2),

            nn.Conv2d(256, 256, kernel_size=8, stride=2),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2),

            nn.Conv2d(256, 3, kernel_size=8, stride=2),
            nn.LeakyReLU(0.2),

            View(3*10*10),
            nn.Linear(3*10*10, 1),
            nn.Sigmoid()
        )

        # create loss function
        self.loss_function = nn.BCELoss()

        # create optimiser, simple stochastic gradient descent
        self.optimiser = torch.optim.Adam(self.parameters(), lr=0.0001)

        # counter and accumulator for progress
        self.counter = 0;
        self.progress = []

        pass


    def forward(self, inputs):
        # simply run model
        return self.model(inputs)


    def train(self, inputs, targets):
        # calculate the output of the network
        outputs = self.forward(inputs)

        # calculate loss
        loss = self.loss_function(outputs, targets)

        # increase counter and accumulate error every 10
        self.counter += 1;
        if (self.counter % 10 == 0):
            self.progress.append(loss.item())
            pass
        if (self.counter % 1000 == 0):
            print("counter = ", self.counter)
            pass

        # zero gradients, perform a backward pass, update weights
        self.optimiser.zero_grad()
        loss.backward()
        self.optimiser.step()

        pass


    def plot_progress(self):
        df = pandas.DataFrame(self.progress, columns=['loss'])
        df.plot(ylim=(0), figsize=(16,8), alpha=0.1, marker='.', grid=True, yticks=(0, 0.25, 0.5, 1.0, 5.0))
        pass

    pass

Test Discriminator  
코드에서 오류 발생 시, Pytorch를 12.0.1 이상으로 upgrade해야 함.
임시방편으로 Adam optimizer를 SGD로 환원시킴

In [None]:
%%time
# test discriminator can separate real data from random noise

D = Discriminator()
# move model to cuda device
D.to(device)

for image_data_tensor in celeba_dataset:
    # real data
    D.train(image_data_tensor, torch.cuda.FloatTensor([1.0]))
    # fake data
    D.train(generate_random_image((1,3,128,128)), torch.cuda.FloatTensor([0.0]))
    pass

 # counter=40000에서 종료됨

In [None]:
# plot discriminator loss

D.plot_progress()

In [None]:
# D를 수동 구동하여 실제와 가짜 데이터를 구별하는지 확인해 보자.

for i in range(4):
  image_data_tensor = celeba_dataset[random.randint(0,20000)]
  print( D.forward( image_data_tensor ).item() )
  pass

for i in range(4):
  print( D.forward( generate_random_image((1,3,128,128))).item() )
  pass

## Generator Network

In [None]:
# Generator 클래스

class Generator(nn.Module):

    def __init__(self):
        # initialise parent pytorch class
        super().__init__()

        # define neural network layers
        self.model = nn.Sequential(
            # input is a 1d array
            nn.Linear(100, 3*11*11),
            nn.LeakyReLU(0.2),

            # reshape to 4d
            View((1, 3, 11, 11)),

            nn.ConvTranspose2d(3, 256, kernel_size=8, stride=2),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2),

            nn.ConvTranspose2d(256, 256, kernel_size=8, stride=2),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2),

            nn.ConvTranspose2d(256, 3, kernel_size=8, stride=2, padding=1),
            nn.BatchNorm2d(3),

            # output should be (1,3,128,128)
            nn.Sigmoid()
        )

        # create optimiser, simple stochastic gradient descent
        self.optimiser = torch.optim.Adam(self.parameters(), lr=0.0001)

        # counter and accumulator for progress
        self.counter = 0;
        self.progress = []

        pass

    def forward(self, inputs):
        # simply run model
        return self.model(inputs)


    def train(self, D, inputs, targets):
        # calculate the output of the network
        g_output = self.forward(inputs)

        # Discriminator에게 전달하여 처리하도록 한다.
        d_output = D.forward(g_output)

        # 손실을 계산한다.
        loss = D.loss_function(d_output, targets)

        # increase counter and accumulate error every 10
        self.counter += 1;
        if (self.counter % 10 == 0):
            self.progress.append(loss.item())
            pass

        # zero gradients, perform a backward pass, update weights
        self.optimiser.zero_grad()
        loss.backward()
        self.optimiser.step()

        pass


    def plot_progress(self):
        df = pandas.DataFrame(self.progress, columns=['loss'])
        df.plot(ylim=(0), figsize=(16,8), alpha=0.1, marker='.', grid=True, yticks=(0, 0.25, 0.5, 1.0, 5.0))
        pass

    pass

## Test Generator Output

In [None]:
# G의 출력이 올바른 type과 shape를 가지고 있는지 확인한다.

G = Generator()
# move model to cuda device
G.to(device)

output = G.forward(generate_random_seed(100))    # G의 self.model의 입력에 맞추어 100으로 설정함

img = output.detach().permute(0,2,3,1).view(128,128,3).cpu().numpy()

plt.imshow(img, interpolation='none', cmap='Blues')   # 훈련되지 않은 G의 출력은 임의의 데이터 패턴을 만듬. 어떤 패턴이 보이면 코드 오류로 봐야 함,

## Train GAN

In [None]:
%%time

# create Discriminator and Generator

D = Discriminator()
D.to(device)
G = Generator()
G.to(device)

epochs = 1

for epoch in range(epochs):
  print ("epoch = ", epoch + 1)

  # train Discriminator and Generator

  for image_data_tensor in celeba_dataset:
    # D를 True에 대해 훈련시킨다.
    D.train(image_data_tensor, torch.cuda.FloatTensor([1.0]))

    # D를 False에 대해 훈련시킨다.
    D.train(G.forward(generate_random_seed(100)).detach(), torch.cuda.FloatTensor([0.0]))

    # G를 훈련시킨다.
    G.train(D, generate_random_seed(100), torch.cuda.FloatTensor([1.0]))

    pass

  pass

  # epoch=1일 때 counter=40000에서 종료됨

In [None]:
# plot discriminator error

D.plot_progress()

In [None]:
# plot generator error

G.plot_progress()

## Run Generator

In [None]:
# plot several outputs from the trained generator

# plot a 3 column, 2 row array of generated images
f, axarr = plt.subplots(2,3, figsize=(16,8))
for i in range(2):
    for j in range(3):
        output = G.forward(generate_random_seed(100))
        img = output.detach().permute(0,2,3,1).view(128,128,3).cpu().numpy()
        axarr[i,j].imshow(img, interpolation='none', cmap='Blues')
        pass
    pass

In [None]:
# current memory allocated to tensors (in Gb)

torch.cuda.memory_allocated(device) / (1024*1024*1024)

In [None]:
# total memory allocated to tensors during program (in Gb)

torch.cuda.max_memory_allocated(device) / (1024*1024*1024)

In [None]:
# summary of memory consumption

print(torch.cuda.memory_summary(device, abbreviated=True))