**Use GPU: Runtime -> Change runtime type -> GPU (Hardware Accelerator)**

Setup

In [None]:
!cat ~/.keras/keras.json

In [None]:
# import Keras and print the installed version so the environment this
# notebook ran against is recorded in the cell output
import keras
print(keras.__version__)

Config

In [None]:
# import the necessary packages
import os

# --- locations -------------------------------------------------------
# directory of source images from which the training crops are cut
INPUT_IMAGES = "drive/MyDrive/pyimagesearch/datasets/ukbench100"

# root directory for everything this notebook writes, plus the two
# temporary sub-directories that hold the crops and their targets
BASE_OUTPUT = "drive/MyDrive/pyimagesearch/output/35-super-resolution"
IMAGES = os.path.join(BASE_OUTPUT, "images")
LABELS = os.path.join(BASE_OUTPUT, "labels")

# HDF5 files holding the network inputs and the expected outputs
INPUTS_DB = os.path.join(BASE_OUTPUT, "inputs.hdf5")
OUTPUTS_DB = os.path.join(BASE_OUTPUT, "outputs.hdf5")

# serialized model and the training-loss plot
MODEL_PATH = os.path.join(BASE_OUTPUT, "srcnn.model")
PLOT_PATH = os.path.join(BASE_OUTPUT, "plot.png")

# --- training hyper-parameters ---------------------------------------
BATCH_SIZE = 128
NUM_EPOCHS = 10

# --- geometry --------------------------------------------------------
# SCALE is the enlargement factor the network learns; INPUT_DIM is the
# width and height of each (square) crop fed to the SRCNN
SCALE = 2.0
INPUT_DIM = 33

# LABEL_SIZE is the width and height of the target crop; PAD is the
# margin on each side between an input crop and its centered target
LABEL_SIZE = 21
PAD = (INPUT_DIM - LABEL_SIZE) // 2

# step size, in pixels, of the sliding window used to cut crops
STRIDE = 14

HDF5 Dataset Writer

In [None]:
# import the necessary packages
import h5py
import os

class HDF5DatasetWriter:
    """Buffered writer for an HDF5 file holding data rows and labels.

    Rows and labels are accumulated in memory and written to the file
    in slices once the buffer holds at least `bufSize` rows.
    """

    def __init__(self, dims, outputPath, dataKey="images",
        bufSize=1000):
        """Create the HDF5 file and its datasets.

        dims       -- shape of the data dataset; `dims[0]` is also used
                      as the length of the "labels" dataset
        outputPath -- file to create; must not exist yet
        dataKey    -- name of the dataset that stores the rows
        bufSize    -- number of buffered rows that triggers a flush

        Raises ValueError when `outputPath` already exists.
        """
        # never overwrite an existing file
        if os.path.exists(outputPath):
            message = ("The supplied `outputPath` already "
                "exists and cannot be overwritten. Manually delete "
                "the file before continuing.")
            raise ValueError(message, outputPath)

        # one float dataset for the rows, one int dataset for the labels
        self.db = h5py.File(outputPath, "w")
        self.data = self.db.create_dataset(dataKey, dims, dtype="float")
        self.labels = self.db.create_dataset("labels", (dims[0],), dtype="int")

        # in-memory buffer and the next free row index in the datasets
        self.bufSize = bufSize
        self.buffer = {"data": [], "labels": []}
        self.idx = 0

    def add(self, rows, labels):
        """Queue `rows` and `labels`; flush once the buffer is full."""
        pending = self.buffer
        pending["data"].extend(rows)
        pending["labels"].extend(labels)

        if len(pending["data"]) >= self.bufSize:
            self.flush()

    def flush(self):
        """Write the buffered rows and labels to disk and empty the buffer."""
        start = self.idx
        stop = start + len(self.buffer["data"])
        self.data[start:stop] = self.buffer["data"]
        self.labels[start:stop] = self.buffer["labels"]

        # advance the write position and start a fresh buffer
        self.idx = stop
        self.buffer = {"data": [], "labels": []}

    def storeClassLabels(self, classLabels):
        """Store the class label names in a "label_names" dataset."""
        # variable-length string dtype (`vlen=unicode` for Py2.7)
        stringType = h5py.special_dtype(vlen=str)
        names = self.db.create_dataset("label_names", (len(classLabels),), dtype=stringType)
        names[:] = classLabels

    def close(self):
        """Flush any rows still buffered, then close the HDF5 file."""
        if self.buffer["data"]:
            self.flush()

        self.db.close()

