In [1]:
import scipy
import numpy as np
import matplotlib.pyplot as plt
import os
import skimage
from skimage.color import rgb2gray
import keras
from tqdm import tqdm

  from ._conv import register_converters as _register_converters
Using TensorFlow backend.


In [2]:
class CifarGenerator(keras.utils.Sequence):
    """
    Batch generator over a directory of CIFAR ``.png`` images.

    Every colour image ("A") is paired with a grayscale copy replicated to
    three channels ("B"); both are scaled to the range [-1.0, 1.0].
    Index the object, or iterate over it with ``for``, to get one batch at a time.

    Args:
        img_dir: directory that directly contains the ``.png`` files.
        batch_size: number of images per batch (the last batch may be smaller).
        is_training: if True a batch is ``(images_A, images_B)``; otherwise it
            is ``(images_B, f_names)`` with ``f_names`` the base file names.
        shuffle: if True (and ``is_training``), ``on_epoch_end`` reshuffles the
            order in which samples are served.
    """
    def __init__(self, img_dir, batch_size, is_training=True, shuffle=True):
        self.img_dir = img_dir
        self.batch_size = batch_size
        self.shuffle = shuffle
        self._is_training = is_training

        self._img_names = []

        # Accessing ``img_names`` scans ``img_dir`` the first time it is read.
        self.indexes = np.arange(len(self.img_names))

        self.w = 32
        self.h = 32

    def __len__(self):
        # how many batches
        return int(np.ceil(len(self.indexes) / float(self.batch_size)))

    def on_epoch_end(self):
        """Reshuffle the sample order in place (training mode with shuffle only)."""
        if self.is_training and self.shuffle:
            np.random.shuffle(self.indexes)

    def __getitem__(self, index):
        """Load and return batch number ``index``.

        Raises:
            IndexError: if ``index`` is outside ``[0, len(self))``. Without this
                check an out-of-range index would silently yield an empty batch.
        """
        if index < 0 or index >= len(self):
            raise IndexError('batch index %d out of range [0, %d)' % (index, len(self)))

        upper_bound = min(self.size, (index + 1) * self.batch_size)
        batch_indexes = self.indexes[index * self.batch_size:upper_bound]

        images_A = []
        images_B = []
        f_names = []

        # A dedicated loop variable keeps the ``index`` argument intact.
        for sample_index in batch_indexes:
            f_name = self._img_names[sample_index]
            f_names.append(os.path.split(f_name)[1])

            # ``np.float`` was only an alias of the builtin float and has been
            # removed from NumPy; ``np.float64`` is the same dtype.
            # NOTE(review): scipy.misc.imread/imresize are reported as deprecated
            # by the warnings captured in this notebook; they need replacing
            # before upgrading SciPy.
            image_A = scipy.misc.imread(f_name, mode='RGB').astype(np.float64)
            image_B = rgb2gray(image_A)

            image_A = scipy.misc.imresize(image_A, (self.h, self.w))
            image_B = scipy.misc.imresize(image_B, (self.h, self.w))

            # convert gray-scale image to a 3-channel image to fit input shape
            image_B = np.stack((image_B,)*3, axis=-1)

            images_A.append(image_A)
            images_B.append(image_B)

        # normalization is to bring the values in range [-1.0,1.0]
        images_A = np.array(images_A)/127.5 - 1.
        images_B = np.array(images_B)/127.5 - 1.

        if self.is_training:
            return images_A, images_B
        else:
            return images_B, f_names

    def _load(self):
        """Collect the paths of all ``.png`` files found directly in ``img_dir``."""
        # os.listdir returns entries in arbitrary order; sorting makes the
        # batch composition reproducible across runs and machines.
        for f_name in tqdm(sorted(os.listdir(self.img_dir))):
            if os.path.splitext(f_name)[-1] == '.png':
                self._img_names.append(os.path.join(self.img_dir, f_name))

    @property
    def img_names(self):
        """List of image paths; the directory is scanned while the list is empty."""
        if len(self._img_names) == 0:
            self._load()
        return self._img_names

    @property
    def size(self):
        """Number of image paths currently loaded."""
        return len(self._img_names)

    @property
    def is_training(self):
        """Whether batches are returned in training layout ``(images_A, images_B)``."""
        return self._is_training

