In [1]:
import time
from ops import *
from utils import *
import tensorflow as tf
from SpectralNormalizationKeras import DenseSN, ConvSN2D, ConvSN2DTranspose

Using TensorFlow backend.


In [2]:
from tensorflow.python.keras.utils import conv_utils

In [3]:
import os
import cv2

%matplotlib inline
import matplotlib.pyplot as plt
from sklearn.utils import shuffle

#### Remember to change `path` below so it points to your dataset directory

In [4]:
# Directory holding the training images; its entries are listed with os.listdir
# and each one is read with cv2.imread.
path = './dataset/scene/lake'

In [6]:
# Image geometry settings.
# NOTE(review): load_img and the discriminator input in this notebook use
# 512x512, not img_rows x img_cols — confirm whether these constants are still needed.
img_rows = 1024
img_cols = 1024
channels = 3
hs = img_rows//2  # half of img_rows

In [7]:
# Full path of every entry found in the dataset directory (order as returned by os.listdir).
img_list = [os.path.join(path, entry_name) for entry_name in os.listdir(path)]

In [9]:
def load_img(img_list, size=(512, 512)):
    """Read image files into a single float32 array scaled to [-1, 1].

    Parameters
    ----------
    img_list : sequence of str
        Paths of the image files to read with ``cv2.imread``.
    size : tuple of int, optional
        Target ``(height, width)``. Images with a different size are resized
        to it. Defaults to ``(512, 512)``.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(n_loaded, height, width, 3)`` and dtype float32,
        with pixel values mapped from [0, 255] to [-1, 1]. Channels stay in
        the order ``cv2.imread`` returns them (BGR). Entries that cannot be
        decoded (``cv2.imread`` returns ``None``) are skipped with a warning,
        so ``n_loaded`` may be smaller than ``len(img_list)``.
    """
    import warnings

    height, width = size
    data = np.zeros([len(img_list), height, width, 3], dtype='float32')
    loaded = 0
    for each in img_list:
        img = cv2.imread(each)
        if img is None:
            # Non-image files in the directory (or corrupt images) end up here.
            warnings.warn("load_img: could not read %r, skipping" % (each,))
            continue
        if tuple(img.shape[:2]) != (height, width):
            # cv2.resize takes the target size as (width, height).
            img = cv2.resize(img, (width, height), interpolation=cv2.INTER_AREA)
        # Store the normalised image in the output buffer.
        data[loaded] = img.astype('float32') / 127.5 - 1
        loaded += 1
    # Drop the unused tail left behind by skipped files.
    return data[:loaded]

In [10]:
# Array returned by load_img for every path in img_list; used as the training set.
real_data = load_img(img_list)

In [11]:
# Training / model hyper-parameters shared by the cells below.
batch_size = 4       # number of samples per training step
z_dim = 128          # size of the last axis of the generator's noise input
sn = True            # forwarded as `sn=` to the layer helpers (presumably spectral normalisation — confirm in ops)
is_training = True   # NOTE(review): generator/discriminator take their own is_training argument; confirm this global is used
c_dim = 3            # number of channels produced by the generator's final conv