Build Dataset

In [None]:
# import the necessary packages
from imutils import paths
from scipy import misc
import shutil
import random
import cv2
import os

from PIL import Image
import numpy as np

def _bicubic_rescale(img, factor):
    """Return `img` resized by `factor` on both axes with bicubic interpolation.

    Stand-in for `scipy.misc.imresize(img, factor, interp="bicubic")`,
    which was removed from SciPy in release 1.3.0. The target size is
    computed as `int(dimension * factor)`.

    img    -- uint8 image array of shape (rows, cols[, channels])
    factor -- multiplicative scale factor (e.g. 0.5 or 2.0)
    """
    (rows, cols) = img.shape[:2]
    # PIL expects the size as (width, height)
    newSize = (int(cols * factor), int(rows * factor))
    return np.array(Image.fromarray(img).resize(newSize, Image.BICUBIC))

# fail fast: HDF5DatasetWriter refuses to overwrite an existing file, so
# check now rather than after every crop has been generated
for dbPath in [INPUTS_DB, OUTPUTS_DB]:
    if os.path.exists(dbPath):
        raise ValueError("The HDF5 output file already exists and "
            "cannot be overwritten. Manually delete the file before "
            "continuing.", dbPath)

# if the output directories do not exist, create them
for p in [IMAGES, LABELS]:
    os.makedirs(p, exist_ok=True)

# grab the image paths and initialize the total number of crops
# processed
print("[INFO] creating temporary images...")
imagePaths = list(paths.list_images(INPUT_IMAGES))
random.shuffle(imagePaths)
total = 0

# loop over the image paths
for imagePath in imagePaths:
    # load the input image; `cv2.imread` returns None (instead of
    # raising) when the file cannot be decoded
    image = cv2.imread(imagePath)
    if image is None:
        print("[WARNING] skipping unreadable image: {}".format(imagePath))
        continue

    # grab the dimensions of the input image and crop the image such
    # that it tiles nicely when we generate the training data +
    # labels
    (h, w) = image.shape[:2]
    w -= int(w % SCALE)
    h -= int(h % SCALE)
    image = image[0:h, 0:w]

    # to generate our training images we first downscale the image by
    # the scale factor and then upscale it back to the original size --
    # this produces the low resolution inputs from which the network
    # learns to reconstruct the high resolution versions
    scaled = _bicubic_rescale(image, 1.0 / SCALE)
    scaled = _bicubic_rescale(scaled, SCALE / 1.0)

    # slide a window from left-to-right and top-to-bottom
    for y in range(0, h - INPUT_DIM + 1, STRIDE):
        for x in range(0, w - INPUT_DIM + 1, STRIDE):
            # crop out the `INPUT_DIM x INPUT_DIM` ROI from our
            # scaled image -- this ROI will serve as the input to our
            # network
            crop = scaled[y:y + INPUT_DIM, x:x + INPUT_DIM]

            # crop out the `LABEL_SIZE x LABEL_SIZE` ROI from our
            # original image -- this ROI will be the target output
            # from our network
            target = image[
                y + PAD:y + PAD + LABEL_SIZE,
                x + PAD:x + PAD + LABEL_SIZE]

            # construct the crop and target output image paths
            cropPath = os.path.sep.join([IMAGES, "{}.png".format(total)])
            targetPath = os.path.sep.join([LABELS, "{}.png".format(total)])

            # write the images to disk
            cv2.imwrite(cropPath, crop)
            cv2.imwrite(targetPath, target)

            # increment the crop total
            total += 1

