In [0]:
# Load the Drive helper and mount Google Drive at /content/drive.
# Requires the google.colab package, i.e. a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

In [0]:
# Shell escape: print the current working directory.
!pwd

/content


In [0]:
cd drive/My Drive/BW2G

In [0]:
!pip install numpy==1.16.1

In [0]:
import os
import shutil
import time

import cv2
import h5py
import keras
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf

from PIL import Image
from keras import regularizers, losses, optimizers
from keras.callbacks import EarlyStopping, TensorBoard
from keras.layers import Input, InputLayer, Conv2D, MaxPooling2D, UpSampling2D, Concatenate, Conv2DTranspose, Dropout, BatchNormalization
from keras.models import Sequential, Model, load_model
from keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split

Using TensorFlow backend.


In [0]:
# Image geometry: side length (pixels) of the square images and the size of
# the channel axis used when reshaping single-channel arrays.
dim_size = 256
l_channel = 1
ab_channel = 2

# Number of images in the dataset folder (presumably; verify against the data).
dir_size = 9601

# Source image folders.
dataset_path = 'images/0'
dataset_test_path = 'images/1'

# Folders the resized images are loaded from.
resized_train_path = 'squared_images/flowers/'
resized_test_path = 'squared_images/test/'

# Folders for the model inputs (edge maps) and targets (grayscale).
train_path = 'bw_images/'
train_path_v2 = 'blur_images/'
validate_path = 'gray_images/'

test_train_path = 'bw_test_images/'
test_val_path = 'gray_test_images/'

# Saved artifacts: output folder plus file names for model, weights and cache.
models_path = 'saved_parameters/'

saved_model = 'saved_model-adam-mse.h5'
saved_weights = 'saved_weights.h5'

saved_data = 'loaded_images_bin2gr_gr2rgb.npz'

In [0]:
# Create directory
def create_dir(folder_name):
    """Make sure the directory `folder_name` exists and return its path.

    Missing parent directories are created too, and an existing directory is
    left untouched, so the call can be repeated safely.

    Args:
        folder_name: Path of the directory to create.

    Returns:
        `folder_name` unchanged, so callers can keep using it as a prefix.
    """
    # makedirs copes with nested paths (os.mkdir fails when a parent is
    # missing) and exist_ok removes the exists()-then-mkdir() race.
    os.makedirs(folder_name, exist_ok=True)
    return folder_name
                          
def save_path(dir_path, file, appended_name):
    """Build an output path inside `dir_path` for a file derived from `file`.

    Any directory part of `file` is dropped and `appended_name` is inserted
    between the base name and its extension.

    Args:
        dir_path: Destination directory, with or without a trailing separator.
        file: Source file name or path.
        appended_name: Text placed after the base name, before the extension.

    Returns:
        The joined destination path as a string.
    """
    file_name, file_ext = os.path.splitext(os.path.basename(file))
    # os.path.join supplies the separator when dir_path has no trailing slash;
    # plain concatenation would glue the folder name onto the file name.
    return os.path.join(dir_path, file_name + appended_name + file_ext)

def file_count(dir_path):
    """Return how many entries (files and sub-directories) `dir_path` holds."""
    return len(os.listdir(dir_path))

#-------------------------------------------------------------------------------
# Image Preprocessing
# Edge Detection Preprocessing
def blur_images(rgb_arr):
    """Smooth every image with a 3x3 Gaussian kernel.

    Args:
        rgb_arr: Sequence of images accepted by cv2.GaussianBlur.

    Returns:
        A new list with one blurred image per input, in the same order.
    """
    kernel = (3, 3)
    # sigma 0 is passed through unchanged from the original call.
    return [cv2.GaussianBlur(image, kernel, 0) for image in rgb_arr]