In [3]:
# Image directories handed to CifarGenerator as ``img_dir`` (it picks up the .png files inside).
# NOTE(review): absolute, machine-specific paths — adjust before running on another host.
TRAIN_DIR = '/data_service/source_datasets/cifar_images/images_train'
TEST_DIR = '/data_service/source_datasets/cifar_images/images_test'


In [4]:
from keras.layers import Input, Dense, Reshape, Flatten, Dropout, Concatenate
from keras.layers import BatchNormalization, Activation, ZeroPadding2D
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.models import Sequential, Model
from keras.optimizers import Adam


In [5]:
class pix2pixColorizer():
    """Conditional GAN that turns 3-channel grayscale images into colour images.

    Attributes created on construction:
        generator: encoder/decoder network with skip connections (B -> fake A).
        discriminator: patch classifier scoring an (A, B) image pair.
        combined: generator followed by the frozen discriminator, used to
            train the generator.
        disc_patch: shape of one discriminator output map.
    """
    def __init__(self):
        # All images handled by the networks are h x w x num_channel.
        self.h, self.w = 32, 32
        self.num_channel = 3
        self.image_shape = (self.h, self.w, self.num_channel)

        self.image_A = Input(shape=self.image_shape)
        self.image_B = Input(shape=self.image_shape)

        self.build_gan()

    def load_generator(self, g_model):
        """Load generator weights from the file path ``g_model``."""
        self.generator.load_weights(g_model)

    def build_gan(self):
        """Create and compile the discriminator, the generator and the combined model."""
        # The discriminator is compiled on its own while it is still trainable.
        self.discriminator = self.build_discriminator()
        self.discriminator.compile(
            loss='mse',
            optimizer=Adam(0.0002, 0.5),
            metrics=['accuracy'],
        )

        self.generator = self.build_generator()
        self.fake_A = self.generator(self.image_B)

        # trainable is switched off before the combined model is compiled.
        self.discriminator.trainable = False
        self.valid = self.discriminator([self.fake_A, self.image_B])

        self.combined = Model(
            inputs=[self.image_A, self.image_B],
            outputs=[self.valid, self.fake_A],
        )
        self.combined.compile(
            loss=['mse', 'mae'],
            loss_weights=[1, 100],
            optimizer=Adam(0.0002, 0.5),
        )

        # Four stride-2 convolutions shrink each spatial side by 2**4.
        downscale = 2**4
        self.disc_patch = (int(self.h/downscale), int(self.w/downscale), 1)

    def build_discriminator(self):
        """Return the discriminator model taking ``[image_A, image_B]``."""
        def conv_block(tensor, filters, normalize=True):
            # stride-2 conv -> LeakyReLU -> optional batch norm
            tensor = Conv2D(filters=filters, kernel_size=4, strides=2, padding='same')(tensor)
            tensor = LeakyReLU(alpha=0.2)(tensor)
            if normalize:
                tensor = BatchNormalization(momentum=0.8)(tensor)
            return tensor

        image_A = Input(shape=self.image_shape)
        image_B = Input(shape=self.image_shape)

        # concat axis=channel axis
        features = Concatenate(axis=-1)([image_A, image_B])

        # layer 1 has no batch norm; layers 2-4 all use 128 filters
        features = conv_block(features, 64, normalize=False)
        for n_filters in (128, 128, 128):
            features = conv_block(features, n_filters)

        # 1-dim output
        validity = Conv2D(1, kernel_size=4, strides=1, padding='same')(features)

        return Model([image_A, image_B], validity)

    def build_generator(self):
        """Return the encoder/decoder generator with skip connections."""
        def encode(tensor, filters, normalize=True):
            # stride-2 conv -> LeakyReLU -> optional batch norm
            tensor = Conv2D(filters=filters, kernel_size=4, strides=2, padding='same')(tensor)
            tensor = LeakyReLU(alpha=0.2)(tensor)
            if normalize:
                tensor = BatchNormalization(momentum=0.8)(tensor)
            return tensor

        def decode(tensor, skip, filters):
            # upsample -> conv(relu) -> batch norm -> concatenate with the encoder output
            tensor = UpSampling2D(size=2)(tensor)
            tensor = Conv2D(filters=filters, kernel_size=4, strides=1, padding='same', activation='relu')(tensor)
            tensor = BatchNormalization(momentum=0.8)(tensor)
            return Concatenate()([tensor, skip])

        d0 = Input(shape=self.image_shape)

        # encoder: layers 1-5
        d1 = encode(d0, 64, normalize=False)
        d2 = encode(d1, 128)
        d3 = encode(d2, 256)
        d4 = encode(d3, 512)
        d5 = encode(d4, 512)

        # decoder: layers 4-1, each joined with the matching encoder output
        u4 = decode(d5, d4, 512)
        u3 = decode(u4, d3, 256)
        u2 = decode(u3, d2, 128)
        u1 = decode(u2, d1, 64)

        # layer 0
        u0 = UpSampling2D(size=2)(u1)

        # 3-dim output
        u0 = Conv2D(self.num_channel, kernel_size=4, strides=1, padding='same', activation='tanh')(u0)

        return Model(d0, u0)
        

