In [4]:
from __future__ import print_function, division

from keras.layers import Input, Dense, Flatten, Dropout, Reshape, Concatenate
from keras.layers import BatchNormalization, Activation, Conv2D, Conv2DTranspose, UpSampling2D
# from keras.layers.advanced_activations import LeakyReLU
from tensorflow.keras.layers import LeakyReLU
from keras.models import Model
from keras.optimizers import Adam
# from keras.engine.saving import load_model

from keras.datasets import cifar10
import keras.backend as K

import matplotlib.pyplot as plt
import os
import sys
import numpy as np

%pylab inline

from PIL import Image
from tqdm import tnrange, tqdm_notebook, tqdm
import cv2
import random

import tensorflow as tf

from tensorflow.keras.preprocessing.image import ImageDataGenerator

from sklearn.linear_model import LogisticRegression
import random

%pylab is deprecated, use %matplotlib inline and import the required libraries.
Populating the interactive namespace from numpy and matplotlib


In [5]:
def list_image_files(directory):
    files = sorted(os.listdir(directory))
    return [os.path.join(directory, f) for f in files if is_an_image_file(f)]

In [6]:

def is_an_image_file(filename):
    IMAGE_EXTENSIONS = ['.png', '.jpg', '.jpeg']
    for ext in IMAGE_EXTENSIONS:
        if ext in filename:
            return True
    return False

In [7]:
def load_image(path):
    img = cv2.imread(path[0])
    
    # Make sure all images are 256 x 256 by cropping them
    r, c = img.shape[:2]
    r_diff = (r - 256) // 2
    c_diff = (c - 256) // 2
    cropped = img[r_diff:256 + r_diff, c_diff:256 + c_diff] 
    return cropped

In [8]:
def load_images(path, n_images=-1, shouldShuffle=False):
    all_image_paths = list_image_files(path)
    if shouldShuffle:
        random.shuffle(all_image_paths)
    
    if n_images < 0:
        n_images = len(all_image_paths)
    images_l, images_ab = [], []
    
    # Initialize a progress bar with max of n_images
    pbar = tqdm_notebook(total = n_images, desc="Loading Images...")
    
    for path in zip(all_image_paths):
        img = load_image(path)
        lab_img = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
        lab_img = preprocess_image(lab_img)
        
        l = lab_img[:,:,0]
        l = l[:,:,np.newaxis]
        # Include all 3 channels, overwrite 1st channel with 0's
        ab = lab_img[:,:,1:]

        images_l.append(l)
        images_ab.append(ab)

        images_loaded = len(images_l)
        
        # Increase progress by one
        pbar.update(1)
        
        if images_loaded > n_images - 1: 
            break

    return {
        'l': np.array(images_l),
        'ab': np.array(images_ab)
    }

In [9]:
RESHAPE = (256,256)

def preprocess_image(cv_img):
    img = (cv_img - 127.5) / 127.5
    return img

def deprocess_image(img):
    img = (img * 127.5) + 127.5
    return img.astype('uint8')

In [10]:
def save_image(np_arr, path):
    img = np_arr * 127.5 + 127.5
    im = Image.fromarray(img)
    im.save(path)

In [11]:
def get_generator(H, W, k):
    # Inputs: height and width of the input image
    # Returns the model, which generates the AB channels

    # Pix2pix adapted from 
    # https://github.com/eriklindernoren/Keras-GAN/blob/master/pix2pix/pix2pix.py

    def conv2d(layer_input, filters, f_size=4, bn=True):
        """Layers used during downsampling"""
        d = Conv2D(filters, kernel_size=f_size, strides=2, padding='same')(layer_input)
        d = LeakyReLU(alpha=0.2)(d)
        if bn:
            d = BatchNormalization(momentum=0.8)(d)
        return d

    def deconv2d(layer_input, skip_input, filters, f_size=4, dropout_rate=0):
        """Layers used during upsampling"""
        u = UpSampling2D(size=2)(layer_input)
        u = Conv2D(filters, kernel_size=f_size, strides=1, padding='same', activation='relu')(u)
        if dropout_rate:
            u = Dropout(dropout_rate)(u)
        u = BatchNormalization(momentum=0.8)(u)
        u = Concatenate()([u, skip_input])
        return u

    gf = 64 # Number of filters in the first layer of G

    noise_in = Input(shape=(100,))
    condition_in = Input(shape=(H, W, 1))
    
    # pass noise through a FC layer to get it to the right size
    noise = Dense(H * H)(noise_in)

    # reshape to be the size of an image channel
    noise = Reshape((H, H, 1))(noise)
    
    # stick the (somewhat modified) noise as the second channel after
    # the gray input. Assuming new dimension of hid will be
    # B x 256 x 256 x 2, where B is the batch size.
    d0 = Concatenate(axis=-1)([condition_in, noise])