In [12]:
def generator(input_shape, is_training=True):
    """Build the generator as a ``tf.keras.Model``.

    The noise input is split into 8 equal chunks along its last axis. The
    first chunk feeds the initial ``fully_conneted`` layer; each of the other
    seven is passed to one ``resblock_up_condition`` call.

    Parameters
    ----------
    input_shape : tuple of int
        Per-sample shape of the noise input, e.g. ``(1, 1, z_dim)``. Its last
        dimension must be divisible by 8 for the split to succeed.
    is_training : bool, optional
        Forwarded to the ``resblock_up_condition`` and ``batch_norm`` helpers.

    Returns
    -------
    tf.keras.Model
        Model mapping the noise input to the ``tanh`` of a final conv with
        ``c_dim`` output channels.

    Notes
    -----
    Reads the module-level ``sn`` and ``c_dim`` settings.
    """
    # Honour the caller-supplied shape, matching discriminator().
    inputs = tf.keras.Input(shape=input_shape)

    z_split = tf.split(inputs, num_or_size_splits=8, axis=-1)

    ch = 16 * 96

    x = fully_conneted(z_split[0], units=4 * 4 * ch, sn=sn, scope='generator/dense')
    x = tf.reshape(x, shape=[-1, 4, 4, ch])

    x = resblock_up_condition(x, z_split[1], channels=ch, use_bias=False, is_training=is_training, sn=sn, scope='generator/resblock_up_16')
    ch = ch // 2

    x = resblock_up_condition(x, z_split[2], channels=ch, use_bias=False, is_training=is_training, sn=sn, scope='generator/resblock_up_8_0')
    x = resblock_up_condition(x, z_split[3], channels=ch, use_bias=False, is_training=is_training, sn=sn, scope='generator/resblock_up_8_1')
    ch = ch // 2

    x = resblock_up_condition(x, z_split[4], channels=ch, use_bias=False, is_training=is_training, sn=sn, scope='generator/resblock_up_4')

    # Non-Local Block
    x = self_attention_2(x, channels=ch, sn=sn, scope='self_attention')
    ch = ch // 2

    # NOTE(review): from here on the scope names drop the 'generator/' prefix;
    # they are kept as-is so existing variable names do not change.
    x = resblock_up_condition(x, z_split[5], channels=ch, use_bias=False, is_training=is_training, sn=sn, scope='resblock_up_2')
    ch = ch // 2

    x = resblock_up_condition(x, z_split[6], channels=ch, use_bias=False, is_training=is_training, sn=sn, scope='resblock_up_1_0')
    x = resblock_up_condition(x, z_split[7], channels=ch, use_bias=False, is_training=is_training, sn=sn, scope='resblock_up_1_1')

    x = batch_norm(x, is_training)
    x = relu(x)
    x = conv(x, channels=c_dim, kernel=3, stride=1, pad=1, use_bias=False, sn=sn, scope='G_logit')

    x = tanh(x)

    G = tf.keras.Model(inputs=inputs, outputs=x)
    return G

In [None]:
# Build the generator model for a noise input of shape (1, 1, z_dim) per sample.
G = generator((1, 1, z_dim))

In [17]:
def discriminator(input_shape, is_training=True):
    """Build the discriminator as a ``tf.keras.Model``.

    The input goes through a chain of ``resblock_down`` calls, with a
    ``self_attention_2`` block after the third one, then a ``resblock``,
    ``relu``, ``global_sum_pooling`` and a final ``fully_conneted`` call
    with ``units=1``. The channel count starts at 16 and is doubled four
    times along the way.

    Parameters
    ----------
    input_shape : tuple of int
        Per-sample shape passed to ``tf.keras.layers.Input``.
    is_training : bool, optional
        Forwarded to every residual block helper.

    Returns
    -------
    tf.keras.Model
        Model from the input tensor to the output of the ``'D_logit'`` layer.

    Notes
    -----
    Reads the module-level ``sn`` setting.
    """
    image_in = tf.keras.layers.Input(input_shape)

    # Keyword arguments shared by every residual block below.
    block_kwargs = dict(use_bias=False, is_training=is_training, sn=sn)

    width = 16
    feat = resblock_down(image_in, channels=width, scope='resblock_down_1_0', **block_kwargs)
    feat = resblock_down(feat, channels=width, scope='resblock_down_1_1', **block_kwargs)

    width *= 2
    feat = resblock_down(feat, channels=width, scope='resblock_down_2', **block_kwargs)

    # Non-local (self-attention) block.
    feat = self_attention_2(feat, channels=width, sn=sn, scope='self_attention')

    width *= 2
    feat = resblock_down(feat, channels=width, scope='resblock_down_4', **block_kwargs)

    width *= 2
    feat = resblock_down(feat, channels=width, scope='resblock_down_8_0', **block_kwargs)
    feat = resblock_down(feat, channels=width, scope='resblock_down_8_1', **block_kwargs)

    width *= 2
    feat = resblock_down(feat, channels=width, scope='resblock_down_16', **block_kwargs)

    feat = resblock(feat, channels=width, scope='resblock', **block_kwargs)
    feat = relu(feat)

    pooled = global_sum_pooling(feat)
    logit = fully_conneted(pooled, units=1, sn=sn, scope='D_logit')

    return tf.keras.Model(inputs=image_in, outputs=logit)