In [6]:
# Build the generator, discriminator and combined model once; later cells use this instance.
colorizer = pix2pixColorizer()

In [7]:
import keras.backend.tensorflow_backend as KTF
import tensorflow as tf

# Restrict CUDA to the device with index 5.
# NOTE(review): machine-specific GPU index — confirm it exists on the target host.
os.environ["CUDA_VISIBLE_DEVICES"] = "5"

# Enable the allow_growth GPU option (presumably so memory is claimed on demand
# rather than all at once — confirm against the TensorFlow docs).
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
sess = tf.Session(config=config)

# Make Keras use this session; train() also runs its summary ops through `sess`.
KTF.set_session(sess)

In [8]:
from PIL import Image

# Directory where train() writes its periodic preview grids.
SAVED_DIR = './samples'

def save_single(save_dir, f_name, image):
    """Save one image to ``save_dir/f_name``.

    Args:
        save_dir: output directory; created if it does not exist yet.
        f_name: file name including extension.
        image: array-like with values in [-1, 1]; mapped to 8-bit via
            ``(x * 0.5 + 0.5) * 255``.
    """
    # Nothing in the notebook creates the output directory, so do it here.
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)

    image = np.uint8((np.array(image) * 0.5 + 0.5) * 255)
    image = Image.fromarray(image)
    image.save(save_dir + '/' + f_name, quality=95)
    
def save_intermediate_images(save_dir, batch_i, images_A, images_B, fake_images_A):
    """Save a preview grid with one row per sample: gray input | generated | original.

    Args:
        save_dir: output directory; created if it does not exist yet.
        batch_i: integer used in the file name ``G_<batch_i>.jpg``.
        images_A: original colour images, indexed ``[sample, row, col, channel]``,
            with values in [-1, 1].
        images_B: grayscale inputs, same layout and range as ``images_A``.
        fake_images_A: generator outputs, same layout and range as ``images_A``.
    """
    assert images_A.shape[0] == images_B.shape[0] == fake_images_A.shape[0]

    batch_size = images_A.shape[0]
    # Take the tile size from the data instead of hard-coding 32.
    tile_h, tile_w = images_A.shape[1:3]

    generated_image = Image.new('RGB', (tile_w*3, tile_h*batch_size))
    # Iterate over the function's own arguments: the previous code read a
    # global named ``fake_A`` and ignored ``fake_images_A``.
    for row, triple in enumerate(zip(images_B, fake_images_A, images_A)):
        for col, image in enumerate(triple):
            tile = Image.fromarray(np.uint8((np.array(image) * 0.5 + 0.5) * 255))
            generated_image.paste(tile, (col*tile_w, row*tile_h))

    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    generated_image.save(save_dir + "/G_%d.jpg" % batch_i, quality=95)

In [9]:
from keras.callbacks import TensorBoard

