### Beginning

In [0]:
# Mount Google Drive at /content/drive; the dataset and output paths used by
# this notebook are all located under that mount point.
from google.colab import drive
drive.mount('/content/drive')

In [0]:
import tensorflow as tf
import numpy as np
import h5py, os, glob, re, itertools, datetime

In [0]:
# Root folders of the DIV2K PNG images (training / validation split).
train_dir = "/content/drive/My Drive/DL/datasets/DIV2K/train/"
valid_dir = "/content/drive/My Drive/DL/datasets/DIV2K/valid/"
# Output roots for the generated HDF5 files: whole images vs. patched images
# (get_hdf_dir picks one of them based on its `patching` flag).
hdf_directory = "/content/drive/My Drive/DL/datasets/DIV2K HDF/"
patched_hdf_directory = "/content/drive/My Drive/DL/datasets/DIV2K HDF PATCHED/"
# Sub-folder of each split per image variant:
#   0 -> high resolution, 1 -> low resolution "bicub" x2, 2 -> low resolution "unkn" x2.
dir_type = { 0:"HR/", 1:"LR_bicub_X2/", 2:"LR_unkn_X2/" }

In [0]:
def current_time():
    """Return the current local time formatted as ``MM/DD HH:MM:SS``."""
    now = datetime.datetime.now()
    return now.strftime("%m/%d %H:%M:%S")

def get_hdf_dir(training, hsv, patching):
    """Build the HDF5 folder path for a given dataset configuration.

    Args:
        training: ``True`` selects the ``train/`` sub-folder, anything else
            selects ``valid/``.
        hsv: ``False`` selects the ``RGB/`` sub-folder, anything else selects
            ``HSV/``.
        patching: ``True`` uses ``patched_hdf_directory`` as the root,
            anything else uses ``hdf_directory``.

    Returns:
        The concatenation ``<root><color model>/<dataset type>/`` as a string.
    """
    if patching == True:
        root = patched_hdf_directory
    else:
        root = hdf_directory

    if hsv == False:
        color_part = "RGB/"
    else:
        color_part = "HSV/"

    if training == True:
        split_part = "train/"
    else:
        split_part = "valid/"

    return "{}{}{}".format(root, color_part, split_part)

def paths_gen(training):
    """Yield matching (HR, LR bicubic, LR unknown) PNG paths for one split.

    The high-resolution folder (``dir_type[0]``) of the selected split is
    globbed for ``*.png`` files. For every file ``<name>.png`` found there,
    the two low-resolution counterparts are expected to be called
    ``<name>x2.png`` inside the ``dir_type[1]`` and ``dir_type[2]`` folders of
    the same split.

    The low-resolution paths are derived from the file's base name rather than
    by regex substitution on the full path: the previous patterns (".png" with
    an unescaped dot, and the bare folder name) could also rewrite unrelated
    parts of the absolute path.

    Args:
        training: ``True`` reads from ``train_dir``, anything else from
            ``valid_dir``.

    Yields:
        Tuples ``(hr_path, lr_bicub_path, lr_unkn_path)`` in sorted file
        order, so that repeated runs visit the images in the same sequence.
    """
    directory = train_dir if training == True else valid_dir
    hr_pattern = os.path.join(directory, dir_type[0]) + "*.png"
    # glob returns files in arbitrary order; sort for reproducibility.
    for hr_path in sorted(glob.glob(hr_pattern)):
        stem, ext = os.path.splitext(os.path.basename(hr_path))
        lr_name = "{}x2{}".format(stem, ext)
        lr_bicub = os.path.join(directory, dir_type[1]) + lr_name
        lr_unkn = os.path.join(directory, dir_type[2]) + lr_name
        yield hr_path, lr_bicub, lr_unkn

def path2image(img_path, hsv):
    """Load a PNG file and return it as an evaluated float32 image.

    Must be called while a default TensorFlow session is active, because the
    intermediate tensors are materialised with ``.eval()``.

    Args:
        img_path: Path of the PNG file to read.
        hsv: When ``True`` the decoded RGB image is converted to HSV.

    Returns:
        The evaluated image (3 channels, converted to ``tf.float32``), in RGB
        or, if requested, HSV.
    """
    png_bytes = tf.read_file(img_path)
    decoded = tf.image.decode_png(png_bytes, channels=3)
    pixels = tf.image.convert_image_dtype(decoded, tf.float32).eval()
    if hsv == True:
        return tf.image.rgb_to_hsv(pixels).eval()
    return pixels
  