# grab the paths to the images; both directories use the same file
# names, so sorting them the same way keeps inputs and targets paired
print("[INFO] building HDF5 datasets...")
inputPaths = sorted(list(paths.list_images(IMAGES)))
outputPaths = sorted(list(paths.list_images(LABELS)))

# initialize the HDF5 datasets
inputWriter = HDF5DatasetWriter((len(inputPaths), INPUT_DIM, INPUT_DIM, 3), INPUTS_DB)
outputWriter = HDF5DatasetWriter((len(outputPaths), LABEL_SIZE, LABEL_SIZE, 3), OUTPUTS_DB)

try:
    # loop over the images
    for (inputPath, outputPath) in zip(inputPaths, outputPaths):
        # load the two images and add them to their respective
        # datasets; the class label is unused, so store a dummy -1
        inputImage = cv2.imread(inputPath)
        outputImage = cv2.imread(outputPath)
        inputWriter.add([inputImage], [-1])
        outputWriter.add([outputImage], [-1])
finally:
    # close the HDF5 datasets even if writing failed part-way
    inputWriter.close()
    outputWriter.close()

# delete the temporary output directories
print("[INFO] cleaning up...")
shutil.rmtree(IMAGES)
shutil.rmtree(LABELS)

SRCNN

In [None]:
# import the necessary packages
from keras.models import Sequential
from keras.layers.convolutional import Conv2D
from keras.layers.core import Activation
from keras import backend as K

class SRCNN:
    """Factory for the three-layer SRCNN super-resolution network."""

    @staticmethod
    def build(width, height, depth):
        """Return an uncompiled Sequential SRCNN model.

        width, height -- spatial dimensions of the input crops
        depth         -- number of input channels; also the number of
                         filters in the final convolution
        """
        # order the input shape according to the backend's data format
        if K.image_data_format() == "channels_first":
            inputShape = (depth, height, width)
        else:
            inputShape = (height, width, depth)

        # three CONV => RELU stages, none of which request zero-padding
        stages = [
            Conv2D(64, (9, 9), kernel_initializer="he_normal",
                input_shape=inputShape),
            Activation("relu"),
            Conv2D(32, (1, 1), kernel_initializer="he_normal"),
            Activation("relu"),
            Conv2D(depth, (5, 5), kernel_initializer="he_normal"),
            Activation("relu"),
        ]

        # stack the stages in order
        network = Sequential()
        for stage in stages:
            network.add(stage)

        return network

HDF5 Dataset Generator

In [None]:
# import the necessary packages
from keras.utils import np_utils
import numpy as np
import h5py

