In [None]:
import tensorflow as tf
import numpy as np
from skimage import color, io
from skimage.transform import pyramid_expand as upscale
from itertools import product
import matplotlib.pyplot as plt
from time import time

%matplotlib inline

# Save as lab while collecting ab distribution

In [None]:
load_path = "image_data/mit_cvcl/*.jpg" # jpg 파일 모두 불러오기

rgb_image_set = io.imread_collection(load_path)
ab_data_mit_cvcl = np.zeros([26, 26], dtype=int)

tic = time()
for i, rgb_image in enumerate(rgb_image_set):
    if i % 100 == 0:
        toc = time()
        print("Iter: %d / Took %.4f seconds" % (i, toc - tic))
        
    lab_image = color.rgb2lab(rgb_image).astype(np.float16)
    np.save("Data_preprocessed/lab_image/mit_cvcl/lab_" + str(i), lab_image)

    ab_layer = (lab_image[:, :, 1:] // 10).astype(int)
    H, W, _ = lab_image.shape
    for i, j in product(range(H), range(W)):
        ab_data_mit_cvcl[tuple(ab_layer[i, j])] += 1
        
np.save("Data_preprocessed/Model_data/ab_data_mit_cvcl", ab_data_mit_cvcl)

# Sum all ab_data

In [None]:
ab_data_mit_cvcl = np.load("Data_preprocessed/Model_data/ab_data_mit_cvcl.npy")
ab_data_flickr = np.load("Data_preprocessed/Model_data/ab_data_flickr.npy")

ab_data = ab_data_mit_cvcl + ab_data_flickr

np.save("Data_preprocessed/Model_data/ab_data", ab_data)

In [None]:
ab_data = np.load("Data_preprocessed/Model_data/ab_data.npy")
plt.matshow(np.log(ab_data), cmap='jet')
plt.colorbar()

plt.show()

# Make training data

In [None]:
def gaussian_kernel(x, sigma=1):
    return np.exp(-(x/sigma)**2 / 2) / np.sqrt(2*np.pi*sigma**2)

def l2_norm(x, y):
    x = np.array(x)
    y = np.array(y)
    return np.sqrt(np.sum((x - y)**2))

## 1. Get smoothed ab distribution

In [None]:
ab_data = np.load("Data_preprocessed/Model_data/ab_data.npy")
filter_ab = ab_data > 0

ab_distribution = np.zeros((26, 26))
smooth_ab_data = np.zeros((26, 26))
smooth_ab_distribution = np.zeros((26, 26))

for a, b in product(range(-13, 13), repeat=2):
    value = 0
    for i, j in product(range(-13, 13), repeat=2):
        g = gaussian_kernel(a - i, sigma=0.5) * gaussian_kernel(b - j, sigma=0.5)
        value += ab_data[i, j] * g
    smooth_ab_data[a, b] = value

smooth_ab_distribution = smooth_ab_data / smooth_ab_data.sum()

np.save("Data_preprocessed/Model_data/filter_ab", filter_ab)
np.save("Data_preprocessed/Model_data/smooth_ab_distribution", smooth_ab_distribution)

## 2. Make weight matrix

In [None]:
smooth_ab_distribution = np.load("Data_preprocessed/Model_data/smooth_ab_distribution.npy")
filter_ab = np.load("Data_preprocessed/Model_data/filter_ab.npy")
Q = np.sum(filter_ab)

weight_matrix = np.zeros((26, 26))
lam = 0.5

for i, j in product(range(26), repeat=2):
    weight_matrix[i, j] = 1 / ((1 - lam)*smooth_ab_distribution[i, j] + lam/Q)
weight_matrix = weight_matrix * filter_ab
weight_matrix /= np.sum(weight_matrix * smooth_ab_distribution)

np.save("Data_preprocessed/Model_data/weight_matrix", weight_matrix)

## 3. Get train_X, train_y, weight_train

In [None]:
weight_matrix = np.load("Data_preprocessed/Model_data/weight_matrix.npy")
filter_ab = np.load("Data_preprocessed/Model_data/filter_ab.npy")
Q = np.sum(filter_ab)

tic = time()
for n in range(2687):
    if n % 100 == 0:
        toc = time()
        print("Iter: %d / Took %.4f seconds." % (n, toc - tic))

    image = np.load("Data_preprocessed/lab_image/mit_cvcl/lab_%d.npy" % n)
    H, W, _ = image.shape
    X_train = image[:, :, 0].astype(np.float16).reshape(1, H, W, 1)
    y_train = np.zeros((1, H//4, W//4, Q), dtype=np.float16)
    weight_train = np.zeros((1, H//4, W//4), dtype=np.float16)
    
    y_ab = image[:, :, 1:][2:-1:4, :, :][:, 2:-1:4, :] / 10
    for h, w in product(range(H//4), range(W//4)):
        distance_set = np.zeros((9, 3))
        d = 0
        for i, j in product(range(-1, 2), repeat=2):
            distance_set[d] = (i, j, 10*l2_norm((i+0.5, j+0.5), y_ab[h, w] % 1))
            d += 1
        order = distance_set[distance_set[:, 2].argsort()]

        z = np.zeros((26, 26), dtype=np.float16)
        for i in range(5):
            z[tuple((y_ab[h, w] + order[i, :2]).astype(int))] = gaussian_kernel(order[i, 2], sigma=5)
        z /= np.sum(z)
        y_train[0, h, w] = z[filter_ab]
        
        weight_train[0, h, w] = weight_matrix[tuple(y_ab.astype(int)[h, w])]
    
    np.save("Data_preprocessed/training_data/mit_cvcl/X_train_%d" % n, X_train)
    np.save("Data_preprocessed/training_data/mit_cvcl/y_train_%d" % n, y_train)
    np.save("Data_preprocessed/training_data/mit_cvcl/weight_train_%d" % n, weight_train)

# Get batch

In [None]:
size = 10
X_train_batch = np.zeros((size, 256, 256, 1), dtype=np.float16)
y_train_batch = np.zeros((size, 64, 64, 266), dtype=np.float16)
weight_train_batch = np.zeros((size, 64, 64), dtype=np.float16)
for n in range(2400):
    i = n % size
    X_train = np.load("Data_preprocessed/training_data/mit_cvcl/X_train_%d.npy" % n)
    y_train = np.load("Data_preprocessed/training_data/mit_cvcl/y_train_%d.npy" % n)
    weight_train = np.load("Data_preprocessed/training_data/mit_cvcl/weight_train_%d.npy" % n)
    
    X_train_batch[i] = X_train[0]
    y_train_batch[i] = y_train[0]
    weight_train_batch[i] = weight_train[0]
    
    if i == size - 1:
        np.save("Data_preprocessed/training_data/mit_cvcl_batch/X_train_%d" % (n+1), X_train_batch)
        np.save("Data_preprocessed/training_data/mit_cvcl_batch/y_train_%d" % (n+1), y_train_batch)
        np.save("Data_preprocessed/training_data/mit_cvcl_batch/weight_train_%d" % (n+1), weight_train_batch)

# Get Color_space

In [None]:
color_space = np.zeros((26, 26, 2), dtype=int)
for i in range(-13, 13):
    color_space[i, :, 0] = 10*i + 5
    color_space[:, i, 1] = 10*i + 5
    
np.save("Data_preprocessed/Model_data/color_space", color_space)

# Model

In [None]:
N = 1

In [None]:
Q = 266
T = 0.38

H, W = 256, 256
X = tf.placeholder(tf.float32, [N, H, W, 1], name='X')
y = tf.placeholder(tf.float32, [N, H//4, W//4, Q], name='y')
weight = tf.placeholder(tf.float32, [N, H//4, W//4], name="weight")

def weight_variable(shape, name=None):
    initial = tf.truncated_normal(shape, stddev=0.1)
    return tf.Variable(initial, name=name)

def bias_variable(shape, name=None):
    initial = tf.constant(0.1, shape=shape)
    return tf.Variable(initial, name=name)

def conv2d(x, W, stride=1, dilation=1, name=None):
    strides = [1, stride, stride, 1]
    dilations = [1, 1, dilation, dilation]
    return tf.nn.conv2d(x, W, padding="SAME", strides=strides, dilations=dilations, name=name)

def conv2d_T(x, W, output_shape, stride=1, name=None):
    strides = [1, stride, stride, 1]
    return tf.nn.conv2d_transpose(x, W, output_shape=output_shape, strides=strides, name=name)

def batch_normalize(x, shape, epsilon=1e-3, name=None):
    batch_mean, batch_var = tf.nn.moments(x, [0])
    beta = tf.Variable(tf.zeros(shape))
    scale = tf.Variable(tf.ones(shape))
    return tf.nn.batch_normalization(x, batch_mean, batch_var, beta, scale, epsilon, name=name)

In [None]:
with tf.name_scope("conv1"):
    W_conv1_1 = weight_variable([3, 3, 1, 64])
    b_conv1_1 = bias_variable([64])
    conv1_1 = conv2d(X, W_conv1_1) + b_conv1_1
    conv1_1 = tf.nn.relu(conv1_1)

    W_conv1_2 = weight_variable([3, 3, 64, 64])
    b_conv1_2 = bias_variable([64])
    conv1_2 = conv2d(conv1_1, W_conv1_2, stride=2) + b_conv1_2 # Halve size (1/2)
    conv1_2 = tf.nn.relu(conv1_2)

    conv1_norm = batch_normalize(conv1_2, [64])                                 

with tf.name_scope("conv2"):
    W_conv2_1 = weight_variable([3, 3, 64, 128])
    b_conv2_1 = bias_variable([128])
    conv2_1 = conv2d(conv1_norm, W_conv2_1) + b_conv2_1
    conv2_1 = tf.nn.relu(conv2_1)

    W_conv2_2 = weight_variable([3, 3, 128, 128])
    b_conv2_2 = bias_variable([128])
    conv2_2 = conv2d(conv2_1, W_conv2_2, stride=2) + b_conv2_2 # Halve size (1/4)
    conv2_2 = tf.nn.relu(conv2_2)

    conv2_norm = batch_normalize(conv2_2, [128])

with tf.name_scope("conv3"):
    W_conv3_1 = weight_variable([3, 3, 128, 256])
    b_conv3_1 = bias_variable([256])
    conv3_1 = conv2d(conv2_norm, W_conv3_1) + b_conv3_1
    conv3_1 = tf.nn.relu(conv3_1)

    W_conv3_2 = weight_variable([3, 3, 256, 256])
    b_conv3_2 = bias_variable([256])
    conv3_2 = conv2d(conv3_1, W_conv3_2) + b_conv3_2
    conv3_2 = tf.nn.relu(conv3_2)

    W_conv3_3 = weight_variable([3, 3, 256, 256])
    b_conv3_3 = bias_variable([256])
    conv3_3 = conv2d(conv3_2, W_conv3_3, stride=2) + b_conv3_3 # Halve size (1/8)
    conv3_3 = tf.nn.relu(conv3_3)

    conv3_norm = batch_normalize(conv3_3, [256])

with tf.name_scope("conv4"):
    W_conv4_1 = weight_variable([3, 3, 256, 512])
    b_conv4_1 = bias_variable([512])
    conv4_1 = conv2d(conv3_norm, W_conv4_1) + b_conv4_1
    conv4_1 = tf.nn.relu(conv4_1)

    W_conv4_2 = weight_variable([3, 3, 512, 512])
    b_conv4_2 = bias_variable([512])
    conv4_2 = conv2d(conv4_1, W_conv4_2) + b_conv4_2
    conv4_2 = tf.nn.relu(conv4_2)

    W_conv4_3 = weight_variable([3, 3, 512, 512])
    b_conv4_3 = bias_variable([512])
    conv4_3 = conv2d(conv4_2, W_conv4_3) + b_conv4_3
    conv4_3 = tf.nn.relu(conv4_3)

    conv4_norm = batch_normalize(conv4_3, [512])

with tf.name_scope("conv5"):
    W_conv5_1 = weight_variable([3, 3, 512, 512])
    b_conv5_1 = bias_variable([512])
    conv5_1 = conv2d(conv4_norm, W_conv5_1, dilation=2) + b_conv5_1
    conv5_1 = tf.nn.relu(conv5_1)

    W_conv5_2 = weight_variable([3, 3, 512, 512])
    b_conv5_2 = bias_variable([512])
    conv5_2 = conv2d(conv5_1, W_conv5_2, dilation=2) + b_conv5_2
    conv5_2 = tf.nn.relu(conv5_2)

    W_conv5_3 = weight_variable([3, 3, 512, 512])
    b_conv5_3 = bias_variable([512])
    conv5_3 = conv2d(conv5_2, W_conv5_3, dilation=2) + b_conv5_3
    conv5_3 = tf.nn.relu(conv5_3)

    conv5_norm = batch_normalize(conv5_3, [512])

with tf.name_scope("conv6"):
    W_conv6_1 = weight_variable([3, 3, 512, 512])
    b_conv6_1 = bias_variable([512])
    conv6_1 = conv2d(conv5_norm, W_conv6_1, dilation=2) + b_conv6_1
    conv6_1 = tf.nn.relu(conv6_1)

    W_conv6_2 = weight_variable([3, 3, 512, 512])
    b_conv6_2 = bias_variable([512])
    conv6_2 = conv2d(conv6_1, W_conv6_2, dilation=2) + b_conv6_2
    conv6_2 = tf.nn.relu(conv6_2)

    W_conv6_3 = weight_variable([3, 3, 512, 512])
    b_conv6_3 = bias_variable([512])
    conv6_3 = conv2d(conv6_2, W_conv6_3, dilation=2) + b_conv6_3
    conv6_3 = tf.nn.relu(conv6_3)

    conv6_norm = batch_normalize(conv6_3, [512])

with tf.name_scope("conv7"):
    W_conv7_1 = weight_variable([3, 3, 512, 512])
    b_conv7_1 = bias_variable([512])
    conv7_1 = conv2d(conv6_norm, W_conv7_1) + b_conv7_1
    conv7_1 = tf.nn.relu(conv7_1)

    W_conv7_2 = weight_variable([3, 3, 512, 512])
    b_conv7_2 = bias_variable([512])
    conv7_2 = conv2d(conv7_1, W_conv7_2) + b_conv7_2
    conv7_2 = tf.nn.relu(conv7_2)

    W_conv7_3 = weight_variable([3, 3, 512, 512])
    b_conv7_3 = bias_variable([512])
    conv7_3 = conv2d(conv7_2, W_conv7_3) + b_conv7_3
    conv7_3 = tf.nn.relu(conv7_3)

    conv7_norm = batch_normalize(conv7_3, [512])

with tf.name_scope("deconv8"):
    W_conv8_1 = weight_variable([4, 4, 256, 512])
    b_conv8_1 = bias_variable([256])
    conv8_1 = conv2d_T(conv7_norm, W_conv8_1, output_shape=[N, H//4, W//4, 256], stride=2) + b_conv8_1 # Double size (1/4)
    conv8_1 = tf.nn.relu(conv8_1)

    W_conv8_2 = weight_variable([3, 3, 256, 256])
    b_conv8_2 = bias_variable([256])
    conv8_2 = conv2d(conv8_1, W_conv8_2) + b_conv8_2
    conv8_2 = tf.nn.relu(conv8_2)

    W_conv8_3 = weight_variable([3, 3, 256, 256])
    b_conv8_3 = bias_variable([256])
    conv8_3 = conv2d(conv8_2, W_conv8_3) + b_conv8_3
    conv8_3 = tf.nn.relu(conv8_3)

with tf.name_scope("Softmax"):
    W_conv8_Q = weight_variable([1, 1, 256, Q])
    b_conv8_Q = bias_variable([Q])
    conv8_Q = conv2d(conv8_3, W_conv8_Q) + b_conv8_Q

    class8_Q_rh = tf.nn.softmax(conv8_Q / T, name="Prediction")

In [None]:
with tf.name_scope("Loss"):
    cross_entropy = - tf.reduce_sum(y * tf.log(class8_Q_rh + 1e-8), axis=3)
    cost = tf.reduce_sum(weight * cross_entropy, name="Cost")
    train_step = tf.train.AdamOptimizer().minimize(cost)

In [None]:
saver = tf.train.Saver()

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    saver.save(sess, "Model/test")

In [None]:
for _ in range(1000):
    s = (np.random.randint(240) + 1) * 10
    t = np.random.choice(10, N, replace=False)
    X_train = np.load("Data_preprocessed/training_data/mit_cvcl_batch/mit_cvcl_X_train_%d.npy" % s)[t]
    y_train = np.load("Data_preprocessed/training_data/mit_cvcl_batch/mit_cvcl_y_train_%d.npy" % s)[t]
    weight_train = np.load("Data_preprocessed/training_data/mit_cvcl_batch/mit_cvcl_weight_train_%d.npy" % s)[t]

    print("Image %d start." % s)

    with tf.Session() as sess:
        saver.restore(sess, tf.train.latest_checkpoint("Model/"))
        tic = time()
        for i in range(1000):
            if i % 200 == 0:
                toc = time()
                print("Iter: %5d / Took %7.2f seconds. / Loss: %f" % \
                      (i, toc - tic, sess.run(cost, feed_dict={X: X_train, y: y_train, weight: weight_train})))
                saver.save(sess, "Model/test")
            
            if i == 950:
                prediction = sess.run(class8_Q_rh, feed_dict={X: X_train, keep_prob: 1})
                N, H, W, _ = prediction.shape

                annealed_mean = np.sum(np.expand_dims(prediction, axis=4) * np.tile(color_space[filter_ab], (N, H, W, 1, 1)), axis=3)

                image_lab = np.zeros((N, H, W, 3))
                image_lab[:, :, :, 0] = X_train[:, 2:-1:4, :, 0][:, :, 2:-1:4]
                image_lab[:, :, :, 1:] = annealed_mean

                image_rgb = np.zeros((*X_train.shape[:3], 3), dtype=np.uint8)
                for i in range(N):
                    image_rgb[i] = (255*color.lab2rgb(upscale(image_lab[i], 4))).astype(np.uint8)

                figure = plt.figure()
                plt.imshow(image_rgb[0])
                plt.show()

            sess.run(train_step, feed_dict={X: X_train, y: y_train, weight: weight_train})
    print(ctime(), "\n")