def get_edges(rgb_arr):
    """Turn every image into a binary edge map (black edges on white).

    Args:
        rgb_arr: Sequence of images; each is converted with COLOR_BGR2GRAY.

    Returns:
        List with one thresholded edge image per input, in the same order.
    """
    def _edge_map(image):
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # First-order Sobel derivatives along x and y, kept in 16-bit signed
        # form so negative gradients survive until the absolute value is taken.
        grad_x = cv2.Sobel(gray, cv2.CV_16S, 1, 0)
        grad_y = cv2.Sobel(gray, cv2.CV_16S, 0, 1)

        # Remove negative values, then average both directions:
        # edges are white, background is black at this point.
        magnitude = cv2.addWeighted(
            cv2.convertScaleAbs(grad_x), 0.5,
            cv2.convertScaleAbs(grad_y), 0.5,
            0,
        )

        # Inverse binary threshold at 50 flips the polarity:
        # edges become black, background becomes white.
        _, inverted = cv2.threshold(magnitude, 50, 255, cv2.THRESH_BINARY_INV)
        return inverted

    return [_edge_map(image) for image in rgb_arr]

def RBG2LAB(rgb_np, conversion=None):
    """Convert a batch of images to the CIE L*a*b* colour space.

    Args:
        rgb_np: Array-like batch of images, iterated along its first axis.
        conversion: OpenCV colour-conversion code. Defaults to
            cv2.COLOR_RGB2LAB. Pass cv2.COLOR_BGR2LAB for images that are
            still in the BGR channel order produced by cv2.imread.

    Returns:
        numpy array stacking the converted images.
    """
    if conversion is None:
        # The original passed a bare, undefined name `RGB2LAB`, which raised
        # NameError on the first call.
        conversion = cv2.COLOR_RGB2LAB
    return np.array([cv2.cvtColor(image, conversion) for image in rgb_np])

# Resize Image
# Stretches every image to a size x size square (aspect ratio is not kept)
def square_images(dataset_path, size, dest_dir='squared_images/'):
    """Write a `size` x `size` copy of every image in `dataset_path`.

    Each output file keeps the source name with '_SQ' inserted before the
    extension. Images that already have the target size are saved as they
    are, so the destination folder always holds the complete dataset.

    Args:
        dataset_path: Directory containing the source images.
        size: Side length, in pixels, of the square output images.
        dest_dir: Output directory (created when missing).
    """
    # Trailing separator so save_path can prefix the file name directly.
    dir_path = create_dir(os.path.join(dest_dir, ''))
    target = (size, size)
    for file in os.listdir(dataset_path):
        # The context manager closes the source file handle after saving.
        with Image.open(os.path.join(dataset_path, file)) as image:
            # The original test (`width != height and width != size`) skipped
            # square images of the wrong size and non-square images whose
            # width already matched; compare against the full target instead.
            if image.size != target:
                squared_image = image.resize(target, Image.LANCZOS)
            else:
                squared_image = image
            squared_image.save(save_path(dir_path, file, '_SQ'))

#-------------------------------------------------------------------------------
# Load in Preprocessed Images
def load_saved(saved_data):
    """Load the cached binary and grayscale arrays from an .npz archive.

    Args:
        saved_data: Path of the .npz file.

    Returns:
        Tuple (binary, grayscale): the arrays stored under the keys 'BIN'
        and 'GR'. The RGB / predicted arrays ('RGB', 'PR_GR', 'PR_RGB') are
        not read.
    """
    # np.load keeps the archive open for lazy access; the context manager
    # closes it once both arrays have been read into memory.
    with np.load(saved_data) as data:
        return data['BIN'], data['GR']

def save_loaded(save_dir, save_name, binary=None, grayscale=None, rgb=None, predicted_gray=None, predicted_rgb=None, **kwargs):
    """Cache the binary and grayscale arrays as an .npz archive.

    Only `binary` and `grayscale` are written (keys 'BIN' and 'GR'). The
    remaining array arguments are accepted for call compatibility but are
    not stored.

    Args:
        save_dir: Directory for the archive (created when missing).
        save_name: File name of the archive inside `save_dir`.
        binary: Array stored under 'BIN'.
        grayscale: Array stored under 'GR'.
        rgb, predicted_gray, predicted_rgb, **kwargs: Currently unused.
    """
    create_dir(save_dir)
    save_dest = os.path.join(save_dir, save_name)
    # The original called `p.savez`, an undefined name (NameError).
    np.savez(save_dest, BIN=binary, GR=grayscale)
    