In [18]:
# Build the discriminator model for 512x512 inputs with 3 channels.
D = discriminator((512, 512, 3))

In [20]:
def gradient_penalty(real, fake, gp_weight=10.0):
    """Gradient penalty of the discriminator ``D`` on random interpolates.

    Each real sample is blended with the matching fake sample using a
    per-sample factor drawn uniformly from [0, 1). The penalty is
    ``gp_weight * mean((||dD/dx|| - 1)^2)``, where the norm is taken over
    all non-batch axes of the gradient at the blended inputs.

    Parameters
    ----------
    real : array-like or tf.Tensor
        Batch of real images; converted to a float32 tensor.
    fake : tf.Tensor
        Batch of generated images with the same shape as ``real``.
    gp_weight : float, optional
        Multiplier applied to the penalty. Defaults to 10.0.

    Returns
    -------
    tf.Tensor
        Scalar penalty.
    """
    # A plain tensor is enough here; no tf.Variable has to be created per call.
    real = tf.convert_to_tensor(real, dtype=tf.float32)
    # Take the batch size from the data rather than the global setting so
    # batches of any size work.
    n_samples = tf.shape(real)[0]

    alpha = tf.random.uniform(shape=[n_samples, 1, 1, 1], minval=0., maxval=1.)
    interpolated = real + alpha * (fake - real)

    with tf.GradientTape() as tape_p:
        # `interpolated` is not a variable, so it has to be watched explicitly.
        tape_p.watch(interpolated)
        logit = D(interpolated)
    grad = tape_p.gradient(logit, interpolated)

    # One gradient norm per sample.
    grad_norm = tf.norm(tf.reshape(grad, [n_samples, -1]), axis=1)

    GP = gp_weight * tf.reduce_mean(tf.square(grad_norm - 1.))

    return GP

In [21]:
def discriminator_loss(y_true, y_pred):
    """Return ``mean(y_pred) - mean(y_true)``.

    In this notebook ``y_true`` holds the discriminator outputs for real
    images and ``y_pred`` the outputs for generated images.
    """
    return tf.reduce_mean(y_pred) - tf.reduce_mean(y_true)

def generator_loss(y_pred):
    """Return the negated mean of ``y_pred`` (discriminator outputs for generated images)."""
    mean_score = tf.reduce_mean(y_pred)
    return -mean_score

In [22]:
# Adam optimizers for the two networks, both with beta_1=0.0 and beta_2=0.9.
# The discriminator uses a learning rate of 2e-4, the generator 5e-5.
opt_d = tf.keras.optimizers.Adam(0.0002, beta_1=0.0, beta_2=0.9)
opt_g = tf.keras.optimizers.Adam(0.00005, beta_1=0.0, beta_2=0.9)
# Disabled: exponential moving average object, not used anywhere below.
# opt_e = tf.train.ExponentialMovingAverage(decay=0.95)

In [23]:
# inputs[0]: real images, inputs[1]: noise
# @tf.function
def train_discriminator_step(inputs):
    """Run one optimizer step on the discriminator ``D``.

    Parameters
    ----------
    inputs : sequence
        ``inputs[0]`` is a batch of real images and ``inputs[1]`` the noise
        batch fed to the generator ``G``.

    Returns
    -------
    tuple
        ``(total_loss, GP)``: the discriminator loss including the gradient
        penalty, and the gradient penalty on its own.
    """
    real_images = inputs[0]
    noise = inputs[1]

    # The generator is only sampled here, never updated, so its forward pass
    # stays outside the tape and its activations are not recorded.
    fake_img = G(noise, training=False)

    with tf.GradientTape() as tape:
        real = D(real_images, training=True)
        fake = D(fake_img, training=True)

        critic_loss = discriminator_loss(real, fake)

        # Computed inside the tape so the penalty contributes to D's gradients.
        GP = gradient_penalty(real_images, fake_img)

        total_loss = critic_loss + GP

    gradients = tape.gradient(total_loss, D.trainable_variables)
    opt_d.apply_gradients(zip(gradients, D.trainable_variables))

    return total_loss, GP

