In [1]:
import numpy as np
import pandas as pd

import seaborn as sns
import matplotlib.pyplot as plt

import os
import gc
from tqdm import tqdm
import random

import warnings
# Suppress every Python warning for the rest of the session. This keeps cell
# output short, but it also hides deprecation and numerical warnings.
warnings.filterwarnings('ignore')

In [2]:
from tensorflow import keras
import tensorflow as tf
import keras.backend as K
from tensorflow.keras import optimizers, callbacks, losses
from tensorflow.keras.layers import Dense, Concatenate, Activation, Add, BatchNormalization, Dropout, Conv2D, MaxPooling2D, Conv2DTranspose,\
concatenate, Input, UpSampling2D
from tensorflow.keras.models import Model, Sequential, load_model
from tensorflow.keras.optimizers import Adam, SGD

from sklearn.metrics import mean_absolute_error as mae
from sklearn.metrics import mean_squared_error as mse
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import random
# Seed every random number generator used by the notebook so reruns are repeatable.
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)
# NOTE(review): PYTHONHASHSEED is read at interpreter start-up, so setting it
# here is only inherited by child processes -- confirm whether that is intended.
os.environ['PYTHONHASHSEED']=str(SEED)
random.seed(SEED)

# Allocate GPU memory on demand instead of reserving all of it up front.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Memory growth has to be configured identically on every visible GPU;
        # enabling it on the first device only fails on multi-GPU machines.
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        # Memory growth must be set at program start, before the GPUs are initialized.
        print(e)

def mish(x):
    """Mish activation: the input scaled by tanh(softplus(input))."""
    gate = tf.math.tanh(tf.math.softplus(x))
    return x * gate

def decay(epochs):
    """Step-decay learning-rate schedule.

    Starts at 1e-3 and is multiplied by 0.9 after every 10 completed epochs,
    but never drops below 5e-5.

    Args:
        epochs: Zero-based epoch index.

    Returns:
        The learning rate to use for that epoch.
    """
    base_lr, floor_lr = 1e-3, 5e-5
    step_size, factor = 10, 0.9

    completed_steps = epochs // step_size
    scheduled = base_lr * (factor ** completed_steps)
    return scheduled if scheduled > floor_lr else floor_lr

# Stop once the monitored metric has not improved for 10 epochs and roll the
# weights back to the best epoch seen.
es = callbacks.EarlyStopping(
    patience=10,
    restore_best_weights=True,
)
# Apply the step-decay schedule defined by `decay` at the start of each epoch.
lrs = callbacks.LearningRateScheduler(schedule=decay, verbose=0)


Using TensorFlow backend.