def load_images(dataset_path, size=None):
    """Read images from `dataset_path` with OpenCV.

    Args:
        dataset_path: Directory to read from.
        size: Maximum number of directory entries to consider, taken in
            sorted name order. None means every entry.

    Returns:
        Tuple (names, images): base names without extension and the arrays
        returned by cv2.imread, index-aligned. Entries that cv2.imread cannot
        decode are left out of both lists.
    """
    # Sorted so the selection and order are reproducible; os.listdir makes no
    # ordering promise.
    entries = sorted(os.listdir(dataset_path))
    if size is not None:
        entries = entries[:max(size, 0)]

    name_arr = []
    img_arr = []
    for file in entries:
        img = cv2.imread(os.path.join(dataset_path, file))
        if img is None:
            # imread returns None for anything it cannot decode
            # (sub-directories, non-image files); a None entry would only
            # crash the colour conversion later on.
            continue
        name_arr.append(os.path.splitext(os.path.basename(file))[0])
        img_arr.append(img)
    return name_arr, img_arr
    
def save_images(train_path, validate_path, names_arr, gray_arr, gray_tag, bw_arr, bw_tag):
    """Write the grayscale and black/white image pairs to disk as JPEGs.

    Args:
        train_path: Folder receiving the black/white images.
        validate_path: Folder receiving the grayscale images.
        names_arr: Base names, index-aligned with the image sequences.
        gray_arr: Grayscale images.
        gray_tag: Tag appended to the grayscale file names.
        bw_arr: Black/white images.
        bw_tag: Tag appended to the black/white file names.
    """
    for folder in (train_path, validate_path):
        create_dir(folder)

    gray_stack = np.asarray(gray_arr)
    bw_stack = np.asarray(bw_arr)

    gray_suffix = '-{}.jpg'.format(gray_tag)
    bw_suffix = '-{}.jpg'.format(bw_tag)

    for index in range(len(gray_arr)):
        name = names_arr[index]
        gray_file = save_path(validate_path, name, gray_suffix)
        bw_file = save_path(train_path, name, bw_suffix)
        cv2.imwrite(gray_file, gray_stack[index])
        cv2.imwrite(bw_file, bw_stack[index])
    
def convert_RGB2GR(validate_path, img_arr):
    """Convert colour images to grayscale.

    Also makes sure the `validate_path` folder exists.

    Args:
        validate_path: Folder created (if missing) for the grayscale images.
        img_arr: Sequence of images, converted with COLOR_BGR2GRAY.

    Returns:
        Tuple (gray_list, gray_array): the grayscale images as a list and the
        same images stacked into a numpy array.
    """
    create_dir(validate_path)
    gray_arr = [cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) for image in img_arr]
    return gray_arr, np.array(gray_arr)

def convert_BLUR2EDGE(train_path, rgb_arr):
    """Blur the images, then extract their edge maps.

    Also makes sure the `train_path` folder exists.

    Args:
        train_path: Folder created (if missing) for the edge images.
        rgb_arr: Sequence of colour images.

    Returns:
        Tuple (edge_list, edge_array): the edge maps as a list and the same
        maps stacked into a numpy array.
    """
    create_dir(train_path)
    edge_maps = get_edges(blur_images(rgb_arr))
    return edge_maps, np.array(edge_maps)
 
