In [0]:
# Mount Google Drive and derive the dataset locations from it
from google.colab import drive

drive.mount('/content/gdrive')

# Root folder of the project data, followed by its train/test sub-folders.
# Paths keep their trailing slash because file names are appended directly.
directory = '/content/gdrive/My Drive/ColorNetData/'
train_directory, test_directory = (directory + split for split in ('Train/', 'Test/'))

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [0]:
# Import statements
from os import listdir
from pickle import load, dump
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
from skimage.color import rgb2lab, lab2rgb, rgb2gray, gray2rgb
from keras.models import Sequential, Model
from keras.layers import Conv2D, UpSampling2D, Input, RepeatVector, Reshape, concatenate
from keras.preprocessing.image import load_img, img_to_array
from keras.applications.inception_resnet_v2 import InceptionResNetV2, preprocess_input
from keras.utils import plot_model
from PIL import Image

# Build Inception-ResNet-v2 with ImageNet weights (classification top included)
# and cache the model object on Drive.
inception = InceptionResNetV2(include_top=True, weights='imagenet')
# A context manager guarantees the pickle file is flushed and closed.
with open(directory + 'inception.p', 'wb') as inception_file:
  dump(inception, inception_file)

In [0]:
# Skip if done
# Create image embeddings
def encode_directory(image_directory, encoder):
  """Compute an embedding for every '.jpg' file in a directory.

  Each image is loaded at 299x299, converted to grayscale and replicated
  back to three channels, passed through `preprocess_input`, and fed to
  `encoder.predict` as a batch of one.

  Args:
    image_directory: Directory path ending in a separator; file names are
      appended to it directly.
    encoder: Model whose `predict` accepts a (1, 299, 299, 3) array.

  Returns:
    dict mapping each image file name to the array returned by `predict`.
  """
  encodings = dict()
  for name in tqdm(listdir(image_directory)):
    if not name.endswith('.jpg'):
      continue
    image = load_img(image_directory + name, target_size=(299, 299))
    image = np.array(img_to_array(image), dtype=float)
    # Discard colour, then restore a 3-channel layout for the encoder
    image = gray2rgb(rgb2gray(image))
    # Add the leading batch axis expected by predict()
    image = image.reshape(1, image.shape[0], image.shape[1], image.shape[2])
    image = preprocess_input(image)
    encodings[name] = encoder.predict(image)
  return encodings


inception = InceptionResNetV2(include_top=True, weights='imagenet')

# Encode the training images and persist the result
train_image_encoding = encode_directory(train_directory, inception)
with open(directory + 'train_image_encoding.p', 'wb') as encoding_file:
  dump(train_image_encoding, encoding_file)

# Encode the testing images and persist the result
test_image_encoding = encode_directory(test_directory, inception)
with open(directory + 'test_image_encoding.p', 'wb') as encoding_file:
  dump(test_image_encoding, encoding_file)

In [0]:
# Load the training and testing data
# NOTE: pickle.load can execute arbitrary code; only load files produced by
# this notebook.
with open(directory + 'train_image_encoding.p', 'rb') as encoding_file:
  train_image_encoding = load(encoding_file)
with open(directory + 'test_image_encoding.p', 'rb') as encoding_file:
  test_image_encoding = load(encoding_file)

print("Length of training data : ", len(train_image_encoding))
print("Length of testing data : ", len(test_image_encoding))

# Peek at one encoding without materialising the full list of values
print("Encoding shape : ", next(iter(train_image_encoding.values())).shape)

Length of training data :  8396
Length of testing data :  396
Encoding shape :  (1, 1000)


In [0]:
# Skip if once done
# Model
# Image encoding model: a 256x256 single-channel input is reduced to a
# 32x32x256 feature map by three stride-2 convolutions.
image_encoder_input = Input(shape=(256, 256, 1))
image_encoder = Conv2D(64, (3, 3), activation='relu', padding='same', strides=2)(image_encoder_input)
image_encoder = Conv2D(128, (3, 3), activation='relu', padding='same')(image_encoder)
image_encoder = Conv2D(128, (3, 3), activation='relu', padding='same', strides=2)(image_encoder)
image_encoder = Conv2D(256, (3, 3), activation='relu', padding='same')(image_encoder)
image_encoder = Conv2D(256, (3, 3), activation='relu', padding='same', strides=2)(image_encoder)
image_encoder = Conv2D(512, (3, 3), activation='relu', padding='same')(image_encoder)
image_encoder = Conv2D(512, (3, 3), activation='relu', padding='same')(image_encoder)
image_encoder = Conv2D(256, (3, 3), activation='relu', padding='same')(image_encoder)

# Image embedding model for inception: a 1000-element vector per image
image_embedder = Input(shape=(1000,))