def patching(hr_img, lr_bicub_img, lr_unkn_img, patch_size):
    """Cut an HR image and its two LR versions into non-overlapping patches.

    HR patches are ``patch_size`` x ``patch_size``; LR patches are
    ``patch_size // 2`` on each side. The patch extraction is evaluated
    immediately (a default session is required), while the per-patch reshapes
    are returned as tensors for the caller to run.

    Args:
        hr_img: High-resolution image with a leading batch dimension.
        lr_bicub_img: Bicubic low-resolution image, same layout.
        lr_unkn_img: "Unknown" low-resolution image, same layout.
        patch_size: Side length of a high-resolution patch.

    Returns:
        ``(hr_list, lr_bicub_list, lr_unkn_list, patched_shape)`` where the
        three lists hold one reshape tensor per patch (row-major over the
        patch grid of the HR image) and ``patched_shape`` is a two-element
        list of constant tensors with the grid's row and column counts.
    """
    half_size = patch_size // 2
    hr_window = [1, patch_size, patch_size, 1]
    lr_window = [1, half_size, half_size, 1]
    unit_rates = [1, 1, 1, 1]

    def _extract(image, window):
        # Window size doubles as the stride, so patches do not overlap.
        return tf.extract_image_patches(
            image, window, window, unit_rates, 'VALID').eval()

    hr_patches = _extract(hr_img, hr_window)
    lr_bicub_patches = _extract(lr_bicub_img, lr_window)
    lr_unkn_patches = _extract(lr_unkn_img, lr_window)

    # The patch grid size is taken from the HR result and reused for both LR
    # images.
    _, R, C, _ = np.asarray(hr_patches.shape)
    hr_list, lr_bicub_list, lr_unkn_list = [], [], []
    patched_shape = [tf.constant(R), tf.constant(C)]

    hr_dims = [patch_size, patch_size, 3]
    lr_dims = [half_size, half_size, 3]
    for r in range(R):
        for c in range(C):
            # Each grid cell holds a flattened patch; restore height/width/channels.
            hr_patch = tf.reshape(hr_patches[0, r, c, :], hr_dims)
            lr_bicub_patch = tf.reshape(lr_bicub_patches[0, r, c, :], lr_dims)
            lr_unkn_patch = tf.reshape(lr_unkn_patches[0, r, c, :], lr_dims)
            hr_list.append(hr_patch)
            lr_bicub_list.append(lr_bicub_patch)
            lr_unkn_list.append(lr_unkn_patch)
    return hr_list, lr_bicub_list, lr_unkn_list, patched_shape
  
def save_img(img, filename, hsv):
    """Encode an image as PNG and build the op that writes it to disk.

    The PNG encoding is evaluated right away (a default session must be
    active); the file itself is only written once the returned op is run.

    Args:
        img: Image to save; treated as HSV when ``hsv`` is ``True``.
        filename: Destination path of the PNG file.
        hsv: When ``True`` the image is converted from HSV to RGB first.

    Returns:
        The ``tf.write_file`` operation for the encoded image.
    """
    rgb = tf.image.hsv_to_rgb(img) if hsv == True else img
    as_uint8 = tf.image.convert_image_dtype(rgb, tf.uint8)
    png_bytes = tf.image.encode_png(as_uint8).eval()
    destination = tf.constant(filename)
    return tf.write_file(destination, png_bytes)

### Images preprocessing

In [0]:
# Convert every image triple (HR, LR bicubic, LR unknown) of the selected
# split into one HDF5 file, optionally cut into patches.
TRAINING = True
HSV = False  # False -> images stay RGB, True -> images are converted to HSV
PATCHING = True
PATCH_SIZE = 96  # high-resolution patch size (LR patches are half of it)

patched_shape_log = "/content/drive/My Drive/DL/metrics/patched_shape.txt"