def load_model_data(source_path, train_path, validate_path, model_dir, save_name, load_train_size=None, **kwargs):
    """Load images, derive grayscale and edge versions, and cache them.

    Args:
        source_path: Folder with the colour source images.
        train_path: Folder for the edge (model input) images.
        validate_path: Folder for the grayscale (model target) images.
        model_dir: Folder receiving the .npz cache.
        save_name: File name of the .npz cache.
        load_train_size: Maximum number of images to load; None loads all.

    Returns:
        Tuple (edge_array, gray_array, rgb_array) of numpy arrays.
    """
    _, loaded_RGB = load_images(source_path, load_train_size)

    _, gray_np = convert_RGB2GR(validate_path, loaded_RGB)
    # Use the `train_path` argument. The original silently ignored it and
    # read the notebook-level global `train_path_v2` instead.
    _, bw_np = convert_BLUR2EDGE(train_path, loaded_RGB)

    # Build the colour array once and reuse it for the cache and the result.
    rgb_np = np.array(loaded_RGB)
    save_loaded(model_dir, save_name, binary=bw_np, grayscale=gray_np, rgb=rgb_np)
    return bw_np, gray_np, rgb_np
  
    
#-------------------------------------------------------------------------------
def test_split(bw_arr, gray_arr, size):
    test_bw = []
    test_gray = []
    for count in range(len(bw_arr), len(bw_arr) - size, -1):
        test_bw.append(bw_arr[count])
        test_gray.append(gray_arr[count])
    
    test_bw_np = test_bw.array(test_bw)
    test_gray_np = test_gray.array(test_gray)
    return test_bw_np, test_gray_np
                                
# Load training and validating images with cv2 as single-channel grayscale
def load_binary_gray(dir_path, size=None):
    """Read the images in `dir_path` as grayscale arrays.

    Files are read in sorted name order, so two folders holding matching
    file names yield index-aligned input/target lists.

    Args:
        dir_path: Directory to read from.
        size: Maximum number of files to read; None reads every file.

    Returns:
        List of arrays as returned by cv2.imread(path, 0).

    Raises:
        IOError: If a directory entry cannot be decoded as an image.
    """
    # os.listdir makes no ordering promise; sort for reproducible pairing.
    entries = sorted(os.listdir(dir_path))
    if size is not None:
        entries = entries[:max(size, 0)]

    data_arr = []
    for file in entries:
        full_path = os.path.join(dir_path, file)
        # Flag 0 makes imread return a single-channel grayscale image.
        img_read = cv2.imread(full_path, 0)
        if img_read is None:
            # Dropping the file silently would shift every later sample and
            # break the pairing with the sibling folder, so fail loudly.
            raise IOError('Could not read image: {}'.format(full_path))
        data_arr.append(img_read)
    return data_arr

def test_load(dir_path):
    data_arr = []
    counter = 0
    for file in sorted(os.listdir(dir_path)):
        img_read = cv2.imread(os.path.join(dir_path, file), 0)
        img_read = np.array(img_read)
        print(file)
        print(img_read.shape)
        data_arr.append(img_read)
        counter += 1
        if counter == 2:
            return data_arr
    return data_arr

#-------------------------------------------------------------------------------
# Calculate epoch size to traverse all training data
def step_size(dir_path, batch_size):
    """Return how many whole batches fit into the regular files of `dir_path`.

    Sub-directories are not counted, and a final partial batch is dropped.
    """
    file_total = sum(
        1
        for entry in os.listdir(dir_path)
        if os.path.isfile(os.path.join(dir_path, entry))
    )
    return int(file_total / batch_size)

#-------------------------------------------------------------------------------
# ML Functions
def encoder(filters, kernel_size, stride_size, drop_rate, inputs):
    """One encoder stage: conv, batch norm, dropout, conv, batch norm, pool.

    Args:
        filters: Number of filters in both convolutions.
        kernel_size: Convolution kernel size.
        stride_size: Convolution strides.
        drop_rate: Dropout rate applied between the two convolutions.
        inputs: Input tensor.

    Returns:
        Tuple (features, pooled): the stage output before pooling (used as
        the skip connection) and the 2x2 max-pooled output.
    """
    first_conv = Conv2D(
        filters, kernel_size,
        strides=stride_size, padding='same', activation='relu',
        kernel_regularizer=regularizers.l2(0.001),
    )(inputs)
    first_norm = BatchNormalization()(first_conv)
    dropped = Dropout(drop_rate)(first_norm)

    # NOTE(review): this convolution uses l2(0.01) while every other one in
    # the notebook uses l2(0.001) -- confirm that this is intentional.
    second_conv = Conv2D(
        filters, kernel_size,
        strides=stride_size, padding='same', activation='relu',
        kernel_regularizer=regularizers.l2(0.01),
    )(dropped)
    features = BatchNormalization()(second_conv)

    pooled = MaxPooling2D((2, 2), padding='same')(features)
    return features, pooled