#     d0 = condition_in # Don't need noise since it's being ignored anyway

    # U-NET
    # Downsampling
    d1 = conv2d(d0, gf, bn=False)
    d2 = conv2d(d1, gf*2)
    d3 = conv2d(d2, gf*4)
    d4 = conv2d(d3, gf*8)
    d5 = conv2d(d4, gf*8)
    d6 = conv2d(d5, gf*8)
    d7 = conv2d(d6, gf*8)

    # Upsampling
    u1 = deconv2d(d7, d6, gf*8)
    u2 = deconv2d(u1, d5, gf*8)
    u3 = deconv2d(u2, d4, gf*8)
    u4 = deconv2d(u3, d3, gf*4)
    u5 = deconv2d(u4, d2, gf*2)
    u6 = deconv2d(u5, d1, gf)

    u7 = UpSampling2D(size=2)(u6)
    
    # Final 2-channel AB image with values between -1 and 1
    img_out = Conv2D(2*k, kernel_size=4, strides=1, padding='same', activation='tanh', name='pred_ab')(u7)

    # Make Model
    model = Model(inputs=[noise_in, condition_in], outputs=img_out)
    
    # Show summary of layers
    print("Generator Model:")
    model.summary()

    return model


In [12]:
def get_discriminator(H, W, k):
    # Inputs: height and width of the input image
    # Returns the model, which predicts real/fake
    # over a set of spatial regions (i.e., predicts a matrix instead of a scalar).

    # Pix2pix adapted from 
    # https://github.com/eriklindernoren/Keras-GAN/blob/master/pix2pix/pix2pix.py

    def d_layer(layer_input, filters, f_size=4, bn=True):
        """Discriminator layer"""
        d = Conv2D(filters, kernel_size=f_size, strides=2, padding='same')(layer_input)
        d = LeakyReLU(alpha=0.2)(d)
        if bn:
            d = BatchNormalization(momentum=0.8)(d)
        return d

    # Number of filters in the first layer of D
    df = 64

    img_in = Input(shape=(H, W, 2*k)) # AB channels
    condition_in = Input(shape=(H, W, 1)) # L channel
    
    # Concat the L and AB channels
    concat_imgs = Concatenate()([condition_in, img_in])

    d1 = d_layer(concat_imgs, df, bn=False)
    d2 = d_layer(d1, df*2)
    d3 = d_layer(d2, df*4)
    d4 = d_layer(d3, df*8)

    # validity map is a one-channel matrix 1/16 the size of the input (halved 4 times).
    # Each number predicts whether a region of the input is real/fake.
    validity = Conv2D(1*k, kernel_size=4, strides=1, padding='same', name='pred_valid')(d4)

    # Build Model
    model = Model(inputs=[img_in, condition_in], outputs=validity)

    # Show summary of layers
    print("Disciminator Model:")
    model.summary()

    return model

In [13]:
def min_k_diff(y_true, y_pred):
    # Shape: (Batch, H, W, k, 2)
    y_true = K.reshape(y_true, (-1, H, W, k, 2))
    y_pred = K.reshape(y_pred, (-1, H, W, k, 2))

    print("true:", y_true.shape)
    print("pred:", y_pred.shape)

    diff = y_true - y_pred
    diff = K.abs(diff)
    diff = K.mean(diff, axis=(1, 2, 4)) # mean of (H, W, 2) leaves (B, k)
    
    loss_metric = diff

    min_for_each_batch = K.min(loss_metric, axis=1)
    return K.sum(min_for_each_batch) #* .01

In [14]:
from keras.preprocessing import image

def generate_noise(n_samples, noise_dim):
    X = np.random.normal(0, 1, size=(n_samples, noise_dim))
    return X

In [15]:
def save_rgb_img(l, ab, filename):
    # Make sure ab is the right type, generated imgs change to float32
    ab = ab.astype(np.float64)
    
    # Merge
    merged = cv2.merge((l, ab))
    
    # Get between 0, 255
    deprocessed = deprocess_image(merged)
    
    # Change to BGR (Curse you CV2!!!)
    rgb = cv2.cvtColor(deprocessed, cv2.COLOR_LAB2BGR)
    
    # Save
    cv2.imwrite(save_path + filename, rgb)

