# Chapter 07

✅ 확률분포를 생성하는 완전 연결 계층 GAN 

In [21]:
# 패키지 임포트
import numpy as np
import matplotlib.pyplot as plt

from keras import models
from keras.layers import Dense,Conv1D, Reshape, Flatten, Lambda
from keras.optimizers import Adam
from keras import backend as K

# 코드 수행 결과 보기
def main():
    machine = Machine(n_batch = 1, ni_D = 100)   # 매 에포크 마다 길이가 100인 벡터 하나를 출력하도록 설정
    machine.run(n_repeat = 200, n_show = 200, n_test = 100)   # repeat : 전체 반복 횟수, show : 결과 표시할 총 에포크 수, test : 성능 평가 시 사용할 샘플 수 
    

# 데이터 관리 클래스 
class Data:
    def __init__(self,mu,sigma, ni_D):
        self.real_sample = lambda n_batch: np.random.normal(mu,sigma,(n_batch, ni_D))   # 흉내 내고자 하는 실제 데이터
        self.in_sample = lambda n_batch: np.random.rand(n_batch,ni_D)    # 무작위 잡음 데이터 
        
# 머신 구현하기
class Machine:
    def __init__(self,n_batch = 10,ni_D = 100):   # 클래스 초기화 함수
        data_mean = 4
        data_stddev = 1.25
        
        self.n_iter_D = 1
        self.n_iter_G = 5
        
        self.data = Data(data_mean, data_stddev, ni_D)    # ni_D : 판별망이 한꺼번에 받아들일 확률변수 수
        self.gan = GAN(ni_D = ni_D , nh_D = 50, nh_G = 50)
        
        self.n_batch = n_batch
        
    def train_D(self):   # 판별망 학습 멤버 함수
        gan = self.gan
        n_batch = self.n_batch
        data = self.data
        
        # real data
        Real = data.real_sample(n_batch)   # 실제 데이터에서 n_batch만큼 샘플을 가져옴
        # generated data
        Z = data.in_sample(n_batch)  # 임의의 분포를 가지는 입력 샘플을 데이터 샘플 수와 같은 수 만큼 만듦
        Gen = gan.G.predict(Z)   # 입력 샘플을 생성기에 통과시켜 생성망의 출력으로 바꿈
        gan.D.trainable = True    # 판별망은 학습용 생성망을 학습할 때는 학습이 되지 않도록 하고 학습 진행해야 함
        gan.D_train_on_batch(Real,Gen)   # 판별망 학습
        
    def train_GD(self):   # 학습용 생성망 학습 멤버 함수
        gan = self.gan
        n_batch = self.n_batch
        data = self.data
        Z = data.in_sample(n_batch)
        
        gan.D.trainable = False     # 판별망은 학습용 생성망을 학습할 때는 학습이 되지 않도록 하고 학습 진행해야 함
        gan.GD_train_on_batch(Z)    # 입력이 생성망에 들어가면 모든 판별망이 실제 샘플로 착각하게 학습
        
    def train_each(self):    # 매순간 학습 진행 멤버 함수
        for it in range(self.n_iter_D):    # 판별망은 n_iter_D만큼 학습
            self.train_D()
        for it in range(self.n_iter_G):    # 학습용 생성망은 n_iter_G만큼 학습
            self.train_GD()
            
    def train(self,epochs):    # GAN 학습 진행하는 멤버 함수
        for epoch in range(epochs):
            self.train_each()
            
    def test(self,n_test):
        gan = self.gan
        data = self.data
        Z = data.in_sample(n_test)
        Gen = gan.G.predict(Z)
        return Gen,Z
    
    def show_hist(self,Real,Gen,Z):   # 학습 진행 경과에 대한 그래프 그리는 멤버 함수
        plt.hist(Real.reshape(-1),histtype='step', label='Real')
        plt.hist(Gen.reshape(-1),histtype='step', label='Generated')
        plt.hist(Z.reshape(-1),histtype = 'step', label ='Input')
        plt.legend(loc=0)
        
    def test_and_show(self,n_test):   # 성능 평가 및 그래프 그리기 멤버 함수
        data= self.data
        Gen,Z = self.test(n_test)
        Real = data.real_sample(n_test)
        self.show_hist(Real,Gen,Z)
        Machine.print_stat(Real,Gen)
        
    def run_epochs(self,epochs,n_test):   # 에포크 단위 실행 멤버 함수
        self.train(epochs)
        self.test_and_show(n_test)
        
    def run(self, n_repeat=200, n_show=200, n_test=100):  # 실행 멤버 함수 , n_show번 학습이 진행될 때마다 그 결과를 그래프로 표시
        for ii in range(n_repeat):
            print('Stage', ii,'(Epoch: {})'.format(ii *n_show))
            self.run_epochs(n_show, n_test)
            plt.show()
            
    def print_stat(Real,Gen):   # 생성망이 얼마나 실제 데이터의 확률분포를 따르는 데이터를 만드는지 확인하는 정적 멤버 함수
        def stat(d):
            return (np.mean(d),np.std(d))
        print('Mean and Std of Real:', stat(Real))
        print('Mean and Std of Gen:', stat(Gen))
        