def latent_space(filters, kernel_size, stride_size, inputs, drop_rate=0.5):
    """Bottleneck block: conv, dropout, batch norm, conv, batch norm.

    Args:
        filters: Number of filters in both convolutions.
        kernel_size: Convolution kernel size.
        stride_size: Convolution strides.
        inputs: Input tensor.
        drop_rate: Dropout rate after the first convolution. The original
            body read a notebook-level global named `drop_rate`; it is now
            an explicit argument whose default equals the value set in the
            training cell.

    Returns:
        Output tensor of the block.
    """
    latent = Conv2D(filters, kernel_size, strides=stride_size, padding='same', activation='relu', kernel_regularizer=regularizers.l2(0.001))(inputs)
    latent = Dropout(drop_rate)(latent)
    latent = BatchNormalization()(latent)
    latent = Conv2D(filters, kernel_size, strides=stride_size, padding='same', activation='relu', kernel_regularizer=regularizers.l2(0.001))(latent)
    latent = BatchNormalization()(latent)
    return latent

def decoder(filters, up_kernel, up_strides, de_kernel, de_strides, inputs, up_inputs, drop_rate=0.5):
    """One decoder stage: transposed-conv upsampling, skip concat, two convs.

    Args:
        filters: Number of filters in every layer of the stage.
        up_kernel: Kernel size of the transposed convolution.
        up_strides: Strides of the transposed convolution.
        de_kernel: Kernel size of the two regular convolutions.
        de_strides: Strides of the two regular convolutions.
        inputs: Tensor coming from the previous (deeper) stage.
        up_inputs: Skip-connection tensor from the matching encoder stage.
        drop_rate: Dropout rate after the first convolution (previously read
            from a notebook-level global of the same name).

    Returns:
        Output tensor of the stage.
    """
    decode = Conv2DTranspose(filters, up_kernel, strides=up_strides, padding='same')(inputs)
    # Concatenate the encoder features with the upsampled tensor.
    up = Concatenate()([up_inputs, decode])
    decode = Conv2D(filters, de_kernel, strides=de_strides, padding='same', activation='relu', kernel_regularizer=regularizers.l2(0.001))(up)
    decode = Dropout(drop_rate)(decode)
    decode = BatchNormalization()(decode)
    decode = Conv2D(filters, de_kernel, strides=de_strides, padding='same', activation='relu', kernel_regularizer=regularizers.l2(0.001))(decode)
    decode = BatchNormalization()(decode)
    return decode