class HDF5DatasetGenerator:
    """Yield batches of (images, labels) read from an HDF5 file.

    The file must contain an "images" dataset and a "labels" dataset;
    the number of entries is taken from the "labels" dataset.
    """

    def __init__(self, dbPath, batchSize, preprocessors=None,
        aug=None, binarize=True, classes=2):
        """Open the database and record the batching options.

        dbPath        -- path of the HDF5 file to read
        batchSize     -- number of entries per yielded batch
        preprocessors -- optional iterable of objects exposing
                         `preprocess(image)`, applied in order
        aug           -- optional augmentor exposing
                         `flow(images, labels, batch_size=...)`
        binarize      -- when True, labels are one-hot encoded with
                         `np_utils.to_categorical`
        classes       -- number of classes used when binarizing
        """
        # store the batch size, preprocessors, and data augmentor,
        # whether or not the labels should be binarized, along with
        # the total number of classes
        self.batchSize = batchSize
        self.preprocessors = preprocessors
        self.aug = aug
        self.binarize = binarize
        self.classes = classes

        # open the HDF5 database explicitly read-only (older h5py
        # releases default to a read/write mode that can create a
        # missing file) and determine the total number of entries
        self.db = h5py.File(dbPath, "r")
        self.numImages = self.db["labels"].shape[0]

    def generator(self, passes=np.inf):
        """Yield `(images, labels)` batches for `passes` sweeps of the data.

        With the default `passes=np.inf` the generator never ends. The
        final batch of each sweep may hold fewer than `batchSize`
        entries.
        """
        # initialize the epoch count
        epochs = 0

        # keep looping until the requested number of passes is done --
        # with the default this is infinite and the consumer decides
        # when to stop
        while epochs < passes:
            # loop over the HDF5 dataset
            for i in np.arange(0, self.numImages, self.batchSize):
                # extract the images and labels from the HDF5 dataset
                images = self.db["images"][i: i + self.batchSize]
                labels = self.db["labels"][i: i + self.batchSize]

                # check to see if the labels should be binarized
                if self.binarize:
                    labels = np_utils.to_categorical(labels,
                        self.classes)

                # check to see if our preprocessors are not None
                if self.preprocessors is not None:
                    # initialize the list of processed images
                    procImages = []

                    # loop over the images
                    for image in images:
                        # loop over the preprocessors and apply each
                        # to the image
                        for p in self.preprocessors:
                            image = p.preprocess(image)

                        # update the list of processed images
                        procImages.append(image)

                    # update the images array to be the processed
                    # images
                    images = np.array(procImages)

                # if the data augmentor exists, apply it
                if self.aug is not None:
                    (images, labels) = next(self.aug.flow(images,
                        labels, batch_size=self.batchSize))

                # yield a tuple of images and labels
                yield (images, labels)

            # increment the total number of epochs
            epochs += 1

    def close(self):
        """Close the underlying HDF5 file."""
        self.db.close()

Train

In [None]:
# import the necessary packages
from keras.optimizers import Adam
import matplotlib.pyplot as plt
import numpy as np

def super_res_generator(inputDataGen, targetDataGen):
    """Pair up batches from two `(data, labels)` generators.

    inputDataGen  -- iterator yielding `(inputData, labels)` tuples
    targetDataGen -- iterator yielding `(targetData, labels)` tuples

    Yields `(inputData, targetData)` tuples, discarding the class
    labels (which are irrelevant here). Runs for as long as both
    sources keep producing batches: infinite sources give an infinite
    stream, and finite sources end the stream cleanly instead of
    leaking `StopIteration` out of the generator body (which Python
    turns into a `RuntimeError`).
    """
    # zip advances both sources in lock-step, input first
    for (inputBatch, targetBatch) in zip(inputDataGen, targetDataGen):
        # keep only the data element of each (data, labels) tuple
        yield (inputBatch[0], targetBatch[0])

# `closing` guarantees both HDF5 files are closed even when compiling
# or training raises or is interrupted
from contextlib import closing

# initialize the input images and target output images generators
with closing(HDF5DatasetGenerator(INPUTS_DB, BATCH_SIZE)) as inputs, \
        closing(HDF5DatasetGenerator(OUTPUTS_DB, BATCH_SIZE)) as targets:
    # initialize the model and optimizer
    print("[INFO] compiling model...")
    opt = Adam(lr=0.001, decay=0.001 / NUM_EPOCHS)
    model = SRCNN.build(width=INPUT_DIM, height=INPUT_DIM, depth=3)
    model.compile(loss="mse", optimizer=opt)

    # train the model using our generators
    H = model.fit_generator(
        super_res_generator(inputs.generator(), targets.generator()),
        steps_per_epoch=inputs.numImages // BATCH_SIZE,
        epochs=NUM_EPOCHS, verbose=1)

# save the model to file
print("[INFO] serializing model...")
model.save(MODEL_PATH, overwrite=True)

# plot the training loss
plt.style.use("ggplot")
plt.figure()
plt.plot(np.arange(0, NUM_EPOCHS), H.history["loss"], label="loss")
plt.title("Loss on super resolution training")
plt.xlabel("Epoch #")
plt.ylabel("Loss")
plt.legend()
plt.savefig(PLOT_PATH)