# GAN 모델링
def add_decorate(x):   # 람다 계층의 처리 함수, 입력 벡터에 새로운 벡터 추가
    m = K.mean(x,axis = -1, keepdims=True)
    d = K.square(x-m)
    return K.concatenate([x,d],axis=-1)

def add_decorate_shape(input_shape):
    shape = list(input_shape)
    assert len(shape) == 2
    shape[1] *= 2
    return tuple(shape)

lr = 2e-4  # 0.0002
adam = Adam(lr = lr, beta_1 = 0.9, beta_2 = 0.999)

def model_compile(model):
    return model.compile(loss='binary_crossentropy', optimizer=adam, metrics = ['accuracy'])

class GAN :
    def __init__(self,ni_D,nh_D,nh_G):
        self.ni_D = ni_D
        self.nh_D = nh_D
        self.nh_G = nh_G
        
        self.D = self.gen_D()
        self.G = self.gen_G()
        self.GD = self.make_GD()
        
    def gen_D(self):
        ni_D = self.ni_D
        nh_D = self.nh_D
        D = models.Sequential()
        D.add(Lambda(add_decorate,output_shape = add_decorate_shape, input_shape = (ni_D,)))
        D.add(Dense(nh_D, activation='relu'))
        D.add(Dense(nh_D, activation='relu'))
        D.add(Dense(1, activation='sigmoid'))
        
        model_compile(D)
        return D
    
    def gen_G(self):
        ni_D = self.ni_D
        nh_G = self.nh_D
        
        G = models.Sequential()
        G.add(Reshape((ni_D,1), input_shape=(ni_D,)))
        G.add(Conv1D(nh_G,1,activation='relu'))
        G.add(Conv1D(nh_G,1,activation='sigmoid'))
        G.add(Conv1D(1,1))
        G.add(Flatten())
        
        model_compile(G)
        return G
    
    def make_GD(self):
        G,D = self.G, self.D
        GD = models.Sequential()
        GD.add(G)
        GD.add(D)
        D.trainable=False  # D가 학습되지 않게 trainable을 끔
        model_compile(GD)
        D.trainable=True
        return GD
    
    def D_train_on_batch(self,Real,Gen):
        D = self.D
        X = np.concatenate([Real,Gen],axis=0)
        y = np.array([1]*Real.shape[0] + [0] * Gen.shape[0])
        D.train_on_batch(X,y)
        
    def GD_train_on_batch(self,Z):   # 왜 Gen이 아니라 Z?
        GD = self.GD
        y = np.array([1]*Z.shape[0])  # 생성망에서 출력되는 허구값을 판별망에서 실제값으로 판별하도록 학습해야하기 때문에 목표 출력값을 모두 1로 설정
        GD.train_on_batch(Z,y)
        
if __name__ == '__main__':
    main()

Stage 0 (Epoch: 0)


FailedPreconditionError:  Error while reading resource variable _AnonymousVar511 from Container: localhost. This could mean that the variable was uninitialized. Not found: Resource localhost/_AnonymousVar511/class tensorflow::Var does not exist.
	 [[node mul_254/ReadVariableOp (defined at C:\Users\mkflo\.conda\envs\keras\lib\site-packages\tensorflow_core\python\framework\ops.py:1751) ]] [Op:__inference_keras_scratch_graph_17693]

Function call stack:
keras_scratch_graph


✅ 필기체를 생성하는 합성곱 계층 GAN

In [31]:
# 공통 패키지 불러오기
from keras.datasets import mnist
import numpy as np
from PIL import Image
import math
import os

import keras.backend as K
import tensorflow as tf

K.set_image_data_format('channels_first')  # 이미지 데이터의 채널이 들어 있는 차원이 1번째가 되도록 설정
print(K.image_data_format)