def autoencoder(train_shape, filters, kernel_sizes, stride_sizes, drop_rate, decoder_activation='sigmoid', output_channels=1, **kwargs):
    """Build and compile the five-level encoder/decoder network.

    Args:
        train_shape: Shape of one input sample, without the batch axis.
        filters: Six filter counts: indices 0-4 for the encoder/decoder
            levels, index 5 for the bottleneck.
        kernel_sizes: Dict with keys 'encode', 'upsample' and 'decode'.
        stride_sizes: Dict with keys 'encode', 'upsample' and 'decode'.
        drop_rate: Dropout rate used in every stage.
        decoder_activation: Activation of the final 1x1 convolution.
        output_channels: Number of channels produced by the final layer.

    Returns:
        Compiled Keras model (Adam with learning rate 0.0001, MSE loss,
        MAE metric).
    """
    # Assumes the spatial input size survives five 2x poolings and five
    # upsamplings unchanged (e.g. 256 with 'upsample' stride 2) so that the
    # skip connections line up -- TODO confirm for other input sizes.
    inputs = Input(shape=train_shape)

    skips = []
    current = inputs
    for level_filters in filters[:5]:
        skip, current = encoder(level_filters, kernel_sizes['encode'], stride_sizes['encode'], drop_rate, current)
        skips.append(skip)

    # The original passed the undefined names `en_kernel` / `en_strides`
    # here (NameError); the bottleneck uses the encoder settings.
    current = latent_space(filters[5], kernel_sizes['encode'], stride_sizes['encode'], current, drop_rate)

    # Walk back up, pairing each decoder level with its encoder skip tensor.
    for level_filters, skip in zip(reversed(filters[:5]), reversed(skips)):
        current = decoder(level_filters, kernel_sizes['upsample'], stride_sizes['upsample'], kernel_sizes['decode'], stride_sizes['decode'], current, skip, drop_rate)

    # The original passed the literal string 'decoder_activation', which is
    # not an activation name; pass the argument's value instead.
    outputs = Conv2D(output_channels, (1, 1), activation=decoder_activation)(current)

    auto = Model(inputs=[inputs], outputs=[outputs])
    auto.compile(optimizer=optimizers.Adam(0.0001), loss=losses.mean_squared_error, metrics=['mae'])
    return auto

def train_model(model, train_np, validate_np, epoch_size, test_train_np, test_validate_np):
    """Fit `model` on the training pairs and log the run for TensorBoard.

    Args:
        model: Compiled model exposing `fit`.
        train_np: Training inputs.
        validate_np: Training targets.
        epoch_size: Number of epochs.
        test_train_np: Validation inputs.
        test_validate_np: Validation targets.
    """
    fit_options = dict(
        verbose=1,
        epochs=epoch_size,
        shuffle=True,
        validation_data=(test_train_np, test_validate_np),
        callbacks=[TensorBoard(log_dir='./logs')],
    )
    model.fit(train_np, validate_np, **fit_options)
    
def save_predicted(model, save_dir, train_np, validate_np, save_size):
    """Save input, prediction and ground truth for the first samples as JPEGs.

    For every sample index i below `save_size`, three files are written into
    `save_dir`: 'i_TR.jpg' (model input), 'i_PR.jpg' (prediction) and
    'i_GT.jpg' (ground truth).

    Args:
        model: Trained model exposing `predict`.
        save_dir: Output folder (created when missing).
        train_np: Model inputs, expected to be scaled to [0, 1].
        validate_np: Ground-truth targets, expected to be scaled to [0, 1].
        save_size: Number of samples to save; capped at the available count.
    """
    # Trailing separator so save_path can prefix the file name directly.
    predicted_path = os.path.join(create_dir(save_dir), '')

    # Never index past the available samples.
    save_size = max(0, min(save_size, len(train_np), len(validate_np)))
    if save_size == 0:
        return

    # Only the samples that get written need a prediction.
    predicted_gray = model.predict(train_np[:save_size])

    def _to_uint8(image):
        # JPEG output needs 8-bit data. astype('int') produced a platform
        # integer (usually 64-bit), which imwrite does not accept; clipping
        # guards against predictions that fall slightly outside [0, 1].
        return np.clip(image * 255, 0, 255).astype('uint8')

    for count in range(save_size):
        cv2.imwrite(save_path(predicted_path, str(count), '_TR.jpg'), _to_uint8(train_np[count]))
        cv2.imwrite(save_path(predicted_path, str(count), '_PR.jpg'), _to_uint8(predicted_gray[count]))
        cv2.imwrite(save_path(predicted_path, str(count), '_GT.jpg'), _to_uint8(validate_np[count]))
        