Resize

In [None]:
# import the necessary packages
from keras.models import load_model
from scipy import misc
import numpy as np
import cv2

In [None]:
def resize(image_filepath, baseline_filepath, output_filepath):
    """Upscale an image by `SCALE` with the trained SRCNN.

    image_filepath    -- image to enlarge
    baseline_filepath -- where the plain bicubic enlargement is written
    output_filepath   -- where the SRCNN enlargement is written

    The SRCNN output is cropped to the region actually covered by
    predictions, so it is smaller than the baseline image.

    Raises ValueError when the input image cannot be read or when the
    enlarged image is smaller than one `INPUT_DIM x INPUT_DIM` window.
    """
    # imported locally because this cell's imports do not include PIL
    from PIL import Image

    # load the pre-trained model
    print("[INFO] loading model...")
    model = load_model(MODEL_PATH)

    # load the input image, then grab the dimensions of the input image
    # and crop the image such that it tiles nicely
    print("[INFO] generating image...")
    image = cv2.imread(image_filepath)
    if image is None:
        # cv2.imread signals failure by returning None
        raise ValueError("could not read image: {}".format(image_filepath))
    (h, w) = image.shape[:2]
    w -= int(w % SCALE)
    h -= int(h % SCALE)
    image = image[0:h, 0:w]

    # resize the input image using bicubic interpolation then write the
    # baseline image to disk (`scipy.misc.imresize` was removed in
    # SciPy 1.3.0, so the resize is done with PIL; size is (width, height))
    newSize = (int(w * SCALE), int(h * SCALE))
    scaled = np.array(Image.fromarray(image).resize(newSize, Image.BICUBIC))
    cv2.imwrite(baseline_filepath, scaled)

    # allocate memory for the output image
    output = np.zeros(scaled.shape)
    (h, w) = output.shape[:2]

    # top-left corners of the sliding windows; stepping by LABEL_SIZE
    # makes consecutive predictions tile without gaps or overlap
    ys = range(0, h - INPUT_DIM + 1, LABEL_SIZE)
    xs = range(0, w - INPUT_DIM + 1, LABEL_SIZE)
    if not ys or not xs:
        raise ValueError("enlarged image ({}x{}) is smaller than the "
            "{}x{} network input".format(w, h, INPUT_DIM, INPUT_DIM))

    # slide a window from left-to-right and top-to-bottom
    for y in ys:
        for x in xs:
            # crop ROI from our scaled image
            crop = scaled[y:y + INPUT_DIM, x:x + INPUT_DIM]

            # make a prediction on the crop and store it in our output
            # image
            P = model.predict(np.expand_dims(crop, axis=0))
            P = P.reshape((LABEL_SIZE, LABEL_SIZE, 3))
            output[y + PAD:y + PAD + LABEL_SIZE,
                x + PAD:x + PAD + LABEL_SIZE] = P

    # keep only the area that received predictions: it starts PAD pixels
    # in and ends where the last window's prediction ends, so none of
    # the never-written (black) border remains
    bottom = ys[-1] + PAD + LABEL_SIZE
    right = xs[-1] + PAD + LABEL_SIZE
    output = output[PAD:bottom, PAD:right]

    # clip any values that fall outside the range [0, 255]
    output = np.clip(output, 0, 255).astype("uint8")

    # write the output image to disk
    cv2.imwrite(output_filepath, output)

In [None]:
# run the super-resolution pipeline on a sample image, writing both the
# bicubic baseline and the SRCNN result for comparison
resize(image_filepath="drive/MyDrive/pyimagesearch/datasets/jemma.png", 
       baseline_filepath="drive/MyDrive/pyimagesearch/datasets/baseline.png", 
       output_filepath="drive/MyDrive/pyimagesearch/output/35-super-resolution.png")