# 신경망의 출력이 스칼라나 벡터가 아니라 다차원일 경우에는 해당 차원에 맞는 손실함수가 필요 -> 사용자가 직접 만듦
# 4차원 데이터를 이용하는 손실 함수를 케라스 백엔드와 텐서플로로 구현
def mse_4d(y_true, y_pred):
    return K.mean(K.square(y_pred - y_true), axis=(1,2,3))

def mse_4d_tf(y_true, y_pred):
    return tf.reduce_mean(tf.square(y_pred - y_true), axis=(1,2,3))

# 합성곱 계층 GAN 수행
import argparse

def main():
    parser = argparse.ArgumentParser()  # 인자값 받을 수 있는 인스턴스 생성
    
    # 입력받을 인자값 등록
    parser.add_argument('--batch_size', type = int, default = 16, help = 'Batch size for the networks')
    parser.add_argument('--epochs', type = int, default = 1000, help = 'Epochs for the networks')
    parser.add_argument('--output_fold', type = str, default = 'GAN_OUT', help = 'Output fold to save the results')
    parser.add_argument('--input_dim', type = int, default = 10, help = 'Input dimension for the generator')
    parser.add_argument('--n_train',type = int, default = 32, help = 'The number of training data')
    
    args = parser.parse_args()  # 입력받은 인자들을 args에 저장
    
    train(args)
    
# 합성곱 계층 GAN 모델링
from keras import models, layers, optimizers

class GAN(models.Sequential):
    def __init__(self, input_dim = 64):
        super().__init__()
        self.input_dim = input_dim
        
        self.generator = self.GENERATOR()
        self.discriminator = self.DISCRIMINATOR()
        
        # 학습을 위한 생성망은 생성망과 판별망이 결합된 형태(학습용 생성망)
        self.add(self.generator)
        self.discriminator.trainable = False   # 결합 시 판별망 쪽은 학습이 진행되지 않도록 만듦. 학습용 생성망을 학습할 때는 이미 학습된 판별망을 사용하기 때문
        self.add(self.discriminator)
        
        self.compile_all()
        
    def compile_all(self):  # 전체 신경망을 컴파일하는 함수
        d_optim = optimizers.SGD(lr = 0.0005, momentum = 0.9, nesterov = True)
        g_optim = optimizers.SGD(lr = 0.0005, momentum = 0.9, nesterov=True)
        
        self.generator.compile(loss = mse_4d_tf, optimizer = "SGD")  # 학습용 생성망 컴파일
        self.compile(loss = 'binary_crossentropy',optimizer = g_optim)   # 순수 생성망은 학습 시 최적화 인스턴스인 g_optim을 사용해 최적화
        self.discriminator.trainable = True
        self.discriminator.compile(loss = 'binary_crossentropy', optimizer = d_optim)
        
    def GENERATOR(self):
        input_dim = self.input_dim
        model = models.Sequential()
        model.add(layers.Dense(1024, activation = 'tanh', input_dim = input_dim))   # 입력은 1차원 행렬
        model.add(layers.Dense(128 * 7 * 7 , activation = 'tanh'))  # 완전 연결 계층 이용해 6772로 더 확장
        model.add(layers.BatchNormalization())
        model.add(layers.Reshape((128,7,7), input_shape = (128 * 7 * 7,)))  # 이미지의 채널, 가로 및 세로를 128 * 7 * 7 모양의 3차원 행렬로 재조정
        model.add(layers.UpSampling2D(size = (2,2)))  # 이미지를 (2,2)배 확장
        model.add(layers.Conv2D(64, (5,5), padding='same', activation='tanh'))
        model.add(layers.UpSampling2D(size = (2,2)))
        model.add(layers.Conv2D(1,(5,5), padding='same', activation='tanh'))
        
        return model
    
    def DISCRIMINATOR(self):
        model = models.Sequential()
        model.add(layers.Conv2D(64,(5,5), padding = 'same', activation='tanh', input_shape = (1,28,28)))   # 64 * 28 * 28 
        model.add(layers.MaxPooling2D(pool_size = (2,2)))    # 64 * 14 * 14 
        model.add(layers.Conv2D(128,(5,5),activation='tanh'))   # 128 * 14 * 14
        model.add(layers.Maxpooling2D(pool_size = (2,2)))    # 128 * 7 * 7 
        model.add(layers.Flatten())   # 3차원 -> 1차원 
        model.add(layers.Dense(1024,activation='tanh'))
        model.add(layers.Dense(1,activation='sigmoid'))
        
        return model
    
    def get_z(self,ln):  # 주어진 input_dim만큼 원소를 가지는 무작위 벡터를 만듦
        input_dim = self.input_dim  
        return np.random.uniform(-1,1,(ln,input_dim))  # 배치 크기인 ln으로 무작위 벡터 수를 설정
    
    def train_both(self,x):  # 모델들의 학습을 수행하는 함수
        ln = x.shape[0]  # 배치 크기
        
        # first trial for training discriminator
        z = self.get_z(ln)
        w = self.generator.predict(z, verbose = 0)
        xw = np.concatenate((x,w))  # 실제 이미지들과 허구 이미지들을 데이터셋 하나로 합침
        y2 = [1] * ln + [0] * ln
        d_loss = self.discriminator.train_on_batch(xw,y2)
        
        # second trial for training generator
        z = self.get_z(ln)
        self.disciminator.trainable = False
        g_loss = self.generator.train_on_batch(z,[1]*ln)
        self.discriminator.trainable = True
        
        return d_loss, g_loss
    