In [16]:
def make_discrim_models():
    # Load training data
    max_imgs = 2500
    if len(list_image_files(train_dataset)) > max_imgs:
        num_imgs = max_imgs
    else:
        num_imgs = -1

    train_data = load_images(train_dataset, num_imgs, True)
    train_l, train_ab = train_data['l'], train_data['ab']

    # Make generated data
    noise = generate_noise(len(train_l), 100)
    train_predictions = generator.predict([noise, train_l])

    # Tile truth data
    tiled = np.tile(train_ab, k)

    # Make discrim predictions
    generated_discrim_values = discriminator.predict([train_predictions, train_l])
    true_discrim_values = discriminator.predict([tiled, train_l])
    
    # Flatten discrim values
    n = generated_discrim_values.shape
    flat_gen_discrim_val = generated_discrim_values.reshape((n[0], n[1] * n[2], n[3]))
    flat_true_discrim_val = true_discrim_values.reshape((n[0], n[1] * n[2], n[3]))
    
    # Make labels
    model_labels = np.concatenate((np.zeros(len(generated_discrim_values)), np.ones(len(true_discrim_values))))
    
    # Loop through and store models
    models = []
    for i in range(k):
        model_data = np.concatenate((flat_gen_discrim_val[:,:,i], flat_true_discrim_val[:,:,i]))
        model = LogisticRegression(max_iter=1000).fit(model_data, model_labels)
        models.append(model)
        
    return models

In [26]:
# Model parameters

# Program will grab 100 epoch weights for G,D in ./Output/trained_model_name/
# trained_model_name = "lsun_colorization_full_model"
# trained_model_name = "places2_colorization_read_imgs_flow_test"
trained_model_name = "Output_final_k_1_with_noise"
# trained_model_name = 'ablation_circles_equal_l_k_3_no_noise_no_aug'

# Program will save output in ./TestOutput/trained_model_name by default
# Change from none to save output in ./TestOutput/overwrite_save_dir
# overwrite_save_dir = "WHJ_Predictions"
# overwrite_save_dir = "LSUN_on_places2"
overwrite_save_dir = None

# Should output have the same file names as the test images?
preserve_img_names = True

# Must match k from model
k = 1

# Testing parameters
num_test_imgs = 100

# Should program randomly colorize some test images and leave some ground truth?
# This was used to generate data for a user study
random_select_gt_or_colorzed = False

# Specify what dataset to test on
# dataset = 'circle_pairs_equal_l_red_blue/'
# dataset = 'new_circles/'
# dataset = '../Colorization_GAN/circle_pairs/'
# test_dataset = 'lsun/test/'
test_dataset = 'Test_Output/'
# train_dataset = 'places2/train/subdir/'
# test_dataset = 'places2/10_imgs_per_cat/'
# dataset = 'William_Henry_Jackson/WHJ_Resized_Square/'

In [24]:
# Find where model is located
saved_GAN_location = "Output/" + trained_model_name + "/GAN_Weights_Epoch_100.h5"
saved_D_location = "Output/" + trained_model_name + "/Discriminator_Weights_Epoch_100.h5"

# Create folder to store output
generic_output_folder = "Test_Output/"

if overwrite_save_dir is None:
    new_output_folder = trained_model_name + "/"
    save_path = generic_output_folder + new_output_folder
else:
    save_path = generic_output_folder + overwrite_save_dir + "/"
    
if random_select_gt_or_colorzed:
    save_path += "random_colorized_or_ground_truth/"
else:
    save_path += "all_predictions/"

# Ensure output can save in desired location
if not os.path.exists(save_path):
    os.makedirs(save_path)

In [25]:
# ===================================
# COULD NOT HANDLE LARGE TRAINING SET
# ===================================

# Get training images
# Load dataset, convert to LAB, normalize to range [-1, 1]
# data = load_images(dataset + 'test', num_test_imgs)
# data = load_images(dataset + "/test/", num_test_imgs)
data = load_images(test_dataset, num_test_imgs)



# Only want l channel
l_channel_imgs, ab_channel_imgs = data['l'], data['ab']

FileNotFoundError: [WinError 3] The system cannot find the path specified: 'TA-2023-2024-4/Test_Output'