# Final encoder model using fusion: the embedding is repeated at each of the
# 32x32 spatial positions, concatenated with the encoder features along the
# channel axis, and mixed back down to 256 channels by a 1x1 convolution.
fusion_output = RepeatVector(32 * 32)(image_embedder)
fusion_output = Reshape((32, 32, 1000))(fusion_output)
fusion_output = concatenate([image_encoder, fusion_output], axis=3)
fusion_output = Conv2D(256, (1, 1), activation='relu', padding='same')(fusion_output)

# Decoder model: three 2x upsamplings bring 32x32 back to 256x256; the
# two-channel tanh layer bounds the output to [-1, 1].
decoder_output = Conv2D(128, (3, 3), activation='relu', padding='same')(fusion_output)
decoder_output = UpSampling2D((2, 2))(decoder_output)
decoder_output = Conv2D(64, (3, 3), activation='relu', padding='same')(decoder_output)
decoder_output = UpSampling2D((2, 2))(decoder_output)
decoder_output = Conv2D(32, (3, 3), activation='relu', padding='same')(decoder_output)
decoder_output = Conv2D(16, (3, 3), activation='relu', padding='same')(decoder_output)
decoder_output = Conv2D(2, (3, 3), activation='tanh', padding='same')(decoder_output)
decoder_output = UpSampling2D((2, 2))(decoder_output)

# Building the final model
model = Model(inputs=[image_encoder_input, image_embedder], outputs=decoder_output)
model.compile(optimizer='adam', loss='mse', metrics=['mse'])
# Persist the freshly initialised weights and the model object itself
model.save_weights(directory + 'weights.h5')
# A context manager guarantees the pickle file is flushed and closed.
with open(directory + 'final_model.p', 'wb') as model_file:
  dump(model, model_file)

Instructions for updating:
Colocations handled automatically by placer.


In [0]:
# Load model and visualize
# NOTE: pickle.load can execute arbitrary code; only load files produced by
# this notebook.
with open(directory + 'final_model.p', 'rb') as model_file:
  model = load(model_file)

# Render the layer graph to a PNG on Drive, then display it as the cell output
model_diagram_path = directory + 'final_model.png'
plot_model(model, to_file=model_diagram_path, show_shapes=True)
Image.open(model_diagram_path)

In [0]:
def data_generator(batch_size=32):
  """Endlessly yield training batches built from `train_image_encoding`.

  For every (file name, embedding) pair the image is loaded from
  `train_directory` at 256x256, scaled to [0, 1] and converted to Lab.
  The first Lab channel is the encoder input; the remaining two channels
  divided by 128 are the regression target.

  Args:
    batch_size: Number of samples collected before a batch is yielded.

  Yields:
    [[encoder_input, embedding_input], model_output] where the arrays have
    shapes (batch, 256, 256, 1), (batch, embedding_size) and
    (batch, 256, 256, 2) respectively.
  """
  from itertools import cycle

  encoder_input, embedding_input, model_output = [], [], []
  # cycle() restarts the dataset when it is exhausted, so batches may span
  # the boundary between two passes over the data.
  for name, embedding in cycle(train_image_encoding.items()):
    image = np.array(img_to_array(load_img(train_directory + name, target_size=(256, 256))), dtype=float)
    # Convert to Lab once and slice both the input and the target from it
    lab_image = rgb2lab(1.0/255*image)
    # `:1` (not `0`) keeps the trailing channel axis: shape (256, 256, 1)
    encoder_input.append(lab_image[:, :, :1])
    embedding_input.append(embedding.flatten())
    model_output.append(lab_image[:, :, 1:] / 128)

    if len(model_output) >= batch_size:
      yield [[np.asarray(encoder_input), np.asarray(embedding_input)], np.asarray(model_output)]
      # Start collecting the next batch from scratch
      encoder_input, embedding_input, model_output = [], [], []
# Save the generator
# NOTE: pickle stores a function by reference (module and name), not its code,
# so loading this file still requires `data_generator` to be defined.
with open(directory + 'data_generator.p', 'wb') as generator_file:
  dump(data_generator, generator_file)

In [0]:
# Train the model for one epoch and save the weights
batch_size = 32
total_epochs = 1000  # not used below: each run of this cell trains a single epoch
# Number of full batches in one pass over the training data
steps_per_epoch = len(train_image_encoding) // batch_size
# `epochs` is the Keras 2 keyword; the legacy `nb_epoch` spelling is deprecated
model.fit_generator(data_generator(batch_size), epochs=1, verbose=1, steps_per_epoch=steps_per_epoch)
model.save_weights(directory + 'weights.h5')
print('Weights saved!')