def train():
    """Train the global ``colorizer`` for 50 epochs on the images in ``TRAIN_DIR``.

    Side effects:
        * writes per-step generator / discriminator loss summaries to
          ``./logs/g_loss_50`` and ``./logs/d_loss_50`` through the global ``sess``;
        * every 200 batches prints the current losses and saves a preview grid
          of the first test batch into ``SAVED_DIR``;
        * saves the final weights to ``g_weights.h5`` and ``d_weights.h5``.
    """
    # set up tensorboard logs
    writer_1 = tf.summary.FileWriter("./logs/g_loss_50")
    writer_2 = tf.summary.FileWriter("./logs/d_loss_50")

    loss_var = tf.Variable(0.0)
    tf.summary.scalar("loss", loss_var)

    write_op = tf.summary.merge_all()

    # set hyper-params
    n_epoch = 50
    batch_size = 64

    data_generator = CifarGenerator(img_dir=TRAIN_DIR, batch_size=batch_size, is_training=True)
    test_generator = CifarGenerator(img_dir=TEST_DIR, batch_size=batch_size)
    n_batches = len(data_generator)

    # Discriminator targets for a full batch; sliced below for a short last batch.
    full_valid = np.ones((batch_size,) + colorizer.disc_patch)
    full_fake = np.zeros((batch_size,) + colorizer.disc_patch)

    os.makedirs(SAVED_DIR, exist_ok=True)

    try:
        for epoch_cnt in range(n_epoch):
            print('Epoch %d: ' % epoch_cnt)

            for batch_cnt, (images_A, images_B) in enumerate(tqdm(data_generator)):
                total_cnt = epoch_cnt * n_batches + batch_cnt

                # last batch, less samples
                n_samples = images_A.shape[0]
                valid = full_valid[:n_samples]
                fake = full_fake[:n_samples]

                fake_A = colorizer.generator.predict(images_B)

                d_loss_real = colorizer.discriminator.train_on_batch([images_A, images_B], valid)
                d_loss_fake = colorizer.discriminator.train_on_batch([fake_A, images_B], fake)
                d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

                g_loss = colorizer.combined.train_on_batch([images_A, images_B], [valid, images_A])

                # write loss for each step
                summary = sess.run(write_op, {loss_var: g_loss[0]})
                writer_1.add_summary(summary, total_cnt)
                writer_1.flush()

                summary = sess.run(write_op, {loss_var: d_loss[0]})
                writer_2.add_summary(summary, total_cnt)
                writer_2.flush()

                if batch_cnt and not batch_cnt % 200:
                    # ``generator`` was an undefined name here; the batch count
                    # comes from the training data generator.
                    print ("[Batch %d/%d] [D loss: %f, acc: %3d%%] [G loss: %f]" %
                                   (batch_cnt+1, n_batches,
                                    d_loss[0], 100*d_loss[1], g_loss[0]))
                    test_A, test_B = test_generator[0]
                    test_fake_A = colorizer.generator.predict(test_B)
                    save_intermediate_images(SAVED_DIR, batch_cnt, test_A, test_B, test_fake_A)

            # Plain ``for`` iteration never triggers the epoch hook, so call it
            # explicitly; otherwise the training data is never reshuffled.
            data_generator.on_epoch_end()
    finally:
        # Release the event-file handles even if training is interrupted.
        writer_1.close()
        writer_2.close()

    colorizer.generator.save_weights('g_weights.h5')
    colorizer.discriminator.save_weights('d_weights.h5')

In [10]:
# test process, save gray/generated/original
# batch_size = 16
# colorizer.load_generator('g_weights_50.h5')  # generator params with 50-epoch training
# data_generator = CifarGenerator(img_dir=TEST_DIR, batch_size=batch_size)

# SAVED_DIR = './cGAN_50_Res'
# for batch_cnt, (images_A, images_B) in enumerate(tqdm(data_generator)):
#     fake_A = colorizer.generator.predict(images_B)
#     save_intermediate_images(SAVED_DIR, batch_cnt, images_A, images_B, fake_A)
    

In [13]:
# Colourise a hand-picked set of test images and save each result under ./cgan_gen.
colorizer.load_generator('g_weights_50.h5')  # generator params with 50-epoch training
data_generator = CifarGenerator(img_dir=TEST_DIR, batch_size=4, is_training=False)

check_files = ['img-3965-airplane.png', 'img-6109-bird.png', 'img-8077-cat.png', 'img-5399-frog.png',
               'img-8629-horse.png', 'img-2663-horse.png', 'img-3786-horse.png', 'img-3473-bird.png', 
               'img-8485-airplane.png','img-950-cat.png', 'img-8953-truck.png', 'img-5628-ship.png', 
               'img-3064-automobile.png', 'img-4683-deer.png', 'img-9229-automobile.png', 'img-7525-ship.png']