#-------------------------------------------------------------------------------
# Pre/Post Visualization
def show_images(image_index, train_np, validate_np, predicted_np=None, rgb_np=None, mode='PRE', model='B2G', row=1, col=4, **kwargs):
    """Show one sample's input and target, plus predictions when available.

    Args:
        image_index: Index of the sample to display.
        train_np: Model inputs.
        validate_np: Targets.
        predicted_np: Predictions; required when `mode` is 'POST'.
        rgb_np: Colour images; required when `mode` is 'POST' and `model`
            is 'B2RGB'.
        mode: 'PRE' shows input and target only; 'POST' adds the prediction.
        model: 'B2RGB' adds the colour panel in 'POST' mode.
        row: Number of subplot rows.
        col: Number of subplot columns.

    Raises:
        ValueError: If a panel is requested without the array it needs.
    """
    # Each panel: (title, image batch, colormap).
    panels = [('Input', train_np, 'gray'), ('Target', validate_np, 'gray')]
    if mode == 'POST':
        # Check before drawing anything, so a bad call fails with a clear
        # message instead of "'NoneType' object is not subscriptable".
        if predicted_np is None:
            raise ValueError("mode='POST' requires predicted_np")
        panels.append(('Prediction', predicted_np, 'gray'))
        if model == 'B2RGB':
            if rgb_np is None:
                raise ValueError("model='B2RGB' requires rgb_np")
            panels.append(('RGB', rgb_np, None))

    figure = plt.figure(figsize=(10, 10))
    for position, (title, images, cmap) in enumerate(panels, start=1):
        show_img = figure.add_subplot(row, col, position)
        show_img.imshow(images[image_index].squeeze(), cmap=cmap)
        show_img.set_title(title)

In [0]:
# Resize every image in dataset_path to dim_size x dim_size.
# NOTE(review): square_images writes into 'squared_images/', while the next
# cell loads from resized_train_path ('squared_images/flowers/') -- confirm
# how the resized files reach that sub-folder.
square_images(dataset_path, dim_size)

In [0]:
# Preprocess the resized images, load them into numpy arrays, and cache the
# result as .npz
train_bin, val_gray, _ = load_model_data(
    source_path=resized_train_path,
    train_path=train_path,
    validate_path=validate_path,
    model_dir=models_path,
    save_name=saved_data,
    load_train_size=None,
)

In [0]:
# Load the preprocessed arrays from the .npz cache.
# load_saved returns exactly two arrays (binary edges, grayscale); unpacking
# five values here raised ValueError. The RGB / predicted arrays are not
# stored in the cache.
train_bin, val_gray = load_saved(os.path.join(models_path, saved_data))

# Hold out 20% of the pairs for validation; the fixed seed makes the split
# reproducible across kernel restarts.
train_bin, test_train_bin, val_gray, test_val_gray = train_test_split(
    train_bin, val_gray, test_size=0.2, random_state=42)

In [0]:
# Data preprocessing (normalization, augmentation)
# NOTE(review): `data_dir` and `train_dir` were never defined in this
# notebook, so the cell failed with NameError. The two values below are
# assumptions -- adjust them to the intended folders.
data_dir = 'squared_images/'       # parent folder; each sub-folder is a class
train_dir = 'augmented_images/'    # destination for the generated images
os.makedirs(train_dir, exist_ok=True)

generator = ImageDataGenerator(rescale=1./255, rotation_range=30,
                               width_shift_range=0.1, height_shift_range=0.1,
                               shear_range=0.2, zoom_range=0.2,
                               horizontal_flip=True, fill_mode='nearest')
# Dependent on file structure, each subfolder is a class
# Image size is left at the default 256x256
augmented = generator.flow_from_directory(data_dir, save_to_dir=train_dir,
                                          save_format='jpg')
# Generate augmentation: every next() draws one batch, and save_to_dir makes
# the generator write that batch to train_dir.
aug_size = 100
for aug_count in range(aug_size):
    next(augmented)