hdf_dir = get_hdf_dir(TRAINING, HSV, PATCHING)
# h5py does not create missing parent folders, so make sure the target exists.
if not os.path.isdir(hdf_dir):
    os.makedirs(hdf_dir)

# Start from an empty log; the header is only meaningful when patching.
with open(patched_shape_log, "w") as txt_file:
    if PATCHING == True:
        txt_file.write("PATCH_SIZE = {}\n\n".format(PATCH_SIZE))

print("Start: {}\n".format(current_time()))
for images_done, (hr, lr_bicub, lr_unkn) in enumerate(paths_gen(TRAINING), start=1):
    # A fresh graph per image keeps the graph from growing over the run; the
    # `with` block makes the session the default one (needed by the helpers'
    # .eval() calls) and closes it afterwards.
    tf.reset_default_graph()
    with tf.Session() as sess:
        hr_img = path2image(hr, HSV)
        lr_bicub_img = path2image(lr_bicub, HSV)
        lr_unkn_img = path2image(lr_unkn, HSV)
        if PATCHING == True:
            # extract_image_patches expects a leading batch dimension.
            hr_batch = tf.expand_dims(hr_img, 0)
            lr_bicub_batch = tf.expand_dims(lr_bicub_img, 0)
            lr_unkn_batch = tf.expand_dims(lr_unkn_img, 0)
            hr_img, lr_bicub_img, lr_unkn_img, patched_shape = sess.run(
                patching(hr_batch, lr_bicub_batch, lr_unkn_batch, PATCH_SIZE))

    image_number = re.search(r'\d\d\d\d', re.split(r"/", hr)[-1]).group(0)
    with h5py.File("{}{}.hdf5".format(hdf_dir, image_number), "w") as hdf:
        hdf.create_dataset("hr", data=np.asarray(hr_img))
        hdf.create_dataset("lr_bicub", data=np.asarray(lr_bicub_img))
        hdf.create_dataset("lr_unkn", data=np.asarray(lr_unkn_img))
        if PATCHING == True:
            hdf.create_dataset("patched_shape", data=np.asarray(patched_shape))

    # patched_shape only exists when patching is enabled.
    if PATCHING == True:
        with open(patched_shape_log, "a+") as txt_file:
            txt_file.write("image {}... patched_shape = {}\n".format(
                image_number, np.asarray(patched_shape)))

    # Report progress every tenth image.
    if images_done % 10 == 0:
        print("{}... image {} done".format(current_time(), image_number))

# Printed once, after all images have been processed.
print("\nEnd: {}".format(current_time()))

### Checking

In [0]:
# Sanity check: export one sample per HDF5 file back to PNG for visual
# inspection.
TRAINING = True
HSV = False
PATCHING = True
PATCH_INDEX = 150  # patch exported from every file (clamped to the last patch)
test_img_dir = "/content/drive/My Drive/DL/test_img/"

hdf_dir = get_hdf_dir(TRAINING, HSV, PATCHING)

for hdf_name in glob.glob(hdf_dir + "*.hdf5"):
    img_num = re.search(r'\d\d\d\d', re.split(r"/", hdf_name)[-1]).group(0)
    # Start each file with an empty graph so ops and embedded image constants
    # do not pile up across iterations.
    tf.reset_default_graph()
    with tf.Session() as sess, h5py.File(hdf_name, "r") as hdf:
        for key in ("hr", "lr_bicub", "lr_unkn"):
            dataset = hdf[key]
            if PATCHING == True:
                if dataset.shape[0] == 0:
                    print("{}: no patches stored in '{}', skipped".format(img_num, key))
                    continue
                # Some images yield fewer patches than PATCH_INDEX; fall back
                # to the last one instead of failing. Only the selected patch
                # is read from disk.
                sample = dataset[min(PATCH_INDEX, dataset.shape[0] - 1)]
            else:
                # Unpatched files store one whole image per dataset.
                sample = dataset[()]
            write_op = save_img(
                sample, "{}{}_{}.png".format(test_img_dir, key, img_num), hsv=HSV)
            sess.run(write_op)
    print("{} done".format(img_num))

            