In [3]:
class GAN(keras.Model):
    """Conditional GAN made of an encoder/decoder generator and a patch discriminator.

    The generator maps an input image `x` to a single-channel prediction. The
    discriminator receives `x` concatenated with either the real target `y` or
    the generated one and emits a map of per-patch probabilities of being real.

    Because the discriminator ends in a sigmoid, the losses passed to
    `compile` must operate on probabilities (i.e. `from_logits=False`).

    Args:
        x_shape: Shape of one input sample, without the batch dimension.
        y_shape: Shape of one target sample, without the batch dimension.
        z_dim: Optional latent dimensionality. It is only used by `sampler`;
            the generator itself takes no noise input.
    """

    def __init__(self, x_shape, y_shape, z_dim=None):
        super(GAN, self).__init__()
        self.x_shape = x_shape
        self.y_shape = y_shape
        # Previously never set, which made `sampler` fail with AttributeError.
        self.z_dim = z_dim

        # Base number of filters for the discriminator / generator.
        self.df = 64
        self.gf = 64

        self.generator = self.build_generator()
        self.discriminator = self.build_discriminator()

    def compile(self, g_optim, d_optim, d_loss_fn, recon_loss_fn):
        """Store the optimizers and losses used by `train_step`.

        Args:
            g_optim: Optimizer applied to the generator weights.
            d_optim: Optimizer applied to the discriminator weights.
            d_loss_fn: Callable `(labels, validity) -> loss` for the discriminator.
            recon_loss_fn: Callable
                `([real_labels, y], [validity, generated_y]) -> loss` for the generator.
        """
        super(GAN, self).compile()
        self.g_optim = g_optim
        self.d_optim = d_optim
        self.d_loss_fn = d_loss_fn
        self.recon_loss_fn = recon_loss_fn

    def build_generator(self):
        """Build the encoder/decoder generator with skip connections.

        Two 2x2 max-pool stages are mirrored by two stride-2 transposed
        convolutions, and each decoder stage is concatenated with the matching
        encoder activation. The skip concatenations only line up when the
        spatial dimensions of `x_shape` are divisible by 4.

        Returns:
            A Keras `Model` mapping `x_shape` inputs to a one-channel,
            ReLU-activated (non-negative) float32 output.
        """
        start_neurons = self.gf

        inputs = Input(shape=self.x_shape)

        # Encoder.
        conv1 = Conv2D(start_neurons * 1, (3, 3), activation=mish, padding="same")(inputs)
        pool1 = BatchNormalization()(conv1)
        pool1 = MaxPooling2D((2, 2))(pool1)

        conv2 = Conv2D(start_neurons * 2, (3, 3), activation=mish, padding="same")(pool1)
        pool2 = BatchNormalization()(conv2)
        pool2 = MaxPooling2D((2, 2))(pool2)

        # Bottleneck.
        convm = Conv2D(start_neurons * 4, (3, 3), activation=mish, padding="same")(pool2)

        # Decoder; skips reuse the pre-normalization encoder activations.
        deconv2 = Conv2DTranspose(start_neurons * 2, (3, 3), strides=(2, 2), padding="same")(convm)
        uconv2 = concatenate([deconv2, conv2])
        uconv2 = Conv2D(start_neurons * 2, (3, 3), activation=mish, padding="same")(uconv2)
        uconv2 = BatchNormalization()(uconv2)

        deconv1 = Conv2DTranspose(start_neurons * 1, (3, 3), strides=(2, 2), padding="same")(uconv2)
        uconv1 = concatenate([deconv1, conv1])
        uconv1 = Conv2D(start_neurons * 1, (3, 3), activation=mish, padding="same")(uconv1)
        uconv1 = BatchNormalization()(uconv1)
        outputs = Conv2D(1, (1, 1), padding="same", activation='relu', dtype='float32')(uconv1)

        return Model(inputs, outputs)

    def build_discriminator(self):
        """Build the conditional patch discriminator.

        The input and the (real or generated) target are concatenated on the
        channel axis and passed through four stride-2 convolutions, each of
        which halves the spatial size (rounded up), e.g. 120 -> 60 -> 30 -> 15 -> 8.

        Returns:
            A Keras `Model` mapping `[x, y]` to a one-channel float32 map of
            per-patch probabilities that the pair is real.
        """

        def d_layer(layer_input, filters, f_size=4, bn=True):
            """Discriminator layer: stride-2 conv, mish, optional batch norm."""
            d = Conv2D(filters, kernel_size=f_size, strides=2, padding='same')(layer_input)
            d = Activation(mish)(d)
            if bn:
                d = BatchNormalization(momentum=0.8)(d)
            return d

        img_A = Input(shape=self.x_shape)
        img_B = Input(shape=self.y_shape)

        combined_imgs = concatenate([img_A, img_B])

        d1 = d_layer(combined_imgs, self.df, bn=False)
        d2 = d_layer(d1, self.df*2)
        d3 = d_layer(d2, self.df*4)
        d4 = d_layer(d3, self.df*8)

        # Sigmoid head: the losses used with this model are binary
        # cross-entropies on probabilities, so the output must lie in [0, 1].
        validity = Conv2D(1, kernel_size=4, strides=1, padding='same',
                          activation='sigmoid', dtype='float32')(d4)

        return Model([img_A, img_B], validity)

    def train_step(self, data):
        """Run one discriminator update followed by one generator update.

        Args:
            data: Tuple `(x, y)` of input and target batches.

        Returns:
            Dict with the scalar `d_loss` and `g_loss` of this step.
        """
        x, y = data
        batch_size = tf.shape(x)[0]

        # ---- Discriminator update -------------------------------------
        # Fakes are produced outside the tape so that no gradient reaches the
        # generator. `training=True` is explicit because sub-models called
        # from a custom train_step otherwise run in inference mode.
        fake_y = self.generator(x, training=True)
        all_x = tf.concat([x, x], 0)
        all_y = tf.concat([y, fake_y], 0)  # first half real, second half fake

        with tf.GradientTape() as tape:
            # The discriminator returns a single patch map (not a tuple).
            validity = self.discriminator([all_x, all_y], training=True)
            # Labels follow the [real, fake] order of the batch and share the
            # patch-map shape of the discriminator output.
            labels = tf.concat(
                [tf.ones_like(validity[:batch_size]),
                 tf.zeros_like(validity[batch_size:])],
                0,
            )
            d_loss = self.d_loss_fn(labels, validity)

        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optim.apply_gradients(zip(grads, self.discriminator.trainable_weights))

        # ---- Generator update -----------------------------------------
        with tf.GradientTape() as tape:
            fake_y = self.generator(x, training=True)
            validity = self.discriminator([x, fake_y], training=True)
            # The generator is rewarded when its output is judged real and
            # when it is close to the ground-truth target.
            real_labels = tf.ones_like(validity)
            g_loss = self.recon_loss_fn([real_labels, y], [validity, fake_y])

        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optim.apply_gradients(zip(grads, self.generator.trainable_weights))

        return {'d_loss': d_loss, 'g_loss': g_loss}

    def sampler(self, batch_size):
        """Draw standard-normal latent vectors of shape `(batch_size, z_dim)`.

        Raises:
            ValueError: If the model was created without `z_dim`.
        """
        if self.z_dim is None:
            raise ValueError("sampler() requires GAN(..., z_dim=<int>) to be set")
        return tf.random.normal(shape=(batch_size, self.z_dim))