# 합성곱 계층 GAN 학습
def combine_images(generated_images):
    num = generated_iamges.shape[0]
    width = int(math.sqrt(num))
    height = int(math.ceil(float(num)/ width))
    shape = generated_images.shape[2:]
    image = np.zeros((height *shape[0], width *shape[1]), dtype=generated_images.dtype)
    
    for index,img in enumerate(generated_images):
        i = int(index/width)
        j = index % width
        image[i * shape[0]:(i + 1) * shape[0], j * shape[1]:(j + 1) * shape[1]] = img[0, :, :]
        
    return image

def get_x(X_train, index, BATCH_SIZE):
    return X_train[index * BATCH_SIZE : (index +1) *BATCH_SIZE]

def save_images(generated_images, output_fold, epoch, index):
    image = combine_images(generated_images)
    image = image * 127.5 + 127.5
    Image.fromarray(image.astype(np.unit8)).save(output_ford+'/',str(epoch)+"_" +str(index)+".png")
    
def load_data(n_train):
    (X_train, y_train), (_,_) = mnist.load_data()
    return X_train[:n_train]

def train(args):
    BATCH_SIZE = args.batch_size
    epochs = args.epochs
    output_fold = args.output_fold    # 학습과정에서 생성된 이미지 중 일부를 간헐적으로 출력하는 폴더의 이름
    input_dim = args.input_dim   # 무작위 벡터의 길이
  
    os.makedirs(output_fold, exist_ok = True)
    print('Output_fold is', output_fold)    
    
    X_train = load_data(n_train)
    
    X_train = (X_train.astype(np.float32) - 127.5) / 127.5
    X_train = X_train.reshape((X_train.shape[0],1)+ X_train.reshape[1:])
    
    gan = GAN(input_dim)
    
    d_loss_ll = []
    g_loss_ll = []
    for epoch in range(epochs):
        print("Epoch is", epoch)
        print("Number of batches", int(X_train.shape[0] / BATCH_SIZE))

        d_loss_l = []
        g_loss_l = []
        
        for index in range(int(X_train.shape[0] / BATCH_SIZE)):
            x = get_x(X_train,index, BATCH_SIZE)   # x : 배치 크기 만큼의 입력 데이터
  
            d_loss, g_loss = gan.train_both(x)   # gan에 전달
      
            d_loss_l.append(d_loss)
            g_loss_l.append(g_loss)
            
        # 에포크가 매 10회가 진행될 때 마다 결과 이미지를 파일로 저장
        if epoch % 10 == 0 or epoch  == epochs -1 :
            z = gan.get_z(x.shape[0])
            w = gan.generator.predict(z, verbose = 0)
            save_images(w, output_fold, epoch, index)
            
            d_loss_ll.append(d_loss_l)
            g_loss_ll.append(g_loss_l)
        
    gan.generator.save_weights(output_fold + '/' +'generator', True)
    gan.discriminator.save_weights(output_fold + '/' + 'discriminator', True)

    np.savetext(output_fold + '/' + 'd_loss', d_loss_ll)
    np.savetext(output_fold + '/' + 'g_loss', g_loss_ll)    
    
if __name__ =='__main__':
    main()

<function image_data_format at 0x000001E625A0CE18>


usage: ipykernel_launcher.py [-h] [--batch_size BATCH_SIZE] [--epochs EPOCHS]
                             [--output_fold OUTPUT_FOLD]
                             [--input_dim INPUT_DIM] [--n_train N_TRAIN]
ipykernel_launcher.py: error: unrecognized arguments: -f C:\Users\mkflo\AppData\Roaming\jupyter\runtime\kernel-dbc058b0-40f1-4a07-bac5-8e87d0bcb75e.json


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