# save_single writes into this directory, which nothing else creates.
os.makedirs('./cgan_gen', exist_ok=True)

for images_B, f_names in tqdm(data_generator):
    # Size the loop from the current batch; the previous code read ``fake_A``
    # left behind by another cell, which fails on a fresh kernel.
    wanted = [index for index, f_name in enumerate(f_names) if f_name in check_files]
    if not wanted:
        continue

    # Run the generator once per batch, and only for batches with a requested file.
    fake_A = colorizer.generator.predict(images_B)
    for index in wanted:
        save_single('./cgan_gen', f_names[index], fake_A[index])



  0%|          | 0/10000 [00:00<?, ?it/s][A
100%|██████████| 10000/10000 [00:00<00:00, 212608.81it/s][A
`imread` is deprecated in SciPy 1.0.0, and will be removed in 1.2.0.
Use ``imageio.imread`` instead.
`imresize` is deprecated in SciPy 1.0.0, and will be removed in 1.2.0.
Use ``skimage.transform.resize`` instead.
`imresize` is deprecated in SciPy 1.0.0, and will be removed in 1.2.0.
Use ``skimage.transform.resize`` instead.

  1%|          | 20/2500 [00:00<00:12, 194.40it/s][A
  2%|▏         | 43/2500 [00:00<00:12, 202.23it/s][A
  3%|▎         | 65/2500 [00:00<00:11, 207.22it/s][A
  3%|▎         | 84/2500 [00:00<00:12, 199.46it/s][A
  4%|▍         | 102/2500 [00:00<00:12, 191.04it/s][A
  5%|▍         | 122/2500 [00:00<00:12, 192.92it/s][A
  6%|▌         | 143/2500 [00:00<00:12, 195.22it/s][A
  7%|▋         | 163/2500 [00:00<00:12, 194.20it/s][A
  7%|▋         | 182/2500 [00:00<00:12, 190.33it/s][A
  8%|▊         | 203/2500 [00:01<00:11, 193.26it/s][A
  9%|▉         | 22

 93%|█████████▎| 2332/2500 [00:14<00:01, 108.29it/s][A
 94%|█████████▎| 2343/2500 [00:14<00:01, 101.72it/s][A
 94%|█████████▍| 2354/2500 [00:14<00:01, 101.75it/s][A
 95%|█████████▍| 2365/2500 [00:14<00:01, 99.19it/s] [A
 95%|█████████▌| 2376/2500 [00:15<00:01, 96.61it/s][A
 95%|█████████▌| 2387/2500 [00:15<00:01, 97.51it/s][A
 96%|█████████▌| 2398/2500 [00:15<00:01, 99.24it/s][A
 96%|█████████▋| 2409/2500 [00:15<00:00, 100.88it/s][A
 97%|█████████▋| 2420/2500 [00:15<00:00, 100.71it/s][A
 97%|█████████▋| 2431/2500 [00:15<00:00, 103.18it/s][A
 98%|█████████▊| 2443/2500 [00:15<00:00, 105.34it/s][A
 98%|█████████▊| 2455/2500 [00:15<00:00, 107.85it/s][A
 99%|█████████▊| 2467/2500 [00:15<00:00, 109.30it/s][A
 99%|█████████▉| 2479/2500 [00:15<00:00, 110.24it/s][A
100%|█████████▉| 2491/2500 [00:16<00:00, 108.72it/s][A
100%|██████████| 2500/2500 [00:16<00:00, 154.51it/s][A

In [23]:
def calculate(image1, image2):
    """Return the mean per-bin similarity of the two images' histograms.

    A bin scores 1 when both counts are equal, otherwise
    ``1 - |a - b| / max(a, b)``; the result is the average over all bins.
    Both arguments must provide a ``histogram()`` method returning sequences
    of the same length.
    """
    hist_a = image1.histogram()
    hist_b = image2.histogram()
    assert len(hist_a) == len(hist_b), "error"

    scores = [
        1 if count_a == count_b
        else 1 - abs(count_a - count_b) / max(count_a, count_b)
        for count_a, count_b in zip(hist_a, hist_b)
    ]

    return sum(scores) / len(hist_a)

def split_image(image, part_size):
    """Cut ``image`` into tiles of ``part_size`` = (width, height).

    Tiles are returned column by column (left to right), top to bottom within
    each column. The image dimensions must be exact multiples of the tile
    size, otherwise the assertion fails.
    """
    tile_w, tile_h = part_size
    width, height = image.size

    assert width % tile_w == height % tile_h == 0, "error"

    return [
        image.crop((left, top, left + tile_w, top + tile_h)).copy()
        for left in range(0, width, tile_w)
        for top in range(0, height, tile_h)
    ]

# Settings read by get_his_sim: images are resized to `size`, then cut into `part_size` tiles.
size = (32, 32)
part_size = (8, 8)

def get_his_sim(ori_img, fake_img):
    """Return the tile-wise histogram similarity of two PIL images.

    Both images are resized to the module-level ``size``, converted to RGB and
    cut into ``part_size`` tiles; the similarity is the mean of ``calculate``
    over corresponding tiles, rounded to 6 decimals.
    """
    img1 = ori_img.resize(size).convert("RGB")
    sub_image_1 = split_image(img1, part_size)

    img2 = fake_img.resize(size).convert("RGB")
    sub_image_2 = split_image(img2, part_size)

    sub_data = 0
    for im1, im2 in zip(sub_image_1, sub_image_2):
        sub_data += calculate(im1, im2)

    # number of tiles along each axis
    x = size[0] / part_size[0]
    y = size[1] / part_size[1]

    sim = round((sub_data / (x * y)), 6)
    # The score used to be computed and then dropped, so callers always got None.
    return sim

def L1(yhat, y):
    """Return the sum of absolute differences between ``y`` and ``yhat``."""
    residual = y - yhat
    return np.absolute(residual).sum()
 
def L2(yhat, y):
    """Return the sum of squared differences between ``y`` and ``yhat``."""
    residual = y - yhat
    return np.power(residual, 2).sum()

In [25]:
# Compare generated images with the originals over the whole test set:
# accumulate the per-image L1 and L2 distances and print totals and means.
batch_size = 1
colorizer.load_generator('g_weights_50.h5')  # generator params with 50-epoch training
data_generator = CifarGenerator(img_dir=TEST_DIR, batch_size=batch_size)

his_sum = 0
l1_sum = 0
l2_sum = 0

for batch_cnt, (images_A, images_B) in enumerate(tqdm(data_generator)):
    # batch_size is 1, so take the single prediction of the batch
    fake_A = colorizer.generator.predict(images_B)[0]
    
    # map both images from [-1, 1] back to 8-bit values
    image_A = np.uint8((np.array(images_A[0]) * 0.5 + 0.5) * 255)
    image_fake = np.uint8((np.array(fake_A) * 0.5 + 0.5) * 255)
    
    # get histogram similarity (currently disabled)
    # ori_img = Image.fromarray(image_A)
    # fake_img = Image.fromarray(image_fake)
    # sim = get_his_sim(ori_img, fake_img)
    # his_sum += sim
    
    # distances are computed on pixel values rescaled to [0, 1]
    loss_l1 = L1(image_A / 255, image_fake / 255)
    loss_l2 = L2(image_A / 255, image_fake / 255)
    
    l1_sum += loss_l1
    l2_sum += loss_l2
    

# totals, then per-batch means (one image per batch here)
print(l1_sum)
print(l2_sum)
print(l1_sum / len(data_generator))    
print(l2_sum / len(data_generator))    
# print(his_sum / len(data_generator))    
    

100%|██████████| 10000/10000 [00:00<00:00, 227759.44it/s]
`imread` is deprecated in SciPy 1.0.0, and will be removed in 1.2.0.
Use ``imageio.imread`` instead.
`imresize` is deprecated in SciPy 1.0.0, and will be removed in 1.2.0.
Use ``skimage.transform.resize`` instead.
`imresize` is deprecated in SciPy 1.0.0, and will be removed in 1.2.0.
Use ``skimage.transform.resize`` instead.
100%|██████████| 10000/10000 [01:42<00:00, 98.54it/s]

1877839.9333333424
262301.55083429546
187.78399333333425
26.230155083429544