In [4]:
def recon_loss(y_trues, y_preds):
    """Generator loss: adversarial binary cross-entropy plus mean absolute error.

    Args:
        y_trues: Pair `(validity_target, y_true)`.
        y_preds: Pair `(validity_pred, y_pred)`.

    Returns:
        The unweighted sum of the binary cross-entropy between the validity
        pair and the mean absolute error between the target pair.
    """
    true_validity, true_target = y_trues
    pred_validity, pred_target = y_preds

    adversarial_term = losses.BinaryCrossentropy()(true_validity, pred_validity)
    reconstruction_term = losses.MeanAbsoluteError()(true_target, pred_target)

    total = adversarial_term + reconstruction_term
    return total

In [5]:
# 120x120 inputs with 4 channels, 120x120 targets with 1 channel.
gan = GAN(x_shape=(120, 120, 4), y_shape=(120, 120, 1))

In [6]:
# Generator and discriminator each get their own Adam optimizer (lr 2e-4).
gan.compile(
    g_optim=optimizers.Adam(2e-4),
    d_optim=optimizers.Adam(2e-4),
    d_loss_fn=losses.BinaryCrossentropy(),
    recon_loss_fn=recon_loss,
)

In [7]:
# Print the generator's layers, output shapes and parameter counts.
gan.generator.summary()

Model: "functional_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 120, 120, 4) 0                                            
__________________________________________________________________________________________________
conv2d (Conv2D)                 (None, 120, 120, 64) 2368        input_1[0][0]                    
__________________________________________________________________________________________________
batch_normalization (BatchNorma (None, 120, 120, 64) 256         conv2d[0][0]                     
__________________________________________________________________________________________________
max_pooling2d (MaxPooling2D)    (None, 60, 60, 64)   0           batch_normalization[0][0]        
_______________________________________________________________________________________

In [8]:
# Print the discriminator's layers, output shapes and parameter counts.
gan.discriminator.summary()

Model: "functional_3"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_2 (InputLayer)            [(None, 120, 120, 4) 0                                            
__________________________________________________________________________________________________
input_3 (InputLayer)            [(None, 120, 120, 1) 0                                            
__________________________________________________________________________________________________
concatenate_2 (Concatenate)     (None, 120, 120, 5)  0           input_2[0][0]                    
                                                                 input_3[0][0]                    
__________________________________________________________________________________________________
conv2d_6 (Conv2D)               (None, 60, 60, 64)   5184        concatenate_2[0][0]   