In [0]:
# Prepare the arrays for the network: float32, explicit channel axis, and
# pixel values scaled from [0, 255] to [0, 1].
def to_model_input(images, size=dim_size, channels=l_channel):
    """Return `images` as float32 of shape (N, size, size, channels) in [0, 1].

    Args:
        images: numpy array of N single-channel images with values 0-255.
        size: Height and width of every image.
        channels: Size of the channel axis to append.
    """
    images = images.astype('float32')
    # Reshape to include the channel dim for grayscale/black-and-white images.
    images = images.reshape(images.shape[0], size, size, channels)
    return images / 255

# NOTE: this cell rescales its own inputs, so run it once per freshly loaded
# dataset -- running it twice divides by 255 twice.
train_bin = to_model_input(train_bin)
val_gray = to_model_input(val_gray)
test_train_bin = to_model_input(test_train_bin)
test_val_gray = to_model_input(test_val_gray)

# TODO (colour model, not wired up yet): convert val_rgb / test_val_rgb to LAB
# and normalize L to [0, 100] and ab to [-127, 127].

In [0]:
# Check data size, dimensions, and channels
print(train_bin.shape)
print(train_bin.dtype)
print(val_gray.shape)
print(val_gray.dtype)
print(test_train_bin.shape)
print(test_val_gray.shape)

# Check dataset images. No predictions exist at this point, so use 'PRE'
# mode: 'POST' indexes predicted_np, which is None here (TypeError).
show_images(0, train_bin, val_gray, mode='PRE')

In [0]:
# Loads a previously saved model (architecture and weights) from models_path.
# NOTE(review): the file name is hard-coded and differs from `saved_model`
# in the config cell -- confirm which file is intended.
# NOTE(review): the next cell rebinds auto_BIN2GR to a freshly built model,
# so the model loaded here is discarded when both cells run.
auto_BIN2GR = load_model(os.path.join(models_path, "saved_model-adam-mse-512-dr50-flowers-2.h5"))
#auto_GR2RGB = load_model(os.path.join(models_path, ))

In [0]:
# Model hyperparameters.
drop_rate = 0.5
# First five entries: encoder/decoder levels; last entry: bottleneck.
filters = [16, 32, 64, 128, 256, 512]
kernel_sizes = dict(encode=3, upsample=2, decode=3)
stride_sizes = dict(encode=1, upsample=2, decode=1)

# Build the edge-to-grayscale autoencoder and train it for 30 epochs,
# validating on the held-out split.
auto_BIN2GR = autoencoder(
    train_shape=train_bin.shape[1:],
    filters=filters,
    kernel_sizes=kernel_sizes,
    stride_sizes=stride_sizes,
    drop_rate=drop_rate,
)
train_model(
    model=auto_BIN2GR,
    train_np=train_bin,
    validate_np=val_gray,
    epoch_size=30,
    test_train_np=test_train_bin,
    test_validate_np=test_val_gray,
)

In [0]:
#auto_GR2RGB = autoencoder(train_bin.shape[1:], filters, kernel_sizes, stride_sizes, drop_rate, decoder_activation='tanh')
#train_model(auto_GR2RGB, val_gray, val_rgb, 30, test_train_bin, test_val_gray)

In [0]:
# Cache the normalized training split.
# save_loaded(save_dir, save_name, ...) needs both the directory and the file
# name; the original call passed only one positional argument (TypeError).
# A separate file name keeps the raw cache (saved_data), which the loading
# cell reads, from being overwritten with already-normalized data.
save_loaded(models_path, 'train_split_' + saved_data, binary=train_bin, grayscale=val_gray)
#save_loaded(models_path, saved_data, binary=train_bin, grayscale=val_gray, predicted_gray=predict_gray, predicted_rgb=predict_rgb)

In [0]:
# Persist the trained model. `auto` and `save_name` only exist as local names
# inside the helper functions, so the original line raised NameError; use the
# notebook-level model and the file name from the config cell instead.
os.makedirs(models_path, exist_ok=True)
auto_BIN2GR.save(os.path.join(models_path, saved_model))

References:

https://github.com/rrupeshh/Auto-Colorization-Of-GrayScale-Image/blob/master/Auto_color.ipynb

https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix/issues/190