In [24]:
# @tf.function
def train_generator_step(inputs):
    """Run one optimizer step on the generator ``G``.

    Parameters
    ----------
    inputs : tf.Tensor
        Noise batch fed to ``G``.

    Returns
    -------
    tf.Tensor
        The generator loss computed from ``D``'s outputs on the generated batch.
    """
    generator_vars = G.trainable_variables

    with tf.GradientTape() as tape:
        fake_images = G(inputs, training=True)
        fake_logits = D(fake_images, training=False)
        g_loss = generator_loss(fake_logits)

    # Only the generator's variables are updated; D is left untouched.
    grads = tape.gradient(g_loss, generator_vars)
    opt_g.apply_gradients(zip(grads, generator_vars))

    return g_loss

In [None]:
NUM_EPOCHS = 100
MAX_STEPS_PER_EPOCH = 100   # upper bound on generator updates per epoch
N_CRITIC = 5                # discriminator updates per generator update
SAMPLE_EVERY = 1            # write a sample image every this many epochs
N_SAMPLES = 4               # images stacked vertically in each sample file

# cv2.imwrite does not create missing directories, so make sure the output
# folder exists before training starts.
os.makedirs('./biggan_result_img', exist_ok=True)

# Only full batches are used: slicing past the end of the data would give a
# real batch smaller than the noise batch and break the gradient penalty.
steps_per_epoch = min(MAX_STEPS_PER_EPOCH, real_data.shape[0] // batch_size)
if steps_per_epoch == 0:
    raise ValueError(
        "need at least batch_size (%d) training images, found %d"
        % (batch_size, real_data.shape[0])
    )

for epoch in range(NUM_EPOCHS):
    # Fresh random order of the sample indices for this epoch.
    ind = shuffle(np.arange(real_data.shape[0]))
    start_time = time.time()
    for step in range(steps_per_epoch):
        real_batch = real_data[ind[step*batch_size:(step+1)*batch_size]]
        z = tf.random.truncated_normal(shape=[batch_size, 1, 1, z_dim], dtype=tf.float32)
        # NOTE(review): all N_CRITIC discriminator updates reuse the same real
        # batch and noise — confirm this is intended.
        for _ in range(N_CRITIC):
            D_loss, gp_loss = train_discriminator_step([real_batch, z])
        G_loss = train_generator_step(z)

    end_time = time.time()-start_time
    if epoch % SAMPLE_EVERY == 0:
        print("the time of the %s th epoch: "%(epoch), end_time)
        print("Finished epoch:", epoch ,'D_loss:', D_loss.numpy(), 'GP_loss:',gp_loss.numpy(), 'G_loss:', G_loss.numpy())

        z = tf.random.truncated_normal(shape=[N_SAMPLES, 1, 1, z_dim], dtype=tf.float32)
        gen_img = G(z).numpy()
        # Stack the samples vertically: (n, h, w, c) -> (n*h, w, c).
        total_img = gen_img.reshape(-1, gen_img.shape[2], gen_img.shape[3])
        # Map [-1, 1] back to [0, 255] and store as 8-bit, which is what the
        # JPEG writer expects.
        total_img = np.clip(total_img*127.5+127.5, 0, 255).astype('uint8')

        out_path = './biggan_result_img/epoch_%s_result.jpg'%(epoch)
        # cv2.imwrite signals failure through its return value, not an exception.
        if not cv2.imwrite(out_path, total_img):
            print("warning: could not write", out_path)

In [None]:
# Persist both trained networks. Keras models expose `save`; there is no
# `save_model` method on a tf.keras.Model instance.
G.save('scene_generator_128.h5')
D.save('scene_discrminator_128.h5')